# report_app/app/api/router.py
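"""API routes for the report service.

The endpoints cover uploading source documents (with .doc files converted to
.docx by a background script), generating the daily report and returning it to
the front end as HTML, and downloading the generated reports, weekly report
file paths and raw source files.
"""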
from datetime import datetime
from fastapi import File, UploadFile, APIRouter, BackgroundTasks, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse, FileResponse
from pathlib import Path
from typing import List
from time import sleep
import time
import os
import shutil
import zipfile
import logging
from app.tools.doc2docx import doc2docx
from app.tools.final_doc import deal_docx
from app.tools.docx2html import docx2html
from app.tools.get_final_name import get_final_name
from app.tools.clean_file_names import clean_file_names
from app.tools.doc2mysql import (
    save_word_document,
    get_file_path,
    get_weekly_file,
    save_raw_files,
    get_raw_file,
)
from app.tools.move_raw_files import move_raw_files
# Get the logger for this module
logger = logging.getLogger(__name__)
router = APIRouter()
# Directory where uploaded files are saved
UPLOAD_DIR = "temp_uploads"
# Directory for generated, downloadable report files
DOWNLOAD_DIR = "temp_downloads"
# Directory for downloadable raw source files
DOWNLOAD_RAW_DIR = "temp_download_raw"
# Accepted MIME types for Word uploads
ALLOWED_EXTENSIONS_DOC = {
    "application/msword",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
}
# Accepted MIME types for Excel uploads
ALLOWED_EXTENSIONS_EXCEL = {
    "application/vnd.ms-excel",
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
}
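
# The two MIME allow-lists above are not referenced by the routes below. A
# minimal validation sketch (hypothetical helper, not wired into any route)
# could look like this:
#
#     def is_allowed_upload(upload: UploadFile) -> bool:
#         """Return True if the upload declares a known Word or Excel type."""
#         return upload.content_type in (
#             ALLOWED_EXTENSIONS_DOC | ALLOWED_EXTENSIONS_EXCEL
#         )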
# Raw data for the current daily report; populated by generate_report and
# read again by download_file
data_dict = {}


# Main upload endpoint
@router.post(
    "/upload/",
    summary="Upload any source files",
    description="Complete the file upload; .doc files are converted to .docx",
)
async def upload_file(files: List[UploadFile] = File(...)):
    try:
        if not os.path.exists(UPLOAD_DIR):
            os.makedirs(UPLOAD_DIR)
        # If old files are left over, clear the folder first
        if len(os.listdir(UPLOAD_DIR)) > 0:
            for file in os.listdir(UPLOAD_DIR):
                os.remove(os.path.join(UPLOAD_DIR, file))
                logger.info(f"Deleted old report file {file}")
        # Save the uploads to disk
        for file in files:
            logger.info(f"Received upload: {file.filename}")
            # Sanitize the file name
            cleaned_filename = clean_file_names(file.filename)
            logger.info(f"Cleaned file name: {cleaned_filename}")
            file_path = os.path.join(UPLOAD_DIR, cleaned_filename)
            with open(file_path, "wb") as buffer:
                shutil.copyfileobj(file.file, buffer)
            # ------------------------- test version: doc -> docx conversion -------------------------
            # If the upload is a .doc file it needs to be converted to .docx
            """
            if file_path.endswith(".doc"):
                doc2docx(file_path)
                logger.info(f"File {file.filename} converted to docx successfully")
            """
            # ------------------------- test version: doc -> docx conversion -------------------------
        # ------------------------- production version: doc -> docx conversion -------------------------
        # All files must be written first; a background shell script converts the
        # format, so we then poll the folder until the conversion is done.
        final_files = os.listdir(UPLOAD_DIR)
        for file_saved in final_files:
            if file_saved.endswith(".doc"):
                file_doc = file_saved
                # Poll until the .doc file has been converted; the extra timeout
                # condition keeps the page from hanging if the conversion never finishes.
                start_time = time.time()
                while True:
                    if file_doc not in os.listdir(UPLOAD_DIR):
                        break
                    # After 30 seconds, give up and clear the folder so the user can re-upload
                    elif time.time() - start_time > 30:
                        for file in os.listdir(UPLOAD_DIR):
                            os.remove(os.path.join(UPLOAD_DIR, file))
                            logger.info(f"Deleted stale file {file} so the user can re-upload")
                        break
                    else:
                        sleep(2)
        # ------------------------- production version: doc -> docx conversion -------------------------
        return JSONResponse(
            content={"status_code": 200, "detail": "Files uploaded and processed successfully."}
        )
    except Exception as e:
        logger.exception(f"File upload failed: {e}")
        return JSONResponse(content={"status_code": 500, "detail": f"File upload failed: {e}"})
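

# Example request for the upload endpoint (illustrative only; the URL prefix and
# file names depend on how this router is mounted and what the user uploads):
#
#   curl -X POST "http://localhost:8000/upload/" \
#        -F "files=@daily_a.docx" -F "files=@daily_b.doc"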


@router.get(
    "/generate_report/",
    summary="Generate the daily report",
    description="Generate the daily report and return the generated brief and full report documents to the front end as HTML",
)
async def generate_report(background_tasks: BackgroundTasks, time_type: int = 0):
    global data_dict
    try:
        logger.info("Starting daily report generation")
        # Make sure the download folder exists
        if not os.path.exists(DOWNLOAD_DIR):
            os.makedirs(DOWNLOAD_DIR)
        # Absolute path of the folder that holds the uploaded files
        fold_path = str(Path(UPLOAD_DIR).resolve()).replace("\\", "/")
        data_dict = deal_docx(fold_path, DOWNLOAD_DIR, time_type=time_type)
        # If generation succeeded, convert the documents to HTML for the front end
        report_sim_html = docx2html(data_dict["daily_repo_simple"])
        report_html = docx2html(data_dict["daily_report"])
        logger.info("Converted the generated reports to HTML")
        # Persist the generated documents to the database
        save_word_document(data_dict)
        # Return JSON containing the HTML content
        return JSONResponse(
            content={
                "status_code": 200,
                "detail": "Daily report generated successfully",
                "report_simple": report_sim_html,
                "report": report_html,
            }
        )
    except Exception as e:
        logger.exception(f"Daily report generation failed: {e}")
        return JSONResponse(
            content={
                "status_code": 500,
                "detail": "Daily report generation failed: check that the uploaded files are from the same day and that their format has not changed",
            }
        )
    # Persist the raw source files to the database
    finally:
        try:
            if os.listdir(UPLOAD_DIR):
                raw_data_path = move_raw_files(
                    UPLOAD_DIR, DOWNLOAD_RAW_DIR, data_dict["save_folder"]
                )
                raw_data_dict = {
                    "report_title": data_dict["report_title"],
                    "raw_data_path": raw_data_path,
                    "statistics_time": data_dict["statistics_time"],
                    "save_folder": data_dict["save_folder"],
                    "created_at": datetime.now(),
                    "updated_at": datetime.now(),
                }
                save_raw_files(raw_data_dict)
                logger.info("Raw files saved to MySQL")
        except Exception as e:
            logger.exception(f"Failed to save raw files to MySQL: {e}")
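

# Example request (illustrative; the semantics of time_type are defined by
# app.tools.final_doc.deal_docx and are not documented in this file):
#
#   curl "http://localhost:8000/generate_report/?time_type=0"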


# Endpoint for downloading files by report time
@router.get(
    "/download/",
    summary="Download the analyzed daily report",
    description="Download the current daily report generated from the user's upload",
)
async def download_file():
    # Name of the ZIP archive that is ultimately downloaded
    zip_name = "日报.zip"
    # Path of any archive left over from a previous run
    file_zip = os.path.join(f"{DOWNLOAD_DIR}/{data_dict['save_folder']}", zip_name)
    # Remove the old archive if it exists
    if os.path.exists(file_zip):
        os.remove(file_zip)
    try:
        logger.info("Starting file download")
        file_info = get_file_path(data_dict["statistics_time"])
        if not file_info:
            logger.info("No matching record found for download")
            return None
        logger.info("Found the record to download")
        # Create the ZIP archive
        with zipfile.ZipFile(file_zip, "w") as zipf:
            logger.info("Compressing files")
            zipf.write(file_info.daily_repo_simple)
            zipf.write(file_info.daily_report)
            # zipf.write(file_info.daily_repo_simple_excel)
        if os.path.exists(file_zip):
            logger.info("File is ready for download")
            # Return the ZIP file
            return FileResponse(
                file_zip,
                filename=zip_name,
                media_type="application/zip",
            )
        else:
            logger.info("Failed to create the ZIP archive")
            return JSONResponse(content={"status_code": 404, "detail": "File does not exist"})
    except Exception as e:
        logger.exception(f"File download failed: {e}")
        return JSONResponse(content={"status_code": 500, "detail": "Error while downloading the file"})


@router.get(
    "/files_path/",
    summary="Query the full paths of the weekly report files",
    description="Query the on-disk full paths of the weekly reports",
)
async def download_weekly_file(start_time: datetime, end_time: datetime):
    try:
        logger.info("Querying weekly report file paths")
        file_info = get_weekly_file(start_time, end_time)
        if not file_info:
            logger.info("No weekly report paths found")
            return None
        logger.info("Weekly report paths found")
        file_info = [file_single.to_dict() for file_single in file_info]
        return JSONResponse(content={"status_code": 200, "detail": file_info})
    except Exception as e:
        logger.exception(f"Failed to query weekly report paths: {e}")
        return JSONResponse(content={"status_code": 500, "detail": "Error while querying weekly report paths"})
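

# Example request (illustrative; FastAPI parses the query parameters into
# datetime objects, so ISO 8601 timestamps work):
#
#   curl "http://localhost:8000/files_path/?start_time=2024-01-01T00:00:00&end_time=2024-01-07T23:59:59"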


@router.get(
    "/raw_files_path/",
    summary="Query the full paths of raw source files",
    description="Query the on-disk full paths of the raw source files",
)
async def download_raw_file(start_time: datetime, end_time: datetime):
    try:
        logger.info("Querying raw file paths")
        file_info = get_raw_file(start_time, end_time)
        if not file_info:
            logger.info("No raw file paths found for this time range")
            return None
        logger.info("Raw file paths found")
        file_info = [file_single.to_dict() for file_single in file_info]
        return JSONResponse(content={"status_code": 200, "detail": file_info})
    except Exception as e:
        logger.exception(f"Failed to query raw file paths: {e}")
        return JSONResponse(content={"status_code": 500, "detail": "Error while querying raw file paths"})


@router.get(
    "/download/weekly_file/", summary="Download a weekly report file", description="Download a weekly report file"
)
async def download_files(file: str):
    try:
        if os.path.exists(file):
            if file.endswith(".docx"):
                # Return a single Word document directly
                return FileResponse(
                    file,
                    filename=file.split("/")[-1],  # file name shown on download
                    media_type="application/vnd.openxmlformats-officedocument.wordprocessingml.document",
                )
            elif file.endswith(".xlsx"):
                # Return a single Excel workbook directly
                return FileResponse(
                    file,
                    filename=file.split("/")[-1],  # file name shown on download
                    media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
                )
            elif file.endswith(".xls"):
                # Return a legacy Excel workbook directly
                return FileResponse(
                    file,
                    filename=file.split("/")[-1],  # file name shown on download
                    media_type="application/vnd.ms-excel",
                )
            else:
                # Extension is not one of the supported types
                return JSONResponse(content={"status_code": 400, "detail": "Unsupported file type"})
        else:
            return JSONResponse(content={"status_code": 404, "detail": "File does not exist"})
    except Exception as e:
        logger.exception(f"Error while downloading an individual report: {e}")
        return JSONResponse(content={"status_code": 500, "detail": "Error while downloading an individual report"})
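

# Example request (illustrative; "file" is an absolute path previously returned
# by /files_path/ or /raw_files_path/, and -OJ saves the server-supplied name):
#
#   curl -OJ "http://localhost:8000/download/weekly_file/?file=<path returned by /files_path/>"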