|
|
import pandas as pd
|
|
|
from typing import Dict, Any, List
|
|
|
import asyncio
|
|
|
|
|
|
from util.random_string import generate_random_string
|
|
|
from util.use_mysql import AsyncMySQLClient
|
|
|
|
|
|
async def read_excel_to_mysql(excel_file: str, db_client: AsyncMySQLClient, table_name: str):
    """
    Read every row of every sheet in an Excel workbook and insert it into MySQL.

    Rows are inserted one at a time, each with an ``id`` obtained from
    ``generate_random_string(32)``. A row that fails (missing Excel column,
    database error, ...) is recorded in ``fail_details``, reported on stdout,
    and processing continues with the next row.

    Args:
        excel_file (str): Path of the Excel workbook to import.
        db_client (AsyncMySQLClient): Database client. It is connected at the
            start of the call and closed at the end, even when an error occurs.
        table_name (str): Name of the target table (optionally ``db.table``).
            An empty value falls back to ``business_table_schema``.

    Returns:
        dict: ``message`` (summary text), ``success_count`` (number of rows
        inserted) and ``fail_details`` (list of dicts with ``sheet``, ``row``
        and ``error`` for every row that could not be inserted).

    Raises:
        Exception: Errors from connecting to the database or opening the
            workbook propagate to the caller after the client is closed.
    """
    # (target column, Excel header) pairs in INSERT order. The "id" column is
    # generated per row and is therefore not listed here.
    column_sources = (
        ("table_name_eng", "表名"),
        ("table_name_zh", "表中文名"),
        ("field_eng", "字段名"),
        ("field_zh", "字段中文名"),
        ("integrity", "完整性"),
        ("consistency", "一致性"),
        ("timeliness", "及时性"),
        ("accuracy", "准确性"),
        ("standardization", "规范性"),
        ("type", "业务数据类型"),
    )

    # Table names cannot be bound as query parameters, so quote each
    # dot-separated part as a MySQL identifier (doubling embedded backticks).
    quoted_table = ".".join(
        "`" + part.replace("`", "``") + "`"
        for part in (table_name or "business_table_schema").split(".")
    )
    column_list = ", ".join(["id"] + [column for column, _ in column_sources])
    placeholders = ", ".join(["%s"] * (len(column_sources) + 1))
    # Built once: the statement is identical for every row.
    sql = f"INSERT INTO {quoted_table} ({column_list}) VALUES ({placeholders})"

    try:
        await db_client.connect()

        success_count = 0
        fail_details = []

        # The context manager closes the workbook handle, and parsing sheets
        # from the already-open ExcelFile avoids re-opening the file per sheet.
        with pd.ExcelFile(excel_file) as workbook:
            for sheet_name in workbook.sheet_names:
                df = workbook.parse(sheet_name)

                for index, row in df.iterrows():
                    try:
                        row_data: dict = row.to_dict()

                        # Blank Excel cells arrive as NaN/NaT; send them as
                        # None so they are stored as SQL NULL.
                        params = (generate_random_string(32),) + tuple(
                            None if pd.isna(row_data[header]) else row_data[header]
                            for _, header in column_sources
                        )

                        await db_client.execute(sql, params)
                        success_count += 1

                    except Exception as e:
                        # Best effort: remember the failure and keep going.
                        fail_details.append({
                            'sheet': sheet_name,
                            'row': index + 1,
                            'error': str(e)
                        })
                        print(f"处理sheet '{sheet_name}' 第 {index + 1} 行时出错: {e}")

        result_msg = f"成功插入 {success_count} 行数据"
        if fail_details:
            result_msg += f",失败 {len(fail_details)} 行"

        return {
            'message': result_msg,
            'success_count': success_count,
            'fail_details': fail_details
        }

    finally:
        # Always release the database connection.
        await db_client.close()
|
|
|
|
|
|
# Usage example
|
|
|
async def main():
    """
    Import the data-quality standard workbook into ``business_table_schema``.

    Builds the database client, runs ``read_excel_to_mysql`` and prints the
    summary message followed by one line per failed row.

    Connection settings can be overridden with the environment variables
    ``MYSQL_HOST``, ``MYSQL_PORT``, ``MYSQL_USER``, ``MYSQL_PASSWORD`` and
    ``MYSQL_DB``; when a variable is unset the original built-in value is used.
    """
    import os  # local import: only this script entry point needs it

    # NOTE(review): the fallback credentials below are committed in source.
    # Prefer supplying them via the environment, then rotate the password and
    # remove the defaults.
    db_client = AsyncMySQLClient(
        host=os.environ.get("MYSQL_HOST", 'ngsk.tech'),
        port=int(os.environ.get("MYSQL_PORT", "33306")),
        user=os.environ.get("MYSQL_USER", 'root'),
        password=os.environ.get("MYSQL_PASSWORD", 'ngsk0809cruise'),
        db=os.environ.get("MYSQL_DB", 'data_governance')
    )

    # Workbook to import and the table that receives its rows.
    excel_file = "电网管理平台(规建域)数据质量标准导出.xlsx"
    table_name = "business_table_schema"

    result = await read_excel_to_mysql(excel_file, db_client, table_name)
    print(result['message'])

    # List every row that could not be inserted.
    if result['fail_details']:
        print("失败详情:")
        for fail in result['fail_details']:
            print(f" Sheet: {fail['sheet']}, 行: {fail['row']}, 错误: {fail['error']}")
|
|
|
|
|
|
# Run the import when this file is executed directly
|
|
|
if __name__ == "__main__":
    # asyncio.run() starts an event loop, runs main() to completion and
    # closes the loop afterwards.
    asyncio.run(main())
|