from __future__ import print_function as _
import pysb.core
from pysb.generator.bng import BngGenerator
import os
import subprocess
import re
import itertools
import sympy
import numpy
import tempfile
from pkg_resources import parse_version
import abc
from warnings import warn
import shutil
try:
from cStringIO import StringIO
except ImportError:
from io import StringIO
try:
from future_builtins import zip
except ImportError:
pass
# Cached value of BNG path
_bng_path = None
def set_bng_path(dir):
global _bng_path
_bng_path = os.path.join(dir,'BNG2.pl')
# Make sure file exists and that it is not a directory
if not os.access(_bng_path, os.F_OK) or not os.path.isfile(_bng_path):
raise Exception('Could not find BNG2.pl in ' + os.path.abspath(dir) + '.')
# Make sure file has executable permissions
elif not os.access(_bng_path, os.X_OK):
raise Exception("BNG2.pl in " + os.path.abspath(dir) + " does not have executable permissions.")
def _get_bng_path():
"""
Return the path to BioNetGen's BNG2.pl.
Looks for a BNG distribution at the path stored in the BNGPATH environment
variable if that's set, or else in a few hard-coded standard locations.
"""
global _bng_path
# Just return cached value if it's available
if _bng_path:
return _bng_path
path_var = 'BNGPATH'
dist_dirs = [
'/usr/local/share/BioNetGen',
'c:/Program Files/BioNetGen',
]
# BNG 2.1.8 moved BNG2.pl up out of the Perl2 subdirectory, so to be more
# compatible we check both the old and new locations.
script_subdirs = ['', 'Perl2']
def check_dist_dir(dist_dir):
# Return the full path to BNG2.pl inside a BioNetGen distribution
# directory, or False if directory does not contain a BNG2.pl in one of
# the expected places.
for subdir in script_subdirs:
script_path = os.path.join(dist_dir, subdir, 'BNG2.pl')
if os.access(script_path, os.F_OK):
return script_path
else:
return False
# First check the environment variable, which has the highest precedence
if path_var in os.environ:
script_path = check_dist_dir(os.environ[path_var])
if not script_path:
raise Exception('Environment variable %s is set but BNG2.pl could'
' not be found there' % path_var)
# If the environment variable isn't set, check the standard locations
else:
for dist_dir in dist_dirs:
script_path = check_dist_dir(dist_dir)
if script_path:
break
else:
raise Exception('Could not find BioNetGen installed in one of the '
'following locations:' +
''.join('\n ' + d for d in dist_dirs))
# Cache path for future use
_bng_path = script_path
return script_path
[docs]class BngInterfaceError(RuntimeError):
"""BNG reported an error"""
pass
[docs]class BngBaseInterface(object):
""" Abstract base class for interfacing with BNG """
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def __init__(self, model, verbose=False, cleanup=False,
output_prefix=None, output_dir=None):
self.verbose = verbose
self.cleanup = cleanup
self.output_prefix = 'tmpBNG' if output_prefix is None else \
output_prefix
self.generator = BngGenerator(model)
self._check_model()
self.base_directory = tempfile.mkdtemp(prefix=self.output_prefix,
dir=output_dir)
def __enter__(self):
return self
@abc.abstractmethod
def __exit__(self):
return
def _delete_tmpdir(self):
shutil.rmtree(self.base_directory)
def _check_model(self):
"""
Checks a model has at least one initial condition and rule, raising
an exception if not
"""
if not self.model.rules:
raise NoRulesError()
if not self.model.initial_conditions and not any(r.is_synth() for r
in self.model.rules):
raise NoInitialConditionsError()
@classmethod
def _bng_param(cls, param):
"""
Ensures a BNG console parameter is in the correct format
Strings are double quoted and booleans are mapped to [0,1]. Other
types are currently used verbatim.
Parameters
----------
param :
An argument to a BNG action call
"""
if isinstance(param, str):
return '"%s"' % param
if isinstance(param, bool):
return 1 if param else 0
return param
@abc.abstractmethod
[docs] def action(self, action, **kwargs):
"""
Generates code to execute a BNG action command
Parameters
----------
action: string
The name of the BNG action function
kwargs: kwargs, optional
Arguments and values to supply to BNG
"""
return
@classmethod
def _format_action_args(cls, **kwargs):
"""
Formats a set of arguments for BNG
Parameters
----------
kwargs: kwargs, optional
Arguments and values to supply to BNG
"""
if kwargs:
action_args = ','.join('%s=>%s' % (k, BngConsole._bng_param(v))
for k, v in kwargs.items())
else:
action_args = ''
return action_args
@property
def model(self):
"""
Convenience method to get the PySB model itself
:return: PySB model used in BNG interface constructor
"""
return self.generator.model
@property
def base_filename(self):
"""
Returns the base filename (without extension) for BNG output files
"""
return os.path.join(self.base_directory, 'pysb')
@property
def bng_filename(self):
"""
Returns the BNG command list (.bngl) filename (does not check
whether the file exists)
"""
return self.base_filename + '.bngl'
@property
def net_filename(self):
"""
Returns the BNG network filename (does not check whether the file
exists)
"""
return self.base_filename + '.net'
[docs] def read_netfile(self):
"""
Reads a BNG network file as a string. Note that you must execute
network generation separately before attempting this, or the file will
not be found.
:return: Contents of the BNG network file as a string
"""
with open(self.net_filename, 'r') as net_file:
output = net_file.read()
return output
[docs] def read_simulation_results(self):
"""
Reads the results of a BNG simulation and parses them into a numpy
array
"""
# Read concentrations data
cdat_arr = numpy.loadtxt(self.base_filename + '.cdat', skiprows=1)
# Read groups data
if len(self.model.observables):
# Exclude first column (time)
gdat_arr = numpy.loadtxt(self.base_filename + '.gdat',
skiprows=1)[:,1:]
else:
gdat_arr = numpy.ndarray((len(cdat_arr), 0))
# -1 for time column
names = ['time'] + ['__s%d' % i for i in range(cdat_arr.shape[1]-1)]
yfull_dtype = list(zip(names, itertools.repeat(float)))
if len(self.model.observables):
yfull_dtype += list(zip(self.model.observables.keys(),
itertools.repeat(float)))
yfull = numpy.ndarray(len(cdat_arr), yfull_dtype)
yfull_view = yfull.view(float).reshape(len(yfull), -1)
yfull_view[:, :len(names)] = cdat_arr
yfull_view[:, len(names):] = gdat_arr
return yfull
[docs]class BngConsole(BngBaseInterface):
""" Interact with BioNetGen through BNG Console """
def __init__(self, model, verbose=False, cleanup=True,
output_dir=None, output_prefix=None, timeout=30,
suppress_warnings=False):
super(BngConsole, self).__init__(model, verbose, cleanup,
output_prefix, output_dir)
try:
import pexpect
except ImportError:
raise ImportError("Library 'pexpect' is required to use "
"BNGConsole, please install it to continue.\n"
"It is not currently available on Windows.")
self.suppress_warnings = suppress_warnings
try:
# Generate BNGL file
with open(self.bng_filename, mode='w') as bng_file:
bng_file.write(self.generator.get_content())
# Start BNG Console and load BNGL
self.console = pexpect.spawn('perl %s --console' % _get_bng_path(),
cwd=self.base_directory,
timeout=timeout)
self._console_wait()
self.console.sendline('load %s' % self.bng_filename)
self._console_wait()
except Exception as e:
raise BngInterfaceError(e)
def __exit__(self, exc_type, exc_val, exc_tb):
"""
In console mode, commands have already been executed, so we simply
close down the console and erase the temporary directory if applicable.
"""
self.console.sendline('done')
self.console.close()
if self.cleanup:
self._delete_tmpdir()
def _console_wait(self):
"""
Wait for BNG console to process the command, and return the output
:return: BNG console output from the previous command
"""
self.console.expect('BNG>')
if self.verbose:
print(self.console.before)
elif not self.suppress_warnings and "WARNING:" in self.console.before:
warn(self.console.before)
return self.console.before
[docs] def generate_network(self, overwrite=False):
"""
Generates a network in BNG and returns the network file contents as
a string
Parameters
----------
overwrite: bool, optional
Overwrite existing network file, if any
"""
try:
self.action('generate_network', overwrite=overwrite)
bng_network = self.read_netfile()
except Exception as e:
raise BngInterfaceError(e)
return bng_network
[docs] def action(self, action, **kwargs):
"""
Generates a BNG action command and executes it through the console,
returning any console output
Parameters
----------
action : string
The name of the BNG action function
kwargs : kwargs, optional
Arguments and values to supply to BNG
"""
# Process BNG arguments into a string
action_args = self._format_action_args(**kwargs)
# Execute the command via the console
self.console.sendline('action %s({%s})' % (action, action_args))
# Wait for the command to execute and return the result
return self._console_wait()
class BngFileInterface(BngBaseInterface):
def __init__(self, model, verbose=False, output_dir=None,
output_prefix=None, cleanup=True):
super(BngFileInterface, self).__init__(model, verbose, cleanup,
output_prefix, output_dir)
self._init_command_queue()
def _init_command_queue(self):
"""
Initializes the BNG command queue
"""
self.command_queue = StringIO()
self.command_queue.write('begin actions\n')
def __exit__(self, exc_type, exc_val, exc_tb):
"""
In file interface mode, we close the command queue buffer (whether
or not it's been executed) and erase the temporary directory if
applicable.
"""
self.command_queue.close()
if self.cleanup:
self._delete_tmpdir()
def execute(self, reload_netfile=False):
"""
Executes all BNG commands in the command queue.
Parameters
----------
reload_netfile: bool
If true, attempts to reload an existing .net file from a
previous execute() iteration. This is useful for running
multiple actions in a row, where results need to be read
into PySB before a new series of actions is executed.
"""
self.command_queue.write('end actions\n')
bng_commands = self.command_queue.getvalue()
try:
# Generate BNGL file
with open(self.bng_filename, 'w') as bng_file:
if reload_netfile:
bng_commands = bng_commands.replace('begin actions\n',
'begin actions\n\treadFile({'
'file=>"%s"});\n' % self.net_filename)
else:
bng_file.write(self.generator.get_content())
bng_file.write(bng_commands)
# Reset the command queue, in case execute() is called again
self.command_queue.close()
self._init_command_queue()
p = subprocess.Popen(['perl', _get_bng_path(), self.bng_filename],
cwd=self.base_directory,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
if self.verbose:
for line in iter(p.stdout.readline, b''):
print(line, end="")
(p_out, p_err) = p.communicate()
if p.returncode:
raise BngInterfaceError(p_out.rstrip("at line") +
"\n" +
p_err.rstrip())
except Exception as e:
raise BngInterfaceError(e)
def action(self, action, **kwargs):
"""
Generates a BNG action command and adds it to the command queue
Parameters
----------
action : string
The name of the BNG action function
kwargs : kwargs, optional
Arguments and values to supply to BNG
"""
# Process BNG arguments into a string
action_args = self._format_action_args(**kwargs)
# Add the command to the queue
self.command_queue.write('\t%s({%s})\n' % (action, action_args))
return
[docs]def run_ssa(model, t_end=10, n_steps=100, param_values=None, output_dir=None,
output_file_basename=None, cleanup=True, verbose=False,
**additional_args):
"""
Simulate a model with BNG's SSA simulator and return the trajectories.
Parameters
----------
model : Model
Model to simulate.
t_end : number, optional
Final time point of the simulation.
n_steps : int, optional
Number of steps in the simulation.
param_values : vector-like or dictionary, optional
Values to use for every parameter in the model. Ordering is
determined by the order of model.parameters.
If not specified, parameter values will be taken directly from
model.parameters.
output_dir : string, optional
Location for temporary files generated by BNG. If None (the
default), uses a temporary directory provided by the system. A
temporary directory with a random name is created within the
supplied location.
output_file_basename : string, optional
This argument is used as a prefix for the temporary BNG
output directory, rather than the individual files.
cleanup : bool, optional
If True (default), delete the temporary files after the simulation is
finished. If False, leave them in place. Useful for debugging.
verbose: bool, optional
If True, print BNG screen output.
additional_args: kwargs, optional
Additional arguments to pass to BioNetGen
"""
additional_args['method'] = 'ssa'
additional_args['t_end'] = t_end
additional_args['n_steps'] = n_steps
additional_args['verbose'] = verbose
if param_values is not None:
if len(param_values) != len(model.parameters):
raise Exception("param_values must be the same length as model.parameters")
for i in range(len(param_values)):
model.parameters[i].value = param_values[i]
with BngFileInterface(model, verbose=verbose, output_dir=output_dir,
output_prefix=output_file_basename,
cleanup=cleanup) as bngfile:
bngfile.action('generate_network', overwrite=True, verbose=verbose)
bngfile.action('simulate', **additional_args)
bngfile.execute()
output = bngfile.read_netfile()
# Parse netfile (in case this hasn't already been done)
if not model.species:
_parse_netfile(model, iter(output.split('\n')))
yfull = bngfile.read_simulation_results()
return yfull
[docs]def generate_network(model, cleanup=True, append_stdout=False, verbose=False):
"""
Return the output from BNG's generate_network function given a model.
The output is a BNGL model definition with additional sections 'reactions'
and 'groups', and the 'species' section expanded to contain all possible
species. BNG refers to this as a 'net' file.
Parameters
----------
model : Model
Model to pass to generate_network.
cleanup : bool, optional
If True (default), delete the temporary files after the simulation is
finished. If False, leave them in place (in `output_dir`). Useful for
debugging.
append_stdout : bool, optional
This option is no longer supported and has been left here for API
compatibility reasons.
verbose : bool, optional
If True, print output from BNG to stdout.
"""
with BngFileInterface(model, verbose=verbose, cleanup=cleanup) as bngfile:
net_filename = bngfile.base_filename + '.net'
bngfile.action('generate_network', overwrite=True, verbose=verbose)
bngfile.execute()
output = bngfile.read_netfile()
return output
[docs]def generate_equations(model, cleanup=True, verbose=False):
"""
Generate math expressions for reaction rates and species in a model.
This fills in the following pieces of the model:
* odes
* species
* reactions
* reactions_bidirectional
* observables (just `coefficients` and `species` fields for each element)
"""
# only need to do this once
# TODO track "dirty" state, i.e. "has model been modified?"
# or, use a separate "math model" object to contain ODEs
if model.odes:
return
lines = iter(generate_network(model,cleanup,verbose=verbose).split('\n'))
_parse_netfile(model, lines)
def _parse_netfile(model, lines):
"""Parse 'species', 'reactions', and 'groups' blocks from a BNGL net file."""
try:
global new_reverse_convention
(bng_version, bng_codename) = re.match(r'# Created by BioNetGen (\d+\.\d+\.\d+)(?:-(\w+))?$', next(lines)).groups()
if parse_version(bng_version) > parse_version("2.2.6") or parse_version(bng_version) == parse_version("2.2.6") and bng_codename == "stable":
new_reverse_convention = True
else:
new_reverse_convention = False
while 'begin species' not in next(lines):
pass
model.species = []
while True:
line = next(lines)
if 'end species' in line: break
_parse_species(model, line)
while 'begin reactions' not in next(lines):
pass
model.odes = [sympy.numbers.Zero()] * len(model.species)
global reaction_cache
reaction_cache = {}
while True:
line = next(lines)
if 'end reactions' in line: break
_parse_reaction(model, line)
# fix up reactions whose reverse version we saw first
for r in model.reactions_bidirectional:
if all(r['reverse']):
r['reactants'], r['products'] = r['products'], r['reactants']
r['rate'] *= -1
# now the 'reverse' value is no longer needed
del r['reverse']
while 'begin groups' not in next(lines):
pass
while True:
line = next(lines)
if 'end groups' in line: break
_parse_group(model, line)
except StopIteration as e:
pass
def _parse_species(model, line):
"""Parse a 'species' line from a BNGL net file."""
index, species, value = line.strip().split()
species_compartment_name, complex_string = re.match(r'(?:@(\w+)::)?(.*)', species).groups()
species_compartment = model.compartments.get(species_compartment_name)
monomer_strings = complex_string.split('.')
monomer_patterns = []
for ms in monomer_strings:
monomer_name, site_strings, monomer_compartment_name = re.match(r'(\w+)\(([^)]*)\)(?:@(\w+))?', ms).groups()
site_conditions = {}
if len(site_strings):
for ss in site_strings.split(','):
# FIXME this should probably be done with regular expressions
if '!' in ss and '~' in ss:
site_name, condition = ss.split('~')
state, bond = condition.split('!')
if bond == '?':
bond = pysb.core.WILD
elif bond == '!':
bond = pysb.core.ANY
else:
bond = int(bond)
condition = (state, bond)
elif '!' in ss:
site_name, condition = ss.split('!', 1)
if '!' in condition:
condition = [int(c) for c in condition.split('!')]
else:
condition = int(condition)
elif '~' in ss:
site_name, condition = ss.split('~')
else:
site_name, condition = ss, None
site_conditions[site_name] = condition
monomer = model.monomers[monomer_name]
monomer_compartment = model.compartments.get(monomer_compartment_name)
# Compartment prefix notation in BNGL means "assign this compartment to
# all molecules without their own explicit compartment".
compartment = monomer_compartment or species_compartment
mp = pysb.core.MonomerPattern(monomer, site_conditions, compartment)
monomer_patterns.append(mp)
cp = pysb.core.ComplexPattern(monomer_patterns, None)
model.species.append(cp)
def _parse_reaction(model, line):
"""Parse a 'reaction' line from a BNGL net file."""
(number, reactants, products, rate, rule) = line.strip().split(' ', 4)
# the -1 is to switch from one-based to zero-based indexing
reactants = tuple(int(r) - 1 for r in reactants.split(','))
products = tuple(int(p) - 1 for p in products.split(','))
rate = rate.rsplit('*')
(rule_list, unit_conversion) = re.match(
r'#([\w,\(\)]+)(?: unit_conversion=(.*))?\s*$', rule).groups()
rule_list = rule_list.split(',') # BNG lists all rules that generate a rxn
# Support new (BNG 2.2.6-stable or greater) and old BNG naming convention for reverse rules
rule_name, is_reverse = zip(*[re.subn('^_reverse_|\(reverse\)$', '', r) for r in rule_list])
is_reverse = tuple(bool(i) for i in is_reverse)
r_names = ['__s%d' % r for r in reactants]
combined_rate = sympy.Mul(*[sympy.S(t) for t in r_names + rate])
reaction = {
'reactants': reactants,
'products': products,
'rate': combined_rate,
'rule': rule_name,
'reverse': is_reverse,
}
model.reactions.append(reaction)
# bidirectional reactions
key = (reactants, products)
key_reverse = (products, reactants)
if key in reaction_cache:
reaction_bd = reaction_cache.get(key)
reaction_bd['rate'] += combined_rate
reaction_bd['rule'] += tuple(r for r in rule_name if r not in
reaction_bd['rule'])
elif key_reverse in reaction_cache:
reaction_bd = reaction_cache.get(key_reverse)
reaction_bd['reversible'] = True
reaction_bd['rate'] -= combined_rate
reaction_bd['rule'] += tuple(r for r in rule_name if r not in
reaction_bd['rule'])
else:
# make a copy of the reaction dict
reaction_bd = dict(reaction)
# default to false until we find a matching reverse reaction
reaction_bd['reversible'] = False
reaction_cache[key] = reaction_bd
model.reactions_bidirectional.append(reaction_bd)
# odes
for p in products:
model.odes[p] += combined_rate
for r in reactants:
model.odes[r] -= combined_rate
def _parse_group(model, line):
"""Parse a 'group' line from a BNGL net file."""
# values are number (which we ignore), name, and species list
values = line.strip().split()
obs = model.observables[values[1]]
if len(values) == 3:
# combination is a comma separated list of [coeff*]speciesnumber
for product in values[2].split(','):
terms = product.split('*')
# if no coeff given (just species), insert a coeff of 1
if len(terms) == 1:
terms.insert(0, 1)
obs.coefficients.append(int(terms[0]))
# -1 to change to 0-based indexing
obs.species.append(int(terms[1]) - 1)
[docs]class NoInitialConditionsError(RuntimeError):
"""Model initial_conditions is empty."""
def __init__(self):
RuntimeError.__init__(self, "Model has no initial conditions or "
"zero-order synthesis rules")
[docs]class NoRulesError(RuntimeError):
"""Model rules is empty."""
def __init__(self):
RuntimeError.__init__(self, "Model has no rules")