Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor splitting functions to more manageable functions #81

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
155 changes: 132 additions & 23 deletions fmtm_splitter/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@
"""DB models for temporary tables in splitBySQL."""

import logging
from typing import Union
from typing import Any, Dict, List, Optional, Union

import psycopg2
from geojson import FeatureCollection
from psycopg2.extensions import register_adapter
from psycopg2.extras import Json, register_uuid
from shapely.geometry import Polygon
from shapely.geometry import Polygon, shape

from fmtm_splitter.osm_extract import generate_osm_extract
from fmtm_splitter.parsers import json_str_to_dict

log = logging.getLogger(__name__)

Expand Down Expand Up @@ -85,17 +89,26 @@ def create_tables(conn: psycopg2.extensions.connection):

CREATE TABLE ways_poly (
id SERIAL PRIMARY KEY,
osm_id VARCHAR,
geom GEOMETRY(GEOMETRY, 4326),
tags JSONB
osm_id VARCHAR NULL,
geom GEOMETRY(GEOMETRY, 4326) NOT NULL,
tags JSONB NULL
);

CREATE TABLE ways_line (
id SERIAL PRIMARY KEY,
osm_id VARCHAR,
geom GEOMETRY(GEOMETRY, 4326),
tags JSONB
osm_id VARCHAR NULL,
geom GEOMETRY(GEOMETRY, 4326) NOT NULL,
tags JSONB NULL
);

-- Create indexes for geospatial and query performance
CREATE INDEX idx_project_aoi_geom ON project_aoi USING GIST(geom);

CREATE INDEX idx_ways_poly_geom ON ways_poly USING GIST(geom);
CREATE INDEX idx_ways_poly_tags ON ways_poly USING GIN(tags);

CREATE INDEX idx_ways_line_geom ON ways_line USING GIST(geom);
CREATE INDEX idx_ways_line_tags ON ways_line USING GIN(tags);
"""
log.debug(
"Running tables create command for 'project_aoi', 'ways_poly', 'ways_line'"
Expand Down Expand Up @@ -142,31 +155,127 @@ def aoi_to_postgis(conn: psycopg2.extensions.connection, geom: Polygon) -> None:
"""
log.debug("Adding AOI to project_aoi table")

sql = """
sql_insert = """
INSERT INTO project_aoi (geom)
VALUES (ST_SetSRID(CAST(%s AS GEOMETRY), 4326));
VALUES (ST_SetSRID(CAST(%s AS GEOMETRY), 4326))
RETURNING id, geom;
"""

cur = conn.cursor()
cur.execute(sql, (geom.wkb_hex,))
cur.close()
try:
with conn.cursor() as cur:
cur.execute(sql_insert, (geom.wkb_hex,))
cur.close()

except Exception as e:
log.error(f"Error during database operations: {e}")
conn.rollback() # Rollback in case of error


def insert_geom(cur: psycopg2.extensions.cursor, table_name: str, **kwargs) -> None:
"""Insert an OSM geometry into the database.
def insert_geom(
conn: psycopg2.extensions.connection, table_name: str, data: List[Dict[str, Any]]
) -> None:
"""Insert geometries into the database.

Does not commit the values automatically.
Such as:
- LineStrings
- Polygons

Handles both cases: with or without tags and osm_id.

Args:
cur (psycopg2.extensions.cursor): The PostgreSQL cursor.
conn (psycopg2.extensions.connection): The PostgreSQL connection.
table_name (str): The name of the table to insert data into.
**kwargs: Keyword arguments representing the values to be inserted.
data(List[dict]): Values of features to be inserted; geom, tags.

Returns:
None
"""
query = (
f"INSERT INTO {table_name}(geom,osm_id,tags) "
"VALUES (%(geom)s,%(osm_id)s,%(tags)s)"
)
cur.execute(query, kwargs)
placeholders = ", ".join(data[0].keys())
values = [tuple(record.values()) for record in data]

sql_query = f"""
INSERT INTO {table_name} ({placeholders})
VALUES ({", ".join(["%s"] * len(data[0]))})
"""
try:
with conn.cursor() as cursor:
cursor.executemany(sql_query, values)
conn.commit()
except Exception as e:
log.error(f"Error executing query: {e}")
conn.rollback() # Rollback transaction on error


