"""Scheduled job that fetches reports from an external API and pushes them as files.

Every minute ``poll_external_api`` queries the report endpoint, downloads each
report's file, and POSTs it to the message-push endpoint once per contact in
``user_group``. All URLs and paths come from ``<project_root>/assets/config.json``.
"""
import asyncio
import datetime
import json
import logging
import mimetypes
import os
from urllib.parse import unquote

import httpx
from apscheduler.schedulers.asyncio import AsyncIOScheduler

from util.config import DataLoader
from util.database import get_latest_update_time

logger = logging.getLogger(__name__)  # Logger for this module.
logger.setLevel(logging.INFO)
if not logger.handlers:  # Attach a console handler only once, to avoid duplicated log lines.
    console_handler = logging.StreamHandler()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    console_handler.setFormatter(formatter)
    logger.addHandler(console_handler)

# Module-wide scheduler; jobs are registered in start_scheduler().
scheduler = AsyncIOScheduler()

# Fallback base name when the server supplies no usable filename.
_DEFAULT_DOWNLOAD_NAME = "download"


def _load_config():
    """Load ``<project_root>/assets/config.json``; return ``(config_data, project_root)``.

    ``project_root`` is the parent of the directory containing this file.
    """
    current_dir = os.path.dirname(os.path.abspath(__file__))
    project_root = os.path.dirname(current_dir)
    config_path = os.path.join(project_root, "assets", "config.json")
    return DataLoader.load_json_data(config_path), project_root


async def _push_file(client, push_message_url, file_path, user_group):
    """POST ``file_path`` to ``push_message_url`` once per contact name in ``user_group``.

    Raises:
        httpx.HTTPStatusError: a push returned a non-2xx status (later contacts are skipped).
        httpx.RequestError: a transport-level failure.
    """
    # ``with`` guarantees the handle is closed even if a request fails.
    with open(file_path, "rb") as file_obj:
        files = [("files", file_obj)]
        for name in user_group:
            # Rewind before each upload so every contact receives the whole file.
            file_obj.seek(0)
            # ``data`` carries the form fields, ``files`` the multipart file part.
            data = {
                "message": "文档",
                "contactName": name
            }
            response = await client.post(push_message_url, data=data, files=files, timeout=60.0)
            logger.info(f"状态码: {response.status_code}")
            logger.info(f"响应文本: {response.text}")
            response.raise_for_status()
            logger.info("文件发送成功!")


async def poll_external_api():
    """Scheduled job (every minute): fetch reports and push each report's file.

    Steps: 1) query the report list, 2) download each report's file,
    3) POST the file to the push endpoint for every contact in
    ``config_data["user_group"]``. Errors are logged rather than raised, so
    one failing run does not stop the scheduler.
    """
    config_data, project_root = _load_config()
    push_message_url = config_data["push_message_url"]
    query_report_url = config_data["query_report_url"]
    download_dir_all = os.path.join(project_root, config_data["download_dir"])
    download_report_url = config_data["download_report_url"]

    current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    logger.info(f"\n--- [{current_time}] 定时任务开始调用外部消息推送接口: {push_message_url} ---")

    loop = asyncio.get_running_loop()
    try:
        # NOTE(review): max_time is fetched but never used, and the query window
        # below is hard-coded; presumably start_time should derive from max_time.
        # Left unchanged pending confirmation.
        max_time = get_latest_update_time()
        params = {
            "start_time": "2025-07-24 14:36:00",
            "end_time": "2025-07-25 09:11:01"
        }
        # request_report / download_file make blocking httpx calls (up to 60 s);
        # run them in the default executor so the scheduler's event loop is not stalled.
        report_data = await loop.run_in_executor(None, request_report, query_report_url, params)
        if report_data is None:
            logger.info("没有报告数据。")
            return

        if report_data.get('code') != 0:
            logger.info("接口返回错误,请检查响应。")
            return
        # The report list is nested under data -> data.
        reports_list = (report_data.get('data') or {}).get('data') or []

        # A single client is reused for every upload in this run.
        async with httpx.AsyncClient() as client:
            for report in reports_list:
                file_id = report["file_id"]
                # f-string formatting also accepts non-str (e.g. int) IDs.
                download_dir_file_id = f"{download_dir_all}/{file_id}"
                # NOTE(review): the same download_report_url is requested for every report;
                # file_id only selects the target directory. Confirm the URL needs no file_id.
                file_path = await loop.run_in_executor(
                    None, download_file, download_report_url, download_dir_file_id
                )
                # download_file returns None on failure: skip this report, keep processing others.
                if file_path is None or not os.path.exists(file_path):
                    logger.warning(f"警告: 文件不存在,跳过: {file_path}")
                    continue
                await _push_file(client, push_message_url, file_path, config_data["user_group"])
                # TODO: once every push succeeds, delete the file and record progress in the database.
    except httpx.HTTPStatusError as e:
        logger.error(f"调用外部接口 {push_message_url} 时发生 HTTP 状态码错误: {e.response.status_code} - {e.response.text}")
    except httpx.RequestError as e:
        logger.error(f"调用外部接口 {push_message_url} 时发生请求错误: {e}")
    except Exception as e:
        # Top-level job boundary: log with traceback instead of letting the job die silently.
        logger.exception(f"调用外部接口 {push_message_url} 时发生未知错误: {e}")
    finally:
        logger.info("--- 定时任务外部接口调用结束 ---")


