Metadata-Version: 2.4
Name: toolsbq
Version: 0.1.3
Summary: Helpers for Google BigQuery: client creation, schema helpers, and a convenience BqTools wrapper.
Author: MH
License-Expression: MIT
Requires-Python: >=3.10
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: google-cloud-bigquery>=3.0.0
Requires-Dist: google-auth>=2.0.0
Dynamic: license-file

# toolsbq

Utilities for working with **Google BigQuery** in Python.

Covers authentication, running queries, streaming inserts, upserts (via temp table + MERGE), load jobs, and table creation with partitioning/clustering.

## Install

```bash
pip install toolsbq
```

## Quick start

```python
from toolsbq import bq_get_client, BqTools

client = bq_get_client()  # uses ADC by default (recommended on Cloud Run / Functions)
bq = BqTools(bq_client=client)
```

## Authentication options

`bq_get_client()` resolves credentials in this order:

1. `keyfile_json` — SA key as dict
2. `path_keyfile` — path to SA JSON file (supports `~` and `$HOME` expansion)
3. `GOOGLE_APPLICATION_CREDENTIALS` env var
4. Local RAM-ADC fast path (macOS ramdisk / Linux `/dev/shm`)
5. ADC fallback (Cloud Run metadata, `gcloud auth application-default login`, etc.)

Examples:

```python
from toolsbq import bq_get_client

# 1) ADC (default)
client = bq_get_client(project_id="my-project")

# 2) Service account file
client = bq_get_client(path_keyfile="~/.config/gcloud/sa-keys/key.json")

# 3) Service account info dict
client = bq_get_client(keyfile_json={"type": "service_account", "project_id": "...", "...": "..."})
```
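
The `~` and `$HOME` expansion documented for `path_keyfile` can be reproduced with the standard library. The sketch below mirrors the documented behaviour, not necessarily the library's exact code (`resolve_keyfile_path` is an illustrative helper):

```python
import os

def resolve_keyfile_path(path_keyfile: str) -> str:
    """Expand ~ and environment variables such as $HOME, then absolutize."""
    return os.path.abspath(os.path.expandvars(os.path.expanduser(path_keyfile)))

# Both spellings resolve to the same absolute location:
home_style = resolve_keyfile_path("~/.config/gcloud/sa-keys/key.json")
env_style = resolve_keyfile_path("$HOME/.config/gcloud/sa-keys/key.json")
```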

## Examples

The original script contained the following guidance and example notes:

```text
# ===============================================================================
# 0) Define overall variables for uploads
# ===============================================================================

datetime_system = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
#  datetime_utc = datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S.%f')
datetime_utc = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
print("Current datetime system:", datetime_system)
print("Current datetime UTC   :", datetime_utc)

# ===============================================================================
# 1) Provide BQ auth via file path / via json string
# ===============================================================================

# path_keyfile = "~/.config/gcloud/sa-keys/keyfile.json"
#
# client = bq_get_client(path_keyfile=path_keyfile)
# #  client = bq_get_client(keyfile_json=keyfile_json)
#
# # pass none for test (not creating an actual client)
# #  client = None

#  NEW default: ADC
client = bq_get_client()

# ===============================================================================
# 2) Example fields_schema fields to copy over
# ===============================================================================

#  bq_upload = BqTools(
#      bq_client=client,
#      table_id="",
#      fields_schema=[
#          # fields list: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
#          {"name": "", "type": "INT64", "isKey": 0, "mode": "nullable", "default": None},
#          {"name": "", "type": "INT64", "isKey": 0, "mode": "required", "default": None},

#          {"name": "", "type": "STRING", "isKey": 0, "mode": "nullable", "default": None},
#          {"name": "", "type": "STRING", "isKey": 0, "mode": "required", "default": None},

#          {"name": "", "type": "DATE", "isKey": 0, "mode": "nullable", "default": None},
#          {"name": "", "type": "DATE", "isKey": 0, "mode": "required", "default": None},

#          {"name": "", "type": "DATETIME", "isKey": 0, "mode": "nullable", "default": None},
#          {"name": "", "type": "DATETIME", "isKey": 0, "mode": "required", "default": None},

#          {"name": "", "type": "TIMESTAMP", "isKey": 0, "mode": "nullable", "default": None},
#          {"name": "", "type": "TIMESTAMP", "isKey": 0, "mode": "required", "default": None},

#          {"name": "", "type": "NUMERIC", "isKey": 0, "mode": "nullable", "default": None},
#          {"name": "", "type": "NUMERIC", "isKey": 0, "mode": "required", "default": None},

#          {"name": "", "type": "BOOL", "isKey": 0, "mode": "nullable", "default": None},
#          {"name": "", "type": "BOOL", "isKey": 0, "mode": "required", "default": None},

#          {"name": "", "type": "JSON", "isKey": 0, "mode": "nullable", "default": None},
#          {"name": "", "type": "JSON", "isKey": 0, "mode": "required", "default": None},

#          {"name": "last_updated", "type": "TIMESTAMP", "isKey": 0, "mode": "required", "default": "current_timestamp"},
#      ],
#      #  https://cloud.google.com/bigquery/docs/creating-partitioned-tables#python
#      #  https://cloud.google.com/bigquery/docs/creating-clustered-tables
#      #  https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TimePartitioning
#      table_options={
#          "partition_field": None,
#          "cluster_fields": [],  # max 4 fields - by order provided
#          "partition_expiration_days": None,  # number of days for expiration (0.08 = 2 hours) -> creates options
#          # fields to define expiring partition by ingestion -> need partition_expiration_days too
#          "is_expiring_partition_ingestion_hour": None,  # defines expiring partitioning by ingestion time - by hour
#          "is_expiring_partition_ingestion_date": None,  # defines expiring partitioning by ingestion time - by date
#      },
#      table_suffix="xxxxxx"
#  )

# ===============================================================================
# 3) Minimal BqTools connection to run a query / pull data / get the total row count
# ===============================================================================

# # to simply run a query without doing anything else
# bq_pull = BqTools(
#     bq_client=client,
# )
#
# query = """
#   SELECT * FROM testdb.testproject.testtable LIMIT 5;
# """
#
# print("Total rows in table:", bq_pull.get_row_count("testdb.testproject.testtable"))
# #  quit()
#
# bq_pull.runsql(query)
# print(bq_pull.sql_result)
# for row in bq_pull.sql_result:
#     print(row)

# ===============================================================================
# 4) Create a table by defining a schema and then running create table query
# ===============================================================================
# client = None

# bq_new_table = BqTools(
#     bq_client=client,
#     table_id="testdb.testproject.testtable",
#     fields_schema=[
#         {
#             "name": "employee_id",
#             "type": "int64",
#             "isKey": 1,
#             "mode": "nullable",
#             "default": None,
#         },
#         {"name": "stats_date", "type": "date", "isKey": 1, "mode": "nullable", "default": None},
#         {
#             "name": "annual_ctc",
#             "type": "int64",
#             "isKey": 0,
#             "mode": "nullable",
#             "default": None,
#         },
#         {
#             "name": "last_updated",
#             "type": "timestamp",
#             "isKey": 0,
#             "mode": "required",
#             "default": "current_timestamp",
#         },
#     ],
#     #  table_options={
#     #      "time_partition_field": None,  # uses _PARTITIONTIME if no field is set
#     #      "time_partitioning_type": "HOUR",  # DAY, HOUR, MONTH, or YEAR -> default: DAY
#     #      "expiration_ms": 3600000,  # 1 hour
#     #      "cluster_fields": [],  # max 4 fields - by order provided
#     #  },
#     table_options={
#         "partition_field": "stats_date",
#         "cluster_fields": ["employee_id"],  # max 4 fields - by order provided
#         "partition_expiration_days": None,  # number of days for expiration (0.08 = 2 hours) -> creates options
#         # fields to define expiring partition by ingestion -> need partition_expiration_days too
#         "is_expiring_partition_ingestion_hour": None,  # defines expiring partitioning by ingestion time - by hour
#         "is_expiring_partition_ingestion_date": None,  # defines expiring partitioning by ingestion time - by date
#     },
#     table_suffix="xxxxxx",
# )
#
# print(bq_new_table.create_table_query)
# print(bq_new_table.merge_query)
# print(bq_new_table.table_id_temp)
# # quit()
#
# bq_new_table.run_create_table_main()
# quit()

#  # drop table via manual query
#  #  bq_new_table.runsql("drop table if exists {}".format(bq_new_table.table_id))
#  #  print("table dropped")

# ===============================================================================
# 5) Simple client to insert all rows into an existing table (may create duplicates, no upsert); no schema needed
# ===============================================================================

#  rows_to_insert = [
#      {"employee_id": 157, "annual_ctc": 182},
#      {"employee_id": 158, "annual_ctc": 183},
#      {"employee_id": 159, "annual_ctc": 184},
#      {"employee_id": 160, "annual_ctc": 1840},
#      {"employee_id": 161, "annual_ctc": 1840},
#      {"employee_id": 1000, "annual_ctc": 5000},
#  ]
#  print("number of rows:", len(rows_to_insert))

#  # 5a) generic -> define table name in function call
#  bq_insert = BqTools(
#      bq_client=client,
#  )
#  bq_insert.insert_stream_generic("testdb.testproject.testtable", rows_to_insert, max_rows_per_request=1000)

# 5b) table_id in class definition
#  bq_insert = BqTools(
#      bq_client=client,
#      table_id="testdb.testproject.testtable",
#  )
#  bq_insert.insert_stream_table_main(rows_to_insert, max_rows_per_request=1000)

# ===============================================================================
# 6) Upsert example: define schema, insert all values into a temp table, use a specific suffix and uuid
# ===============================================================================

#  rows_to_insert = [
#      {"employee_id": 1579, "annual_ctc": 182},
#      {"employee_id": 1589, "annual_ctc": 183},
#      {"employee_id": 1599, "annual_ctc": 1840},
#      {"employee_id": 160, "annual_ctc": 18400},
#      {"employee_id": 161, "annual_ctc": 18400},
#      {"employee_id": 1000, "annual_ctc": 50000},
#  ]
#  print("number of rows:", len(rows_to_insert))

#  bq_upsert = BqTools(
#      bq_client=client,
#      table_id="testdb.testproject.testtable",
#      fields_schema=[
#          {"name": "employee_id", "type": "int64", "isKey": 1, "mode": "nullable", "default": None},
#          {"name": "stats_date", "type": "date", "isKey": 0, "mode": "nullable", "default": None},
#          {"name": "annual_ctc", "type": "int64", "isKey": 0, "mode": "nullable", "default": None},
#          {"name": "last_updated", "type": "timestamp", "isKey": 0, "mode": "required", "default": "current_timestamp"},
#      ],
#      table_options={
#          #  "partition_field": 'stats_date',
#          "cluster_fields": ['employee_id'],  # max 4 fields - by order provided
#      },
#      #  run_uuid="xxx-xxx-xxx-xxx",  # can pass over a uuid if needed to re-use connection and upsert is still working
#      #  table_suffix=None,
#      table_suffix="skoeis",  # use a different table_suffix for each upsert definition (e.g. when a different number of columns is updated)
#  )

#  # Generate a UUID in normal code, if we want to pass it over in tools definition
#  #  uuid_test = uuid4()
#  #  print(uuid_test)

#  print("the uuid is:", bq_upsert.run_uuid)
#  print(bq_upsert.table_id)
#  #  print(json.dumps(bq_upsert.fields_schema, indent=2))
#  print(bq_upsert.table_id_temp)
#  print("schema is safe:", bq_upsert.schema_is_safe)
#  #  print(json.dumps(bq_upsert.fields_schema_temp, indent=2))
#  #  print("create main table:", bq_upsert.create_table_query)
#  #  bq_upsert.run_create_table_main()
#  #  print("create temp table:", bq_upsert.create_table_query_temp)
#  print("merge query:", bq_upsert.merge_query)

#  # run the upsert
#  bq_upsert.run_upsert(rows_to_insert)

#  # check run_uuid and merge query after the upsert (both should have changed now)
#  print("the uuid is:", bq_upsert.run_uuid)
#  print("merge query:", bq_upsert.merge_query)

#  # force-run only the merge query -> run_uuid must first be set to the proper run_uuid!
#  #  bq_upsert.run_merge()

# ===============================================================================
# 7) Load job with defined schema into new/existing table (from mysql results dict)
# ===============================================================================

#  bq_load = BqTools(
#      bq_client=client,
#      table_id="testdb.testproject.testtable",
#      fields_schema=[
#          {"name": "employee_id", "type": "int64", "isKey": 1, "mode": "nullable", "default": None},
#          {"name": "stats_date", "type": "date", "isKey": 0, "mode": "nullable", "default": None},
#          {"name": "annual_ctc", "type": "int64", "isKey": 0, "mode": "nullable", "default": None},
#          {"name": "last_updated", "type": "timestamp", "isKey": 0, "mode": "required", "default": "current_timestamp"},
#      ],
#      table_options={
#          "partition_field": None,
#          "cluster_fields": ["stats_date"],
#      },
#  )

#  # use mysql to run test sql -> into sql_results / rows_to_insert (has exactly the same layout)
#  # need to pass all fields, including required last_updated for example! -> add to dict
#  rows_to_insert = [
#      {"employee_id": 1579, "annual_ctc": 182},
#      {"employee_id": 1589, "annual_ctc": 183},
#      {"employee_id": 1599, "annual_ctc": 1840},
#      {"employee_id": 160, "annual_ctc": 18400},
#      {"employee_id": 161, "annual_ctc": 18400},
#      {"employee_id": 1000, "annual_ctc": 50000},
#  ]

#  # Attention: required fields have to be passed via the load job!
#  #  add the additional field to all items in the results dict, e.g., last_updated
#  for i in range(0, len(rows_to_insert)):
#      rows_to_insert[i].update({"last_updated": datetime_utc})

#  # drop the existing table first -> this ensures it is empty
#  bq_load.runsql("drop table if exists {}".format(bq_load.table_id))
#  print("table dropped")

#  # run upload from mysql dict -> load job (the table is created from the schema if it doesn't exist)
#  bq_load.load_job_from_json(rows_to_insert, convert_dict_json=True)

# ===============================================================================
# 8) Load job with autodetect schema into new table (from mysql results dict)
# ===============================================================================

#  bq_load = BqTools(
#      bq_client=client,
#      table_id="testdb.testproject.testtable",
#  )

#  # use mysql to run test sql -> into sql_results / rows_to_insert (has exactly the same layout)
#  # need to pass all fields, including required last_updated for example! -> add to dict
#  rows_to_insert = [
#      {"employee_id": 1579, "annual_ctc": 182},
#      {"employee_id": 1589, "annual_ctc": 183},
#  ]

#  # drop the existing table first -> this ensures it is empty
#  bq_load.runsql("drop table if exists {}".format(bq_load.table_id))
#  print("table dropped")

#  # run upload from mysql dict -> load job (the table is created via schema autodetect if it doesn't exist)
#  bq_load.load_job_from_json(rows_to_insert, convert_dict_json=True, autodetect_schema=True)

```
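
The upsert flow above streams rows into a temp table and then merges on the `isKey` fields. The real `merge_query` is generated by `BqTools`; the sketch below only illustrates the general shape of such a statement built from a `fields_schema` list (`build_merge_query` and its output format are illustrative assumptions, not the library's actual code):

```python
def build_merge_query(table_id: str, table_id_temp: str, fields_schema: list[dict]) -> str:
    """Illustrative MERGE builder: key fields (isKey=1) drive the ON clause,
    non-key fields are updated on match; all fields are inserted on no match."""
    keys = [f["name"] for f in fields_schema if f["isKey"]]
    non_keys = [f["name"] for f in fields_schema if not f["isKey"]]
    on = " AND ".join(f"T.{k} = S.{k}" for k in keys)
    updates = ", ".join(f"T.{c} = S.{c}" for c in non_keys)
    cols = ", ".join(f["name"] for f in fields_schema)
    src_cols = ", ".join(f"S.{f['name']}" for f in fields_schema)
    return (
        f"MERGE `{table_id}` T USING `{table_id_temp}` S ON {on} "
        f"WHEN MATCHED THEN UPDATE SET {updates} "
        f"WHEN NOT MATCHED THEN INSERT ({cols}) VALUES ({src_cols})"
    )

schema = [
    {"name": "employee_id", "isKey": 1},
    {"name": "annual_ctc", "isKey": 0},
]
query = build_merge_query("testdb.testproject.testtable",
                          "testdb.testproject.testtable_temp", schema)
```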

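The streaming-insert examples above take a `max_rows_per_request` argument; conceptually, the rows are split into batches of at most that size before each insert request. A standalone sketch of that batching step (`chunk_rows` is a hypothetical helper, not the library's code):

```python
from typing import Iterator

def chunk_rows(rows: list[dict], max_rows_per_request: int) -> Iterator[list[dict]]:
    """Yield successive batches of at most max_rows_per_request rows."""
    for start in range(0, len(rows), max_rows_per_request):
        yield rows[start:start + max_rows_per_request]

rows_to_insert = [{"employee_id": i, "annual_ctc": 100 + i} for i in range(5)]
batches = list(chunk_rows(rows_to_insert, max_rows_per_request=2))
# Each batch would be sent as one streaming-insert request.
```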
## Development

Build locally:

```bash
python -m pip install --upgrade build twine
python -m build
twine check dist/*
```

Publish (manual):

```bash
twine upload dist/*
```
