Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add mock s3 test #232

Merged
merged 13 commits into from
Jan 14, 2025
1 change: 1 addition & 0 deletions environment.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ dependencies:
# see github.com/zarr-developers/zarr-python/issues/1362
- zarr >=2.13.6 # KVStore to FSStore
# Python packages for testing
- moto # mock S3 tests
- pytest
- pytest-cov >=2.10.1
- pytest-html !=2.1.0
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
# pin Zarr to use new FSStore instead of KVStore
'zarr>=2.13.3', # github.com/zarr-developers/zarr-python/issues/1362
# for testing
'moto', # mock S3 tests
'pytest',
'pytest-cov>=2.10.1',
'pytest-html!=2.1.0',
Expand Down
108 changes: 108 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import os
import s3fs
import pathlib
import json
import moto
import pytest

from moto.moto_server.threaded_moto_server import ThreadedMotoServer


# some spoofy server parameters
# test parameters; don't modify these
port = 5555  # local port the ThreadedMotoServer listens on
endpoint_uri = "http://127.0.0.1:%s/" % port  # endpoint URL clients use to reach the mock server
test_bucket_name = "test"  # plain public-read bucket
versioned_bucket_name = "test-versioned"  # public-read bucket with object versioning enabled
secure_bucket_name = "test-secure"  # public-read bucket whose policy denies non-KMS-encrypted uploads

def get_boto3_client():
    """Return a synchronous botocore S3 client pointed at the moto endpoint."""
    # NB: we use the sync botocore client for setup
    from botocore.session import Session

    return Session().create_client("s3", endpoint_url=endpoint_uri)


@pytest.fixture(scope="module")
def s3_base():
    """Start a writable local (moto) S3 server for the whole test module.

    Module scope means the same ThreadedMotoServer is re-used across all tests.
    Lifted from https://github.com/fsspec/s3fs/blob/main/s3fs/tests/test_s3fs.py
    """
    moto_server = ThreadedMotoServer(ip_address="127.0.0.1", port=port)
    moto_server.start()

    # A user ID and secret key are needed even for a public bucket; since our
    # S3 FS and bucket are not actually on an AWS system, bogus values suffice.
    os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "foo")
    os.environ.setdefault("AWS_ACCESS_KEY_ID", "foo")
    os.environ.pop("AWS_PROFILE", None)

    print("server up")
    yield
    print("moto done")
    moto_server.stop()


@pytest.fixture()
def s3fs_s3(s3_base):
    """
    Create a fully functional "virtual" S3 FileSystem compatible with fsspec/s3fs.
    Method inspired by https://github.com/fsspec/s3fs/blob/main/s3fs/tests/test_s3fs.py

    The S3 FS, being AWS-like but not actually physically deployed anywhere, still needs
    all the usual user IDs, secret keys, endpoint URLs etc; the setup makes use of the ACL=public
    configuration (public-read, or public-read-write). Public DOES NOT mean anon=True, but rather,
    All Users group - https://docs.aws.amazon.com/AmazonS3/latest/userguide/acl-overview.html
    Access permission to this group allows anyone with AWS credentials to access the resource.
    The requests need to be signed (authenticated) or not.

    Also, keys are encrypted using AWS-KMS
    https://docs.aws.amazon.com/kms/latest/developerguide/overview.html
    """
    botocore_client = get_boto3_client()

    # see note above about ACL=public-read: create all three test buckets
    for bucket in (test_bucket_name, versioned_bucket_name, secure_bucket_name):
        botocore_client.create_bucket(Bucket=bucket, ACL="public-read")

    # turn on object versioning for the versioned bucket
    botocore_client.put_bucket_versioning(
        Bucket=versioned_bucket_name, VersioningConfiguration={"Status": "Enabled"}
    )

    # the "secure" bucket denies any PutObject that is not AWS-KMS encrypted
    deny_unencrypted_uploads = {
        "Sid": "DenyUnEncryptedObjectUploads",
        "Effect": "Deny",
        "Principal": "*",
        "Action": "s3:PutObject",
        "Resource": f"arn:aws:s3:::{secure_bucket_name}/*",
        "Condition": {
            "StringNotEquals": {
                "s3:x-amz-server-side-encryption": "aws:kms"
            }
        },
    }
    policy = json.dumps(
        {
            "Version": "2012-10-17",
            "Id": "PutObjPolicy",
            "Statement": [deny_unencrypted_uploads],
        }
    )
    botocore_client.put_bucket_policy(Bucket=secure_bucket_name, Policy=policy)

    # hand back a fresh, cache-free s3fs filesystem pointed at the moto server
    s3fs.S3FileSystem.clear_instance_cache()
    filesystem = s3fs.S3FileSystem(
        anon=False, client_kwargs={"endpoint_url": endpoint_uri}
    )
    filesystem.invalidate_cache()

    yield filesystem
