Events

flowtask.events

FlowTask Events.

Event System for Flowtask.

NotifyEvent

NotifyEvent(*args, event='done', **kwargs)

Bases: AbstractEvent

Using Notify to send notifications for task Execution.

events

abstract

AbstractEvent
AbstractEvent(*args, **kwargs)

Bases: MaskSupport, LogSupport, LocaleSupport, ABC

Abstract Event Class.

This class is the base class for all events in FlowTask.

__call__ abstractmethod async
__call__()

Called when the event is dispatched.
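As an illustration of the abstract, asynchronous `__call__` contract described above, here is a minimal sketch. `BaseEvent` and `PrintEvent` are hypothetical stand-ins written for this example; they are not the FlowTask classes, and the real `AbstractEvent` also mixes in `MaskSupport`, `LogSupport` and `LocaleSupport`, which are omitted here.

```python
import asyncio
from abc import ABC, abstractmethod


class BaseEvent(ABC):
    """Hypothetical stand-in for AbstractEvent (not the FlowTask class)."""

    def __init__(self, *args, **kwargs):
        # Keep keyword arguments so subclasses can read their parameters.
        self.params = kwargs

    @abstractmethod
    async def __call__(self, *args, **kwargs):
        """Called when the event is dispatched."""


class PrintEvent(BaseEvent):
    """Illustrative event that builds a message when dispatched."""

    async def __call__(self, *args, **kwargs):
        message = self.params.get("message", "done")
        return f"event fired: {message}"


async def main():
    event = PrintEvent(message="Finished")
    # Dispatching the event means awaiting the instance call.
    return await event()


result = asyncio.run(main())
print(result)
```

Because `__call__` is declared abstract, instantiating `BaseEvent` directly raises `TypeError`; only subclasses that implement it can be dispatched.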

get_env_value
get_env_value(key, default=None)

Retrieves a value from the environment variables or the configuration.

:param key: The key for the environment variable.
:param default: The default value to return if the key is not found.
:return: The value of the environment variable or the default value.
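The lookup described above could be sketched as follows. This is a standalone function, not the FlowTask method: the `config` argument is a hypothetical dictionary standing in for "the configuration", and the order of precedence (environment first, then configuration, then default) is an assumption.

```python
import os


def get_env_value(key, default=None, config=None):
    """Return the value for `key`, falling back to `default`.

    Assumption: environment variables take precedence over `config`,
    a hypothetical dict that stands in for the configuration source.
    """
    if key in os.environ:
        return os.environ[key]
    if config is not None and key in config:
        return config[key]
    return default
```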

alerts

Alert
Alert(*args, **kwargs)

Bases: Notification, AbstractEvent

get_function
get_function(payload)

Get the function name, function object and parameters from the payload.

recursive_lookup
recursive_lookup(d, target_key)

Recursively finds the first dictionary that contains a given key inside a nested dictionary.

:param d: Dictionary to search
:param target_key: Key to find ("result" by default)
:return: The dictionary containing the key, or None if not found
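A minimal sketch of such a depth-first search is shown below. It is an illustration of the described behaviour, not the FlowTask source. It assumes only nested dictionaries are traversed (lists are not descended into) and gives `target_key` the "result" default mentioned in the description, although the signature shown above lists no default.

```python
def recursive_lookup(d, target_key="result"):
    """Return the first dict (depth-first) that contains `target_key`."""
    if not isinstance(d, dict):
        return None
    if target_key in d:
        return d
    for value in d.values():
        # Only nested dictionaries can contain the key; other values yield None.
        found = recursive_lookup(value, target_key)
        if found is not None:
            return found
    return None


payload = {"alert": {"params": {"result": {"rows": 10}, "name": "check"}}}
print(recursive_lookup(payload))
```

Note that the function returns the dictionary that holds the key, not the value stored under it.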

colfunctions
average
average(df, desc, column_name, threshold, deviation=2, allow_below=False, allow_above=False)

average.

Parameters:

- df (DataFrame): Dataframe. Required.
- desc (Any): Description of the DataFrame. Required.
- column_name (str): Column name. Required.
- threshold (Union[int, float]): Threshold value. Required.
- deviation (Union[int, float]): Percent of deviation from the threshold. Default: 2.
- allow_below (bool): How many percent below the threshold is allowed. Default: False.
- allow_above (bool): How many percent above the threshold is allowed. Default: False.

Returns:

- tuple
between
between(df, desc, column_name, values)

Check if the values in a DataFrame column are between the given min and max values.

Args:
- df (pd.DataFrame): The DataFrame to check.
- desc (Any): The description (usually from df.describe()) of the DataFrame.
- column_name (str): The name of the column to check.
- values (tuple): A tuple containing the (min, max) values.

column_size
column_size(df, desc, column_name, min_length, max_length)

Check if all values in a string column have lengths within the specified range.

Parameters:

- df (DataFrame): The DataFrame to check. Required.
- desc (Any): Ignored, used for compatibility. Required.
- column_name (str): The name of the column to check. Required.
- min_length (int): The minimum length allowed for strings. Required.
- max_length (int): The maximum length allowed for strings. Required.

Returns:

- tuple: (column_name, min_length, max_length, True/False)

count_nulls
count_nulls(df, desc, column_name, value)

Check if the number of non-null values in a column is greater than a given threshold.

Parameters:

- df (pd.DataFrame): The DataFrame to check. Required.
- desc (Any): The description (usually from df.describe()) of the DataFrame. Required.
- column_name (str): The name of the column to check. Required.
- value (int): The minimum number of non-null values required (min_length). Required.

Returns:

- tuple: (min_length, True/False)

equal
equal(df, desc, column_name, values)

Check if all values in a DataFrame column are within the provided list of strings.

Args:
- df (pd.DataFrame): The DataFrame to check.
- desc (Any): The description (usually from df.describe()) of the DataFrame.
- column_name (str): The name of the column to check.
- values (tuple): A tuple containing the allowed strings.

not_null
not_null(df, desc, column_name)

Check if a DataFrame column contains only non-null values.

Parameters:

- df (pd.DataFrame): The DataFrame to check. Required.
- desc (Any): The description (usually from df.describe()) of the DataFrame. Required.
- column_name (str): The name of the column to check. Required.

Returns:

- tuple: (column_name, True/False)

functions
average
average(data, column, threshold, deviation, allow_below=False)

average.

Calculates the average of a value compared with a threshold.

Args:
- data (dict): Dictionary from which the column value is extracted.
- column (str): Column to calculate.
- threshold (float): Value to be used as the threshold.
- deviation (float): Maximum acceptable deviation from the threshold.
- allow_below (bool): If True, the threshold is not evaluated on minimum values.
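One way to read the description above is sketched below. Several points are assumptions rather than documented behaviour: that `data[column]` holds the value to compare, that `deviation` is a percentage of the threshold (as the DataFrame variant of `average` describes it), and that the function returns a `(column, actual_value, bool)` tuple like the other checks in this module.

```python
def average(data, column, threshold, deviation, allow_below=False):
    """Check that data[column] stays within `deviation` percent of `threshold`.

    Assumptions: `deviation` is a percentage and the return value mirrors
    the (column, actual_value, bool) tuples of the other checks.
    """
    actual = data[column]
    margin = abs(threshold) * deviation / 100
    lower, upper = threshold - margin, threshold + margin
    if allow_below:
        # The lower bound is skipped, so only the upper bound is enforced.
        passed = actual <= upper
    else:
        passed = lower <= actual <= upper
    return column, actual, passed


print(average({"price": 102}, "price", threshold=100, deviation=5))
```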

between
between(data, column, value)

Checks if the actual value in the specified column is between two given values.

Parameters:

- data (dict): Dictionary containing the column data. Required.
- column (str): Column name to evaluate. Required.
- value (list): A list containing two values: [min_value, max_value]. Required.

Returns:

- tuple (Tuple[str, Union[int, float], bool]): (column, actual_value, True/False)
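The documented inputs and return tuple suggest a check along these lines. This is a sketch, not the FlowTask source; it assumes `data[column]` holds the actual value and that both bounds are inclusive.

```python
def between(data, column, value):
    """Return (column, actual_value, True/False) for a range check.

    Assumption: the bounds in `value` are inclusive.
    """
    actual = data[column]
    min_value, max_value = value
    return column, actual, min_value <= actual <= max_value


print(between({"rows": 150}, "rows", [100, 200]))
```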

equal
equal(data, column, value)

Check if the actual value in a specified column of the data is equal to the given threshold value.

gt
gt(data, column, value)

Check if the actual value in a specified column of the data is greater than the given threshold value.

has_columns
has_columns(data, column='columns', value=[])

Check if the actual value in a specified column of the data is equal to the given threshold value.

lt
lt(data, column, value)

Check if the actual value in a specified column of the data is less than the given threshold value.

max_value
max_value(data, column, value)

Checks if the actual value of a specified column in the data is less than or equal to the given threshold value.

Parameters:

- data (dict): Dictionary containing the data to be checked. Required.
- column (str): Name of the column in the data whose value needs to be checked. Required.
- value (Union[int, float]): The threshold value. The actual value in the data should be less than or equal to this. Required.

Returns:

- tuple (Tuple[str, Union[int, float], bool]): A tuple containing:
  - column (str): Name of the column that was checked.
  - actual_value (Union[int, float]): The actual value from the data for the specified column.
  - val (bool): True if the actual value is less than or equal to the threshold, False otherwise.

min_value
min_value(data, column, value)

Checks if the actual value of a specified column in the data is greater than or equal to the given threshold value.

Parameters:

- data (dict): Dictionary containing the data to be checked. Required.
- column (str): Name of the column in the data whose value needs to be checked. Required.
- value (Union[int, float]): The threshold value. The actual value in the data should be greater than or equal to this. Required.

Returns:

- tuple (Tuple[str, Union[int, float], bool]): A tuple containing:
  - column (str): Name of the column that was checked.
  - actual_value (Union[int, float]): The actual value from the data for the specified column.
  - val (bool): True if the actual value is greater than or equal to the threshold, False otherwise.
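The two threshold checks, `max_value` and `min_value`, follow the same pattern and can be sketched together. The code below is an illustration built from the documented parameters and return tuples, assuming `data[column]` holds the actual value; it is not copied from FlowTask.

```python
def max_value(data, column, value):
    """True when the actual value is less than or equal to the threshold."""
    actual = data[column]
    return column, actual, actual <= value


def min_value(data, column, value):
    """True when the actual value is greater than or equal to the threshold."""
    actual = data[column]
    return column, actual, actual >= value


stats = {"rows": 120}
print(max_value(stats, "rows", 500))
print(min_value(stats, "rows", 200))
```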

missing_columns
missing_columns(data, column='columns', value=[])

Check if all required columns exist in the 'columns' field of the given data.

:param data: Dictionary containing the structure with "columns".
:param column: Name of the column to check (by default: "columns").
:param value: List of columns to check.
:return: Tuple (checked_key, missing_columns, bool)
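A sketch of this check is given below. The meaning of the returned boolean is not stated above, so the sketch assumes it is True when no required column is missing. It also uses `None` instead of the mutable `[]` default shown in the signature, which is a choice made for this example only.

```python
def missing_columns(data, column="columns", value=None):
    """Return (checked_key, missing_columns, bool).

    Assumption: the boolean is True when every required column is present.
    """
    required = value or []
    present = set(data.get(column, []))
    # Preserve the order of the required list in the result.
    missing = [name for name in required if name not in present]
    return column, missing, not missing


result = missing_columns({"columns": ["id", "name"]}, value=["id", "email"])
print(result)
```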

exec

LogExecution
LogExecution(*args, **kwargs)

Bases: AbstractEvent

LogExecution.

Log the execution of a Task into an InfluxDB measurement bucket.

interfaces

ClientInterface
ClientInterface(*args, **kwargs)

Bases: CredentialsInterface

close abstractmethod async
close(timeout=5)

close. Closing the connection.

open abstractmethod async
open(credentials, **kwargs)

open. Opens a connection to an external resource.
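To show how the `open`/`close` pair might be implemented by a subclass, here is a minimal sketch. `BaseClient` and `MemoryClient` are hypothetical names used only for this example; `BaseClient` mirrors the two abstract coroutine signatures listed above and leaves out `CredentialsInterface`.

```python
import asyncio
from abc import ABC, abstractmethod


class BaseClient(ABC):
    """Hypothetical stand-in mirroring the open/close signatures above."""

    @abstractmethod
    async def open(self, credentials, **kwargs):
        """Open a connection to an external resource."""

    @abstractmethod
    async def close(self, timeout=5):
        """Close the connection."""


class MemoryClient(BaseClient):
    """Illustrative client that only tracks its connection state."""

    def __init__(self):
        self.connected = False
        self.credentials = None

    async def open(self, credentials, **kwargs):
        self.credentials = dict(credentials)
        self.connected = True
        return self

    async def close(self, timeout=5):
        self.connected = False


async def main():
    client = MemoryClient()
    await client.open({"username": "demo"})
    was_open = client.connected
    await client.close(timeout=1)
    return was_open, client.connected


states = asyncio.run(main())
print(states)
```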

jira

Jira
Jira(*args, **kwargs)

Bases: AbstractEvent

Jira.

Jira Event to create a new Ticket on Error/Exception.

notify_event

NotifyEvent
NotifyEvent(*args, event='done', **kwargs)

Bases: AbstractEvent

Using Notify to send notifications for task Execution.

sendfile

SendFile
SendFile(*args, **kwargs)

Bases: ClientInterface, AbstractEvent

close async
close()

close. Closing the connection.

open async
open()

open. Opens a connection to an external resource.

manager

EventManager

EventManager(name=None)

Basic Event Manager of flowtask.

This manager allows for dynamic loading and dispatching events based on a provided payload. Each event can have multiple actions, and actions can be loaded dynamically from modules.

Event
Event(functions, event_name)
LoadEvents
LoadEvents(event_payload=None)

Load all events and their associated actions based on the provided payload.

The payload should be a dictionary where each key is an event name and the associated value is a list of action dictionaries. Each action dictionary should have a single key (the action name) and a value that is a dictionary of parameters for that action.

Example payload:

    {
        "completed": [
            {"Dummy": {"message": "Finished"}}
        ],
        ...
    }

Parameters:

- event_payload (dict): The event payload dictionary. Default: None.

Raises:

- RuntimeError: If there's an error loading an action.
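The payload layout described for LoadEvents can be walked as in the sketch below. `iter_actions` is a hypothetical helper written for this example and is not part of EventManager; raising `RuntimeError` for an action dictionary without exactly one key is an assumption about how a malformed entry might be reported.

```python
def iter_actions(event_payload=None):
    """Yield (event_name, action_name, params) for each action in the payload."""
    for event_name, actions in (event_payload or {}).items():
        for action in actions:
            if len(action) != 1:
                # Each action dictionary is expected to have a single key.
                raise RuntimeError(
                    f"Action in event {event_name!r} must have exactly one key"
                )
            action_name, params = next(iter(action.items()))
            yield event_name, action_name, params


payload = {
    "completed": [
        {"Dummy": {"message": "Finished"}},
    ],
}
actions = list(iter_actions(payload))
print(actions)
```

A loader could then resolve each `action_name` to a class and instantiate it with `params`.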

addEvent classmethod
addEvent(**kwargs)

addEvent(event1=[f1, f2, ...], event2=[g1, g2, ...], ...) creates any number of events from **kwargs. Each event receives a list of functions, and every function in the list receives the same parameters.

Example:

    def hello():
        print("Hello ")

    def world():
        print("World")

    EventManager.addEvent(salute=[hello])
    EventManager.salute += world

    EventManager.salute()

Output: Hello World
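The `+=` and call syntax in the example can be supported by a small callable container. The sketch below shows one possible mechanism under that assumption; `MiniEvent` and `MiniEventManager` are hypothetical names, and this is not the EventManager implementation.

```python
class MiniEvent:
    """Hypothetical event: a list of functions called with the same arguments."""

    def __init__(self, functions, event_name=None):
        self.functions = list(functions)
        self.event_name = event_name

    def __iadd__(self, func):
        # Supports `manager.event += func`.
        self.functions.append(func)
        return self

    def __call__(self, *args, **kwargs):
        # Every function receives the same parameters.
        return [func(*args, **kwargs) for func in self.functions]


class MiniEventManager:
    """Hypothetical manager that stores events as class attributes."""

    @classmethod
    def addEvent(cls, **kwargs):
        for name, functions in kwargs.items():
            setattr(cls, name, MiniEvent(functions, name))


def hello():
    return "Hello"


def world():
    return "World"


MiniEventManager.addEvent(salute=[hello])
MiniEventManager.salute += world
greeting = MiniEventManager.salute()
print(" ".join(greeting))
```

Returning the results as a list is a choice made for this sketch so the call order is easy to inspect.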