@@ -169,6 +169,23 @@ def test_identify_new_objects_when_job_already_queued(
169
169
assert f'{ TEST_OBJECT_KEY } has already been queued for ingestion' in caplog .text
170
170
mock_scheduler .assert_not_called ()
171
171
172
+ def test_identify_new_objects_when_job_is_running (
173
+ self ,
174
+ identification_task ,
175
+ mock_scheduler ,
176
+ caplog ,
177
+ ):
178
+ with (
179
+ mock .patch .object (
180
+ S3ObjectProcessor , 'get_most_recent_object' , return_value = TEST_OBJECT_KEY ,
181
+ ),
182
+ mock .patch .object (QueueChecker , 'is_job_running' , return_value = True ),
183
+ caplog .at_level (logging .INFO ),
184
+ ):
185
+ identification_task .identify_new_objects (base_ingestion_task )
186
+ assert f'{ TEST_OBJECT_KEY } is currently being ingested' in caplog .text
187
+ mock_scheduler .assert_not_called ()
188
+
172
189
def test_identify_new_objects_when_object_already_ingested (
173
190
self , identification_task , mock_scheduler , caplog ,
174
191
):
@@ -242,6 +259,43 @@ def test_ingest_object_raises_not_implemented_error(
242
259
assert 'Please override the process_record method and tailor to your use case.' \
243
260
in caplog .text
244
261
262
+ def test_ingest_object_increments_skipped_counter (
263
+ self , s3_object_processor , caplog , ingestion_task ,
264
+ ):
265
+ object_definition = (TEST_OBJECT_KEY , compressed_json_faker ([
266
+ {'modified' : '2024-12-05T10:00:00Z' , 'data' : 'content' },
267
+ ]))
268
+ upload_objects_to_s3 (s3_object_processor , [object_definition ])
269
+ assert ingestion_task .skipped_counter == 0
270
+ with (
271
+ mock .patch .object (ingestion_task , '_should_process_record' , return_value = False ),
272
+ caplog .at_level (logging .INFO ),
273
+ ):
274
+ ingestion_task .ingest_object ()
275
+ assert ingestion_task .skipped_counter == 1
276
+
277
+ def test_ingest_object_calls_additional_methods (
278
+ self , s3_object_processor , caplog , ingestion_task ,
279
+ ):
280
+ """Test that _create_ingested_object_instance and _log_ingestion_metrics are called."""
281
+ object_definition = (TEST_OBJECT_KEY , compressed_json_faker ([
282
+ {'modified' : '2024-12-05T10:00:00Z' , 'data' : 'content' },
283
+ ]))
284
+ upload_objects_to_s3 (s3_object_processor , [object_definition ])
285
+ with (
286
+ mock .patch .object (ingestion_task , '_process_record' , return_value = None ),
287
+ # TODO: explore mocking the methods to assert they've been called
288
+ # mock.patch.object(ingestion_task, '_create_ingested_object_instance') \
289
+ # as mock_create_ingested_object,
290
+ # mock.patch.object(ingestion_task, '_log_ingestion_metrics') as mock_log_ingestion,
291
+ caplog .at_level (logging .INFO ),
292
+ ):
293
+ ingestion_task .ingest_object ()
294
+ # assert mock_create_ingested_object.assert_called_once()
295
+ # assert mock_log_ingestion.assert_called()
296
+ assert f'IngestObject instance created for { TEST_OBJECT_KEY } ' in caplog .text
297
+ assert f'{ ingestion_task .object_key } ingested.' in caplog .text
298
+
245
299
def test_get_record_from_line (self , ingestion_task ):
246
300
deserialized_line = {'data' : 'content' }
247
301
record = ingestion_task ._get_record_from_line (deserialized_line )
0 commit comments