Source code for commonnexus.tools.combine

"""
Combine data from multiple NEXUS files and put it in a new one.

The following blocks can be handled:

 - TAXA: Taxa are identified across NEXUS files based on label (not number).
 - CHARACTERS/DATA: Characters are aggregated across NEXUS files (with character labels prefixed,
   for disambiguation).
 - TREES: Trees are (translated and) aggregated across NEXUS files.

"""
import collections

from commonnexus import Nexus
from commonnexus.blocks import Taxa, Trees, Characters
from commonnexus.blocks.characters import StateMatrix

# Names of the NEXUS blocks that `combine` accepts in its input and knows how to merge.
SUPPORTED_BLOCKS = {'TAXA', 'CHARACTERS', 'DATA', 'TREES'}


def combine(*nexus: Nexus, **kw) -> Nexus:
    """
    Combine the taxa, character matrices and trees of several NEXUS objects into a new one.

    Taxa are collected in order of first appearance, with repeated labels dropped.
    Character matrices are merged with `merged_matrix`, and every tree is renamed to
    ``<i>.<name>``, where ``i`` is the 1-based position of its NEXUS object in `nexus`.

    :param nexus: `Nexus` objects to be combined.
    :param kw: Keyword options. Only ``drop_unsupported`` is read; when it is truthy, \
    the check that the inputs contain nothing but supported blocks is skipped.
    :return: A new `Nexus` object with the combined data.
    :raises AssertionError: If an input contains a block not listed in `SUPPORTED_BLOCKS` \
    (unless ``drop_unsupported`` is set) or more than one block of a supported type.
    :raises ValueError: If the character matrices do not all have the same datatype.
    """
    # Make sure we are only dealing with blocks (and datatypes) that we know how to handle.
    # The checks raise AssertionError explicitly instead of using `assert`, so that they
    # are still enforced when Python runs with -O (which strips assert statements).
    drop_unsupported = kw.get('drop_unsupported', False)
    for nex in nexus:
        if not drop_unsupported and not set(nex.blocks).issubset(SUPPORTED_BLOCKS):
            raise AssertionError(f"Only {SUPPORTED_BLOCKS} blocks are supported.")
        for block in SUPPORTED_BLOCKS:
            if len(nex.blocks.get(block, [])) > 1:
                raise AssertionError(
                    f"At most one {block} block per NEXUS file is supported.")

    # Determine the superset of taxa, preserving the order in which they appear.
    # dict.fromkeys keeps the first occurrence of each label, in insertion order.
    taxa = list(dict.fromkeys(
        taxon for nex in nexus for taxon in (nex.taxa or [])))

    # Create a super-matrix, with all taxa and all characters.
    matrices, last_datatype = collections.OrderedDict(), None
    for i, nex in enumerate(nexus, start=1):
        if nex.characters:
            # A block without FORMAT is treated as datatype STANDARD.
            datatype = nex.characters.FORMAT.datatype if nex.characters.FORMAT else 'STANDARD'
            if last_datatype and last_datatype != datatype:
                raise ValueError(
                    'Only CHARACTER or DATA blocks of the same datatype can be combined!')
            last_datatype = datatype
            matrices[i] = nex.characters.get_matrix()
    matrix = merged_matrix(matrices, taxa)

    # Add all translated trees.
    trees = []
    for i, nex in enumerate(nexus, start=1):
        if nex.TREES:
            for tree in nex.TREES.trees:
                # Only run the translation when the block has a TRANSLATE command.
                nwk = nex.TREES.translate(tree) if nex.TREES.TRANSLATE else tree.newick
                trees.append((f'{i}.{tree.name}', nwk, tree.rooted))

    # Only emit blocks for which there is data.
    combined = Nexus()
    if taxa:
        combined.append_block(Taxa.from_data(taxa))
    if matrix:
        combined.append_block(Characters.from_data(matrix))
    if trees:
        combined.append_block(Trees.from_data(*trees))
    return combined
def merged_matrix(
        matrices: collections.OrderedDict[int, StateMatrix],
        taxa: list[str],
) -> StateMatrix:
    """
    Merge several state matrices into one matrix with a row for every taxon in `taxa`.

    Every character label is prefixed with the key of the matrix it comes from
    (``<key>.<label>``) for disambiguation. A taxon that is missing from a matrix gets
    the value ``None`` for each character of that matrix; the characters of a matrix are
    taken from its first row. Matrices without any rows contribute no characters.

    :param matrices: Mapping of an identifying key to a matrix, where each matrix maps \
    taxon labels to ordered mappings of character label to state value.
    :param taxa: Labels of all taxa to include, in the order of the rows of the result.
    :return: The merged matrix; empty if no matrix contributes any rows.
    """
    # The labels used to pad a missing taxon do not depend on the taxon, so resolve them
    # once per matrix instead of once per (taxon, matrix) pair. Empty matrices have no
    # first row to read labels from and are skipped rather than raising an IndexError.
    sources = []
    for key, m in matrices.items():
        first_row = next(iter(m.values()), None)
        if first_row is not None:
            sources.append((key, m, [f'{key}.{label}' for label in first_row]))

    matrix = collections.OrderedDict()
    if not sources:
        return matrix

    for taxon in taxa:
        row = collections.OrderedDict()
        for key, m, padding in sources:
            if taxon in m:
                for charlabel, val in m[taxon].items():
                    row[f'{key}.{charlabel}'] = val
            else:
                for label in padding:
                    row[label] = None
        matrix[taxon] = row
    return matrix