import collections
import copy
import itertools
from Identifier import Identifier
import VirtualMicrobes.my_tools.utility as util
import itertools as it
import numpy as np
def _gen_id_func(x):
    """Return the next value from the global 'UniqueGeneKey' generator.

    :param x: ignored; it is accepted so that the function can be used
        wherever a ``gen_id(previous_uid)`` callable is expected.
    :return: the next item produced by ``util.ugly_globals['UniqueGeneKey']``.
    """
    # Modified November 2018 to have actually unique IDs :)
    # The builtin next() is used instead of the Python-2-only .next() method
    # so that the call works for iterators on both Python 2.6+ and Python 3.
    return next(util.ugly_globals['UniqueGeneKey'])
class AddInheritanceType(type):
    '''
    A metaclass that can set the base type of a class to support phylogenetic linking.

    The base type of the created class can be either PhyloBase or PhyloUnit,
    depending on its private ``__phylo_type`` class attribute ('base' or
    'ancestry'). This enables an at run-time decision (via program options)
    to fix the ancestry structure the class supports.

    PhyloBase instances keep references to neither parents nor children.
    PhyloUnit does support ancestry; phylogenetic linking is delegated to a
    global linker dict.
    '''

    def __new__(cls, name, bases, attr):
        '''
        Create the class, appending a phylo base class when one is requested.

        :param name: name of the class being created
        :param bases: tuple of explicitly declared base classes
        :param attr: class namespace dict
        :return: the newly created class
        :raises KeyError: if the ``__phylo_type`` value is neither 'base'
            nor 'ancestry'
        '''
        # A class-private attribute `__phylo_type` is stored under its mangled
        # name. Python strips leading underscores from the class name when
        # mangling (and does not mangle at all when the class name consists
        # only of underscores), so mirror that here.
        stripped_name = name.lstrip('_')
        if stripped_name:
            phylo_type_attr = '_' + stripped_name + '__phylo_type'
        else:
            phylo_type_attr = '__phylo_type'
        # Only classes declared without explicit bases get a phylo base added.
        if not bases and phylo_type_attr in attr:
            phylo_class = {'base': PhyloBase,
                           'ancestry': PhyloUnit}[attr[phylo_type_attr]]
            bases = bases + (phylo_class,)
        return type.__new__(cls, name, bases, attr)
class PhyloBase(object):
    '''
    Base class for all classes that can behave as phylogenetic units of inheritance.

    Phylogenetic base units record their time of birth and death and carry an
    identifier field that can indicate a relation to parents and offspring.
    A PhyloBase object may come into existence when a mother cell gives rise
    to a daughter cell together with all units of inheritance it contains
    (i.e. when a genome gets copied), but also when a phylogenetic unit (such
    as a gene or chromosome) mutates and the ancestral version is kept intact
    for analysis purposes.
    '''

    __slots__ = ['id', 'time_birth', 'time_death', 'alive', 'marker_dict', '_unique_key']

    def __init__(self, time_birth):
        '''
        Create a living unit born at `time_birth` with an empty marker dict.

        :param time_birth: time of birth of the unit
        '''
        new_uid = PhyloBase.__unique_id(self)
        self.id = Identifier(self, new_uid)
        self.time_birth = time_birth
        self.time_death = None
        self.alive = True
        self.marker_dict = {}  # NOTE: unordered ok

    def die(self, time):
        '''
        Mark the unit as dead and record its time of death.

        Death of a phylo unit happens either when a cell dies and all its
        genetic material with it, or when a mutation gives rise to a new
        variant of the unit.

        :param time: Time of death in simulation time units.
        '''
        self.alive, self.time_death = False, time

    def mark(self, marker, mark):
        '''
        Set a marker on the phylo unit.

        :param marker: marker type (key in the marker dict)
        :param mark: value stored for that marker
        '''
        self.marker_dict[marker] = mark

    def living_offspring(self):
        '''
        Return an empty list: the base version tracks no offspring.
        '''
        return []

    def prune_dead_branch(self):
        '''
        Return a set holding self if self is not alive, else an empty set.

        This is the degenerate base version of pruning. See the version in
        PhyloUnit for the case when units keep track of parent-child
        relationships.
        '''
        return set() if self.alive else set([self])

    def __unique_id(self, gen_id=None):
        """
        Advance and return the `uid` counter stored on the class of self.

        The current ``uid`` of the instance's class is handed to `gen_id`;
        the result is stored back on the class and returned.
        NOTE(review): assumes the concrete class defines a ``uid`` class
        attribute; no such definition is visible here.

        :param gen_id: callable taking the current uid and returning a new
            one; defaults to the module level ``_gen_id_func``
        :return: the newly generated uid
        """
        generate = _gen_id_func if gen_id is None else gen_id
        owner = self.__class__
        owner.uid = generate(owner.uid)
        return owner.uid

    def _copy(self, time=None, new_id=True, flat_id=False):
        '''
        Make a (partial) deep copy of the unit, e.g. when it is mutated.

        In this way the pre- and post-mutation state of the unit can be
        stored independently. The copy is created without calling
        ``__init__``, is alive, and has no time of death.

        :param time: time of birth given to the copy
        :param new_id: if true, the id is deep-copied and then derived from
            self through ``id.from_parent``; otherwise it is shallow-copied
        :param flat_id: passed to ``id.from_parent`` as `flat`
        :return: the copied unit
        '''
        klass = self.__class__
        clone = klass.__new__(klass)
        clone.marker_dict = copy.copy(self.marker_dict)
        if not new_id:
            clone.id = copy.copy(self.id)
        else:
            clone.id = copy.deepcopy(self.id)
            clone.id.from_parent(self, flat=flat_id)
        clone.alive = True
        clone.time_birth = time
        clone.time_death = None
        return clone

    def __getitem__(self, key):
        '''
        Return the mark stored under `key` in the marker dict.
        '''
        return self.marker_dict[key]
