-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnoaa-gfs-global.py
76 lines (67 loc) · 2.68 KB
/
noaa-gfs-global.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
"""Zarr archive of NWP data from NCEP's GFS model.
The National Centers for Environmental Prediction (NCEP) runs the
deterministic Global Forecast System (GFS) model
(https://www.ncei.noaa.gov/products/weather-climate-models/global-forecast).
Sourced via S3 from NOAA (https://noaa-gfs-bdp-pds.s3.amazonaws.com/index.html).
This asset is updated monthly, and surfaced as a Zarr Directory Store for each month.
It is downloaded using the nwp-consumer docker image
(https://github.com/openclimatefix/nwp-consumer).
"""
import os
from typing import TYPE_CHECKING
import dagster as dg
from dagster_docker import PipesDockerClient
if TYPE_CHECKING:
import datetime as dt
# Host directory that holds the archive. The path is chosen once, at import
# time, from the ENVIRONMENT variable: the "leo" deployment uses its own
# location, and every other value (including unset, which reads as "local")
# falls back to the default dagster storage path.
ARCHIVE_FOLDER = (
    "/mnt/storage_ssd_4tb/nwp/ncep-gfs-global"
    if os.getenv("ENVIRONMENT", "local") == "leo"
    else "/var/dagster-storage/nwp/ncep-gfs-global"
)
# One partition per calendar month, beginning with January 2021.
# NOTE(review): per the Dagster docs a negative end_offset shortens the
# partition set at its newest end, so the latest month is held back by one
# partition -- confirm this is the intended lag.
partitions_def: dg.TimeWindowPartitionsDefinition = dg.MonthlyPartitionsDefinition(
    end_offset=-1,
    start_date="2021-01-01",
)
@dg.asset(
    name="ncep-gfs-global",
    description=__doc__,
    # The asset body reads ``context.partition_time_window``, which is only
    # available on partitioned runs, so the monthly partitioning has to be
    # attached to the asset itself and not just used to derive the cron string.
    partitions_def=partitions_def,
    metadata={
        "archive_folder": dg.MetadataValue.text(ARCHIVE_FOLDER),
        "area": dg.MetadataValue.text("global"),
        "source": dg.MetadataValue.text("noaa-s3"),
        "model": dg.MetadataValue.text("ncep-gfs"),
        "expected_runtime": dg.MetadataValue.text("6 hours"),
    },
    compute_kind="docker",
    automation_condition=dg.AutomationCondition.on_cron(
        # NOTE(review): the partitioning is monthly, yet the offset given here
        # is ``day_of_week``; confirm whether ``day_of_month`` was intended.
        cron_schedule=partitions_def.get_cron_schedule(
            hour_of_day=21,
            day_of_week=1,
        ),
    ),
    tags={
        "dagster/max_runtime": str(60 * 60 * 10),  # Should take 6 ish hours
        "dagster/priority": "1",
        "dagster/concurrency_key": "nwp-consumer",
    },
)
def ncep_gfs_global_asset(
    context: dg.AssetExecutionContext,
    pipes_docker_client: PipesDockerClient,
) -> dg.MaterializeResult:
    """Archive one month of NCEP GFS global data via the nwp-consumer image.

    Runs the consumer's ``archive`` command for the year and month in which
    the current partition's time window starts, with ``ARCHIVE_FOLDER``
    mounted at ``/work`` inside the container.

    Args:
        context: Execution context; supplies the partition time window and is
            handed to the pipes client.
        pipes_docker_client: Pipes client used to launch the container.

    Returns:
        The materialize result obtained from the completed pipes invocation.
    """
    # ``dt`` is imported for type checking only; this local annotation is
    # never evaluated at runtime.
    it: dt.datetime = context.partition_time_window.start
    return pipes_docker_client.run(
        image="ghcr.io/openclimatefix/nwp-consumer:1.0.12",
        command=["archive", "-y", str(it.year), "-m", str(it.month)],
        env={
            "MODEL_REPOSITORY": "gfs",
            "NOTIFICATION_REPOSITORY": "dagster-pipes",
            "CONCURRENCY": "true",
        },
        # See https://docker-py.readthedocs.io/en/stable/containers.html#docker.models.containers.ContainerCollection.run
        container_kwargs={
            "volumes": [f"{ARCHIVE_FOLDER}:/work"],
            "mem_limit": "8g",
            # docker-py documents nano_cpus as an int in units of 1e-9 CPUs;
            # the float literal 4e9 is not an int, so spell out 4 CPUs exactly.
            "nano_cpus": 4_000_000_000,
        },
        context=context,
    ).get_materialize_result()