# Source code for mindroot.coreplugins.user_service.email_service
import html
import logging
import os
import secrets
from datetime import datetime, timedelta, timezone
from typing import Optional

from lib.providers.services import service_manager
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# True only when the REQUIRE_EMAIL_VERIFY environment variable equals "true"
# (case-insensitive); unset or any other value yields False. Read once at import.
REQUIRE_EMAIL_VERIFY = os.environ.get('REQUIRE_EMAIL_VERIFY', '').lower() == 'true'
# Prefix for the links embedded in outgoing emails; taken from the BASE_URL
# environment variable, falling back to http://localhost:8011. Read once at import.
BASE_URL = os.environ.get('BASE_URL', 'http://localhost:8011')
[docs]
async def send_verification_email(email: str, verification_token: str):
    """Send an email-verification link to a user.

    Builds a ``/verify-email`` link from ``BASE_URL`` and the token, embeds it
    in an HTML body and passes it to ``service_manager.send_email``.

    Args:
        email: Recipient address.
        verification_token: Value placed in the link's ``token`` query parameter.

    Returns:
        True when the send result reports ``success``; False when it does not
        or when sending raises (the failure is logged, never propagated).
    """
    # Strip a trailing slash so BASE_URL="https://host/" does not produce
    # "https://host//verify-email".
    verification_url = f"{BASE_URL.rstrip('/')}/verify-email?token={verification_token}"
    email_html = f"""
<html>
<body>
<h1>Welcome to MindRoot!</h1>
<p>Please verify your email address by clicking the link below:</p>
<p><a href="{verification_url}" style="background-color: #4CAF50; color: white; padding: 10px 20px; text-decoration: none; border-radius: 4px;">Verify Email</a></p>
<p>Or copy and paste this link into your browser:</p>
<p><code>{verification_url}</code></p>
<p>This link will expire in 24 hours.</p>
<br>
<p><small>If you did not create this account, please ignore this email.</small></p>
</body>
</html>
"""
    try:
        result = await service_manager.send_email(
            to=email,
            subject="Verify Your MindRoot Account",
            body=email_html,  # HTML content will be auto-detected
        )
        if result.get('success'):
            logger.info("Verification email sent successfully to %s", email)
            return True
        logger.error(
            "Failed to send verification email to %s: %s", email, result.get('error')
        )
        return False
    except Exception as e:
        # Best-effort send: callers get False rather than an exception, but the
        # traceback is kept in the log so the failure can be diagnosed.
        logger.exception("Exception sending verification email to %s: %s", email, e)
        return False
[docs]
async def send_password_reset_email(email: str, username: str, reset_token: str):
    """Send a password-reset link to a user.

    Builds a ``/reset-password`` link from ``BASE_URL`` and the token, embeds it
    in an HTML body addressed to ``username`` and passes it to
    ``service_manager.send_email``.

    Args:
        email: Recipient address.
        username: Name shown in the greeting; HTML-escaped before insertion.
        reset_token: Value placed in the link's ``token`` query parameter.

    Returns:
        True when the send result reports ``success``; False when it does not
        or when sending raises (the failure is logged, never propagated).
    """
    # Strip a trailing slash so BASE_URL="https://host/" does not produce
    # "https://host//reset-password".
    reset_url = f"{BASE_URL.rstrip('/')}/reset-password?token={reset_token}"
    # The username is user-chosen text going into HTML; escape it so it cannot
    # inject markup into the message. str() keeps non-string values rendering
    # as they did before.
    safe_username = html.escape(str(username))
    email_html = f"""
<html>
<body>
<h1>Password Reset Request</h1>
<p>Hello {safe_username},</p>
<p>You have requested to reset your password for your MindRoot account.</p>
<p>Click the link below to reset your password:</p>
<p><a href="{reset_url}" style="background-color: #f44336; color: white; padding: 10px 20px; text-decoration: none; border-radius: 4px;">Reset Password</a></p>
<p>Or copy and paste this link into your browser:</p>
<p><code>{reset_url}</code></p>
<p>This link will expire in 1 hour.</p>
<br>
<p><small>If you did not request this password reset, please ignore this email.</small></p>
</body>
</html>
"""
    try:
        result = await service_manager.send_email(
            to=email,
            subject="Password Reset Request - MindRoot",
            body=email_html,  # HTML content will be auto-detected
        )
        if result.get('success'):
            logger.info("Password reset email sent successfully to %s", email)
            return True
        logger.error(
            "Failed to send password reset email to %s: %s", email, result.get('error')
        )
        return False
    except Exception as e:
        # Best-effort send: callers get False rather than an exception, but the
        # traceback is kept in the log so the failure can be diagnosed.
        logger.exception("Exception sending password reset email to %s: %s", email, e)
        return False
[docs]
def setup_verification() -> tuple[Optional[str], Optional[str], bool]:
    """Create the initial email-verification state for an account.

    Returns:
        A ``(token, expiry, verified)`` tuple. When ``REQUIRE_EMAIL_VERIFY`` is
        false this is ``(None, None, True)``. Otherwise it is a fresh URL-safe
        token, a naive-UTC ISO-8601 timestamp 24 hours from now, and ``False``.
    """
    if not REQUIRE_EMAIL_VERIFY:
        return None, None, True

    verification_token = secrets.token_urlsafe(32)
    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # "now" and drop tzinfo so the string keeps the same naive format
    # (no "+00:00" suffix) as before.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    verification_expires = (now_utc + timedelta(hours=24)).isoformat()
    return verification_token, verification_expires, False