Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lanedet dataset curation tusimple #35

369 changes: 369 additions & 0 deletions LaneDet/create_lane/TuSimple/process_tusimple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,369 @@
#! /usr/bin/env python3

import argparse
import json
import os
import pathlib
from PIL import Image, ImageDraw
import warnings
from datetime import datetime

# Custom warning format cuz the default one is wayyyyyy too verbose
def custom_warning_format(message, category, filename, lineno, line=None):
return f"WARNING : {message}\n"

warnings.formatwarning = custom_warning_format

# ============================== Helper functions ============================== #

def normalizeCoords(lane, width, height):
"""
Normalize the coords of lane points.

"""
return [(x / width, y / height) for x, y in lane]


def getLaneAnchor(lane):
"""
Determine "anchor" point of a lane.

"""
(x2, y2) = lane[-1]
(x1, y1) = lane[-2]
for i in range(len(lane) - 2, 0, -1):
if (lane[i][0] != x2):
(x1, y1) = lane[i]
break
if (x1 == x2):
warnings.warn(f"Vertical lane detected: {lane}, with these 2 anchors: ({x1}, {y1}), ({x2}, {y2}).")
return (x1, None, None)
a = (y2 - y1) / (x2 - x1)
b = y1 - a * x1
x0 = (img_height - b) / a

return (x0, a, b)


def getEgoIndexes(anchors):
"""
Identifies 2 ego lanes - left and right - from a sorted list of lane anchors.

"""
for i in range(len(anchors)):
if (anchors[i][0] >= img_width / 2):
if (i == 0):
return "NO LANES on the LEFT side of frame. Something's sussy out there!"
left_ego_idx, right_ego_idx = i - 1, i
return (left_ego_idx, right_ego_idx)

return "NO LANES on the RIGHT side of frame. Something's sussy out there!"


def getDrivablePath(left_ego, right_ego):
"""
Computes drivable path as midpoint between 2 ego lanes, basically the main point of this task.

"""
i, j = 0, 0
drivable_path = []
while (i < len(left_ego) - 1 and j < len(right_ego) - 1):
if (left_ego[i][1] == right_ego[j][1]):
drivable_path.append((
(left_ego[i][0] + right_ego[j][0]) / 2, # Midpoint along x axis
left_ego[i][1]
))
i += 1
j += 1
elif (left_ego[i][1] < right_ego[j][1]):
i += 1
else:
j += 1

# Extend drivable path to bottom edge of the frame
if (len(drivable_path) >= 2):
x1, y1 = drivable_path[-2]
x2, y2 = drivable_path[-1]
if (x2 == x1):
x_bottom = x2
else:
a = (y2 - y1) / (x2 - x1)
x_bottom = x2 + (img_height - y2) / a
drivable_path.append((x_bottom, img_height))

# Extend drivable path to be on par with longest ego lane
# By making it parallel with longer ego lane
y_top = min(left_ego[0][1], right_ego[0][1])
sign_left_ego = left_ego[0][0] - left_ego[1][0]
sign_right_ego = right_ego[0][0] - right_ego[1][0]
sign_val = sign_left_ego * sign_right_ego
if (sign_val > 0): # 2 egos going the same direction
longer_ego = left_ego if left_ego[0][1] < right_ego[0][1] else right_ego
if len(longer_ego) >= 2 and len(drivable_path) >= 2:
x1, y1 = longer_ego[0]
x2, y2 = longer_ego[1]
if (x2 == x1):
x_top = drivable_path[0][0]
else:
a = (y2 - y1) / (x2 - x1)
x_top = drivable_path[0][0] + (y_top - drivable_path[0][1]) / a

drivable_path.insert(0, (x_top, y_top))
else:
# Extend drivable path to be on par with longest ego lane
if len(drivable_path) >= 2:
x1, y1 = drivable_path[0]
x2, y2 = drivable_path[1]
if (x2 == x1):
x_top = x1
else:
a = (y2 - y1) / (x2 - x1)
x_top = x1 + (y_top - y1) / a

drivable_path.insert(0, (x_top, y_top))

return drivable_path


def annotateGT(
anno_entry, anno_raw_file,
raw_dir, visualization_dir, mask_dir,
img_width, img_height,
normalized = True
):
"""
Annotates and saves an image with:
- Raw image, in "output_dir/image".
- Annotated image with all lanes, in "output_dir/visualization".
- Binary segmentation mask of drivable path, in "output_dir/segmentation".

"""

