Skip to content

Commit

Permalink
update testing script for urbannav dataset
Browse files Browse the repository at this point in the history
  • Loading branch information
Gaaaavin committed Nov 2, 2024
1 parent c5540d1 commit e12ffbf
Show file tree
Hide file tree
Showing 2 changed files with 157 additions and 57 deletions.
204 changes: 149 additions & 55 deletions pl_modules/urban_nav_module.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ def __init__(self, cfg):
self.model = UrbanNav(cfg)
self.save_hyperparameters(cfg)
self.do_normalize = cfg.training.normalize_step_length
self.datatype = cfg.data.type

# Coordinate representation
self.output_coordinate_repr = cfg.model.output_coordinate_repr
Expand Down Expand Up @@ -43,6 +44,10 @@ def __init__(self, cfg):
self.distance_loss_weight = cfg.training.distance_loss_weight
self.angle_loss_weight = cfg.training.angle_loss_weight

if self.datatype == "urbannav":
self.test_catetories = ['crowd', 'person_close_by', 'turn', 'action_target_mismatch', 'crossing', 'other']
self.num_categories = len(self.test_catetories)

def forward(self, obs, cord, gt_action=None):
return self.model(obs, cord, gt_action)

Expand Down Expand Up @@ -137,53 +142,102 @@ def validation_step(self, batch, batch_idx):
def test_step(self, batch, batch_idx):
obs = batch['video_frames']
cord = batch['input_positions']
B, T, _, _, _ = obs.shape

if self.output_coordinate_repr == "euclidean":
if self.datatype == "citywalk":
if self.output_coordinate_repr == "euclidean":
wp_pred, arrive_pred = self(obs, cord)
# Compute L1 loss for waypoints
waypoints_target = batch['waypoints']
l1_loss = F.l1_loss(wp_pred, waypoints_target, reduction='mean').item()
elif self.output_coordinate_repr == "polar":
wp_pred, arrive_pred, distance_pred, angle_pred = self(obs, cord)
waypoints_target = batch['waypoints']
distance_loss, angle_loss, _, _ = self.compute_loss_polar(wp_pred, distance_pred, angle_pred, arrive_pred, batch)


# Compute accuracy for "arrived" prediction
arrived_target = batch['arrived']
arrived_logits = arrive_pred.flatten()
arrived_probs = torch.sigmoid(arrived_logits)
arrived_pred_binary = (arrived_probs >= 0.5).float()
correct = (arrived_pred_binary == arrived_target).float()
accuracy = correct.sum().item() / correct.numel()

# wp_pred_last = wp_pred[:, -1, :] # shape [batch_size, 2]
# waypoints_target_last = waypoints_target[:, -1, :] # shape [batch_size, 2]

# Compute cosine similarity
wp_pred_view = wp_pred.view(-1, 2)
waypoints_target_view = waypoints_target.view(-1, 2)
dot_product = (wp_pred_view * waypoints_target_view).sum(dim=1) # shape [batch_size]
norm_pred = wp_pred_view.norm(dim=1) # shape [batch_size]
norm_target = waypoints_target_view.norm(dim=1) # shape [batch_size]
cos_sim = dot_product / (norm_pred * norm_target + 1e-8) # avoid division by zero

# Compute angle in degrees
angle = torch.acos(cos_sim.clamp(-1+1e-7, 1-1e-7)) * 180 / torch.pi # shape [batch_size]
angle = angle.view(B, T)

# Take mean angle
mean_angle = angle.mean(dim=0).cpu().numpy()

# Store the metrics
if self.output_coordinate_repr == "euclidean":
self.test_metrics['l1_loss'].append(l1_loss)
elif self.output_coordinate_repr == "polar":
self.test_metrics['distance_loss'].append(distance_loss)
self.test_metrics['angle_loss'].append(angle_loss)
self.test_metrics['arrived_accuracy'].append(accuracy)
self.test_metrics['mean_angle'].append(mean_angle)
elif self.datatype == "urbannav":
category = batch['categories']
wp_pred, arrive_pred = self(obs, cord)

# Compute L1 loss for waypoints
waypoints_target = batch['waypoints']
l1_loss = F.l1_loss(wp_pred, waypoints_target, reduction='mean').item()
elif self.output_coordinate_repr == "polar":
wp_pred, arrive_pred, distance_pred, angle_pred = self(obs, cord)
waypoints_target = batch['waypoints']
distance_loss, angle_loss, _, _ = self.compute_loss_polar(wp_pred, distance_pred, angle_pred, arrive_pred, batch)
l1_loss = F.l1_loss(wp_pred, waypoints_target, reduction='none')


