Source code for pysb.simulator.base

from abc import ABCMeta, abstractmethod
import numpy as np
import itertools
import sympy
import collections
import numbers
from pysb.core import MonomerPattern, ComplexPattern, as_complex_pattern, \
                      Component
from pysb.logging import get_logger, EXTENDED_DEBUG
import logging

try:
    import pandas as pd
except ImportError:
    pd = None


class SimulatorException(Exception):
    pass


class Simulator(object):
    """An abstract base class for numerical simulation of models.

    .. warning::
        The interface for this class is considered experimental and may
        change without warning as PySB is updated.

    Parameters
    ----------
    model : pysb.Model
        Model to simulate.
    tspan : vector-like, optional
        Time values over which to simulate. The first and last values define
        the time range. Returned trajectories are sampled at every value unless
        the simulation is interrupted for some reason, e.g., due to
        satisfaction
        of a logical stopping criterion (see 'tout' below).
    initials : vector-like or dict, optional
        Values to use for the initial condition of all species. Ordering is
        determined by the order of model.species. If not specified, initial
        conditions will be taken from model.initial_conditions (with
        initial condition parameter values taken from `param_values` if
        specified).
    param_values : vector-like or dict, optional
        Values to use for every parameter in the model. Ordering is
        determined by the order of model.parameters.
        If passed as a dictionary, keys must be parameter names.
        If not specified, parameter values will be taken directly from
        model.parameters.
    verbose : bool or int, optional (default: False)
        Sets the verbosity level of the logger. See the logging levels and
        constants from Python's logging module for interpretation of integer
        values. False is equal to the PySB default level (currently WARNING),
        True is equal to DEBUG.

    Attributes
    ----------
    verbose: bool
        Verbosity flag passed to the constructor.
    model : pysb.Model
        Model passed to the constructor.
    tspan : vector-like
        Time values passed to the constructor.

    Notes
    -----
    If ``tspan`` is not defined, it may be defined in the call to the
    ``run`` method.

    The dimensionality of ``tout`` depends on whether a single simulation
    or multiple simulations are run.

    The dimensionalities of ``y``, ``yobs``, ``yobs_view``, ``yexpr``, and
    ``yexpr_view`` depend on the number of simulations being run as well
    as on the type of simulation, i.e., spatial vs. non-spatial.

    """
    __metaclass__ = ABCMeta

    _supports = { 'multi_initials' : False,
                  'multi_param_values' : False }

    @abstractmethod
    def __init__(self, model, tspan=None, initials=None,
                 param_values=None, verbose=False, **kwargs):
        # Get or create base a PySB logger for this module and model
        self._logger = get_logger(self.__module__, model=model,
                                  log_level=verbose)
        self._logger.debug('Simulator created')
        self._model = model
        self.verbose = verbose
        self.tout = None
        # Per-run initial conditions/parameter/tspan override
        self.tspan = tspan
        self._initials = None
        self.initials = initials
        self._params = None
        self.param_values = param_values

    @property
    def model(self):
        return self._model

    @property
    def initials(self):
        return self._initials if isinstance(self._initials, np.ndarray) \
               else self._get_initials()

    @initials.setter
    def initials(self, new_initials):
        if new_initials is None:
            self._initials = None
        else:
            # check if new_initials is a Mapping, and if so validate the keys
            # (ComplexPatterns)
            if isinstance(new_initials, collections.Mapping):
                for cplx_pat, val in new_initials.items():
                    if not isinstance(cplx_pat, (MonomerPattern,
                                                 ComplexPattern)):
                        raise SimulatorException('Dictionary key %s is not a '
                                                 'MonomerPattern or '
                                                 'ComplexPattern' %
                                                 repr(cplx_pat))
                    # if val is a number, convert it to a single-element array
                    if not isinstance(val, (collections.Sequence, np.ndarray)):
                        val = [val]
                        new_initials[cplx_pat] = np.array(val)
                    # otherwise, check whether simulator supports multiple
                    # initial values
                    elif len(val) > 1:
                        if not self._supports['multi_initials']:
                            raise SimulatorException(
                                self.__class__.__name__ +
                                " does not support multiple initial"
                                " values at this time.")
                        if 1 < len(self.param_values) != len(val):
                            raise SimulatorException(
                                'Cannot set initials for {} simulations '
                                'when param_values has been set for {} '
                                'simulations'.format(
                                    len(val), len(self.param_values)))
                    if not np.isfinite(val).all():
                        raise SimulatorException('Please check initial {} '
                                                 'for non-finite '
                                                 'values'.format(cplx_pat))
                self._initials = new_initials
            else:
                if not isinstance(new_initials, np.ndarray):
                    new_initials = np.array(new_initials, copy=False)
                # if new_initials is a 1D array, convert to a 2D array of length 1
                if len(new_initials.shape) == 1:
                    new_initials = np.resize(new_initials, (1,len(new_initials)))
                # check whether simulator supports multiple initial values
                else:
                    if not self._supports['multi_initials']:
                        raise SimulatorException(
                            self.__class__.__name__ +
                            " does not support multiple initial"
                            " values at this time.")
                    if 1 < len(self.param_values) != new_initials.shape[0]:
                        raise SimulatorException(
                            'Cannot set initials for {} simulations '
                            'when param_values has been set for {} '
                            'simulations'.format(
                                new_initials.shape[0], len(self.param_values)))
                # make sure number of initials values equals len(model.species)
                if new_initials.shape[1] != len(self._model.species):
                    raise ValueError("new_initials must be the same length as "
                                     "model.species")
                if not np.isfinite(new_initials).all():
                    raise SimulatorException('Please check initials array'
                                             'for non-finite values')
                self._initials = new_initials

    def _get_initials(self):
        """
        Returns the model's initial conditions, with the order
        matching model.initial_conditions
        """
        # If we already have a list internally, just return that
        if isinstance(self._initials, np.ndarray):
            return self._initials
        # Otherwise, build the list from the model, and any overrides
        # specified in the self._initials dictionary
        if isinstance(self._initials, dict):
            try:
                n_sims_initials = len(self._initials.values()[0])
            except IndexError:
                n_sims_initials = 1
            # record the length of the arrays and make
            # sure they're all the same.
            for key, val in self._initials.items():
                if len(val) != n_sims_initials:
                    raise Exception("all arrays in new_initials dictionary "
                                    "must be equal length")
        else:
            n_sims_initials = 1
            self._initials = {}
        n_sims_params = len(self.param_values)
        n_sims_actual = max(n_sims_params, n_sims_initials)

        y0 = np.full((n_sims_actual, len(self.model.species)), np.nan)

        # note that param_vals is a 2D array
        subs = [dict((p, pv[i]) for i, p in enumerate(self._model.parameters))
                for pv in self.param_values]
        if len(subs) == 1 and n_sims_initials > 1:
            subs = list(itertools.repeat(subs[0], n_sims_initials))

        def _set_initials(initials_source):
            for cp, value_obj in initials_source:
                cp = as_complex_pattern(cp)
                si = self._model.get_species_index(cp)
                if si is None:
                    raise IndexError("Species not found in model: %s" %
                                     repr(cp))
                # Loop over all simulations
                for sim in range(len(y0)):
                    # If this initial condition has already been set, skip it
                    # (i.e., an override)
                    if not np.isnan(y0[sim][si]):
                        continue

                    def _get_value(sim):
                        if isinstance(value_obj, (collections.Sequence,
                                                  np.ndarray)) and \
                           isinstance(value_obj[sim], numbers.Number):
                            value = value_obj[sim]
                        elif isinstance(value_obj, Component):
                            if value_obj in self._model.parameters:
                                pi = self._model.parameters.index(value_obj)
                                value = self.param_values[
                                    sim if n_sims_params > 1 else 0][pi]
                            elif value_obj in self._model.expressions:
                                value = value_obj.expand_expr().evalf(subs=subs[sim])
                        else:
                            raise TypeError("Unexpected initial condition "
                                            "value type: %s" % type(value_obj))
                        return value

                    # initials from the model
                    if isinstance(initials_source, np.ndarray):
                        if len(initials_source.shape) == 1:
                            if sim == 0:
                                value = _get_value(0)
                            else:
                                # if the parameters are different for each sim,
                                # the expressions could be different too
                                if value_obj in self._model.expressions:
                                    value = value_obj.expand_expr().evalf(
                                        subs=subs[sim])
                                else:
                                    value = y0[sim-1][si]
                    # initials from dict
                    else:
                         value = _get_value(sim)
                    y0[sim][si] = value

        # Process any overrides
        if isinstance(self._initials, dict):
            _set_initials(self._initials.items())
        # Get remaining initials from the model itself
        _set_initials(self._model.initial_conditions)

        # Any remaining unset initials should be set to zero
        y0 = np.nan_to_num(y0)

        return y0

    @property
    def param_values(self):
        if self._params is not None and not isinstance(self._params, dict):
            return self._params
        else:
            # create parameter vector from the values in the model
            n_sims = 1
            if isinstance(self._params, dict):
                param_values_dict = self._params
                # record the length of the arrays and make
                # sure they're all the same.
                for key, val in param_values_dict.items():
                    if n_sims == 1:
                        n_sims = len(val)
                    elif len(val) != n_sims:
                        raise Exception("all arrays in new_params dictionary "
                                        "must be equal length")
            else:
                param_values_dict = {}
            param_values = np.array([p.value for p in self._model.parameters])
            param_values = np.repeat([param_values], n_sims, axis=0)
            # overrides
            for key in param_values_dict.keys():
                try:
                    pi = self._model.parameters.index(
                                    self._model.parameters[key])
                except KeyError:
                    raise IndexError("new_params dictionary has unknown "
                                     "parameter name (%s)" % key)
                # loop over n_sims
                for n in range(n_sims):
                    param_values[n][pi] = param_values_dict[key][n]
            # return array
            return param_values

    @param_values.setter
    def param_values(self, new_params):
        if new_params is None:
            self._params = None
            return
        if isinstance(new_params, dict):
            for key, val in new_params.items():
                if key not in self._model.parameters.keys():
                    raise IndexError("new_params dictionary has unknown "
                                     "parameter name (%s)" % key)
                # if val is a number, convert it to a single-element array
                if not isinstance(val, collections.Sequence):
                    new_params[key] = np.array([val])
                # otherwise, check whether simulator supports multiple
                # param_values
                elif len(val) > 1 and not self._supports['multi_param_values']:
                    raise SimulatorException(
                        self.__class__.__name__ +
                        " does not support multiple parameter"
                        " values at this time.")

            self._params = new_params
        else:
            if not isinstance(new_params, np.ndarray):
                new_params = np.array(new_params)
            # if new_params is a 1D array, convert to a 2D array of length 1
            if len(new_params.shape) == 1:
                new_params = np.resize(new_params, (1,len(new_params)))
            # check whether simulator supports multiple parameter values
            elif not self._supports['multi_param_values']:
                raise SimulatorException(
                    self.__class__.__name__ +
                    " does not support multiple parameter"
                    " values at this time.")
            # make sure number of param values equals len(model.parameters)
            if new_params.shape[1] != len(self._model.parameters):
                raise ValueError("new_params must be the same length as "
                                 "model.parameters")
            self._params = new_params

    @abstractmethod
    def run(self, tspan=None, initials=None, param_values=None):
        """Run a simulation.

        Implementations should return a :class:`.SimulationResult` object.
        """
        self._logger.info('Simulation(s) started')
        if tspan is not None:
            self.tspan = tspan
        if self.tspan is None:
            raise SimulatorException("tspan must be defined before "
                                     "simulation can run")
        if param_values is not None:
            self.param_values = param_values
        if initials is not None:
            self.initials = initials
        # If only one set of param_values, run all simulations
        # with the same parameters
        if len(self.param_values) == 1:
            self.param_values = np.repeat(self.param_values,
                                          len(self.initials),
                                          axis=0)

        assert len(self.initials) == len(self.param_values)

        # Error checks on 'param_values' and 'initials'
        if len(self.param_values) != len(self.initials):
            raise SimulatorException(
                    "'param_values' and 'initials' must be equal lengths.\n"
                    "len(param_values): %d\n"
                    "len(initials): %d" %
                    (len(self.param_values), len(self.initials)))
        elif len(self.param_values.shape) != 2 or \
                self.param_values.shape[1] != len(self._model.parameters):
            raise SimulatorException(
                    "'param_values' must be a 2D array of dimension N_SIMS x "
                    "len(model.parameters).\n"
                    "param_values.shape: " + str(self.param_values.shape) +
                    "\nlen(model.parameters): %d" %
                    len(self._model.parameters))
        elif len(self.initials.shape) != 2 or \
                self.initials.shape[1] != len(self._model.species):
            raise SimulatorException(
                    "'initials' must be a 2D array of dimension N_SIMS x "
                    "len(model.species).\n"
                    "initials.shape: " + str(self.initials.shape) +
                    "\nlen(model.species): %d" % len(self._model.species))
            # NOTE: Not sure if the check on 'initials' should be here or not.
            # Network-free simulators don't have species, but we will want to
            # allow users to supply initials. Right now, that will raise an
            # exception. Need to think about this. --LAH
        return None


