From ed4b4440ab69b991731494b02d8838a56d9c6934 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E8=AE=B8=E6=A0=87?= Date: Fri, 1 Aug 2025 17:15:42 +0800 Subject: [PATCH] =?UTF-8?q?feat=20=F0=9F=90=9B:add=20db?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- assets/config.json | 4 +- scheduler/scheduler_module.py | 121 +++++++++++++++++----------------- test/test_db.py | 8 +++ util/database.py | 25 +++++-- 4 files changed, 91 insertions(+), 67 deletions(-) create mode 100644 test/test_db.py diff --git a/assets/config.json b/assets/config.json index 1a603b1..9392eef 100644 --- a/assets/config.json +++ b/assets/config.json @@ -3,7 +3,7 @@ "文件传输助手" ], "push_message_url":"http://192.168.0.216:17000/api/win/sendMixedFiles", - "query_report_url": "http://192.168.197.97:5001/wind-api/v1/windflood/report/queryReport", - "download_report_url": "http://127.0.0.1:5001/wind-api/v1/windflood/report/download" + "query_report_url": "http://192.168.0.216:5001/wind-api/v1/windflood/report/queryReport", + "download_report_url": "http://192.168.0.216:5001/wind-api/v1/windflood/report/download", "download_dir": "report" } \ No newline at end of file diff --git a/scheduler/scheduler_module.py b/scheduler/scheduler_module.py index b199402..2bda3b6 100644 --- a/scheduler/scheduler_module.py +++ b/scheduler/scheduler_module.py @@ -1,3 +1,4 @@ +import asyncio import datetime import json import logging @@ -8,7 +9,7 @@ import httpx from apscheduler.schedulers.asyncio import AsyncIOScheduler from util.config import DataLoader -from util.database import get_latest_update_time, insert_many_messages, update_sent_status +from util.database import get_latest_update_time, insert_many_messages, update_sent_status, query_by_sent logger = logging.getLogger(__name__) # 获取当前模块的logger logger.setLevel(logging.INFO) @@ -38,32 +39,53 @@ async def poll_external_api(): download_report_url = config_data["download_report_url"] current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") logger.info(f"\n--- [{current_time}] 定时任务开始调用外部消息推送接口: {push_message_url} ---") - try: - max_time = get_latest_update_time() - params = { - "start_time": "2025-07-24 14:36:00", - "end_time": "2025-07-25 09:11:01" - } - report_data = request_report(query_report_url,params) - if report_data is None: - logger.info("没有报告数据。") - return - reports_list = [] - if report_data['code'] == 0: - # 按照嵌套结构逐层解析,获得最终的数据列表 - reports_list = report_data['data']['data'] - else: - logger.info("接口返回错误,请检查响应。") - return - for report in reports_list: - id = report["id"] - file_id = report["file_id"] - download_dir_file_id = download_dir_all+"/"+file_id - file_path = download_file(download_report_url,download_dir_file_id) - if file_path is not None: - messages_to_insert =[(id,file_id, current_time, 0)] - error_message = insert_many_messages(messages_to_insert) - if error_message is None: + """ + report_list_history= query_by_sent(1) + for report_path in report_list_history: + is_update_sent = True + try: + os.remove(report_path[4]) + except Exception as e: + logger.info(f"删除失败,下一次轮询删除") + is_update_sent= False + if is_update_sent: + update_sent_status(report_path[0],2) + """ + max_time = get_latest_update_time() + """ + params = { + "start_time": "2025-07-25 09:09:01", + "end_time": "2025-07-25 09:11:01" + } + """ + params = { + "start_time": current_time, + "end_time": max_time + } + report_data = request_report(query_report_url,params) + if report_data is None: + logger.info("没有报告数据。") + return + reports_list = [] + if report_data['code'] == 0: + # 按照嵌套结构逐层解析,获得最终的数据列表 + reports_list = report_data['data']['data'] + else: + logger.info("接口返回错误,请检查响应。") + return + for report in reports_list: + id = report["id"] + file_id = report["file_id"] + title = report["title"] + download_dir_file_id = download_dir_all+"/"+file_id + file_path = download_file(download_report_url,download_dir_file_id,title,file_id) + if file_path is not None: + messages_to_insert = [] + for name in config_data["user_group"]: + messages_to_insert.append((id,file_id, current_time, 0,file_path,name)) + error_message = insert_many_messages(messages_to_insert) + if error_message is None: + try: async with httpx.AsyncClient() as client: # 1,获取需要推送的文档ID 2,根据ID获取文档,3,文档写成文件,4,发送文件,5,删除文件 files = [] @@ -87,15 +109,13 @@ async def poll_external_api(): logger.info(f"响应文本: {response.text}") response.raise_for_status() logger.info("文件发送成功!") - # 都发送成功,则删除文件,回填到数据库 - update_sent_status(id) - os.remove(file_path) - except httpx.HTTPStatusError as e: - logger.info(f"调用外部接口 {push_message_url} 时发生 HTTP 状态码错误: {e.response.status_code} - {e.response.text}") - except httpx.RequestError as e: - logger.info(f"调用外部接口 {push_message_url} 时发生请求错误: {e}") - except Exception as e: - logger.info(f"调用外部接口 {push_message_url} 时发生未知错误: {e}") + update_sent_status(id,name,1) + except httpx.HTTPStatusError as e: + logger.info(f"调用外部接口 {push_message_url} 时发生 HTTP 状态码错误: {e.response.status_code} - {e.response.text}") + except httpx.RequestError as e: + logger.info(f"调用外部接口 {push_message_url} 时发生请求错误: {e}") + except Exception as e: + logger.info(f"调用外部接口 {push_message_url} 时发生未知错误: {e}") logger.info(f"--- 定时任务外部接口调用结束 ---") @@ -123,7 +143,7 @@ def request_report(base_url,params): print(f"响应内容: {exc.response.text}") return None -def download_file(url, download_dir_all): +def download_file(url, download_dir_all,filename,file_id): """ 使用GET请求从URL下载文件并保存到指定目录。 """ @@ -131,34 +151,14 @@ def download_file(url, download_dir_all): if not os.path.exists(download_dir_all): os.makedirs(download_dir_all) print(f"开始从 {url} 下载文件...") - filename = None + params = {"id": file_id} try: # 使用流式方式(stream=True)下载 - with httpx.stream("GET", url, follow_redirects=True) as response: + with httpx.stream("GET", url, params=params, follow_redirects=True) as response: # 检查响应状态码,如果不是2xx则抛出异常 response.raise_for_status() - # --- 核心逻辑:从响应头获取文件名 --- - content_disposition = response.headers.get("Content-Disposition") - if content_disposition: - # 寻找文件名参数 - # 例如: 'attachment; filename="report.pdf"' - parts = content_disposition.split(';') - for part in parts: - if 'filename=' in part: - # 提取文件名并去除多余的引号 - filename = part.split('filename=')[1].strip().strip('"') - break - - # 如果文件名没有扩展名,根据MIME类型添加一个 - if '.' not in filename: - content_type = response.headers.get("Content-Type") - if content_type: - ext = mimetypes.guess_extension(content_type) - if ext: - filename += ext - # 构造完整的文件路径 - file_path = os.path.join(download_dir_all, filename) + file_path = os.path.join(download_dir_all, filename+".docx") print(f"文件名从响应头中获取为: {filename}") # 将文件内容写入本地文件 with open(file_path, "wb") as f: @@ -171,6 +171,7 @@ def download_file(url, download_dir_all): except httpx.RequestError as e: print(f"下载文件时发生请求错误: {e}") + def start_scheduler(): """ 启动调度器并添加所有定时任务。 diff --git a/test/test_db.py b/test/test_db.py new file mode 100644 index 0000000..0ae1ce6 --- /dev/null +++ b/test/test_db.py @@ -0,0 +1,8 @@ +import unittest + +from util.database import update_sent_status + + +class TestUpdataDb(unittest.TestCase): + def test_updata_db(self): + update_sent_status(350) diff --git a/util/database.py b/util/database.py index 021318b..b41f4c4 100644 --- a/util/database.py +++ b/util/database.py @@ -36,7 +36,9 @@ def init_db(): id TEXT , file_id TEXT, update_time TEXT, - sent INTEGER + sent INTEGER, + file_path TEXT, + contact_name TEXT ) """) conn.commit() @@ -81,7 +83,7 @@ def insert_many_messages(messages_to_insert): cursor = conn.cursor() # 使用 executemany 进行批量插入,性能更好 - cursor.executemany("INSERT OR IGNORE INTO messages VALUES (?, ?, ?,?)", messages_to_insert) + cursor.executemany("INSERT OR IGNORE INTO messages VALUES (?, ?, ?,?,?,?)", messages_to_insert) conn.commit() conn.close() @@ -91,7 +93,7 @@ def insert_many_messages(messages_to_insert): return None -def update_sent_status(id): +def update_sent_status(id,contact_name,sent): """ 根据给定的 ID 列表,将 sent 字段更新为 1 (已发送)。 ids_to_update 是一个包含 ID 的列表或元组。 @@ -101,10 +103,23 @@ def update_sent_status(id): cursor = conn.cursor() # 构建 SQL 语句,使用 IN 子句批量更新 - sql_query = f"UPDATE messages SET sent = 1 WHERE id = ?" + sql_query = "UPDATE messages SET sent = ? WHERE id = ? and contact_name = ?" - cursor.execute(sql_query, id) + cursor.execute(sql_query, (sent, id,contact_name)) conn.commit() conn.close() print(f"已更新 {cursor.rowcount} 条数据的 sent 状态。") + +def query_by_sent(sent): + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() + + # 查询所有匹配给定 ID 的记录 + cursor.execute("SELECT id, file_id, update_time, sent,file_path,contact_name FROM messages WHERE sent = ?", (sent,)) + results = cursor.fetchall() + + conn.close() + + # 返回查询结果列表 + return results