#########################################################################
# MacSyLib - Python library to detect macromolecular systems #
# in prokaryotes protein dataset using systems modelling #
# and similarity search. #
# #
# Authors: Sophie Abby, Bertrand Neron #
# Copyright (c) 2014-2025 Institut Pasteur (Paris) and CNRS. #
# See the COPYRIGHT file for details #
# #
# This file is part of MacSyLib package. #
# #
# MacSyLib is free software: you can redistribute it and/or modify #
# it under the terms of the GNU General Public License as published by #
# the Free Software Foundation, either version 3 of the License, or #
# (at your option) any later version. #
# #
# MacSyLib is distributed in the hope that it will be useful, #
# but WITHOUT ANY WARRANTY; without even the implied warranty of #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the #
# GNU General Public License for more details . #
# #
# You should have received a copy of the GNU General Public License #
# along with MacSyLib (COPYING). #
# If not, see <https://www.gnu.org/licenses/>. #
#########################################################################
from __future__ import annotations
import itertools
import logging
from collections import defaultdict, Counter
from operator import attrgetter
import macsylib.gene
from .error import MacsylibError
from .gene import ModelGene, GeneStatus
from .hit import Loner, LonerMultiSystem, get_best_hit_4_func, ModelHit, CoreHit, HitWeight
from .database import RepliconInfo
from .model import Model
_log = logging.getLogger(__name__)
"""
Module to build and manage Clusters of Hit
A cluster is a set of hits each of which hits less than inter_gene_max_space from its neighbor
"""
def _colocates(h1: ModelHit, h2: ModelHit, rep_info: RepliconInfo) -> bool:
"""
compute the distance (in number of gene between) between 2 hits
:param h1: the first hit to compute inter hit distance
:param h2: the second hit to compute inter hit distance
:return: True if the 2 hits spaced by lesser or equal genes than inter_gene_max_space.
Managed circularity.
"""
# compute the number of genes between h1 and h2
dist = h2.position - h1.position - 1
g1 = h1.gene_ref
g2 = h2.gene_ref
model = g1.model
d1 = g1.inter_gene_max_space
d2 = g2.inter_gene_max_space
if d1 is None and d2 is None:
inter_gene_max_space = model.inter_gene_max_space
elif d1 is None:
inter_gene_max_space = d2
elif d2 is None:
inter_gene_max_space = d1
else: # d1 and d2 are defined
inter_gene_max_space = min(d1, d2)
if 0 <= dist <= inter_gene_max_space:
return True
elif dist <= 0 and rep_info.topology == 'circular':
# h1 and h2 overlap the ori
dist = rep_info.max - h1.position + h2.position - rep_info.min
return dist <= inter_gene_max_space
return False
[docs]
def scaffold_to_cluster(cluster_scaffold: list[ModelHit], model: Model, hit_weights: HitWeight) -> Cluster:
"""
transform a list of ModelHit in a cluster if the hit colocalize and they are not all neutral
and they do not code for same gene
add the new cluster to the clusters
:param cluster_scaffold: model hit to transform in cluster
:param model: The model related to thus cluster
:param hit_weights: the hit weight to compute scores
:return: Cluster
"""
gene_types = {hit.gene_ref.name for hit in cluster_scaffold}
if len(gene_types) > 1:
if all([_hit.gene_ref.status == GeneStatus.NEUTRAL for _hit in cluster_scaffold]):
# contains different genes but all are neutral
# we do not consider a group of neutral as a cluster
_log.debug(f"{', '.join([h.id for h in cluster_scaffold])} "
f"is composed of only neutral. It's not a cluster.")
return None
else:
return Cluster(cluster_scaffold, model, hit_weights)
else:
cluster = Cluster(cluster_scaffold, model, hit_weights)
if cluster.loner:
# it's a group of one loner add as cluster
# it will be squashed at next step (_get_true_loners )
# the hit transformation in loner is performed at the end when circularity and merging is done
return cluster
elif model.min_genes_required == 1:
if cluster.hits[0].gene_ref.status == GeneStatus.NEUTRAL:
# even min_genes_required == 1
# a neutral alone is not a cluster
_log.debug(f"{', '.join([h.id for h in cluster_scaffold])} "
f"is composed of only neutral. It's not a cluster.")
return None
else:
return cluster
else:
_log.debug(f"({', '.join([h.id for h in cluster_scaffold])}) "
f"is composed of only type of gene {cluster_scaffold[0].gene_ref.name}. It's not a cluster.")
return None
[docs]
def clusterize_hits_on_distance_only(hits: list[ModelHit], model: Model, hit_weights: HitWeight, rep_info: RepliconInfo) -> list[Cluster]:
"""
clusterize hit regarding the distance between them
:param hits: the hits to clusterize
:param model: the model to consider
:param hit_weights: the hit weight to compute the score
:param rep_info: The information on the replicon
:return: the clusters
"""
clusters = []
cluster_scaffold = []
# sort hits by increasing position and then descending score
hits.sort(key=lambda h: (h.position, - h.score))
# remove duplicates hits (several hits for the same sequence),
# keep the first one, this with the best score
# position == sequence rank in replicon
hits = [next(group) for pos, group in itertools.groupby(hits, lambda h: h.position)]
if hits:
hit = hits[0]
cluster_scaffold.append(hit)
previous_hit = cluster_scaffold[0]
for m_hit in hits[1:]:
if _colocates(previous_hit, m_hit, rep_info):
cluster_scaffold.append(m_hit)
else:
# close the current scaffold
cluster = scaffold_to_cluster(cluster_scaffold, model, hit_weights)
if cluster is not None:
clusters.append(cluster)
# open new scaffold
cluster_scaffold = [m_hit]
previous_hit = m_hit
# close the last current cluster
cluster = scaffold_to_cluster(cluster_scaffold, model, hit_weights)
if cluster is not None:
clusters.append(cluster)
else:
# handle circularity
if rep_info.topology == 'circular':
# if there are clusters
# maybe the last hit in scaffold collocate with the first hit of the first cluster
if clusters and _colocates(cluster_scaffold[-1], clusters[0].hits[0], rep_info):
new_cluster = Cluster(cluster_scaffold, model, hit_weights)
clusters[0].merge(new_cluster, before=True)
elif _colocates(cluster_scaffold[-1], hits[0], rep_info):
# maybe it colocalize with the first hit (which was not a cluster)
cluster_scaffold.append(hits[0])
cluster = scaffold_to_cluster(cluster_scaffold, model, hit_weights)
if cluster is not None:
clusters.append(cluster)
# handle circularity
if rep_info.topology == 'circular' and len(clusters):
if _colocates(clusters[-1].hits[-1], clusters[0].hits[0], rep_info):
clusters[0].merge(clusters[-1], before=True)
clusters = clusters[:-1]
return clusters
[docs]
def is_a(hit: ModelHit | CoreHit, ref_hits: set[str]) -> bool:
"""
:param hit: The hit to check
:param ref_hits: the gene name of the reference hit
:return: True if the *hit* belong to the reference hits, False otherwise
"""
return hit.gene_ref.name in ref_hits
[docs]
def closest_hit(hit: ModelHit, ref_hits:list[ModelHit]) -> ModelHit:
"""
:param hit: the hit
:param ref_hits: The reference hits. the distance between *hit* and each *ref_hit*
will be computed. the closest *ref_hit* will be returned
:return: The closest *ref_hit* to the hit. If two *ref_hits* are equidistant form the *hit*
return those with the lowest position.
for isnstance::
position 40 20 60
closest_hit( ref_hit, [H1, H2]
will return *H1*
"""
ref_hits = sorted(ref_hits, key=attrgetter('position'))
closest_int = ref_hits[0]
closest_dist = abs(hit.position - closest_int.position)
for integrase in ref_hits[1:]:
distance = abs(hit.position - integrase.position)
if distance < closest_dist:
closest_dist = distance
closest_int = integrase
return closest_int
[docs]
def split_cluster_on_key_genes(key_genes: set[str], cluster: Cluster) -> list[Cluster]:
"""
split a Cluster containing several key genes to have one cluster per key genes, with their closest hits
For instance if a set of gene clusterize as following (we considering that all gene are 10 genea between next one::
positions 10 20 30 40 50 60 70
genes A KG1 B C D KG2 E
The resulting cluster after split around the 2 KG (key genes)::
c1 = [A, KG1, B, C], c2 = [D, KG2, E]
The question is for gene C which is equidistant from KG1 KG2
C will be clustered with the most left cluster
:param key_genes: the gene names which be seed for cluster
:param cluster: The cluster to split
:return:
"""
clusters = []
scaffolds = defaultdict(list)
key_gene_hits = []
not_key_genes_hits = []
for hit in cluster.hits:
if is_a(hit, key_genes):
key_gene_hits.append(hit)
else:
not_key_genes_hits.append(hit)
if not key_gene_hits:
return []
for hit in not_key_genes_hits:
closest_int = closest_hit(hit, key_gene_hits)
scaffolds[closest_int].append(hit)
for integrase, scaffold in scaffolds.items():
scaffold.append(integrase)
scaffold.sort(key=lambda h: h.position)
cluster = scaffold_to_cluster(scaffold, cluster.model, cluster.hit_weights)
clusters.append(cluster)
clusters.sort(key=lambda c: c.hits[0].position)
return clusters
[docs]
def clusterize_hits_around_key_genes(key_genes: set[str],
hits: list[ModelHit],
model: Model,
hit_weights: HitWeight,
rep_info: RepliconInfo) -> list[Cluster]:
"""
clusterize hit regarding the distance between them and around key_gene
:param hits: the hits to clusterize
:type hits: list of :class:`macsylib.model.ModelHit` objects
:param model: the model to consider
:type model: :class:`macsylib.model.Model` object
:param hit_weights: the hit weight to compute the score
:type hit_weights: :class:`macsylib.hit.HitWeight` object
:type rep_info: :class:`macsylib.Indexes.RepliconInfo` object
:return: the clusters
:rtype: list of :class:`macsylib.cluster.Cluster` objects.
"""
dist_cls = clusterize_hits_on_distance_only(hits, model, hit_weights, rep_info)
key_gene_clst = []
for clst in dist_cls:
key_gene_nb = sum([1 for hit in clst.hits if is_a(hit, key_genes)])
if key_gene_nb == 0:
continue
elif key_gene_nb == 1:
key_gene_clst.append(clst)
else:
clusters = split_cluster_on_key_genes(key_genes, clst)
key_gene_clst.extend(clusters)
key_gene_clst.sort(key=lambda c: c.hits[0].position)
return key_gene_clst
def _get_true_loners(clusters: list[Cluster]) -> tuple[dict[str: Loner | LonerMultiSystem], list[Cluster]]:
"""
We call a True Loner a Cluster composed of one or several hit related to the same gene tagged as loner
(by opposition with hit representing a gene tagged loner but include in cluster with several other genes)
:param clusters: the clusters
:return: tuple of 2 elts
* dict containing true clusters {str func_name : :class:`macsylib.hit.Loner | :class:`macsylib.hit.LonerMultiSystem` object}
* list of :class:`macsylib.cluster.Cluster` objects
"""
def add_true_loner(clstr: Cluster) -> None:
"""
Add the hit of the Cluster clstr to the true_loners
The clstr must contain only one hit or a stretch of same gene.
:param clstr:
"""
hits = clstr.hits
clstr_len = len(hits)
if clstr_len > 1:
_log.warning(f"Squash cluster of {clstr_len} {clstr[0].gene_ref.name} loners "
f"({hits[0].position} -> {hits[-1].position})")
func_name = clstr[0].gene_ref.alternate_of().name
if func_name in true_loners:
true_loners[func_name].extend(hits)
else:
true_loners[func_name] = hits
###################
# get True Loners #
###################
# true_loner is a hit which encode for a gene tagged as loner
# and which does NOT clusterize with some other hits of interest
true_clusters = []
true_loners = {}
if clusters:
model = clusters[0].model
hit_weights = clusters[0].hit_weights
for clstr in clusters:
if clstr.loner:
# it's a true Loner or a stretch of same loner
add_true_loner(clstr)
else:
# it's a cluster of 1 hit
# but it's NOT a loner
# min_genes_required == 1
true_clusters.append(clstr)
for func_name, loners in true_loners.items():
# transform ModelHit in Loner
true_loners[func_name] = []
for i, _ in enumerate(loners):
if loners[i].multi_system:
# the counterpart have been already computed during the MS hit instantiation
# instead of the Loner not multisystem it include the hits which clusterize
true_loners[func_name].append(LonerMultiSystem(loners[i]))
else:
counterpart = loners[:]
hit = counterpart.pop(i)
true_loners[func_name].append(Loner(hit, counterpart=counterpart))
# replace List of Loners/MultiSystem by the best hit
best_loner = get_best_hit_4_func(func_name, true_loners[func_name], key='score')
true_loners[func_name] = best_loner
true_loners = {func_name: Cluster([loner], model, hit_weights) for func_name, loner in true_loners.items()}
return true_loners, true_clusters
[docs]
def build_clusters(hits: list[ModelHit],
rep_info: RepliconInfo,
model: Model,
hit_weights: HitWeight) -> tuple[list[Cluster], dict[str: Loner | LonerMultiSystem]]:
"""
From a list of filtered hits, and replicon information (topology, length),
build all lists of hits that satisfied the constraints:
* max_gene_inter_space
* loner
* multi_system
If Yes create a cluster.
A cluster contains at least two hits separated by less or equal than max_gene_inter_space
Except for loner genes which are allowed to be alone in a cluster
:param hits: list of filtered hits
:param rep_info: the replicon to analyse
:param model: the model to study
:param hit_weights: the hit weight needed to compute the cluster score
:return: list of regular clusters,
the special clusters (loners not in cluster and multi systems)
:rtype: tuple with 2 elements
* true_clusters which is list of :class:`Cluster` objects
* true_loners: a dict { str function: :class:macsylib.hit.Loner | :class:macsylib.hit.LonerMultiSystem object}
"""
if hits:
clusters = clusterize_hits_on_distance_only(hits, model, hit_weights, rep_info)
# The hits in clusters are either ModelHit or MultiSystem
# (they are cast during model.filter(hits) method)
# the MultiSystem have no yet counterpart
# which will compute once System will be computed
# to take in account only hits in true system candidates
# whereas the counterpart for loner & LonerMultiSystems during get_true_loners
true_loners, true_clusters = _get_true_loners(clusters)
else: # there is not hits
true_clusters = []
true_loners = {}
return true_clusters, true_loners
[docs]
class Cluster:
"""
Handle hits relative to a model which collocates
"""
_id = itertools.count(1)
[docs]
def __init__(self, hits: list[CoreHit]|list[ModelHit], model, hit_weights) -> None:
"""
:param hits: the hits constituting this cluster
:param model: the model associated to this cluster
:param hit_weights: the weight of the hit to compute the score
"""
self._hits = hits
self.model = model
self._check_replicon_consistency()
self._score = None
self._genes_roles = None
self._hit_weights = hit_weights
self.id = f"c{next(self._id)}"
def __len__(self) -> int:
return len(self.hits)
def __getitem__(self, index: str) -> CoreHit | ModelHit | Cluster:
if isinstance(index, int):
return self.hits[index]
elif isinstance(index, slice):
start, stop, step = index.indices((len(self._hits)))
return self.__class__([self._hits[index] for index in range(start, stop, step)],
self.model,
self._hit_weights)
else:
raise TypeError(f"{self.__class__.__name__} indices must be integers or slices, not {type(index).__name__}")
@property
def hits(self) -> list[CoreHit | ModelHit]:
"""
:return: the hits sorted by the increasing position
"""
return self._hits[:]
@hits.setter
def hits(self, hits: list[CoreHit | ModelHit]) -> None:
"""
set cluster hits
"""
self._hits = hits
@property
def hit_weights(self) -> HitWeight:
"""
:return: the different weight for the hits used to compute the score
"""
return self._hit_weights
@property
def loner(self) -> bool:
"""
:return: True if this cluster is made of only some hits representing the same gene and this gene is tag as loner
False otherwise:
- contains several hits coding for different genes
- contains one hit but gene is not tag as loner (max_gene_required = 1)
"""
# need this method in build_cluster before to transform ModelHit in Loner
# so cannot rely on Loner type
# beware return True if several hits of same gene composed this cluster (I use a set!)
return len({h.gene_ref.name for h in self.hits}) == 1 and self.hits[0].gene_ref.loner
@property
def multi_system(self) -> bool:
"""
:return: True if this cluster is made of only one hit representing a multi_system gene
False otherwise:
- contains several hits
- contains one hit but gene is not tag as loner (max_gene_required = 1)
"""
# by default gene_ref.multi_system == gene_ref.alternate_of().multi_system
return len(self) == 1 and self.hits[0].gene_ref.multi_system
[docs]
def _check_replicon_consistency(self) -> None:
"""
:raise: MacsylibError if all hits of a cluster are NOT related to the same replicon
"""
rep_name = self.hits[0].replicon_name
if not all([h.replicon_name == rep_name for h in self.hits]):
msg = "Cannot build a cluster from hits coming from different replicons"
_log.error(msg)
raise MacsylibError(msg)
[docs]
def __contains__(self, m_hit: ModelHit) -> bool:
"""
:param m_hit: The hit to test
:return: True if the hit is in the cluster hits, False otherwise
"""
return m_hit in self.hits
@property
def functions(self) -> frozenset[str]:
"""
:return: The set of functions encoded by this cluster
*function* mean gene name or reference gene name for exchangeables genes
for instance ::
<model vers="2.0">
<gene a presence="mandatory"/>
<gene b presence="accessory"/>
<exchangeable>
<gene c />
</exchangeable>
<gene/>
</model>
the functions for a cluster corresponding to this model wil be {'a' , 'b'}
"""
if self._genes_roles is None:
self._genes_roles = frozenset({h.gene_ref.alternate_of().name for h in self.hits})
return self._genes_roles
[docs]
def fulfilled_function(self, *genes: ModelGene | str) -> frozenset[str]:
"""
:param genes: The genes which must be tested.
:return: the common functions between genes and this cluster.
"""
# we do not filter out neutral from the model
genes_roles = self.functions
functions = set()
for gene in genes:
if isinstance(gene, macsylib.gene.ModelGene):
function = gene.name
else:
# gene is a string
function = gene
functions.add(function)
return genes_roles.intersection(functions)
def count_function(self) -> Counter[str]:
return Counter(h.gene_ref.alternate_of().name for h in self.hits)
[docs]
def merge(self, cluster: Cluster, before: bool = False) -> None:
"""
merge the cluster param in this one. (do it in place)
:param cluster:
:param bool before: If False the hits of the cluster will be added at the end of this one,
Otherwise the cluster hits will be inserted before the hits of this one.
:raise MacError: if the two clusters have not the same model
"""
if cluster.model != self.model:
raise MacsylibError("Try to merge Clusters from different model")
else:
if before:
self._hits = cluster.hits + self.hits
else:
self._hits.extend(cluster.hits)
@property
def replicon_name(self) -> str:
"""
:return: The name of the replicon where this cluster is located
:rtype: str
"""
return self.hits[0].replicon_name
@property
def score(self) -> float:
"""
:return: The score for this cluster
"""
if self._score is not None:
return self._score
else:
seen_hits = {}
_log.debug("===================== compute score for cluster =====================")
for m_hit in self.hits:
_log.debug(f"-------------- test model hit {m_hit.gene.name} --------------")
# attribute a score for this hit
# according to status of the gene_ref in the model: mandatory/accessory
if m_hit.status == GeneStatus.MANDATORY:
hit_score = self._hit_weights.mandatory
elif m_hit.status == GeneStatus.ACCESSORY:
hit_score = self._hit_weights.accessory
elif m_hit.status == GeneStatus.NEUTRAL:
hit_score = self._hit_weights.neutral
else:
raise MacsylibError(f"a Cluster contains hit {m_hit.gene.name} {m_hit.position}"
f" which is neither mandatory nor accessory: {m_hit.status}")
_log.debug(f"{m_hit.id} is {m_hit.status} hit score = {hit_score}")
# weighted the hit score according to the hit match the gene or
# is an exchangeable
if m_hit.gene_ref.is_exchangeable:
hit_score *= self._hit_weights.exchangeable
_log.debug(f"{m_hit.id} is exchangeable hit score = {hit_score}")
else:
hit_score *= self._hit_weights.itself
if self.loner or self.multi_system:
hit_score *= self._hit_weights.out_of_cluster
_log.debug(f"{m_hit.id} is out of cluster (Loner) score = {hit_score}")
# funct is the name of the gene if it code for itself
# or the name of the reference gene if it's an exchangeable
funct = m_hit.gene_ref.alternate_of().name
if funct in seen_hits:
# count only one occurrence of each function per cluster
# the score use is the max of hit score for this function
if hit_score > seen_hits[funct]:
seen_hits[funct] = hit_score
_log.debug(f"{m_hit.id} code for {funct} update hit_score to {hit_score}")
else:
_log.debug(f"{m_hit.id} code for {funct} which is already take in count in cluster")
else:
_log.debug(f"{m_hit.id} {m_hit.gene_ref.name} is not already in cluster")
seen_hits[funct] = hit_score
hits_scores = seen_hits.values()
score = sum(hits_scores)
_log.debug(f"cluster score = sum({list(hits_scores)}) = {score}")
_log.debug("===============================================================")
self._score = score
return score
[docs]
def __str__(self) -> str:
"""
:return: a string representation of this cluster
"""
rep = f"""Cluster:
- model = {self.model.name}
- replicon = {self.replicon_name}
- hits = {', '.join([f"({h.id}, {h.gene.name}, {h.position})" for h in self.hits])}"""
return rep
[docs]
def replace(self, old: ModelHit, new: ModelHit) -> None:
"""
replace hit old in this cluster by new one. (do it in place)
beware the hits in a cluster are sorted by their position so if old hit and new hit have not same position
the order will be changed
:param old: the hit to replace
:param new: the new hit
:return: None
"""
idx = self._hits.index(old)
self._hits[idx] = new