Skip to content

Commit d0d65e3

Browse files
Fix list_objects and get_most_recent_object_key methods
The `list_objects` method now returns a list of each object's metadata. The `get_most_recent_object_key` method now returns the key of the object with the most recent `LastModified` datetime.
1 parent f8f4617 commit d0d65e3

File tree

4 files changed

+51
-23
lines changed

4 files changed

+51
-23
lines changed

datahub/ingest/boto3.py

+20-6
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,31 @@ def __init__(
4343
self.s3_client = s3_client
4444

4545
def list_objects(self) -> list[str]:
46-
"""Returns a list of all objects with specified prefix."""
46+
"""Returns a list of metadata about each object located at self.prefix.
47+
48+
From the boto3 docs, the metadata will contain the following:
49+
- Key (string) - the name that you assign the object
50+
- LastModified (datetime) - creation date of the object
51+
- ETag (string) - entity tag; a hash of the object
52+
- ChecksumAlgorithm (list) - the algorithm used to create a checksum of the object
53+
- Size (integer) - size in bytes of the object
54+
- StorageClass (string) - the class of storage used to store the object
55+
- Owner (dict) - the owner of the object
56+
- RestoreStatus (dict) - the restoration status of the the object
57+
"""
4758
response = self.s3_client.list_objects(
4859
Bucket=self.bucket,
4960
Prefix=self.prefix,
5061
)
51-
return [obj.get('Key') for obj in response.get('Contents', [])]
62+
return response.get('Contents', [])
5263

53-
def get_most_recent_object(self) -> str:
54-
"""Return the most recent object in the given bucket/prefix."""
55-
objs = self.list_objects()
56-
return max(objs) if objs else None
64+
def get_most_recent_object_key(self) -> str:
65+
"""Return the most recent object's key in the self.bucket at self.prefix."""
66+
objects = self.list_objects()
67+
if not objects:
68+
return None
69+
most_recent_object = max(objects, key=lambda x: x['LastModified'])
70+
return most_recent_object['Key']
5771

5872
def get_object_last_modified_datetime(self, object_key: str) -> datetime:
5973
"""Get last modified datetime of a specific object."""

datahub/ingest/tasks.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ def __init__(self, prefix: str):
6464

6565
def identify_new_objects(self, ingestion_task_function: callable) -> None:
6666
"""Entry point method to identify new objects and, if valid, schedule their ingestion."""
67-
latest_object_key = self.s3_processor.get_most_recent_object()
67+
latest_object_key = self.s3_processor.get_most_recent_object_key()
6868

6969
if not latest_object_key:
7070
logger.info('No objects found')

datahub/ingest/test/test_boto3.py

+25-11
Original file line numberDiff line numberDiff line change
@@ -55,17 +55,31 @@ def test_list_objects_with_objects_in_bucket(self, s3_object_processor, test_obj
5555
upload_objects_to_s3(s3_object_processor, test_object_tuples)
5656
objects = s3_object_processor.list_objects()
5757
assert len(objects) == 3
58-
assert all(key.startswith(TEST_PREFIX) for key in objects)
59-
assert all(key.endswith('.jsonl.gz') for key in objects)
60-
61-
def test_get_most_recent_object(self, s3_object_processor, test_object_tuples):
62-
upload_objects_to_s3(s3_object_processor, test_object_tuples)
63-
most_recent_object = s3_object_processor.get_most_recent_object()
64-
assert most_recent_object == f'{TEST_PREFIX}file_c.jsonl.gz'
65-
66-
def test_get_most_recent_object_when_empty(self, s3_object_processor):
67-
most_recent_object = s3_object_processor.get_most_recent_object()
68-
assert most_recent_object is None
58+
assert all(obj['Key'].startswith(TEST_PREFIX) for obj in objects)
59+
assert all(obj['Key'].endswith('.jsonl.gz') for obj in objects)
60+
61+
def test_get_most_recent_object_key(self, s3_object_processor, test_object_tuples):
62+
object_content = compressed_json_faker([{'test': 'content'}])
63+
object_definitions = {
64+
'oldest': (f'{TEST_PREFIX}/oldest-object.json.gz', object_content),
65+
'middle': (f'{TEST_PREFIX}/middle-object.json.gz', object_content),
66+
'newest': (f'{TEST_PREFIX}/newest-object.json.gz', object_content),
67+
}
68+
monday = datetime(2024, 12, 9, 10, 0, 0, tzinfo=timezone.utc)
69+
tuesday = datetime(2024, 12, 10, 10, 0, 0, tzinfo=timezone.utc)
70+
wednesday = datetime(2024, 12, 11, 10, 0, 0, tzinfo=timezone.utc)
71+
with freeze_time(monday) as frozen_datetime:
72+
upload_objects_to_s3(s3_object_processor, [object_definitions['oldest']])
73+
frozen_datetime.move_to(tuesday)
74+
upload_objects_to_s3(s3_object_processor, [object_definitions['middle']])
75+
frozen_datetime.move_to(wednesday)
76+
upload_objects_to_s3(s3_object_processor, [object_definitions['newest']])
77+
most_recent_object_key = s3_object_processor.get_most_recent_object_key()
78+
assert most_recent_object_key == f'{TEST_PREFIX}/newest-object.json.gz'
79+
80+
def test_get_most_recent_object_key_when_empty(self, s3_object_processor):
81+
most_recent_object_key = s3_object_processor.get_most_recent_object_key()
82+
assert most_recent_object_key is None
6983

7084
def test_get_object_last_modified_datetime(self, s3_object_processor):
7185
object_definition = (

datahub/ingest/test/test_tasks.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ def test_identify_new_objects_when_no_objects_found(
145145
self, identification_task, mock_scheduler, caplog,
146146
):
147147
with (
148-
mock.patch.object(S3ObjectProcessor, 'get_most_recent_object', return_value=None),
148+
mock.patch.object(S3ObjectProcessor, 'get_most_recent_object_key', return_value=None),
149149
caplog.at_level(logging.INFO),
150150
):
151151
identification_task.identify_new_objects(base_ingestion_task)
@@ -160,7 +160,7 @@ def test_identify_new_objects_when_job_already_queued(
160160
):
161161
with (
162162
mock.patch.object(
163-
S3ObjectProcessor, 'get_most_recent_object', return_value=TEST_OBJECT_KEY,
163+
S3ObjectProcessor, 'get_most_recent_object_key', return_value=TEST_OBJECT_KEY,
164164
),
165165
mock.patch.object(QueueChecker, 'is_job_queued', return_value=True),
166166
caplog.at_level(logging.INFO),
@@ -177,7 +177,7 @@ def test_identify_new_objects_when_job_is_running(
177177
):
178178
with (
179179
mock.patch.object(
180-
S3ObjectProcessor, 'get_most_recent_object', return_value=TEST_OBJECT_KEY,
180+
S3ObjectProcessor, 'get_most_recent_object_key', return_value=TEST_OBJECT_KEY,
181181
),
182182
mock.patch.object(QueueChecker, 'is_job_running', return_value=True),
183183
caplog.at_level(logging.INFO),
@@ -191,7 +191,7 @@ def test_identify_new_objects_when_object_already_ingested(
191191
):
192192
with (
193193
mock.patch.object(
194-
S3ObjectProcessor, 'get_most_recent_object', return_value=TEST_OBJECT_KEY,
194+
S3ObjectProcessor, 'get_most_recent_object_key', return_value=TEST_OBJECT_KEY,
195195
),
196196
mock.patch.object(S3ObjectProcessor, 'has_object_been_ingested', return_value=True),
197197
caplog.at_level(logging.INFO),
@@ -205,7 +205,7 @@ def test_identify_new_objects_schedules_ingestion_task(
205205
):
206206
with (
207207
mock.patch.object(
208-
S3ObjectProcessor, 'get_most_recent_object', return_value=TEST_OBJECT_KEY,
208+
S3ObjectProcessor, 'get_most_recent_object_key', return_value=TEST_OBJECT_KEY,
209209
),
210210
mock.patch.object(S3ObjectProcessor, 'has_object_been_ingested', return_value=False),
211211
caplog.at_level(logging.INFO),

0 commit comments

Comments
 (0)