class PhyloUnit(PhyloBase):
    '''
    Extended base class for all classes that can be represented in phylogenies.

    On top of PhyloBase, a PhyloUnit keeps `parents` and `children` sequences
    (created by ``util.LinkThroughSequence()``) and registers itself under a
    unique key in the global linker dict
    ``util.ugly_globals['PhyloLinkerDict']``.
    '''

    # NOTE: '_unique_key' is deliberately not repeated here: PhyloBase already
    # declares that slot and redeclaring an inherited slot only hides the base
    # class descriptor.
    __slots__ = ['parents', 'children']

    def __init__(self, time_birth):
        '''
        Create the unit, give it a unique key, and register it in the linker dict.

        :param time_birth: time of birth of the unit
        '''
        super(PhyloUnit, self).__init__(time_birth)
        self.set_unique_key()
        self.init_phylo_dicts()
        self._register_phylo_unit()

    def set_unique_key(self):
        '''
        Generate a unique key that can be used for mapping in a global linker dict.

        The key is the string form of the next value of
        ``util.ugly_globals['UniquePhyloKey']``.
        '''
        # builtin next() works for iterators on both Python 2.6+ and Python 3
        self._unique_key = str(next(util.ugly_globals['UniquePhyloKey']))

    def _register_phylo_unit(self):
        '''
        Register phylo unit in the global linker dict under its unique key.
        '''
        linker_dict = util.ugly_globals['PhyloLinkerDict']
        linker_dict[self._unique_key] = self

    def init_phylo_dicts(self):
        '''
        (Re)initialise `parents` and `children` as empty link-through sequences.
        '''
        self.parents = util.LinkThroughSequence()
        self.children = util.LinkThroughSequence()

    def die(self, *args, **kwargs):
        '''
        Mark the unit as dead; delegates unchanged to ``PhyloBase.die``.
        '''
        super(PhyloUnit, self).die(*args, **kwargs)

    def set_ancestor(self, ge):
        '''
        Link `ge` as a parent of self and self as a child of `ge`.

        :param ge: the ancestral phylo unit
        '''
        self.parents.append(ge)
        ge.children.append(self)

    def _remove_child(self, child):
        '''
        Remove `child` from the children of self, if it is still present.
        '''
        # NOTE: child could have been removed during previous passage
        if child in self.children:
            self.children.remove(child)

    def _remove_parent(self, parent):
        '''
        Remove `parent` from the parents of self, if it is still present.
        '''
        # NOTE: parent could have been removed during previous passage
        if parent in self.parents:
            self.parents.remove(parent)

    def living_offspring(self):
        '''
        Returns a list of all offspring of this phylo unit that are currently alive.

        The direct living children come first, followed by the living
        offspring of each child. Dead children are traversed as well, so that
        living descendants behind a dead intermediate are reported; this keeps
        the result consistent with `has_living_offspring`.
        '''
        children = list(self.children)
        offspring = [child for child in children if child.alive]
        for child in children:
            offspring += child.living_offspring()
        return offspring

    def has_living_offspring(self, exclude_set=None, depth=1):
        '''
        Returns True if any of the phylo units descendants are alive.

        :param exclude_set: optional container of direct children that are
            skipped, because the caller already knows that they are dead and
            have no living offspring
        :param depth: recursion depth; only passed along to the recursive calls
        '''
        excluded = () if exclude_set is None else exclude_set
        for child in set(self.children):
            if child in excluded:
                continue
            if child.alive or child.has_living_offspring(depth=depth + 1):
                return True
        return False

    def lod_down_single(self):
        '''
        Proceed down a single branch on the line of descent until there is a
        branch point or terminal node.

        :return: tuple of (last node, list of nodes from self to last node),
            where the last node is the first one that does not have exactly
            one child
        '''
        branch = []
        node = self
        while True:
            branch.append(node)
            if len(node.children) != 1:
                return node, branch
            # Convert to list because indexing is not working (yet) for
            # the LinkThroughSequence
            node = list(node.children)[0]

    def lod_up_single(self):
        '''
        Proceed up a single branch on the line of descent until there is a
        branch point or terminal node.

        :return: tuple of (last node, list of nodes from self to last node),
            where the last node is the first one that does not have exactly
            one parent
        '''
        branch = []
        node = self
        while True:
            branch.append(node)
            if len(node.parents) != 1:
                return node, branch
            node = list(node.parents)[0]

    def lods_down(self):
        '''
        Composes all the lines of descent leading down (forward in time) from
        phylo unit in a non-recursive way (compare lods_up).

        Yields one iterator per line of descent, running from self down to a
        unit without children. Every unit appears once per line.
        '''
        # Each work item is (nodes on the line so far, node to continue from).
        # Prefixes are plain lists that are never mutated, so sibling branches
        # can safely share them. (A shared one-shot iterator would be emptied
        # by whichever branch is consumed first.)
        pending = collections.deque([([], self)])
        while pending:
            prefix, node = pending.popleft()
            branch_point, branch = node.lod_down_single()
            path = prefix + branch
            if len(branch_point.children) < 1:
                yield itertools.chain(path)
            else:
                for child in branch_point.children:
                    # the branch starting at `child` contributes `child` itself
                    pending.appendleft((path, child))

    def lods_up(self):
        '''
        Composes all the lines of descent leading up (back in time) from phylo
        unit (compare lods_down).

        Yields one iterator per line of descent, running from a unit without
        parents down to self. Every unit appears once per line.
        '''
        # Each work item is (nodes on the line so far, node to continue from);
        # see lods_down for why the partial lines are stored as lists.
        pending = collections.deque([([], self)])
        while pending:
            suffix, node = pending.pop()
            branch_point, branch = node.lod_up_single()
            path = branch[::-1] + suffix
            if len(branch_point.parents) < 1:
                yield itertools.chain(path)
            else:
                for parent in branch_point.parents:
                    pending.appendleft((path, parent))

    def parent_of(self, phylo_unit):
        """
        Return whether this PhyloUnit is the parent of another PhyloUnit.

        True when `phylo_unit` is found on any line of descent leading down
        from self; this includes self and indirect descendants.

        phylo_unit : `PhyloUnit`
        """
        return any(pu is phylo_unit
                   for lod in self.lods_down()
                   for pu in lod)

    def child_of(self, phylo_unit):
        """
        Return whether this PhyloUnit is the child of another PhyloUnit.

        True when `phylo_unit` is found on any line of descent leading up
        from self; this includes self and indirect ancestors.

        phylo_unit : `PhyloUnit`
        """
        return any(pu is phylo_unit
                   for lod in self.lods_up()
                   for pu in lod)

    def common_ancestors(self, phylo_unit):
        '''
        Return the set of ancestors shared by self and `phylo_unit`.

        Both ancestries are walked upwards, each time expanding the most
        recently born unit of the two frontiers. Units found in both
        frontiers are collected and not expanded any further. The walk stops
        as soon as one of the frontiers is empty.

        :param phylo_unit: the unit to compare ancestry with
        :return: set of common ancestors (empty if either has no parents)
        '''
        common = set()
        if not len(self.parents) or not len(phylo_unit.parents):
            return common
        # work on copies so the parents sequences themselves are never changed
        mine = list(self.parents)
        his = list(phylo_unit.parents)
        while True:
            shared = set(mine) & set(his)
            if shared:
                common |= shared
                mine = [anc for anc in mine if anc not in shared]
                his = [anc for anc in his if anc not in shared]
            if not mine or not his:
                break
            mine.sort(key=lambda pu: pu.time_birth)
            his.sort(key=lambda pu: pu.time_birth)
            # expand whichever frontier holds the most recently born unit
            if mine[-1].time_birth >= his[-1].time_birth:
                most_recent = mine.pop()
                mine.extend(most_recent.parents)
            else:
                most_recent = his.pop()
                his.extend(most_recent.parents)
        return common

    def _detach_phylo_node(self):
        '''
        Remove the phylo unit from the offspring and parents list of its parents and
        offspring respectively.
        '''
        # Iterate over snapshots: the loops remove items from the very
        # sequences they walk over.
        for parent in list(self.parents):
            self._remove_parent(parent)
            parent._remove_child(self)
        for child in list(self.children):
            self._remove_child(child)
            child._remove_parent(self)

    def _remove_from_linker_dict(self):
        '''
        Remove the phylo unit from the global linker dict. Any further referencing
        using this units unique key will result in KeyError when it is looked up in
        the linker dict.
        '''
        linker_dict = util.ugly_globals['PhyloLinkerDict']
        del linker_dict[self._unique_key]

    def prune_dead_branch(self, exclude_offspring_check_set=None, depth=1):
        '''
        Return a set of phylogenetically related units that represent a dead
        phylogenetic branch.

        Recursively checks for parent nodes whether the nodes descendants are
        all dead. In that case, the node can be pruned and its parents may
        additionally be checked for being part of the extended dead branch. The
        exclude is used to prevent superfluous checks of living offspring when
        it is already known that the current phylo_unit has no living_offspring.
        Every pruned unit is detached from its parents and children.

        Parameters
        ----------
        exclude_offspring_check_set : set of :class:`VirtualMicrobes.virtual_cell.PhyloUnit.PhyloUnit`
            children to skip in the living offspring check (default: none)
        depth : int
            recursion depth; only passed along to the recursive calls
        '''
        if exclude_offspring_check_set is None:
            exclude_offspring_check_set = set()
        pruned_phylo_units = set()
        if self.alive or self.has_living_offspring(exclude_offspring_check_set):
            return pruned_phylo_units
        pruned_phylo_units.add(self)
        # Snapshot the parents: a pruned parent detaches itself, which removes
        # it from self.parents while this loop is still running.
        for parent in list(self.parents):
            pruned_phylo_units.update(
                parent.prune_dead_branch(set([self]), depth=depth + 1))
        self._detach_phylo_node()
        return pruned_phylo_units

    def _copy(self, time=None, new_id=True, flat_id=False):
        '''
        Makes a (partial) deep copy of a PhyloUnit, when a PhyloUnit is
        mutated. In this way the pre- and post-mutation state of the PhyloUnit can be
        independently stored, enabling resurrection of the pre-mutation PhyloUnit.

        The copy gets its own unique key, is registered in the linker dict,
        starts with empty parents and children, and has self set as ancestor.

        :param time: time of birth given to the copy
        :param new_id: passed on to ``PhyloBase._copy``
        :param flat_id: passed on to ``PhyloBase._copy``
        :return: the copied unit
        '''
        copied = super(PhyloUnit, self)._copy(time, new_id, flat_id)
        copied.set_unique_key()
        copied._register_phylo_unit()
        copied.init_phylo_dicts()
        copied.set_ancestor(self)
        return copied

    # http://stackoverflow.com/a/6720815/4293557
    def __getstate__(self):
        '''
        Return the pickle state: a dict of all set slots found along the MRO.

        `parents` and `children` are stored as their ``_pickle_repr()``. If
        the instance also has a ``__dict__``, its content is added.
        '''
        odict = dict()  # NOTE: unordered ok
        all_slots = itertools.chain.from_iterable(
            getattr(cls, '__slots__', tuple())
            for cls in self.__class__.__mro__)
        for slot in set(all_slots):
            if not hasattr(self, slot):
                # slot declared but never assigned on this instance
                continue
            value = getattr(self, slot)
            if slot in ('parents', 'children'):
                value = value._pickle_repr()
            odict[slot] = value
        if hasattr(self, '__dict__'):
            odict.update(self.__dict__)
        return odict

    def __setstate__(self, state):
        '''
        Restore attributes from a state dict produced by `__getstate__`.

        `parents` and `children` are rebuilt with
        ``util.LinkThroughSequence._unpickle_repr``.
        '''
        for slot, value in state.items():
            if slot in ('parents', 'children'):
                value = util.LinkThroughSequence._unpickle_repr(value)
            setattr(self, slot, value)