# Examples

Real-world examples of FlowTask workflows and configurations.
## Basic Examples

### Data Processing Pipeline

```yaml
name: Sales Data Processing
description: Process daily sales CSV and generate an Excel report
steps:
  - OpenWithPandas:
      mime: "text/csv"
      filename: daily_sales.csv
      directory: "/data/input"
      trim: true
  - TransformRows:
      operation: "calculate_totals"
      columns:
        - amount
        - tax
        - total
  - PandasToFile:
      filename: "/data/output/sales_report_{today}.xlsx"
      mime: "application/vnd.ms-excel"
      masks:
        today:
          - today
          - mask: "%Y%m%d"
```
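The `masks` block substitutes `{today}` into the output filename at run time, formatted by the `strftime` pattern given in `mask`. A minimal sketch of that substitution, assuming the mask entries behave as shown in the YAML above (the helper below is illustrative, not FlowTask's actual implementation):

```python
from datetime import date

def apply_masks(template: str, masks: dict) -> str:
    """Replace each {name} placeholder with today's date, formatted by its mask.

    Illustrative only: mirrors the `masks` block above, where an entry like
    ["today", {"mask": "%Y%m%d"}] formats the current date.
    """
    values = {}
    for name, spec in masks.items():
        fmt = "%Y-%m-%d"  # assumed default when no mask is given
        for entry in spec:
            if isinstance(entry, dict) and "mask" in entry:
                fmt = entry["mask"]
        values[name] = date.today().strftime(fmt)
    return template.format(**values)

filename = apply_masks(
    "/data/output/sales_report_{today}.xlsx",
    {"today": ["today", {"mask": "%Y%m%d"}]},
)
print(filename)  # e.g. /data/output/sales_report_20250101.xlsx
```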
### Web Scraping to Database

```yaml
name: Company Information Scraper
description: Scrape company data and save it to a database
steps:
  - CompanyScraper:
      use_proxies: true
      paid_proxy: false
      sources:
        - "https://example.com/companies"
      selectors:
        name: ".company-name"
        industry: ".industry"
        revenue: ".revenue"
  - DatabaseOutput:
      table: "companies"
      schema: "public"
      credentials:
        dsn: "postgresql://user:pass@localhost/db"
      upsert: true
      conflict_columns: ["name"]
```
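With `upsert: true` and `conflict_columns`, rows that collide on `name` are updated rather than rejected. On PostgreSQL this corresponds to `INSERT ... ON CONFLICT DO UPDATE`; a hedged sketch of the statement such a step might generate (the builder below is illustrative, not FlowTask's code):

```python
def build_upsert(table: str, schema: str, columns: list, conflict_columns: list) -> str:
    """Build a PostgreSQL upsert like the one DatabaseOutput presumably
    issues when `upsert: true` is set (illustrative sketch)."""
    cols = ", ".join(columns)
    placeholders = ", ".join(f"%({c})s" for c in columns)
    updates = ", ".join(
        f"{c} = EXCLUDED.{c}" for c in columns if c not in conflict_columns
    )
    return (
        f"INSERT INTO {schema}.{table} ({cols}) VALUES ({placeholders}) "
        f"ON CONFLICT ({', '.join(conflict_columns)}) DO UPDATE SET {updates}"
    )

sql = build_upsert("companies", "public",
                   ["name", "industry", "revenue"], ["name"])
print(sql)
```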
## Advanced Examples

### Multi-Source Data Integration

```yaml
name: Customer 360 Integration
description: Combine customer data from multiple sources
environment: production
timeout: 7200
steps:
  # Load from CRM
  - DatabaseConnector:
      query: "SELECT * FROM customers WHERE updated_at >= NOW() - INTERVAL '1 day'"
      credentials:
        dsn: "${CRM_DATABASE_URL}"
      as_dataframe: true
  # Load from the support system
  - HTTPConnector:
      url: "${SUPPORT_API_URL}/customers"
      headers:
        Authorization: "Bearer ${SUPPORT_API_TOKEN}"
        Content-Type: "application/json"
      method: "GET"
      as_dataframe: true
  # Merge datasets
  - JoinData:
      left_data: "{{ steps[0].output }}"
      right_data: "{{ steps[1].output }}"
      left_on: "customer_id"
      right_on: "id"
      how: "left"
  # Generate customer profiles
  - TransformRows:
      operation: "create_profile"
      profile_fields:
        - demographics
        - purchase_history
        - support_tickets
  # Save to the data warehouse
  - PgVectorOutput:
      table: "customer_profiles"
      schema: "analytics"
      credentials:
        dsn: "${WAREHOUSE_DATABASE_URL}"
      embedding_model:
        model: "sentence-transformers/all-MiniLM-L6-v2"
        model_type: "transformers"
      create_table:
        create: true
        use_uuid: true
```
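The `JoinData` step performs a left join of the CRM rows onto the support-system rows by key, keeping every CRM customer even when no support record matches. A dependency-free sketch of the same operation on plain dictionaries (on real DataFrames, pandas' `merge(..., how="left")` is the equivalent):

```python
def left_join(left, right, left_on, right_on):
    """Left join two lists of dicts, mirroring the JoinData step above."""
    index = {row[right_on]: row for row in right}
    joined = []
    for row in left:
        match = index.get(row[left_on], {})
        joined.append({**match, **row})  # left side wins on column clashes
    return joined

crm = [{"customer_id": 1, "name": "Acme"}, {"customer_id": 2, "name": "Beta"}]
support = [{"id": 1, "open_tickets": 3}]
print(left_join(crm, support, "customer_id", "id"))
# → [{'id': 1, 'open_tickets': 3, 'customer_id': 1, 'name': 'Acme'},
#    {'customer_id': 2, 'name': 'Beta'}]
```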
### Automated Report Generation

```yaml
name: Monthly Financial Report
description: Generate PDF financial reports with charts
steps:
  # Extract financial data
  - DatabaseConnector:
      query: |
        SELECT
          DATE_TRUNC('month', transaction_date) AS month,
          SUM(amount) AS total_amount,
          COUNT(*) AS transaction_count,
          account_type
        FROM transactions
        WHERE transaction_date >= DATE_TRUNC('month', CURRENT_DATE - INTERVAL '12 months')
        GROUP BY month, account_type
        ORDER BY month, account_type
      credentials:
        dsn: "${FINANCE_DB_URL}"
  # Create visualizations
  - ChartGenerator:
      chart_type: "line"
      x_column: "month"
      y_column: "total_amount"
      group_by: "account_type"
      title: "Monthly Revenue by Account Type"
      output_file: "/tmp/revenue_chart.png"
  # Generate the PDF report
  - PDFGenerator:
      template: "financial_report.html"
      output_file: "financial_report_{month}_{year}"
      directory: "/reports/monthly"
      config:
        css:
          - "reports.css"
          - "charts.css"
      masks:
        month:
          - today
          - mask: "%m"
        year:
          - today
          - mask: "%Y"
  # Email to stakeholders
  - EmailSender:
      to:
        - "cfo@company.com"
        - "finance-team@company.com"
      subject: "Monthly Financial Report - {month}/{year}"
      template: "financial_report_email.html"
      attachments:
        - "{{ steps[2].output }}"
      credentials:
        smtp_host: "${SMTP_HOST}"
        smtp_port: 587
        username: "${SMTP_USERNAME}"
        password: "${SMTP_PASSWORD}"
```
### Event-Driven Workflow

```yaml
name: Order Processing Workflow
description: Process new orders automatically via webhook
trigger:
  type: webhook
  path: "/hooks/new-order"
steps:
  # Validate order data
  - DataValidator:
      schema:
        customer_id:
          type: integer
          required: true
        items:
          type: array
          required: true
        total_amount:
          type: number
          required: true
          minimum: 0
  # Check inventory
  - DatabaseConnector:
      query: |
        SELECT product_id, available_quantity
        FROM inventory
        WHERE product_id IN ({{ order.items | map('product_id') | join(',') }})
      credentials:
        dsn: "${INVENTORY_DB_URL}"
  # Update inventory
  - DatabaseOutput:
      query: |
        UPDATE inventory
        SET available_quantity = available_quantity - {{ item.quantity }}
        WHERE product_id = {{ item.product_id }}
      table: "inventory"
      credentials:
        dsn: "${INVENTORY_DB_URL}"
  # Generate the invoice
  - PDFGenerator:
      template: "invoice.html"
      output_file: "invoice_{order_id}"
      directory: "/invoices/{year}/{month}"
  # Send a confirmation email
  - EmailSender:
      to: "{{ order.customer_email }}"
      subject: "Order Confirmation - #{{ order.id }}"
      template: "order_confirmation.html"
      attachments:
        - "{{ steps[3].output }}"
  # Log to the audit trail
  - DatabaseOutput:
      table: "order_audit"
      data:
        order_id: "{{ order.id }}"
        action: "processed"
        timestamp: "{{ now() }}"
        workflow_id: "{{ workflow.id }}"
```
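The `DataValidator` schema reads as a small set of per-field rules: a type, a required flag, and an optional minimum. A hedged sketch of that validation logic (illustrative only, not FlowTask's validator):

```python
def validate(payload: dict, schema: dict) -> list:
    """Return error strings for a payload checked against a DataValidator-style
    schema (type / required / minimum), as sketched from the YAML above."""
    type_map = {"integer": int, "number": (int, float), "array": list}
    errors = []
    for field, rules in schema.items():
        if field not in payload:
            if rules.get("required"):
                errors.append(f"{field}: missing required field")
            continue
        value = payload[field]
        # bool is a subclass of int in Python, so reject it explicitly
        if not isinstance(value, type_map[rules["type"]]) or isinstance(value, bool):
            errors.append(f"{field}: expected {rules['type']}")
        elif "minimum" in rules and value < rules["minimum"]:
            errors.append(f"{field}: below minimum {rules['minimum']}")
    return errors

schema = {
    "customer_id": {"type": "integer", "required": True},
    "items": {"type": "array", "required": True},
    "total_amount": {"type": "number", "required": True, "minimum": 0},
}
print(validate({"customer_id": 7, "items": [], "total_amount": 19.9}, schema))  # → []
```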
## Task Organization Examples

### Multi-Environment Setup

```text
tasks/
└── programs/
    ├── sales/
    │   └── tasks/
    │       ├── daily_report.yaml
    │       ├── weekly_summary.yaml
    │       └── monthly_analysis.yaml
    ├── marketing/
    │   └── tasks/
    │       ├── lead_scoring.yaml
    │       └── campaign_analysis.yaml
    └── finance/
        └── tasks/
            ├── reconciliation.yaml
            └── reporting.yaml
```
### Environment-Specific Configuration

```shell
# Run with the development environment
ENV=dev task --program=sales --task=daily_report

# Run with the production environment
ENV=production task --program=sales --task=daily_report
```

Environment files:

```shell
# .env.dev
DATABASE_URL=postgresql://localhost/sales_dev
API_BASE_URL=https://api-dev.company.com
LOG_LEVEL=DEBUG
```

```shell
# .env.production
DATABASE_URL=postgresql://prod-db:5432/sales_prod
API_BASE_URL=https://api.company.com
LOG_LEVEL=INFO
```
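Conceptually, the `ENV` variable selects which `.env.<name>` file supplies settings. A minimal sketch of that selection and parsing (assumed behavior; FlowTask's actual loader may differ):

```python
import os

def load_env_file(env: str) -> dict:
    """Parse KEY=VALUE lines from .env.<env>, skipping blanks and # comments."""
    settings = {}
    with open(f".env.{env}") as fh:
        for line in fh:
            line = line.strip()
            if not line or line.startswith("#") or "=" not in line:
                continue
            key, _, value = line.partition("=")
            settings[key.strip()] = value.strip()
    return settings

env = os.environ.get("ENV", "dev")
# settings = load_env_file(env)  # e.g. {'DATABASE_URL': '...', 'LOG_LEVEL': 'DEBUG'}
```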
## Programmatic Examples

### Python Integration

```python
import asyncio

from flowtask import Task
from flowtask.scheduler import TaskScheduler


async def main():
    # Single task execution
    task = Task(program='sales', task='daily_report')
    result = await task.run()
    print(f"Task completed: {result}")

    # Scheduled execution: daily at 9 AM
    scheduler = TaskScheduler()
    await scheduler.schedule_task(
        program='sales',
        task='daily_report',
        cron='0 9 * * *'
    )

    # Run the scheduler
    await scheduler.start()


if __name__ == '__main__':
    asyncio.run(main())
```
### Custom Component Development

````python
from flowtask.components.flow import FlowComponent
from flowtask.interfaces import HTTPService, PandasDataframe


class CustomAPIComponent(FlowComponent, HTTPService, PandasDataframe):
    """
    Custom API integration component.

    Example:
        ```yaml
        CustomAPIComponent:
          api_endpoint: "https://api.example.com/data"
          api_key: "${API_KEY}"
          as_dataframe: true
          timeout: 60
        ```
    """

    def __init__(self, **kwargs):
        self.api_endpoint = kwargs.get('api_endpoint')
        self.api_key = kwargs.get('api_key')
        super().__init__(**kwargs)

    async def start(self):
        if not self.api_endpoint:
            raise ValueError("api_endpoint is required")
        return True

    async def run(self):
        headers = {"Authorization": f"Bearer {self.api_key}"}
        response = await self.request('GET', self.api_endpoint, headers=headers)
        if self.as_dataframe:
            return self.to_dataframe(response.json())
        return response.json()
````
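A component like this can be exercised outside a task by driving its `start`/`run` lifecycle directly. A dependency-free sketch of that pattern, using stub stand-ins for the FlowTask base classes so it runs without FlowTask installed (illustrative only; the real base classes supply `request` and `to_dataframe`):

```python
import asyncio

class StubBase:
    """Stand-in for FlowComponent and friends (illustrative only)."""
    def __init__(self, **kwargs):
        self.as_dataframe = kwargs.get('as_dataframe', False)

class DemoAPIComponent(StubBase):
    def __init__(self, **kwargs):
        self.api_endpoint = kwargs.get('api_endpoint')
        self.api_key = kwargs.get('api_key')
        super().__init__(**kwargs)

    async def start(self):
        if not self.api_endpoint:
            raise ValueError("api_endpoint is required")
        return True

    async def run(self):
        # A real component would call self.request() here; return the
        # resolved config as a stand-in payload.
        return {"endpoint": self.api_endpoint, "authorized": bool(self.api_key)}

async def main():
    component = DemoAPIComponent(api_endpoint="https://api.example.com/data",
                                 api_key="secret")
    await component.start()       # validate configuration
    return await component.run()  # execute the step

result = asyncio.run(main())
print(result)  # → {'endpoint': 'https://api.example.com/data', 'authorized': True}
```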
## Integration Examples

### Docker Deployment

```dockerfile
FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

EXPOSE 5000
CMD ["python", "run.py"]
```
```yaml
# docker-compose.yml
version: '3.8'
services:
  flowtask:
    build: .
    ports:
      - "5000:5000"
    environment:
      - DATABASE_URL=postgresql://postgres:password@db:5432/flowtask
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis
  db:
    image: postgres:17
    environment:
      POSTGRES_DB: flowtask
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: password
    volumes:
      - postgres_data:/var/lib/postgresql/data
  redis:
    image: redis:6-alpine

volumes:
  postgres_data:
```
These examples demonstrate FlowTask's flexibility across common data-processing scenarios; adapt them to your own use cases and requirements.