Interfaces Overview

FlowTask interfaces bundle reusable functionality and contracts that a component can inherit to gain a specific capability. They follow a mixin pattern, so a single component can combine several interfaces.

Core Interfaces

FlowComponent

The base class for all FlowTask components.

class FlowComponent:
    """Base class for all FlowTask components."""

    async def start(self) -> bool:
        """Initialize the component."""

    async def run(self):
        """Execute the component's main logic."""

    async def close(self):
        """Clean up resources."""
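The three methods suggest a start, run, close lifecycle. The following is a minimal sketch of how a caller might drive it, not FlowTask's actual runner: `EchoComponent` and `run_component` are hypothetical names, the stub bodies in `FlowComponent` are placeholders, and treating a falsy return from `start()` as "skip `run()`" is an assumption.

```python
import asyncio


class FlowComponent:
    """Stand-in for the base class shown above (stub bodies are assumptions)."""

    async def start(self) -> bool:
        return True

    async def run(self):
        return None

    async def close(self):
        return None


class EchoComponent(FlowComponent):
    """Hypothetical component that records the order of lifecycle calls."""

    def __init__(self, message: str):
        self.message = message
        self.calls = []

    async def start(self) -> bool:
        self.calls.append("start")
        return True

    async def run(self):
        self.calls.append("run")
        return self.message.upper()

    async def close(self):
        self.calls.append("close")


async def run_component(component: FlowComponent):
    """Hypothetical driver: start, run, and always close."""
    try:
        if not await component.start():
            return None  # assumed meaning: start() reported failure, skip run()
        return await component.run()
    finally:
        # close() runs even when start() or run() raises
        await component.close()


component = EchoComponent("hello")
result = asyncio.run(run_component(component))
```

Putting `close()` in a `finally` block means resources are released even when `run()` raises.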

CredentialsInterface

Provides credential management functionality.

class CredentialsInterface:
    """Handle credentials for external services."""

    _credentials: dict = {}  # Define expected credential fields

    def processing_credentials(self):
        """Process and validate credentials."""

Example Usage:

ComponentWithCredentials:
  credentials:
    api_key: "your-api-key"
    secret: "your-secret"
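One way `processing_credentials` could validate the YAML values against the fields declared in `_credentials` is sketched below. This is an illustration under stated assumptions rather than the library's implementation: it assumes `_credentials` maps field names to expected types and that the parsed `credentials` mapping is passed to the constructor.

```python
from typing import Optional


class CredentialsInterface:
    """Sketch only; the real FlowTask class may behave differently."""

    # Assumed shape: field name -> expected Python type
    _credentials: dict = {}

    def __init__(self, credentials: Optional[dict] = None):
        # Assumption: the parsed `credentials:` mapping arrives as a dict
        self.credentials = dict(credentials or {})

    def processing_credentials(self) -> dict:
        """Check that every declared field is present and correctly typed."""
        missing = [name for name in self._credentials if name not in self.credentials]
        if missing:
            raise ValueError(f"Missing credential fields: {', '.join(missing)}")
        for name, expected_type in self._credentials.items():
            if not isinstance(self.credentials[name], expected_type):
                raise TypeError(f"Credential '{name}' must be {expected_type.__name__}")
        return self.credentials


class ComponentWithCredentials(CredentialsInterface):
    # Declares the two fields used in the YAML example above
    _credentials = {"api_key": str, "secret": str}


component = ComponentWithCredentials({"api_key": "your-api-key", "secret": "your-secret"})
validated = component.processing_credentials()
```

Failing fast on a missing field surfaces a configuration mistake before any external service is contacted.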

TemplateSupport

Adds template rendering capabilities using Jinja2.

class TemplateSupport:
    """Provides Jinja2 template rendering."""

    use_template: bool = True
    template_dir: str = "templates/"

Example Usage:

TemplateComponent:
  template: "email_template.html"
  use_template: true
  template_dir: "/path/to/templates"

Data Interfaces

PandasDataframe

Provides pandas DataFrame manipulation capabilities.

import pandas as pd


class PandasDataframe:
    """Pandas DataFrame operations."""

    as_dataframe: bool = True

    def to_dataframe(self, data) -> pd.DataFrame:
        """Convert data to DataFrame."""

Example Usage:

DataProcessor:
  as_dataframe: true
  operation: "transform"

FileSupport

File system operations interface.

class FileSupport:
    """File system operations."""

    def read_file(self, filepath: str) -> str:
        """Read file contents."""

    def write_file(self, filepath: str, content: str):
        """Write content to file."""

HTTP Interfaces

HTTPService

Base class for HTTP-based services.

class HTTPService:
    """HTTP client functionality."""

    timeout: int = 30
    retries: int = 3

    async def request(self, method: str, url: str, **kwargs):
        """Make HTTP request."""

Example Usage:

HTTPComponent:
  timeout: 60
  retries: 5
  headers:
    User-Agent: "FlowTask/1.0"
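How `timeout` and `retries` interact is not spelled out above. A minimal sketch of one plausible reading follows, in which `retries` is the total number of attempts and `timeout` applies to each attempt. `request_with_retries` and `FlakyTransport` are hypothetical names, and the transport is simulated so the example runs without a network.

```python
import asyncio


async def request_with_retries(send, *, timeout: float = 30, retries: int = 3,
                               backoff: float = 0.0):
    """Await `send()` until it succeeds or `retries` attempts are used up."""
    if retries < 1:
        raise ValueError("retries must be at least 1")
    last_error = None
    for attempt in range(1, retries + 1):
        try:
            # Each attempt gets its own timeout budget
            return await asyncio.wait_for(send(), timeout=timeout)
        except (asyncio.TimeoutError, ConnectionError) as exc:
            last_error = exc
            if attempt < retries:
                # Linear backoff between attempts (no delay when backoff is 0)
                await asyncio.sleep(backoff * attempt)
    raise last_error


class FlakyTransport:
    """Simulated transport that fails a fixed number of times, then succeeds."""

    def __init__(self, failures: int):
        self.failures = failures
        self.calls = 0

    async def __call__(self):
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionError("simulated network failure")
        return {"status": 200}


transport = FlakyTransport(failures=2)
response = asyncio.run(request_with_retries(transport, timeout=1, retries=5))
```

Only timeouts and connection errors are retried here; any other exception propagates immediately, which avoids masking programming errors.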

Database Interfaces

DatabaseSupport

Database connection and operations.

class DatabaseSupport:
    """Database operations interface."""

    def get_connection(self):
        """Get database connection."""

    async def execute_query(self, query: str, params: dict = None):
        """Execute database query."""
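To make the two-method contract concrete, here is a rough sketch backed by the standard-library `sqlite3` module. `SQLiteSupport` is a hypothetical class, not part of FlowTask, and it assumes that `params` holds named parameters and that the blocking driver call should be moved off the event loop.

```python
import asyncio
import sqlite3
from typing import Optional


class SQLiteSupport:
    """Hypothetical DatabaseSupport-style mixin backed by sqlite3."""

    def __init__(self, path: str = ":memory:"):
        # check_same_thread=False lets the worker thread below use this connection
        self._conn = sqlite3.connect(path, check_same_thread=False)

    def get_connection(self) -> sqlite3.Connection:
        return self._conn

    async def execute_query(self, query: str, params: Optional[dict] = None):
        def _run():
            # Named placeholders such as :id are filled from the params dict
            cursor = self._conn.execute(query, params or ())
            rows = cursor.fetchall()
            self._conn.commit()
            return rows

        # sqlite3 is blocking, so run it in a worker thread (Python 3.9+)
        return await asyncio.to_thread(_run)


async def demo(db: SQLiteSupport):
    await db.execute_query("CREATE TABLE users (id INTEGER, name TEXT)")
    await db.execute_query(
        "INSERT INTO users VALUES (:id, :name)", {"id": 1, "name": "ada"}
    )
    return await db.execute_query("SELECT name FROM users WHERE id = :id", {"id": 1})


rows = asyncio.run(demo(SQLiteSupport()))
```

The sketch awaits queries one at a time; sharing a single connection across concurrent queries would need a lock or a connection pool.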

Interface Combinations

Components can implement multiple interfaces:

class WebScraperComponent(FlowComponent, HTTPService, PandasDataframe):
    """
    Web scraper that fetches data via HTTP and outputs as DataFrame.
    """

    def __init__(self, **kwargs):
        # super() follows the MRO, so every parent is initialized only if
        # each parent's __init__ also calls super().__init__(**kwargs)
        super().__init__(**kwargs)
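A single `super().__init__(**kwargs)` call reaches every parent only when each class in the chain cooperates. The sketch below illustrates that pattern with stand-in classes. The constructor bodies are assumptions made for the example; the real FlowTask classes may accept different arguments.

```python
class FlowComponent:
    def __init__(self, **kwargs):
        # Take the keyword this class owns, pass the rest along the MRO
        self.name = kwargs.pop("name", "component")
        super().__init__(**kwargs)


class HTTPService:
    def __init__(self, **kwargs):
        self.timeout = kwargs.pop("timeout", 30)
        super().__init__(**kwargs)


class PandasDataframe:
    def __init__(self, **kwargs):
        self.as_dataframe = kwargs.pop("as_dataframe", True)
        super().__init__(**kwargs)


class WebScraperComponent(FlowComponent, HTTPService, PandasDataframe):
    def __init__(self, **kwargs):
        self.url = kwargs.pop("url")
        # One call walks FlowComponent -> HTTPService -> PandasDataframe -> object
        super().__init__(**kwargs)


scraper = WebScraperComponent(
    url="https://api.example.com/data", timeout=60, as_dataframe=False
)
mro_names = [cls.__name__ for cls in WebScraperComponent.__mro__]
```

Because each class removes the keywords it owns, an unrecognized keyword reaches `object.__init__` and raises `TypeError`, which catches misspelled settings early.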

YAML Configuration:

WebScraper:
  # HTTP settings
  timeout: 30
  headers:
    User-Agent: "CustomBot/1.0"

  # DataFrame settings
  as_dataframe: true

  # Component-specific settings
  url: "https://api.example.com/data"
  selector: "table.data-table"

Creating Custom Interfaces

Define reusable functionality as interfaces:

import hashlib
from typing import Optional


class CacheInterface:
    """Provides caching capabilities."""

    cache_enabled: bool = True
    cache_ttl: int = 3600

    def get_cache_key(self, *args) -> str:
        """Generate cache key."""
        # MD5 is used only to derive a short key, not for security
        return hashlib.md5(str(args).encode()).hexdigest()

    async def get_cached(self, key: str):
        """Retrieve from cache."""
        pass

    async def set_cached(self, key: str, value, ttl: Optional[int] = None):
        """Store in cache."""
        pass

Use in components:

class CachedAPIComponent(FlowComponent, HTTPService, CacheInterface):
    """API component with caching."""

    async def run(self):
        cache_key = self.get_cache_key(self.url)

        # Try cache first
        if self.cache_enabled:
            cached_result = await self.get_cached(cache_key)
            if cached_result is not None:  # an empty or falsy result is still a hit
                return cached_result

        # Fetch from API
        result = await self.request('GET', self.url)

        # Cache result
        if self.cache_enabled:
            await self.set_cached(cache_key, result, self.cache_ttl)

        return result
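The `get_cached` and `set_cached` methods are left as stubs. A minimal in-memory sketch of how they could be filled in follows. `InMemoryCache` is a hypothetical name, the dictionary store is an assumption (a real deployment might use Redis or similar), and the injectable `clock` exists only to make expiry testable.

```python
import asyncio
import time
from typing import Any, Optional


class InMemoryCache:
    """Hypothetical mixin implementing the two stubbed cache methods."""

    cache_ttl: int = 3600

    def __init__(self, clock=time.monotonic):
        self._store = {}      # key -> (expires_at, value)
        self._clock = clock   # injectable so tests can control time

    async def get_cached(self, key: str) -> Optional[Any]:
        entry = self._store.get(key)
        if entry is None:
            return None  # miss
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._store[key]  # drop the expired entry
            return None
        return value

    async def set_cached(self, key: str, value: Any, ttl: Optional[int] = None) -> None:
        # Fall back to the class-level default when no ttl is given
        lifetime = self.cache_ttl if ttl is None else ttl
        self._store[key] = (self._clock() + lifetime, value)


fake_now = [0.0]
cache = InMemoryCache(clock=lambda: fake_now[0])


async def scenario():
    await cache.set_cached("k", {"rows": 3}, ttl=10)
    fresh = await cache.get_cached("k")
    fake_now[0] = 10.0  # advance the fake clock past the ttl
    expired = await cache.get_cached("k")
    return fresh, expired


fresh, expired = asyncio.run(scenario())
```

Because a miss is signalled with `None`, callers should compare against `None` rather than rely on truthiness, so that empty results such as `[]` or `0` can still be served from the cache.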

Best Practices

  1. Single Responsibility: Each interface should handle one specific concern
  2. Composition over Inheritance: Combine several small interfaces rather than building deep inheritance chains
  3. Configuration: Make interface behavior configurable via YAML
  4. Documentation: Include interface usage in component docstrings
  5. Testing: Test interface combinations to ensure compatibility

Browse the API Reference for complete interface documentation.