feat(backend): optimise generating project data (#2214)
* refactor(backend): optimized generating project data

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* refactor: use a single cursor to execute all SQL statements

* fix: stringify datetime to make it JSON serializable

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* refactor(backend): Optimize GeoJSON task splitting with CTE-based approach

* fix: use ST_Contains on centroid instead of ST_Intersects

* fix: use ST_Within instead of ST_Contains

* refactor: add back the comments and notes

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Sujanadh and pre-commit-ci[bot] authored Mar 7, 2025
1 parent e909a68 commit ed1c035
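
Editor's aside on the ST_Intersects → ST_Contains → ST_Within progression in the commit message (an illustrative sketch, not code from this commit): matching features to tasks by centroid avoids double-assignment of polygons that straddle a task boundary — a straddling polygon intersects both outlines, but its centroid lies inside exactly one. A minimal demonstration with shapely (already a dependency of this module):

    # Illustrative: a polygon straddling two task outlines intersects both,
    # but its centroid falls within only one, giving a unique task assignment.
    from shapely.geometry import Polygon, box

    task_a = box(0, 0, 10, 10)
    task_b = box(10, 0, 20, 10)
    building = Polygon([(7, 4), (12, 4), (12, 6), (7, 6)])  # crosses x=10

    print(building.intersects(task_a), building.intersects(task_b))  # True True
    centroid = building.centroid  # (9.5, 5.0)
    print(centroid.within(task_a), centroid.within(task_b))  # True False

Note also that ST_Within(a, b) is ST_Contains(b, a) with the arguments swapped, so the final `ST_Contains` → `ST_Within` change is about argument order rather than semantics.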
Showing 2 changed files with 207 additions and 258 deletions.
289 changes: 121 additions & 168 deletions src/backend/app/db/postgis_utils.py
@@ -30,7 +30,8 @@
 from osm_fieldwork.data_models import data_models_path
 from osm_rawdata.postgres import PostgresClient
 from psycopg import Connection, ProgrammingError
-from psycopg.rows import class_row
+from psycopg.rows import class_row, dict_row
+from psycopg.types.json import Json
 from shapely.geometry import MultiPolygon, Point, Polygon, mapping, shape
 from shapely.geometry.base import BaseGeometry
 from shapely.ops import unary_union
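
Editor's aside on the new imports (illustrative, not part of the diff): psycopg's row factories decide what shape fetched rows take — `class_row` validates each row into a model instance, while the newly imported `dict_row` returns plain dicts, which the rewritten task-splitting query consumes directly. A minimal sketch, assuming a reachable Postgres database (the DSN below is hypothetical):

    # Sketch: dict_row yields plain dicts instead of tuples or model instances.
    import psycopg
    from psycopg.rows import dict_row

    with psycopg.connect("dbname=fmtm") as conn:  # hypothetical DSN
        with conn.cursor(row_factory=dict_row) as cur:
            cur.execute("SELECT 1 AS task_id, '[]'::jsonb AS features")
            print(cur.fetchone())  # {'task_id': 1, 'features': []}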
@@ -155,33 +156,35 @@ async def flatgeobuf_to_featcol(
             cur.row_factory = class_row(geojson_pydantic.FeatureCollection)
             await cur.execute(
                 """
-                SELECT
-                    'FeatureCollection' as type,
-                    COALESCE(jsonb_agg(feature), '[]'::jsonb) AS features
-                FROM (
+                WITH feature_data AS MATERIALIZED (
+                    SELECT
+                        geom,
+                        osm_id,
+                        tags,
+                        version,
+                        changeset,
+                        timestamp
+                    FROM ST_FromFlatGeobuf(null::temp_fgb, %(fgb_bytes)s)
+                ),
+                processed_features AS MATERIALIZED (
                     SELECT jsonb_build_object(
                         'type', 'Feature',
-                        'geometry', ST_AsGeoJSON(ST_GeometryN(fgb_data.geom, 1))::jsonb,
-                        'id', fgb_data.osm_id::VARCHAR,
+                        'geometry', ST_AsGeoJSON(ST_GeometryN(geom, 1))::jsonb,
+                        'id', osm_id::VARCHAR,
                         'properties', jsonb_build_object(
-                            'osm_id', fgb_data.osm_id,
-                            'tags', fgb_data.tags,
-                            'version', fgb_data.version,
-                            'changeset', fgb_data.changeset,
-                            'timestamp', fgb_data.timestamp
-                        )::jsonb
+                            'osm_id', osm_id,
+                            'tags', tags,
+                            'version', version,
+                            'changeset', changeset,
+                            'timestamp', timestamp
+                        )
                     ) AS feature
-                FROM (
-                    SELECT
-                        geom,
-                        osm_id,
-                        tags,
-                        version,
-                        changeset,
-                        timestamp
-                    FROM ST_FromFlatGeobuf(null::temp_fgb, %(fgb_bytes)s)
-                ) AS fgb_data
-                ) AS features;
+                    FROM feature_data
+                )
+                SELECT
+                    'FeatureCollection' as type,
+                    COALESCE(jsonb_agg(feature), '[]'::jsonb) AS features
+                FROM processed_features;
                 """,
                 {"fgb_bytes": flatgeobuf},
             )
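
Editor's aside on the hunk above (illustrative sketch, not from the commit): `AS MATERIALIZED`, available since PostgreSQL 12, forces the planner to compute a CTE once and reuse the result, so the FlatGeobuf payload is decoded a single time instead of being inlined into every reference. The same mechanism in miniature:

    # Sketch: MATERIALIZED pins the CTE result so the expensive step runs once.
    import psycopg

    QUERY = """
        WITH decoded AS MATERIALIZED (
            SELECT generate_series(1, 3) AS n  -- stand-in for the ST_FromFlatGeobuf decode
        )
        SELECT jsonb_agg(n) FROM decoded;
    """

    with psycopg.connect("dbname=fmtm") as conn:  # hypothetical DSN
        print(conn.execute(QUERY).fetchone())  # ([1, 2, 3],)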
@@ -198,8 +201,7 @@ async def flatgeobuf_to_featcol(
             )
             return None
 
-    if featcol:
-        return geojson.loads(featcol.model_dump_json())
+    return geojson.loads(featcol.model_dump_json()) if featcol else None
 
 
 async def split_geojson_by_task_areas(
@@ -209,7 +211,7 @@ async def split_geojson_by_task_areas(
 ) -> Optional[dict[int, geojson.FeatureCollection]]:
     """Split GeoJSON into tagged task area GeoJSONs.
 
-    NOTE inserts feature.properties.osm_id as feature.id for each feature.
+    NOTE batch inserts feature.properties.osm_id as feature.id for each feature.
     NOTE ST_Within used on polygon centroids to correctly capture the geoms per task.
 
     Args:
@@ -221,129 +223,88 @@ async def split_geojson_by_task_areas(
         dict[int, geojson.FeatureCollection]: {task_id: FeatureCollection} mapping.
     """
     try:
-        async with db.cursor() as cur:
-            await cur.execute("""
-                -- Drop table if already exists
-                DROP TABLE IF EXISTS temp_features CASCADE;
-            """)
-
-            await cur.execute("""
-                -- Create a temporary table to store the parsed GeoJSON features
-                CREATE TEMP TABLE temp_features (
-                    id VARCHAR,
-                    geometry GEOMETRY,
-                    properties JSONB
-                );
-            """)
-
+        features = featcol["features"]
+        feature_ids = []
+        feature_properties = []
+        feature_geometries = []
+        result_dict = {}
+        for f in features:
+            feature_ids.append(str(f["properties"].get("osm_id")))
+            feature_properties.append(Json(f["properties"]))
+            feature_geometries.append(json.dumps(f["geometry"]))
+
+        async with db.cursor(row_factory=dict_row) as cur:
             await cur.execute(
                 """
-                -- Insert parsed geometries and properties into the temporary table
-                INSERT INTO temp_features (id, geometry, properties)
-                SELECT
-                    (feature->'properties'->>'osm_id')::VARCHAR AS id,
-                    ST_SetSRID(
-                        ST_GeomFromGeoJSON(feature->>'geometry'),
-                        4326
-                    ) AS geometry,
-                    jsonb_set(
-                        jsonb_set(
-                            feature->'properties',
-                            '{task_id}', to_jsonb(tasks.project_task_index), true
-                        ),
-                        '{project_id}', to_jsonb(tasks.project_id), true
-                    ) AS properties
-                FROM (
-                    SELECT jsonb_array_elements(
-                        CAST(%(geojson_featcol)s AS jsonb
-                        )->'features')
-                    AS feature
-                ) AS features
-                JOIN tasks ON tasks.project_id = %(project_id)s
-                WHERE
-                    ST_Within(
-                        ST_Centroid(ST_SetSRID(
-                            ST_GeomFromGeoJSON(feature->>'geometry'
-                        ), 4326)
-                    ), tasks.outline);
-                """,
-                {
-                    "project_id": project_id,
-                    "geojson_featcol": json.dumps(featcol),
-                },
-            )
-
-            # Set row_factory to output task_id:task_featcol dict
-            cur.row_factory = class_row(dict[int, geojson_pydantic.FeatureCollection])
-            await cur.execute(
-                """
-                -- Retrieve task outlines based on the provided project_id
+                WITH feature_data AS (
+                    SELECT DISTINCT ON (geom)
+                        unnest(%s::TEXT[]) AS id,
+                        unnest(%s::JSONB[]) AS properties,
+                        ST_SetSRID(ST_GeomFromGeoJSON(unnest(%s::TEXT[])), 4326) AS geom
+                ),
+                task_features AS (
                 SELECT
-                    tasks.project_task_index AS task_id,
+                    t.project_task_index AS task_id,
                     jsonb_build_object(
-                        'type', 'FeatureCollection',
-                        'features', jsonb_agg(feature)
-                    ) AS task_featcol
-                FROM
-                    tasks
-                LEFT JOIN LATERAL (
-                    SELECT
-                        jsonb_build_object(
-                            'type', 'Feature',
-                            'geometry', ST_AsGeoJSON(temp_features.geometry)::jsonb,
-                            'id', temp_features.id::VARCHAR,
-                            'properties', temp_features.properties
-                        ) AS feature
-                    FROM (
-                        SELECT DISTINCT ON (geometry)
-                            id,
-                            geometry,
-                            properties
-                        FROM temp_features
-                    ) AS temp_features
-                    WHERE
-                        ST_Within(ST_Centroid(temp_features.geometry), tasks.outline)
-                ) AS feature ON true
-                WHERE
-                    tasks.project_id = %(project_id)s
-                GROUP BY
-                    tasks.project_task_index;
+                        'type', 'Feature',
+                        'id', f.id,
+                        'geometry', ST_AsGeoJSON(f.geom)::jsonb,
+                        'properties', jsonb_set(
+                            jsonb_set(
+                                f.properties,
+                                '{task_id}',
+                                to_jsonb(t.project_task_index)
+                            ),
+                            '{project_id}', to_jsonb(%s)
+                        )
+                    ) AS feature
+                FROM tasks t
+                JOIN feature_data f
+                    ON ST_Within(ST_Centroid(f.geom), t.outline)
+                WHERE t.project_id = %s
+                )
+                SELECT
+                    task_id,
+                    jsonb_agg(feature) AS features
+                FROM task_features
+                GROUP BY task_id;
                 """,
-                {"project_id": project_id},
+                (
+                    feature_ids,
+                    feature_properties,
+                    feature_geometries,
+                    project_id,
+                    project_id,
+                ),
             )
-            task_featcol_dict = await cur.fetchall()
-
-            await cur.execute("""
-                -- Drop table at the end
-                DROP TABLE IF EXISTS temp_features CASCADE;
-            """)
+            # Convert results into GeoJSON FeatureCollection
+            result_dict = {
+                rec["task_id"]: geojson.FeatureCollection(features=rec["features"])
+                for rec in await cur.fetchall()
+            }
+            if not result_dict:
+                msg = f"Failed to split project ({project_id}) geojson by task areas."
+                log.exception(msg)
+                raise HTTPException(
+                    status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
+                    detail=msg,
+                )
+
+            if len(result_dict) < 1:
+                msg = (
+                    f"Attempted splitting project ({project_id}) geojson by task areas, "
+                    "but no data was returned."
+                )
+                log.warning(msg)
+                return None
+            return result_dict
 
     except ProgrammingError as e:
         log.error(e)
         log.error("Attempted geojson task splitting failed")
         return None
-
-    if not task_featcol_dict:
-        msg = f"Failed to split project ({project_id}) geojson by task areas."
-        log.exception(msg)
-        raise HTTPException(
-            status_code=HTTPStatus.INTERNAL_SERVER_ERROR,
-            detail=msg,
-        )
-
-    if len(task_featcol_dict) < 1:
-        msg = (
-            f"Attempted splitting project ({project_id}) geojson by task areas, "
-            "but no data was returned."
-        )
-        log.warning(msg)
-        return None
-
-    # Update to be a dict of repeating task_id:task_featcol pairs
-    return {record["task_id"]: record["task_featcol"] for record in task_featcol_dict}
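
Editor's aside on the hunk above (illustrative, not part of the commit): the rewrite replaces the temp table and second round trip with three parallel arrays bound once and unpacked by `unnest()`; multiple `unnest()` calls in a SELECT list advance in lockstep (PostgreSQL 10+), so the arrays line back up into one row per feature. A self-contained sketch of the binding pattern, assuming a PostGIS-enabled database (DSN hypothetical):

    # Sketch: parallel Python lists bound as SQL arrays and re-zipped by unnest().
    import json

    import psycopg
    from psycopg.types.json import Json

    feature_ids = ["101", "102"]
    feature_props = [Json({"tags": "building"}), Json({"tags": "road"})]
    feature_geoms = [
        json.dumps({"type": "Point", "coordinates": [85.3, 27.7]}),
        json.dumps({"type": "Point", "coordinates": [85.4, 27.8]}),
    ]

    with psycopg.connect("dbname=fmtm") as conn:  # hypothetical DSN
        rows = conn.execute(
            """
            SELECT
                unnest(%s::TEXT[]) AS id,
                unnest(%s::JSONB[]) AS properties,
                ST_GeomFromGeoJSON(unnest(%s::TEXT[])) AS geom  -- needs PostGIS
            """,
            (feature_ids, feature_props, feature_geoms),
        ).fetchall()
        # One round trip, no temp table to create or drop.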


@@ -353,43 +314,35 @@ def add_required_geojson_properties(
     This step is required prior to flatgeobuf generation,
     else the workflows of conversion between the formats will fail.
     """
-    for feature in geojson.get("features", []):
-        properties = feature.get("properties", {})
-
-        # The top level id is defined, set to osm_id
-        if feature_id := feature.get("id"):
-            properties["osm_id"] = feature_id
+    features = geojson.get("features", [])
+    current_date = timestamp()
+
+    for feature in features:
+        properties = feature.get("properties", {})
 
         # Check for id type embedded in properties
-        if osm_id := properties.get("osm_id"):
-            # osm_id property exists, set top level id
-            feature["id"] = f"{osm_id}"
+        if feature_id := feature.get("id"):
+            properties["osm_id"] = feature_id
         else:
-            if prop_id := properties.get("id"):
-                # id is nested in properties, use that
-                feature["id"] = f"{prop_id}"
-                properties["osm_id"] = prop_id
-            elif fid := properties.get("fid"):
-                # The default from QGIS
-                feature["id"] = f"{fid}"
-                properties["osm_id"] = fid
-            else:
-                # Random id
-                # NOTE 32-bit int is max supported by standard postgres Integer
-                random_id = getrandbits(30)
-                feature["id"] = f"{random_id}"
-                properties["osm_id"] = random_id
-
-        # Other required fields
-        if not properties.get("tags"):
-            properties["tags"] = ""
-        if not properties.get("version"):
-            properties["version"] = 1
-        if not properties.get("changeset"):
-            properties["changeset"] = 1
-        if not properties.get("timestamp"):
-            properties["timestamp"] = timestamp().strftime("%Y-%m-%dT%H:%M:%S")
-        properties["submission_ids"] = None
+            osm_id = (
+                properties.get("osm_id")
+                or properties.get("id")
+                or properties.get("fid")
+                # Random id
+                # NOTE 32-bit int is max supported by standard postgres Integer
+                or getrandbits(30)
+            )
+            feature["id"] = str(osm_id)
+            properties["osm_id"] = osm_id
+
+        # Set default values efficiently
+        properties.setdefault("tags", "")
+        properties.setdefault("version", 1)
+        properties.setdefault("changeset", 1)
+        properties.setdefault("timestamp", str(current_date))
+        properties.setdefault("submission_ids", None)
+
+        feature["properties"] = properties
 
     return geojson
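
Editor's aside on the last hunk (illustrative): `dict.setdefault` assigns only when a key is missing, and `timestamp()` is now called once per request instead of once per feature; the value is stringified (`str(current_date)`) because a raw datetime is not JSON-serializable — the "stringify datetime" fix from the commit message. In miniature:

    # Sketch: setdefault fills only missing keys; datetimes must be stringified
    # before JSON encoding.
    import json
    from datetime import datetime, timezone

    current_date = datetime.now(timezone.utc)  # hoisted out of the feature loop
    properties = {"version": 3}

    properties.setdefault("version", 1)                    # already set: stays 3
    properties.setdefault("timestamp", str(current_date))  # missing: filled

    json.dumps(properties)  # OK; a raw datetime here would raise TypeError

One behavioural nuance: `setdefault` differs slightly from the old falsy checks (`if not properties.get(...)`) — an existing empty value such as `""` is now left in place rather than overwritten.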
