Robust loss: add GemanMcClure based GNC loss and hinge loss (facebook…
YipuZhao authored Aug 2, 2023
1 parent dd4356b commit 240e120
Showing 5 changed files with 260 additions and 39 deletions.
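For orientation before the diffs: the commit adds a GNCRobustCostFunction together with the GemanMcClureLoss, GNCRobustLoss, and HingeLoss classes. The snippet below is not part of the commit; it is a minimal sketch of how the new pieces fit together, mirroring the _new_robust_cf helper in the updated tests. The batch size, cost weight, loss radius, and initial GNC control value are arbitrary placeholders.

import torch

import theseus as th

generator = torch.Generator()
generator.manual_seed(0)

# A simple Local(v1, v2) cost between two random SE(3) variables.
v1 = th.rand_se3(4, generator=generator)
v2 = th.rand_se3(4, generator=generator)
cf = th.Local(v1, v2, th.ScaleCostWeight(0.5))

# The loss radius is passed in log space. The GNC control value starts large;
# in the usual GNC scheme a large control value keeps the surrogate loss close
# to quadratic, and it is annealed as optimization proceeds.
log_loss_radius = th.Variable(tensor=torch.randn(1, 1, generator=generator))
gnc_control_val = th.Variable(tensor=torch.ones(1, 1) * 1e3)

robust_cf = th.GNCRobustCostFunction(
    cf,
    th.GemanMcClureLoss,
    log_loss_radius=log_loss_radius,
    gnc_control_val=gnc_control_val,
)
# As the tests below check, ||robust_cf.weighted_error()||^2 == rho(||e||^2).
rho = robust_cf.weighted_error()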
155 changes: 124 additions & 31 deletions tests/theseus_tests/core/test_robust_cost.py
@@ -4,15 +4,19 @@
# LICENSE file in the root directory of this source tree.
import pytest

-import torch
-
import theseus as th
+
+import torch
from tests.theseus_tests.core.common import BATCH_SIZES_TO_TEST


def _new_robust_cf(
-    batch_size, loss_cls, generator, masked_weight=False
-) -> th.RobustCostFunction:
+    batch_size,
+    loss_cls,
+    generator,
+    masked_weight=False,
+    gnc_cost=False,
+) -> [th.RobustCostFunction, th.GNCRobustCostFunction]:
v1 = th.rand_se3(batch_size, generator=generator)
v2 = th.rand_se3(batch_size, generator=generator)
if masked_weight:
@@ -25,44 +29,93 @@ def _new_robust_cf(
w = th.ScaleCostWeight(w_tensor)
cf = th.Local(v1, v2, w)
ll_radius = th.Variable(tensor=torch.randn(1, 1, generator=generator))
-    return th.RobustCostFunction(cf, loss_cls, ll_radius)
+    if gnc_cost:
+        gnc_control_val = th.Variable(
+            tensor=torch.randn(1, 1, generator=generator).abs() * 1e3
+        )
+        return th.GNCRobustCostFunction(
+            cf,
+            loss_cls,
+            log_loss_radius=ll_radius,
+            gnc_control_val=gnc_control_val,
+        )
+    else:
+        return th.RobustCostFunction(cf, loss_cls, log_loss_radius=ll_radius)


def _grad(jac: torch.Tensor, error: torch.Tensor) -> torch.Tensor:
return jac.transpose(2, 1).matmul(error.unsqueeze(2)).permute(0, 2, 1)


-def test_robust_cost_weighted_error():
+def _loss_evaluate(robust_cf, loss_cls, x: torch.Tensor) -> torch.Tensor:
+    if isinstance(robust_cf, th.GNCRobustCostFunction):
+        return loss_cls.evaluate(
+            x,
+            log_radius=robust_cf.log_loss_radius.tensor,
+            mu=robust_cf.gnc_control_val.tensor,
+        )
+    else:
+        return loss_cls.evaluate(x, log_radius=robust_cf.log_loss_radius.tensor)
+
+
+def _loss_linearize(robust_cf, loss_cls, x: torch.Tensor) -> torch.Tensor:
+    if isinstance(robust_cf, th.GNCRobustCostFunction):
+        return loss_cls.linearize(
+            x,
+            log_radius=robust_cf.log_loss_radius.tensor,
+            mu=robust_cf.gnc_control_val.tensor,
+        )
+    else:
+        return loss_cls.linearize(x, log_radius=robust_cf.log_loss_radius.tensor)
+
+
+@pytest.mark.parametrize("batch_size", BATCH_SIZES_TO_TEST)
+@pytest.mark.parametrize(
+    "loss_cls", [th.WelschLoss, th.HingeLoss, th.HuberLoss, th.GemanMcClureLoss]
+)
+def test_robust_cost_weighted_error(batch_size, loss_cls):
generator = torch.Generator()
generator.manual_seed(0)
for _ in range(10):
-        for batch_size in BATCH_SIZES_TO_TEST:
-            for loss_cls in [th.WelschLoss, th.HuberLoss]:
-                robust_cf = _new_robust_cf(batch_size, loss_cls, generator)
-                cf = robust_cf.cost_function
-                e = cf.weighted_error()
-                rho = robust_cf.weighted_error()
-                rho2 = (rho * rho).sum(dim=1, keepdim=True)
-                # `RobustCostFunction.weighted_error` is written so that
-                # ||we||2 == rho(||e||2)
-                expected_rho2 = loss_cls.evaluate(
-                    (e * e).sum(dim=1, keepdim=True), robust_cf.log_loss_radius.tensor
-                )
-                torch.testing.assert_close(rho2, expected_rho2)
+        robust_cf = _new_robust_cf(
+            batch_size,
+            loss_cls,
+            generator,
+            gnc_cost=issubclass(loss_cls, th.GemanMcClureLoss),
+        )
+        cf = robust_cf.cost_function
+        e = cf.weighted_error()
+        rho = robust_cf.weighted_error()
+        rho2 = (rho * rho).sum(dim=1, keepdim=True)
+        # `RobustCostFunction.weighted_error` is written so that
+        # ||we||2 == rho(||e||2)
+        expected_rho2 = _loss_evaluate(
+            robust_cf,
+            loss_cls,
+            (e * e).sum(dim=1, keepdim=True),
+        )
+        torch.testing.assert_close(rho2, expected_rho2)


@pytest.mark.parametrize("batch_size", BATCH_SIZES_TO_TEST)
@pytest.mark.parametrize("loss_cls", [th.WelschLoss, th.HuberLoss])
@pytest.mark.parametrize(
"loss_cls", [th.WelschLoss, th.HingeLoss, th.HuberLoss, th.GemanMcClureLoss]
)
def test_robust_cost_grad_form(batch_size, loss_cls):
generator = torch.Generator()
generator.manual_seed(0)
for _ in range(10):
-        robust_cf = _new_robust_cf(batch_size, loss_cls, generator)
+        robust_cf = _new_robust_cf(
+            batch_size,
+            loss_cls,
+            generator,
+            gnc_cost=issubclass(loss_cls, th.GemanMcClureLoss),
+        )
cf = robust_cf.cost_function
jacs, e = cf.weighted_jacobians_error()
cf_grad = _grad(jacs[0], e)
e_norm = (e * e).sum(1, keepdim=True)
-        rho_prime = loss_cls.linearize(e_norm, robust_cf.log_loss_radius.tensor)
+        rho_prime = _loss_linearize(robust_cf, loss_cls, e_norm)
# `weighted_jacobians_error()` is written so that it results in a
# gradient equal to drho_de2 * J^T * e, which in the code is
# `rho_prime * cf_grad`.
@@ -73,26 +126,49 @@ def test_robust_cost_grad_form(batch_size, loss_cls):


@pytest.mark.parametrize("batch_size", BATCH_SIZES_TO_TEST)
@pytest.mark.parametrize("loss_cls", [th.WelschLoss, th.HuberLoss])
@pytest.mark.parametrize(
"loss_cls", [th.WelschLoss, th.HingeLoss, th.HuberLoss, th.GemanMcClureLoss]
)
def test_robust_cost_jacobians(batch_size, loss_cls):
generator = torch.Generator()
generator.manual_seed(0)

for _ in range(10):
-        robust_cf = _new_robust_cf(batch_size, loss_cls, generator)
+        robust_cf = _new_robust_cf(
+            batch_size,
+            loss_cls,
+            generator,
+            gnc_cost=issubclass(loss_cls, th.GemanMcClureLoss),
+        )
v1, v2 = robust_cf.cost_function.var, robust_cf.cost_function.target
v_aux = v1.copy()
ll_radius = robust_cf.log_loss_radius
w = robust_cf.cost_function.weight

def test_fn(v_data):
v_aux.update(v_data)
-            new_robust_cf = th.RobustCostFunction(
-                th.Local(v_aux, v2, w), loss_cls, ll_radius
+            new_robust_cf = (
+                th.GNCRobustCostFunction(
+                    th.Local(v_aux, v2, w),
+                    loss_cls,
+                    log_loss_radius=ll_radius,
+                    gnc_control_val=robust_cf.gnc_control_val,
+                )
+                if issubclass(loss_cls, th.GemanMcClureLoss)
+                else th.RobustCostFunction(
+                    th.Local(v_aux, v2, w), loss_cls, log_loss_radius=ll_radius
+                )
)
e = new_robust_cf.cost_function.weighted_error()
e_norm = (e * e).sum(1, keepdim=True)
-            return loss_cls.evaluate(e_norm, ll_radius.tensor) / 2.0
+            return (
+                _loss_evaluate(
+                    new_robust_cf,
+                    loss_cls,
+                    e_norm,
+                )
+                / 2.0
+            )

aux_id = torch.arange(batch_size)
grad_raw_dense = torch.autograd.functional.jacobian(test_fn, (v1.tensor,))[0]
@@ -105,10 +181,19 @@ def test_fn(v_data):
torch.testing.assert_close(grad, expected_grad, atol=1e-2, rtol=1e-2)


-def test_masked_jacobians_called(monkeypatch):
+@pytest.mark.parametrize(
+    "loss_cls", [th.WelschLoss, th.HingeLoss, th.HuberLoss, th.GemanMcClureLoss]
+)
+def test_masked_jacobians_called(monkeypatch, loss_cls):
rng = torch.Generator()
rng.manual_seed(0)
-    robust_cf = _new_robust_cf(128, th.WelschLoss, rng, masked_weight=True)
+    robust_cf = _new_robust_cf(
+        128,
+        loss_cls,
+        rng,
+        masked_weight=True,
+        gnc_cost=issubclass(loss_cls, th.GemanMcClureLoss),
+    )
robust_cf._supports_masking = True

called = [False]
@@ -124,12 +209,20 @@ def masked_jacobians_mock(cost_fn, mask):
assert called[0]


@pytest.mark.parametrize("loss_cls", [th.WelschLoss, th.HuberLoss])
@pytest.mark.parametrize(
"loss_cls", [th.WelschLoss, th.HingeLoss, th.HuberLoss, th.GemanMcClureLoss]
)
def test_mask_jacobians(loss_cls):
batch_size = 512
rng = torch.Generator()
rng.manual_seed(0)
-    robust_cf = _new_robust_cf(batch_size, loss_cls, rng, masked_weight=True)
+    robust_cf = _new_robust_cf(
+        batch_size,
+        loss_cls,
+        rng,
+        masked_weight=True,
+        gnc_cost=issubclass(loss_cls, th.GemanMcClureLoss),
+    )
jac_expected, err_expected = robust_cf.weighted_jacobians_error()
robust_cf._supports_masking = True
jac, err = robust_cf.weighted_jacobians_error()
4 changes: 4 additions & 0 deletions theseus/__init__.py
@@ -13,6 +13,10 @@
CostFunction,
CostWeight,
DiagonalCostWeight,
+    GemanMcClureLoss,
+    GNCRobustCostFunction,
+    GNCRobustLoss,
+    HingeLoss,
HuberLoss,
Objective,
RobustCostFunction,
11 changes: 9 additions & 2 deletions theseus/core/__init__.py
@@ -12,7 +12,14 @@
)
from .cost_weight import CostWeight, DiagonalCostWeight, ScaleCostWeight
from .objective import Objective
-from .robust_cost_function import RobustCostFunction
-from .robust_loss import HuberLoss, RobustLoss, WelschLoss
+from .robust_cost_function import GNCRobustCostFunction, RobustCostFunction
+from .robust_loss import (
+    GemanMcClureLoss,
+    GNCRobustLoss,
+    HingeLoss,
+    HuberLoss,
+    RobustLoss,
+    WelschLoss,
+)
from .variable import Variable, as_variable, masked_variables
from .vectorizer import Vectorize
68 changes: 62 additions & 6 deletions theseus/core/robust_cost_function.py
@@ -9,7 +9,7 @@

from .cost_function import CostFunction
from .cost_weight import CostWeight
-from .robust_loss import RobustLoss
+from .robust_loss import GNCRobustLoss, RobustLoss
from .variable import Variable


@@ -89,7 +89,7 @@ def weighted_error(self) -> torch.Tensor:
if self.flatten_dims:
weighted_error = weighted_error.reshape(-1, 1)
squared_norm = torch.sum(weighted_error**2, dim=1, keepdim=True)
-        error_loss = self.loss.evaluate(squared_norm, self.log_loss_radius.tensor)
+        error_loss = self._evaluate_loss(squared_norm)

if self.flatten_dims:
return (error_loss.reshape(-1, self.dim()) + RobustCostFunction._EPS).sqrt()
@@ -122,10 +122,7 @@ def weighted_jacobians_error(self) -> Tuple[List[torch.Tensor], torch.Tensor]:
for i, wj in enumerate(weighted_jacobians):
weighted_jacobians[i] = wj.view(-1, 1, wj.shape[2])
squared_norm = torch.sum(weighted_error**2, dim=1, keepdim=True)
-        rescale = (
-            self.loss.linearize(squared_norm, self.log_loss_radius.tensor)
-            + RobustCostFunction._EPS
-        ).sqrt()
+        rescale = (self._linearize(squared_norm) + RobustCostFunction._EPS).sqrt()

rescaled_jacobians = [
rescale.view(-1, 1, 1) * jacobian for jacobian in weighted_jacobians
@@ -149,6 +146,12 @@ def _copy_impl(self, new_name: Optional[str] = None) -> "RobustCostFunction":
flatten_dims=self.flatten_dims,
)

+    def _evaluate_loss(self, squared_norm: torch.Tensor) -> torch.Tensor:
+        return self.loss.evaluate(squared_norm, self.log_loss_radius.tensor)
+
+    def _linearize(self, squared_norm: torch.Tensor) -> torch.Tensor:
+        return self.loss.linearize(squared_norm, self.log_loss_radius.tensor)
+
@property
def weight(self) -> CostWeight:
return self.cost_function.weight
@@ -165,3 +168,56 @@ def _supports_masking(self) -> bool:
def _supports_masking(self, val: bool):
self.cost_function._supports_masking = val
self.__supports_masking__ = val


+# Graduated non-convexity (GNC) is a classic annealing method for approximating the
+# global solution of nonconvex, unconstrained, continuous minimization problems,
+# and it is still adopted in recent work, e.g., https://arxiv.org/abs/1909.08605
+class GNCRobustCostFunction(RobustCostFunction):
+    def __init__(
+        self,
+        cost_function: CostFunction,
+        loss_cls: Type[GNCRobustLoss],
+        log_loss_radius: Variable,
+        gnc_control_val: Variable,
+        flatten_dims: bool = False,
+        name: Optional[str] = None,
+    ):
+        if not issubclass(loss_cls, GNCRobustLoss):
+            raise RuntimeError(
+                f"{loss_cls} must be GNCRobustLoss type to initialize GNCRobustCostFunction."
+            )
+
+        super().__init__(
+            cost_function,
+            loss_cls,
+            log_loss_radius=log_loss_radius,
+            flatten_dims=flatten_dims,
+            name=name,
+        )
+        self.gnc_control_val = gnc_control_val
+        self.register_aux_var("gnc_control_val")
+
+    def _evaluate_loss(self, squared_norm: torch.Tensor) -> torch.Tensor:
+        return self.loss.evaluate(  # type: ignore
+            squared_norm,
+            self.log_loss_radius.tensor,
+            self.gnc_control_val.tensor,
+        )
+
+    def _linearize(self, squared_norm: torch.Tensor) -> torch.Tensor:
+        return self.loss.linearize(  # type: ignore
+            squared_norm,
+            self.log_loss_radius.tensor,
+            self.gnc_control_val.tensor,
+        )
+
+    def _copy_impl(self, new_name: Optional[str] = None) -> "GNCRobustCostFunction":
+        return GNCRobustCostFunction(
+            self.cost_function.copy(),
+            type(self.loss),  # type: ignore
+            self.log_loss_radius.copy(),
+            self.gnc_control_val.copy(),
+            name=new_name,
+            flatten_dims=self.flatten_dims,
+        )
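The comment above cites Yang et al., "Graduated Non-Convexity for Robust Spatial Perception" (https://arxiv.org/abs/1909.08605). For reference, the GNC surrogate of the Geman-McClure loss in that paper is

    \rho_\mu(r) = \frac{\mu \bar{c}^2 r^2}{\mu \bar{c}^2 + r^2}

which tends to the quadratic loss r^2 as \mu \to \infty and recovers the plain Geman-McClure loss \bar{c}^2 r^2 / (\bar{c}^2 + r^2) at \mu = 1, with \mu annealed from a large initial value toward 1. This is the paper's formulation, not necessarily the exact parameterization used by GemanMcClureLoss in theseus/core/robust_loss.py (whose diff is not shown on this page); it is given only to explain the role of gnc_control_val, which the tests above pass to the loss as mu.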
