diff --git a/Containerfile b/Containerfile
new file mode 100644
index 0000000..ab06558
--- /dev/null
+++ b/Containerfile
@@ -0,0 +1,31 @@
+FROM python:3.12-slim
+
+# Install the dagster libraries needed to run the gRPC server, which exposes
+# this code location to dagster-webserver and dagster-daemon and loads the
+# DagsterInstance.
+
+RUN pip install \
+    dagster \
+    dagster-postgres \
+    dagster-docker
+
+# Set $DAGSTER_HOME and copy the dagster instance config there
+ENV DAGSTER_HOME=/opt/dagster/dagster_home
+
+RUN mkdir -p $DAGSTER_HOME
+COPY dagster.yaml $DAGSTER_HOME
+
+
+# Add repository code (COPY with a directory source copies its contents, so give the package directories explicitly)
+WORKDIR /opt/dagster/app
+COPY workspace.yaml /opt/dagster/app
+COPY local_archives /opt/dagster/app/local_archives
+COPY cloud_archives /opt/dagster/app/cloud_archives
+
+# Run dagster gRPC server on port 4000
+EXPOSE 4000
+
+# Using CMD rather than ENTRYPOINT allows the command to be overridden in
+# run launchers or executors to run other commands using this image
+CMD ["dagster", "api", "grpc", "-h", "0.0.0.0", "-p", "4000"]
+
diff --git a/cloud_archives/nwp/__init__.py b/cloud_archives/nwp/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/cloud_archives/nwp/icon/__init__.py b/cloud_archives/nwp/icon/__init__.py
deleted file mode 100644
index b7eb142..0000000
--- a/cloud_archives/nwp/icon/__init__.py
+++ /dev/null
@@ -1,22 +0,0 @@
-import dagster as dg
-
-from . import icon_eu, icon_global
-
-global_assets = dg.load_assets_from_modules(
-    modules=[icon_global],
-    group_name="icon_global",
-)
-
-eu_assets = dg.load_assets_from_modules(
-    modules=[icon_eu],
-    group_name="icon_eu",
-)
-
-all_assets: list[dg.AssetsDefinition] = [*global_assets, *eu_assets]
-
-all_jobs: list[dg.JobDefinition] = [
-    icon_global.archive_icon_global_sl_job,
-    icon_global.archive_icon_global_ml_job,
-    icon_eu.archive_icon_europe_sl_job,
-    icon_eu.archive_icon_europe_ml_job,
-]
diff --git a/cloud_archives/nwp/icon/_ops.py b/cloud_archives/nwp/icon/_ops.py
deleted file mode 100644
index 3905432..0000000
--- a/cloud_archives/nwp/icon/_ops.py
+++ /dev/null
@@ -1,69 +0,0 @@
-import dagster as dg
-
-from cloud_archives.ops.generic import (
-    AssetMaterializationConfig,
-    log_asset_materialization,
-    raise_exception,
-)
-from cloud_archives.ops.huggingface import (
-    HFFileConfig,
-    get_hf_zarr_file_metadata,
-)
-from cloud_archives.ops.kbatch import (
-    NWPConsumerConfig,
-    define_kbatch_consumer_job,
-    kbatch_consumer_graph,
-)
-
-
-def create_kbatch_huggingface_graph_config(
-    nwp_config: NWPConsumerConfig,
-    hf_config: HFFileConfig,
-    am_config: AssetMaterializationConfig,
-) -> dg.RunConfig:
-    """Mapping from Config to RunConfig for the corresponding graph.
-
-    Args:
-        nwp_config: Configuration for the nwp consumer.
-        hf_config: Configuration for huggingface.
-        am_config: Configuration for asset materialisation.
-
-    Returns:
-        The RunConfig for the graph.
-    """
-    return dg.RunConfig(
-        ops={
-            kbatch_consumer_graph.name: {
-                "ops": {define_kbatch_consumer_job.name: nwp_config},
-            },
-            get_hf_zarr_file_metadata.name: hf_config,
-            get_hf_zarr_file_metadata.name + "_2": hf_config,
-            log_asset_materialization.name: am_config,
-            log_asset_materialization.name + "_2": am_config,
-        },
-    )
-
-
-@dg.graph
-def kbatch_huggingface_graph() -> dict[str, dg.MetadataValue]:
-    """Op graph for archiving to huggingface using nwp-consumer in kbatch.
-
-    Note: Some of the ops within the graphs require the defining of
-    run configuration.
-
-    Returns:
-        The file metadata for the zarr file that was archived.
- """ - # First check to see if the file in question already exists - file_metadata, no_file_at_start = get_hf_zarr_file_metadata() - # If the file exists, log the materialization - log_asset_materialization(file_metadata) - # If the file does not exist, create a kbatch job to archive it - job_name = kbatch_consumer_graph(no_file_at_start) - file_metadata, no_file_after_job = get_hf_zarr_file_metadata(job_name) - # Now the file should exist, so log the materialization - log_asset_materialization(file_metadata) - # Raise an exception if it doesn't exist at this point - raise_exception(no_file_after_job) - - return file_metadata diff --git a/cloud_archives/nwp/icon/icon_eu.py b/cloud_archives/nwp/icon/icon_eu.py deleted file mode 100644 index 928ade6..0000000 --- a/cloud_archives/nwp/icon/icon_eu.py +++ /dev/null @@ -1,80 +0,0 @@ -"""Pipeline for the processing of eu ICON data.""" -import os - -import dagster as dg - -from cloud_archives.ops.huggingface import ( - HFFileConfig, -) -from cloud_archives.ops.kbatch import ( - NWPConsumerConfig, -) - -from ._ops import ( - AssetMaterializationConfig, - create_kbatch_huggingface_graph_config, - kbatch_huggingface_graph, -) - -# Define the ICON europe zarr archive as a source asset -icon_europe_zarr_archive = dg.SourceAsset( - key=["nwp", "icon", "europe", "zarr_archive"], - partitions_def=dg.TimeWindowPartitionsDefinition( - fmt="%Y-%m-%d|%H:%M", - start="2024-01-31|00:00", - cron_schedule="0 0/6 * * *", - ), -) - -# Define the job to materialize the ICON europe zarr archive -archive_icon_europe_sl_job = kbatch_huggingface_graph.to_job( - name="archive_icon_europe_sl_job", - partitions_def=icon_europe_zarr_archive.partitions_def, - config=create_kbatch_huggingface_graph_config( - nwp_config=NWPConsumerConfig( - source="icon", - sink="huggingface", - docker_tag="refactor-service-loop", - zdir="single-level/data", - env={ - "ICON_MODEL": "europe", - "ICON_PARAMETER_GROUP": "single-level", - "HUGGINGFACE_TOKEN": os.getenv("HUGGINGFACE_TOKEN", default="not-set"), - "HUGGINGFACE_REPO_ID": "sol-ocf/test-dwd-europe", - }, - ), - hf_config=HFFileConfig(hf_repo_id="sol-ocf/test-dwd-europe"), - am_config=AssetMaterializationConfig( - asset_key=list(icon_europe_zarr_archive.key.path), - asset_description="Europe ICON Zarr Archive stored in huggingface.", - ), - ), -) - - -archive_icon_europe_ml_job = kbatch_huggingface_graph.to_job( - name="archive_icon_europe_ml_job", - partitions_def=icon_europe_zarr_archive.partitions_def, - config=create_kbatch_huggingface_graph_config( - nwp_config=NWPConsumerConfig( - source="icon", - sink="huggingface", - docker_tag="main", - zdir="multi-level/data", - env={ - "ICON_MODEL": "europe", - "ICON_PARAMETER_GROUP": "multi-level", - "HUGGINGFACE_TOKEN": os.getenv("HUGGINGFACE_TOKEN", default="not-set"), - "HUGGINGFACE_REPO_ID": "sol-ocf/test-dwd-europe", - }, - ), - hf_config=HFFileConfig(hf_repo_id="sol-ocf/test-dwd-europe"), - am_config=AssetMaterializationConfig( - asset_key=list(icon_europe_zarr_archive.key.path), - asset_description="Europe ICON Zarr Archive stored in huggingface.", - ), - ), -) - - - diff --git a/cloud_archives/nwp/icon/icon_global.py b/cloud_archives/nwp/icon/icon_global.py deleted file mode 100644 index 6bd1313..0000000 --- a/cloud_archives/nwp/icon/icon_global.py +++ /dev/null @@ -1,76 +0,0 @@ -"""Pipeline for the processing of global ICON data.""" -import os - -import dagster as dg - -from cloud_archives.ops.huggingface import ( - HFFileConfig, -) -from cloud_archives.ops.kbatch import ( - 
NWPConsumerConfig, -) - -from ._ops import ( - AssetMaterializationConfig, - create_kbatch_huggingface_graph_config, - kbatch_huggingface_graph, -) - -# Define the ICON global zarr archive as a source asset -icon_global_zarr_archive = dg.SourceAsset( - key=["nwp", "icon", "global", "zarr_archive"], - partitions_def=dg.TimeWindowPartitionsDefinition( - fmt="%Y-%m-%d|%H:%M", - start="2024-01-31|00:00", - cron_schedule="0 0/6 * * *", - ), -) - -archive_icon_global_sl_job = kbatch_huggingface_graph.to_job( - name="archive_icon_global_sl_job", - partitions_def=icon_global_zarr_archive.partitions_def, - config=create_kbatch_huggingface_graph_config( - nwp_config=NWPConsumerConfig( - source="icon", - sink="huggingface", - docker_tag="refactor-service-loop", - zdir="single-level/data", - env={ - "ICON_MODEL": "global", - "ICON_PARAMETER_GROUP": "single-level", - "HUGGINGFACE_TOKEN": os.getenv("HUGGINGFACE_TOKEN", default="not-set"), - "HUGGINGFACE_REPO_ID": "sol-ocf/test-dwd-global", - }, - ), - hf_config=HFFileConfig(hf_repo_id="sol-ocf/test-dwd-global"), - am_config=AssetMaterializationConfig( - asset_key=list(icon_global_zarr_archive.key.path), - asset_description="Global ICON Zarr Archive stored in huggingface.", - ), - ), -) - - -archive_icon_global_ml_job = kbatch_huggingface_graph.to_job( - name="archive_icon_global_ml_job", - partitions_def=icon_global_zarr_archive.partitions_def, - config=create_kbatch_huggingface_graph_config( - nwp_config=NWPConsumerConfig( - source="icon", - sink="huggingface", - docker_tag="main", - zdir="multi-level/data", - env={ - "ICON_MODEL": "global", - "ICON_PARAMETER_GROUP": "multi-level", - "HUGGINGFACE_TOKEN": os.getenv("HUGGINGFACE_TOKEN", default="not-set"), - "HUGGINGFACE_REPO_ID": "sol-ocf/test-dwd-global", - }, - ), - hf_config=HFFileConfig(hf_repo_id="sol-ocf/test-dwd-global"), - am_config=AssetMaterializationConfig( - asset_key=list(icon_global_zarr_archive.key.path), - asset_description="Global ICON Zarr Archive stored in huggingface.", - ), - ), -) diff --git a/cloud_archives/ops/__init__.py b/cloud_archives/ops/__init__.py deleted file mode 100644 index 4b2bc6d..0000000 --- a/cloud_archives/ops/__init__.py +++ /dev/null @@ -1,11 +0,0 @@ -from . import ( - generic, - huggingface, - kbatch, -) - -__all__ = [ - "generic", - "huggingface", - "kbatch", -] diff --git a/cloud_archives/ops/generic.py b/cloud_archives/ops/generic.py deleted file mode 100644 index 83ab871..0000000 --- a/cloud_archives/ops/generic.py +++ /dev/null @@ -1,53 +0,0 @@ -"""Defines implementation-agnostic ops for generic graph-building.""" - - -import dagster as dg -from pydantic import Field - - -class AssetMaterializationConfig(dg.Config): - """Configuration for asset materialisation. - - Builds upon the dagster Config type, allowing for the configuration to be - passed to an Op in a dagster pipeline. Default values of an ellipsis (...) - are used to indicate that the value must be provided. 
- """ - - asset_key: list[str] = Field( - description="The key of the asset to materialise.", - default=..., - ) - asset_description: str | None = Field( - description="A description of the asset.", - default=None, - ) - - -@dg.op -def log_asset_materialization( - context: dg.OpExecutionContext, - config: AssetMaterializationConfig, - metadata: dict[str, dg.MetadataValue], -) -> None: - """Materialises an asset according to the config.""" - context.log_event( - dg.AssetMaterialization( - asset_key=config.asset_key, - description=config.asset_description, - partition=context.partition_key if context.has_partition_key else None, - metadata=metadata, - ), - ) - - -@dg.op( - ins={"depends_on": dg.In(dg.Nothing)}, -) -def raise_exception() -> None: - """Dagster Op that raises an exception. - - This Op is used to mark a branch in a graph as being undesirable. - Defines a "Nothing" input to allow for the op to have upstream dependencies - in a graph without the passing of data. - """ - raise Exception("Reached exception Op.") diff --git a/cloud_archives/ops/huggingface.py b/cloud_archives/ops/huggingface.py deleted file mode 100644 index 92c3131..0000000 --- a/cloud_archives/ops/huggingface.py +++ /dev/null @@ -1,123 +0,0 @@ -"""Dagster ops and resources for interacting with huggingface datasets.""" -import datetime as dt -import os - -import dagster as dg -from huggingface_hub import hf_hub_url -from huggingface_hub.hf_api import HfApi, RepoFile -from pydantic import Field - - -class HFFileConfig(dg.Config): - """Configuration for huggingface. - - Builds upon the dagster Config type, allowing for the configuration to be - passed to an Op in a dagster pipeline. - - Default values of an ellipsis (...) are used to indicate that the value - must be provided when the configuration object is instantiated. - """ - - hf_repo_id: str = Field( - description="The id of the huggingface repo to archive to.", - default=..., - ) - file_init_time: str = Field( - description="The initialisation time of the data of interest.", - default=dt.datetime.now(dt.UTC) - .replace( - hour=0, - minute=0, - second=0, - microsecond=0, - ) - .strftime("%Y-%m-%d|%H:%M"), - ) - - -@dg.op( - ins={"depends_on": dg.In(dg.Nothing)}, - out={ - "file_metadata": dg.Out(dict[str, dg.MetadataValue], is_required=False), - "no_such_file": dg.Out(bool, is_required=False), - }, -) -def get_hf_zarr_file_metadata( - context: dg.OpExecutionContext, - config: HFFileConfig, -) -> tuple[dict[str, dg.MetadataValue], bool]: - """Dagster op to get metadata for a zarr file in a huggingface dataset. - - Assumes the zarr files are stored in a folder structure of the form: - data/{year}/{month}/{day} - and that the names of the zarr files contain the initialisation time: - {year}{month}{day}T{hour}{minute}.zarr.zip - where the time parts correspond to the initialisation time of the file. - - Defines a "Nothing" input to allow for the op to have upstream dependencies - in a graph without the passing of data, as well as two outputs, - - file_metadata: The metadata for the zarr file that was found. - - no_such_file: A signal that no file was found for the given init time. - This is done instead of simply raising an error when no files are found, - as it allows for a branching configuration: downstream Ops can decide - how to handle the case where either one or none files are found. - - Args: - context: The dagster context. - config: Configuration for where to look on huggingface. 
- - Returns: - Either the metadata for the zarr file that was found, or a signal that - no file was found for the given init time. - """ - # Get the init time from the config or the partition key - itstring: str = config.file_init_time - if context.has_partition_key: - itstring = context.partition_key - it: dt.datetime = dt.datetime.strptime(itstring, "%Y-%m-%d|%H:%M").replace(tzinfo=dt.UTC) - - api = HfApi(token=os.getenv("HUGGINGFACE_TOKEN", default=None)) - # Check if there is an init time folder - if ( - len( - api.get_paths_info( - repo_id=config.hf_repo_id, - repo_type="dataset", - paths=f"data/{it.strftime('%Y/%m/%d')}", - ), - ) - == 0 - ): - files: list[RepoFile] = [] - else: - # List all files in the repo folder for the given init time's date - # and filter for zarr files named according to the init time - files: list[RepoFile] = [ - p - for p in api.list_repo_tree( - repo_id=config.hf_repo_id, - repo_type="dataset", - path_in_repo=f"data/{it.strftime('%Y/%m/%d')}", - ) - if isinstance(p, RepoFile) - and p.path.endswith(".zarr.zip") - and f"{it.strftime('%Y%m%dT%H%M')}" in p.path - ] - - if len(files) == 0: - context.log.info("No files found in the repo for the given init time.") - yield dg.Output(True, "no_such_file") - else: - rf: RepoFile = next(iter(files)) - context.log.info(f"Found file {rf} in repo {config.hf_repo_id}.") - # Map RepoFile object to a dagster metadata dict - metadata: dict[str, dg.MetadataValue] = { - "file": dg.MetadataValue.path(rf.path), - "url": dg.MetadataValue.url( - hf_hub_url(repo_id=config.hf_repo_id, repo_type="dataset", filename=rf.path), - ), - "size (bytes)": dg.MetadataValue.int(rf.size), - "blob ID": dg.MetadataValue.text(rf.blob_id), - } - yield dg.Output(metadata, "file_metadata") - diff --git a/cloud_archives/ops/kbatch.py b/cloud_archives/ops/kbatch.py deleted file mode 100644 index 973d492..0000000 --- a/cloud_archives/ops/kbatch.py +++ /dev/null @@ -1,406 +0,0 @@ -"""Dagster operations for running kbatch jobs. - -Define operations and helper functions for running the nwp-consumer -as a kbatch job on a kubernetes cluster. The operations are designed -to be run as part of a dagster pipeline. - -The key method is `kbatch_consumer_graph`, which combines a selection -of operations into a graph that configures, runs, and tracks a kbatch -nwp-consumer job, streaming logs back to stdout and cleaning up -resources on error or success. -""" - -import datetime as dt -import time -from types import GeneratorType - -import dagster as dg -import httpx -import kbatch._core as kbc -from kbatch._types import Job -from pydantic import Field - -# --- CONSTANTS --- # - -# Set the kbatch url and token arguments to none in all calls to kbatch -# * Don't ask me why, but setting them as one would expect manually -# (through env vars) in these parameters doesn't work. Instead, force -# the kbatch core to find them from the environment by setting them -# to None. -KBATCH_DICT = { - "kbatch_url": None, - "token": None, -} - - -# --- CLASSES AND METHODS --- # - - -class KbatchJobException(Exception): - """Exception raised when a kbatch job fails. - - Contains the name of the job that failed alongside the message. - Useful for enabling further handling of the job failure, e.g. - cleaning up of resources. - """ - - def __init__(self, message: str, job_name: str): - super().__init__(message) - self.job_name = job_name - - -@dg.failure_hook -def kbatch_job_failure_hook(context: dg.HookContext) -> None: - """Failure hook that deletes a kbatch job on exception. 
- - Can be applied to individual ops via - some_kbatch_op.with_failure_hook(kbatch_job_failure_hook)() - or to all ops in a job via - @dg.job(hooks={kbatch_job_failure_hook}) - - Args: - context: The dagster context within which the hook is operating. - """ - op_exception = context.op_exception - - if isinstance(op_exception, KbatchJobException): - job_name = op_exception.job_name - dg.get_dagster_logger().info(f"Deleting kbatch job {job_name}.") - kbc.delete_job(resource_name=job_name, **KBATCH_DICT) - - -def wait_for_status_change(old_status: str, job_name: str, timeout: int = 60 * 20) -> str: - """Wait for the status of a kbatch job to change from old_status. - - The amount of time to wait is modified by the timeout parameter. - - Args: - old_status: The status to wait for the job to change from. - job_name: The name of the job to check. - timeout: The maximum time to wait for the status to change. - - Returns: - The new status of the job. - """ - time_spent: int = 0 - while time_spent < timeout: - increment_secs: int = 30 - time.sleep(increment_secs) - time_spent += increment_secs - - # Get the status of the pod in the job - # * This can fail and be retried within the timeout limit so - # catch a number of recoverable errors. - try: - pods_info: list[dict] = kbc.list_pods(job_name=job_name, **KBATCH_DICT)["items"] - except httpx.ConnectError as e: - if "Temporary failure in name resolution" in str(e): - dg.get_dagster_logger().debug(f"Name resolution error, retrying: {e}") - continue - else: - raise e - except (httpx.ReadTimeout, httpx.ConnectTimeout) as e: - dg.get_dagster_logger().debug(f"Timed out listing pods, retrying: {e}") - continue - except httpx.HTTPStatusError as e: - if "503" in str(e): - dg.get_dagster_logger().debug(f"Service unavailable, retrying: {e}") - continue - else: - raise e - except Exception as e: - raise e - - if len(pods_info) == 0: - continue - - new_status: str = pods_info[0]["status"]["phase"] - - # Exit if status has changed - if new_status != old_status: - dg.get_dagster_logger().info( - f"Job {job_name} is no longer {old_status}, status: {new_status}.", - ) - if new_status == "Failed": - condition: str = pods_info[0]["status"]["container_statuses"][0]["state"] - dg.get_dagster_logger().error(f"Condition: {condition}") - return new_status - - # Log if still waiting every 10 minutes - if time_spent % (10 * 60) == 0: - dg.get_dagster_logger().debug( - f"Kbatch job {job_name} still {old_status} after {int(time_spent / 60)} mins.", - ) - - # Raise exception if timed out - if time_spent >= timeout: - dg.get_dagster_logger().info(pods_info[0]["status"]) - raise KbatchJobException( - message=f"Timed out waiting for status '{old_status}' to change.", - job_name=job_name, - ) - - return new_status - - -# --- OPS --- # - - -class NWPConsumerConfig(dg.Config): - """Configuration object for the nwp consumer. - - Defines the configuration for the running of the nwp-consumer docker image. - Builds upon the dagster Config type, allowing for the configuration to be - passed to an Op in a dagster pipeline. - - Default values of an ellipsis (...) are used to indicate that the value - must be provided when the configuration object is instantiated. 
- """ - - docker_tag: str = Field( - description="The tag of the nwp-consumer docker image to use.", - default="0.2.1", - ) - source: str = Field( - description="The source of the data to consume.", - default=..., - ) - sink: str = Field( - description="The sink to write the data to.", - default=..., - ) - zdir: str = Field( - description="The directory to write the data to.", - default="data", - ) - env: dict[str, str] = Field( - description="Environment variables to pass to the nwp-consumer.", - default_factory=lambda: {}, - ) - inittime: str = Field( - description="The initialisation time of the nwp data to consume.", - default=dt.datetime.now(dt.UTC) - .replace(hour=0, minute=0, second=0, microsecond=0) - .strftime("%Y-%m-%d|%H:%M"), - pattern=r"^\d{4}-\d{2}-\d{2}\|\d{2}:\d{2}$", - ) - no_rename_vars: bool = Field( - description="Don't rename variables.", - default=True, - ) - no_variable_dimension: bool = Field( - description="Don't specify variable dimensions.", - default=True, - ) - - -@dg.op( - ins={"depends_on": dg.In(dg.Nothing)}, -) -def define_kbatch_consumer_job( - context: dg.OpExecutionContext, - config: NWPConsumerConfig, -) -> Job: - """Define a kbatch job object to run the nwp-consumer. - - Builds a kbatch job object specifying the parameters required - to run the nwp-consumer docker image according to the - input configuration object. - - Args: - context: The dagster context. - config: Configuration for the nwp-consumer. - - Returns: - The kbatch job definition object. - """ - # Get the init time either from config or partition key - itstring = config.inittime - if context.has_partition_key: - itstring = context.partition_key - it = dt.datetime.strptime(itstring, "%Y-%m-%d|%H:%M").replace(tzinfo=dt.UTC) - - args = [ - "consume", - f"--source={config.source}", - f"--sink={config.sink}", - "--rsink=local", - "--rdir=/tmp/nwpc/raw", - f"--zdir={config.zdir}", - f"--from={it.strftime('%Y-%m-%dT%H:%M')}", - ] - - if config.no_rename_vars: - args = [*args, "--no-rename-vars"] - if config.no_variable_dimension: - args = [*args, "--no-variable-dim"] - - context.log.info(f"Running nwp-consumer with command: {args}") - - job = Job( - name=f"{config.source}-{config.sink}-backfill", - image=f"ghcr.io/openclimatefix/nwp-consumer:{config.docker_tag}", - env=config.env, - args=args, - ) - - return job - - -@dg.op -def submit_kbatch_job(context: dg.OpExecutionContext, job: Job) -> str: - """Submit a kbatch job object to the kbatch server. - - Requires one of the two following configurations set: - - - the appropriate kbatch token and url set in the environment variables - KBATCH_URL and JUPYTERHUB_API_TOKEN - - a `~/.config/kbatch/config.json file containing a dictionary with the - keys "kbatch_url" and "token". - - This can be generated using the kbatch CLI with the command: - kbatch configure --kbatch_url --token - - Defines a "Nothing" input to allow for the op to have upstream dependencies - in a graph without the passing of data. - - Args: - context: The dagster context. - job: A kbatch Job object defining the job to submit. - - Returns: - The name of the created job. 
- """ - # Request large pod sizes - profile: dict = { - "resources": { - "limits": { - "cpu": "8", - "memory": "64G", - }, - "requests": { - "cpu": "7.0", - "memory": "56G", - }, - }, - } - # Submit the job using kbatch core - result = kbc.submit_job(job=job, profile=profile, **KBATCH_DICT) - # Extract the job name from the result - job_name: str = result["metadata"]["name"] - context.log.info(f"Kbatch job {job_name} requested.") - - return job_name - - -@dg.op -def follow_kbatch_job( - context: dg.OpExecutionContext, - job_name: str, -) -> str: - """Blocking function that follows the status of a kbatch job. - - Waits for a job to start running, then follows the logs, streaming - back to stdout. Checks for failures within the logs and raises an - exception if the job fails. - - This function assumes the job is only running on a single pod. - On a partial read error the function will re-try the read. - - Args: - context: The dagster context. - job_name: The name of the job. - - Returns: - The name of the job. - """ - context.log.info("Assessing status of kbatch job.") - - # Pods take a short while to be provisioned - status: str = wait_for_status_change(old_status="Pending", job_name=job_name) - # If the pod fails to be provisioned there will be no logs to view. - # The condition will be printed to the logs (e.g. ImagePullBackoff) - if status == "Failed": - raise KbatchJobException( - message=f"Job {job_name} failed, see logs.", - job_name=job_name, - ) - - # Otherwise, wait up to timout for the pod to finish running - pod_name: str = kbc.list_pods(job_name=job_name, **KBATCH_DICT)["items"][0]["metadata"]["name"] - status = wait_for_status_change(old_status="Running", job_name=job_name, timeout=60 * 60 * 24) - - # Get the logs from the pod - - total_attempts: int = 0 - while total_attempts < 5: - try: - logs: str = kbc._logs( - pod_name=pod_name, - stream=False, - read_timeout=60 * 6, - **KBATCH_DICT, - ) - # Kbatch/Httpx seem keen to return generators even when "stream" is False - if isinstance(logs, GeneratorType): - for log in logs: - print(log) # noqa: T201 - else: - for line in logs.split("\n"): - print(line) # noqa: T201 - break - except (httpx.RemoteProtocolError, httpx.HTTPStatusError): - time.sleep(20) - total_attempts += 1 - continue - - context.log().warn("Failed to read logs after 3 attempts.") - - pods_info: list[dict] = kbc.list_pods(job_name=job_name, **KBATCH_DICT)["items"] - pod_status = pods_info[0]["status"]["phase"] - context.log.info(f"Captured all logs for job {job_name}; status '{pod_status}'.") - - if status == "Failed": - raise KbatchJobException( - message=f"Job {job_name} failed, see logs.", - job_name=job_name, - ) - - return job_name - - -@dg.op( - out={"job_name": dg.Out(str)}, -) -def delete_kbatch_job(job_name: str) -> str: - """Deletes a kbatch job. - - Args: - job_name: The name of the job. Must be a dagster op output. - """ - dg.get_dagster_logger().info(f"Deleting kbatch job {job_name}.") - kbc.delete_job(resource_name=job_name, **KBATCH_DICT) - return job_name - - -# --- GRAPHS --- # - - -@dg.graph -def kbatch_consumer_graph(depends_on: dg.Nothing) -> str: - """Graph for running the nwp-consumer as a kbatch job. - - Defines the set of operations that configure, run, and track a kbatch - nwp-consumer job, streaming logs back to stdout and deleting the job - upon completion. Any ops that manage or interact with a running kbatch - job also include a hook that deletes the job on exceptions in the graph. 
- - Implements a Nothing input to allow for the graph to have upstream - dependencies in a pipeline without the passing of data. - """ - job: Job = define_kbatch_consumer_job(depends_on=depends_on) - job_name: str = submit_kbatch_job.with_hooks({kbatch_job_failure_hook})(job=job) - job_name = follow_kbatch_job.with_hooks({kbatch_job_failure_hook})(job_name=job_name) - job_name = delete_kbatch_job(job_name=job_name) - - return job_name diff --git a/cloud_archives/pv/passiv/passiv_year.py b/cloud_archives/pv/passiv/passiv_year.py index 3b65252..da7fb5c 100644 --- a/cloud_archives/pv/passiv/passiv_year.py +++ b/cloud_archives/pv/passiv/passiv_year.py @@ -102,6 +102,3 @@ def pv_passiv_yearly_30min(context: dg.AssetExecutionContext): get_yearly_passiv_data(start_date, period=30) - - - diff --git a/constants.py b/constants.py deleted file mode 100644 index c214662..0000000 --- a/constants.py +++ /dev/null @@ -1,34 +0,0 @@ -"""Defines constant values for the nwp deployment.""" -import dataclasses as dc - - -@dc.dataclass -class StorageLocations: - """Defines the storage locations for a given environment.""" - - RAW_FOLDER: str - NWP_ZARR_FOLDER: str - STATIC_ZARR_FOLDER: str - POINT_ZARR_FOLDER: str - SAT_ZARR_FOLDER: str - EPHEMERAL_FOLDER: str - -# Defines the storage locations for each environment -LOCATIONS_BY_ENVIRONMENT: dict[str, StorageLocations] = { - "leo": StorageLocations( - RAW_FOLDER="/mnt/storage_c/raw", - NWP_ZARR_FOLDER="/mnt/storage_b", - STATIC_ZARR_FOLDER="/mnt/storage_a", - POINT_ZARR_FOLDER="/mnt/storage_a", - SAT_ZARR_FOLDER="/mnt/storage_a", - EPHEMERAL_FOLDER="/mnt/storage_c/ephemeral", - ), - "local": StorageLocations( - RAW_FOLDER="/tmp/raw", - NWP_ZARR_FOLDER="/tmp/zarr", - STATIC_ZARR_FOLDER="/tmp/zarr", - POINT_ZARR_FOLDER="/tmp/zarr", - SAT_ZARR_FOLDER="/tmp/zarr", - EPHEMERAL_FOLDER="/tmp/ephemeral", - ), -} diff --git a/dagster.yaml b/dagster.yaml new file mode 100644 index 0000000..753a2ef --- /dev/null +++ b/dagster.yaml @@ -0,0 +1,28 @@ +storage: + postgres: + postgres_db: + username: + env: DAGSTER_PG_USERNAME + password: + env: DAGSTER_PG_PASSWORD + hostname: + env: DAGSTER_PG_HOST + db_name: + env: DAGSTER_PG_DB + port: 5432 + +local_artifact_storage: + module: dagster.core.storage.root + class: LocalArtifactStorage + config: + base_dir: "/opt/dagster/local/" + +run_coordinator: + module: dagster.core.run_coordinator + class: QueuedRunCoordinator + config: + max_concurrent_runs: 30 + tag_concurrency_limits: + - key: "dagster/backfill" + limit: 15 + diff --git a/local_archives/__init__.py b/local_archives/__init__.py index 45265eb..4e2497e 100644 --- a/local_archives/__init__.py +++ b/local_archives/__init__.py @@ -3,55 +3,26 @@ import dagster as dg from dagster_docker import PipesDockerClient -import managers -import resources -from constants import LOCATIONS_BY_ENVIRONMENT - from . 
import nwp, sat -resources_by_env = { - "leo": { - "nwp_xr_zarr_io": managers.LocalFilesystemXarrayZarrManager( - base_path=LOCATIONS_BY_ENVIRONMENT["leo"].NWP_ZARR_FOLDER, - ), - "meteomatics_api": resources.MeteomaticsAPIResource( - username=dg.EnvVar("METEOMATICS_USERNAME"), - password=dg.EnvVar("METEOMATICS_PASSWORD"), - ), - "pipes_subprocess_client": dg.PipesSubprocessClient(), - "pipes_docker_client": PipesDockerClient(), - }, - "local": { - "nwp_xr_zarr_io": managers.LocalFilesystemXarrayZarrManager( - base_path=LOCATIONS_BY_ENVIRONMENT["local"].NWP_ZARR_FOLDER, - ), - "meteomatics_api": resources.MeteomaticsAPIResource( - username=dg.EnvVar("METEOMATICS_USERNAME"), - password=dg.EnvVar("METEOMATICS_PASSWORD"), - ), - "pipes_subprocess_client": dg.PipesSubprocessClient(), - "pipes_docker_client": PipesDockerClient(), - }, -} - -all_assets: list[dg.AssetsDefinition] = [ - *nwp.all_assets, - *sat.all_assets, -] - -all_jobs: list[dg.JobDefinition] = [ - *nwp.all_jobs, - *sat.all_jobs, -] +nwp_assets: list[dg.AssetsDefinition] = dg.load_assets_from_package_module( + package_module=nwp, + group_name="nwp", + key_prefix="nwp", +) -all_schedules: list[dg.ScheduleDefinition] = [ - *nwp.all_schedules, - *sat.all_schedules, -] +sat_assets: list[dg.AssetsDefinition] = dg.load_assets_from_package_module( + package_module=sat, + group_name="sat", + key_prefix="sat", +) defs = dg.Definitions( - assets=all_assets, - resources=resources_by_env[os.getenv("ENVIRONMENT", "local")], - jobs=all_jobs, - schedules=all_schedules, + assets=[*nwp_assets, *sat_assets], + resources={ + "pipes_subprocess_client": dg.PipesSubprocessClient(), + "pipes_docker_client": PipesDockerClient(), + }, + jobs=[], + schedules=[], ) diff --git a/local_archives/air/__init__.py b/local_archives/air/__init__.py new file mode 100644 index 0000000..4c3a92c --- /dev/null +++ b/local_archives/air/__init__.py @@ -0,0 +1,8 @@ +import dagster as dg + +air_assets: list[dg.AssetsDefinition] = dg.load_assets_from_package_name( + package_name="air", + group_name="air", + key_prefix=["air"], +) + diff --git a/local_archives/air/cams_eu.py b/local_archives/air/cams_eu.py new file mode 100644 index 0000000..ff53332 --- /dev/null +++ b/local_archives/air/cams_eu.py @@ -0,0 +1,130 @@ +"""NetCDF archive of Atmospheric Quality data from CAMS, covering Europe. + +CAMS is Copernicus' Atmospheric Monitoring Service, which provides +forecasts of atmospheric quality. + +Sourced via CDS API from Copernicus ADS (https://ads.atmosphere.copernicus.eu). +This asset is updated weekly, and surfaced as a zipped NetCDF file for each week +per variable. It is downloaded using the cdsapi Python package +(https://github.com/ecmwf/cdsapi). 
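+
+The `cdsapi.Client()` below is constructed with no arguments, so it is assumed
+to pick up Copernicus ADS credentials in the usual cdsapi way: either from the
+CDSAPI_URL/CDSAPI_KEY environment variables or from a ~/.cdsapirc file along
+the lines of (values are placeholders):
+
+    url: https://ads.atmosphere.copernicus.eu/api
+    key: <your-ads-api-key>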
+""" + +import datetime as dt + +import dagster as dg +import cdsapi + +ARCHIVE_FOLDER = "/var/dagster-storage/air/cams-europe" +if os.getenv("ENVIRONMENT", "local") == "leo": + ARCHIVE_FOLDER = "/mnt/storage_b/air/cams-europe" + +partitions_def: dg.TimeWindowPartitionsDefinition = dg.WeeklyPartitionsDefinition( + start_date="2020-02-08", + end_offset=-2, +) + +@dg.asset( + name="cams-europe", + description=__doc__, + key_prefix=["air"], + metadata={ + "archive_folder": dg.MetadataValue.text(ARCHIVE_FOLDER), + "area": dg.MetadataValue.text("europe"), + "source": dg.MetadataValue.text("copernicus-ads"), + "model": dg.MetadataValue.text("cams"), + "format": dg.MetadataValue.text("netcdf"), + "expected_runtime": dg.MetadataValue.text("6 hours"), + }, + compute_kind="python", + automation_condition=dg.AutomationCondition.on_cron( + cron_schedule=partitions_def.get_cron_schedule( + hour_of_day=7, + ), + tags={ + "dagster/max_runtime": str(60 * 60 * 24 * 4), # Should take about 2 days + "dagster/priority": "1", + "dagster/concurrency_key": "copernicus-ads", + }, + partitions_def=partitions_def, +) +def cams_eu_raw_asset(context: dg.AssetExecutionContext) -> dg.Output[list[pathlib.Path]]: + it_start: dt.datetime = context.partition_time_window.start + it_end: dt.datetime = context.partition_time_window.end + execution_start = dt.datetime.now(tz=dt.UTC) + stored_files: list[pathlib.Path] = [] + + variables: list[str] = [ + "alder_pollen", + "ammonia", + "birch_pollen", + "carbon_monoxide", + "dust", + "grass_pollen", + "nitrogen_dioxide", + "nitrogen_monoxide", + "non_methane_vocs", + "olive_pollen", + "ozone", + "particulate_matter_10um", + "particulate_matter_2.5um", + "peroxyacyl_nitrates", + "pm10_wildfires", + "ragweed_pollen", + "secondary_inorganic_aerosol", + "sulphur_dioxide", + ] + + for var in variables: + dst: pathlib.Path = pathlib.Path(ARCHIVE_FOLDER) / "raw" / f"{it_start:%Y%m%d}-{it_end:%Y%m%d}_{var}.nc.zip" + dst.parent.mkdir(parents=True, exist_ok=True) + + if dst.exists(): + context.log.info(f"File already exists, skipping download", extra={ + "file": dst.as_posix(), + }) + stored_files.append(dst) + continue + + request: dict[str, Any]: { + "date": [f"{it_start:%Y-%m-%d}/{it_end:%Y-%m-%d}"], + "type": ["forecast"], + "time": ["00:00"], + "model": ["ensemble"], + "leadtime_hour": [str(x) for x in range(0, 97)], + "data_format": ["netcdf_zip"], + "level": ["0", "50", "250", "500", "1000", "3000", "5000"], + "variable": [var], + } + + context.log.info(f"Reqesting file from Copernicus ADS via CDS API", extra={ + "request": sl_var_request, + "target": dst.as_posix(), + }) + client = cdsapi.Client() + client.retrieve( + name="cams-europe-air-quality-forecast", + request=request, + target=dst.as_posix(), + ) + context.log.info(f"Downloaded file {dst.as_posix()} from Copernicus ADS via CDS API", extra={ + "file": dst.as_posix(), + "size": dst.stat().st_size, + }) + stored_files.append(dst) + + if len(stored_files) == 0: + raise Exception( + "No remote files found for this partition key. 
See logs for more details.", + ) + + elapsed_time: dt.timedelta = dt.datetime.now(tz=dt.UTC) - execution_start + + return dg.Output( + value=stored_files, + metadata={ + "files": dg.MetadataValue.text(", ".join([f.as_posix() for f in stored_files])), + "partition_size": dg.MetadataValue.int(sum[f.stat().st_size for f in stored_files]), + "elapsed_time_hours": dg.MetadataValue.float(elapsed_time / dt.timedelta(hours=1)), + }, + ) + diff --git a/local_archives/nwp/__init__.py b/local_archives/nwp/__init__.py index d86d55d..e69de29 100644 --- a/local_archives/nwp/__init__.py +++ b/local_archives/nwp/__init__.py @@ -1,60 +0,0 @@ -"""Definitions for the NWP dagster code location.""" - -import dagster as dg - -from . import cams, ceda, ecmwf, jobs, meteomatics, gfs - -all_assets: list[dg.AssetsDefinition] = [ - *ceda.all_assets, - *ecmwf.all_assets, - *cams.all_assets, - *meteomatics.all_assets, - *gfs.all_assets, -] - -all_jobs: list[dg.JobDefinition] = [ - jobs.scan_nwp_raw_archive, - jobs.scan_nwp_zarr_archive, -] - - -@dg.schedule( - job=jobs.scan_nwp_raw_archive, - cron_schedule="0 3 * * *", - default_status=dg.DefaultScheduleStatus.RUNNING, -) -def scan_nwp_raw_archives_schedule(context: dg.ScheduleEvaluationContext) -> dg.RunRequest: - """Scan the raw archives. - - Yields a RunRequest for the scan_nwp_raw_archive job for each raw archive. - """ - raw_assets: list[dg.AssetsDefinition] = [a for a in all_assets if "raw_archive" in a.key.path] - for a in raw_assets: - yield dg.RunRequest( - run_key=f"scan_nwp_{a.key.path[1]}_{a.key.path[2]}_{a.key.path[3]}", - run_config=jobs.gen_run_config(a.key), - ) - - -@dg.schedule( - job=jobs.scan_nwp_zarr_archive, - cron_schedule="15 3 * * *", - default_status=dg.DefaultScheduleStatus.RUNNING, -) -def scan_nwp_zarr_archives_schedule(context: dg.ScheduleEvaluationContext) -> dg.RunRequest: - """Scan the zarr archives. - - Yields a RunRequest for the scan_nwp_zarr_archive job for each zarr archive. - """ - zarr_assets: list[dg.AssetsDefinition] = [a for a in all_assets if "zarr_archive" in a.key.path] - for a in zarr_assets: - yield dg.RunRequest( - run_key=f"scan_nwp_{a.key.path[1]}_{a.key.path[2]}_{a.key.path[3]}", - run_config=jobs.gen_run_config(a.key), - ) - - -all_schedules: list[dg.ScheduleDefinition] = [ - scan_nwp_raw_archives_schedule, - scan_nwp_zarr_archives_schedule, -] diff --git a/local_archives/nwp/_generic_definitions_factory.py b/local_archives/nwp/_generic_definitions_factory.py deleted file mode 100644 index b3a2a5e..0000000 --- a/local_archives/nwp/_generic_definitions_factory.py +++ /dev/null @@ -1,210 +0,0 @@ -"""Defines a factory for creating nwp-consumer-backed assets and jobs.""" - -import dataclasses as dc -import datetime as dt -import os -import pathlib -import shutil -from typing import Literal - -import dagster as dg -import numpy as np -import xarray as xr -from nwp_consumer.internal import IT_FOLDER_STRUCTURE_RAW, FetcherInterface, FileInfoModel - -from constants import LOCATIONS_BY_ENVIRONMENT - -env = os.getenv("ENVIRONMENT", "local") -RAW_FOLDER = LOCATIONS_BY_ENVIRONMENT[env].RAW_FOLDER -ZARR_FOLDER = LOCATIONS_BY_ENVIRONMENT[env].NWP_ZARR_FOLDER - - -@dc.dataclass -class MakeDefinitionsOptions: - """Typesafe options for the make_asset_definitions function.""" - - area: str - fetcher: FetcherInterface - source: Literal["ecmwf", "ceda", "cams"] - partitions: dg.TimeWindowPartitionsDefinition - - def key_prefix(self) -> list[str]: - """Generate an asset key prefix based on the area. 
- - The prefix is important as it defines the folder structure under which - assets are stored. - """ - return ["nwp", self.source, self.area] - - -@dc.dataclass -class MakeDefinitionsOutputs: - """Typesafe outputs for the make_definitions function.""" - - raw_asset: dg.AssetsDefinition - zarr_asset: dg.AssetsDefinition - - -def make_definitions( - opts: MakeDefinitionsOptions, -) -> MakeDefinitionsOutputs: - """Generates assets and associated jobs for NWP-consumer data.""" - - # The Raw Archive asset has the following properties: - # * Key Prefix: nwp/{source}/{area} - defines part of the storage folder structure - # * Auto Materialize Policy: Eagerly materialize the asset when the raw archive is updated - # ** This is checked on a cron schedule every tuesday and saturday at midnight, and up - # ** to 10 materializations are allowed per check. - # * Partitions: Defines the partitioning scheme for the asset - # * Check Specs: Defines the checks that should be performed on the asset - @dg.asset( - name="raw_archive", - key_prefix=opts.key_prefix(), - automation_condition=dg.AutomationCondition.eager(), - partitions_def=opts.partitions, - check_specs=[ - dg.AssetCheckSpec( - name="num_local_is_num_remote", - asset=[*opts.key_prefix(), "raw_archive"], - ), - dg.AssetCheckSpec(name="nonzero_local_size", asset=[*opts.key_prefix(), "raw_archive"]), - ], - metadata={ - "archive_folder": dg.MetadataValue.text(f"{RAW_FOLDER}/{'/'.join(opts.key_prefix())}"), - "area": dg.MetadataValue.text(opts.area), - "source": dg.MetadataValue.text(opts.source), - }, - compute_kind="download", - op_tags={"dagster/max_runtime": int(60 * 100)}, - ) - def _raw_archive( - context: dg.AssetExecutionContext, - ) -> dg.Output[list[pathlib.Path]]: - """Locally stored archive of raw data.""" - execution_start = dt.datetime.now(tz=dt.UTC) - - # List all available source files for this partition - # TODO: Enable single run backfills - it = context.partition_time_window.start - context.log.info( - f"Listing files for init time {it.strftime('%Y-%m-%d %H:%M')} from {opts.source}.", - ) - fileinfos: list[FileInfoModel] = opts.fetcher.listRawFilesForInitTime(it=it) - - if len(fileinfos) == 0: - raise ValueError("No files found for this partition. See error logs.") - - context.log.info(f"Found {len(fileinfos)} files for this partition.") - - # For each file in the remote archive, download and store it - stored_paths: list[pathlib.Path] = [] - sizes: list[int] = [] - - # Store the file based on the asset key prefix and the init time of the file - loc = "/".join(context.asset_key.path[:-1]) - for fi in fileinfos: - dst = pathlib.Path( - f"{RAW_FOLDER}/{loc}/{fi.it().strftime(IT_FOLDER_STRUCTURE_RAW)}/{fi.filename()}", - ) - - # If the file already exists, don't re download it - if dst.exists() and dst.stat().st_size > 0: - context.log.info( - f"File {fi.filename()} already exists at {dst.as_posix()}. Skipping download.", - ) - stored_paths.append(dst) - sizes.append(dst.stat().st_size) - continue - - # Otherwise, download it and store it - if dst.exists() and dst.stat().st_size == 0: - dst.unlink() - context.log.info( - f"Downloading file {fi.filename()} to {dst.as_posix()}", - ) - # Download to temp fails soft, so we need to check the src - # to see if it is an empty path. - src = opts.fetcher.downloadToCache(fi=fi) - if src is None or src == pathlib.Path(): - raise ValueError( - f"Error downloading file {fi.filename()}. 
See stdout logs for details.", - ) - context.log.info(f"Moving file {src.as_posix()} to {dst.as_posix()}") - dst.parent.mkdir(parents=True, exist_ok=True) - shutil.move(src=src, dst=dst) - - stored_paths.append(dst) - sizes.append(dst.stat().st_size) - - elapsed_time = dt.datetime.now(tz=dt.UTC) - execution_start - - yield dg.Output( - stored_paths, - metadata={ - "inittime": dg.MetadataValue.text(context.asset_partition_key_for_output()), - "partition_num_files": dg.MetadataValue.int(len(stored_paths)), - "file_paths": dg.MetadataValue.text(str([f.as_posix() for f in stored_paths])), - "partition_size": dg.MetadataValue.int(sum(sizes)), - "area": dg.MetadataValue.text(opts.area), - "elapsed_time_mins": dg.MetadataValue.float(elapsed_time / dt.timedelta(minutes=1)), - }, - ) - - # Perform the checks defined in the check_specs above - yield dg.AssetCheckResult( - check_name="num_local_is_num_remote", - passed=bool(len(stored_paths) == len(fileinfos)), - ) - yield dg.AssetCheckResult( - check_name="nonzero_local_size", - passed=bool(np.all(sizes)), - ) - - # The Zarr Archive asset has the following properties: - # * Key Prefix: nwp/{source}/{area} - defines part of the storage folder structure - # * Auto Materialize Policy: Eagerly materialize the asset when the raw archive is updated - # * Partitions: Defines the partitioning scheme for the asset - @dg.asset( - name="zarr_archive", - key_prefix=opts.key_prefix(), - partitions_def=opts.partitions, - automation_condition=dg.AutomationCondition.eager(), - ins={"raw_paths": dg.AssetIn(key=_raw_archive.key)}, - io_manager_key="nwp_xr_zarr_io", - compute_kind="process", - metadata={ - "archive_folder": dg.MetadataValue.text(f"{ZARR_FOLDER}/{'/'.join(opts.key_prefix())}"), - "area": dg.MetadataValue.text(opts.area), - "source": dg.MetadataValue.text(opts.source), - }, - op_tags={"dagster/max_runtime": 60 * 10}, - ) - def _zarr_archive( - context: dg.AssetExecutionContext, - raw_paths: list[pathlib.Path], - ) -> dg.Output[xr.Dataset]: - """Locally stored archive of zarr-formatted xarray data.""" - execution_start = dt.datetime.now(tz=dt.UTC) - # Convert each file to an xarray dataset and merge - datasets: list[xr.Dataset] = [] - for path in raw_paths: - context.log.info(f"Converting raw file at {path.as_posix()} to xarray dataset.") - datasets.append(opts.fetcher.mapCachedRaw(p=path)) - context.log.info(f"Merging {len(datasets)} datasets into one.") - ds = xr.merge(datasets, combine_attrs="drop_conflicts") - - elapsed_time = dt.datetime.now(tz=dt.UTC) - execution_start - - return dg.Output( - ds, - metadata={ - "inittime": dg.MetadataValue.text(context.asset_partition_key_for_output()), - "dataset": dg.MetadataValue.md(str(ds)), - "elapsed_time_mins": dg.MetadataValue.float(elapsed_time / dt.timedelta(minutes=1)), - }, - ) - - return MakeDefinitionsOutputs( - raw_asset=_raw_archive, - zarr_asset=_zarr_archive, - ) diff --git a/local_archives/nwp/cams/__init__.py b/local_archives/nwp/cams/__init__.py deleted file mode 100644 index 8f58e31..0000000 --- a/local_archives/nwp/cams/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -import dagster as dg - -from . 
import cams_eu, cams_global - -eu_assets = dg.load_assets_from_modules( - modules=[cams_eu], - group_name="cams_eu", -) - -global_assets = dg.load_assets_from_modules( - modules=[cams_global], - group_name="cams_global", -) - -all_assets: list[dg.AssetsDefinition] = [*eu_assets, *global_assets] diff --git a/local_archives/nwp/cams/_definitions_factory.py b/local_archives/nwp/cams/_definitions_factory.py deleted file mode 100644 index 0cd60ad..0000000 --- a/local_archives/nwp/cams/_definitions_factory.py +++ /dev/null @@ -1,257 +0,0 @@ -import dataclasses as dc -import datetime as dt -import os -import pathlib -from typing import Any, Literal - -import cdsapi -import dagster as dg - -from constants import LOCATIONS_BY_ENVIRONMENT - -env = os.getenv("ENVIRONMENT", "local") -RAW_FOLDER = LOCATIONS_BY_ENVIRONMENT[env].RAW_FOLDER -IT_FOLDER_FMTSTR = "%Y/%m/%d/%H%M" - -@dc.dataclass -class VariableSelection: - """Defines the variables to request from CAMS.""" - - # The slow variables are those that are only available from tape - # The dictionary maps the variable group name to a list of variables - # to pull within that groups' request. This can be one to [one]. - slow: dict[str, list[str]] = dc.field(default_factory=dict) - # The fast variables are those that are available from disk - # The dictionary maps the variable group name to a list of variables - # to pull within that groups' request. This can be one to [one]. - fast: dict[str, list[str]] = dc.field(default_factory=dict) - # The hours to pull - hours: list[str] = dc.field(default_factory=list) - -@dc.dataclass -class MakeDefinitionsOptions: - """Typesafe options for the make_asset_definitions function.""" - - area: str - file_format: Literal["grib", "netcdf"] - partitions: dg.TimeWindowPartitionsDefinition - client: cdsapi.Client - multilevel_vars: VariableSelection | None = None - multilevel_levels: list[str] | None = None - singlelevel_vars: VariableSelection | None = None - - def key_prefix(self) -> list[str]: - """Generate an asset key prefix based on the area. - - The prefix is important as it defines the folder structure under which - assets are stored. - """ - return ["nwp", "cams", self.area] - - def dataset_name(self) -> str: - """Generate a dataset name based on the area.""" - match self.area: - case "eu": - return "cams-europe-air-quality-forecasts" - case "global": - return "cams-global-atmospheric-composition-forecasts" - case _: - raise ValueError(f"Area {self.area} not supported") - - -@dc.dataclass -class CamsFileInfo: - """Information about a remote file from the CAMS CDS API. - - Mirrors the structure of the cdsapi.api.Result.toJSON() method: - https://github.com/ecmwf/cdsapi/blob/master/cdsapi/api.py - Also adds in a field to hold the variable name and inittime. 
- """ - - resultType: str - contentType: str - contentLength: int - location: str - var: str - inittime: dt.datetime - - -@dc.dataclass -class MakeDefinitionsOutputs: - """Outputs from the make_asset_definitions function.""" - - raw_asset: dg.AssetsDefinition - - -def make_definitions( - opts: MakeDefinitionsOptions, -) -> MakeDefinitionsOutputs: - """Generate the assets for a CAMS datset.""" - - @dg.asset( - name="raw_archive", - key_prefix=opts.key_prefix(), - partitions_def=opts.partitions, - compute_kind="download", - op_tags={ - "expected_runtime": "5hrs", - "MAX_RUNTIME_SECONDS_TAG": 20 * 60 * 60, - }, - ) - def _cams_raw_archive(context: dg.AssetExecutionContext) -> dg.Output[list[pathlib.Path]]: - """Asset detailing all wanted remote files from CAMS.""" - execution_start = dt.datetime.now(tz=dt.UTC) - - stored_files: list[pathlib.Path] = [] - sizes: list[int] = [] - - # Check if partition is targeting a time more than 30 days old - # * CAMS data older than 30 days is only available from tape - # * These variables are slower to collect - it = context.partition_time_window.start - use_slow: bool = False - if (dt.datetime.now(tz=dt.UTC) - it) > dt.timedelta(days=30): - context.log.info( - f"Partition {context.partition_key} is targeting a time more than 30 days old. " - + "Pulling variables from tape, this may take a while.", - ) - use_slow = True - - # First handle single level variables - if opts.singlelevel_vars is not None: - for name, varlist in ( - opts.singlelevel_vars.slow if use_slow else opts.singlelevel_vars.fast - ).items(): - - # Create the target file path for the current set of vars - loc = "/".join(context.asset_key.path[:-1]) - ext = ".grib" if opts.file_format == "grib" else ".nc" - dst = pathlib.Path( - f"{RAW_FOLDER}/{loc}/{it.strftime(IT_FOLDER_FMTSTR)}/" - + f"{it.strftime('%Y%m%d%H')}_{name}{ext}", - ) - # If the file already exists, don't redownload it - if dst.exists(): - stored_files.append(dst) - sizes.append(dst.stat().st_size) - context.log.info(f"File {dst.as_posix()} already exists, skipping", extra={ - "file": dst.as_posix(), - "size": dst.stat().st_size, - }) - continue - - dst.parent.mkdir(parents=True, exist_ok=True) - dst.touch() - - # Othrwise, build the request - sl_var_request: dict[str, Any] = { - "date": it.strftime("%Y-%m-%d/%Y-%m-%d"), - "type": "forecast", - "format": opts.file_format, - "variable": varlist, - "leadtime_hour": opts.singlelevel_vars.hours, - "time": it.strftime("%H:%M"), - } - if opts.area == "eu": - sl_var_request["model"] = "ensemble" - - # Request the file and download it to the target - context.log.info(f"Reqesting file {dst.as_posix()} from CDS API", extra={ - "request": sl_var_request, - "target": dst.as_posix(), - }) - result = opts.client.retrieve( - name=opts.dataset_name(), - request=sl_var_request, - target=dst.as_posix(), - ) - stored_files.append(dst) - sizes.append(dst.stat().st_size) - context.log.info(f"File {dst.as_posix()} downloaded from CDS API", extra={ - "file": dst.as_posix(), - "size": dst.stat().st_size, - }) - - # TODO: Split up multi-variables stored files into a single file per variable - # using grib_filter - - # Then handle multilevel variables - if opts.multilevel_vars is not None: - for name, varlist in ( - opts.multilevel_vars.slow if use_slow else opts.multilevel_vars.fast - ).items(): - - # Create the target file path for the current set of vars - loc = "/".join(context.asset_key.path[:-1]) - ext = ".grib" if opts.file_format == "grib" else ".nc" - dst = pathlib.Path( - 
f"{RAW_FOLDER}/{loc}/{it.strftime(IT_FOLDER_FMTSTR)}/" - + f"{it.strftime('%Y%m%d%H')}_{name}{ext}", - ) - - # If the file already exists, don't redownload it - if dst.exists(): - stored_files.append(dst) - sizes.append(dst.stat().st_size) - context.log.info(f"File {dst.as_posix()} already exists, skipping", extra={ - "file": dst.as_posix(), - "size": dst.stat().st_size, - }) - continue - - dst.parent.mkdir(parents=True, exist_ok=True) - dst.touch() - - # Othrwise, build the request - ml_var_request: dict[str, Any] = { - "date": it.strftime("%Y-%m-%d/%Y-%m-%d"), - "type": "forecast", - "format": opts.file_format, - "variable": varlist, - "leadtime_hour": opts.multilevel_vars.hours, - "time": it.strftime("%H:%M"), - } - if opts.area == "eu": - ml_var_request["level"] = opts.multilevel_levels - ml_var_request["model"] = "ensemble" - else: - ml_var_request["pressure_level"] = opts.multilevel_levels - - # Request the file and download it to the target - context.log.info(f"Reqesting file {dst.as_posix()} from CDS API", extra={ - "request": ml_var_request, - "target": dst.as_posix(), - }) - result = opts.client.retrieve( - name=opts.dataset_name(), - request=ml_var_request, - target=dst.as_posix(), - ) - stored_files.append(dst) - sizes.append(dst.stat().st_size) - context.log.info(f"File {dst.as_posix()} downloaded from CDS API", extra={ - "file": dst.as_posix(), - "size": dst.stat().st_size, - }) - - - if len(stored_files) == 0: - raise Exception( - "No remote files found for this partition key. See logs for more details.", - ) - - elapsed_time: dt.timedelta = dt.datetime.now(tz=dt.UTC) - execution_start - - return dg.Output( - stored_files, - metadata={ - "inittime": dg.MetadataValue.text(context.asset_partition_key_for_output()), - "num_files": dg.MetadataValue.int(len(stored_files)), - "partition_size": dg.MetadataValue.int(sum(sizes)), - "elapsed_time_mins": dg.MetadataValue.float(elapsed_time / dt.timedelta(minutes=1)), - }, - ) - - return MakeDefinitionsOutputs( - raw_asset=_cams_raw_archive, - ) diff --git a/local_archives/nwp/cams/cams_eu.py b/local_archives/nwp/cams/cams_eu.py deleted file mode 100644 index dabb970..0000000 --- a/local_archives/nwp/cams/cams_eu.py +++ /dev/null @@ -1,57 +0,0 @@ -import datetime as dt - -import dagster as dg -from cdsapi import Client - -from ._definitions_factory import ( - MakeDefinitionsOptions, - MakeDefinitionsOutputs, - VariableSelection, - make_definitions, -) - -# CAMS data is only available from 3 years ago onwards -start_date: dt.datetime = dt.datetime.now(tz=dt.UTC) - dt.timedelta(days=3 * 365) -cams_eu_partitions: dg.TimeWindowPartitionsDefinition = dg.TimeWindowPartitionsDefinition( - start=start_date.strftime("%Y-%m-%dT%H:%M"), - cron_schedule="0 0 * * *", # Daily at midnight - fmt="%Y-%m-%dT%H:%M", -) - -VARIABLES = [ - "alder_pollen", - "ammonia", - "birch_pollen", - "carbon_monoxide", - "dust", - "grass_pollen", - "nitrogen_dioxide", - "nitrogen_monoxide", - "non_methane_vocs", - "olive_pollen", - "ozone", - "particulate_matter_10um", - "particulate_matter_2.5um", - "peroxyacyl_nitrates", - "pm10_wildfires", - "ragweed_pollen", - "secondary_inorganic_aerosol", - "sulphur_dioxide", -] - -opts: MakeDefinitionsOptions = MakeDefinitionsOptions( - area="eu", - file_format="netcdf", - multilevel_vars=VariableSelection( - slow={v: [v] for v in VARIABLES}, - fast={v: [v] for v in VARIABLES}, - hours=[str(x) for x in range(0, 97)], - ), - multilevel_levels=["0", "1000", "2000", "250", "3000", "50", "500", "5000"], - 
partitions=cams_eu_partitions, - client=Client(), -) - -defs: MakeDefinitionsOutputs = make_definitions(opts=opts) - -cams_eu_raw_archive = defs.raw_asset diff --git a/local_archives/nwp/cams/cams_global.py b/local_archives/nwp/cams/cams_global.py deleted file mode 100644 index 2c2bac7..0000000 --- a/local_archives/nwp/cams/cams_global.py +++ /dev/null @@ -1,300 +0,0 @@ -import dagster as dg -from cdsapi import Client - -from ._definitions_factory import ( - MakeDefinitionsOptions, - MakeDefinitionsOutputs, - VariableSelection, - make_definitions, -) - -cams_global_partitions = dg.TimeWindowPartitionsDefinition( - start="2015-01-01T00:00", - cron_schedule="0 0,12 * * *", - fmt="%Y-%m-%dT%H:%M", -) - -singlelevel_fast_vars: list[str] = [ - "ammonium_aerosol_optical_depth_550nm", - "black_carbon_aerosol_optical_depth_550nm", - "dust_aerosol_optical_depth_550nm", - "nitrate_aerosol_optical_depth_550nm", - "organic_matter_aerosol_optical_depth_550nm", - "particulate_matter_10um", - "particulate_matter_1um", - "particulate_matter_2.5um", - "sea_salt_aerosol_optical_depth_550nm", - "secondary_organic_aerosol_optical_depth_550nm", - "sulphate_aerosol_optical_depth_550nm", - "total_aerosol_optical_depth_1240nm", - "total_aerosol_optical_depth_469nm", - "total_aerosol_optical_depth_550nm", - "total_aerosol_optical_depth_670nm", - "total_aerosol_optical_depth_865nm", - "total_column_carbon_monoxide", - "total_column_chlorine_monoxide", - "total_column_chlorine_nitrate", - "total_column_ethane", - "total_column_formaldehyde", - "total_column_hydrogen_chloride", - "total_column_hydrogen_cyanide", - "total_column_hydrogen_peroxide", - "total_column_hydroxyl_radical", - "total_column_isoprene", - "total_column_methane", - "total_column_nitric_acid", - "total_column_nitrogen_dioxide", - "total_column_nitrogen_monoxide", - "total_column_ozone", - "total_column_peroxyacetyl_nitrate", - "total_column_propane", - "total_column_sulphur_dioxide", -] - -multilevel_fast_vars: list[str] = [ - "ammonium_aerosol_mass_mixing_ratio", - "anthropogenic_secondary_organic_aerosol_mass_mixing_ratio", - "biogenic_secondary_organic_aerosol_mass_mixing_ratio", - "carbon_monoxide", - "chlorine_monoxide", - "chlorine_nitrate", - "dust_aerosol_0.03-0.55um_mixing_ratio", - "dust_aerosol_0.55-0.9um_mixing_ratio", - "dust_aerosol_0.9-20um_mixing_ratio", - "ethane", - "formaldehyde", - "hydrogen_chloride", - "hydrogen_cyanide", - "hydrogen_peroxide", - "hydrophilic_black_carbon_aerosol_mixing_ratio", - "hydrophilic_organic_matter_aerosol_mixing_ratio", - "hydrophobic_black_carbon_aerosol_mixing_ratio", - "hydrophobic_organic_matter_aerosol_mixing_ratio", - "hydroxyl_radical", - "isoprene", - "methane", - "nitrate_coarse_mode_aerosol_mass_mixing_ratio", - "nitrate_fine_mode_aerosol_mass_mixing_ratio", - "nitric_acid", - "nitrogen_dioxide", - "nitrogen_monoxide", - "ozone", - "peroxyacetyl_nitrate", - "propane", - "sea_salt_aerosol_0.03-0.5um_mixing_ratio", - "sea_salt_aerosol_0.5-5um_mixing_ratio", - "sea_salt_aerosol_5-20um_mixing_ratio", - "specific_humidity", - "sulphate_aerosol_mixing_ratio", - "sulphur_dioxide", -] - -multilevel_hours: list[str] = [str(x) for x in range(0, 121, 3)] - -singlelevel_hours: list[str] = [str(x) for x in range(0, 121)] - -# It is faster to download all variables in a group than to download them individually -# as then you are queuing fewer requests to the CDS API, for tape variables. 
-# Each group here have been checked in the ADS app to ensure they do not exceed -# the limit of 10000 items per request, when paired with downloading every step -# and init time for a single day. -singlelevel_slow_var_groups: dict[str, list[str]] = { - "asymmetry_factor_340-2130nm": [ - "asymmetry_factor_340nm", - "asymmetry_factor_355nm", - "asymmetry_factor_380nm", - "asymmetry_factor_400nm", - "asymmetry_factor_440nm", - "asymmetry_factor_469nm", - "asymmetry_factor_500nm", - "asymmetry_factor_532nm", - "asymmetry_factor_550nm", - "asymmetry_factor_645nm", - "asymmetry_factor_670nm", - "asymmetry_factor_800nm", - "asymmetry_factor_858nm", - "asymmetry_factor_865nm", - "asymmetry_factor_1020nm", - "asymmetry_factor_1064nm", - "asymmetry_factor_1240nm", - "asymmetry_factor_1640nm", - "asymmetry_factor_2130nm", - ], - "single_scattering_albedo_340-2130nm": [ - "single_scattering_albedo_340nm", - "single_scattering_albedo_355nm", - "single_scattering_albedo_380nm", - "single_scattering_albedo_400nm", - "single_scattering_albedo_440nm", - "single_scattering_albedo_469nm", - "single_scattering_albedo_500nm", - "single_scattering_albedo_532nm", - "single_scattering_albedo_550nm", - "single_scattering_albedo_645nm", - "single_scattering_albedo_670nm", - "single_scattering_albedo_800nm", - "single_scattering_albedo_858nm", - "single_scattering_albedo_865nm", - "single_scattering_albedo_1020nm", - "single_scattering_albedo_1064nm", - "single_scattering_albedo_1240nm", - "single_scattering_albedo_1640nm", - "single_scattering_albedo_2130nm", - ], - "total_aerosol_optical_depth_340-2130nm": [ - "total_aerosol_optical_depth_340nm", - "total_aerosol_optical_depth_355nm", - "total_aerosol_optical_depth_380nm", - "total_aerosol_optical_depth_400nm", - "total_aerosol_optical_depth_440nm", - "total_aerosol_optical_depth_500nm", - "total_aerosol_optical_depth_532nm", - "total_aerosol_optical_depth_645nm", - "total_aerosol_optical_depth_800nm", - "total_aerosol_optical_depth_858nm", - "total_aerosol_optical_depth_1020nm", - "total_aerosol_optical_depth_1064nm", - "total_aerosol_optical_depth_1640nm", - "total_aerosol_optical_depth_2130nm", - ], - "total_absorption_aerosol_optical_depth_340-2130nm": [ - "total_absorption_aerosol_optical_depth_340nm", - "total_absorption_aerosol_optical_depth_355nm", - "total_absorption_aerosol_optical_depth_380nm", - "total_absorption_aerosol_optical_depth_400nm", - "total_absorption_aerosol_optical_depth_440nm", - "total_absorption_aerosol_optical_depth_469nm", - "total_absorption_aerosol_optical_depth_500nm", - "total_absorption_aerosol_optical_depth_532nm", - "total_absorption_aerosol_optical_depth_550nm", - "total_absorption_aerosol_optical_depth_645nm", - "total_absorption_aerosol_optical_depth_670nm", - "total_absorption_aerosol_optical_depth_800nm", - "total_absorption_aerosol_optical_depth_858nm", - "total_absorption_aerosol_optical_depth_865nm", - "total_absorption_aerosol_optical_depth_1020nm", - "total_absorption_aerosol_optical_depth_1064nm", - "total_absorption_aerosol_optical_depth_1240nm", - "total_absorption_aerosol_optical_depth_1640nm", - "total_absorption_aerosol_optical_depth_2130nm", - ], - "total_fine_mode_aerosol_optical_depth_340-2130nm": [ - "total_fine_mode_aerosol_optical_depth_340nm", - "total_fine_mode_aerosol_optical_depth_355nm", - "total_fine_mode_aerosol_optical_depth_380nm", - "total_fine_mode_aerosol_optical_depth_400nm", - "total_fine_mode_aerosol_optical_depth_440nm", - "total_fine_mode_aerosol_optical_depth_469nm", - 
"total_fine_mode_aerosol_optical_depth_500nm", - "total_fine_mode_aerosol_optical_depth_532nm", - "total_fine_mode_aerosol_optical_depth_550nm", - "total_fine_mode_aerosol_optical_depth_645nm", - "total_fine_mode_aerosol_optical_depth_670nm", - "total_fine_mode_aerosol_optical_depth_800nm", - "total_fine_mode_aerosol_optical_depth_858nm", - "total_fine_mode_aerosol_optical_depth_865nm", - "total_fine_mode_aerosol_optical_depth_1020nm", - "total_fine_mode_aerosol_optical_depth_1064nm", - "total_fine_mode_aerosol_optical_depth_1240nm", - "total_fine_mode_aerosol_optical_depth_1640nm", - "total_fine_mode_aerosol_optical_depth_2130nm", - ], - "dust_aerosol_optical_depth_550nm_0.04-20um": [ - "dust_aerosol_0.03-0.55um_optical_depth_550nm", - "dust_aerosol_0.55-9um_optical_depth_550nm", - "dust_aerosol_9-20um_optical_depth_550nm", - ], - "sea_salt_aerosol_optical_depth_550nm_0.03-20um": [ - "sea_salt_aerosol_0.03-0.5um_optical_depth_550nm", - "sea_salt_aerosol_0.5-5um_optical_depth_550nm", - "sea_salt_aerosol_5-20um_optical_depth_550nm", - ], - "nitrate_aerosol_optical_depth_550nm_coarse-fine": [ - "nitrate_coarse_mode_aerosol_optical_depth_550nm", - "nitrate_fine_mode_aerosol_optical_depth_550nm", - ], - "hydrophilic_aerosol_optical_depth_550nm_bc-om": [ - "hydrophilic_black_carbon_aerosol_optical_depth_550nm", - "hydrophilic_organic_matter_aerosol_optical_depth_550nm", - ], - "hydrophobic_aerosol_optical_depth_550nm_bc-om": [ - "hydrophobic_black_carbon_aerosol_optical_depth_550nm", - "hydrophobic_organic_matter_aerosol_optical_depth_550nm", - ], -} - -singlelevel_slow_var_groups_subset: dict[str, list[str]] = { - "total_aerosol_optical_depth_400-645nm": [ - "total_aerosol_optical_depth_400nm", - "total_aerosol_optical_depth_440nm", - "total_aerosol_optical_depth_500nm", - "total_aerosol_optical_depth_532nm", - "total_aerosol_optical_depth_645nm", - ], -} - -# Due to pulling every pressure level, these need to be pulled one at a time -# to avoid exceeding the 10000 item limit per request. 
-multilevel_slow_vars: list[str] = [ - "aerosol_extinction_coefficient_1064nm", - "aerosol_extinction_coefficient_355nm", - "aerosol_extinction_coefficient_532nm", - "attenuated_backscatter_due_to_aerosol_1064nm_from_ground", - "attenuated_backscatter_due_to_aerosol_1064nm_from_top_of_atmosphere", - "attenuated_backscatter_due_to_aerosol_355nm_from_ground", - "attenuated_backscatter_due_to_aerosol_355nm_from_top_of_atmosphere", - "attenuated_backscatter_due_to_aerosol_532nm_from_ground", - "attenuated_backscatter_due_to_aerosol_532nm_from_top_of_atmosphere", -] - -multilevel_slow_vars_subset = ["aerosol_extinction_coefficient_532nm"] - -multilevel_levels: list[str] = [ - "1", - "2", - "3", - "5", - "7", - "10", - "20", - "30", - "50", - "70", - "100", - "150", - "200", - "250", - "300", - "400", - "500", - "600", - "700", - "800", - "850", - "900", - "925", - "950", - "1000", -] - -opts: MakeDefinitionsOptions = MakeDefinitionsOptions( - area="global", - file_format="grib", - multilevel_vars=VariableSelection( - slow={d: [d] for d in multilevel_slow_vars_subset}, - fast={d: [d] for d in multilevel_fast_vars}, - hours=multilevel_hours, - ), - multilevel_levels=multilevel_levels, - singlelevel_vars=VariableSelection( - slow=singlelevel_slow_var_groups_subset, - fast={d: [d] for d in singlelevel_fast_vars}, - hours=singlelevel_hours, - ), - partitions=cams_global_partitions, - client=Client(), -) - -defs: MakeDefinitionsOutputs = make_definitions(opts=opts) - -cams_global_raw_archive = defs.raw_asset diff --git a/local_archives/nwp/ceda/__init__.py b/local_archives/nwp/ceda/__init__.py deleted file mode 100644 index a7926e7..0000000 --- a/local_archives/nwp/ceda/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -import dagster as dg - -from . import ceda_uk, ceda_global - -uk_assets = dg.load_assets_from_modules( - modules=[ceda_uk], - group_name="ceda_uk", -) - -global_assets = dg.load_assets_from_modules( - modules=[ceda_global], - group_name="ceda_global", -) - -all_assets: list[dg.AssetsDefinition] = [*uk_assets, *global_assets] diff --git a/local_archives/nwp/ceda/ceda_uk.py b/local_archives/nwp/ceda/ceda_uk.py deleted file mode 100644 index 7a2cf60..0000000 --- a/local_archives/nwp/ceda/ceda_uk.py +++ /dev/null @@ -1,36 +0,0 @@ -"""CEDA UK data pipeline.""" -import os - -import dagster as dg -from nwp_consumer.internal import FetcherInterface -from nwp_consumer.internal.inputs import ceda - -from local_archives.nwp._generic_definitions_factory import ( - MakeDefinitionsOptions, - MakeDefinitionsOutputs, - make_definitions, -) - -fetcher: FetcherInterface = ceda.Client( - ftpUsername=os.getenv("CEDA_FTP_USER", "not-set"), - ftpPassword=os.getenv("CEDA_FTP_PASS", "not-set"), -) - -partitions: dg.TimeWindowPartitionsDefinition = dg.TimeWindowPartitionsDefinition( - start="2017-01-01T00:00", - cron_schedule="0 0/3 * * *", # Every 3 hours - fmt="%Y-%m-%dT%H:%M", - end_offset=-(8 * 8), # CEDA only available 8 days back (8 partitions per day) -) - -defs: MakeDefinitionsOutputs = make_definitions( - opts=MakeDefinitionsOptions( - area="uk", - source="ceda", - fetcher=fetcher, - partitions=partitions, - ), -) - -ceda_uk_raw_archive = defs.raw_asset -ceda_uk_zarr_archive = defs.zarr_asset diff --git a/local_archives/nwp/ceda/ceda_global.py b/local_archives/nwp/ceda_mo_um_global.py similarity index 58% rename from local_archives/nwp/ceda/ceda_global.py rename to local_archives/nwp/ceda_mo_um_global.py index c541418..3e557e5 100644 --- a/local_archives/nwp/ceda/ceda_global.py +++ 
b/local_archives/nwp/ceda_mo_um_global.py @@ -1,4 +1,4 @@ -"""Zarr archive of NWP data from the Met Office's Global model. +"""Zarr archive of NWP data from the Met Office's Unified Model in the global configuration. The MetOffice runs it's Unified Model (UM) in two configurations: Global, and UK. This asset contains data from the global configuration covering the whole globe. @@ -6,7 +6,7 @@ Sourced via FTP from CEDA (https://catalogue.ceda.ac.uk/uuid/86df725b793b4b4cb0ca0646686bd783). This asset is updated monthly, and surfaced as a Zarr Directory Store for each month. It is downloaded using the nwp-consumer docker image -(https://github.com/openclimatefix/nwp-consumer) +(https://github.com/openclimatefix/nwp-consumer). """ import datetime as dt @@ -16,56 +16,55 @@ import dagster as dg from dagster_docker import PipesDockerClient -from constants import LOCATIONS_BY_ENVIRONMENT +ARCHIVE_FOLDER = "/var/dagster-storage/nwp/ceda-mo-um-global" +if os.getenv("ENVIRONMENT", "local") == "leo": + ARCHIVE_FOLDER = "/mnt/storage_b/nwp/ceda-mo-um-global" -env = os.getenv("ENVIRONMENT", "local") -ZARR_FOLDER = LOCATIONS_BY_ENVIRONMENT[env].NWP_ZARR_FOLDER +partitions_def: dg.TimeWindowPartitionsDefinition = dg.MonthlyPartitionsDefinition( + start_date="2019-01-01", + end_offset=-3, +) @dg.asset( - name="zarr_archive", + name="ceda-mo-um-global", description=__doc__, - key_prefix=["nwp", "ceda", "global"], metadata={ - "archive_folder": dg.MetadataValue.text(f"{ZARR_FOLDER}/nwp/ceda/global"), + "archive_folder": dg.MetadataValue.text(ARCHIVE_FOLDER), "area": dg.MetadataValue.text("global"), "source": dg.MetadataValue.text("ceda"), + "model": dg.MetadataValue.text("mo-um"), "expected_runtime": dg.MetadataValue.text("6 hours"), }, compute_kind="docker", - automation_condition=dg.AutomationCondition.eager(), + automation_condition=dg.AutomationCondition.on_cron( + cron_schedule=partitions_def.get_cron_schedule( + hour_of_day=5, + ), + ), tags={ "dagster/max_runtime": str(60 * 60 * 10), # Should take 6 ish hours "dagster/priority": "1", - "dagster/concurrency_key": "ceda-ftp-consumer", + "dagster/concurrency_key": "nwp-consumer", }, - partitions_def=dg.MonthlyPartitionsDefinition( - start_date="2019-01-01", - end_offset=-3, - ), ) -def ceda_global( +def ceda_mo_um_global_asset( context: dg.AssetExecutionContext, pipes_docker_client: PipesDockerClient, ) -> Any: - image: str = "ghcr.io/openclimatefix/nwp-consumer:devsjc-major-refactor" it: dt.datetime = context.partition_time_window.start return pipes_docker_client.run( - image=image, - command=[ - "archive", - "-y", - str(it.year), - "-m", - str(it.month), - ], + image="ghcr.io/openclimatefix/nwp-consumer:1.0.12", + command=["archive", "-y", str(it.year), "-m", str(it.month)], env={ "NWP_CONSUMER_MODEL_REPOSITORY": "ceda-metoffice-global", "NWP_CONSUMER_NOTIFICATION_REPOSITORY": "dagster-pipes", "CEDA_FTP_USER": os.environ["CEDA_FTP_USER"], "CEDA_FTP_PASS": os.environ["CEDA_FTP_PASS"], + "CONCURRENCY": "false", }, container_kwargs={ - "volumes": [f"{ZARR_FOLDER}/nwp/ceda/global:/work"], + "volumes": [f"{ARCHIVE_FOLDER}:/work"], }, context=context, ).get_results() + diff --git a/local_archives/nwp/ecmwf/__init__.py b/local_archives/nwp/ecmwf/__init__.py deleted file mode 100644 index aab2a92..0000000 --- a/local_archives/nwp/ecmwf/__init__.py +++ /dev/null @@ -1,43 +0,0 @@ -import dagster as dg - -from . 
import ( - ecmwf_malta, - ecmwf_nw_india, - ecmwf_uk, - ecmwf_india, - ecmwf_ens_stat_india, -) - -uk_assets = dg.load_assets_from_modules( - modules=[ecmwf_uk], - group_name="ecmwf_uk", -) - - -nw_india_assets = dg.load_assets_from_modules( - modules=[ecmwf_nw_india], - group_name="ecmwf_nw_india", -) - -malta_assets = dg.load_assets_from_modules( - modules=[ecmwf_malta], - group_name="ecmwf_malta", -) - -india_assets = dg.load_assets_from_modules( - modules=[ecmwf_india], - group_name="ecmwf_india", -) - -india_stat_assets = dg.load_assets_from_modules( - modules=[ecmwf_ens_stat_india], - group_name="ecmwf_ens_india_stat", -) - -all_assets: list[dg.AssetsDefinition] = [ - *uk_assets, - *nw_india_assets, - *malta_assets, - *india_assets, - *india_stat_assets, -] diff --git a/local_archives/nwp/ecmwf/ecmwf_india.py b/local_archives/nwp/ecmwf/ecmwf_india.py deleted file mode 100644 index 1829867..0000000 --- a/local_archives/nwp/ecmwf/ecmwf_india.py +++ /dev/null @@ -1,34 +0,0 @@ -"""ECMWF India data pipeline.""" -import dagster as dg -from nwp_consumer.internal import FetcherInterface -from nwp_consumer.internal.inputs.ecmwf import mars - -from local_archives.nwp._generic_definitions_factory import ( - MakeDefinitionsOptions, - MakeDefinitionsOutputs, - make_definitions, -) - -fetcher: FetcherInterface = mars.MARSClient( - area="india", - hours=55, -) - -partitions: dg.TimeWindowPartitionsDefinition = dg.TimeWindowPartitionsDefinition( - start="2020-01-01T00:00", - cron_schedule="0 0,12 * * *", # 00:00 and 12:00 - fmt="%Y-%m-%dT%H:%M", - end_offset=-(4 * 2), # ECMWF only available 4 days back (2 partitions per day) -) - -defs: MakeDefinitionsOutputs = make_definitions( - opts=MakeDefinitionsOptions( - area="india", - source="ecmwf", - partitions=partitions, - fetcher=fetcher, - ), -) - -ecmwf_india_raw_archive = defs.raw_asset -ecmwf_india_zarr_archive = defs.zarr_asset diff --git a/local_archives/nwp/ecmwf/ecmwf_malta.py b/local_archives/nwp/ecmwf/ecmwf_malta.py deleted file mode 100644 index cfc929b..0000000 --- a/local_archives/nwp/ecmwf/ecmwf_malta.py +++ /dev/null @@ -1,34 +0,0 @@ -"""ECMWF Malta data pipeline.""" -import dagster as dg -from nwp_consumer.internal import FetcherInterface -from nwp_consumer.internal.inputs.ecmwf import mars - -from local_archives.nwp._generic_definitions_factory import ( - MakeDefinitionsOptions, - MakeDefinitionsOutputs, - make_definitions, -) - -fetcher: FetcherInterface = mars.MARSClient( - area="malta", - hours=84, -) - -partitions: dg.TimeWindowPartitionsDefinition = dg.TimeWindowPartitionsDefinition( - start="2017-01-01T00:00", - cron_schedule="0 0,12 * * *", # 00:00 and 12:00 - fmt="%Y-%m-%dT%H:%M", - end_offset=-(3 * 2), # ECMWF only available 3 days back (2 partitions per day) -) - -defs: MakeDefinitionsOutputs = make_definitions( - opts=MakeDefinitionsOptions( - area="malta", - source="ecmwf", - partitions=partitions, - fetcher=fetcher, - ), -) - -ecmwf_malta_raw_archive = defs.raw_asset -ecmwf_malta_zarr_archive = defs.zarr_asset diff --git a/local_archives/nwp/ecmwf/ecmwf_nw_india.py b/local_archives/nwp/ecmwf/ecmwf_nw_india.py deleted file mode 100644 index ef3414c..0000000 --- a/local_archives/nwp/ecmwf/ecmwf_nw_india.py +++ /dev/null @@ -1,34 +0,0 @@ -"""ECMWF NW India data pipeline.""" -import dagster as dg -from nwp_consumer.internal import FetcherInterface -from nwp_consumer.internal.inputs.ecmwf import mars - -from local_archives.nwp._generic_definitions_factory import ( - MakeDefinitionsOptions, - MakeDefinitionsOutputs, - 
make_definitions, -) - -fetcher: FetcherInterface = mars.MARSClient( - area="nw-india", - hours=192, -) - -partitions: dg.TimeWindowPartitionsDefinition = dg.TimeWindowPartitionsDefinition( - start="2017-01-01T00:00", - cron_schedule="0 0,12 * * *", # 00:00 and 12:00 - fmt="%Y-%m-%dT%H:%M", - end_offset=-(3 * 2), # ECMWF only available 3 days back (2 partitions per day) -) - -defs: MakeDefinitionsOutputs = make_definitions( - opts=MakeDefinitionsOptions( - area="nw_india", - source="ecmwf", - partitions=partitions, - fetcher=fetcher, - ), -) - -ecmwf_nw_india_raw_archive = defs.raw_asset -ecmwf_nw_india_zarr_archive = defs.zarr_asset diff --git a/local_archives/nwp/ecmwf/ecmwf_uk.py b/local_archives/nwp/ecmwf/ecmwf_uk.py deleted file mode 100644 index 93ff079..0000000 --- a/local_archives/nwp/ecmwf/ecmwf_uk.py +++ /dev/null @@ -1,35 +0,0 @@ -"""ECMWF UK data pipeline.""" - -import dagster as dg -from nwp_consumer.internal import FetcherInterface -from nwp_consumer.internal.inputs.ecmwf import mars - -from local_archives.nwp._generic_definitions_factory import ( - MakeDefinitionsOptions, - MakeDefinitionsOutputs, - make_definitions, -) - -fetcher: FetcherInterface = mars.MARSClient( - area="uk", - hours=84, -) - -partitions: dg.TimeWindowPartitionsDefinition = dg.TimeWindowPartitionsDefinition( - start="2017-01-01T00:00", - cron_schedule="0 0,12 * * *", # 00:00 and 12:00 - fmt="%Y-%m-%dT%H:%M", - end_offset=-(3 * 2), # ECMWF only available 3 days back (2 partitions per day) -) - -defs: MakeDefinitionsOutputs = make_definitions( - opts=MakeDefinitionsOptions( - area="uk", - source="ecmwf", - partitions=partitions, - fetcher=fetcher, - ), -) - -ecmwf_uk_raw_archive = defs.raw_asset -ecmwf_uk_zarr_archive = defs.zarr_asset diff --git a/local_archives/nwp/ecmwf/ecmwf_ens_stat_india.py b/local_archives/nwp/ecmwf_ens_stat_india.py similarity index 59% rename from local_archives/nwp/ecmwf/ecmwf_ens_stat_india.py rename to local_archives/nwp/ecmwf_ens_stat_india.py index b600ac4..1e38fef 100644 --- a/local_archives/nwp/ecmwf/ecmwf_ens_stat_india.py +++ b/local_archives/nwp/ecmwf_ens_stat_india.py @@ -1,6 +1,6 @@ -"""Zarr archive of Summary NWP data from ECMWF's EPS. +"""Zarr archive of Summary NWP data from ECMWF's ENS, covering India. -EPS is the ECMWF Ensemble Prediction System, +ENS (sometimes EPS) is the ECMWF Ensemble Prediction System, which provides 50 perturbed forecasts of upcoming atmospheric conditions. This asset contains summary statistics of this data (mean, standard deviation) for India. 
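# --- Illustrative sketch, not part of the patch above -----------------------------
# The docstring above describes summary statistics (mean, standard deviation) taken
# across ECMWF ENS's 50 perturbed members. A minimal xarray example of that kind of
# reduction is shown below; it is not nwp-consumer's implementation, and the
# dimension and variable names are assumptions for illustration only.
import numpy as np
import xarray as xr

# Fake ensemble: 50 members over a handful of steps and grid points.
ens = xr.DataArray(
    np.random.default_rng(seed=0).random((50, 4, 8, 8)),
    dims=["ensemble_member", "step", "latitude", "longitude"],
    name="temperature",
)

# Reduce over the ensemble dimension to get the per-gridpoint summary statistics.
summary = xr.Dataset(
    {
        "temperature_mean": ens.mean(dim="ensemble_member"),
        "temperature_std": ens.std(dim="ensemble_member"),
    },
)
# -----------------------------------------------------------------------------------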
@@ -17,59 +17,59 @@ import dagster as dg from dagster_docker import PipesDockerClient -from constants import LOCATIONS_BY_ENVIRONMENT +ARCHIVE_FOLDER = "/var/dagster-storage/nwp/ecmwf-ens-stat-india" +if os.getenv("ENVIRONMENT", "local") == "leo": + ARCHIVE_FOLDER = "/mnt/storage_b/nwp/ecmwf-ens-stat-india" -env = os.getenv("ENVIRONMENT", "local") -ZARR_FOLDER = LOCATIONS_BY_ENVIRONMENT[env].NWP_ZARR_FOLDER -ARCHIVE_FOLDER = f"{ZARR_FOLDER}/nwp/ecmwf-eps/india-stat" +partitions_def: dg.TimeWindowPartitionsDefinition = dg.MonthlyPartitionsDefinition( + start_date="2020-01-01", + end_offset=-3, +) @dg.asset( - name="zarr_archive", + name="ecmwf-ens-stat-india", description=__doc__, - key_prefix=["nwp", "ecmwf-eps", "india-stat"], metadata={ "archive_folder": dg.MetadataValue.text(ARCHIVE_FOLDER), - "area": dg.MetadataValue.text("global"), + "area": dg.MetadataValue.text("india"), "source": dg.MetadataValue.text("ecmwf-mars"), + "model": dg.MetadataValue.text("ens-stat"), "expected_runtime": dg.MetadataValue.text("6 hours"), }, compute_kind="docker", - automation_condition=dg.AutomationCondition.eager(), + automation_condition=dg.AutomationCondition.on_cron( + cron_schedule=partitions_def.get_cron_schedule( + hour_of_day=6, + day_of_week=1, + ), + ), tags={ "dagster/max_runtime": str(60 * 60 * 10), # Should take 6 ish hours "dagster/priority": "1", - "dagster/concurrency_key": "ecmwf-mars-consumer", + "dagster/concurrency_key": "nwp-consumer", }, - partitions_def=dg.MonthlyPartitionsDefinition( - start_date="2020-01-01", - end_offset=-3, - ), + partitions_def=partitions_def, ) -def ecmwf_eps_india_stat( +def ecmwf_ens_stat_india_asset( context: dg.AssetExecutionContext, pipes_docker_client: PipesDockerClient, ) -> Any: - image: str = "ghcr.io/openclimatefix/nwp-consumer:1.0.5" it: dt.datetime = context.partition_time_window.start return pipes_docker_client.run( - image=image, - command=[ - "archive", - "-y", - str(it.year), - "-m", - str(it.month), - ], + image="ghcr.io/openclimatefix/nwp-consumer:1.0.12", + command=["archive", "-y", str(it.year), "-m", str(it.month)], env={ "MODEL_REPOSITORY": "ecmwf-mars", + "MODEL": "ens-stat-india", "NOTIFICATION_REPOSITORY": "dagster-pipes", "ECMWF_API_KEY": os.environ["ECMWF_API_KEY"], "ECMWF_API_EMAIL": os.environ["ECMWF_API_EMAIL"], "ECMWF_API_URL": os.environ["ECMWF_API_URL"], - "ECMWF_MARS_AREA": "35/67/6/97", + "CONCURRENCY": "false", }, container_kwargs={ "volumes": [f"{ARCHIVE_FOLDER}:/work"], }, context=context, ).get_results() + diff --git a/local_archives/nwp/ecmwf_hres_ifs_india.py b/local_archives/nwp/ecmwf_hres_ifs_india.py new file mode 100644 index 0000000..dae66c7 --- /dev/null +++ b/local_archives/nwp/ecmwf_hres_ifs_india.py @@ -0,0 +1,74 @@ +"""Zarr archive of NWP data from ECMWF's IFS model, covering India. + +IFS is the Integrated Forecasting System, which uses a global numerical model +of earth to produce deterministic forecasts of upcoming atmospheric conditions. + +Sourced via MARS API from ECMWF (https://apps.ecmwf.int/mars-catalogue). +This asset is updated monthly, and surfaced as a Zarr Directory Store for each month. +It is downloaded using the nwp-consumer docker image +(https://github.com/openclimatefix/nwp-consumer). 
+""" + +import datetime as dt +import os +from typing import Any + +import dagster as dg +from dagster_docker import PipesDockerClient + +ARCHIVE_FOLDER = "/var/dagster-storage/nwp/ecmwf-hres-ifs-india" +if os.getenv("ENVIRONMENT", "local") == "leo": + ARCHIVE_FOLDER = f"/mnt/storage_b/nwp/ecmwf-hres-ifs-india" + +partitions_def: dg.TimeWindowPartitionsDefinition = dg.MonthlyPartitionsDefinition( + start_date="2017-01-01", + end_offset=-1, +) + +@dg.asset( + name="ecmwf-hres-ifs-india", + description=__doc__, + metadata={ + "archive_folder": dg.MetadataValue.text(ARCHIVE_FOLDER), + "area": dg.MetadataValue.text("india"), + "source": dg.MetadataValue.text("ecmwf-mars"), + "model": dg.MetadataValue.text("hres-ifs"), + "expected_runtime": dg.MetadataValue.text("6 hours"), + }, + compute_kind="docker", + automation_condition=dg.AutomationCondition.on_cron( + cron_schedule=partitions_def.get_cron_schedule( + hour_of_day=3, + day_of_week=0, + ), + ), + tags={ + "dagster/max_runtime": str(60 * 60 * 10), # Should take 6 ish hours + "dagster/priority": "1", + "dagster/concurrency_key": "nwp-consumer", + }, + partitions_def=partitions_def, +) +def ecmwf_hres_ifs_india_asset( + context: dg.AssetExecutionContext, + pipes_docker_client: PipesDockerClient, +) -> Any: + it: dt.datetime = context.partition_time_window.start + return pipes_docker_client.run( + image="ghcr.io/openclimatefix/nwp-consumer:1.0.12", + command=["archive", "-y", str(it.year), "-m", str(it.month)], + env={ + "MODEL_REPOSITORY": "ecmwf-mars", + "MODEL": "hres-ifs-india", + "NOTIFICATION_REPOSITORY": "dagster-pipes", + "ECMWF_API_KEY": os.environ["ECMWF_API_KEY"], + "ECMWF_API_EMAIL": os.environ["ECMWF_API_EMAIL"], + "ECMWF_API_URL": os.environ["ECMWF_API_URL"], + "CONCURRENCY": "false", + }, + container_kwargs={ + "volumes": [f"{ARCHIVE_FOLDER}:/work"], + }, + context=context, + ).get_results() + diff --git a/local_archives/nwp/ecmwf_hres_ifs_west_europe.py b/local_archives/nwp/ecmwf_hres_ifs_west_europe.py new file mode 100644 index 0000000..4d6e53b --- /dev/null +++ b/local_archives/nwp/ecmwf_hres_ifs_west_europe.py @@ -0,0 +1,74 @@ +"""Zarr archive of NWP data from ECMWF's IFS model, covering Western Europe. + +IFS is the Integrated Forecasting System, which uses a global numerical model +of earth to produce deterministic forecasts of upcoming atmospheric conditions. + +Sourced via MARS API from ECMWF (https://apps.ecmwf.int/mars-catalogue). +This asset is updated monthly, and surfaced as a Zarr Directory Store for each month. +It is downloaded using the nwp-consumer docker image +(https://github.com/openclimatefix/nwp-consumer). 
+""" + +import datetime as dt +import os +from typing import Any + +import dagster as dg +from dagster_docker import PipesDockerClient + +ARCHIVE_FOLDER = "/var/dagster-storage/nwp/ecmwf-hres-ifs-west-europe" +if os.getenv("ENVIRONMENT", "local") == "leo": + ARCHIVE_FOLDER = f"/mnt/storage_b/nwp/ecmwf-hres-ifs-west-europe" + +partitions_def: dg.TimeWindowPartitionsDefinition = dg.MonthlyPartitionsDefinition( + start_date="2017-01-01", + end_offset=-1, +) + +@dg.asset( + name="ecmwf-hres-ifs-west-europe", + description=__doc__, + metadata={ + "archive_folder": dg.MetadataValue.text(ARCHIVE_FOLDER), + "area": dg.MetadataValue.text("west-europe"), + "source": dg.MetadataValue.text("ecmwf-mars"), + "model": dg.MetadataValue.text("hres-ifs"), + "expected_runtime": dg.MetadataValue.text("6 hours"), + }, + compute_kind="docker", + automation_condition=dg.AutomationCondition.on_cron( + cron_schedule=partitions_def.get_cron_schedule( + hour_of_day=18, + day_of_week=0, + ), + ), + tags={ + "dagster/max_runtime": str(60 * 60 * 10), # Should take 6 ish hours + "dagster/priority": "1", + "dagster/concurrency_key": "nwp-consumer", + }, + partitions_def=partitions_def, +) +def ecmwf_hres_ifs_west_europe_asset( + context: dg.AssetExecutionContext, + pipes_docker_client: PipesDockerClient, +) -> Any: + it: dt.datetime = context.partition_time_window.start + return pipes_docker_client.run( + image="ghcr.io/openclimatefix/nwp-consumer:1.0.12", + command=["archive", "-y", str(it.year), "-m", str(it.month)], + env={ + "MODEL_REPOSITORY": "ecmwf-mars", + "MODEL": "hres-ifs-west-europe", + "NOTIFICATION_REPOSITORY": "dagster-pipes", + "ECMWF_API_KEY": os.environ["ECMWF_API_KEY"], + "ECMWF_API_EMAIL": os.environ["ECMWF_API_EMAIL"], + "ECMWF_API_URL": os.environ["ECMWF_API_URL"], + "CONCURRENCY": "false", + }, + container_kwargs={ + "volumes": [f"{ARCHIVE_FOLDER}:/work"], + }, + context=context, + ).get_results() + diff --git a/local_archives/nwp/gfs/__init__.py b/local_archives/nwp/gfs/__init__.py deleted file mode 100644 index 5a4430c..0000000 --- a/local_archives/nwp/gfs/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -import dagster as dg - -from . 
import gfs - -all_assets: list[dg.AssetsDefinition] = dg.load_assets_from_modules( - modules=[gfs], - group_name="gfs_global", -) diff --git a/local_archives/nwp/gfs/gfs.py b/local_archives/nwp/gfs/gfs.py deleted file mode 100644 index 10f8e3b..0000000 --- a/local_archives/nwp/gfs/gfs.py +++ /dev/null @@ -1,47 +0,0 @@ -import datetime as dt -import os - -import dagster as dg - -from constants import LOCATIONS_BY_ENVIRONMENT -from containers.gfs import download_combine_gfs - -env = os.getenv("ENVIRONMENT", "local") -ZARR_FOLDER = LOCATIONS_BY_ENVIRONMENT[env].NWP_ZARR_FOLDER - -@dg.asset( - name="zarr_daily_archive", - description="Daily archive of GFS global NWP data", - key_prefix=["nwp", "gfs", "global"], - automation_condition=dg.AutomationCondition.eager(), - partitions_def=dg.DailyPartitionsDefinition( - start_date="2015-01-15", - end_offset=-2, - ), - metadata={ - "archive_folder": dg.MetadataValue.text(f"{ZARR_FOLDER}/nwp/gfs/global"), - "area": dg.MetadataValue.text("global"), - "source": dg.MetadataValue.text("gfs"), - }, -) -def zarr_archive( - context: dg.AssetExecutionContext, -) -> dg.Output: - start: dt.datetime.now(tz=dt.UTC) - outfile: str = download_combine_gfs.run( - path=ZARR_FOLDER + "/nwp/gfs/global", - date=context.partition_time_window.start, - config=download_combine_gfs.DEFAULT_CONFIG, - ) - end: dt.datetime.now(tz=dt.UTC) - return dg.Output( - value=outfile, - metadata={ - "archive_folder": dg.MetadataValue.text(f"{ZARR_FOLDER}/nwp/gfs/global"), - "area": dg.MetadataValue.text("global"), - "source": dg.MetadataValue.text("gfs"), - "partition_elapsed_time_minutes": dg.MetadataValue.int( - (end - start).total_seconds() // 60, - ), - }, - ) diff --git a/local_archives/nwp/jobs.py b/local_archives/nwp/jobs.py deleted file mode 100644 index ccc6943..0000000 --- a/local_archives/nwp/jobs.py +++ /dev/null @@ -1,241 +0,0 @@ -"""Defines the jobs for the ECMWF data pipeline.""" -import datetime as dt -import os -import pathlib - -import dagster as dg -import ocf_blosc2 # noqa -import xarray as xr -from nwp_consumer.internal import ( - IT_FOLDER_STRUCTURE_RAW, - IT_FOLDER_GLOBSTR_RAW, -) - -from constants import LOCATIONS_BY_ENVIRONMENT - -env = os.getenv("ENVIRONMENT", "local") -RAW_FOLDER = LOCATIONS_BY_ENVIRONMENT[env].RAW_FOLDER -ZARR_FOLDER = LOCATIONS_BY_ENVIRONMENT[env].NWP_ZARR_FOLDER - - -class ValidateExistingFilesConfig(dg.Config): - """Config schema for the validate_existing_files job.""" - - base_path: str - source: str - area: str - asset_name: str - - def check(self) -> None: - """Check that the source and area are valid.""" - if self.area not in ["global", "eu", "uk", "nw_india", "malta"]: - raise ValueError(f"Area {self.area} not recognised.") - - if self.source not in ["ecmwf", "icon", "ceda", "cams"]: - raise ValueError(f"Source {self.source} not recognised.") - - if self.archive_path().exists() is False: - raise FileNotFoundError( - f"Could not find archive folder {self.archive_path().as_posix()}", - ) - - def archive_path(self) -> pathlib.Path: - """Return the base path of the archive.""" - return pathlib.Path(self.base_path) / "nwp" / self.source / self.area - - -@dg.op -def validate_existing_raw_files( - context: dg.OpExecutionContext, - config: ValidateExistingFilesConfig, -) -> None: - """Checks for existing raw files. - - The folder structure of the raw files is assumed to follw the convention - from the nwp-consumer library. 
That is to say, the files are stored in - folders named after the inittime, which are in turn stored in folders - named after the area and source. See README.md for more details. - """ - config.check() - - total_archive_size_bytes: int = 0 - for it_folder in [f for f in config.archive_path().glob(IT_FOLDER_GLOBSTR_RAW) if f.suffix == ""]: - # Parse the folder as an inittime: - try: - it = dt.datetime.strptime( - it_folder.relative_to(config.archive_path()).as_posix(), - IT_FOLDER_STRUCTURE_RAW, - ).replace(tzinfo=dt.UTC) - except ValueError: - continue - - # For every file in the inittime folder with the correct extension, - # create an AssetObservation for the relevant partition - sizes: list[int] = [] - it_filepaths: list[pathlib.Path] = [] - for file in list(it_folder.glob("*.nc")) + list(it_folder.glob("*.grib")): - it_filepaths.append(file) - sizes.append(file.stat().st_size) - - total_archive_size_bytes += sum(sizes) - - if len(it_filepaths) > 0: - context.log_event( - dg.AssetObservation( - asset_key=["nwp", config.source, config.area, config.asset_name], - partition=it.strftime("%Y-%m-%d|%H:%M"), - metadata={ - "inittime": dg.MetadataValue.text( - it.strftime("%Y-%m-%d|%H:%M"), - ), - "num_files": dg.MetadataValue.int( - len(it_filepaths), - ), - "file_paths": dg.MetadataValue.text( - str([f.as_posix() for f in it_filepaths]), - ), - "partition_size": dg.MetadataValue.int( - sum(sizes), - ), - "area": dg.MetadataValue.text(config.area), - "last_checked": dg.MetadataValue.text( - dt.datetime.now(tz=dt.UTC).isoformat(), - ), - }, - ), - ) - - context.log_event( - dg.AssetObservation( - asset_key=["nwp", config.source, config.area, config.asset_name], - metadata={ - "archive_folder": dg.MetadataValue.text(config.archive_path().as_posix()), - "area": dg.MetadataValue.text(config.area), - "total_archive_size_gb": dg.MetadataValue.float(total_archive_size_bytes / 1e9), - "last_scan": dg.MetadataValue.text(dt.datetime.now(tz=dt.UTC).isoformat()), - }, - ), - ) - - -@dg.op -def validate_existing_zarr_files( - context: dg.OpExecutionContext, - config: ValidateExistingFilesConfig, -) -> None: - """Checks for existing zarr files.""" - config.check() - - total_archive_size_bytes: int = 0 - for file in config.archive_path().glob("*.zarr.zip"): - # Try to parse the init time from the filename - try: - it = dt.datetime.strptime( - file.name, - "%Y%m%dT%H%M.zarr.zip", - ).replace(tzinfo=dt.UTC) - except ValueError: - continue - - total_archive_size_bytes += file.stat().st_size - - ds = xr.open_zarr("zip::" + file.as_posix()) - - # Create an AssetObservation for the relevant partition - context.log_event( - dg.AssetObservation( - asset_key=["nwp", config.source, config.area, config.asset_name], - partition=it.strftime("%Y-%m-%d|%H:%M"), - metadata={ - "inittime": dg.MetadataValue.text(it.strftime("%Y-%m-%d|%H:%M")), - "dataset": dg.MetadataValue.md(str(ds)), - }, - ), - ) - - context.log_event( - dg.AssetObservation( - asset_key=["nwp", config.source, config.area, config.asset_name], - metadata={ - "archive_folder": dg.MetadataValue.text(config.archive_path().as_posix()), - "area": dg.MetadataValue.text(config.area), - "total_archive_size_gb": dg.MetadataValue.float(total_archive_size_bytes / 1e9), - "last_scan": dg.MetadataValue.text(dt.datetime.now(tz=dt.UTC).isoformat()), - }, - ), - ) - - return None - - -@dg.job( - name="scan_nwp_raw_archive", - config=dg.RunConfig( - ops={ - validate_existing_raw_files.__name__: ValidateExistingFilesConfig( - base_path=RAW_FOLDER, - source="ecmwf", - 
area="uk", - asset_name="raw_archive", - ), - }, - ), -) -def scan_nwp_raw_archive() -> None: - """Scan the raw NWP archive for existing files. - - This assumes a folder structure as follows: - >>> {base_path}/nwp/{source}/{area}/{YYYY}/{MM}/{DD}/{HHMM}/{file} - - where the time values pertain to the init time. - The values `nwp`, `source``` and `area` - are taken from the asset key. - """ - validate_existing_raw_files() - - -@dg.job( - name="scan_nwp_zarr_archive", - config=dg.RunConfig( - ops={ - validate_existing_zarr_files.__name__: ValidateExistingFilesConfig( - base_path=ZARR_FOLDER, - source="ecmwf", - area="uk", - asset_name="zarr_archive", - ), - }, - ), -) -def scan_nwp_zarr_archive() -> None: - """Scan the zarr NWP archive for existing files. - - This assumes a folder structure as follows: - >>> {base_path}/nwp/{source}/{area}/{YYYYMMDD}T{HHMM}.zarr.zip - - where the time values pertain to the init time. - """ - validate_existing_zarr_files() - - -def gen_run_config(asset_key: dg.AssetKey) -> dg.RunConfig: - """Generate a Run config for the validate_existing_files job.""" - vc: ValidateExistingFilesConfig = ValidateExistingFilesConfig( - base_path=RAW_FOLDER, - source=asset_key.path[1], - area=asset_key.path[2], - asset_name=asset_key.path[3], - ) - - if asset_key.path[-1] == "raw_archive": - return dg.RunConfig( - ops={ - validate_existing_raw_files.__name__: vc, - }, - ) - elif asset_key.path[-1] == "zarr_archive": - return dg.RunConfig( - ops={ - validate_existing_zarr_files.__name__: vc, - }, - ) diff --git a/local_archives/nwp/meteomatics/__init__.py b/local_archives/nwp/meteomatics/__init__.py deleted file mode 100644 index 7015086..0000000 --- a/local_archives/nwp/meteomatics/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -import dagster as dg - -from . 
import meteomatics_sites_india - -india_site_assets = dg.load_assets_from_modules( - modules=[meteomatics_sites_india], - group_name="meteomatics_sites_india", -) - -all_assets: list[dg.AssetsDefinition] = [ - *india_site_assets, -] diff --git a/local_archives/nwp/meteomatics/meteomatics_sites_india.py b/local_archives/nwp/meteomatics/meteomatics_sites_india.py deleted file mode 100644 index 8c2b8bb..0000000 --- a/local_archives/nwp/meteomatics/meteomatics_sites_india.py +++ /dev/null @@ -1,212 +0,0 @@ -import datetime as dt -import os -import pathlib - -import dagster as dg -import meteomatics.api as mmapi -import pandas as pd -import xarray as xr -import zarr -from ocf_blosc2 import Blosc2 - -from constants import LOCATIONS_BY_ENVIRONMENT -from resources import MeteomaticsAPIResource - -env = os.getenv("ENVIRONMENT", "local") -BASE_PATH = LOCATIONS_BY_ENVIRONMENT[env].NWP_ZARR_FOLDER - -# ==== Constants ==== - -# The order of these coordinate lists are used to determine the station_id -solar_coords = [ - (26.264, 71.237), - (26.671, 71.262), - (26.709, 71.413), - (26.871, 71.49), - (26.833, 71.815), - (26.792, 72.008), - (26.892, 72.06), - (27.179, 71.841), - (27.476, 71.971), - (27.387, 72.218), - (27.951, 72.987), - (28.276, 73.341), - (24.687, 75.132), - (26.731, 73.2), - (26.524, 72.862), - (27.207, 74.252), - (27.388, 72.208), - (27.634, 72.698), - (28.344, 73.435), - (28.022, 73.067), - # Adani - (13.995, 78.428), - (26.483, 71.232), - (14.225, 77.43), - (24.12, 69.34), -] - -wind_coords = [ - (27.035, 70.515), - (27.188, 70.661), - (27.085, 70.638), - (27.055, 70.72), - (27.186, 70.81), - (27.138, 71.024), - (26.97, 70.917), - (26.898, 70.996), - (26.806, 70.732), - (26.706, 70.81), - (26.698, 70.875), - (26.708, 70.982), - (26.679, 71.027), - (26.8, 71.128), - (26.704, 71.127), - (26.5, 71.285), - (26.566, 71.369), - (26.679, 71.452), - (26.201, 71.295), - (26.501, 72.512), - (26.463, 72.836), - (26.718, 73.049), - (26.63, 73.581), - (24.142, 74.731), - (23.956, 74.625), - (23.657, 74.772), - # Adani - (26.479, 1.220), - (23.098, 75.255), - (23.254, 69.252), -] - -wind_parameters = [ - "wind_speed_10m:ms", - "wind_speed_100m:ms", - "wind_speed_200m:ms", - "wind_dir_10m:d", - "wind_dir_100m:d", - "wind_dir_200m:d", - "wind_gusts_10m:ms", - "wind_gusts_100m:ms", - "wind_gusts_200m:ms", - "air_density_10m:kgm3", - "air_density_25m:kgm3", - "air_density_100m:kgm3", - "air_density_200m:kgm3", - "cape:Jkg", -] - - -solar_parameters = [ - "direct_rad:W", - "diffuse_rad:W", - "global_rad:W", -] - -# ==== Ops ==== - -@dg.op -def query_meteomatics_wind_api( - context: dg.OpExecutionContext, - meteomatics_api: MeteomaticsAPIResource, -) -> pd.DataFrame: - """Query Meteomatics API for wind data.""" - return meteomatics_api.query_api( - start=context.partition_time_window.start, - end=context.partition_time_window.end, - coords=wind_coords, - params=wind_parameters, - ) - -@dg.op -def query_meteomatics_solar_api( - context: dg.OpExecutionContext, - meteomatics_api: MeteomaticsAPIResource, -) -> pd.DataFrame: - """Query Meteomatics API for solar data.""" - return meteomatics_api.query_api( - start=context.partition_time_window.start, - end=context.partition_time_window.end, - coords=solar_coords, - params=solar_parameters, - ) - -@dg.op -def map_df_ds(df: pd.DataFrame) -> xr.Dataset: - """Map DataFrame to xarray Dataset.""" - # Reset index to create columns for lat, lon, and validdate - df = df.reset_index(level=["lat", "lon", "validdate"]) - # Create a station_id column based on the 
coordinates - df["station_id"] = df.groupby(["lat", "lon"], sort=False).ngroup() + 1 - # Create a time_utc column based on the validdate - df["time_utc"] = pd.to_datetime(df["validdate"]) - # Make a new index based on station_id and time_utc - df = df.set_index(["station_id", "time_utc"]).drop(columns=["validdate"]) - # Create xarray dataset from dataframe - ds = xr.Dataset.from_dataframe(df).set_coords(("lat", "lon")) - # Ensure time_utc is a timestamp object - ds["time_utc"] = pd.to_datetime(ds["time_utc"]) - return ds - - -@dg.op -def store_ds(context: dg.OpExecutionContext, ds: xr.Dataset) -> dg.Output[pathlib.Path]: - """Store xarray Dataset to Zarr.""" - encoding = {} - for var in ds.data_vars: - encoding[var] = {"compressor": Blosc2(cname="zstd", clevel=5)} - - pdt = context.partition_time_window.start - path = pathlib.Path( - f"{BASE_PATH}/{'/'.join(context.asset_key.path[:-1])}/{context.asset_key.path[-1]}_{pdt.strftime('%Y-%m')}.zarr.zip", - ) - path.parent.mkdir(parents=True, exist_ok=True) - with zarr.ZipStore(path.as_posix(), mode="w") as store: - ds.to_zarr(store, encoding=encoding, mode="w") - - return dg.Output( - path, - metadata={ - "dataset": dg.MetadataValue.text(ds.__str__()), - "path": dg.MetadataValue.path(path.as_posix()), - "partition_size:kb": dg.MetadataValue.int(int(path.stat().st_size / 1024)), - }, - ) - - -# ==== Assets ==== - -@dg.graph_asset( - key=["nwp", "meteomatics", "nw_india", "wind_archive"], - partitions_def=dg.TimeWindowPartitionsDefinition( - fmt="%Y-%m", - start="2019-03", - cron_schedule="0 0 1 * *", # Once a month - ), - metadata={ - "path": dg.MetadataValue.path(f"{BASE_PATH}/nwp/meteomatics/nw_india/wind_archive"), - }, -) -def meteomatics_wind_archive() -> dg.Output[str]: - """Meteomatics wind archive asset.""" - df = query_meteomatics_wind_api() - ds = map_df_ds(df) - return store_ds(ds) - - -@dg.graph_asset( - key=["nwp", "meteomatics", "nw_india", "solar_archive"], - partitions_def=dg.TimeWindowPartitionsDefinition( - fmt="%Y-%m", - start="2019-03", - cron_schedule="0 0 1 * *", # Once a month - ), - metadata={ - "path": dg.MetadataValue.path(f"{BASE_PATH}/nwp/meteomatics/nw_india/solar_archive"), - }, -) -def meteomatics_solar_archive() -> dg.Output[pathlib.Path]: - """Meteomatics solar archive asset.""" - df = query_meteomatics_solar_api() - ds = map_df_ds(df) - return store_ds(ds) diff --git a/local_archives/nwp/noaa-gfs-global.py b/local_archives/nwp/noaa-gfs-global.py new file mode 100644 index 0000000..08cf54b --- /dev/null +++ b/local_archives/nwp/noaa-gfs-global.py @@ -0,0 +1,70 @@ +"""Zarr archive of NWP data from NCEP's GFS model. + +The National Centers for Environmental Prediction (NCEP) runs the +deterministic Global Forecast System (GFS) model +(https://www.ncei.noaa.gov/products/weather-climate-models/global-forecast). + +Sourced via S3 from NOAA (https://noaa-gfs-bdp-pds.s3.amazonaws.com/index.html). +This asset is updated monthly, and surfaced as a Zarr Directory Store for each month. +It is downloaded using the nwp-consumer docker image +(https://github.com/openclimatefix/nwp-consumer). 
+""" + +import datetime as dt +import os +from typing import Any + +import dagster as dg +from dagster_docker import PipesDockerClient + +ARCHIVE_FOLDER = "/var/dagster-storage/nwp/ncep-gfs-global" +if os.getenv("ENVIRONMENT", "local") == "leo": + ARCHIVE_FOLDER = f"/mnt/storage_b/nwp/ncep-gfs-global" + +partitions_def: dg.TimeWindowPartitionsDefinition = dg.MonthlyPartitionsDefinition( + start_date="2021-01-01", + end_offset=-1, +) + +@dg.asset( + name="ncep-gfs-global", + description=__doc__, + metadata={ + "archive_folder": dg.MetadataValue.text(ARCHIVE_FOLDER), + "area": dg.MetadataValue.text("global"), + "source": dg.MetadataValue.text("noaa-s3"), + "model": dg.MetadataValue.text("ncep-gfs"), + "expected_runtime": dg.MetadataValue.text("6 hours"), + }, + compute_kind="docker", + automation_condition=dg.AutomationCondition.on_cron( + cron_schedule=partitions_def.get_cron_schedule( + hour_of_day=21, + day_of_week=1, + ), + ), + tags={ + "dagster/max_runtime": str(60 * 60 * 10), # Should take 6 ish hours + "dagster/priority": "1", + "dagster/concurrency_key": "nwp-consumer", + }, +) +def ncep_gfs_global_asset( + context: dg.AssetExecutionContext, + pipes_docker_client: PipesDockerClient, +) -> Any: + it: dt.datetime = context.partition_time_window.start + return pipes_docker_client.run( + image="ghcr.io/openclimatefix/nwp-consumer:1.0.12", + command=["archive", "-y", str(it.year), "-m", str(it.month)], + env={ + "MODEL_REPOSITORY": "gfs", + "NOTIFICATION_REPOSITORY": "dagster-pipes", + "CONCURRENCY": "false", + }, + container_kwargs={ + "volumes": [f"{ARCHIVE_FOLDER}:/work"], + }, + context=context, + ).get_results() + diff --git a/local_archives/sat/__init__.py b/local_archives/sat/__init__.py index 28aeb1d..6209f98 100644 --- a/local_archives/sat/__init__.py +++ b/local_archives/sat/__init__.py @@ -2,13 +2,8 @@ import dagster as dg -from . import eumetsat - -all_assets: list[dg.AssetsDefinition] = [ - *eumetsat.all_assets, -] - -all_jobs: list[dg.JobDefinition] = [] - -all_schedules: list[dg.ScheduleDefinition] = [] +sat_assets: list[dg.AssetsDefinition] = dg.load_assets_from_current_module( + group_name="sat", + key_prefix=["sat"], +) diff --git a/local_archives/sat/eumetsat/__init__.py b/local_archives/sat/eumetsat/__init__.py deleted file mode 100644 index ca94245..0000000 --- a/local_archives/sat/eumetsat/__init__.py +++ /dev/null @@ -1,12 +0,0 @@ -import dagster as dg - -from . import eumetsat_iodc - - -iodc_assets = dg.load_assets_from_modules( - modules=[eumetsat_iodc], - group_name="eumetsat_iodc", -) - -all_assets: list[dg.AssetsDefinition] = [*iodc_assets] - diff --git a/local_archives/sat/eumetsat/eumetsat_iodc.py b/local_archives/sat/eumetsat/eumetsat_iodc.py deleted file mode 100644 index 7e0cd44..0000000 --- a/local_archives/sat/eumetsat/eumetsat_iodc.py +++ /dev/null @@ -1,59 +0,0 @@ -import datetime as dt -import os -from typing import Any - -import dagster as dg - -from constants import LOCATIONS_BY_ENVIRONMENT - -env = os.getenv("ENVIRONMENT", "local") -ZARR_FOLDER = LOCATIONS_BY_ENVIRONMENT[env].SAT_ZARR_FOLDER - -@dg.asset( - name="zarr_archive", - description="".join(( - "Zarr archive of satellite data from EUMETSAT's IODC satellite.", - "Sourced via EUMDAC from EUMETSAT ", - "(https://navigator.eumetsat.int/product/EO:EUM:DAT:MSG:OCA-IODC). ", - "This asset is updated monthly, and surfaced as a Zarr Directory Store ", - "for each month. 
It is downloaded using the sat container ", - "(https://github.com/openclimatefix/dagster-dags/pkgs/container/sat-etl).", - )), - key_prefix=["sat", "eumetsat", "iodc"], - metadata={ - "archive_folder": dg.MetadataValue.text(f"{ZARR_FOLDER}/sat/eumetsat/india"), - "area": dg.MetadataValue.text("india"), - "source": dg.MetadataValue.text("eumetsat"), - "expected_runtime": dg.MetadataValue.text("TBD"), - }, - compute_kind="subprocess", - automation_condition=dg.AutomationCondition.eager(), - tags={ - # "dagster/max_runtime": str(60 * 60 * 10), # Should take 6 ish hours - "dagster/priority": "1", - "dagster/concurrency_key": "eumetsat", - }, - partitions_def=dg.MonthlyPartitionsDefinition( - start_date="2019-01-01", - end_offset=-3, - ), -) -def iodc_monthly( - context: dg.AssetExecutionContext, - pipes_subprocess_client: dg.PipesSubprocessClient, -) -> Any: - image: str = "ghcr.io/openclimatefix/sat-etl:main" - it: dt.datetime = context.partition_time_window.start - return pipes_subprocess_client.run( - command=[ - "/home/dagster/mambaforge/envs/sat-etl/bin/python", - "/home/dagster/dags/containers/sat/download_process_sat.py", - "--month", - it.strftime("%Y-%m"), - "--path", - f"/mnt/storage_a/sat/eumetsat/india", - "--rm", - ], - context=context, - ).get_materialize_result() - diff --git a/local_archives/sat/eumetsat_iodc_lrv.py b/local_archives/sat/eumetsat_iodc_lrv.py new file mode 100644 index 0000000..a110d7f --- /dev/null +++ b/local_archives/sat/eumetsat_iodc_lrv.py @@ -0,0 +1,59 @@ +"""Zarr archive of satellite image data from EUMETSAT's RSS service, low resolution. + +EUMETSAT have a seviri satellite that provides images of the earth's surface. +The Rapid Scan Service (RSS) provides images at 15 minute intervals. +The images are in the MSG format, which is a compressed format that contains +multiple channels of data. The come in high resolution (HRV) and low resolution (LRV). + +Sourced via eumdac from DataStore (https://navigator.eumetsat.int/product/EO:EUM:DAT:MSG:RSS). +This asset is updated monthly, and surfaced as a Zarr Directory Store for each month. +It is downloaded using the sat container. 
+""" + +import datetime as dt +import os +from typing import Any + +import dagster as dg +from dagster_docker import PipesDockerClient + +ARCHIVE_FOLDER = "/var/dagster-storage/sat/eumetsat-iodc-lrv" +if os.getenv("ENVIRONMENT", "local") == "leo": + ARCHIVE_FOLDER = "/mnt/storage_b/sat/eumetsat-iodc-lrv" + +partitions_def: dg.TimeWindowPartitionsDefinition = dg.MonthlyPartitionsDefinition( + start_date="2019-01-01", + end_offset=-1, +) + +@dg.asset( + name="eumetsat-iodc-lrv", + description=__doc__, + metadata={ + "archive_folder": dg.MetadataValue.text(ARCHIVE_FOLDER), + "area": dg.MetadataValue.text("india"), + "source": dg.MetadataValue.text("eumetsat"), + "expected_runtime": dg.MetadataValue.text("6 hours"), + }, + compute_kind="docker", + tags={ + "dagster/max_runtime": str(60 * 60 * 10), # Should take 6 ish hours + "dagster/priority": "1", + "dagster/concurrency_key": "eumetsat", + }, + partitions_def=partitions_def, +) +def eumetsat_seviri_asset( + context: dg.AssetExecutionContext, + pipes_docker_client: PipesDockerClient, +) -> Any: + it: dt.datetime = context.partition_time_window.start + return pipes_docker_client.run( + image="ghcr.io/openclimatefix/sat-etl:main", + command=["iodc", "--month", f"{it:%Y-%m}", "--path", "/work", "--rm"], + container_kwargs={ + "volumes": [f"{ARCHIVE_FOLDER}:/work"], + }, + context=context, + ).get_results() + diff --git a/managers/__init__.py b/managers/__init__.py deleted file mode 100644 index fbd8692..0000000 --- a/managers/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .xr_zarr_local import LocalFilesystemXarrayZarrManager - -__all__ = ["LocalFilesystemXarrayZarrManager"] diff --git a/managers/xr_zarr_local.py b/managers/xr_zarr_local.py deleted file mode 100644 index ef3e1ef..0000000 --- a/managers/xr_zarr_local.py +++ /dev/null @@ -1,76 +0,0 @@ -import datetime as dt -import pathlib - -import dagster as dg -import xarray as xr -import zarr -from ocf_blosc2 import Blosc2 - - -class LocalFilesystemXarrayZarrManager(dg.ConfigurableIOManager): - """IOManager for reading and writing xarray datasets to the local filesystem. - - Datasets are stored in zipped zarr format. It is expected to be used with an asset - continaing a MultiPartitionDefinition with two keys: "date" and "inittime" from which - the full initialisation time of the dataset can be inferred. - - The dataset is stored in a folder structure using the assets key prefixes and the - base path. The full path to the dataset is: - - {base_path}/{slash_joined_asset_key_prefixes}/{date}{inittime}.zarr.zip - """ - - base_path: str = "" - filename_formatstr: str = "%Y%m%dT%H%M.zarr.zip" - - def _get_path(self, context: dg.InputContext | dg.OutputContext) -> pathlib.Path: - """Get the path to the zarr file.""" - if context.has_partition_key: - if isinstance(context.asset_key.path, str) or len(context.asset_key.path) <= 1: - raise ValueError( - "AssetKey is not a list of strings with at least two elements." 
- "Ensure the you have setkey_prefix on the asset.", - ) - - asset_prefixes: str = "/".join(context.asset_key.path[:-1]) - it = context.asset_partitions_time_window.start - return ( - pathlib.Path(self.base_path) / asset_prefixes / it.strftime(self.filename_formatstr) - ) - else: - # Not yet implemented - raise NotImplementedError("No partition key found") - - def handle_output(self, context: dg.OutputContext, obj: xr.Dataset) -> None: - """Save an xarray dataset to a zarr file.""" - dst = self._get_path(context) - if dst.exists(): - dst.unlink() - dst.parent.mkdir(parents=True, exist_ok=True) - dataVar: str = next(iter(obj.data_vars.keys())) - with zarr.ZipStore(path=dst.as_posix(), mode="w") as store: - obj.to_zarr( - store=store, - encoding={ - "init_time": {"units": "nanoseconds since 1970-01-01"}, - dataVar: { - "compressor": Blosc2(cname="zstd", clevel=5), - }, - }, - ) - context.add_output_metadata( - { - "path": dg.MetadataValue.path(dst.as_posix()), - "size": dg.MetadataValue.int(dst.stat().st_size), - "modified": dg.MetadataValue.text( - dt.datetime.fromtimestamp(dst.stat().st_mtime, tz=dt.UTC).strftime( - "%Y-%m-%d %H:%M:%S", - ), - ), - }, - ) - - def load_input(self, context: dg.InputContext) -> xr.Dataset: - """Load an xarray dataset from a zarr file.""" - src = self._get_path(context) - return xr.open_zarr(f"zip::{src.as_posix()}") diff --git a/pyproject.toml b/pyproject.toml index 47e386d..eed40ba 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,45 +13,38 @@ readme = {file = "README.md", content-type = "text/markdown"} requires-python = ">=3.11.0" license = {text = "MIT License"} authors = [ - { name = "Jacob Bieker", email = "jacob@openclimatefix.org"}, { name = "Sol Cotton", email = "sol@openclimatefix.org"}, + { name = "Jacob Bieker", email = "jacob@openclimatefix.org"}, ] classifiers = ["Programming Language :: Python :: 3"] dependencies = [ "cdsapi >= 0.6.1", - "ecmwf-api-client >= 1.6.3", - "dagit >= 1.8.5", "dagster >= 1.8.5", - "dagster-cloud >= 1.8.5", - "dagster-webserver >= 1.8.5", - "dagster-graphql >= 1.8.5", "dagster-postgres >= 0.24.5", "dagster-docker >= 0.24.5", "dagster-pipes >= 1.8.5", "huggingface-hub >= 0.19.4", - "kbatch >= 0.4.2", - "meteomatics == 2.11.1", "numpy >= 1.26.0", - "nwp-consumer >= 0.5.8", - "ocf-blosc2 >= 0.0.3", - "pathlib >= 1.0.1", "pyarrow >= 10.0.1", "requests >= 2.31.0", "requests-toolbelt >= 1.0.0", - "xarray >= 2022.3.0", - "zarr >= 2.13.3", ] -[project.optional-dependencies] +[dependency-groups] dev = [ - "mypy == 1.7.1", - "types-PyYAML", + # Testing + "pytest", + "unittest-xml-reporting", + "dagit", + # Linting + "ruff", + "types-pyyaml", "types-pytz", "types-requests", - "ruff == 0.1.7", - "unittest-xml-reporting == 3.2.0", - "pytest >= 7.4.1", - "python-lsp-server == 1.7.4" + # LSP support + "python-lsp-server", + "pylsp-mypy", + "python-lsp-ruff", ] [tool.setuptools.packages.find] diff --git a/resources/__init__.py b/resources/__init__.py deleted file mode 100644 index 7f04d49..0000000 --- a/resources/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -"""Configurable resources for use across dagster application.""" - -from .meteomatics import MeteomaticsAPIResource - -__all__ = ["MeteomaticsAPIResource"] diff --git a/resources/meteomatics.py b/resources/meteomatics.py deleted file mode 100644 index fe2ded9..0000000 --- a/resources/meteomatics.py +++ /dev/null @@ -1,58 +0,0 @@ -import datetime as dt - -import dagster as dg -import meteomatics.api -import pandas as pd -from pydantic import PrivateAttr - - - -class 
MeteomaticsAPIResource(dg.ConfigurableResource): - """A resource for interacting with the Meteomatics API.""" - - # Authentication for the API, set via environment - username: str - password: str - - # Subscription limits - _subscription_min_date: dt.datetime = PrivateAttr() - _subscription_max_requestable_parameters = PrivateAttr() - - def setup_for_execution(self, context) -> None: - """Set up the resource according to subscription limits.""" - self._subscription_min_date = dt.datetime(2019, 3, 19, tzinfo=dt.UTC) - self._subscription_max_requestable_parameters = 10 - - def query_api(self, start: dt.datetime, end: dt.datetime, coords: list[tuple[float, float]], params: list[str]) -> pd.DataFrame: - """Query the Meteomatics API for NWP data.""" - - # Ensure subscription limits are respected - # * Split the parameters into groups of max size - groups = [ - params[i : i + self._subscription_max_requestable_parameters] - for i in range(0, len(params), self._subscription_max_requestable_parameters) - ] - - dfs: list[pd.DataFrame] = [] - try: - for param_group in groups: - df: pd.DataFrame = meteomatics.api.query_time_series( - coordinate_list=coords, - startdate=max(start, self._subscription_min_date), - enddate=max(end, self._subscription_min_date), - interval=dt.timedelta(minutes=15), - parameters=param_group, - username=self.username, - password=self.password, - model="ecmwf-ifs", - ) - dfs.append(df) - except Exception as e: - raise dg.Failure( - description=f"Failed to query the Meteomatics API: {e}", - ) from e - - if len(dfs) > 1: - return dfs[0].join(dfs[1:]) - else: - return dfs[0] diff --git a/tests/compile_test.py b/tests/compile_test.py index 454d46b..0c6c68c 100644 --- a/tests/compile_test.py +++ b/tests/compile_test.py @@ -1,17 +1,10 @@ import sys -from local_archives.nwp import all_assets - +from local_archives import all_assets def test_nwp_asset_key_prefixes() -> None: """Test asset keys for all nwp assets have the correct key structure.""" for asset in all_assets: - assert len(asset.key.path) == 4 + # Ensure that the prefix is one of the expected flavours + assert asset.key.path[0] in ["nwp", "sat", "air"] - # Ensure that the prefix is as expected - # The first element should be the flavor: - assert asset.key.path[0] in ["nwp", "sat"] - # The second element should be the provider - assert asset.key.path[1] in ["ecmwf", "metoffice", "eumetsat", "cams", "ceda", "meteomatics", "gfs", "ecmwf-eps"] - # The third element should be the region - assert asset.key.path[2] in ["uk", "eu", "global", "nw_india", "malta", "india", "india-stat"] diff --git a/workspace.yaml b/workspace.yaml new file mode 100644 index 0000000..d284844 --- /dev/null +++ b/workspace.yaml @@ -0,0 +1,7 @@ +load_from: + - python_module: + working_directory: /opt/dagster/app + module_name: local_archives + - python_module: + working_directory: /opt/dagster/app + module_name: cloud_archives
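# --- Illustrative smoke check, not part of the patch above ------------------------
# workspace.yaml points the webserver at two code location modules. A quick local
# check that both import cleanly and expose assets under the expected key prefixes,
# mirroring the assertion in tests/compile_test.py. The getattr fallback is an
# assumption: only local_archives is shown exporting all_assets in this diff.
import importlib

for module_name in ("local_archives", "cloud_archives"):
    module = importlib.import_module(module_name)
    assets = getattr(module, "all_assets", [])
    print(f"{module_name}: {len(assets)} assets loaded")
    for asset in assets:
        # Every asset key should start with one of the expected flavours.
        assert asset.key.path[0] in ("nwp", "sat", "air")
# -----------------------------------------------------------------------------------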