import inspect
import traceback
import json
import logging
from typing import List, Dict, Optional, Type, TypeVar, cast
from ..db.preferences import find_preferred_models
from ..db.organize_models import uses_models, matching_models
from ..utils.check_args import *
from ..utils.debug import debug_box
import sys
import nanoid
from termcolor import colored
# Import protocol support
try:
from .protocols.registry import ServiceProxy, get_protocol, list_protocols
PROTOCOLS_AVAILABLE = True
except ImportError:
PROTOCOLS_AVAILABLE = False
ServiceProxy = None
get_protocol = None
list_protocols = None
# Import the new v2 preferences system with try/except for backward compatibility
try:
from .model_preferences_v2 import ModelPreferencesV2
except ImportError:
ModelPreferencesV2 = None
# TypeVar for Protocol typing
P = TypeVar('P')
[docs]
class ProviderManager:
def __init__(self):
self.functions = {}
# Fast path for services that don't need model selection
# These services skip all the overhead and go directly to implementation
self.fast_path_services = {
'send_s2s_audio_chunk',
'send_s2s_message',
'close_s2s_session',
}
self._prefs_manager = None # Cache for ModelPreferencesV2
[docs]
def register_function(self, name, provider, implementation, signature, docstring, flags):
if name not in self.functions:
self.functions[name] = []
if provider in [func_info['provider'] for func_info in self.functions[name]]:
return
self.functions[name].append({'implementation': implementation, 'docstring': docstring, 'flags': flags, 'provider': provider})
[docs]
async def exec_with_provider(self, name, provider, *args, **kwargs):
if name not in self.functions:
raise ValueError(f"function '{name}' not found.")
func_info = None
for f in self.functions[name]:
if f['provider'] == provider:
func_info = f
break
implementation = func_info['implementation']
if implementation is None:
raise ValueError(f"function '{name}' not found for provider '{provider}'.")
try:
result = await implementation(*args, **kwargs)
except Exception as e:
raise e
return result
[docs]
async def execute(self, name, *args, **kwargs):
if check_empty_args(args, kwargs=kwargs):
raise ValueError(f"function '{name}' called with empty arguments.")
# FAST PATH: Skip all overhead for simple services that don't need model selection
if name in self.fast_path_services:
if name not in self.functions:
raise ValueError(f"function '{name}' not found.")
# Ensure context is in kwargs for fast path services
if 'context' not in kwargs:
# Try to find context in args
found_context = False
for arg in args:
if arg.__class__.__name__ == 'ChatContext' or hasattr(arg, 'agent'):
kwargs['context'] = arg
found_context = True
break
# If still no context, use self.context
if not found_context:
if hasattr(self, 'context'):
kwargs['context'] = self.context
else:
raise ValueError(f"No context available for service '{name}'")
return await self.functions[name][0]['implementation'](*args, **kwargs)
if name not in self.functions:
raise ValueError(f"function '{name}' not found.")
preferred_models = None
preferred_provider = None
preferred_providers = None
found_context = False
context = None
for arg in args:
if arg.__class__.__name__ == 'ChatContext' or hasattr(arg, 'agent'):
found_context = True
context = arg
break
if not found_context and 'context' in kwargs:
context = kwargs['context']
found_context = True
if not found_context and (not 'context' in kwargs):
kwargs['context'] = self.context
context = self.context
need_model = await uses_models(name)
if (len(args) > 0 and args[0] is None) and not 'model' in kwargs or ('model' in kwargs and kwargs['model'] is None):
if context is not None and context.agent is not None and 'service_models' in context.agent:
service_models = context.agent['service_models']
if name in service_models:
for func_info in self.functions[name]:
if func_info['provider'] == service_models[name]['provider']:
args = (service_models[name]['model'], *args[1:])
return await func_info['implementation'](*args, **kwargs)
else:
# NEW V2 PREFERENCES LOGIC - Only as fallback when no agent-specific model
if ModelPreferencesV2 is not None:
try:
# Use cached preferences manager
if self._prefs_manager is None:
self._prefs_manager = ModelPreferencesV2()
prefs_manager = self._prefs_manager
ordered_providers = prefs_manager.get_ordered_providers_for_service(name)
if ordered_providers:
for provider_name, model_name in ordered_providers:
# Check if this provider is available for this function
if name in self.functions:
for func_info in self.functions[name]:
if func_info['provider'] == provider_name:
try:
# Set the model as first argument if needed
if len(args) > 0 and (args[0] is None or not args[0]):
args = (model_name, *args[1:])
elif 'model' not in kwargs:
kwargs['model'] = model_name
return await func_info['implementation'](*args, **kwargs)
except Exception as e:
continue
except Exception as e:
pass # Continue with existing logic
else:
if len(args) > 0:
from coreplugins.admin.service_models import cached_get_service_models
all_service_models = await cached_get_service_models()
this_service_models = all_service_models.get(name, {})
model_name = args[0]
try:
if '__' in model_name:
#provider = model_name.split('__')[0]
model_name = model_name.split('__')[1]
for provider, model_list in this_service_models.items():
if model_name in model_list:
for func_info in self.functions[name]:
if func_info['provider'] == provider:
return await func_info['implementation'](*args, **kwargs)
except Exception as e:
pass
required_plugins = []
if context and hasattr(context, 'agent') and context.agent:
required_plugins = context.agent.get('required_plugins', [])
if required_plugins and name in self.functions:
for plugin in required_plugins:
for func_info in self.functions[name]:
if func_info['provider'] == plugin:
function_info = func_info
return await function_info['implementation'](*args, **kwargs)
preferred_providers_list = []
if context is not None and hasattr(context, 'agent') and context.agent:
preferred_providers = context.agent.get('preferred_providers', [])
if isinstance(preferred_providers, list):
preferred_providers_list = preferred_providers
elif isinstance(preferred_providers, dict):
if name in preferred_providers:
preferred_provider = preferred_providers[name]
for func_info in self.functions[name]:
if func_info['provider'] == preferred_provider:
function_info = func_info
return await function_info['implementation'](*args, **kwargs)
if name == 'stream_chat' and context is None:
raise ValueError('stream_chat, context is None')
if name == 'stream_chat' and context.agent is None:
raise ValueError('stream_chat, context.agent is None')
if context is not None and hasattr(context, 'data') and 'PREFERRED_PROVIDER' in context.data:
preferred_providers_list = [ context.data['PREFERRED_PROVIDER'] ]
if preferred_providers_list and name in self.functions:
for func_info in self.functions[name]:
if func_info['provider'] in preferred_providers_list:
return await func_info['implementation'](*args, **kwargs)
if preferred_providers and name in preferred_providers:
preferred_provider = preferred_providers[name]
for func_info in self.functions[name]:
if func_info['provider'] == preferred_provider:
function_info = func_info
return await function_info['implementation'](*args, **kwargs)
if context.__class__.__name__ == 'ChatContext':
preferred_models = await find_preferred_models(name, context.flags)
context.data['model'] = None
if need_model and preferred_models is None:
preferred_models = await matching_models(name, context.flags)
if preferred_models is not None:
if len(preferred_models) > 0:
context.data['model'] = preferred_models[0]
if preferred_models is not None:
if len(preferred_models) > 0:
try:
preferred_provider = preferred_models[0]['provider']
except KeyError:
preferred_provider = None
function_info = None
if not need_model and preferred_provider is None:
preferred_provider = self.functions[name][0]['provider']
if preferred_provider is not None:
for func_info in self.functions[name]:
if func_info['provider'] == preferred_provider:
function_info = func_info
break
function_info = self.functions[name][0]
if function_info is None:
raise ValueError(f"1. function '{name}' not found. preferred_provider is '{preferred_provider}'.")
implementation = function_info['implementation']
if implementation is None:
raise ValueError(f"2. function '{name}' not found. preferred_provider is '{preferred_provider}'.")
try:
result = await implementation(*args, **kwargs)
except Exception as e:
raise e
return result
[docs]
def get_docstring(self, name):
if name not in self.functions:
logging.warning(f"docstring for '{name}' not found.")
return []
return self.functions[name][0]['docstring']
[docs]
def get_detailed_functions(self):
return self.functions
[docs]
def get_functions(self):
return list(self.functions.keys())
[docs]
def get_docstrings(self):
return {name: self.get_docstring(name) for name in self.functions.keys()}
[docs]
def get_some_docstrings(self, names):
filtered = []
for name in names:
if name not in self.functions:
logging.warning(f"agent function '{name}' not found")
else:
filtered.append(name)
return {name: self.get_docstring(name) for name in filtered}
def __getattr__(self, name):
# Handle special protocol-related methods
if name == 'typed':
return self._typed
if name == 'get_protocol':
return self._get_protocol
if name == 'list_protocols':
return self._list_protocols
async def method(*args, **kwargs):
return await self.execute(name, *args, **kwargs)
return method
def _typed(self, protocol: Type[P]) -> P:
"""Get a typed proxy for a service protocol.
This enables IDE autocomplete and type checking for services.
Args:
protocol: A Protocol class defining the service interface
Returns:
A proxy object typed as the Protocol, delegating to service_manager
Example:
from lib.providers.protocols import LLM
llm: LLM = service_manager.typed(LLM)
stream = await llm.stream_chat('gpt-4', messages=[...])
"""
if not PROTOCOLS_AVAILABLE:
raise RuntimeError("Protocol support not available. Check protocols module installation.")
return cast(P, ServiceProxy(self, protocol))
def _get_protocol(self, name: str):
"""Get a registered Protocol by name.
Args:
name: The protocol name (e.g., 'llm', 'sip')
Returns:
The Protocol class, or None if not found
"""
if not PROTOCOLS_AVAILABLE:
return None
return get_protocol(name)
def _list_protocols(self) -> Dict[str, type]:
"""List all registered Protocols.
Returns:
Dict mapping protocol names to Protocol classes
"""
if not PROTOCOLS_AVAILABLE:
return {}
return list_protocols()
[docs]
class HookManager:
_instance = None
_initialized = False
_hook_manager = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._hook_manager = cls._instance
return cls._instance
def __init__(self):
if not self._initialized:
self.unique_id = nanoid.generate()
self.hooks = {}
self.__class__._initialized = True
[docs]
def register_hook(self, name, implementation, signature, docstring):
if name not in self.hooks:
self.hooks[name] = []
self.hooks[name].append({'implementation': implementation, 'docstring': docstring})
[docs]
async def execute_hooks(self, name, *args, **kwargs):
if name not in self.hooks:
return []
results = []
for hook_info in self.hooks[name]:
implementation = hook_info['implementation']
result = await implementation(*args, **kwargs)
results.append(result)
return results
[docs]
def get_docstring(self, name):
if name not in self.hooks:
raise ValueError(f"hook '{name}' not found.")
return [hook_info['docstring'] for hook_info in self.hooks[name]]
[docs]
def get_hooks(self):
return list(self.hooks.keys())
[docs]
def get_docstrings(self):
return {name: self.get_docstring(name) for name in self.hooks.keys()}
def __getattr__(self, name):
async def method(*args, **kwargs):
return await self.execute_hooks(name, *args, **kwargs)
return method