Upload files to "/"

This commit is contained in:
2026-03-12 15:51:21 +08:00
commit d6668beff5
2 changed files with 1102 additions and 0 deletions

730
convert_to_nuscenes.py Normal file
View File

@@ -0,0 +1,730 @@
"""
Convert private dataset to NuScenes-style .pkl and .coco.json files
compatible with MMDetection3D.
Coordinate system note
----------------------
Your dataset: X-right, Y-forward, Z-up (same as NuScenes LiDAR frame)
-> No rotation is needed for the point cloud.
-> lidar == ego == global (identity transforms, no vehicle odometry available)
Projection matrix note
----------------------
Your camera_raw stores a 3×4 matrix P = K @ [R | t]
where K is the 3×3 intrinsic and [R|t] maps points from the LiDAR frame
into the camera frame.
We decompose P via RQ-decomposition to recover K (intrinsic) and [R|t]
(lidar-to-camera extrinsic).
Usage
-----
python convert_to_nuscenes.py \
--root /path/to/your/dataset \
--out /path/to/output/dir \
--split 0.8 # fraction used for training (rest = val)
--tag mydata # prefix for output file names
"""
import os
import sys
import json
import uuid
import pickle
import argparse
import numpy as np
from pathlib import Path
from PIL import Image
from typing import Dict, List, Tuple, Optional
# ---------------------------------------------------------------------------
# Inline minimal copy of the dataset loader so the script is self-contained.
# If you already have my_dataset.py on PYTHONPATH you can remove this block
# and simply do: from my_dataset import MyDataset, SampleData, FrameData
# ---------------------------------------------------------------------------
import glob
import open3d as o3d
from dataclasses import dataclass, field
from tqdm import tqdm
@dataclass
class FrameData:
    """Raw, untransformed data for a single frame of one sample."""
    frame_index: int              # 0-based frame index within the sample
    lidar_points: np.ndarray      # (N,3) float32 point cloud (x, y, z)
    image_paths: Dict[str, str]   # camera name -> image file path
    camera_raw: Dict[str, dict]   # cam_name -> {projection_matrix, image_size}
    labels: List[dict]            # raw 3D box annotations (see _process_frame_labels)
    contain_labels: bool = False  # True when `labels` is non-empty
@dataclass
class SampleData:
    """All frames belonging to one `sample{id}` folder."""
    sample_id: str                # numeric id extracted from the folder name
    frames: Dict[int, FrameData]  # frame index -> FrameData
