Coverage for fastblocks / mcp / server.py: 37%
60 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-26 03:30 -0800
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-26 03:30 -0800
1"""FastBlocks MCP (Model Context Protocol) server implementation.
3Provides IDE/AI assistant integration for FastBlocks capabilities including:
4- Template management and validation
5- Component creation and discovery
6- Adapter configuration and health checks
7"""
9import logging
10from contextlib import suppress
11from typing import Any
13logger = logging.getLogger(__name__)
16class FastBlocksMCPServer:
17 """FastBlocks MCP protocol server using ACB infrastructure."""
19 def __init__(self, name: str = "fastblocks", version: str = "0.16.0"):
20 """Initialize FastBlocks MCP server.
22 Args:
23 name: Server name for MCP protocol
24 version: FastBlocks version
25 """
26 self.name = name
27 self.version = version
28 self._server: Any | None = None
29 self._initialized = False
31 async def initialize(self) -> None:
32 """Initialize MCP server with ACB integration."""
33 if self._initialized:
34 return
36 try:
37 # Import ACB MCP utilities
38 from acb import HAS_MCP
40 if not HAS_MCP:
41 logger.warning("ACB MCP support not available - server disabled")
42 return
44 from acb import create_mcp_server
46 # Create server using ACB infrastructure
47 # ACB provides the FastMCP instance with rate limiting already configured
48 self._server = create_mcp_server()
50 # Register FastBlocks tools and resources
51 await self._register_tools()
52 await self._register_resources()
54 self._initialized = True
55 logger.info(
56 f"FastBlocks MCP server initialized: {self.name} v{self.version} "
57 f"(using ACB infrastructure with rate limiting: 15 req/sec, burst 40)"
58 )
60 except ImportError:
61 logger.debug("ACB MCP dependencies not available - graceful degradation")
62 except Exception as e:
63 logger.error(f"Failed to initialize MCP server: {e}")
65 async def _register_tools(self) -> None:
66 """Register FastBlocks MCP tools.
68 Tools will be implemented in tools.py and registered here.
69 """
70 with suppress(Exception):
71 from .tools import register_fastblocks_tools
73 await register_fastblocks_tools(self._server)
74 logger.debug("FastBlocks MCP tools registered")
76 async def _register_resources(self) -> None:
77 """Register FastBlocks MCP resources.
79 Resources will be implemented in resources.py and registered here.
80 """
81 with suppress(Exception):
82 from .resources import register_fastblocks_resources
84 await register_fastblocks_resources(self._server)
85 logger.debug("FastBlocks MCP resources registered")
87 async def start(self) -> None:
88 """Start the MCP server."""
89 if not self._initialized:
90 await self.initialize()
92 if self._server is None:
93 logger.warning("MCP server not available - skipping start")
94 return
96 try:
97 logger.info("Starting FastBlocks MCP server...")
98 await self._server.run()
99 except Exception as e:
100 logger.error(f"MCP server error: {e}")
101 raise
103 async def stop(self) -> None:
104 """Stop the MCP server gracefully."""
105 if self._server is None:
106 return
108 try:
109 logger.info("Stopping FastBlocks MCP server...")
110 # Server shutdown will be handled by ACB
111 await self._server.stop()
112 except Exception as e:
113 logger.error(f"Error stopping MCP server: {e}")
116async def create_fastblocks_mcp_server() -> FastBlocksMCPServer:
117 """Create and initialize FastBlocks MCP server.
119 Returns:
120 Initialized FastBlocksMCPServer instance
122 Example:
123 >>> server = await create_fastblocks_mcp_server()
124 >>> await server.start()
125 """
126 server = FastBlocksMCPServer()
127 await server.initialize()
128 return server