diff --git a/convert_to_nuscenes.py b/convert_to_nuscenes.py deleted file mode 100644 index 4bd579e..0000000 --- a/convert_to_nuscenes.py +++ /dev/null @@ -1,730 +0,0 @@ -""" -Convert private dataset to NuScenes-style .pkl and .coco.json files -compatible with MMDetection3D. - -Coordinate system note ----------------------- -Your dataset: X-right, Y-forward, Z-up (same as NuScenes LiDAR frame) - -> No rotation is needed for the point cloud. - -> lidar == ego == global (identity transforms, no vehicle odometry available) - -Projection matrix note ----------------------- -Your camera_raw stores a 3×4 matrix P = K @ [R | t] -where K is the 3×3 intrinsic and [R|t] maps points from the LiDAR frame -into the camera frame. -We decompose P via RQ-decomposition to recover K (intrinsic) and [R|t] -(lidar-to-camera extrinsic). - -Usage ------ -python convert_to_nuscenes.py \ - --root /path/to/your/dataset \ - --out /path/to/output/dir \ - --split 0.8 # fraction used for training (rest = val) - --tag mydata # prefix for output file names -""" - -import os -import sys -import json -import uuid -import pickle -import argparse -import numpy as np -from pathlib import Path -from PIL import Image -from typing import Dict, List, Tuple, Optional - -# --------------------------------------------------------------------------- -# Inline minimal copy of the dataset loader so the script is self-contained. -# If you already have my_dataset.py on PYTHONPATH you can remove this block -# and simply do: from my_dataset import MyDataset, SampleData, FrameData -# --------------------------------------------------------------------------- -import glob -import open3d as o3d -from dataclasses import dataclass, field -from tqdm import tqdm - - -@dataclass -class FrameData: - frame_index: int - lidar_points: np.ndarray # (N,3) float32 - image_paths: Dict[str, str] - camera_raw: Dict[str, dict] # cam_name -> {projection_matrix, image_size} - labels: List[dict] - contain_labels: bool = False - - -@dataclass -class SampleData: - sample_id: str - frames: Dict[int, FrameData] - - -class MyDataset: - cam2filename = { - "front_120": "scanofilm_surround_front_120_8M.jpg", - "front_left_100": "scanofilm_surround_front_left_100_2M.jpg", - "front_right_100": "scanofilm_surround_front_right_100_2M.jpg", - "rear_100": "scanofilm_surround_rear_100_2M.jpg", - "rear_left_100": "scanofilm_surround_rear_left_100_2M.jpg", - "rear_right_100": "scanofilm_surround_rear_right_100_2M.jpg", - } - color2class = { - "#5414ED": "car", "#F6EE64": "pick-up-truck", "#F6A087": "small-truck", - "#BC4EF1": "truck", "#4E9AF1": "bus", "#F1A94E": "special-vehicle", - "#E1DFDD": "ignore", "#F91906": "tricyclist-withrider", - "#FA5F51": "tricyclist-withoutrider", "#B8CB30": "bicycle-withrider", - "#E6FD4E": "bicycle-withoutrider", "#876363": "people", - "#2CBDF5": "crowd-people", "#C9F52C": "crowd-bicycle", - "#DC6788": "crowd-car", "#6EC913": "traffic-cone", - "#0DDE69": "plastic-barrier", "#8260D2": "crash-barrels", - "#F1D1D1": "warning-triangle", "#FE6DF4": "crowd-traffic-cone", - "#D1AA35": "crowd-plastic-barrier", "#3BE8D0": "crowd-crash-barrels", - "#2B7567": "crowd-warning-triangle", - } - - def __init__(self, root_folder: str): - self.root = root_folder - self.class2color = {v: k for k, v in self.color2class.items()} - self._samples: Dict[str, SampleData] = {} - self._load_all_samples() - - @staticmethod - def _extract_id(folder_name: str) -> str: - return ''.join(c for c in os.path.basename(folder_name) if c.isdigit()) - - def _find_sample_folders(self): - return [f for f in glob.glob(os.path.join(self.root, "sample*")) - if os.path.isdir(f)] - - def _find_paths_in_sample(self, sample_folder, sample_id): - pcd_folder = os.path.join(sample_folder, f"pcd_sequence{sample_id}") - pcd_files = glob.glob(os.path.join(pcd_folder, "*.pcd")) if os.path.isdir(pcd_folder) else [] - img_folders = [os.path.join(sample_folder, str(i)) for i in range(len(pcd_files))] - return { - "pcd_folder": pcd_folder, - "img_folders": img_folders, - "page_json": os.path.join(sample_folder, f"test{sample_id}.json"), - "mark_json": os.path.join(sample_folder, f"test{sample_id}-mark.json"), - } - - def _read_lidar_points(self, pcd_path): - try: - pcd = o3d.io.read_point_cloud(pcd_path, remove_nan_points=True, - remove_infinite_points=True, format="auto") - return np.asarray(pcd.points, dtype=np.float32) - except Exception as e: - print(f"Error loading {pcd_path}: {e}") - return None - - def _load_all_lidar_frames(self, pcd_folder): - out = {} - for f in glob.glob(os.path.join(pcd_folder, "*.pcd")): - idx = int(os.path.basename(f).split(".")[0].split("n")[-1]) - pts = self._read_lidar_points(f) - if pts is not None: - out[idx] = pts - return out - - def _load_image_paths(self, img_folders): - out = {} - for folder in img_folders: - if not os.path.isdir(folder): - continue - idx = int(os.path.basename(folder)) - paths = {cam: os.path.join(folder, fname) - for cam, fname in self.cam2filename.items() - if os.path.isfile(os.path.join(folder, fname))} - if paths: - out[idx] = paths - return out - - def _load_camera_raw(self, page_json_path): - if not os.path.isfile(page_json_path): - return {} - with open(page_json_path, "r", encoding="utf-8") as f: - data = json.load(f) - try: - extend_source = data['data']['files'][0]['extendSources'] - except (KeyError, IndexError): - return {} - out = {} - for d in extend_source: - pe = d.get('pageElement', {}) - if "sensor" not in pe: - continue - try: - idx = int(d['fileName'].split('.')[0]) - except Exception: - continue - cam_name = pe['sensor'].replace("ofilm_surround_", "") - for suf in ["_2M", "_8M"]: - if cam_name.endswith(suf): - cam_name = cam_name[:-len(suf)] - break - if cam_name not in self.cam2filename: - continue - matrix = pe.get('mtx', []) - image_size = pe.get('imageSize', []) - if not matrix or not image_size: - continue - matrix = np.array(matrix, dtype=np.float64) - if matrix.shape != (3, 4): - continue - out.setdefault(idx, {})[cam_name] = { - "projection_matrix": matrix, - "image_size": image_size, - } - return out - - def _extract_text_label(self, json_path): - with open(json_path, 'r', encoding='utf-8') as f: - data = json.load(f)['data']['list'][0] - idx2rate = {k: float(v['accuracy'].replace('%', '')) / 100.0 - for k, v in data['rate'].items()} - max_idx = max(idx2rate, key=idx2rate.get) - if idx2rate[max_idx] == 1.0: - return data['result'][max_idx] - return [] - - def _process_text_label(self, json_path): - label_list = self._extract_text_label(json_path) - if not label_list: - return {} - out = {} - for entry in label_list: - out[entry['index']] = self._process_frame_labels(entry['value']) - return out - - def _process_frame_labels(self, label_list): - labels = [] - for lb in label_list: - nl = { - 'num': lb['num'], - 'class': lb['label']['class-name'], - 'is_moving': not bool(lb['label']['static']), - 'isolation': bool(lb['label']['isolation']), - 'color': self.class2color.get(lb['label']['class-name'], ''), - 'points': [[float(lb['newPoints'][i]['x']), - float(lb['newPoints'][i]['y']), - float(lb['newPoints'][i]['z'])] for i in range(8)], - 'center': [float(lb['x']), float(lb['y']), float(lb['z'])], - 'rotateZ': float(lb['rotateZ']), - 'dx': float(lb['width']), - 'dy': float(lb['height']), - 'dz': float(lb['depth']), - } - labels.append(nl) - return labels - - def _load_one_sample(self, sample_folder): - sid = self._extract_id(sample_folder) - paths = self._find_paths_in_sample(sample_folder, sid) - lidar_dict = self._load_all_lidar_frames(paths["pcd_folder"]) - img_dict = self._load_image_paths(paths["img_folders"]) - cam_dict = self._load_camera_raw(paths["page_json"]) - label_dict = self._process_text_label(paths["mark_json"]) - if not (len(label_dict) == len(lidar_dict) == len(img_dict) == len(cam_dict)): - print(f"Sample {sid}: frame count mismatch, skipping.") - return None - frames = {idx: FrameData( - frame_index=idx, - lidar_points=lidar_dict[idx], - image_paths=img_dict[idx], - camera_raw=cam_dict[idx], - labels=label_dict[idx], - contain_labels=len(label_dict[idx]) > 0, - ) for idx in range(len(lidar_dict))} - return SampleData(sample_id=sid, frames=frames) if frames else None - - def _load_all_samples(self): - for sf in tqdm(self._find_sample_folders(), desc="Loading samples"): - s = self._load_one_sample(sf) - if s is not None: - self._samples[s.sample_id] = s - print(f"Loaded {len(self._samples)} samples.") - - @property - def sample_ids(self): - return list(self._samples.keys()) - - def get_sample(self, sid): - return self._samples.get(sid) - - def __len__(self): - return len(self._samples) - - -# =========================================================================== -# Helper math utilities -# =========================================================================== - -def rq_decompose(P: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: - """ - Decompose a 3×4 projection matrix P = K @ [R | t] - Returns K (3×3 upper-triangular intrinsic), R (3×3 rotation), t (3,). - """ - M = P[:, :3] # 3×3 left block - # RQ decomposition via QR on the transpose - Q, R = np.linalg.qr(np.linalg.inv(M).T) - R_mat = np.linalg.inv(R.T) # upper triangular -> intrinsic - Q_mat = Q.T # rotation - - # Enforce positive diagonal on K - signs = np.sign(np.diag(R_mat)) - signs[signs == 0] = 1.0 - D = np.diag(signs) - R_mat = D @ R_mat - Q_mat = D @ Q_mat - - # Recover t - t = np.linalg.inv(R_mat) @ P[:, 3] - - return R_mat, Q_mat, t - - -def rotation_matrix_to_quaternion(R: np.ndarray) -> List[float]: - """Convert 3×3 rotation matrix to quaternion [w, x, y, z].""" - trace = R[0, 0] + R[1, 1] + R[2, 2] - if trace > 0: - s = 0.5 / np.sqrt(trace + 1.0) - w = 0.25 / s - x = (R[2, 1] - R[1, 2]) * s - y = (R[0, 2] - R[2, 0]) * s - z = (R[1, 0] - R[0, 1]) * s - elif R[0, 0] > R[1, 1] and R[0, 0] > R[2, 2]: - s = 2.0 * np.sqrt(1.0 + R[0, 0] - R[1, 1] - R[2, 2]) - w = (R[2, 1] - R[1, 2]) / s - x = 0.25 * s - y = (R[0, 1] + R[1, 0]) / s - z = (R[0, 2] + R[2, 0]) / s - elif R[1, 1] > R[2, 2]: - s = 2.0 * np.sqrt(1.0 + R[1, 1] - R[0, 0] - R[2, 2]) - w = (R[0, 2] - R[2, 0]) / s - x = (R[0, 1] + R[1, 0]) / s - y = 0.25 * s - z = (R[1, 2] + R[2, 1]) / s - else: - s = 2.0 * np.sqrt(1.0 + R[2, 2] - R[0, 0] - R[1, 1]) - w = (R[1, 0] - R[0, 1]) / s - x = (R[0, 2] + R[2, 0]) / s - y = (R[1, 2] + R[2, 1]) / s - z = 0.25 * s - return [w, x, y, z] - - -def identity_quaternion() -> List[float]: - return [1.0, 0.0, 0.0, 0.0] - - -def project_3d_to_2d(points_3d: np.ndarray, P: np.ndarray) -> np.ndarray: - """ - Project Nx3 LiDAR points to image using 3×4 P. - Returns Nx2 pixel coordinates; filters out points behind camera. - """ - N = points_3d.shape[0] - pts_h = np.hstack([points_3d, np.ones((N, 1))]) # Nx4 - proj = (P @ pts_h.T).T # Nx3 - valid = proj[:, 2] > 0 - uv = proj[valid, :2] / proj[valid, 2:3] - return uv, valid - - -def count_points_in_box(points: np.ndarray, cx, cy, cz, dx, dy, dz, yaw) -> int: - """Count LiDAR points inside an axis-aligned-then-rotated bounding box.""" - cos_y, sin_y = np.cos(-yaw), np.sin(-yaw) - R_inv = np.array([[cos_y, -sin_y, 0], - [sin_y, cos_y, 0], - [0, 0, 1]], dtype=np.float32) - pts_centered = points - np.array([cx, cy, cz], dtype=np.float32) - pts_rot = (R_inv @ pts_centered.T).T - mask = ((np.abs(pts_rot[:, 0]) <= dx / 2) & - (np.abs(pts_rot[:, 1]) <= dy / 2) & - (np.abs(pts_rot[:, 2]) <= dz / 2)) - return int(mask.sum()) - - -# --------------------------------------------------------------------------- -# NuScenes camera name mapping -# --------------------------------------------------------------------------- -CAM_NAME_MAP = { - "front_120": "CAM_FRONT", - "front_left_100": "CAM_FRONT_LEFT", - "front_right_100": "CAM_FRONT_RIGHT", - "rear_100": "CAM_BACK", - "rear_left_100": "CAM_BACK_LEFT", - "rear_right_100": "CAM_BACK_RIGHT", -} - -# --------------------------------------------------------------------------- -# Class mapping: private dataset labels -> NuScenes labels -# --------------------------------------------------------------------------- -PRIVATE2NUSCENES = { - "car": "car", - "pick-up-truck": "car", - "small-truck": "truck", - "truck": "truck", - "bus": "bus", - "ignore": "car", - "special-vehicle": "construction_vehicle", - "tricyclist-withrider": "bicycle", - "tricyclist-withoutrider":"bicycle", - "bicycle-withrider": "bicycle", - "bicycle-withoutrider": "bicycle", - "people": "pedestrian", - "crowd-people": "pedestrian", - "crowd-bicycle": "bicycle", - "crowd-car": "car", - "traffic-cone": "traffic_cone", - "plastic-barrier": "barrier", - "crash-barrels": "barrier", - "warning-triangle": "barrier", - "crowd-traffic-cone": "barrier", - "crowd-plastic-barrier": "barrier", - "crowd-crash-barrels": "barrier", - "crowd-warning-triangle": "barrier", -} - -# Official NuScenes detection classes (in canonical order) -DETECTION_CLASSES = [ - "car", "truck", "bus", "construction_vehicle", "motorcycle", - "bicycle", "pedestrian", "traffic_cone", "barrier", "trailer", -] -CLASS2ID = {c: i for i, c in enumerate(DETECTION_CLASSES)} - - -# =========================================================================== -# Core conversion -# =========================================================================== - -def frame_to_info(frame: FrameData, sample_id: str, out_lidar_dir: str, - out_img_dir: str, rel_lidar_root: str, rel_img_root: str) -> dict: - """ - Convert one FrameData into a NuScenes-style info dict. - - Since we have no vehicle poses, we treat lidar = ego = global (identity). - """ - token = str(uuid.uuid4()) - - # ---- Save point cloud as .bin (float32 x,y,z) ------------------------- - os.makedirs(out_lidar_dir, exist_ok=True) - bin_name = f"{sample_id}_frame{frame.frame_index:04d}.bin" - bin_path = os.path.join(out_lidar_dir, bin_name) - frame.lidar_points.astype(np.float32).tofile(bin_path) - lidar_path_rel = os.path.join(rel_lidar_root, bin_name) - - # ---- Identity ego/global poses ---------------------------------------- - zero3 = [0.0, 0.0, 0.0] - id_quat = identity_quaternion() - - # ---- Camera info ------------------------------------------------------- - cams_info = {} - target_W, target_H = 1920, 1280 - - for cam_key, nus_cam in CAM_NAME_MAP.items(): - if cam_key not in frame.camera_raw: - continue - cam_data = frame.camera_raw[cam_key] - P = cam_data["projection_matrix"] # 3×4 - ori_W, ori_H = cam_data["image_size"] - - # Decompose P -> K (3×3 intrinsic), R_cam_from_lidar, t_cam_from_lidar - K, R_c2l_inv, t_in_cam = rq_decompose(P) - # R_c2l_inv is R such that x_cam = R @ x_lidar + t - R_lidar2cam = R_c2l_inv # camera_from_lidar rotation - t_lidar2cam = t_in_cam # translation in camera coords - - # sensor2lidar = inverse of lidar2camera - R_cam2lidar = R_lidar2cam.T - t_cam2lidar = -R_cam2lidar @ t_lidar2cam - - # adjust intrinsic matrix - sx = target_W / ori_W - sy = target_H / ori_H - K_adjusted = K.copy() - K_adjusted[0, 0] *= sx - K_adjusted[1, 1] *= sy - K_adjusted[0, 2] *= sx - K_adjusted[1, 2] *= sy - - # Copy image to output directory - src_img = frame.image_paths[cam_key] - cam_img_dir = os.path.join(out_img_dir, nus_cam) - os.makedirs(cam_img_dir, exist_ok=True) - img_name = f"{sample_id}_frame{frame.frame_index:04d}.jpg" - dst_img = os.path.join(cam_img_dir, img_name) - - img = Image.open(src_img) - img_resized = img.resize((target_W, target_H), Image.LANCZOS) - img_resized.save(dst_img) - img_path_rel = os.path.join(rel_img_root, nus_cam, img_name) - - cam_token = str(uuid.uuid4()) - cams_info[nus_cam] = { - "data_path": img_path_rel, - "type": nus_cam, - "sample_data_token": cam_token, - # camera -> ego (treat camera as fixed to lidar frame via R_cam2lidar) - "sensor2ego_translation": t_cam2lidar.tolist(), - "sensor2ego_rotation": rotation_matrix_to_quaternion(R_cam2lidar), - # ego -> global (identity) - "ego2global_translation": zero3, - "ego2global_rotation": id_quat, - # lidar -> camera extrinsic (for projection) - "sensor2lidar_translation": t_cam2lidar.tolist(), - "sensor2lidar_rotation": rotation_matrix_to_quaternion(R_cam2lidar), - # intrinsic - "cam_intrinsic": K_adjusted.tolist(), - "width": target_W, - "height": target_H, - "timestamp": frame.frame_index * 100000, # synthetic - } - - # ---- Annotations ------------------------------------------------------- - instances = [] - for lb in frame.labels: - raw_cls = lb.get("class", "") - cls = PRIVATE2NUSCENES.get(raw_cls) # remap to NuScenes name - if cls is None or cls not in CLASS2ID: - continue - cx, cy, cz = lb["center"] - dx, dy, dz = lb["dx"], lb["dy"], lb["dz"] - yaw = lb["rotateZ"] - n_pts = count_points_in_box(frame.lidar_points, cx, cy, cz, dx, dy, dz, yaw) - - instances.append({ - "bbox_3d": [cx, cy, cz, dx, dy, dz, yaw], - "bbox_label_3d": CLASS2ID[cls], - "bbox": [0.0, 0.0, 1.0, 1.0], # no 2D GT available - "bbox_label": CLASS2ID[cls], - "velocity": [float("nan"), float("nan")], - "num_lidar_pts": n_pts, - "bbox_3d_isvalid": n_pts > 0, - }) - - info = { - "lidar_points": { - "lidar_path": lidar_path_rel, - "num_pts_feats": 3, # X, Y, Z only - }, - "token": token, - "sweeps": [], # no sweep data available - "cams": cams_info, - # lidar == ego (identity) - "lidar2ego_translation": zero3, - "lidar2ego_rotation": id_quat, - # ego == global (identity) - "ego2global_translation": zero3, - "ego2global_rotation": id_quat, - "timestamp": frame.frame_index * 100000, - # annotations - "instances": instances, - } - return info - - -def build_coco_json(infos: List[dict]) -> dict: - """Build a COCO-style mono3d JSON from NuScenes info dicts.""" - categories = [{"id": i, "name": c} for i, c in enumerate(DETECTION_CLASSES)] - images = [] - annotations = [] - ann_id = 0 - - for info in infos: - frame_token = info["token"] - for nus_cam, cam_info in info["cams"].items(): - img_token = cam_info["sample_data_token"] - W = cam_info["width"] - H = cam_info["height"] - K = np.array(cam_info["cam_intrinsic"]) # 3×3 - - img_entry = { - "file_name": cam_info["data_path"], - "id": img_token, - "token": frame_token, - "cam2ego_rotation": cam_info["sensor2ego_rotation"], - "cam2ego_translation": cam_info["sensor2ego_translation"], - "ego2global_rotation": info["ego2global_rotation"], - "ego2global_translation": info["ego2global_translation"], - "cam_intrinsic": cam_info["cam_intrinsic"], - "width": W, - "height": H, - } - images.append(img_entry) - - # Build sensor2lidar matrix for projecting boxes - t_c2l = np.array(cam_info["sensor2lidar_translation"]) - quat = cam_info["sensor2lidar_rotation"] # [w,x,y,z] - w, x, y, z = quat - R_c2l = np.array([ - [1-2*(y*y+z*z), 2*(x*y-z*w), 2*(x*z+y*w)], - [2*(x*y+z*w), 1-2*(x*x+z*z), 2*(y*z-x*w)], - [2*(x*z-y*w), 2*(y*z+x*w), 1-2*(x*x+y*y)], - ]) - R_l2c = R_c2l.T - t_l2c = -R_l2c @ t_c2l - - # Iterate over the valid "instances" key generated by frame_to_info - for inst in info.get("instances", []): - cls_id = inst["bbox_label_3d"] - if cls_id < 0 or cls_id >= len(DETECTION_CLASSES): - continue - - cls = DETECTION_CLASSES[cls_id] - cx_l, cy_l, cz_l, dx, dy, dz, yaw_l = inst["bbox_3d"] - - # Transform center to camera frame - center_l = np.array([cx_l, cy_l, cz_l]) - center_c = R_l2c @ center_l + t_l2c - depth = center_c[2] - if depth <= 0: - continue # box behind camera - - # Project 3D center to 2D - center_2d = K @ center_c - u = center_2d[0] / center_2d[2] - v = center_2d[1] / center_2d[2] - - # Yaw in camera frame (approx: only rotate around Z in lidar) - yaw_cam = yaw_l - - # 3D box annotation: [cx_cam, cy_cam, cz_cam, dx, dy, dz, yaw_cam] - bbox_cam3d = [float(center_c[0]), float(center_c[1]), float(center_c[2]), - float(dx), float(dy), float(dz), float(yaw_cam)] - - # Project 8 corners to get 2D bbox - corners_l = _box_corners(cx_l, cy_l, cz_l, dx, dy, dz, yaw_l) - corners_c = (R_l2c @ corners_l.T).T + t_l2c - corners_c = corners_c[corners_c[:, 2] > 0] - if len(corners_c) == 0: - continue - proj_h = (K @ corners_c.T).T - uvs = proj_h[:, :2] / proj_h[:, 2:3] - x1, y1 = uvs[:, 0].min(), uvs[:, 1].min() - x2, y2 = uvs[:, 0].max(), uvs[:, 1].max() - x1 = max(0, min(x1, W)); x2 = max(0, min(x2, W)) - y1 = max(0, min(y1, H)); y2 = max(0, min(y2, H)) - bw, bh = x2 - x1, y2 - y1 - if bw <= 0 or bh <= 0: - continue - - # Extract velocity from the instance dict - vx, vy = inst.get("velocity", [float("nan"), float("nan")]) - - ann = { - "file_name": cam_info["data_path"], - "image_id": img_token, - "area": float(bw * bh), - "category_name": cls, - "category_id": cls_id, - "bbox": [float(x1), float(y1), float(bw), float(bh)], - "iscrowd": 0, - "bbox_cam3d": bbox_cam3d, - "velo_cam3d": [vx, vy], - "center2d": [float(u), float(v), float(depth)], - "attribute_name": "", - "attribute_id": -1, - "id": ann_id, - } - annotations.append(ann) - ann_id += 1 - - return {"categories": categories, "images": images, "annotations": annotations} - - -def _box_corners(cx, cy, cz, dx, dy, dz, yaw) -> np.ndarray: - """Return 8 corners of a 3D box as (8,3) array.""" - hx, hy, hz = dx / 2, dy / 2, dz / 2 - corners = np.array([ - [ hx, hy, hz], [ hx, hy, -hz], [ hx, -hy, hz], [ hx, -hy, -hz], - [-hx, hy, hz], [-hx, hy, -hz], [-hx, -hy, hz], [-hx, -hy, -hz], - ], dtype=np.float32) - c, s = np.cos(yaw), np.sin(yaw) - Rz = np.array([[c, -s, 0], [s, c, 0], [0, 0, 1]], dtype=np.float32) - corners = (Rz @ corners.T).T + np.array([cx, cy, cz], dtype=np.float32) - return corners - - -# =========================================================================== -# Main -# =========================================================================== - -def main(): - parser = argparse.ArgumentParser(description="Convert private dataset to NuScenes-style PKL/JSON.") - parser.add_argument("--root", default="sample", help="Root folder of your dataset") - parser.add_argument("--out", default="my_dataset", help="Output directory") - parser.add_argument("--split", type=float, default=0.8, help="Train fraction (0-1)") - parser.add_argument("--tag", default="mydata", help="Output file prefix") - args = parser.parse_args() - - out_root = Path(args.out) - lidar_dir = out_root / "samples" / "LIDAR_TOP" - img_dir = out_root / "samples" - rel_lidar = "samples/LIDAR_TOP" - rel_img = "samples" - - print("Loading dataset ...") - ds = MyDataset(args.root) - - # Collect all (sample_id, frame_index) pairs - all_frames = [] - for sid in sorted(ds.sample_ids): - sample = ds.get_sample(sid) - for fidx in sorted(sample.frames.keys()): - all_frames.append((sid, fidx, sample.frames[fidx])) - - print(f"Total frames: {len(all_frames)}") - n_train = int(len(all_frames) * args.split) - train_frames = all_frames[:n_train] - val_frames = all_frames[n_train:] - - def convert_split(frames, split_name): - infos = [] - for sid, fidx, frame in tqdm(frames, desc=f"Converting {split_name}"): - info = frame_to_info( - frame, sid, - out_lidar_dir=str(lidar_dir), - out_img_dir=str(img_dir), - rel_lidar_root=rel_lidar, - rel_img_root=rel_img, - ) - infos.append(info) - - pkl_path = out_root / f"{args.tag}_infos_{split_name}.pkl" - json_path = out_root / f"{args.tag}_infos_{split_name}_mono3d.coco.json" - - with open(pkl_path, "wb") as f: - pickle.dump({ - "metainfo": { - "version": "custom-v1.0", - "classes": DETECTION_CLASSES, - }, - "data_list": infos, - }, f) - print(f"Saved {pkl_path} ({len(infos)} frames)") - - coco = build_coco_json(infos) - with open(json_path, "w", encoding="utf-8") as f: - json.dump(coco, f, ensure_ascii=False, indent=2) - print(f"Saved {json_path} ({len(coco['images'])} images, {len(coco['annotations'])} anns)") - - convert_split(train_frames, "train") - convert_split(val_frames, "val") - - # Write class list for reference - classes_path = out_root / f"{args.tag}_classes.txt" - classes_path.write_text("\n".join(DETECTION_CLASSES)) - print(f"\nDone. Output structure:\n {out_root}/") - print(f" ├── samples/LIDAR_TOP/*.bin") - print(f" ├── samples/CAM_FRONT/*.jpg (and other cameras)") - print(f" ├── {args.tag}_infos_train.pkl") - print(f" ├── {args.tag}_infos_val.pkl") - print(f" ├── {args.tag}_infos_train_mono3d.coco.json") - print(f" └── {args.tag}_infos_val_mono3d.coco.json") - - -if __name__ == "__main__": - main()