You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
report_app/app/tools/doc2mysql.py

208 lines
6.0 KiB
Python

import logging
from datetime import datetime
# import io
# from datetime import date
from docx import Document
from sqlalchemy import delete
from sqlalchemy.orm import Session
from sqlalchemy.dialects.mysql import insert
# from io import BytesIO
from app.entity.database.session import get_db
from app.entity.models.PowerOutageStats import DailyReport, DailyReportRawdata
from app.entity.models.VacationData import VacationData
# 获取日志记录器
logger = logging.getLogger(__name__)
# 将数据保存到数据库
def save_word_document(data_dict):
try:
logger.info("开始写入mysql")
# 获取数据库连接
db: Session = next(get_db())
stmt = (
insert(DailyReport)
.values(**data_dict)
.on_duplicate_key_update(statistics_time=data_dict["statistics_time"])
)
result = db.execute(stmt)
db.commit()
logger.info(f"数据写入数据库成功,受影响的行数:{result.rowcount}")
return {"status": "success", "affected_rows": result.rowcount}
except Exception as e:
print(f"日报文档路径写入数据库失败{e}")
# 原始文件保存路径到数据库
# 将数据保存到数据库
def save_raw_files(data_dict):
try:
logger.info("开始写入mysql")
# 获取数据库连接
db: Session = next(get_db())
stmt = (
insert(DailyReportRawdata)
.values(**data_dict)
.on_duplicate_key_update(statistics_time=data_dict["statistics_time"])
)
result = db.execute(stmt)
db.commit()
logger.info(f"数据写入数据库成功,受影响的行数:{result.rowcount}")
return {"status": "success", "affected_rows": result.rowcount}
except Exception as e:
print(f"原数据文档路径写入数据库失败{e}")
def delete_vacation_data(last_year_period: str):
try:
logger.info("开始查询去年记录")
db: Session = next(get_db())
print(f"last_year_period: {last_year_period}")
delete_statement = delete(VacationData).where(
VacationData.last_year_period == last_year_period
)
result = db.execute(delete_statement)
db.commit()
return result
except Exception as e:
logger.exception(f"查询需下载的记录失败:{e}")
return None
def query_vacation_data(last_year_period: str):
try:
logger.info("开始删除数据")
db: Session = next(get_db())
print(f"last_year_period: {last_year_period}")
vacation_data_info = (
db.query(VacationData)
.filter(VacationData.last_year_period == last_year_period)
.first()
)
return vacation_data_info
except Exception as e:
logger.exception(f"查询需下载的记录失败:{e}")
return None
def save_vacation_data(data_dict):
try:
logger.info("开始写入mysql")
# 获取数据库连接
db: Session = next(get_db())
new_vacation = VacationData(**data_dict)
db.add(new_vacation)
db.commit()
db.refresh(new_vacation)
return new_vacation
except Exception as e:
print(f"原数据文档路径写入数据库失败{e}")
def get_file_path(statistics_time: datetime):
try:
logger.info("开始查询需下载的记录")
db: Session = next(get_db())
print(f"statistics_time: {statistics_time}, type: {type(statistics_time)}")
file_info = (
db.query(DailyReport)
.filter(DailyReport.statistics_time == statistics_time)
.first()
)
query = db.query(DailyReport).filter(
DailyReport.statistics_time == statistics_time
)
print(query.statement.compile(compile_kwargs={"literal_binds": True}))
all_statistics_times = db.query(DailyReport.statistics_time).all()
print(f"All statistics_time in DB: {all_statistics_times}")
if not file_info:
logger.info("查询需下载的记录失败")
return None
logger.info("查询需下载的记录成功")
return file_info
except Exception as e:
logger.exception(f"查询需下载的记录失败:{e}")
return None
def get_weekly_file(start_time: datetime, end_time: datetime):
try:
logger.info("开始查询周报路径")
db: Session = next(get_db())
file_info = (
db.query(DailyReport)
.filter(
DailyReport.statistics_time >= start_time,
DailyReport.statistics_time <= end_time,
)
.all()
)
if not file_info:
logger.info("无该时间段周报路径数据")
return None
logger.info("查询周报路径成功")
return file_info
except Exception as e:
logger.exception(f"查询周报路径失败:{e}")
return None
# 原始文件的路径
def get_raw_file(start_time: datetime, end_time: datetime):
try:
logger.info("开始查询原始文件路径")
db: Session = next(get_db())
file_info = (
db.query(DailyReportRawdata)
.filter(
DailyReportRawdata.statistics_time >= start_time,
DailyReportRawdata.statistics_time <= end_time,
)
.all()
)
if not file_info:
logger.info("无该时间段原始文件路径")
return None
logger.info("查询原始文件路径成功")
return file_info
except Exception as e:
logger.exception(f"查询原始文件路径失败:{e}")
return None
if __name__ == "__main__":
file_path = r"E:\work_data\work\三工单日报\三工单\20250311\20250311日报\公司全国“两会”保供电期间配网设备运行及三工单监测日报-20250311.docx"
doc1 = Document(file_path)
# print(callable(save_word_document(doc1,2025,3,11)))