import asyncio import aiomysql from typing import List, Dict, Any, Optional from env import MysqlDataBaseConfig class AsyncMySQLClient: def __init__(self, host: str, port: int, user: str, password: str, db: str): """ 初始化异步MySQL客户端 Args: host: MySQL服务器地址 port: MySQL服务器端口 user: 用户名 password: 密码 db: 数据库名 """ self.host = host self.port = port self.user = user self.password = password self.db = db self.pool = None async def connect(self): """ 建立连接池 """ self.pool = await aiomysql.create_pool( host=self.host, port=self.port, user=self.user, password=self.password, db=self.db, charset='utf8mb4', autocommit=True ) async def close(self): """ 关闭连接池 """ if self.pool: self.pool.close() await self.pool.wait_closed() async def query_one(self, sql: str, params: Optional[tuple] = None) -> Optional[Dict[str, Any]]: """ 查询单条记录 Args: sql: SQL查询语句 params: 查询参数 Returns: 查询结果字典或None """ async with self.pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cursor: await cursor.execute(sql, params) result = await cursor.fetchone() return result async def query_all(self, sql: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]: """ 查询多条记录 Args: sql: SQL查询语句 params: 查询参数 Returns: 查询结果列表 """ async with self.pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cursor: await cursor.execute(sql, params) results = await cursor.fetchall() return results async def execute(self, sql: str, params: Optional[tuple] = None) -> int: """ 执行SQL语句(INSERT, UPDATE, DELETE等) Args: sql: SQL执行语句 params: 执行参数 Returns: 受影响的行数 """ async with self.pool.acquire() as conn: async with conn.cursor() as cursor: await cursor.execute(sql, params) return cursor.rowcount async def search_desc_by_table_names(table_name_list : list, db_client: AsyncMySQLClient): """ 搜索数据库中的表数据 参数: table_name_list (list): 表名列表 db_client (AsyncMySQLClient): 异步MySQL数据库客户端,用于执行数据库操作 返回值: 返回所有表的描述 """ ans = [] try: # 连接数据库 await db_client.connect() for table_name in table_name_list: # 查询多条记录 records = await db_client.query_all("SELECT * FROM business_table_schema WHERE table_name_eng like %s", (table_name,)) content = "" table_name_zh = "" for record in records: content += record.get("field_eng") content += "-" content += record.get("field_zh") content += "-" content += record.get("type") content += "-" content += "完整性:" content += record.get("integrity") content += "-" content += "一致性:" content += record.get("consistency") content += "-" content += "及时性:" content += record.get("timeliness") content += "-" content += "准确性:" content += record.get("accuracy") content += "-" content += "规范性:" content += record.get("standardization") table_name_zh = record.get("table_name_zh") ans.append({ "表英文名": table_name, "表中文名": table_name_zh, "字段介绍": content, }) return ans except Exception as e: print(f"查询失败: {e}") finally: # 关闭连接 await db_client.close() async def insert(data : list[dict[str, Any]], db_client: AsyncMySQLClient): """ 插入或更新数据 参数: data (dict): 要插入或更新的数据 db_client (AsyncMySQLClient): 异步MySQL数据库客户端,用于执行数据库操作 返回值: 返回受影响的行数 """ try: # 连接数据库 await db_client.connect() i, success_count = 0, 0 fail_index = [] for data in data: if "table_name_eng" not in data or "table_name_zh" not in data or "region" not in data or "description" not in data: print(f"第{i}行数据不完整") fail_index.append(i) i += 1 continue try: affected_rows = await db_client.execute( "INSERT INTO business_database_schema (table_name_eng, table_name_zh, region, description) VALUES (%s, %s, %s, %s)", (data["table_name_eng"], data["table_name_zh"], data["region"], data["description"]) ) except Exception as e: print(f"第{i}行数据插入失败: {e}") fail_index.append(i) i += 1 continue i += 1 success_count += 1 return f"成功插入了 {success_count} 条记录,失败的行: {fail_index}" finally: # 关闭连接 await db_client.close() # 创建数据库客户端实例 db_client = AsyncMySQLClient( host=MysqlDataBaseConfig.db_host, port=MysqlDataBaseConfig.db_port, user=MysqlDataBaseConfig.db_username, password=MysqlDataBaseConfig.db_password, db=MysqlDataBaseConfig.db_database ) async def get_db(): # 创建数据库客户端实例 return db_client # 使用示例 async def main(): # 创建数据库客户端实例 db_client = await get_db() data = [{ "table_name_eng": "BPMS_RU_DEF_ATT_NODE_REL", "table_name_zh": "自由流程运行时扩展属性节点关系", "region": "规建域", "description": "PROCESS_NAME-VARCHAR (100)- 流程的名称,VERSION-NUMBER (5)- 流程的版本号,NODE_ID-VARCHAR (100)- 节点的编号,CREATE_TIME-DATETIME - 记录的创建时间,格式为 YYYY-MM-DD HH:mm:ss, ATT_NODE_REL_ID-VARCHAR (64)- 扩展属性与节点关系的编号,ATT_ID-VARCHAR (64)- 扩展属性的编号,ATT_VALUE-VARCHAR (2000)- 扩展属性的取值,DEPLOY_ID-VARCHAR (64)- 部署的编号,NODE_NAME-VARCHAR (100)- 节点的名称,PROCESS_INS_ID-VARCHAR (40)- 关联的流程实例 ID, MODIFY_DATE-DATETIME - 记录的修改时间,格式为 YYYY-MM-DD HH:mm:ss, PROCESS_ID-VARCHAR (100)- 流程的编号,CREATOR_ID-VARCHAR (40)- 创建人的标识 ID" },{ "table_name_eng": "BPMS_RU_DEF_DEPLOYE", "table_name_zh": "自由流程运行时部署基本信息表", "region": "规建域", "description": "CREATE_TIME-DATETIME - 记录的创建时间,格式为 YYYY-MM-DD HH:mm:ss, CREATOR_ID-VARCHAR (40)- 创建人的标识 ID, DEPLOY_DIR_CODE-VARCHAR (200)- 部署目录的编号,DEPLOY_PERSON-VARCHAR (40)- 部署人的标识 ID, DEPLOY_PERSON_NAME-VARCHAR (100)- 部署人的姓名,DEPLOY_TIME-DATETIME - 部署操作的时间,格式为 YYYY-MM-DD HH:mm:ss, LOCK_PERSON_NAME-VARCHAR (100)- 部署的姓名,LOCKED-NUMBER (1)- 锁定状态标识,MODIFY_DATE-DATETIME - 记录的修改时间,格式为 YYYY-MM-DD HH:mm:ss, PROCESS_DEF_ID-VARCHAR (40)- 流程定义文件的 ID, PROCESS_DEF_NAME-VARCHAR (100)- 流程定义文件的名称,PROCESS_INS_ID-VARCHAR (40)- 关联的流程实例 ID, STATE-CHAR (1)- 部署的状态代码(0 - 未处理,1 - 已完成), UNINSTALL_TIME-DATETIME - 卸载操作的时间,格式为 YYYY-MM-DD HH:mm:ss, UPDATE_PERSON-VARCHAR (40)- 权限人的标识 ID, UPDATE_PERSON_NAME-VARCHAR (100)- 权限人的姓名,VERSION-NUMBER (5)- 流程定义的版本号,VERSION_DES-VARCHAR (200)- 流程定义版本的描述信息" }] await insert(data, db_client) # 如果需要直接运行测试 if __name__ == "__main__": asyncio.run(main())