# Load raw image
raw_img = Image.open(anno_raw_file).convert("RGB")

# Define save name
# Keep original pathname (back to 5 levels) for traceability, but replace "/" with "-"
# Also save in PNG (EXTREMELY SLOW compared to jpg, for lossless quality)
save_name = str(img_id_counter).zfill(5) + ".png"

# Copy raw img and put it in raw dir.
raw_img.save(os.path.join(raw_dir, save_name))

# Draw all lanes & lines
draw = ImageDraw.Draw(raw_img)
lane_colors = {
"leftego_green": (0, 255, 0),
"rightego_blue" : (0,0,255),
"outer_yellow": (255, 255, 0)
}
lane_w = 5
# Draw lanes
for idx, lane in enumerate(anno_entry["lanes"]):
if (normalized):
lane = [(x * img_width, y * img_height) for x, y in lane]

# left ego lane in green
if (idx == anno_entry["ego_indexes"][0]):
draw.line(lane, fill = lane_colors["leftego_green"], width = lane_w)

# right ego lane in blue
elif (idx == anno_entry["ego_indexes"][1]):
draw.line(lane, fill = lane_colors["rightego_blue"], width = lane_w)

# Outer lanes, in yellow
else:
draw.line(lane, fill = lane_colors["outer_yellow"], width = lane_w)



# Save visualization img, same format with raw, just different dir
raw_img.save(os.path.join(visualization_dir, save_name))

# renormalize all lanes
lanes_renormed=[]
for lane in anno_entry["lanes"]:


lane_renormed=[(x* img_width,y*img_height) for x,y in lane]
lanes_renormed.append(lane_renormed)

# Working on binary mask
mask = Image.new("L", (img_width, img_height), 0)
mask_draw = ImageDraw.Draw(mask)
#mask_draw.line(drivable_renormed, fill = 255, width = lane_w)

for lane in lanes_renormed:
mask_draw.line(lane,fill=255,width=lane_w)

mask.save(os.path.join(mask_dir, save_name))

def parseAnnotations(anno_path):
"""
Parses lane annotations from raw dataset file, then extracts normalized GT data.

"""
# Read each line of GT text file as JSON
with open(anno_path, "r") as f:
read_data = [json.loads(line) for line in f.readlines()]

# Parse data from those JSON lines
anno_data = {}
for item in read_data:
lanes = item["lanes"]
h_samples = item["h_samples"]
raw_file = item["raw_file"]

# Decouple from {lanes: [xi1, xi2, ...], h_samples: [y1, y2, ...]} to [(xi1, y1), (xi2, y2), ...]
# `lane_decoupled` is a list of sublists representing lanes, each lane is a list of (x, y) tuples.
lanes_decoupled = [
[(x, y) for x, y in zip(lane, h_samples) if x != -2]
for lane in lanes if sum(1 for x in lane if x != -2) >= 2 # Filter out lanes < 2 points (there's actually a bunch of em)
]

# Determine 2 ego lanes
lane_anchors = [getLaneAnchor(lane) for lane in lanes_decoupled]
ego_indexes = getEgoIndexes(lane_anchors)

if (type(ego_indexes) is str):
if (ego_indexes.startswith("NO")):
warnings.warn(f"Parsing {raw_file}: {ego_indexes}")
continue

left_ego = lanes_decoupled[ego_indexes[0]]
right_ego = lanes_decoupled[ego_indexes[1]]

# Determine drivable path from 2 egos
drivable_path = getDrivablePath(left_ego, right_ego)

# Parse processed data, all coords normalized
anno_data[raw_file] = {
"lanes" : [normalizeCoords(lane, img_width, img_height) for lane in lanes_decoupled],
"ego_indexes" : ego_indexes,
"drivable_path" : normalizeCoords(drivable_path, img_width, img_height),
"img_width" : img_width,
"img_height" : img_height,
}

return anno_data


if __name__ == "__main__":

# ============================== Dataset structure ============================== #

root_dir = "TUSimple"
train_dir = "train_set"
test_dir = "test_set"

