Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Shutdown IO queue when file can't be opened #471

Merged
merged 1 commit into from
Feb 5, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions boto3/compat.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,18 @@
import sys
import os
import errno
import socket

from botocore.vendored import six

if six.PY3:
    # In Python 3, socket.error is an alias of OSError, which is too
    # general for retry purposes (i.e. FileNotFoundError is a subclass
    # of OSError, and a missing file must not be retried).  Since
    # Python 3.3 the connection-level socket failures we want to retry
    # are grouped under ConnectionError.
    SOCKET_ERROR = ConnectionError
else:
    # In Python 2, socket.error is the dedicated socket exception type.
    SOCKET_ERROR = socket.error


if sys.platform.startswith('win'):
Expand Down
33 changes: 18 additions & 15 deletions boto3/s3/transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,10 @@ def __call__(self, bytes_amount):

MB = 1024 * 1024
SHUTDOWN_SENTINEL = object()
# Transient network/read failures that warrant retrying an S3 request.
# SOCKET_ERROR is ConnectionError on py3 and socket.error on py2 (see
# boto3.compat) so that non-retryable OSErrors such as FileNotFoundError
# are NOT caught by retry loops.
S3_RETRYABLE_ERRORS = (
    socket.timeout, boto3.compat.SOCKET_ERROR,
    ReadTimeoutError, IncompleteReadError
)


def random_file_extension(num_digits=8):
Expand Down Expand Up @@ -523,8 +527,7 @@ def _download_range(self, bucket, key, filename,
self._ioqueue.put((current_index, chunk))
current_index += len(chunk)
return
except (socket.timeout, socket.error,
ReadTimeoutError, IncompleteReadError) as e:
except S3_RETRYABLE_ERRORS as e:
logger.debug("Retrying exception caught (%s), "
"retrying request, (attempt %s / %s)", e, i,
max_attempts, exc_info=True)
Expand All @@ -535,6 +538,15 @@ def _download_range(self, bucket, key, filename,
logger.debug("EXITING _download_range for part: %s", part_index)

def _perform_io_writes(self, filename):
    """Run the IO write loop for ``filename``, shutting down on failure.

    If the write loop raises (e.g. the destination file cannot be
    opened because its directory does not exist), the IO queue is told
    to shut down so that producer threads blocked on put() are
    released, and the original exception is re-raised to the caller.
    """
    try:
        self._loop_on_io_writes(filename)
    except Exception as e:
        logger.debug("Caught exception in IO thread: %s",
                     e, exc_info=True)
        # Unblock producers before surfacing the error, otherwise the
        # download threads could wait on the queue forever.
        self._ioqueue.trigger_shutdown()
        raise

def _loop_on_io_writes(self, filename):
with self._os.open(filename, 'wb') as f:
while True:
task = self._ioqueue.get()
Expand All @@ -543,15 +555,9 @@ def _perform_io_writes(self, filename):
"shutting down IO handler.")
return
else:
try:
offset, data = task
f.seek(offset)
f.write(data)
except Exception as e:
logger.debug("Caught exception in IO thread: %s",
e, exc_info=True)
self._ioqueue.trigger_shutdown()
raise
offset, data = task
f.seek(offset)
f.write(data)


class TransferConfig(object):
Expand Down Expand Up @@ -699,10 +705,7 @@ def _get_object(self, bucket, key, filename, extra_args, callback):
try:
return self._do_get_object(bucket, key, filename,
extra_args, callback)
except (socket.timeout, socket.error,
ReadTimeoutError, IncompleteReadError) as e:
# TODO: we need a way to reset the callback if the
# download failed.
except S3_RETRYABLE_ERRORS as e:
logger.debug("Retrying exception caught (%s), "
"retrying request, (attempt %s / %s)", e, i,
max_attempts, exc_info=True)
Expand Down
26 changes: 26 additions & 0 deletions tests/integration/test_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,6 +484,32 @@ def test_download_above_threshold(self):
download_path)
assert_files_equal(filename, download_path)

