Closed
Description
relates #707
Description:
To efficiently process CO3D data, the block white
function needs to be improved for better scalability. This includes:
- Adding a condition in the
block wise
settings to enable or disable the application of a RANSAC filter in the_apply_rst
function. - Reusing and integrating the existing code from the POC.
- Adding unit tests to validate the implementation.
Tasks:
- Add a parameter in
blockwise
settings to enable/disable the RANSAC filter. - Integrate the RANSAC filter into the
_apply_rst
function. - Retrieve and adapt the POC code for this implementation.
- Add unit tests to validate the filter activation and functionality.
Some code
def filter_ransac(
coreg, positions, thresh=0.05, minPoints=10, maxIteration=100
):
in_shape = coreg.shape
rows, cols = positions[:, :, 0], positions[:, :, 1]
rows, cols, arr = rows.flatten(), cols.flatten(), coreg.flatten()
points = np.squeeze(np.dstack([rows, cols, arr]))
points = points[~np.isnan(points).any(axis=1), :]
plane = pyrsc.Plane()
best_eq, best_inliers = plane.fit(
points, thresh=thresh, minPoints=minPoints, maxIteration=maxIteration
)
A = -best_eq[0] / best_eq[2]
B = -best_eq[1] / best_eq[2]
C = -best_eq[3] / best_eq[2]
new_arr = rows * A + cols * B + C
new_arr = np.reshape(new_arr, in_shape)
return new_arr, [A, B, C]
def generate_grids_from_ransac(raster_shape, ransac_coefs, step=30):
"""
Generate position grids
"""
row_min = 0
row_max = raster_shape[0]
col_min = 0
col_max = raster_shape[1]
row_range = np.arange(start=row_min, stop=row_max + step, step=step)
col_range = np.arange(start=col_min, stop=col_max + step, step=step)
(
row_gridvalues,
col_gridvalues,
) = np.meshgrid(
col_range,
row_range,
)
grid = (
ransac_coefs[0] * row_gridvalues
+ ransac_coefs[1] * col_gridvalues
+ ransac_coefs[2]
)
return grid, row_gridvalues, col_gridvalues
def generate_correction_grid(correg, step, dem_shape):
correg_shift = correg["coreg"]
positions = correg["positions"]
percent_not_nan = np.sum(~np.isnan(np.array(correg_shift))) / np.size(
np.array(correg_shift)
)
nb_points_min = int(
(correg_shift.shape[0] * correg_shift.shape[1]) * 0.85 * percent_not_nan
)
max_iter = 2000
thresh_x = (
np.nanpercentile(correg_shift[:, :, 0], 90)
- np.nanpercentile(correg_shift[:, :, 0], 10)
) / 3
thresh_y = (
np.nanpercentile(correg_shift[:, :, 1], 90)
- np.nanpercentile(correg_shift[:, :, 1], 10)
) / 3
x_2d_ransac, coef_ransac_x = filter_ransac(
correg_shift[:, :, 0],
positions,
thresh=thresh_x,
minPoints=nb_points_min,
maxIteration=max_iter,
)
y_2d_ransac, coef_ransac_y = filter_ransac(
correg_shift[:, :, 1],
positions,
thresh=thresh_y,
minPoints=nb_points_min,
maxIteration=max_iter,
)
x_grid, row_gridvalues, col_gridvalues = generate_grids_from_ransac(
dem_shape, coef_ransac_x, step=step
)
y_grid, _, _ = generate_grids_from_ransac(
dem_shape, coef_ransac_y, step=step
)
x_grid += row_gridvalues
y_grid += col_gridvalues
position_grid = {"grid": np.stack([x_grid, y_grid], axis=0), "step": step}
return position_grid
## /!\ code based on CARS, cf licence
def resample_dem(row_min, row_max, col_min, col_max, position_grid, dem_path):
with rio.open(dem_path) as img_reader:
block_region = [row_min, col_min, row_max, col_max]
oversampling = position_grid["step"]
grid = position_grid["grid"]
# Convert resampled region to grid region with oversampling
grid_region = [
math.floor(block_region[0] / oversampling),
math.floor(block_region[1] / oversampling),
math.ceil(block_region[2] / oversampling),
math.ceil(block_region[3] / oversampling),
]
grid_region[0::2] = list(np.clip(grid_region[0::2], 0, grid.shape[1]))
grid_region[1::2] = list(np.clip(grid_region[1::2], 0, grid.shape[2]))
grid_as_array = copy.copy(
grid[
:,
grid_region[0] : grid_region[2] + 1,
grid_region[1] : grid_region[3] + 1,
]
)
# get needed source bounding box
left = math.floor(np.amin(grid_as_array[1, ...]))
right = math.ceil(np.amax(grid_as_array[1, ...]))
top = math.floor(np.amin(grid_as_array[0, ...]))
bottom = math.ceil(np.amax(grid_as_array[0, ...]))
# filter margin for bicubic = 2
filter_margin = 2
top -= filter_margin
bottom += filter_margin
left -= filter_margin
right += filter_margin
left, right = list(np.clip([left, right], 0, img_reader.shape[0]))
top, bottom = list(np.clip([top, bottom], 0, img_reader.shape[1]))
img_window = Window.from_slices([left, right], [top, bottom])
# round window
img_window = img_window.round_offsets()
img_window = img_window.round_lengths()
# Compute offset
x_offset = min(left, right)
y_offset = min(top, bottom)
# Get dem data
img_as_array = img_reader.read(window=img_window)
# shift grid regarding the img extraction
grid_as_array[1, ...] -= x_offset
grid_as_array[0, ...] -= y_offset
block_resamp = cresample.grid(
img_as_array,
grid_as_array,
oversampling,
interpolator="bicubic",
nodata=0,
).astype(np.float32)
# extract exact region
out_region = oversampling * np.array(grid_region)
ext_region = block_region - out_region
block_resamp = block_resamp[
0,
ext_region[0] : ext_region[2] - 1,
ext_region[1] : ext_region[3] - 1,
]
return block_resamp
def create_tile_resampled(dem_ref_path, dem_sec_path, tile, position_grid):
dem_ref_tile = load_dem_from_window(dem_ref_path, tile)
dem_sec_tile = copy.copy(dem_ref_tile)
arr_sec = resample_dem(
int(tile[0]),
int(tile[1]),
int(tile[2]),
int(tile[3]),
position_grid,
dem_sec_path,
)
return dem_ref_tile.data, arr_sec
def compute_resampling(dem_ref_path, dem_sec_path, tiling_grid, position_grid):
shape = (
int(np.max(tiling_grid[:, :, 0:2])),
int(np.max(tiling_grid[:, :, 2:4])),
)
res_ref = np.empty(shape)
res_sampling = np.empty(shape)
for row in range(tiling_grid.shape[0]):
for col in range(tiling_grid.shape[1]):
row_min, row_max, col_min, col_max = tiling_grid[row, col, :]
row_min, row_max, col_min, col_max = (
int(row_min),
int(row_max),
int(col_min),
int(col_max),
)
(
res_ref[row_min:row_max, col_min:col_max],
res_sampling[row_min:row_max, col_min:col_max],
) = create_tile_resampled(
dem_ref_path,
dem_sec_path,
tiling_grid[row, col, :],
position_grid,
)