Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update ABSStore to current Azure Storage API version #620

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion requirements_dev_optional.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ ipytree==0.1.3
# optional library requirements for services
# don't let pyup change pinning for azure-storage-blob, need to pin to older
# version to get compatibility with azure storage emulator on appveyor
azure-storage-blob==2.0.1 # pyup: ignore
azure-storage-blob==12.5.0 # pyup: ignore
redis==3.3.8
pymongo==3.9.0
# optional test requirements
Expand Down
75 changes: 30 additions & 45 deletions zarr/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -2147,17 +2147,18 @@ class ABSStore(MutableMapping):

def __init__(self, container, prefix='', account_name=None, account_key=None,
blob_service_kwargs=None):
from azure.storage.blob import BlockBlobService
from azure.storage.blob import ContainerClient
self.container = container
self.prefix = normalize_storage_path(prefix)
self.account_name = account_name
self.account_account_url = f"https://{self.account_name}.blob.core.windows.net"
self.account_key = account_key
if blob_service_kwargs is not None:
self.blob_service_kwargs = blob_service_kwargs
else: # pragma: no cover
self.blob_service_kwargs = dict()
self.client = BlockBlobService(self.account_name, self.account_key,
**self.blob_service_kwargs)
self.client = ContainerClient(self.account_account_url, self.container,
credential=self.account_key, **self.blob_service_kwargs)

# needed for pickling
def __getstate__(self):
Expand All @@ -2166,10 +2167,10 @@ def __getstate__(self):
return state

def __setstate__(self, state):
from azure.storage.blob import BlockBlobService
from azure.storage.blob import ContainerClient
self.__dict__.update(state)
self.client = BlockBlobService(self.account_name, self.account_key,
**self.blob_service_kwargs)
self.client = ContainerClient(self.account_account_url, self.container,
credential=self.account_key, **self.blob_service_kwargs)

def _append_path_to_prefix(self, path):
if self.prefix == '':
Expand All @@ -2188,24 +2189,23 @@ def _strip_prefix_from_path(path, prefix):
return path_norm

def __getitem__(self, key):
from azure.common import AzureMissingResourceHttpError
from azure.core.exceptions import ResourceNotFoundError
blob_name = self._append_path_to_prefix(key)
try:
blob = self.client.get_blob_to_bytes(self.container, blob_name)
return blob.content
except AzureMissingResourceHttpError:
return self.client.download_blob(blob_name).readall()
except ResourceNotFoundError:
raise KeyError('Blob %s not found' % blob_name)

def __setitem__(self, key, value):
value = ensure_bytes(value)
blob_name = self._append_path_to_prefix(key)
self.client.create_blob_from_bytes(self.container, blob_name, value)
self.client.upload_blob(blob_name, value, overwrite=True)

def __delitem__(self, key):
from azure.common import AzureMissingResourceHttpError
from azure.core.exceptions import ResourceNotFoundError
try:
self.client.delete_blob(self.container, self._append_path_to_prefix(key))
except AzureMissingResourceHttpError:
self.client.delete_blob(self._append_path_to_prefix(key))
except ResourceNotFoundError:
raise KeyError('Blob %s not found' % key)

def __eq__(self, other):
Expand All @@ -2223,63 +2223,48 @@ def __iter__(self):
list_blobs_prefix = self.prefix + '/'
else:
list_blobs_prefix = None
for blob in self.client.list_blobs(self.container, list_blobs_prefix):
for blob in self.client.list_blobs(list_blobs_prefix):
yield self._strip_prefix_from_path(blob.name, self.prefix)

def __len__(self):
return len(self.keys())

def __contains__(self, key):
blob_name = self._append_path_to_prefix(key)
assert len(blob_name) >= 1
if self.client.exists(self.container, blob_name):
return True
else:
return False
return self.client.get_blob_client(blob_name).exists()

def listdir(self, path=None):
from azure.storage.blob import Blob
dir_path = normalize_storage_path(self._append_path_to_prefix(path))
if dir_path:
dir_path += '/'
items = list()
for blob in self.client.list_blobs(self.container, prefix=dir_path, delimiter='/'):
if type(blob) == Blob:
items.append(self._strip_prefix_from_path(blob.name, dir_path))
else:
items.append(self._strip_prefix_from_path(
blob.name[:blob.name.find('/', len(dir_path))], dir_path))
items = set()
for blob in self.client.walk_blobs(name_starts_with=dir_path, delimiter='/'):
items.add(self._strip_prefix_from_path(blob.name, dir_path))
return items

def rmdir(self, path=None):
dir_path = normalize_storage_path(self._append_path_to_prefix(path))
if dir_path:
dir_path += '/'
for blob in self.client.list_blobs(self.container, prefix=dir_path):
assert len(blob.name) >= 1
self.client.delete_blob(self.container, blob.name)
for blob in self.client.list_blobs(name_starts_with=dir_path):
self.client.delete_blob(blob)

def getsize(self, path=None):
from azure.storage.blob import Blob
store_path = normalize_storage_path(path)
fs_path = self.prefix
if store_path:
fs_path = self._append_path_to_prefix(store_path)

if fs_path != "" and self.client.exists(self.container, fs_path):
return self.client.get_blob_properties(
self.container, fs_path
).properties.content_length
fs_path = self._append_path_to_prefix(store_path)
blob_client = self.client.get_blob_client(fs_path)
if blob_client.exists():
return blob_client.get_blob_properties().size
else:
size = 0
if fs_path == '':
fs_path = None
else:
elif not fs_path.endswith('/'):
fs_path += '/'
for blob in self.client.list_blobs(self.container, prefix=fs_path,
delimiter='/'):
if type(blob) == Blob:
size += blob.properties.content_length
for blob in self.client.walk_blobs(name_starts_with=fs_path, delimiter='/'):
blob_client = self.client.get_blob_client(blob)
if blob_client.exists():
size += blob_client.get_blob_properties().size
return size

def clear(self):
Expand Down