
Commit 256b69b

Add limit_rows_loader

1 parent a2aef41 · commit 256b69b

2 files changed: +100 −4 lines

bcodmo_frictionless/bcodmo_pipeline_processors/loaders/bcodmo_aws.py (+88 −4)
```diff
@@ -1,5 +1,15 @@
-from tabulator.loaders.aws import AWSLoader
+from tabulator import Loader
+from tabulator import exceptions
+from tabulator import helpers
 from tabulator import config
+from six.moves.urllib.parse import urlparse
+
+import time
+import os
+import io
+import boto3
+import base64
+import zlib
 
 from bcodmo_frictionless.bcodmo_pipeline_processors.helper import (
     get_redis_progress_key,
@@ -10,12 +20,13 @@
 )
 
 
-class BcodmoAWS(AWSLoader):
+class BcodmoAWS(Loader):
     options = [
         "s3_endpoint_url",
         "loader_cache_id",
         "loader_resource_name",
         "preloaded_chars",
+        "_limit_rows",
     ]
 
     def __init__(
@@ -25,11 +36,84 @@ def __init__(
         loader_cache_id=None,
         loader_resource_name=None,
         preloaded_chars=None,
+        _limit_rows=None,
     ):
-        super(BcodmoAWS, self).__init__(s3_endpoint_url=s3_endpoint_url)
+        self.__bytes_sample_size = bytes_sample_size
+        self.__s3_endpoint_url = (
+            s3_endpoint_url
+            or os.environ.get("S3_ENDPOINT_URL")
+            or config.S3_DEFAULT_ENDPOINT_URL
+        )
+        self.__s3_client = boto3.client("s3", endpoint_url=self.__s3_endpoint_url)
+        self.__stats = None
+        self.encoding = None
+
         self.loader_cache_id = loader_cache_id
         self.loader_resource_name = loader_resource_name
         self.preloaded_chars = preloaded_chars
+        self.limit_rows = _limit_rows
+
+    def _stream_load(self, source, mode="t", encoding=None):
+        ###
+        # This is the same as the previous load but allows streaming
+        ###
+
+        # Prepare bytes
+        try:
+            # print("Not using shared memory")
+            start = time.time()
+            parts = urlparse(source, allow_fragments=False)
+            response = self.__s3_client.get_object(
+                Bucket=parts.netloc, Key=parts.path[1:]
+            )
+            # https://github.com/frictionlessdata/tabulator-py/issues/271
+            bytes = io.BufferedRandom(io.BytesIO())
+            row_count = 0
+            if self.limit_rows:
+                # We limit the number of rows being streamed
+                while True:
+                    contents = response["Body"].read(amt=1024)
+                    if contents:
+                        row_count += contents.count(b"\n")
+                        bytes.write(contents)
+                        # To be extra safe, we multiply by two and add 100. This is meant to deal with
+                        # situations where there is a weird header/empty row situation at the beginning of the file
+                        # which will later be filtered by the parser
+                        if row_count >= self.limit_rows * 2 + 100:
+                            break
+                    else:
+                        break
+            else:
+                contents = response["Body"].read()
+                bytes.write(contents)
+            bytes.seek(0)
+            try:
+                print(
+                    f"Took {round(time.time() - start, 3)} to load in {os.path.basename(source)}"
+                )
+            except:
+                pass
+
+            if self.__stats:
+                bytes = helpers.BytesStatsWrapper(bytes, self.__stats)
+        except Exception as exception:
+            raise exceptions.LoadingError(str(exception))
+
+        # Return bytes
+        if mode == "b":
+            return bytes
+
+        # Detect encoding
+        if self.__bytes_sample_size:
+            sample = bytes.read(self.__bytes_sample_size)
+            bytes.seek(0)
+            encoding = helpers.detect_encoding(sample, encoding)
+        self.encoding = encoding
+
+        # Prepare chars
+        chars = io.TextIOWrapper(bytes, encoding)
+
+        return chars
 
     def load(self, source, mode="t", encoding=None):
         redis_conn = None
@@ -44,7 +128,7 @@ def load(self, source, mode="t", encoding=None):
                 ex=REDIS_EXPIRES,
             )
         if self.preloaded_chars is None:
-            chars = super(BcodmoAWS, self).load(source, mode=mode, encoding=encoding)
+            chars = self._stream_load(source, mode=mode, encoding=encoding)
         else:
             self.encoding = encoding
             chars = self.preloaded_chars
```
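The interesting part of `_stream_load` is the chunked read: instead of downloading the whole object before parsing (what the old `AWSLoader.load` did), it reads the S3 body 1 KB at a time, counts newlines as a cheap proxy for rows, and stops once `limit_rows * 2 + 100` rows have been buffered. For a large CSV with `_limit_rows=500`, only roughly the first 1,100 lines ever leave S3. A minimal standalone sketch of the same pattern, assuming boto3 credentials are configured (the function name and the bucket/key arguments are illustrative, not part of the commit):

```python
import io
import boto3

def stream_limited(bucket, key, limit_rows, chunk_size=1024):
    # Read an S3 object in chunks, stopping early once enough
    # newline-delimited rows have been buffered. Uses the same
    # safety margin as the commit: limit_rows * 2 + 100.
    body = boto3.client("s3").get_object(Bucket=bucket, Key=key)["Body"]
    buf = io.BufferedRandom(io.BytesIO())  # seekable wrapper, cf. tabulator-py #271
    row_count = 0
    while True:
        chunk = body.read(amt=chunk_size)
        if not chunk:
            break  # reached end of object before hitting the limit
        row_count += chunk.count(b"\n")
        buf.write(chunk)
        if row_count >= limit_rows * 2 + 100:
            break  # enough rows buffered; the rest is never downloaded
    buf.seek(0)
    return buf

# e.g. stream_limited("my-bucket", "data/survey.csv", limit_rows=500)
```

Buffering into `io.BufferedRandom(io.BytesIO())` keeps the result seekable, which tabulator needs for the encoding-detection sample read that follows in the loader.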

bcodmo_frictionless/bcodmo_pipeline_processors/standard_load_multiple.py (+12 −0)
```diff
@@ -34,12 +34,14 @@ def __init__(
         load_sources,
         names,
         sheets=None,
+        limit_rows_loader=None,
         **options,
     ):
         super(standard_load_multiple, self).__init__("", **options)
         self.load_sources = load_sources
         self.names = names
         self.sheets = sheets
+        self.limit_rows_loader = limit_rows_loader
 
     def _set_individual(self, i):
         load_source = self.load_sources[i]
@@ -176,6 +178,16 @@ def safe_process_datapackage(self, dp: Package):
         options = self.options
         if self.preloaded_chars is not None:
             options["preloaded_chars"] = self.preloaded_chars
+
+        # For all formats that don't require being fully loaded in (AKA everything except for xlsx and xls),
+        # we can limit the rows in the streaming itself
+        if (
+            self.limit_rows_loader
+            and self.limit_rows is not None
+            and self.options.get("format") != "xlsx"
+            and self.options.get("format") != "xls"
+        ):
+            options["_limit_rows"] = self.limit_rows
         stream: Stream = Stream(self.load_source, **options).open()
         """ Finish change to add preloaded data """
 
```
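The loader-level limit only works for formats that can be parsed from a partial stream, so the guard above skips `xlsx`/`xls`, whose parsers need the complete workbook; those formats still download fully and apply `limit_rows` only after parsing. A sketch of how the new option might be wired up (the source path is illustrative, and `limit_rows` is assumed to reach the base class through `**options`):

```python
# Hypothetical instantiation; not taken from the commit.
processor = standard_load_multiple(
    load_sources=["s3://my-bucket/survey.csv"],  # illustrative source
    names=["survey"],
    limit_rows_loader=True,  # opt in to limiting rows at the loader
    limit_rows=500,          # rows the pipeline actually keeps
    format="csv",            # csv streams; "xlsx"/"xls" would bypass the loader limit
)
```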
