HFI Processing updates #4354

Merged (12 commits) on Mar 20, 2025

Dockerfile (2 changes: 2 additions & 0 deletions)

@@ -31,6 +31,8 @@ COPY ./wps_shared /wps_shared
 # Install dependencies.
 RUN poetry install --without dev
 
+RUN poetry run python -m pip install --upgrade pip
+
 RUN poetry run python -m pip install -U setuptools wheel
 # Get a python binding for gdal that matches the version of gdal we have installed.
 RUN poetry run python -m pip install --no-build-isolation --no-cache-dir --force-reinstall gdal==$(gdal-config --version)

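Note: the pip and setuptools/wheel upgrades run before the GDAL binding is built (`--no-build-isolation`), so the pinned `gdal==$(gdal-config --version)` install resolves with current packaging tooling. A minimal sanity check, assuming the built image has `gdal-config` on PATH and the `osgeo` package from the pinned install, might look like:

```python
# Hypothetical check, run inside the built image; not part of this PR.
import subprocess

from osgeo import gdal  # the binding installed by `pip install gdal==...`

# Version of the system GDAL library, as reported by gdal-config.
system_version = subprocess.run(
    ["gdal-config", "--version"], capture_output=True, text=True, check=True
).stdout.strip()

# The Python binding reports the version it was built against.
assert gdal.__version__ == system_version, f"{gdal.__version__} != {system_version}"
```
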
api/app/auto_spatial_advisory/critical_hours.py (2 changes: 1 addition & 1 deletion)

@@ -474,7 +474,7 @@ async def calculate_critical_hours(run_type: RunType, run_datetime: datetime, fo
     Entry point for calculating critical hours.
 
     :param run_type: The run type, either forecast or actual.
-    :param run_datetime: The date and time of the sfms run.
+    :param run_datetime: The date and time of the sfms run in UTC.
     :param for_date: The date critical hours are being calculated for.
     """
 

api/app/auto_spatial_advisory/hfi_filepath.py (9 changes: 5 additions & 4 deletions)

@@ -5,18 +5,19 @@
 from wps_shared.utils.time import convert_to_sfms_timezone
 
 
-def get_pmtiles_filepath(run_date: date, run_type: RunType, filename: str) -> str:
+def get_pmtiles_filepath(run_datetime: datetime, run_type: RunType, filename: str) -> str:
     """
     Get the file path for both reading and writing the pmtiles from/to the object store.
     Example: {bucket}/psu/pmtiles/hfi/actual/[issue/run_date]/hfi[for_date].pmtiles
 
 
-    :param run_date: The date of the run to process. (when was the hfi file created?)
+    :param run_datetime: The date and time of the run to process. (when was the hfi file created?)
     :param run_type: forecast or actual
     :param filename: hfi[for_date].pmtiles -> hfi20230821.pmtiles
     :return: s3 bucket key for pmtiles file
     """
-    pmtiles_filepath = os.path.join("psu", "pmtiles", "hfi", run_type.value, run_date.strftime("%Y-%m-%d"), filename)
+    sfms_run_date = convert_to_sfms_timezone(run_datetime).date()
+    pmtiles_filepath = os.path.join("psu", "pmtiles", "hfi", run_type.value, sfms_run_date.strftime("%Y-%m-%d"), filename)
 
     return pmtiles_filepath

@@ -37,7 +38,7 @@ def get_snow_masked_hfi_filepath(run_datetime: datetime, run_type: RunType, file
     Example: {bucket}/psu/rasters/hfi/actual/[issue/run_date]/snow_masked_hfi[for_date].tif
 
 
-    :param run_date: The datetime of the run to process. (when was the hfi file created?)
+    :param run_datetime: The datetime of the run to process. (when was the hfi file created?)
     :param run_type: forecast or actual
     :param filename: snow_masked_hfi[for_date].tif -> snow_masked_hfi20230821.tif
     :return: s3 bucket key for raster file

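Note: get_pmtiles_filepath now derives the date segment of the object-store key from the UTC run_datetime converted into the SFMS timezone, so a run created late in the Vancouver evening (already past midnight UTC) still files under its local run date. A sketch of the conversion, assuming the SFMS timezone is America/Vancouver (as the nats_consumer docstring states); the real helper is wps_shared.utils.time.convert_to_sfms_timezone:

```python
# Sketch only: the real implementation lives in wps_shared.utils.time.
# Assumes SFMS runs in America/Vancouver local time.
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

SFMS_TZ = ZoneInfo("America/Vancouver")


def convert_to_sfms_timezone(run_datetime: datetime) -> datetime:
    """Convert an aware UTC datetime to SFMS (Vancouver) local time."""
    return run_datetime.astimezone(SFMS_TZ)


# 02:30 UTC on Aug 22 is 19:30 PDT on Aug 21, so the pmtiles key is built
# with the 2024-08-21 run date rather than 2024-08-22.
utc_run = datetime(2024, 8, 22, 2, 30, tzinfo=timezone.utc)
assert convert_to_sfms_timezone(utc_run).date().isoformat() == "2024-08-21"
```
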
api/app/auto_spatial_advisory/hfi_minimum_wind_speed.py (2 changes: 1 addition & 1 deletion)

@@ -39,7 +39,7 @@ async def process_hfi_min_wind_speed(run_type: RunType, run_datetime: datetime,
     Entry point for calculating minimum wind speed for each advisory threshold
 
     :param run_type: The run type, either forecast or actual.
-    :param run_datetime: The date and time of the sfms run.
+    :param run_datetime: The date and time of the sfms run in UTC.
     :param for_date: The date being calculated for.
     """
     logger.info(f"Calculating minimum wind speed for {run_type} run type on run date: {run_datetime}, for date: {for_date}")

api/app/auto_spatial_advisory/hfi_percent_conifer.py (2 changes: 1 addition & 1 deletion)

@@ -43,7 +43,7 @@ async def process_hfi_percent_conifer(run_type: RunType, run_datetime: datetime,
     Entry point for calculating minimum percent conifer for hfi > 4000 (above advisory level)
 
     :param run_type: The run type, either forecast or actual.
-    :param run_datetime: The date and time of the sfms run.
+    :param run_datetime: The date and time of the sfms run in UTC.
     :param for_date: The date being calculated for.
     """
     logger.info(f"Calculating minimum percent conifer for {run_type} run type on run date: {run_datetime}, for date: {for_date}")

api/app/auto_spatial_advisory/local/process_stats_local.py (2 changes: 1 addition & 1 deletion)

@@ -49,7 +49,7 @@ async def main(for_dates: list[date], run_type: RunType):
         run_param = await get_most_recent_run_parameters(session, run_type, for_date)
         if run_param:
             run_datetime = run_param[0].run_datetime
-            await process_stats.process_stats(run_type, run_datetime, run_datetime.date(), for_date)
+            await process_stats.process_stats(run_type, run_datetime, for_date)
         else:
             print(f"No run params found for {for_date} - {run_type.value}")

Review comment (Collaborator): Cleans up the interface, thanks!

api/app/auto_spatial_advisory/nats_consumer.py (15 changes: 7 additions & 8 deletions)

@@ -32,18 +32,17 @@ def parse_nats_message(msg: Msg):
     Parse the fields from the messages to drive the processing.
 
     :param msg: NATS message
-    :return: A tuple of run_type, run_date, run_datetime, and for_date. run_date and for_date are the dates in local Vancouver time.
-    run_datetime is in utc time. This is important as run_datetime and run_date may fall on different dates.
+    :return: A tuple of run_type, run_datetime, and for_date. for_date is in local Vancouver time.
+    run_datetime is in utc time.
     """
     if msg.subject == sfms_file_subject:
         decoded_msg = json.loads(json.loads(msg.data.decode()))
         run_type = RunType.from_str(decoded_msg["run_type"])
-        run_date = datetime.strptime(decoded_msg["run_date"], "%Y-%m-%d").date()
         for_date = datetime.strptime(decoded_msg["for_date"], "%Y-%m-%d").date()
         # SFMS doesn't give us a timezone, but from the 2022 data it runs in local time
         # so we localize it as such then convert it to UTC
         run_datetime = get_utc_datetime(datetime.fromisoformat(decoded_msg["create_time"]))
-        return (run_type, run_date, run_datetime, for_date)
+        return (run_type, run_datetime, for_date)

@@ -79,10 +78,10 @@ async def closed_cb():
         try:
             logger.info("Msg received - {}\n".format(msg))
             await msg.ack()
-            run_type, run_date, run_datetime, for_date = parse_nats_message(msg)
-            logger.info("Awaiting process_hfi({}, {}, {})\n".format(run_type, run_date, for_date))
-            await process_hfi(run_type, run_date, run_datetime, for_date)
-            await process_hfi_elevation(run_type, run_date, run_datetime, for_date)
+            run_type, run_datetime, for_date = parse_nats_message(msg)
+            logger.info("Awaiting process_hfi({}, {}, {})\n".format(run_type, run_datetime, for_date))
+            await process_hfi(run_type, run_datetime, for_date)
+            await process_hfi_elevation(run_type, run_datetime, for_date)
             await process_high_hfi_area(run_type, run_datetime, for_date)
             await process_fuel_type_hfi_by_shape(run_type, run_datetime, for_date)
             await process_hfi_min_wind_speed(run_type, run_datetime, for_date)

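Note: parse_nats_message decodes a doubly JSON-encoded payload and treats the timezone-naive create_time as SFMS local time before converting it to UTC. A hedged walk-through of that parse path, with an invented payload (field names match the diff) and get_utc_datetime sketched on the assumption that it localizes naive datetimes to America/Vancouver, per the inline comment:

```python
# Hypothetical walk-through; payload values are invented for illustration.
import json
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

VANCOUVER_TZ = ZoneInfo("America/Vancouver")


def get_utc_datetime(naive: datetime) -> datetime:
    """Treat a naive datetime as Vancouver local time and convert to UTC (sketch)."""
    return naive.replace(tzinfo=VANCOUVER_TZ).astimezone(timezone.utc)


# The message body is a JSON-encoded string that itself contains JSON,
# hence the double json.loads in parse_nats_message.
raw = json.dumps(json.dumps({
    "run_type": "forecast",
    "for_date": "2024-08-21",
    "create_time": "2024-08-20T23:15:00",
}))
decoded_msg = json.loads(json.loads(raw))

for_date = datetime.strptime(decoded_msg["for_date"], "%Y-%m-%d").date()
run_datetime = get_utc_datetime(datetime.fromisoformat(decoded_msg["create_time"]))

# 23:15 Vancouver time on Aug 20 is 06:15 UTC on Aug 21: the UTC run_datetime
# and the SFMS-local run date can fall on different calendar days.
assert run_datetime == datetime(2024, 8, 21, 6, 15, tzinfo=timezone.utc)
```
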
api/app/auto_spatial_advisory/process_elevation_hfi.py (6 changes: 3 additions & 3 deletions)

@@ -9,15 +9,15 @@
 logger = logging.getLogger(__name__)
 
 
-async def process_hfi_elevation(run_type: RunType, run_date: date, run_datetime: datetime, for_date: date):
+async def process_hfi_elevation(run_type: RunType, run_datetime: datetime, for_date: date):
     """Create a new elevation based hfi analysis records for the given date.
 
     :param run_type: The type of run to process. (is it a forecast or actual run?)
-    :param run_date: The date of the run to process. (when was the hfi file created?)
+    :param run_datetime: The date and time of the sfms run in UTC. (when was the hfi file created?)
     :param for_date: The date of the hfi to process. (when is the hfi for?)
     """
 
-    logger.info("Processing HFI elevation %s for run date: %s, for date: %s", run_type, run_date, for_date)
+    logger.info("Processing HFI elevation %s for run date: %s, for date: %s", run_type, run_datetime, for_date)
     perf_start = perf_counter()
 
     await process_elevation_tpi(run_type, run_datetime, for_date)

api/app/auto_spatial_advisory/process_hfi.py (12 changes: 6 additions & 6 deletions)

@@ -12,7 +12,7 @@
 from wps_shared.db.models.auto_spatial_advisory import ClassifiedHfi, HfiClassificationThreshold, RunTypeEnum
 from wps_shared.db.database import get_async_read_session_scope, get_async_write_session_scope
 from wps_shared.db.crud.auto_spatial_advisory import save_hfi, get_hfi_classification_threshold, HfiClassificationThresholdEnum, save_run_parameters, get_run_parameters_id
-from wps_shared.db.crud.snow import get_last_processed_snow_by_processed_date
+from wps_shared.db.crud.snow import get_most_recent_processed_snow_by_date
 from wps_shared.db.models.snow import SnowSourceEnum
 from app.auto_spatial_advisory.classify_hfi import classify_hfi
 from wps_shared.run_type import RunType

@@ -78,11 +78,11 @@ def create_model_object(
     )
 
 
-async def process_hfi(run_type: RunType, run_date: date, run_datetime: datetime, for_date: date):
+async def process_hfi(run_type: RunType, run_datetime: datetime, for_date: date):
     """Create a new hfi record for the given date.
 
     :param run_type: The type of run to process. (is it a forecast or actual run?)
-    :param run_date: The date of the run to process. (when was the hfi file created?)
+    :param run_datetime: The date and time of the sfms run in UTC. (when was the hfi file created?)
     :param for_date: The date of the hfi to process. (when is the hfi for?)
     """
 

@@ -92,9 +92,9 @@ async def process_hfi(run_type: RunType, run_date: date, run_datetime: datetime,
         if existing_run is not None:
             logger.info((f"Skipping run, already processed for run_type:{run_type}" f"run_datetime:{run_datetime}," f"for_date:{for_date}"))
             return
-        last_processed_snow = await get_last_processed_snow_by_processed_date(session, run_datetime, SnowSourceEnum.viirs)
+        last_processed_snow = await get_most_recent_processed_snow_by_date(session, run_datetime, SnowSourceEnum.viirs)
 
-    logger.info("Processing HFI %s for run date: %s, for date: %s", run_type, run_date, for_date)
+    logger.info("Processing HFI %s for run date: %s, for date: %s", run_type, run_datetime, for_date)
     perf_start = perf_counter()
 
     hfi_key = get_hfi_s3_key(run_type, run_datetime, for_date)

@@ -131,7 +131,7 @@ async def process_hfi(run_type: RunType, run_date: date, run_datetime: datetime,
         logger.info(f"Writing pmtiles -- {pmtiles_filename}")
         tippecanoe_wrapper(temp_geojson, temp_pmtiles_filepath, min_zoom=HFI_PMTILES_MIN_ZOOM, max_zoom=HFI_PMTILES_MAX_ZOOM)
 
-        key = get_pmtiles_filepath(run_date, run_type, pmtiles_filename)
+        key = get_pmtiles_filepath(run_datetime, run_type, pmtiles_filename)
         logger.info(f"Uploading file {pmtiles_filename} to {key}")
 
         await client.put_object(

api/app/auto_spatial_advisory/process_high_hfi_area.py (28 changes: 11 additions & 17 deletions)

@@ -1,6 +1,4 @@
-""" Code relating to processing high HFI area per fire zone
-"""
-
+"""Code relating to processing high HFI area per fire zone"""
 
 import logging
 from datetime import date, datetime

@@ -17,41 +15,37 @@
 
 
 async def write_high_hfi_area(session: AsyncSession, row: any, run_parameters_id: int):
-    high_hfi_area = HighHfiArea(advisory_shape_id=row.shape_id,
-                                run_parameters=run_parameters_id,
-                                area=row.area,
-                                threshold=row.threshold)
+    high_hfi_area = HighHfiArea(advisory_shape_id=row.shape_id, run_parameters=run_parameters_id, area=row.area, threshold=row.threshold)
     await save_high_hfi_area(session, high_hfi_area)
 
 
 async def process_high_hfi_area(run_type: RunType, run_datetime: datetime, for_date: date):
-    """ Create new high hfi area analysis records for the given date.
+    """Create new high hfi area analysis records for the given date.
 
     :param run_type: The type of run to process. (is it a forecast or actual run?)
-    :param run_date: The date of the run to process. (when was the hfi file created?)
+    :param run_datetime: The date and time of the sfms run in UTC. (when was the hfi file created?)
     :param for_date: The date of the hfi to process. (when is the hfi for?)
    """
-    logger.info('Processing high HFI area %s for run date: %s, for date: %s', run_type, run_datetime, for_date)
+    logger.info("Processing high HFI area %s for run date: %s, for date: %s", run_type, run_datetime, for_date)
     perf_start = perf_counter()
 
     async with get_async_write_session_scope() as session:
         run_parameters_id = await get_run_parameters_id(session, run_type, run_datetime, for_date)
 
-        stmt = select(HighHfiArea)\
-            .where(HighHfiArea.run_parameters == run_parameters_id)
-
+        stmt = select(HighHfiArea).where(HighHfiArea.run_parameters == run_parameters_id)
 
         exists = (await session.execute(stmt)).scalars().first() is not None
 
-        if (not exists):
-            logger.info('Getting high HFI area per zone...')
+        if not exists:
+            logger.info("Getting high HFI area per zone...")
             high_hfi_areas = await calculate_high_hfi_areas(session, run_type, run_datetime, for_date)
 
-            logger.info('Writing high HFI areas...')
+            logger.info("Writing high HFI areas...")
             for row in high_hfi_areas:
                 await write_high_hfi_area(session, row, run_parameters_id)
         else:
             logger.info("High hfi area already processed")
 
     perf_end = perf_counter()
     delta = perf_end - perf_start
-    logger.info('%f delta count before and after processing high HFI area', delta)
+    logger.info("%f delta count before and after processing high HFI area", delta)

api/app/auto_spatial_advisory/process_stats.py (6 changes: 3 additions & 3 deletions)

@@ -8,9 +8,9 @@
 from datetime import date, datetime
 
 
-async def process_stats(run_type: RunType, run_datetime: datetime, run_date: date, for_date: date):
-    await process_hfi(run_type, run_date, run_datetime, for_date)
-    await process_hfi_elevation(run_type, run_date, run_datetime, for_date)
+async def process_stats(run_type: RunType, run_datetime: datetime, for_date: date):
+    await process_hfi(run_type, run_datetime, for_date)
+    await process_hfi_elevation(run_type, run_datetime, for_date)
     await process_high_hfi_area(run_type, run_datetime, for_date)
     await process_fuel_type_hfi_by_shape(run_type, run_datetime, for_date)
     await process_hfi_min_wind_speed(run_type, run_datetime, for_date)

wps_shared/wps_shared/db/crud/snow.py (16 changes: 7 additions & 9 deletions)

@@ -1,13 +1,13 @@
-""" CRUD operations relating to processing snow coverage
-"""
-from datetime import datetime
+"""CRUD operations relating to processing snow coverage"""
+
+from datetime import date, datetime
 from sqlalchemy import select
 from sqlalchemy.ext.asyncio import AsyncSession
 from wps_shared.db.models.snow import ProcessedSnow, SnowSourceEnum
 
 
 async def save_processed_snow(session: AsyncSession, processed_snow: ProcessedSnow):
-    """ Add a new ProcessedSnow record.
+    """Add a new ProcessedSnow record.
 
     :param processed_snow: The record to be saved.List of actual weather values
     :type processed_snow: ProcessedSnow

@@ -27,6 +27,7 @@ async def get_last_processed_snow_by_processed_date(session: AsyncSession, proce
     result = await session.execute(stmt)
     return result.first()
 
+
 async def get_most_recent_processed_snow_by_date(session: AsyncSession, target_date: datetime, snow_source: SnowSourceEnum = SnowSourceEnum.viirs) -> ProcessedSnow:
     """Retrieve the most recent record prior or equal to the provided date.

@@ -37,9 +38,6 @@ async def get_most_recent_processed_snow_by_date(session: AsyncSession, target_d
     :return: A record containing the last date for which snow data from the specified source was successfully processed.
     :rtype: ProcessedSnow
     """
-    stmt = select(ProcessedSnow)\
-        .where(ProcessedSnow.snow_source == snow_source)\
-        .where(ProcessedSnow.for_date <= target_date)\
-        .order_by(ProcessedSnow.for_date.desc())
+    stmt = select(ProcessedSnow).where(ProcessedSnow.snow_source == snow_source).where(ProcessedSnow.for_date <= target_date).order_by(ProcessedSnow.for_date.desc())
     result = await session.execute(stmt)
-    return result.first()
+    return result.first()

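Note: get_most_recent_processed_snow_by_date returns the newest ProcessedSnow row with for_date on or before the given date, which is how process_hfi decides whether recent snow data exists for a run. A usage sketch mirroring the call site in process_hfi.py (assumes a configured database; the session helper is the one imported there):

```python
# Hypothetical usage sketch, not part of this PR.
from datetime import datetime

from wps_shared.db.database import get_async_read_session_scope
from wps_shared.db.crud.snow import get_most_recent_processed_snow_by_date
from wps_shared.db.models.snow import SnowSourceEnum


async def latest_snow_for_run(run_datetime: datetime):
    async with get_async_read_session_scope() as session:
        # Newest VIIRS record with for_date <= run_datetime, or None.
        return await get_most_recent_processed_snow_by_date(
            session, run_datetime, SnowSourceEnum.viirs
        )
```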