You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

178 lines
5.5 KiB
Python

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

import json
import mimetypes
import os
from typing import List
from fastapi import APIRouter, HTTPException, Request,Response
import scene_reader
from algos import pre_annotate
from constant_3d import MINIO_BUCKET
from crud.entity.scene_dto import SaveWorldItem, CropSceneRequest, PredictRotationRequest, LoadWorldItem
from crud.service.scene_service import SceneService
from crud.utils.minio_conn import MinioServer
from tools import check_labels as check
router = APIRouter()
# 已修改完成
@router.post("/saveworldlist")
async def saveworldlist(items: List[SaveWorldItem]):
"""批量保存标注数据"""
return SceneService.save_world_list(items)
# 已修改完成
@router.get("/checkscene")
def checkscene(scene: str):
"""检查场景的标注"""
# ck = check.LabelChecker(os.path.join("./data", scene))
ck = check.LabelChecker(scene)
ck.check()
return ck.messages
# 已修改完成
@router.post("/predict_rotation")
async def predict_rotation(request_data: PredictRotationRequest):
"""预测旋转角度"""
# FastAPI 自动将请求体转换为 Pydantic 模型
return {"angle": pre_annotate.predict_yaw(request_data.points)}
# 已修改完成
@router.get("/load_annotation")
def load_annotation(scene: str, frame: str):
"""加载标注数据"""
# 读取标注数据 /data/项目名/label/**.json
# return scene_reader.read_annotations(scene, frame)
return SceneService.get_label_json(scene, frame)
# 已修改完成
@router.get("/load_ego_pose")
def load_ego_pose(scene: str, frame: str):
"""加载自车姿态"""
# return scene_reader.read_ego_pose(scene, frame)
return SceneService.get_ego_pose_json(scene, frame)
# 已修改完成
@router.post("/loadworldlist")
async def load_world_list(request: Request):
"""批量加载标注数据"""
# 1. 获取原始请求体 (bytes)
body = await request.body()
# 2. 将 bytes 转换为字符串
body_str = body.decode('utf-8')
try:
items = json.loads(body_str)
except json.JSONDecodeError:
raise HTTPException(status_code=400, detail="请求体不是有效的 JSON 格式")
anns = []
for i in items:
w = LoadWorldItem(**i)
# 查询数据库的标注数据
anns.append({
"scene": w.scene,
"frame": w.frame,
# "annotation": scene_reader.read_annotations(w.scene, w.frame)
"annotation": SceneService.get_label_json(w.scene, w.frame)
})
return anns
# 已修改完成
@router.get("/datameta")
def datameta():
"""获取所有场景元数据"""
return SceneService.get_scene_info()
# 已修改完成
@router.get("/get_all_scene_desc")
def get_all_scene_desc():
"""获取所有场景描述"""
# todo 获取标注任务列表 ==> 字典key为目录
# return {"aaa": 1}
return SceneService.get_scene_desc()
# 已修改完成
@router.get("/objs_of_scene")
def objs_of_scene(scene: str):
"""获取场景中的所有对象"""
# todo从数据库查询图片列表
return SceneService.get_all_objs(os.path.join("data", scene))
# 无须修改
@router.post("/cropscene")
async def cropscene(request_data: CropSceneRequest):
"""裁剪场景"""
rawdata = request_data.rawSceneId
timestamp = rawdata.split("_")[0]
log_file = f"temp/crop-scene-{timestamp}.log"
# 注意os.system 存在安全风险,在生产环境中应替换为更安全的 subprocess 模块
cmd = (
f"python ./tools/dataset_preprocess/crop_scene.py generate "
f"{rawdata[0:10]}/{timestamp}_preprocessed/dataset_2hz - "
f"{request_data.startTime} {request_data.seconds} "
f'"{request_data.desc}" > {log_file} 2>&1'
)
print(f"Executing command: {cmd}")
code = os.system(cmd)
log = []
if os.path.exists(log_file):
with open(log_file) as f:
log = [s.strip() for s in f.readlines()]
os.remove(log_file)
return {"code": code, "log": log}
# 已修改完成
@router.get("/scenemeta")
def scenemeta(scene: str):
# 获取s目录下的所有文件信息
return scene_reader.get_one_scene(scene)
# 暂时不使用
@router.get("/auto_annotate")
def auto_annotate(scene: str, frame: str):
"""自动标注"""
print(f"Auto annotate {scene}, {frame}")
file_path = f'data/{scene}/lidar/{frame}.pcd'
if not os.path.exists(file_path):
raise HTTPException(status_code=404, detail=f"File not found: {file_path}")
return pre_annotate.annotate_file(file_path)
# 原始静态资源获取
@router.get("/data/{file:path}")
def get_file(file):
file_path = f'/data/{file}'
"""获取文件"""
conn=MinioServer()
minio_file=conn.get_file(MINIO_BUCKET, file_path)
if minio_file is None:
raise HTTPException(status_code=404, detail="File not found")
# media_type, _ = mimetypes.guess_type(minio_file)
# if media_type is None:
media_type = 'application/octet-stream' # 默认二进制流
# 设置响应头,告诉浏览器这是一个附件,并建议文件名
headers = {
'Content-Disposition': f'attachment; filename="{os.path.basename(file_path)}"',
"Content-Type": media_type,
}
# 使用 StreamingResponse 进行流式响应
# response.data 是一个类似文件的对象,可以被直接读取
return Response(
minio_file.read(), # 将整个文件读入内存后返回(简单但内存占用高)
# 更优的流式方式见下方说明
media_type=media_type,
headers=headers
)