You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

139 lines
5.3 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import pandas as pd
from typing import Dict, Any, List
import asyncio
from util.random_string import generate_random_string
from util.use_mysql import AsyncMySQLClient
async def read_excel_to_mysql(
    excel_file: str,
    db_client: AsyncMySQLClient,
    table_name: str,
) -> Dict[str, Any]:
    """Insert every row of every sheet of an Excel workbook into a MySQL table.

    The client is connected, the target table is created if necessary, and
    each data row is inserted with its own INSERT statement. A row that fails
    (missing column, driver error, ...) is recorded and skipped, so one bad
    row does not abort the import. The client is always closed before this
    function returns or raises.

    Args:
        excel_file: Path of the Excel workbook to read.
        db_client: Database client; must provide awaitable ``connect()``,
            ``execute(sql, params)`` and ``close()``.
        table_name: Name of the target table. It is interpolated into the SQL
            text as-is, so it must come from a trusted source.

    Returns:
        A dict with three keys: ``message`` (human-readable summary),
        ``success_count`` (number of rows inserted) and ``fail_details``
        (list of ``{'sheet', 'row', 'error'}`` dicts, one per failed row).

    Raises:
        Exception: Anything raised while connecting, creating the table or
            opening the workbook propagates to the caller (after the client
            has been closed).
    """
    # Excel column headers, in the same order as the columns that follow `id`
    # in the INSERT statement below.
    excel_columns = (
        '表名', '表中文名', '字段名', '字段中文名', '完整性',
        '一致性', '及时性', '准确性', '规范性', '业务数据类型',
    )
    # The statement text does not depend on the row, so build it once.
    sql = (f"INSERT INTO {table_name} (id, table_name_eng, table_name_zh, field_eng, field_zh, "
           "integrity, consistency, timeliness, accuracy, standardization, type) "
           "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)")
    try:
        await db_client.connect()
        await create_table_if_not_exists(db_client, table_name)

        success_count = 0
        fail_details = []
        # Open the workbook once and reuse the handle for every sheet; the
        # context manager releases the underlying file when we are done.
        with pd.ExcelFile(excel_file) as workbook:
            for sheet_name in workbook.sheet_names:
                df = pd.read_excel(workbook, sheet_name=sheet_name)
                for index, row in df.iterrows():
                    try:
                        row_data: dict = row.to_dict()
                        # pandas represents empty cells as NaN/NaT. Send them
                        # as NULL: MySQL drivers such as PyMySQL refuse NaN,
                        # which would make every row with a blank optional
                        # cell fail.
                        params = (
                            generate_random_string(32),  # presumably a 32-char id matching varchar(32) -- confirm
                            *(
                                None if pd.isna(row_data[column]) else row_data[column]
                                for column in excel_columns
                            ),
                        )
                        await db_client.execute(sql, params)
                        success_count += 1
                    except Exception as e:
                        # Best effort by design: record the failure and keep going.
                        # `index + 1` is the 1-based position among the data rows
                        # (header excluded), assuming the default 0-based index.
                        fail_details.append({
                            'sheet': sheet_name,
                            'row': index + 1,
                            'error': str(e)
                        })
                        print(f"处理sheet '{sheet_name}'{index + 1} 行时出错: {e}")

        result_msg = f"成功插入 {success_count} 行数据"
        if fail_details:
            result_msg += f",失败 {len(fail_details)}"
        return {
            'message': result_msg,
            'success_count': success_count,
            'fail_details': fail_details
        }
    finally:
        # Close the database connection whether the import succeeded or not.
        await db_client.close()
async def create_table_if_not_exists(db_client: "AsyncMySQLClient", table_name: str) -> None:
    """Create the schema-description table unless it already exists.

    Uses ``CREATE TABLE IF NOT EXISTS`` so the call is safe to repeat: on an
    existing table the statement is a no-op instead of an error.

    Args:
        db_client: Database client; must provide an awaitable ``execute(sql)``.
        table_name: Name of the table. It is interpolated into the DDL as-is,
            so it must come from a trusted source.

    Raises:
        Exception: Whatever ``db_client.execute`` raises is printed and then
            re-raised unchanged.
    """
    create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {table_name} (
    `id` varchar(32) COLLATE utf8mb4_general_ci NOT NULL,
    `table_name_eng` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '表名',
    `table_name_zh` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '表中文名',
    `field_eng` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '字段名',
    `field_zh` varchar(1024) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '字段中文名',
    `integrity` varchar(100) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '完整性',
    `consistency` varchar(100) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '一致性',
    `timeliness` varchar(100) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '及时性',
    `accuracy` varchar(100) COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '准确性',
    `standardization` varchar(1024) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '规范性',
    `type` varchar(100) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '业务数据类型',
    PRIMARY KEY (`id`),
    UNIQUE KEY `business_table_schema_table_name_eng_IDX` (`table_name_eng`,`field_eng`) USING BTREE
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
    """
    try:
        await db_client.execute(create_table_sql)
        print(f"{table_name} 已存在或创建成功")
    except Exception as e:
        # Report the failure here, but let the caller decide how to react.
        print(f"创建表 {table_name} 时出错: {e}")
        raise
# Usage example
async def main():
    """Import the sample workbook into the ``business_table_schema`` table.

    Connection settings are read from the environment variables MYSQL_HOST,
    MYSQL_PORT, MYSQL_USER, MYSQL_PASSWORD and MYSQL_DATABASE. Each falls back
    to the value that used to be hard-coded here, so running without any of
    them set behaves exactly as before.
    """
    import os  # local import: only this example entry point needs it

    # NOTE(review): the fallback password is still a credential committed to
    # source control -- rotate it and drop the default once deployments set
    # MYSQL_PASSWORD.
    db_client = AsyncMySQLClient(
        host=os.environ.get('MYSQL_HOST', 'ngsk.tech'),
        port=int(os.environ.get('MYSQL_PORT', '33306')),
        user=os.environ.get('MYSQL_USER', 'root'),
        password=os.environ.get('MYSQL_PASSWORD', 'ngsk0809cruise'),
        db=os.environ.get('MYSQL_DATABASE', 'data_governance'),
    )
    # Path of the Excel workbook to import and the table that receives it.
    excel_file = "电网管理平台(规建域)数据质量标准导出.xlsx"
    table_name = "business_table_schema"
    # Read the workbook and insert its rows into the database.
    result = await read_excel_to_mysql(excel_file, db_client, table_name)
    print(result['message'])
    if result['fail_details']:
        print("失败详情:")
        for fail in result['fail_details']:
            print(f" Sheet: {fail['sheet']}, 行: {fail['row']}, 错误: {fail['error']}")
# Run the import only when this file is executed directly as a script.
if __name__ == "__main__":
    asyncio.run(main())