def request_report(base_url, params):
    """Fetch the report list with a blocking GET and return the decoded JSON body.

    Args:
        base_url: report query endpoint.
        params: query-string parameters (e.g. ``start_time`` / ``end_time``).

    Returns:
        The parsed JSON payload, or None on a transport error, a non-2xx
        response, or a body that is not valid JSON.
    """
    logger.info(f"正在调用接口: {base_url}")
    logger.info(f"请求参数: {params}")
    headers = {
        "Accept": "application/json",
        "Content-Type": "application/json"
    }
    try:
        response = httpx.get(base_url, params=params, timeout=60.0, headers=headers, follow_redirects=True)
        response.raise_for_status()
        data = response.json()
    except httpx.RequestError as exc:
        logger.error(f"\n请求失败,发生错误: {exc}")
        return None
    except httpx.HTTPStatusError as exc:
        logger.error(f"\n请求失败,HTTP 错误状态码: {exc.response.status_code}")
        logger.error(f"响应内容: {exc.response.text}")
        return None
    except ValueError as exc:
        # response.json() raises json.JSONDecodeError (a ValueError) on a non-JSON body.
        logger.error(f"\n请求失败,响应不是有效的 JSON: {exc}")
        return None
    # Dump the full payload only at DEBUG level; this job runs every minute.
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug("\n请求成功,返回数据:\n%s", json.dumps(data, indent=4, ensure_ascii=False))
    return data


def _filename_from_content_disposition(content_disposition):
    """Return a bare, safe filename from a Content-Disposition header value.

    Prefers the RFC 6266 ``filename*`` form (``charset'lang'percent-encoded``)
    over plain ``filename``. Directory components are stripped so the name
    cannot escape the download directory. Falls back to
    ``_DEFAULT_DOWNLOAD_NAME`` when no usable name is present.
    """
    plain_name = ""
    extended_name = ""
    for part in (content_disposition or "").split(';'):
        key, sep, value = part.partition('=')
        if not sep:
            continue
        key = key.strip().lower()
        value = value.strip()
        if key == "filename*" and not extended_name:
            # e.g. UTF-8''%E6%8A%A5%E5%91%8A.pdf
            charset, _, rest = value.partition("'")
            _, _, encoded = rest.partition("'")
            encoded = encoded.strip('"')
            try:
                extended_name = unquote(encoded, encoding=charset or "utf-8")
            except LookupError:  # unknown charset name
                extended_name = unquote(encoded)
        elif key == "filename" and not plain_name:
            plain_name = value.strip('"')
    name = extended_name or plain_name
    # Keep only the last path component (either separator) to block path traversal.
    name = os.path.basename(name.replace("\\", "/"))
    if name in ("", ".", ".."):
        return _DEFAULT_DOWNLOAD_NAME
    return name


def download_file(url, download_dir_all):
    """Download ``url`` with a streaming GET into the directory ``download_dir_all``.

    The directory is created if missing. The filename comes from the
    Content-Disposition header; if it has no extension, one is guessed from
    Content-Type.

    Returns:
        The saved file's path, or None if the download failed. Any partially
        written file is removed in that case.
    """
    os.makedirs(download_dir_all, exist_ok=True)
    logger.info(f"开始从 {url} 下载文件...")
    file_path = None
    try:
        with httpx.stream("GET", url, follow_redirects=True) as response:
            # Raise for non-2xx so nothing is written for an error page.
            response.raise_for_status()
            filename = _filename_from_content_disposition(response.headers.get("Content-Disposition"))
            # No extension: derive one from the MIME type.
            if '.' not in filename:
                content_type = response.headers.get("Content-Type")
                if content_type:
                    # Drop parameters like "; charset=utf-8" that mimetypes cannot map.
                    ext = mimetypes.guess_extension(content_type.split(';', 1)[0].strip())
                    if ext:
                        filename += ext
            file_path = os.path.join(download_dir_all, filename)
            logger.info(f"文件名从响应头中获取为: {filename}")
            with open(file_path, "wb") as f:
                for chunk in response.iter_bytes():
                    f.write(chunk)
        logger.info(f"文件成功下载并保存到: {file_path}")
        return file_path
    except httpx.HTTPStatusError as e:
        logger.error(f"下载文件时发生HTTP错误: {e}")
    except httpx.RequestError as e:
        logger.error(f"下载文件时发生请求错误: {e}")
    # A failure mid-stream leaves a truncated file; remove it so it is never pushed.
    if file_path is not None:
        try:
            os.remove(file_path)
        except FileNotFoundError:
            pass
        except OSError as e:
            logger.warning(f"无法删除不完整的文件 {file_path}: {e}")
    return None


def start_scheduler():
    """Register all scheduled jobs and start the scheduler."""
    logger.info("调度器启动中...")
    # Run poll_external_api every minute.
    scheduler.add_job(poll_external_api, 'interval', minutes=1)
    scheduler.start()
    logger.info("调度器已启动。")


def shutdown_scheduler():
    """Shut down the scheduler."""
    logger.info("调度器关闭中...")
    scheduler.shutdown()
    logger.info("调度器已关闭。")