
Commit 67772bc

Add delete option to dump_to_s3
1 parent 62ecf7f commit 67772bc

1 file changed: +29 −8 lines

bcodmo_frictionless/bcodmo_pipeline_processors/dump_to_s3.py

@@ -95,6 +95,7 @@ def __init__(self, bucket_name, prefix, **options):
         self.submission_id = options.get("submission_id", None)
         self.submission_ids = options.get("submission_ids", [])
         self.cache_id = options.get("cache_id", None)
+        self.delete = options.get("delete", False)

         self.prefix = prefix
         self.bucket_name = bucket_name

@@ -115,6 +116,27 @@ def __init__(self, bucket_name, prefix, **options):
         else:
             logging.warn("Using base boto credentials for S3 Dumper")
             self.s3 = boto3.resource("s3")
+        if self.delete:
+            s3_client = boto3.client(
+                "s3",
+                aws_access_key_id=access_key,
+                aws_secret_access_key=secret_access_key,
+                endpoint_url=host,
+            )
+            res = s3_client.list_objects_v2(
+                Bucket=self.bucket_name,
+                Prefix=self.prefix,
+            )
+            if "Contents" in res:
+                contents = res["Contents"]
+                if len(contents) >= 10:
+                    raise Exception(
+                        "Throwing an error from the dump_to_s3 processor because the number of files to be deleted was 10 or more. This is a safety measure to ensure we don't accidentally delete more files than expected."
+                    )
+                s3_client.delete_objects(
+                    Bucket=self.bucket_name,
+                    Delete={"Objects": [{"Key": obj["Key"]} for obj in contents]},
+                )

     def process_datapackage(self, datapackage):
         datapackage = super(S3Dumper, self).process_datapackage(datapackage)

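For reference, the added block boils down to the sketch below: look up whatever already exists under the target prefix, refuse to continue once a small safety cap is reached, and otherwise delete those objects before new files are dumped. The helper name clear_prefix, its signature, and the fall-back to default boto3 credentials are illustrative assumptions, not part of the processor.

import boto3

# Illustrative sketch of what delete=True does; clear_prefix is a
# hypothetical helper, not something defined in dump_to_s3.py.
def clear_prefix(bucket_name, prefix, s3_client=None, safety_cap=10):
    # Fall back to default boto3 credentials if no client is supplied.
    s3_client = s3_client or boto3.client("s3")
    res = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
    contents = res.get("Contents", [])
    if len(contents) >= safety_cap:
        raise Exception(
            f"Refusing to delete {len(contents)} objects under {prefix!r}; "
            "the cap is a safety measure against deleting more files than expected."
        )
    if contents:
        s3_client.delete_objects(
            Bucket=bucket_name,
            Delete={"Objects": [{"Key": obj["Key"]} for obj in contents]},
        )
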
@@ -155,10 +177,10 @@ def write_file_to_output(self, contents, path, content_type):
             contents = contents.replace(WINDOWS_LINE_ENDING, UNIX_LINE_ENDING)

         start = time.time()
-        print(f"Starting save file {time.time()}")
+        # print(f"Starting save file {time.time()}")
         obj = self.s3.Object(self.bucket_name, obj_name)
         obj.put(Body=contents, ContentType=content_type)
-        print(f"Took {time.time() - start} to save the file ({path})")
+        # print(f"Took {time.time() - start} to save the file ({path})")

         return path, len(contents)

@@ -210,7 +232,6 @@ def rows_processor(self, resource, writer, stream):
         redis_conn = None
         progress_key = None
         resource_name = resource.res.descriptor["name"]
-        print("RUNNING ROWS PROCESSOR FOR RESOURCE", resource_name)
         if self.cache_id:
             redis_conn = get_redis_connection()
             redis_conn.sadd(

@@ -221,7 +242,7 @@ def rows_processor(self, resource, writer, stream):
             progress_key = get_redis_progress_key(resource_name, self.cache_id)

         row_number = None
-        print(f"Received at {time.time()}")
+        # print(f"Received at {time.time()}")
         start1 = time.time()

         try:

@@ -240,9 +261,9 @@ def rows_processor(self, resource, writer, stream):
             writer.finalize_file()
             if redis_conn is not None:
                 redis_conn.set(progress_key, REDIS_PROGRESS_SAVING_START_FLAG)
-            print(
-                f"Finished going through loop at {time.time()}, in {time.time() - start1}"
-            )
+            # print(
+            #     f"Finished going through loop at {time.time()}, in {time.time() - start1}"
+            # )

             # Get resource descriptor
             resource_descriptor = resource.res.descriptor

@@ -261,7 +282,7 @@ def rows_processor(self, resource, writer, stream):
             DumperBase.set_attr(
                 resource_descriptor, self.resource_hash, hasher.hexdigest()
             )
-            print(f"After hash, starting to write file at {time.time()}")
+            # print(f"After hash, starting to write file at {time.time()}")

             # Finalise
             stream.seek(0)

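A minimal usage sketch, assuming the dumper is constructed directly (in a pipeline spec the same values would be passed as processor parameters); the bucket name and prefix are placeholders:

# Hypothetical invocation; bucket and prefix values are placeholders.
dumper = S3Dumper(
    "example-bucket",
    "submissions/example/",
    delete=True,  # new option: clear existing objects under the prefix before dumping
)

The option defaults to False, so existing pipelines are unaffected unless they opt in.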