BUG: mv() a file with recursive=True no longer working #394

Closed
jorisvandenbossche opened this issue Nov 10, 2020 · 2 comments · Fixed by #412

@jorisvandenbossche

When moving a single file with mv() while specifying recursive=True, this now fails with s3fs 0.5.1 and on master, but was working with 0.4.

This patch reproduces it in the tests:

--- a/s3fs/tests/test_s3fs.py
+++ b/s3fs/tests/test_s3fs.py
@@ -728,10 +728,11 @@ def test_copy_managed(s3):
         sync(s3.loop, s3._copy_managed, fn, fn + '3', size=len(data), block=6 * 2 ** 30)
 
 
-def test_move(s3):
+@pytest.mark.parametrize("recursive", [True, False])
+def test_move(s3, recursive):
     fn = test_bucket_name + '/test/accounts.1.json'
     data = s3.cat(fn)
-    s3.mv(fn, fn + '2')
+    s3.mv(fn, fn + '2', recursive=recursive)
     assert s3.cat(fn + '2') == data
     assert not s3.exists(fn)
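
For reference, a minimal standalone reproduction outside the test suite (a sketch; "my-bucket" is a placeholder for any writable bucket):

import s3fs

s3 = s3fs.S3FileSystem()

fn = "my-bucket/test/accounts.1.json"  # placeholder path
s3.pipe(fn, b"some data")              # write a small object

s3.mv(fn, fn + "2", recursive=True)    # fails on 0.5.1/master
# s3.mv(fn, fn + "2")                  # recursive=False still works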

Running the patched test on master gives this output:

=============================================================================================== test session starts ===============================================================================================
platform linux -- Python 3.7.3, pytest-6.1.1, py-1.9.0, pluggy-0.12.0 -- /home/joris/miniconda3/envs/arrow-dev/bin/python
cachedir: .pytest_cache
hypothesis profile 'default' -> database=DirectoryBasedExampleDatabase('/home/joris/scipy/repos/s3fs/.hypothesis/examples')
rootdir: /home/joris/scipy/repos/s3fs, configfile: pytest.ini
plugins: hypothesis-4.47.5, lazy-fixture-0.6.3, mock-3.1.1
collected 2 items                                                                                                                                                                                                 

s3fs/tests/test_s3fs.py::test_move[True] FAILED                                                                                                                                                             [ 50%]
s3fs/tests/test_s3fs.py::test_move[False] PASSED                                                                                                                                                            [100%]

==================================================================================================== FAILURES =====================================================================================================
_________________________________________________________________________________________________ test_move[True] _________________________________________________________________________________________________

self = <s3fs.core.S3FileSystem object at 0x7f6aebcc8198>, method = <bound method ClientCreator._create_api_method.<locals>._api_call of <aiobotocore.client.S3 object at 0x7f6aeb51c7b8>>, akwarglist = ({},)
kwargs = {'Bucket': 'test', 'CopySource': {'Bucket': 'test', 'Key': 'test'}, 'Key': 'test/accounts.1.json2'}
kw2 = {'Bucket': 'test', 'CopySource': {'Bucket': 'test', 'Key': 'test'}, 'Key': 'test/accounts.1.json2'}
additional_kwargs = {'Bucket': 'test', 'CopySource': {'Bucket': 'test', 'Key': 'test'}, 'Key': 'test/accounts.1.json2'}, i = 0
err = ClientError('An error occurred (404) when calling the CopyObject operation: Not Found')

    async def _call_s3(self, method, *akwarglist, **kwargs):
        kw2 = kwargs.copy()
        kw2.pop('Body', None)
        logger.debug("CALL: %s - %s - %s" % (method.__name__, akwarglist, kw2))
        additional_kwargs = self._get_s3_method_kwargs(method, *akwarglist,
                                                       **kwargs)
        for i in range(self.retries):
            try:
>               return await method(**additional_kwargs)

s3fs/core.py:211: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <aiobotocore.client.S3 object at 0x7f6aeb51c7b8>, operation_name = 'CopyObject', api_params = {'Bucket': 'test', 'CopySource': 'test/test', 'Key': 'test/accounts.1.json2'}

    async def _make_api_call(self, operation_name, api_params):
        operation_model = self._service_model.operation_model(operation_name)
        service_name = self._service_model.service_name
        history_recorder.record('API_CALL', {
            'service': service_name,
            'operation': operation_name,
            'params': api_params,
        })
        if operation_model.deprecated:
            logger.debug('Warning: %s.%s() is deprecated',
                         service_name, operation_name)
        request_context = {
            'client_region': self.meta.region_name,
            'client_config': self.meta.config,
            'has_streaming_input': operation_model.has_streaming_input,
            'auth_type': operation_model.auth_type,
        }
        request_dict = await self._convert_to_request_dict(
            api_params, operation_model, context=request_context)
    
        service_id = self._service_model.service_id.hyphenize()
        handler, event_response = await self.meta.events.emit_until_response(
            'before-call.{service_id}.{operation_name}'.format(
                service_id=service_id,
                operation_name=operation_name),
            model=operation_model, params=request_dict,
            request_signer=self._request_signer, context=request_context)
    
        if event_response is not None:
            http, parsed_response = event_response
        else:
            http, parsed_response = await self._make_request(
                operation_model, request_dict, request_context)
    
        await self.meta.events.emit(
            'after-call.{service_id}.{operation_name}'.format(
                service_id=service_id,
                operation_name=operation_name),
            http_response=http, parsed=parsed_response,
            model=operation_model, context=request_context
        )
    
        if http.status_code >= 300:
            error_code = parsed_response.get("Error", {}).get("Code")
            error_class = self.exceptions.from_code(error_code)
>           raise error_class(parsed_response, operation_name)
E           botocore.exceptions.ClientError: An error occurred (404) when calling the CopyObject operation: Not Found

../../../miniconda3/envs/arrow-dev/lib/python3.7/site-packages/aiobotocore/client.py:151: ClientError

The above exception was the direct cause of the following exception:

s3 = <s3fs.core.S3FileSystem object at 0x7f6aebcc8198>, recursive = True

    @pytest.mark.parametrize("recursive", [True, False])
    def test_move(s3, recursive):
        fn = test_bucket_name + '/test/accounts.1.json'
        data = s3.cat(fn)
>       s3.mv(fn, fn + '2', recursive=recursive)

s3fs/tests/test_s3fs.py:735: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../../../miniconda3/envs/arrow-dev/lib/python3.7/site-packages/fsspec/spec.py:802: in mv
    self.copy(path1, path2, recursive=recursive, maxdepth=maxdepth)
../../../miniconda3/envs/arrow-dev/lib/python3.7/site-packages/fsspec/asyn.py:206: in copy
    maybe_sync(self._copy, self, paths, path2)
../../../miniconda3/envs/arrow-dev/lib/python3.7/site-packages/fsspec/asyn.py:100: in maybe_sync
    return sync(loop, func, *args, **kwargs)
../../../miniconda3/envs/arrow-dev/lib/python3.7/site-packages/fsspec/asyn.py:71: in sync
    raise exc.with_traceback(tb)
../../../miniconda3/envs/arrow-dev/lib/python3.7/site-packages/fsspec/asyn.py:55: in f
    result[0] = await future
../../../miniconda3/envs/arrow-dev/lib/python3.7/site-packages/fsspec/asyn.py:200: in _copy
    *[self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths, path2)]
s3fs/core.py:1206: in _cp_file
    await self._copy_basic(path1, path2, **kwargs)
s3fs/core.py:1154: in _copy_basic
    Bucket=buc2, Key=key2, CopySource=copy_src
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <s3fs.core.S3FileSystem object at 0x7f6aebcc8198>, method = <bound method ClientCreator._create_api_method.<locals>._api_call of <aiobotocore.client.S3 object at 0x7f6aeb51c7b8>>, akwarglist = ({},)
kwargs = {'Bucket': 'test', 'CopySource': {'Bucket': 'test', 'Key': 'test'}, 'Key': 'test/accounts.1.json2'}
kw2 = {'Bucket': 'test', 'CopySource': {'Bucket': 'test', 'Key': 'test'}, 'Key': 'test/accounts.1.json2'}
additional_kwargs = {'Bucket': 'test', 'CopySource': {'Bucket': 'test', 'Key': 'test'}, 'Key': 'test/accounts.1.json2'}, i = 0
err = ClientError('An error occurred (404) when calling the CopyObject operation: Not Found')

    async def _call_s3(self, method, *akwarglist, **kwargs):
        kw2 = kwargs.copy()
        kw2.pop('Body', None)
        logger.debug("CALL: %s - %s - %s" % (method.__name__, akwarglist, kw2))
        additional_kwargs = self._get_s3_method_kwargs(method, *akwarglist,
                                                       **kwargs)
        for i in range(self.retries):
            try:
                return await method(**additional_kwargs)
            except S3_RETRYABLE_ERRORS as e:
                logger.debug("Retryable error: %s" % e)
                err = e
                await asyncio.sleep(min(1.7**i * 0.1, 15))
            except Exception as e:
                logger.debug("Nonretryable error: %s" % e)
                err = e
                break
        if "'coroutine'" in str(err):
            # aiobotocore internal error - fetch original botocore error
            tb = err.__traceback__
            while tb.tb_next:
                tb = tb.tb_next
            try:
                await tb.tb_frame.f_locals['response']
            except Exception as e:
                err = e
>       raise translate_boto_error(err) from err
E       FileNotFoundError: Not Found

s3fs/core.py:229: FileNotFoundError
---------------------------------------------------------------------------------------------- Captured stderr setup ----------------------------------------------------------------------------------------------
 * Running on http://127.0.0.1:5555/ (Press CTRL+C to quit)
127.0.0.1 - - [10/Nov/2020 17:46:59] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [10/Nov/2020 17:46:59] "PUT /test HTTP/1.1" 200 -
127.0.0.1 - - [10/Nov/2020 17:46:59] "PUT /test-versioned HTTP/1.1" 200 -
127.0.0.1 - - [10/Nov/2020 17:46:59] "PUT /test-versioned?versioning HTTP/1.1" 200 -
127.0.0.1 - - [10/Nov/2020 17:46:59] "PUT /test-secure HTTP/1.1" 200 -
127.0.0.1 - - [10/Nov/2020 17:46:59] "PUT /test-secure?policy HTTP/1.1" 200 -
127.0.0.1 - - [10/Nov/2020 17:46:59] "PUT /test/test/accounts.1.json HTTP/1.1" 200 -
127.0.0.1 - - [10/Nov/2020 17:46:59] "PUT /test/test/accounts.2.json HTTP/1.1" 200 -
127.0.0.1 - - [10/Nov/2020 17:46:59] "PUT /test/2014-01-01.csv HTTP/1.1" 200 -
127.0.0.1 - - [10/Nov/2020 17:46:59] "PUT /test/2014-01-02.csv HTTP/1.1" 200 -
127.0.0.1 - - [10/Nov/2020 17:46:59] "PUT /test/2014-01-03.csv HTTP/1.1" 200 -
127.0.0.1 - - [10/Nov/2020 17:46:59] "PUT /test/nested/file1 HTTP/1.1" 200 -
127.0.0.1 - - [10/Nov/2020 17:46:59] "PUT /test/nested/file2 HTTP/1.1" 200 -
127.0.0.1 - - [10/Nov/2020 17:46:59] "PUT /test/nested/nested2/file1 HTTP/1.1" 200 -
127.0.0.1 - - [10/Nov/2020 17:46:59] "PUT /test/nested/nested2/file2 HTTP/1.1" 200 -
127.0.0.1 - - [10/Nov/2020 17:46:59] "PUT /test/file.dat HTTP/1.1" 200 -
127.0.0.1 - - [10/Nov/2020 17:46:59] "PUT /test/filexdat HTTP/1.1" 200 -
---------------------------------------------------------------------------------------------- Captured stderr call -----------------------------------------------------------------------------------------------
127.0.0.1 - - [10/Nov/2020 17:46:59] "GET /test/test/accounts.1.json HTTP/1.1" 200 -
127.0.0.1 - - [10/Nov/2020 17:46:59] "GET /test?list-type=2&prefix=test%2Faccounts.1.json%2F&delimiter=&encoding-type=url HTTP/1.1" 200 -
127.0.0.1 - - [10/Nov/2020 17:46:59] "HEAD /test/test/accounts.1.json HTTP/1.1" 200 -
127.0.0.1 - - [10/Nov/2020 17:46:59] "HEAD /test/test HTTP/1.1" 404 -
127.0.0.1 - - [10/Nov/2020 17:46:59] "HEAD /test/test/accounts.1.json HTTP/1.1" 200 -
127.0.0.1 - - [10/Nov/2020 17:46:59] "PUT /test/test/accounts.1.json2/accounts.1.json HTTP/1.1" 200 -
127.0.0.1 - - [10/Nov/2020 17:46:59] "GET /test?list-type=2&prefix=test%2F&delimiter=%2F&max-keys=1&encoding-type=url HTTP/1.1" 200 -
127.0.0.1 - - [10/Nov/2020 17:46:59] "PUT /test/test/accounts.1.json2 HTTP/1.1" 404 -

Using a different test with the MinIO fixture setup as used in pyarrow (but otherwise a pure s3fs test), I get a different error message ("ParamValidationError"; see the test in the error output below):

test_s3fs.py::test_mv FAILED                                                                                                                                                                                [100%]

==================================================================================================== FAILURES =====================================================================================================
_____________________________________________________________________________________________________ test_mv _____________________________________________________________________________________________________

fsspec_s3fs = (<s3fs.core.S3FileSystem object at 0x7f697046f790>, <method-wrapper '__add__' of str object at 0x7f69b5a7e710>)

    def test_mv(fsspec_s3fs):
        s3, _ = fsspec_s3fs
    
        bucket = 'pyarrow-filesystem/'
    
        source = bucket + 'test-move-source-file'
        target = bucket + 'test-move-target-file'
    
        with s3.open(source, mode="wb"):
                pass
    
>       s3.mv(source, target, recursive=True)

test_s3fs.py:141: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
../miniconda3/envs/test-s3fs-master/lib/python3.8/site-packages/fsspec/spec.py:802: in mv
    self.copy(path1, path2, recursive=recursive, maxdepth=maxdepth)
../miniconda3/envs/test-s3fs-master/lib/python3.8/site-packages/fsspec/asyn.py:206: in copy
    maybe_sync(self._copy, self, paths, path2)
../miniconda3/envs/test-s3fs-master/lib/python3.8/site-packages/fsspec/asyn.py:100: in maybe_sync
    return sync(loop, func, *args, **kwargs)
../miniconda3/envs/test-s3fs-master/lib/python3.8/site-packages/fsspec/asyn.py:71: in sync
    raise exc.with_traceback(tb)
../miniconda3/envs/test-s3fs-master/lib/python3.8/site-packages/fsspec/asyn.py:55: in f
    result[0] = await future
../miniconda3/envs/test-s3fs-master/lib/python3.8/site-packages/fsspec/asyn.py:199: in _copy
    await asyncio.gather(
repos/s3fs/s3fs/core.py:1203: in _cp_file
    size = (await self._info(path1, bucket, key, version_id=vers))['size']
repos/s3fs/s3fs/core.py:759: in _info
    out = await self._call_s3(self.s3.head_object, kwargs, Bucket=bucket,
repos/s3fs/s3fs/core.py:229: in _call_s3
    raise translate_boto_error(err) from err
repos/s3fs/s3fs/core.py:211: in _call_s3
    return await method(**additional_kwargs)
../miniconda3/envs/test-s3fs-master/lib/python3.8/site-packages/aiobotocore/client.py:123: in _make_api_call
    request_dict = await self._convert_to_request_dict(
../miniconda3/envs/test-s3fs-master/lib/python3.8/site-packages/aiobotocore/client.py:171: in _convert_to_request_dict
    request_dict = self._serializer.serialize_to_request(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _

self = <botocore.validate.ParamValidationDecorator object at 0x7f69aae31df0>, parameters = {'Bucket': 'pyarrow-filesystem', 'Key': ''}, operation_model = OperationModel(name=HeadObject)

    def serialize_to_request(self, parameters, operation_model):
        input_shape = operation_model.input_shape
        if input_shape is not None:
            report = self._param_validator.validate(parameters,
                                                    operation_model.input_shape)
            if report.has_errors():
>               raise ParamValidationError(report=report.generate_report())
E               botocore.exceptions.ParamValidationError: Parameter validation failed:
E               Invalid length for parameter Key, value: 0, valid range: 1-inf

../miniconda3/envs/test-s3fs-master/lib/python3.8/site-packages/botocore/validate.py:297: ParamValidationError
---------------------------------------------------------------------------------------------- Captured stdout setup ----------------------------------------------------------------------------------------------
Attempting encryption of all config, IAM users and policies on MinIO backend
============================================================================================= short test summary info =============================================================================================
FAILED test_s3fs.py::test_mv - botocore.exceptions.ParamValidationError: Parameter validation failed:

With the released s3fs 0.5.1, this last test actually works for the mv step, but then fails with an error on s3.rm(source) (where source does not exist anymore, as it was already moved).
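
The parameters in that traceback ({'Bucket': 'pyarrow-filesystem', 'Key': ''}) point at the same parent-"directory" expansion: for a top-level object the parent is the bucket root, so _info() ends up calling head_object with an empty Key. A rough illustration of the path math (plain Python, simplified; not the actual fsspec code):

source = "pyarrow-filesystem/test-move-source-file"

# The parent "directory" of a top-level object is the bucket root:
parent = source.rsplit("/", 1)[0]        # "pyarrow-filesystem"
bucket, _, key = parent.partition("/")   # bucket="pyarrow-filesystem", key=""

# head_object(Bucket=bucket, Key=key) then fails botocore's validation:
#   Invalid length for parameter Key, value: 0, valid range: 1-inf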

@michalz92

Similar with cp(recursive=True).

@michalz92

Both cp(recursive=True) and mv(recursive=True) still fail for me in 0.5.2. They seem to copy all the files, but likely break when trying to copy the directory itself (unfortunately the error message doesn't show the missing key, and I'd rather just write a workaround than debug).

Below is a reproduction of what I'm doing:

import s3fs
import os

# bucket and data_path are placeholders for any existing bucket/prefix:
bucket = "my-bucket"
data_path = "some/prefix"

common_key = os.path.join(bucket, data_path)
staging_key = os.path.join(common_key, "staging")
prod_key = os.path.join(common_key, "prod")

source = f"s3://{staging_key}"
destination = f"s3://{prod_key}"

s3 = s3fs.S3FileSystem()
s3.mv(source, destination, recursive=True)

Produces:

---------------------------------------------------------------------------
NoSuchKey                                 Traceback (most recent call last)
/usr/local/lib/python3.8/site-packages/s3fs/core.py in _call_s3(self, method, *akwarglist, **kwargs)
    233             try:
--> 234                 return await method(**additional_kwargs)
    235             except S3_RETRYABLE_ERRORS as e:

/usr/local/lib/python3.8/site-packages/aiobotocore/client.py in _make_api_call(self, operation_name, api_params)
    153             error_class = self.exceptions.from_code(error_code)
--> 154             raise error_class(parsed_response, operation_name)
    155         else:

NoSuchKey: An error occurred (NoSuchKey) when calling the CopyObject operation: The specified key does not exist.

The above exception was the direct cause of the following exception:

FileNotFoundError                         Traceback (most recent call last)
<ipython-input-8-5a342e637684> in <module>
      1 s3 = s3fs.S3FileSystem()
----> 2 s3.mv(source, destination, recursive=True)

/usr/local/lib/python3.8/site-packages/fsspec/spec.py in mv(self, path1, path2, recursive, maxdepth, **kwargs)
    800     def mv(self, path1, path2, recursive=False, maxdepth=None, **kwargs):
    801         """ Move file(s) from one location to another """
--> 802         self.copy(path1, path2, recursive=recursive, maxdepth=maxdepth)
    803         self.rm(path1, recursive=recursive)
    804 

/usr/local/lib/python3.8/site-packages/fsspec/asyn.py in copy(self, path1, path2, recursive, **kwargs)
    204         paths = self.expand_path(path1, recursive=recursive)
    205         path2 = other_paths(paths, path2)
--> 206         maybe_sync(self._copy, self, paths, path2)
    207 
    208     async def _pipe(self, path, value=None, **kwargs):

/usr/local/lib/python3.8/site-packages/fsspec/asyn.py in maybe_sync(func, self, *args, **kwargs)
     98         if inspect.iscoroutinefunction(func):
     99             # run the awaitable on the loop
--> 100             return sync(loop, func, *args, **kwargs)
    101         else:
    102             # just call the blocking function

/usr/local/lib/python3.8/site-packages/fsspec/asyn.py in sync(loop, func, callback_timeout, *args, **kwargs)
     69     if error[0]:
     70         typ, exc, tb = error[0]
---> 71         raise exc.with_traceback(tb)
     72     else:
     73         return result[0]

/usr/local/lib/python3.8/site-packages/fsspec/asyn.py in f()
     53             if callback_timeout is not None:
     54                 future = asyncio.wait_for(future, callback_timeout)
---> 55             result[0] = await future
     56         except Exception:
     57             error[0] = sys.exc_info()

/usr/local/lib/python3.8/site-packages/fsspec/asyn.py in _copy(self, paths, path2, **kwargs)
    197 
    198     async def _copy(self, paths, path2, **kwargs):
--> 199         await asyncio.gather(
    200             *[self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths, path2)]
    201         )

/usr/local/lib/python3.8/site-packages/s3fs/core.py in _cp_file(self, path1, path2, **kwargs)
   1350         if size <= gb5:
   1351             # simple copy allowed for <5GB
-> 1352             await self._copy_basic(path1, path2, **kwargs)
   1353         else:
   1354             # serial multipart copy

/usr/local/lib/python3.8/site-packages/s3fs/core.py in _copy_basic(self, path1, path2, **kwargs)
   1294             if ver1:
   1295                 copy_src["VersionId"] = ver1
-> 1296             await self._call_s3(
   1297                 self.s3.copy_object, kwargs, Bucket=buc2, Key=key2, CopySource=copy_src
   1298             )

/usr/local/lib/python3.8/site-packages/s3fs/core.py in _call_s3(self, method, *akwarglist, **kwargs)
    250             except Exception as e:
    251                 err = e
--> 252         raise translate_boto_error(err) from err
    253 
    254     call_s3 = sync_wrapper(_call_s3)

FileNotFoundError: The specified key does not exist.
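
Until this is fixed, a possible workaround is to expand the tree manually and move one object at a time (a sketch; it relies only on find() and single-file mv(), which still work):

import s3fs

def mv_tree(s3, source, destination):
    """Move every object under `source` to the same relative path
    under `destination`, one file at a time (recursive=False)."""
    src = s3._strip_protocol(source).rstrip("/")
    dst = s3._strip_protocol(destination).rstrip("/")
    for path in s3.find(src):
        s3.mv(path, dst + path[len(src):])

s3 = s3fs.S3FileSystem()
mv_tree(s3, source, destination)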
