Metadata-Version: 2.2
Name: pgmanage
Version: 1.0.4
Summary: pgsql数据库工具
Home-page: https://gitee.com/manjim/pgmanage
Author: manji
Author-email: pnsm@qq.com
Classifier: Programming Language :: Python :: 3
Classifier: License :: OSI Approved :: MIT License
Classifier: Operating System :: OS Independent
Requires-Python: >=3
Description-Content-Type: text/markdown
License-File: LICENSE
Requires-Dist: psycopg[binary,pool]
Dynamic: author
Dynamic: author-email
Dynamic: classifier
Dynamic: description
Dynamic: description-content-type
Dynamic: home-page
Dynamic: requires-dist
Dynamic: requires-python
Dynamic: summary



# 使用说明

本程序包用到的库：psycopg3 
官方文档：https://www.psycopg.org/psycopg3/docs/advanced/pool.html#other-ways-to-create-a-pool
开源地址：https://gitee.com/manjim/pgmanage
```python
# 依赖模块
psycopg[binary,pool]
```

## 1. 下载安装

```python
# 安装
pip install pgmanage

# 升级
pip install --upgrade pgmanage

# 卸载
pip uninstall pgmanage
```

## 2. 导入包模块

```python
from pgmanage import PgPool         # 数据库连接池模块
from pgmanage import PgExec         # 全功能模块 包含连接池
from pgmanage import AsyncPgPool    # 异步数据库连接池模块
from pgmanage import AsyncPgExec    # 异步全功能模块 包含连接池
```

## 数据库连接字符串
根据环境变量配置的数据库字符串，自动链接数据库。也可传入连接字符串参数。
```
环境变量名称：PGLINK_URL
链接字符串示例：'dbname=shop_data user=postgres password=1116666688 host=127.0.0.1 port=5432'
环境变量写法示例：PGLINK_URL='dbname=shop_data user=postgres password=1116666688 host=127.0.0.1 port=5432'
```

## 3.使用示例

### 同步模式
- 示例1——PgPool
```python
from pgmanage import PgPool

dbm = PgPool()
with dbm.pool.connection() as conn:
    with conn.cursor() as cur:
        cur.execute("SELECT 1")
        data = cur.fetchone()
        print("查询结果:",data)
# 使用完成后会自动关闭连接池
```

- 示例2-PgExec

```python
# 设置日志
import logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

from pgmanage import PgExec

def main():

    # 生成10万行的测试数据
    data_to_bulk_insert = []
    for i in range(100000):
        data_to_bulk_insert.append({'id': f'{i}', 'name': f'name{i}88','name2': f'name{i}88', 'age': i % 100, 'email': f'email{i}@example.com','fasle': True})


    # 创建数据库管理器实例
    db_mgr = PgExec()

    # 写入数据，主键冲突时更新所有非主键列 upsert_data
    rows_inserted = db_mgr.upsert_data(
        schema='data_test', 
        table='table_test', 
        data=data_to_bulk_insert,
        batch_size=10000,
        update=True,
        create=True,
        add_columns=True,
        max_workers=10,
    )
    logger.info(f"总共插入 {rows_inserted} 行")

    # 执行 SQL 查询
    query = "SELECT * FROM data_test.table_test LIMIT 10;"
    result = db_mgr.exec_query(query)
    for row in result:
        logger.info(row)
        
    # 删除数据
    query = "delete  FROM data_test.table_test;"
    result = db_mgr.exec_query_rowcount(query)
    logger.info(f"删除结果:{result}")

if __name__ == '__main__':
    main()
```

### 异步模式
- 示例1——AsyncPgPool
```python
import asyncio
from pgmanage import AsyncPgPool
async def main():
    try:
        # 初始化连接池
        pool = await AsyncPgPool.initialize()

        # 使用连接池执行数据库操作
        async with pool.connection() as conn:
            async with conn.cursor() as cur:
                await cur.execute("SELECT 1")
                rows = await cur.fetchall()
                for row in rows:
                    print("查询结果:",row)
    except Exception as e:
        logger.error(f"发生错误: {e}")
        raise

if __name__ == "__main__":
    asyncio.run(main())
```
- 示例2-AsyncPgExec
```python
import logging
import asyncio
from pgmanage import AsyncPgExec

# 设置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# 使用示例
async def main():
    import os
    DBURI = os.environ.get("PGLINK_URL")
    
    # 方式1：使用上下文管理器（推荐）
    async with AsyncPgExec(DBURI) as dbmange:
        # 生成测试数据
        data_to_bulk_insert = [
            {'id': f'{i}', 'name': f'name{i}88', 'age': i % 100, 
             'email': f'email{i}@example.com', 'fasle': True}
            for i in range(100000)
        ]

        result = await dbmange.upsert_data(
            'data_test', 'table_test', data_to_bulk_insert,
            batch_size=10000, update=True, create=True, add_columns=True
        )
        
        logger.info(f"总共成功插入或更新了 {result} 条记录")


    # 方式2：直接使用（程序结束时会自动关闭连接池）
    # 生成测试数据
    data_to_bulk_insert = [
        {'id': f'{i}', 'name': f'name{i}88', 'age': i % 100, 
         'email': f'email{i}@example.com', 'fasle': True}
        for i in range(100000)
    ]
    db_exec = AsyncPgExec(DBURI)
    db_exec.pool = await db_exec.async_pool.initialize()
    result = await db_exec.upsert_data(
        'data_test', 'table_test', data_to_bulk_insert,
        batch_size=10000, update=True, create=True, add_columns=True
    )
    logger.info(f"总共成功插入或更新了 {result} 条记录")
    # 不需要手动关闭，atexit 会处理

if __name__ == "__main__":
    asyncio.run(main())
```
