feat: init

v2025-07-11
许标 1 week ago
commit febff32998

@ -0,0 +1,24 @@
# Use the official Python base image
FROM python:3.12
# Set the working directory
WORKDIR /app
# Switch the pip index to the Aliyun mirror
RUN pip config set global.index-url https://mirrors.aliyun.com/pypi/simple/
# Copy the dependency manifest
COPY requirements.txt .
# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt
# Copy the project code
COPY . .
# Expose the port (must match the uvicorn --port below)
EXPOSE 8000
# Start the FastAPI app
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

@ -0,0 +1,347 @@
from datetime import datetime
from fastapi import File, UploadFile, APIRouter, BackgroundTasks, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse, FileResponse
from pathlib import Path
from typing import List
from time import sleep
import time
import os
import shutil
import zipfile
import logging
from app.tools.doc2docx import doc2docx
from app.tools.final_doc import deal_docx
from app.tools.docx2html import docx2html
from app.tools.get_final_name import get_final_name
from app.tools.clean_file_names import clean_file_names
from app.tools.doc2mysql import (
save_word_document,
get_file_path,
get_weekly_file,
save_raw_files,
get_raw_file,
)
from app.tools.move_raw_files import move_raw_files
# Module logger
logger = logging.getLogger(__name__)
router = APIRouter()
# Directory for uploaded files
UPLOAD_DIR = "temp_uploads"
# Directory for generated downloads
DOWNLOAD_DIR = "temp_downloads"
# Directory for raw-data downloads
DOWNLOAD_RAW_DIR = "temp_download_raw"
# Accepted Word upload MIME types
ALLOWED_EXTENSIONS_DOC = {
"application/msword",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document",
}
# Accepted Excel upload MIME types
ALLOWED_EXTENSIONS_EXCEL = {
"application/vnd.ms-excel",
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
}
# Raw data dict shared across the report endpoints
data_dict = {}
# 总上传接口
@router.post(
"/upload/",
summary="可上传所有文件",
description="完成文件上传如果文件doc格式则转换成docx",
)
async def upload_file(files: List[UploadFile] = File(...)):
try:
if not os.path.exists(UPLOAD_DIR):
os.makedirs(UPLOAD_DIR)
# Clear out any leftover files
if len(os.listdir(UPLOAD_DIR)) > 0:
for file in os.listdir(UPLOAD_DIR):
os.remove(os.path.join(UPLOAD_DIR, file))
logger.info(f"删除旧日报{file}")
# Save each upload to disk
for file in files:
logger.info(f"上传的文件:{file.filename}")
# Sanitize the file name
cleaned_filename = clean_file_names(file.filename)
logger.info(f"清洗后的文件名:{cleaned_filename}")
file_path = os.path.join(UPLOAD_DIR, cleaned_filename)
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
# --------------------------- doc -> docx conversion (local test version) ---------------------------
# A .doc upload must be converted to .docx
"""
if file_path.endswith(".doc"):
doc2docx(file_path)
logger.info(f"文件{file.filename}格式转换为docx成功")
"""
# --------------------------- doc -> docx conversion (local test version) ---------------------------
# --------------------------- doc -> docx conversion (production version) ---------------------------
# Write all files first, wait for the background shell script to convert them, then re-scan the folder
final_files = os.listdir(UPLOAD_DIR)
for i, file_saved in enumerate(final_files):
if file_saved.endswith(".doc"):
file_doc = file_saved
# Poll until the background script converts the file, with an exit
# condition so the page cannot hang forever
# The timer starts once, before the loop
start_time = time.time()
while True:
if file_doc not in os.listdir(UPLOAD_DIR):
break
# After 30 seconds, clear the folder so the user can re-upload, then stop
elif time.time() - start_time > 30:
for file in os.listdir(UPLOAD_DIR):
os.remove(os.path.join(UPLOAD_DIR, file))
logger.info(f"删除旧文件,方便用户重新上传{file}")
break
else:
sleep(2)
return JSONResponse(
content={"status_code": 200, "detail": "文件上传并成功处理数据。"}
)
# --------------------------- doc -> docx conversion (production version) ---------------------------
except Exception as e:
logger.exception(f"文件上传失败:{e}")
return JSONResponse(content={"status_code": 500, "detail": f"文件上传失败{e}"})
@router.get(
"/generate_report/",
summary="生成日报",
description="生成日报,将生成的简报和日报文档转成html返回前端",
)
async def generate_report(background_tasks: BackgroundTasks):
global data_dict
try:
logger.info("开始生成日报")
# Make sure the download folder exists
if not os.path.exists(DOWNLOAD_DIR):
os.makedirs(DOWNLOAD_DIR)
# Absolute path of the upload folder
fold_path = str(Path(UPLOAD_DIR).resolve()).replace("\\", "/")
data_dict = deal_docx(fold_path, DOWNLOAD_DIR)
# If generation succeeded, convert both documents to HTML for the frontend
report_sim_html = docx2html(data_dict["daily_repo_simple"])
report_html = docx2html(data_dict["daily_report"])
logger.info("日报生成html成功")
# Persist to the database
save_word_document(data_dict)
# Return JSON containing the HTML
return JSONResponse(
content={
"status_code": 200,
"detail": "日报生成成功",
"report_simple": report_sim_html,
"report": report_html,
}
)
except Exception as e:
logger.exception(f"日报生成失败:{e}")
return JSONResponse(
content={
"status_code": 500,
"detail": f"日报生成失败:请确认上传文件是否同一天或者文件格式是否发生改变",
}
)
# 将原始数据保存到数据库
finally:
try:
if os.listdir(UPLOAD_DIR):
raw_data_path = move_raw_files(
UPLOAD_DIR, DOWNLOAD_RAW_DIR, data_dict["save_folder"]
)
raw_data_dict = {
"report_title": data_dict["report_title"],
"raw_data_path": raw_data_path,
"statistics_time": data_dict["statistics_time"],
"save_folder": data_dict["save_folder"],
"created_at": datetime.now(),
"updated_at": datetime.now(),
}
save_raw_files(raw_data_dict)
print("原始文件存入mysql成功")
except Exception as e:
logger.exception(f"原始文件存入mysql失败{e}")
# Download endpoint keyed on the report's statistics time
@router.get(
"/download/",
summary="下载用户上传分析后的日报",
description="下载用户上传分析的当前日报",
)
async def download_file():
# Name of the final zip to download
zip_name = "日报.zip"
# Path of any zip left over from a previous run
file_zip = os.path.join(f"{DOWNLOAD_DIR}/{data_dict['save_folder']}", zip_name)
# Remove the stale file if present
if os.path.exists(file_zip):
os.remove(file_zip)
try:
logger.info("开始下载文件")
file_info = get_file_path(data_dict["statistics_time"])
if not file_info:
logger.info("查询需下载的记录失败")
return None
logger.info("查询需下载的记录成功")
# Create the ZIP file
with zipfile.ZipFile(file_zip, "w") as zipf:
logger.info("进入文件压缩阶段")
zipf.write(file_info.daily_repo_simple)
zipf.write(file_info.daily_report)
# zipf.write(file_info.daily_repo_simple_excel)
if os.path.exists(file_zip):
logger.info("文件下载成功")
# Return the ZIP file
return FileResponse(
file_zip,
filename=zip_name,
media_type="application/zip",
)
else:
logger.info("压缩文件失败")
return JSONResponse(content={"status_code": 404, "detail": "文件不存在"})
except Exception as e:
logger.exception(f"下载文件失败:{e}")
return JSONResponse(content={"status_code": 500, "detail": "文件下载出错"})
@router.get(
"/files_path/",
summary="查询每周的日报文件全路径",
description="查询周报在磁盘的全路径",
)
async def download_weekly_file(start_time: datetime, end_time: datetime):
try:
logger.info("开始查询周报路径")
file_info = get_weekly_file(start_time, end_time)
if not file_info:
logger.info("查询周报路径失败")
return None
logger.info("查询周报路径成功")
file_info = [file_single.to_dict() for file_single in file_info]
# for file in file_info:
# file_info1 = file.daily_report
# print(file_info)
return JSONResponse(content={"status_code": 200, "detail": file_info})
except Exception as e:
logger.exception(f"查询周报路径失败:{e}")
return JSONResponse(content={"status_code": 500, "detail": "查询周报路径出错"})
@router.get(
"/raw_files_path/",
summary="查询原始文件全路径",
description="查询原始文件在磁盘的全路径",
)
async def download_raw_file(start_time: datetime, end_time: datetime):
try:
logger.info("开始查询原始文件路径")
file_info = get_raw_file(start_time, end_time)
if not file_info:
logger.info("无该时间段原始文件路径")
return None
logger.info("查询原始文件路径成功")
file_info = [file_single.to_dict() for file_single in file_info]
# for file in file_info:
# file_info1 = file.daily_report
# print(file_info)
return JSONResponse(content={"status_code": 200, "detail": file_info})
except Exception as e:
logger.exception(f"查询原始文件路径失败:{e}")
return JSONResponse(content={"status_code": 500, "detail": "查询原始文件出错"})
@router.get(
"/download/weekly_file/", summary="下载每周日报文件", description="下载每周日报文件"
)
async def download_files(file: str):
try:
if os.path.exists(file):
if file.endswith(".docx"):
# 单个word文件直接返回
return FileResponse(
file,
filename=file.split("/")[-1], # 下载时显示的文件名
media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
)
elif file.endswith(".xlsx"):
# 单个excel文件直接返回
return FileResponse(
file,
filename=file.split("/")[-1], # 下载时显示的文件名
media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
)
elif file.endswith(".xls"):
# 单个excel文件直接返回
return FileResponse(
file,
filename=file.split("/")[-1], # 下载时显示的文件名
media_type="application/vnd.ms-excel",
)
else:
return JSONResponse(content={"status_code": 404, "detail": "文件不存在"})
except Exception as e:
logger.exception(f"逐个下载日报出错:{e}")
return JSONResponse(content={"status_code": 500, "detail": "逐个下载日报出错"})

@ -0,0 +1,6 @@
from app.entity.database.base import Base
from app.entity.database.session import engine
from app.entity.models.PowerOutageStats import DailyReport
# Create all tables (the model import above registers them on Base)
Base.metadata.create_all(bind=engine)

@ -0,0 +1,4 @@
from sqlalchemy.ext.declarative import declarative_base
# Declarative base class
Base = declarative_base()

@ -0,0 +1,20 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
# Database connection URL; adjust username, password, host, port and database name as needed
# SQLALCHEMY_DATABASE_URL = "mysql+pymysql://root:ultrapower123@localhost:3306/ultra_me"
SQLALCHEMY_DATABASE_URL = "mysql+pymysql://root:ngsk0809cruise@localhost:33306/gcgj"
# Create the database engine
engine = create_engine(SQLALCHEMY_DATABASE_URL)
# Session factory
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Dependency that yields a database session
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
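# Usage sketch for the get_db dependency (illustrative; such a route would
# live in a router module, not in this file):
#
# from fastapi import APIRouter, Depends
# from sqlalchemy.orm import Session
# from app.entity.database.session import get_db
#
# router = APIRouter()
#
# @router.get("/health/db")
# def db_health(db: Session = Depends(get_db)):
#     # FastAPI drives get_db as a generator dependency: the session is
#     # created before the request and closed in its finally block afterwards.
#     return {"connected": db.is_active}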

@ -0,0 +1,118 @@
from sqlalchemy import Column, Integer, TIMESTAMP, DateTime, String, JSON
from sqlalchemy import func as func_sql
from app.entity.database.base import Base
# class PowerOutageStats(Base):
# """
# 停电统计数据表 ORM 模型
# """
# __tablename__ = 'power_outage_stats' # 表名
#
# # 定义字段
# id = Column(Integer, primary_key=True, autoincrement=True, comment='主键ID')
# province_company = Column(String(100), nullable=False, comment='省公司')
# outage_users = Column(Integer, nullable=False, comment='停电用户数')
# outage_ratio = Column(String(100), nullable=False,comment='停电环比(百分比)')
# short_outage_users = Column(Integer, comment='短时停电用户数')
# outage_user_ratio = Column(String(100), nullable=False,comment='停电用户占本单位比例(百分比)')
# repair_count = Column(Integer, comment='故障抢修数')
# repair_arrival_time = Column(String(100), nullable=False,comment='故障抢修到位时间(小时)')
# repair_completion_time = Column(String(100), nullable=False, comment='故障抢修完成时间(小时)')
# complaint_count = Column(Integer, comment='投诉数量')
# complaint_ratio = Column(String(100), nullable=False,comment='投诉环比(百分比)')
# public_opinion_count = Column(Integer, comment='舆情数量')
# public_opinion_ratio = Column(String(100), nullable=False,comment='舆情环比(百分比)')
# major_event_count = Column(Integer, comment='重大事件数量')
# statistics_time = Column(Date, nullable=False,comment='统计时间')
# created_at = Column(TIMESTAMP, server_default=func_sql.now(), comment='记录创建时间')
# updated_at = Column(TIMESTAMP, server_default=func_sql.now(), onupdate=func_sql.now(), comment='记录更新时间')
# Daily report storage paths
class DailyReport(Base):
"""
Stores the file paths of the generated daily report and bulletin
(paths only; the documents themselves stay on disk)
"""
__tablename__ = "daily_report"
# Column definitions
id = Column(Integer, primary_key=True, autoincrement=True, comment="主键ID")
report_title = Column(String(100), nullable=False, comment="日报标题")
daily_report = Column(String(100), nullable=False, comment="日报保存路径")
daily_repo_simple = Column(String(100), nullable=False, comment="简报保存路径")
save_folder = Column(String(100), nullable=False, comment="保存的子文件夹")
statistics_time = Column(
DateTime, nullable=False, comment="统计时间", unique=True, index=True
)
created_at = Column(
TIMESTAMP, server_default=func_sql.now(), comment="记录创建时间"
)
updated_at = Column(
TIMESTAMP,
server_default=func_sql.now(),
onupdate=func_sql.now(),
comment="记录更新时间",
)
# daily_repo_simple_excel = Column(String(100), nullable=False,comment='简报excel保存路径')
# img = Column(String(100), nullable=False,comment='图片保存路径')
def to_dict(self):
return {
"id": self.id,
"report_title": self.report_title,
"daily_report": self.daily_report,
"daily_repo_simple": self.daily_repo_simple,
"save_folder": self.save_folder,
"statistics_time": self.statistics_time.strftime("%Y-%m-%d %H:%M:%S"),
"created_at": self.created_at.strftime("%Y-%m-%d %H:%M:%S"),
"updated_at": self.updated_at.strftime("%Y-%m-%d %H:%M:%S"),
# 'daily_repo_simple_excel':self.daily_repo_simple_excel,
# 'img':self.img
}
# Raw source-file storage paths
class DailyReportRawdata(Base):
"""
Stores the file paths of the raw uploaded source files
"""
__tablename__ = "daily_report_rawdata"
# Column definitions
id = Column(Integer, primary_key=True, autoincrement=True, comment="主键ID")
report_title = Column(String(100), nullable=False, comment="日报标题")
save_folder = Column(String(100), nullable=False, comment="保存的子文件夹")
# sentiment_doc = Column(String(100), nullable=False,comment='舆情word原始文件保存路径')
# complaint_doc = Column(String(100), nullable=False,comment='投诉word原始文件保存路径')
# complaint_tb = Column(String(100), nullable=False,comment='投诉excel原始文件保存路径')
# power_off_tb = Column(String(100), nullable=False,comment='停电excel原始文件保存路径')
raw_data_path = Column(JSON, nullable=False, comment="原始文件保存路径")
statistics_time = Column(
DateTime, nullable=False, comment="统计时间", unique=True, index=True
)
created_at = Column(
TIMESTAMP, server_default=func_sql.now(), comment="记录创建时间"
)
updated_at = Column(
TIMESTAMP,
server_default=func_sql.now(),
onupdate=func_sql.now(),
comment="记录更新时间",
)
# daily_repo_simple_excel = Column(String(100), nullable=False,comment='简报excel保存路径')
# img = Column(String(100), nullable=False,comment='图片保存路径')
def to_dict(self):
return {
"id": self.id,
"report_title": self.report_title,
"save_folder": self.save_folder,
"raw_data_path": self.raw_data_path,
"statistics_time": self.statistics_time.strftime("%Y-%m-%d %H:%M:%S"),
"created_at": self.created_at.strftime("%Y-%m-%d %H:%M:%S"),
"updated_at": self.updated_at.strftime("%Y-%m-%d %H:%M:%S"),
# 'daily_repo_simple_excel':self.daily_repo_simple_excel,
# 'img':self.img
}
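# Minimal query sketch for these models (assumes SQLAlchemy 1.4+ so the
# Session can be used as a context manager):
if __name__ == "__main__":
    from app.entity.database.session import SessionLocal

    with SessionLocal() as session:
        latest = (
            session.query(DailyReport)
            .order_by(DailyReport.statistics_time.desc())
            .first()
        )
        if latest is not None:
            print(latest.to_dict())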

@ -0,0 +1,31 @@
{
"version": 1.0,
"disable_existing_loggers": false,
"formatters": {
"standard": {
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
}
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"level": "INFO",
"formatter": "standard",
"stream": "ext://sys.stdout"
},
"file": {
"class": "logging.FileHandler",
"level": "DEBUG",
"formatter": "standard",
"filename": "app.log",
"mode": "a"
}
},
"loggers": {
"": {
"handlers": ["console", "file"],
"level": "DEBUG",
"propagate": true
}
}
}

@ -0,0 +1,24 @@
import json
import logging
import logging.config
import os
def setup_logging(
default_path="logging.conf", default_level=logging.INFO, env_key="LOG_CFG"
):
"""Setup logging configuration"""
path = default_path
value = os.getenv(env_key, None)
if value:
path = value
if os.path.exists(path):
with open(path, "rt") as f:
config = json.load(f)
logging.config.dictConfig(config)
else:
logging.basicConfig(level=default_level)
if __name__ == "__main__":
setup_logging()

Binary file not shown.

Binary file not shown.

@ -0,0 +1,106 @@
# from datetime import date
# from sqlalchemy.orm import Session
# from openpyxl import load_workbook
# from pathlib import Path
#
# import pandas as pd
# import os
# import logging
#
# from app.entity.database.session import get_db
# from app.entity.models.PowerOutageStats import PowerOutageStats
#
# # 获取日志记录器
# logger = logging.getLogger(__name__)
#
# # 三份累计表
# def accumulated_statistics(start_time, end_time, save_path=None):
#
# try:
# logger.info('对数据库的查询结果进行处理,完成三个累计表数据的组装')
#
# db: Session = next(get_db())
# # 查询某个时间段的数据
# results = db.query(PowerOutageStats.province_company,PowerOutageStats.outage_users,PowerOutageStats.short_outage_users,
# PowerOutageStats.repair_count,PowerOutageStats.complaint_count,PowerOutageStats.public_opinion_count,
# PowerOutageStats.major_event_count, PowerOutageStats.statistics_time)
#
# # Excel 模板路径
# # 获取当前文件夹路径
# current_path = Path(__file__).parent
# templates_path = str(os.path.join(current_path.parent, 'templates')).replace('\\', '/')
#
# # 加载 Excel 模板
# book = load_workbook(f'{templates_path}/累计数据模板.xlsx')
#
# # 选择要写入的 Sheet 页
# sheet_name = 'Sheet1' # 替换为你的 Sheet 页名称
# sheet = book[sheet_name]
#
# # 查询结果用pandas进行处理
# if results:
#
# # 将数据转成pandas数据结构
# df = pd.read_sql(results.statement, results.session.bind)
# # 插入序号列作为第一列
# df.insert(0, 'num', df.index + 1)
#
# # 组成表1数据
# df_temp = df[(df['statistics_time'] >= start_time) & (df['statistics_time'] <= end_time)]
# df_table1 = df_temp[['statistics_time', 'outage_users', 'complaint_count','public_opinion_count']]
# df_table1 = df_table1.groupby('statistics_time').sum()
# df_table1 = df_table1.reset_index()
#
# # 表1写入excel的位置
# start_row1 = 3
# start_col1 = 1
# print(df_table1)
# write_to_excel(df_table1,sheet,start_row1,start_col1)
#
#
# # 组成表2数据
# df_table2 = df_temp[['statistics_time', 'outage_users', 'short_outage_users', 'repair_count','complaint_count','public_opinion_count']]
# df_table2 = df_table2.groupby('statistics_time').sum()
# df_table2 = df_table2.reset_index()
#
# # 表2写入excel的位置
# start_row2 = 3
# start_col2 = 6
# print(df_table2)
# write_to_excel(df_table2,sheet,start_row2,start_col2)
#
# # 表3写入excel的位置
# start_row3 = 3
# start_col3 = 13
# df_table3 = df.drop('statistics_time', axis=1)
# write_to_excel(df_table3,sheet,start_row3,start_col3)
#
# # 最终结果生成
# book.save(f'{save_path}/累积统计表.xlsx')
# except Exception as e:
# logger.error(f'写入excel失败: {e}')
# raise e
#
# #对三张表进行组装
# def write_to_excel(df, sheet, start_row, start_col):
#
# try:
# logger.info('开始写入excel')
# # 将 DataFrame 写入指定位置
# for i, row in enumerate(df.itertuples(index=False), start=start_row):
# for j, value in enumerate(row, start=start_col):
# sheet.cell(row=i, column=j, value=value)
# except Exception as e:
# logger.error(f'写入excel失败: {e}')
# raise e
#
#
# if __name__ == '__main__':
#
#
# start_time = date(2025,3,9)
# end_time = date(2025,3,10)
# print(end_time)
# accumulated_statistics(start_time, end_time)
#
#

@ -0,0 +1,68 @@
import logging
from bs4 import BeautifulSoup
# 获取日志记录器
logger = logging.getLogger(__name__)
# Add inline style classes to the report HTML so the frontend can render it
def beautiful_report(html):
try:
logger.info("开始给日报添加style")
soup = BeautifulSoup(html, "lxml")
# Find all <p> tags and assign position-based classes
list_p = soup.find_all("p")
for i in range(len(list_p)):
if i == 0:
list_p[i]["class"] = "title"
if i == 1:
list_p[i]["class"] = "subtitle"
# Paragraphs at indices 2, 5, 7, 9 and 11 share the "point" font
if i == 2 or i == 5 or i == 7 or i == 9 or i == 11:
list_p[i]["class"] = f"point{i}"
# Paragraphs at indices 3, 4, 6, 8, 10 and 12 are body content
if i == 3 or i == 4 or i == 6 or i == 8 or i == 10 or i == 12:
list_p[i]["class"] = f"content{i}"
if i == 14 or i == 15 or i == 16 or i == 17:
list_p[i]["class"] = f"table_title{i}"
# Tables share a uniform style, so a single indexed class per table is enough
list_tables = soup.find_all("table")
for i, table in enumerate(list_tables):
# Tag each table with an indexed class
table["class"] = f"table{i}"
# # 设置表头样式
# for th in table.find_all('th'):
# th['style'] = 'background-color: #4CAF50; color: white; font-weight: bold; padding: 10px; text-align: center;'
#
# # 设置表格行样式
# for tr in table.find_all('tr'):
# tr['style'] = 'border-bottom: 1px solid #ddd;'
#
# # 设置表格单元格样式
# for td in table.find_all('td'):
# td['style'] = 'border: 1px solid #000; padding: 8px; text-align: left; width: 150px; height: 45px;text-align: center;'
html = soup.prettify()
return html
except Exception as e:
logger.exception(f"给日报添加style的方法执行失败{e}")
if __name__ == "__main__":
test_path = r"E:\work_data\work\test_result\日报的html.html"
# beautiful_report expects an HTML string, so read the file first
with open(test_path, encoding="utf-8") as f:
print(beautiful_report(f.read()))

@ -0,0 +1,16 @@
import re
# Sanitize uploaded file names
def clean_file_names(filename: str):
"""
Keep only word characters (letters, digits, underscore; in Python 3
this includes CJK), dots and hyphens; strip everything else
:param filename:
:return:
"""
cleaned = re.sub(r"[^\w.-]", "", filename)
# Fall back to a default name when nothing survives
if not cleaned:
cleaned = "untitled"
return cleaned
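# Usage sketch (illustrative inputs; note that CJK characters count as word
# characters in Python 3, so Chinese file names survive the cleaning):
if __name__ == "__main__":
    print(clean_file_names("日报 2025-03-11 (final).docx"))  # 日报2025-03-11final.docx
    print(clean_file_names(" \r\n"))  # untitled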

@ -0,0 +1,122 @@
import re
# text = "6月15日17时至6月16日17时期间全网累计停电132.59万户次5分钟以内短时停电用户23.48万户次环比减少68.28万户次其中重要用户停电0户次用户停电情况总体平稳。"
def count_change_outage(text):
# Match decimal numbers followed by the 万 unit
pattern = r"(\d+\.\d+)万"
matches = re.findall(pattern, text)
# Cumulative, short-term and day-over-day outage figures (万户次)
total_outage = float(matches[0])  # cumulative outage users
short_term_outage = float(matches[1])  # short-term (<5 min) outage users
change_outage = float(matches[2])  # day-over-day change
# Decide whether the change is an increase or a decrease
percentage = ""
if "减少" in text:
result = change_outage / (total_outage + change_outage)
type = "减少"
elif "增加" in text:
result = change_outage / (total_outage - change_outage)
type = "增加"
else:
result = None
type = "持平"  # assumed default so the return below cannot fail
if result is not None:
percentage = f"{result * 100:.2f}%"
print(f"计算结果:{percentage}")
else:
print("未找到增加或减少的关键字")
short_percentage = f"{short_term_outage / total_outage * 100:.2f}%"
# Match the number after 重要用户停电
pattern = r"重要用户停电(\d+)户"
match = re.search(pattern, text)
if match:
important_stop_outage = match.group(1)
print(f"重要用户停电户次:{important_stop_outage}")
else:
important_stop_outage = "0"
print("未找到重要用户停电户次")
return (
total_outage,
short_term_outage,
change_outage,
percentage,
short_percentage,
important_stop_outage,
type,
)
# count_change_outage(text)
def count_outage_sentiment(text):
print("开始分析舆情数据:")
print(text)
# text = "全网监测到涉电力供应类舆情风险信息11条环比减少2条"
# text = "涉电力供应类舆情风险信息22条环比持平。其中1条为官方媒体发布其余21条均为个人账号发布。"
# 使用正则表达式匹配数字和关键词
pattern = r"信息(\d+)条,环比(增加|减少)(\d)条"
pattern_equal = r"信息(\d+)条,环比持平"
match = re.search(pattern, text)
match_equal = re.search(pattern_equal, text)
num1 = ""
change = ""
num2 = ""
result = ""
if match:
num1 = int(match.group(1))  # today's count, e.g. 11
change = match.group(2)  # direction: 减少 or 增加
num2 = int(match.group(3))  # size of the change, e.g. 2
if change == "减少":
result = f"{num2 / (num1 + num2) * 100:.2f}%"
num2 = num1 + num2  # num2 now holds yesterday's count
elif change == "增加":
result = f"{num2 / (num1 - num2) * 100:.2f}%"
num2 = num1 - num2  # num2 now holds yesterday's count
else:
result = None  # neither increase nor decrease
print(f"第一个数字:{num1}")
print(f"变化类型:{change}")
print(f"第二个数字:{num2}")
if result is not None:
print(f"计算结果:{result}")
else:
print("变化类型未知,无法计算")
# update 2025-07-08: handle the unchanged (持平) case
elif match_equal:
num1 = int(match_equal.group(1))
change = "持平"
num2 = int(match_equal.group(1))
result = ""
# change = match_equal.group(2)
else:
pattern = r"信息(\d+)条,同比(增加|减少)(\d+)条"
match = re.search(pattern, text)
if match:
num1 = int(match.group(1))  # today's count
change = match.group(2)  # direction: 减少 or 增加
num2 = int(match.group(3))  # size of the change
if change == "减少":
result = f"{num2 / (num1 + num2) * 100:.2f}%"
num2 = num1 + num2
elif change == "增加":
result = f"{num2 / (num1 - num2) * 100:.2f}%"
num2 = num1 - num2
else:
result = None # 如果不是增加或减少,可以处理成其他情况
print("未匹配到符合条件的内容")
return num1, change, num2, result
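# Usage sketch for both parsers, based on the sample sentences in the
# comments above (printed values assume those exact inputs):
if __name__ == "__main__":
    outage_text = (
        "6月15日17时至6月16日17时期间,全网累计停电132.59万户次,"
        "5分钟以内短时停电用户23.48万户次,环比减少68.28万户次,"
        "其中重要用户停电0户次,用户停电情况总体平稳。"
    )
    print(count_change_outage(outage_text))
    # (132.59, 23.48, 68.28, '33.99%', '17.71%', '0', '减少')
    print(count_outage_sentiment("全网监测到涉电力供应类舆情风险信息11条,环比减少2条"))
    # (11, '减少', 13, '15.38%')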

@ -0,0 +1,203 @@
import pandas as pd
import numpy as np
import logging
import re
from .effective_cities import effective_cities
# Module logger
logger = logging.getLogger(__name__)
# Province- and city-level complaint statistics (counts only)
def deal_excel(start_time, end_time, file_path):
try:
logger.info("开始分析停电excel")
logger.info(f"开始分析:{start_time}")
logger.info(f"结束时间:{end_time}")
# List all sheet names
excel_file = pd.ExcelFile(file_path)
sheet_names = excel_file.sheet_names
pattern_sheet = r"(2025年?投诉明细|投诉明细)[\(\s]*供电类[\)\s]*"
# Fuzzy-match the sheet name, case-insensitively
matched_sheets = [
sheet
for sheet in sheet_names
if re.fullmatch(pattern_sheet, sheet, re.IGNORECASE)
]
if len(matched_sheets) == 1:
final_sheet = matched_sheets[0]
else:
logger.error("没有找到匹配的sheet页")
return None
df = pd.read_excel(
file_path,
sheet_name=final_sheet,
skiprows=1,
)
# Cast the acceptance-time column to string
df["time"] = df["受理时间"].astype(str)
# --------------------------- strip inner and surrounding whitespace ---------------------------
df["省"] = df["省"].str.strip().str.replace(r"\s+", "", regex=True)
df["地市"] = df["地市"].str.strip().str.replace(r"\s+", "", regex=True)
df["县区"] = df["县区"].str.strip().str.replace(r"\s+", "", regex=True)
df["一级分类"] = df["一级分类"].str.strip().str.replace(r"\s+", "", regex=True)
df["time"] = (
df["time"]
.str.strip()
.str.replace(r"-|年|月|日|\.|时|分|秒|点", "/", regex=True)
)
# --------------------------- strip inner and surrounding whitespace ---------------------------
# Normalize the time string (older single-step version kept for reference)
# df['time'] = df['time'].str.replace(r'-|年|月|日|\.|时|分|秒|点', '/', regex=True)
# Convert to datetime for comparison
df["datetime"] = pd.to_datetime(df["time"])
# Reporting window, e.g.:
# start_time = datetime(2025, 3, 5, 17, 0, 0)
# end_time = datetime(2025, 3, 6, 17, 0, 0)
# Keep rows inside the window whose 一级分类 is 供电质量
df = df[
(df["datetime"] > start_time)
& (df["datetime"] <= end_time)
& (df["一级分类"] == "供电质量")
]
print(f"只通过时间筛选的数据行数{len(df)}")
# 对省份数据进行清洗
province_list = ["广东", "广西", "云南", "贵州", "海南", "深圳"]
# 省份正则
province_pattern = "|".join(province_list)
# 对省份数据进行清洗
df[""] = df[""].apply(
lambda x: re.search(province_pattern, x).group()
if re.search(province_pattern, x)
else ""
)
# 删除省份为空的值
df = df[df[""] != ""]
# Check whether the data actually refers to Shenzhen
df["地市"] = df["地市"].astype(str)
# df.loc[df['地市'].str.contains('深圳|罗湖|福田|南山|宝安|龙岗|盐田|龙华|坪山|光明|大鹏'), '省'] = '深圳'
# Condition 1: the city field matches a Shenzhen district
mask_b = df["地市"].str.contains(
"深圳|罗湖|福田|南山|宝安|龙岗|盐田|龙华|坪山|光明|大鹏",
regex=True,
na=False,
)
# Condition 2: the province is 广东 or 深圳
mask_a = df["省"].isin(["广东", "深圳"])
# Final condition: the district matches and the province allows it
final_mask = mask_b & mask_a
# Apply the replacement
df.loc[final_mask, "省"] = "深圳"
# Group and aggregate by province
group_province = df.groupby("省")
province_statistics = {
"广东": 0,
"广西": 0,
"云南": 0,
"贵州": 0,
"海南": 0,
"深圳": 0,
}
# Per-province counts for the day
province_temp = group_province.size().to_dict()
# Merge today's counts over the zero-filled defaults
province_statistics.update(province_temp)
# City-level logic
# Normalize to the 81 recognized cities (effective_cities decides what to do with the rest)
df = effective_cities(df)
# Group by district: first join province and city names
# Strip the 供电局 suffix
df["地市"] = df["地市"].str.replace("供电局", "")
# For non-Shenzhen rows, prefix the city with its province
df.loc[df["省"] != "深圳", "地市"] = df["省"] + df["地市"]
# Group and count by city
group_district = df.groupby("地市")
# Per-city counts as a dict
district_statistics = group_district.size().to_dict()
# Sort the cities by count, descending
district_statistics = sorted(
district_statistics.items(), key=lambda x: x[1], reverse=True
)
return province_statistics, district_statistics
except Exception as e:
logger.exception(f"对数据按照'省'进行分类汇总失败:{e}")
# Day-over-day comparison for a top-5 city
def top5_dod_analysis(top, district_stat_before):
try:
logger.info("开始分析地市top5环比")
if top[0] in district_stat_before.keys():
top_dod = top[1] - district_stat_before[top[0]]
if top_dod > 0:
top_dod = "+" + str(top_dod)
return top_dod
elif top_dod == 0:
top_dod = "持平"
return top_dod
else:
return top_dod
else:
# The city has no record for yesterday, so the whole count is new
top_dod = "+" + str(top[1])
return top_dod
except (KeyError, AttributeError, ArithmeticError) as e:
logger.exception(f"判断地市top5环比失败:{e}")
# 判断省份环比
# def province_dod_analysis(before:dict, after:dict):
#
# dod = sum(after.values()) - sum(before.values())
#
# if dod > 0:
# dod = '+' + str(dod)
# return dod
# elif dod == 0:
# dod = '持平'
# return dod
# Transpose the collected stats (swap rows and columns) so they can be written into the report tables
def transform_data(data):
try:
logger.info("开始将统计出来的数据转置(行和列互换),方便写入表格")
# Row-to-column transpose via NumPy
transposed_data = np.array(data).transpose().tolist()
return transposed_data
except Exception as e:
logger.exception(f"统计数据转置失败:{e}")

@ -0,0 +1,47 @@
import subprocess
import os
import logging
# Module logger
logger = logging.getLogger(__name__)
def doc2docx(input_path, output_dir=None):
logger.info("开始将doc转成docx")
# Default the output directory to the input file's directory
if output_dir is None:
output_dir = os.path.dirname(input_path)
# Build the output file path
output_file = os.path.join(
output_dir, os.path.splitext(os.path.basename(input_path))[0] + ".docx"
)
# LibreOffice command-line binary
libreoffice_cmd = "soffice"
# Assemble the conversion command
command = [
libreoffice_cmd,
"--headless",  # no GUI
"--convert-to",
"docx",  # target format
"--outdir",
output_dir,  # output directory
input_path,  # input file
]
# Run the conversion
try:
subprocess.run(command, check=True)
print(f"转换成功: {output_file}")
return output_file
except subprocess.CalledProcessError as e:
print(f"转换失败: {e}")
return None
if __name__ == "__main__":
input_path = "./南方电网公司“抢修、投诉、舆情”三工单联动监测日报3月6日.doc" # 替换为你的 .doc 文件路径
doc2docx(input_path)

@ -0,0 +1,159 @@
import logging
from datetime import datetime
# import io
# from datetime import date
from docx import Document
from sqlalchemy.orm import Session
from sqlalchemy.dialects.mysql import insert
# from io import BytesIO
from app.entity.database.session import get_db
from app.entity.models.PowerOutageStats import DailyReport, DailyReportRawdata
# Module logger
logger = logging.getLogger(__name__)
# Persist the generated report paths (upsert keyed on statistics_time)
def save_word_document(data_dict):
try:
logger.info("开始写入mysql")
# 获取数据库连接
db: Session = next(get_db())
stmt = (
insert(DailyReport)
.values(**data_dict)
.on_duplicate_key_update(statistics_time=data_dict["statistics_time"])
)
result = db.execute(stmt)
db.commit()
logger.info(f"数据写入数据库成功,受影响的行数:{result.rowcount}")
return {"status": "success", "affected_rows": result.rowcount}
except Exception as e:
logger.exception(f"日报文档路径写入数据库失败:{e}")
# Persist the raw source-file paths (upsert keyed on statistics_time)
def save_raw_files(data_dict):
try:
logger.info("开始写入mysql")
# 获取数据库连接
db: Session = next(get_db())
stmt = (
insert(DailyReportRawdata)
.values(**data_dict)
.on_duplicate_key_update(statistics_time=data_dict["statistics_time"])
)
result = db.execute(stmt)
db.commit()
logger.info(f"数据写入数据库成功,受影响的行数:{result.rowcount}")
return {"status": "success", "affected_rows": result.rowcount}
except Exception as e:
logger.exception(f"原数据文档路径写入数据库失败:{e}")
def get_file_path(statistics_time: datetime):
try:
logger.info("开始查询需下载的记录")
db: Session = next(get_db())
print(f"statistics_time: {statistics_time}, type: {type(statistics_time)}")
file_info = (
db.query(DailyReport)
.filter(DailyReport.statistics_time == statistics_time)
.first()
)
query = db.query(DailyReport).filter(
DailyReport.statistics_time == statistics_time
)
print(query.statement.compile(compile_kwargs={"literal_binds": True}))
all_statistics_times = db.query(DailyReport.statistics_time).all()
print(f"All statistics_time in DB: {all_statistics_times}")
if not file_info:
logger.info("查询需下载的记录失败")
return None
logger.info("查询需下载的记录成功")
return file_info
except Exception as e:
logger.exception(f"查询需下载的记录失败:{e}")
return None
def get_weekly_file(start_time: datetime, end_time: datetime):
try:
logger.info("开始查询周报路径")
db: Session = next(get_db())
file_info = (
db.query(DailyReport)
.filter(
DailyReport.statistics_time >= start_time,
DailyReport.statistics_time <= end_time,
)
.all()
)
if not file_info:
logger.info("无该时间段周报路径数据")
return None
logger.info("查询周报路径成功")
return file_info
except Exception as e:
logger.exception(f"查询周报路径失败:{e}")
return None
# 原始文件的路径
def get_raw_file(start_time: datetime, end_time: datetime):
try:
logger.info("开始查询原始文件路径")
db: Session = next(get_db())
file_info = (
db.query(DailyReportRawdata)
.filter(
DailyReportRawdata.statistics_time >= start_time,
DailyReportRawdata.statistics_time <= end_time,
)
.all()
)
if not file_info:
logger.info("无该时间段原始文件路径")
return None
logger.info("查询原始文件路径成功")
return file_info
except Exception as e:
logger.exception(f"查询原始文件路径失败:{e}")
return None
if __name__ == "__main__":
file_path = r"E:\work_data\work\三工单日报\三工单\20250311\20250311日报\公司全国“两会”保供电期间配网设备运行及三工单监测日报-20250311.docx"
doc1 = Document(file_path)
# print(callable(save_word_document(doc1,2025,3,11)))

@ -0,0 +1,150 @@
import mammoth
import logging
from docx import Document
from bs4 import BeautifulSoup
from app.tools.beautiful_html import beautiful_report
# Module logger
logger = logging.getLogger(__name__)
# Convert a docx file to HTML so the endpoint can return HTML directly
def docx2html(file_path):
try:
logger.info("进入解析后的html的单元格合并的主方法")
# 原docx转成的html其中的合并单元格解析错误
original_html = all_to_html(file_path)
# word所有table的html列表
table_new_list = table_to_html(file_path)
# 使用BeautifulSoup解析这两个HTML内容
original_soup = BeautifulSoup(original_html, "html.parser")
table_old_list = original_soup.find_all("table")
if len(table_old_list) == len(table_new_list):
for i in range(len(table_old_list)):
# 调用合并单元格方法
table_old_list[i].replace_with(
merge_cells(
BeautifulSoup(table_new_list[i], "html.parser").find("table")
)
)
html = original_soup.prettify()
return beautiful_report(html)
except Exception as e:
logger.exception(f"合并单元格主方法执行失败:{e}")
# Convert the whole docx to HTML; merged cells come out wrong at this step
def all_to_html(docx_file):
try:
logger.info("进入通用docx转html方法此时单元格未合并")
with open(docx_file, "rb") as docx_file:
result = mammoth.convert_to_html(docx_file)
html = result.value
return html
except Exception as e:
logger.exception(f"通用docx转html方法执行失败{e}")
# Re-render every Word table so merged cells can be fixed up afterwards
def table_to_html(docx_file):
try:
logger.info("进入正确解析合并的单元格的方法")
document = Document(docx_file)
# Collect the HTML for every table in the document
table_list = []
for table in document.tables:
html = "<table>"
for row in table.rows:
html += "<tr>"
for cell in row.cells:
# Extra logic is needed for colspan/rowspan: python-docx does not
# expose merge spans directly (see the cell_spans sketch below)
colspan = 1  # placeholder value
rowspan = 1  # placeholder value
html += (
f"<td colspan='{colspan}' rowspan='{rowspan}'>{cell.text}</td>"
)
html += "</tr>"
html += "</table>"
table_list.append(html)
return table_list
except Exception as e:
logger.exception(f"正确解析合并的单元格的方法执行失败:{e}")
# Merge duplicate cells in the rebuilt table HTML
def merge_cells(table):
try:
logger.info("进入合并单元格的方法")
# Only the header rows (first two) are scanned for merges
rows = table.find_all("tr")[:2]
# Record the grid positions of cells to remove after merging
merge_map = {}
# Walk each row
for row_idx, row in enumerate(rows):
cells = row.find_all(["th", "td"])
for col_idx, cell in enumerate(cells):
current_cell_text = cell.get_text(strip=True)
colspan = 1
rowspan = 1
# 检查右侧是否有相同文本的单元格
j = col_idx + 1
while (
j < len(cells)
and cells[j].get_text(strip=True) == current_cell_text
):
colspan += 1
# 标记这些单元格将被移除
merge_map[(row_idx, j)] = None
j += 1
# 检查下方是否有相同文本的单元格
i = row_idx + 1
while i < len(rows):
if (
col_idx >= len(rows[i].find_all(["th", "td"]))
or rows[i].find_all(["th", "td"])[col_idx].get_text(strip=True)
!= current_cell_text
):
break
rowspan += 1
# 标记这些单元格将被移除
merge_map[(i, col_idx)] = None
i += 1
if colspan > 1 or rowspan > 1:
if colspan > 1:
cell["colspan"] = str(colspan)
if rowspan > 1:
cell["rowspan"] = str(rowspan)
# 删除标记为要移除的单元格
for (row_idx, cell_idx), _ in sorted(merge_map.items(), reverse=True):
try:
rows[row_idx].find_all(["th", "td"])[cell_idx].decompose()
except IndexError:
continue
return table
except Exception as e:
logger.exception(f"合并单元格的方法执行失败:{e}")
if __name__ == "__main__":
docx_file = r"E:\work_data\work\三工单日报\20250311\20250311日报\公司全国“两会”保供电期间配网设备运行及三工单监测日报-20250311.docx"
docx2html(docx_file)

@ -0,0 +1,241 @@
# -*- coding: utf-8 -*-
import matplotlib
matplotlib.use("agg")
import matplotlib.pyplot as plt
import numpy as np
import os
from datetime import datetime
from matplotlib.font_manager import FontProperties
# # Sample data
# data = {
#     "停电用户\n(万户)": {"昨天": 200.87, "今天": 132.59},
#     "过载配变\n(台)": {"昨天": 126, "今天": 119},
#     "95598供电类投诉\n(条)": {"昨天": 18, "今天": 12},
#     "涉电力供应类舆情风险信息\n(条)": {"昨天": 79, "今天": 40}
# }
def plot_electricity_comparison(year, month, day, data):
year = int(year)
month = int(month)
day = int(day)
# # 设置中文字体
plt.rcParams["font.sans-serif"] = [
"Microsoft YaHei"
] # 字体设置,用来正常显示中文标签
plt.rcParams["axes.unicode_minus"] = False # 用来正常显示负号
# # 创建一个大图形1行4列的子图布局
# fig, axs = plt.subplots(1, 4, figsize=(8, 4)) # 1行4列的子图布局
# 定义横轴标签
categories = ["昨天", "今天"]
x = np.arange(len(categories))
# 计算变化百分比
def calculate_change_percentage(yesterday, today):
return ((today - yesterday) / yesterday) * 100
# 检查数据完整性并过滤掉不完整的数据
valid_data = {}
for title, values in data.items():
if "昨天" in values and "今天" in values:
if values["昨天"] is not None and values["今天"] is not None:
valid_data[title] = values
# 如果没有有效的数据,返回 None 或其他指示
if not valid_data:
return None # 没有有效的数据,不生成图片
# 根据有效数据的数量动态创建子图布局
num_valid_data = len(valid_data)
fig, axs = plt.subplots(
1, num_valid_data, figsize=(2 * num_valid_data, 4)
) # 动态调整布局
# 如果只有一个子图axs 是一个单个的 Axes 对象而不是数组,需要将其转换为列表
if num_valid_data == 1:
axs = [axs]
# 绘制每个子图
for i, (title, values) in enumerate(valid_data.items()):
ax = axs[i] # 获取当前子图
y = list(values.values())
# 将蓝色柱子改为暗蓝色
bars = ax.bar(x, y, color=["#1E3A8A", "#FF8C00"], width=0.6)
# 设置子图标题和坐标轴标签
ax.set_title(
title, fontsize=12, fontweight="bold", color="#00008B"
) # 设置标题字体加粗深蓝色
# 设置坐标轴标签字体加粗深蓝色
ax.set_xticks(x)
ax.set_xticklabels(categories, fontsize=10, fontweight="bold", color="#00008B")
# 动态设置纵坐标范围
max_y = max(y) * 1.2 # 增加20%的范围
ax.set_ylim(0, max_y)
# 隐藏纵轴刻度线
ax.tick_params(axis="y", length=0)
ax.tick_params(axis="x", length=0)
# 添加自定义的淡颜色细长分割线
for y_tick in ax.get_yticks():
ax.axhline(y=y_tick, color="#87CEEB", linestyle="--", alpha=0.3)
# 设置刻度标签字体加粗深蓝色
ax.tick_params(axis="y", labelsize=12, labelcolor="#00008B")
# 添加数据标签
for bar in bars:
height = bar.get_height()
# 根据柱子颜色设置数据标签颜色
if bar == bars[0]:
color = "#1E3A8A" # 暗蓝色
else:
color = "#FF8C00" # 暗橙色
ax.text(
bar.get_x() + bar.get_width() / 2,
height,
f"{height}",
ha="center",
va="bottom",
fontsize=10,
fontweight="bold",
color=color,
)
# 添加变化百分比和箭头
change_percent = calculate_change_percentage(y[0], y[1])
# 根据变化百分比设置符号和颜色
if change_percent < 0:
symbol = "\u25bc" # 倒三角
color = "#006400" # 深绿色
# 调整箭头起始点和终点位置:从柱子的边角开始指向边角
bar0_height = bars[0].get_height()
bar1_height = bars[1].get_height()
ax.annotate(
"",
xy=(x[1] - bars[1].get_width() / 2+0.1, bar1_height),
#xy=(x[1], bar1_height),
#xytext=((x[0] + bars[0].get_width() / 2) + 0.05, bar0_height * 0.95),
xytext=((x[0] + bars[0].get_width() / 2), bar0_height),
arrowprops=dict(
arrowstyle="-|>",
mutation_scale=20, # 箭头大小
connectionstyle="arc3,rad=-0.4", # 调整为负值,箭头凸起
color="#FFD580",
linewidth=3,
capstyle='round',
joinstyle='round'
),
) # 浅橙色箭头,加粗
# 在子图中间显示变化百分比
ax.text(
0.5,
0.9,
f"{symbol}{abs(change_percent):.2f}%",
ha="center",
va="center",
transform=ax.transAxes,
fontsize=12,
fontweight="bold",
color=color,
)
elif change_percent > 0:
symbol = "\u25b2" # 正三角
color = "#FF0000" # 红色
# 调整箭头起始点和终点位置:从柱子的边角开始指向边角
bar0_height = bars[0].get_height()
bar1_height = bars[1].get_height()
ax.annotate(
"",
#xy=(x[1] - bars[1].get_width() / 2, bar1_height),
xy=(x[1] - bars[1].get_width() / 2, bar1_height),
#xytext=((x[0] + bars[0].get_width() / 2) + 0.05, bar0_height),
xytext=((x[0] + bars[0].get_width() / 2), bar0_height),
arrowprops=dict(
arrowstyle="-|>",
mutation_scale=20, # 箭头大小
connectionstyle="arc3,rad=0.4", # 调整为负值,箭头凸起
color="#FFD580",
linewidth=3,
),
) # 浅橙色箭头,加粗
# 在子图中间显示变化百分比
ax.text(
0.5,
0.9,
f"{symbol}{abs(change_percent):.2f}%",
ha="center",
va="center",
transform=ax.transAxes,
fontsize=12,
fontweight="bold",
color=color,
)
else:
symbol = ""
color = "#FFA500" # 橙色
# 调整箭头起始点和终点位置:从柱子的边角开始指向边角
bar0_height = bars[0].get_height()
bar1_height = bars[1].get_height()
ax.annotate(
"",
xy=(x[1] - bars[1].get_width() / 2+0.1, bar1_height),
xytext=((x[0] + bars[0].get_width() / 2), bar0_height),
arrowprops=dict(
arrowstyle="-|>",
mutation_scale=20, # 箭头大小
connectionstyle="arc3,rad=0", # 调整为负值,箭头凸起
color="#FFD580",
linewidth=3,
),
) # 浅橙色箭头,加粗
# 在子图中间显示变化百分比
ax.text(
0.5,
0.9,
f"持平",
ha="center",
va="center",
transform=ax.transAxes,
fontsize=12,
fontweight="bold",
color=color,
)
# 调整子图间距
plt.subplots_adjust(wspace=0) # 进一步减小子图之间的水平间距
plt.tight_layout(rect=[0, 0, 1, 0.95]) # 调整整体布局
# 获取当前脚本的绝对路径
current_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(os.path.dirname(current_dir))
# 创建 temp_picture 目录
temp_picture_dir = os.path.join(project_root, "temp_picture")
if not os.path.exists(temp_picture_dir):
os.makedirs(temp_picture_dir)
# 按年月创建子目录
month_dir = os.path.join(temp_picture_dir, f"{year}{month:02d}")
if not os.path.exists(month_dir):
os.makedirs(month_dir)
# 保存图形到指定目录
file_name = f"电力供应数据变化对比{year}{month:02d}{day:02d}.png"
file_path = os.path.join(month_dir, file_name)
plt.savefig(file_path, dpi=1200, bbox_inches="tight")
return file_path
# # 显示图形
# plt.show()
# plot_electricity_comparison(data)
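# Usage sketch based on the sample data in the comment at the top of this
# module; writes a PNG under temp_picture/<year><month> and returns its path:
if __name__ == "__main__":
    data = {
        "停电用户\n(万户)": {"昨天": 200.87, "今天": 132.59},
        "过载配变\n(台)": {"昨天": 126, "今天": 119},
        "95598供电类投诉\n(条)": {"昨天": 18, "今天": 12},
        "涉电力供应类舆情风险信息\n(条)": {"昨天": 79, "今天": 40},
    }
    print(plot_electricity_comparison(2025, 3, 11, data))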

@ -0,0 +1,208 @@
import pandas as pd
from datetime import datetime
import numpy as np
import re
import logging
# Module logger
logger = logging.getLogger(__name__)
def effective_cities(df):
try:
logger.info("开始判断是否81地市,先从地市判断,如果没有则从县区判断")
# ---------------------------------test---------------------------------------
# df = pd.read_excel(
# r'E:\work_data\work\三工单日报\三工单\20250309\20250309\南方电网话务及投诉统计表3月9日.xlsx',
# sheet_name='投诉明细(供电类)',
# skiprows=1,
# )
# # 将时间列转成字符串
# df['time'] = df['受理时间'].astype(str)
# # 通过字符串功能格式化时间
# df['time'] = df['time'].str.replace('-', '/')
# # 转成date方便比较
# df['datetime'] = pd.to_datetime(df['time'])
#
# # 开始时间和结束时间
# start_time = datetime(2025, 3, 8, 17, 0, 0)
# end_time = datetime(2025, 3, 9, 17, 0, 0)
# # 拿到供电质量在当天的数据
# df = df[(df['datetime'] > start_time) & (df['datetime'] <= end_time) & (df['一级分类'] == '供电质量')]
#
# # 判断数据区里面是否有深圳
# df['地市'] = df['地市'].astype(str)
#
# df.loc[df['地市'].str.contains('深圳'), '省'] = '深圳'
# ---------------------------------test---------------------------------------
# Keep only the 81 recognized cities; unmatched rows fall back to the county column
# City name list used to build the regex below
city_list = [
"文昌",
"丽江",
"贺州",
"澄迈",
"迪庆",
"玉林",
"河池",
"百色",
"梧州",
"崇左",
"怒江",
"贵港",
"韶关",
"琼中",
"肇庆",
"文山",
"桂林",
"都匀",
"大理",
"瑞丽",
"南宁",
"汕尾",
"来宾",
"防城港",
"钦州",
"柳州",
"清远",
"阳江",
"六盘水",
"梅州",
"北海",
"昆明",
"兴义",
"揭阳",
"万宁",
"红河",
"定安",
"潮州",
"茂名",
"海口",
"云浮",
"德宏",
"汕头",
"惠州",
"湛江",
"毕节",
"铜仁",
"江门",
"凯里",
"三亚",
"楚雄",
"儋州",
"东莞",
"河源",
"中山",
"珠海",
"临高",
"乐东",
"遵义",
"东方",
"佛山",
"安顺",
"琼海",
"贵阳",
"广州",
"陵水",
"深圳",
"保亭",
"屯昌",
"白沙",
"昌江",
"五指山",
"贵安",
"昭通",
"临沧",
"曲靖",
"西双版纳",
"普洱",
"玉溪",
"保山",
"三沙",
]
# Shenzhen district names
sz_district_list = [
"罗湖",
"福田",
"南山",
"宝安",
"龙岗",
"盐田",
"龙华",
"坪山",
"光明",
"大鹏",
]
# City-matching regex
city_pattern1 = "|".join(city_list)
# Shenzhen-district-matching regex
sz_district_pattern = "|".join(sz_district_list)
# Map Shenzhen district names (罗湖|福田|南山|宝安|龙岗|盐田|龙华|坪山|光明|大鹏) in the city column back to 深圳
# df['地市'] = df['地市'].replace(sz_district_pattern, '深圳', regex=True)
# df["地市"] = np.where(
# df["地市"].fillna('').str.contains(sz_district_pattern, regex=True),
# "深圳", # 是 → 替换成固定值
# np.where(
# df["县区"].fillna('').str.contains(sz_district_pattern, regex=True),
# "深圳", # 是 → 替换成另一个固定值
# df["地市"] # 否 → 保持 col1或改为 "" / np.nan
# )
# )
# ------------------- a district that looks like Shenzhen is not always Shenzhen -------------------
# Condition 1: the city field matches a Shenzhen district
mask_b = df["地市"].str.contains(sz_district_pattern, regex=True, na=False)
# Condition 2: the city does not match but the county does
mask_c = (~mask_b) & df["县区"].str.contains(
sz_district_pattern, regex=True, na=False
)
# Only allow the replacement when the province is 广东 or 深圳
mask_a = df["省"].isin(["广东", "深圳"])
# Final condition: (city or county matches) and the province allows it
final_mask = (mask_b | mask_c) & mask_a
# Apply the replacement
df["地市"] = np.where(final_mask, "深圳", df["地市"])
logger.info(f"判断县区是否有深圳的信息{df['地市']}")
# ------------------- a district that looks like Shenzhen is not always Shenzhen -------------------
# If the city column has no match, fall back to the county column
df["地市"] = np.where(
df["地市"].fillna("").str.contains(city_pattern1, regex=True),
df["地市"],  # match: keep the city value
np.where(
df["县区"].fillna("").str.contains(city_pattern1, regex=True),
df["县区"],  # match: take the county value
df["地市"],  # otherwise keep the original value
),
)
logger.info(f"81地市通过地市及县区修正后的数据{df}")
# df = df[df['地市'].str.contains(city_pattern1)]
# If a value starts with a known city name but carries extra text,
# reduce it to the bare city name
for city in city_list:
df["地市"] = df["地市"].apply(lambda x: city if re.search(city, x) else x)
logger.info("81地市修改成功")
return df
except Exception as e:
logger.exception(f"81地市修改失败{e}")
#
# if __name__ == '__main__':
#
# df = effective_cities()
# print(df.head())

@ -0,0 +1,59 @@
# Compute the previous day's date via timedelta; naively subtracting 1 from the day number breaks on month boundaries
from datetime import datetime, timedelta
import logging
import calendar
# 获取日志记录器
logger = logging.getLogger(__name__)
# Date of the previous day
def effective_date(year, month, day):
try:
logger.info("开始组装获取前一天的时间")
# 拿到一个日期
date_now = year + month + day
# 转成有效时间
date = datetime.strptime(date_now, "%Y%m%d")
# 计算前一天
day_before = date - timedelta(days=1)
# 获得年月日并返回
year = day_before.year
month = day_before.month
day = day_before.day
return year, month, day
except AttributeError as e:
logger.exception(f"获取前一天时间失败:{e}")
# Validate a (year, month, day) triple
def is_valid_date(year, month, day):
try:
datetime(year=year, month=month, day=day)
return True
except ValueError:
return False
def get_next_day(year, month, day):
try:
if not is_valid_date(year, month, day):
raise ValueError("输入的日期无效")
current_date = datetime(year=year, month=month, day=day)
next_date = current_date + timedelta(days=1)
return next_date.year, next_date.month, next_date.day
except ValueError as e:
logger.exception(f"获取后一天时间失败:{e}")
if __name__ == "__main__":
# Example
year, month, day = 2025, 6, 5
next_year, next_month, next_day = get_next_day(year, month, day)
print(f"后一天是: {next_year}-{next_month}-{next_day}")  # 2025-6-6

@ -0,0 +1,105 @@
from datetime import datetime, timedelta
from lunarcalendar import Converter, Lunar # 用于农历转换
# Unfinished: the requirements are not yet clear
holiday_dict = {
"元旦": {
"type": "fixed",
"date": {"month": 1, "day": 1},
"duration": 1, # 假期天数
},
"春节": {
"type": "lunar",
"date": {"month": 1, "day": 1}, # 农历正月初一
"duration": 7,
},
"清明节": {"type": "fixed", "date": {"month": 4, "day": 4}, "duration": 3},
"劳动节": {"type": "fixed", "date": {"month": 5, "day": 1}, "duration": 5},
"端午节": {
"type": "lunar",
"date": {"month": 5, "day": 5}, # 农历五月初五
"duration": 3,
},
"中秋节": {
"type": "lunar",
"date": {"month": 8, "day": 15}, # 农历八月十五
"duration": 1,
},
"国庆节": {"type": "fixed", "date": {"month": 10, "day": 1}, "duration": 7},
}
def is_holiday(year, month, day):
current_date = datetime(year, month, day)
# 检查是否是固定节假日
for holiday, info in holiday_dict.items():
if info["type"] == "fixed":
holiday_date = datetime(year, info["date"]["month"], info["date"]["day"])
if (current_date - holiday_date).days >= 0 and (
current_date - holiday_date
).days < info["duration"]:
print(f"今天是:{holiday}")
return True, holiday
# 检查是否是农历节假日
for holiday, info in holiday_dict.items():
if info["type"] == "lunar":
lunar_month = info["date"]["month"]
lunar_day = info["date"]["day"]
# 将农历转换为公历
lunar = Lunar(year, lunar_month, lunar_day)
solar_date = Converter.Lunar2Solar(lunar)
# 判断当前日期是否在农历节假日范围内
delta = (
current_date
- datetime(solar_date.year, solar_date.month, solar_date.day)
).days
if delta >= 0 and delta < info["duration"]:
print(f"今天是:{holiday}")
return True, holiday
return False, None
def get_last_year_holiday_data(year, month, day, holiday_name):
last_year = year - 1
if holiday_dict[holiday_name]["type"] == "fixed":
# 获取去年节假日的开始日期和结束日期
start_date = datetime(
last_year,
holiday_dict[holiday_name]["date"]["month"],
holiday_dict[holiday_name]["date"]["day"],
)
end_date = start_date + timedelta(
days=holiday_dict[holiday_name]["duration"] - 1
)
else:
lunar_month = holiday_dict[holiday_name]["date"]["month"]
lunar_day = holiday_dict[holiday_name]["date"]["day"]
# 将去年的农历转换为公历
lunar = Lunar(last_year, lunar_month, lunar_day)
solar_date = Converter.Lunar2Solar(lunar)
start_date = datetime(solar_date.year, solar_date.month, solar_date.day)
end_date = start_date + timedelta(
days=holiday_dict[holiday_name]["duration"] - 1
)
print(f"去年节假日数据时间段: {start_date}{end_date}")
# 在这里调用数据库查询或其他方法获取去年的数据
# last_year_data = query_data_from_db(start_date, end_date)
return start_date, end_date
year = 2024
month = 9
day = 17
is_holiday_flag, holiday_name = is_holiday(year, month, day)
if is_holiday_flag:
start_date, end_date = get_last_year_holiday_data(year, month, day, holiday_name)
print(f"去年{holiday_name}的日期范围:{start_date} - {end_date}")

@ -0,0 +1,800 @@
# -*- coding: utf-8 -*-
import re
import os
import logging
import pandas as pd
from fastapi import HTTPException
from fastapi.responses import JSONResponse
from docx import Document
from pathlib import Path
from datetime import datetime
from docx.shared import Inches
from app.tools.deal_excels import deal_excel, top5_dod_analysis, transform_data
from app.tools.get_time import get_time
from app.tools.replace_text import replace_text_in_docx
from app.tools.replace_table import copy_table, copy_sta_table
from app.tools.style import table_style
from app.tools.effective_date import get_next_day
from app.tools.find_before_word import extract_overload_info_from_previous_day
from app.tools.count_data import count_change_outage, count_outage_sentiment
from app.tools.draw_picture import plot_electricity_comparison
# 获取日志记录器
logger = logging.getLogger(__name__)
def deal_docx(folder_path, save_path=None, time_type=0):
"""
:param folder_path: 文件上传后保存的路径
:param save_path: 最终生成的日报/简报的保存路径
:param time_type: 判断时间的统计方式0代表前一天17点之后到当天17点之前1代表当天00:00:00到当天23:59:59
:return: 返回生成日报的存储路径保存到mysql
"""
# 拿到文件夹下所有文件名
# folder_path = 'E:/work_data/work/三工单日报/20250308/源数据/源数据'
# folder_path = 'E:/work_data/work/三工单日报/20250309/20250309'
# folder_path = 'E:/work_data/work/三工单日报/20250310/20250310'
try:
logger.info("进入日报生成方法")
files = os.listdir(folder_path)
file_path_dict = {}
# Pick out the three documents to analyse
for file in files:
# Outage Word document
if file.endswith(".docx") and "投诉服务" in file:
file_path_dict["power_off_doc"] = folder_path + "/" + file
continue
# Public-sentiment Word document
if file.endswith(".docx") and "抢修投诉舆情" in file:
file_path_dict["sentiment_doc"] = folder_path + "/" + file
print(f"舆情文件路径{file_path_dict['sentiment_doc']}")
continue
# Complaint Excel workbook
if file.endswith(".xlsx") and "投诉统计表" in file:
file_path_dict["power_off_excel"] = folder_path + "/" + file
continue
# Abort if the three expected files are not all present
if len(file_path_dict) != 3:
logger.error("文件格式错误")
raise HTTPException(
status_code=400,
detail="文档无法正确解析,请确认上传的生成日报的资料是否完整",
)
# ————————————————————————处理word文档—————————————————————————
# 读取停电word文件信息
doc_poweroff = Document(file_path_dict["power_off_doc"])
# 读取舆情word文件
doc_sentiment = Document(file_path_dict["sentiment_doc"])
# 日报拼接数据字典
doc_dict = {}
if time_type == 0:
# Legacy regex
# update 2025-07-04: match both zero-padded (06) and unpadded (7) month formats
time_re = re.compile(r"^(\d+年)?\d+月\d+日\d+时至.{7,15}期间[,]")
elif time_type == 1:
# 20250429过滤时间正则
time_re = re.compile(
r"^(\d+年)?\d+月\d+日[^,,。\.;;“”\']{0,10}至[^,,。\.;;“”\']{0,15}期间[,]"
)
# 避免拿错段落,则进行遍历
paragraphs_poweroff = doc_poweroff.paragraphs
for para in paragraphs_poweroff:
# 第一点内容
if re.match(r".*全网累计停电.*", para.text):
# print(para.text)
doc_dict["first_point_para1"] = re.sub(time_re, "", para.text)
continue
if re.match(r".*全网故障抢修工单.*", para.text):
# print(para.text)
doc_dict["first_point_para2"] = re.sub(
r"[,]整体抢修工作态势正常。+$", "", para.text
)
continue
# 第二点过载台数
if re.search(r"过载\d+台", para.text):
doc_dict["over_load"] = (
re.search(r"过载\d+台", para.text).group().replace("过载", "")
).replace("", "")
# print(doc_dict['over_load'])
continue
# 拿到舆情的段落
paragraphs_sentiment = doc_sentiment.paragraphs
for para in paragraphs_sentiment:
if re.match(r".*舆情风险信息\d+条.*", para.text):
text_temp = re.sub(time_re, "", para.text)
doc_dict["sentiment_trend"] = re.search(
r"[^,\\.。]*[,]舆情态势[^,\\.。]*[\\.。]$", text_temp
).group()
doc_dict["sentiment_para"] = re.sub(
r"[^,\\.。,]*[,]舆情态势[^,\\.。]*[\\.。]$", "", text_temp
)
continue
# 获取所有表格
tables = doc_poweroff.tables
# 舆情直接取第一个表格
table_sentiment = doc_sentiment.tables[0]
# 定义要查看的区域范围
start_row1_1 = 2 # 起始行索引从0开始
end_row1_1 = 8 # 结束行索引(不包括)
start_col1_1 = 10 # 起始列索引从0开始
end_col1_1 = 13 # 结束列索引(不包括)
# 表1 “抢修、投诉、舆情”三工单监测汇总表
table1 = tables[0]
# 定义要查看的区域范围
start_row1 = 2 # 起始行索引从0开始
end_row1 = 8 # 结束行索引(不包括)
start_col1 = 2 # 起始列索引从0开始
end_col1 = 9 # 结束列索引(不包括)
# 表2配变过载监测汇总表
table2 = tables[1]
# 定义要查看的区域范围
start_row2 = 1 # 起始行索引从0开始
end_row2 = 8 # 结束行索引(不包括)
start_col2 = 1 # 起始列索引从0开始
end_col2 = 4 # 结束列索引(不包括)
# 表3停电用户前五供电局
table3 = tables[2]
# 定义要查看的区域范围
start_row3 = 2 # 起始行索引从0开始
end_row3 = 7 # 结束行索引(不包括)
start_col3 = 1 # 起始列索引从0开始
end_col3 = 5 # 结束列索引(不包括)
# 新增表4 95598供电类投诉前五供电局统计表
# table4 = doc_poweroff.add_table(6, 5)
# ————————————————————————处理word文档—————————————————————————
# -----------------------------------------------------------------------------------------
# ————————————————————————表格环比统计——————————————————————————
# 首先拿到分析时间,明确要分析哪天的数据
(
start_time,
end_time,
before_start_time,
year,
month,
day,
day_before,
month_before,
) = get_time(files, time_type)
# 获取后一天的时间
year_now, month_now, day_now = get_next_day(int(year), int(month), int(day))
# 通过上述时间,统计停电excel的情况
# 当天情况
province_statistics, district_statistics = deal_excel(
start_time, end_time, file_path_dict["power_off_excel"]
)
print(f"省份统计{province_statistics}")
print(f"地市统计{district_statistics}")
province_statistics_list = list(province_statistics.values())
# 当天省份总投诉
province_statistics_total = sum(province_statistics.values())
print(f"省份总投诉{province_statistics_total}")
# 昨天情况
province_stat_before, district_stat_before = deal_excel(
before_start_time, start_time, file_path_dict["power_off_excel"]
)
print(f"省份昨日情况{province_stat_before}")
# 昨天省份总投诉
province_stat_be_total = sum(province_stat_before.values())
print(f"省份昨日总投诉{province_stat_be_total}")
# 省份环比
province_dod = {
k: province_statistics[k] - province_stat_before[k]
for k in province_statistics.keys()
}
# 最终省份环比结果
for key, value in province_dod.items():
if int(value) > 0:
province_dod[key] = "+" + str(value)
elif int(value) == 0:
province_dod[key] = "持平"
print(f"省份环比{province_dod}")
province_dod_list = list(province_dod.values())
# 表1中剩余的省份统计数据及舆情的统计数据、环比情况
table1_extra_data = transform_data(
[province_statistics_list, province_dod_list]
)
logger.info(
f"表1中剩余的省份统计数据及舆情的统计数据、环比情况{table1_extra_data}"
)
# 省份统计的表格数据在表格中的起始位置
start_row_pro_sta = 2
start_col_pro_sta = 9
# 将昨天的地市统计转成字典
district_stat_before = dict(district_stat_before)
# 查看今天的前五在昨天的情况
"""
情况1今天的数据大于5则可以直接用现有逻辑
情况2今天的数据小于5值判断小于5的这几条比如只有1条就判断这一条的情况
"""
top_dod_dict = {}
# 需要判断地市停电的有没有5个分小于5或者大于等于5
top5_name_list = []
top5_poweroff_list = []
# update:2025-07-04 修改供电类投诉前五供电局统计表的同排行
need_district_statistics = (
district_statistics[0:5]
if len(district_statistics) > 5
else district_statistics
)
other_district_statistic = (
district_statistics[5:] if len(district_statistics) > 5 else []
)
other_count = 0
if (
len(other_district_statistic) > 0
and district_statistics[4][1] == district_statistics[5][1]
):
for i in range(len(other_district_statistic)):
if other_district_statistic[i][1] == district_statistics[4][1]:
other_count += 1
poweroff_value = need_district_statistics[len(need_district_statistics) - 1][1]
count = 0
for i in range(len(need_district_statistics)):
current_poweroff_value = need_district_statistics[i][1]
if current_poweroff_value == poweroff_value:
count += 1
else:
top5_name_list.append(need_district_statistics[i][0])
top5_poweroff_list.append(need_district_statistics[i][1])
top_dod_dict[need_district_statistics[i][0]] = top5_dod_analysis(
need_district_statistics[i], district_stat_before
)
if count == 1:
top5_name_list.append(
need_district_statistics[len(need_district_statistics) - 1][0]
)
top5_poweroff_list.append(
need_district_statistics[len(need_district_statistics) - 1][1]
)
top_dod_dict[
need_district_statistics[len(need_district_statistics) - 1][0]
] = top5_dod_analysis(
need_district_statistics[len(need_district_statistics) - 1],
district_stat_before,
)
else:
top5_name_list.append(f"其他{count + other_count}单位")
top5_poweroff_list.append(poweroff_value)
top_dod_dict["其他单位"] = "-"
# old_version
"""
if len(district_statistics) >= 5:
# 地市前五统计
# print(district_statistics)
top1 = district_statistics[0]
top2 = district_statistics[1]
top3 = district_statistics[2]
top4 = district_statistics[3]
top5 = district_statistics[4]
print(f'地市前五{top1}{top2}{top3}{top4}{top5}')
top5_name_list = [top1[0], top2[0], top3[0], top4[0], top5[0]]
top5_poweroff_list = [top1[1], top2[1], top3[1], top4[1], top5[1]]
top_dod_dict[top1[0]] = top5_dod_analysis(top1, district_stat_before)
top_dod_dict[top2[0]] = top5_dod_analysis(top2, district_stat_before)
top_dod_dict[top3[0]] = top5_dod_analysis(top3, district_stat_before)
top_dod_dict[top4[0]] = top5_dod_analysis(top4, district_stat_before)
top_dod_dict[top5[0]] = top5_dod_analysis(top5, district_stat_before)
elif 0 < len(district_statistics) < 5:
for i in range(len(district_statistics)):
top5_name_list.append(district_statistics[i][0])
top5_poweroff_list.append(district_statistics[i][1])
top_dod_dict[district_statistics[i][0]] = top5_dod_analysis(district_statistics[i], district_stat_before)
"""
print(f"地市前五名称{top5_name_list}")
print(f"地市前五数据{top5_poweroff_list}")
# top_dod_dict[top1[0]] = top5_dod_analysis(top1, district_stat_before)
# top_dod_dict[top2[0]] = top5_dod_analysis(top2, district_stat_before)
# top_dod_dict[top3[0]] = top5_dod_analysis(top3, district_stat_before)
# top_dod_dict[top4[0]] = top5_dod_analysis(top4, district_stat_before)
# top_dod_dict[top5[0]] = top5_dod_analysis(top5, district_stat_before)
print(f"地市环比{top_dod_dict}")
top5_stat_list = list(top_dod_dict.values())
# 地市前5的名称、数据、环比放入列表并转至方便写入表格4
top5_list = transform_data([top5_name_list, top5_poweroff_list, top5_stat_list])
# 表4中的插入位置
start_tb4_row = 2
start_tb4_col = 1
# 省总的投诉情况及环比
complain_dod = int(province_statistics_total) - int(province_stat_be_total)
logger.info(f"省份总量环比{complain_dod}")
# 计算省份总量环比
if complain_dod > 0:
# 使用 f-string 进行格式化
complain_dod = f"增加{complain_dod / province_stat_be_total * 100:.2f}%"
elif complain_dod < 0:
# 使用 f-string 进行格式化
complain_dod = f"减少{-complain_dod / province_stat_be_total * 100:.2f}%"
else:
complain_dod = "持平"
# 异常处置情况
electricity_exception = ""
standardize_date = None
if time_type == 0:
# !!!旧版 前面已经过滤掉了时间信息,此处对时间进行单独赋值操作
standardize_date = (
f"{month_before}{day_before}日17时至{month}{day}日17时期间"
)
elif time_type == 1:
# -------------------------20250429更新修改开始和结束时间---------------------------------
standardize_date = f"{year}{month}{day}日0时至24时期间"
# standardize_date = ''
# -------------------------20250429更新修改开始和结束时间---------------------------------
# ————————————————————————表格环比统计——————————————————————————
# 获取当前文件夹路径
current_path = Path(__file__).parent
templates_path = str(os.path.join(current_path.parent, "templates")).replace(
"\\", "/"
)
# 默认标题
# 注意,标题会根据不同时期进行调整
report_title = r"南方电网公司“停电抢修、投诉服务、舆情管控”三工单联动监测日报"
# ————————————————————————组装完整简报——————————————————————————
if time_type == 0:
# 旧版正则
sample_first_para = (
f"{month_before}{day_before}日17时至{month}{day}日17时"
)
elif time_type == 1:
# 20250429过滤时间正则
sample_first_para = f"{year}{month}{day}日0时至24时"
# 简报舆情信息
doc_dict["sentiment_para_simple"] = doc_dict["sentiment_para"].replace(
"全网监测到", ""
)
if re.search(r"重要用户停电[^0]户", doc_dict["first_point_para1"]):
doc_dict["have_important"] = re.sub(
"[,]用户停电情况总体平稳",
"",
re.sub("其中[,]", "", doc_dict["first_point_para1"]),
)
else:
doc_dict["have_important"] = (
re.sub(
r"[,]其中.{0,3}重要用户停电0户.{0,5}停电情况总体平稳[\\.。]",
"",
doc_dict["first_point_para1"],
)
+ ",无重要用户停电。"
)
        # Extract the outage figures from the text
(
total_outage,
short_term_outage,
change_outage,
percentage,
short_precentage,
important_stop_outage,
type,
) = count_change_outage(doc_dict["have_important"])
        # Extract the sentiment figures
today_sentiment, type_sentiment, yesterday_sentiment, result_sentiment = (
count_outage_sentiment(doc_dict["sentiment_para_simple"])
)
        # The brief only needs the complaint total and its DoD change
complain_simple = (
f"95598供电类投诉{province_statistics_total}条,环比{complain_dod}"
)
print(doc_dict["have_important"])
print(doc_dict["sentiment_para_simple"])
current_doc_name = f"南方电网公司停电抢修投诉服务舆情管控三工单联动监测日报{year}{int(month):02d}{int(day):02d}.docx"
doc_dict_over_load = doc_dict["over_load"]
over_load_before = extract_overload_info_from_previous_day(
current_word=current_doc_name
)
if over_load_before:
            # Convert the stored strings to floats
over_load_before = float(over_load_before)
doc_dict_over_load = float(doc_dict_over_load)
if over_load_before > doc_dict_over_load:
over_load_percent = (
(over_load_before - doc_dict_over_load) / over_load_before * 100
)
over_load_percent = f"{over_load_percent:.2f}%"
over_load_type = "减少"
elif over_load_before < doc_dict_over_load:
over_load_percent = (
(doc_dict_over_load - over_load_before) / over_load_before * 100
)
over_load_percent = f"{over_load_percent:.2f}%"
over_load_type = "增加"
else:
over_load_percent = 0
over_load_type = "持平"
else:
over_load_before = ""
over_load_percent = ",缺少上一天数据"
over_load_type = "无法估计"
        # Assemble the replacement mapping
replacements_simple = {
"{{standardize_date}}": standardize_date,
"{{total_outage}}": str(total_outage),
"{{short_term_outage}}": str(short_term_outage),
"{{change_outage}}": str(change_outage),
"{{percentage}}": str(percentage),
"{{short_precentage}}": str(short_precentage),
"{{important_stop_outage}}": str(important_stop_outage),
"{{type}}": type,
"{{have_important}}": doc_dict["have_important"],
"{{over_load}}": doc_dict["over_load"],
"{{over_load_percent}}": str(over_load_percent),
"{{over_load_type}}": over_load_type,
"{{complain}}": complain_simple,
"{{sample_first_para}}": sample_first_para,
"{{today_sentiment}}": str(today_sentiment),
"{{type_sentiment}}": type_sentiment,
"{{yesterday_sentiment}}": str(yesterday_sentiment),
"{{result_sentiment}}": str(result_sentiment) if result_sentiment!="" else "",
"{{year}}": year,
"{{month}}": month,
"{{day}}": day,
}
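        # Sketch of the substitution (hypothetical template text): a paragraph
        # "{{standardize_date}}全网停电{{total_outage}}万户" would be rendered by
        # replace_text_in_docx below as "7月10日0时至24时期间全网停电3.2万户".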
        # Assemble the brief
        electricity_daily_simple = Document(f"{templates_path}/简报模板.docx")
        # Replace the template placeholders
replace_text_in_docx(electricity_daily_simple, replacements_simple)
datas = {
"停电用户\n(万户)": {
"昨天": total_outage + change_outage,
"今天": total_outage,
},
"过载配变\n(台)": {"昨天": over_load_before, "今天": doc_dict_over_load},
"95598供电类\n投诉(条)": {
"昨天": province_stat_be_total,
"今天": province_statistics_total,
},
"涉电力供应类舆情\n风险信息(条)": {
"昨天": yesterday_sentiment,
"今天": today_sentiment,
},
}
        # Normalize datas first: keep missing/empty values as None, coerce the rest to int
        for key, value in datas.items():
            for sub_key, sub_value in value.items():
                if sub_value is None or sub_value == "":
                    datas[key][sub_key] = None  # missing or empty values stay None
                else:
                    datas[key][sub_key] = int(sub_value)  # ensure integer values
        # Build the DataFrame only after cleaning, so the exported Excel sees the normalized values
        df = pd.DataFrame(datas)
        # Render the bar chart
        img_path = plot_electricity_comparison(year, month, day, datas)
        # Find the insertion point (assumes the template contains "{{IMG_PLACEHOLDER}}")
        img_placeholder = "{{IMG_PLACEHOLDER}}"
        img_inserted = False
        for paragraph in electricity_daily_simple.paragraphs:
            if img_placeholder in paragraph.text:
                # Remove the placeholder text
                paragraph.text = paragraph.text.replace(img_placeholder, "")
                # Insert the image
                run = paragraph.add_run()
                run.add_picture(img_path, width=Inches(6.0))
                img_inserted = True
                break
        if not img_inserted:
            # No placeholder found: append the image at the end of the document
            p = electricity_daily_simple.add_paragraph()
            run = p.add_run()
            run.add_picture(img_path, width=Inches(6.0))
        # Write the table into the brief
        # Set the global style
        style = electricity_daily_simple.styles["Normal"]
        style.font.name = "Times New Roman"
        # Output folders are grouped by month
        save_folder = f"{year}{str(month).zfill(2)}"
        # Create the subfolder
if not os.path.exists(f"{save_path}/{save_folder}"):
os.makedirs(f"{save_path}/{save_folder}", exist_ok=True)
final_file = None
final_sim_file = None
        # Final save paths
if time_type == 0:
final_file = f"{save_path}/{save_folder}/{report_title}-{year}{str(month).zfill(2)}{str(day).zfill(2)}.docx"
final_sim_file = f"{save_path}/{save_folder}/【简版】{report_title}-{year}{str(month).zfill(2)}{str(day).zfill(2)}.docx"
elif time_type == 1:
final_file = f"{save_path}/{save_folder}/{report_title}-{year_now}{str(month_now).zfill(2)}{str(day_now).zfill(2)}.docx"
final_sim_file = f"{save_path}/{save_folder}/【简版】{report_title}-{year_now}{str(month_now).zfill(2)}{str(day_now).zfill(2)}.docx"
        # Delete stale copies so the files can be regenerated
delete_old_file(final_file)
delete_old_file(final_sim_file)
        # Generate the brief
        # Save path served by the API
        # Save the statistics as an Excel file
path = f"{save_path}/{save_folder}/{year}{str(month).zfill(2)}{str(day).zfill(2)}电力统计数据.xlsx"
df.to_excel(path, index=True)
electricity_daily_simple.save(final_sim_file)
        # Local-test save path
        # electricity_daily_simple.save(f'【简版】公司全国“两会”保供电期间配网设备运行及三工单监测日报-{year}{str(month).zfill(2)}{str(day).zfill(2)}.docx')
        # # ————————————————————————Assemble the brief: end——————————————————————————
        # -----------------------------------------------------------------------------------------
        # ————————————————————————Assemble the full daily report——————————————————————————
        # Prepend the standardized time span to the relevant paragraphs
doc_dict["first_point_para1"] = standardize_date + doc_dict["first_point_para1"]
doc_dict["sentiment_para"] = standardize_date + doc_dict["sentiment_para"]
# {{standardize_date}}全网收到{{complain_num}}条供电类投诉,环比{{complain_dod}}条;
complain_text = (
standardize_date
+ f"全网收到{str(province_statistics_total)}条供电类投诉,环比{complain_dod}"
)
        # update 2025-07-04: also add the overload DoD change
        replacements = {}
        if time_type == 0:
            # Assemble the replacement mapping
replacements = {
"{{year}}": year,
"{{month}}": month,
"{{day}}": day,
"{{power_off_one}}": doc_dict["first_point_para1"],
"{{power_off_two}}": doc_dict["first_point_para2"],
"{{over_load}}": doc_dict["over_load"],
"{{over_load_percent}}": str(over_load_percent),
"{{over_load_type}}": over_load_type,
"{{complain}}": complain_simple,
"{{sentiment}}": doc_dict["sentiment_para"],
"{{sentiment_trend}}": doc_dict["sentiment_trend"],
"{{exception}}": electricity_exception,
}
        elif time_type == 1:
            # Assemble the replacement mapping
replacements = {
"{{year}}": str(year_now),
"{{month}}": str(month_now),
"{{day}}": str(day_now),
"{{power_off_one}}": doc_dict["first_point_para1"],
"{{power_off_two}}": doc_dict["first_point_para2"],
"{{over_load}}": doc_dict["over_load"],
"{{over_load_percent}}": str(over_load_percent),
"{{over_load_type}}": over_load_type,
"{{complain}}": complain_simple,
"{{sentiment}}": doc_dict["sentiment_para"],
"{{sentiment_trend}}": doc_dict["sentiment_trend"],
"{{exception}}": electricity_exception,
}
        # Assemble the daily report
        electricity_daily = Document(f"{templates_path}/日报模板.docx")
        # Replace the template placeholders
        replace_text_in_docx(electricity_daily, replacements)
        # Copy the tables into the new document
        # Assemble table 1's data
        # Province statistics and sentiment data are still missing at this point
        logger.info("将数据写入word表格1")
copy_table(
table1,
electricity_daily.tables[0],
start_row1,
end_row1,
start_col1,
end_col1,
0,
)
        # Insert each province's complaint counts and DoD changes
        logger.info("将自行统计的数据插入表格1")
copy_sta_table(
electricity_daily.tables[0],
table1_extra_data,
start_row_pro_sta,
start_col_pro_sta,
)
        # Fill in the sentiment data
copy_table(
table_sentiment,
electricity_daily.tables[0],
start_row1_1,
end_row1_1,
start_col1_1,
end_col1_1,
1,
)
        # Copy table 2's data
copy_table(
table2,
electricity_daily.tables[1],
start_row2,
end_row2,
start_col2,
end_col2,
0,
)
        # Copy table 3's data
copy_table(
table3,
electricity_daily.tables[2],
start_row3,
end_row3,
start_col3,
end_col3,
0,
)
        # Fill table 4
        # Skip when no top-5 data exists
if top5_list:
copy_sta_table(
electricity_daily.tables[3],
top5_list,
start_tb4_row,
start_tb4_col,
is_dynamics=True,
length=5,
)
# copy_sta_table(electricity_daily.tables[3], top5_list, start_tb4_row, start_tb4_col)
        # Table fonts: FangSong for Chinese, Times New Roman for Latin/digits, all 11 pt
for table in electricity_daily.tables:
table_style(table)
        # Explicitly set the Latin/digit font on every paragraph run
for paragraph in electricity_daily.paragraphs:
for run in paragraph.runs:
run.font.name = "Times New Roman"
        # Save to the path served by the API
electricity_daily.save(final_file)
        # Returning the doc and date parts lets the API layer parse table 1 and store it in the database
        # return electricity_daily, year, month, day
        # The report's own statistics date
statistics_time = None
if time_type == 0:
statistics_time = datetime(int(year), int(month), int(day))
elif time_type == 1:
statistics_time = datetime(int(year_now), int(month_now), int(day_now))
        # The return value is stored in the database (files kept as binary)
if time_type == 0:
return {
"report_title": f"{report_title}-{year}{str(month).zfill(2)}{str(day).zfill(2)}.docx",
"daily_report": final_file,
"daily_repo_simple": final_sim_file,
"statistics_time": statistics_time,
"save_folder": save_folder,
# 'excel':path,
# 'img':img_path
}
elif time_type == 1:
return {
"report_title": f"{report_title}-{year_now}{str(month_now).zfill(2)}{str(day_now).zfill(2)}.docx",
"daily_report": final_file,
"daily_repo_simple": final_sim_file,
"statistics_time": statistics_time,
"save_folder": save_folder,
# 'excel':path,
# 'img':img_path
}
        # Local-test save path
        # electricity_daily.save(f'公司全国“两会”保供电期间配网设备运行及三工单监测日报-{year}{str(month).zfill(2)}{str(day).zfill(2)}.docx')
        # # ————————————————————————Assemble the full daily report: end——————————————————————————
except Exception as e:
logger.exception("最终渲染阶段失败")
return JSONResponse(
content={"status_code": 500, "detail": f"word解析异常{e}"}
)
# Delete an old file from disk
def delete_old_file(file):
    try:
        if os.path.exists(file):
            os.remove(file)
            logger.info("磁盘里的旧文件删除成功")
    except Exception as e:
        logger.warning(f"删除旧文件失败:{e}")
# if __name__ == '__main__':
# folder_path = 'E:/work_data/work/三工单日报/20250310/20250310'
#
# deal_docx(folder_path)

@ -0,0 +1,110 @@
import os
import datetime
import re
from docx import Document
import logging
from fastapi import HTTPException
from datetime import date, timedelta
logger = logging.getLogger(__name__)
def extract_overload_info_from_previous_day(
current_word=None, base_folder="temp_download_raw"
):
"""
根据传入的 Word 文档名称查找前一天的 Word 文档并提取过载台数信息
参数
current_word (str): 当前 Word 文档名称默认 None必须动态传入
base_folder (str): 存放 Word 文档的基础文件夹路径默认为 'temp_download_raw'
返回
str: 前一天的 Word 文档中的过载台数信息如果没有找到则返回 None
"""
print(current_word)
print(type(current_word))
if not current_word:
logger.error("需要提供当前 Word 文档名称")
return None
try:
        # Extract the date part of the document name (format assumed to be YYYYMMDD)
current_date_str = re.search(r"\d{8}", current_word).group()
current_date = date(
int(current_date_str[:4]),
int(current_date_str[4:6]),
int(current_date_str[6:8]),
)
except Exception as e:
logger.error(f"无法从当前文档名称中提取日期:{e}")
return None
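    # Example: a name containing "20250711" parses to date(2025, 7, 11); the
    # previous day's file is then looked up under the month folder "202507".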
try:
        # Compute the previous day's date
        previous_date = current_date - timedelta(days=1)
        # Build the previous day's Word document name
        previous_word_name = current_word.replace(
            current_date_str, previous_date.strftime("%Y%m%d")
        )
        # Build the previous day's folder path (grouped by month, YYYYMM)
        previous_month_folder = previous_date.strftime("%Y%m")
        previous_folder_path = os.path.join(base_folder, previous_month_folder)
        # Bail out if the previous day's folder does not exist
        if not os.path.exists(previous_folder_path):
            logger.error(f"前一天的文件夹不存在:{previous_folder_path}")
            return None
        # Locate the previous day's Word document
        previous_word_path = None
        for file_name in os.listdir(previous_folder_path):
            if file_name == previous_word_name:
                previous_word_path = os.path.join(previous_folder_path, file_name)
                break
        if not previous_word_path:
            logger.error(f"前一天的 Word 文档不存在:{previous_word_name}")
            return None
        # Open the previous day's document
        doc = Document(previous_word_path)
        # Extract the overload count, e.g. "过载123台" -> "123"
        overload_info = None
        for para in doc.paragraphs:
            if re.search(r"过载\d+台", para.text):
                overload_info = (
                    re.search(r"过载\d+台", para.text)
                    .group()
                    .replace("过载", "")
                    .replace("台", "")
                )
                break
if overload_info:
logger.info(f"前一天的过载台数信息:{overload_info}")
return overload_info
else:
logger.error("未找到前一天的过载台数信息")
return None
except Exception as e:
logger.exception(f"提取前一天过载台数信息失败:{e}")
raise HTTPException(status_code=500, detail=f"提取前一天过载台数信息失败:{e}")
# # Example usage
# year = "2025"
# month = "03"
# day = "23"
# current_word = f'南方电网公司停电抢修投诉服务舆情管控三工单联动监测日报{year}{month}{day}.docx'
# print(current_word)
# overload_info = extract_overload_info_from_previous_day(current_word)
# if overload_info:
# print(f"前一天的过载台数信息:{overload_info}")
# else:
# print("未找到前一天的过载台数信息")

@ -0,0 +1,28 @@
import os
import re
import logging
# Module logger
logger = logging.getLogger(__name__)


def get_final_name(download_dir):
    try:
        # If the reports were generated, return their paths so the caller can
        # convert them to HTML for the frontend
        report_name_dict = {}
        if os.listdir(download_dir):
            final_files = os.listdir(download_dir)
            for file in final_files:
                if re.match(r".*简版.*", file):
                    report_name_dict["report_sim_name"] = download_dir + "/" + file
                else:
                    report_name_dict["report_name"] = download_dir + "/" + file
            return report_name_dict["report_sim_name"], report_name_dict["report_name"]
    except FileNotFoundError as e:
        logger.exception(f"获取最终日报或简报路径失败:{e}")
    except KeyError as e:
        logger.exception(f"获取最终日报或简报路径失败:{e}")
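
# NOTE: get_final_name assumes the folder holds exactly one 简版 file and one
# full report; if either is missing, the KeyError raised by the return above
# is logged and the function falls through to return None.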

@ -0,0 +1,113 @@
from datetime import datetime
import numpy as np
from fastapi.responses import JSONResponse
import re
import os
import logging
# Module logger
logger = logging.getLogger(__name__)
from app.tools.effective_date import effective_date


# files is the os.listdir listing of the target folder
def get_time(files, time_type):
    try:
        logger.info("开始获取日报需分析的时间段")
        # Example local-test inputs:
        # folder_path = r'E:\work_data\work\三工单日报\三工单\20250306\源数据'
        #
        # files = os.listdir(folder_path)
        # Date the report covers, e.g. "7月8日"
        pattern_time = r"\d{1,2}月\d{1,2}日"
        # Alternative form: a 4-digit MMDD right before the extension or suffix
        pattern_time2 = r"(\d{4})(\.doc|\.xls|-[^\d]|[\)])"
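        # Examples with hypothetical file names: "投诉明细7月8日.xlsx" matches
        # pattern_time as "7月8日"; "源数据0708.xls" matches pattern_time2 with
        # group(1) == "0708", which becomes "7月8日" below.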
        time_list = []
        for filename in files:
            print(filename)
            time_temp1 = re.search(pattern_time, filename)
            time_temp2 = re.search(pattern_time2, filename)
            if time_temp1:
                time_list.append(time_temp1.group())
            if time_temp2:
                temp_month = re.sub(r"^0", "", time_temp2.group(1)[:2])
                temp_day = re.sub(r"^0", "", time_temp2.group(1)[2:])
                time_list.append(temp_month + "月" + temp_day + "日")
        print(time_list)
        if (
            len(time_list) > 3
            and time_list[0] == time_list[1] == time_list[2] == time_list[3]
        ):
            # Split "x月x日" on "月"
            date_list = time_list[0].split("月")
            # Today's date parts
            year = str(datetime.now().year)
            month = date_list[0]
            day = date_list[1].replace("日", "")
            # Use our own helper to derive the earlier dates
            # Yesterday
            year_before, month_before, day_before = effective_date(year, month, day)
            # The day before yesterday
            year_before2, month_before2, day_before2 = effective_date(
                str(year_before), str(month_before), str(day_before)
            )
            # Default the boundary times, then branch on the statistics time type
            start_time = None
            end_time = None
            before_start_time = None
            if time_type == 0:
                # !!! Legacy: the window runs from 17:00 the previous day to 17:00 today
                start_time = datetime(year_before, month_before, day_before, 17, 0, 0)
                # Window end: 17:00 today
                end_time = datetime(int(year), int(month), int(day), 17, 0, 0)
                # Previous window start: 17:00 two days ago
                before_start_time = datetime(
                    year_before2, month_before2, day_before2, 17, 0, 0
                )
            elif time_type == 1:
                # -------------------------------- 2025-04-29 change -------------------------------------------------
                # Count from 00:00 to 23:59:59 of the report day
                start_time = datetime(int(year), int(month), int(day), 0, 0, 0)
                # Window end
                end_time = datetime(int(year), int(month), int(day), 23, 59, 59)
                # Previous day's window start
                before_start_time = datetime(
                    year_before, month_before, day_before, 0, 0, 0
                )
                # -------------------------------- 2025-04-29 change -------------------------------------------------
return (
start_time,
end_time,
before_start_time,
year,
month,
day,
day_before,
month_before,
)
else:
raise Exception("请确认各文件是否为同一天的")
except Exception as e:
logger.exception(f"获取日报时间失败:{e}")
print(f"获取日报时间失败:{e}")
# if __name__ == '__main__':
# get_time()

@ -0,0 +1,34 @@
import os
import shutil
# Move files from the bulletin download folder to the final archive folder,
# clearing the download folder of everything except the report artifacts
def move_files(folder_download, folder_all):
    try:
        # Ensure both folders exist
        if not os.path.exists(folder_download):
            os.makedirs(folder_download)
        if not os.path.exists(folder_all):
            os.makedirs(folder_all)
        # Files whose names contain these keywords are kept in place
        keywords = ["简版", "日报.zip"]
        # Walk every file in the source folder
        for root, dirs, files in os.walk(folder_download):
            for file in files:
                file_path = os.path.join(root, file)
                # Does the file name contain any keyword?
                has_keyword = False
                for keyword in keywords:
                    if keyword in file:
                        has_keyword = True
                        break
                # Move the file only when no keyword matches
                if not has_keyword:
                    shutil.move(file_path, os.path.join(folder_all, file))
                    print(f"已移动文件: {file_path}到{folder_all}")
except Exception as e:
print(f"发生错误: {e}")

@ -0,0 +1,46 @@
import os
import shutil
from pathlib import Path
# Move the four raw source files into the raw-download folder
def move_raw_files(folder_before, folder_after, save_folder):
    try:
        # Ensure the target folder exists
        Path(folder_after).mkdir(parents=True, exist_ok=True)
        # Accepted extensions
        extensions = (".docx", ".xlsx", ".xls")
        # Create the month subfolder
        if not os.path.exists(f"{folder_after}/{save_folder}"):
            os.makedirs(f"{folder_after}/{save_folder}", exist_ok=True)
        # Walk the source folder
        file_paths = []
        for item in os.listdir(folder_before):
            item_path = os.path.join(folder_before, item)
            # Only move plain files (not subfolders) whose extension matches
            if os.path.isfile(item_path) and item.lower().endswith(extensions):
                # Build the target path
                target_path = os.path.join(f"{folder_after}/{save_folder}", item)
                # Move the file
                shutil.move(item_path, target_path)
                file_paths.append(f"{folder_after}/{save_folder}/{item}")
                print(f"已移动: {item}")
        print(f"\n移动完成! 共移动了 {len(file_paths)} 个文件到 {folder_after}")
        print(f"文件路径: {file_paths}")
        return file_paths
except Exception as e:
print(f"原始文件移动失败: {e}")
if __name__ == "__main__":
folder_before = r"E:\code\python_code\daily_work\backend\temp_uploads"
folder_after = r"E:\code\python_code\daily_work\backend\temp_download_raw"
move_raw_files(folder_before, folder_after, "202505")

@ -0,0 +1,74 @@
import copy
import logging
from docx.enum.text import WD_ALIGN_PARAGRAPH
from docx.oxml import CT_P
# Module logger
logger = logging.getLogger(__name__)


# Copy a multi-row, multi-column range into a target table. The sentiment
# table has one column fewer than table 1, so a flag picks the mode:
# flag=0 copies cell-for-cell between same-shaped tables; flag=1 copies from
# the sentiment table, shifting the target one column to the right.
def copy_table(
    source_table, target_table, start_row, end_row, start_col, end_col, flag
):
    try:
        logger.info("遍历源表格的指定范围,将数据复制到目标表格的相同位置")
        # Walk the given range of the source table and copy each cell into
        # the same position of the target table
        for i in range(start_row, end_row):
            for j in range(start_col, end_col):
                # Source cell content
                source_cell = source_table.cell(i, j)
                # Copy into the matching target cell
                if flag == 0:
                    target_table.cell(i, j).text = source_cell.text
                if flag == 1:
                    # j+1: start writing one column later in the target table
                    target_table.cell(i, j + 1).text = source_cell.text
    except Exception as e:
        logger.exception(f"复制表格数据时发生错误: {e}")
        raise e
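
# Usage sketch (assuming both docx tables share the copied geometry):
# copy_table(src_tbl, report.tables[0], start_row=1, end_row=5, start_col=0,
#            end_col=3, flag=0) copies rows 1-4 x cols 0-2 cell-for-cell.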
# Insert self-computed statistics into a Word table
# update 2025-07-04: surplus template rows were removed; rows are appended dynamically
def copy_sta_table(
    target_table,
    data,
    start_row,
    start_col,
    is_dynamics: bool = None,
    length: int = None,
):
    try:
        logger.info("开始将自行统计的数据插入word表格中")
        # update 2025-07-04: when the table grows dynamically and fewer than
        # `length` rows of data exist, clone the template's last row once per
        # extra data row and renumber its first cell (2, 3, ...)
        if is_dynamics and len(data) < length:
            for i in range(len(data) - 1):
                source_row = target_table.rows[-1]._element
                new_row_element = copy.deepcopy(source_row)
                target_table._element.append(new_row_element)
                new_row = target_table.rows[-1]
                target_cell = new_row.cells[0]
                # Keep a single paragraph in the numbering cell
                while len(target_cell.paragraphs) > 1:
                    p_to_remove = target_cell.paragraphs[-1]._element
                    target_cell._element.remove(p_to_remove)
                if not target_cell.paragraphs:
                    target_cell._element.append(CT_P())
                main_paragraph = target_cell.paragraphs[0]
                for run in main_paragraph.runs:
                    run.text = ""
                main_paragraph.text = ""
                main_paragraph.add_run(str(i + 2))
                main_paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER
        # Walk the data and write it into the table at the given offset
        for i in range(len(data)):
            for j in range(len(data[i])):
                # Target row/column indices
                target_row = start_row + i
                target_col = start_col + j
                # Write the value into the matching cell
                target_table.cell(target_row, target_col).text = str(data[i][j])
except Exception as e:
logger.exception(f"自行统计的数据插入word表格中失败: {e}")
raise e
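
# Usage sketch with hypothetical rows:
#   copy_sta_table(tbl, [["广州", 12, "增加5.00%"], ["深圳", 9, "持平"]],
#                  start_row=2, start_col=1, is_dynamics=True, length=5)
# appends one cloned row numbered "2", then writes the two data rows starting
# at row 2, column 1.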

@ -0,0 +1,45 @@
import logging
# Module logger
logger = logging.getLogger(__name__)


# Replace a template placeholder in a paragraph with the extracted content
def replace_text_in_paragraph(paragraph, old_text, new_text):
    try:
        if old_text in paragraph.text:  # Does the paragraph contain the placeholder?
            # Walk each run of the paragraph
            for run in paragraph.runs:
                if old_text in run.text:
                    run.text = run.text.replace(old_text, new_text)
    except Exception as e:
        logger.exception(f"替换段落里的文本失败:{e}")
        print(f"替换段落里的文本失败:{e}")
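
# NOTE: python-docx splits paragraph text into runs at formatting boundaries,
# so a placeholder broken across runs (e.g. "{{ye" + "ar}}") passes the
# paragraph-level check yet matches no single run and stays unreplaced; the
# templates here are assumed to keep each placeholder within one run.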
def replace_text_in_docx(doc, replacements):
    try:
        logger.info("开始替换段落中的文本")
        # Replace placeholders in paragraphs
        for paragraph in doc.paragraphs:
            for old_text, new_text in replacements.items():
                replace_text_in_paragraph(paragraph, old_text, new_text)
    except Exception as e:
        logger.exception(f"替换段落中的文本失败:{e}")
        print(f"替换段落中的文本失败:{e}")
    try:
        logger.info("开始替换表格中的文本")
        # Replace placeholders in table cells
        for table in doc.tables:
            for row in table.rows:
                for cell in row.cells:
                    for old_text, new_text in replacements.items():
                        if old_text in cell.text:
                            cell.text = cell.text.replace(old_text, new_text)
    except Exception as e:
        logger.exception(f"替换表格中的文本失败:{e}")
        print(f"替换表格中的文本失败:{e}")

@ -0,0 +1,32 @@
from docx.shared import Pt
from docx.oxml.ns import qn
from docx.enum.text import WD_ALIGN_PARAGRAPH
import logging
# Module logger
logger = logging.getLogger(__name__)


def table_style(table):
    try:
        logger.info("开始日报格式渲染")
        # Every row of the table
        for row in table.rows:
            # Every cell of the row
            for cell in row.cells:
                # Every paragraph of the cell
                for paragraph in cell.paragraphs:
                    paragraph.alignment = WD_ALIGN_PARAGRAPH.CENTER
                    # Every run of the paragraph
                    for run in paragraph.runs:
                        # Latin font (digits and English text)
                        run.font.name = "Times New Roman"
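                        # Assigning run.font.name above creates the run's
                        # rPr/rFonts element, so accessing run._element.rPr
                        # below is safe; on a bare run it could be None.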
                        # Chinese (east-asian) font
                        run._element.rPr.rFonts.set(qn("w:eastAsia"), "仿宋")
                        # Font size
                        run.font.size = Pt(11)
except Exception as e:
logger.exception(f"文件格式渲染失败:{e}")
print(f"Error: {e}")

@ -0,0 +1,69 @@
from fastapi import HTTPException
import re
import os
import logging
import shutil
from app.tools.doc2docx import doc2docx
# # word上传格式要求
# ALLOWED_EXTENSIONS_DOC = {
# 'application/msword',
# 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'
# }
#
# # excel上传格式要求
# ALLOWED_EXTENSIONS_EXCEL = {
# 'application/vnd.ms-excel',
# 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
# }
# Module logger
logger = logging.getLogger(__name__)


# Validate that an uploaded file meets the requirements
def verification_files(file, UPLOAD_DIR, file_type, exception_type):
    try:
        # Check the MIME type
        if file.content_type not in file_type:
            raise HTTPException(status_code=400, detail="文件类型不支持")
        # Check that the file name matches what this endpoint expects
        # (characters 3-10 of the name carry the document's type label);
        # re.escape guards against regex metacharacters in the name
        if not re.search(re.escape(file.filename[3:11]), exception_type):
            raise HTTPException(status_code=400, detail=f"请传入{exception_type}")
        # If a matching file was uploaded before, delete the old copy first
        if os.path.exists(UPLOAD_DIR) and len(os.listdir(UPLOAD_DIR)) > 0:
            for file_name in os.listdir(UPLOAD_DIR):
                if re.search(re.escape(file_name[3:11]), exception_type):
                    os.remove(os.path.join(UPLOAD_DIR, file_name))
                    logger.info(f"删除旧文件{file_name}")
        # Runs whether or not an old file existed
        logger.info(f"开始上传{exception_type}")
        # Create the upload folder if missing
        if not os.path.exists(UPLOAD_DIR):
            os.makedirs(UPLOAD_DIR)
        file_path = os.path.join(UPLOAD_DIR, file.filename)
        with open(file_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
        # Convert .doc uploads to .docx
        if file_path.endswith(".doc"):
            doc2docx(file_path)
            logger.info(f"文件{file.filename}格式转换为docx成功")
        # elif file_path.endswith('.xls'):
        #
        #     xls2xlsx(file_path)
        #     logger.info(f'文件{file.filename}格式转换为xlsx成功')
    except HTTPException:
        # Preserve the deliberate 400-level errors instead of wrapping them as 500
        raise
    except Exception as e:
        logger.error(f"文档格式校验失败:{e}")
        raise HTTPException(status_code=500, detail=f"文档格式校验失败{e}")

@ -0,0 +1,39 @@
#!/bin/bash
# Folder to watch for uploads
TARGET_DIR="/app/temp_uploads"
while true; do
    # Stop if the folder does not exist
    if [ ! -d "$TARGET_DIR" ]; then
        exit 1
    fi
    # Find all .doc files in the folder
    doc_files=$(find "$TARGET_DIR" -type f -name "*.doc")
    # Convert any .doc files that were found
    if [ -n "$doc_files" ]; then
        for doc_file in $doc_files; do
            # File name without the extension
            base_name="${doc_file%.*}"
            # Matching .docx path
            docx_file="${base_name}.docx"
            # Convert with soffice
            soffice --headless --convert-to docx "$doc_file" --outdir "$TARGET_DIR"
            # Did the conversion succeed?
            if [ -f "$docx_file" ]; then
                # Success: remove the original .doc
                rm "$doc_file"
                echo "已将 $doc_file 转换为 $docx_file 并删除原文件。"
                sleep 3
            else
                echo "转换 $doc_file 失败。"
            fi
        done
    fi
    # Sleep between scans to avoid a busy loop when no .doc files are present
    sleep 2
done

@ -0,0 +1,58 @@
services:
  # Frontend service
  frontend:
    image: daily-report-frontend:1.0.0 # replace with your frontend image
    logging:
      driver: "json-file" # log driver (json-file is the default)
      options:
        max-size: "10m" # max size of a single log file
        max-file: "3" # max number of log files kept
    ports:
      - "8086:80" # frontend port mapping
    volumes:
      - ../dist:/usr/share/nginx/html
      - ../nginx.conf:/etc/nginx/nginx.conf
    networks:
      - app-network
  # Backend service
  backend:
    image: daily-report-api:1.0.0 # replace with your backend image
    logging:
      driver: "json-file" # log driver (json-file is the default)
      options:
        max-size: "10m" # max size of a single log file
        max-file: "3" # max number of log files kept
    ports:
      - "5000:5000" # backend port mapping
    environment:
      - LOG_DIR=/app/logs
      - DATA_DIR=/app/temp_data
    # Mounted volumes
    volumes:
      # mount the project (including its log directory) into the container
      - $PWD:/app
    depends_on:
      - libreoffice
    networks:
      - app-network
  # LibreOffice service
  libreoffice:
    image: linuxserver/libreoffice:latest # replace with your LibreOffice image
    logging:
      driver: "json-file" # log driver (json-file is the default)
      options:
        max-size: "10m" # max size of a single log file
        max-file: "3" # max number of log files kept
    volumes:
      - $PWD:/app
    ports:
      - "8100:8100" # LibreOffice service port
    networks:
      - app-network
# Shared network
networks:
  app-network:
    driver: bridge

@ -0,0 +1,34 @@
from fastapi import FastAPI
from app.api.router import router
import logging
from app.logging_config import setup_logging
# Load the logging configuration
setup_logging()
# Module logger
logger = logging.getLogger(__name__)
app = FastAPI(
    title="Daily Report API",
    description="三工单日报、简报的api",
    version="1.0.0",
)
# Serve the static files (HTML etc.) consumed by the frontend
from fastapi.staticfiles import StaticFiles

app.mount("/sgd/file", StaticFiles(directory="temp_downloads"), name="temp_downloads")
# NOTE: this second mount originally reused the "/sgd/file" path and the
# temp_downloads directory, so it was never reachable; it presumably should
# expose temp_download_raw under its own path (assumed "/sgd/raw_file" here)
app.mount(
    "/sgd/raw_file",
    StaticFiles(directory="temp_download_raw"),
    name="temp_download_raw",
)
# Register the API routes
app.include_router(router)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
