WIP: Add new script to link COGs from our own s3 bucket for our own tiler or to directly link an external tiler
BielStela committed Mar 31, 2023
1 parent 33dbea8 commit 08da81b
Showing 5 changed files with 106 additions and 18 deletions.
7 changes: 5 additions & 2 deletions data/h3_data_importer/Makefile
@@ -43,8 +43,8 @@ indicators:
	make convert-deforestation
	make convert-forestGHG
	make convert-satDeforestation
contextual-layers:
	make convert-hdi-contextual
contextual-layers: convert-hdi-contextual convert-blue-water-contextual


############################################
# MapSPAM crop production and harvest area #
@@ -516,5 +516,8 @@ download-hdi-contextual:
convert-hdi-contextual: download-hdi-contextual
	python csv_to_h3_table.py $(WORKDIR_HDI)/IHDI_HDR2020_040722.csv h3_grid_hdi_global hdi_2019 2019 iso3

convert-blue-water-contextual:
	# TODO: check that default_params match those currently hardcoded in the db
	python cog_to_contextual_layer_linker.py --cog bluewater.tif --name bluewater_footprint --category "Environmental datasets" --tiler_param colormap=viridis --tp rescale=-18,850
clean:
	rm -rf data/*
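For the TODO above, one way to compare is to read back what is currently stored before relinking. A sketch only (not part of the commit), assuming the contextual_layer table and column names used by the new script's INSERT further down:

import psycopg

from utils import get_connection_info  # helper added in this commit


def show_current_tiler_params(name: str) -> None:
    """Print the stored defaultTilerParams for a layer so they can be
    compared with the values hardcoded in the Makefile target."""
    with psycopg.connect(get_connection_info()) as con:
        with con.cursor() as cur:
            cur.execute('SELECT "defaultTilerParams" FROM "contextual_layer" WHERE "name" = %s', (name,))
            row = cur.fetchone()
            print(row[0] if row else f"no contextual_layer named {name!r}")


show_current_tiler_params("bluewater_footprint")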
81 changes: 81 additions & 0 deletions data/h3_data_importer/cog_to_contextual_layer_linker.py
@@ -0,0 +1,81 @@
import json
import logging
import os

import boto3
import click
import psycopg
from botocore.exceptions import ClientError

from utils import get_connection_info, get_contextual_layer_category_enum, get_metadata

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("cog_to_contextual_layer_linker")


def check_file_exists_in_s3(cog_name: str):
    """Checks that cog_name file exists in the S3 bucket."""
    aws_session = boto3.session.Session(
        aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"), aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY")
    )
    s3 = aws_session.client("s3")
    # TODO: get bucket name from env? hardcoded? Alex help here 🙏
    bucket = os.getenv("S3_BUCKET_NAME")
    try:
        s3.head_object(Bucket=bucket, Key=cog_name)
    except ClientError as e:
        log.error(f"{cog_name} not found in S3 bucket")
        raise e


def insert_in_contextuals_table(name, category, metadata, tiler_url, default_params=None):
    with psycopg.connect(get_connection_info()) as con:
        if category not in get_contextual_layer_category_enum(con):
            log.error(f"Category '{category}' not supported.")
            raise ValueError(f"Category '{category}' not supported.")

        with con.cursor() as cur:
            cur.execute('DELETE FROM "contextual_layer" WHERE "name" = %s', (name,))
            cur.execute(
                'INSERT INTO "contextual_layer" ("name", "metadata", "category", "tilerUrl", "defaultTilerParams")'
                " VALUES (%s, %s, %s, %s, %s)",
                (name, metadata, category, tiler_url, default_params),
            )


@click.command()
@click.option("--cog", type=str, help="Name of the cog in the S3 bucket or URL to external cog.", required=True)
@click.option(
    "--name",
    type=str,
    required=True,
    help="Name of the contextual layer. Must be unique, and a metadata json file with the same name, suffixed "
    "with _metadata, must exist.",
)
@click.option("--category", type=str, required=True, help="Category of the contextual layer.")
@click.option(
    "--tiler_param", "--tp", type=str, multiple=True, help="Tiler default parameters in the form of key=value."
)
def main(cog: str, name: str, category: str, tiler_param: list):
    """Link a COG to a contextual layer in the database, to be used by our own tiler or by an external tiler URL."""
    # Default tiler params; these become the defaultTilerParams json field in the contextual_layer table.
    params = dict(param.split("=", 1) for param in tiler_param)  # split on the first "=" so values may contain "="
    if cog.startswith("s3://") or cog.startswith("https://"):
        # External tiler URL. We don't need to check if the file exists in S3, nor do we need params for the tiler.
        log.info("Linking external tiler to contextual_layer table...")
        tiler_url = cog
        insert_in_contextuals_table(name, category, json.dumps(get_metadata(name)), tiler_url)
    else:
        log.info(f"Linking cog {cog} to contextual_layer table...")
        # TODO: is this the correct url for all cogs we will have in our own S3?
        tiler_url = "/tiler/cog/tiles/{z}/{x}/{y}"
        # TODO: This always fails :( Alex help here 🙏. but is it really needed?
        # check_file_exists_in_s3(cog)
        # TODO: is this the correct default params field for the cog name?
        params["cog_name"] = cog

        insert_in_contextuals_table(name, category, json.dumps(get_metadata(name)), tiler_url, json.dumps(params))


if __name__ == "__main__":
    main()
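As an aside on how the repeated --tiler_param/--tp flags become the defaultTilerParams value: here is a standalone sketch (not part of the commit; the command name and output are illustrative only) that mirrors the parsing in main():

import json

import click
from click.testing import CliRunner


@click.command()
@click.option("--tiler_param", "--tp", type=str, multiple=True, help="key=value pairs; may be repeated.")
def show_params(tiler_param: tuple):
    # Same parsing as in cog_to_contextual_layer_linker.main(): each "key=value"
    # string becomes one entry of the dict that is json.dumps'ed into the table.
    params = dict(param.split("=", 1) for param in tiler_param)
    click.echo(json.dumps(params))


runner = CliRunner()
result = runner.invoke(show_params, ["--tiler_param", "colormap=viridis", "--tp", "rescale=-18,850"])
print(result.output)  # {"colormap": "viridis", "rescale": "-18,850"}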
11 changes: 2 additions & 9 deletions data/h3_data_importer/raster_folder_to_h3_table.py
@@ -1,6 +1,5 @@
import logging
import multiprocessing
import os
from functools import partial
from io import StringIO
from pathlib import Path
@@ -13,7 +12,7 @@
from psycopg import sql
from rasterio import DatasetReader

from utils import DTYPES_TO_PG, slugify, snakify
from utils import DTYPES_TO_PG, slugify, snakify, get_connection_info

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("raster_to_h3")
@@ -167,13 +166,7 @@ def to_the_db(df: pd.DataFrame, table: str, data_type: str, dataset: str, year:
    This way if we need to separate db stuff from actual data processing it can be done easily
    """
    conn_info = psycopg.conninfo.make_conninfo(
        host=os.getenv("API_POSTGRES_HOST"),
        port=os.getenv("API_POSTGRES_PORT"),
        user=os.getenv("API_POSTGRES_USERNAME"),
        password=os.getenv("API_POSTGRES_PASSWORD"),
    )
    with psycopg.connect(conn_info, autocommit=True) as conn:
    with psycopg.connect(get_connection_info(), autocommit=True) as conn:
        create_h3_grid_table(conn, table, df)
        write_data_to_h3_grid_table(conn, table, df)
        clean_before_insert(conn, table)
25 changes: 18 additions & 7 deletions data/h3_data_importer/utils.py
@@ -1,15 +1,16 @@
import json
import logging
import os
from pathlib import Path
from re import sub

import jsonschema
import psycopg
from jsonschema import ValidationError
from psycopg2.extensions import connection

log = logging.getLogger(__name__) # here we can use __name__ because it is an imported module


DTYPES_TO_PG = {
"boolean": "bool",
"uint8": "smallint",
@@ -37,10 +38,10 @@ def snakify(s):

def get_contextual_layer_category_enum(conn: connection) -> set:
    """Get the enum of contextual layer categories"""
    with conn:
        with conn.cursor() as cursor:
            cursor.execute("SELECT unnest(enum_range(NULL::contextual_layer_category));")
            values = set(r[0] for r in cursor.fetchall())

    with conn.cursor() as cursor:
        cursor.execute("SELECT unnest(enum_range(NULL::contextual_layer_category));")
        values = set(r[0] for r in cursor.fetchall())
    return values


@@ -97,9 +98,9 @@ def get_metadata(table: str) -> dict:
    metadata_path = metadata_base_path / f"{table}_metadata.json"

    if not metadata_path.exists():
        log.error(f"No metadata found for {table}")
        log.error(f"No metadata found for {table} with the name {metadata_path.name}")
        # todo: should we raise exception or return empty metadata and keep going?
        raise FileNotFoundError(f"Metadata file for {table} not found")
        raise FileNotFoundError(f"Metadata file {metadata_path.name} not found")

    with open(metadata_path, "r") as f:
        metadata = json.load(f)
@@ -126,3 +127,13 @@ def link_to_indicator_table(connection: connection, indicator_code: str, h3_colu
log.info(f"Updated indicatorId '{indicator_id}' in h3_data for {h3_column_name}")
else:
log.error(f"Indicator with name code {indicator_code} does not exist")


def get_connection_info() -> str:
    """Returns a connection info string for psycopg based on env variables"""
    return psycopg.conninfo.make_conninfo(
        host=os.getenv("API_POSTGRES_HOST"),
        port=os.getenv("API_POSTGRES_PORT"),
        user=os.getenv("API_POSTGRES_USERNAME"),
        password=os.getenv("API_POSTGRES_PASSWORD"),
    )
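To illustrate what the new get_connection_info() helper produces, a standalone sketch (the env var values here are placeholders; make_conninfo only builds a libpq connection string and does not open a connection):

import os

import psycopg

# Placeholder credentials for illustration only.
os.environ.update(
    {
        "API_POSTGRES_HOST": "localhost",
        "API_POSTGRES_PORT": "5432",
        "API_POSTGRES_USERNAME": "postgres",
        "API_POSTGRES_PASSWORD": "secret",
    }
)

conninfo = psycopg.conninfo.make_conninfo(
    host=os.getenv("API_POSTGRES_HOST"),
    port=os.getenv("API_POSTGRES_PORT"),
    user=os.getenv("API_POSTGRES_USERNAME"),
    password=os.getenv("API_POSTGRES_PASSWORD"),
)
print(conninfo)  # e.g. host=localhost port=5432 user=postgres password=secret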