149 changes: 149 additions & 0 deletions tests/unit/test_mock_s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
import os
import s3fs
import pathlib
import pytest
import h5netcdf

from tempfile import NamedTemporaryFile
from activestorage.active import load_from_s3


# needed by the spoofed s3 filesystem
port = 5555  # must match the port used by the moto server in tests/conftest.py
endpoint_uri = "http://127.0.0.1:%s/" % port  # endpoint URL of the mock S3 server


def test_s3fs_s3(s3fs_s3):
    """Test mock S3 filesystem constructor."""
    # the fixture hands us an entire mock S3 filesystem
    filesystem = s3fs_s3

    # explore its attributes and methods
    print(dir(filesystem))

    assert not filesystem.anon
    assert not filesystem.version_aware
    expected_kwargs = {'endpoint_url': 'http://127.0.0.1:5555/'}
    assert filesystem.client_kwargs == expected_kwargs


def spoof_boto3_s3(bucket, file_name, file_path):
    """Upload ``file_path`` into ``bucket`` and read it back, via pure boto3.

    This is a pure boto3 implementation, left here in case we ever need it.
    NOTE: boto3 is deliberately NOT a package dependency, so it is imported
    locally; callers must have it installed (the only caller is a skipped test).

    Returns the upload's ResponseMetadata dict, or None when ``file_path``
    does not exist (previously the download section ran unconditionally and
    raised NameError on the unbound object handle in that case).
    """
    import boto3  # local import: not a declared dependency

    res = None  # stays None when there is nothing to upload

    # "put" file
    if os.path.exists(file_path):
        with open(file_path, "rb") as file_contents:
            conn = boto3.session.Session()
            s3 = conn.resource('s3')
            # renamed from `object`, which shadowed the builtin
            s3_object = s3.Object(bucket, file_name)
            result = s3_object.put(Body=file_contents)
            res = result.get('ResponseMetadata')
            if res.get('HTTPStatusCode') == 200:
                print('File Uploaded Successfully')
            else:
                print('File Not Uploaded Successfully')

        # "download" file
        # arg0: file in bucket; arg1: file to download to
        target_file = "test.nc"
        boto3.resource('s3').Bucket(bucket).download_file(file_name, target_file)
        print(os.path.isfile(target_file))

        # "access" the object again via a local round-trip and open with h5netcdf
        with open('testobj.nc', 'wb') as ncdata:
            s3_object.download_fileobj(ncdata)
        with open('testobj.nc', 'rb') as ncdata:
            ncfile = h5netcdf.File(ncdata, 'r', invalid_netcdf=True)
            print(ncfile)

    return res


