From d6668beff5b5b4c104c5c1ddec573cceb4bb80ed Mon Sep 17 00:00:00 2001 From: WangZiFan Date: Thu, 12 Mar 2026 15:51:21 +0800 Subject: [PATCH] Upload files to "/" --- convert_to_nuscenes.py | 730 +++++++++++++++++++++++++++++++++++++++++ my_dataset.py | 372 +++++++++++++++++++++ 2 files changed, 1102 insertions(+) create mode 100644 convert_to_nuscenes.py create mode 100644 my_dataset.py diff --git a/convert_to_nuscenes.py b/convert_to_nuscenes.py new file mode 100644 index 0000000..4bd579e --- /dev/null +++ b/convert_to_nuscenes.py @@ -0,0 +1,730 @@ +""" +Convert private dataset to NuScenes-style .pkl and .coco.json files +compatible with MMDetection3D. + +Coordinate system note +---------------------- +Your dataset: X-right, Y-forward, Z-up (same as NuScenes LiDAR frame) + -> No rotation is needed for the point cloud. + -> lidar == ego == global (identity transforms, no vehicle odometry available) + +Projection matrix note +---------------------- +Your camera_raw stores a 3×4 matrix P = K @ [R | t] +where K is the 3×3 intrinsic and [R|t] maps points from the LiDAR frame +into the camera frame. +We decompose P via RQ-decomposition to recover K (intrinsic) and [R|t] +(lidar-to-camera extrinsic). + +Usage +----- +python convert_to_nuscenes.py \ + --root /path/to/your/dataset \ + --out /path/to/output/dir \ + --split 0.8 # fraction used for training (rest = val) + --tag mydata # prefix for output file names +""" + +import os +import sys +import json +import uuid +import pickle +import argparse +import numpy as np +from pathlib import Path +from PIL import Image +from typing import Dict, List, Tuple, Optional + +# --------------------------------------------------------------------------- +# Inline minimal copy of the dataset loader so the script is self-contained. 
# If you already have my_dataset.py on PYTHONPATH you can remove this block
# and simply do: from my_dataset import MyDataset, SampleData, FrameData
# ---------------------------------------------------------------------------
import glob
from dataclasses import dataclass, field


@dataclass
class FrameData:
    """Raw data of a single frame, exactly as read from disk (no transforms)."""
    frame_index: int                  # 0-based frame number
    lidar_points: np.ndarray          # (N, 3) float32 point coordinates
    image_paths: Dict[str, str]       # camera name -> image file path
    camera_raw: Dict[str, dict]       # camera name -> {projection_matrix, image_size}
    labels: List[dict]                # raw 3D box annotations for this frame
    contain_labels: bool = False      # True when at least one annotation exists


@dataclass
class SampleData:
    """All frames contained in one sample folder."""
    sample_id: str
    frames: Dict[int, FrameData]      # frame index -> FrameData


