From 064d6a0d7e77515554f60cb4ae634ca90b4e73a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=AE=B8=E6=A0=87?= Date: Fri, 1 Aug 2025 14:17:32 +0800 Subject: [PATCH] =?UTF-8?q?feat=20=F0=9F=90=9B:add=20report?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- {routes => api}/__init__.py | 0 api/sync_message.py | 11 ++ assets/config.json | 8 +- assets/database.db | Bin 0 -> 12288 bytes main.py | 34 +++++-- requirements.txt | 4 +- routes/sync_message.py | 23 ----- scheduler/__init__.py | 0 scheduler/scheduler_module.py | 183 ++++++++++++++++++++++++++++++++++ util/__init__.py | 0 {routes => util}/config.py | 12 --- util/database.py | 107 ++++++++++++++++++++ 12 files changed, 336 insertions(+), 46 deletions(-) rename {routes => api}/__init__.py (100%) create mode 100644 api/sync_message.py create mode 100644 assets/database.db delete mode 100644 routes/sync_message.py create mode 100644 scheduler/__init__.py create mode 100644 scheduler/scheduler_module.py create mode 100644 util/__init__.py rename {routes => util}/config.py (65%) create mode 100644 util/database.py diff --git a/routes/__init__.py b/api/__init__.py similarity index 100% rename from routes/__init__.py rename to api/__init__.py diff --git a/api/sync_message.py b/api/sync_message.py new file mode 100644 index 0000000..91bebb2 --- /dev/null +++ b/api/sync_message.py @@ -0,0 +1,11 @@ +from fastapi import APIRouter + +router = APIRouter() + +# 定义获取所有 pushmessage 的路由 +@router.get("/pushmessage/") +async def push_message(): + """ + 推送消息到elink + """ + return [{"item_id": "Foo", "name": "Foo Bar"}, {"item_id": "Baz", "name": "Baz Qux"}] \ No newline at end of file diff --git a/assets/config.json b/assets/config.json index c30d7b4..1a603b1 100644 --- a/assets/config.json +++ b/assets/config.json @@ -1,5 +1,9 @@ { "user_group": [ - "ceshi" - ] + "文件传输助手" + ], + "push_message_url":"http://192.168.0.216:17000/api/win/sendMixedFiles", + "query_report_url": "http://192.168.197.97:5001/wind-api/v1/windflood/report/queryReport", + "download_report_url": "http://127.0.0.1:5001/wind-api/v1/windflood/report/download" + "download_dir": "report" } \ No newline at end of file diff --git a/assets/database.db b/assets/database.db new file mode 100644 index 0000000000000000000000000000000000000000..13d4659f992da819bae14c9ae29535c5bf93f22b GIT binary patch literal 12288 zcmeI#O-sWt7{KwQippSgAh=T@M+K>fU%)wn45w)A!JSH_Q3~s(HUn?6t6$8o_lA@dy=DmKMHZ^^gFYLU{Gyf;qssD0y{hssex~}Is0s#aNKmY** t5I_I{1Q0*~0R%QxpyMt^{lBTt%e@dl009ILKmY**5I_I{1Q2KiegHqAL6iUh literal 0 HcmV?d00001 diff --git a/main.py b/main.py index e15906b..dd13e88 100644 --- a/main.py +++ b/main.py @@ -1,18 +1,36 @@ +from contextlib import asynccontextmanager import uvicorn from fastapi import FastAPI -from routes import sync_message, config +from api import sync_message +from util import config +from scheduler.scheduler_module import start_scheduler, shutdown_scheduler -app = FastAPI() -app.include_router(sync_message.router, prefix="/api/v1", tags=["SyncMessage"]) -app.include_router(config.router, prefix="/api/v1", tags=["config"]) +from util.database import init_db + -@app.get("/") -async def read_root(): +@asynccontextmanager +async def lifespan(app: FastAPI): """ - 根路径路由,欢迎信息。 + FastAPI 应用的生命周期管理器。 + 在应用启动时初始化数据库并启动调度器,在应用关闭时关闭调度器。 """ - return {"message": "Welcome to the FastAPI Application!"} + # 在应用启动时初始化数据库 + init_db() + + # 启动调度器 + start_scheduler() + + yield + + # 在应用关闭时关闭调度器 + shutdown_scheduler() + +app = FastAPI(lifespan=lifespan) + +app.include_router(sync_message.router, prefix="/api/v1", tags=["SyncMessage"]) +app.include_router(config.router, prefix="/api/v1", tags=["config"]) + if __name__ == "__main__": uvicorn.run("main:app", host="0.0.0.0", port=18000, reload=False) diff --git a/requirements.txt b/requirements.txt index 09be09c..d575cdc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,4 @@ fastapi==0.116.1 -uvicorn==0.35.0 \ No newline at end of file +uvicorn==0.35.0 +httpx==0.28.1 +apscheduler==3.11.0 \ No newline at end of file diff --git a/routes/sync_message.py b/routes/sync_message.py deleted file mode 100644 index 41083cb..0000000 --- a/routes/sync_message.py +++ /dev/null @@ -1,23 +0,0 @@ -from fastapi import APIRouter - -# 创建一个 APIRouter 实例。 -# APIRouter 类似于 FastAPI 实例,但它用于定义一组相关的路由。 -# 它可以被“包含”到主 FastAPI 应用中。 -router = APIRouter() - -# 定义获取所有 items 的路由 -@router.get("/items/") -async def read_items(): - """ - 获取所有 item 的列表。 - """ - return [{"item_id": "Foo", "name": "Foo Bar"}, {"item_id": "Baz", "name": "Baz Qux"}] - - -# 定义创建一个新 item 的 POST 路由 -@router.post("/items/") -async def create_item(): - """ - 创建一个新 item。 - """ - return {"status": "success", "message": "Item created successfully!"} \ No newline at end of file diff --git a/scheduler/__init__.py b/scheduler/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scheduler/scheduler_module.py b/scheduler/scheduler_module.py new file mode 100644 index 0000000..7695453 --- /dev/null +++ b/scheduler/scheduler_module.py @@ -0,0 +1,183 @@ +import datetime +import json +import logging +import mimetypes +import os + +import httpx +from apscheduler.schedulers.asyncio import AsyncIOScheduler + +from util.config import DataLoader +from util.database import get_latest_update_time + +logger = logging.getLogger(__name__) # 获取当前模块的logger +logger.setLevel(logging.INFO) +if not logger.handlers: # 避免重复添加handler + console_handler = logging.StreamHandler() + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + console_handler.setFormatter(formatter) + logger.addHandler(console_handler) + +# 初始化调度器 +scheduler = AsyncIOScheduler() + +async def poll_external_api(): + """ + 定时任务:每分钟使用 httpx 调用一次外部接口。 + """ + current_file = os.path.abspath(__file__) + current_dir = os.path.dirname(current_file) + project_root = os.path.dirname(current_dir) + assets_dir = os.path.join(project_root, "assets") + config_path = os.path.join(assets_dir, "config.json") + config_data = DataLoader.load_json_data(config_path) + push_message_url = config_data["push_message_url"] + query_report_url = config_data["query_report_url"] + download_dir = config_data["download_dir"] + download_dir_all = os.path.join(project_root, download_dir) + download_report_url = config_data["download_report_url"] + current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + logger.info(f"\n--- [{current_time}] 定时任务开始调用外部消息推送接口: {push_message_url} ---") + try: + max_time = get_latest_update_time() + params = { + "start_time": "2025-07-24 14:36:00", + "end_time": "2025-07-25 09:11:01" + } + report_data = request_report(query_report_url,params) + if report_data is None: + logger.info("没有报告数据。") + return + reports_list = [] + if report_data['code'] == 0: + # 按照嵌套结构逐层解析,获得最终的数据列表 + reports_list = report_data['data']['data'] + else: + logger.info("接口返回错误,请检查响应。") + return + for report in reports_list: + file_id = report["file_id"] + download_dir_file_id = download_dir_all+"/"+file_id + file_path = download_file(download_report_url,download_dir_file_id) + async with httpx.AsyncClient() as client: + # 1,获取需要推送的文档ID 2,根据ID获取文档,3,文档写成文件,4,发送文件,5,删除文件 + files = [] + if not os.path.exists(file_path): + logger.info(f"警告: 文件不存在,跳过: {file_path}") + continue + files.append(("files", open(file_path, "rb"))) + if not files: + logger.info("没有有效文件可发送,请求终止。") + return + # data 参数用于表单字段,files 参数用于文件 + for name in config_data["user_group"]: + data = { + "message": "文档", + "contactName": name + } + async with httpx.AsyncClient() as client: + response = await client.post(push_message_url, data=data, files=files, timeout=60.0) + # 打印响应信息 + logger.info(f"状态码: {response.status_code}") + logger.info(f"响应文本: {response.text}") + response.raise_for_status() + logger.info("文件发送成功!") + # 都发送成功,则删除文件,回填到数据库 + except httpx.HTTPStatusError as e: + logger.info(f"调用外部接口 {push_message_url} 时发生 HTTP 状态码错误: {e.response.status_code} - {e.response.text}") + except httpx.RequestError as e: + logger.info(f"调用外部接口 {push_message_url} 时发生请求错误: {e}") + except Exception as e: + logger.info(f"调用外部接口 {push_message_url} 时发生未知错误: {e}") + logger.info(f"--- 定时任务外部接口调用结束 ---") + + +def request_report(base_url,params): + print(f"正在调用接口: {base_url}") + print(f"请求参数: {params}") + try: + # 使用 httpx.get 发送 GET 请求 + headers = { + "Accept": "application/json", + "Content-Type": "application/json" + } + response = httpx.get(base_url, params=params, timeout=60.0,headers=headers,follow_redirects=True) + response.raise_for_status() + # 检查响应的 Content-Type,确保它是 JSON + data = response.json() + print("\n请求成功,返回数据:") + print(json.dumps(data, indent=4, ensure_ascii=False)) + return data + except httpx.RequestError as exc: + print(f"\n请求失败,发生错误: {exc}") + return None + except httpx.HTTPStatusError as exc: + print(f"\n请求失败,HTTP 错误状态码: {exc.response.status_code}") + print(f"响应内容: {exc.response.text}") + return None + +def download_file(url, download_dir_all): + """ + 使用GET请求从URL下载文件并保存到指定目录。 + """ + # 确保保存目录存在,如果不存在则创建 + if not os.path.exists(download_dir_all): + os.makedirs(download_dir_all) + print(f"开始从 {url} 下载文件...") + try: + # 使用流式方式(stream=True)下载 + filename = "" + with httpx.stream("GET", url, follow_redirects=True) as response: + # 检查响应状态码,如果不是2xx则抛出异常 + response.raise_for_status() + # --- 核心逻辑:从响应头获取文件名 --- + content_disposition = response.headers.get("Content-Disposition") + if content_disposition: + # 寻找文件名参数 + # 例如: 'attachment; filename="report.pdf"' + parts = content_disposition.split(';') + for part in parts: + if 'filename=' in part: + # 提取文件名并去除多余的引号 + filename = part.split('filename=')[1].strip().strip('"') + break + + # 如果文件名没有扩展名,根据MIME类型添加一个 + if '.' not in filename: + content_type = response.headers.get("Content-Type") + if content_type: + ext = mimetypes.guess_extension(content_type) + if ext: + filename += ext + + # 构造完整的文件路径 + file_path = os.path.join(download_dir_all, filename) + print(f"文件名从响应头中获取为: {filename}") + # 将文件内容写入本地文件 + with open(file_path, "wb") as f: + for chunk in response.iter_bytes(): + f.write(chunk) + print(f"文件成功下载并保存到: {file_path}") + return file_path + except httpx.HTTPStatusError as e: + print(f"下载文件时发生HTTP错误: {e}") + except httpx.RequestError as e: + print(f"下载文件时发生请求错误: {e}") + +def start_scheduler(): + """ + 启动调度器并添加所有定时任务。 + """ + logger.info("调度器启动中...") + # 添加定时任务:每隔1分钟执行一次 poll_push_message_endpoint 函数 + scheduler.add_job(poll_external_api, 'interval', minutes=1) + scheduler.start() + logger.info("调度器已启动。") + +def shutdown_scheduler(): + """ + 关闭调度器。 + """ + logger.info("调度器关闭中...") + scheduler.shutdown() + logger.info("调度器已关闭。") \ No newline at end of file diff --git a/util/__init__.py b/util/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/routes/config.py b/util/config.py similarity index 65% rename from routes/config.py rename to util/config.py index 7e3dab6..de535b3 100644 --- a/routes/config.py +++ b/util/config.py @@ -24,15 +24,3 @@ class DataLoader: raise HTTPException(status_code=500, detail="Invalid JSON") return DataLoader._data - -# 定义一个路由来返回 JSON 数据 -@router.get("/get_data") -async def get_data( -) : - current_file = os.path.abspath(__file__) - current_dir = os.path.dirname(current_file) - project_root = os.path.dirname(current_dir) - assets_dir = os.path.join(project_root, "assets") - config_path = os.path.join(assets_dir, "config.json") - data = DataLoader.load_json_data(config_path) - return data diff --git a/util/database.py b/util/database.py new file mode 100644 index 0000000..50e459c --- /dev/null +++ b/util/database.py @@ -0,0 +1,107 @@ +import sqlite3 +import os +import datetime + +# 定义数据库文件的名称 +DATABASE_FILENAME = "database.db" + +# 获取当前文件 (database.py) 的绝对路径 +current_file_path = os.path.abspath(__file__) +# 获取当前文件所在的目录 +current_dir_path = os.path.dirname(current_file_path) +# 获取项目根目录 (假设 database.py 在某个子目录中) +project_root = os.path.dirname(current_dir_path) + +# 构建 assets 目录的路径 +assets_dir = os.path.join(project_root, "assets") + +# 构建数据库文件的完整路径 +DB_PATH = os.path.join(assets_dir, DATABASE_FILENAME) + +def init_db(): + """ + 初始化 SQLite 数据库和 messages 表。 + 如果表已存在,则不会重复创建。 + """ + if not os.path.exists(assets_dir): + os.makedirs(assets_dir) + + db_exists = os.path.exists(DB_PATH) + + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + cursor.execute(""" + CREATE TABLE IF NOT EXISTS messages ( + id TEXT , + file_id TEXT, + update_time TEXT, + sent INTEGER + ) + """) + conn.commit() + conn.close() + + if not db_exists: + print(f"数据库文件 '{DB_PATH}' 已创建并初始化。") + else: + print(f"数据库文件 '{DB_PATH}' 已存在,跳过创建。") + + +def get_latest_update_time(): + """ + 查询 messages 表中最大的 update_time。 + 如果没有数据,则返回当前的系统时间。 + 返回格式: 'YYYY-MM-DD HH:MM:SS' + """ + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + # 查询最大的 update_time + cursor.execute("SELECT MAX(update_time) FROM messages") + result = cursor.fetchone()[0] + + conn.close() + + if result: + return result + else: + # 如果没有数据,返回当前时间 + return datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + +def insert_many_messages(messages_to_insert): + """ + 批量插入数据到 messages 表。 + messages_to_insert 是一个包含 (id, update_time, sent) 元组的列表。 + 例如: [('id1', '2023-10-27 10:00:00', 0), ('id2', '2023-10-27 10:05:00', 0)] + """ + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + # 使用 executemany 进行批量插入,性能更好 + cursor.executemany("INSERT OR IGNORE INTO messages VALUES (?, ?, ?,?)", messages_to_insert) + + conn.commit() + conn.close() + print(f"已批量插入 {len(messages_to_insert)} 条数据到数据库。") + + +def update_sent_status(ids_to_update): + """ + 根据给定的 ID 列表,将 sent 字段更新为 1 (已发送)。 + ids_to_update 是一个包含 ID 的列表或元组。 + 例如: ['id1', 'id2', 'id3'] + """ + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + # 构建 SQL 语句,使用 IN 子句批量更新 + placeholders = ', '.join('?' for _ in ids_to_update) + sql_query = f"UPDATE messages SET sent = 1 WHERE id IN ({placeholders})" + + cursor.execute(sql_query, ids_to_update) + + conn.commit() + conn.close() + print(f"已更新 {cursor.rowcount} 条数据的 sent 状态。")