# from filterpy.kalman import KalmanFilter import numpy as np import sys import os from crud.service.scene_service import SceneService BASE_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.append(BASE_DIR) sys.path.append(os.path.join(BASE_DIR, '..')) import scene_reader class MAFilter: def __init__(self, init_x): self.x = init_x self.v = np.zeros(9) # position, rotation self.step = 0 def update(self, x): if self.step == 0: self.v = x-self.x else: self.v = self.v*0.5 + (x-self.x)*0.5 self.x[0:9] = x self.step += 1 def predict(self): self.x += self.v self.step += 1 return self.x def get_my_filter(init_x): return MAFilter(init_x) # def get_kalman_filter(init_x): # dim_z = 9 # kf = KalmanFilter(dim_x=12, dim_z=dim_z) # kf.F = np.array([ [1,0,0,0,0,0,0,0,0,1,0,0], # state transition matrix # [0,1,0,0,0,0,0,0,0,0,1,0], # [0,0,1,0,0,0,0,0,0,0,0,0], # [0,0,0,1,0,0,0,0,0,0,0,0], # [0,0,0,0,1,0,0,0,0,0,0,0], # [0,0,0,0,0,1,0,0,0,0,0,0], # [0,0,0,0,0,0,1,0,0,0,0,0], # [0,0,0,0,0,0,0,1,0,0,0,0], # [0,0,0,0,0,0,0,0,1,0,0,0], # [0,0,0,0,0,0,0,0,0,1,0,0], # [0,0,0,0,0,0,0,0,1,0,1,0], # [0,0,0,0,0,0,0,0,0,0,0,1], # ]) # kf.H = np.array([ [1,0,0,0,0,0,0,0,0,0,0,0], # measurement function, # [0,1,0,0,0,0,0,0,0,0,0,0], # [0,0,1,0,0,0,0,0,0,0,0,0], # [0,0,0,1,0,0,0,0,0,0,0,0], # [0,0,0,0,1,0,0,0,0,0,0,0], # [0,0,0,0,0,1,0,0,0,0,0,0], # [0,0,0,0,0,0,1,0,0,0,0,0], # [0,0,0,0,0,0,0,1,0,0,0,0], # [0,0,0,0,0,0,0,0,1,0,0,0], # ]) # # self.kf.R[0:,0:] *= 10. # measurement uncertainty # #kf.P[dim_z:,dim_z:] *= 1000. #state uncertainty, give high uncertainty to the unobservable initial velocities, covariance matrix # #kf.P *= 10. # # self.kf.Q[-1,-1] *= 0.01 # process uncertainty # #kf.Q[dim_z:,dim_z:] *= 0.01 # #kf.x = kf.x * 0.0 # kf.x[:dim_z] = init_x.reshape((dim_z, 1)) # print(kf.P) # return kf def get_obj_ann(scene, frame, id): ann = scene_reader.read_annotations(scene, frame) target_ann = list(filter(lambda a: a["obj_id"]==id, ann)) if len(target_ann) == 1: return target_ann[0] elif len(target_ann)> 1: print("Warning: duplicate object id found!") return target_ann[0] else: return None # bbox3D measurement state: x,y,z,theta,l,w,h, vx,vy,vz def ann_to_kalman_state(ann): return np.array([ ann["psr"]["position"]["x"], ann["psr"]["position"]["y"], ann["psr"]["position"]["z"], ann["psr"]["scale"]["x"], ann["psr"]["scale"]["y"], ann["psr"]["scale"]["z"], ann["psr"]["rotation"]["x"], ann["psr"]["rotation"]["y"], ann["psr"]["rotation"]["z"], ]) def kalman_state_to_ann(proto, state): state = np.reshape(state, -1) return {"psr":{"position":{"x":state[0], "y":state[1], "z":state[2] }, "scale": {"x":state[3], "y":state[4], "z":state[5]}, "rotation":{"x":state[6], "y":state[7], "z":state[8] }, }, "obj_type":proto["obj_type"], "obj_id":proto["obj_id"], } def interpolate_segment(start_ann, end_ann, insert_number): end = ann_to_kalman_state(end_ann) start = ann_to_kalman_state(start_ann) linear_delta = (end-start)/(insert_number+1) return list(map(lambda i: kalman_state_to_ann(start_ann, start+linear_delta*(i+1)), range(insert_number))) def interpolate(annotations): # interpolate N = len(annotations) i = 0 num_interpolate = 0 while i+1