|
|
import asyncio
|
|
|
import aiomysql
|
|
|
from typing import List, Dict, Any, Optional
|
|
|
|
|
|
from env import MysqlDataBaseConfig
|
|
|
|
|
|
|
|
|
class AsyncMySQLClient:
|
|
|
def __init__(self, host: str, port: int, user: str, password: str, db: str):
|
|
|
"""
|
|
|
初始化异步MySQL客户端
|
|
|
|
|
|
Args:
|
|
|
host: MySQL服务器地址
|
|
|
port: MySQL服务器端口
|
|
|
user: 用户名
|
|
|
password: 密码
|
|
|
db: 数据库名
|
|
|
"""
|
|
|
self.host = host
|
|
|
self.port = port
|
|
|
self.user = user
|
|
|
self.password = password
|
|
|
self.db = db
|
|
|
self.pool = None
|
|
|
|
|
|
async def connect(self):
|
|
|
"""
|
|
|
建立连接池
|
|
|
"""
|
|
|
self.pool = await aiomysql.create_pool(
|
|
|
host=self.host,
|
|
|
port=self.port,
|
|
|
user=self.user,
|
|
|
password=self.password,
|
|
|
db=self.db,
|
|
|
charset='utf8mb4',
|
|
|
autocommit=True
|
|
|
)
|
|
|
|
|
|
async def close(self):
|
|
|
"""
|
|
|
关闭连接池
|
|
|
"""
|
|
|
if self.pool:
|
|
|
self.pool.close()
|
|
|
await self.pool.wait_closed()
|
|
|
|
|
|
async def query_one(self, sql: str, params: Optional[tuple] = None) -> Optional[Dict[str, Any]]:
|
|
|
"""
|
|
|
查询单条记录
|
|
|
|
|
|
Args:
|
|
|
sql: SQL查询语句
|
|
|
params: 查询参数
|
|
|
|
|
|
Returns:
|
|
|
查询结果字典或None
|
|
|
"""
|
|
|
async with self.pool.acquire() as conn:
|
|
|
async with conn.cursor(aiomysql.DictCursor) as cursor:
|
|
|
await cursor.execute(sql, params)
|
|
|
result = await cursor.fetchone()
|
|
|
return result
|
|
|
|
|
|
async def query_all(self, sql: str, params: Optional[tuple] = None) -> List[Dict[str, Any]]:
|
|
|
"""
|
|
|
查询多条记录
|
|
|
|
|
|
Args:
|
|
|
sql: SQL查询语句
|
|
|
params: 查询参数
|
|
|
|
|
|
Returns:
|
|
|
查询结果列表
|
|
|
"""
|
|
|
async with self.pool.acquire() as conn:
|
|
|
async with conn.cursor(aiomysql.DictCursor) as cursor:
|
|
|
await cursor.execute(sql, params)
|
|
|
results = await cursor.fetchall()
|
|
|
return results
|
|
|
|
|
|
async def execute(self, sql: str, params: Optional[tuple] = None) -> int:
|
|
|
"""
|
|
|
执行SQL语句(INSERT, UPDATE, DELETE等)
|
|
|
|
|
|
Args:
|
|
|
sql: SQL执行语句
|
|
|
params: 执行参数
|
|
|
|
|
|
Returns:
|
|
|
受影响的行数
|
|
|
"""
|
|
|
async with self.pool.acquire() as conn:
|
|
|
async with conn.cursor() as cursor:
|
|
|
await cursor.execute(sql, params)
|
|
|
return cursor.rowcount
|
|
|
|
|
|
async def search_desc_by_table_names(table_name_list : list, db_client: AsyncMySQLClient):
|
|
|
"""
|
|
|
搜索数据库中的表数据
|
|
|
参数:
|
|
|
table_name_list (list): 表名列表
|
|
|
db_client (AsyncMySQLClient): 异步MySQL数据库客户端,用于执行数据库操作
|
|
|
返回值:
|
|
|
返回所有表的描述
|
|
|
"""
|
|
|
ans = []
|
|
|
try:
|
|
|
# 连接数据库
|
|
|
await db_client.connect()
|
|
|
for table_name in table_name_list:
|
|
|
# 查询多条记录
|
|
|
records = await db_client.query_all("SELECT * FROM business_table_schema WHERE table_name_eng like %s",
|
|
|
(table_name,))
|
|
|
content = ""
|
|
|
table_name_zh = ""
|
|
|
for record in records:
|
|
|
content += record.get("field_eng")
|
|
|
content += "-"
|
|
|
content += record.get("field_zh")
|
|
|
content += "-"
|
|
|
content += record.get("type")
|
|
|
content += "-"
|
|
|
content += "完整性:"
|
|
|
content += record.get("integrity")
|
|
|
content += "-"
|
|
|
content += "一致性:"
|
|
|
content += record.get("consistency")
|
|
|
content += "-"
|
|
|
content += "及时性:"
|
|
|
content += record.get("timeliness")
|
|
|
content += "-"
|
|
|
content += "准确性:"
|
|
|
content += record.get("accuracy")
|
|
|
content += "-"
|
|
|
content += "规范性:"
|
|
|
content += record.get("standardization")
|
|
|
table_name_zh = record.get("table_name_zh")
|
|
|
|
|
|
ans.append({
|
|
|
"表英文名": table_name,
|
|
|
"表中文名": table_name_zh,
|
|
|
"字段介绍": content,
|
|
|
})
|
|
|
|
|
|
return ans
|
|
|
except Exception as e:
|
|
|
print(f"查询失败: {e}")
|
|
|
finally:
|
|
|
# 关闭连接
|
|
|
await db_client.close()
|
|
|
|
|
|
async def insert(data : list[dict[str, Any]], db_client: AsyncMySQLClient):
|
|
|
"""
|
|
|
插入或更新数据
|
|
|
参数:
|
|
|
data (dict): 要插入或更新的数据
|
|
|
db_client (AsyncMySQLClient): 异步MySQL数据库客户端,用于执行数据库操作
|
|
|
返回值:
|
|
|
返回受影响的行数
|
|
|
"""
|
|
|
|
|
|
try:
|
|
|
# 连接数据库
|
|
|
await db_client.connect()
|
|
|
i, success_count = 0, 0
|
|
|
fail_index = []
|
|
|
for data in data:
|
|
|
if "table_name_eng" not in data or "table_name_zh" not in data or "region" not in data or "description" not in data:
|
|
|
print(f"第{i}行数据不完整")
|
|
|
fail_index.append(i)
|
|
|
i += 1
|
|
|
continue
|
|
|
try:
|
|
|
affected_rows = await db_client.execute(
|
|
|
"INSERT INTO business_database_schema (table_name_eng, table_name_zh, region, description) VALUES (%s, %s, %s, %s)",
|
|
|
(data["table_name_eng"], data["table_name_zh"], data["region"], data["description"])
|
|
|
)
|
|
|
except Exception as e:
|
|
|
print(f"第{i}行数据插入失败: {e}")
|
|
|
fail_index.append(i)
|
|
|
i += 1
|
|
|
continue
|
|
|
i += 1
|
|
|
success_count += 1
|
|
|
return f"成功插入了 {success_count} 条记录,失败的行: {fail_index}"
|
|
|
finally:
|
|
|
# 关闭连接
|
|
|
await db_client.close()
|
|
|
|
|
|
# 创建数据库客户端实例
|
|
|
db_client = AsyncMySQLClient(
|
|
|
host=MysqlDataBaseConfig.db_host,
|
|
|
port=MysqlDataBaseConfig.db_port,
|
|
|
user=MysqlDataBaseConfig.db_username,
|
|
|
password=MysqlDataBaseConfig.db_password,
|
|
|
db=MysqlDataBaseConfig.db_database
|
|
|
)
|
|
|
async def get_db():
|
|
|
# 创建数据库客户端实例
|
|
|
return db_client
|
|
|
|
|
|
# 使用示例
|
|
|
async def main():
|
|
|
# 创建数据库客户端实例
|
|
|
db_client = await get_db()
|
|
|
|
|
|
data = [{
|
|
|
"table_name_eng": "BPMS_RU_DEF_ATT_NODE_REL",
|
|
|
"table_name_zh": "自由流程运行时扩展属性节点关系",
|
|
|
"region": "规建域",
|
|
|
"description": "PROCESS_NAME-VARCHAR (100)- 流程的名称,VERSION-NUMBER (5)- 流程的版本号,NODE_ID-VARCHAR (100)- 节点的编号,CREATE_TIME-DATETIME - 记录的创建时间,格式为 YYYY-MM-DD HH:mm:ss, ATT_NODE_REL_ID-VARCHAR (64)- 扩展属性与节点关系的编号,ATT_ID-VARCHAR (64)- 扩展属性的编号,ATT_VALUE-VARCHAR (2000)- 扩展属性的取值,DEPLOY_ID-VARCHAR (64)- 部署的编号,NODE_NAME-VARCHAR (100)- 节点的名称,PROCESS_INS_ID-VARCHAR (40)- 关联的流程实例 ID, MODIFY_DATE-DATETIME - 记录的修改时间,格式为 YYYY-MM-DD HH:mm:ss, PROCESS_ID-VARCHAR (100)- 流程的编号,CREATOR_ID-VARCHAR (40)- 创建人的标识 ID"
|
|
|
},{
|
|
|
"table_name_eng": "BPMS_RU_DEF_DEPLOYE",
|
|
|
"table_name_zh": "自由流程运行时部署基本信息表",
|
|
|
"region": "规建域",
|
|
|
"description": "CREATE_TIME-DATETIME - 记录的创建时间,格式为 YYYY-MM-DD HH:mm:ss, CREATOR_ID-VARCHAR (40)- 创建人的标识 ID, DEPLOY_DIR_CODE-VARCHAR (200)- 部署目录的编号,DEPLOY_PERSON-VARCHAR (40)- 部署人的标识 ID, DEPLOY_PERSON_NAME-VARCHAR (100)- 部署人的姓名,DEPLOY_TIME-DATETIME - 部署操作的时间,格式为 YYYY-MM-DD HH:mm:ss, LOCK_PERSON_NAME-VARCHAR (100)- 部署的姓名,LOCKED-NUMBER (1)- 锁定状态标识,MODIFY_DATE-DATETIME - 记录的修改时间,格式为 YYYY-MM-DD HH:mm:ss, PROCESS_DEF_ID-VARCHAR (40)- 流程定义文件的 ID, PROCESS_DEF_NAME-VARCHAR (100)- 流程定义文件的名称,PROCESS_INS_ID-VARCHAR (40)- 关联的流程实例 ID, STATE-CHAR (1)- 部署的状态代码(0 - 未处理,1 - 已完成), UNINSTALL_TIME-DATETIME - 卸载操作的时间,格式为 YYYY-MM-DD HH:mm:ss, UPDATE_PERSON-VARCHAR (40)- 权限人的标识 ID, UPDATE_PERSON_NAME-VARCHAR (100)- 权限人的姓名,VERSION-NUMBER (5)- 流程定义的版本号,VERSION_DES-VARCHAR (200)- 流程定义版本的描述信息"
|
|
|
}]
|
|
|
await insert(data, db_client)
|
|
|
|
|
|
# 如果需要直接运行测试
|
|
|
if __name__ == "__main__":
|
|
|
asyncio.run(main())
|