class MyDataset:
    """
    Loader for the private dataset (LiDAR frame: X-right, Y-forward, Z-up).

    Expected layout:
        root/sample{id}/pcd_sequence{id}/*.pcd   point-cloud frames
        root/sample{id}/0/, 1/, ...              per-frame image folders
        root/sample{id}/test{id}.json            camera parameters
        root/sample{id}/test{id}-mark.json       3D annotations
    """

    # camera name -> image file name inside each per-frame folder
    cam2filename = {
        "front_120": "scanofilm_surround_front_120_8M.jpg",
        "front_left_100": "scanofilm_surround_front_left_100_2M.jpg",
        "front_right_100": "scanofilm_surround_front_right_100_2M.jpg",
        "rear_100": "scanofilm_surround_rear_100_2M.jpg",
        "rear_left_100": "scanofilm_surround_rear_left_100_2M.jpg",
        "rear_right_100": "scanofilm_surround_rear_right_100_2M.jpg",
    }
    # annotation color code -> class name
    color2class = {
        "#5414ED": "car", "#F6EE64": "pick-up-truck", "#F6A087": "small-truck",
        "#BC4EF1": "truck", "#4E9AF1": "bus", "#F1A94E": "special-vehicle",
        "#E1DFDD": "ignore", "#F91906": "tricyclist-withrider",
        "#FA5F51": "tricyclist-withoutrider", "#B8CB30": "bicycle-withrider",
        "#E6FD4E": "bicycle-withoutrider", "#876363": "people",
        "#2CBDF5": "crowd-people", "#C9F52C": "crowd-bicycle",
        "#DC6788": "crowd-car", "#6EC913": "traffic-cone",
        "#0DDE69": "plastic-barrier", "#8260D2": "crash-barrels",
        "#F1D1D1": "warning-triangle", "#FE6DF4": "crowd-traffic-cone",
        "#D1AA35": "crowd-plastic-barrier", "#3BE8D0": "crowd-crash-barrels",
        "#2B7567": "crowd-warning-triangle",
    }

    def __init__(self, root_folder: str):
        self.root = root_folder
        self.class2color = {v: k for k, v in self.color2class.items()}
        self._samples: Dict[str, SampleData] = {}
        self._load_all_samples()

    @staticmethod
    def _extract_id(folder_name: str) -> str:
        """Extract the digits of the folder basename as the sample id."""
        return ''.join(c for c in os.path.basename(folder_name) if c.isdigit())

    def _find_sample_folders(self) -> List[str]:
        """Return all sample* directories under the dataset root."""
        return [f for f in glob.glob(os.path.join(self.root, "sample*"))
                if os.path.isdir(f)]

    def _find_paths_in_sample(self, sample_folder: str, sample_id: str) -> dict:
        """Return the resource paths (pcd folder, image folders, json files)."""
        pcd_folder = os.path.join(sample_folder, f"pcd_sequence{sample_id}")
        pcd_files = glob.glob(os.path.join(pcd_folder, "*.pcd")) if os.path.isdir(pcd_folder) else []
        # image folders are named 0, 1, ..., one per point-cloud frame
        img_folders = [os.path.join(sample_folder, str(i)) for i in range(len(pcd_files))]
        return {
            "pcd_folder": pcd_folder,
            "img_folders": img_folders,
            "page_json": os.path.join(sample_folder, f"test{sample_id}.json"),
            "mark_json": os.path.join(sample_folder, f"test{sample_id}-mark.json"),
        }

    def _read_lidar_points(self, pcd_path: str) -> Optional[np.ndarray]:
        """Read one .pcd file; returns an (N, 3) float32 array or None on failure."""
        try:
            # Lazy import: the module stays importable without open3d installed.
            import open3d as o3d
            pcd = o3d.io.read_point_cloud(pcd_path, remove_nan_points=True,
                                          remove_infinite_points=True, format="auto")
            return np.asarray(pcd.points, dtype=np.float32)
        except Exception as e:
            # Best effort: report and skip unreadable point clouds.
            print(f"Error loading {pcd_path}: {e}")
            return None

    def _load_all_lidar_frames(self, pcd_folder: str) -> Dict[int, np.ndarray]:
        """Return frame index -> (N, 3) points for every readable .pcd file."""
        out = {}
        for f in glob.glob(os.path.join(pcd_folder, "*.pcd")):
            # The file stem is expected to end with "...n<index>".
            idx = int(os.path.basename(f).split(".")[0].split("n")[-1])
            pts = self._read_lidar_points(f)
            if pts is not None:
                out[idx] = pts
        return out

    def _load_image_paths(self, img_folders: List[str]) -> Dict[int, Dict[str, str]]:
        """Return frame index -> {camera name: image path}; missing files are skipped."""
        out = {}
        for folder in img_folders:
            if not os.path.isdir(folder):
                continue
            idx = int(os.path.basename(folder))
            paths = {cam: os.path.join(folder, fname)
                     for cam, fname in self.cam2filename.items()
                     if os.path.isfile(os.path.join(folder, fname))}
            if paths:
                out[idx] = paths
        return out

    def _load_camera_raw(self, page_json_path: str) -> Dict[int, Dict[str, dict]]:
        """Return frame index -> {camera name: {projection_matrix, image_size}}."""
        if not os.path.isfile(page_json_path):
            return {}
        with open(page_json_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        try:
            extend_source = data['data']['files'][0]['extendSources']
        except (KeyError, IndexError):
            return {}
        out = {}
        for d in extend_source:
            pe = d.get('pageElement', {})
            if "sensor" not in pe:
                continue
            try:
                idx = int(d['fileName'].split('.')[0])
            except Exception:
                continue
            # e.g. "ofilm_surround_front_120_8M" -> "front_120"
            cam_name = pe['sensor'].replace("ofilm_surround_", "")
            for suf in ["_2M", "_8M"]:
                if cam_name.endswith(suf):
                    cam_name = cam_name[:-len(suf)]
                    break
            if cam_name not in self.cam2filename:
                continue
            matrix = pe.get('mtx', [])
            image_size = pe.get('imageSize', [])
            if not matrix or not image_size:
                continue
            matrix = np.array(matrix, dtype=np.float64)
            if matrix.shape != (3, 4):
                continue
            out.setdefault(idx, {})[cam_name] = {
                "projection_matrix": matrix,
                "image_size": image_size,
            }
        return out

    def _extract_text_label(self, json_path: str) -> List[dict]:
        """Return the annotation list whose vote accuracy is 100%, else []."""
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)['data']['list'][0]
        idx2rate = {k: float(v['accuracy'].replace('%', '')) / 100.0
                    for k, v in data['rate'].items()}
        max_idx = max(idx2rate, key=idx2rate.get)
        # Only fully agreed-upon annotation results are trusted.
        if idx2rate[max_idx] == 1.0:
            return data['result'][max_idx]
        return []

    def _process_text_label(self, json_path: str) -> Dict[int, List[dict]]:
        """Return frame index -> normalized label dicts ({} when agreement < 100%)."""
        label_list = self._extract_text_label(json_path)
        if not label_list:
            return {}
        out = {}
        for entry in label_list:
            out[entry['index']] = self._process_frame_labels(entry['value'])
        return out

    def _process_frame_labels(self, label_list: List[dict]) -> List[dict]:
        """Normalize the raw annotation dicts of one frame."""
        labels = []
        for lb in label_list:
            nl = {
                'num': lb['num'],                       # box id
                'class': lb['label']['class-name'],
                'is_moving': not bool(lb['label']['static']),
                'isolation': bool(lb['label']['isolation']),
                'color': self.class2color.get(lb['label']['class-name'], ''),
                # 8 corner points, each [x, y, z]
                'points': [[float(lb['newPoints'][i]['x']),
                            float(lb['newPoints'][i]['y']),
                            float(lb['newPoints'][i]['z'])] for i in range(8)],
                'center': [float(lb['x']), float(lb['y']), float(lb['z'])],
                # yaw in radians, counter-clockwise from +X towards +Y
                'rotateZ': float(lb['rotateZ']),
                'dx': float(lb['width']),
                'dy': float(lb['height']),
                'dz': float(lb['depth']),
            }
            labels.append(nl)
        return labels

    def _load_one_sample(self, sample_folder: str) -> Optional[SampleData]:
        """Assemble one SampleData; returns None when the modalities disagree."""
        sid = self._extract_id(sample_folder)
        paths = self._find_paths_in_sample(sample_folder, sid)
        lidar_dict = self._load_all_lidar_frames(paths["pcd_folder"])
        img_dict = self._load_image_paths(paths["img_folders"])
        cam_dict = self._load_camera_raw(paths["page_json"])
        label_dict = self._process_text_label(paths["mark_json"])
        if not (len(label_dict) == len(lidar_dict) == len(img_dict) == len(cam_dict)):
            print(f"Sample {sid}: frame count mismatch, skipping.")
            return None
        # Iterate the actual lidar indices instead of assuming 0..N-1 contiguity.
        frames = {idx: FrameData(
            frame_index=idx,
            lidar_points=lidar_dict[idx],
            image_paths=img_dict[idx],
            camera_raw=cam_dict[idx],
            labels=label_dict[idx],
            contain_labels=len(label_dict[idx]) > 0,
        ) for idx in sorted(lidar_dict)}
        return SampleData(sample_id=sid, frames=frames) if frames else None

    def _load_all_samples(self):
        """Populate self._samples from every sample folder under root."""
        from tqdm import tqdm  # lazy: progress bar only needed when loading
        for sf in tqdm(self._find_sample_folders(), desc="Loading samples"):
            s = self._load_one_sample(sf)
            if s is not None:
                self._samples[s.sample_id] = s
        print(f"Loaded {len(self._samples)} samples.")

    @property
    def sample_ids(self) -> List[str]:
        return list(self._samples.keys())

    def get_sample(self, sid: str) -> Optional[SampleData]:
        return self._samples.get(sid)

    def __len__(self) -> int:
        return len(self._samples)