def insert_features_to_db(
features: List[Dict[str, Any]],
conn: psycopg2.extensions.connection,
extract_type: str,
) -> None:
"""Insert features into the database with optimized performance.

Args:
features: List of features containing geometry and properties.
conn: Database connection object.
extract_type: Type of data extraction ("custom", "lines", or "extracts").
"""
ways_poly_batch = []
ways_line_batch = []

for feature in features:
geom = shape(feature["geometry"]).wkb_hex
properties = feature.get("properties", {})
tags = properties.get("tags", properties)

if tags:
tags = json_str_to_dict(tags).get("tags", json_str_to_dict(tags))

osm_id = properties.get("osm_id")

# Process based on extract_type
if extract_type == "custom":
ways_poly_batch.append({"geom": geom})
elif extract_type == "lines":
if any(key in tags for key in ["highway", "waterway", "railway"]):
ways_line_batch.append({"osm_id": osm_id, "geom": geom, "tags": tags})
elif extract_type == "extracts":
if tags.get("building") == "yes":
ways_poly_batch.append({"osm_id": osm_id, "geom": geom, "tags": tags})
elif any(key in tags for key in ["highway", "waterway", "railway"]):
ways_line_batch.append({"osm_id": osm_id, "geom": geom, "tags": tags})

# Perform batch inserts
if ways_poly_batch:
insert_geom(conn, "ways_poly", ways_poly_batch)
if ways_line_batch:
insert_geom(conn, "ways_line", ways_line_batch)


def process_features_for_db(
aoi_featcol: FeatureCollection,
conn: psycopg2.extensions.connection,
custom_features: Optional[List[Dict]] = None,
) -> None:
"""Process custom features or OSM extracts and insert them into the database."""
if custom_features:
insert_features_to_db(custom_features["features"], conn, "custom")

extract_lines = generate_osm_extract(aoi_featcol, "lines")
if extract_lines:
insert_features_to_db(extract_lines["features"], conn, "lines")
else:
extract_buildings = generate_osm_extract(aoi_featcol, "extracts")
insert_features_to_db(extract_buildings["features"], conn, "extracts")


def setup_lines_view(conn, geometry):
"""Create a database view for line features intersecting the AOI."""
view_sql = """
DROP VIEW IF EXISTS lines_view;
CREATE VIEW lines_view AS
SELECT tags, geom
FROM ways_line
WHERE ST_Intersects(ST_SetSRID(CAST(%s AS GEOMETRY), 4326), geom)
"""
with conn.cursor() as cur:
cur.execute(view_sql, (geometry,))
cur.close()
31 changes: 13 additions & 18 deletions fmtm_splitter/fmtm_algorithm.sql
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,9 @@ CREATE TABLE buildings AS (
FROM ways_poly AS b, polygonsnocount AS polys
WHERE
ST_INTERSECTS(polys.geom, ST_CENTROID(b.geom))
AND b.tags ->> 'building' IS NOT NULL
);


-- ALTER TABLE buildings ADD PRIMARY KEY(osm_id);


