#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import asyncio
import datetime
import json
import logging
import os

import httpx
from apscheduler.schedulers.asyncio import AsyncIOScheduler

from util.config import DataLoader
from util.database import get_latest_update_time, insert_many_messages, update_sent_status, query_by_sent

logger = logging.getLogger(__name__)  # logger for the current module
logger.setLevel(logging.INFO)
if not logger.handlers:  # avoid adding duplicate handlers
    console_handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

# Initialize the scheduler
scheduler = AsyncIOScheduler()


async def poll_external_api():
    """
    Scheduled task: call the external report and push interfaces with httpx
    (runs every 3 minutes, see start_scheduler).
    """
    current_file = os.path.abspath(__file__)
    current_dir = os.path.dirname(current_file)
    project_root = os.path.dirname(current_dir)
    assets_dir = os.path.join(project_root, "assets")
    config_path = os.path.join(assets_dir, "config.json")
    config_data = DataLoader.load_json_data(config_path)
    push_message_url = config_data["push_message_url"]
    query_report_url = config_data["query_report_url"]
    download_dir = config_data["download_dir"]
    download_dir_all = os.path.join(project_root, download_dir)
    download_report_url = config_data["download_report_url"]

    current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    logger.info(f"--- [{current_time}] Scheduled task calling external push endpoint: {push_message_url} ---")

    # Disabled: delete files that were already sent (status 1) and mark them as cleaned up (status 2).
    # report_list_history = query_by_sent(1)
    # for report_path in report_list_history:
    #     is_update_sent = True
    #     try:
    #         os.remove(report_path[4])
    #     except Exception:
    #         logger.info("Deletion failed; will retry on the next poll")
    #         is_update_sent = False
    #     if is_update_sent:
    #         update_sent_status(report_path[0], 2)

    max_time = get_latest_update_time()
    # Disabled: hard-coded time window used for testing.
    # params = {
    #     "start_time": "2025-07-25 09:09:01",
    #     "end_time": "2025-07-25 09:11:01"
    # }
    params = {
        "start_time": max_time,
        "end_time": current_time
    }
    report_data = request_report(query_report_url, params)
    if report_data is None:
        logger.info("No report data.")
        return

    reports_list = []
    if report_data['code'] == 0:
        # Walk the nested response structure to get the final list of report records.
        reports_list = report_data['data']['data']
    else:
        logger.info("The endpoint returned an error; please check the response.")
        return

    for report in reports_list:
        report_id = report["id"]
        file_id = report["file_id"]
        title = report["title"]
        download_dir_file_id = os.path.join(download_dir_all, file_id)
        file_path = download_file(download_report_url, download_dir_file_id, title, file_id)
        if file_path is not None:
            messages_to_insert = []
            for name in config_data["user_group"]:
                messages_to_insert.append((report_id, file_id, current_time, 0, file_path, name))
            error_message = insert_many_messages(messages_to_insert)
            if error_message is None:
                # Workflow: 1. collect the document IDs to push, 2. fetch the document by ID,
                # 3. write it to a file, 4. send the file, 5. delete the file.
                try:
                    if not os.path.exists(file_path):
                        logger.warning(f"Warning: file does not exist, skipping: {file_path}")
                        continue
                    # Read the file once and reuse the bytes for every recipient; a single open
                    # file handle would be exhausted after the first upload.
                    with open(file_path, "rb") as f:
                        file_content = f.read()
                    # `data` carries the form fields, `files` carries the attachment.
                    files = {"files": (os.path.basename(file_path), file_content)}
                    async with httpx.AsyncClient() as client:
                        for name in config_data["user_group"]:
                            data = {
                                "message": "文档",  # message text sent along with the file
                                "contactName": name
                            }
                            response = await client.post(push_message_url, data=data, files=files, timeout=60.0)
                            logger.info(f"Status code: {response.status_code}")
                            logger.info(f"Response text: {response.text}")
                            response.raise_for_status()
                            logger.info(f"File sent successfully! {name}")
                            update_sent_status(report_id, name, 1)
                            # Pause between recipients to avoid flooding the push endpoint.
                            await asyncio.sleep(15)
                except httpx.HTTPStatusError as e:
                    logger.error(f"HTTP status error when calling external endpoint {push_message_url}: {e.response.status_code} - {e.response.text}")
                except httpx.RequestError as e:
                    logger.error(f"Request error when calling external endpoint {push_message_url}: {e}")
                except Exception as e:
                    logger.error(f"Unexpected error when calling external endpoint {push_message_url}: {e}")
    logger.info("--- Scheduled external API call finished ---")


def request_report(base_url, params):
    logger.info(f"Calling endpoint: {base_url}")
    logger.info(f"Request parameters: {params}")
    try:
        # Send a GET request with httpx.
        headers = {
            "Accept": "application/json",
            "Content-Type": "application/json"
        }
        response = httpx.get(base_url, params=params, timeout=60.0, headers=headers, follow_redirects=True)
        response.raise_for_status()
        # Parse the response body as JSON.
        data = response.json()
        logger.info("Request succeeded, returned data:")
        logger.info(json.dumps(data, indent=4, ensure_ascii=False))
        return data
    except httpx.RequestError as exc:
        logger.error(f"Request failed with an error: {exc}")
        return None
    except httpx.HTTPStatusError as exc:
        logger.error(f"Request failed, HTTP error status code: {exc.response.status_code}")
        logger.error(f"Response body: {exc.response.text}")
        return None


def download_file(url, download_dir_all, filename, file_id):
    """
    Download a file with a GET request and save it to the given directory.
    """
    # Make sure the target directory exists, creating it if necessary.
    os.makedirs(download_dir_all, exist_ok=True)
    logger.info(f"Starting download from {url} ...")
    params = {"id": file_id}
    try:
        # Download with a streaming response so large files are not held in memory all at once.
        with httpx.stream("GET", url, params=params, follow_redirects=True) as response:
            # Raise an exception if the status code is not 2xx.
            response.raise_for_status()
            # Build the full local file path from the report title.
            file_path = os.path.join(download_dir_all, filename + ".docx")
            logger.info(f"Saving file as: {filename}.docx")
            # Write the response content to the local file.
            with open(file_path, "wb") as f:
                for chunk in response.iter_bytes():
                    f.write(chunk)
            logger.info(f"File downloaded and saved to: {file_path}")
            return file_path
    except httpx.HTTPStatusError as e:
        logger.error(f"HTTP error while downloading file: {e}")
    except httpx.RequestError as e:
        logger.error(f"Request error while downloading file: {e}")
    return None


def start_scheduler():
    """
    Start the scheduler and register all scheduled jobs.
    """
    logger.info("Starting scheduler...")
    # Run poll_external_api every 3 minutes.
    scheduler.add_job(poll_external_api, 'interval', minutes=3)
    scheduler.start()
    logger.info("Scheduler started.")


def shutdown_scheduler():
    """
    Shut down the scheduler.
    """
    logger.info("Shutting down scheduler...")
    scheduler.shutdown()
    logger.info("Scheduler shut down.")
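

# --- Usage sketch (not part of the original module): one possible way to run the ---
# --- scheduler standalone. It assumes no other entrypoint calls start_scheduler(); ---
# --- the asyncio event loop must be kept alive for AsyncIOScheduler to fire jobs. ---
if __name__ == "__main__":
    async def _main():
        start_scheduler()
        try:
            # Keep the event loop running so scheduled jobs can execute.
            while True:
                await asyncio.sleep(3600)
        finally:
            shutdown_scheduler()

    asyncio.run(_main())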