
Commit 2bed658

Fix error handling in dump_to_s3

1 parent f3fc1f5 · commit 2bed658

1 file changed: +25 -14


bcodmo_frictionless/bcodmo_pipeline_processors/dump_to_s3.py

@@ -201,11 +201,18 @@ def process_datapackage(self, datapackage):
 
 
     def handle_datapackage(self):
+
         etags = {}
         filesizes = {}
         self.pool.close()
         self.pool.join()
         for resource_name, d in self.procs.items():
+            redis_conn = None
+            progress_key = None
+            if self.cache_id:
+                redis_conn = get_redis_connection()
+                progress_key = get_redis_progress_key(resource_name, self.cache_id)
+
 
             upload_id = d["upload_id"]
             procs = d["procs"]
@@ -216,7 +223,7 @@ def handle_datapackage(self):
             for proc in procs:
                 partsize, part, err = proc.get()
                 if err is not None:
-                    raise err
+                    return self._handle_exception(err, resource_name)
                 parts.append(part)
                 filesize += partsize
 
@@ -237,6 +244,9 @@ def handle_datapackage(self):
 
             filesizes[resource_name] = filesize
 
+            if redis_conn is not None:
+                redis_conn.set(progress_key, REDIS_PROGRESS_SAVING_DONE_FLAG)
+
 
 
 
@@ -441,6 +451,19 @@ def async_write_part(self, stream, resource, part_number, object_key, upload_id,
             writer, stream = self.generate_writer(resource)
         return part_number, upload_id, writer, stream
 
+    def _handle_exception(self, e, resource_name, row_number=None):
+        row_number_text = ""
+        if row_number is not None:
+            row_number_text = f" - occured at line # {row_number + 1}"
+
+        if len(e.args) >= 1:
+            e.args = (
+                e.args[0]
+                + f"\n\nOccured at resource {resource_name}{row_number_text} in the dump_to_s3 processor",
+            ) + e.args[1:]
+        raise e
+
+
     def rows_processor(self, resource, writer, stream):
         resource_name = resource.res.descriptor["name"]
@@ -467,9 +490,6 @@ def rows_processor(self, resource, writer, stream):
 
             progress_key = get_redis_progress_key(resource_name, self.cache_id)
 
-        def on_complete():
-            if redis_conn is not None:
-                redis_conn.set(progress_key, REDIS_PROGRESS_SAVING_DONE_FLAG)
 
         row_number = None
         upload_id = None
@@ -546,16 +566,7 @@ def on_complete():
 
             stream.close()
         except Exception as e:
-            row_number_text = ""
-            if row_number is not None:
-                row_number_text = f" - occured at line # {row_number + 1}"
-
-            if len(e.args) >= 1:
-                e.args = (
-                    e.args[0]
-                    + f"\n\nOccured at resource {resource.res.descriptor['name']}{row_number_text}",
-                ) + e.args[1:]
-            raise e
+            return self._handle_exception(e, resource_name, row_number=row_number)
 
     def generate_writer(self, resource):
         schema = resource.res.schema
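
The heart of the commit is the new _handle_exception helper: it centralizes the pattern of prepending resource and row context onto an exception's first argument before re-raising the same exception object. A minimal standalone sketch of that pattern, using hypothetical names (annotate_and_reraise, "my-resource") rather than the repo's actual method:

    def annotate_and_reraise(e, resource_name, row_number=None):
        # Prepend context to the exception's message, then re-raise the same
        # exception object so its type and traceback are preserved.
        # Assumes e.args[0] is a string, as it is for exceptions raised with
        # a plain message.
        row_text = ""
        if row_number is not None:
            row_text = f" - occurred at line # {row_number + 1}"
        if len(e.args) >= 1:
            e.args = (
                e.args[0] + f"\n\nOccurred at resource {resource_name}{row_text}",
            ) + e.args[1:]
        raise e

    try:
        try:
            raise ValueError("bad cell value")
        except Exception as e:
            annotate_and_reraise(e, "my-resource", row_number=41)
    except ValueError as annotated:
        print(annotated)
        # bad cell value
        #
        # Occurred at resource my-resource - occurred at line # 42

Note that the call sites use return self._handle_exception(...) even though the helper always raises; the return never actually yields a value and just makes it explicit that execution stops there.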

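The other behavioral change is where the Redis "saving done" flag gets set: instead of a per-resource on_complete callback inside rows_processor, handle_datapackage now flips the flag only after every upload part for the resource has been joined. A rough sketch of that flow against redis-py, where get_redis_progress_key and REDIS_PROGRESS_SAVING_DONE_FLAG are hypothetical stand-ins for the repo's own helper and constant:

    import redis

    # Stand-in value; the real constant lives in the repo.
    REDIS_PROGRESS_SAVING_DONE_FLAG = "saving-done"

    def get_redis_progress_key(resource_name, cache_id):
        # Stand-in for the repo's helper of the same name.
        return f"progress:{cache_id}:{resource_name}"

    def mark_resource_saved(conn, resource_name, cache_id):
        # Flip the progress key only after all parts for the resource have
        # finished uploading, mirroring the commit's handle_datapackage.
        conn.set(get_redis_progress_key(resource_name, cache_id),
                 REDIS_PROGRESS_SAVING_DONE_FLAG)

    conn = redis.Redis(host="localhost", port=6379)
    mark_resource_saved(conn, "my-resource", "cache-123")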