Skip to content

Commit ac49666

Browse files
committed
Add try/except error reporting to rows_processor
1 parent f235715 commit ac49666

File tree

1 file changed

+46
-31
lines changed

1 file changed

+46
-31
lines changed

bcodmo_frictionless/bcodmo_pipeline_processors/dump_to_s3.py

+46-31
Original file line numberDiff line numberDiff line change
@@ -190,39 +190,54 @@ def handle_datapackage(self):
190190
super(S3Dumper, self).handle_datapackage()
191191

192192
def rows_processor(self, resource, writer, stream):
    """Stream every row of *resource* through *writer*, then finalize the dump.

    After the row loop this: finalizes the writer's file, locates the matching
    resource descriptor in the datapackage, optionally computes/stores a file
    hash (and inserts it into the output path), uploads the stream contents via
    ``write_file_to_output`` as ``text/csv``, and increments the byte counters
    on both the datapackage and the resource descriptor.

    Any exception raised anywhere in this pipeline is re-raised with the
    resource name — and, when the failure happened while iterating rows, the
    1-based row number — appended to its message, so failures can be traced
    back to a specific input line.

    Yields each row unchanged (this is a datapackage-pipelines row processor).
    """
    row_number = None
    try:
        row_number = 0
        for row in resource:
            row_number += 1
            writer.write_row(row)
            yield row
        # Row loop finished cleanly: clear the counter so errors raised by
        # the finalization steps below are not attributed to the last row.
        row_number = None
        writer.finalize_file()

        # Get resource descriptor: prefer the copy held by the datapackage
        # (so attribute updates below land on the shared descriptor).
        resource_descriptor = resource.res.descriptor
        for descriptor in self.datapackage.descriptor["resources"]:
            if descriptor["name"] == resource.res.descriptor["name"]:
                resource_descriptor = descriptor

        # File Hash:
        if self.resource_hash:
            hasher = S3Dumper.hash_handler(stream)
            # Update path with hash
            if self.add_filehash_to_path:
                DumperBase.insert_hash_in_path(
                    resource_descriptor, hasher.hexdigest()
                )
            DumperBase.set_attr(
                resource_descriptor, self.resource_hash, hasher.hexdigest()
            )

        # Finalise: rewind the buffered CSV text and upload it.
        stream.seek(0)
        _, filesize = self.write_file_to_output(
            stream.read().encode(), resource.res.source, "text/csv"
        )
        stream.close()

        # Update filesize
        DumperBase.inc_attr(
            self.datapackage.descriptor, self.datapackage_bytes, filesize
        )
        DumperBase.inc_attr(resource_descriptor, self.resource_bytes, filesize)
    except Exception as e:
        row_number_text = ""
        if row_number is not None:
            # Fixed typo: "occured" -> "occurred".
            row_number_text = f" - occurred at line # {row_number}"

        message = (
            f"{str(e)} - occurred at resource "
            f"{resource.res.descriptor['name']}{row_number_text}"
        )
        try:
            # Preserve the original exception type so callers catching a
            # specific class still work.
            annotated = type(e)(message)
        except TypeError:
            # Some exception classes (e.g. UnicodeDecodeError, some OSError
            # subclasses) do not accept a single message argument; without
            # this guard the handler itself would raise a TypeError and mask
            # the real failure. Fall back to a plain Exception.
            annotated = Exception(message)
        raise annotated from e
226241

227242
def process_resource(self, resource):
228243
if resource.res.name in self.file_formatters:

0 commit comments

Comments
 (0)