class MyDataset:
    """Condensed private-dataset loader (inline copy of my_dataset.MyDataset).

    Loads every `sample*` folder under `root_folder`: per-frame point clouds,
    per-camera image paths, raw 3x4 projection matrices and 3D box labels.
    No coordinate transforms are applied here.
    """
    # Camera name -> on-disk image file name inside each frame folder.
    cam2filename = {
        "front_120": "scanofilm_surround_front_120_8M.jpg",
        "front_left_100": "scanofilm_surround_front_left_100_2M.jpg",
        "front_right_100": "scanofilm_surround_front_right_100_2M.jpg",
        "rear_100": "scanofilm_surround_rear_100_2M.jpg",
        "rear_left_100": "scanofilm_surround_rear_left_100_2M.jpg",
        "rear_right_100": "scanofilm_surround_rear_right_100_2M.jpg",
    }
    # Annotation-tool color code -> class name.
    color2class = {
        "#5414ED": "car", "#F6EE64": "pick-up-truck", "#F6A087": "small-truck",
        "#BC4EF1": "truck", "#4E9AF1": "bus", "#F1A94E": "special-vehicle",
        "#E1DFDD": "ignore", "#F91906": "tricyclist-withrider",
        "#FA5F51": "tricyclist-withoutrider", "#B8CB30": "bicycle-withrider",
        "#E6FD4E": "bicycle-withoutrider", "#876363": "people",
        "#2CBDF5": "crowd-people", "#C9F52C": "crowd-bicycle",
        "#DC6788": "crowd-car", "#6EC913": "traffic-cone",
        "#0DDE69": "plastic-barrier", "#8260D2": "crash-barrels",
        "#F1D1D1": "warning-triangle", "#FE6DF4": "crowd-traffic-cone",
        "#D1AA35": "crowd-plastic-barrier", "#3BE8D0": "crowd-crash-barrels",
        "#2B7567": "crowd-warning-triangle",
    }

    def __init__(self, root_folder: str):
        """Load all samples found under `root_folder` (eager, does disk I/O)."""
        self.root = root_folder
        self.class2color = {v: k for k, v in self.color2class.items()}
        self._samples: Dict[str, SampleData] = {}
        self._load_all_samples()

    @staticmethod
    def _extract_id(folder_name: str) -> str:
        """Concatenate all digits in the folder's basename to form the sample id."""
        return ''.join(c for c in os.path.basename(folder_name) if c.isdigit())

    def _find_sample_folders(self):
        """Return all `sample*` directories directly under the root."""
        return [f for f in glob.glob(os.path.join(self.root, "sample*"))
                if os.path.isdir(f)]

    def _find_paths_in_sample(self, sample_folder, sample_id):
        """Return the expected resource paths inside one sample folder."""
        pcd_folder = os.path.join(sample_folder, f"pcd_sequence{sample_id}")
        pcd_files = glob.glob(os.path.join(pcd_folder, "*.pcd")) if os.path.isdir(pcd_folder) else []
        # One image folder per point-cloud frame, named "0", "1", ...
        img_folders = [os.path.join(sample_folder, str(i)) for i in range(len(pcd_files))]
        return {
            "pcd_folder": pcd_folder,
            "img_folders": img_folders,
            "page_json": os.path.join(sample_folder, f"test{sample_id}.json"),
            "mark_json": os.path.join(sample_folder, f"test{sample_id}-mark.json"),
        }

    def _read_lidar_points(self, pcd_path):
        """Read one .pcd file; returns (N,3) float32 points or None on failure."""
        try:
            pcd = o3d.io.read_point_cloud(pcd_path, remove_nan_points=True,
                                          remove_infinite_points=True, format="auto")
            return np.asarray(pcd.points, dtype=np.float32)
        except Exception as e:
            # Best-effort: a broken file is reported and skipped by the caller.
            print(f"Error loading {pcd_path}: {e}")
            return None

    def _load_all_lidar_frames(self, pcd_folder):
        """Return {frame_index: (N,3) points} for every readable .pcd file."""
        out = {}
        for f in glob.glob(os.path.join(pcd_folder, "*.pcd")):
            # Frame index is the trailing digits after the last 'n' in the stem,
            # e.g. "scan7.pcd" -> 7.  NOTE(review): assumes that naming scheme.
            idx = int(os.path.basename(f).split(".")[0].split("n")[-1])
            pts = self._read_lidar_points(f)
            if pts is not None:
                out[idx] = pts
        return out

    def _load_image_paths(self, img_folders):
        """Return {frame_index: {cam_name: image_path}} for existing images."""
        out = {}
        for folder in img_folders:
            if not os.path.isdir(folder):
                continue
            idx = int(os.path.basename(folder))
            # Keep only cameras whose image file actually exists on disk.
            paths = {cam: os.path.join(folder, fname)
                     for cam, fname in self.cam2filename.items()
                     if os.path.isfile(os.path.join(folder, fname))}
            if paths:
                out[idx] = paths
        return out

    def _load_camera_raw(self, page_json_path):
        """Parse test{id}.json into {frame_index: {cam: {projection_matrix, image_size}}}."""
        if not os.path.isfile(page_json_path):
            return {}
        with open(page_json_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        try:
            extend_source = data['data']['files'][0]['extendSources']
        except (KeyError, IndexError):
            return {}
        out = {}
        for d in extend_source:
            pe = d.get('pageElement', {})
            if "sensor" not in pe:
                continue
            try:
                idx = int(d['fileName'].split('.')[0])
            except Exception:
                continue
            # Normalize sensor name to the keys used in cam2filename.
            cam_name = pe['sensor'].replace("ofilm_surround_", "")
            for suf in ["_2M", "_8M"]:
                if cam_name.endswith(suf):
                    cam_name = cam_name[:-len(suf)]
                    break
            if cam_name not in self.cam2filename:
                continue
            matrix = pe.get('mtx', [])
            image_size = pe.get('imageSize', [])
            if not matrix or not image_size:
                continue
            matrix = np.array(matrix, dtype=np.float64)
            if matrix.shape != (3, 4):
                continue
            out.setdefault(idx, {})[cam_name] = {
                "projection_matrix": matrix,
                "image_size": image_size,
            }
        return out

    def _extract_text_label(self, json_path):
        """Return the annotation result with 100% accuracy, or [] if none has it."""
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)['data']['list'][0]
        idx2rate = {k: float(v['accuracy'].replace('%', '')) / 100.0
                    for k, v in data['rate'].items()}
        max_idx = max(idx2rate, key=idx2rate.get)
        if idx2rate[max_idx] == 1.0:
            return data['result'][max_idx]
        return []

    def _process_text_label(self, json_path):
        """Return {frame_index: [normalized label dicts]} from test{id}-mark.json."""
        label_list = self._extract_text_label(json_path)
        if not label_list:
            return {}
        out = {}
        for entry in label_list:
            out[entry['index']] = self._process_frame_labels(entry['value'])
        return out

    def _process_frame_labels(self, label_list):
        """Normalize raw per-frame box annotations into plain dicts."""
        labels = []
        for lb in label_list:
            nl = {
                'num': lb['num'],
                'class': lb['label']['class-name'],
                'is_moving': not bool(lb['label']['static']),
                'isolation': bool(lb['label']['isolation']),
                'color': self.class2color.get(lb['label']['class-name'], ''),
                # 8 box corners as [x, y, z].
                'points': [[float(lb['newPoints'][i]['x']),
                            float(lb['newPoints'][i]['y']),
                            float(lb['newPoints'][i]['z'])] for i in range(8)],
                'center': [float(lb['x']), float(lb['y']), float(lb['z'])],
                # Yaw about Z in radians, +X toward +Y (counter-clockwise).
                'rotateZ': float(lb['rotateZ']),
                'dx': float(lb['width']),
                'dy': float(lb['height']),
                'dz': float(lb['depth']),
            }
            labels.append(nl)
        return labels

    def _load_one_sample(self, sample_folder):
        """Assemble one SampleData; returns None when modalities disagree."""
        sid = self._extract_id(sample_folder)
        paths = self._find_paths_in_sample(sample_folder, sid)
        lidar_dict = self._load_all_lidar_frames(paths["pcd_folder"])
        img_dict = self._load_image_paths(paths["img_folders"])
        cam_dict = self._load_camera_raw(paths["page_json"])
        label_dict = self._process_text_label(paths["mark_json"])
        if not (len(label_dict) == len(lidar_dict) == len(img_dict) == len(cam_dict)):
            print(f"Sample {sid}: frame count mismatch, skipping.")
            return None
        # NOTE(review): assumes frame indices are contiguous 0..N-1 in every
        # dict; a gap would raise KeyError here — confirm against the data.
        frames = {idx: FrameData(
            frame_index=idx,
            lidar_points=lidar_dict[idx],
            image_paths=img_dict[idx],
            camera_raw=cam_dict[idx],
            labels=label_dict[idx],
            contain_labels=len(label_dict[idx]) > 0,
        ) for idx in range(len(lidar_dict))}
        return SampleData(sample_id=sid, frames=frames) if frames else None

    def _load_all_samples(self):
        """Load every sample folder, keeping only the successfully parsed ones."""
        for sf in tqdm(self._find_sample_folders(), desc="Loading samples"):
            s = self._load_one_sample(sf)
            if s is not None:
                self._samples[s.sample_id] = s
        print(f"Loaded {len(self._samples)} samples.")

    @property
    def sample_ids(self):
        """List of loaded sample ids (unsorted)."""
        return list(self._samples.keys())

    def get_sample(self, sid):
        """Return the SampleData for `sid`, or None if not loaded."""
        return self._samples.get(sid)

    def __len__(self):
        return len(self._samples)
