Metadata-Version: 2.4
Name: aws-util
Version: 0.4.0
Summary: A utility library for common AWS services
Author-email: Masrik Dahir <info@masrikdahir.com>
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.10
Classifier: Programming Language :: Python :: 3.11
Classifier: Programming Language :: Python :: 3.12
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Intended Audience :: Developers
Requires-Python: >=3.10
Description-Content-Type: text/markdown
Requires-Dist: boto3
Requires-Dist: pydantic>=2.0
Requires-Dist: cryptography>=42.0
Provides-Extra: dev
Requires-Dist: build; extra == "dev"
Requires-Dist: twine; extra == "dev"
Requires-Dist: taskipy; extra == "dev"
Requires-Dist: pytest; extra == "dev"
Requires-Dist: pytest-cov; extra == "dev"
Requires-Dist: black; extra == "dev"
Requires-Dist: ruff; extra == "dev"
Requires-Dist: mypy; extra == "dev"
Requires-Dist: boto3-stubs[acm,athena,bedrock,bedrock-runtime,cloudformation,cloudwatch,cognito-idp,comprehend,dynamodb,ec2,ecr,ecs,events,firehose,glue,iam,kinesis,kms,lambda,logs,rds,rekognition,route53,s3,secretsmanager,ses,sns,sqs,ssm,stepfunctions,sts,textract,translate]; extra == "dev"
Requires-Dist: moto[acm,athena,cloudformation,cloudwatch,cognito-idp,dynamodb,ec2,ecr,ecs,events,firehose,glue,iam,kinesis,kms,lambda,logs,rds,route53,s3,secretsmanager,ses,sns,sqs,ssm,stepfunctions,sts]; extra == "dev"

# aws-util

A comprehensive Python utility library for **32+ AWS services**. Every module provides clean, typed helper functions backed by Pydantic data models, `lru_cache`-powered boto3 clients, automatic pagination, and built-in `wait_for_*` polling helpers, plus **complex multi-step utilities** for real-world workflows. Dedicated multi-service orchestration modules cover config loading, deployments, alerting, and data pipelines.

## Installation

```bash
pip install aws-util
```

## Requirements

- Python 3.10+
- `boto3`
- `pydantic >= 2.0`
- `cryptography >= 42.0` (for KMS envelope encryption)
- AWS credentials configured (environment variables, IAM role, or `~/.aws/credentials`)
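
For local development, the quickest of those credential options is environment variables, which boto3 reads automatically (the values below are placeholders, not real credentials):

```python
import os

# boto3 also honours ~/.aws/credentials and IAM role metadata; environment
# variables simply take precedence when set.
os.environ["AWS_ACCESS_KEY_ID"] = "AKIAEXAMPLE"         # placeholder
os.environ["AWS_SECRET_ACCESS_KEY"] = "example-secret"  # placeholder
os.environ["AWS_DEFAULT_REGION"] = "us-east-1"
```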

---

## Service Coverage

| # | Module | AWS Service | Key Utilities |
|---|---|---|---|
| 1 | `placeholder` | SSM + Secrets Manager | `retrieve` — resolves `${ssm:...}` / `${secret:...}` placeholders |
| 2 | `parameter_store` | SSM Parameter Store | `get_parameter`, `put_parameter`, `delete_parameter`, **`get_parameters_by_path`**, **`get_parameters_batch`** |
| 3 | `secrets_manager` | Secrets Manager | `get_secret`, **`create_secret`**, **`update_secret`**, **`delete_secret`**, **`list_secrets`**, **`rotate_secret`** |
| 4 | `s3` | S3 | upload, download, list, copy, delete, presigned URL, **`delete_prefix`**, **`move_object`**, **`batch_copy`**, **`download_as_text`**, **`get_object_metadata`** |
| 5 | `dynamodb` | DynamoDB | get, put, update, delete, query, scan, batch |
| 6 | `sqs` | SQS | send, receive, delete, batch, purge, **`send_large_batch`**, **`wait_for_message`**, **`drain_queue`**, **`replay_dlq`** |
| 7 | `sns` | SNS | publish, publish_batch, **publish_fan_out**, **create_topic_if_not_exists** |
| 8 | `lambda_` | Lambda | invoke, invoke_async |
| 9 | `cloudwatch` | CloudWatch Metrics + Logs | put_metric, put_log_events, get_log_events |
| 10 | `sts` | STS | get_caller_identity, get_account_id, assume_role, **assume_role_session** |
| 11 | `eventbridge` | EventBridge | put_event, put_events, **put_events_chunked**, **list_rules** |
| 12 | `kms` | KMS | encrypt, decrypt, generate_data_key, **envelope_encrypt**, **envelope_decrypt**, **re_encrypt** |
| 13 | `ec2` | EC2 | describe, start, stop, reboot, terminate, create_image |
| 14 | `rds` | RDS | describe, start, stop, create/delete snapshots |
| 15 | `ecs` | ECS | run_task, stop_task, describe, list, update_service |
| 16 | `ecr` | ECR | get_auth_token, list_repositories, list_images, **ensure_repository**, **get_latest_image_tag** |
| 17 | `iam` | IAM | create/delete/list roles, attach/detach policies, list users, **create_role_with_policies**, **ensure_role** |
| 18 | `cognito` | Cognito User Pools | create/get/delete user, list users, set password, auth, **get_or_create_user**, **bulk_create_users**, **reset_user_password** |
| 19 | `route53` | Route 53 | list_hosted_zones, upsert_record, delete_record, **wait_for_change**, **bulk_upsert_records** |
| 20 | `acm` | ACM | list, describe, request, delete certificates, **wait_for_certificate**, **find_certificate_by_domain** |
| 21 | `stepfunctions` | Step Functions | start, describe, stop, list, wait_for_execution, **run_and_wait**, **get_execution_history** |
| 22 | `cloudformation` | CloudFormation | create, update, delete, describe, get_outputs, wait, **deploy_stack**, **get_export_value** |
| 23 | `kinesis` | Kinesis Data Streams | put_record, put_records, get_records, describe_stream, **consume_stream** |
| 24 | `firehose` | Kinesis Firehose | put_record, put_record_batch, list_delivery_streams, **put_record_batch_with_retry** |
| 25 | `ses` | SES | send_email, send_templated_email, send_raw_email, **send_with_attachment**, **send_bulk** |
| 26 | `glue` | Glue | start_job_run, get_job_run, list_jobs, wait_for_job_run, **run_job_and_wait**, **stop_job_run** |
| 27 | `athena` | Athena | start_query, get_results, run_query, **get_table_schema**, **run_ddl** |
| 28 | `bedrock` | Bedrock | invoke_model, invoke_claude, invoke_titan_text, **chat**, **embed_text**, **stream_invoke_claude** |
| 29 | `rekognition` | Rekognition | detect_labels, detect_faces, detect_text, compare_faces, **create_collection**, **index_face**, **search_face_by_image**, delete_collection, **ensure_collection** |
| 30 | `textract` | Textract | detect_document_text, analyze_document, async jobs, **extract_text**, **extract_tables**, **extract_form_fields**, **extract_all** |
| 31 | `comprehend` | Comprehend | detect_sentiment, detect_entities, detect_key_phrases, detect_pii, **analyze_text**, **redact_pii**, **batch_detect_sentiment** |
| 32 | `translate` | Translate | translate_text, list_languages, **translate_batch** |
| 33 | `config_loader` | SSM + Secrets Manager *(multi-service)* | **`load_app_config`** (concurrent), **`resolve_config`** (placeholder expansion), **`get_db_credentials`**, **`get_ssm_parameter_map`** |
| 34 | `deployer` | Lambda + ECS + ECR + SSM *(multi-service)* | **`deploy_lambda_with_config`**, **`deploy_ecs_image`**, **`deploy_ecs_from_ecr`**, **`update_lambda_code_from_s3`**, **`update_lambda_alias`** |
| 35 | `notifier` | SNS + SES + SQS *(multi-service)* | **`send_alert`** (concurrent), **`notify_on_exception`** (decorator), **`broadcast`**, **`resolve_and_notify`** |
| 36 | `data_pipeline` | S3 + Glue + Athena + Kinesis + DynamoDB + SQS *(multi-service)* | **`run_glue_then_query`**, **`export_query_to_s3_json`**, **`s3_json_to_dynamodb`**, **`s3_jsonl_to_sqs`**, **`kinesis_to_s3_snapshot`**, **`parallel_export`** |

---

## Placeholder Resolution

```python
from aws_util import retrieve

db_host     = retrieve("${ssm:/myapp/db/host}")
api_key     = retrieve("${secret:myapp/api-key}")
db_password = retrieve("${secret:myapp/db-credentials:password}")
conn        = retrieve("host=${ssm:/myapp/db/host} port=5432")
retrieve(42)  # → 42  (non-strings pass through unchanged)
```
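
Conceptually, placeholder expansion amounts to a regex substitution over the string; a minimal sketch (not the library's actual implementation, and `expand`/`lookup` are illustrative names):

```python
import re

_PLACEHOLDER = re.compile(r"\$\{(ssm|secret):([^}]+)\}")

def expand(text: str, lookup) -> str:
    # Replace each ${ssm:name} / ${secret:name} with the value returned by
    # lookup(kind, name); text without placeholders passes through unchanged.
    return _PLACEHOLDER.sub(lambda m: lookup(m.group(1), m.group(2)), text)

print(expand("host=${ssm:/myapp/db/host}", lambda kind, name: "db.internal"))
# host=db.internal
```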

```python
from aws_util import clear_ssm_cache, clear_secret_cache, clear_all_caches

# Resolved values are cached; clear the caches after rotating a parameter or secret
clear_all_caches()
```

---

## Parameter Store

```python
from aws_util.parameter_store import (
    get_parameter, put_parameter, delete_parameter,
    get_parameters_by_path, get_parameters_batch,
)

value = get_parameter("/myapp/prod/db/host")
put_parameter("/myapp/prod/db/host", "db.internal", description="DB host")

# Load all parameters under a path prefix as a flat dict
params = get_parameters_by_path("/myapp/prod/")
print(params["db/host"], params["db/port"])

# Fetch a specific list of parameters in one request (auto-chunks at 10)
values = get_parameters_batch(["/myapp/prod/db/host", "/myapp/prod/db/port"])

delete_parameter("/myapp/prod/db/host")
```

---

## Secrets Manager

```python
from aws_util.secrets_manager import (
    get_secret, create_secret, update_secret, delete_secret,
    list_secrets, rotate_secret,
)

# Fetch whole secret or a single JSON key
raw   = get_secret("myapp/db-credentials")
passw = get_secret("myapp/db-credentials:password")

arn = create_secret(
    "myapp/db-credentials",
    value={"username": "admin", "password": "s3cr3t"},
    description="App DB credentials",
    tags={"env": "prod"},
)

update_secret("myapp/db-credentials", {"username": "admin", "password": "newpass"})

# List secrets whose name starts with a prefix
secrets = list_secrets(name_prefix="myapp/")
for s in secrets:
    print(s["name"], s["arn"])

# Trigger immediate rotation (Lambda must already be configured)
rotate_secret("myapp/db-credentials")

# Soft delete with 14-day recovery window
delete_secret("myapp/db-credentials", recovery_window_in_days=14)
```

---

## S3

```python
from aws_util.s3 import (
    upload_file, upload_bytes, download_file, download_bytes,
    list_objects, object_exists, delete_object, copy_object, presigned_url,
    delete_prefix, move_object, batch_copy, download_as_text, get_object_metadata,
)

upload_file("bucket", "reports/q1.csv", "/tmp/q1.csv")
upload_bytes("bucket", "data.json", b'{"k":1}')
data = download_bytes("bucket", "data.json")
objects = list_objects("bucket", prefix="reports/")
url = presigned_url("bucket", "reports/q1.csv", expires_in=3600)
print(url.url)

# Delete everything under a prefix (batched automatically)
deleted = delete_prefix("bucket", "reports/2023/")
print(f"{deleted} objects removed")

# Move an object (copy then delete the source; S3 has no native rename, so this is not atomic)
move_object("src-bucket", "old/path/file.csv", "dst-bucket", "new/path/file.csv")

# Concurrent multi-object copy
batch_copy([
    {"src_bucket": "src", "src_key": "a.csv", "dst_bucket": "dst", "dst_key": "a.csv"},
    {"src_bucket": "src", "src_key": "b.csv", "dst_bucket": "dst", "dst_key": "b.csv"},
])

# Download a text file directly as a string
content = download_as_text("bucket", "config/settings.json")

# HEAD request — no body download
meta = get_object_metadata("bucket", "reports/q1.csv")
print(meta["ContentLength"], meta["LastModified"])
```

---

## DynamoDB

```python
from aws_util.dynamodb import DynamoKey, get_item, put_item, update_item, query, scan, batch_get
from boto3.dynamodb.conditions import Key, Attr

key = DynamoKey(partition_key="pk", partition_value="user#1")
item = get_item("Users", key)
put_item("Users", {"pk": "user#1", "name": "Alice"})
update_item("Users", key, {"name": "Alicia"})
items = query("Orders", Key("pk").eq("user#1"), scan_index_forward=False)
```

---

## SQS

```python
from aws_util.sqs import (
    get_queue_url, send_message, receive_messages, delete_message,
    send_large_batch, drain_queue, replay_dlq, wait_for_message,
)

url = get_queue_url("my-queue")
send_message(url, {"order_id": "abc"})
messages = receive_messages(url, max_number=10, wait_seconds=20)
for m in messages:
    print(m.body_as_json())
    delete_message(url, m.receipt_handle)

# Send any number of messages — automatically split into batches of 10
total = send_large_batch(url, [{"n": i} for i in range(50)])

# Process and delete every message in a queue
def handle(msg):
    print(msg.body_as_json())

processed = drain_queue(url, handler=handle, batch_size=10)

# Move all DLQ messages back to the source queue
moved = replay_dlq(dlq_url="https://sqs.../my-queue-dlq", target_url=url)

# Block until a matching message arrives (or timeout)
msg = wait_for_message(
    url,
    predicate=lambda m: m.body_as_json().get("type") == "order_placed",
    timeout=30.0,
)
```

---

## SNS

```python
from aws_util.sns import publish, publish_batch, publish_fan_out, create_topic_if_not_exists

result = publish("arn:aws:sns:...:my-topic", {"event": "user_signup"})
publish_batch("arn:aws:sns:...:my-topic", [{"a": 1}, {"b": 2}])

# Fan-out: publish the same event to multiple topics concurrently
publish_fan_out(
    ["arn:aws:sns:...:topic-a", "arn:aws:sns:...:topic-b"],
    {"event": "deploy_complete"},
)

# Idempotent topic creation
arn = create_topic_if_not_exists("my-notifications")
```

---

## Lambda

```python
from aws_util.lambda_ import invoke, invoke_async

result = invoke("my-function", {"key": "value"})
if result.succeeded:
    print(result.payload)
invoke_async("my-function", {"key": "value"})
```

---

## CloudWatch

```python
from aws_util.cloudwatch import MetricDimension, put_metric, LogEvent, put_log_events

put_metric("MyApp", "Latency", 120.5, "Milliseconds",
           dimensions=[MetricDimension(name="Endpoint", value="/api")])

put_log_events("/myapp", "2024-01-01", [LogEvent.now("request received")])
```

---

## STS

```python
from aws_util.sts import get_caller_identity, assume_role, assume_role_session

identity = get_caller_identity()
print(identity.account_id)

creds = assume_role("arn:aws:iam::123456789012:role/MyRole", "session")

# Get a boto3 Session under an assumed role — ready to create service clients
session = assume_role_session("arn:aws:iam::999999999999:role/CrossAccountRole", "audit")
s3 = session.client("s3")
```

---

## EventBridge

```python
from aws_util.eventbridge import put_event, put_events_chunked, list_rules, EventEntry

put_event("com.myapp.orders", "Order Placed", {"order_id": "ord_001"})

# Publish any number of events; automatically chunked into batches of 10
events = [EventEntry(source="com.myapp", detail_type="Tick", detail={"n": i}) for i in range(35)]
put_events_chunked(events)

# List all rules on the default bus
rules = list_rules()
for rule in rules:
    print(rule["Name"], rule["State"])
```
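
The chunking behind `put_events_chunked` boils down to slicing the list into 10-entry batches, the PutEvents per-request limit; a minimal sketch with an illustrative `chunks` helper:

```python
def chunks(items: list, size: int = 10):
    # Yield consecutive batches of at most `size` items; the final batch
    # may be shorter.
    for i in range(0, len(items), size):
        yield items[i:i + size]

batches = list(chunks(list(range(35))))
print(len(batches), len(batches[-1]))  # 4 5
```

The same pattern underlies `send_large_batch` for SQS, whose SendMessageBatch call shares the 10-entry limit.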

---

## KMS

```python
from aws_util.kms import encrypt, decrypt, generate_data_key, \
    envelope_encrypt, envelope_decrypt, re_encrypt

result = encrypt("alias/my-key", "sensitive-value")
plaintext = decrypt(result.ciphertext_blob)

data_key = generate_data_key("alias/my-key")
# use data_key.plaintext locally, store data_key.ciphertext_blob

# Envelope encryption (AES-GCM + KMS-wrapped key)
payload = envelope_encrypt("alias/my-key", b"secret data")
# store payload["ciphertext"] and payload["encrypted_data_key"] together
original = envelope_decrypt(payload["ciphertext"], payload["encrypted_data_key"])

# Key rotation: move ciphertext to a new key without exposing the plaintext
rotated = re_encrypt(result.ciphertext_blob, destination_key_id="alias/my-new-key")
print(rotated.key_id)  # new key ARN
```

---

## EC2

```python
from aws_util.ec2 import describe_instances, start_instances, stop_instances, create_image

instances = describe_instances(filters=[{"Name": "instance-state-name", "Values": ["running"]}])
for inst in instances:
    print(inst.instance_id, inst.instance_type, inst.state)

stop_instances(["i-1234567890abcdef0"])
ami_id = create_image("i-1234567890abcdef0", "my-backup-ami")
```

---

## RDS

```python
from aws_util.rds import describe_db_instances, start_db_instance, create_db_snapshot

instances = describe_db_instances()
start_db_instance("my-db")
snapshot = create_db_snapshot("my-db", "my-db-snapshot-2024")
```

---

## ECS

```python
from aws_util.ecs import run_task, describe_services, update_service

tasks = run_task("my-cluster", "my-task:5", subnets=["subnet-abc"], security_groups=["sg-xyz"])
services = describe_services("my-cluster", ["my-service"])
update_service("my-cluster", "my-service", desired_count=3)
```

---

## ECR

```python
from aws_util.ecr import get_auth_token, list_repositories, list_images, \
    ensure_repository, get_latest_image_tag

tokens = get_auth_token()
print(tokens[0].endpoint, tokens[0].username)

repos = list_repositories()
images = list_images("my-repo")

# Idempotent — creates the repo only if it doesn't exist
repo = ensure_repository("my-app", image_tag_mutability="IMMUTABLE", scan_on_push=True)
print(repo.repository_uri)

# Find the most recently pushed tag
tag = get_latest_image_tag("my-app")
print(tag)  # e.g. "v1.4.2"
```

---

## IAM

```python
from aws_util.iam import create_role, attach_role_policy, list_roles, \
    create_role_with_policies, ensure_role

role = create_role("MyLambdaRole", {
    "Version": "2012-10-17",
    "Statement": [{"Effect": "Allow", "Principal": {"Service": "lambda.amazonaws.com"},
                   "Action": "sts:AssumeRole"}]
})
attach_role_policy(role.role_name, "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole")

# Create a role and attach multiple managed + inline policies in one call
role = create_role_with_policies(
    "MyAppRole",
    trust_policy={
        "Version": "2012-10-17",
        "Statement": [{"Effect": "Allow", "Principal": {"Service": "ec2.amazonaws.com"},
                       "Action": "sts:AssumeRole"}],
    },
    managed_policy_arns=["arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"],
    inline_policies={
        "AllowSQS": {
            "Version": "2012-10-17",
            "Statement": [{"Effect": "Allow", "Action": "sqs:*", "Resource": "*"}],
        }
    },
)

# Idempotent — creates the role only if it doesn't already exist
role, created = ensure_role(
    "MyAppRole",
    trust_policy={"Version": "2012-10-17", "Statement": [...]},
    managed_policy_arns=["arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"],
)
```

---

## Cognito

```python
from aws_util.cognito import admin_create_user, admin_get_user, admin_set_user_password, \
    list_users, admin_initiate_auth, get_or_create_user, bulk_create_users, reset_user_password

user = admin_create_user("us-east-1_abc123", "alice", attributes={"email": "alice@example.com"})
admin_set_user_password("us-east-1_abc123", "alice", "MyP@ssword1!", permanent=True)
auth = admin_initiate_auth("us-east-1_abc123", "client-id", "alice", "MyP@ssword1!")
print(auth.access_token)

# Idempotent — returns existing user or creates a new one
user, created = get_or_create_user(
    "us-east-1_abc123", "bob",
    attributes={"email": "bob@example.com"},
    temp_password="TempP@ss1!",
)

# Create multiple users in one call
users = bulk_create_users("us-east-1_abc123", [
    {"username": "carol", "attributes": {"email": "carol@example.com"}},
    {"username": "dave",  "attributes": {"email": "dave@example.com"}, "temp_password": "TempP@ss2!"},
])

# Trigger a password-reset email/SMS for a user
reset_user_password("us-east-1_abc123", "alice")
```

---

## Route 53

```python
from aws_util.route53 import list_hosted_zones, upsert_record, delete_record, \
    wait_for_change, bulk_upsert_records

zones = list_hosted_zones()
change_id = upsert_record("Z1234567890", "api.example.com", "A", ["1.2.3.4"])

# Wait for the DNS change to fully propagate (INSYNC)
wait_for_change(change_id, timeout=300)

# Upsert multiple records in a single API call
change_id = bulk_upsert_records("Z1234567890", [
    {"name": "api.example.com",  "record_type": "A",     "values": ["1.2.3.4"],         "ttl": 300},
    {"name": "www.example.com",  "record_type": "CNAME", "values": ["api.example.com"], "ttl": 60},
    {"name": "mail.example.com", "record_type": "MX",    "values": ["10 mail.example.com"]},
])
wait_for_change(change_id)
```

---

## ACM

```python
from aws_util.acm import list_certificates, request_certificate, describe_certificate, \
    wait_for_certificate, find_certificate_by_domain

certs = list_certificates(status_filter=["ISSUED"])
arn = request_certificate("api.example.com", subject_alternative_names=["www.example.com"])
cert = describe_certificate(arn)
print(cert.status, cert.not_after)

# Wait for DNS validation to complete
issued_cert = wait_for_certificate(arn, timeout=600)
print(issued_cert.status)  # "ISSUED"

# Look up a cert by domain without knowing its ARN
cert = find_certificate_by_domain("api.example.com")
```

---

## Step Functions

```python
from aws_util.stepfunctions import start_execution, wait_for_execution, \
    run_and_wait, get_execution_history

execution = start_execution("arn:aws:states:...:stateMachine:MyMachine", {"order_id": "abc"})
result = wait_for_execution(execution.execution_arn, timeout=300)
if result.succeeded:
    print(result.output)

# One-liner: start + wait
result = run_and_wait(
    "arn:aws:states:...:stateMachine:MyMachine",
    input_data={"order_id": "abc"},
    timeout=300,
)
if result.succeeded:
    print(result.output)

# Inspect every state transition and error for debugging
events = get_execution_history(result.execution_arn)
for event in events:
    print(event["type"], event.get("timestamp"))
```

---

## CloudFormation

```python
from aws_util.cloudformation import create_stack, wait_for_stack, get_stack_outputs, \
    deploy_stack, get_export_value

create_stack("my-stack", template_body=my_template, capabilities=["CAPABILITY_IAM"])
stack = wait_for_stack("my-stack")
if stack.is_healthy:
    outputs = get_stack_outputs("my-stack")
    print(outputs["ApiEndpoint"])

# Create or update a stack and wait — handles both cases automatically
stack = deploy_stack(
    "my-stack",
    template_body=my_template,
    parameters={"Env": "prod"},
    capabilities=["CAPABILITY_IAM"],
    timeout=900,
)
print(stack.status)  # "CREATE_COMPLETE" or "UPDATE_COMPLETE"

# Retrieve a cross-stack export value by its Export.Name
vpc_id = get_export_value("my-network-stack-VpcId")
```

---

## Kinesis

```python
from aws_util.kinesis import put_record, put_records, get_records, consume_stream

put_record("my-stream", {"event": "click", "user": "u1"}, partition_key="u1")
put_records("my-stream", [
    {"data": {"event": "view"}, "partition_key": "u2"},
    {"data": {"event": "buy"},  "partition_key": "u3"},
])
records = get_records("my-stream", "shardId-000000000000", limit=50)

# Consume all shards concurrently for 60 seconds, calling handler per record
def handle(record):
    print(record["partition_key"], record["data"])

total = consume_stream(
    "my-stream",
    handler=handle,
    shard_iterator_type="TRIM_HORIZON",
    duration_seconds=60,
)
print(f"Processed {total} records")
```

---

## Firehose

```python
from aws_util.firehose import put_record, put_record_batch, put_record_batch_with_retry

put_record("my-delivery-stream", {"event": "pageview"})
put_record_batch("my-delivery-stream", [{"a": 1}, {"b": 2}, {"c": 3}])

# Automatic retry for throttled/failed records
delivered = put_record_batch_with_retry(
    "my-delivery-stream",
    large_list_of_records,  # any list of record dicts
    max_retries=3,
)
print(f"{delivered} records delivered")
```
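
The retry behavior can be pictured as a loop that resends only the failed records, pausing with exponential backoff between attempts (an illustrative sketch of the pattern, not the library's implementation; `send` is a caller-supplied stand-in for the Firehose call):

```python
import time

def send_with_retry(send, records, max_retries: int = 3) -> int:
    # `send` returns (delivered_count, failed_records); only the failures are
    # retried, with exponentially growing pauses between attempts.
    pending = list(records)
    delivered = 0
    for attempt in range(max_retries + 1):
        if not pending:
            break
        ok, pending = send(pending)
        delivered += ok
        if pending and attempt < max_retries:
            time.sleep(0.1 * 2 ** attempt)
    return delivered
```

Resending only failures matters because PutRecordBatch can partially succeed: throttled records are reported per-entry rather than failing the whole request.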

---

## SES

```python
from aws_util.ses import send_email, send_templated_email, send_with_attachment, send_bulk

result = send_email(
    from_address="no-reply@example.com",
    to_addresses=["user@example.com"],
    subject="Welcome!",
    body_html="<h1>Hello</h1>",
    body_text="Hello",
)
print(result.message_id)

send_templated_email("no-reply@example.com", ["user@example.com"],
                     "WelcomeTemplate", {"name": "Alice"})

# Email with attachment
with open("report.pdf", "rb") as f:
    send_with_attachment(
        from_address="no-reply@example.com",
        to_addresses=["user@example.com"],
        subject="Monthly Report",
        body_text="Please find the report attached.",
        attachments=[{"filename": "report.pdf", "data": f.read(), "mimetype": "application/pdf"}],
    )

# Bulk send
send_bulk("no-reply@example.com", [
    {"to_addresses": ["a@example.com"], "subject": "Hi A", "body_text": "Hello A"},
    {"to_addresses": ["b@example.com"], "subject": "Hi B", "body_text": "Hello B"},
])
```

---

## Glue

```python
from aws_util.glue import start_job_run, wait_for_job_run, run_job_and_wait, stop_job_run

run_id = start_job_run("my-etl-job", arguments={"--input": "s3://bucket/input/"})
run = wait_for_job_run("my-etl-job", run_id, timeout=3600)
if run.succeeded:
    print(f"Completed in {run.execution_time}s")

# One-liner: start + wait
run = run_job_and_wait("my-etl-job", arguments={"--date": "2024-01-01"})

# Stop a running job
stop_job_run("my-etl-job", run_id)
```

---

## Athena

```python
from aws_util.athena import run_query, get_table_schema, run_ddl

rows = run_query(
    query="SELECT * FROM orders WHERE status = 'PENDING' LIMIT 100",
    database="my_database",
    output_location="s3://my-bucket/athena-results/",
)
for row in rows:
    print(row["order_id"], row["amount"])

# Inspect a table's columns
schema = get_table_schema("my_database", "orders", "s3://my-bucket/athena-results/")
for col in schema:
    print(col["name"], col["type"])

# Execute DDL
run_ddl("CREATE TABLE IF NOT EXISTS logs (ts STRING, msg STRING)",
        database="my_database", output_location="s3://my-bucket/athena-results/")
```

---

## Bedrock

```python
from aws_util.bedrock import invoke_claude, invoke_titan_text, list_foundation_models, \
    chat, embed_text, stream_invoke_claude

response = invoke_claude("Summarise this document in 3 bullet points: ...")
print(response)

titan_response = invoke_titan_text("What is machine learning?")

models = list_foundation_models(provider_name="Anthropic")

# Multi-turn conversation
reply = chat([
    {"role": "user", "content": "What is the capital of France?"},
    {"role": "assistant", "content": "Paris."},
    {"role": "user", "content": "And the population?"},
])

# Generate embeddings (Titan)
vector = embed_text("the quick brown fox")  # returns list[float]

# Streaming response
for chunk in stream_invoke_claude("Write a short poem about clouds."):
    print(chunk, end="", flush=True)
```

---

## Rekognition

```python
from aws_util.rekognition import detect_labels, detect_faces, detect_text, compare_faces, \
    create_collection, index_face, search_face_by_image, delete_collection, ensure_collection

with open("photo.jpg", "rb") as f:
    image_bytes = f.read()

labels = detect_labels(image_bytes, min_confidence=80.0)
faces  = detect_faces(image_bytes, attributes=["ALL"])
texts  = detect_text(image_bytes)
matches = compare_faces(source_bytes, target_bytes, similarity_threshold=90.0)  # bytes of two face images

# Face collection (1:N search)
create_collection("employees")
face_id = index_face("employees", image_bytes=image_bytes, external_image_id="emp_001")
results = search_face_by_image("employees", image_bytes=query_bytes, max_faces=3)  # query_bytes: probe image
for r in results:
    print(r["external_image_id"], r["similarity"])
delete_collection("employees")

# Idempotent — creates the collection only if it doesn't already exist
arn, created = ensure_collection("employees")
print(f"Collection {'created' if created else 'already exists'}: {arn}")
```

---

## Textract

```python
from aws_util.textract import detect_document_text, analyze_document, \
    start_document_text_detection, wait_for_document_text_detection, \
    extract_text, extract_tables, extract_form_fields, extract_all

# Synchronous (single page)
with open("invoice.pdf", "rb") as f:
    doc_bytes = f.read()

# Plain text extraction
text = extract_text(document_bytes=doc_bytes)

# Tables as nested lists
tables = extract_tables(document_bytes=doc_bytes)
for table in tables:
    for row in table:
        print(row)

# Form key-value pairs
fields = extract_form_fields(document_bytes=doc_bytes)

# Everything in one call
result = extract_all(document_bytes=doc_bytes)
print(result["text"])
print(result["tables"])
print(result["form_fields"])

# Async (multi-page PDF in S3)
job_id = start_document_text_detection("my-bucket", "docs/report.pdf")
job_result = wait_for_document_text_detection(job_id, timeout=300)
words = [b.text for b in job_result.blocks if b.block_type == "WORD"]
```

---

## Comprehend

```python
from aws_util.comprehend import (
    detect_sentiment, detect_entities, detect_key_phrases,
    detect_dominant_language, detect_pii_entities,
    analyze_text, redact_pii, batch_detect_sentiment,
)

sentiment = detect_sentiment("I love this product!", language_code="en")
print(sentiment.sentiment, sentiment.positive)

entities = detect_entities("Jeff Bezos founded Amazon in Seattle.")
for e in entities:
    print(e.entity_type, e.text)

language = detect_dominant_language("Bonjour tout le monde")
print(language.language_code)  # "fr"

# All analyses in one parallel call
analysis = analyze_text("AWS is amazing! Jeff Bezos founded it. My SSN is 123-45-6789.")
print(analysis["sentiment"].sentiment)
print([e.text for e in analysis["entities"]])
print([p.pii_type for p in analysis["pii_entities"]])

# Redact PII from text
clean = redact_pii("Call me at 555-1234 or email bob@example.com")
# → "Call me at [REDACTED] or email [REDACTED]"

# Batch sentiment — up to 25 texts in one API call
results = batch_detect_sentiment([
    "The product is excellent!",
    "Terrible experience, very disappointing.",
    "It's okay, nothing special.",
])
for r in results:
    print(r.sentiment, r.positive)
```

---

## Translate

```python
from aws_util.translate import translate_text, list_languages, translate_batch

result = translate_text("Hello, world!", target_language_code="es")
print(result.translated_text)   # "¡Hola, mundo!"
print(result.source_language_code)  # "en" (auto-detected)

languages = list_languages()

# Translate a list of strings concurrently
results = translate_batch(
    ["Hello", "Good morning", "Thank you"],
    target_language_code="de",
)
for r in results:
    print(r.translated_text)
```

---

## Config Loader *(SSM + Secrets Manager)*

```python
from aws_util.config_loader import (
    load_app_config, load_config_from_ssm, load_config_from_secret,
    resolve_config, get_db_credentials, get_ssm_parameter_map,
)

# Load all config concurrently from SSM path + multiple secrets
config = load_app_config(
    ssm_prefix="/myapp/prod/",
    secret_names=["myapp/db-credentials", "myapp/api-keys"],
)
print(config["db/host"])     # from SSM
print(config["password"])    # from secret
print("db/host" in config)   # True

# Get DB credentials with required-field validation
creds = get_db_credentials("myapp/db-credentials")
print(creds["username"], creds["password"], creds.get("host"))

# Fetch a specific list of SSM parameters as a dict
params = get_ssm_parameter_map(["/myapp/db/host", "/myapp/db/port"])

# Expand ${ssm:...} / ${secret:...} placeholders in a static config dict
raw_config = {
    "db_host":   "${ssm:/myapp/prod/db/host}",
    "api_key":   "${secret:myapp/api-keys:key}",
    "log_level": "INFO",
}
config = resolve_config(raw_config)
```

---

## Deployer *(Lambda + ECS + ECR + SSM)*

```python
from aws_util.deployer import (
    deploy_lambda_with_config,
    update_lambda_code_from_s3,
    update_lambda_alias,
    deploy_ecs_image,
    deploy_ecs_from_ecr,
    get_latest_ecr_image_uri,
)

# Full Lambda deploy: upload zip, pull env vars from SSM, publish + alias
result = deploy_lambda_with_config(
    function_name="my-function",
    zip_path="/dist/function.zip",
    ssm_prefix="/myapp/prod/",      # merged into env vars
    env_vars={"LOG_LEVEL": "INFO"}, # static overrides
    publish=True,
    alias="live",
)
print(result.function_arn, result.version, result.alias_arn)

# Deploy from S3
result = deploy_lambda_with_config(
    function_name="my-function",
    s3_bucket="my-artifacts",
    s3_key="builds/function-v2.zip",
    publish=True,
)

# Update ECS service to a new container image and wait for stability
ecs_result = deploy_ecs_image(
    cluster="my-cluster",
    service="my-service",
    new_image_uri="123456789012.dkr.ecr.us-east-1.amazonaws.com/my-app:v1.5.0",
    wait=True,
    timeout=300,
)
print(ecs_result.new_task_definition_arn, ecs_result.deployment_id)

# Deploy the latest image from an ECR repository
ecs_result = deploy_ecs_from_ecr(
    cluster="my-cluster",
    service="my-service",
    repository_name="my-app",
    tag="latest",
)
```

---

## Notifier *(SNS + SES + SQS)*

```python
from aws_util.notifier import send_alert, notify_on_exception, broadcast, resolve_and_notify

# Send to any combination of channels concurrently
results = send_alert(
    subject="Deploy succeeded",
    message="Version 1.5 is live.",
    sns_topic_arn="arn:aws:sns:us-east-1:123:alerts",
    from_email="no-reply@example.com",
    to_emails=["ops@example.com", "cto@example.com"],
    queue_url="https://sqs.us-east-1.amazonaws.com/123/audit-log",
)
for r in results:
    print(r.channel, r.success, r.message_id)

# Decorator — auto-alert whenever the function raises
@notify_on_exception(
    sns_topic_arn="arn:aws:sns:us-east-1:123:alerts",
    from_email="no-reply@example.com",
    to_emails=["oncall@example.com"],
)
def nightly_job():
    ...  # any exception triggers an alert; exception is re-raised

# Fan-out to many destinations at once
result = broadcast(
    message="Scheduled maintenance in 30 minutes.",
    subject="Maintenance Notice",
    sns_topic_arns=["arn:aws:sns:...:team-a", "arn:aws:sns:...:team-b"],
    queue_urls=["https://sqs.../audit"],
    from_email="no-reply@example.com",
    to_email_groups=[["alice@example.com"], ["bob@example.com", "carol@example.com"]],
)
print(f"{len(result.succeeded)} delivered, {len(result.failed)} failed")

# Destinations resolved from SSM / Secrets Manager at runtime
resolve_and_notify(
    subject="Nightly ETL complete",
    message_template="Processed {rows} rows in {minutes} minutes.",
    ssm_topic_arn_param="/myapp/prod/alerts-topic-arn",
    secret_email_config="myapp/email-config",
    template_vars={"rows": 150_000, "minutes": 12},
)
```
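The decorator pattern behind `notify_on_exception` (alert on failure, then re-raise) is worth seeing in isolation. A minimal sketch with an injectable `alert` callable in place of the real SNS/SES/SQS delivery (`notify_on_failure` is a hypothetical name, not this library's API):

```python
import functools

def notify_on_failure(alert, subject="Job failed"):
    """Decorator factory: call `alert(subject, message)` when the wrapped
    function raises, then re-raise the original exception unchanged."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as exc:
                # Fire the alert first, then propagate so callers still see
                # the failure (mirrors the "exception is re-raised" behavior).
                alert(subject, f"{func.__name__} raised {exc!r}")
                raise
        return wrapper
    return decorator
```

Keeping the delivery callable injectable makes the decorator trivially testable with a stub.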

---

## Data Pipeline *(S3 + Glue + Athena + Kinesis + DynamoDB + SQS)*

```python
from aws_util.data_pipeline import (
    run_glue_job, run_athena_query, fetch_athena_results,
    run_glue_then_query, export_query_to_s3_json,
    s3_json_to_dynamodb, s3_jsonl_to_sqs,
    kinesis_to_s3_snapshot, parallel_export,
)

# Run a Glue job and wait for completion
run = run_glue_job("my-etl-job", arguments={"input": "s3://raw/", "output": "s3://clean/"})
print(run.state, run.execution_time_seconds)

# Full pipeline: Glue ETL → Athena query
result = run_glue_then_query(
    glue_job_name="my-etl-job",
    athena_query="SELECT COUNT(*) FROM clean_orders WHERE status='COMPLETE'",
    athena_database="warehouse",
    athena_output_location="s3://my-bucket/athena-results/",
)
if result.athena_result and result.athena_result.state == "SUCCEEDED":
    rows = fetch_athena_results(result.athena_result.query_execution_id)

# Export Athena query results directly to S3 as a JSON array
count = export_query_to_s3_json(
    query="SELECT * FROM orders WHERE dt = '2024-01-01'",
    database="warehouse",
    staging_location="s3://my-bucket/athena-staging/",
    output_bucket="my-bucket",
    output_key="exports/orders-2024-01-01.json",
)
print(f"{count} rows exported")

# Load S3 JSON array into DynamoDB
written = s3_json_to_dynamodb("my-bucket", "exports/orders-2024-01-01.json", "Orders")

# Enqueue each line of a JSONL file as an SQS message
sent = s3_jsonl_to_sqs("my-bucket", "events/2024-01-01.jsonl", queue_url)

# Snapshot all Kinesis shards to S3 JSONL files
total = kinesis_to_s3_snapshot(
    stream_name="my-stream",
    output_bucket="my-bucket",
    output_key_prefix="snapshots/2024-01-01/",
)
print(f"{total} records written")

# Run multiple Athena queries concurrently and export each to S3
results = parallel_export(
    queries=[
        {"query": "SELECT * FROM orders", "database": "warehouse", "output_key": "orders.json", "label": "orders"},
        {"query": "SELECT * FROM users",  "database": "warehouse", "output_key": "users.json",  "label": "users"},
    ],
    staging_location="s3://my-bucket/staging/",
    output_bucket="my-bucket",
    output_key_prefix="exports/",
)
for r in results:
    print(r["label"], r["rows"], r["error"])
```

---

## Error handling

| Condition | Exception |
|---|---|
| Any AWS API call fails | `RuntimeError` |
| Secret not valid JSON when key specified | `RuntimeError` |
| JSON key not found in secret | `KeyError` |
| Batch size limit exceeded | `ValueError` |
| Batch partially fails | `RuntimeError` with details |
| Image/document source not specified | `ValueError` |
| Polling timeout exceeded | `TimeoutError` |
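The `TimeoutError` row corresponds to the library's `wait_for_*` polling helpers. The general pattern behind such helpers can be sketched as follows (the `wait_for` name and its parameters are illustrative, not this library's API):

```python
import time

def wait_for(predicate, timeout=60.0, poll=1.0, description="condition"):
    """Poll `predicate` until it returns a truthy value.

    Raises TimeoutError if `timeout` seconds pass without success, matching
    the error-handling contract in the table above.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return
        time.sleep(poll)
    raise TimeoutError(f"Timed out after {timeout}s waiting for {description}")
```

Using `time.monotonic()` rather than `time.time()` keeps the deadline immune to system clock adjustments.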

---

## AWS IAM permissions

Minimum IAM actions required per service (and, for the orchestration modules in the final rows, per module):

| Service | Required Actions |
|---|---|
| SSM | `ssm:GetParameter` `ssm:GetParameters` `ssm:GetParametersByPath` `ssm:PutParameter` `ssm:DeleteParameter` |
| Secrets Manager | `secretsmanager:GetSecretValue` `secretsmanager:CreateSecret` `secretsmanager:UpdateSecret` `secretsmanager:DeleteSecret` `secretsmanager:ListSecrets` `secretsmanager:RotateSecret` |
| S3 | `s3:GetObject` `s3:PutObject` `s3:DeleteObject` `s3:ListBucket` |
| DynamoDB | `dynamodb:GetItem` `dynamodb:PutItem` `dynamodb:UpdateItem` `dynamodb:DeleteItem` `dynamodb:Query` `dynamodb:Scan` `dynamodb:BatchGetItem` `dynamodb:BatchWriteItem` |
| SQS | `sqs:SendMessage` `sqs:ReceiveMessage` `sqs:DeleteMessage` `sqs:GetQueueUrl` `sqs:PurgeQueue` |
| SNS | `sns:Publish` `sns:CreateTopic` |
| Lambda | `lambda:InvokeFunction` |
| CloudWatch Metrics | `cloudwatch:PutMetricData` |
| CloudWatch Logs | `logs:CreateLogGroup` `logs:CreateLogStream` `logs:PutLogEvents` `logs:GetLogEvents` |
| STS | `sts:GetCallerIdentity` `sts:AssumeRole` |
| EventBridge | `events:PutEvents` |
| KMS | `kms:Encrypt` `kms:Decrypt` `kms:GenerateDataKey` |
| EC2 | `ec2:DescribeInstances` `ec2:StartInstances` `ec2:StopInstances` `ec2:RebootInstances` `ec2:TerminateInstances` `ec2:CreateImage` |
| RDS | `rds:DescribeDBInstances` `rds:StartDBInstance` `rds:StopDBInstance` `rds:CreateDBSnapshot` `rds:DeleteDBSnapshot` |
| ECS | `ecs:RunTask` `ecs:StopTask` `ecs:DescribeTasks` `ecs:ListTasks` `ecs:DescribeServices` `ecs:UpdateService` |
| ECR | `ecr:GetAuthorizationToken` `ecr:DescribeRepositories` `ecr:CreateRepository` `ecr:ListImages` `ecr:DescribeImages` |
| IAM | `iam:CreateRole` `iam:DeleteRole` `iam:GetRole` `iam:ListRoles` `iam:AttachRolePolicy` `iam:DetachRolePolicy` `iam:CreatePolicy` `iam:DeletePolicy` |
| Cognito | `cognito-idp:AdminCreateUser` `cognito-idp:AdminGetUser` `cognito-idp:AdminDeleteUser` `cognito-idp:ListUsers` `cognito-idp:AdminInitiateAuth` |
| Route 53 | `route53:ListHostedZones` `route53:ChangeResourceRecordSets` `route53:ListResourceRecordSets` |
| ACM | `acm:ListCertificates` `acm:DescribeCertificate` `acm:RequestCertificate` `acm:DeleteCertificate` |
| Step Functions | `states:StartExecution` `states:DescribeExecution` `states:StopExecution` `states:ListExecutions` |
| CloudFormation | `cloudformation:CreateStack` `cloudformation:UpdateStack` `cloudformation:DeleteStack` `cloudformation:DescribeStacks` `cloudformation:ListStacks` |
| Kinesis | `kinesis:PutRecord` `kinesis:PutRecords` `kinesis:GetRecords` `kinesis:GetShardIterator` `kinesis:DescribeStreamSummary` `kinesis:ListStreams` |
| Firehose | `firehose:PutRecord` `firehose:PutRecordBatch` `firehose:ListDeliveryStreams` `firehose:DescribeDeliveryStream` |
| SES | `ses:SendEmail` `ses:SendTemplatedEmail` `ses:SendRawEmail` `ses:VerifyEmailAddress` `ses:ListVerifiedEmailAddresses` |
| Glue | `glue:StartJobRun` `glue:GetJobRun` `glue:GetJob` `glue:GetJobs` `glue:GetJobRuns` `glue:BatchStopJobRun` |
| Athena | `athena:StartQueryExecution` `athena:GetQueryExecution` `athena:GetQueryResults` `athena:StopQueryExecution` |
| Bedrock | `bedrock:InvokeModel` `bedrock:InvokeModelWithResponseStream` `bedrock:ListFoundationModels` |
| Rekognition | `rekognition:DetectLabels` `rekognition:DetectFaces` `rekognition:DetectText` `rekognition:CompareFaces` `rekognition:DetectModerationLabels` `rekognition:CreateCollection` `rekognition:DeleteCollection` `rekognition:IndexFaces` `rekognition:SearchFacesByImage` |
| Textract | `textract:DetectDocumentText` `textract:AnalyzeDocument` `textract:StartDocumentTextDetection` `textract:GetDocumentTextDetection` |
| Comprehend | `comprehend:DetectSentiment` `comprehend:DetectEntities` `comprehend:DetectKeyPhrases` `comprehend:DetectDominantLanguage` `comprehend:DetectPiiEntities` `comprehend:BatchDetectSentiment` |
| Translate | `translate:TranslateText` `translate:ListLanguages` |
| config_loader | *(union of SSM + Secrets Manager permissions above)* |
| deployer | `lambda:UpdateFunctionCode` `lambda:UpdateFunctionConfiguration` `lambda:PublishVersion` `lambda:CreateAlias` `lambda:UpdateAlias` `lambda:GetFunctionConfiguration` `ecs:DescribeServices` `ecs:DescribeTaskDefinition` `ecs:RegisterTaskDefinition` `ecs:UpdateService` `ecs:DescribeContainerInstances` `ecr:DescribeImages` `ecr:DescribeRepositories` |
| notifier | *(union of SNS `sns:Publish`, SES `ses:SendEmail`, SQS `sqs:SendMessage` plus SSM/Secrets Manager read permissions for `resolve_and_notify`)* |
| data_pipeline | `glue:StartJobRun` `glue:GetJobRun` `athena:StartQueryExecution` `athena:GetQueryExecution` `athena:GetQueryResults` `kinesis:GetShardIterator` `kinesis:GetRecords` `kinesis:ListShards` `kinesis:DescribeStreamSummary` `dynamodb:BatchWriteItem` `sqs:SendMessage` `s3:GetObject` `s3:PutObject` |
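As an example of turning a table row into an attachable policy, a starting-point policy document covering part of the `data_pipeline` row might look like the following. The account ID, ARNs, and variable name are placeholders; narrow every `Resource` entry to your own ARNs before attaching it:

```python
import json

# Hypothetical least-privilege policy sketch for the data_pipeline module.
# Athena actions are left on "*" here only because result/staging access is
# governed by the accompanying S3 statement; tighten as your setup allows.
DATA_PIPELINE_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["glue:StartJobRun", "glue:GetJobRun"],
            "Resource": "arn:aws:glue:*:123456789012:job/*",
        },
        {
            "Effect": "Allow",
            "Action": [
                "athena:StartQueryExecution",
                "athena:GetQueryExecution",
                "athena:GetQueryResults",
            ],
            "Resource": "*",
        },
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObject"],
            "Resource": "arn:aws:s3:::my-bucket/*",
        },
    ],
}

print(json.dumps(DATA_PIPELINE_POLICY, indent=2))
```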

---

## License

MIT
