Source code for mindroot.coreplugins.mcp_.oauth_storage
"""OAuth token storage implementation for MCP servers."""
from typing import Optional
from datetime import datetime
import json
try:
from mcp.client.auth import TokenStorage
from mcp.shared.auth import OAuthToken, OAuthClientInformationFull
except ImportError:
# Fallback if MCP not available
class TokenStorage:
pass
OAuthToken = None
OAuthClientInformationFull = None
[docs]
class MCPTokenStorage(TokenStorage):
"""Token storage implementation for MCP OAuth flows.
Stores tokens and client information in the MCP server configuration.
"""
def __init__(self, server_name: str, mcp_manager):
self.server_name = server_name
self.mcp_manager = mcp_manager
[docs]
async def get_tokens(self) -> Optional[OAuthToken]:
"""Get stored OAuth tokens for the server."""
if OAuthToken is None:
return None
if self.server_name not in self.mcp_manager.servers:
return None
server = self.mcp_manager.servers[self.server_name]
if not server.access_token:
return None
# Parse token expiration
expires_at = None
if server.token_expires_at:
try:
expires_at = datetime.fromisoformat(server.token_expires_at.replace('Z', '+00:00'))
except ValueError:
pass
return OAuthToken(
access_token=server.access_token,
refresh_token=server.refresh_token,
expires_at=expires_at,
scope=" ".join(server.scopes) if server.scopes else None
)
[docs]
async def set_tokens(self, tokens: OAuthToken) -> None:
"""Store OAuth tokens for the server."""
if self.server_name not in self.mcp_manager.servers:
return
server = self.mcp_manager.servers[self.server_name]
# Update server with token information
server.access_token = tokens.access_token
server.refresh_token = tokens.refresh_token
if hasattr(tokens, 'expires_at') and tokens.expires_at:
server.token_expires_at = tokens.expires_at.isoformat()
elif hasattr(tokens, 'expires_in') and tokens.expires_in:
# Calculate expires_at from expires_in
from datetime import datetime, timedelta
expires_at = datetime.now() + timedelta(seconds=tokens.expires_in)
server.token_expires_at = expires_at.isoformat()
else:
server.token_expires_at = None
if tokens.scope:
server.scopes = tokens.scope.split(" ")
# Save configuration
self.mcp_manager.save_config()
[docs]
async def get_client_info(self) -> Optional[OAuthClientInformationFull]:
"""Get stored OAuth client information."""
if OAuthClientInformationFull is None:
return None
if self.server_name not in self.mcp_manager.servers:
return None
server = self.mcp_manager.servers[self.server_name]
if not server.client_id:
return None
# Create client info from server configuration
client_info_data = {
"client_id": server.client_id,
"client_name": f"MindRoot MCP Client - {server.name}",
"redirect_uris": [server.redirect_uri] if server.redirect_uri else [],
"grant_types": ["authorization_code", "refresh_token"],
"response_types": ["code"],
"scope": " ".join(server.scopes) if server.scopes else "user"
}
if server.client_secret:
client_info_data["client_secret"] = server.client_secret
return OAuthClientInformationFull(**client_info_data)
[docs]
async def set_client_info(self, client_info: OAuthClientInformationFull) -> None:
"""Store OAuth client information."""
if self.server_name not in self.mcp_manager.servers:
return
server = self.mcp_manager.servers[self.server_name]
# Update server with client information
server.client_id = client_info.client_id
if hasattr(client_info, 'client_secret') and client_info.client_secret:
server.client_secret = client_info.client_secret
if client_info.redirect_uris:
server.redirect_uri = client_info.redirect_uris[0]
if client_info.scope:
server.scopes = client_info.scope.split(" ")
# Save configuration
self.mcp_manager.save_config()
[docs]
def clear_tokens(self):
"""Clear stored tokens for the server."""
if self.server_name not in self.mcp_manager.servers:
return
server = self.mcp_manager.servers[self.server_name]
server.access_token = None
server.refresh_token = None
server.token_expires_at = None
# Save configuration
self.mcp_manager.save_config()