Source code for mindroot.lib.providers

import inspect
import traceback
import json
import logging
from typing import List, Dict, Optional, Type, TypeVar, cast
from ..db.preferences import find_preferred_models
from ..db.organize_models import uses_models, matching_models
from ..utils.check_args import *
from ..utils.debug import debug_box
import sys
import nanoid
from termcolor import colored

# Import protocol support
try:
    from .protocols.registry import ServiceProxy, get_protocol, list_protocols
    PROTOCOLS_AVAILABLE = True
except ImportError:
    PROTOCOLS_AVAILABLE = False
    ServiceProxy = None
    get_protocol = None
    list_protocols = None

# Import the new v2 preferences system with try/except for backward compatibility
try:
    from .model_preferences_v2 import ModelPreferencesV2
except ImportError:
    ModelPreferencesV2 = None

# TypeVar for Protocol typing
P = TypeVar('P')

[docs] class ProviderManager: def __init__(self): self.functions = {} # Fast path for services that don't need model selection # These services skip all the overhead and go directly to implementation self.fast_path_services = { 'send_s2s_audio_chunk', 'send_s2s_message', 'close_s2s_session', } self._prefs_manager = None # Cache for ModelPreferencesV2
[docs] def register_function(self, name, provider, implementation, signature, docstring, flags): if name not in self.functions: self.functions[name] = [] if provider in [func_info['provider'] for func_info in self.functions[name]]: return self.functions[name].append({'implementation': implementation, 'docstring': docstring, 'flags': flags, 'provider': provider})
[docs] async def exec_with_provider(self, name, provider, *args, **kwargs): if name not in self.functions: raise ValueError(f"function '{name}' not found.") func_info = None for f in self.functions[name]: if f['provider'] == provider: func_info = f break implementation = func_info['implementation'] if implementation is None: raise ValueError(f"function '{name}' not found for provider '{provider}'.") try: result = await implementation(*args, **kwargs) except Exception as e: raise e return result
[docs] async def execute(self, name, *args, **kwargs): if check_empty_args(args, kwargs=kwargs): raise ValueError(f"function '{name}' called with empty arguments.") # FAST PATH: Skip all overhead for simple services that don't need model selection if name in self.fast_path_services: if name not in self.functions: raise ValueError(f"function '{name}' not found.") # Ensure context is in kwargs for fast path services if 'context' not in kwargs: # Try to find context in args found_context = False for arg in args: if arg.__class__.__name__ == 'ChatContext' or hasattr(arg, 'agent'): kwargs['context'] = arg found_context = True break # If still no context, use self.context if not found_context: if hasattr(self, 'context'): kwargs['context'] = self.context else: raise ValueError(f"No context available for service '{name}'") return await self.functions[name][0]['implementation'](*args, **kwargs) if name not in self.functions: raise ValueError(f"function '{name}' not found.") preferred_models = None preferred_provider = None preferred_providers = None found_context = False context = None for arg in args: if arg.__class__.__name__ == 'ChatContext' or hasattr(arg, 'agent'): found_context = True context = arg break if not found_context and 'context' in kwargs: context = kwargs['context'] found_context = True if not found_context and (not 'context' in kwargs): kwargs['context'] = self.context context = self.context need_model = await uses_models(name) if (len(args) > 0 and args[0] is None) and not 'model' in kwargs or ('model' in kwargs and kwargs['model'] is None): if context is not None and context.agent is not None and 'service_models' in context.agent: service_models = context.agent['service_models'] if name in service_models: for func_info in self.functions[name]: if func_info['provider'] == service_models[name]['provider']: args = (service_models[name]['model'], *args[1:]) return await func_info['implementation'](*args, **kwargs) else: # NEW V2 PREFERENCES LOGIC - Only as fallback when no agent-specific model if ModelPreferencesV2 is not None: try: # Use cached preferences manager if self._prefs_manager is None: self._prefs_manager = ModelPreferencesV2() prefs_manager = self._prefs_manager ordered_providers = prefs_manager.get_ordered_providers_for_service(name) if ordered_providers: for provider_name, model_name in ordered_providers: # Check if this provider is available for this function if name in self.functions: for func_info in self.functions[name]: if func_info['provider'] == provider_name: try: # Set the model as first argument if needed if len(args) > 0 and (args[0] is None or not args[0]): args = (model_name, *args[1:]) elif 'model' not in kwargs: kwargs['model'] = model_name return await func_info['implementation'](*args, **kwargs) except Exception as e: continue except Exception as e: pass # Continue with existing logic else: if len(args) > 0: from coreplugins.admin.service_models import cached_get_service_models all_service_models = await cached_get_service_models() this_service_models = all_service_models.get(name, {}) model_name = args[0] try: if '__' in model_name: #provider = model_name.split('__')[0] model_name = model_name.split('__')[1] for provider, model_list in this_service_models.items(): if model_name in model_list: for func_info in self.functions[name]: if func_info['provider'] == provider: return await func_info['implementation'](*args, **kwargs) except Exception as e: pass required_plugins = [] if context and hasattr(context, 'agent') and context.agent: required_plugins = context.agent.get('required_plugins', []) if required_plugins and name in self.functions: for plugin in required_plugins: for func_info in self.functions[name]: if func_info['provider'] == plugin: function_info = func_info return await function_info['implementation'](*args, **kwargs) preferred_providers_list = [] if context is not None and hasattr(context, 'agent') and context.agent: preferred_providers = context.agent.get('preferred_providers', []) if isinstance(preferred_providers, list): preferred_providers_list = preferred_providers elif isinstance(preferred_providers, dict): if name in preferred_providers: preferred_provider = preferred_providers[name] for func_info in self.functions[name]: if func_info['provider'] == preferred_provider: function_info = func_info return await function_info['implementation'](*args, **kwargs) if name == 'stream_chat' and context is None: raise ValueError('stream_chat, context is None') if name == 'stream_chat' and context.agent is None: raise ValueError('stream_chat, context.agent is None') if context is not None and hasattr(context, 'data') and 'PREFERRED_PROVIDER' in context.data: preferred_providers_list = [ context.data['PREFERRED_PROVIDER'] ] if preferred_providers_list and name in self.functions: for func_info in self.functions[name]: if func_info['provider'] in preferred_providers_list: return await func_info['implementation'](*args, **kwargs) if preferred_providers and name in preferred_providers: preferred_provider = preferred_providers[name] for func_info in self.functions[name]: if func_info['provider'] == preferred_provider: function_info = func_info return await function_info['implementation'](*args, **kwargs) if context.__class__.__name__ == 'ChatContext': preferred_models = await find_preferred_models(name, context.flags) context.data['model'] = None if need_model and preferred_models is None: preferred_models = await matching_models(name, context.flags) if preferred_models is not None: if len(preferred_models) > 0: context.data['model'] = preferred_models[0] if preferred_models is not None: if len(preferred_models) > 0: try: preferred_provider = preferred_models[0]['provider'] except KeyError: preferred_provider = None function_info = None if not need_model and preferred_provider is None: preferred_provider = self.functions[name][0]['provider'] if preferred_provider is not None: for func_info in self.functions[name]: if func_info['provider'] == preferred_provider: function_info = func_info break function_info = self.functions[name][0] if function_info is None: raise ValueError(f"1. function '{name}' not found. preferred_provider is '{preferred_provider}'.") implementation = function_info['implementation'] if implementation is None: raise ValueError(f"2. function '{name}' not found. preferred_provider is '{preferred_provider}'.") try: result = await implementation(*args, **kwargs) except Exception as e: raise e return result
[docs] def get_docstring(self, name): if name not in self.functions: logging.warning(f"docstring for '{name}' not found.") return [] return self.functions[name][0]['docstring']
[docs] def get_detailed_functions(self): return self.functions
[docs] def get_functions(self): return list(self.functions.keys())
[docs] def get_docstrings(self): return {name: self.get_docstring(name) for name in self.functions.keys()}
[docs] def get_some_docstrings(self, names): filtered = [] for name in names: if name not in self.functions: logging.warning(f"agent function '{name}' not found") else: filtered.append(name) return {name: self.get_docstring(name) for name in filtered}
def __getattr__(self, name): # Handle special protocol-related methods if name == 'typed': return self._typed if name == 'get_protocol': return self._get_protocol if name == 'list_protocols': return self._list_protocols async def method(*args, **kwargs): return await self.execute(name, *args, **kwargs) return method def _typed(self, protocol: Type[P]) -> P: """Get a typed proxy for a service protocol. This enables IDE autocomplete and type checking for services. Args: protocol: A Protocol class defining the service interface Returns: A proxy object typed as the Protocol, delegating to service_manager Example: from lib.providers.protocols import LLM llm: LLM = service_manager.typed(LLM) stream = await llm.stream_chat('gpt-4', messages=[...]) """ if not PROTOCOLS_AVAILABLE: raise RuntimeError("Protocol support not available. Check protocols module installation.") return cast(P, ServiceProxy(self, protocol)) def _get_protocol(self, name: str): """Get a registered Protocol by name. Args: name: The protocol name (e.g., 'llm', 'sip') Returns: The Protocol class, or None if not found """ if not PROTOCOLS_AVAILABLE: return None return get_protocol(name) def _list_protocols(self) -> Dict[str, type]: """List all registered Protocols. Returns: Dict mapping protocol names to Protocol classes """ if not PROTOCOLS_AVAILABLE: return {} return list_protocols()
[docs] class HookManager: _instance = None _initialized = False _hook_manager = None def __new__(cls): if cls._instance is None: cls._instance = super().__new__(cls) cls._hook_manager = cls._instance return cls._instance def __init__(self): if not self._initialized: self.unique_id = nanoid.generate() self.hooks = {} self.__class__._initialized = True
[docs] def register_hook(self, name, implementation, signature, docstring): if name not in self.hooks: self.hooks[name] = [] self.hooks[name].append({'implementation': implementation, 'docstring': docstring})
[docs] async def execute_hooks(self, name, *args, **kwargs): if name not in self.hooks: return [] results = [] for hook_info in self.hooks[name]: implementation = hook_info['implementation'] result = await implementation(*args, **kwargs) results.append(result) return results
[docs] def get_docstring(self, name): if name not in self.hooks: raise ValueError(f"hook '{name}' not found.") return [hook_info['docstring'] for hook_info in self.hooks[name]]
[docs] def get_hooks(self): return list(self.hooks.keys())
[docs] def get_docstrings(self): return {name: self.get_docstring(name) for name in self.hooks.keys()}
def __getattr__(self, name): async def method(*args, **kwargs): return await self.execute_hooks(name, *args, **kwargs) return method