
Commit 9d52d14

Updated iter_bucket to use concurrent futures.

This commit addresses issue #340. AWS Lambda environments do not support multiprocessing.Queue or multiprocessing.Pool, which iter_bucket uses to parallelize pulling files from S3. Solution: switch to concurrent.futures.ThreadPoolExecutor instead. This still parallelizes the S3 downloads, but without spawning new processes.
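For illustration, a minimal sketch of the motivating use case, assuming a hypothetical Lambda handler and bucket name (neither is part of this commit); iter_bucket yields (key, content) pairs, exactly as the tests below exercise:

    import smart_open.s3

    def lambda_handler(event, context):
        # Hypothetical AWS Lambda entry point, not part of this commit.
        # iter_bucket yields (key, content) pairs; with this change the S3
        # downloads are parallelized with threads instead of processes,
        # which Lambda allows.
        total_bytes = 0
        for key, content in smart_open.s3.iter_bucket('my-example-bucket'):
            total_bytes += len(content)
        return {'total_bytes': total_bytes}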
1 parent 1770807 · commit 9d52d14

File tree: 3 files changed, +107 −5 lines

setup.py (+1 −1)

@@ -41,7 +41,7 @@ def read(fname):
 
 tests_require = [
     'mock',
-    'moto==1.3.4',
+    'moto[server]==1.3.4',
     'pathlib2',
     'responses',
     # Temporary pin boto3 & botocore, because moto doesn't work with new version

smart_open/s3.py (+28 −2)

@@ -21,6 +21,16 @@
 
 logger = logging.getLogger(__name__)
 
+# AWS Lambda environments do not support multiprocessing.Queue or multiprocessing.Pool.
+# However they do support Threads and therefore concurrent.futures's ThreadPoolExecutor.
+# We use this flag to allow python 2 backward compatibility, where concurrent.futures doesn't exist.
+_CONCURRENT_FUTURES = False
+try:
+    import concurrent.futures
+    _CONCURRENT_FUTURES = True
+except ImportError:
+    warnings.warn("concurrent.futures could not be imported and won't be used")
+
 # Multiprocessing is unavailable in App Engine (and possibly other sandboxes).
 # The only method currently relying on it is iter_bucket, which is instructed
 # whether to use it by the MULTIPROCESSING flag.

@@ -766,10 +776,26 @@ def terminate(self):
         pass
 
 
+class ConcurrentFuturesPool(object):
+    """A class that mimics multiprocessing.pool.Pool but uses concurrent futures instead of processes."""
+    def __init__(self, max_workers):
+        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers)
+
+    def imap_unordered(self, function, items):
+        futures = [self.executor.submit(function, item) for item in items]
+        return map(lambda future: future.result(), concurrent.futures.as_completed(futures))
+
+    def terminate(self):
+        self.executor.shutdown(wait=True)
+
+
 @contextlib.contextmanager
 def _create_process_pool(processes=1):
-    if _MULTIPROCESSING and processes:
-        logger.info("creating pool with %i workers", processes)
+    if _CONCURRENT_FUTURES and processes:
+        logger.info("creating concurrent futures pool with %i workers", processes)
+        pool = ConcurrentFuturesPool(max_workers=processes)
+    elif _MULTIPROCESSING and processes:
+        logger.info("creating multiprocessing pool with %i workers", processes)
         pool = multiprocessing.pool.Pool(processes=processes)
     else:
         logger.info("creating dummy pool")
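The imap_unordered emulation above leans on concurrent.futures.as_completed, which yields futures as each one finishes rather than in submission order. A minimal standalone sketch of that behavior, using illustrative values that are not part of the commit:

    import concurrent.futures
    import time

    def slow_square(n):
        # Sleep so later submissions tend to finish first.
        time.sleep(0.1 * (5 - n))
        return n * n

    executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
    futures = [executor.submit(slow_square, n) for n in range(1, 5)]
    # as_completed yields each future as soon as it resolves, so results arrive
    # roughly in reverse submission order here, mirroring imap_unordered semantics.
    for future in concurrent.futures.as_completed(futures):
        print(future.result())
    executor.shutdown(wait=True)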

smart_open/tests/test_s3.py (+78 −2)

@@ -502,13 +502,89 @@ def test_old(self):
 @maybe_mock_s3
 class IterBucketSingleProcessTest(unittest.TestCase):
     def setUp(self):
-        self.old_flag = smart_open.s3._MULTIPROCESSING
+        self.old_flag_multi = smart_open.s3._MULTIPROCESSING
+        # self.old_flag_concurrent = smart_open.s3._CONCURRENT_FUTURES
         smart_open.s3._MULTIPROCESSING = False
+        # smart_open.s3._CONCURRENT_FUTURES = False
 
         ignore_resource_warnings()
 
     def tearDown(self):
-        smart_open.s3._MULTIPROCESSING = self.old_flag
+        smart_open.s3._MULTIPROCESSING = self.old_flag_multi
+        # smart_open.s3._CONCURRENT_FUTURES = self.old_flag_concurrent
+        cleanup_bucket()
+
+    def test(self):
+        num_keys = 101
+        populate_bucket(num_keys=num_keys)
+        keys = list(smart_open.s3.iter_bucket(BUCKET_NAME))
+        self.assertEqual(len(keys), num_keys)
+
+        expected = [('key_%d' % x, b'%d' % x) for x in range(num_keys)]
+        self.assertEqual(sorted(keys), sorted(expected))
+
+
+@maybe_mock_s3
+class IterBucketConcurrentFuturesTest(unittest.TestCase):
+    def setUp(self):
+        self.old_flag_multi = smart_open.s3._MULTIPROCESSING
+        smart_open.s3._MULTIPROCESSING = False
+        if not smart_open.s3._CONCURRENT_FUTURES:
+            self.fail('Unable to test iter bucket concurrent futures since it is disabled on this machine.')
+
+        ignore_resource_warnings()
+
+    def tearDown(self):
+        smart_open.s3._MULTIPROCESSING = self.old_flag_multi
+        cleanup_bucket()
+
+    def test(self):
+        num_keys = 101
+        populate_bucket(num_keys=num_keys)
+        keys = list(smart_open.s3.iter_bucket(BUCKET_NAME))
+        self.assertEqual(len(keys), num_keys)
+
+        expected = [('key_%d' % x, b'%d' % x) for x in range(num_keys)]
+        self.assertEqual(sorted(keys), sorted(expected))
+
+
+@maybe_mock_s3
+class IterBucketMultiprocessingTest(unittest.TestCase):
+    def setUp(self):
+        self.old_flag_concurrent = smart_open.s3._CONCURRENT_FUTURES
+        smart_open.s3._CONCURRENT_FUTURES = False
+        if not smart_open.s3._MULTIPROCESSING:
+            self.fail('Unable to test iter bucket multiprocessing since it is disabled on this machine.')
+
+        ignore_resource_warnings()
+
+    def tearDown(self):
+        smart_open.s3._CONCURRENT_FUTURES = self.old_flag_concurrent
+        cleanup_bucket()
+
+    def test(self):
+        num_keys = 101
+        populate_bucket(num_keys=num_keys)
+        keys = list(smart_open.s3.iter_bucket(BUCKET_NAME))
+        self.assertEqual(len(keys), num_keys)
+
+        expected = [('key_%d' % x, b'%d' % x) for x in range(num_keys)]
+        self.assertEqual(sorted(keys), sorted(expected))
+
+
+@maybe_mock_s3
+class IterBucketSingleProcessTest(unittest.TestCase):
+    def setUp(self):
+        self.old_flag_multi = smart_open.s3._MULTIPROCESSING
+        self.old_flag_concurrent = smart_open.s3._CONCURRENT_FUTURES
+        smart_open.s3._MULTIPROCESSING = False
+        smart_open.s3._CONCURRENT_FUTURES = False
+
+        ignore_resource_warnings()
+
+    def tearDown(self):
+        smart_open.s3._MULTIPROCESSING = self.old_flag_multi
+        smart_open.s3._CONCURRENT_FUTURES = self.old_flag_concurrent
         cleanup_bucket()
 
     def test(self):