1
1
import enum
2
+ import itertools
3
+ import logging
4
+
5
+ from .ancestry import read
6
+
7
+ logger = logging .getLogger (__name__ )
2
8
3
9
4
10
class PopulationType (enum .Enum ):
@@ -7,10 +13,100 @@ class PopulationType(enum.Enum):
7
13
8
14
9
15
class PrincipalComponents :
10
- def __init__ (
11
- self , pcs_path , psam_path , npcs_popcomp , npmcs_normalisation , pop_type
12
- ):
13
- self .pcs_path = pcs_path
16
+ """
17
+ This class represents principal components data calculated by fraposa-pgsc
18
+
19
+ >>> ref_pc = PrincipalComponents(pcs_path=[Config.ROOT_DIR / "tests" / "ref.pcs"], dataset="reference", psam_path=Config.ROOT_DIR / "tests" / "ref.psam", pop_type=PopulationType.REFERENCE)
20
+ >>> ref_pc
21
+ PrincipalComponents(dataset='reference', pop_type=PopulationType.REFERENCE, pcs_path=[PosixPath('.../pgscatalog.calclib/tests/ref.pcs')], psam_path=PosixPath('.../pgscatalog.calclib/tests/ref.psam'))
22
+ >>> ref_pc.df.to_dict()
23
+ {'PC1': {('reference', 'HG00096'): -23.8212, ('reference', 'HG00097'): -24.8106, ...
24
+
25
+ >>> target_pcs = PrincipalComponents(pcs_path=Config.ROOT_DIR / "tests" / "target.pcs", dataset="target", pop_type=PopulationType.TARGET)
26
+ >>> target_pcs
27
+ PrincipalComponents(dataset='target', pop_type=PopulationType.TARGET, pcs_path=[PosixPath('.../pgscatalog.calclib/tests/target.pcs')], psam_path=None)
28
+ >>> target_pcs.df.to_dict()
29
+ {'PC1': {('target', 'HGDP00001'): -18.5135, ('target', 'HGDP00003'): -18.8314, ...
30
+ """
31
+
32
+ def __init__ (self , pcs_path , dataset , pop_type , psam_path = None , ** kwargs ):
33
+ try :
34
+ self .pcs_path = list (itertools .chain (pcs_path )) # handle a list of paths
35
+ except TypeError :
36
+ self .pcs_path = [pcs_path ] # or a single path
37
+
38
+ self .dataset = dataset
14
39
self .psam_path = psam_path
15
- self .max_pcs = max ([10 , npcs_popcomp , npmcs_normalisation ])
16
40
self .pop_type = pop_type
41
+
42
+ if self .psam_path is None and self .pop_type == PopulationType .REFERENCE :
43
+ raise ValueError ("Reference requires psam_path" )
44
+
45
+ self ._npcs_popcomp = kwargs .get ("npcs_popcomp" , 5 )
46
+ self ._npcs_norm = kwargs .get ("npcs_normalization" , 4 )
47
+
48
+ self ._df = None
49
+ # File of related sample IDs (excluded from training ancestry assignments)
50
+ self ._related_ids = kwargs .get ("related_id_path" , None )
51
+ # Population labels in REFERENCE psam to use for assignment
52
+ self ._poplabel = kwargs .get ("ref_label" , "SuperPop" )
53
+
54
+ def __repr__ (self ):
55
+ return f"PrincipalComponents(dataset={ self .dataset !r} , pop_type={ self .pop_type } , pcs_path={ self .pcs_path !r} , psam_path={ self .psam_path !r} )"
56
+
57
+ @property
58
+ def max_pcs (self ):
59
+ """The maximum number of PCs used in calculations"""
60
+ return max ([10 , self ._npcs_popcomp , self ._npcs_norm ])
61
+
62
+ @property
63
+ def npcs_popcomp (self ):
64
+ """Number of PCs used for population comparison (default = 5)"""
65
+ return self ._npcs_popcomp
66
+
67
+ @npcs_popcomp .setter
68
+ def npcs_popcomp (self , value ):
69
+ if 1 <= value <= 20 :
70
+ self ._npcs_popcomp = int (value )
71
+ else :
72
+ raise ValueError ("Must be integer between 1 and 20" )
73
+
74
+ @property
75
+ def npcs_norm (self ):
76
+ """Number of PCs used for population NORMALIZATION (default = 4)"""
77
+ return self ._npcs_norm
78
+
79
+ @npcs_norm .setter
80
+ def npcs_norm (self , value ):
81
+ if 1 <= value <= 20 :
82
+ self ._npcs_popcomp = int (value )
83
+ else :
84
+ raise ValueError ("Must be integer between 1 and 20" )
85
+
86
+ @property
87
+ def df (self ):
88
+ if self ._df is None :
89
+ self ._df = read .read_pcs (
90
+ loc_pcs = self .pcs_path ,
91
+ dataset = self .dataset ,
92
+ loc_related_ids = self ._related_ids ,
93
+ nPCs = self .max_pcs ,
94
+ )
95
+
96
+ if self .pop_type == PopulationType .REFERENCE :
97
+ self ._df = read .extract_ref_psam_cols (
98
+ loc_psam = self .psam_path ,
99
+ dataset = self .dataset ,
100
+ df_target = self ._df ,
101
+ keepcols = self ._poplabel ,
102
+ )
103
+
104
+ if self ._df .shape [0 ] < 100 and self .pop_type == PopulationType .REFERENCE :
105
+ logger .critical (
106
+ "Error: too few reference panel samples. This is an arbitrary threshold "
107
+ "for input QC; however, it is inadvisable to run this analysis with limited "
108
+ "reference panel diversity as empirical percentiles are calculated."
109
+ )
110
+ raise ValueError
111
+
112
+ return self ._df
0 commit comments