Metadata-Version: 2.4
Name: dataforge-sdk
Version: 10.1.0rc29
Summary: SDK for creating DataForge extensions
Author-email: Vadim Orlov <vorlov@dataforgelabs.com>
Project-URL: Homepage, https://docs.dataforgelabs.com
Project-URL: Issues, https://docs.dataforgelabs.com/hc/en-us/requests/new
Requires-Python: >=3.10
Description-Content-Type: text/markdown
Provides-Extra: psycopg2
Requires-Dist: psycopg2-binary>=2.9; extra == "psycopg2"

# dataforge-sdk
SDK for creating DataForge extensions.
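
Install from PyPI with `pip install dataforge-sdk`. The optional `psycopg2` extra (`pip install "dataforge-sdk[psycopg2]"`) pulls in `psycopg2-binary`, which the Postgres helpers below rely on.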

## Postgres Utilities

The `dataforge.pg` module provides helper functions to execute SQL operations against the DataForge Postgres metastore:

```python
from dataforge.pg import select, update, pull

# Execute a SELECT query and return a Spark DataFrame
df = select("SELECT * FROM my_table")

# Execute an UPDATE/INSERT/DELETE query
update("UPDATE my_table SET col = 'value'")

# Trigger a new data pull for source_id 123
pull(123)
```
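
These helpers compose naturally, e.g., scanning metastore state and re-triggering pulls. A minimal sketch, assuming a hypothetical `sources` table with `source_id` and `status` columns (the real metastore schema may differ):

```python
from dataforge.pg import select, update, pull

# Hypothetical schema: a `sources` table with `source_id` and `status`
# columns; adjust to the actual metastore layout
stale = select("SELECT source_id FROM sources WHERE status = 'stale'")

# Re-trigger a pull for each stale source, then mark it as queued
for row in stale.collect():
    pull(row.source_id)
    update(f"UPDATE sources SET status = 'queued' WHERE source_id = {row.source_id}")
```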

## IngestionSession

The `IngestionSession` class manages the lifecycle of a custom data ingestion process.

```python
from dataforge import IngestionSession

# Initialize a session (production use)
session = IngestionSession()

# Initialize a session (optional source_name/project_name for testing)
session = IngestionSession(source_name="my_source", project_name="my_project")

# Ingest data by passing a function that returns a DataFrame
# (recommended: lets DataForge integrate logging around the read;
# these examples assume an active SparkSession bound to `spark`)
session.ingest(lambda: spark.read.csv("s3://bucket/path/input.csv"))

# Pass a DataFrame directly (useful for testing; not recommended for production deployment)
df = spark.read.csv("s3://bucket/path/input.csv")
session.ingest(df)

# Ingest an empty DataFrame to create a 0-record input
session.ingest()


# Fail the process with an error message
session.fail("Error message")

# Retrieve the latest tracking fields
tracking = session.latest_tracking_fields()

# Retrieve connection parameters for the current source
connection_parameters = session.connection_parameters()

# Retrieve custom parameters for the current source
custom_parameters = session.custom_parameters()

```
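
Putting the pieces together, an incremental ingest can combine the tracking fields with `session.fail` for error reporting. A minimal sketch, assuming an active SparkSession bound to `spark`; the `path` and `last_loaded_date` keys and the `loaded_date` column are illustrative, not part of the SDK:

```python
from dataforge import IngestionSession

session = IngestionSession()

try:
    params = session.connection_parameters()     # e.g., where to read from
    tracking = session.latest_tracking_fields()  # e.g., incremental watermark

    def read_new_rows():
        # `path`, `last_loaded_date`, and `loaded_date` are hypothetical
        # names; use whatever your source actually defines
        df = spark.read.parquet(params["path"])
        if tracking and tracking.get("last_loaded_date"):
            df = df.where(df.loaded_date > tracking["last_loaded_date"])
        return df

    # Passing a function lets DataForge wrap the read with its own logging
    session.ingest(read_new_rows)
except Exception as e:
    # Report the failure to DataForge rather than dying silently
    session.fail(f"Ingestion failed: {e}")
```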

## ParsingSession

The `ParsingSession` class manages the lifecycle of a custom parse process.

```python
from dataforge import ParsingSession

# Initialize a session (production use)
session = ParsingSession()

# Initialize a session (optional input_id for testing)
session = ParsingSession(input_id=123)

# Retrieve custom parameters
params = session.custom_parameters()

# Get the path of the file to be parsed
path = session.file_path

# Run parsing: pass a DataFrame, a function returning a DataFrame, or None (for a 0-record file)
session.run(lambda: spark.read.json(session.file_path))

# Fail the process with an error message
session.fail("Error message")

```
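
A complete parser typically branches on custom parameters and reports failures back through the session. A minimal sketch, assuming `spark` is an active SparkSession, `custom_parameters()` returns a dict, and the `format` parameter is illustrative:

```python
from dataforge import ParsingSession

session = ParsingSession()

try:
    # `format` is a hypothetical custom parameter, not an SDK field
    fmt = (session.custom_parameters() or {}).get("format", "json")

    if fmt == "csv":
        session.run(lambda: spark.read.option("header", "true").csv(session.file_path))
    else:
        session.run(lambda: spark.read.json(session.file_path))
except Exception as e:
    session.fail(f"Parse failed: {e}")
```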

## PostOutputSession

The `PostOutputSession` class manages the lifecycle of a custom post-output process.

```python
from dataforge import PostOutputSession

# Initialize a session (production use)
session = PostOutputSession()

# Initialize a session (optional names for testing)
session = PostOutputSession(output_name="report", output_source_name="my_source", project_name="my_project")


# Get the path of the file generated by the preceding output process
path = session.file_path()

# Retrieve connection parameters for the current output
connection_parameters = session.connection_parameters()

# Retrieve custom parameters for the current output
custom_parameters = session.custom_parameters()

# Run post-output logic: pass a function encapsulating custom code
session.run(lambda: print(f"Uploading file from {path}"))

# Fail the process with an error message
session.fail("Error message")
```
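
A typical post-output step delivers the generated file using the output's connection parameters. A minimal sketch, assuming hypothetical `bucket`/`prefix` connection keys and using boto3 as the transport (boto3 is not a dataforge-sdk dependency):

```python
import os

import boto3  # illustrative transport, not a dataforge-sdk dependency

from dataforge import PostOutputSession

session = PostOutputSession()

try:
    path = session.file_path()
    conn = session.connection_parameters()

    def upload():
        # `bucket` and `prefix` are hypothetical keys; use whatever
        # your output connection actually defines
        key = f"{conn['prefix']}/{os.path.basename(path)}"
        boto3.client("s3").upload_file(path, conn["bucket"], key)

    session.run(upload)
except Exception as e:
    session.fail(f"Post-output failed: {e}")
```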
