-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathpreprocess.py
98 lines (87 loc) · 3.45 KB
/
preprocess.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import utils as utils
import h5py
import scipy as sp
import numpy as np
import scanpy as sc
import pandas as pd
def read_clean(data):
assert isinstance(data, np.ndarray)
if data.dtype.type is np.bytes_:
data = utils.decode(data)
if data.size == 1:
data = data.flat[0]
return data
def dict_from_group(group):
assert isinstance(group, h5py.Group)
d = utils.dotdict()
for key in group:
if isinstance(group[key], h5py.Group):
value = dict_from_group(group[key])
else:
value = read_clean(group[key][...])
d[key] = value
return d
def read_data(filename, sparsify = False, skip_exprs = False):
with h5py.File(filename, "r") as f:
obs = pd.DataFrame(dict_from_group(f["obs"]), index = utils.decode(f["obs_names"][...]))
var = pd.DataFrame(dict_from_group(f["var"]), index = utils.decode(f["var_names"][...]))
uns = dict_from_group(f["uns"])
if not skip_exprs:
exprs_handle = f["exprs"]
if isinstance(exprs_handle, h5py.Group):
mat = sp.sparse.csr_matrix((exprs_handle["data"][...], exprs_handle["indices"][...],
exprs_handle["indptr"][...]), shape = exprs_handle["shape"][...])
else:
mat = exprs_handle[...].astype(np.float32)
if sparsify:
mat = sp.sparse.csr_matrix(mat)
else:
mat = sp.sparse.csr_matrix((obs.shape[0], var.shape[0]))
return mat, obs, var, uns
def prepro(filename):
data_path = filename
mat, obs, var, uns = read_data(data_path, sparsify=False, skip_exprs=False)
if isinstance(mat, np.ndarray):
X = np.array(mat)
else:
X = np.array(mat.toarray())
cell_name = np.array(obs["cell_type1"])
cell_type, cell_label = np.unique(cell_name, return_inverse=True)
return X, cell_label
def normalize(adata, copy=True, highly_genes = None, filter_min_counts=True, size_factors=True, normalize_input=True, logtrans_input=True):
if isinstance(adata, sc.AnnData):
if copy:
adata = adata.copy()
elif isinstance(adata, str):
adata = sc.read(adata)
else:
raise NotImplementedError
norm_error = 'Make sure that the dataset (adata.X) contains unnormalized count data.'
assert 'n_count' not in adata.obs, norm_error
if adata.X.size < 50e6: # check if adata.X is integer only if array is small
if sp.sparse.issparse(adata.X):
assert (adata.X.astype(int) != adata.X).nnz == 0, norm_error
else:
assert np.all(adata.X.astype(int) == adata.X), norm_error
if filter_min_counts:
sc.pp.filter_genes(adata, min_counts=1)
sc.pp.filter_cells(adata, min_counts=1)
if size_factors or normalize_input or logtrans_input:
adata.raw = adata.copy()
else:
adata.raw = adata
if size_factors:
sc.pp.normalize_per_cell(adata)
adata.obs['size_factors'] = adata.obs.n_counts / np.median(adata.obs.n_counts)
else:
adata.obs['size_factors'] = 1.0
if logtrans_input:
sc.pp.log1p(adata)
if highly_genes != None:
sc.pp.highly_variable_genes(adata, min_mean=0.0125, max_mean=3, min_disp=0.5, n_top_genes = highly_genes, subset=True)
if normalize_input:
sc.pp.scale(adata)
return adata