# Source code for mindroot.coreplugins.email.smtp_handler
import logging
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.utils import make_msgid, parseaddr
from typing import Dict, Optional
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# [docs]
class SMTPHandler:
    """Send email through an SMTP server using the standard ``smtplib`` client.

    The handler is configured from a dict with these keys:

    * ``smtp_server`` -- host passed to ``smtplib`` (required).
    * ``smtp_port`` -- port passed to ``smtplib`` (required).
    * ``email`` -- login name, also used as the ``From`` address (required).
    * ``password`` -- login password (required).
    * ``use_tls`` -- optional, defaults to ``True``; selects the connection
      mode used by :meth:`connect`.
    """

    def __init__(self, config: Dict):
        """Store the connection settings found in *config*.

        Raises:
            KeyError: If one of the required keys is missing from *config*.
        """
        self.smtp_server = config['smtp_server']
        self.smtp_port = config['smtp_port']
        self.use_tls = config.get('use_tls', True)
        self.email = config['email']
        self.password = config['password']

    async def connect(self) -> smtplib.SMTP:
        """Open an SMTP connection and log in.

        When ``use_tls`` is true a plain ``smtplib.SMTP`` connection is opened
        and upgraded with ``starttls()``; otherwise ``smtplib.SMTP_SSL`` is
        used. Both modes are encrypted; the flag only picks between them.

        NOTE: ``smtplib`` is a blocking client, so the network I/O here runs
        synchronously inside this coroutine.

        Returns:
            The connected, logged-in client. The caller is responsible for
            closing it.

        Raises:
            Exception: Any error raised while connecting or logging in is
                logged and re-raised unchanged.
        """
        server = None
        try:
            if self.use_tls:
                server = smtplib.SMTP(self.smtp_server, self.smtp_port)
                server.starttls()
            else:
                server = smtplib.SMTP_SSL(self.smtp_server, self.smtp_port)
            server.login(self.email, self.password)
        except Exception as e:
            logger.error("SMTP Connection error: %s", e)
            # Do not leak the socket when STARTTLS or login fails after the
            # connection itself was established.
            if server is not None:
                server.close()
            raise
        return server

    async def send_email(self, to: str, subject: str, body: str,
                         reply_to_message: Optional[Dict] = None,
                         headers: Optional[Dict] = None) -> Dict:
        """Build a message and send it over a fresh SMTP connection.

        Args:
            to: Value for the ``To`` header.
            subject: Subject line; prefixed with ``"Re: "`` for replies that
                do not already start with ``re:`` (case-insensitive).
            body: Message text. Sent as HTML when it contains ``<html>``,
                ``<p>`` or ``<br>`` (case-insensitive), else as plain text.
            reply_to_message: When given, its ``'message_id'`` entry becomes
                the ``In-Reply-To`` header.
            headers: Extra headers added to the message as-is.

        Returns:
            A dict with ``success`` (bool), ``message_id`` (the ``Message-ID``
            header of the sent message, or ``None`` on failure) and ``error``
            (``None`` on success, otherwise the error text). Errors are
            logged and reported through this dict, never raised.
        """
        try:
            # Build the message before connecting so a malformed argument
            # (e.g. a reply dict without 'message_id') never opens a socket.
            msg = self._build_message(to, subject, body,
                                      reply_to_message, headers)
            server = await self.connect()
            try:
                server.send_message(msg)
            finally:
                # Release the connection whether or not the send succeeded.
                self._disconnect(server)
            return {
                "success": True,
                "message_id": msg.get('Message-ID'),
                "error": None
            }
        except Exception as e:
            logger.error("Error sending email: %s", e)
            return {
                "success": False,
                "message_id": None,
                "error": str(e)
            }

    def _build_message(self, to: str, subject: str, body: str,
                       reply_to_message: Optional[Dict],
                       headers: Optional[Dict]) -> MIMEMultipart:
        """Assemble the MIME message for :meth:`send_email`."""
        # Decide the final subject up front: assigning msg['Subject'] twice
        # would append a second Subject header instead of replacing the first.
        if reply_to_message and not subject.lower().startswith('re:'):
            subject = f"Re: {subject}"

        msg = MIMEMultipart('alternative')
        msg['From'] = self.email
        msg['To'] = to
        msg['Subject'] = subject

        # Add custom headers
        if headers:
            for key, value in headers.items():
                msg[key] = value

        # Add reply headers if this is a reply
        if reply_to_message:
            msg['In-Reply-To'] = reply_to_message['message_id']

        # smtplib does not add a Message-ID, so create one here (unless a
        # custom header already supplied it) to have an id to return.
        if msg['Message-ID'] is None:
            sender_domain = parseaddr(self.email)[1].partition('@')[2]
            # With no usable domain, make_msgid falls back to the local host name.
            msg['Message-ID'] = make_msgid(domain=sender_domain or None)

        # Detect if body contains HTML
        lowered = body.lower()
        is_html = any(tag in lowered for tag in ('<html>', '<p>', '<br>'))
        msg.attach(MIMEText(body, 'html' if is_html else 'plain'))
        return msg

    @staticmethod
    def _disconnect(server: smtplib.SMTP) -> None:
        """Close *server* politely, falling back to a hard close."""
        try:
            server.quit()
        except OSError:
            # smtplib.SMTPException derives from OSError. A failed QUIT must
            # not turn an already-delivered message into a reported failure.
            server.close()