Source code for macsylib.utils

#########################################################################
# MacSyLib - Python library to detect macromolecular systems            #
#            in prokaryotes protein dataset using systems modelling     #
#            and similarity search.                                     #
#                                                                       #
# Authors: Sophie Abby, Bertrand Neron                                  #
# Copyright (c) 2014-2025  Institut Pasteur (Paris) and CNRS.           #
# See the COPYRIGHT file for details                                    #
#                                                                       #
# This file is part of MacSyLib package.                                #
#                                                                       #
# MacSyLib is free software: you can redistribute it and/or modify      #
# it under the terms of the GNU General Public License as published by  #
# the Free Software Foundation, either version 3 of the License, or     #
# (at your option) any later version.                                   #
#                                                                       #
# MacSyLib is distributed in the hope that it will be useful,           #
# but WITHOUT ANY WARRANTY; without even the implied warranty of        #
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the          #
# GNU General Public License for more details .                         #
#                                                                       #
# You should have received a copy of the GNU General Public License     #
# along with MacSyLib (COPYING).                                        #
# If not, see <https://www.gnu.org/licenses/>.                          #
#########################################################################

"""
Some macsylib helper functions
"""

import argparse
import contextlib
import gzip
import logging
import os
import os.path
import re
from collections.abc import Iterator
from itertools import groupby
from typing import IO

from .error import MacsylibError
from .config import MacsyDefaults, Config
from .registries import DefinitionLocation, ModelRegistry, scan_models_dir

_log = logging.getLogger(__name__)


def list_models(args: argparse.Namespace) -> str:
    """
    Collect every model found in the configured models directories
    and render the resulting registry as text.

    A directory which raises a PermissionError while it is scanned is
    reported with a warning and skipped; the other directories are still
    processed.

    :param args: The command line argument once parsed
    :return: a string representation of all models and submodels installed.
    """
    cfg = Config(MacsyDefaults(), args)
    directories = cfg.models_dir()
    installed = ModelRegistry()
    for model_dir in directories:
        try:
            locations = scan_models_dir(model_dir, profile_suffix=cfg.profile_suffix())
            for location in locations:
                installed.add(location)
        except PermissionError as err:
            # best effort: an unreadable directory must not prevent listing the others
            _log.warning(f"{model_dir} is not readable: {err} : skip it.")
    return str(installed)


def get_def_to_detect(models: list[tuple[str, tuple[str]]], model_registry: ModelRegistry) -> tuple[list[DefinitionLocation], str, str]:
    """
    Resolve the definitions requested by the user against the models registry.

    :param models: the models to detect as returned by config.models.
                   It is unpacked as one ``(root, definition_names)`` pair,
                   for instance ``('model_fqn', ('def1', 'def2', ...))``.
                   If one of the definition names is 'all' (case-insensitive),
                   all the definitions located under *root* are selected.
    :param model_registry: the models registry for this run.
    :return: a tuple of 3 elements: the definitions to parse,
             the name of the model family and the version of this model.
    :raise ValueError: if a model name provided in models is not in model_registry.
                       NOTE(review): the exception comes from the ``model_registry``
                       lookup itself, its exact type is to confirm against ModelRegistry.
    """
    root, def_names = models
    # a trailing path separator must not be part of the definition root
    root = root.rstrip(os.path.sep)
    family = DefinitionLocation.root_name(root)
    location = model_registry[family]
    version = location.version

    lowered_names = tuple(name.lower() for name in def_names)
    if 'all' not in lowered_names:
        selected = [location.get_definition(f'{root}/{one_def}') for one_def in def_names]
    else:
        # when root is the model itself, no sub-tree restriction is passed
        sub_tree = None if root == location.name else root
        selected = location.get_all_definitions(root_def_name=sub_tree)
    return selected, family, version
