rework backoff, add progress bar
jcla490 committed Mar 14, 2023
1 parent 25c75c5 commit 9002bc9
Showing 4 changed files with 116 additions and 64 deletions.
27 changes: 24 additions & 3 deletions poetry.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "landfire"
-version = "0.4.0"
+version = "0.4.1"
 description = "Landfire"
 authors = ["FireSci <support@firesci.io>"]
 license = "MIT"
@@ -25,6 +25,7 @@ requests = ">=2.28.0"
 geojson = { version = ">=3.0.0", optional = true }
 geopandas = { version = ">=0.12.0", optional = true }
 fiona = { version = ">=1.9.0", optional = true }
+tqdm = "^4.65.0"
 
 [tool.poetry.dev-dependencies]
 Pygments = ">=2.10.0"
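The only new runtime dependency is `tqdm`, which drives the progress bar added to `src/landfire/__init__.py` below. As a minimal sketch of the three tqdm features the commit relies on (a manually driven bar via `total` and `update`, `write` for printing above the bar, and the `disable` flag for silencing it), with illustrative stage names and timings:

```python
import time

from tqdm import tqdm

# total=100 lets each pipeline stage advance the bar by 25;
# disable=True would turn every bar call into a cheap no-op.
pbar = tqdm(total=100, desc="Job Status", disable=False)

for stage in ("submitted", "result URL fetched", "downloaded", "written"):
    time.sleep(0.5)  # stand-in for real work
    pbar.update(25)  # advance one stage
    pbar.write(f"Job {stage}...")  # prints above the bar without breaking its redraw

pbar.close()  # manually created bars should always be closed
```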
120 changes: 75 additions & 45 deletions src/landfire/__init__.py
@@ -1,27 +1,23 @@
 """Landfire data accessor."""
-import logging
 import time
 from pathlib import Path
 from typing import Any, Dict, List, Optional
 
 import requests
 from attrs import AttrsInstance, define, field, validators
 from requests import Response
+from tqdm import tqdm
 
 from landfire.product.search import ProductSearch
 
 
 __version__ = "0.4.0"
 __all__ = ["landfire"]
 
 # URLs for making requests to LANDFIRE ArcGIS Rest Service
 BASE_URL = "https://lfps.usgs.gov/arcgis/rest/services/LandfireProductService/GPServer/LandfireProductService"
 REQUEST_URL = BASE_URL + "/submitJob?"
 JOB_URL = BASE_URL + "/jobs/"
 
-logger = logging.getLogger(__name__)
-logger.setLevel(logging.INFO)
 
 
 @define
 class Landfire:
@@ -58,27 +54,30 @@ def __attrs_post_init__(self) -> None:
             "f": "JSON",
         }
 
-        # api will fail if 30 is provided to Resample_Resolution. Prefer to handle it here instead of confusing the user to provide None when they want 30 m.
+        # API will fail if 30 is provided to Resample_Resolution. Handle it here rather than making the user pass None when they want 30 m.
         if self.resample_res != 30:
             self._base_params["Resample_Resolution"] = self.resample_res
 
     @resample_res.validator
-    def __resample_range_check(self, attribute: AttrsInstance, value: int) -> None:
+    def _resample_range_check(self, attribute: AttrsInstance, value: int) -> None:
         """Ensure resampling resolution is within allowable range."""
         if not 30 <= value <= 9999:
             raise ValueError("resample_res must be between 30 and 9999 meters.")
 
-    def __log_status(self, msg: str, show_status: bool = True) -> None:
-        """Show processing status as log INFO.
+    def _write_status(
+        self, msg: str, progress_bar: tqdm, show_status: bool = True
+    ) -> None:
+        """Write progress bar status.
 
         Args:
-            msg: Message to log.
+            msg: Message to write.
+            progress_bar: tqdm progress bar instance.
             show_status: Whether to log message.
         """
         if show_status:
-            logger.info(msg)
+            progress_bar.write(msg)
 
-    def __validate_layers(self, layers: List[str]) -> None:
+    def _validate_layers(self, layers: List[str]) -> None:
         """Validate user provided layers are available for download.
 
         Args:
@@ -95,7 +94,7 @@ def __validate_layers(self, layers: List[str]) -> None:
                 "Specified layers do not match available layers from the LANDFIRE API. Please check your layer list and try again!"
             )
 
-    def __validate_user_output_path(self, output_path: str) -> Path:
+    def _validate_user_output_path(self, output_path: str) -> Path:
        """Validate user provided output_path is valid.
 
         Args:
@@ -116,7 +115,7 @@ def __validate_user_output_path(self, output_path: str) -> Path:
             )
         return path_obj
 
-    def __write_resp_to_file(self, response: Response, final_path: Path) -> None:
+    def _write_resp_to_file(self, response: Response, final_path: Path) -> None:
         """Write final .zip output to user provided path.
 
         Args:
@@ -129,7 +128,7 @@ def __write_resp_to_file(self, response: Response, final_path: Path) -> None:
            for chunk in response.iter_content(chunk_size=1024 * 1024):
                fd.write(chunk)
 
-    def __submit_request(
+    def _submit_request(
         self,
         url: str,
         params: Optional[Dict[str, Any]] = None,
@@ -145,7 +144,7 @@ def __submit_request(
         Returns:
             Response object.
         """
-        submit_req = requests.get(url=url, params=params, stream=stream, timeout=300)
+        submit_req = requests.get(url=url, params=params, stream=stream, timeout=600)
         submit_req.raise_for_status()
         return submit_req

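Worth noting how the two request modes above combine: metadata calls use `stream=False` and are decoded as JSON, while the final archive is fetched with `stream=True` and written to disk in 1 MiB chunks by `_write_resp_to_file`. A self-contained sketch of that streamed-download pattern, assuming only `requests`; the URL and output path are placeholders:

```python
from pathlib import Path

import requests


def download_zip(url: str, final_path: Path) -> None:
    """Stream a (possibly large) .zip to disk without loading it into memory."""
    # stream=True defers the body; the long timeout mirrors the 600 s used above
    resp = requests.get(url, stream=True, timeout=600)
    resp.raise_for_status()  # turn HTTP errors into exceptions
    with open(final_path, "wb") as fd:
        # 1 MiB chunks keep memory usage flat regardless of archive size
        for chunk in resp.iter_content(chunk_size=1024 * 1024):
            fd.write(chunk)


# download_zip("https://example.com/output.zip", Path("output.zip"))  # hypothetical
```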
@@ -154,52 +153,67 @@ def request_data(
         layers: List[str],
         output_path: str,
         show_status: bool = True,
-        backoff_base_value: int = 2,
+        backoff_base_value: int = 5,
     ) -> None:
-        """Request particular layers from Landfire to be output as a zipped .tif. NOTE: this function has no return, data will simply be downloaded to the specified `output_path`.
+        """Request particular layers from Landfire to be output as a zipped .tif.
+
+        NOTE: this function has no return; data will simply be downloaded to the specified `output_path`.
+
+        NOTE: this function implements a linear backoff strategy, polling for job status every 5 seconds by default. Depending on the size of your job, it may take several seconds or minutes to process. You may change this with `backoff_base_value`.
 
         Args:
             layers: List of product layers.
             output_path: Path-like string where data will be downloaded to. Include 'empty' file name and .zip extension. For example, `~/tmp/my_landfire_data/output.zip`.
-            show_status: Boolean whether to log data request status.
-            backoff_base_value: Base time in seconds for linear backoff strategy. This is used to periodically query the job API for status while avoiding making too many requests.
+            show_status: Whether to write (True) or suppress (False) progress bar and status update output for the data request.
+            backoff_base_value: Base time in seconds for the linear backoff strategy. This is used to query the job API periodically for status while avoiding making too many requests. Please be courteous with this parameter as it will directly affect the number of calls to the LANDFIRE API.
 
         Raises:
             RuntimeError: If provided layers are not valid, if output_path does not exist, or if an unexpected error occurs when processing requested data.
         """
         # User input validation
-        self.__validate_layers(layers)
-        final_path: Path = self.__validate_user_output_path(output_path)
+        self._validate_layers(layers)
+        final_path: Path = self._validate_user_output_path(output_path)
 
         # Add layer list to base_params
         self._base_params["Layer_List"] = ";".join(layers)
 
+        # Init progress
+        if show_status:
+            pbar = tqdm(total=100, desc="Job Status")
+        else:
+            pbar = tqdm(total=100, desc="Job Status", disable=True)
+
         # Submit initial request for layers
-        submit_job_req = self.__submit_request(
+        self._write_status("Submitting job...", pbar, show_status)
+        submit_job_req = self._submit_request(
             REQUEST_URL, params=self._base_params, stream=False
         ).json()
 
         # Get job id, check status of processing with backoff
         if "jobId" in submit_job_req:
             job_id = submit_job_req["jobId"]
             status = submit_job_req["jobStatus"]
-            self.__log_status("Job submitted! Processing request.", show_status)
+
+            pbar.update(25)
+            self._write_status(
+                "Job submitted! Processing layers... ⏱️", pbar, show_status
+            )
 
             job_url = JOB_URL + job_id
             n = 0
             while status == "esriJobSubmitted" or status == "esriJobExecuting":
                 # Backoff logic
                 n += 1
                 backoff_sec = backoff_base_value * n
-                # Don't wait on first try
-                if n != 1:
-                    self.__log_status(
-                        f"Checking status of job in {backoff_sec} seconds...",
-                        show_status,
-                    )
-                    time.sleep(backoff_sec)
+                self._write_status(
+                    f"Checking status of job again in {backoff_sec} seconds... 💤",
+                    pbar,
+                    show_status,
+                )
+                time.sleep(backoff_sec)
 
                 # Get job status
-                status_job_req = self.__submit_request(
+                status_job_req = self._submit_request(
                     url=job_url, params={"f": "json"}, stream=False
                 ).json()

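The reworked loop above now sleeps before every poll, waiting `backoff_base_value * n` seconds before poll `n`, so the intervals grow linearly (5 s, 10 s, 15 s, ... with the new default of 5) and the total wait before the k-th poll is `base * k * (k + 1) / 2` seconds. A stripped-down sketch of the same strategy; the `get_status` callable is a hypothetical stand-in for the job-status request:

```python
import time
from typing import Callable


def poll_with_linear_backoff(get_status: Callable[[], str], base: int = 5) -> str:
    """Poll a job, sleeping base * n seconds before attempt n (5 s, 10 s, 15 s, ...)."""
    status = get_status()  # initial status comes back with job submission
    n = 0
    while status in ("esriJobSubmitted", "esriJobExecuting"):
        n += 1
        time.sleep(base * n)  # linear: the interval grows by `base` each round
        status = get_status()
    return status
```

Unlike exponential backoff, the interval grows by a constant step each round, which keeps polling responsive for small jobs while the number of API calls for a long job grows only on the order of the square root of its runtime.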
@@ -217,40 +231,56 @@ def request_data(
             if status == "esriJobSucceeded":
                 data_path = status_job_req["results"]["Output_File"]["paramUrl"]
                 results_url = job_url + "/" + data_path
 
+                pbar.update(25)
+                self._write_status(
+                    "Job complete! Getting path to .zip file... 🙏",
+                    pbar,
+                    show_status,
+                )
+
                 # Get zip file url
-                data_job_req = self.__submit_request(
+                data_job_req = self._submit_request(
                     results_url, params={"f": "json"}, stream=False
                 ).json()
 
                 zip_url = data_job_req["value"]["url"]
-                self.__log_status(
-                    f"Downloading data as .zip to {output_path}",
+
+                pbar.update(25)
+                self._write_status(
+                    "Downloading data as .zip file... 🤞",
+                    pbar,
                     show_status,
                 )
 
                 # Last request to get the zip file
-                zip_job_req = self.__submit_request(zip_url, stream=True)
-
-                self.__write_resp_to_file(zip_job_req, final_path)
-
-                self.__log_status(
-                    f"Data written successfully to {output_path}",
+                zip_job_req = self._submit_request(zip_url, stream=True)
+                # Write data to user path
+                self._write_resp_to_file(zip_job_req, final_path)
+
+                pbar.update(25)
+                self._write_status(
+                    f"Data written successfully to {output_path}! 🎉",
+                    pbar,
                    show_status,
                 )
+                pbar.close()
 
             # Still executing, display most recent processing step
             elif status in (
                 "esriJobExecuting",
                 "esriJobSubmitted",
                 "esriJobWaiting",
             ):
-                self.__log_status(
-                    f"Still processing! Most recent message is `{latest_status_msg}`",
+                self._write_status(
+                    f"Most recent message is `{latest_status_msg}`",
+                    pbar,
                     show_status,
                 )
 
             # Fail if processing error
             else:
                 raise RuntimeError(
-                    f"Encountered an error during job processing! Status was {status} and message was {latest_status_msg}."
+                    f"Encountered an error during job processing! Status was `{status}` and message was `{latest_status_msg}`."
                 )
         # Fail if no job status
         else:
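Put together, a request against the reworked surface might look like the sketch below; the bounding box, layer name, and constructor details are illustrative placeholders, not verified values:

```python
from landfire import Landfire

# Hypothetical area of interest and resolution; substitute your own.
lf = Landfire(bbox="-107.70894 46.56799 -106.02718 47.34869", resample_res=30)

lf.request_data(
    layers=["ELEV2020"],         # hypothetical product layer
    output_path="./output.zip",  # must point at a .zip in an existing directory
    show_status=True,            # render the new tqdm progress bar and messages
    backoff_base_value=5,        # new default; raise it to poll the API less often
)
```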