'''
Created on Mar 9, 2014
@author: thocu
'''
from collections import OrderedDict
import collections
import math
from orderedset import OrderedSet
import random
import string
import warnings
import Grid
from VirtualMicrobes.event.Molecule import Molecule, MoleculeClass
from VirtualMicrobes.event.Reaction import Degradation, Transport, Diffusion, Influx, \
ClassConvert, Convert
from VirtualMicrobes.my_tools.utility import OrderedDefaultdict
import VirtualMicrobes.my_tools.utility as util
from VirtualMicrobes.readwrite import read_obj
import itertools as it
import numpy as np
[docs]class Locality(object):
class_version = '1.0'
def __init__(self, params, internal_molecules, influx_reactions, degradation_reactions, env_rand,
max_time_course_length=0):
'''
Create a new locality.
initializes a locality that holds cells and molecules of gridpoints:
* external molecule dictionaries, and time courses to track external concentrations
* list of cells in this locality (usually 1)
* locality volume
Parameters
----------
params : :class:`attrdict.AttrDict`
mapping object that holds global simulation parameters
internal_molecules : :class:`VirtualMicrobes.Event.Molecule`
list of molecules
influx_reactions : :class:`VirtualMicrobes.Event.Reaction`
list of influx reactions into the locality
degradation : :class:`VirtualMicrobes.Event.Reaction`
list of degr reactions out of the locality
env_rand : RNG
max_time_course_length : int
the length of time courses kept in memory
'''
self.version = self.__class__.class_version
self.params = params
self.max_time_course_length = max_time_course_length
self.volume = self.params.per_grid_cell_volume
self.internal_molecules = internal_molecules
self.influx_reactions = influx_reactions
self.degradation_reactions = degradation_reactions
self.env_rand = env_rand
self.molecules = dict() # NOTE: unordered ok
self.molecules["gene_products"] = util.OrderedDefaultdict(util.GeneProduct)
self.molecules["small_molecules"] = util.OrderedDefaultdict(util.SmallMol)
self.init_mol_views()
self.cells = []
self.nr_time_points_stored = 0
self.init_external_mols()
self.init_variables_map()
self.new_concentrations = False
[docs] def init_variables_map(self):
self.dimension = 0
self.variables_map = dict() # NOTE: unordered ok
self.variables_map["small_molecules"] = dict() # NOTE: unordered ok : keys are unique objects
self.variables_map["gene_products"] = dict() # NOTE: unordered ok
self.variables_map["cell_growth"] = dict() # NOTE: unordered ok
self.variables_map["cell_toxicity"] =dict() # NOTE: unordered ok
self.variables_map["cell_volume"] =dict() # NOTE: unordered ok
[docs] def init_mol_views(self):
'''
Sets alias to molecules in locality (QoL implementation)
'''
self.small_mols, self.gene_products = (self.molecules['small_molecules'].viewkeys(),
self.molecules['gene_products'].viewkeys())
[docs] def get_mol_influx_dict(self): # NOTE: unordered ok : use for output only
''' Get influx dictionary from locality dictionary '''
return dict([ (mol, self.molecules['small_molecules'][mol].influx)
for mol in self.small_mols] )
[docs] def get_mol_concentration_dict(self): # NOTE: unordered ok : use for output only
''' Get molecule concentration dictionary '''
return dict([ (mol, self.get_small_mol_conc(mol))
for mol in self.small_mols] )
[docs] def get_mol_time_course_dict(self, max_tp=None):
''' Get time course of molecule '''
if max_tp is None:
max_tp = self.nr_time_points_stored
return dict([ (mol, np.vstack((self.time_points[:max_tp],
self.molecules['small_molecules'][mol].time_course[:max_tp])))
for mol in self.small_mols] )
def _get_mol_property_dict(self, prop_name):
mol_property_dict = dict() # NOTE: unordered ok : use for output only
for mol in self.small_mols:
mol_property_dict[mol] = self.molecules["small_molecules"][mol][prop_name]
return mol_property_dict
[docs] def get_mol_per_class_influx_dict(self): # NOTE: not used, purge?
mol_property_dict = util.OrderedDefaultdict(dict) # NOTE: unordered ok
for mol in self.small_mols:
mol_property_dict[str(mol.paired.mol_class)][mol] = self.molecules["small_molecules"][mol].influx
return mol_property_dict
[docs] def get_mol_per_class_concentration_dict(self): # NOTE: not used, purge?
mol_property_dict = util.OrderedDefaultdict(dict) # NOTE: unordered ok
for mol in self.small_mols:
mol_property_dict[str(mol.paired.mol_class)][mol] = self.get_small_mol_conc(mol)
return mol_property_dict
[docs] def get_cells(self):
return self.cells[:]
[docs] def init_external_mols(self, conc=None):
if conc is None:
conc=10e-20
for mol_ext in [ mol_int.paired for mol_int in self.internal_molecules]:
self.add_small_molecule(mol_ext, conc)
[docs] def add_small_molecule(self, mol, concentration=0.):
self.molecules["small_molecules"][mol].influx = self.influx_reactions[mol], 0.
self.molecules["small_molecules"][mol].degradation = self.degradation_reactions[mol.paired], 0.
self.init_mol_time_course(self.molecules["small_molecules"][mol])
self.set_small_mol_conc(mol, concentration)
[docs] def update_small_mol_concentrations(self, concentration_dict):
for mol_ext, conc in concentration_dict.items():
self.set_small_mol_conc(mol_ext, conc)
self.new_concentrations = True
[docs] def update_small_mol_degradation_rates(self, degradation_dict):
for mol_ext, degr in degradation_dict.items():
(degr_reac, _) = self.molecules["small_molecules"][mol_ext].influx
self.molecules["small_molecules"][mol_ext].degradation = (degr_reac,degr)
[docs] def update_small_mol_influxes(self, influx_dict, no_influx=1e-20):
for mol_ext, influx in influx_dict.items():
if influx is None:
influx = no_influx
(influx_reac,_) = self.molecules["small_molecules"][mol_ext].influx
self.molecules["small_molecules"][mol_ext].influx = (influx_reac, influx)
[docs] def map_variables(self):
'''
Setup of the mapping of variables in the system to indexes used by the C
encoded integrator part of the simulation:
* small molecules (metabolites, toxins, resources)
* gene products
In practice we start with an empty dictionary and let 'system' map
molecules and gene products sepselfarately, creating the indexes on the go
and reporting back on the last selfused index.
'''
self.init_variables_map()
self.map_external_molecules()
for cell in self.cells:
self.map_cell_internal_mols(cell)
self.map_cell_gene_products(cell)
[docs] def map_external_molecules(self):
'''
Mapping of the small molecules to indexes for use in the C integrator.
The map is an initially empty dictionary. The first level of indexing is
the container , while the second level is the molecules (type)
. The container can be a cell or it can be the environment. (see
also map_variables)
:param start_index:
'''
for mol in self.internal_molecules:
external_mol = mol.paired
if self not in self.variables_map['small_molecules']:
self.variables_map["small_molecules"][self] = dict() # NOTE: unordered ok: only accessed by unique key
self.variables_map["small_molecules"][self][external_mol] = self.dimension
self.dimension += 1
[docs] def map_cell_internal_mols(self, cell):
for mol in self.internal_molecules:
if cell not in self.variables_map["small_molecules"]:
self.variables_map["small_molecules"][cell] = dict() # NOTE: unordered ok: only accessed by unique key
self.variables_map["small_molecules"][cell][mol] = self.dimension
self.dimension += 1
[docs] def map_cell_gene_products(self, cell):
'''
The mapping is a dictionary of dictionaries; in this way, it is
possible to recognize the same protein in the two different cells as two
separate variables. (see also map_external_molecules)
:param start_index:
'''
if cell not in self.variables_map["gene_products"]:
self.variables_map["gene_products"][cell] = dict() # NOTE: unordered ok: only accessed by unique key
for prod in cell.gene_products:
self.variables_map["gene_products"][cell][prod] = self.dimension
self.dimension += 1
self.variables_map["cell_growth"][cell] = self.dimension
self.dimension += 1
self.variables_map["cell_toxicity"][cell] = self.dimension
self.dimension += 1
self.variables_map["cell_volume"][cell] = self.dimension
self.dimension += 1
[docs] def init_mol_time_course(self, mol_struct, length=None):
mol_struct.time_course = util.time_course_array(length)
[docs] def init_time_courses(self, length=None):
'''initialize an array to hold time course data of molecules
:param new_max_time_points: max number of time points
'''
if length is None:
length = self.max_time_course_length
self.time_points = util.time_course_array(length)
for mol_struct in self.molecules["small_molecules"].values():
self.init_mol_time_course(mol_struct, length)
for mol_struct in self.molecules["gene_products"].values():
self.init_mol_time_course(mol_struct, length)
[docs] def resize_time_courses(self, new_max_time_points):
if new_max_time_points > self.max_time_course_length:
self.max_time_course_length = new_max_time_points
self.init_time_courses()
[docs] def clear_mol_time_courses(self):
for mol_struct in self.molecules["small_molecules"].values():
mol_struct.time_course = None
for mol_struct in self.molecules["gene_products"].values():
mol_struct.time_course = None
[docs] def get_small_mol_conc(self, mol):
if self.tp_index is not None:
return self.molecules['small_molecules'][mol].time_course[self.tp_index]
else:
return self.molecules['small_molecules'][mol].concentration
[docs] def get_internal_mol_conc(self, mol):
cells = self.cells
if len(cells) > 0:
return cells[0].get_small_mol_conc(mol)
else:
return 0.0
[docs] def get_expression_level(self, rea, exporting=False):
cells = self.cells
if len(cells) > 0:
return cells[0].get_total_expression_level(rea, exporting=exporting)
else:
return 0.0
[docs] def set_small_mol_conc(self, mol, val):
#print "setting " + str(mol) + " to " + str(val) # Debugging purposes
if self.tp_index is not None:
self.molecules['small_molecules'][mol].time_course[self.tp_index] = val
self.molecules['small_molecules'][mol].concentration = val
#print self.molecules['small_molecules'][mol].time_course
[docs] def get_gene_prod_conc(self, gene):
if self.tp_index is not None:
return self.molecules['gene_products'][gene].time_course[self.tp_index]
else:
return self.molecules['gene_products'][gene].concentration
[docs] def set_gene_prod_conc(self, gene, conc):
if self.tp_index is not None:
self.molecules['gene_products'][gene].time_course[self.tp_index] = conc
self.molecules['gene_products'][gene].concentration = conc
@property
def tp_index(self):
index = self.nr_time_points_stored - 1
return index if index > -1 else None
[docs] def set_mol_concentrations_from_time_point(self):
for mol in self.small_mols:
self.set_small_mol_conc(mol, self.get_small_mol_conc(mol))
for gene_prod in self.gene_products:
self.set_gene_prod_conc(gene_prod, self.get_gene_prod_conc(gene_prod))
[docs] def spill_dead_cell_contents(self, cell, prod_as_bb=None, factor=None):
if factor is None:
factor = self.params.spill_conc_factor
if prod_as_bb is None:
prod_as_bb = self.params.spill_product_as_bb
util.within_range(factor, (0., 1.))
for m in cell.small_mols:
cell_conc = cell.get_small_mol_conc(m)
ext_conc = self.get_small_mol_conc(m.paired)
self.set_small_mol_conc(m.paired, ext_conc + (factor * cell_conc * cell.volume)/ self.volume)
if prod_as_bb:
bbs = [bb for bb in cell.small_mols if bb.is_building_block]
sum_stoich = sum(cell.building_blocks_dict.values())
product = cell.raw_production
for bb in bbs:
#print 'spilling {} product as {} of bb {}'.format(product, (cell.building_blocks_dict[bb]/sum_stoich)*(product*cell.volume)/self.volume ,bb )
if self.tp_index is not None:
ext_conc = self.get_small_mol_conc(bb.paired)
self.set_small_mol_conc(bb.paired, ext_conc + (cell.building_blocks_dict[bb]/sum_stoich)*(product*cell.volume)/self.volume)
self.new_concentrations = True
[docs] def clear_locality(self):
'''
Clear the cells in this locality.
'''
for cell in self.cells:
self.remove_cell(cell)
[docs] def remove_cell(self, cell):
self.cells.remove(cell)
[docs] def add_cell(self, cell):
self.cells.append(cell)
def _clear_dead_cells(self):
removed = []
for c in self.get_cells():
if not c.alive:
if self.params.spill_conc_factor and not c.wiped:
self.spill_dead_cell_contents(c)
self.remove_cell(c)
removed.append(c)
return removed
def __setitem__(self,key,value):
self.params[key] = value
def __getitem__(self,key):
return self.params[key]
[docs] def upgrade(self):
'''
Upgrading from older pickled version of class to latest version. Version
information is saved as class variable and should be updated when class
invariants (e.g. fields) are added.
'''
version = float(self.version)
if version < 1.:
self.new_concentrations = False
self.version = self.class_version
if version > float(self.class_version):
print 'upgraded class',
else:
print 'reset class',
print self.__class__.__name__, ' from version', version ,'to version', self.version
def __getstate__(self):
odict = self.__dict__.copy() # copy the dict since we change it
del odict['small_mols'] # remove filehandle entry
del odict['gene_products']
return odict
def __setstate__(self, obj_dict):
self.__dict__.update(obj_dict) # update attributes
self.init_mol_views()
if not hasattr(self, 'version'):
self.version = '0.0'
if self.version != self.class_version:
self.upgrade()
[docs]class Environment(object):
class_version = '1.1'
'''
'''
def __init__(self,params, max_time_course_length=0):
'''
:param nr_resource_classes:
:param mol_per_res_class:
:param nr_energy_classes:
:param mol_per_ene_class:
:param nr_building_blocks: how many molecules are building blocks
:param consume_range: range for stoichiometric constants for a resource of a conversion reaction
:param max_yield: maximum stoichiometric yield of new product in a conversion reaction
:param transport_cost_range: range of stoichiometric energy cost of an import reaction
:param per_grid_cell_volume:
:param fraction_realized_reactions: fraction of actualized reactions in the simulation of all possible reactions given the set of molecule classes and possible reaction schemes
:param fraction_mol_transports: fraction of resources that have a import reaction assigned for importing it
:param reaction_schemes: nr-of-resources to nr-of-product pairs that define the potential reaction space
:param number_localities:
:param toxicity:
'''
self.version = self.__class__.class_version
### take a reference of the global paramaters ###
self.params = params
self.max_time_course_length = max_time_course_length
self._init_rand_gens(self.params.env_rand_seed)
self.resource_classes = collections.OrderedDict()
self.energy_classes = collections.OrderedDict()
if self.params.env_from_file is not None: # Follow similar structure as random init below, but based on the composing dict
composing_dict = read_obj.parse_environment_stringrepr(self.params.env_from_file)
read_obj.init_mol_classes(self, composing_dict['molecules']) # All molecules are now defined
read_obj.init_reactions(self, composing_dict) # Reactions parsed. Also handles toxicity, so init_mol_toxicity is not necessary
read_obj.init_membrane_diffusion_dict(self, composing_dict['membrane diffusion rates'])
read_obj.init_degradation_dict(self, composing_dict['degradation rates'])
read_obj.read_grid_params(self, composing_dict['grid'])
self.init_grid()
if(params.init_pop_size is None):
params.init_pop_size = params.grid_rows * params.grid_cols # This is usually set even before the env is initialised, so it needs to be reset here if you change the gridsize
read_obj.init_influx_dicts(self, flux_dicts=composing_dict['grid'], flux=self.params.influx)
self.init_external_mol_vals_on_grid(self.params.init_external_conc)
else:
self.init_mol_class_sizes()
self.init_mol_classes()
self.init_reactions()
self.init_mol_toxicities()
self.init_influxed_mols()
self.init_global_influx_dict()
self.init_degradation_dict()
self.init_membrane_diffusion_dict()
self.init_grid()
self.init_sub_envs()
self.init_external_mol_vals_on_grid()
if self.params.microfluid is not None:
self.init_microfluid_cycle()
def _init_rand_gens(self, env_rand_seed=None):
if env_rand_seed is None:
env_rand_seed = self.params.env_rand_seed
self.env_rand = random.Random(int(env_rand_seed))
[docs] def init_mol_class_sizes(self):
nr_resource_classes = self.params.nr_resource_classes
mol_per_res_class = self.params.mol_per_res_class
nr_energy_classes = self.params.nr_energy_classes
mol_per_ene_class = self.params.mol_per_ene_class
if isinstance(mol_per_res_class, int): # If parameter is an int
self.resource_mol_class_sizes = [ mol_per_res_class for _ in range(nr_resource_classes)]
else: #assume a range
self.resource_mol_class_sizes = [ self.env_rand.randint(*mol_per_res_class) for _ in range(nr_resource_classes)]
if isinstance(mol_per_ene_class, int):
self.energy_mol_class_sizes = [ mol_per_ene_class for _ in range(nr_energy_classes) ]
else:
self.energy_mol_class_sizes = [ self.env_rand.randint(*mol_per_ene_class) for _ in range(nr_energy_classes) ]
[docs] def init_mol_toxicities(self, toxicity_avrg=None, toxicity_variance_shape=None,
toxic_building_block=None):
if toxicity_avrg is None:
toxicity_avrg = self.params.toxicity
if toxicity_variance_shape is None:
toxicity_variance_shape = self.params.toxicity_variance_shape
if toxic_building_block is None:
toxic_building_block = self.params.toxic_building_blocks
for mol in self.internal_molecules:
if mol.is_building_block and not toxic_building_block:
toxicity = 0
elif toxicity_variance_shape is not None:
theta = toxicity_avrg / toxicity_variance_shape
toxicity = self.env_rand.gammavariate(toxicity_variance_shape, theta)
else:
toxicity = toxicity_avrg
mol.toxic_level = toxicity
[docs] def find_reaction(self, match_string):
'''
Returns reaction that matches stringrepr.
'''
reac_repr_dict = dict( [ (rea.short_repr(), rea) for rea in self.transports+self.conversions ])
return reac_repr_dict[match_string]
[docs] def update_reaction_universe(self, path):
'''
Updates reaction universe by adding newly added molecules and reactions to the environment.
Does NOT support the removal of metabolites / reactions, as this will often be very
artificial for cells that are still using them.
'''
composing_dict = read_obj.parse_environment_stringrepr(path)
read_obj.init_mol_classes(self, composing_dict['molecules'], reset=False) # All molecules are now defined
read_obj.init_reactions(self, composing_dict, reset=False) # Reactions parsed. Also handles toxicity, so init_mol_toxicity is not necessary
read_obj.init_membrane_diffusion_dict(self, composing_dict['membrane diffusion rates'])
read_obj.init_degradation_dict(self, composing_dict['degradation rates'])
#self.grid.toggle_gps_updated()
old_grid = self.grid
self.init_grid()
print 'made a new grid'
for loc in self.localities:
loc.init_time_courses() # I think this works?
for gp_new,gp_old in zip(self.grid.gp_iter,old_grid.gp_iter): # Add cells back to the new grid
for cell in gp_old.content.cells:
gp_new.content.add_cell(cell)
for cell in gp_new.content.cells:
cell.update_small_molecules(env=self, conc=10e-20) #conc = conc of new mols
cell.init_cell_time_courses()
for m in gp_old.content.molecules['small_molecules']:
conc = gp_old.content.get_small_mol_conc(m)
#print 'setting ', conc
gp_new.content.init_external_mols(conc)
gp_new.content.init_variables_map()
read_obj.init_influx_dicts(self, flux_dicts=composing_dict['grid'], flux=self.params.influx)
[docs] def print_values(self):
print 'conversion reactions:'
for c in self.conversions:
print c
print
print 'transport reactions:'
for t in self.transports:
print t
print
print 'toxicities:'
for mol in self.internal_molecules:
print mol, '\t', mol.toxic_level
print
print 'degradation rates:'
for mol, degr in self.degradation_dict.items():
print mol, '\t', degr
print
print 'membrane diffusion rates:'
for mol, diff in self.membrane_diffusion_dict.items():
print mol, '\t', diff
print
print 'influx rates:'
for i, sub_env in enumerate(self.subenvs):
print 'subenv {}'.format(i)
for mol,inf_rate in sub_env.influx_dict.items():
print '\t', mol, '\t', inf_rate
print
print 'not influxed:'
for mol in [ mol for mol in self.external_molecules if mol not in self.influxed_mols ]:
print mol
@property
def molecule_classes(self):
return self.resource_classes.values() + self.energy_classes.values()
@property
def energy_mols(self):
return [ m for ec in self.energy_classes.values() for m in ec.molecules ]
[docs] def select_building_blocks(self, nr_bb, mol_classes, substrate_weight_function = lambda m: m.energy_level, weight_scaling=2, rand_gen=None):
if rand_gen is None:
rand_gen = self.env_rand
bb_class_sample = util.OrderedSet()
mol_classes = util.OrderedSet(mol_classes)
while len(bb_class_sample) < nr_bb and len(mol_classes):
weighted_classes = [ ( m, pow(substrate_weight_function(m), weight_scaling) ) for m in mol_classes ]
mol_class, _energy , _ = self.pick_mol_class(weighted_classes, rand_gen)
bb_class_sample.add(mol_class)
mol_classes.remove(mol_class)
return bb_class_sample
[docs] def init_mol_classes(self):
nr_building_blocks = self.params.nr_building_blocks
res_energy_range = self.params.res_energy_range
ene_energy_range = self.params.ene_energy_range
mol_class_names = (n+str(i) if i>0 else n for i in xrange(100) for n in string.uppercase)
self.internal_molecules = []
for s in self.resource_mol_class_sizes:
mol_class_name = mol_class_names.next()
mol_names = [ mol_class_name.lower()+"."+str(i) for i in range(s) ]
molecules = [ Molecule(name=n, environment=self) for n in mol_names ] #creates external mol variatns implicitly
self.internal_molecules += molecules
mol_class = MoleculeClass(name=mol_class_name,
energy_level=self.env_rand.randint(*res_energy_range),
molecule_species=molecules)
self.resource_classes[mol_class.name] = mol_class
if self.params.high_energy_bbs:
bb_class_sample = self.select_building_blocks(nr_building_blocks, self.resource_classes.values())
else:
bb_class_sample = self.env_rand.sample(self.resource_classes.values(), min(len(self.resource_classes), nr_building_blocks))
building_blocks = []
while len(building_blocks) < nr_building_blocks:
for rc in bb_class_sample:
if not util.OrderedSet(rc.molecules) - util.OrderedSet(building_blocks):
continue
while True: #TODO: make this crappy function better
bb_temp = self.env_rand.choice(rc.molecules.values())
if bb_temp not in building_blocks:
building_blocks.append(bb_temp)
break
for block in building_blocks:
block.set_building_block()
for s in self.energy_mol_class_sizes:
mol_class_name = mol_class_names.next()
mol_names = [ mol_class_name.lower()+"."+str(i) for i in range(s) ]
molecules = [ Molecule(name=n, environment=self) for n in mol_names ]
self.internal_molecules += molecules
mol_class = MoleculeClass(name=mol_class_name,
energy_level=self.env_rand.randint(*ene_energy_range),
molecule_species=molecules, is_energy=True)
self.energy_classes[mol_class.name] = mol_class
[docs] def init_reactions(self):
self.init_diffusion()
self.init_degradation()
self.init_reaction_universe()
self.conversions += self.init_class_conversions(1.) #fraction_realized_reactions)
self.init_transports()
self.init_influx()
self.init_mols_to_reactions()
[docs] def pick_mol_class(self, weighted_classes, rand_gen=None):
'''
Randomly select a molecule class with probability proportional to a weight.
Parameters
----------
weighted_classes : list of :class:`VirtualMicrobes.event.Molecule.MoleculeClass`, weight (float) tuples
molecule classes with their respective weights
rand_gen : RNG
RNG
Returns
-------
:class:`VirtualMicrobes.event.Molecule.MoleculeClass`, energy, pos index
the selected class with its energy level and list index
'''
if rand_gen is None:
rand_gen = self.env_rand
total_ene = sum( [ w for m,w in weighted_classes] )
rand = rand_gen.uniform(0, 1.) * total_ene
cumulative_chance = 0.
catabolite, energy, index = None, 0., None
for i,(m, w) in enumerate(weighted_classes):
cumulative_chance += w
if rand <= cumulative_chance:
catabolite, energy, index = m, m.energy_level, i
break
return catabolite, energy, index
[docs] def resource_combis_within_energy(self, resource_classes, nr, total_energy):
'''
Return combinations of resource classes that have exactly a total sum in energy values.
Parameters
----------
resource_classes : set of :class:`VirtualMicrobes.event.Molecules.MoleculeClass`s
the set to make combinations from
nr : int
number of MCs to combine
total_energy : int
the sum of energy values that MCs should have
Returns
-------
combinations of MCs
'''
if nr > 1:
combis = it.product(*[resource_classes]*nr)
else:
combis = [(r,) for r in resource_classes ]
if isinstance(total_energy, tuple):
low,high = total_energy
filtered_low = [ c for c in combis if sum( map(lambda m: m.energy_level, c) ) >= low ]
filtered = [ c for c in filtered_low if sum( map(lambda m: m.energy_level, c) ) <= high ]
else:
filtered = [ c for c in combis if sum( map(lambda m: m.energy_level, c) ) == total_energy ]
return filtered
[docs] def rand_catabolic_reaction_scheme(self, substrate_classes, product_classes, energy_classes,
max_products, min_energy,rand_gen=None): # lambda m: 1.): # alternatively
'''
Construct a random catabolic reaction scheme.
Typically a catabolic reaction breaks down a large, energy rich molecule
into smaller molecules. Energy molecules may be produced in this
reaction.
Construction is done by randomly selecting a molecule class as the
substrate of the reaction and than select the molecule classes that will
be the product of the reaction. The total energy of reactants = products
(+ energy).
Parameters
----------
substrate_classes : list of :class:`VirtualMicrobes.event.Molecules.MoleculeClass`
all resource molecule classes
product_classes : list of :class:`VirtualMicrobes.event.Molecules.MoleculeClass`
molecule classes that can be products of the reaction
energy_classes : list of :class:`VirtualMicrobes.event.Molecules.MoleculeClass`
energy molecule classes
max_products : int
maximum number of products in reaction scheme
min_energy : int
minimum yield in energy metabolites
Returns
-------
reaction scheme tuple ( list of reactants, list of products, stoichiometries
'''
if rand_gen is None:
rand_gen = self.env_rand
# weigh potential substrate classes by their energy level
potential_substrates = substrate_classes[:]
while potential_substrates:
# start by randomly picking a resource class as the substrate of the reaction
substrate = rand_gen.choice(potential_substrates)
# remove this substrate from the list, so we don't pick it again.
energy = substrate.energy_level
potential_substrates.remove(substrate)
products = []
try:
product_classes.remove(substrate) # make sure we cannot produce the substrate
except:
pass
# set initial energy_yield equal to energy of substrate
energy_yield = energy
# subtract the energy that should be converted to energy molecules
nr_products = rand_gen.randint(1,max_products)
if isinstance(min_energy, tuple):
min_,max_ = min_energy
if nr_products == 1: # there is a single product, the reaction should yield a minimum of 1 energy
min_= max(1, min_)
max_ = max(1,max_)
energy_yield = energy_yield - max_, energy_yield - min_
else:
min_ = min_energy
if nr_products == 1:
min_ = max(1,min_)
energy_yield -= min_
potential_combinations = self.resource_combis_within_energy(product_classes, nr_products, energy_yield)
if not potential_combinations:
continue # try again, if the substrate list is not empty
products = list(rand_gen.choice(potential_combinations))
for p in products:
energy -= p.energy_level
break
if not products:
return None
while True:
potential_energy_classes = [ m for m in energy_classes if m.energy_level <= energy]
if not potential_energy_classes:
break
energy_class = rand_gen.choice(potential_energy_classes)
products.append(energy_class)
energy -= energy_class.energy_level
products, stois = zip(*sorted(collections.Counter(products).items()))
reaction_scheme = ((substrate,), tuple(products), tuple([1] + list(stois)))
return reaction_scheme
[docs] def rand_anabolic_reaction_scheme(self, resource_classes, energy_classes,
product_classes, nr_reactants, max_products,
max_free_energy, max_ene_energy=1, rand=None):
'''
Construct a random anabolic reaction scheme.
Construction is done by randomly selecting a molecule classes as the
substrates of the reaction and than select the molecule classes that
will be the product of the reaction. The total energy of
reactants ( + energy) >= products.
Parameters
----------
resource_classes : list of :class:`VirtualMicrobes.event.Molecules.MoleculeClass`
all resource molecule classes
energy_classes : list of :class:`VirtualMicrobes.event.Molecules.MoleculeClass`
energy molecule classes
max_products : int
maximum number of products in reaction scheme
max_ene_energy : int
maximum energy level that can be provided by energy metabolites as reaction substrate
Returns
-------
reaction scheme tuple ( list of reactants, list of products, stoichiometries
'''
if rand is None:
rand = self.env_rand
if isinstance(nr_reactants, tuple):
nr_reac = rand.randint(*nr_reactants)
else:
nr_reac = nr_reactants
if nr_reac > 1:
combis = list(it.product(*[resource_classes]*nr_reac))
else:
combis = [(r,) for r in resource_classes ]
while len(combis):
potential_products = product_classes[:]
combi = rand.choice(combis)
combis.remove(combi)
reactants = list(combi)
free_energy = 0
for r in reactants:
free_energy += r.energy_level
for reactant in set(reactants):
try:
potential_products.remove(reactant) # ensure that we will not produce one of the reactants
except:
pass
nrs = range(1,max_products+1)
rand.shuffle(nrs)
for nr_products in nrs:
min_energy = free_energy - max_free_energy
max_energy = free_energy + max_ene_energy
product_combis = self.resource_combis_within_energy(potential_products, nr_products, (min_energy,max_energy))
if product_combis:
break
if not product_combis:
return None
products = list(rand.choice(product_combis))
for p in products:
free_energy -= p.energy_level
while True: # add energy to lhs if needed
if free_energy >= 0:
break
rand_ene = rand.choice(energy_classes)
reactants.append(rand_ene)
free_energy += rand_ene.energy_level
reactants, reac_stois = zip(*sorted(collections.Counter(reactants).items()))
products, prod_stois = zip(*sorted(collections.Counter(products).items()))
reaction_scheme = (tuple(reactants), tuple(products), tuple(list(reac_stois) + list(prod_stois)))
return reaction_scheme
[docs] def init_catabolic_reactions(self, nr_catabolic=None, max_products=None,
min_energy=None, max_path_conversion=None):
'''
Initialize the set of catabolic reactions.
Catabolic reactions break down large molecules into smaller molecules.
If the total free energy of products is lower than that of the
substrate, energy molecules will also be produced in the reaction until
reaction is (as) balanced (as possible).
Parameters
----------
nr_catabolic : int
number of reactions to generate
max_products : int
maximum number of species produced in the reaction, excluding energy
min_energy : int
minimum amount of energy produced as energy molecules
max_path_conversion : int
maximum nr of reactions producing any molecule species
Returns
-------
list of :class:`VirtualMicrobes.event.Reaction.Convert` reactions
'''
if nr_catabolic is None:
nr_catabolic = self.params.nr_cat_reactions
if max_products is None:
max_products = self.params.max_nr_cat_products
if min_energy is None:
min_energy = self.params.min_cat_energy
if max_path_conversion is None:
max_path_conversion = self.params.max_cat_path_conv
resource_classes, energy_classes = self.resource_classes.values(), self.energy_classes.values()
catabolic_reaction_schemes = OrderedSet()
tries = 0
max_tries = 1000
prod_counts = collections.Counter()
while len(catabolic_reaction_schemes) < nr_catabolic and tries < max_tries:
if max_path_conversion is not None:
product_classes = sorted([ p for p in resource_classes if prod_counts[p] < max_path_conversion ])
else:
product_classes = resource_classes[:]
if not len(product_classes):
break
reac_scheme = self.rand_catabolic_reaction_scheme(substrate_classes=resource_classes,
product_classes=product_classes,
energy_classes=energy_classes,
max_products=max_products,
min_energy=min_energy)
if reac_scheme: # check that have a non-empty scheme
_r, products,_s = reac_scheme
allowed = True
if max_path_conversion is not None:
# check if the products of this reaction don't violate the maximum pathway conversion constraint.
for p in products:
count = prod_counts[p]
if not p.is_energy and count >= max_path_conversion:
allowed = False
break
if allowed:
for p in products:
prod_counts[p] += 1
catabolic_reaction_schemes.add(reac_scheme)
tries += 1
if tries == max_tries:
warnings.warn('Maximum number of tries to create {} catabolic reactions. Created a total of {}.'.format(nr_catabolic,
len(catabolic_reaction_schemes)))
return [ Convert(*reac_scheme) for reac_scheme in catabolic_reaction_schemes ]
[docs] def init_anabolic_reactions(self, nr_anabolic=None, nr_reactants=None,
max_products=None, max_path_conversion=None,
max_free_energy=None):
'''
Initialize the set of anabolic reactions.
Anabolic reactions combine small molecules into larger molecules. If the
total free energy of product(s) is larger than that of the substrates,
energy molecules will also be consumed in the reaction until reaction is
(as) balanced (as possible).
Parameters
----------
nr_catabolic : int
number of reactions to generate
max_products : int
maximum number of species produced in the reaction
min_energy : int
minimum amount of energy produced as energy molecules
max_path_conversion : int
maximum nr of reactions producing any molecule species
max_free_energy : int
maximum free energy loss from reaction
Returns
-------
list of :class:`VirtualMicrobes.event.Reaction.Convert` reactions
'''
if nr_anabolic is None:
nr_anabolic = self.params.nr_ana_reactions
if nr_reactants is None:
nr_reactants = self.params.nr_ana_reactants
if max_products is None:
max_products = self.params.max_nr_ana_products
if max_path_conversion is None:
max_path_conversion = self.params.max_ana_path_conv
if max_free_energy is None:
max_free_energy = self.params.max_ana_free_energy
resource_classes, energy_classes = self.resource_classes.values(), self.energy_classes.values()
anabolic_reaction_schemes = OrderedSet()
tries = 0
max_tries = 1000
prod_counts = collections.Counter()
while len(anabolic_reaction_schemes) < nr_anabolic and tries < max_tries:
if max_path_conversion is not None:
product_classes = sorted([ p for p in resource_classes if prod_counts[p] < max_path_conversion])
else:
product_classes = resource_classes[:]
if not len(product_classes):
break
reac_scheme = self.rand_anabolic_reaction_scheme(resource_classes=resource_classes,
product_classes=product_classes,
energy_classes=energy_classes,
nr_reactants=nr_reactants,
max_products=max_products,
max_free_energy=max_free_energy)
if reac_scheme:
_r, products,_s = reac_scheme
allowed = True
if max_path_conversion is not None:
# check if the products of this reaction don't violate the maximum pathway conversion constraint.
for p in products:
count = prod_counts[p]
if not p.is_energy and count >= max_path_conversion:
allowed = False
break
if allowed:
for p in products:
prod_counts[p] += 1
anabolic_reaction_schemes.add(reac_scheme)
tries += 1
if tries == max_tries:
warnings.warn('Maximum number of tries to create {} anabolic reactions. Created a total of {}.'.format(nr_anabolic,
len(anabolic_reaction_schemes)))
return [ Convert(*reac_scheme) for reac_scheme in anabolic_reaction_schemes ]
[docs] def init_reaction_universe(self):
cat_reactions = self.init_catabolic_reactions()
if not len(cat_reactions):
raise Exception('Could not initialize catabolic reactions. Try again'
' with different reaction parameters.')
ana_reactions = self.init_anabolic_reactions()
if not len(ana_reactions):
raise Exception('Could not initialize anabolic reactions. Try again'
' with different reaction parameters.')
self.conversions = cat_reactions + ana_reactions
return self.conversions
[docs] def init_class_conversions(self, fraction_reactions):
util.within_range(fraction_reactions, (0., 1.))
class_conversions = []
metabolite_ene_class_combis = list(it.product(self.resource_classes.values(), self.energy_classes.values()))
class_convert_combis = []
for meta_class, ene_class in metabolite_ene_class_combis:
meta_combis = it.permutations(meta_class.molecules.values(), 2)
for sub,prod in meta_combis:
class_convert_combis.append((sub, ene_class, prod))
nr_reactions = int(len(class_convert_combis) * fraction_reactions)
for sub, ene_class, prod in self.env_rand.sample(class_convert_combis, nr_reactions):
cc = ClassConvert(sub, ene_class, prod)
class_conversions.append(cc)
return class_conversions
[docs] def init_diffusion(self):
self.diffusion_reactions = collections.OrderedDict()
for mc in self.molecule_classes:
for m in mc.molecules.values():
self.diffusion_reactions[m] = Diffusion(m)
[docs] def init_degradation(self):
self.degradation_reactions = collections.OrderedDict()
for mc in self.molecule_classes:
for m in mc.molecules.values():
self.degradation_reactions[m] = Degradation(m)
[docs] def init_microfluid_cycle(self):
'''
Sets up a list of dictionaries to iterate cycles of fixed external concentrations
'''
mols = self.params.microfluid['metabolites'].split(',')
concsets = [i.split(' ') for i in self.params.microfluid['concs'].split(',')]
self.microfluid_list = list()
self.microfluid_cycle_length = int(self.params.microfluid['cyclelength'])
self.microfluid_num_cycles = len(concsets)
for i,set in enumerate(concsets):
concdict = dict() #unordered OK
print '\nSetting up cycle ' + str(i+1) + ' for microfluid device.'
concdict_set = dict(zip(mols,set))
for mol in self.localities[0].small_mols:
if mol.name in mols:
print '\033[92mSetting conc of ' + mol.name + ' to ' + concdict_set[mol.name]
concdict[mol] = float(concdict_set[mol.name])
else:
print '\033[93mDefaulting ' + mol.name + ' to 10e-20\033[0m'
concdict[mol] = float(10e-20)
self.microfluid_list.append(concdict)
self.microfluid_current_cycle = 0 # nth cycle, keeps looping if you want
[docs] def init_influx(self):
self.influx_reactions = collections.OrderedDict()
for mc in self.molecule_classes:
for m in mc.molecules.values():
self.influx_reactions[m.paired] = Influx(m.paired)
[docs] def init_transports(self):
util.within_range(self.params.fraction_mol_transports, (0., 1.))
self.transports = []
all_transports = list(it.product(self.resource_classes.values(), self.energy_classes.values()))
nr_transports = int(len(all_transports) * self.params.fraction_mol_transports)
for res,ene in self.env_rand.sample( all_transports, nr_transports):
cost = self.env_rand.randint(*self.params.transport_cost_range)
self.transports.append(Transport(res, ene, 1,cost)) # Sub stoi defaults to 1
[docs] def init_mols_to_reactions(self):
'''
Create dictionaries of reactions, keyed on the molecules procuced and
consumed in the reactions.
'''
class_convert_dict = OrderedDefaultdict(list)
conversion_dict = {'produced':OrderedDefaultdict(list),
'consumed': OrderedDefaultdict(list)}
for c in self.conversions:
if isinstance(c, ClassConvert):
class_convert_dict[c.substrate.mol_class].append(c)
else:
for reac_class in c.reactants:
conversion_dict['consumed'][reac_class].append(c)
for prod_class in c.products:
conversion_dict['produced'][prod_class].append(c)
transport_dict = OrderedDefaultdict(list)
for t in self.transports:
transport_dict[t.substrate_class].append(t)
self.mols_to_reactions_dict = {'class_convert': class_convert_dict,
'conversion': conversion_dict,
'transport': transport_dict}
@property
def external_molecules(self):
return [ mol.paired for mol in self.internal_molecules ]
@property
def reactions_dict(self):
return dict([('import',self.transports), ('diffusion', self.diffusion_reactions),
('conversion', self.conversions), ('degradation', self.degradation_reactions),
('influx', self.influx_reactions)])
@property
def mols_per_class_dict(self):
d = util.OrderedDefaultdict(list)
for mol in self.internal_molecules:
d[mol.mol_class].append(mol)
return d
[docs] def clear_mol_time_courses(self): #TODO translate to grid function (func_on_grid)
for l in self.localities:
l.clear_mol_time_courses()
[docs] def resize_time_courses(self, new_max_time_points):
for l in self.localities:
l.resize_time_courses(new_max_time_points)
[docs] def set_mol_concentrations_from_time_point(self): #TODO translate to grid function (func_on_grid)
for l in self.localities:
l.set_mol_concentrations_from_time_point()
[docs] def influx_change_range(self, param_space):
new_influx = 0
if param_space.base is not None:
new_influx = param_space.base ** self.env_rand.uniform(param_space.lower,
param_space.upper)
else:
new_influx = self.env_rand.uniform(param_space.lower,
param_space.upper)
return new_influx
[docs] def influx_change_gamma(self, influx, variance_fact, upper=None, lower=None):
if influx <= 0 or variance_fact <= 0:
return influx
theta = influx / variance_fact
new_influx = self.env_rand.gammavariate(variance_fact, theta)
if upper != None:
new_influx = min(new_influx, upper)
if lower != None:
new_influx = max(lower, new_influx)
return new_influx
[docs] def init_sub_envs(self, row_divs=None, col_divs=None, partial_influx=None, influx_combinations=None):
'''
Initialize sub-environments by dividing up the grid along rows and columns.
Within each subenvironment, influx can change independently. When partial_influx
is chosen between 0 and 1, influxed molecules will only appear in a fraction of
the subenvironments on the grid.
Parameters
----------
row_divs : int
nr of divisions on y-axis
col_divs : int
nr of divisions on x-axis
partial_influx : float
fraction of molecules that will be influxed in each sub-environment
'''
if row_divs is None:
row_divs = self.params.grid_sub_div.row
if col_divs is None:
col_divs = self.params.grid_sub_div.col
if partial_influx is None:
partial_influx = self.params.sub_env_part_influx
if influx_combinations is None:
influx_combinations = self.params.sub_env_influx_combinations
self.subenvs = []
subgrids = list(self.subgrids_gen(row_divs, col_divs))
for sg in subgrids:
influx_dict = OrderedDict( (m,inf) for m,inf in self.influx_dict.items() if inf is not None )
sub_env = util.SubEnv(sub_grid=sg, influx_dict=influx_dict)
self.subenvs.append(sub_env)
if partial_influx is not None:
self.sub_envs_partial_influx(partial_influx)
elif influx_combinations is not None:
if influx_combinations == 'richest_first':
self.sub_envs_influx_combinations(richest_first=True)
else:
self.sub_envs_influx_combinations(richest_first=False)
self._init_subenv_influx_rates() # Moved by Brem, does not belong where it was, and it messed up the architecture
[docs] def sub_envs_partial_influx(self, fract):
'''
Assign a subset of all influxed molecules per individual sub-environment.
Parameters
----------
fract : float
fraction of subenvironments where a molecule is fluxed in
'''
influxed_mols = [ mol for (mol,inf) in self.influx_dict.items() if inf is not None ]
influxed_mols_per_subenv = collections.defaultdict(list)
for mol in influxed_mols:
for i in self.env_rand.sample(range(len(self.subenvs)) , max(1, int(fract * len(self.subenvs))) ):
influxed_mols_per_subenv[i].append(mol)
for i,subenv in enumerate(self.subenvs[:]):
influx_dict = OrderedDict([ (mol, self.influx_dict[mol]) for mol in influxed_mols_per_subenv[i] ])
self.subenvs[i] = subenv._replace(influx_dict=influx_dict)
[docs] def sub_envs_influx_combinations(self, richest_first=True ):
'''
Assign a subset of all influxed molecules per individual sub-environment.
Parameters
----------
fract : float
fraction of subenvironments where a molecule is fluxed in
'''
influxed_mols = [ mol for (mol,inf) in self.influx_dict.items() if inf is not None ]
combination_lengths = range(len(influxed_mols) + 1)
if richest_first:
combination_lengths.reverse()
combinations = it.chain.from_iterable( it.combinations( influxed_mols, n) for n in combination_lengths )
for i, (subenv, combi) in enumerate(it.izip(self.subenvs[:], combinations)):
influx_dict = OrderedDict([ (mol, self.influx_dict[mol]) for mol in combi ])
self.subenvs[i] = subenv._replace(influx_dict=influx_dict)
[docs] def subgrids_gen(self, row_divs=1, col_divs=1):
'''
Generate partitions of the grid.
Partitions are constructed from the product of row-chunks and column-
chunks, such that the grid is divided (approximately) equally according
to the number of row- and column divisions.
Parameters
----------
row_divs : int
number of divisions in the row dimension
col_divs : int
number of divisions in the column dimension
Yields
------
iterable of '(row_nrs, col_nrs)' partitions of the grid
'''
def chunks(l, n):
'''Yield successive n-sized chunks from l.'''
for i in range(0, len(l), n):
yield l[i:i+n]
chunk_size = int(round(self.grid.rows/float(row_divs))) # round to nearest integer
row_splits = chunks(range(self.grid.rows), chunk_size)
chunk_size = int(round(self.grid.cols/float(col_divs)))
col_splits = chunks(range(self.grid.cols), chunk_size)
return it.product(row_splits, col_splits)
[docs] def fluctuate(self, time, p_fluct = None, influx_rows=None, influx_cols=None, mols=None, influx_dict=None):
'''
Influx fluctuation on the grid.
Sets new influx rates for influxed molecules depending on fluctuation
frequency. Influx rates may vary between different sub-environments.
Each sub-environment may define its own set of molecules that can be
fluxed in.
Parameters
----------
time : int
simulation time
p_fluct : float
fluctuation frequency per influxed molecule
influx_rows : iterable
a sequence of row indices on which molecules are exclusively influxed
influx_cols : iterable
a sequence of columnt indices on which molecules are exclusively influxed
mols : list of :class:`VirtualMicrobes.event.Molecule.Molecule`s
molecules for which new influx rates should be set
influx_dict : dict
mapping from :class:`VirtualMicrobes.event.Molecule.Molecule` to influx rate (float)
'''
if p_fluct is None:
if self.params.fluctuate_frequencies is not None: # (simulation) time depencent fluctuation frequency
p_fluct = 0.
for t, p in self.params.fluctuate_frequencies:
if t < time: # set the fluctation frequency that
p_fluct = p
else:
break
else:
return
if influx_rows is None:
influx_rows = self.params.influx_rows
if influx_cols is None:
influx_cols = self.params.influx_cols
if mols is None:
mols = self.influxed_mols
for sub_env in self.subenvs:
rows, cols = sub_env.sub_grid
if influx_rows is not None:
rows = [ row for row in rows if row in influx_rows ]
if influx_cols is not None:
cols = [ col for col in cols if col in influx_cols ]
_influx_dict = sub_env.influx_dict if influx_dict is None else influx_dict
self._fluctuate(p_fluct, rows, cols, influx_dict=_influx_dict)
def _fluctuate(self, p_fluct, rows, cols, influx_dict):
'''
Influx fluctuation in points in the grid.
See Also
--------
func:`fluctuate`
'''
for mol_ext in influx_dict:
if p_fluct is None or self.env_rand.uniform(0, 1.) < p_fluct:
if self.params.influx_variance_shape is not None:
influx_dict[mol_ext] = self.influx_change_gamma(influx_dict[mol_ext], self.params.influx_variance_shape)
elif self.params.influx_range is not None:
influx = influx_dict[mol_ext]
if self.params.fluctuate_extremes:
upper = pow(self.params.influx_range.base, self.params.influx_range.upper)
lower = pow(self.params.influx_range.base, self.params.influx_range.lower)
influx_dict[mol_ext] = upper if influx != upper else lower
else:
influx_dict[mol_ext] = self.influx_change_range(self.params.influx_range)
self.func_on_grid(lambda l: l.update_small_mol_influxes(influx_dict), rows, cols)
[docs] def microfluid_chemostat(self, run_time=None):
if run_time != 0 and run_time % self.microfluid_cycle_length == 0:
self.microfluid_current_cycle+=1
print 'Switching to cycle ' + str(self.microfluid_current_cycle+1)
currentcyc = self.microfluid_list[self.microfluid_current_cycle%self.microfluid_num_cycles]
print 'Microfluid cycle ' + str(self.microfluid_current_cycle+1)
for sub_env in self.subenvs:
rows, cols = sub_env.sub_grid
for mol in currentcyc:
sub_env.influx_dict[mol] = currentcyc[mol]
self.func_on_grid(lambda l: l.update_small_mol_influxes(currentcyc), rows, cols)
#for l in self.localities:
# for mol in currentcyc:
# l.set_small_mol_conc(mol,currentcyc[mol])
[docs] def func_on_grid(self, gp_func, rows=None, cols=None):
'''
Apply a function to a set of grid points.
The function `gp_func` should take a
:class:`VirtualMicrobes.environment.Environment.Locality` as argument. A subset of grid
points are selected by using rows and cols lists. If both rows and cols
are given, select gps as a mesh of intersecting rows and cols.
Parameters
----------
gp_func : function
function on :class:`VirtualMicrobes.environment.Environment.Locality`
rows : iterable
sequence of indices for grid row selection
cols : iterable
sequence of indices for grid column selection
'''
gp_iter = None
if rows is not None and cols is not None:
gp_iter = self.grid.mesh_iter(rows, cols)
elif rows is not None:
gp_iter = self.grid.rows_iter(rows)
elif cols is not None:
gp_iter = self.grid.cols_iter(cols)
else:
gp_iter = self.grid.gp_iter
for gp in gp_iter:
gp_func(gp.content)
[docs] def energy_precursors(self):
'''
Return set of metabolites that are direct precursors for the energy class
molecules.
Construct this set by iterating over energy classes and map them to
reactions that produce the energy classes.
Returns
-------
:class:OrderedSet
'''
energy_precursors = OrderedSet()
for e in self.energy_classes.values():
for reac in self.mols_to_reactions_dict['conversion']['produced'][e]:
for sub_class in reac.reactants:
for sub in sub_class.molecules.values():
energy_precursors.add(sub)
return energy_precursors
[docs] def init_influxed_mols(self, fraction_influx=None):
'''
Select molecules that will be fluxed in in the external environment.
'''
if fraction_influx is None:
fraction_influx = self.params.fraction_influx
fixed = []
if self.params.influx_energy_precursor:
fixed = [ mol.paired for mol in self.energy_precursors() ]
if fraction_influx is None:
fraction_influx = self.params.fraction_influx
if isinstance(fraction_influx, float):
util.within_range(fraction_influx, (0., 1.))
influxed_mols = [mol for mol in self.external_molecules if mol not in fixed]
if not self.params.energy_influx:
influxed_mols = filter(lambda x: not x.is_energy, influxed_mols)
if not self.params.building_block_influx:
influxed_mols = filter(lambda x: not x.is_building_block, influxed_mols)
if not self.params.bb_class_influx:
influxed_mols = filter(lambda x: not x.mol_class.has_building_block, influxed_mols)
nr = min(len(influxed_mols), fraction_influx) if isinstance(fraction_influx, int) else int(len(influxed_mols)*fraction_influx)
if self.params.prioritize_energy_rich_influx:
influxed_mols = sorted(influxed_mols, key=lambda x: x.energy_level, reverse = True)
influxed_mols = influxed_mols[:nr]
elif self.params.prioritize_energy_poor_influx:
influxed_mols = sorted(influxed_mols, key=lambda x: x.energy_level)
influxed_mols = influxed_mols[:nr]
else:
influxed_mols = self.env_rand.sample(influxed_mols, nr)
self.influxed_mols = influxed_mols + fixed
[docs] def init_global_influx_dict(self, influx=None):
'''
Initialize a global dictionary for influx rates of molecules in the
external environment.
'''
def expected_value(base, domain):
'''
calculates the expected value of a log-uniformly distributed random
variable with *base* and *domain* .
This is the mean of the integral of base^x with U(x) a uniform
distribution on the given domain.
'''
m,n = domain
if m > n:
m,n = n,m
elif m == n:
return base ** m
return 1./(n - m) * ( base ** n / math.log(base) - base ** m / math.log(base) )
if influx is None:
influx = 1e-20
if self.params.influx:
influx = self.params.influx
elif self.params.influx_range:
influx = expected_value(self.params.influx_range.base, (self.params.influx_range.lower, self.params.influx_range.upper))
self.influx_dict = OrderedDict()
for mol_ext in self.external_molecules:
if mol_ext in self.influxed_mols:
self.influx_dict[mol_ext] = influx
else:
self.influx_dict[mol_ext] = None
[docs] def init_degradation_dict(self, degr_const=None, ene_degr_const=None,
bb_degr_const=None,degradation_variance_shape=None):
'''
Initialize a global dictionary for degradation rates of molecules in the
external environment.
'''
if degr_const is None:
degr_const = self.params.small_mol_ext_degr_const
if ene_degr_const is None:
ene_degr_const = self.params.ene_ext_degr_const
if bb_degr_const is None:
bb_degr_const = self.params.bb_ext_degr_const
if degradation_variance_shape is None:
degradation_variance_shape = self.params.degradation_variance_shape
self.degradation_dict = OrderedDict()
for mol_ext in self.external_molecules:
if mol_ext.is_energy and ene_degr_const is not None:
self.degradation_dict[mol_ext] = ene_degr_const
elif mol_ext.is_building_block and bb_degr_const is not None:
self.degradation_dict[mol_ext] = bb_degr_const
else:
if degradation_variance_shape is not None:
theta = degr_const / degradation_variance_shape
self.degradation_dict[mol_ext] = self.env_rand.gammavariate(degradation_variance_shape, theta)
else:
self.degradation_dict[mol_ext] = degr_const
[docs] def init_membrane_diffusion_dict(self, diff_const=None, ene_diff_const=None,
energy_proportional=None,
diff_scaling_func=lambda c, e: c / (e**0.7)):
if diff_const is None:
diff_const = self.params.small_mol_diff_const
if ene_diff_const is None:
ene_diff_const = self.params.ene_diff_const
if energy_proportional is None:
energy_proportional = self.params.diff_energy_prop
self.membrane_diffusion_dict = OrderedDict()
for mol_ext in self.external_molecules:
diff = diff_const
if mol_ext.is_energy and ene_diff_const is not None:
diff = ene_diff_const
if energy_proportional:
self.membrane_diffusion_dict[mol_ext] = diff_scaling_func(diff, mol_ext.energy_level)
else:
self.membrane_diffusion_dict[mol_ext] = diff
[docs] def start_concentration_dict(self, init_conc=None, init_conc_dict=None,
no_influx_conc=1e-20):
'''
Make dictionary of start concentrations for external molecules.
If the init_conc is not given, start with a concentration that is the
equilibrium value based on influx and degradation rates of the
metabolite.
:param init_conc:
'''
if init_conc_dict is None:
init_conc_dict = self.params.init_external_conc_vals
concentration_dict = OrderedDict()
def equilibrium_conc(mol):
if(self.degradation_dict[mol] == 0.0):
return 1.0
influx_points = 1.
if self.params.influx_rows is not None:
influx_points *= len(self.params.influx_rows)
else:
influx_points *= self.params.grid_rows
if self.params.influx_cols is not None:
influx_points *= len(self.params.influx_cols)
else:
influx_points *= self.params.grid_cols
grid_influx_scale_fact = influx_points / float(len(self.grid))
return grid_influx_scale_fact * self.influx_dict[mol] / self.degradation_dict[mol]
for mol_ext in self.external_molecules:
if mol_ext in self.influxed_mols:
concentration_dict[mol_ext] = equilibrium_conc(mol_ext)
if init_conc_dict is not None:
try:
concentration_dict[mol_ext] = init_conc_dict[mol_ext.name]
except KeyError:
pass
else:
concentration_dict[mol_ext] = no_influx_conc
print 'setting', mol_ext, ' concentration to', concentration_dict[mol_ext]
return concentration_dict
[docs] def reset_grid_influx(self):
'''
Reinitialize the global infux dict and the per sub environment influx
rates.
'''
print 'resetting grid influx'
self.init_global_influx_dict()
self._init_subenv_influx_rates()
def _init_subenv_influx_rates(self):
'''
Set influx rates per molecule for each sub-environment.
'''
influx_rows, influx_cols = self.params.influx_rows, self.params.influx_cols
for sub_env in self.subenvs:
rows, cols = sub_env.sub_grid
if influx_rows is not None:
rows = [ row for row in rows if row in influx_rows ]
if influx_cols is not None:
cols = [ col for col in cols if col in influx_cols ]
for mol in sub_env.influx_dict:
sub_env.influx_dict[mol] = self.influx_dict[mol]
self.func_on_grid(lambda l: l.update_small_mol_influxes(sub_env.influx_dict), rows, cols)
[docs] def reset_grid_concentrations(self, conc=None):
if conc is None:
conc = self.params.init_external_conc
print '\nResetting environmental concentrations'
concentration_dict = self.start_concentration_dict(conc)
self.func_on_grid(lambda l: l.update_small_mol_concentrations(concentration_dict))
[docs] def init_external_mol_vals_on_grid(self, init_conc=None):
'''
Initialize concentrations, degradation and influx rates of molecules on
the grid.
'''
if init_conc is None:
init_conc = self.params.init_external_conc
concentration_dict = self.start_concentration_dict(init_conc)
self.func_on_grid(lambda l: l.update_small_mol_degradation_rates(self.degradation_dict))
self.func_on_grid(lambda l: l.update_small_mol_concentrations(concentration_dict))
[docs] def init_grid(self, verbose=False):
'''
Initialize the spatial grid. Set wrapping and barriers on particular
neighborhoods.
'''
nr_grid_rows = self.params.grid_rows
nr_grid_cols = self.params.grid_cols
wrap_ew = self.params.wrap_ew
wrap_ns = self.params.wrap_ns
frac_horizontal_bar = self.params.frac_horizontal_bar
max_frac_horizontal = self.params.max_frac_horizontal
pos_horizontal_bars = self.params.pos_horizontal_bars
frac_vertical_bar = self.params.frac_vertical_bar
pos_vertical_bars = self.params.pos_vertical_bars
max_frac_vertical = self.params.max_frac_vertical
neighborhoods = self.params.barrier_neighborhoods
rand_gen = self.env_rand
util.within_range(frac_horizontal_bar, (0.,1.))
util.within_range(frac_vertical_bar, (0.,1.))
util.within_range(max_frac_horizontal, (0.,1.))
util.within_range(max_frac_vertical, (0.,1.))
self.grid = Grid.Grid(nr_grid_rows, nr_grid_cols,
nei_wrapped_ew=wrap_ew, nei_wrapped_ns=wrap_ns)
self.grid.grid_barriers(rand_gen, frac_horizontal_bar, pos_horizontal_bars, max_frac_horizontal,
frac_vertical_bar, pos_vertical_bars, max_frac_vertical,
neighborhoods)
if verbose:
print self.grid
# initialize the localities that will be coupled to the grid
self.init_localities(len(self.grid))
self.grid.fill_grid(self.localities)
return self.grid
[docs] def init_localities(self, number_localities, max_time_course_length=0, params_dict=None):
if params_dict is None:
params_dict = self.params
self.localities = []
for _ in range(number_localities):
l = Locality(params_dict,
internal_molecules=self.internal_molecules,
influx_reactions=self.influx_reactions,
degradation_reactions=self.degradation_reactions,
env_rand=self.env_rand,
max_time_course_length=max_time_course_length
)
self.localities.append(l)
self.set_tot_volume()
return self.localities
[docs] def update_localities(self):
for loc in self.localities:
loc.reset_time_courses()
[docs] def update_volume(self,volume=None):
if volume is None:
volume = self.params.per_grid_cell_volume
for l in self.localities:
l.volume = volume
self.set_tot_volume()
[docs] def set_tot_volume(self):
self.volume = sum( [ l.volume for l in self.localities ])
[docs] def update_cells_on_grid(self, cell_gp_dict):
for _c, gp in cell_gp_dict.items():
gp.updated = True
[docs] def add_new_cells_to_grid(self, cell_gp_dict):
for cell, gp in cell_gp_dict.items():
# In chemostat, first remove a cell IF the locality if it still contains a cell
if self.params.chemostat:
all_cells = gp.content.get_cells()
if(len(all_cells) > 0):
self.env_rand.shuffle(all_cells) # Shuffle to select a random cell
all_cells[0].marked_for_death = True
all_cells[0].wiped = True
# place the new cell
gp.content.add_cell(cell)
gp.updated = True
[docs] def clear_dead_cells_from_grid(self):
tot_removed = 0
for gp in self.grid.gp_iter:
dead_cells = gp.content._clear_dead_cells()
tot_removed += len(dead_cells)
if len(dead_cells) > 0:
gp.updated = True
[docs] def grid_toggle_update(self, updated=False):
self.grid.toggle_gps_updated(updated)
[docs] def map_variables(self):
for gp in self.grid.gp_iter:
if gp.updated:
gp.content.map_variables()
[docs] def grid_data_from_func(self, func):
grid_array = self.grid.as_numpy_array
vfunc = np.vectorize(func)
return vfunc(grid_array)
[docs] def populate_localities(self,population):
print "Populating localities with", len(population.cells), "cells."
cells = list(population.cells)
shuff = self.params.init_mixed
if shuff:
print "Mixing initial population of cells."
self.env_rand.shuffle(cells)
while len(cells):
gps = list(self.grid.gp_iter)
if shuff:
self.env_rand.shuffle(gps)
for gp in gps:
gp.content.add_cell(cells.pop())
gp.updated = True
if not len(cells):
break
# Deprecated? Commented in version 0.2.8
# def repopulate_localities(self,population,cells,mixed=False):
# population.new_offspring += cells
# population.wipe_pop(1.0,0)
# self.clear_dead_cells_from_grid()
#
# print "Repopulating localities with", len(cells), "cells."
#
# while len(cells):
# gps = list(self.grid.gp_iter)
# if mixed:
# self.env_rand.shuffle(gps)
# for gp in gps:
# gp.updated = True
# if len(cells) > 0:
# newcell = cells.pop()
# newcell.alive = True
# gp.content.add_cell(newcell)
# population.add_cell(newcell)
#
# population.init_current_ancestors()
# population.init_roots()
# population.update_phylogeny()
#
# print 'Population size now', population.current_pop_size
#
# assert len(population.cells) == sum( map(lambda gp: len(gp.content.cells), self.grid.gp_iter))
[docs] def expression_grid_data_dict(self):
'''
Return dictionary of spatial concentration data per metabolite (within cells)
'''
e_data_dict = dict() # NOTE: unordered ok
for rea in self.transports:
data = self.grid_data_from_func(lambda x: x.content.get_expression_level(rea, False))
key = 'import pump '+str(rea.stoichiometry[0])+str(rea.energy_source_class)+'->'+str(rea.stoichiometry[1])+str(rea.substrate_class)
e_data_dict[key] = data
for rea in self.transports:
data = self.grid_data_from_func(lambda x: x.content.get_expression_level(rea, True))
key = 'export pump '+str(rea.stoichiometry[0])+str(rea.energy_source_class)+'->'+str(rea.stoichiometry[1])+str(rea.substrate_class)
e_data_dict[key] = data
for rea in self.conversions:
data = self.grid_data_from_func(lambda x: x.content.get_expression_level(rea))
short_rea = rea.short_repr()
#if(len(short_rea)>30): short_rea = short_rea[:30] + '...'
e_data_dict['conversion ' + short_rea] = data
return e_data_dict
[docs] def population_grid_data_dict(self, marker_names, marker_select_func):
'''Return a dictionary of spatial distribution of values per marker
:param marker_names: markers to probe on the grid
:param marker_select_func: how to select a marker value
when there are multiple individuals with different values per grid
'''
marker_data_dict = dict() # NOTE: unordered ok
for marker_name in marker_names:
data = self.grid_data_from_func(lambda x: marker_select_func(marker_name, x.content.get_cells()) )
marker_data_dict[marker_name] = data
return marker_data_dict
[docs] def population_grid_neighborhood_data(self, neighborhood_pop_func, neighborhood='competition'):
return self.grid_data_from_func(lambda x: neighborhood_pop_func(sum([ n.get_cells() for n in x.neighbors(neighborhood) ] ,[] )))
[docs] def population_grid_data(self, data_func):
with warnings.catch_warnings():
warnings.simplefilter('ignore')
data = self.grid_data_from_func(lambda x: data_func(x.content.get_cells()))
return data
[docs] def perfect_mix(self, verbose=False):
'''Perfect mix shuffles all gps'''
self.grid.perfect_mix(self.env_rand)
self.grid_toggle_update(True)
[docs] def cells_grid_diffusion(self, diff_rate=None):
'''
Cell diffusion on the grid.
diff_rate : float
Diffusion rate of cells to neighboring grid points.
'''
if diff_rate is None:
diff_rate = self.params.cells_grid_diffusion
before = sum ( map(lambda l: len(l.cells), self.grid.content_iter))
for gp in self.grid.gp_iter:
for c in gp.content.get_cells():
if self.env_rand.uniform(0, 1.) < diff_rate:
rn = gp.random_neighbor('diffusion', self.env_rand)
if rn is None:
continue
gp.content.remove_cell(c)
gp.updated = True
rn.content.add_cell(c)
rn.updated = True
assert before == sum (map(lambda l: len(l.cells), self.grid.content_iter))
[docs] def reset_locality_updates(self):
for l in self.localities:
l.new_concentrations = False
[docs] def print_state(self):
for i, l in enumerate(self.localities):
print 'locality', i
print 'CONCENTRATION'
for m, conc in l.get_mol_concentration_dict().items():
print '\t', m, conc
print
print 'INFLUX'
for m, inf in l.get_mol_influx_dict().items():
print "\t", m, inf[1]
print
def __setitem__(self,key,value):
self.params[key] = value
def __getitem__(self,key):
return self.params[key]
[docs] def upgrade(self):
'''
Upgrading from older pickled version of class to latest version. Version
information is saved as class variable and should be updated when class
invariants (e.g. fields) are added.
'''
version = float(self.version)
if version < 1.:
self.init_membrane_diffusion_dict()
if version < 1.1:
self.init_sub_envs(1,1,1.)
self.version = self.class_version
if version > float(self.class_version):
print 'upgraded class',
else:
print 'reset class',
print self.__class__.__name__, ' from version', version ,'to version', self.version
def __getstate__(self):
odict = self.__dict__.copy()
return odict
def __setstate__(self, d):
self.__dict__ = d
if not hasattr(self, 'version'):
self.version = '0.0'
if self.version != self.class_version:
self.upgrade()