diff --git a/scheduler/scheduler_module.py b/scheduler/scheduler_module.py index 7695453..b199402 100644 --- a/scheduler/scheduler_module.py +++ b/scheduler/scheduler_module.py @@ -8,7 +8,7 @@ import httpx from apscheduler.schedulers.asyncio import AsyncIOScheduler from util.config import DataLoader -from util.database import get_latest_update_time +from util.database import get_latest_update_time, insert_many_messages, update_sent_status logger = logging.getLogger(__name__) # 获取当前模块的logger logger.setLevel(logging.INFO) @@ -56,33 +56,40 @@ async def poll_external_api(): logger.info("接口返回错误,请检查响应。") return for report in reports_list: + id = report["id"] file_id = report["file_id"] download_dir_file_id = download_dir_all+"/"+file_id file_path = download_file(download_report_url,download_dir_file_id) - async with httpx.AsyncClient() as client: - # 1,获取需要推送的文档ID 2,根据ID获取文档,3,文档写成文件,4,发送文件,5,删除文件 - files = [] - if not os.path.exists(file_path): - logger.info(f"警告: 文件不存在,跳过: {file_path}") - continue - files.append(("files", open(file_path, "rb"))) - if not files: - logger.info("没有有效文件可发送,请求终止。") - return - # data 参数用于表单字段,files 参数用于文件 - for name in config_data["user_group"]: - data = { - "message": "文档", - "contactName": name - } + if file_path is not None: + messages_to_insert =[(id,file_id, current_time, 0)] + error_message = insert_many_messages(messages_to_insert) + if error_message is None: async with httpx.AsyncClient() as client: - response = await client.post(push_message_url, data=data, files=files, timeout=60.0) - # 打印响应信息 - logger.info(f"状态码: {response.status_code}") - logger.info(f"响应文本: {response.text}") - response.raise_for_status() - logger.info("文件发送成功!") - # 都发送成功,则删除文件,回填到数据库 + # 1,获取需要推送的文档ID 2,根据ID获取文档,3,文档写成文件,4,发送文件,5,删除文件 + files = [] + if not os.path.exists(file_path): + logger.info(f"警告: 文件不存在,跳过: {file_path}") + continue + files.append(("files", open(file_path, "rb"))) + if not files: + logger.info("没有有效文件可发送,请求终止。") + return + # data 参数用于表单字段,files 参数用于文件 + for name in config_data["user_group"]: + data = { + "message": "文档", + "contactName": name + } + async with httpx.AsyncClient() as client: + response = await client.post(push_message_url, data=data, files=files, timeout=60.0) + # 打印响应信息 + logger.info(f"状态码: {response.status_code}") + logger.info(f"响应文本: {response.text}") + response.raise_for_status() + logger.info("文件发送成功!") + # 都发送成功,则删除文件,回填到数据库 + update_sent_status(id) + os.remove(file_path) except httpx.HTTPStatusError as e: logger.info(f"调用外部接口 {push_message_url} 时发生 HTTP 状态码错误: {e.response.status_code} - {e.response.text}") except httpx.RequestError as e: @@ -124,9 +131,9 @@ def download_file(url, download_dir_all): if not os.path.exists(download_dir_all): os.makedirs(download_dir_all) print(f"开始从 {url} 下载文件...") + filename = None try: # 使用流式方式(stream=True)下载 - filename = "" with httpx.stream("GET", url, follow_redirects=True) as response: # 检查响应状态码,如果不是2xx则抛出异常 response.raise_for_status() diff --git a/util/database.py b/util/database.py index 50e459c..021318b 100644 --- a/util/database.py +++ b/util/database.py @@ -76,18 +76,22 @@ def insert_many_messages(messages_to_insert): messages_to_insert 是一个包含 (id, update_time, sent) 元组的列表。 例如: [('id1', '2023-10-27 10:00:00', 0), ('id2', '2023-10-27 10:05:00', 0)] """ - conn = sqlite3.connect(DB_PATH) - cursor = conn.cursor() + try: + conn = sqlite3.connect(DB_PATH) + cursor = conn.cursor() - # 使用 executemany 进行批量插入,性能更好 - cursor.executemany("INSERT OR IGNORE INTO messages VALUES (?, ?, ?,?)", messages_to_insert) + # 使用 executemany 进行批量插入,性能更好 + cursor.executemany("INSERT OR IGNORE INTO messages VALUES (?, ?, ?,?)", messages_to_insert) - conn.commit() - conn.close() + conn.commit() + conn.close() + except Exception as e: + return e print(f"已批量插入 {len(messages_to_insert)} 条数据到数据库。") + return None -def update_sent_status(ids_to_update): +def update_sent_status(id): """ 根据给定的 ID 列表,将 sent 字段更新为 1 (已发送)。 ids_to_update 是一个包含 ID 的列表或元组。 @@ -97,10 +101,9 @@ def update_sent_status(ids_to_update): cursor = conn.cursor() # 构建 SQL 语句,使用 IN 子句批量更新 - placeholders = ', '.join('?' for _ in ids_to_update) - sql_query = f"UPDATE messages SET sent = 1 WHERE id IN ({placeholders})" + sql_query = f"UPDATE messages SET sent = 1 WHERE id = ?" - cursor.execute(sql_query, ids_to_update) + cursor.execute(sql_query, id) conn.commit() conn.close()