# ===========================================================================
# Helper math utilities
# ===========================================================================

def rq_decompose(P: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """
    Decompose a 3x4 projection matrix P = K @ [R | t].

    Returns
    -------
    K : (3, 3) upper-triangular intrinsic matrix with positive diagonal
    R : (3, 3) rotation mapping LiDAR coordinates into the camera frame
    t : (3,) translation, expressed in camera coordinates

    Fix note: the previous implementation QR-decomposed ``inv(M).T`` which
    produces a LOWER-triangular "intrinsic" and does not satisfy
    ``K @ R == P[:, :3]``; its sign fix-up (``D @ K``, ``D @ Q``) also
    changed the product. With M = K @ R we have inv(M) = R.T @ inv(K),
    which IS a QR form (orthogonal times upper-triangular), so QR of
    inv(M) directly yields Q0 = R.T and R0 = inv(K).
    """
    M = P[:, :3]  # 3x3 left block
    Q0, R0 = np.linalg.qr(np.linalg.inv(M))
    K = np.linalg.inv(R0)  # upper-triangular intrinsic
    R = Q0.T               # rotation

    # Enforce a positive diagonal on K. D has +-1 entries so D @ D == I,
    # and inserting it as (K @ D) @ (D @ R) leaves the product unchanged.
    signs = np.sign(np.diag(K))
    signs[signs == 0] = 1.0
    D = np.diag(signs)
    K = K @ D
    R = D @ R

    # P[:, 3] = K @ t  ->  t = inv(K) @ P[:, 3]
    t = np.linalg.inv(K) @ P[:, 3]

    return K, R, t
def rotation_matrix_to_quaternion(R: np.ndarray) -> List[float]:
    """Convert a 3x3 rotation matrix to a unit quaternion [w, x, y, z].

    Branch selection follows the standard numerically-stable recipe: pick
    the largest of the trace / diagonal terms so the divisor stays well
    away from zero.
    """
    trace = R[0, 0] + R[1, 1] + R[2, 2]
    if trace > 0:
        s = 0.5 / np.sqrt(trace + 1.0)
        w = 0.25 / s
        x = (R[2, 1] - R[1, 2]) * s
        y = (R[0, 2] - R[2, 0]) * s
        z = (R[1, 0] - R[0, 1]) * s
    elif R[0, 0] > R[1, 1] and R[0, 0] > R[2, 2]:
        s = 2.0 * np.sqrt(1.0 + R[0, 0] - R[1, 1] - R[2, 2])
        w = (R[2, 1] - R[1, 2]) / s
        x = 0.25 * s
        y = (R[0, 1] + R[1, 0]) / s
        z = (R[0, 2] + R[2, 0]) / s
    elif R[1, 1] > R[2, 2]:
        s = 2.0 * np.sqrt(1.0 + R[1, 1] - R[0, 0] - R[2, 2])
        w = (R[0, 2] - R[2, 0]) / s
        x = (R[0, 1] + R[1, 0]) / s
        y = 0.25 * s
        z = (R[1, 2] + R[2, 1]) / s
    else:
        s = 2.0 * np.sqrt(1.0 + R[2, 2] - R[0, 0] - R[1, 1])
        w = (R[1, 0] - R[0, 1]) / s
        x = (R[0, 2] + R[2, 0]) / s
        y = (R[1, 2] + R[2, 1]) / s
        z = 0.25 * s
    return [w, x, y, z]


def identity_quaternion() -> List[float]:
    """Quaternion [w, x, y, z] representing no rotation."""
    return [1.0, 0.0, 0.0, 0.0]


def project_3d_to_2d(points_3d: np.ndarray, P: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """
    Project Nx3 LiDAR points to the image plane using a 3x4 projection matrix.

    Returns
    -------
    uv : (M, 2) pixel coordinates of the M <= N points in front of the camera
    valid : (N,) boolean mask, True where the projected depth is positive

    (The previous docstring claimed a bare Nx2 array was returned; callers
    must unpack the ``(uv, valid)`` tuple.)
    """
    N = points_3d.shape[0]
    pts_h = np.hstack([points_3d, np.ones((N, 1))])  # Nx4 homogeneous
    proj = (P @ pts_h.T).T                           # Nx3
    valid = proj[:, 2] > 0                           # keep points in front of the camera
    uv = proj[valid, :2] / proj[valid, 2:3]
    return uv, valid


def count_points_in_box(points: np.ndarray, cx, cy, cz, dx, dy, dz, yaw) -> int:
    """Count LiDAR points inside a yaw-rotated 3D box.

    Points are translated to the box center and rotated by -yaw, which maps
    them into the box-aligned frame where a simple half-extent test applies
    (boundaries inclusive).
    """
    cos_y, sin_y = np.cos(-yaw), np.sin(-yaw)
    R_inv = np.array([[cos_y, -sin_y, 0],
                      [sin_y, cos_y, 0],
                      [0, 0, 1]], dtype=np.float32)
    pts_centered = points - np.array([cx, cy, cz], dtype=np.float32)
    pts_rot = (R_inv @ pts_centered.T).T
    mask = ((np.abs(pts_rot[:, 0]) <= dx / 2) &
            (np.abs(pts_rot[:, 1]) <= dy / 2) &
            (np.abs(pts_rot[:, 2]) <= dz / 2))
    return int(mask.sum())


# ---------------------------------------------------------------------------
# NuScenes camera name mapping
# ---------------------------------------------------------------------------
CAM_NAME_MAP = {
    "front_120": "CAM_FRONT",
    "front_left_100": "CAM_FRONT_LEFT",
    "front_right_100": "CAM_FRONT_RIGHT",
    "rear_100": "CAM_BACK",
    "rear_left_100": "CAM_BACK_LEFT",
    "rear_right_100": "CAM_BACK_RIGHT",
}

# ---------------------------------------------------------------------------
# Class mapping: private dataset labels -> NuScenes labels
# NOTE(review): "ignore" is mapped to "car" -- confirm this is intended
# rather than dropping those boxes from training.
# ---------------------------------------------------------------------------
PRIVATE2NUSCENES = {
    "car": "car",
    "pick-up-truck": "car",
    "small-truck": "truck",
    "truck": "truck",
    "bus": "bus",
    "ignore": "car",
    "special-vehicle": "construction_vehicle",
    "tricyclist-withrider": "bicycle",
    "tricyclist-withoutrider": "bicycle",
    "bicycle-withrider": "bicycle",
    "bicycle-withoutrider": "bicycle",
    "people": "pedestrian",
    "crowd-people": "pedestrian",
    "crowd-bicycle": "bicycle",
    "crowd-car": "car",
    "traffic-cone": "traffic_cone",
    "plastic-barrier": "barrier",
    "crash-barrels": "barrier",
    "warning-triangle": "barrier",
    "crowd-traffic-cone": "barrier",
    "crowd-plastic-barrier": "barrier",
    "crowd-crash-barrels": "barrier",
    "crowd-warning-triangle": "barrier",
}
# Official NuScenes detection classes (in canonical order)
DETECTION_CLASSES = [
    "car", "truck", "bus", "construction_vehicle", "motorcycle",
    "bicycle", "pedestrian", "traffic_cone", "barrier", "trailer",
]
CLASS2ID = {c: i for i, c in enumerate(DETECTION_CLASSES)}


# ===========================================================================
# Core conversion
# ===========================================================================

def frame_to_info(frame: FrameData, sample_id: str, out_lidar_dir: str,
                  out_img_dir: str, rel_lidar_root: str, rel_img_root: str) -> dict:
    """
    Convert one FrameData into a NuScenes-style info dict.

    Side effects: writes the point cloud as a .bin file under
    ``out_lidar_dir`` and a resized copy of each camera image under
    ``out_img_dir/<CAM_NAME>/``.

    Since we have no vehicle poses, we treat lidar = ego = global (identity).
    """
    token = str(uuid.uuid4())

    # ---- Save point cloud as .bin (float32 x, y, z) -----------------------
    os.makedirs(out_lidar_dir, exist_ok=True)
    bin_name = f"{sample_id}_frame{frame.frame_index:04d}.bin"
    bin_path = os.path.join(out_lidar_dir, bin_name)
    frame.lidar_points.astype(np.float32).tofile(bin_path)
    lidar_path_rel = os.path.join(rel_lidar_root, bin_name)

    # ---- Identity ego/global poses ----------------------------------------
    zero3 = [0.0, 0.0, 0.0]
    id_quat = identity_quaternion()

    # ---- Camera info ------------------------------------------------------
    cams_info = {}
    target_W, target_H = 1920, 1280  # every image is resized to this resolution

    for cam_key, nus_cam in CAM_NAME_MAP.items():
        # Skip cameras that have no calibration OR no image for this frame
        # (a missing image previously raised KeyError further down).
        if cam_key not in frame.camera_raw or cam_key not in frame.image_paths:
            continue
        cam_data = frame.camera_raw[cam_key]
        P = cam_data["projection_matrix"]       # 3x4, P = K @ [R | t]
        ori_W, ori_H = cam_data["image_size"]   # assumes [width, height] order -- TODO confirm

        # Decompose P into K (intrinsic), R (lidar->camera rotation), t.
        K, R_lidar2cam, t_lidar2cam = rq_decompose(P)

        # camera -> lidar ("sensor2lidar") is the inverse transform
        R_cam2lidar = R_lidar2cam.T
        t_cam2lidar = -R_cam2lidar @ t_lidar2cam

        # Rescale the intrinsics from the original to the target resolution.
        sx = target_W / ori_W
        sy = target_H / ori_H
        K_adjusted = K.copy()
        K_adjusted[0, 0] *= sx
        K_adjusted[0, 1] *= sx  # skew scales with x as well (was previously missed)
        K_adjusted[1, 1] *= sy
        K_adjusted[0, 2] *= sx
        K_adjusted[1, 2] *= sy

        # Resize and copy the image into the output tree.
        src_img = frame.image_paths[cam_key]
        cam_img_dir = os.path.join(out_img_dir, nus_cam)
        os.makedirs(cam_img_dir, exist_ok=True)
        img_name = f"{sample_id}_frame{frame.frame_index:04d}.jpg"
        dst_img = os.path.join(cam_img_dir, img_name)

        img = Image.open(src_img)
        img_resized = img.resize((target_W, target_H), Image.LANCZOS)
        img_resized.save(dst_img)
        img_path_rel = os.path.join(rel_img_root, nus_cam, img_name)

        cam_token = str(uuid.uuid4())
        cams_info[nus_cam] = {
            "data_path": img_path_rel,
            "type": nus_cam,
            "sample_data_token": cam_token,
            # camera -> ego (camera treated as fixed in the lidar/ego frame)
            "sensor2ego_translation": t_cam2lidar.tolist(),
            "sensor2ego_rotation": rotation_matrix_to_quaternion(R_cam2lidar),
            # ego -> global (identity)
            "ego2global_translation": zero3,
            "ego2global_rotation": id_quat,
            # camera -> lidar extrinsic (used later for projection)
            "sensor2lidar_translation": t_cam2lidar.tolist(),
            "sensor2lidar_rotation": rotation_matrix_to_quaternion(R_cam2lidar),
            # intrinsic
            "cam_intrinsic": K_adjusted.tolist(),
            "width": target_W,
            "height": target_H,
            "timestamp": frame.frame_index * 100000,  # synthetic timestamp
        }

    # ---- Annotations ------------------------------------------------------
    instances = []
    for lb in frame.labels:
        raw_cls = lb.get("class", "")
        cls = PRIVATE2NUSCENES.get(raw_cls)  # remap to a NuScenes class name
        if cls is None or cls not in CLASS2ID:
            continue
        cx, cy, cz = lb["center"]
        dx, dy, dz = lb["dx"], lb["dy"], lb["dz"]
        yaw = lb["rotateZ"]
        # number of LiDAR points inside the box, used as a validity signal
        n_pts = count_points_in_box(frame.lidar_points, cx, cy, cz, dx, dy, dz, yaw)

        instances.append({
            "bbox_3d": [cx, cy, cz, dx, dy, dz, yaw],
            "bbox_label_3d": CLASS2ID[cls],
            "bbox": [0.0, 0.0, 1.0, 1.0],  # no 2D GT available
            "bbox_label": CLASS2ID[cls],
            "velocity": [float("nan"), float("nan")],  # no odometry -> unknown
            "num_lidar_pts": n_pts,
            "bbox_3d_isvalid": n_pts > 0,
        })

    info = {
        "lidar_points": {
            "lidar_path": lidar_path_rel,
            "num_pts_feats": 3,  # X, Y, Z only (no intensity channel)
        },
        "token": token,
        "sweeps": [],  # no sweep data available
        "cams": cams_info,
        # lidar == ego (identity)
        "lidar2ego_translation": zero3,
        "lidar2ego_rotation": id_quat,
        # ego == global (identity)
        "ego2global_translation": zero3,
        "ego2global_rotation": id_quat,
        "timestamp": frame.frame_index * 100000,
        # annotations
        "instances": instances,
    }
    return info
def build_coco_json(infos: List[dict]) -> dict:
    """Build a COCO-style mono3D JSON (categories/images/annotations) from
    the NuScenes-style info dicts produced by ``frame_to_info``.

    NOTE(review): velocities may be NaN, which ``json.dump`` serializes as a
    non-standard ``NaN`` token -- confirm downstream parsers accept it.
    """

    def quat_to_rot(q):
        # Unit quaternion [w, x, y, z] -> 3x3 rotation matrix.
        w, x, y, z = q
        return np.array([
            [1 - 2 * (y * y + z * z), 2 * (x * y - z * w), 2 * (x * z + y * w)],
            [2 * (x * y + z * w), 1 - 2 * (x * x + z * z), 2 * (y * z - x * w)],
            [2 * (x * z - y * w), 2 * (y * z + x * w), 1 - 2 * (x * x + y * y)],
        ])

    categories = [{"id": i, "name": name} for i, name in enumerate(DETECTION_CLASSES)]
    images: List[dict] = []
    annotations: List[dict] = []
    next_ann_id = 0

    for info in infos:
        for cam_name, cam in info["cams"].items():
            W = cam["width"]
            H = cam["height"]
            K = np.array(cam["cam_intrinsic"])  # 3x3

            images.append({
                "file_name": cam["data_path"],
                "id": cam["sample_data_token"],
                "token": info["token"],
                "cam2ego_rotation": cam["sensor2ego_rotation"],
                "cam2ego_translation": cam["sensor2ego_translation"],
                "ego2global_rotation": info["ego2global_rotation"],
                "ego2global_translation": info["ego2global_translation"],
                "cam_intrinsic": cam["cam_intrinsic"],
                "width": W,
                "height": H,
            })

            # lidar -> camera extrinsics, inverted from the stored camera -> lidar
            R_l2c = quat_to_rot(cam["sensor2lidar_rotation"]).T
            t_l2c = -R_l2c @ np.array(cam["sensor2lidar_translation"])

            for box in info.get("instances", []):
                label = box["bbox_label_3d"]
                if not 0 <= label < len(DETECTION_CLASSES):
                    continue

                cx_l, cy_l, cz_l, dx, dy, dz, yaw_l = box["bbox_3d"]

                # Transform the box center into the camera frame.
                center_c = R_l2c @ np.array([cx_l, cy_l, cz_l]) + t_l2c
                depth = center_c[2]
                if depth <= 0:
                    continue  # box behind the camera

                # Project the 3D center to pixel coordinates.
                proj = K @ center_c
                u = proj[0] / proj[2]
                v = proj[1] / proj[2]

                # 2D bbox from the projected corners in front of the camera.
                corners_l = _box_corners(cx_l, cy_l, cz_l, dx, dy, dz, yaw_l)
                corners_c = (R_l2c @ corners_l.T).T + t_l2c
                corners_c = corners_c[corners_c[:, 2] > 0]
                if len(corners_c) == 0:
                    continue
                proj_h = (K @ corners_c.T).T
                uvs = proj_h[:, :2] / proj_h[:, 2:3]
                x1 = max(0, min(uvs[:, 0].min(), W))
                x2 = max(0, min(uvs[:, 0].max(), W))
                y1 = max(0, min(uvs[:, 1].min(), H))
                y2 = max(0, min(uvs[:, 1].max(), H))
                bw, bh = x2 - x1, y2 - y1
                if bw <= 0 or bh <= 0:
                    continue

                vx, vy = box.get("velocity", [float("nan"), float("nan")])
                annotations.append({
                    "file_name": cam["data_path"],
                    "image_id": cam["sample_data_token"],
                    "area": float(bw * bh),
                    "category_name": DETECTION_CLASSES[label],
                    "category_id": label,
                    "bbox": [float(x1), float(y1), float(bw), float(bh)],
                    "iscrowd": 0,
                    # [cx_cam, cy_cam, cz_cam, dx, dy, dz, yaw]; yaw kept as the
                    # lidar-frame value (approx: boxes only rotate about lidar Z)
                    "bbox_cam3d": [float(center_c[0]), float(center_c[1]),
                                   float(center_c[2]), float(dx), float(dy),
                                   float(dz), float(yaw_l)],
                    "velo_cam3d": [vx, vy],
                    "center2d": [float(u), float(v), float(depth)],
                    "attribute_name": "",
                    "attribute_id": -1,
                    "id": next_ann_id,
                })
                next_ann_id += 1

    return {"categories": categories, "images": images, "annotations": annotations}
"attribute_name": "", + "attribute_id": -1, + "id": ann_id, + } + annotations.append(ann) + ann_id += 1 + + return {"categories": categories, "images": images, "annotations": annotations} + + +def _box_corners(cx, cy, cz, dx, dy, dz, yaw) -> np.ndarray: + """Return 8 corners of a 3D box as (8,3) array.""" + hx, hy, hz = dx / 2, dy / 2, dz / 2 + corners = np.array([ + [ hx, hy, hz], [ hx, hy, -hz], [ hx, -hy, hz], [ hx, -hy, -hz], + [-hx, hy, hz], [-hx, hy, -hz], [-hx, -hy, hz], [-hx, -hy, -hz], + ], dtype=np.float32) + c, s = np.cos(yaw), np.sin(yaw) + Rz = np.array([[c, -s, 0], [s, c, 0], [0, 0, 1]], dtype=np.float32) + corners = (Rz @ corners.T).T + np.array([cx, cy, cz], dtype=np.float32) + return corners + + +# =========================================================================== +# Main +# =========================================================================== + +def main(): + parser = argparse.ArgumentParser(description="Convert private dataset to NuScenes-style PKL/JSON.") + parser.add_argument("--root", default="sample", help="Root folder of your dataset") + parser.add_argument("--out", default="my_dataset", help="Output directory") + parser.add_argument("--split", type=float, default=0.8, help="Train fraction (0-1)") + parser.add_argument("--tag", default="mydata", help="Output file prefix") + args = parser.parse_args() + + out_root = Path(args.out) + lidar_dir = out_root / "samples" / "LIDAR_TOP" + img_dir = out_root / "samples" + rel_lidar = "samples/LIDAR_TOP" + rel_img = "samples" + + print("Loading dataset ...") + ds = MyDataset(args.root) + + # Collect all (sample_id, frame_index) pairs + all_frames = [] + for sid in sorted(ds.sample_ids): + sample = ds.get_sample(sid) + for fidx in sorted(sample.frames.keys()): + all_frames.append((sid, fidx, sample.frames[fidx])) + + print(f"Total frames: {len(all_frames)}") + n_train = int(len(all_frames) * args.split) + train_frames = all_frames[:n_train] + val_frames = all_frames[n_train:] + + def 
convert_split(frames, split_name): + infos = [] + for sid, fidx, frame in tqdm(frames, desc=f"Converting {split_name}"): + info = frame_to_info( + frame, sid, + out_lidar_dir=str(lidar_dir), + out_img_dir=str(img_dir), + rel_lidar_root=rel_lidar, + rel_img_root=rel_img, + ) + infos.append(info) + + pkl_path = out_root / f"{args.tag}_infos_{split_name}.pkl" + json_path = out_root / f"{args.tag}_infos_{split_name}_mono3d.coco.json" + + with open(pkl_path, "wb") as f: + pickle.dump({ + "metainfo": { + "version": "custom-v1.0", + "classes": DETECTION_CLASSES, + }, + "data_list": infos, + }, f) + print(f"Saved {pkl_path} ({len(infos)} frames)") + + coco = build_coco_json(infos) + with open(json_path, "w", encoding="utf-8") as f: + json.dump(coco, f, ensure_ascii=False, indent=2) + print(f"Saved {json_path} ({len(coco['images'])} images, {len(coco['annotations'])} anns)") + + convert_split(train_frames, "train") + convert_split(val_frames, "val") + + # Write class list for reference + classes_path = out_root / f"{args.tag}_classes.txt" + classes_path.write_text("\n".join(DETECTION_CLASSES)) + print(f"\nDone. 
if __name__ == "__main__":
    main()


# ===========================================================================
# my_dataset.py -- standalone loader module for the private dataset
# ===========================================================================
import os
import glob
import json
import numpy as np
from typing import Dict, List, Optional, Union
from dataclasses import dataclass, field

# NOTE: open3d and tqdm are imported lazily inside the methods that need
# them, so the module can be imported (e.g. for the pure label-parsing
# helpers) without those heavy optional dependencies installed.


@dataclass
class FrameData:
    """Raw data of a single frame, exactly as read from disk (no transforms)."""
    frame_index: int                 # 0-based frame number
    lidar_points: np.ndarray         # (N, 3) float32 raw point-cloud coordinates
    image_paths: Dict[str, str]      # camera name -> image path
    camera_raw: Dict[str, dict]      # camera name -> {"projection_matrix": 3x4 list,
                                     #                 "image_size": [width, height]}
    labels: List[dict]               # raw 3D boxes: num, color, class, points (8 xyz),
                                     # center [x, y, z], rotateZ (radians, CCW from
                                     # +X towards +Y), dx, dy, dz
    contain_labels: bool = False     # True when the frame has annotation boxes


@dataclass
class SampleData:
    """All frames contained in one sample folder."""
    sample_id: str                   # numeric id string
    frames: Dict[int, FrameData]     # frame index -> FrameData


# Fix note: the class docstring below used to be an f-string, so "{id}"
# interpolated the builtin `id` function and the class had no __doc__;
# it is now a plain docstring.
class MyDataset:
    """
    Private dataset loader. World/LiDAR frame: +Y forward, +X right, +Z up.

    Expected layout::

        root_folder/
            sample{id}/
                pcd_sequence{id}/    # point-cloud frames
                0/, 1/, ...          # per-frame images
                test{id}.json        # camera parameters
                test{id}-mark.json   # 3D annotations
    """

    def __init__(self, root_folder: str):
        self.root = root_folder
        self._samples: Dict[str, SampleData] = {}

        # annotation color code -> class name
        self.color2class = {
            "#5414ED": "car",
            "#F6EE64": "pick-up-truck",
            "#F6A087": "small-truck",
            "#BC4EF1": "truck",
            "#4E9AF1": "bus",
            "#F1A94E": "special-vehicle",
            "#E1DFDD": "ignore",
            "#F91906": "tricyclist-withrider",
            "#FA5F51": "tricyclist-withoutrider",
            "#B8CB30": "bicycle-withrider",
            "#E6FD4E": "bicycle-withoutrider",
            "#876363": "people",
            "#2CBDF5": "crowd-people",
            "#C9F52C": "crowd-bicycle",
            "#DC6788": "crowd-car",
            "#6EC913": "traffic-cone",
            "#0DDE69": "plastic-barrier",
            "#8260D2": "crash-barrels",
            "#F1D1D1": "warning-triangle",
            "#FE6DF4": "crowd-traffic-cone",
            "#D1AA35": "crowd-plastic-barrier",
            "#3BE8D0": "crowd-crash-barrels",
            "#2B7567": "crowd-warning-triangle"
        }
        self.class2color = {cls: color for color, cls in self.color2class.items()}

        # camera name -> image file name inside each per-frame folder
        self.cam2filename = {
            "front_120": "scanofilm_surround_front_120_8M.jpg",
            "front_left_100": "scanofilm_surround_front_left_100_2M.jpg",
            "front_right_100": "scanofilm_surround_front_right_100_2M.jpg",
            "rear_100": "scanofilm_surround_rear_100_2M.jpg",
            "rear_left_100": "scanofilm_surround_rear_left_100_2M.jpg",
            "rear_right_100": "scanofilm_surround_rear_right_100_2M.jpg",
        }

        self._load_all_samples()

    # ------------------------------------------------------------------
    # Path and id helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _extract_id(folder_name: str) -> str:
        """Extract the digits of the folder basename as the sample id."""
        return ''.join(c for c in os.path.basename(folder_name) if c.isdigit())

    def _find_sample_folders(self) -> List[str]:
        """Return all sample{id} folder paths under the dataset root."""
        pattern = os.path.join(self.root, "sample*")
        return [f for f in glob.glob(pattern) if os.path.isdir(f)]

    def _find_paths_in_sample(self, sample_folder: str, sample_id: str) -> dict:
        """Return a dict of the resource paths inside one sample folder."""
        pcd_folder = os.path.join(sample_folder, f"pcd_sequence{sample_id}")
        img_folders = []
        if os.path.isdir(pcd_folder):
            pcd_files = glob.glob(os.path.join(pcd_folder, "*.pcd"))
            num_frames = len(pcd_files)
            # image folders are named 0, 1, ..., one per point-cloud frame
            img_folders = [os.path.join(sample_folder, str(i)) for i in range(num_frames)]
        page_json = os.path.join(sample_folder, f"test{sample_id}.json")
        mark_json = os.path.join(sample_folder, f"test{sample_id}-mark.json")
        return {
            "pcd_folder": pcd_folder,
            "img_folders": img_folders,
            "page_json": page_json,
            "mark_json": mark_json
        }

    # ------------------------------------------------------------------
    # Raw data reading (no transforms)
    # ------------------------------------------------------------------
    def _read_lidar_points(self, pcd_path: str) -> Optional[np.ndarray]:
        """Read one .pcd file; returns an (N, 3) float32 array or None on failure."""
        try:
            # Lazy import so the module loads without open3d installed.
            import open3d as o3d
            # Pass the path as str: open3d's reader expects a str filename.
            # (Encoding to UTF-8 bytes, as was done before, is rejected by
            # the open3d API and does not improve non-ASCII path handling.)
            pcd = o3d.io.read_point_cloud(
                pcd_path,
                remove_nan_points=True,
                remove_infinite_points=True,
                format="auto"
            )
            return np.asarray(pcd.points, dtype=np.float32)  # [N, 3]
        except Exception as e:
            # Best effort: report and skip unreadable point clouds.
            print(f"Error loading point cloud {pcd_path}: {e}")
            return None

    def _load_all_lidar_frames(self, pcd_folder: str) -> Dict[int, np.ndarray]:
        """Return frame index -> (N, 3) points for every readable .pcd file."""
        idx2pcd_np = {}
        for pcd_file in glob.glob(os.path.join(pcd_folder, "*.pcd")):
            # The file stem is expected to end with "...n<index>".
            idx = int(os.path.basename(pcd_file).split(".")[0].split("n")[-1])
            pcd_np = self._read_lidar_points(pcd_file)
            if pcd_np is not None:
                idx2pcd_np[idx] = pcd_np
        return idx2pcd_np

    def _load_image_paths(self, img_folders: List[str]) -> Dict[int, Dict[str, str]]:
        """Return frame index -> {camera name: image path}.

        Raises FileNotFoundError when an expected image is missing (was an
        ``assert`` before, which is silently stripped under ``python -O``).
        """
        idx2imgs = {}
        for folder in img_folders:
            if not os.path.isdir(folder):
                continue
            idx = int(os.path.basename(folder))
            paths = {}
            for cam, fname in self.cam2filename.items():
                full_path = os.path.join(folder, fname)
                if not os.path.isfile(full_path):
                    raise FileNotFoundError(f"Image file not found: {full_path}")
                paths[cam] = full_path
            if paths:
                idx2imgs[idx] = paths
        return idx2imgs

    def _load_camera_raw(self, page_json_path: str) -> Dict[int, Dict[str, dict]]:
        """Return frame index -> {camera name: {"projection_matrix": ..., "image_size": ...}}."""
        if not os.path.isfile(page_json_path):
            return {}
        with open(page_json_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        try:
            extend_source = data['data']['files'][0]['extendSources']
        except (KeyError, IndexError):
            return {}

        idx2cam = {}
        for d in extend_source:
            page_element = d.get('pageElement', {})
            if "sensor" not in page_element:
                continue
            # Frame index comes from the file name; skip malformed entries.
            try:
                idx = int(d['fileName'].split('.')[0])
            except Exception:  # was a bare except: also caught KeyboardInterrupt
                continue
            sensor = page_element['sensor']

            # e.g. "ofilm_surround_front_120_8M" -> "front_120"
            cam_name = sensor.replace("ofilm_surround_", "")
            for suffix in ["_2M", "_8M"]:
                if cam_name.endswith(suffix):
                    cam_name = cam_name[:-len(suffix)]
                    break

            if cam_name not in self.cam2filename:
                continue

            matrix = page_element.get('mtx', [])
            image_size = page_element.get('imageSize', [])
            if not matrix or not image_size:
                continue

            matrix = np.array(matrix, dtype=np.float64)
            if matrix.shape != (3, 4):
                # explicit validation instead of assert (stripped under -O)
                raise ValueError(f"Invalid projection matrix shape: {matrix.shape}")
            cam_info = {
                "projection_matrix": matrix,
                "image_size": image_size
            }
            idx2cam.setdefault(idx, {})[cam_name] = cam_info
        return idx2cam

    def extract_text_label_from_json(self, json_path: str) -> List[Dict]:
        """Return the annotation list whose vote accuracy is 100%, else []."""
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)['data']['list'][0]

        idx2rate = {}
        for key, value in data['rate'].items():
            idx2rate[key] = float(value['accuracy'].replace('%', '')) / 100.0

        # Find the result index with the highest accuracy.
        max_idx = max(idx2rate, key=idx2rate.get)

        # Only fully agreed-upon annotation results are trusted.
        if idx2rate[max_idx] == 1.0:
            return data['result'][max_idx]
        return []

    def process_text_label(self, json_path: str) -> Dict[int, List[Dict]]:
        """Return frame index -> normalized label dicts ({} when agreement < 100%)."""
        label_list = self.extract_text_label_from_json(json_path)
        if len(label_list) == 0:
            return {}

        idx2label: Dict[int, List[Dict]] = {}
        for label in label_list:
            idx2label[label['index']] = self.process_label_for_one_frame(label['value'])
        return idx2label

    def process_label_for_one_frame(self, label_list: List[Dict]) -> List[Dict]:
        """Normalize the raw annotation dicts of one frame.

        A frame may contain multiple 3D bounding boxes. Raises ValueError
        when a box does not come with exactly 8 corner points.
        """
        if len(label_list) == 0:
            return []

        labels = []
        for label in label_list:
            new_label = {}

            # id of the 3D bounding box
            new_label['num'] = label['num']
            # keep the raw label payload for reference
            new_label['label'] = label['label']
            new_label['class'] = label['label']['class-name']
            new_label['is_moving'] = not bool(label['label']['static'])
            # isolation: True = separated by buildings/guardrails/green belts,
            # False = inside the drivable area
            new_label['isolation'] = bool(label['label']['isolation'])
            new_label['color'] = self.class2color[new_label['class']]

            # 8 corner points (explicit validation instead of assert)
            if len(label['newPoints']) != 8:
                raise ValueError(
                    f"Points number for one 3D bounding box is 8, but got {len(label['newPoints'])}."
                )
            new_label['points'] = [
                [
                    float(label['newPoints'][i]['x']),
                    float(label['newPoints'][i]['y']),
                    float(label['newPoints'][i]['z'])
                ]
                for i in range(8)
            ]

            new_label['center'] = [
                float(label['x']),
                float(label['y']),
                float(label['z'])
            ]

            # Only the rotation about Z is kept. Unit: radians,
            # counter-clockwise from +X towards +Y.
            new_label['rotateZ'] = float(label['rotateZ'])

            new_label['dx'] = float(label['width'])
            new_label['dy'] = float(label['height'])
            new_label['dz'] = float(label['depth'])

            labels.append(new_label)
        return labels

    # ------------------------------------------------------------------
    # Sample-level assembly
    # ------------------------------------------------------------------
    def _load_one_sample(self, sample_folder: str) -> Optional[SampleData]:
        """Load one sample folder into a SampleData; None when modalities disagree."""
        sample_id = self._extract_id(sample_folder)
        paths = self._find_paths_in_sample(sample_folder, sample_id)

        # Read the raw data of every modality.
        lidar_dict = self._load_all_lidar_frames(paths["pcd_folder"])
        img_dict = self._load_image_paths(paths["img_folders"])
        cam_dict = self._load_camera_raw(paths["page_json"])
        label_dict = self.process_text_label(paths["mark_json"])
        if not (len(label_dict) == len(lidar_dict) == len(img_dict) == len(cam_dict)):
            print(f"Sample {sample_id}: Mismatch in frame counts between lidar, images, cameras, and labels.")
            return None

        # Iterate the actual lidar indices instead of assuming 0..N-1 contiguity.
        frames = {}
        for idx in sorted(lidar_dict):
            frames[idx] = FrameData(
                frame_index=idx,
                lidar_points=lidar_dict[idx],
                image_paths=img_dict[idx],
                camera_raw=cam_dict[idx],
                labels=label_dict[idx],
                contain_labels=len(label_dict[idx]) > 0
            )
        if not frames:
            return None
        return SampleData(sample_id=sample_id, frames=frames)

    def _load_all_samples(self):
        """Walk every sample folder under root and load it."""
        from tqdm import tqdm  # lazy: progress bar only needed when loading
        sample_folders = self._find_sample_folders()
        for sf in tqdm(sample_folders, desc="Loading samples"):
            sample = self._load_one_sample(sf)
            if sample is not None:
                self._samples[sample.sample_id] = sample

        print(f"Loaded {len(self._samples)} samples.")

    # ------------------------------------------------------------------
    # Public interface
    # ------------------------------------------------------------------
    @property
    def sample_ids(self) -> List[str]:
        return list(self._samples.keys())

    def get_sample(self, sample_id: str) -> Optional[SampleData]:
        return self._samples.get(sample_id)

    def __len__(self) -> int:
        return len(self._samples)

    def __getitem__(self, idx: int) -> SampleData:
        """Access a sample by integer index (samples ordered by sample_id)."""
        sids = sorted(self.sample_ids)
        if idx < 0 or idx >= len(sids):
            raise IndexError(f"Index {idx} out of range [0, {len(sids)-1}]")
        return self._samples[sids[idx]]


if __name__ == "__main__":
    dataset = MyDataset(root_folder="sample")