# ===========================================================================
# Helper math utilities
# ===========================================================================
def rq_decompose(P: np.ndarray) -> Tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Decompose a 3x4 projection matrix P = K @ [R | t].

    Args:
        P: (3, 4) projection matrix mapping homogeneous LiDAR points to pixels.

    Returns:
        K: (3, 3) upper-triangular intrinsic matrix with positive diagonal.
        R: (3, 3) rotation matrix (lidar -> camera).
        t: (3,) translation expressed in the camera frame.
    """
    M = P[:, :3]  # 3x3 left block, M = K @ R
    # RQ via QR of the inverse:
    #   M = K R  =>  M^{-1} = R^T K^{-1}  (orthogonal times upper-triangular),
    # which is exactly the QR form, so qr(M^{-1}) yields Q = R^T, Rq = K^{-1}.
    # FIX: the previous version decomposed inv(M).T, which factors M as
    # (orthogonal @ lower-triangular) and returns a non-triangular "K".
    Q, Rq = np.linalg.qr(np.linalg.inv(M))
    K = np.linalg.inv(Rq)  # upper-triangular intrinsic
    R = Q.T                # rotation
    # Enforce a positive diagonal on K.  With a diagonal sign matrix D
    # (D @ D = I), K @ R == (K @ D) @ (D @ R), so the product is preserved.
    signs = np.sign(np.diag(K))
    signs[signs == 0] = 1.0
    D = np.diag(signs)
    K = K @ D
    R = D @ R
    # NOTE(review): if det(R) == -1 the matrix encodes a reflection; the
    # source data is assumed to provide proper rotations.
    # Recover t from the last column: P[:, 3] = K @ t.
    t = np.linalg.inv(K) @ P[:, 3]
    return K, R, t
def rotation_matrix_to_quaternion(R: np.ndarray) -> List[float]:
    """Convert a 3x3 rotation matrix into a unit quaternion [w, x, y, z].

    Uses the classic four-branch method: pick the largest of the trace and
    the three diagonal entries so the square root is taken of a large,
    numerically stable quantity.
    """
    tr = R[0, 0] + R[1, 1] + R[2, 2]
    if tr > 0:
        # Trace-dominant case: w is the largest component.
        k = 0.5 / np.sqrt(tr + 1.0)
        qw = 0.25 / k
        qx = (R[2, 1] - R[1, 2]) * k
        qy = (R[0, 2] - R[2, 0]) * k
        qz = (R[1, 0] - R[0, 1]) * k
    elif R[0, 0] > R[1, 1] and R[0, 0] > R[2, 2]:
        # x is the largest component.
        d = 2.0 * np.sqrt(1.0 + R[0, 0] - R[1, 1] - R[2, 2])
        qw = (R[2, 1] - R[1, 2]) / d
        qx = 0.25 * d
        qy = (R[0, 1] + R[1, 0]) / d
        qz = (R[0, 2] + R[2, 0]) / d
    elif R[1, 1] > R[2, 2]:
        # y is the largest component.
        d = 2.0 * np.sqrt(1.0 + R[1, 1] - R[0, 0] - R[2, 2])
        qw = (R[0, 2] - R[2, 0]) / d
        qx = (R[0, 1] + R[1, 0]) / d
        qy = 0.25 * d
        qz = (R[1, 2] + R[2, 1]) / d
    else:
        # z is the largest component.
        d = 2.0 * np.sqrt(1.0 + R[2, 2] - R[0, 0] - R[1, 1])
        qw = (R[1, 0] - R[0, 1]) / d
        qx = (R[0, 2] + R[2, 0]) / d
        qy = (R[1, 2] + R[2, 1]) / d
        qz = 0.25 * d
    return [qw, qx, qy, qz]
def identity_quaternion() -> List[float]:
    """Return the no-rotation quaternion in [w, x, y, z] order."""
    return [1.0] + [0.0] * 3
def project_3d_to_2d(points_3d: np.ndarray, P: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    """Project Nx3 LiDAR points into an image with a 3x4 projection matrix.

    Args:
        points_3d: (N, 3) points in the LiDAR frame.
        P: (3, 4) projection matrix.

    Returns:
        uv: (M, 2) pixel coordinates of the M points with positive depth,
            in their original order.
        valid: (N,) boolean mask, True where the point lies in front of the
            camera (rows of ``uv`` correspond to ``points_3d[valid]``).

    FIX: the annotation/docstring previously claimed a single ``np.ndarray``
    return, but the function has always returned the ``(uv, valid)`` pair.
    """
    N = points_3d.shape[0]
    pts_h = np.hstack([points_3d, np.ones((N, 1))])  # Nx4 homogeneous
    proj = (P @ pts_h.T).T                           # Nx3
    valid = proj[:, 2] > 0                           # keep points in front of camera
    uv = proj[valid, :2] / proj[valid, 2:3]          # perspective divide
    return uv, valid
def count_points_in_box(points: np.ndarray, cx, cy, cz, dx, dy, dz, yaw) -> int:
    """Count LiDAR points that fall inside a yaw-rotated 3D bounding box.

    Points are moved into the box's local frame (translate to the center,
    then undo the yaw) so the test reduces to an axis-aligned half-extent
    comparison on each axis.
    """
    # Rotation by -yaw about Z undoes the box orientation.
    c, s = np.cos(-yaw), np.sin(-yaw)
    undo_yaw = np.array([[c, -s, 0],
                         [s, c, 0],
                         [0, 0, 1]], dtype=np.float32)
    shifted = points - np.array([cx, cy, cz], dtype=np.float32)
    local = shifted @ undo_yaw.T
    half = np.array([dx / 2, dy / 2, dz / 2])
    inside = np.all(np.abs(local) <= half, axis=1)
    return int(inside.sum())
# ---------------------------------------------------------------------------
# NuScenes camera name mapping: private-dataset camera keys -> the six
# canonical NuScenes camera channel names.
# ---------------------------------------------------------------------------
CAM_NAME_MAP = {
    "front_120": "CAM_FRONT",
    "front_left_100": "CAM_FRONT_LEFT",
    "front_right_100": "CAM_FRONT_RIGHT",
    "rear_100": "CAM_BACK",
    "rear_left_100": "CAM_BACK_LEFT",
    "rear_right_100": "CAM_BACK_RIGHT",
}
# ---------------------------------------------------------------------------
# Class mapping: private dataset labels -> NuScenes detection labels.
# NOTE(review): "ignore" is mapped to "car", so ignored boxes become real car
# ground truth — confirm this is intended rather than dropping them.
# ---------------------------------------------------------------------------
PRIVATE2NUSCENES = {
    "car": "car",
    "pick-up-truck": "car",
    "small-truck": "truck",
    "truck": "truck",
    "bus": "bus",
    "ignore": "car",
    "special-vehicle": "construction_vehicle",
    "tricyclist-withrider": "bicycle",
    "tricyclist-withoutrider":"bicycle",
    "bicycle-withrider": "bicycle",
    "bicycle-withoutrider": "bicycle",
    "people": "pedestrian",
    "crowd-people": "pedestrian",
    "crowd-bicycle": "bicycle",
    "crowd-car": "car",
    "traffic-cone": "traffic_cone",
    "plastic-barrier": "barrier",
    "crash-barrels": "barrier",
    "warning-triangle": "barrier",
    "crowd-traffic-cone": "barrier",
    "crowd-plastic-barrier": "barrier",
    "crowd-crash-barrels": "barrier",
    "crowd-warning-triangle": "barrier",
}
# Official NuScenes detection classes (canonical order).  "motorcycle" and
# "trailer" are kept for label-id compatibility even though the mapping above
# never produces them.
DETECTION_CLASSES = [
    "car", "truck", "bus", "construction_vehicle", "motorcycle",
    "bicycle", "pedestrian", "traffic_cone", "barrier", "trailer",
]
CLASS2ID = {c: i for i, c in enumerate(DETECTION_CLASSES)}
# ===========================================================================
# Core conversion
# ===========================================================================
def frame_to_info(frame: FrameData, sample_id: str, out_lidar_dir: str,
                  out_img_dir: str, rel_lidar_root: str, rel_img_root: str,
                  target_size: Tuple[int, int] = (1920, 1280)) -> dict:
    """Convert one FrameData into a NuScenes-style info dict.

    Side effects: writes the point cloud as a float32 .bin file under
    ``out_lidar_dir`` and saves every camera image, resized to
    ``target_size``, under ``out_img_dir/<CAM_NAME>/``.

    Since no vehicle poses are available, lidar == ego == global is assumed
    (identity transforms throughout).

    Args:
        frame: Raw frame (points, image paths, camera matrices, labels).
        sample_id: Sample identifier used in output file names.
        out_lidar_dir: Absolute directory for the .bin point-cloud file.
        out_img_dir: Absolute directory containing per-camera image folders.
        rel_lidar_root: Path prefix stored in the info dict for the lidar file.
        rel_img_root: Path prefix stored in the info dict for the images.
        target_size: (width, height) all images are resized to; intrinsics
            are rescaled to match.  Defaults to (1920, 1280) as before.

    Returns:
        NuScenes-style info dict (lidar path, cams, poses, instances).
    """
    token = str(uuid.uuid4())
    # ---- Save point cloud as .bin (float32 x,y,z) -------------------------
    os.makedirs(out_lidar_dir, exist_ok=True)
    bin_name = f"{sample_id}_frame{frame.frame_index:04d}.bin"
    bin_path = os.path.join(out_lidar_dir, bin_name)
    frame.lidar_points.astype(np.float32).tofile(bin_path)
    lidar_path_rel = os.path.join(rel_lidar_root, bin_name)
    # ---- Identity ego/global poses ----------------------------------------
    zero3 = [0.0, 0.0, 0.0]
    id_quat = identity_quaternion()
    # ---- Camera info -------------------------------------------------------
    cams_info = {}
    target_W, target_H = target_size
    for cam_key, nus_cam in CAM_NAME_MAP.items():
        if cam_key not in frame.camera_raw:
            continue
        cam_data = frame.camera_raw[cam_key]
        P = cam_data["projection_matrix"]  # 3x4
        ori_W, ori_H = cam_data["image_size"]
        # Decompose P -> K (3x3 intrinsic) and the lidar-to-camera extrinsic
        # [R | t] such that x_cam = R @ x_lidar + t.
        K, R_lidar2cam, t_lidar2cam = rq_decompose(P)
        # sensor2lidar = inverse of lidar2camera.
        R_cam2lidar = R_lidar2cam.T
        t_cam2lidar = -R_cam2lidar @ t_lidar2cam
        # Rescale intrinsics to the target resolution.  The whole first row
        # (fx, skew, cx) scales with width; the second (fy, cy) with height.
        sx = target_W / ori_W
        sy = target_H / ori_H
        K_adjusted = K.copy()
        K_adjusted[0, 0] *= sx
        K_adjusted[0, 1] *= sx  # FIX: skew term was previously left unscaled
        K_adjusted[0, 2] *= sx
        K_adjusted[1, 1] *= sy
        K_adjusted[1, 2] *= sy
        # Resize and save the image into the per-camera output folder.
        src_img = frame.image_paths[cam_key]
        cam_img_dir = os.path.join(out_img_dir, nus_cam)
        os.makedirs(cam_img_dir, exist_ok=True)
        img_name = f"{sample_id}_frame{frame.frame_index:04d}.jpg"
        dst_img = os.path.join(cam_img_dir, img_name)
        # Context manager closes the source file handle (FIX: it was leaked).
        with Image.open(src_img) as img:
            img.resize((target_W, target_H), Image.LANCZOS).save(dst_img)
        img_path_rel = os.path.join(rel_img_root, nus_cam, img_name)
        cam_token = str(uuid.uuid4())
        cams_info[nus_cam] = {
            "data_path": img_path_rel,
            "type": nus_cam,
            "sample_data_token": cam_token,
            # camera -> ego: ego coincides with the lidar frame here, so the
            # sensor2ego and sensor2lidar transforms are identical.
            "sensor2ego_translation": t_cam2lidar.tolist(),
            "sensor2ego_rotation": rotation_matrix_to_quaternion(R_cam2lidar),
            # ego -> global (identity)
            "ego2global_translation": zero3,
            "ego2global_rotation": id_quat,
            # lidar -> camera extrinsic (for projection)
            "sensor2lidar_translation": t_cam2lidar.tolist(),
            "sensor2lidar_rotation": rotation_matrix_to_quaternion(R_cam2lidar),
            # intrinsic
            "cam_intrinsic": K_adjusted.tolist(),
            "width": target_W,
            "height": target_H,
            "timestamp": frame.frame_index * 100000,  # synthetic (no real clock)
        }
    # ---- Annotations -------------------------------------------------------
    instances = []
    for lb in frame.labels:
        raw_cls = lb.get("class", "")
        cls = PRIVATE2NUSCENES.get(raw_cls)  # remap to NuScenes name
        if cls is None or cls not in CLASS2ID:
            continue
        cx, cy, cz = lb["center"]
        dx, dy, dz = lb["dx"], lb["dy"], lb["dz"]
        yaw = lb["rotateZ"]
        n_pts = count_points_in_box(frame.lidar_points, cx, cy, cz, dx, dy, dz, yaw)
        instances.append({
            "bbox_3d": [cx, cy, cz, dx, dy, dz, yaw],
            "bbox_label_3d": CLASS2ID[cls],
            "bbox": [0.0, 0.0, 1.0, 1.0],  # no 2D GT available
            "bbox_label": CLASS2ID[cls],
            "velocity": [float("nan"), float("nan")],  # no odometry / tracking
            "num_lidar_pts": n_pts,
            "bbox_3d_isvalid": n_pts > 0,
        })
    info = {
        "lidar_points": {
            "lidar_path": lidar_path_rel,
            "num_pts_feats": 3,  # X, Y, Z only (no intensity)
        },
        "token": token,
        "sweeps": [],  # no sweep data available
        "cams": cams_info,
        # lidar == ego (identity)
        "lidar2ego_translation": zero3,
        "lidar2ego_rotation": id_quat,
        # ego == global (identity)
        "ego2global_translation": zero3,
        "ego2global_rotation": id_quat,
        "timestamp": frame.frame_index * 100000,
        # annotations
        "instances": instances,
    }
    return info
def build_coco_json(infos: List[dict]) -> dict:
    """Build a COCO-style mono3d JSON from NuScenes info dicts.

    For each camera of each frame, emits one ``images`` entry and one
    ``annotations`` entry per 3D instance that projects in front of the
    camera with a non-empty clipped 2D box.

    Args:
        infos: List of info dicts produced by ``frame_to_info``.

    Returns:
        Dict with "categories", "images" and "annotations" lists.

    NOTE(review): NaN velocities are serialized as bare ``NaN`` tokens by
    ``json.dump`` (allow_nan default) — not strict JSON; confirm downstream
    readers accept it.
    """
    categories = [{"id": i, "name": c} for i, c in enumerate(DETECTION_CLASSES)]
    images = []
    annotations = []
    ann_id = 0  # running annotation id, unique across the whole file
    for info in infos:
        frame_token = info["token"]
        for nus_cam, cam_info in info["cams"].items():
            img_token = cam_info["sample_data_token"]
            W = cam_info["width"]
            H = cam_info["height"]
            K = np.array(cam_info["cam_intrinsic"])  # 3×3
            img_entry = {
                "file_name": cam_info["data_path"],
                "id": img_token,
                "token": frame_token,
                "cam2ego_rotation": cam_info["sensor2ego_rotation"],
                "cam2ego_translation": cam_info["sensor2ego_translation"],
                "ego2global_rotation": info["ego2global_rotation"],
                "ego2global_translation": info["ego2global_translation"],
                "cam_intrinsic": cam_info["cam_intrinsic"],
                "width": W,
                "height": H,
            }
            images.append(img_entry)
            # Rebuild the camera->lidar rotation from its stored quaternion,
            # then invert to get the lidar->camera transform for projection.
            t_c2l = np.array(cam_info["sensor2lidar_translation"])
            quat = cam_info["sensor2lidar_rotation"]  # [w,x,y,z]
            w, x, y, z = quat
            R_c2l = np.array([
                [1-2*(y*y+z*z), 2*(x*y-z*w), 2*(x*z+y*w)],
                [2*(x*y+z*w), 1-2*(x*x+z*z), 2*(y*z-x*w)],
                [2*(x*z-y*w), 2*(y*z+x*w), 1-2*(x*x+y*y)],
            ])
            R_l2c = R_c2l.T
            t_l2c = -R_l2c @ t_c2l
            # Iterate over the valid "instances" key generated by frame_to_info
            for inst in info.get("instances", []):
                cls_id = inst["bbox_label_3d"]
                if cls_id < 0 or cls_id >= len(DETECTION_CLASSES):
                    continue
                cls = DETECTION_CLASSES[cls_id]
                cx_l, cy_l, cz_l, dx, dy, dz, yaw_l = inst["bbox_3d"]
                # Transform center to camera frame
                center_l = np.array([cx_l, cy_l, cz_l])
                center_c = R_l2c @ center_l + t_l2c
                depth = center_c[2]
                if depth <= 0:
                    continue  # box behind camera
                # Project 3D center to 2D
                center_2d = K @ center_c
                u = center_2d[0] / center_2d[2]
                v = center_2d[1] / center_2d[2]
                # Yaw in camera frame (approx: only rotate around Z in lidar)
                yaw_cam = yaw_l
                # 3D box annotation: [cx_cam, cy_cam, cz_cam, dx, dy, dz, yaw_cam]
                bbox_cam3d = [float(center_c[0]), float(center_c[1]), float(center_c[2]),
                              float(dx), float(dy), float(dz), float(yaw_cam)]
                # Project the 8 corners and take their axis-aligned extent as
                # the 2D bbox; corners behind the camera are dropped first.
                corners_l = _box_corners(cx_l, cy_l, cz_l, dx, dy, dz, yaw_l)
                corners_c = (R_l2c @ corners_l.T).T + t_l2c
                corners_c = corners_c[corners_c[:, 2] > 0]
                if len(corners_c) == 0:
                    continue
                proj_h = (K @ corners_c.T).T
                uvs = proj_h[:, :2] / proj_h[:, 2:3]
                x1, y1 = uvs[:, 0].min(), uvs[:, 1].min()
                x2, y2 = uvs[:, 0].max(), uvs[:, 1].max()
                # Clip the box to the image bounds.
                x1 = max(0, min(x1, W)); x2 = max(0, min(x2, W))
                y1 = max(0, min(y1, H)); y2 = max(0, min(y2, H))
                bw, bh = x2 - x1, y2 - y1
                if bw <= 0 or bh <= 0:
                    continue
                # Extract velocity from the instance dict
                vx, vy = inst.get("velocity", [float("nan"), float("nan")])
                ann = {
                    "file_name": cam_info["data_path"],
                    "image_id": img_token,
                    "area": float(bw * bh),
                    "category_name": cls,
                    "category_id": cls_id,
                    "bbox": [float(x1), float(y1), float(bw), float(bh)],
                    "iscrowd": 0,
                    "bbox_cam3d": bbox_cam3d,
                    "velo_cam3d": [vx, vy],
                    "center2d": [float(u), float(v), float(depth)],
                    "attribute_name": "",
                    "attribute_id": -1,
                    "id": ann_id,
                }
                annotations.append(ann)
                ann_id += 1
    return {"categories": categories, "images": images, "annotations": annotations}
def _box_corners(cx, cy, cz, dx, dy, dz, yaw) -> np.ndarray:
"""Return 8 corners of a 3D box as (8,3) array."""
hx, hy, hz = dx / 2, dy / 2, dz / 2
corners = np.array([
[ hx, hy, hz], [ hx, hy, -hz], [ hx, -hy, hz], [ hx, -hy, -hz],
[-hx, hy, hz], [-hx, hy, -hz], [-hx, -hy, hz], [-hx, -hy, -hz],
], dtype=np.float32)
c, s = np.cos(yaw), np.sin(yaw)
Rz = np.array([[c, -s, 0], [s, c, 0], [0, 0, 1]], dtype=np.float32)
corners = (Rz @ corners.T).T + np.array([cx, cy, cz], dtype=np.float32)
return corners
# ===========================================================================
# Main
# ===========================================================================
def main():
    """CLI entry point: load the private dataset, split it into train/val by
    frame order, and write NuScenes-style .pkl + COCO mono3d .json files."""
    parser = argparse.ArgumentParser(description="Convert private dataset to NuScenes-style PKL/JSON.")
    parser.add_argument("--root", default="sample", help="Root folder of your dataset")
    parser.add_argument("--out", default="my_dataset", help="Output directory")
    parser.add_argument("--split", type=float, default=0.8, help="Train fraction (0-1)")
    parser.add_argument("--tag", default="mydata", help="Output file prefix")
    args = parser.parse_args()
    out_root = Path(args.out)
    lidar_dir = out_root / "samples" / "LIDAR_TOP"
    img_dir = out_root / "samples"
    # Relative prefixes recorded inside the info dicts (NuScenes layout).
    rel_lidar = "samples/LIDAR_TOP"
    rel_img = "samples"
    print("Loading dataset ...")
    ds = MyDataset(args.root)
    # Collect all (sample_id, frame_index, frame) triples in deterministic order.
    all_frames = []
    for sid in sorted(ds.sample_ids):
        sample = ds.get_sample(sid)
        for fidx in sorted(sample.frames.keys()):
            all_frames.append((sid, fidx, sample.frames[fidx]))
    print(f"Total frames: {len(all_frames)}")
    # Sequential (not shuffled) split: first `split` fraction -> train.
    n_train = int(len(all_frames) * args.split)
    train_frames = all_frames[:n_train]
    val_frames = all_frames[n_train:]

    def convert_split(frames, split_name):
        """Convert one split to a .pkl info file and a COCO mono3d .json."""
        infos = []
        for sid, fidx, frame in tqdm(frames, desc=f"Converting {split_name}"):
            info = frame_to_info(
                frame, sid,
                out_lidar_dir=str(lidar_dir),
                out_img_dir=str(img_dir),
                rel_lidar_root=rel_lidar,
                rel_img_root=rel_img,
            )
            infos.append(info)
        pkl_path = out_root / f"{args.tag}_infos_{split_name}.pkl"
        json_path = out_root / f"{args.tag}_infos_{split_name}_mono3d.coco.json"
        with open(pkl_path, "wb") as f:
            pickle.dump({
                "metainfo": {
                    "version": "custom-v1.0",
                    "classes": DETECTION_CLASSES,
                },
                "data_list": infos,
            }, f)
        print(f"Saved {pkl_path} ({len(infos)} frames)")
        coco = build_coco_json(infos)
        with open(json_path, "w", encoding="utf-8") as f:
            json.dump(coco, f, ensure_ascii=False, indent=2)
        print(f"Saved {json_path} ({len(coco['images'])} images, {len(coco['annotations'])} anns)")

    convert_split(train_frames, "train")
    convert_split(val_frames, "val")
    # Write class list for reference
    classes_path = out_root / f"{args.tag}_classes.txt"
    classes_path.write_text("\n".join(DETECTION_CLASSES))
    print(f"\nDone. Output structure:\n {out_root}/")
    print(f" ├── samples/LIDAR_TOP/*.bin")
    print(f" ├── samples/CAM_FRONT/*.jpg (and other cameras)")
    print(f" ├── {args.tag}_infos_train.pkl")
    print(f" ├── {args.tag}_infos_val.pkl")
    print(f" ├── {args.tag}_infos_train_mono3d.coco.json")
    print(f" └── {args.tag}_infos_val_mono3d.coco.json")


if __name__ == "__main__":
    main()

372
my_dataset.py Normal file
View File

@@ -0,0 +1,372 @@
import os
import glob
import json
import numpy as np
import open3d as o3d
from typing import Dict, List, Optional, Union
from dataclasses import dataclass, field
from tqdm import tqdm
@dataclass
class FrameData:
    """Raw data for a single frame, with no mathematical transforms applied."""
    frame_index: int              # frame index, starting from 0
    lidar_points: np.ndarray      # (N, 3) float32 raw point-cloud coordinates
    image_paths: Dict[str, str]   # camera name -> image file path
    camera_raw: Dict[str, dict]   # camera name -> {
                                  #     "projection_matrix": np.ndarray,  # 3x4
                                  #     "image_size": List[int],          # [width, height]
                                  # }
    labels: List[dict]            # raw 3D box annotations; each box contains:
                                  # - "num": int                   # box id
                                  # - "color": str
                                  # - "class": str                 # raw class name
                                  # - "points": List[List[float]]  # xyz of the 8 corners
                                  # - "center": List[float]        # [x, y, z]
                                  # - "rotateZ": float             # yaw about Z (radians),
                                  #       positive from +X toward +Y (counter-clockwise)
                                  # - "dx": float
                                  # - "dy": float
                                  # - "dz": float
    contain_labels: bool = False  # whether this frame has any annotation boxes
@dataclass
class SampleData:
    """All frames contained in one sample folder."""
    sample_id: str                # numeric id string
    frames: Dict[int, FrameData]  # frame index -> FrameData
class MyDataset:
    """
    Private dataset loader.  Point-cloud coordinate frame (world frame):
    +Y forward, +X right, +Z up.

    Expected directory layout:
        root_folder/
            sample{id}/
                pcd_sequence{id}/     # point-cloud frames
                0/, 1/, ...           # per-frame images
                test{id}.json         # camera parameters
                test{id}-mark.json    # 3D annotations

    FIX: this docstring was previously an f-string literal, which (a) is not
    recognized as ``__doc__`` and (b) interpolated ``{id}`` with the builtin
    ``id`` function, garbling the layout description.
    """
    def __init__(self, root_folder: str):
        """Eagerly load every sample under `root_folder` (does disk I/O)."""
        self.root = root_folder
        self._samples: Dict[str, SampleData] = {}
        # Annotation-tool color code -> class name.
        self.color2class = {
            "#5414ED": "car",
            "#F6EE64": "pick-up-truck",
            "#F6A087": "small-truck",
            "#BC4EF1": "truck",
            "#4E9AF1": "bus",
            "#F1A94E": "special-vehicle",
            "#E1DFDD": "ignore",
            "#F91906": "tricyclist-withrider",
            "#FA5F51": "tricyclist-withoutrider",
            "#B8CB30": "bicycle-withrider",
            "#E6FD4E": "bicycle-withoutrider",
            "#876363": "people",
            "#2CBDF5": "crowd-people",
            "#C9F52C": "crowd-bicycle",
            "#DC6788": "crowd-car",
            "#6EC913": "traffic-cone",
            "#0DDE69": "plastic-barrier",
            "#8260D2": "crash-barrels",
            "#F1D1D1": "warning-triangle",
            "#FE6DF4": "crowd-traffic-cone",
            "#D1AA35": "crowd-plastic-barrier",
            "#3BE8D0": "crowd-crash-barrels",
            "#2B7567": "crowd-warning-triangle"
        }
        self.class2color = {cls: color for color, cls in self.color2class.items()}
        # Camera name -> on-disk image file name inside each frame folder.
        self.cam2filename = {
            "front_120": "scanofilm_surround_front_120_8M.jpg",
            "front_left_100": "scanofilm_surround_front_left_100_2M.jpg",
            "front_right_100": "scanofilm_surround_front_right_100_2M.jpg",
            "rear_100": "scanofilm_surround_rear_100_2M.jpg",
            "rear_left_100": "scanofilm_surround_rear_left_100_2M.jpg",
            "rear_right_100": "scanofilm_surround_rear_right_100_2M.jpg",
        }
        self._load_all_samples()
    # ------------------------------------------------------------------
    # Path / ID helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _extract_id(folder_name: str) -> str:
        """Extract the concatenated digits of the folder basename as sample_id."""
        return ''.join(c for c in os.path.basename(folder_name) if c.isdigit())
    def _find_sample_folders(self) -> List[str]:
        """Return all `sample{id}` folder paths under the root."""
        pattern = os.path.join(self.root, "sample*")
        return [f for f in glob.glob(pattern) if os.path.isdir(f)]
    def _find_paths_in_sample(self, sample_folder: str, sample_id: str) -> dict:
        """Return a dict of resource paths inside one sample folder."""
        pcd_folder = os.path.join(sample_folder, f"pcd_sequence{sample_id}")
        img_folders = []
        if os.path.isdir(pcd_folder):
            pcd_files = glob.glob(os.path.join(pcd_folder, "*.pcd"))
            num_frames = len(pcd_files)
            # One image folder per point-cloud frame, named "0", "1", ...
            img_folders = [os.path.join(sample_folder, str(i)) for i in range(num_frames)]
        page_json = os.path.join(sample_folder, f"test{sample_id}.json")
        mark_json = os.path.join(sample_folder, f"test{sample_id}-mark.json")
        return {
            "pcd_folder": pcd_folder,
            "img_folders": img_folders,
            "page_json": page_json,
            "mark_json": mark_json
        }
    # ------------------------------------------------------------------
    # Raw data loading (no transforms)
    # ------------------------------------------------------------------
    def _read_lidar_points(self, pcd_path: str) -> np.ndarray:
        """Read one .pcd file; returns (N, 3) float32 points or None on failure."""
        try:
            # Convert path to bytes for better Chinese path support on Windows
            pcd_path_bytes = pcd_path.encode('utf-8') if isinstance(pcd_path, str) else pcd_path
            pcd = o3d.io.read_point_cloud(
                pcd_path_bytes,
                remove_nan_points=True,
                remove_infinite_points=True,
                format="auto"
            )
            pcd_np = np.asarray(pcd.points, dtype=np.float32)  # [N, 3]
            return pcd_np
        except Exception as e:
            # Best-effort: report and let the caller drop the frame.
            print(f"Error loading point cloud {pcd_path}: {e}")
            return None
    def _load_all_lidar_frames(self, pcd_folder: str) -> Dict[int, np.ndarray]:
        """Return {frame_index: (N, 3) points} for every readable .pcd file."""
        idx2pcd_np = {}
        pcd_files = glob.glob(os.path.join(pcd_folder, "*.pcd"))
        for pcd_file in pcd_files:
            # Frame index = trailing digits after the last 'n' in the file stem.
            idx = int(os.path.basename(pcd_file).split(".")[0].split("n")[-1])
            pcd_np = self._read_lidar_points(pcd_file)
            if pcd_np is not None:
                idx2pcd_np[idx] = pcd_np
        return idx2pcd_np
    def _load_image_paths(self, img_folders: List[str]) -> Dict[int, Dict[str, str]]:
        """Return {frame_index: {camera name: image path}}."""
        idx2imgs = {}
        for folder in img_folders:
            if not os.path.isdir(folder):
                continue
            idx = int(os.path.basename(folder))
            paths = {}
            for cam, fname in self.cam2filename.items():
                full_path = os.path.join(folder, fname)
                # NOTE(review): assert is stripped under `python -O`; raise
                # FileNotFoundError instead if this must hold in production.
                assert os.path.isfile(full_path), f"Image file not found: {full_path}"
                paths[cam] = full_path
            if paths:
                idx2imgs[idx] = paths
        return idx2imgs
    def _load_camera_raw(self, page_json_path: str) -> Dict[int, Dict[str, dict]]:
        """Return {frame_index: {camera name: {"projection_matrix": ..., "image_size": ...}}}."""
        if not os.path.isfile(page_json_path):
            return {}
        with open(page_json_path, "r", encoding="utf-8") as f:
            data = json.load(f)
        try:
            extend_source = data['data']['files'][0]['extendSources']
        except (KeyError, IndexError):
            return {}
        idx2cam = {}
        for d in extend_source:
            page_element = d.get('pageElement', {})
            if "sensor" not in page_element:
                continue
            # Extract the frame index from the entry's file name.
            try:
                idx = int(d['fileName'].split('.')[0])
            except Exception:  # FIX: was a bare `except:` (caught SystemExit/KeyboardInterrupt too)
                continue
            # Normalize sensor name to the keys used in cam2filename.
            sensor = page_element['sensor']
            cam_name = sensor.replace("ofilm_surround_", "")
            for suffix in ["_2M", "_8M"]:
                if cam_name.endswith(suffix):
                    cam_name = cam_name[:-len(suffix)]
                    break
            if cam_name not in self.cam2filename:
                continue
            matrix = page_element.get('mtx', [])
            image_size = page_element.get('imageSize', [])
            if not matrix or not image_size:
                continue
            matrix = np.array(matrix, dtype=np.float64)
            assert matrix.shape == (3, 4), f"Invalid projection matrix shape: {matrix.shape}"
            cam_info = {
                "projection_matrix": matrix,
                "image_size": image_size
            }
            if idx not in idx2cam:
                idx2cam[idx] = {}
            idx2cam[idx][cam_name] = cam_info
        return idx2cam
    def extract_text_label_from_json(self, json_path: str) -> List[Dict]:
        """Return the annotation result whose accuracy is 100%, else []."""
        with open(json_path, 'r', encoding='utf-8') as f:
            data = json.load(f)['data']['list'][0]
        type_dict = data['rate']
        idx2rate = {}
        for key, value in type_dict.items():
            float_value = float(value['accuracy'].replace('%', '')) / 100.0
            idx2rate[key] = float_value
        # Find the index with the highest accuracy.
        max_idx = max(idx2rate, key=idx2rate.get)
        max_rate = idx2rate[max_idx]
        if max_rate == 1.0:
            result = data['result']
            label_list = result[max_idx]
            return label_list
        else:
            return []
    def process_text_label(self, json_path: str):
        """Return {frame_index: [normalized label dicts]} from the mark json."""
        label_list = self.extract_text_label_from_json(json_path)
        if len(label_list) == 0:
            return {}
        idx2label: Dict[int, List[Dict]] = {}
        for label in label_list:
            idx2label[label['index']] = self.process_label_for_one_frame(label['value'])
        return idx2label
    def process_label_for_one_frame(self, label_list: List[Dict]) -> List[Dict]:
        """Normalize the raw box annotations of one frame into plain dicts."""
        if len(label_list) == 0:
            return []
        # there could be multiple 3D bounding boxes in one frame
        labels = []
        for label in label_list:
            new_label = {}
            # num is the id of the 3D bounding box
            new_label['num'] = label['num']
            # label
            new_label['label'] = label['label']
            # class
            new_label['class'] = label['label']['class-name']
            # is_moving
            new_label['is_moving'] = not bool(label['label']['static'])
            # isolation
            # True: separated from traffic by buildings, guardrails, green belts, etc.
            # False: within the drivable area
            new_label['isolation'] = bool(label['label']['isolation'])
            # color
            new_label['color'] = self.class2color[new_label['class']]
            # corners
            assert len(label['newPoints']) == 8, f"Points number for one 3D bounding box is 8, but got {len(label['newPoints'])}."
            eight_points = []
            for i in range(8):
                eight_points.append(
                    [
                        float(label['newPoints'][i]['x']),
                        float(label['newPoints'][i]['y']),
                        float(label['newPoints'][i]['z'])
                    ]
                )
            new_label['points'] = eight_points
            # center
            new_label['center'] = [
                float(label['x']),
                float(label['y']),
                float(label['z'])
            ]
            # rotation - only the yaw about the Z axis is kept
            # unit: radians
            # positive direction: from the +X axis toward the +Y axis (counter-clockwise)
            new_label['rotateZ'] = float(label['rotateZ'])
            new_label['dx'] = float(label['width'])
            new_label['dy'] = float(label['height'])
            new_label['dz'] = float(label['depth'])
            labels.append(new_label)
        return labels
    # ------------------------------------------------------------------
    # Sample-level assembly
    # ------------------------------------------------------------------
    def _load_one_sample(self, sample_folder: str) -> Optional[SampleData]:
        """Load a single sample folder and return a SampleData object."""
        sample_id = self._extract_id(sample_folder)
        paths = self._find_paths_in_sample(sample_folder, sample_id)
        # Read the raw data of each modality.
        lidar_dict = self._load_all_lidar_frames(paths["pcd_folder"])
        img_dict = self._load_image_paths(paths["img_folders"])
        cam_dict = self._load_camera_raw(paths["page_json"])
        label_dict = self.process_text_label(paths["mark_json"])
        if not (len(label_dict) == len(lidar_dict) == len(img_dict) == len(cam_dict)):
            print(f"Sample {sample_id}: Mismatch in frame counts between lidar, images, cameras, and labels.")
            return None
        # NOTE(review): assumes frame indices are contiguous 0..N-1 in every
        # dict; a gap would raise KeyError below — confirm against the data.
        frames = {}
        for idx in range(len(lidar_dict)):
            frames[idx] = FrameData(
                frame_index=idx,
                lidar_points=lidar_dict[idx],
                image_paths=img_dict[idx],
                camera_raw=cam_dict[idx],
                labels=label_dict[idx],
                contain_labels=len(label_dict[idx]) > 0
            )
        if not frames:
            return None
        return SampleData(sample_id=sample_id, frames=frames)
    def _load_all_samples(self):
        """Iterate over all sample folders and load each one."""
        sample_folders = self._find_sample_folders()
        process_bar = tqdm(sample_folders, desc="Loading samples")
        for sf in process_bar:
            sample = self._load_one_sample(sf)
            if sample is not None:
                self._samples[sample.sample_id] = sample
        print(f"Loaded {len(self._samples)} samples.")
    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    @property
    def sample_ids(self) -> List[str]:
        """List of loaded sample ids (unsorted)."""
        return list(self._samples.keys())
    def get_sample(self, sample_id: str) -> Optional[SampleData]:
        """Return the SampleData for `sample_id`, or None if not loaded."""
        return self._samples.get(sample_id)
    def __len__(self) -> int:
        return len(self._samples)
    def __getitem__(self, idx: int) -> SampleData:
        """Access a sample by integer index (ordered by sorted sample_id)."""
        sids = sorted(self.sample_ids)
        if idx < 0 or idx >= len(sids):
            raise IndexError(f"Index {idx} out of range [0, {len(sids)-1}]")
        return self._samples[sids[idx]]
# Smoke test: eagerly load every sample under the local "sample" directory.
if __name__ == "__main__":
    dataset = MyDataset(root_folder="sample")