|
| 1 | +import logging |
| 2 | + |
| 3 | +from django.core.exceptions import ValidationError |
| 4 | +from django.db import IntegrityError |
| 5 | + |
| 6 | +from datahub.company_activity.models import StovaAttendee |
| 7 | +from datahub.company_activity.tasks.constants import STOVA_ATTENDEE_PREFIX |
| 8 | +from datahub.ingest.boto3 import S3ObjectProcessor |
| 9 | +from datahub.ingest.tasks import BaseObjectIdentificationTask, BaseObjectIngestionTask |
| 10 | + |
| 11 | + |
| 12 | +logger = logging.getLogger(__name__) |
| 13 | +DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%f' |
| 14 | + |
| 15 | + |
| 16 | +def ingest_stova_attendee_data() -> None: |
| 17 | + """Identifies the most recent file to be ingested and schedules a task to ingest it""" |
| 18 | + logger.info('Stova attendee identification task started.') |
| 19 | + identification_task = StovaAttendeeIndentificationTask(prefix=STOVA_ATTENDEE_PREFIX) |
| 20 | + identification_task.identify_new_objects(stova_attendee_ingestion_task) |
| 21 | + logger.info('Stova attendee identification task finished.') |
| 22 | + |
| 23 | + |
| 24 | +def stova_attendee_ingestion_task(object_key: str) -> None: |
| 25 | + """Ingest the given key (file) from S3""" |
| 26 | + logger.info(f'Stova attendee ingestion task started for file {object_key}.') |
| 27 | + ingestion_task = StovaAttendeeIngestionTask( |
| 28 | + object_key=object_key, |
| 29 | + s3_processor=S3ObjectProcessor(prefix=STOVA_ATTENDEE_PREFIX), |
| 30 | + ) |
| 31 | + ingestion_task.ingest_object() |
| 32 | + logger.info(f'Stova attendee ingestion task finished for file {object_key}.') |
| 33 | + |
| 34 | + |
| 35 | +class StovaAttendeeIndentificationTask(BaseObjectIdentificationTask): |
| 36 | + pass |
| 37 | + |
| 38 | + |
| 39 | +class StovaAttendeeIngestionTask(BaseObjectIngestionTask): |
| 40 | + |
| 41 | + existing_ids = [] |
| 42 | + |
| 43 | + def _process_record(self, record: dict) -> None: |
| 44 | + """Saves an attendee from Stova from the S3 bucket into a `Stovaattendee`""" |
| 45 | + if not self.existing_ids: |
| 46 | + self.existing_ids = set( |
| 47 | + StovaAttendee.objects.values_list('stova_attendee_id', flat=True), |
| 48 | + ) |
| 49 | + |
| 50 | + stova_attendee_id = record.get('id') |
| 51 | + if stova_attendee_id in self.existing_ids: |
| 52 | + logger.info(f'Record already exists for stova_attendee_id: {stova_attendee_id}') |
| 53 | + return |
| 54 | + |
| 55 | + values = { |
| 56 | + 'stova_attendee_id': stova_attendee_id, |
| 57 | + 'stova_event_id': record.get('event_id', ''), |
| 58 | + 'created_date': record.get('created_date'), |
| 59 | + 'email': record.get('email', ''), |
| 60 | + 'first_name': record.get('first_name', ''), |
| 61 | + 'last_name': record.get('last_name', ''), |
| 62 | + 'company_name': record.get('company_name', ''), |
| 63 | + 'category': record.get('category', ''), |
| 64 | + 'registration_status': record.get('registration_status', ''), |
| 65 | + 'created_by': record.get('created_by', ''), |
| 66 | + 'language': record.get('language', ''), |
| 67 | + 'modified_date': record.get('modified_date'), |
| 68 | + 'virtual_event_attendance': record.get('virtual_event_attendance', ''), |
| 69 | + 'last_lobby_login': record.get('last_lobby_login', ''), |
| 70 | + 'attendee_questions': record.get('attendee_questions', ''), |
| 71 | + 'modified_by': record.get('modified_by', ''), |
| 72 | + } |
| 73 | + |
| 74 | + try: |
| 75 | + StovaAttendee.objects.create(**values) |
| 76 | + except IntegrityError as error: |
| 77 | + logger.error( |
| 78 | + f'Error processing Stova attendee record, stova_attendee_id: {stova_attendee_id}. ' |
| 79 | + f'Error: {error}', |
| 80 | + ) |
| 81 | + except ValidationError as error: |
| 82 | + logger.error( |
| 83 | + 'Got unexpected value for a field when processing Stova attendee record, ' |
| 84 | + f'stova_attendee_id: {stova_attendee_id}. ' |
| 85 | + f'Error: {error}', |
| 86 | + ) |
0 commit comments