You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

194 lines
8.1 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import asyncio
import datetime
import json
import logging
import mimetypes
import os
import httpx
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from util.config import DataLoader
from util.database import get_latest_update_time, insert_many_messages, update_sent_status, query_by_sent
logger = logging.getLogger(__name__) # 获取当前模块的logger
logger.setLevel(logging.INFO)
if not logger.handlers: # 避免重复添加handler
console_handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# 初始化调度器
scheduler = AsyncIOScheduler()
async def poll_external_api():
"""
定时任务:每分钟使用 httpx 调用一次外部接口。
"""
current_file = os.path.abspath(__file__)
current_dir = os.path.dirname(current_file)
project_root = os.path.dirname(current_dir)
assets_dir = os.path.join(project_root, "assets")
config_path = os.path.join(assets_dir, "config.json")
config_data = DataLoader.load_json_data(config_path)
push_message_url = config_data["push_message_url"]
query_report_url = config_data["query_report_url"]
download_dir = config_data["download_dir"]
download_dir_all = os.path.join(project_root, download_dir)
download_report_url = config_data["download_report_url"]
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logger.info(f"\n--- [{current_time}] 定时任务开始调用外部消息推送接口: {push_message_url} ---")
"""
report_list_history= query_by_sent(1)
for report_path in report_list_history:
is_update_sent = True
try:
os.remove(report_path[4])
except Exception as e:
logger.info(f"删除失败,下一次轮询删除")
is_update_sent= False
if is_update_sent:
update_sent_status(report_path[0],2)
"""
max_time = get_latest_update_time()
"""
params = {
"start_time": "2025-07-25 09:09:01",
"end_time": "2025-07-25 09:11:01"
}
"""
params = {
"start_time": max_time,
"end_time": current_time
}
report_data = request_report(query_report_url,params)
if report_data is None:
logger.info("没有报告数据。")
return
reports_list = []
if report_data['code'] == 0:
# 按照嵌套结构逐层解析,获得最终的数据列表
reports_list = report_data['data']['data']
else:
logger.info("接口返回错误,请检查响应。")
return
for report in reports_list:
id = report["id"]
file_id = report["file_id"]
title = report["title"]
download_dir_file_id = download_dir_all+"/"+file_id
file_path = download_file(download_report_url,download_dir_file_id,title,file_id)
if file_path is not None:
messages_to_insert = []
for name in config_data["user_group"]:
messages_to_insert.append((id,file_id, current_time, 0,file_path,name))
error_message = insert_many_messages(messages_to_insert)
if error_message is None:
try:
async with httpx.AsyncClient() as client:
# 1获取需要推送的文档ID 2根据ID获取文档3文档写成文件4发送文件5删除文件
files = []
if not os.path.exists(file_path):
logger.info(f"警告: 文件不存在,跳过: {file_path}")
continue
files.append(("files", open(file_path, "rb")))
if not files:
logger.info("没有有效文件可发送,请求终止。")
return
# data 参数用于表单字段files 参数用于文件
for name in config_data["user_group"]:
data = {
"message": "",
"contactName": name
}
async with httpx.AsyncClient() as client:
response = await client.post(push_message_url, data=data, files=files, timeout=60.0)
# 打印响应信息
logger.info(f"状态码: {response.status_code}")
logger.info(f"响应文本: {response.text}")
response.raise_for_status()
logger.info(f"文件发送成功!{name}")
update_sent_status(id,name,1)
await asyncio.sleep(15)
except httpx.HTTPStatusError as e:
logger.error(f"调用外部接口 {push_message_url} 时发生 HTTP 状态码错误: {e.response.status_code} - {e.response.text}")
except httpx.RequestError as e:
logger.error(f"调用外部接口 {push_message_url} 时发生请求错误: {e}")
except Exception as e:
logger.error(f"调用外部接口 {push_message_url} 时发生未知错误: {e}")
logger.info(f"--- 定时任务外部接口调用结束 ---")
def request_report(base_url,params):
print(f"正在调用接口: {base_url}")
print(f"请求参数: {params}")
try:
# 使用 httpx.get 发送 GET 请求
headers = {
"Accept": "application/json",
"Content-Type": "application/json"
}
response = httpx.get(base_url, params=params, timeout=60.0,headers=headers,follow_redirects=True)
response.raise_for_status()
# 检查响应的 Content-Type确保它是 JSON
data = response.json()
logger.info("\n请求成功,返回数据:")
logger.info(json.dumps(data, indent=4, ensure_ascii=False))
return data
except httpx.RequestError as exc:
logger.error(f"\n请求失败,发生错误: {exc}")
return None
except httpx.HTTPStatusError as exc:
logger.error(f"\n请求失败HTTP 错误状态码: {exc.response.status_code}")
logger.error(f"响应内容: {exc.response.text}")
return None
def download_file(url, download_dir_all,filename,file_id):
"""
使用GET请求从URL下载文件并保存到指定目录。
"""
# 确保保存目录存在,如果不存在则创建
if not os.path.exists(download_dir_all):
os.makedirs(download_dir_all)
logger.info(f"开始从 {url} 下载文件...")
params = {"id": file_id}
try:
# 使用流式方式stream=True下载
with httpx.stream("GET", url, params=params, follow_redirects=True) as response:
# 检查响应状态码如果不是2xx则抛出异常
response.raise_for_status()
# 构造完整的文件路径
file_path = os.path.join(download_dir_all, filename+".docx")
logger.info(f"文件名从响应头中获取为: {filename}")
# 将文件内容写入本地文件
with open(file_path, "wb") as f:
for chunk in response.iter_bytes():
f.write(chunk)
logger.info(f"文件成功下载并保存到: {file_path}")
return file_path
except httpx.HTTPStatusError as e:
logger.error(f"下载文件时发生HTTP错误: {e}")
except httpx.RequestError as e:
logger.error(f"下载文件时发生请求错误: {e}")
def start_scheduler():
"""
启动调度器并添加所有定时任务。
"""
logger.info("调度器启动中...")
# 添加定时任务每隔1分钟执行一次 poll_push_message_endpoint 函数
scheduler.add_job(poll_external_api, 'interval', minutes=3)
scheduler.start()
logger.info("调度器已启动。")
def shutdown_scheduler():
"""
关闭调度器。
"""
logger.info("调度器关闭中...")
scheduler.shutdown()
logger.info("调度器已关闭。")