import json import mimetypes import os from typing import List from fastapi import APIRouter, HTTPException, Request,Response import scene_reader from algos import pre_annotate from constant_3d import MINIO_BUCKET from crud.entity.scene_dto import SaveWorldItem, CropSceneRequest, PredictRotationRequest, LoadWorldItem from crud.service.scene_service import SceneService from crud.utils.minio_conn import MinioServer from tools import check_labels as check router = APIRouter() # 已修改完成 @router.post("/saveworldlist") async def saveworldlist(items: List[SaveWorldItem]): """批量保存标注数据""" return SceneService.save_world_list(items) # 已修改完成 @router.get("/checkscene") def checkscene(scene: str): """检查场景的标注""" # ck = check.LabelChecker(os.path.join("./data", scene)) ck = check.LabelChecker(scene) ck.check() return ck.messages # 已修改完成 @router.post("/predict_rotation") async def predict_rotation(request_data: PredictRotationRequest): """预测旋转角度""" # FastAPI 自动将请求体转换为 Pydantic 模型 return {"angle": pre_annotate.predict_yaw(request_data.points)} # 已修改完成 @router.get("/load_annotation") def load_annotation(scene: str, frame: str): """加载标注数据""" # 读取标注数据 /data/项目名/label/**.json # return scene_reader.read_annotations(scene, frame) return SceneService.get_label_json(scene, frame) # 已修改完成 @router.get("/load_ego_pose") def load_ego_pose(scene: str, frame: str): """加载自车姿态""" # return scene_reader.read_ego_pose(scene, frame) return SceneService.get_ego_pose_json(scene, frame) # 已修改完成 @router.post("/loadworldlist") async def load_world_list(request: Request): """批量加载标注数据""" # 1. 获取原始请求体 (bytes) body = await request.body() # 2. 将 bytes 转换为字符串 body_str = body.decode('utf-8') try: items = json.loads(body_str) except json.JSONDecodeError: raise HTTPException(status_code=400, detail="请求体不是有效的 JSON 格式") anns = [] for i in items: w = LoadWorldItem(**i) # 查询数据库的标注数据 anns.append({ "scene": w.scene, "frame": w.frame, # "annotation": scene_reader.read_annotations(w.scene, w.frame) "annotation": SceneService.get_label_json(w.scene, w.frame) }) return anns # 已修改完成 @router.get("/datameta") def datameta(): """获取所有场景元数据""" return SceneService.get_scene_info() # 已修改完成 @router.get("/get_all_scene_desc") def get_all_scene_desc(): """获取所有场景描述""" # todo 获取标注任务列表 ==> 字典,key为目录 # return {"aaa": 1} return SceneService.get_scene_desc() # 已修改完成 @router.get("/objs_of_scene") def objs_of_scene(scene: str): """获取场景中的所有对象""" # todo从数据库查询图片列表 return SceneService.get_all_objs(os.path.join("./data", scene)) # 无须修改 @router.post("/cropscene") async def cropscene(request_data: CropSceneRequest): """裁剪场景""" rawdata = request_data.rawSceneId timestamp = rawdata.split("_")[0] log_file = f"temp/crop-scene-{timestamp}.log" # 注意:os.system 存在安全风险,在生产环境中应替换为更安全的 subprocess 模块 cmd = ( f"python ./tools/dataset_preprocess/crop_scene.py generate " f"{rawdata[0:10]}/{timestamp}_preprocessed/dataset_2hz - " f"{request_data.startTime} {request_data.seconds} " f'"{request_data.desc}" > {log_file} 2>&1' ) print(f"Executing command: {cmd}") code = os.system(cmd) log = [] if os.path.exists(log_file): with open(log_file) as f: log = [s.strip() for s in f.readlines()] os.remove(log_file) return {"code": code, "log": log} # todo 修改完成一半 @router.get("/scenemeta") def scenemeta(scene: str): # 获取s目录下的所有文件信息 return scene_reader.get_one_scene(scene) # 暂时不使用 @router.get("/auto_annotate") def auto_annotate(scene: str, frame: str): """自动标注""" print(f"Auto annotate {scene}, {frame}") file_path = f'./data/{scene}/lidar/{frame}.pcd' if not os.path.exists(file_path): raise HTTPException(status_code=404, detail=f"File not found: {file_path}") return pre_annotate.annotate_file(file_path) @router.get("/data/{file:path}") def get_file(file): file_path = f'/data/{file}' """获取文件""" conn=MinioServer() minio_file=conn.get_file(MINIO_BUCKET, file_path) if minio_file is None: raise HTTPException(status_code=404, detail="File not found") # media_type, _ = mimetypes.guess_type(minio_file) # if media_type is None: media_type = 'application/octet-stream' # 默认二进制流 # 设置响应头,告诉浏览器这是一个附件,并建议文件名 headers = { 'Content-Disposition': f'attachment; filename="{os.path.basename(file_path)}"', "Content-Type": media_type, } # 使用 StreamingResponse 进行流式响应 # response.data 是一个类似文件的对象,可以被直接读取 return Response( minio_file.read(), # 将整个文件读入内存后返回(简单但内存占用高) # 更优的流式方式见下方说明 media_type=media_type, headers=headers ) # @router.get("/get_file") def get_file(file_path:str): """获取文件""" conn=MinioServer() minio_file=conn.get_file(MINIO_BUCKET, file_path) # media_type, _ = mimetypes.guess_type(minio_file) # if media_type is None: media_type = 'application/octet-stream' # 默认二进制流 # 设置响应头,告诉浏览器这是一个附件,并建议文件名 headers = { 'Content-Disposition': f'attachment; filename="{os.path.basename(file_path)}"', "Content-Type": media_type, } # 使用 StreamingResponse 进行流式响应 # response.data 是一个类似文件的对象,可以被直接读取 return Response( minio_file.read(), # 将整个文件读入内存后返回(简单但内存占用高) # 更优的流式方式见下方说明 media_type=media_type, headers=headers )