Skip to content

Commit

Permalink
Merge branch '255-add-peak-on-edge' into 'release'
Browse files Browse the repository at this point in the history
Resolve "Ajout du critère PEAK_ON_EDGE"

Closes #255

See merge request 3d/PandoraBox/pandora2d!206
  • Loading branch information
lecontm committed Mar 3, 2025
2 parents 671fea8 + 7aef4ea commit 7e222b5
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 15 deletions.
88 changes: 76 additions & 12 deletions pandora2d/criteria.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
This module contains functions associated to the validity mask and criteria dataarray created in the cost volume step.
"""
import itertools
from typing import Union
from typing import Union, Tuple
import xarray as xr
import numpy as np
from numpy.typing import NDArray
Expand All @@ -33,6 +33,32 @@
)


def get_disparity_grids(
left_image: xr.Dataset, cv_coords: Tuple[NDArray, NDArray]
) -> Tuple[NDArray, NDArray, NDArray, NDArray]:
"""
Return disparity grid from left image according to cost_volumes row and col coordinates.
We need to use the cost volume coordinates to process the right points when the step is different from 1.
:param left_image: left image
:type left_image: xr.Dataset
:param cv_coords: cost volumes row and column coordinates
:type cv_coords: Tuple[NDArray, NDArray]
:return: 4 disparity grids
:rtype: Tuple[NDArray, NDArray, NDArray, NDArray]
"""

# Get rows disparity grid
d_min_row_grid = left_image["row_disparity"].sel(row=cv_coords[0], col=cv_coords[1], band_disp="min").data
d_max_row_grid = left_image["row_disparity"].sel(row=cv_coords[0], col=cv_coords[1], band_disp="max").data

# Get columns disparity grid
d_min_col_grid = left_image["col_disparity"].sel(row=cv_coords[0], col=cv_coords[1], band_disp="min").data
d_max_col_grid = left_image["col_disparity"].sel(row=cv_coords[0], col=cv_coords[1], band_disp="max").data

return d_min_row_grid, d_max_row_grid, d_min_col_grid, d_max_col_grid


def allocate_criteria_dataarray(
cv: xr.Dataset, value: Union[int, float, Criteria] = Criteria.VALID, data_type: Union[np.dtype, None] = None
) -> xr.DataArray:
Expand Down Expand Up @@ -61,6 +87,15 @@ def get_criteria_dataarray(left_image: xr.Dataset, right_image: xr.Dataset, cv:
"""
This method fill the criteria dataarray with the different criteria obtained thanks to
the methods implemented in this file
:param left_image: left image
:type left_image: xr.Dataset
:param right_image: right image
:type right_image: xr.Dataset
:param cv: cost_volumes
:type cv: 4D xarray.Dataset
:return: criteria_dataarray: 4D DataArray containing the criteria
:rtype: criteria_dataarray: xr.DataArray
"""

# Allocate criteria dataarray
Expand Down Expand Up @@ -92,17 +127,10 @@ def get_criteria_dataarray(left_image: xr.Dataset, right_image: xr.Dataset, cv:
# on the border according to offset value, for each disparity
mask_border(cv.attrs["offset_row_col"], criteria_dataarray)

# Select correct rows and columns in case of a step different from 1.
row_cv = cv.row.values
col_cv = cv.col.values

# Get columns disparity grid
d_min_col_grid = left_image["col_disparity"].sel(row=row_cv, col=col_cv, band_disp="min").data
d_max_col_grid = left_image["col_disparity"].sel(row=row_cv, col=col_cv, band_disp="max").data

# Get rows disparity grid
d_min_row_grid = left_image["row_disparity"].sel(row=row_cv, col=col_cv, band_disp="min").data
d_max_row_grid = left_image["row_disparity"].sel(row=row_cv, col=col_cv, band_disp="max").data
# Get disparity grids according to cost volumes coordinates
d_min_row_grid, d_max_row_grid, d_min_col_grid, d_max_col_grid = get_disparity_grids(
left_image, (cv.row.values, cv.col.values)
)

# Put PANDORA2D_MSK_PIXEL_DISPARITY_UNPROCESSED
# on points for which corresponding disparity is not processed
Expand Down Expand Up @@ -311,3 +339,39 @@ def apply_right_criteria_mask(criteria_dataarray: xr.DataArray, right_criteria_m
"disp_row": row_disp,
}
] |= right_criteria_mask[msk_row_slice, msk_col_slice]


def apply_peak_on_edge(
criteria_dataarray: xr.DataArray,
left_image: xr.Dataset,
cv_coords: Tuple[NDArray, NDArray],
row_map: NDArray,
col_map: NDArray,
):
"""
This method raises PANDORA2D_MSK_PIXEL_PEAK_ON_EDGE criteria for points (row, col)
for which the best matching cost is found for the edge of the disparity range.
This criteria is applied on point (row, col), for each disparity value.
:param criteria_dataaray: criteria dataarray to update
:type criteria_dataaray: xr.DataArray
:param left_image: left image
:type left_image: xr.Dataset
:param cv_coords: cost volumes row and column coordinates
:type cv_coords: Tuple[NDArray, NDArray]
:param row_map: row disparity map
:type row_map: NDArray
:param col_map: col disparity map
:type col_map: NDArray
"""

# Get disparity grids according to cost volumes coordinates
d_min_row_grid, d_max_row_grid, d_min_col_grid, d_max_col_grid = get_disparity_grids(left_image, cv_coords)

# Apply PANDORA2D_MSK_PIXEL_PEAK_ON_EDGE criteria
criteria_dataarray.data[
(row_map == d_min_row_grid) | (row_map == d_max_row_grid)
] |= Criteria.PANDORA2D_MSK_PIXEL_PEAK_ON_EDGE
criteria_dataarray.data[
(col_map == d_min_col_grid) | (col_map == d_max_col_grid)
] |= Criteria.PANDORA2D_MSK_PIXEL_PEAK_ON_EDGE
10 changes: 10 additions & 0 deletions pandora2d/state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,16 @@ def disp_maps_run(self, cfg: Dict[str, dict], input_step: str) -> None:
},
)

cv_coords = (self.cost_volumes.row.values, self.cost_volumes.col.values)

criteria.apply_peak_on_edge(
self.criteria_dataarray,
self.left_img,
cv_coords,
self.dataset_disp_maps["row_map"].data,
self.dataset_disp_maps["col_map"].data,
)

@mem_time_profile(name="Refinement step")
def refinement_run(self, cfg: Dict[str, dict], input_step: str) -> None:
"""
Expand Down
4 changes: 2 additions & 2 deletions tests/unit_tests/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ def test_dataset_disp_maps_with_pipeline_computation(self, roi, step, left_image
cfg_disp = {"disparity_method": "wta", "invalid_disparity": -9999}
disparity_matcher = disparity.Disparity(cfg_disp)
# compute disparity maps
delta_row, delta_col, correlation_score = disparity_matcher.compute_disp_maps(cvs)
delta_col, delta_row, correlation_score = disparity_matcher.compute_disp_maps(cvs)

# create dataset with dataset_disp_maps function
disparity_maps = common.dataset_disp_maps(
Expand All @@ -497,7 +497,7 @@ def test_dataset_disp_maps_with_pipeline_computation(self, roi, step, left_image

# create ground truth with create_dataset_coords method
dataset_ground_truth = create_dataset_coords(
delta_x, delta_y, correlation_score, disparity_maps.row, disparity_maps.col
delta_y, delta_x, correlation_score, disparity_maps.row, disparity_maps.col
)

assert disparity_maps.equals(dataset_ground_truth)
Expand Down
134 changes: 133 additions & 1 deletion tests/unit_tests/test_criteria.py
Original file line number Diff line number Diff line change
Expand Up @@ -1398,9 +1398,141 @@ def test_criteria_datarray_created_in_state_machine(

checked_cfg = check_conf(configuration, pandora2d_machine)

_, __ = run(pandora2d_machine, img_left, img_right, checked_cfg)
dataset_disp_maps, _ = run(pandora2d_machine, img_left, img_right, checked_cfg)

# Get peak on the edges to add Criteria.PANDORA2D_MSK_PIXEL_PEAK_ON_EDGE in ground_truth_criteria_dataarray
row_peak_mask = (
dataset_disp_maps["row_map"].data
== correct_input_cfg["input"]["row_disparity"]["init"] - correct_input_cfg["input"]["row_disparity"]["range"]
) | (
dataset_disp_maps["row_map"].data
== correct_input_cfg["input"]["row_disparity"]["init"] + correct_input_cfg["input"]["row_disparity"]["range"]
)

col_peak_mask = (
dataset_disp_maps["col_map"].data
== correct_input_cfg["input"]["col_disparity"]["init"] - correct_input_cfg["input"]["col_disparity"]["range"]
) | (
dataset_disp_maps["col_map"].data
== correct_input_cfg["input"]["col_disparity"]["init"] + correct_input_cfg["input"]["col_disparity"]["range"]
)

ground_truth_criteria_dataarray[row_peak_mask | col_peak_mask] |= Criteria.PANDORA2D_MSK_PIXEL_PEAK_ON_EDGE

# Check that criteria_dataarray exists and is the same shape as the cost_volumes object
assert pandora2d_machine.cost_volumes.cost_volumes.sizes == pandora2d_machine.criteria_dataarray.sizes
# Check that criteria dataarray contains correct criteria
np.testing.assert_array_equal(pandora2d_machine.criteria_dataarray.data, ground_truth_criteria_dataarray)


class TestPeakOnEdge:
"""
Test the methods linked to PANDORA2D_MSK_PIXEL_PEAK_ON_EDGE criteria
"""

@pytest.fixture()
def row_map(self, img_size, disparity_cfg):
"""
Row disparity map used for tests
"""

row_map = np.full(img_size, 2)

# row_map[0,0] is equal to the minimum of the row disparity range
row_map[0, 0] = disparity_cfg[0]["init"] - disparity_cfg[0]["range"]
# row_map[3,3] is equal to the maximum of the row disparity range
row_map[3, 3] = disparity_cfg[0]["init"] + disparity_cfg[0]["range"]
return row_map

@pytest.fixture()
def col_map(self, img_size, disparity_cfg):
"""
Col disparity map used for tests
"""

col_map = np.full(img_size, -1)

# col_map[0,0] is equal to the maximum of the col disparity range
col_map[0, 0] = disparity_cfg[1]["init"] + disparity_cfg[1]["range"]
# col_map[0,0] is equal to the minimum of the col disparity range
col_map[4, 5] = disparity_cfg[1]["init"] - disparity_cfg[1]["range"]
return col_map

@pytest.fixture()
def row_map_full_peak(self, img_size, disparity_cfg):
"""
Row disparity map with only peak on edges used for tests
"""

# row_map is filled with the minimum of the row disparity range
row_map = np.full(img_size, disparity_cfg[0]["init"] - disparity_cfg[0]["range"])
return row_map

@pytest.fixture()
def col_map_full_peak(self, img_size, disparity_cfg):
"""
Col disparity map with only peak on edges used for tests
"""

# col_map is filled with the maximum of the col disparity range
col_map = np.full(img_size, disparity_cfg[1]["init"] + disparity_cfg[1]["range"])
return col_map

@pytest.fixture()
def map_without_peak(self, img_size):
"""
Disparity map without peak on edges
"""

return np.full(img_size, 1)

def test_apply_peak_on_edge(self, criteria_dataarray, image, cost_volumes, row_map, col_map):
"""
Test the apply_peak_on_edge method
"""

cost_volumes_coords = (cost_volumes.row.values, cost_volumes.col.values)

criteria.apply_peak_on_edge(criteria_dataarray, image, cost_volumes_coords, row_map, col_map)

assert (criteria_dataarray.data[0, 0, :, :] == Criteria.PANDORA2D_MSK_PIXEL_PEAK_ON_EDGE).all()
assert (criteria_dataarray.data[4, 5, :, :] == Criteria.PANDORA2D_MSK_PIXEL_PEAK_ON_EDGE).all()
assert (criteria_dataarray.data[3, 3, :, :] == Criteria.PANDORA2D_MSK_PIXEL_PEAK_ON_EDGE).all()

@pytest.mark.parametrize(
["drow_map", "dcol_map"],
[
pytest.param("row_map_full_peak", "col_map_full_peak", id="Row and col disparity maps full of peaks"),
pytest.param("row_map_full_peak", "col_map", id="Row map full of peaks"),
pytest.param("map_without_peak", "col_map_full_peak", id="Col map full of peaks"),
],
)
def test_apply_peak_on_edge_full_peak_map(
self, criteria_dataarray, image, cost_volumes, drow_map, dcol_map, request
):
"""
Test the apply_peak_on_edge method with disparity maps full of peaks on edges
"""

cost_volumes_coords = (cost_volumes.row.values, cost_volumes.col.values)

criteria.apply_peak_on_edge(
criteria_dataarray,
image,
cost_volumes_coords,
request.getfixturevalue(drow_map),
request.getfixturevalue(dcol_map),
)

assert (criteria_dataarray.data == Criteria.PANDORA2D_MSK_PIXEL_PEAK_ON_EDGE).all()

def test_apply_peak_on_edge_without_peak(self, criteria_dataarray, image, cost_volumes, map_without_peak):
"""
Test the apply_peak_on_edge method with maps without peaks on edges
"""

cost_volumes_coords = (cost_volumes.row.values, cost_volumes.col.values)

criteria.apply_peak_on_edge(criteria_dataarray, image, cost_volumes_coords, map_without_peak, map_without_peak)

assert (criteria_dataarray.data != Criteria.PANDORA2D_MSK_PIXEL_PEAK_ON_EDGE).all()

0 comments on commit 7e222b5

Please sign in to comment.