SparseIndex maint: simplify save/load, pull from ebrains dataproxy, general maint
AhmetNSimsek committed Jan 24, 2024
1 parent 6bf2ad3 commit ecc1a42
Showing 5 changed files with 137 additions and 167 deletions.
2 changes: 2 additions & 0 deletions e2e/volumes/test_sparsemap_cache_uniqueness.py
@@ -1,7 +1,9 @@
import siibra
from siibra.volumes.sparsemap import SparseMap


def test_sparsemap_cache_uniqueness():
mp157 = siibra.get_map("julich 3.0", "colin 27", "statistical", spec="157")
mp175 = siibra.get_map("julich 3.0", "colin 27", "statistical", spec="175")
assert isinstance(mp157, SparseMap) and isinstance(mp175, SparseMap)
assert mp157.sparse_index.probs[0] != mp175.sparse_index.probs[0]
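For orientation: as the mapped_voxels() lines in the sparsemap.py diff below show, SparseIndex.probs holds one dict per mapped voxel, keyed by volume index. A minimal sketch of the data shape the test compares, with hypothetical values:

    # probs[voxel_id] maps volume index -> (probability) value
    probs_first_voxel_157 = {0: 0.12, 5: 0.88}  # from the spec="157" map
    probs_first_voxel_175 = {0: 0.34}           # from the spec="175" map
    assert probs_first_voxel_157 != probs_first_voxel_175  # what the test asserts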
2 changes: 1 addition & 1 deletion siibra/retrieval/repositories.py
@@ -117,7 +117,7 @@ def get_loader(self, filename, folder="", decode_func=None):
"""Get a lazy loader for a file, for loading data
only once loader.data is accessed."""
filepath = self._build_url(folder, filename)
if filepath.is_file():
if not filepath.is_file():
raise RuntimeError(f"No file is found in {filepath}")
return FileLoader(filepath, decode_func)
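The guard was inverted before this fix: get_loader() raised precisely when the file did exist. A standalone sketch of the corrected logic, using plain pathlib rather than the siibra connector:

    from pathlib import Path

    def get_loader_sketch(filepath: str):
        # Raise only when the file does NOT exist.
        if not Path(filepath).is_file():
            raise RuntimeError(f"No file is found in {filepath}")
        return open(filepath, "rb")  # stand-in for FileLoader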

4 changes: 2 additions & 2 deletions siibra/retrieval/requests.py
@@ -65,8 +65,8 @@
".zip": lambda b: ZipFile(BytesIO(b)),
".png": lambda b: skimage_io.imread(BytesIO(b)),
".npy": lambda b: np.load(BytesIO(b)),
"sparseindex.probs.txt": lambda b: b.decode('utf-8').strip().split('\r\n'),
"sparseindex.bboxes.txt": lambda b: b.decode('utf-8').strip().split('\r\n'),
"sparseindex.probs.txt": lambda b: b.decode('utf-8').strip().splitlines(),
"sparseindex.bboxes.txt": lambda b: b.decode('utf-8').strip().splitlines(),
"sparseindex.voxels.nii": lambda b: Nifti1Image.from_bytes(b),
}
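splitlines() handles both '\n' and '\r\n' line endings, whereas the old split('\r\n') silently produced a single garbled entry for files with Unix line endings. A quick illustration with a made-up two-line payload:

    text = "0 0.5 3 0.25\n1 0.75\n"
    text.strip().split('\r\n')   # -> ['0 0.5 3 0.25\n1 0.75']  (one entry)
    text.strip().splitlines()    # -> ['0 0.5 3 0.25', '1 0.75']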

257 changes: 111 additions & 146 deletions siibra/volumes/sparsemap.py
@@ -19,11 +19,12 @@
from ..commons import MapIndex, logger, connected_components, siibra_tqdm
from ..locations import boundingbox
from ..retrieval import cache
from ..retrieval.repositories import ZipfileConnector, GitlabConnector
from ..retrieval.requests import HttpRequest, FileLoader
from ..exceptions import (
InsufficientArgumentException, ExcessiveArgumentException
)

from os import path, rename, makedirs
from zipfile import ZipFile, ZIP_DEFLATED
import gzip
from os import path, makedirs
from typing import Dict, Union, TYPE_CHECKING, List
from nilearn import image
import numpy as np
@@ -34,6 +35,15 @@

class SparseIndex:

# Precomputed sparse indices are stored in an EBRAINS data proxy
_DATAPROXY_BASEURL = "https://data-proxy.ebrains.eu/api/v1/buckets/reference-atlas-data/sparse-indices/"

_SUFFIXES = {
"probs": ".sparseindex.probs.txt.gz",
"bboxes": ".sparseindex.bboxes.txt.gz",
"voxels": ".sparseindex.voxels.nii.gz"
}

def __init__(self):
self.probs = []
self.bboxes = []
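The base path and a suffix combine into the three artifact names; for example, with a hypothetical object name "mymap":

    base = SparseIndex._DATAPROXY_BASEURL + "mymap"
    probsfile = base + SparseIndex._SUFFIXES["probs"]
    # -> '.../buckets/reference-atlas-data/sparse-indices/mymap.sparseindex.probs.txt.gz'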
@@ -110,85 +120,107 @@ def mapped_voxels(self, volume: int):
v = [self.probs[i][volume] for i in self.voxels[x, y, z]]
return x, y, z, v

def _to_local_cache(self, cache_prefix: str):
"""
Serialize this index to the cache, using the given prefix for the cache
filenames.
"""
from nibabel import Nifti1Image
probsfile = cache.CACHE.build_filename(f"{cache_prefix}", suffix="probs.txt.gz")
bboxfile = cache.CACHE.build_filename(f"{cache_prefix}", suffix="bboxes.txt.gz")
voxelfile = cache.CACHE.build_filename(f"{cache_prefix}", suffix="voxels.nii.gz")
Nifti1Image(self.voxels, self.affine).to_filename(voxelfile)
with gzip.open(probsfile, 'wt') as f:
for D in self.probs:
f.write("{}\n".format(" ".join(f"{i} {p}" for i, p in D.items())))
with gzip.open(bboxfile, "wt") as f:
for bbox in self.bboxes:
f.write(
"{} {}\n".format(
" ".join(map(str, bbox["minpoint"])),
" ".join(map(str, bbox["maxpoint"])),
)
)

@staticmethod
def _from_local_cache(cache_name: str):
def load(filepath_or_url: str) -> 'SparseIndex':
"""
Attempts to build a sparse index from the siibra cache, looking for
suitable cache files with the specified prefix.
Loads a precomputed SparseIndex into memory.
Parameters
----------
prefix: str
Prefix of the filenames.
filepath_or_url: str
Path/url to the SparseIndex files
(e.g. https://url_to_files/basefilename):
- basefilename.sparseindex.probs.txt.gz
- basefilename.sparseindex.bboxes.txt.gz
- basefilename.sparseindex.voxels.nii.gz
Returns
-------
SparseIndex
None if cached files are not found or not suitable.
"""
from nibabel import load

probsfile = cache.CACHE.build_filename(f"{cache_name}", suffix="probs.txt.gz")
bboxfile = cache.CACHE.build_filename(f"{cache_name}", suffix="bboxes.txt.gz")
voxelfile = cache.CACHE.build_filename(f"{cache_name}", suffix="voxels.nii.gz")
if not all(path.isfile(f) for f in [probsfile, bboxfile, voxelfile]):
return None
probsfile = filepath_or_url + SparseIndex._SUFFIXES["probs"]
bboxfile = filepath_or_url + SparseIndex._SUFFIXES["bboxes"]
voxelfile = filepath_or_url + SparseIndex._SUFFIXES["voxels"]
if all(path.isfile(f) for f in [probsfile, bboxfile, voxelfile]):
request = FileLoader
else:
request = HttpRequest

result = SparseIndex()

voxels = load(voxelfile)
voxels = request(voxelfile).get()
result.voxels = np.asanyarray(voxels.dataobj)
result.affine = voxels.affine
result.shape = voxels.shape

with gzip.open(probsfile, "rt") as f:
lines = f.readlines()
for line in siibra_tqdm(
lines,
total=len(lines),
desc="Loading sparse index",
unit="voxels"
):
fields = line.strip().split(" ")
mapindices = list(map(int, fields[0::2]))
values = list(map(float, fields[1::2]))
D = dict(zip(mapindices, values))
result.probs.append(D)

with gzip.open(bboxfile, "rt") as f:
for line in f:
fields = line.strip().split(" ")
result.bboxes.append(
{
"minpoint": tuple(map(int, fields[:3])),
"maxpoint": tuple(map(int, fields[3:])),
}
)
lines_probs = request(probsfile).get()
for line in siibra_tqdm(
lines_probs,
total=len(lines_probs),
desc="Loading sparse index",
unit="voxels"
):
fields = line.strip().split(" ")
mapindices = list(map(int, fields[0::2]))
values = list(map(float, fields[1::2]))
D = dict(zip(mapindices, values))
result.probs.append(D)

lines_bboxes = request(bboxfile).get()
for line in lines_bboxes:
fields = line.strip().split(" ")
result.bboxes.append({
"minpoint": tuple(map(int, fields[:3])),
"maxpoint": tuple(map(int, fields[3:])),
})

return result
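A usage sketch with hypothetical file names: if all three files exist locally they are read via FileLoader, otherwise they are fetched via HttpRequest. Each line of the probs file holds alternating volume-index/value pairs, e.g. '3 0.25 17 0.5' parses to {3: 0.25, 17: 0.5}:

    spind = SparseIndex.load("/tmp/mymap_sparseindex/mymap")             # local files
    # spind = SparseIndex.load(SparseIndex._DATAPROXY_BASEURL + "mymap") # or remote
    x, y, z, v = spind.mapped_voxels(volume=0)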

def save(self, base_filename: str, folder: str = ""):
"""
Save the three SparseIndex files under the folder base_filename_sparseindex,
named with base_filename. If the SparseIndex is not cached, siibra will
first create it.
Parameters
----------
base_filename: str
The files will be created as:
- base_filename.sparseindex.probs.txt.gz
- base_filename.sparseindex.bboxes.txt.gz
- base_filename.sparseindex.voxels.nii.gz
folder: str, default=""
"""
from nibabel import Nifti1Image
import gzip
savefolder = path.join(folder, f"{base_filename}_sparseindex")
fullpath = path.join(savefolder, base_filename)
logger.info(f"Saving SparseIndex to '{base_filename}' with suffixes {SparseIndex._SUFFIXES}")

if not path.isdir(savefolder):
makedirs(savefolder)

Nifti1Image(self.voxels, self.affine).to_filename(
fullpath + SparseIndex._SUFFIXES["voxels"]
)
with gzip.open(fullpath + SparseIndex._SUFFIXES["probs"], 'wt') as f:
for D in self.probs:
f.write(
"{}\n".format(
" ".join(f"{i} {p}" for i, p in D.items())
)
)
with gzip.open(fullpath + SparseIndex._SUFFIXES["bboxes"], "wt") as f:
for bbox in self.bboxes:
f.write(
"{} {}\n".format(
" ".join(map(str, bbox["minpoint"])),
" ".join(map(str, bbox["maxpoint"])),
)
)
logger.info(f"SparseIndex is saved to {fullpath}.")
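A round-trip sketch with hypothetical paths: save() writes the three files under <folder>/<base_filename>_sparseindex/, from where load() reads them back:

    spind.save("mymap", folder="/tmp")
    reloaded = SparseIndex.load("/tmp/mymap_sparseindex/mymap")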


class SparseMap(parcellationmap.Map):
"""
Expand All @@ -210,10 +242,6 @@ class SparseMap(parcellationmap.Map):
to the actual (probability) value.
"""

# A gitlab instance which holds precomputed sparse indices
_GITLAB_SERVER = 'https://jugit.fz-juelich.de'
_GITLAB_PROJECT = 5779

def __init__(
self,
identifier: str,
@@ -251,24 +279,25 @@ def _cache_prefix(self):
@property
def sparse_index(self):
if self._sparse_index_cached is None:
spind = SparseIndex._from_local_cache(self._cache_prefix)
if spind is None:
logger.info("Downloading precomputed SparseIndex...")
gconn = GitlabConnector(self._GITLAB_SERVER, self._GITLAB_PROJECT, "main")
zip_fname = f"{self.name.replace(' ', '_').replace('statistical', 'continuous')}_index.zip"
# try loading from cache on disk
try:
spind = SparseIndex.load(self._cache_prefix)
except Exception:
spind = None
if spind is None: # try loading from precomputed source
logger.info("Loading precomputed SparseIndex...")
fname = f"{self.name.replace(' ', '_').replace('statistical', 'continuous')}"
try:
assert zip_fname in gconn.search_files(), f"{zip_fname} is not in {gconn}."
zipfile = gconn.get_loader(zip_fname).url
spind = self.load_zipped_sparseindex(zipfile)
spind = SparseIndex.load(SparseIndex._DATAPROXY_BASEURL + fname)
except Exception:
logger.info("Failed to load precomputed SparseIndex from Gitlab.")
logger.debug(f"Could not load SparseIndex from Gitlab at {gconn}", exc_info=1)
if spind is None:
logger.info("Failed to fetch precomputed SparseIndex.")
logger.debug("Error:", exc_info=1)
if spind is None: # Download each map and compute the SparseIndex
with provider.SubvolumeProvider.UseCaching():
spind = SparseIndex()
for vol in siibra_tqdm(
range(len(self)), total=len(self), unit="maps",
desc=f"Fetching {len(self)} volumetric maps"
desc="Fetching volumetric maps and computing SparseIndex"
):
img = super().fetch(
index=MapIndex(volume=vol, label=None)
@@ -278,7 +307,7 @@ def sparse_index(self):
logger.error(f"Cannot retrieve volume #{vol} for {region.name}, it will not be included in the sparse map.")
continue
spind.add_img(np.asanyarray(img.dataobj), img.affine)
spind._to_local_cache(self._cache_prefix)
spind.save(self._cache_prefix, folder=cache.CACHE.folder)
self._sparse_index_cached = spind
assert self._sparse_index_cached.max() == len(self._sparse_index_cached.probs) - 1
return self._sparse_index_cached
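The property thus resolves the index in three tiers, schematically:

    # 1. local siibra cache:  SparseIndex.load(self._cache_prefix)
    # 2. EBRAINS data proxy:  SparseIndex.load(SparseIndex._DATAPROXY_BASEURL + fname)
    # 3. recompute:           spind.add_img() per fetched volume, then spind.save() into cache.CACHE.folder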
@@ -291,70 +320,6 @@ def affine(self):
def shape(self):
return self.sparse_index.shape

def save_sparseindex(self, destination: str, filename: str = None):
"""
Save the SparseIndex as a .zip in the destination folder from the local
cache. If the SparseIndex is not cached, siibra will first create it.
Parameters
----------
destination: str
The path where the zip file will be created.
filename: str, default=None
Name of the zip and prefix of the SparseIndex files. If None, siibra
uses `name` property.
"""
if filename is None:
filename = f"{self.name.replace(' ', '_')}_index"
logger.info(f"Saving SparseIndex of '{self.name}' as '{filename}.zip'")
if not path.isdir(destination):
makedirs(destination)
if self._sparse_index_cached is None:
_ = self.sparse_index
suffices = [".probs.txt.gz", ".bboxes.txt.gz", ".voxels.nii.gz"]
try:
with ZipFile(f"{destination}/{filename}.zip", 'w') as zipf:
for suffix in suffices:
zipf.write(
filename=cache.CACHE.build_filename(self._cache_prefix, suffix),
arcname=path.basename(f"{filename}{suffix}"),
compress_type=ZIP_DEFLATED
)
except Exception as e:
logger.error("Could not save SparseIndex:\n")
raise e
logger.info("SparseIndex is saved.")

def load_zipped_sparseindex(self, zipfname: str):
"""
Load a SparseIndex from a previously computed source and create a local
cache.
Parameters
----------
zipfname: str
A url or a path to zip file containing the SparseIndex files for
this SparseMap precomputed by siibra.
Returns
-------
SparseIndex
"""
zconn = ZipfileConnector(zipfname)
with ZipFile(zconn.zipfile, 'r') as zp:
suffices = [".probs.txt.gz", ".bboxes.txt.gz", ".voxels.nii.gz"]
for suffix in suffices:
file = [f for f in zconn.search_files(suffix=suffix)]
assert len(file) == 1, f"Could not find a unique '{suffix}' file in {zipfname}."
zp.extract(file[0], cache.CACHE.folder)
rename(
path.join(cache.CACHE.folder, file[0]),
cache.CACHE.build_filename(self._cache_prefix, suffix=suffix)
)
zconn.clear_cache()

return SparseIndex._from_local_cache(self._cache_prefix)

def fetch(
self,
region_or_index: Union[MapIndex, str, 'Region'] = None,
@@ -390,7 +355,7 @@ def fetch(
assert length == 1
except AssertionError:
if length > 1:
raise parcellationmap.ExcessiveArgumentException(
raise ExcessiveArgumentException(
"One and only one of region_or_index, region, index can be defined for fetch"
)
# user can provide no arguments, which assumes one and only one volume present
@@ -416,7 +381,7 @@ def fetch(
assert len(self) == 1
volidx = 0
except AssertionError:
raise parcellationmap.InsufficientArgumentException(
raise InsufficientArgumentException(
f"{self.__class__.__name__} provides {len(self)} volumes. "
"Specify 'region' or 'index' for fetch() to identify one."
)
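A sketch of the resulting argument contract (region name hypothetical): exactly one of region_or_index, region, index may be given, and omitting all of them is only valid for single-volume maps:

    mp = siibra.get_map("julich 3.0", "colin 27", "statistical")
    mp.fetch(region="hOc1 left")  # ok: exactly one selector
    mp.fetch(region="hOc1 left", index=MapIndex(volume=0))  # ExcessiveArgumentException
    mp.fetch()                    # InsufficientArgumentException on a multi-volume map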
