feat 🐛:add

main
许标 6 months ago
parent 064d6a0d7e
commit dbc25c5903

@ -8,7 +8,7 @@ import httpx
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from util.config import DataLoader
from util.database import get_latest_update_time
from util.database import get_latest_update_time, insert_many_messages, update_sent_status
logger = logging.getLogger(__name__) # 获取当前模块的logger
logger.setLevel(logging.INFO)
@ -56,33 +56,40 @@ async def poll_external_api():
logger.info("接口返回错误,请检查响应。")
return
for report in reports_list:
id = report["id"]
file_id = report["file_id"]
download_dir_file_id = download_dir_all+"/"+file_id
file_path = download_file(download_report_url,download_dir_file_id)
async with httpx.AsyncClient() as client:
# 1获取需要推送的文档ID 2根据ID获取文档3文档写成文件4发送文件5删除文件
files = []
if not os.path.exists(file_path):
logger.info(f"警告: 文件不存在,跳过: {file_path}")
continue
files.append(("files", open(file_path, "rb")))
if not files:
logger.info("没有有效文件可发送,请求终止。")
return
# data 参数用于表单字段files 参数用于文件
for name in config_data["user_group"]:
data = {
"message": "文档",
"contactName": name
}
if file_path is not None:
messages_to_insert =[(id,file_id, current_time, 0)]
error_message = insert_many_messages(messages_to_insert)
if error_message is None:
async with httpx.AsyncClient() as client:
response = await client.post(push_message_url, data=data, files=files, timeout=60.0)
# 打印响应信息
logger.info(f"状态码: {response.status_code}")
logger.info(f"响应文本: {response.text}")
response.raise_for_status()
logger.info("文件发送成功!")
# 都发送成功,则删除文件,回填到数据库
# 1获取需要推送的文档ID 2根据ID获取文档3文档写成文件4发送文件5删除文件
files = []
if not os.path.exists(file_path):
logger.info(f"警告: 文件不存在,跳过: {file_path}")
continue
files.append(("files", open(file_path, "rb")))
if not files:
logger.info("没有有效文件可发送,请求终止。")
return
# data 参数用于表单字段files 参数用于文件
for name in config_data["user_group"]:
data = {
"message": "文档",
"contactName": name
}
async with httpx.AsyncClient() as client:
response = await client.post(push_message_url, data=data, files=files, timeout=60.0)
# 打印响应信息
logger.info(f"状态码: {response.status_code}")
logger.info(f"响应文本: {response.text}")
response.raise_for_status()
logger.info("文件发送成功!")
# 都发送成功,则删除文件,回填到数据库
update_sent_status(id)
os.remove(file_path)
except httpx.HTTPStatusError as e:
logger.info(f"调用外部接口 {push_message_url} 时发生 HTTP 状态码错误: {e.response.status_code} - {e.response.text}")
except httpx.RequestError as e:
@ -124,9 +131,9 @@ def download_file(url, download_dir_all):
if not os.path.exists(download_dir_all):
os.makedirs(download_dir_all)
print(f"开始从 {url} 下载文件...")
filename = None
try:
# 使用流式方式stream=True下载
filename = ""
with httpx.stream("GET", url, follow_redirects=True) as response:
# 检查响应状态码如果不是2xx则抛出异常
response.raise_for_status()

@ -76,18 +76,22 @@ def insert_many_messages(messages_to_insert):
messages_to_insert 是一个包含 (id, update_time, sent) 元组的列表
例如: [('id1', '2023-10-27 10:00:00', 0), ('id2', '2023-10-27 10:05:00', 0)]
"""
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
try:
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# 使用 executemany 进行批量插入,性能更好
cursor.executemany("INSERT OR IGNORE INTO messages VALUES (?, ?, ?,?)", messages_to_insert)
# 使用 executemany 进行批量插入,性能更好
cursor.executemany("INSERT OR IGNORE INTO messages VALUES (?, ?, ?,?)", messages_to_insert)
conn.commit()
conn.close()
conn.commit()
conn.close()
except Exception as e:
return e
print(f"已批量插入 {len(messages_to_insert)} 条数据到数据库。")
return None
def update_sent_status(ids_to_update):
def update_sent_status(id):
"""
根据给定的 ID 列表 sent 字段更新为 1 (已发送)
ids_to_update 是一个包含 ID 的列表或元组
@ -97,10 +101,9 @@ def update_sent_status(ids_to_update):
cursor = conn.cursor()
# 构建 SQL 语句,使用 IN 子句批量更新
placeholders = ', '.join('?' for _ in ids_to_update)
sql_query = f"UPDATE messages SET sent = 1 WHERE id IN ({placeholders})"
sql_query = f"UPDATE messages SET sent = 1 WHERE id = ?"
cursor.execute(sql_query, ids_to_update)
cursor.execute(sql_query, id)
conn.commit()
conn.close()

Loading…
Cancel
Save