
Commit 889c647

Fix tests, bump dataflows, add infer and cast strategy default, remove print statements
1 parent f2e3415 commit 889c647

File tree

9 files changed: +194 −86 lines changed


bcodmo_frictionless/bcodmo_pipeline_processors/__init__.py

+1 −1
@@ -21,4 +21,4 @@
 from .set_types import flow as set_types
 from .edit_cells import flow as edit_cells
 
-from .join import flow as join
+# from .join import flow as join

bcodmo_frictionless/bcodmo_pipeline_processors/dump_to_s3.py

+5 −33
@@ -14,7 +14,7 @@
 from billiard import Process, Queue, Pool
 
 from dataflows.processors.dumpers.dumper_base import DumperBase
-from dataflows.processors.dumpers.file_formats import CSVFormat, JSONFormat, FileFormat
+from dataflows.processors.dumpers.formats import CSVFormat, JSONFormat, FileFormat
 from bcodmo_frictionless.bcodmo_pipeline_processors.helper import (
     get_redis_progress_key,
     get_redis_progress_resource_key,
@@ -25,6 +25,7 @@
     REDIS_PROGRESS_SAVING_DONE_FLAG,
     REDIS_EXPIRES,
 )
+from bcodmo_frictionless.bcodmo_pipeline_processors.helper import get_missing_values
 
 WINDOWS_LINE_ENDING = b"\r\n"
 UNIX_LINE_ENDING = b"\n"
@@ -543,63 +544,33 @@ def rows_processor(self, resource, writer, stream):
         row_number = None
         upload_id = None
 
-        writer_timer_sum = 0
-        process_timer_sum = 0
-        process_timer_count = 0
-        process_timer = None
-        redis_timer_sum = 0
-
         try:
             row_number = 0
             part_number = 0
-            start = time.time()
             timer = time.time()
 
-            yield_start = time.time()
-            yield_total = 0
-            async_total = 0
-            loop_total = 0
             for row in resource:
-                loop_total += time.time() - yield_start
-
                 row_number += 1
 
-                writer_timer = time.time()
                 writer.write_row(row)
-                writer_timer_sum += time.time() - writer_timer
 
-                redis_timer = time.time()
                 if redis_conn is not None and time.time() - timer > 0.75:
                     redis_conn.set(progress_key, row_number, ex=REDIS_EXPIRES)
                     timer = time.time()
-                redis_timer_sum += time.time() - redis_timer
 
-                async_start = time.time()
-                if row_number % 25 == 0 and stream.tell() > calculate_partsize(
+                if row_number % 100 == 0 and stream.tell() > calculate_partsize(
                     part_number
                 ):
                     part_number, upload_id, writer, stream = self.async_write_part(
                         stream, resource, part_number, object_key, upload_id, False
                     )
-                async_total += time.time() - async_start
-
-                yield_total += time.time() - yield_start
-                if (row_number + 1) % 10000 == 0:
-                    # print(
-                    #     f"total {yield_total}. async {async_total}. redis {redis_timer_sum}. writer {writer_timer_sum}. loop {loop_total}"
-                    # )
-                    yield_total = 0
-                    async_total = 0
-                    redis_timer_sum = 0
-                    writer_timer_sum = 0
-                    loop_total = 0
+
                 if (
                     self.limit_yield is None
                     or self.limit_yield < 0
                     or row_number <= self.limit_yield
                 ):
                     yield row
-                yield_start = time.time()
             # Set row number values
             DumperBase.inc_attr(
                 self.datapackage.descriptor, self.datapackage_rowcount, row_number
@@ -613,6 +584,7 @@ def rows_processor(self, resource, writer, stream):
         row_number = None
         writer.finalize_file()
         # Upload final part
+        print("Starting last upload")
         part_number, _, _, stream = self.async_write_part(
             stream, resource, part_number, object_key, upload_id, True
         )
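
For orientation, the reshaped hot loop in rows_processor amounts to the pattern below. This is a minimal sketch, not the actual method: REDIS_EXPIRES and calculate_partsize are placeholders here, and async_write_part stands in for the dumper method of the same name referenced in the diff.

import time

REDIS_EXPIRES = 60 * 60  # placeholder; the real constant lives in helper.py

def calculate_partsize(part_number):
    # Placeholder: S3 multipart parts must be at least 5 MiB; the real module
    # computes the target size per part number.
    return 5 * 1024 * 1024

def write_rows(resource, writer, stream, redis_conn, progress_key,
               object_key, async_write_part, limit_yield=None):
    # Sketch of the reshaped loop; async_write_part is supplied by the caller.
    row_number = 0
    part_number = 0
    upload_id = None
    timer = time.time()
    for row in resource:
        row_number += 1
        writer.write_row(row)

        # Report progress to Redis at most once every 0.75 seconds.
        if redis_conn is not None and time.time() - timer > 0.75:
            redis_conn.set(progress_key, row_number, ex=REDIS_EXPIRES)
            timer = time.time()

        # Check the buffer only every 100 rows (previously 25), and flush a
        # multipart part once it passes the target part size.
        if row_number % 100 == 0 and stream.tell() > calculate_partsize(part_number):
            part_number, upload_id, writer, stream = async_write_part(
                stream, resource, part_number, object_key, upload_id, False
            )

        # Optionally cap how many rows continue downstream.
        if limit_yield is None or limit_yield < 0 or row_number <= limit_yield:
            yield row

Probing stream.tell() every 100th row instead of every 25th trades slightly coarser part boundaries for fewer size checks on the per-row path.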

bcodmo_frictionless/bcodmo_pipeline_processors/load.py

+4
@@ -354,6 +354,10 @@ def process_resource(rows, missing_data_values):
                 ex=REDIS_EXPIRES,
             )
 
+    # Default all infer and cast strategy to string to handle an update from dataflows that deprecates old pipelines
+    # https://bco-dmo-group.slack.com/archives/CSQ582V4Y/p1712063770616059
+    parameters["infer_strategy"] = "strings"
+    parameters["cast_strategy"] = "strings"
     params.extend(
         [
             count_resources(),
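
The effect of the new defaults, sketched as a plain dataflows call (the bcodmo load processor forwards these parameters to dataflows' load; the file path below is illustrative): with infer_strategy and cast_strategy both set to "strings", every field is inferred and cast as a string, leaving typing to a later set_types step, which is how pre-0.5 pipelines behaved.

from dataflows import Flow, load, printer

# Illustrative only: dataflows' load() accepts the same strategy strings the
# processor now defaults to ("strings" is also exposed as load.INFER_STRINGS
# and load.CAST_TO_STRINGS).
Flow(
    load(
        "data/test_multiline_header.csv",  # any local CSV works here
        name="res",
        format="csv",
        infer_strategy="strings",  # infer every field as a string
        cast_strategy="strings",   # cast every value to a string
    ),
    printer(),
).process()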

data/test_multiline_header.csv

+6
@@ -0,0 +1,6 @@
+1,2,3,4,5
+5,4,3,2,1
+abc,1,1.532,12/29/19,1
+abc,2,35.131,12/30/19,1
+def,1,53.1,12/31/19,1
+ghi,3,54262.5,01/01/20,1

setup.py

+1 −2
@@ -5,11 +5,10 @@
         "datapackage-pipelines==2.2.6",
         # "datapackage-pipelines @ git+https://github.com/frictionlessdata/datapackage-pipelines.git@d78d1391adf6470ca484303e512e038f7dc57483",
         "pyparsing==3.1.2",
-        "dataflows==0.3.1",
+        "dataflows==0.5.5",
         # "dataflows @ git+https://github.com/cschloer/dataflows.git@master",
         # "tabulator==1.53.5",
         "tabulator @ git+https://github.com/BCODMO/tabulator-py.git@main",
-        "tableschema==1.16.4",
         "goodtables==2.5.0",
         "python-dateutil==2.8.0",
         "xlrd==1.2.0",

tests/processors/test_dump_to_s3.py

+4
@@ -38,6 +38,8 @@ def test_dump_s3():
                 "from": "s3://testing_bucket/test.csv",
                 "name": "res",
                 "format": "csv",
+                "infer_strategy": "strings",
+                "cast_strategy": "strings",
             }
         ),
         dump_to_s3(
@@ -73,6 +75,8 @@ def test_dump_s3_hash():
                 "from": "s3://testing_bucket/test.csv",
                 "name": "res",
                 "format": "csv",
+                "infer_strategy": "strings",
+                "cast_strategy": "strings",
             }
         ),
         dump_to_s3(

tests/processors/test_join.py

+19 −7
@@ -1,6 +1,6 @@
 import pytest
 import os
-from dataflows import Flow
+from dataflows import Flow, join
 from decimal import Decimal
 
 from bcodmo_frictionless.bcodmo_pipeline_processors import *
@@ -25,6 +25,23 @@ def test_join():
     flows = [
         data2,
         data1,
+        join(
+            "res_1",
+            "{#}",
+            "res_2",
+            "{#}",
+            fields={"col2": {"name": "col2"}},
+            source_delete=True,
+            mode="half-outer",
+        ),
+    ]
+    rows, datapackage, _ = Flow(*flows).results()
+    print(rows)
+    assert rows == [
+        [{"col1": 1, "col2": 1}, {"col1": 2, "col2": 2}, {"col1": 3, "col2": 3}]
+    ]
+
+    """
     join({
         "source": {
             "name": "res_1",
@@ -38,9 +55,4 @@ def test_join():
             "fields": {"col2": {"name": "col2"}},
             "mode": "half-outer",
         }),
-    ]
-    rows, datapackage, _ = Flow(*flows).results()
-    print(rows)
-    assert rows == [
-        [{"col1": 1, "col2": 1}, {"col1": 2, "col2": 2}, {"col1": 3, "col2": 3}]
-    ]
+    """