@pytest.fixture(scope='session')
def aws_credentials():
    """
    Mocked AWS Credentials for moto.
    NOTE: Used ONLY by the pure boto3 test method spoof_boto3_s3.

    Sets bogus credential environment variables and points
    AWS_SHARED_CREDENTIALS_FILE at a throwaway credentials file, which is
    removed again at session teardown.
    """
    os.environ['AWS_ACCESS_KEY_ID'] = 'testing'
    os.environ['AWS_SECRET_ACCESS_KEY'] = 'testing'
    os.environ['AWS_SECURITY_TOKEN'] = 'testing'
    os.environ['AWS_SESSION_TOKEN'] = 'testing'
    os.environ['AWS_DEFAULT_REGION'] = 'us-east-1'

    # create the temp file BEFORE the try: if NamedTemporaryFile itself fails,
    # the old code hit a NameError on `tmp` in the finally clause
    tmp = NamedTemporaryFile(delete=False)
    try:
        tmp.write(b"""[wild weasel]
aws_access_key_id = testing
aws_secret_access_key = testing""")
        tmp.close()
        os.environ['AWS_SHARED_CREDENTIALS_FILE'] = str(tmp.name)
        yield
    finally:
        os.unlink(tmp.name)


@pytest.fixture(scope='function')
def empty_bucket(aws_credentials):
    """Create an empty bucket inside a moto-mocked AWS session.

    NOTE: Used ONLY by the pure boto3 test method spoof_boto3_s3.
    """
    # BUG FIX: neither `moto` nor `boto3` is imported at module level, so this
    # fixture raised NameError when used. boto3 is deliberately not a package
    # dependency, so both are imported locally here.
    import boto3
    import moto

    moto_fake = moto.mock_aws()
    try:
        moto_fake.start()
        conn = boto3.resource('s3')
        conn.create_bucket(Bucket="MY_BUCKET")
        yield conn
    finally:
        moto_fake.stop()


@pytest.mark.skip(reason="This test uses the pure boto3 implement which we don't need at the moment.")
def test_s3file_with_pure_boto3(empty_bucket):
    """Exercise the pure boto3+moto spoofing path (currently skipped)."""
    ncfile = "./tests/test_data/daily_data.nc"
    file_path = pathlib.Path(ncfile)
    file_name = pathlib.Path(ncfile).name
    # partial spoofing with only boto3+moto
    # BUG FIX: was calling undefined `spoof_s3`; the helper is spoof_boto3_s3
    result = spoof_boto3_s3("MY_BUCKET", file_name, file_path)
    # BUG FIX: `s3` was an undefined name; build the s3fs filesystem explicitly
    s3 = s3fs.S3FileSystem(anon=True)
    with s3.open(os.path.join("MY_BUCKET", file_name), "rb") as f:
        ncfile = h5netcdf.File(f, 'r', invalid_netcdf=True)
    assert result.get('HTTPStatusCode') == 200


def test_s3file_with_s3fs(s3fs_s3):
    """
    This test spoofs a complete s3fs FileSystem via s3fs_s3,
    creates a mock bucket inside it, then puts a REAL netCDF4 file in it,
    then it loads it as if it was an S3 file. This is proper
    Wild Weasel stuff right here.
    """
    # the physical file and its Path properties
    source = pathlib.Path("./tests/test_data/daily_data.nc")
    object_name = source.name

    # use mocked s3fs: make a bucket and drop the real file into it
    bucket = "MY_BUCKET"
    s3fs_s3.mkdir(bucket)
    s3fs_s3.put(source, bucket)
    remote_path = os.path.join("MY_BUCKET", object_name)
    s3 = s3fs.S3FileSystem(
        anon=False,
        version_aware=True,
        client_kwargs={"endpoint_url": endpoint_uri},
    )

    # test load by h5netcdf straight from the mock S3
    with s3.open(remote_path, "rb") as f:
        print("File path", f.path)
        loaded = h5netcdf.File(f, 'r', invalid_netcdf=True)
        print("File loaded from spoof S3 with h5netcdf:", loaded)
        print(loaded["ta"])
        assert "ta" in loaded

    # test Active
    storage_options = {
        "anon": False,
        "version_aware": True,
        "client_kwargs": {"endpoint_url": endpoint_uri},
    }
    with load_from_s3(remote_path, storage_options) as ac_file:
        print(ac_file)
        assert "ta" in ac_file
Loading