# Compute accuracy for "arrived" prediction
arrived_target = batch['arrived']
arrived_logits = arrive_pred.flatten()
arrived_probs = torch.sigmoid(arrived_logits)
arrived_pred_binary = (arrived_probs >= 0.5).float()
correct = (arrived_pred_binary == arrived_target).float()
accuracy = correct.sum().item() / correct.numel()
# Compute accuracy for "arrived" prediction
arrived_target = batch['arrived']
arrived_probs = torch.sigmoid(arrive_pred)
arrived_pred_binary = (arrived_probs >= 0.5).float().squeeze(-1)
correct = (arrived_pred_binary == arrived_target).float()

# Compute cosine similarity
wp_pred_view = wp_pred.view(-1, 2)
waypoints_target_view = waypoints_target.view(-1, 2)
dot_product = (wp_pred_view * waypoints_target_view).sum(dim=1) # shape [batch_size]
norm_pred = wp_pred_view.norm(dim=1) # shape [batch_size]
norm_target = waypoints_target_view.norm(dim=1) # shape [batch_size]
cos_sim = dot_product / (norm_pred * norm_target + 1e-8) # avoid division by zero
# Compute angle in degrees
angle = torch.acos(cos_sim.clamp(-1+1e-7, 1-1e-7)) * 180 / torch.pi # shape [batch_size]
angle = angle.view(B, T)

for batch_idx in range(B):
for category_idx in range(self.num_categories):
if category[batch_idx, category_idx] == 1:
category_name = self.test_catetories[category_idx]
self.test_metrics[category_name]['l1_loss'].append(l1_loss[batch_idx].mean().item())
self.test_metrics[category_name]['arrived_accuracy'].append(correct[batch_idx].item())
self.test_metrics[category_name]['mean_angle'].append(angle[batch_idx].mean().cpu().numpy())
self.test_metrics[category_name]['angle_step1'].append(angle[batch_idx, 0].cpu().numpy())
self.test_metrics[category_name]['angle_step2'].append(angle[batch_idx, 1].cpu().numpy())
self.test_metrics[category_name]['angle_step3'].append(angle[batch_idx, 2].cpu().numpy())
self.test_metrics[category_name]['angle_step4'].append(angle[batch_idx, 3].cpu().numpy())
self.test_metrics[category_name]['angle_step5'].append(angle[batch_idx, 4].cpu().numpy())
else:
continue
self.test_metrics['overall']['l1_loss'].append(l1_loss[batch_idx].mean().item())
self.test_metrics['overall']['arrived_accuracy'].append(correct[batch_idx].item())
self.test_metrics['overall']['mean_angle'].append(angle[batch_idx].mean().cpu().numpy())
self.test_metrics['overall']['angle_step1'].append(angle[batch_idx, 0].cpu().numpy())
self.test_metrics['overall']['angle_step2'].append(angle[batch_idx, 1].cpu().numpy())
self.test_metrics['overall']['angle_step3'].append(angle[batch_idx, 2].cpu().numpy())
self.test_metrics['overall']['angle_step4'].append(angle[batch_idx, 3].cpu().numpy())
self.test_metrics['overall']['angle_step5'].append(angle[batch_idx, 4].cpu().numpy())

# wp_pred_last = wp_pred[:, -1, :] # shape [batch_size, 2]
# waypoints_target_last = waypoints_target[:, -1, :] # shape [batch_size, 2]

# Compute cosine similarity
B, T, _ = wp_pred.shape
wp_pred_view = wp_pred.view(-1, 2)
waypoints_target_view = waypoints_target.view(-1, 2)
dot_product = (wp_pred_view * waypoints_target_view).sum(dim=1) # shape [batch_size]
norm_pred = wp_pred_view.norm(dim=1) # shape [batch_size]
norm_target = waypoints_target_view.norm(dim=1) # shape [batch_size]
cos_sim = dot_product / (norm_pred * norm_target + 1e-8) # avoid division by zero

# Compute angle in degrees
angle = torch.acos(cos_sim.clamp(-1+1e-7, 1-1e-7)) * 180 / torch.pi # shape [batch_size]
angle = angle.view(B, T)

# Take mean angle
mean_angle = angle.mean(dim=0).cpu().numpy()

# Store the metrics
if self.output_coordinate_repr == "euclidean":
self.test_metrics['l1_loss'].append(l1_loss)
elif self.output_coordinate_repr == "polar":
self.test_metrics['distance_loss'].append(distance_loss)
self.test_metrics['angle_loss'].append(angle_loss)
self.test_metrics['arrived_accuracy'].append(accuracy)
self.test_metrics['mean_angle'].append(mean_angle)

# Handle visualization
if self.output_coordinate_repr == "euclidean":
Expand All @@ -204,26 +258,66 @@ def test_step(self, batch, batch_idx):
)

