import asyncio
from typing import Any, Dict, List

import pandas as pd

from util.random_string import generate_random_string
from util.use_mysql import AsyncMySQLClient

# Excel column headers, listed in the order their values are bound to the
# INSERT placeholders that follow the generated id.
_EXCEL_COLUMNS = (
    '表名',
    '表中文名',
    '字段名',
    '字段中文名',
    '完整性',
    '一致性',
    '及时性',
    '准确性',
    '规范性',
    '业务数据类型',
)


def _null_if_missing(value: Any) -> Any:
    """Return None for a pandas missing-value marker, else the value as-is."""
    # Empty Excel cells are read as NaN/NaT; None is the value that should
    # reach the nullable columns instead.
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return None
    return value


async def read_excel_to_mysql(
    excel_file: str, db_client: AsyncMySQLClient, table_name: str
) -> Dict[str, Any]:
    """Insert every row of every sheet of an Excel workbook into a MySQL table.

    The client is connected first and always closed on exit. The target table
    is created when missing. A row that fails (missing column, database error)
    is recorded and printed, and processing continues with the next row.

    Args:
        excel_file: Path of the Excel workbook.
        db_client: Database client used to connect, execute and close.
        table_name: Name of the target table. It is interpolated into the SQL
            text as an identifier, so it must come from a trusted source.

    Returns:
        A dict with ``message`` (summary text), ``success_count`` (number of
        rows inserted) and ``fail_details`` (list of dicts with ``sheet``,
        ``row`` and ``error``).

    Raises:
        Exception: Whatever connecting, creating the table or opening the
            workbook raises; only per-row failures are swallowed.
    """
    try:
        await db_client.connect()
        await create_table_if_not_exists(db_client, table_name)

        # The statement is identical for every row, so build it once.
        sql = (f"INSERT INTO {table_name} (id, table_name_eng, table_name_zh, field_eng, field_zh, "
               "integrity, consistency, timeliness, accuracy, standardization, type) "
               "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)")

        success_count = 0
        fail_details: List[Dict[str, Any]] = []

        # Open the workbook once and release the file handle when done.
        with pd.ExcelFile(excel_file) as workbook:
            for sheet_name in workbook.sheet_names:
                df = workbook.parse(sheet_name)

                for index, row in df.iterrows():
                    try:
                        row_data: dict = row.to_dict()
                        params = (
                            generate_random_string(32),
                            *(_null_if_missing(row_data[column])
                              for column in _EXCEL_COLUMNS),
                        )
                        await db_client.execute(sql, params)
                        success_count += 1
                    except Exception as e:
                        # Deliberate best-effort: one bad row must not abort
                        # the import of the remaining rows.
                        fail_details.append({
                            'sheet': sheet_name,
                            'row': index + 1,
                            'error': str(e)
                        })
                        print(f"处理sheet '{sheet_name}' 第 {index + 1} 行时出错: {e}")

        result_msg = f"成功插入 {success_count} 行数据"
        if fail_details:
            result_msg += f",失败 {len(fail_details)} 行"

        return {
            'message': result_msg,
            'success_count': success_count,
            'fail_details': fail_details
        }
    finally:
        await db_client.close()


async def create_table_if_not_exists(db_client: AsyncMySQLClient, table_name: str) -> None:
    """Create the target table unless it already exists.

    Args:
        db_client: Database client used to execute the DDL statement.
        table_name: Name of the table. It is interpolated into the SQL text
            as an identifier, so it must come from a trusted source.

    Raises:
        Exception: Re-raised after being printed when the statement fails.
    """
    # IF NOT EXISTS keeps the call a no-op on later runs; a plain CREATE TABLE
    # fails as soon as the table is already present.
    create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
      `id` varchar(32) COLLATE utf8mb4_general_ci NOT NULL,
      `table_name_eng` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '表名',
      `table_name_zh` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '表中文名',
      `field_eng` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '字段名',
      `field_zh` varchar(1024) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '字段中文名',
      `integrity` varchar(100) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '完整性',
      `consistency` varchar(100) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '一致性',
      `timeliness` varchar(100) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '及时性',
      `accuracy` varchar(100) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '准确性',
      `standardization` varchar(1024) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '规范性',
      `type` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '业务数据类型',
      PRIMARY KEY (`id`),
      UNIQUE KEY `business_table_schema_table_name_eng_IDX` (`table_name_eng`,`field_eng`) USING BTREE
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
    """

    try:
        await db_client.execute(create_table_sql)
        print(f"表 {table_name} 已存在或创建成功")
    except Exception as e:
        print(f"创建表 {table_name} 时出错: {e}")
        raise


# Usage example
async def main() -> None:
    """Import the sample workbook into ``business_table_schema`` and print the outcome."""
    # NOTE(review): credentials are hard-coded in source; they should be moved
    # to environment variables or a secrets store and rotated.
    db_client = AsyncMySQLClient(
        host='ngsk.tech',
        port=33306,
        user='root',
        password='ngsk0809cruise',
        db='data_governance'
    )

    excel_file = "电网管理平台(规建域)数据质量标准导出.xlsx"
    table_name = "business_table_schema"

    result = await read_excel_to_mysql(excel_file, db_client, table_name)
    print(result['message'])

    if result['fail_details']:
        print("失败详情:")
        for fail in result['fail_details']:
            print(f"  Sheet: {fail['sheet']}, 行: {fail['row']}, 错误: {fail['error']}")


# Run the example when executed as a script
if __name__ == "__main__":
    asyncio.run(main())