Metadata-Version: 2.1
Name: awehflow
Version: 2.4.3.0
Summary: Configuration based Apache Airflow
Home-page: UNKNOWN
Author: Philip Perold
Author-email: philip@spatialedge.co.za
License: UNKNOWN
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3
Classifier: Operating System :: OS Independent
Requires-Python: >=3.6
Description-Content-Type: text/markdown
Provides-Extra: default
Provides-Extra: composer
License-File: LICENSE

# Awehflow

![coverage report](https://gitlab.com/spatialedge/awehflow/badges/master/coverage.svg)
![pipeline status](https://gitlab.com/spatialedge/awehflow/badges/master/pipeline.svg)

Configuration-based Airflow pipelines with metric logging and alerting out of the box.

## Prerequisites

You will need the following to run this code:
  * Python 3.6 or later

## Installation

```bash
pip install "awehflow[default]"
```

If you are installing on Google Cloud Composer with Airflow 1.10.2:

```bash
pip install "awehflow[composer]"
```

### Event & metric tables
Create a PostgreSQL database that can be referenced via an Airflow connection. In the database, create the following tables:
  - Jobs data table
    ```sql
    CREATE TABLE public.jobs (
      id serial4 NOT NULL,
      run_id varchar NOT NULL,
      dag_id varchar NULL,
      "name" varchar NULL,
      project varchar NULL,
      status varchar NULL,
      engineers json NULL,
      error json NULL,
      start_time timestamptz NULL,
      end_time timestamptz NULL,
      reference_time timestamptz NULL,
      CONSTRAINT job_id_pkey PRIMARY KEY (id),
      CONSTRAINT run_id_dag_id_unique UNIQUE (run_id, dag_id)
    );
    ```

  - Task metrics table
    ```sql
    CREATE TABLE public.task_metrics (
      id serial4 NOT NULL,
      run_id varchar NULL,
      dag_id varchar NULL,
      task_id varchar NULL,
      job_name varchar NULL,
      value json NULL,
      created_time timestamptz NULL,
      reference_time timestamptz NULL,
      CONSTRAINT task_metrics_id_pkey PRIMARY KEY (id)
    );
    ```

  - Data metrics table
    ```sql
    CREATE TABLE public.data_metrics (
      id serial4 NOT NULL,
      platform varchar NULL,
      "source" varchar NULL,
      "key" varchar NULL,
      value json NULL,
      reference_time timestamptz NULL,
      CONSTRAINT data_metrics_pkey PRIMARY KEY (id),
      CONSTRAINT unique_metric UNIQUE (platform, source, key, reference_time)
    );
    ```
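The database above is reached through an Airflow connection. As a rough sketch of building the connection URI for such a database (all credentials and hostnames below are placeholders, not part of awehflow):

```python
from urllib.parse import quote

# Hypothetical credentials -- replace with your own.
user = "awehflow"
password = "s3cret/pass"
host = "metrics-db.internal"
port = 5432
database = "awehflow_metrics"

# Airflow connection URIs must be percent-encoded where needed,
# e.g. the '/' in the password.
conn_uri = (
    f"postgresql://{quote(user)}:{quote(password, safe='')}"
    f"@{host}:{port}/{database}"
)
print(conn_uri)
```

The resulting URI can then be registered as an Airflow connection through whichever mechanism your deployment uses (UI, CLI, or environment variables).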

## Usage

Usage of `awehflow` can be broken up into two parts: bootstrapping and the configuration of pipelines.

### Bootstrap

In order to expose the generated pipelines (`airflow` _DAGs_) for `airflow` to pick up when scanning for _DAGs_, one has to create a `DagLoader` that points to a folder where the pipeline configuration files will be located:

```python
import os

from awehflow.alerts.slack import SlackAlerter
from awehflow.core import DagLoader
from awehflow.events.postgres import PostgresMetricsEventHandler

"""airflow doesn't pick up DAGs in files unless 
the words 'airflow' and 'DAG' features"""

configs_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'configs')

metrics_handler = PostgresMetricsEventHandler(jobs_table='jobs', task_metrics_table='task_metrics')

slack_alerter = SlackAlerter(channel='#airflow')

loader = DagLoader(
    project="awehflow-demo",
    configs_path=configs_path,
    event_handlers=[metrics_handler],
    alerters=[slack_alerter]
)

dags = loader.load(global_symbol_table=globals())
```

As seen in the code snippet, one can also pass in _"event handlers"_ and _"alerters"_ to perform actions on certain pipeline events and potentially alert the user of certain events on a given channel. See the sections below for more detail.
The global symbol table needs to be passed to the `loader` since `airflow` scans it for objects of type `DAG`, and then synchronises the state with its own internal state store.

\*_caveat_: `airflow` ignores `python` files that don't contain the words _"airflow"_ and _"DAG"_. It is thus advised to put those words in a comment to ensure the generated _DAGs_ get picked up when the `DagBag` is getting filled.

#### Event Handlers

As a pipeline generated using `awehflow` is running, certain events get emitted. An event handler gives the user the option of running code when these events occur.

The following events are potentially emitted as a pipeline runs:

* `start`
* `success`
* `failure`
* `task_metric`

Existing event handlers include:

* `PostgresMetricsEventHandler`: persists pipeline metrics to a Postgres database
* `PublishToGooglePubSubEventHandler`: events get passed straight to a Google Pub/Sub topic

An `AlertsEventHandler` gets automatically added to a pipeline. Events get passed along to registered alerters.
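The library's actual event-handler base class is not shown here, but the pattern can be sketched as a class that dispatches on the event name. The class and the event shape below are illustrative assumptions, not awehflow's API:

```python
class LoggingEventHandler:
    """Illustrative handler: records every pipeline event it sees.

    The real awehflow handler interface may differ; this only shows
    the dispatch-on-event-name pattern.
    """

    def __init__(self):
        self.seen = []

    def handle(self, event):
        # Events are assumed to carry a 'name' plus an arbitrary payload.
        self.seen.append(event["name"])
        if event["name"] == "failure":
            print(f"pipeline failed: {event.get('body')}")


handler = LoggingEventHandler()
handler.handle({"name": "start"})
handler.handle({"name": "task_metric", "body": {"rows": 42}})
handler.handle({"name": "success"})
```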

#### Alerters

An `Alerter` is merely a class that implements an `alert` method. By default a `SlackAlerter` is configured in the `dags/PROJECT/bootstrap.py` file of an awehflow project. awehflow supports the registration of multiple alerters, which allows success or failure events to be sent to multiple channels.
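Since an alerter only needs an `alert` method, a minimal custom alerter might look like the sketch below. The exact signature awehflow calls it with is assumed here to take a dict-like event:

```python
class ListAlerter:
    """Minimal illustrative alerter: collects alert messages in memory.

    Assumes awehflow invokes alert(event) with a dict-like event;
    the real call signature may differ.
    """

    def __init__(self):
        self.alerts = []

    def alert(self, event):
        self.alerts.append(f"[{event['name']}] {event.get('body', '')}")


alerter = ListAlerter()
alerter.alert({"name": "failure", "body": "task X raised"})
```

An instance of such a class would be passed to `DagLoader` via the `alerters` list, alongside (or instead of) the `SlackAlerter`.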

##### YAML configuration
In order to add alerts to an awehflow DAG, add the following to the root of the configuration:
```YAML
alert_on:
  - 'failure' # Send out a formatted message if a task in the DAG fails. This is optional
  - 'success' # Send out a formatted message once the DAG completes successfully. This is optional
```

##### Available alerters

###### `SlackAlerter` - `awehflow.alerts.slack.SlackAlerter`
Sends an alert to a specified Slack channel via the Slack webhook functionality

- Parameters
  - `channel` - The name of the channel that the alerts should be sent to
  - `slack_conn_id` - The name of the airflow connection that contains the token information, default: `slack_default`
- Connection requirements - Create an HTTP connection with the name specified for `slack_conn_id`; the required HTTP fields are:
  - `password` - The Slack token issued by your admin team, which allows for the sending of messages via the Slack Python API


###### `GoogleChatAlerter` - `awehflow.alerts.googlechat.GoogleChatAlerter`
Sends an alert to the configured Google Chat space
- Parameters
  - `gchat_conn_id` - The name of the airflow connection that contains the GChat space information, default: `gchat_default`
- Connection requirements - Create an HTTP connection with the name specified for `gchat_conn_id`; the required HTTP fields are:
  - `host` - The GChat spaces URL `https://chat.googleapis.com/v1/spaces`
  - `password` - The GChat spaces key configuration information, e.g. `https://chat.googleapis.com/v1/spaces/SPACES_ID?key=SPACES_KEY`
    - `SPACES_ID` - Should be supplied by your GChat admin team
    - `SPACES_KEY` - Should be supplied by your GChat admin team
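Putting the connection fields together, the URL the alerter ends up targeting is the `host` plus the space ID and key, matching the pattern above. A sketch with the placeholder values left in:

```python
# Placeholder values -- your GChat admin team supplies the real ones.
host = "https://chat.googleapis.com/v1/spaces"
space_id = "SPACES_ID"
key = "SPACES_KEY"

# Assemble the full spaces URL in the shape the connection expects.
webhook_url = f"{host}/{space_id}?key={key}"
print(webhook_url)
```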


### Configuration
Awehflow configuration files can be written as `.yml` or `.hocon` files; both formats are supported.

Shown below is a sample HOCON configuration file:
  ```hocon
  {
    name: my_first_dag,
    start_date: 2022-01-01,
    catchup: true,
    schedule: "10 0 * * *",
    version: 1,
    alert_on:[
      success,
      failure
    ],
    params: {
      default: {
        source_folder: /tmp
      },
      production: {
        source_folder: /data
      }
    },
    default_dag_args: {
      retries: 1
    },
    dependencies: [
      {
        id: 'ping_sensor'
        operator: 'airflow.sensors.bash.BashSensor'
        params: {
          bash_command: 'echo ping'
          mode: 'reschedule'
        }
      }
    ],
    tasks: [
        {
          id: first_dummy_task,
          operator: airflow.operators.dummy.DummyOperator,
        },
        {
          id: first_bash_task,
          operator: airflow.operators.bash.BashOperator,
          params: {
            bash_command: 'echo "Hello World"'
          },
          upstream: [
            first_dummy_task
          ]
        }
      ]
  }
  ```
This configuration does the following:
  - Creates a DAG called `my_first_dag`
    - Scheduled to run daily at 10 minutes past midnight
    - Catchup has been enabled to ensure all runs of the DAG since 2022-01-01 are executed
  - Dependencies
    - First check if the command `echo ping` succeeds
  - Tasks
    - First run a dummy task that does nothing
    - If the dummy task succeeds, execute the bash command
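
Since both formats are supported, the same pipeline can equivalently be written in YAML (field names mirror the HOCON example above):

```YAML
name: my_first_dag
start_date: 2022-01-01
catchup: true
schedule: '10 0 * * *'
version: 1
alert_on:
  - success
  - failure
params:
  default:
    source_folder: /tmp
  production:
    source_folder: /data
default_dag_args:
  retries: 1
dependencies:
  - id: ping_sensor
    operator: airflow.sensors.bash.BashSensor
    params:
      bash_command: echo ping
      mode: reschedule
tasks:
  - id: first_dummy_task
    operator: airflow.operators.dummy.DummyOperator
  - id: first_bash_task
    operator: airflow.operators.bash.BashOperator
    params:
      bash_command: echo "Hello World"
    upstream:
      - first_dummy_task
```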


## Running the tests

Tests may be run with:
```bash
python -m unittest discover tests
```

or to run code coverage too:

```bash
coverage run -m unittest discover tests && coverage html
```



