Metadata-Version: 2.1
Name: kstreams
Version: 0.5.3
Summary: Build simple kafka streams applications
License: Apache-2.0
Keywords: stream,processing,kafka,event streaming
Author: Marcos Schroh
Author-email: marcos.schroh@kpn.com
Requires-Python: >=3.8,<4.0
Classifier: Framework :: AsyncIO
Classifier: Intended Audience :: Developers
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.8
Classifier: Programming Language :: Python :: 3.9
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Topic :: Software Development :: Libraries :: Application Frameworks
Classifier: Topic :: System :: Distributed Computing
Classifier: Topic :: System :: Networking
Requires-Dist: PyYAML (>=5.4.1,<6.0.0)
Requires-Dist: aiokafka (<1.0)
Requires-Dist: future (>=0.18.2,<0.19.0)
Requires-Dist: prometheus-client (<1.0)
Requires-Dist: pydantic (>=1.9.0,<2.0.0)
Description-Content-Type: text/markdown

# Kstreams

`kstreams` is a library/micro framework for working with `kafka`. It provides a simple Kafka Streams implementation that gives certain guarantees; see the features below.

![Build status](https://github.com/kpn/kstreams/actions/workflows/pr-tests.yaml/badge.svg?branch=master)
[![codecov](https://codecov.io/gh/kpn/kstreams/branch/master/graph/badge.svg?token=t7pxIPtphF)](https://codecov.io/gh/kpn/kstreams)
![python version](https://img.shields.io/badge/python-3.8%2B-yellowgreen)

---

**Documentation**: https://kpn.github.io/kstreams/

---

## Installation

```bash
pip install kstreams
```

You will also need a worker to run the application; we recommend [aiorun](https://github.com/cjrh/aiorun):

```bash
pip install aiorun
```

## Usage

```python
import aiorun
from kstreams import create_engine, Stream


stream_engine = create_engine(title="my-stream-engine")

@stream_engine.stream("local--kstreams")
async def consume(stream: Stream):
    async for cr in stream:
        print(f"Event consumed: headers: {cr.headers}, payload: {cr.value}")


async def produce():
    payload = b'{"message": "Hello world!"}'

    for i in range(5):
        metadata = await stream_engine.send("local--kstreams", value=payload)
        print(f"Message sent: {metadata}")


async def start():
    await stream_engine.start()
    await produce()


async def shutdown(loop):
    await stream_engine.stop()


if __name__ == "__main__":
    aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=shutdown)
```
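In the example above the producer sends raw JSON bytes, and the consumer receives them unchanged in `cr.value`, so decoding is up to you (or to a custom deserializer, see the features below). A minimal stdlib sketch of that decoding step, using a hypothetical `decode_payload` helper that is not part of kstreams:

```python
import json


def decode_payload(raw: bytes) -> dict:
    """Decode a JSON-encoded Kafka record value into a Python dict."""
    return json.loads(raw.decode("utf-8"))


# Same payload as in the usage example above.
payload = b'{"message": "Hello world!"}'
event = decode_payload(payload)
print(event["message"])  # Hello world!
```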

## Features

- [x] Produce events
- [x] Consume events with `Streams`
- [x] `Prometheus` metrics and custom monitoring
- [x] TestClient
- [x] Custom Serialization and Deserialization
- [x] Easy to integrate with any `async` framework. Not tied to any library!
- [x] Yield events from streams
- [ ] Store (kafka streams pattern)
- [ ] Stream Join
- [ ] Windowing

## Development

This repo uses [poetry](https://python-poetry.org/docs/basic-usage/) instead of pip to manage dependencies.

*Note*: If you want the `virtualenv` to be created in the project directory, first run `poetry config --local virtualenvs.in-project true`.

To install the dependencies just execute:

```bash
poetry install
```

Then you can activate the `virtualenv` with:

```bash
poetry shell
```

Run the tests:

```bash
./scripts/test
```

Run code formatting (`black` and `isort`):

```bash
./scripts/format
```

### Commit messages

We use [conventional commits](https://www.conventionalcommits.org/en/v1.0.0/) for the commit message.

The use of [commitizen](https://commitizen-tools.github.io/commitizen/) is recommended. Commitizen is part of the dev dependencies.

```bash
cz commit
```