def test_download_file_with_directory_not_exist(self):
    # A small (non-multipart) download to a path whose parent
    # directories do not exist should raise IOError to the caller
    # rather than hang or fail silently.
    transfer = self.create_s3_transfer()
    self.client.put_object(Bucket=self.bucket_name,
                           Key='foo.txt',
                           Body=b'foo')
    self.addCleanup(self.delete_object, 'foo.txt')
    download_path = os.path.join(self.files.rootdir, 'a', 'b', 'c',
                                 'downloaded.txt')
    with self.assertRaises(IOError):
        transfer.download_file(self.bucket_name, 'foo.txt', download_path)

def test_download_large_file_directory_not_exist(self):
    # Same as the small-file case, but with a 20 MB object so the
    # multipart (threaded) download path is exercised: the IO thread's
    # failure to open the destination must propagate as IOError
    # instead of deadlocking the worker threads.
    transfer = self.create_s3_transfer()

    filename = self.files.create_file_with_size(
        'foo.txt', filesize=20 * 1024 * 1024)
    with open(filename, 'rb') as f:
        self.client.put_object(Bucket=self.bucket_name,
                               Key='foo.txt',
                               Body=f)
    self.addCleanup(self.delete_object, 'foo.txt')
    download_path = os.path.join(self.files.rootdir, 'a', 'b', 'c',
                                 'downloaded.txt')
    with self.assertRaises(IOError):
        transfer.download_file(self.bucket_name, 'foo.txt', download_path)

def test_transfer_methods_through_client(self):
# This is really just a sanity check to ensure that the interface
# from the clients work. We're not exhaustively testing through
Expand Down
24 changes: 20 additions & 4 deletions tests/unit/s3/test_transfer.py
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ def test_retry_on_failures_from_stream_reads(self):
response_body = b'foobarbaz'
stream_with_errors = mock.Mock()
stream_with_errors.read.side_effect = [
socket.error("fake error"),
socket.timeout("fake error"),
response_body
]
client.get_object.return_value = {'Body': stream_with_errors}
Expand Down Expand Up @@ -429,7 +429,7 @@ def test_exception_raised_on_exceeded_retries(self):
client = mock.Mock()
response_body = b'foobarbaz'
stream_with_errors = mock.Mock()
stream_with_errors.read.side_effect = socket.error("fake error")
stream_with_errors.read.side_effect = socket.timeout("fake error")
client.get_object.return_value = {'Body': stream_with_errors}
config = TransferConfig(multipart_threshold=4,
multipart_chunksize=4)
Expand Down Expand Up @@ -459,6 +459,22 @@ def test_io_thread_failure_triggers_shutdown(self):
downloader.download_file('bucket', 'key', 'filename',
len(response_body), {})

def test_io_thread_fails_to_open_triggers_shutdown_error(self):
    # The os layer is mocked so that opening the destination file
    # raises immediately; the downloader must not swallow this.
    client = mock.Mock()
    client.get_object.return_value = {
        'Body': six.BytesIO(b'asdf')
    }
    os_layer = mock.Mock(spec=OSUtils)
    os_layer.open.side_effect = IOError("Can't open file")
    downloader = MultipartDownloader(
        client, TransferConfig(),
        os_layer, SequentialExecutor)
    # We're verifying that the exception raised from the IO future
    # propagates back up via download_file().
    with self.assertRaisesRegexp(IOError, "Can't open file"):
        downloader.download_file('bucket', 'key', 'filename',
                                 len(b'asdf'), {})

def test_download_futures_fail_triggers_shutdown(self):
class FailedDownloadParts(SequentialExecutor):
def __init__(self, max_workers):
Expand Down Expand Up @@ -619,7 +635,7 @@ def test_get_object_stream_is_retried_and_succeeds(self):
'ContentLength': below_threshold}
self.client.get_object.side_effect = [
# First request fails.
socket.error("fake error"),
socket.timeout("fake error"),
# Second succeeds.
{'Body': six.BytesIO(b'foobar')}
]
Expand All @@ -636,7 +652,7 @@ def test_get_object_stream_uses_all_retries_and_errors_out(self):
# Here we're raising an exception every single time, which
# will exhaust our retry count and propagate a
# RetriesExceededError.
self.client.get_object.side_effect = socket.error("fake error")
self.client.get_object.side_effect = socket.timeout("fake error")
with self.assertRaises(RetriesExceededError):
transfer.download_file('bucket', 'key', 'smallfile')

Expand Down