def on_test_epoch_end(self):
for metric in self.test_metrics:
metric_array = np.array(self.test_metrics[metric])
save_path = os.path.join(self.result_dir, f'test_{metric}.npy')
np.save(save_path, metric_array)
if not metric == "mean_angle":
print(f"Test mean {metric} {metric_array.mean():.4f} saved to {save_path}")
else:
mean_angle = metric_array.mean(axis=0)
for i in range(len(mean_angle)):
print(f"Test mean angle at step {i} {mean_angle[i]:.4f}")
if self.datatype == "citywalk":
for metric in self.test_metrics:
metric_array = np.array(self.test_metrics[metric])
save_path = os.path.join(self.result_dir, f'test_{metric}.npy')
np.save(save_path, metric_array)
if not metric == "mean_angle":
print(f"Test mean {metric} {metric_array.mean():.4f} saved to {save_path}")
else:
mean_angle = metric_array.mean(axis=0)
for i in range(len(mean_angle)):
print(f"Test mean angle at step {i} {mean_angle[i]:.4f}")
elif self.datatype == "urbannav":
import pandas as pd
for category in self.test_catetories:
for metric in self.test_metrics[category]:
self.test_metrics[category][metric] = np.array(self.test_metrics[category][metric]).mean()
for metric in self.test_metrics['overall']:
self.test_metrics['overall'][metric] = np.array(self.test_metrics['overall'][metric]).mean()
metrics = ['l1_loss', 'arrived_accuracy', 'angle_step1', 'angle_step2', 'angle_step3', 'angle_step4', 'angle_step5', 'mean_angle']
for metric in metrics:
category_val = []
for category in self.test_catetories:
category_val.append(self.test_metrics[category][metric])
self.test_metrics['mean'][metric] = np.array(category_val).mean()
print(f"{metric}: Sample mean {self.test_metrics['overall'][metric]:.4f}, Category mean {self.test_metrics['mean'][metric]:.4f}")

df = pd.DataFrame(self.test_metrics)
df = df.reset_index().rename(columns={'index': 'Metrics'})
save_path = os.path.join(self.result_dir, 'test_metrics.csv')
df.to_csv(save_path, index=False)


def on_validation_epoch_start(self):
self.vis_count = 0

def on_test_epoch_start(self):
self.vis_count = 0
if self.output_coordinate_repr == "euclidean":
self.test_metrics = {'l1_loss': [], 'arrived_accuracy': [], 'mean_angle': []}
elif self.output_coordinate_repr == "polar":
self.test_metrics = {'distance_loss': [], 'angle_loss': [], 'arrived_accuracy': [], 'mean_angle': []}
if self.datatype == "citywalk":
if self.output_coordinate_repr == "euclidean":
self.test_metrics = {'l1_loss': [], 'arrived_accuracy': [], 'mean_angle': []}
elif self.output_coordinate_repr == "polar":
self.test_metrics = {'distance_loss': [], 'angle_loss': [], 'arrived_accuracy': [], 'mean_angle': []}
elif self.datatype == "urbannav":
self.test_metrics = {}
categories = self.test_catetories[:]
categories.extend(['mean', 'overall'])
for category in categories:
if self.output_coordinate_repr == "euclidean":
self.test_metrics[category] = {
'l1_loss': [],
'arrived_accuracy': [],
'angle_step1': [],
'angle_step2': [],
'angle_step3': [],
'angle_step4': [],
'angle_step5': [],
'mean_angle': []
}
elif self.output_coordinate_repr == "polar":
raise ValueError("Polar representation is not supported for UrbanNav dataset.")

def compute_loss(self, wp_pred, arrive_pred, batch):
waypoints_target = batch['waypoints']
Expand Down
10 changes: 8 additions & 2 deletions test.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import yaml
import os
from pl_modules.citywalk_datamodule import CityWalkDataModule
from pl_modules.urbannav_datamodule import UrbanNavDataModule
from pl_modules.urban_nav_module import UrbanNavModule
import torch
import glob
Expand Down Expand Up @@ -70,8 +71,13 @@ def main():
test_dir = os.path.join(cfg.project.result_dir, cfg.project.run_name, 'test')
os.makedirs(test_dir, exist_ok=True)

# Initialize the DataModule for testing
datamodule = CityWalkDataModule(cfg)
# Initialize the DataModule
if cfg.data.type == 'citywalk':
datamodule = CityWalkDataModule(cfg)
elif cfg.data.type == 'urbannav':
datamodule = UrbanNavDataModule(cfg)
else:
raise ValueError(f"Invalid dataset: {cfg.data.dataset}")

# Initialize the model
model = UrbanNavModule(cfg)
Expand Down

0 comments on commit e12ffbf

Please sign in to comment.