feat 🐛:add db

main
许标 6 months ago
parent dbc25c5903
commit ed4b4440ab

@ -3,7 +3,7 @@
"文件传输助手"
],
"push_message_url":"http://192.168.0.216:17000/api/win/sendMixedFiles",
"query_report_url": "http://192.168.197.97:5001/wind-api/v1/windflood/report/queryReport",
"download_report_url": "http://127.0.0.1:5001/wind-api/v1/windflood/report/download"
"query_report_url": "http://192.168.0.216:5001/wind-api/v1/windflood/report/queryReport",
"download_report_url": "http://192.168.0.216:5001/wind-api/v1/windflood/report/download",
"download_dir": "report"
}

@ -1,3 +1,4 @@
import asyncio
import datetime
import json
import logging
@ -8,7 +9,7 @@ import httpx
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from util.config import DataLoader
from util.database import get_latest_update_time, insert_many_messages, update_sent_status
from util.database import get_latest_update_time, insert_many_messages, update_sent_status, query_by_sent
logger = logging.getLogger(__name__) # 获取当前模块的logger
logger.setLevel(logging.INFO)
@ -38,12 +39,29 @@ async def poll_external_api():
download_report_url = config_data["download_report_url"]
current_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logger.info(f"\n--- [{current_time}] 定时任务开始调用外部消息推送接口: {push_message_url} ---")
"""
report_list_history= query_by_sent(1)
for report_path in report_list_history:
is_update_sent = True
try:
os.remove(report_path[4])
except Exception as e:
logger.info(f"删除失败,下一次轮询删除")
is_update_sent= False
if is_update_sent:
update_sent_status(report_path[0],2)
"""
max_time = get_latest_update_time()
"""
params = {
"start_time": "2025-07-24 14:36:00",
"start_time": "2025-07-25 09:09:01",
"end_time": "2025-07-25 09:11:01"
}
"""
params = {
"start_time": current_time,
"end_time": max_time
}
report_data = request_report(query_report_url,params)
if report_data is None:
logger.info("没有报告数据。")
@ -58,12 +76,16 @@ async def poll_external_api():
for report in reports_list:
id = report["id"]
file_id = report["file_id"]
title = report["title"]
download_dir_file_id = download_dir_all+"/"+file_id
file_path = download_file(download_report_url,download_dir_file_id)
file_path = download_file(download_report_url,download_dir_file_id,title,file_id)
if file_path is not None:
messages_to_insert =[(id,file_id, current_time, 0)]
messages_to_insert = []
for name in config_data["user_group"]:
messages_to_insert.append((id,file_id, current_time, 0,file_path,name))
error_message = insert_many_messages(messages_to_insert)
if error_message is None:
try:
async with httpx.AsyncClient() as client:
# 1获取需要推送的文档ID 2根据ID获取文档3文档写成文件4发送文件5删除文件
files = []
@ -87,9 +109,7 @@ async def poll_external_api():
logger.info(f"响应文本: {response.text}")
response.raise_for_status()
logger.info("文件发送成功!")
# 都发送成功,则删除文件,回填到数据库
update_sent_status(id)
os.remove(file_path)
update_sent_status(id,name,1)
except httpx.HTTPStatusError as e:
logger.info(f"调用外部接口 {push_message_url} 时发生 HTTP 状态码错误: {e.response.status_code} - {e.response.text}")
except httpx.RequestError as e:
@ -123,7 +143,7 @@ def request_report(base_url,params):
print(f"响应内容: {exc.response.text}")
return None
def download_file(url, download_dir_all):
def download_file(url, download_dir_all,filename,file_id):
"""
使用GET请求从URL下载文件并保存到指定目录
"""
@ -131,34 +151,14 @@ def download_file(url, download_dir_all):
if not os.path.exists(download_dir_all):
os.makedirs(download_dir_all)
print(f"开始从 {url} 下载文件...")
filename = None
params = {"id": file_id}
try:
# 使用流式方式stream=True下载
with httpx.stream("GET", url, follow_redirects=True) as response:
with httpx.stream("GET", url, params=params, follow_redirects=True) as response:
# 检查响应状态码如果不是2xx则抛出异常
response.raise_for_status()
# --- 核心逻辑:从响应头获取文件名 ---
content_disposition = response.headers.get("Content-Disposition")
if content_disposition:
# 寻找文件名参数
# 例如: 'attachment; filename="report.pdf"'
parts = content_disposition.split(';')
for part in parts:
if 'filename=' in part:
# 提取文件名并去除多余的引号
filename = part.split('filename=')[1].strip().strip('"')
break
# 如果文件名没有扩展名根据MIME类型添加一个
if '.' not in filename:
content_type = response.headers.get("Content-Type")
if content_type:
ext = mimetypes.guess_extension(content_type)
if ext:
filename += ext
# 构造完整的文件路径
file_path = os.path.join(download_dir_all, filename)
file_path = os.path.join(download_dir_all, filename+".docx")
print(f"文件名从响应头中获取为: {filename}")
# 将文件内容写入本地文件
with open(file_path, "wb") as f:
@ -171,6 +171,7 @@ def download_file(url, download_dir_all):
except httpx.RequestError as e:
print(f"下载文件时发生请求错误: {e}")
def start_scheduler():
"""
启动调度器并添加所有定时任务

@ -0,0 +1,8 @@
import unittest
from util.database import update_sent_status
class TestUpdataDb(unittest.TestCase):
def test_updata_db(self):
update_sent_status(350)

@ -36,7 +36,9 @@ def init_db():
id TEXT ,
file_id TEXT,
update_time TEXT,
sent INTEGER
sent INTEGER,
file_path TEXT,
contact_name TEXT
)
""")
conn.commit()
@ -81,7 +83,7 @@ def insert_many_messages(messages_to_insert):
cursor = conn.cursor()
# 使用 executemany 进行批量插入,性能更好
cursor.executemany("INSERT OR IGNORE INTO messages VALUES (?, ?, ?,?)", messages_to_insert)
cursor.executemany("INSERT OR IGNORE INTO messages VALUES (?, ?, ?,?,?,?)", messages_to_insert)
conn.commit()
conn.close()
@ -91,7 +93,7 @@ def insert_many_messages(messages_to_insert):
return None
def update_sent_status(id):
def update_sent_status(id,contact_name,sent):
"""
根据给定的 ID 列表 sent 字段更新为 1 (已发送)
ids_to_update 是一个包含 ID 的列表或元组
@ -101,10 +103,23 @@ def update_sent_status(id):
cursor = conn.cursor()
# 构建 SQL 语句,使用 IN 子句批量更新
sql_query = f"UPDATE messages SET sent = 1 WHERE id = ?"
sql_query = "UPDATE messages SET sent = ? WHERE id = ? and contact_name = ?"
cursor.execute(sql_query, id)
cursor.execute(sql_query, (sent, id,contact_name))
conn.commit()
conn.close()
print(f"已更新 {cursor.rowcount} 条数据的 sent 状态。")
def query_by_sent(sent):
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# 查询所有匹配给定 ID 的记录
cursor.execute("SELECT id, file_id, update_time, sent,file_path,contact_name FROM messages WHERE sent = ?", (sent,))
results = cursor.fetchall()
conn.close()
# 返回查询结果列表
return results

Loading…
Cancel
Save