diff --git a/geospaas_harvesting/ingesters.py b/geospaas_harvesting/ingesters.py
index 25b1df6..773a896 100644
--- a/geospaas_harvesting/ingesters.py
+++ b/geospaas_harvesting/ingesters.py
@@ -3,6 +3,7 @@
 """
 import concurrent.futures
 import logging
+from enum import Enum
 
 from django.contrib.gis.geos import GEOSGeometry
 
@@ -15,6 +16,13 @@
 logging.getLogger(__name__).addHandler(logging.NullHandler())
 
 
+class OperationStatus(Enum):
+    NOOP = 0
+    CREATED = 1
+    UPDATED = 2
+    REMOVED = 3
+
+
 class Ingester():
     """Takes care of ingesting the output of a crawler to the database
     """
@@ -32,80 +40,70 @@ def _uri_exists(uri):
         """Checks if the given URI already exists in the database"""
         return DatasetURI.objects.filter(uri=uri).exists()
 
-    def _ingest_dataset(self, dataset_info):
-        """Writes a dataset to the database based on its attributes and
-        URL. The input should be a DatasetInfo object.
-        """
-        url = dataset_info.url
-        normalized_attributes = dataset_info.metadata
-
-        created_dataset = created_dataset_uri = updated_dataset = False
-
-        try:
-            dataset_uri = DatasetURI.objects.get(uri=url)
-        except DatasetURI.DoesNotExist:
-            dataset_uri = None
-
-        if dataset_uri and not self.update:
-            self.logger.info(
-                "'%s' is already present in the database", url)
-            return (url, created_dataset, created_dataset_uri, updated_dataset)
-
-        # Extract service information
-        service = normalized_attributes.pop('geospaas_service', 'UNKNOWN')
-        service_name = normalized_attributes.pop('geospaas_service_name', 'UNKNOWN')
-
+    @staticmethod
+    def _prepare_dataset_attributes(normalized_attributes):
+        """Prepares the attributes needed to instantiate a Dataset"""
         # Create the objects with which the dataset has relationships
         # (or get them if they already exist)
         data_center, _ = DataCenter.objects.get_or_create(
-            normalized_attributes.pop('provider'))
+            normalized_attributes.get('provider'))
 
-        location_geometry = normalized_attributes.pop('location_geometry')
+        location_geometry = normalized_attributes.get('location_geometry')
 
         geographic_location, _ = GeographicLocation.objects.get_or_create(
             geometry=GEOSGeometry(location_geometry))
 
-        location, _ = Location.objects.get_or_create(normalized_attributes.pop('gcmd_location'))
+        location, _ = Location.objects.get_or_create(normalized_attributes.get('gcmd_location'))
 
         iso_topic_category, _ = ISOTopicCategory.objects.get_or_create(
-            normalized_attributes.pop('iso_topic_category'))
+            normalized_attributes.get('iso_topic_category'))
 
-        platform, _ = Platform.objects.get_or_create(normalized_attributes.pop('platform'))
+        platform, _ = Platform.objects.get_or_create(normalized_attributes.get('platform'))
         instrument, _ = Instrument.objects.get_or_create(
-            normalized_attributes.pop('instrument'))
+            normalized_attributes.get('instrument'))
 
         source, _ = Source.objects.get_or_create(
             platform=platform,
             instrument=instrument,
-            specs=normalized_attributes.pop('specs', ''))
-        dataset_parameters_list = normalized_attributes.pop('dataset_parameters')
-        # Create Dataset in the database. The normalized_attributes dict contains the
-        # "basic parameter", which are not objects in the database.
-        # The objects we just created in the database are passed separately.
+            specs=normalized_attributes.get('specs', ''))
+        dataset_parameters_list = normalized_attributes.get('dataset_parameters')
+        attributes = {
+            'entry_title': normalized_attributes['entry_title'],
+            'entry_id': normalized_attributes['entry_id'],
+            'summary': normalized_attributes['summary'],
+            'time_coverage_start': normalized_attributes['time_coverage_start'],
+            'time_coverage_end': normalized_attributes['time_coverage_end'],
             'data_center': data_center,
             'geographic_location': geographic_location,
             'gcmd_location': location,
             'ISO_topic_category': iso_topic_category,
             'source': source,
-            **normalized_attributes,
         }
-        # TODO: improve, in particular detect when changes are made
-        if dataset_uri:
-            queryset = Dataset.objects.filter(id=dataset_uri.dataset.id)
-            queryset.update(**attributes)
-            dataset = queryset.first()
-            updated_dataset = True
-        else:
-            dataset, created_dataset = Dataset.objects.get_or_create(**attributes)
-        # Create the URI for the created Dataset in the database
-        _, created_dataset_uri = DatasetURI.objects.get_or_create(
-            name=service_name,
-            service=service,
-            uri=url,
-            dataset=dataset)
-
-        for dataset_parameter_info in dataset_parameters_list:
+        return attributes, dataset_parameters_list
+
+    @classmethod
+    def _create_dataset(cls, normalized_attributes):
+        """Create a Dataset object in the database"""
+        dataset_attributes, dataset_parameters_list = (
+            cls._prepare_dataset_attributes(normalized_attributes))
+        dataset = Dataset.objects.create(**dataset_attributes)
+        cls._add_dataset_parameters(dataset, dataset_parameters_list)
+        return (dataset, OperationStatus.CREATED)
+
+    @classmethod
+    def _update_dataset(cls, dataset, normalized_attributes):
+        """Update an existing Dataset object in the database"""
+        dataset_attributes, dataset_parameters_list = (
+            cls._prepare_dataset_attributes(normalized_attributes))
+        Dataset.objects.filter(id=dataset.id).update(**dataset_attributes)
+        cls._add_dataset_parameters(dataset, dataset_parameters_list)
+        return OperationStatus.UPDATED
+
+    @staticmethod
+    def _add_dataset_parameters(dataset, parameters_list):
+        """Adds the given parameters to a dataset"""
+        for dataset_parameter_info in parameters_list:
             standard_name = dataset_parameter_info.get('standard_name', None)
             short_name = dataset_parameter_info.get('short_name', None)
             units = dataset_parameter_info.get('units', None)
@@ -119,13 +117,60 @@ def _ingest_dataset(self, dataset_info):
             if params.count() >= 1:
                 dataset.parameters.add(params[0])
 
-        return (url, created_dataset, created_dataset_uri, updated_dataset)
+    @staticmethod
+    def _prepare_dataset_uri_attributes(dataset, uri, normalized_attributes):
+        """Prepares the attributes needed to instantiate a DatasetURI
+        """
+        # Extract service information
+        service = normalized_attributes.get('geospaas_service', 'UNKNOWN')
+        service_name = normalized_attributes.get('geospaas_service_name', 'UNKNOWN')
+        return {
+            'dataset': dataset,
+            'uri': uri,
+            'name': service_name,
+            'service': service
+        }
+
+    def _create_dataset_uri(self, dataset, uri, normalized_attributes):
+        """Create a DatasetURI object in the database"""
+        uri_attributes = self._prepare_dataset_uri_attributes(
+            dataset, uri, normalized_attributes)
+        DatasetURI.objects.create(**uri_attributes)
+        return OperationStatus.CREATED
+
+    def _ingest_dataset(self, dataset_info):
+        """Writes a dataset to the database based on its attributes and
+        URL. The input should be a DatasetInfo object.
+        """
+        url = dataset_info.url
+        normalized_attributes = dataset_info.metadata
+
+        dataset_status = dataset_uri_status = OperationStatus.NOOP
+
+        try:
+            dataset_uri = DatasetURI.objects.get(uri=url)
+        except DatasetURI.DoesNotExist:
+            dataset_uri = None
+
+        try:
+            dataset = Dataset.objects.get(entry_id=normalized_attributes['entry_id'])
+        except Dataset.DoesNotExist:
+            dataset = None
+
+        if dataset is None:
+            dataset, dataset_status = self._create_dataset(normalized_attributes)
+        else:
+            if self.update:
+                dataset_status = self._update_dataset(dataset, normalized_attributes)
+
+        if dataset_uri is None:
+            dataset_uri_status = self._create_dataset_uri(dataset, url, normalized_attributes)
+
+        return (url, dataset.entry_id, dataset_status, dataset_uri_status)
 
     def ingest(self, datasets_to_ingest):
         """Iterates over a crawler and writes the datasets to the
-        database. Database access can be parallelized, although it is
-        usually not necessary. The bottleneck when harvesting is
-        generally the crawling or metadata normalization step.
+        database.
         If a KeyboardInterrupt exception occurs (which might mean that a
         SIGINT or SIGTERM was received by the process), all scheduled
         threads are cancelled. We wait for the currently running
@@ -140,19 +185,27 @@ def ingest(self, datasets_to_ingest):
                     dataset_info))
             for future in concurrent.futures.as_completed(futures):
                 try:
-                    url, created_dataset, created_dataset_uri, updated_dataset = future.result()
-                    if created_dataset:
-                        self.logger.info("Successfully created dataset from url: '%s'", url)
-                        if not created_dataset_uri:
+                    url, dataset_entry_id, dataset_status, dataset_uri_status = future.result()
+                    if dataset_status == OperationStatus.CREATED:
+                        self.logger.info("Successfully created dataset '%s' from url: '%s'",
+                                         dataset_entry_id, url)
+                        if dataset_uri_status == OperationStatus.NOOP:
                             # This should only happen if a database problem
                             # occurred in _ingest_dataset(), because the
                             # presence of the URI in the database is checked
                             # before attempting to ingest.
-                            self.logger.warning("The Dataset URI '%s' was not created.", url)
-                    elif created_dataset_uri:
-                        self.logger.info("Dataset URI '%s' added to existing dataset", url)
-                    elif updated_dataset:
-                        self.logger.info("Sucessfully updated dataset for url: '%s'", url)
+                            self.logger.error("The Dataset URI '%s' was not created.", url)
+                    elif dataset_status == OperationStatus.UPDATED:
+                        self.logger.info("Successfully updated dataset '%s' from url: '%s'",
+                                         dataset_entry_id, url)
+                    elif dataset_status == OperationStatus.NOOP:
+                        if dataset_uri_status == OperationStatus.CREATED:
+                            self.logger.info("Dataset URI '%s' added to existing dataset '%s'",
+                                             url, dataset_entry_id)
+                        elif dataset_uri_status == OperationStatus.NOOP:
+                            self.logger.info("Dataset '%s' with URI '%s' already exists",
+                                             dataset_entry_id, url)
+
                 except Exception as error:  # pylint: disable=broad-except
                     self.logger.error("Error during ingestion: %s", str(error), exc_info=True)
                 finally:
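
Below is a minimal usage sketch (not part of the patch) of how the (url, entry_id, dataset_status, dataset_uri_status) tuple returned by the refactored _ingest_dataset() maps to the three outcomes logged by ingest(). The Ingester constructor arguments and the dataset_info object are placeholders: dataset_info is assumed to be a crawler-produced DatasetInfo with `url` and `metadata` attributes, as described in the docstring above.

    from geospaas_harvesting.ingesters import Ingester, OperationStatus

    ingester = Ingester()  # placeholder: real constructor arguments omitted
    # dataset_info: assumed DatasetInfo-like object with `url` and `metadata` attributes
    url, entry_id, dataset_status, uri_status = ingester._ingest_dataset(dataset_info)

    if dataset_status is OperationStatus.CREATED:
        print(f"created dataset {entry_id} from {url}")
    elif dataset_status is OperationStatus.UPDATED:
        print(f"updated dataset {entry_id}")
    elif uri_status is OperationStatus.CREATED:
        print(f"added URI {url} to existing dataset {entry_id}")
    else:
        print(f"dataset {entry_id} with URI {url} already exists")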