Skip to content

Commit e5396bb

Browse files
authored
feature/temp-cache-last-processed-file-to-avoid-repeat-processing (#5856)
* Store last file processed in the cache
* logging updates
* Switch to using the IngestedFile table for tracking ingested files
* switch to use the generic table
1 parent cc6eac8 commit e5396bb

File tree

2 files changed

+45
-3
lines changed

2 files changed

+45
-3
lines changed

datahub/company/tasks/contact.py

+16-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
import environ
88
import requests
9+
910
from dateutil import parser
1011
from django.conf import settings
1112
from django.core.exceptions import ImproperlyConfigured
@@ -23,6 +24,7 @@
2324
from datahub.core.queues.job_scheduler import job_scheduler
2425
from datahub.core.queues.scheduler import LONG_RUNNING_QUEUE
2526
from datahub.core.realtime_messaging import send_realtime_message
27+
from datahub.ingest.models import IngestedObject
2628

2729

2830
logger = logging.getLogger(__name__)
@@ -206,8 +208,16 @@ def ingest(self):
206208
)
207209
return
208210

211+
if IngestedObject.objects.filter(object_key=file_key).exists():
212+
logger.info(
213+
'File %s has already been processed',
214+
file_key,
215+
)
216+
return
217+
209218
try:
210219
self.sync_file_with_database(s3_client, file_key)
220+
IngestedObject.objects.create(object_key=file_key)
211221
except Exception as exc:
212222
logger.exception(
213223
f'Error ingesting contact consent file {file_key}',
@@ -313,7 +323,12 @@ def sync_file_with_database(self, client, file_key):
313323
consent_data=contact.consent_data,
314324
consent_data_last_modified=contact.consent_data_last_modified,
315325
)
316-
logger.info('Updated contact consent data for email %s in row %s', email, i)
326+
logger.info(
327+
'Updated consent data for contact id %s with email %s in file row %s',
328+
contact.id,
329+
email,
330+
i,
331+
)
317332

318333
logger.info(
319334
'Finished processing total %s rows for contact consent from file %s',

datahub/company/test/tasks/test_contact_task.py

+29-2
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
from datahub.company.test.factories import CompanyFactory, ContactFactory
3636
from datahub.core.queues.errors import RetryError
3737
from datahub.core.test_utils import HawkMockJSONResponse
38+
from datahub.ingest.models import IngestedObject
39+
from datahub.ingest.test.factories import IngestedObjectFactory
3840

3941

4042
def generate_hawk_response(payload):
@@ -527,11 +529,35 @@ def test_ingest_with_empty_s3_bucket_does_not_call_sync(self):
527529

528530
@mock_aws
529531
@override_settings(S3_LOCAL_ENDPOINT_URL=None)
530-
def test_ingest_calls_sync_with_newest_file_order(self, test_files):
532+
def test_ingest_with_newest_file_key_equal_to_existing_file_key_does_not_call_sync(
533+
self,
534+
test_files,
535+
):
536+
"""
537+
Test that the task returns when the latest file is equal to an existing ingested file
538+
"""
539+
setup_s3_bucket(BUCKET, test_files)
540+
IngestedObjectFactory(object_key=test_files[-1])
541+
task = ContactConsentIngestionTask()
542+
with mock.patch.multiple(
543+
task,
544+
sync_file_with_database=mock.DEFAULT,
545+
):
546+
task.ingest()
547+
task.sync_file_with_database.assert_not_called()
548+
549+
@mock_aws
550+
@override_settings(S3_LOCAL_ENDPOINT_URL=None)
551+
def test_ingest_calls_sync_with_newest_file_when_file_is_new(
552+
self,
553+
test_files,
554+
):
531555
"""
532-
Test that the ingest calls the sync with the files in correct order
556+
Test that the ingest calls the sync with the latest file when the file key does
557+
not exist in the list of previously ingested files
533558
"""
534559
setup_s3_bucket(BUCKET, test_files)
560+
IngestedObjectFactory()
535561
task = ContactConsentIngestionTask()
536562
with mock.patch.multiple(
537563
task,
@@ -542,6 +568,7 @@ def test_ingest_calls_sync_with_newest_file_order(self, test_files):
542568
mock.ANY,
543569
test_files[-1],
544570
)
571+
assert IngestedObject.objects.filter(object_key=test_files[-1]).exists()
545572

546573
@mock_aws
547574
def test_sync_file_without_contacts_stops_job_processing(self):

0 commit comments

Comments
 (0)