Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GTC-3172: Support GADM areas in drivers endpoints #656

Merged
merged 13 commits into from
Mar 24, 2025
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 79 additions & 31 deletions app/crud/geostore.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from sqlalchemy.sql.elements import Label, TextClause

from app.application import db
from app.crud.tasks import create_or_update_task
from app.errors import (
BadAdminSourceException,
BadAdminVersionException,
Expand Down Expand Up @@ -190,22 +191,37 @@ async def get_first_row(sql: Select):
return row


async def get_gadm_geostore(
async def get_gadm_geostore_id(
admin_provider: str,
admin_version: str,
adm_level: int,
country_id: str,
region_id: str | None = None,
subregion_id: str | None = None,
) -> str:
src_table = await get_versioned_dataset(admin_provider, admin_version)
columns_etc: List[Column | Label] = [db.column("gfw_geostore_id"),]
sql: Select = db.select(columns_etc).select_from(src_table)
sql = await add_where_clauses(adm_level, admin_provider, admin_version, country_id, region_id, sql, subregion_id)
row = await get_first_row(sql)
if row is None:
raise RecordNotFoundError(
f"Admin boundary not found in {admin_provider} version {admin_version}"
)

return await row.gfw_geostore_id


async def build_gadm_geostore(
admin_provider: str,
admin_version: str,
adm_level: int,
simplify: float | None,
country_id: str,
region_id: str | None = None,
subregion_id: str | None = None,
) -> AdminGeostoreResponse:
dv: Tuple[str, str] = await admin_params_to_dataset_version(
admin_provider, admin_version
)
dataset, version = dv

src_table: Table = db.table(version)
src_table.schema = dataset
) -> AdminGeostore:
src_table = await get_versioned_dataset(admin_provider, admin_version)

columns_etc: List[Column | Label] = [
db.column("adm_level"),
Expand All @@ -230,16 +246,43 @@ async def get_gadm_geostore(
)
)

sql: Select = db.select(columns_etc).select_from(src_table)

sql = await add_where_clauses(adm_level, admin_provider, admin_version, country_id, region_id, sql, subregion_id)

row = await get_first_row(sql)
if row is None:
raise RecordNotFoundError(
f"Admin boundary not found in {admin_provider} version {admin_version}"
)

if row.geojson is None:
raise GeometryIsNullError(
"GeoJSON is None, try reducing or eliminating simplification."
)

return await form_admin_geostore(
adm_level=adm_level,
admin_version=admin_version,
area=float(row.gfw_area__ha),
bbox=[float(val) for val in row.gfw_bbox],
name=str(row.name),
geojson=json.loads(row.geojson),
geostore_id=str(row.gfw_geostore_id),
level_id=str(row.level_id),
simplify=simplify,
)


async def add_where_clauses(adm_level, admin_provider, admin_version, country_id, region_id, sql, subregion_id):
where_clauses: List[TextClause] = [
db.text("adm_level=:adm_level").bindparams(adm_level=str(adm_level))
]

# gid_0 is just a three-character value, but all more specific ids are
# followed by an underscore (which has to be escaped because normally in
# SQL an underscore is a wildcard) and a revision number (for which we
# use an UN-escaped underscore).
level_id_pattern: str = country_id

if adm_level == 0: # Special-case to avoid slow LIKE
where_clauses.append(
db.text("gid_0=:level_id_pattern").bindparams(
Expand All @@ -264,33 +307,38 @@ async def get_gadm_geostore(
level_id_pattern=level_id_pattern
)
)

sql: Select = db.select(columns_etc).select_from(src_table)

for clause in where_clauses:
sql = sql.where(clause)
return sql

row = await get_first_row(sql)
if row is None:
raise RecordNotFoundError(
f"Admin boundary not found in {admin_provider} version {admin_version}"
)

if row.geojson is None:
raise GeometryIsNullError(
"GeoJSON is None, try reducing or eliminating simplification."
)
async def get_versioned_dataset(admin_provider, admin_version):
dv: Tuple[str, str] = await admin_params_to_dataset_version(
admin_provider, admin_version
)
dataset, version = dv
src_table: Table = db.table(version)
src_table.schema = dataset
return src_table

geostore: AdminGeostore = await form_admin_geostore(
adm_level=adm_level,

async def get_gadm_geostore(
admin_provider: str,
admin_version: str,
adm_level: int,
simplify: float | None,
country_id: str,
region_id: str | None = None,
subregion_id: str | None = None,
) -> AdminGeostoreResponse:
geostore: AdminGeostore = await build_gadm_geostore(
admin_provider=admin_provider,
admin_version=admin_version,
area=float(row.gfw_area__ha),
bbox=[float(val) for val in row.gfw_bbox],
name=str(row.name),
geojson=json.loads(row.geojson),
geostore_id=str(row.gfw_geostore_id),
level_id=str(row.level_id),
adm_level=adm_level,
simplify=simplify,
country_id=country_id,
region_id=region_id,
subregion_id=subregion_id,
)

return AdminGeostoreResponse(data=geostore)
Expand Down
65 changes: 62 additions & 3 deletions app/models/pydantic/datamart.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,68 @@
from enum import Enum
from typing import Dict, Optional
from typing import Dict, Literal, Optional, Union
from uuid import UUID
from abc import ABC, abstractmethod

from pydantic import Field
from pydantic import Field, root_validator, validator

from app.models.pydantic.responses import Response

from .base import StrictBaseModel
from ...crud.geostore import get_gadm_geostore_id


class AreaOfInterest(StrictBaseModel, ABC):
@abstractmethod
async def get_geostore_id(self) -> UUID:
"""Return the unique identifier for the area of interest."""
pass


class GeostoreAreaOfInterest(AreaOfInterest):
type: Literal['geostore'] = 'geostore'
geostore_id:UUID = Field(..., title="Geostore ID")

async def get_geostore_id(self) -> UUID:
return self.geostore_id


class AdminAreaOfInterest(AreaOfInterest):
type: Literal['admin'] = 'admin'
country: str = Field(..., title="ISO Country Code")
region: Optional[str] = Field(None, title="Region")
subregion: Optional[str] = Field(None, title="Subregion")
provider: str = Field('gadm', title="Administrative Boundary Provider")
version: str = Field('4.1', title="Administrative Boundary Version")


async def get_geostore_id(self) -> UUID:
admin_level = sum(1 for field in (self.country, self.region, self.subregion) if field is not None) - 1
geostore_id = await get_gadm_geostore_id(
admin_provider=self.provider,
admin_version=self.version,
adm_level=admin_level,
country_id=self.country,
region_id=self.region,
subregion_id=self.subregion,
)
return UUID(geostore_id)

@root_validator
def check_region_subregion(cls, values):
region = values.get("region")
subregion = values.get("subregion")
if subregion is not None and region is None:
raise ValueError("region must be specified if subregion is provided")
return values

@validator('provider', pre=True, always=True)
def set_provider_default(cls, v):
return v or 'gadm'

@validator('version', pre=True, always=True)
def set_version_default(cls, v):
return v or '4.1'



class AnalysisStatus(str, Enum):
Expand Down Expand Up @@ -43,7 +99,7 @@ class DataMartResourceLinkResponse(Response):


class TreeCoverLossByDriverIn(StrictBaseModel):
geostore_id: UUID
aoi: Union[GeostoreAreaOfInterest, AdminAreaOfInterest] = Field(..., discriminator='type')
canopy_cover: int = 30
dataset_version: Dict[str, str] = {}

Expand Down Expand Up @@ -76,3 +132,6 @@ class Config:

class TreeCoverLossByDriverResponse(Response):
data: TreeCoverLossByDriver



Loading
Loading