Source code for pysb.integrate

import pysb.core
import pysb.bng
import numpy
import scipy.integrate
import code
try:
    # weave is not available under Python 3.
    from scipy.weave import inline as weave_inline
    import scipy.weave.build_tools
except ImportError:
    weave_inline = None

import distutils.errors
import sympy
import re
import itertools
import warnings
try:
    from future_builtins import zip
except ImportError:
    pass

def _exec(code, locals):
    # This is function call under Python 3, and a statement with a
    # tuple under Python 2. The effect should be the same.
    exec(code, locals)

# some sane default options for a few well-known integrators
default_integrator_options = {
    'vode': {
        'method': 'bdf',
        'with_jacobian': True,
        # Set nsteps as high as possible to give our users flexibility in
        # choosing their time step. (Let's be safe and assume vode was compiled
        # with 32-bit ints. What would actually happen if it was and we passed
        # 2**64-1 though?)
        'nsteps': 2**31 - 1,
        },
    'cvode': {
        'method': 'bdf',
        'iteration': 'newton',
        },
    }

[docs]class Solver(object): """An interface for numeric integration of models. Parameters ---------- model : pysb.Model Model to integrate. tspan : vector-like Time values over which to integrate. The first and last values define the time range, and the returned trajectories will be sampled at every value. use_analytic_jacobian : boolean, optional Whether to provide the solver a Jacobian matrix derived analytically from the model ODEs. Defaults to False. If False, the integrator may approximate the Jacobian by finite-differences calculations when necessary (depending on the integrator and settings). integrator : string, optional (default: 'vode') Name of the integrator to use, taken from the list of integrators known to :py:class:`scipy.integrate.ode`. cleanup : bool, optional If True (default), delete the temporary files after the simulation is finished. If False, leave them in place. Useful for debugging. verbose : bool, optional (default: False) Verbose output integrator_options Additional parameters for the integrator. Attributes ---------- model : pysb.Model Model passed to the constructor tspan : vector-like Time values passed to the constructor. y : numpy.ndarray Species trajectories. Dimensionality is ``(len(tspan), len(model.species))``. yobs : numpy.ndarray with record-style data-type Observable trajectories. Length is ``len(tspan)`` and record names follow ``model.observables`` names. yobs_view : numpy.ndarray An array view (sharing the same data buffer) on ``yobs``. Dimensionality is ``(len(tspan), len(model.observables))``. yexpr : numpy.ndarray with record-style data-type Expression trajectories. Length is ``len(tspan)`` and record names follow ``model.expressions_dynamic()`` names. yexpr_view : numpy.ndarray An array view (sharing the same data buffer) on ``yexpr``. Dimensionality is ``(len(tspan), len(model.expressions_dynamic()))``. integrator : scipy.integrate.ode Integrator object. Notes ----- The expensive step of generating the code for the right-hand side of the model's ODEs is performed during initialization. If you need to integrate the same model repeatedly with different parameters then you should build a single Solver object and then call its ``run`` method as needed. """ @staticmethod def _test_inline(): """Detect whether scipy.weave.inline is functional.""" if not hasattr(Solver, '_use_inline'): Solver._use_inline = False try: if weave_inline is not None: weave_inline('int i=0; i=i;', force=1) Solver._use_inline = True except (scipy.weave.build_tools.CompileError, distutils.errors.CompileError, ImportError): pass def __init__(self, model, tspan, use_analytic_jacobian=False, integrator='vode', cleanup=True, verbose=False, **integrator_options): self.verbose = verbose self.model = model self.tspan = tspan # We'll need to know if we're using the Jacobian when we get to run() self._use_analytic_jacobian = use_analytic_jacobian # Generate the equations for the model pysb.bng.generate_equations(self.model, cleanup, self.verbose) def eqn_substitutions(eqns): """String substitutions on the sympy C code for the ODE RHS and Jacobian functions to use appropriate terms for variables and parameters.""" # Substitute expanded parameter formulas for any named expressions for e in self.model.expressions: eqns = re.sub(r'\b(%s)\b' % e.name, '('+sympy.ccode( e.expand_expr())+')', eqns) # Substitute sums of observable species that could've been added # by expressions for obs in self.model.observables: obs_string = '' for i in range(len(obs.coefficients)): if i > 0: obs_string += "+" if obs.coefficients[i] > 1: obs_string += str(obs.coefficients[i])+"*" obs_string += "__s"+str(obs.species[i]) if len(obs.coefficients) > 1: obs_string = '(' + obs_string + ')' eqns = re.sub(r'\b(%s)\b' % obs.name, obs_string, eqns) # Substitute 'y[i]' for 'si' eqns = re.sub(r'\b__s(\d+)\b', lambda m: 'y[%s]' % (int(m.group(1))), eqns) # Substitute 'p[i]' for any named parameters for i, p in enumerate(self.model.parameters): eqns = re.sub(r'\b(%s)\b' % p.name, 'p[%d]' % i, eqns) return eqns # ODE RHS ----------------------------------------------- # Prepare the string representations of the RHS equations code_eqs = '\n'.join(['ydot[%d] = %s;' % (i, sympy.ccode(self.model.odes[i])) for i in range(len(self.model.odes))]) code_eqs = eqn_substitutions(code_eqs) Solver._test_inline() # If we can't use weave.inline to run the C code, compile it as Python code instead for use with # exec. Note: C code with array indexing, basic math operations, and pow() just happens to also # be valid Python. If the equations ever have more complex things in them, this might fail. if not Solver._use_inline: code_eqs_py = compile(code_eqs, '<%s odes>' % self.model.name, 'exec') else: for arr_name in ('ydot', 'y', 'p'): macro = arr_name.upper() + '1' code_eqs = re.sub(r'\b%s\[(\d+)\]' % arr_name,'%s(\\1)' % macro, code_eqs) def rhs(t, y, p): ydot = self.ydot # note that the evaluated code sets ydot as a side effect if Solver._use_inline: weave_inline(code_eqs, ['ydot', 't', 'y', 'p']); else: _exec(code_eqs_py, locals()) return ydot # JACOBIAN ----------------------------------------------- # We'll keep the code for putting together the matrix in Sympy # in case we want to do manipulations of the matrix later (e.g., to # put together the sensitivity matrix) jac_fn = None if self._use_analytic_jacobian: species_names = ['__s%d' % i for i in range(len(self.model.species))] jac_matrix = [] # Rows of jac_matrix are by equation f_i: # [[df1/x1, df1/x2, ..., df1/xn], # [ ... ], # [dfn/x1, dfn/x2, ..., dfn/xn], for eqn in self.model.odes: # Derivatives for f_i... jac_row = [] for species_name in species_names: # ... with respect to s_j d = sympy.diff(eqn, species_name) jac_row.append(d) jac_matrix.append(jac_row) # Next, prepare the stringified Jacobian equations jac_eqs_list = [] for i, row in enumerate(jac_matrix): for j, entry in enumerate(row): # Skip zero entries in the Jacobian if entry == 0: continue jac_eq_str = 'jac[%d, %d] = %s;' % (i, j, sympy.ccode(entry)) jac_eqs_list.append(jac_eq_str) jac_eqs = eqn_substitutions('\n'.join(jac_eqs_list)) # Try to inline the Jacobian if possible (as above for RHS) if not Solver._use_inline: jac_eqs_py = compile(jac_eqs, '<%s jacobian>' % self.model.name, 'exec') else: # Substitute array refs with calls to the JAC1 macro for inline jac_eqs = re.sub(r'\bjac\[(\d+), (\d+)\]', r'JAC2(\1, \2)', jac_eqs) # Substitute calls to the Y1 and P1 macros for arr_name in ('y', 'p'): macro = arr_name.upper() + '1' jac_eqs = re.sub(r'\b%s\[(\d+)\]' % arr_name, '%s(\\1)' % macro, jac_eqs) def jacobian(t, y, p): jac = self.jac # note that the evaluated code sets jac as a side effect if Solver._use_inline: weave_inline(jac_eqs, ['jac', 't', 'y', 'p']); else: _exec(jac_eqs_py, locals()) return jac # Initialize the jacobian argument to None if we're not going to use it # jac = self.jac as defined in jacobian() earlier # Initialization of matrix for storing the Jacobian self.jac = numpy.zeros((len(self.model.odes), len(self.model.species))) jac_fn = jacobian # build integrator options list from our defaults and any kwargs passed to this function options = {} if default_integrator_options.get(integrator): options.update(default_integrator_options[integrator]) # default options options.update(integrator_options) # overwrite defaults self.opts = options self.y = numpy.ndarray((len(self.tspan), len(self.model.species))) # species concentrations self.ydot = numpy.ndarray(len(self.model.species)) # Initialize record array for observable timecourses if len(self.model.observables): self.yobs = numpy.ndarray(len(self.tspan), list(zip(self.model.observables.keys(), itertools.repeat(float)))) else: self.yobs = numpy.ndarray((len(self.tspan), 0)) self.yobs_view = self.yobs.view(float).reshape(len(self.yobs), -1) # Initialize array for expression timecourses exprs = self.model.expressions_dynamic() if len(exprs): self.yexpr = numpy.ndarray(len(self.tspan), list(zip(exprs.keys(), itertools.repeat( float)))) else: self.yexpr = numpy.ndarray((len(self.tspan), 0)) self.yexpr_view = self.yexpr.view(float).reshape(len(self.yexpr), -1) # Integrator if integrator == 'lsoda': # lsoda is accessed via scipy.integrate.odeint which, as a function, # requires that we pass its args at the point of call. Thus we need # to stash stuff like the rhs and jacobian functions in self so we # can pass them in later. self.integrator = integrator # lsoda's rhs and jacobian function arguments are in a different # order to other integrators, so we define these shims that swizzle # the argument order appropriately. self.func = lambda t, y, p: rhs(y, t, p) if jac_fn is None: self.jac_fn = None else: self.jac_fn = lambda t, y, p: jac_fn(y, t, p) else: # The scipy.integrate.ode integrators on the other hand are object # oriented and hold the functions and such internally. Once we set # up the integrator object we only need to retain a reference to it # and can forget about the other bits. self.integrator = scipy.integrate.ode(rhs, jac=jac_fn) with warnings.catch_warnings(): warnings.filterwarnings('error', 'No integrator name match') self.integrator.set_integrator(integrator, **options)
[docs] def run(self, param_values=None, y0=None): """Perform an integration. Returns nothing; access the Solver object's ``y``, ``yobs``, or ``yobs_view`` attributes to retrieve the results. Parameters ---------- param_values : vector-like or dictionary, optional Values to use for every parameter in the model. Ordering is determined by the order of model.parameters. If passed as a dictionary, keys must be parameter names. If not specified, parameter values will be taken directly from model.parameters. y0 : vector-like, optional Values to use for the initial condition of all species. Ordering is determined by the order of model.species. If not specified, initial conditions will be taken from model.initial_conditions (with initial condition parameter values taken from `param_values` if specified). """ if param_values is not None and not isinstance(param_values, dict): # accept vector of parameter values as an argument if len(param_values) != len(self.model.parameters): raise ValueError("param_values must be the same length as " "model.parameters") if not isinstance(param_values, numpy.ndarray): param_values = numpy.array(param_values) else: # create parameter vector from the values in the model param_values_dict = param_values if isinstance(param_values, dict) else {} param_values = numpy.array([p.value for p in self.model.parameters]) for key in param_values_dict.keys(): try: pi = self.model.parameters.index(self.model.parameters[key]) except KeyError: raise IndexError("param_values dictionary has unknown " "parameter name (%s)" % key) param_values[pi] = param_values_dict[key] # The substitution dict must have Symbols as keys, not strings subs = dict((p, param_values[i]) for i, p in enumerate(self.model.parameters)) if y0 is not None: # check if y0 is a dict (not supported yet) if isinstance(y0, dict): raise NotImplementedError # accept vector of species amounts as an argument if len(y0) != self.y.shape[1]: raise ValueError("y0 must be the same length as model.species") if not isinstance(y0, numpy.ndarray): y0 = numpy.array(y0) else: y0 = numpy.zeros((self.y.shape[1],)) for cp, value_obj in self.model.initial_conditions: if value_obj in self.model.parameters: pi = self.model.parameters.index(value_obj) value = param_values[pi] elif value_obj in self.model.expressions: value = value_obj.expand_expr().evalf(subs=subs) else: raise ValueError("Unexpected initial condition value type") si = self.model.get_species_index(cp) if si is None: raise IndexError("Species not found in model: %s" % repr(cp)) y0[si] = value if self.integrator == 'lsoda': self.y = scipy.integrate.odeint(self.func, y0, self.tspan, Dfun=self.jac_fn, args=(param_values,), **self.opts) else: # perform the actual integration self.integrator.set_initial_value(y0, self.tspan[0]) # Set parameter vectors for RHS func and Jacobian self.integrator.set_f_params(param_values) if self._use_analytic_jacobian: self.integrator.set_jac_params(param_values) self.y[0] = y0 i = 1 if self.verbose: print("Integrating...") print("\tTime") print("\t----") print("\t%g" % self.integrator.t) while self.integrator.successful() and self.integrator.t < self.tspan[-1]: self.y[i] = self.integrator.integrate(self.tspan[i]) # integration i += 1 ###### # self.integrator.integrate(self.tspan[i],step=True) # if self.integrator.t >= self.tspan[i]: i += 1 ###### if self.verbose: print("\t%g" % self.integrator.t) if self.verbose: print("...Done.") if self.integrator.t < self.tspan[-1]: self.y[i:, :] = 'nan' # calculate observables for i, obs in enumerate(self.model.observables): self.yobs_view[:, i] = (self.y[:, obs.species] * obs.coefficients).sum(1) # calculate expressions obs_names = self.model.observables.keys() obs_dict = dict((k, self.yobs[k]) for k in obs_names) for expr in self.model.expressions_dynamic(): expr_subs = expr.expand_expr().subs(subs) func = sympy.lambdify(obs_names, expr_subs, "numpy") self.yexpr[expr.name] = func(**obs_dict)
[docs]def odesolve(model, tspan, param_values=None, y0=None, integrator='vode', cleanup=True, verbose=False, **integrator_options): """Integrate a model's ODEs over a given timespan. This is a simple function-based interface to integrating (a.k.a. solving or simulating) a model. If you need to integrate a model repeatedly with different parameter values or initial conditions (as in parameter estimation), using the Solver class directly will provide much better performance. Parameters ---------- model : pysb.Model Model to integrate. tspan : vector-like Time values over which to integrate. The first and last values define the time range, and the returned trajectories will be sampled at every value. param_values : vector-like, optional Values to use for every parameter in the model. Ordering is determined by the order of model.parameters. If not specified, parameter values will be taken directly from model.parameters. y0 : vector-like, optional Values to use for the initial condition of all species. Ordering is determined by the order of model.species. If not specified, initial conditions will be taken from model.initial_conditions (with initial condition parameter values taken from `param_values` if specified). integrator : string, optional Name of the integrator to use, taken from the list of integrators known to :py:class:`scipy.integrate.ode`. integrator_params Additional parameters for the integrator. Returns ------- yfull : record array The trajectories calculated by the integration. The first dimension is time and its length is identical to that of `tspan`. The second dimension is species/observables and its length is the sum of the lengths of model.species and model.observables. The dtype of the array specifies field names: '__s0', '__s1', etc. for the species and observable names for the observables. See Notes below for further explanation and caveats. Notes ----- This function was the first implementation of integration support and accordingly it has a few warts: * It performs expensive code generation every time it is called. * The returned array, with its record-style data-type, allows convenient selection of individual columns by their field names, but does not permit slice ranges or indexing by integers for columns. If you only need access to your model's observables this is usually not a problem, but sometimes it's more convenient to have a "regular" array. See Examples below for code to do this. The actual integration code has since been moved to the Solver class and split up such that the code generation is only performed on initialization. The model may then be integrated repeatedly with different parameter values or initial conditions with much better performance. Additionally, Solver makes the species trajectories available as a simple array and only uses the record array for the observables where it makes sense. This function now simply serves as a wrapper for creating a Solver object, calling its ``run`` method, and building the record array to return. Examples -------- Simulate a model and display the results for an observable: >>> from pysb.examples.robertson import model >>> from numpy import linspace >>> numpy.set_printoptions(precision=4) >>> yfull = odesolve(model, linspace(0, 40, 10)) >>> print(yfull['A_total']) #doctest: +NORMALIZE_WHITESPACE [ 1. 0.899 0.8506 0.8179 0.793 0.7728 0.7557 0.7408 0.7277 0.7158] Obtain a view on a returned record array which uses an atomic data-type and integer indexing (note that the view's data buffer is shared with the original array so there is no extra memory cost): >>> print(yfull.shape) (10,) >>> print(yfull.dtype) #doctest: +NORMALIZE_WHITESPACE [('__s0', '<f8'), ('__s1', '<f8'), ('__s2', '<f8'), ('A_total', '<f8'), ('B_total', '<f8'), ('C_total', '<f8')] >>> print(yfull[0:4, 1:3]) #doctest: +ELLIPSIS Traceback (most recent call last): ... IndexError: too many indices... >>> yarray = yfull.view(float).reshape(len(yfull), -1) >>> print(yarray.shape) (10, 6) >>> print(yarray.dtype) float64 >>> print(yarray[0:4, 1:3]) [[ 0.0000e+00 0.0000e+00] [ 2.1672e-05 1.0093e-01] [ 1.6980e-05 1.4943e-01] [ 1.4502e-05 1.8209e-01]] """ solver = Solver(model, tspan, cleanup=cleanup, verbose=verbose, **integrator_options) solver.run(param_values, y0) species_names = ['__s%d' % i for i in range(solver.y.shape[1])] yfull_dtype = list(zip(species_names, itertools.repeat(float))) if len(solver.yobs.dtype): yfull_dtype += solver.yobs.dtype.descr if len(solver.yexpr.dtype): yfull_dtype += solver.yexpr.dtype.descr yfull = numpy.ndarray(len(solver.y), yfull_dtype) n_sp = solver.y.shape[1] n_ob = solver.yobs_view.shape[1] n_ex = solver.yexpr_view.shape[1] yfull_view = yfull.view(float).reshape(len(yfull), -1) yfull_view[:,:n_sp] = solver.y yfull_view[:,n_sp:n_sp+n_ob] = solver.yobs_view yfull_view[:,n_sp+n_ob:n_sp+n_ob+n_ex] = solver.yexpr_view return yfull
[docs]def setup_module(module): """Doctest fixture for nose.""" # Distutils' temp directory creation code has a more-or-less unsuppressable # print to stdout which will break the doctest which triggers it (via # scipy.weave.inline). So here we run an end-to-end test of the inlining # system to get that print out of the way at a point where it won't matter. # As a bonus, the test harness is suppressing writes to stdout at this time # anyway so the message is just swallowed silently. Solver._test_inline()