Source code for VirtualMicrobes.virtual_cell.PhyloUnit

import collections
import copy
import itertools

from Identifier import Identifier
import VirtualMicrobes.my_tools.utility as util
import itertools as it
import numpy as np

def _gen_id_func(x):
    '''
    Return the next value of the global ``'UniqueGeneKey'`` generator.

    Modified November 2018 to have actually unique IDs :)

    :param x: ignored. It is accepted only so that this function can be called
        as ``gen_id(previous_uid)``, like any other id generator.
    :return: the next item produced by ``util.ugly_globals['UniqueGeneKey']``
    '''
    # The builtin next() (Python >= 2.6) replaces the Python-2-only ``.next()``
    # method call; on an iterator both advance it by exactly one item.
    return next(util.ugly_globals['UniqueGeneKey'])

class AddInheritanceType(type):
    '''
    A metaclass that can set a class instance's base type to support
    phylogenetic linking.

    The base type of the instantiated class can be either a PhyloBase or a
    PhyloUnit, depending on its ``__phylo_type`` class attribute. This enables
    a run-time decision (via program options) to fix the ancestry structure
    the class supports.

    PhyloBase instances keep references to neither parents nor children and
    hence do not need to use a linker dict or unique_key generator. PhyloUnit
    does support ancestry. Phylogenetic linking is delegated to a global
    linker dict.
    '''

    def __new__(cls, name, bases, attr):
        '''
        Create the class, injecting a phylogenetic base class when requested.

        :param name: name of the class being created
        :param bases: tuple of explicitly declared base classes
        :param attr: class namespace dict
        :return: the newly created class
        :raises KeyError: if the ``__phylo_type`` value is neither ``'base'``
            nor ``'ancestry'``
        '''
        # A ``__phylo_type`` attribute written inside the class body is stored
        # under its name-mangled key ``_<ClassName>__phylo_type``.
        phylo_type_attr = '_' + name + '__phylo_type'
        # Only classes declared without explicit bases get a base injected.
        # ``in`` replaces the deprecated (and Python-3-removed) dict.has_key().
        if not bases and phylo_type_attr in attr:
            phylo_class = {'base': PhyloBase,
                           'ancestry': PhyloUnit}[attr[phylo_type_attr]]
            bases = bases + (phylo_class,)
        return type.__new__(cls, name, bases, attr)
class PhyloBase(object):
    '''
    Base class for all classes that can behave as phylogenetic units of
    inheritance.

    Phylogenetic Base units record their time of birth and death and have an
    identifier field that can indicate a relation to parents and offspring.
    A PhyloBase object may come into existence when a mother cell gives rise
    to a daughter cell and all units of inheritance that it contains (i.e.
    when a genome gets copied), but also when a phylogenetic unit (such as a
    gene or chromosome) mutates and the ancestral version will be kept intact
    for analysis purposes.
    '''

    __slots__ = ['id', 'time_birth', 'time_death', 'alive', 'marker_dict',
                 '_unique_key']

    def __init__(self, time_birth):
        '''
        :param time_birth: Time of birth in simulation time units.
        '''
        self.id = Identifier(self, PhyloBase.__unique_id(self))
        self.time_birth = time_birth
        self.time_death = None
        self.alive = True
        self.marker_dict = dict()  # NOTE: unordered ok

    def die(self, time):
        '''
        Death of a phylo unit happens either when a cell dies and all its
        genetic material with it, or when a mutation gives rise to a new
        variant of the unit.

        :param time: Time of death in simulation time units.
        '''
        self.alive = False
        self.time_death = time

    def mark(self, marker, mark):
        '''
        Set a marker on the phylo unit.

        :param marker: marker type
        :param mark: value
        '''
        self.marker_dict[marker] = mark

    def living_offspring(self):
        '''
        Return an empty list: the base type does not track children.
        '''
        return []

    def prune_dead_branch(self):
        '''
        Return self to be removed from the global phylo linker dict if not
        alive.

        This is the degenerate base version of pruning. See the version in
        PhyloUnit for the case when units keep track of parent-child
        relationships.

        :return: ``set([self])`` when the unit is dead, otherwise an empty set
        '''
        pruned_phylo_units = set()
        if not self.alive:
            pruned_phylo_units.add(self)
        return pruned_phylo_units

    def __unique_id(self, gen_id=None):
        '''
        Create a unique identifier for each newly created genomic element.

        No new ids should be generated upon cell replication. We should be
        able to identify orthologous elements by comparing identifiers between
        cells. Within the identifier the type of genomic element can be
        encoded, eg: transporter, tf, enzyme etc. Ids can be generated by
        maintaining a counter within the derived classes of GEs created by
        duplication.

        :param gen_id: callable taking the previous class-level ``uid`` and
            returning the new one; defaults to the module level
            ``_gen_id_func``.
        :return: the new id, which is also stored as ``uid`` on the class of
            `self`
        '''
        cls = self.__class__
        if gen_id is None:
            gen_id = _gen_id_func
        # Classes that never declared a ``uid`` counter hand ``None`` to the
        # generator instead of failing with AttributeError.
        cls.uid = gen_id(getattr(cls, 'uid', None))
        return cls.uid

    def _copy(self, time=None, new_id=True, flat_id=False):
        '''
        Makes a (partial) deep copy of a PhyloUnit, when a PhyloUnit is
        mutated. In this way the pre- and post-mutation state of the PhyloUnit
        can be independently stored, enabling resurrection of the pre-mutation
        PhyloUnit.

        :param time: time of birth assigned to the copy
        :param new_id: if True the copy gets a deep copy of the identifier on
            which ``from_parent(self, flat=flat_id)`` is called; otherwise it
            gets a shallow copy of the identifier
        :param flat_id: forwarded as ``flat`` to ``from_parent``
        :return: the new, living copy
        '''
        cls = self.__class__
        # __new__ bypasses __init__, so no fresh id is generated for the copy.
        copied = cls.__new__(cls)
        copied.marker_dict = copy.copy(self.marker_dict)
        if new_id:
            copied.id = copy.deepcopy(self.id)
            copied.id.from_parent(self, flat=flat_id)
        else:
            copied.id = copy.copy(self.id)
        copied.alive = True
        copied.time_birth = time
        copied.time_death = None
        return copied

    def __getitem__(self, key):
        '''
        Return the mark stored for marker `key`.

        :raises KeyError: if no such marker was set
        '''
        return self.marker_dict[key]
