8
8
#
9
9
# =================================================================
10
10
11
- import logging
12
11
from dagster import op , get_dagster_logger
13
12
from typing import List , Dict , Any
14
13
import requests
@@ -29,13 +28,14 @@ def upsert_collection_item(collection: str, item: Dict[str, Any]) -> bool:
29
28
response = requests .post (api_url , json = item )
30
29
return response .status_code == 201 # Return True if creation is successful
31
30
31
+
32
32
@op
33
33
def transform_stations (stations : List [Dict [str , Any ]]) -> StationsData :
34
34
"""
35
35
Transform raw station data into structured format.
36
36
"""
37
37
transformed_stations = []
38
-
38
+
39
39
for row in stations :
40
40
station = Station (
41
41
station_id = row ['MonitoringLocationIdentifier' ],
@@ -50,27 +50,28 @@ def transform_stations(stations: List[Dict[str, Any]]) -> StationsData:
50
50
datastreams = []
51
51
)
52
52
transformed_stations .append (station )
53
-
53
+
54
54
return StationsData (stations = transformed_stations )
55
55
56
+
56
57
@op
57
58
def publish_station_collection (stations_data : StationsData ) -> None :
58
59
"""
59
60
Publishes station collection to API config and backend.
60
-
61
+
61
62
Takes in the transformed station data and sends it to the SensorThings API.
62
-
63
+
63
64
:param stations_data: The transformed station data.
64
65
:returns: `None`
65
66
"""
66
67
# Iterate over the transformed stations data
67
68
for station in stations_data .stations :
68
69
station_identifier = station .station_id
69
-
70
+
70
71
try :
71
72
datastreams = load_datastreams (station_identifier )
72
73
except Exception :
73
- LOGGER .warning (f"Failed to load datastreams for { station_identifier } " )
74
+ LOGGER .warning (f"Failed to load datastreams for { station_identifier } " ) # noqa
74
75
continue
75
76
76
77
try :
@@ -100,9 +101,9 @@ def publish_station_collection(stations_data: StationsData) -> None:
100
101
}],
101
102
'properties' : {
102
103
'mainstem' : mainstem ,
103
- 'hu08' : url_join (GEOCONNEX_URL , 'ref/hu08' , station .huc_code ),
104
- 'state' : url_join (GEOCONNEX_URL , 'ref/states' , station .state_code ),
105
- 'county' : url_join (GEOCONNEX_URL , 'ref/counties' , f"{ station .state_code } { station .county_code } " ),
104
+ 'hu08' : url_join (GEOCONNEX_URL , 'ref/hu08' , station .huc_code ), # noqa
105
+ 'state' : url_join (GEOCONNEX_URL , 'ref/states' , station .state_code ), # noqa
106
+ 'county' : url_join (GEOCONNEX_URL , 'ref/counties' , f"{ station .state_code } { station .county_code } " ), # noqa
106
107
'provider' : station .provider
107
108
},
108
109
'Datastreams' : list (datastreams )
@@ -118,6 +119,7 @@ def publish_station_collection(stations_data: StationsData) -> None:
118
119
119
120
return
120
121
122
+
121
123
def get_mainstem_uri (id ):
122
124
# Convert the input geom to GeoJSON using Shapely
123
125
0 commit comments