本文最后更新于 444 天前

今天最开心的一件事:实现了 MOANA 数据集上 LiDAR 到 RGB 图像的 projection 感谢 wooseongY 的积极恢复,我确实发现了转换过程中的一个小错误,得到了很快的修复,也敦促他更新了数据集。下面是代码:
import numpy as np
import os
import matplotlib.pyplot as plt
import cv2
import json
import struct
from tqdm import tqdm
import open3d as o3d
from numpy.linalg import inv
class DataGenerator:
def __init__(self, base_dir, img_dir, xband_dir,lidar_dir,calib_dir):
self.base_dir = base_dir
self.img_dir = img_dir
self.xband_dir = xband_dir
self.lidar_dir = lidar_dir
self.calib_dir = calib_dir
self.file_list = list(set(f.split('.')[0] for f in os.listdir(img_dir)))
self.file_list.sort()
self.labeled = []
self.img_list = []
self.lidar_list = []
def sample_loader(self, img_name):
img = cv2.imread(os.path.join(self.img_dir, img_name + '.jpg'))
label = json.load(open(os.path.join(self.img_dir, img_name + '.json')))
return img, label
def draw_bbox(self, img, label):
for obj in label:
x, y, w, h = obj['xmin'],obj['ymin'],obj['width'],obj['height']
cv2.rectangle(img, (x, y), (x+w, y+h), (0, 255, 0), 5)
cv2.putText(img, 'ID:'+str(obj['id']), (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 5)
return img
def visualize_rgb(self, img_name):
img, label = self.sample_loader(img_name)
if label['annotations'] != []:
self.labeled.append(img_name)
img = self.draw_bbox(img, label['annotations'])
plt.imshow(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))
plt.show()
self.img_list.append(img)
return img
def gen_video(self, video_list, out_dir, fps=30):
w, h = video_list[0].shape[1], video_list[0].shape[0]
fourcc = cv2.VideoWriter_fourcc(*'XVID')
out = cv2.VideoWriter(out_dir, fourcc, fps, (w, h))
for img in video_list:
out.write(img)
out.release()
def visualize_projection_video(self, output_video_path, fps=30):
# Generate the match list for images and LiDAR files
match_list = self.create_match_list()
# Read the first image to get the size (for video writer)
first_image_path = os.path.join(self.img_dir, match_list[0][0])
first_image = cv2.imread(first_image_path)
height, width = first_image.shape[:2]
# Initialize video writer
fourcc = cv2.VideoWriter_fourcc(*'XVID')
out = cv2.VideoWriter(output_video_path, fourcc, fps, (width, height))
# Iterate through the match list and create the video
for img_name, lidar_name in tqdm(match_list):
img_with_projection = self.visualize_projection(lidar_name, img_name)
out.write(img_with_projection) # Write frame to video
out.release() # Release the video writer
def load_lidar_data(self, file_name):
pointcloud = []
with open(file_name, 'rb') as f:
while True:
data = f.read(16) # 4 * 4 bytes
if not data:
break
x, y, z, intensity = struct.unpack('ffff', data)
pointcloud.append([x, y, z, intensity])
return np.array(pointcloud)
def visualize_lidar(self, pointcloud):
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(pointcloud[:, :3]) # 只使用 x, y, z
o3d.visualization.draw_geometries([pcd])
def process_lidar(self, lidar_file_name):
file_path = os.path.join(self.lidar_dir, lidar_file_name)
if not os.path.exists(file_path):
print(f"LiDAR file {file_path} does not exist!")
return
# load cloud points
pointcloud = self.load_lidar_data(file_path)
# visualize cloud points
# self.visualize_lidar(pointcloud)
return pointcloud
def convert_txt_to_json(self, txt_file_name):
key,value = '',[]
with open(os.path.join(self.calib_dir, txt_file_name)) as f:
lines = f.readlines()
calib = {}
for line in lines:
if line.strip() == '':
calib[key] = value
key = ''
value = []
elif ':' in line.strip():
if key != '':
calib[key] = value
key,value = line.strip().split(':')
value = [list(map(float, value.split()))]
calib[key] = value
else:
value.append(list(map(float, line.strip().split())))
with open(os.path.join(self.calib_dir, txt_file_name.split('.')[0]+'.json'), 'w') as f:
json.dump(calib, f, indent=4)
def load_calibration(self, calib_file_name=None):
with open(os.path.join(self.calib_dir, 'Base2camera.json')) as f:
cam_calib = json.load(f)
cam_K = np.array(cam_calib['camera_matrix_left'])
cam_R = np.array(cam_calib['Tr_gnss_to_cam_left'])[:3,:3]
cam_T = np.array(cam_calib['Tr_gnss_to_cam_left'])[:,3]
cam_dis_raw = cam_calib['distortion_coefficent_left']
# Flatten the distortion coefficients if it's a list of lists
cam_dis = []
for item in cam_dis_raw:
if isinstance(item, list):
cam_dis.extend(item) # Extend if it's a nested list
else:
cam_dis.append(item) # Add the element if it's a single value
# Convert the flattened list into a numpy array
cam_dis = np.array(cam_dis)
with open(os.path.join(self.calib_dir, 'Base2lidar.json')) as f:
lidar_camlib = json.load(f)
lidar_R = np.array(lidar_camlib['Tr_gnss_to_lidar'])[:3,:3]
lidar_T = np.array(lidar_camlib['Tr_gnss_to_lidar'])[:,3]
# lidar_R = lidar_R.T
# lidar_T = - lidar_R @ lidar_T
return (cam_K, cam_R, cam_T, cam_dis), (lidar_R, lidar_T)
def project_pointcloud_lidar_to_image(self, pointcloud, image, cam_K, cam_R, cam_T, cam_dis, lidar_R, lidar_T):
cam_TR = np.vstack((np.hstack((cam_R,cam_T.reshape(-1,1))),np.reshape(np.array([0,0,0,1]),(1,-1)))) # (4,4)
lidar_TR = np.vstack((np.hstack((lidar_R,lidar_T.reshape(-1,1))),np.reshape(np.array([0,0,0,1]),(1,-1)))) # (4,4)
pointcloud_homogeneous = np.hstack((pointcloud, np.ones((pointcloud.shape[0], 1)))).T # (4, N)
pointcloud_homogeneous = (inv(cam_TR) @ lidar_TR @ pointcloud_homogeneous)[:3,:] # (3,N)
points_2d_homogeneous = cam_K @ pointcloud_homogeneous # (3, N)
points_2d = points_2d_homogeneous[:2, :] / points_2d_homogeneous[2, :] # 归一化
img_with_projection = image.copy()
for u, v in points_2d.T:
u, v = int(u), int(v)
if 0 <= u < image.shape[1] and 0 <= v < image.shape[0]: # 检查是否在图像范围内
cv2.circle(img_with_projection, (u, v), 3, (0, 0, 255), -1)
return img_with_projection
# def project_pointcloud_to_image(self, pointcloud, image, K, R, T):
# RT = np.hstack((R, np.reshape(T,(-1,1))))
# pointcloud_homogeneous = np.hstack((pointcloud, np.ones((pointcloud.shape[0], 1))))
# projection_matrix = K @ RT
# points_2d_homogeneous = projection_matrix @ pointcloud_homogeneous.T # (3, N)
# points_2d = points_2d_homogeneous[:2, :] / points_2d_homogeneous[2, :] # 归一化
# img_with_projection = image.copy()
# for u, v in points_2d.T:
# u, v = int(u), int(v)
# if 0 <= u < image.shape[1] and 0 <= v < image.shape[0]:
# cv2.circle(img_with_projection, (u, v), 5, (0, 0, 255), -1)
# return img_with_projection
def create_match_list(self):
# Get the list of image and LiDAR files
self.img_list = sorted(os.listdir(self.img_dir))
self.lidar_list = sorted(os.listdir(self.lidar_dir))
match_list = []
# Define a helper function to extract the timestamp (in nsec) from the filename
def extract_timestamp(filename):
# Assuming filenames are based on nanosecond timestamps like "1630451234567891234.jpg"
timestamp_str = filename.split('.')[0] # Extract the base name without extension
try:
return int(timestamp_str) # Convert to integer (nanosecond timestamp)
except ValueError:
return None
# Create a list of (filename, timestamp) for both image and LiDAR files
img_timestamps = [(img_name, extract_timestamp(img_name)) for img_name in self.img_list]
lidar_timestamps = [(lidar_name, extract_timestamp(lidar_name)) for lidar_name in self.lidar_list]
# For each image, find the LiDAR file with the nearest timestamp
for i, (img_name, img_timestamp) in enumerate(img_timestamps):
if img_timestamp is None or i % 10 != 0:
continue # Skip non-10th images
# Find the LiDAR file with the closest timestamp
closest_lidar_name = None
closest_lidar_diff = None
for lidar_name, lidar_timestamp in lidar_timestamps:
if lidar_timestamp is None:
continue # Skip if we can't extract a valid timestamp
# Calculate the time difference between the image and LiDAR timestamps
time_diff = abs(img_timestamp - lidar_timestamp)
if closest_lidar_diff is None or time_diff < closest_lidar_diff:
closest_lidar_diff = time_diff
closest_lidar_name = lidar_name
if closest_lidar_name:
match_list.append([img_name, closest_lidar_name])
# Save the match list to a text file
with open('match_list.txt', 'w') as f:
for item in match_list:
f.write("%s\n" % str(item))
return match_list
def visualize_projection(self, lidar_name, img_name):
# load image
img = cv2.imread(os.path.join(self.img_dir, img_name))
# load pointcloud
cloud_points = self.process_lidar(lidar_name)
distances = np.linalg.norm(cloud_points[:, :2], axis=1) # Compute distance for [x, y, z]
min_range = 4.0
# Filter points that are closer than the minimum range
cloud_points = cloud_points[distances >= min_range]
# load calibration
cam_calib, lidar_calib = self.load_calibration()
cam_K, cam_R, cam_T, cam_dis = cam_calib
lidar_R, lidar_T = lidar_calib
img_with_projection = self.project_pointcloud_lidar_to_image(cloud_points[:,:3], img, cam_K, cam_R, cam_T, cam_dis, lidar_R, lidar_T)
return img_with_projection
base_dir = r'1_Single_Island'
data_dir = r'1_Single_Island/sensor_data'
img_dir = os.path.join(data_dir,'Camera_left')
xband_dir = os.path.join(data_dir,'X_band_radar')
lidar_dir = os.path.join(data_dir,'LiDAR')
calib_dir = os.path.join(base_dir,'calibration')
data_loader = DataGenerator(data_dir, img_dir, xband_dir,lidar_dir,calib_dir)
data_loader.convert_txt_to_json('Base2camera.txt')
data_loader.convert_txt_to_json('Base2lidar.txt')
data_loader.visualize_projection_video('output_video.mp4')
晚上打算回去的时候,sq又来骚扰,我只能说他利欲熏心,即使讨论学术,也感受到他的攻击性和攀比心,他又说到他要求利物浦访问,做手术机器人相关。但是手术机器人这个行业吧……高情商的说法是——未来有希望,但已经发展了那么多年,也水了那么多年论文,联想到他以往干的事情……不敢想象。
今天军棋到了,玩得很开心。生活就要这样,既要买的下车票,选好路线,吃着火锅唱着歌。
大冰老师似乎失去所有的抵抗手段,很温暖也很成熟,很帅!不开心就看大冰、小陈;开心就看科普、历史。