
iter_bucket fails on AWS Lambda when worker threads is not 1 #340

Closed
rodjunger opened this issue Jul 25, 2019 · 8 comments

Comments

@rodjunger

rodjunger commented Jul 25, 2019

I did some research before opening this issue, so it's more of an "informational" kind of issue.
AWS Lambda environments do not support multiprocessing.Queue or multiprocessing.Pool (which smart_open uses), so calling iter_bucket with default arguments results in this exception:

Traceback (most recent call last):
  File "/var/task/handler.py", line 33, in hello
    for data in zs.stream():
  File "/var/task/zipstream/zipstream.py", line 298, in stream
    for idx, source in enumerate(self._source_of_files):
  File "/var/task/handler.py", line 11, in stream_files_from_s3_bucket
    for key, content in s3_iter_bucket(bucket_name, prefix=prefix):
  File "/var/task/smart_open/s3.py", line 603, in iter_bucket
    with _create_process_pool(processes=workers) as pool:
  File "/var/lang/lib/python3.6/contextlib.py", line 81, in __enter__
    return next(self.gen)
  File "/var/task/smart_open/s3.py", line 694, in _create_process_pool
    pool = multiprocessing.pool.Pool(processes=processes)
  File "/var/lang/lib/python3.6/multiprocessing/pool.py", line 156, in __init__
    self._setup_queues()
  File "/var/lang/lib/python3.6/multiprocessing/pool.py", line 249, in _setup_queues
    self._inqueue = self._ctx.SimpleQueue()
  File "/var/lang/lib/python3.6/multiprocessing/context.py", line 112, in SimpleQueue
    return SimpleQueue(ctx=self.get_context())
  File "/var/lang/lib/python3.6/multiprocessing/queues.py", line 315, in __init__
    self._rlock = ctx.Lock()
  File "/var/lang/lib/python3.6/multiprocessing/context.py", line 67, in Lock
    return Lock(ctx=self.get_context())
  File "/var/lang/lib/python3.6/multiprocessing/synchronize.py", line 162, in __init__
    SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
  File "/var/lang/lib/python3.6/multiprocessing/synchronize.py", line 59, in __init__
    unlink_now)
OSError: [Errno 38] Function not implemented

I found this fix in another project, and it seems simple enough for me to try applying it to this project too.
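The failing call can be reproduced in isolation. Here is a minimal probe (my own sketch, not smart_open code) for whether multiprocessing synchronization primitives work in the current environment:

```python
import multiprocessing


def multiprocessing_available():
    """Return True if multiprocessing sync primitives work here.

    On AWS Lambda, creating a Lock fails with
    OSError: [Errno 38] Function not implemented, because the
    runtime lacks the POSIX semaphores multiprocessing needs.
    """
    try:
        multiprocessing.Lock()
    except OSError:
        return False
    return True
```

On an ordinary Linux or macOS host this returns True; inside a Lambda handler it returns False, matching the traceback above.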

@piskvorky
Owner

That might work. Which version of Python is needed for concurrent.futures?

@rodjunger
Author

rodjunger commented Jul 25, 2019

That might work. Which version of Python is needed for concurrent.futures?

It does work locally; going to test it on Lambda in a second. Here's the code I "wrote" (basically a copy of the commit I linked, with some adaptations):

with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
    futures = [executor.submit(download_key, key) for key in key_iterator]
    for key_no, future in enumerate(concurrent.futures.as_completed(futures)):
        (key, content) = future.result()
        total_size += len(content)

        if key_no % 1000 == 0:
            logger.info(
                "yielding key #%i: %s, size %i (total %.1fMB)",
                key_no, key, len(content), total_size / 1024.0 ** 2,
            )

        yield key, content

        if key_limit is not None and key_no + 1 >= key_limit:
            # we were asked to output only a limited number of keys => we're done
            break

concurrent.futures was added in Python 3.2, according to https://docs.python.org/3/library/concurrent.futures.html

@piskvorky
Owner

piskvorky commented Jul 25, 2019

smart_open supports 2.7 too, so that wouldn't work out-of-the-box. Although we are considering dropping py2 support in our open source projects.

@mpenkov WDYT? According to https://pypistats.org/packages/smart-open , we still get more py2 downloads than py3 downloads. But god knows how many of these are bots / noise…

"Support iter_bucket on AWS Lambda" doesn't seem a strong enough argument for us to drop py2, so I'm -1 on dropping it unless there are additional reasons to switch.

@rodjunger
Author

Yeah, that doesn't seem like a worthy reason to completely drop Python 2 support, but there's this backport: https://pypi.org/project/futures/

@mpenkov
Collaborator

mpenkov commented Jul 26, 2019

It sounds like the current smart_open version with default arguments does not work under Lambda at all, regardless of the Python version you're using. The fix suggested by @rodjunger improves that situation for Py3.2+ users only. As long as nothing changes for Py2.7 users (they continue to see OSError: [Errno 38] Function not implemented when using default arguments under Lambda), I think it's a worthwhile fix.

@rodjunger Can you make a PR that fixes the problem if concurrent.futures is available, but leaves everything as-is if it isn't?

@piskvorky I think we'll have to drop Py2.7 support eventually, for the same reasons as in gensim. Our Py2.7 users can continue to download smart_open<2.0. We'll develop new features for smart_open 2.0 and beyond, but those will support Py3.5 only (same as gensim).

@rodjunger
Author

@mpenkov Yeah I can do that, I'll submit the PR this week if possible.

derpferd added a commit to derpferd/smart_open that referenced this issue Oct 29, 2019
This commit addresses issue piskvorky#340.
AWS Lambda environments do not support multiprocessing.Queue or
multiprocessing.Pool, which are used by iter_bucket to optimize the
pulling of files from s3.

Solution: Switch to using concurrent.futures.ThreadPoolExecutor instead.
This still optimizes the pulling of files from s3 without using new
processes.
@derpferd
Contributor

@mpenkov I assumed that since you marked this as Hacktoberfest and @rodjunger hadn't responded, this still needed work. I submitted a PR for the work @rodjunger said they were going to do.
Let me know what you think.

@mpenkov mpenkov added this to the 1.9.0 milestone Nov 3, 2019
mpenkov added a commit that referenced this issue Mar 11, 2020
* Updated iter_bucket to use concurrent futures.

This commit addresses issue #340.
AWS Lambda environments do not support multiprocessing.Queue or
multiprocessing.Pool, which are used by iter_bucket to optimize the
pulling of files from s3.

Solution: Switch to using concurrent.futures.ThreadPoolExecutor instead.
This still optimizes the pulling of files from s3 without using new
processes.

* disable test_old when mocks are disabled

* favor multiprocessing over concurrent.futures

* make imap_unordered return an iterator instead of a list

* skip tests when their respective features are unavailable

* Revert "disable test_old when mocks are disabled"

This reverts commit 6506562.

* tweak imap_unordered

* remove tests_require pins

Co-authored-by: Michael Penkov <m@penkov.dev>
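The `imap_unordered` bullet above can be sketched as a lazy generator (my simplified version, not the merged code, which also prefers multiprocessing when available):

```python
import concurrent.futures


def imap_unordered(function, items, workers=16):
    """Yield function(item) for each item, in completion order.

    Simplified sketch: results are yielded as their downloads finish,
    not in submission order, and never materialized into a list.
    """
    with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(function, item) for item in items]
        for future in concurrent.futures.as_completed(futures):
            yield future.result()
```

Returning an iterator rather than a list lets the caller start consuming results before all downloads finish and stop early, e.g. when a key limit is reached.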
@mpenkov
Collaborator

mpenkov commented Mar 11, 2020

Closed via #368
