Source code for mindroot.coreplugins.email.services
from typing import Dict, List, Optional
from .email_provider import EmailProvider
# Global provider instance
_provider = None
[docs]
async def init_provider(config: Dict) -> None:
"""Initialize the email provider with config"""
global _provider
_provider = EmailProvider(config)
[docs]
async def get_provider() -> EmailProvider:
"""Get the provider instance, initializing with defaults if needed"""
global _provider
if _provider is None:
raise RuntimeError("Email provider not initialized. Call init_provider first.")
return _provider
# Removed @service() decorator - this is now just a helper function
# The actual service is in mod.py
[docs]
async def send_email_helper(config: Dict, to: str, subject: str, body: str,
reply_to_message: Dict = None,
headers: Dict = None) -> Dict:
"""Helper function to send an email (not a service)"""
provider = await get_provider()
return await provider.send_email(
to=to,
subject=subject,
body=body,
reply_to_message=reply_to_message,
headers=headers
)
[docs]
async def check_emails(config: Dict, folder: str = "INBOX",
criteria: Dict = None,
batch_size: int = None,
max_messages: int = None,
start_id: str = None) -> Dict:
"""Service to check for new emails with pagination support"""
provider = await get_provider()
return await provider.check_emails(
folder=folder,
criteria=criteria,
batch_size=batch_size,
max_messages=max_messages,
start_id=start_id
)
[docs]
async def mark_as_processed(config: Dict, message_ids: List[str],
folder: str = "INBOX") -> Dict:
"""Service to mark emails as processed"""
provider = await get_provider()
return await provider.mark_as_processed(
message_ids=message_ids,
folder=folder
)