train_clip_codes = ["0313", "0531", "0601"] # Train labels are split into 3 dirs
test_file = "test_label.json" # Test file name

img_width = 1280
img_height = 720

# ============================== Parsing args ============================== #

parser = argparse.ArgumentParser(
description = "Process TuSimple dataset - PathDet groundtruth generation"
)
parser.add_argument(
"--dataset_dir",
type = str,
help = "TuSimple directory (right after extraction)"
)
parser.add_argument(
"--output_dir",
type = str,
help = "Output directory"
)
args = parser.parse_args()

# Generate output structure
"""
--output_dir
|----image
|----segmentation
|----visualization
|----drivable_path.json
"""
dataset_dir = args.dataset_dir
output_dir = args.output_dir
list_subdirs = ["image", "segmentation", "visualization"]
for subdir in list_subdirs:
subdir_path = os.path.join(output_dir, subdir)
if (not os.path.exists(subdir_path)):
os.makedirs(subdir_path, exist_ok = True)

# ============================== Parsing annotations ============================== #

train_label_files = [
os.path.join(dataset_dir, root_dir, train_dir, f"label_data_{clip_code}.json")
for clip_code in train_clip_codes
]

"""
Make no mistake: the file `TuSimple/test_set/test_tasks_0627.json` is NOT a label file.
It is just a template for test submission. Kinda weird they put it in `test_set` while
the actual label file is in `TuSimple/test_label.json`.
"""
test_label_files = [os.path.join(dataset_dir, root_dir, test_file)]
label_files = train_label_files + test_label_files

# Parse data by batch
data_master = {
"files": label_files,
"data": {}
}

img_id_counter = -1

for anno_file in data_master["files"]:
print(f"\n==================== Processing data in label file {anno_file} ====================\n")
this_data = parseAnnotations(anno_file)

list_raw_files = list(this_data.keys())
for raw_file in list_raw_files:

# Conduct index increment
img_id_counter += 1
#set_dir = "/".join(anno_file.split("/")[ : -1]) # Slap "train_set" or "test_set" to the end <-- Specific to linux hence used os.path.dirname command below
set_dir= os.path.dirname(anno_file)
set_dir = os.path.join(set_dir, test_dir) if test_file in anno_file else set_dir # Tricky test dir

# Annotate raw images
anno_entry = this_data[raw_file]
annotateGT(
anno_entry,
anno_raw_file = os.path.join(set_dir, raw_file),
raw_dir = os.path.join(output_dir, "image"),
visualization_dir = os.path.join(output_dir, "visualization"),
mask_dir = os.path.join(output_dir, "segmentation"),
img_height = anno_entry["img_height"],
img_width = anno_entry["img_width"],
)

anno_entry["other_lanes"]=[]
for idx,lane in enumerate(anno_entry["lanes"]):
if idx==anno_entry["ego_indexes"][0]:
anno_entry["egoleft_lane"]=lane
elif idx==anno_entry["ego_indexes"][1]:
anno_entry["egoright_lane"]=lane
else:
anno_entry["other_lanes"].append(lane)
# Pop redundant keys
del anno_entry["lanes"]
del anno_entry["ego_indexes"]
del anno_entry["drivable_path"]
# Change `raw_file` to 5-digit incremental index
this_data[str(img_id_counter).zfill(5)] = anno_entry
this_data.pop(raw_file)

data_master["data"].update(this_data)
print(f"Processed {len(this_data)} entries in above file.\n")

print(f"Done processing data with {len(data_master['data'])} entries in total.\n")

# Save master data
with open(os.path.join(output_dir, "drivable_path.json"), "w") as f:
json.dump(data_master, f, indent = 4)
3 changes: 2 additions & 1 deletion PathDet/create_path/TuSimple/process_tusimple.py
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,8 @@ def parseAnnotations(anno_path):

# Conduct index increment
img_id_counter += 1
set_dir = "/".join(anno_file.split("/")[ : -1]) # Slap "train_set" or "test_set" to the end
#set_dir = "/".join(anno_file.split("/")[ : -1]) # Slap "train_set" or "test_set" to the end <-- Specific to linux hence used os.path.dirname command below
set_dir= os.path.dirname(anno_file)
set_dir = os.path.join(set_dir, test_dir) if test_file in anno_file else set_dir # Tricky test dir

# Annotate raw images
Expand Down