[docs]class SimulationResult(object): """ Results of a simulation with properties and methods to access them. .. warning:: Please note that the interface for this class is considered experimental and may change without warning as PySB is updated. Notes ----- In the attribute descriptions, a "trajectory set" is a 2D numpy array, species on first axis and time on second axis, with each element containing the concentration or count of the species at the specified time. A list of trajectory sets contains a trajectory set for each simulation. Parameters ---------- simulator : Simulator The simulator object that generated the trajectories tout: list-like Time points returned by the simulator (may be different from ``tspan`` if simulation is interrupted for some reason). trajectories : list or numpy.ndarray A set of species trajectories from a simulation. Should either be a list of 2D numpy arrays or a single 3D numpy array. squeeze : bool, optional (default: True) Return trajectories as a 2D array, rather than a 3d array, if only a single simulation was performed. simulations_per_param_set : int Number of trajectories per parameter set. Typically always 1 for deterministic simulators (e.g. ODE), but with stochastic simulators multiple trajectories per parameter/initial condition set are often desired. Examples -------- The following examples use a simple model with three observables and one expression, with a single simulation. >>> from pysb.examples.expression_observables import model >>> from pysb.simulator import ScipyOdeSimulator >>> import numpy as np >>> np.set_printoptions(precision=4) >>> sim = ScipyOdeSimulator(model, tspan=np.linspace(0, 40, 10), \ integrator_options={'atol': 1e-20}) >>> simulation_result = sim.run() ``simulation_result`` is a :class:`SimulationResult` object. An observable can be accessed like so: >>> print(simulation_result.observables['Bax_c0']) \ #doctest: +NORMALIZE_WHITESPACE [ 1.0000e+00 1.1744e-02 1.3791e-04 1.6196e-06 1.9020e-08 2.2337e-10 2.6232e-12 3.0806e-14 3.6178e-16 4.2492e-18] It is also possible to retrieve the value of all observables at a particular time point, e.g. the final concentrations: >>> print(simulation_result.observables[-1]) \ #doctest: +NORMALIZE_WHITESPACE ( 4.2492e-18, 1.6996e-16, 1.) Expressions are read in the same way as observables: >>> print(simulation_result.expressions['NBD_signal']) \ #doctest: +NORMALIZE_WHITESPACE [ 0. 4.7847 4.9956 4.9999 5. 5. 5. 5. 5. 5. ] The species trajectories can be accessed as a numpy ndarray: >>> print(simulation_result.species) [[ 1.0000e+00 0.0000e+00 0.0000e+00] [ 1.1744e-02 5.2194e-02 9.3606e-01] [ 1.3791e-04 1.2259e-03 9.9864e-01] [ 1.6196e-06 2.1595e-05 9.9998e-01] [ 1.9020e-08 3.3814e-07 1.0000e+00] [ 2.2337e-10 4.9637e-09 1.0000e+00] [ 2.6232e-12 6.9951e-11 1.0000e+00] [ 3.0806e-14 9.5840e-13 1.0000e+00] [ 3.6178e-16 1.2863e-14 1.0000e+00] [ 4.2492e-18 1.6996e-16 1.0000e+00]] Species, observables and expressions can be combined into a single numpy ndarray and accessed similarly. Here, the initial concentrations of all these entities are examined: >>> print(simulation_result.all[0]) #doctest: +NORMALIZE_WHITESPACE ( 1., 0., 0., 1., 0., 0., 0.) The ``all`` array can be accessed as a pandas DataFrame object, which allows for more convenient indexing and access to pandas advanced functionality, such as indexing and slicing. Here, the concentrations of the observable ``Bax_c0`` and the expression ``NBD_signal`` are read at time points between 5 and 15 seconds: >>> df = simulation_result.dataframe >>> print(df.loc[5:15, ['Bax_c0', 'NBD_signal']]) \ #doctest: +NORMALIZE_WHITESPACE Bax_c0 NBD_signal time 8.888889 0.000138 4.995633 13.333333 0.000002 4.999927 """ def __init__(self, simulator, tout, trajectories, squeeze=True, simulations_per_param_set=1): simulator._logger.debug('SimulationResult constructor started') self.squeeze = squeeze self.tout = tout self._yfull = None self._model = simulator._model self.n_sims_per_parameter_set = simulations_per_param_set # Validate incoming trajectories if hasattr(trajectories, 'ndim') and trajectories.ndim == 3: # trajectories is a 3D array, create a list of 2D arrays # This is just a view and doesn't copy the data self._y = [tr for tr in trajectories] else: # Not a 3D array, check for a list of 2D arrays try: if any([tr.ndim != 2 for tr in trajectories]): raise AttributeError except (AttributeError, TypeError): raise ValueError("trajectories should be a 3D array or a list " "of 2D arrays") self._y = trajectories self._nsims = len(self._y) if len(self.tout) != self.nsims: raise ValueError("Simulator tout should be the same length as " "trajectories") for i in range(self.nsims): if len(self.tout[i]) != self._y[i].shape[0]: raise ValueError("The number of time points in tout[{0}] " "should match the trajectories array for " "simulation {0}".format(i)) if self._y[i].shape[1] != len(self._model.species): raise ValueError("The number of species in trajectory {0} " "should match length of " "model.species".format(i)) # Calculate ``yobs`` and ``yexpr`` based on values of ``y`` exprs = self._model.expressions_dynamic() expr_names = [expr.name for expr in exprs] model_obs = self._model.observables obs_names = list(model_obs.keys()) param_names = list(p.name for p in self._model.parameters) if not _allow_unicode_recarray(): for name_list, name_type in zip( (expr_names, obs_names, param_names), ('Expression', 'Observable', 'Parameter')): for i, name in enumerate(name_list): try: name_list[i] = name.encode('ascii') except UnicodeEncodeError: error_msg = 'Non-ASCII compatible' + \ '%s names not allowed' % name_type raise ValueError(error_msg) yobs_dtype = zip(obs_names, itertools.repeat(float)) if obs_names \ else float yexpr_dtype = zip(expr_names, itertools.repeat(float)) if expr_names \ else float self._yobs = [np.ndarray((len(self.tout[n]),), dtype=yobs_dtype) for n in range(self.nsims)] self._yobs_view = [self._yobs[n].view(float). reshape(len(self._yobs[n]), -1) for n in range( self.nsims)] self._yexpr = [np.ndarray((len(self.tout[n]),), dtype=yexpr_dtype) for n in range( self.nsims)] self._yexpr_view = [self._yexpr[n].view(float).reshape(len( self._yexpr[n]), -1) for n in range(self.nsims)] param_values = simulator.param_values # loop over simulations sym_names = obs_names + param_names expanded_exprs = [sympy.lambdify(sym_names, expr.expand_expr(), "numpy") for expr in exprs] for n in range(self.nsims): simulator._logger.log(EXTENDED_DEBUG, 'Evaluating exprs/obs %d/%d' % (n + 1, self.nsims)) # observables for i, obs in enumerate(model_obs): self._yobs_view[n][:, i] = ( self._y[n][:, obs.species] * obs.coefficients).sum(axis=1) # expressions sym_dict = dict((k, self._yobs[n][k]) for k in obs_names) sym_dict.update(dict((p.name, param_values[ n // self.n_sims_per_parameter_set][i]) for i, p in enumerate(self._model.parameters))) for i, expr in enumerate(exprs): self._yexpr_view[n][:, i] = expanded_exprs[i](**sym_dict) simulator._logger.debug('SimulationResult constructor finished') def _squeeze_output(self, trajectories): """ Reduces trajectories to a 2D matrix if only one simulation present Can be disabled by setting self.squeeze to False """ if self.nsims == 1 and self.squeeze: return trajectories[0] else: return trajectories @property def nsims(self): """ The number of simulations in this SimulationResult """ return self._nsims @property def all(self): """ Aggregate species, observables, and expressions trajectories into a numpy.ndarray with record-style data-type for return to the user. """ if self._yfull is None: sp_names = ['__s%d' % i for i in range(len(self._model.species))] yfull_dtype = zip(sp_names, itertools.repeat(float)) if len(self._model.observables): yfull_dtype += zip(self._model.observables.keys(), itertools.repeat(float)) if len(self._model.expressions_dynamic()): yfull_dtype += zip(self._model.expressions_dynamic().keys(), itertools.repeat(float)) yfull = len(self._y) * [None] # loop over simulations for n in range(self.nsims): yfull[n] = np.ndarray(len(self.tout[n]), yfull_dtype) yfull_view = yfull[n].view(float).reshape((len(yfull[n]), -1)) n_sp = self._y[n].shape[1] n_ob = self._yobs_view[n].shape[1] n_ex = self._yexpr_view[n].shape[1] yfull_view[:, :n_sp] = self._y[n] yfull_view[:, n_sp:n_sp + n_ob] = self._yobs_view[n] yfull_view[:, n_sp + n_ob:n_sp + n_ob + n_ex] = \ self._yexpr_view[n] self._yfull = yfull return self._squeeze_output(self._yfull) @property def dataframe(self): """ A conversion of the trajectory sets (species, observables and expressions for all simulations) into a single :py:class:`pandas.DataFrame`. """ if pd is None: raise Exception('Please "pip install pandas" for this feature') sim_ids = (np.repeat(range(self.nsims), [len(t) for t in self.tout])) times = np.concatenate(self.tout) if self.nsims == 1 and self.squeeze: idx = pd.Index(times, name='time') else: idx = pd.MultiIndex.from_tuples(zip(sim_ids, times), names=['simulation', 'time']) simdata = self.all if not isinstance(simdata, np.ndarray): simdata = np.concatenate(simdata) return pd.DataFrame(simdata, index=idx) @property def species(self): """ List of trajectory sets. The first dimension contains species. """ return self._squeeze_output(self._y) @property def observables(self): """ List of trajectory sets. The first dimension contains observables. """ return self._squeeze_output(self._yobs) @property def expressions(self): """ List of trajectory sets. The first dimension contains expressions. """ return self._squeeze_output(self._yexpr)
def _allow_unicode_recarray(): """Return True if numpy recarray can take unicode data type. In python 2, numpy doesn't allow unicode strings as names in arrays even if they are ascii encodeable. This function tests this directly. """ try: np.ndarray((1,), dtype=[(u'X', float)]) except TypeError: return False return True