def get_replicon_names(genome_path, db_type) -> list[str]:
    """
    Get the names of the replicons contained in a sequence file.

    :param genome_path: the path to the sequence file
    :param db_type: the type of the dataset: 'gembase', 'ordered_replicon' or 'unordered'
    :return: for 'gembase', the replicon identifiers parsed from the file;
             for 'ordered_replicon' and 'unordered', a list of one element:
             the file name without its directory nor its last extension.
    :raise MacsylibError: if db_type is none of the three supported values
    """
    if db_type == 'gembase':
        return _get_gembase_replicon_names(genome_path)
    if db_type not in ('ordered_replicon', 'unordered'):
        raise MacsylibError(f"Invalid genome type: {db_type}")
    # only one replicon: it is named after the file itself
    file_name = os.path.basename(genome_path)
    stem, _ = os.path.splitext(file_name)
    return [stem]
def _get_gembase_replicon_names(genome_path: str) -> list[str]:
    """
    Read a gembase file and list the replicon identifiers it contains

    :param genome_path: The path to a file containing sequence in **gembase** format
    :return: the list of replicon identifiers, in the order they appear in the file.
             Only consecutive sequences are grouped together.
    """

    def replicon_of(seq_id: str) -> str:
        """
        In gembase a sequence identifier looks like <replicon-name>_<seq-name>,
        '_' may occur inside <replicon-name> but not inside <seq-name>.
        So the replicon name is everything before the last '_'
        (the empty string if there is no '_' at all).
        """
        replicon_name, _, _ = seq_id.rpartition('_')
        return replicon_name

    with open(genome_path, 'r') as fasta:
        # the identifier is the first word of a header line, without the leading '>'
        seq_ids = [row.split()[0][1:] for row in fasta if row.startswith('>')]
    return [name for name, _ in groupby(seq_ids, key=replicon_of)]
def threads_available() -> int:
    """
    :return: The maximal number of threads available.
             Where ``os.sched_getaffinity`` exists, it is the number of CPUs
             the current process is allowed to run on
             (it's nice with cluster scheduler or linux).
             Elsewhere (for instance on Mac) it is the number of logical CPUs
             reported by ``os.cpu_count``, or 1 if this number cannot be determined.
    """
    if hasattr(os, "sched_getaffinity"):
        return len(os.sched_getaffinity(0))
    # os.cpu_count() returns None when the number of CPUs is undetermined,
    # in this case at least one thread is always usable
    return os.cpu_count() or 1
[docs] def parse_time(user_time: int | str) -> int: """ parse user-friendly time and return it in seconds user time supports units as s h m d for sec min hour day or a combination of them 1h10m50s means 1 hour 10 minutes 50 seconds all terms will be converted in seconds and added :param user_time: :return: seconds :raise: ValueError if user_time is not parseable """ try: user_time = int(user_time) return user_time # user time has no units , it's seconds except ValueError: pass import re parts_converter = {'s': lambda x: x, 'm': lambda x: x * 60, 'h': lambda x: x * 3600, 'd': lambda x: x * 86400 } time_parts = re.findall(r'(\d+)(\D+)', user_time) time = 0 for value, unit in time_parts: unit = unit.strip().lower() try: time += parts_converter[unit](int(value)) except KeyError: raise ValueError("Not valid time format. Units allowed h/m/s.") return time
@contextlib.contextmanager
def open_compressed(path: str, mode: str = 'rt') -> Iterator[IO]:
    """
    Context manager which opens a file, gzip compressed or not.

    The way to open the file is chosen from the extension of *path* only:
    '.gz' is opened with gzip, '.bz2' and '.zip' are rejected,
    any other extension is opened as a regular file.

    :param path: the path to open
    :param mode: the opening mode by default read text
    :yield: the opened file object, it is closed at the exit of the ``with`` block
    :raise ValueError: if the extension of path is '.bz2' or '.zip'
    """
    _, ext = os.path.splitext(path)
    if ext in ('.bz2', '.zip'):
        msg = f"MacSyLib does not support '{ext[1:]}' compression (only gzip)."
        raise ValueError(msg)
    # everything which is not '.gz' is assumed to be not compressed
    my_open = gzip.open if ext == '.gz' else open
    with my_open(path, mode) as f:
        yield f