Source code for fiabilipy.system

#!/usr/bin/env python2
# -*- coding: utf-8 -*-

#Copyright (C) 2013 Chabot Simon, Sadaoui Akim

#This program is free software; you can redistribute it and/or modify
#it under the terms of the GNU General Public License as published by
#the Free Software Foundation; either version 2 of the License, or
#(at your option) any later version.

#This program is distributed in the hope that it will be useful,
#but WITHOUT ANY WARRANTY; without even the implied warranty of
#MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
#GNU General Public License for more details.

#You should have received a copy of the GNU General Public License along
#with this program; if not, write to the Free Software Foundation, Inc.,
#51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.

r""" Reliability system design and computation

This module gives classes and functions to design complex systems and
compute some metrics, such as the reliability, the availability, the
Mean-Time-To-Failure, and so on.

"""

from numpy import empty, ones, delete
from sympy import exp, Symbol, oo
from scipy.special import binom
from itertools import combinations, chain
from collections import Iterable
import networkx as nx

from fiabilipy import Component

__all__ = ['System']

ALLSUBSETS = lambda n: (chain(*[combinations(range(n), ni)
                        for ni in xrange(n+1)]))


[docs]class System(object): r""" Describe a system with different components. The components are linked together thanks to a reliability diagram. This reliability diagram is represented by a graph. This graph *must* have two special nodes called `E` and `S`. `E` represents the start of the system and `S` its end (names stand for “Entrée” (start) and “Sortie” (end) in French). Examples -------- Let’s have a look to the following system:: | -- C0 -- | E -- | | -- C2 -- S | -- C1 -- | Thus, to represent such a system, you must create the three components C0, C1 and C2 and link them. >>> C = [Component(i, 1e-4) for i in xrange(3)] >>> S = System() >>> S['E'] = [C[0], C[1]] >>> S[C[0]] = [C[2]] >>> S[C[1]] = [C[2]] >>> S[C[2]] = ['S'] So, you can use the `System` object as a simple python dictionnary where each key is a component and the value associated it the list of the component’s successors. """ def __init__(self, graph=None): self._graph = nx.DiGraph(graph) self._cache = {} self._t = Symbol('t', positive=True) def __getitem__(self, component): return self._graph[component] def __setitem__(self, component, successors): #Let’s do different checks before inserting the element if not isinstance(successors, Iterable): if not isinstance(successors, Component): msg = u'successors must be a list of components, a component ' raise ValueError(msg) successors = [successors] if component != 'E' and 'E' not in self._graph: msg = u"'E' must be the first inserted component" raise ValueError(msg) for successor in successors: if successor != 'S': successor._systems.add(self) self._graph.add_edge(component, successor) #reset the cache self._cache = {} def __delitem__(self, component): for c in self._graph: try: self._graph.remove_edge(c, component) except nx.NetworkXError: #i.e. edge(c, component) does not exist pass except AttributeError: assert self._graph[c] == 'S' self._graph.remove_node(component) if component not in self.components: component._systems.remove(self) #reset the cache self._cache = {} def __len__(self): return len(self._graph) def __repr__(self): return u'I\'m a system'
[docs] def copy(self): r""" Return a copy of the system. Returns ------- out: System A copy of the current system Notes ----- The components are the same (same reference). Only the internal graph is new """ _copy = System() _copy['E'] = [] #'E' must be the first inserted component for c in self._graph: _copy[c] = self[c][:] return _copy
@property def components(self): r""" The list of the components used by the system Returns ------- out: list the list of the components used by the system, except `E` and `S` """ return [comp for comp in self._graph if comp not in ('E', 'S')] def _probabilitiescomputation(self, t, method): """ Given a system and a `method` (either availability or maintainability or reliability), this method evaluates the asking value by exploring the graph at time `t`. """ #TODO : improve complexity ? # n # P(U a_i) = sum (-1)^{-1+|s|} P(^a_i) # i=1 s\in[1,n], i\in s # s != {} # paths = self.successpaths R = 0.0 for S in ALLSUBSETS(len(paths)): if not S: continue comps = set([c for i in S for c in paths[i][1:-1]]) r = reduce(lambda x, y:x*getattr(y, method)(t), comps, 1) R += -r if len(S) % 2 == 0 else r return R
[docs] def availability(self, t): r""" Compute the availability of the whole system This method compute the availability of the system at `t`. Parameters ---------- t : float or Symbol Returns ------- out : float or symbolic expression The availability calculated for the given `t` Examples -------- >>> motor = Component('M', 1e-4, 3e-2) >>> power = Component('P', 1e-6, 2e-4) >>> t = Symbol('t', positive=True) >>> S = System() >>> S['E'] = [power] >>> S[power] = [motor] >>> S[motor] = 'S' >>> S.availability(t) #doctest: +NORMALIZE_WHITESPACE (200/201 + exp(-201*t/1000000)/201)*(300/301 + exp(-301*t/10000)/301) >>> S.availability(1000) 0.995774842225189 """ try: formula = self._cache['availability'] except KeyError: formula = self._probabilitiescomputation(self._t, 'availability') self._cache['availability'] = formula if isinstance(t, Symbol): return formula.nsimplify() else: return formula.subs(self._t, t).evalf()
[docs] def reliability(self, t): r""" Compute the reliability of the whole system This method compute the reliability of the system at `t`. Parameters ---------- t : float or Symbol Returns ------- out : float or symbolic expression The reliability calculated for the given `t` Examples -------- >>> motor = Component('M', 1e-4, 3e-2) >>> power = Component('P', 1e-6, 2e-4) >>> t = Symbol('t', positive=True) >>> S = System() >>> S['E'] = [power] >>> S[power] = [motor] >>> S[motor] = 'S' >>> S.reliability(t) exp(-101*t/1000000) >>> S.reliability(1000) 0.903933032885864 """ try: formula = self._cache['reliability'] except KeyError: formula = self._probabilitiescomputation(self._t, 'reliability') self._cache['reliability'] = formula if isinstance(t, Symbol): return formula.nsimplify() else: return formula.subs(self._t, t).evalf()
[docs] def maintainability(self, t): r""" Compute the maintainability of the whole system This method compute the maintainability of the system at `t`. Parameters ---------- t : float or Symbol Returns ------- out : float or symbolic expression The maintainability calculated for the given `t` Examples -------- >>> motor = Component('M', 1e-4, 3e-2) >>> power = Component('P', 1e-6, 2e-4) >>> t = Symbol('t', positive=True) >>> S = System() >>> S['E'] = [power] >>> S[power] = [motor] >>> S[motor] = 'S' >>> S.maintainability(t) (1 - exp(-3*t/100))*(1 - exp(-t/5000)) >>> S.maintainability(1000) 0.181269246922001 """ try: formula = self._cache['maintainability'] except KeyError: formula = self._probabilitiescomputation(self._t, 'maintainability') self._cache['maintainability'] = formula if isinstance(t, Symbol): return formula.nsimplify() else: return formula.subs(self._t, t).evalf()
@property def mttf(self): r""" Compute the Mean-Time-To-Failure of the system The MTTF is defined as : :math:`MTTF = \int_{0}^{\infty} R(t)dt` Returns ------- out : float The system MTTF Examples -------- >>> motor = Component('M', 1e-4, 3e-2) >>> power = Component('P', 1e-6, 2e-4) >>> S = System() >>> S['E'] = [power] >>> S[power] = [motor] >>> S[motor] = 'S' >>> S.mttf 1000000/101 """ try: return self._cache['mttf'] except KeyError: t = Symbol('t', positive=True) self._cache['mttf'] = self.reliability(t).integrate((t, 0, oo)) return self._cache['mttf'] @property def mttr(self): r""" Compute the Mean-Time-To-Repair of the system The MTTR is defined as : :math:`MTTF = \int_{0}^{\infty} 1 - M(t)dt` Returns ------- out : float The system MTTR Examples -------- >>> motor = Component('M', 1e-4, 3e-2) >>> power = Component('P', 1e-6, 2e-4) >>> S = System() >>> S['E'] = [power] >>> S[power] = [motor] >>> S[motor] = 'S' >>> S.mttr 2265100/453 """ try: return self._cache['mttr'] except KeyError: t = Symbol('t', positive=True) mttr = (1 - self.maintainability(t)).integrate((t, 0, oo)) self._cache['mttr'] = mttr return self._cache['mttr'] @property def successpaths(self): r""" Return all the success paths of the reliability diagram A success path is defined as a path from 'E' to 'S'. Returns ------- out : list of paths the list of all the success paths. A path, is defined as a list of components Examples -------- >>> motor = Component('M', 1e-4, 3e-2) >>> powers = [Component('P{}'.format(i), 1e-6, 2e-4) for i in (0,1)] >>> S = System() >>> S['E'] = [powers[0], powers[1]] >>> S[powers[0]] = S[powers[1]] = [motor] >>> S[motor] = 'S' >>> S.successpaths #doctest: +NORMALIZE_WHITESPACE [['E', Component(P0), Component(M), 'S'], ['E', Component(P1), Component(M), 'S']] """ try: return self._cache['successpaths'] except KeyError: self._cache['successpaths'] = list(self.findallpaths('E', 'S')) return self._cache['successpaths']
[docs] def findallpaths(self, start='E', end='S'): r""" Find all paths between two components in the reliability diagram Parameters ---------- start : Component, optional find paths from this component end : Component, optional find paths to this component Returns ------- out : iterator an iterator on the paths from `start` to `stop` Examples -------- >>> motor = Component('M', 1e-4, 3e-2) >>> powers = [Component('P{}'.format(i), 1e-6, 2e-4) for i in (0,1)] >>> S = System() >>> S['E'] = [powers[0], powers[1]] >>> S[powers[0]] = S[powers[1]] = [motor] >>> S[motor] = 'S' >>> list(S.findallpaths(start=powers[0])) #doctest: +NORMALIZE_WHITESPACE [[Component(P0), Component(M), 'S']] """ return nx.all_simple_paths(self._graph, start, end)
[docs] def minimalcuts(self, order=1): r""" List the minimal cuts of the system of order <= `order` A minimal cut of order :math:`n`, is a set of :math:`n` components, such as if there all unavailable, the whole system is unavailable. This function aims to find out every minimal cuts of order inferior to `order`. Parameters ---------- order : int, optional The maximal order to look for. Returns ------- out : list of frozensets each frozenset contains the components that constitute a minimal cut Examples -------- >>> motor = Component('M', 1e-4, 3e-2) >>> powers = [Component('P{}'.format(i), 1e-6, 2e-4) for i in (0,1)] >>> S = System() >>> S['E'] = [powers[0], powers[1]] >>> S[powers[0]] = S[powers[1]] = [motor] >>> S[motor] = 'S' >>> S.minimalcuts(order=1) #doctest: +ELLIPSIS [frozenset(...)] >>> S.minimalcuts(order=2) #doctest: +ELLIPSIS [frozenset(...), frozenset(...)] """ paths = self.successpaths incidence = empty((len(paths), len(self.components))) for path in xrange(len(paths)): for comp in xrange(len(self.components)): if self.components[comp] in paths[path]: incidence[path, comp] = 1 else: incidence[path, comp] = 0 pairs = list(self.components) minimal = [] for k in xrange(1, order+1): if incidence.shape[1] == 0: #No more minimalcuts break #Let’s looking for column of ones vones = ones(len(paths)) indicetodelete = [] for comp in xrange(len(pairs)): if (incidence[:, comp] == vones).all(): if isinstance(pairs[comp], frozenset): minimal.append(pairs[comp]) else: minimal.append(frozenset([pairs[comp]])) indicetodelete.append(comp) if k >= order: #so it’s useless to compute newpairs and the new incidence #matrix because they won’t be used anymore. continue incidence = delete(incidence, indicetodelete, axis=1) pairs = [p for i, p in enumerate(pairs) if i not in indicetodelete] newpairs = list(combinations(range(len(pairs)), k+1)) incidence_ = empty((len(paths), len(newpairs))) for x in xrange(incidence_.shape[0]): for y in xrange(incidence_.shape[1]): value = 0 for comp in newpairs[y]: if incidence[x, comp]: value = 1 break incidence_[x, y] = value incidence = incidence_ pairs = [frozenset([pairs[x] for x in p]) for p in newpairs] return minimal
[docs] def faulttreeanalysis(self, output=None, order=2): r""" Build the fault tree analysis of the system Print (or write) the content of the dot file needed to draw the fault tree of the system. Parameters ---------- output : file-like object, optional If `output` is given, then the content is written into this file. `output` *must* have a :py:meth:`write` method. order : int, optional This is the maximum order of the minimal cuts the function looks for. Notes ----- Please, see the `Graphviz <http://graphviz.org/>` website for more information about how to transform the ouput code into a nice picture. """ #TODO the tree needs to be simplified cuts = self.minimalcuts(order) data = ['digraph G {'] data.append('\t"not_S" -> "or"') for i, cut in enumerate(cuts): data.append('\tor -> and_%s' % i) for comp in cut: data.append('\tand_%s -> "%s"' % (i, comp.name)) data.append('}') if not output: print('\n'.join(data)) else: try: output.write('\n'.join(data) + '\n') except AttributeError: with open(output, 'w') as fobj: fobj.write('\n'.join(data) + '\n')
[docs] def draw(self): r""" Draw the system Draw the system with graphviz. Examples -------- >>> import pylab as p >>> motor = Component('M', 1e-4, 3e-2) >>> powers = [Component('P{}'.format(i), 1e-6, 2e-4) for i in (0,1)] >>> S = System() >>> S['E'] = [powers[0], powers[1]] >>> S[powers[0]] = S[powers[1]] = [motor] >>> S[motor] = 'S' >>> S.draw() >>> p.show() """ nx.draw_graphviz(self._graph)