-- Properly register geometry column (makes QGIS happy)
SELECT POPULATE_GEOMETRY_COLUMNS('public.buildings'::regclass);
-- Add a spatial index (vastly improves performance for a lot of operations)
Expand Down Expand Up @@ -187,7 +183,7 @@ DROP TABLE polygonsnocount;
-- ON ST_TOUCHES(p.geom, pf.geom)
-- -- But eliminate those whose intersection is a point, because
-- -- polygons that only touch at a corner shouldn't be merged
-- AND ST_GEOMETRYTYPE(ST_INTERSECTION(p.geom, pf.geom)) != 'ST_Point'
-- AND ST_GEOMETRYTYPE(ST_INTERSECTION(p.geom, pf.geom)) != 'ST_Point'
-- -- Sort first by polyid of the low-feature-count polygons
-- -- Then by descending featurecount and area of the
-- -- high-feature-count neighbors (area is in case of equal
Expand All @@ -211,19 +207,19 @@ DROP TABLE IF EXISTS clusteredbuildings;
CREATE TABLE clusteredbuildings AS (
WITH splitpolygonswithcontents AS (
SELECT *
FROM splitpolygons AS sp
WHERE sp.numfeatures > 0
FROM splitpolygons
WHERE numfeatures > 0
),

-- Add the count of features in the splitpolygon each building belongs to
-- to the buildings table; sets us up to be able to run the clustering.
buildingswithcount AS (
SELECT
b.*,
p.numfeatures
sp.numfeatures
FROM buildings AS b
LEFT JOIN splitpolygons AS p
ON b.polyid = p.polyid
LEFT JOIN splitpolygonswithcontents AS sp
ON b.polyid = sp.polyid
),

-- Cluster the buildings within each splitpolygon. The second term in the
Expand All @@ -234,20 +230,20 @@ CREATE TABLE clusteredbuildings AS (
-- TODO: This should certainly not be a hardcoded, the number of features
-- per cluster should come from a project configuration table
buildingstocluster AS (
SELECT * FROM buildingswithcount AS bc
WHERE bc.numfeatures > 0
SELECT * FROM buildingswithcount
WHERE numfeatures > 0
),

clusteredbuildingsnocombineduid AS (
SELECT
*,
ST_CLUSTERKMEANS(
b.geom,
CAST((b.numfeatures / %(num_buildings)s) + 1 AS integer)
geom,
CAST((numfeatures / %(num_buildings)s) + 1 AS integer)
)
OVER (PARTITION BY b.polyid)
OVER (PARTITION BY polyid)
AS cid
FROM buildingstocluster AS b
FROM buildingstocluster
),

-- uid combining the id of the outer splitpolygon and inner cluster
Expand All @@ -271,14 +267,13 @@ USING gist (geom);
DROP TABLE IF EXISTS dumpedpoints;
CREATE TABLE dumpedpoints AS (
SELECT
cb.osm_id,
cb.polyid,
cb.cid,
cb.clusteruid,
-- POSSIBLE BUG: PostGIS' Voronoi implementation seems to panic
-- with segments less than 0.00004 degrees.
-- Should probably use geography instead of geometry
(ST_DUMPPOINTS(ST_SEGMENTIZE(cb.geom, 0.00004))).geom
(ST_DUMPPOINTS(ST_SEGMENTIZE(cb.geom, 0.00004))).geom AS geom
FROM clusteredbuildings AS cb
);
SELECT POPULATE_GEOMETRY_COLUMNS('public.dumpedpoints'::regclass);
Expand Down
75 changes: 75 additions & 0 deletions fmtm_splitter/osm_extract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#!/bin/python3

# Copyright (c) 2022 Humanitarian OpenStreetMap Team
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# FMTM-Splitter is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with FMTM-Splitter. If not, see <https:#www.gnu.org/licenses/>.
#
"""Class and helper methods for task splitting."""

import logging
from io import BytesIO

from geojson import FeatureCollection
from osm_rawdata.postgres import PostgresClient

# Instantiate logger
log = logging.getLogger(__name__)


def generate_osm_extract(
aoi_featcol: FeatureCollection, extract_type: str
) -> FeatureCollection:
"""Generate OSM extract based on the specified type from AOI FeatureCollection."""
try:
config_data_map = {
"extracts": """
select:
from:
- nodes
- ways_poly
- ways_line
where:
tags:
- building: not null
- highway: not null
- waterway: not null
- railway: not null
- aeroway: not null
""",
"lines": """
select:
from:
- nodes
- ways_line
where:
tags:
- highway: not null
- waterway: not null
- railway: not null
- aeroway: not null
""",
}
config_data = config_data_map.get(extract_type)
if not config_data:
raise ValueError(f"Invalid extract type: {extract_type}")

config_bytes = BytesIO(config_data.encode())
pg = PostgresClient("underpass", config_bytes)
return pg.execQuery(
aoi_featcol,
extra_params={"fileName": "fmtm_splitter", "useStWithin": False},
)
except Exception as e:
log.error(f"Error during OSM extract generation: {e}")
raise RuntimeError(f"Failed to generate OSM extract: {e}") from e
Loading
Loading