From ed1c035c34f007fea4d725d1b1261f12cd561369 Mon Sep 17 00:00:00 2001 From: Sujan Adhikari <109404840+Sujanadh@users.noreply.github.com> Date: Fri, 7 Mar 2025 22:20:14 +0545 Subject: [PATCH] feat(backend): optimise generating project data (#2214) * refactor(backend): optimized generating project data * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * refactor: use single cursor to execute all sqls * fix: stringify datetime to make it json serializable * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * refactor(backend): Optimize GeoJSON task splitting with CTE-based approach * fix: use ST contains on centroid instead of ST intersects * fix: use ST_Within instead of ST_Contains * refactor: add back the comments and notes --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- src/backend/app/db/postgis_utils.py | 289 ++++++++++------------- src/backend/app/projects/project_crud.py | 176 +++++++------- 2 files changed, 207 insertions(+), 258 deletions(-) diff --git a/src/backend/app/db/postgis_utils.py b/src/backend/app/db/postgis_utils.py index 25dd4030c..c9c5f6d5d 100644 --- a/src/backend/app/db/postgis_utils.py +++ b/src/backend/app/db/postgis_utils.py @@ -30,7 +30,8 @@ from osm_fieldwork.data_models import data_models_path from osm_rawdata.postgres import PostgresClient from psycopg import Connection, ProgrammingError -from psycopg.rows import class_row +from psycopg.rows import class_row, dict_row +from psycopg.types.json import Json from shapely.geometry import MultiPolygon, Point, Polygon, mapping, shape from shapely.geometry.base import BaseGeometry from shapely.ops import unary_union @@ -155,33 +156,35 @@ async def flatgeobuf_to_featcol( cur.row_factory = class_row(geojson_pydantic.FeatureCollection) await cur.execute( """ - SELECT - 'FeatureCollection' as type, - COALESCE(jsonb_agg(feature), '[]'::jsonb) AS features - FROM ( + WITH feature_data AS MATERIALIZED ( + SELECT + geom, + osm_id, + tags, + version, + changeset, + timestamp + FROM ST_FromFlatGeobuf(null::temp_fgb, %(fgb_bytes)s) + ), + processed_features AS MATERIALIZED ( SELECT jsonb_build_object( 'type', 'Feature', - 'geometry', ST_AsGeoJSON(ST_GeometryN(fgb_data.geom, 1))::jsonb, - 'id', fgb_data.osm_id::VARCHAR, + 'geometry', ST_AsGeoJSON(ST_GeometryN(geom, 1))::jsonb, + 'id', osm_id::VARCHAR, 'properties', jsonb_build_object( - 'osm_id', fgb_data.osm_id, - 'tags', fgb_data.tags, - 'version', fgb_data.version, - 'changeset', fgb_data.changeset, - 'timestamp', fgb_data.timestamp - )::jsonb + 'osm_id', osm_id, + 'tags', tags, + 'version', version, + 'changeset', changeset, + 'timestamp', timestamp + ) ) AS feature - FROM ( - SELECT - geom, - osm_id, - tags, - version, - changeset, - timestamp - FROM ST_FromFlatGeobuf(null::temp_fgb, %(fgb_bytes)s) - ) AS fgb_data - ) AS features; + FROM feature_data + ) + SELECT + 'FeatureCollection' as type, + COALESCE(jsonb_agg(feature), '[]'::jsonb) AS features + FROM processed_features; """, {"fgb_bytes": flatgeobuf}, ) @@ -198,8 +201,7 @@ async def flatgeobuf_to_featcol( ) return None - if featcol: - return geojson.loads(featcol.model_dump_json()) + return geojson.loads(featcol.model_dump_json()) if featcol else None async def split_geojson_by_task_areas( @@ -209,7 +211,7 @@ async def split_geojson_by_task_areas( ) -> Optional[dict[int, geojson.FeatureCollection]]: """Split GeoJSON into tagged task area GeoJSONs. - NOTE inserts feature.properties.osm_id as feature.id for each feature. + NOTE batch inserts feature.properties.osm_id as feature.id for each feature. NOTE ST_Within used on polygon centroids to correctly capture the geoms per task. Args: @@ -221,129 +223,88 @@ async def split_geojson_by_task_areas( dict[int, geojson.FeatureCollection]: {task_id: FeatureCollection} mapping. """ try: - async with db.cursor() as cur: - await cur.execute(""" - -- Drop table if already exists - DROP TABLE IF EXISTS temp_features CASCADE; - """) - - await cur.execute(""" - -- Create a temporary table to store the parsed GeoJSON features - CREATE TEMP TABLE temp_features ( - id VARCHAR, - geometry GEOMETRY, - properties JSONB - ); - """) - + features = featcol["features"] + feature_ids = [] + feature_properties = [] + feature_geometries = [] + result_dict = {} + for f in features: + feature_ids.append(str(f["properties"].get("osm_id"))) + feature_properties.append(Json(f["properties"])) + feature_geometries.append(json.dumps(f["geometry"])) + + async with db.cursor(row_factory=dict_row) as cur: await cur.execute( """ - -- Insert parsed geometries and properties into the temporary table - INSERT INTO temp_features (id, geometry, properties) + WITH feature_data AS ( + SELECT DISTINCT ON (geom) + unnest(%s::TEXT[]) AS id, + unnest(%s::JSONB[]) AS properties, + ST_SetSRID(ST_GeomFromGeoJSON(unnest(%s::TEXT[])), 4326) AS geom + ), + task_features AS ( SELECT - (feature->'properties'->>'osm_id')::VARCHAR AS id, - ST_SetSRID( - ST_GeomFromGeoJSON(feature->>'geometry'), - 4326 - ) AS geometry, - jsonb_set( - jsonb_set( - feature->'properties', - '{task_id}', to_jsonb(tasks.project_task_index), true - ), - '{project_id}', to_jsonb(tasks.project_id), true - ) AS properties - FROM ( - SELECT jsonb_array_elements( - CAST(%(geojson_featcol)s AS jsonb - )->'features') - AS feature - ) AS features - JOIN tasks ON tasks.project_id = %(project_id)s - WHERE - ST_Within( - ST_Centroid(ST_SetSRID( - ST_GeomFromGeoJSON(feature->>'geometry' - ), 4326) - ), tasks.outline); - """, - { - "project_id": project_id, - "geojson_featcol": json.dumps(featcol), - }, - ) - - # Set row_factory to output task_id:task_featcol dict - cur.row_factory = class_row(dict[int, geojson_pydantic.FeatureCollection]) - await cur.execute( - """ - -- Retrieve task outlines based on the provided project_id - SELECT - tasks.project_task_index AS task_id, + t.project_task_index AS task_id, jsonb_build_object( - 'type', 'FeatureCollection', - 'features', jsonb_agg(feature) - ) AS task_featcol - FROM - tasks - LEFT JOIN LATERAL ( - SELECT - jsonb_build_object( - 'type', 'Feature', - 'geometry', ST_AsGeoJSON(temp_features.geometry)::jsonb, - 'id', temp_features.id::VARCHAR, - 'properties', temp_features.properties - ) AS feature - FROM ( - SELECT DISTINCT ON (geometry) - id, - geometry, - properties - FROM temp_features - ) AS temp_features - WHERE - ST_Within(ST_Centroid(temp_features.geometry), tasks.outline) - ) AS feature ON true - WHERE - tasks.project_id = %(project_id)s - GROUP BY - tasks.project_task_index; - + 'type', 'Feature', + 'id', f.id, + 'geometry', ST_AsGeoJSON(f.geom)::jsonb, + 'properties', jsonb_set( + jsonb_set( + f.properties, + '{task_id}', + to_jsonb(t.project_task_index) + ), + '{project_id}', to_jsonb(%s) + ) + ) AS feature + FROM tasks t + JOIN feature_data f + ON ST_Within(ST_Centroid(f.geom), t.outline) + WHERE t.project_id = %s + ) + SELECT + task_id, + jsonb_agg(feature) AS features + FROM task_features + GROUP BY task_id; """, - {"project_id": project_id}, + ( + feature_ids, + feature_properties, + feature_geometries, + project_id, + project_id, + ), ) - task_featcol_dict = await cur.fetchall() - await cur.execute(""" - -- Drop table at the end - DROP TABLE IF EXISTS temp_features CASCADE; - """) + # Convert results into GeoJSON FeatureCollection + result_dict = { + rec["task_id"]: geojson.FeatureCollection(features=rec["features"]) + for rec in await cur.fetchall() + } + if not result_dict: + msg = f"Failed to split project ({project_id}) geojson by task areas." + log.exception(msg) + raise HTTPException( + status_code=HTTPStatus.INTERNAL_SERVER_ERROR, + detail=msg, + ) + + if len(result_dict) < 1: + msg = ( + f"Attempted splitting project ({project_id}) geojson by task areas," + "but no data was returned." + ) + log.warning(msg) + return None + return result_dict except ProgrammingError as e: log.error(e) log.error("Attempted geojson task splitting failed") return None - if not task_featcol_dict: - msg = f"Failed to split project ({project_id}) geojson by task areas." - log.exception(msg) - raise HTTPException( - status_code=HTTPStatus.INTERNAL_SERVER_ERROR, - detail=msg, - ) - - if len(task_featcol_dict) < 1: - msg = ( - f"Attempted splitting project ({project_id}) geojson by task areas, " - "but no data was returned." - ) - log.warning(msg) - return None - - # Update to be a dict of repeating task_id:task_featcol pairs - return {record["task_id"]: record["task_featcol"] for record in task_featcol_dict} - return task_featcol_dict - def add_required_geojson_properties( geojson: geojson.FeatureCollection, @@ -353,43 +314,35 @@ def add_required_geojson_properties( This step is required prior to flatgeobuf generation, else the workflows of conversion between the formats will fail. """ - for feature in geojson.get("features", []): - properties = feature.get("properties", {}) - - # The top level id is defined, set to osm_id - if feature_id := feature.get("id"): - properties["osm_id"] = feature_id + features = geojson.get("features", []) + current_date = timestamp() + for feature in features: + properties = feature.get("properties", {}) # Check for id type embedded in properties - if osm_id := properties.get("osm_id"): + if feature_id := feature.get("id"): # osm_id property exists, set top level id - feature["id"] = f"{osm_id}" + properties["osm_id"] = feature_id else: - if prop_id := properties.get("id"): - # id is nested in properties, use that - feature["id"] = f"{prop_id}" - properties["osm_id"] = prop_id - elif fid := properties.get("fid"): - # The default from QGIS - feature["id"] = f"{fid}" - properties["osm_id"] = fid - else: + osm_id = ( + properties.get("osm_id") + or properties.get("id") + or properties.get("fid") # Random id # NOTE 32-bit int is max supported by standard postgres Integer - random_id = getrandbits(30) - feature["id"] = f"{random_id}" - properties["osm_id"] = random_id - - # Other required fields - if not properties.get("tags"): - properties["tags"] = "" - if not properties.get("version"): - properties["version"] = 1 - if not properties.get("changeset"): - properties["changeset"] = 1 - if not properties.get("timestamp"): - properties["timestamp"] = timestamp().strftime("%Y-%m-%dT%H:%M:%S") - properties["submission_ids"] = None + or getrandbits(30) + ) + feature["id"] = str(osm_id) + properties["osm_id"] = osm_id + + # Set default values efficiently + properties.setdefault("tags", "") + properties.setdefault("version", 1) + properties.setdefault("changeset", 1) + properties.setdefault("timestamp", str(current_date)) + properties.setdefault("submission_ids", None) + + feature["properties"] = properties return geojson diff --git a/src/backend/app/projects/project_crud.py b/src/backend/app/projects/project_crud.py index cc7eac19e..b3b9fbc24 100644 --- a/src/backend/app/projects/project_crud.py +++ b/src/backend/app/projects/project_crud.py @@ -26,15 +26,15 @@ from traceback import format_exc from typing import Optional, Union +import aiohttp import geojson import geojson_pydantic -import requests from asgiref.sync import async_to_sync from fastapi import HTTPException, Request from loguru import logger as log from osm_login_python.core import Auth from osm_rawdata.postgres import PostgresClient -from psycopg import Connection +from psycopg import Connection, sql from psycopg.rows import class_row from app.auth.providers.osm import get_osm_token, send_osm_message @@ -496,91 +496,83 @@ async def generate_project_files( Returns: bool: True if success. """ - project = await DbProject.one(db, project_id, warn_on_missing_token=False) - log.info(f"Starting generate_project_files for project {project_id}") - - # Extract data extract from flatgeobuf - log.debug("Getting data extract geojson from flatgeobuf") - feature_collection = await get_project_features_geojson(db, project) - - # Get properties to create datasets - entity_properties = list( - feature_collection.get("features")[0].get("properties").keys() - ) - entity_properties.append("submission_ids") - - # Split extract by task area - log.debug("Splitting data extract per task area") - # TODO in future this splitting could be removed if the task_id is - # no longer used in the XLSForm for the map filter - task_extract_dict = await split_geojson_by_task_areas( - db, feature_collection, project_id - ) - - # Get ODK Project details - project_odk_id = project.odkid - project_xlsform = project.xlsform_content - project_odk_form_id = project.odk_form_id - project_odk_creds = project.odk_credentials + try: + project = await project_deps.get_project_by_id(db, project_id) + log.info(f"Starting generate_project_files for project {project_id}") - if not project_odk_creds: - # get default credentials - project_odk_creds = await get_default_odk_creds() + # Extract data extract from flatgeobuf + log.debug("Getting data extract geojson from flatgeobuf") + feature_collection = await get_project_features_geojson(db, project) - odk_token = await generate_odk_central_project_content( - project_odk_id, - project_odk_form_id, - project_odk_creds, - BytesIO(project_xlsform), - task_extract_dict, - entity_properties, - ) - log.debug( - f"Setting encrypted odk token for FMTM project ({project_id}) " - f"ODK project {project_odk_id}: {type(odk_token)} ({odk_token[:15]}...)" - ) - await DbProject.update( - db, - project_id, - project_schemas.ProjectUpdate( - odk_token=odk_token, - ), - ) + # Get properties to create datasets + entity_properties = list( + feature_collection.get("features")[0].get("properties").keys() + ) + entity_properties.append("submission_ids") + + # Split extract by task area + log.debug("Splitting data extract per task area") + # TODO in future this splitting could be removed if the task_id is + # no longer used in the XLSForm for the map filter + task_extract_dict = await split_geojson_by_task_areas( + db, feature_collection, project_id + ) - task_feature_counts = [ - ( - task.id, - len(task_extract_dict.get(task.project_task_index, {}).get("features", [])), + # Get ODK Project details + project_odk_id = project.odkid + project_xlsform = project.xlsform_content + project_odk_form_id = project.odk_form_id + project_odk_creds = project.odk_credentials + + if not project_odk_creds: + # get default credentials + project_odk_creds = await get_default_odk_creds() + + odk_token = await generate_odk_central_project_content( + project_odk_id, + project_odk_form_id, + project_odk_creds, + BytesIO(project_xlsform), + task_extract_dict, + entity_properties, ) - for task in project.tasks - ] - log.debug( - "Setting task feature counts in db for " - f"({len(project.tasks if project.tasks else 0)}) tasks", - ) - sql = """ - WITH task_update(id, feature_count) AS ( - VALUES {} + log.debug( + f"Setting encrypted odk token for FMTM project ({project_id}) " + f"ODK project {project_odk_id}: {type(odk_token)} ({odk_token[:15]}...)" ) - UPDATE - public.tasks t - SET - feature_count = tu.feature_count - FROM - task_update tu - WHERE - t.id = tu.id; - """ - value_placeholders = ", ".join( - f"({task_id}, {feature_count})" - for task_id, feature_count in task_feature_counts - ) - formatted_sql = sql.format(value_placeholders) - async with db.cursor() as cur: - await cur.execute(formatted_sql) + await DbProject.update( + db, + project_id, + project_schemas.ProjectUpdate( + odk_token=odk_token, + ), + ) + task_feature_counts = [ + ( + task.id, + len( + task_extract_dict.get(task.project_task_index, {}).get( + "features", [] + ) + ), + ) + for task in project.tasks + ] + + # Use parameterized batch update + update_data = [ + (task_id, feature_count) for task_id, feature_count in task_feature_counts + ] - log.info("Finished generation of project additional files") - return True + async with db.cursor() as cur: + await cur.executemany( + sql.SQL("UPDATE public.tasks SET feature_count = %s WHERE id = %s"), + [(fc, tid) for tid, fc in update_data], + ) + return True + except Exception as e: + log.error(f"Error generating project files for project {project_id}: {e}") + return False async def get_task_geometry(db: Connection, project_id: int): @@ -642,16 +634,20 @@ async def get_project_features_geojson( settings.S3_ENDPOINT, ) - with requests.get(data_extract_url) as response: - if not response.ok: - msg = f"Download failed for data extract, project ({project_id})" - log.error(msg) - raise HTTPException( - status_code=HTTPStatus.UNPROCESSABLE_ENTITY, - detail=msg, + async with aiohttp.ClientSession() as session: + async with session.get(data_extract_url) as response: + if not response.ok: + msg = f"Download failed for data extract, project ({project_id})" + log.error(msg) + raise HTTPException( + status_code=HTTPStatus.UNPROCESSABLE_ENTITY, + detail=msg, + ) + + log.debug("Converting FlatGeobuf to GeoJSON") + data_extract_geojson = await flatgeobuf_to_featcol( + db, await response.read() ) - log.debug("Converting download flatgeobuf to geojson") - data_extract_geojson = await flatgeobuf_to_featcol(db, response.content) if not data_extract_geojson: msg = f"Failed to convert flatgeobuf --> geojson for project ({project_id})"