rework ingester code to be easier to read
aperrin66 committed Feb 29, 2024
1 parent 0cbb2d9 commit 8ba6ded
Showing 1 changed file with 117 additions and 64 deletions.
181 changes: 117 additions & 64 deletions geospaas_harvesting/ingesters.py
@@ -3,6 +3,7 @@
"""
import concurrent.futures
import logging
from enum import Enum

from django.contrib.gis.geos import GEOSGeometry

@@ -15,6 +16,13 @@
logging.getLogger(__name__).addHandler(logging.NullHandler())


class OperationStatus(Enum):
NOOP = 0
CREATED = 1
UPDATED = 2
REMOVED = 3


class Ingester():
"""Takes care of ingesting the output of a crawler to the database
"""
@@ -32,80 +40,70 @@ def _uri_exists(uri)
"""Checks if the given URI already exists in the database"""
return DatasetURI.objects.filter(uri=uri).exists()

def _ingest_dataset(self, dataset_info):
"""Writes a dataset to the database based on its attributes and
URL. The input should be a DatasetInfo object.
"""
url = dataset_info.url
normalized_attributes = dataset_info.metadata

created_dataset = created_dataset_uri = updated_dataset = False

try:
dataset_uri = DatasetURI.objects.get(uri=url)
except DatasetURI.DoesNotExist:
dataset_uri = None

if dataset_uri and not self.update:
self.logger.info(
"'%s' is already present in the database", url)
return (url, created_dataset, created_dataset_uri, updated_dataset)

# Extract service information
service = normalized_attributes.pop('geospaas_service', 'UNKNOWN')
service_name = normalized_attributes.pop('geospaas_service_name', 'UNKNOWN')

@staticmethod
def _prepare_dataset_attributes(normalized_attributes):
"""Prepares the attributes needed to instantiate a Dataset"""
# Create the objects with which the dataset has relationships
# (or get them if they already exist)
data_center, _ = DataCenter.objects.get_or_create(
normalized_attributes.pop('provider'))
normalized_attributes.get('provider'))

location_geometry = normalized_attributes.pop('location_geometry')
location_geometry = normalized_attributes.get('location_geometry')
geographic_location, _ = GeographicLocation.objects.get_or_create(
geometry=GEOSGeometry(location_geometry))

location, _ = Location.objects.get_or_create(normalized_attributes.pop('gcmd_location'))
location, _ = Location.objects.get_or_create(normalized_attributes.get('gcmd_location'))

iso_topic_category, _ = ISOTopicCategory.objects.get_or_create(
normalized_attributes.pop('iso_topic_category'))
normalized_attributes.get('iso_topic_category'))

platform, _ = Platform.objects.get_or_create(normalized_attributes.pop('platform'))
platform, _ = Platform.objects.get_or_create(normalized_attributes.get('platform'))

instrument, _ = Instrument.objects.get_or_create(
normalized_attributes.pop('instrument'))
normalized_attributes.get('instrument'))

source, _ = Source.objects.get_or_create(
platform=platform,
instrument=instrument,
specs=normalized_attributes.pop('specs', ''))
dataset_parameters_list = normalized_attributes.pop('dataset_parameters')
# Create Dataset in the database. The normalized_attributes dict contains the
# "basic parameter", which are not objects in the database.
# The objects we just created in the database are passed separately.
specs=normalized_attributes.get('specs', ''))
dataset_parameters_list = normalized_attributes.get('dataset_parameters')

attributes = {
'entry_title': normalized_attributes['entry_title'],
'entry_id': normalized_attributes['entry_id'],
'summary': normalized_attributes['summary'],
'time_coverage_start': normalized_attributes['time_coverage_start'],
'time_coverage_end': normalized_attributes['time_coverage_end'],
'data_center': data_center,
'geographic_location': geographic_location,
'gcmd_location': location,
'ISO_topic_category': iso_topic_category,
'source': source,
**normalized_attributes,
}
# TODO: improve, in particular detect when changes are made
if dataset_uri:
queryset = Dataset.objects.filter(id=dataset_uri.dataset.id)
queryset.update(**attributes)
dataset = queryset.first()
updated_dataset = True
else:
dataset, created_dataset = Dataset.objects.get_or_create(**attributes)
# Create the URI for the created Dataset in the database
_, created_dataset_uri = DatasetURI.objects.get_or_create(
name=service_name,
service=service,
uri=url,
dataset=dataset)

for dataset_parameter_info in dataset_parameters_list:
return attributes, dataset_parameters_list

@classmethod
def _create_dataset(cls, normalized_attributes):
"""Create a Dataset object in the database"""
dataset_attributes, dataset_parameters_list = (
cls._prepare_dataset_attributes(normalized_attributes))
dataset = Dataset.objects.create(**dataset_attributes)
cls._add_dataset_parameters(dataset, dataset_parameters_list)
return (dataset, OperationStatus.CREATED)

@classmethod
def _update_dataset(cls, dataset, normalized_attributes):
"""Update an existing Dataset object in the database"""
dataset_attributes, dataset_parameters_list = (
cls._prepare_dataset_attributes(normalized_attributes))
Dataset.objects.filter(id=dataset.id).update(**dataset_attributes)
cls._add_dataset_parameters(dataset, dataset_parameters_list)
return OperationStatus.UPDATED

@staticmethod
def _add_dataset_parameters(dataset, parameters_list):
"""Add the given parameters to the dataset"""
for dataset_parameter_info in parameters_list:
standard_name = dataset_parameter_info.get('standard_name', None)
short_name = dataset_parameter_info.get('short_name', None)
units = dataset_parameter_info.get('units', None)
@@ -119,13 +117,60 @@ def _ingest_dataset(self, dataset_info):
if params.count() >= 1:
dataset.parameters.add(params[0])

return (url, created_dataset, created_dataset_uri, updated_dataset)
@staticmethod
def _prepare_dataset_uri_attributes(dataset, uri, normalized_attributes):
"""Prepares the attributes needed to instantiate a DatasetURI
"""
# Extract service information
service = normalized_attributes.get('geospaas_service', 'UNKNOWN')
service_name = normalized_attributes.get('geospaas_service_name', 'UNKNOWN')
return {
'dataset': dataset,
'uri': uri,
'name': service_name,
'service': service
}

def _create_dataset_uri(self, dataset, uri, normalized_attributes):
"""Create a DatasetURI object in the database"""
uri_attributes = self._prepare_dataset_uri_attributes(
dataset, uri, normalized_attributes)
DatasetURI.objects.create(**uri_attributes)
return OperationStatus.CREATED

def _ingest_dataset(self, dataset_info):
"""Writes a dataset to the database based on its attributes and
URL. The input should be a DatasetInfo object.
"""
url = dataset_info.url
normalized_attributes = dataset_info.metadata

dataset_status = dataset_uri_status = OperationStatus.NOOP

try:
dataset_uri = DatasetURI.objects.get(uri=url)
except DatasetURI.DoesNotExist:
dataset_uri = None

try:
dataset = Dataset.objects.get(entry_id=normalized_attributes['entry_id'])
except Dataset.DoesNotExist:
dataset = None

if dataset is None:
dataset, dataset_status = self._create_dataset(normalized_attributes)
else:
if self.update:
dataset_status = self._update_dataset(dataset, normalized_attributes)

if dataset_uri is None:
dataset_uri_status = self._create_dataset_uri(dataset, url, normalized_attributes)

return (url, dataset.entry_id, dataset_status, dataset_uri_status)

def ingest(self, datasets_to_ingest):
"""Iterates over a crawler and writes the datasets to the
database. Database access can be parallelized, although it is
usually not necessary. The bottleneck when harvesting is
generally the crawling or metadata normalization step.
database.
If a KeyboardInterrupt exception occurs (which might mean that
a SIGINT or SIGTERM was received by the process), all scheduled
threads are cancelled. We wait for the currently running
@@ -140,19 +185,27 @@ def ingest(self, datasets_to_ingest):
dataset_info))
for future in concurrent.futures.as_completed(futures):
try:
url, created_dataset, created_dataset_uri, updated_dataset = future.result()
if created_dataset:
self.logger.info("Successfully created dataset from url: '%s'", url)
if not created_dataset_uri:
url, dataset_entry_id, dataset_status, dataset_uri_status = future.result()
if dataset_status == OperationStatus.CREATED:
self.logger.info("Successfully created dataset '%s' from url: '%s'",
dataset_entry_id, url)
if dataset_uri_status == OperationStatus.NOOP:
# This should only happen if a database problem
# occurred in _ingest_dataset(), because the
# presence of the URI in the database is checked
# before attempting to ingest.
self.logger.warning("The Dataset URI '%s' was not created.", url)
elif created_dataset_uri:
self.logger.info("Dataset URI '%s' added to existing dataset", url)
elif updated_dataset:
self.logger.info("Sucessfully updated dataset for url: '%s'", url)
self.logger.error("The Dataset URI '%s' was not created.", url)
elif dataset_status == OperationStatus.UPDATED:
self.logger.info("Sucessfully updated dataset '%s' from url: '%s'",
dataset_entry_id, url)
elif dataset_status == OperationStatus.NOOP:
if dataset_uri_status == OperationStatus.CREATED:
self.logger.info("Dataset URI '%s' added to existing dataset '%s'",
url, dataset_entry_id)
elif dataset_uri_status == OperationStatus.NOOP:
self.logger.info("Dataset '%s' with URI '%s' already exists",
dataset_entry_id, url)

except Exception as error: # pylint: disable=broad-except
self.logger.error("Error during ingestion: %s", str(error), exc_info=True)
finally:
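
For reference, the create-or-update logic that this commit factors out of _ingest_dataset() can be summarised outside of Django as follows. This is a minimal standalone sketch, not part of the commit: the dictionaries stand in for the Dataset and DatasetURI tables, and ingest_one() is a hypothetical name for the combined lookup/create/update step that returns the same kind of status tuple as the reworked _ingest_dataset().

from enum import Enum


class OperationStatus(Enum):
    """Same values as the enum added in this commit"""
    NOOP = 0
    CREATED = 1
    UPDATED = 2
    REMOVED = 3


datasets = {}      # entry_id -> attributes (stand-in for the Dataset table)
dataset_uris = {}  # uri -> entry_id (stand-in for the DatasetURI table)


def ingest_one(url, attributes, update=False):
    """Hypothetical equivalent of the reworked _ingest_dataset():
    create or update a dataset keyed by entry_id, then attach the URI,
    and report what happened through OperationStatus values.
    """
    entry_id = attributes['entry_id']
    dataset_status = dataset_uri_status = OperationStatus.NOOP

    if entry_id not in datasets:
        datasets[entry_id] = dict(attributes)
        dataset_status = OperationStatus.CREATED
    elif update:
        datasets[entry_id].update(attributes)
        dataset_status = OperationStatus.UPDATED

    if url not in dataset_uris:
        dataset_uris[url] = entry_id
        dataset_uri_status = OperationStatus.CREATED

    return (url, entry_id, dataset_status, dataset_uri_status)


print(ingest_one('https://example.com/ds1', {'entry_id': 'ds1'}))
print(ingest_one('https://example.com/ds1', {'entry_id': 'ds1'}))  # NOOP, NOOP the second time

As in the reworked method, an existing dataset is only modified when the update flag is set, and the URI is attached independently of whether the dataset itself was created, updated or left untouched.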

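The ingest() docstring above describes the shutdown behaviour: if a KeyboardInterrupt occurs, scheduled threads are cancelled and the currently running ones are waited for. The diff is cut off before the corresponding finally block, so the snippet below is only a generic sketch of that pattern with concurrent.futures, under the assumption that it matches the behaviour the docstring describes; it is not the repository's actual implementation.

import concurrent.futures
import time


def work(item):
    """Placeholder for the per-dataset ingestion step"""
    time.sleep(0.1)
    return item


items = range(100)
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(work, item) for item in items]
    try:
        for future in concurrent.futures.as_completed(futures):
            print(future.result())
    except KeyboardInterrupt:
        # Cancel everything that has not started yet; futures that are
        # already running cannot be cancelled and will finish normally.
        for future in futures:
            future.cancel()
        # Re-raising inside the with block lets the executor's __exit__
        # wait for the running threads before the process exits.
        raise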