class PhyloUnit(PhyloBase):
    '''
    Extended Base class for all classes that can be represented in
    phylogenies. These classes should support ancestor and child retrieval
    and setting time of birth and death.
    '''

    # NOTE: '_unique_key' is also listed in PhyloBase.__slots__.
    __slots__ = ['parents', 'children', '_unique_key']

    def __init__(self, time_birth):
        '''
        Constructor

        :param time_birth: Time of birth in simulation time units.
        '''
        super(PhyloUnit, self).__init__(time_birth)
        self.set_unique_key()
        self.init_phylo_dicts()
        self._register_phylo_unit()

    def set_unique_key(self):
        '''
        Generate a unique key that can be used for mapping in a global linker
        dict.
        '''
        # Builtin next() (Python >= 2.6) instead of the Python-2-only
        # ``.next()`` method.
        self._unique_key = str(next(util.ugly_globals['UniquePhyloKey']))

    def _register_phylo_unit(self):
        '''
        Register phylo unit in the global linker dict under its unique key.
        '''
        partial_linker = util.ugly_globals['PhyloLinkerDict']
        partial_linker[self._unique_key] = self

    def init_phylo_dicts(self):
        '''
        Give the unit fresh, empty `parents` and `children` sequences.
        '''
        self.parents = util.LinkThroughSequence()
        self.children = util.LinkThroughSequence()

    def die(self, *args, **kwargs):
        '''
        Mark the unit as dead; delegates unchanged to :meth:`PhyloBase.die`.
        '''
        super(PhyloUnit, self).die(*args, **kwargs)

    def set_ancestor(self, ge):
        '''
        Link `ge` as a parent of this unit and this unit as a child of `ge`.

        :param ge: the parent phylo unit
        '''
        self.parents.append(ge)
        ge.children.append(self)

    def _remove_child(self, child):
        '''
        Remove `child` from the children of this unit, if it is present.
        '''
        # NOTE: child could have been removed during previous passage
        if child in self.children:
            self.children.remove(child)

    def _remove_parent(self, parent):
        '''
        Remove `parent` from the parents of this unit, if it is present.
        '''
        # NOTE: parent could have been removed during previous passage
        if parent in self.parents:
            self.parents.remove(parent)

    def living_offspring(self):
        '''
        Returns a list of all offspring of this phylo unit that are currently
        alive.

        Only children that are themselves alive are descended into.
        '''
        offspring = [c for c in self.children if c.alive]
        # Iterate a copy: the list is extended while the living children are
        # visited.
        for o in offspring[:]:
            offspring += o.living_offspring()
        return offspring

    def has_living_offspring(self, exclude_set=None, depth=1):
        '''
        Returns True if any of the phylo units descendants are alive.

        :param exclude_set: optional iterable of direct children that do not
            need to be checked (e.g. because they are already known to be dead
            without living offspring). ``None`` means check every child.
        :param depth: recursion depth, only passed along for debugging
        '''
        skip = exclude_set if exclude_set is not None else ()
        for child in set(self.children).difference(skip):
            if child.alive or child.has_living_offspring(depth=depth + 1):
                return True
        return False

    def lod_down_single(self):
        '''
        Proceed down a single branch on the line of descent until there is a
        branch point or terminal node.

        :return: tuple ``(node, branch)`` where `node` is the last node
            reached (zero or several children) and `branch` is the list of
            nodes walked, starting with self and ending with `node`
        '''
        branch = []
        node = self
        while node is not None:
            branch.append(node)
            if len(node.children) != 1:
                return node, branch
            # Convert to list because indexing is not working (yet) for the
            # LinkThroughSequence.
            node = list(node.children)[0]

    def lod_up_single(self):
        '''
        Proceed up a single branch on the line of descent until there is a
        branch point or terminal node.

        :return: tuple ``(node, branch)`` where `node` is the last node
            reached (zero or several parents) and `branch` is the list of
            nodes walked, starting with self and ending with `node`
        '''
        branch = []
        node = self
        while node is not None:
            branch.append(node)
            if len(node.parents) != 1:
                return node, branch
            node = list(node.parents)[0]

    def lods_down(self):
        '''
        Composes all the lines of descent leading down (forward in time) from
        phylo unit in a non-recursive way (compare lods_up).

        Yields one iterator per line of descent, ordered from self towards the
        terminal node.
        '''
        # Prefixes are stored as lists, not iterators: the branches below one
        # branch point all share the same prefix, and a shared single-use
        # iterator would be exhausted by whichever line is consumed first.
        # NOTE(review): a branch-point child is appended to the prefix and is
        # again the first node of its own branch, so it shows up twice in the
        # line. Kept as it was; confirm whether consumers rely on it.
        lods = collections.deque([([], self)])
        while lods:
            lod, node = lods.popleft()
            branch_point, branch = node.lod_down_single()
            if len(branch_point.children) < 1:
                yield iter(lod + branch)
            else:
                for c in branch_point.children:
                    lods.appendleft((lod + branch + [c], c))

    def lods_up(self):
        '''
        Composes all the lines of descent leading up (back in time) from phylo
        unit (compare lods_down).

        Yields one iterator per line of descent, ordered from the terminal
        ancestor towards self.
        '''
        # Suffixes are stored as lists so that lines sharing a suffix do not
        # share a single-use iterator (see lods_down).
        # NOTE(review): as in lods_down, a branch-point parent appears twice
        # in the resulting line. Kept as it was.
        lods = collections.deque([([], self)])
        while lods:
            lod, node = lods.pop()
            branch_point, branch = node.lod_up_single()
            if len(branch_point.parents) < 1:
                yield iter(branch[::-1] + lod)
            else:
                for p in branch_point.parents:
                    lods.appendleft(((branch + [p])[::-1] + lod, p))

    def parent_of(self, phylo_unit):
        '''
        Return whether this PhyloUnit is the parent of another PhyloUnit.

        NOTE(review): the implementation searches the lines of descent going
        *up* from self, so it is True when `phylo_unit` is self or one of its
        ancestors. That looks like the opposite direction of what the summary
        line says; behaviour is kept, confirm against callers before changing.

        phylo_unit : `PhyloUnit`
        '''
        return any(pu is phylo_unit
                   for lod in self.lods_up()
                   for pu in lod)

    def child_of(self, phylo_unit):
        '''
        Return whether this PhyloUnit is the child of another PhyloUnit.

        NOTE(review): the implementation searches the lines of descent going
        *down* from self, so it is True when `phylo_unit` is self or one of
        its descendants. That looks like the opposite direction of what the
        summary line says; behaviour is kept, confirm against callers before
        changing.

        phylo_unit : `PhyloUnit`
        '''
        return any(pu is phylo_unit
                   for lod in self.lods_down()
                   for pu in lod)

    def common_ancestors(self, phylo_unit):
        '''
        Return the set of ancestors shared by this unit and `phylo_unit`.

        Both ancestor frontiers start at the direct parents. Each round,
        shared members are moved into the result and the most recently born
        remaining ancestor is replaced by its own parents, until one of the
        frontiers is empty.

        :param phylo_unit: the unit to compare ancestry with
        :return: set of common ancestors (empty if either unit has no parents)
        '''
        common = set()
        # Work on copies so the units' own parent sequences are never touched.
        my_ancestors = list(self.parents)
        his_ancestors = list(phylo_unit.parents)
        if not my_ancestors or not his_ancestors:
            return common

        def birth(unit):
            return unit.time_birth

        while True:
            my_ancestors.sort(key=birth)
            his_ancestors.sort(key=birth)
            shared = set(my_ancestors) & set(his_ancestors)
            if shared:
                common |= shared
                my_ancestors = [a for a in my_ancestors if a not in shared]
                his_ancestors = [a for a in his_ancestors if a not in shared]
            if not my_ancestors or not his_ancestors:
                break
            # Step back in time on the side holding the youngest ancestor.
            if my_ancestors[-1].time_birth >= his_ancestors[-1].time_birth:
                most_recent = my_ancestors.pop()
                my_ancestors.extend(most_recent.parents)
            else:
                most_recent = his_ancestors.pop()
                his_ancestors.extend(most_recent.parents)
        return common

    def _detach_phylo_node(self):
        '''
        Remove the phylo unit from the offspring and parents list of its
        parents and offspring respectively.
        '''
        # Iterate over snapshots: the loop bodies remove entries from the very
        # sequences being walked.
        for p in list(self.parents):
            self._remove_parent(p)
            p._remove_child(self)
        for c in list(self.children):
            self._remove_child(c)
            c._remove_parent(self)

    def _remove_from_linker_dict(self):
        '''
        Remove the phylo unit from the global linker dict. Any further
        referencing using this units unique key will result in KeyError when
        it is looked up in the linker dict.
        '''
        linker_dict = util.ugly_globals['PhyloLinkerDict']
        del linker_dict[self._unique_key]

    def prune_dead_branch(self, exclude_offspring_check_set=None, depth=1):
        '''
        Return a set of phylogenetically related units that represent a dead
        phylogenetic branch.

        Recursively checks for parent nodes whether the nodes descendants are
        all dead. In that case, the node can be pruned and its parents may
        additionally be checked for being part of the extended dead branch.
        Pruned units are detached from their parents and children.

        The exclude is used to prevent superfluous checks of living offspring
        when it is already known that the current phylo_unit has no
        living_offspring.

        Parameters
        ----------
        exclude_offspring_check_set : set of :class:`VirtualMicrobes.virtual_cell.PhyloUnit.PhyloUnit`
            children that can be skipped in the living-offspring check;
            ``None`` means none are skipped
        depth : int
            recursion depth, only passed along for debugging
        '''
        pruned_phylo_units = set()
        if (not self.alive
                and not self.has_living_offspring(exclude_offspring_check_set)):
            pruned_phylo_units.add(self)
            # Snapshot the parents: a pruned parent detaches itself, which
            # removes it from self.parents during this loop.
            for p in list(self.parents):
                pruned_phylo_units.update(
                    p.prune_dead_branch(set([self]), depth=depth + 1))
            self._detach_phylo_node()
        return pruned_phylo_units

    def _copy(self, time=None, new_id=True, flat_id=False):
        '''
        Makes a (partial) deep copy of a PhyloUnit, when a PhyloUnit is
        mutated. In this way the pre- and post-mutation state of the PhyloUnit
        can be independently stored, enabling resurrection of the pre-mutation
        PhyloUnit.

        The copy gets its own unique key, is registered in the global linker
        dict and is linked to self as its child.
        '''
        copied = super(PhyloUnit, self)._copy(time, new_id, flat_id)
        copied.set_unique_key()
        copied._register_phylo_unit()
        copied.init_phylo_dicts()
        copied.set_ancestor(self)
        return copied

    # http://stackoverflow.com/a/6720815/4293557
    def __getstate__(self):
        '''
        Build the pickle state from every slot declared along the MRO.

        Slots that were never assigned are skipped; `parents` and `children`
        are stored via their ``_pickle_repr()``. If the instance also has a
        ``__dict__`` its contents are added.
        '''
        odict = dict()  # NOTE: unordered ok
        my_cls = self.__class__
        all_slots = it.chain.from_iterable(
            getattr(cls, '__slots__', tuple()) for cls in my_cls.__mro__)
        for slot in set(all_slots):
            if not hasattr(self, slot):
                continue
            if slot in ['parents', 'children']:
                odict[slot] = getattr(self, slot)._pickle_repr()
            else:
                odict[slot] = getattr(self, slot)
        if hasattr(self, '__dict__'):
            odict.update(self.__dict__)
        return odict

    def __setstate__(self, state):
        '''
        Restore attributes from `state`, rebuilding `parents` and `children`
        with ``util.LinkThroughSequence._unpickle_repr``.
        '''
        for slot, value in state.items():
            if slot in ['parents', 'children']:
                value = util.LinkThroughSequence._unpickle_repr(value)
            setattr(self, slot, value)