1
+ import logging
1
2
import pathlib
3
+ from collections import namedtuple
2
4
3
5
import pandas as pd
4
6
7
+ from .ancestry .tools import choose_pval_threshold , compare_ancestry
8
+ from .principalcomponents import PopulationType
9
+ from .ancestry import read
10
+
11
+
12
+ logger = logging .getLogger (__name__ )
13
+
14
+
15
class AggregatedPGS:
    """A PGS that's been aggregated and melted, and may contain multiple samplesets

    >>> from ._config import Config
    >>> score_path = Config.ROOT_DIR / "tests" / "aggregated_scores.txt.gz"
    >>> AggregatedPGS(path=score_path, target_name="hgdp")
    AggregatedPGS(path=PosixPath('.../pgscatalog.calclib/tests/aggregated_scores.txt.gz'))

    """

    def __init__(self, *, target_name, df=None, path=None):
        """Set up an aggregated PGS from an in-memory dataframe or a file path.

        :param target_name: sampleset label of the target cohort in the score index
        :param df: optional already-loaded aggregated score dataframe
        :param path: optional path to an aggregated score file (loaded lazily)
        :raises TypeError: if neither df nor path is provided
        """
        if df is None and path is None:
            raise TypeError("df or path is required")

        self._path = path
        # bug fix: a caller-supplied df was previously discarded (self._df was
        # always initialised to None, forcing a read from self._path on access)
        self._df = df
        self._target_name = target_name

    @property
    def path(self):
        # May be None when the object was constructed from a dataframe
        return self._path

    @property
    def target_name(self):
        # Sampleset label used to select target rows from the score index
        return self._target_name

    @property
    def df(self):
        # Lazy load: only read from disk on first access when no df was given
        if self._df is None:
            self._df = read.read_pgs(self._path)
        return self._df

    def __repr__(self):
        return f"{type(self).__name__}(path={repr(self.path)})"

    def _check_overlap(self, ref_pc, target_pc):
        """Before adjusting, there should be perfect target sample overlap

        Every sample with PCA data (reference and target) must also have PGS
        data, otherwise the ancestry adjustment would silently drop samples.

        :raises ValueError: if PCA samples are missing from the PGS data
        """
        # index level 1 of the PC dataframes holds sample IDs (IID)
        pca_ref_samples = set(ref_pc.df.index.get_level_values(1))
        pca_target_samples = set(target_pc.df.index.get_level_values(1))
        score_ref_samples = set(self.df.loc["reference"].index)
        score_target_samples = set(self.df.loc[self.target_name].index)

        if not pca_ref_samples.issubset(score_ref_samples):
            logger.critical(
                "Error: PGS data missing for reference samples with PCA data"
            )
            raise ValueError("PGS data missing for reference samples with PCA data")

        if not pca_target_samples.issubset(score_target_samples):
            logger.critical("Error: PGS data missing for target samples with PCA data.")
            raise ValueError("PGS data missing for target samples with PCA data")

    def adjust(self, *, ref_pc, target_pc, **kwargs):
        """Adjust aggregated scores for population structure using PCA data.

        :param ref_pc: PrincipalComponents with pop_type PopulationType.REFERENCE
        :param target_pc: PrincipalComponents with pop_type PopulationType.TARGET
        :param kwargs: optional ancestry settings (method_compare, pThreshold)
        :raises ValueError: on bad pop_type or missing sample overlap

        >>> from ._config import Config
        >>> ref_pc = PrincipalComponents(pcs_path=[Config.ROOT_DIR / "tests" / "ref.pcs"], dataset="reference", psam_path=Config.ROOT_DIR / "tests" / "ref.psam", pop_type=PopulationType.REFERENCE)
        >>> target_pcs = PrincipalComponents(pcs_path=Config.ROOT_DIR / "tests" / "target.pcs", dataset="target", pop_type=PopulationType.TARGET)
        >>> score_path = Config.ROOT_DIR / "tests" / "aggregated_scores.txt.gz"
        >>> AggregatedPGS(path=score_path, target_name="hgdp").adjust(ref_pc=ref_pc, target_pc=target_pcs)
        """
        if ref_pc.pop_type != PopulationType.REFERENCE:
            raise ValueError("ref_pc argument has bad pop_type")

        if target_pc.pop_type != PopulationType.TARGET:
            raise ValueError("target_pc argument has bad pop_type")

        self._check_overlap(ref_pc=ref_pc, target_pc=target_pc)

        # join pgs + pca data
        target_df = target_pc.df.join(self.df.loc[self.target_name], on="IID")
        reference_df = ref_pc.df.join(self.df.loc["reference"], on="IID")

        # set up arguments in the namedtuple shape the ancestry tools expect
        ancestry_args = namedtuple("ancestry_args", ["method_compare", "pThreshold"])
        args = ancestry_args(
            kwargs.get("method_compare", "RandomForest"), kwargs.get("pThreshold", None)
        )
        assignment_threshold_p = choose_pval_threshold(args)

        # TODO: bork — compare results are computed but not yet returned
        ancestry_ref, ancestry_target, compare_info = compare_ancestry(
            ref_df=reference_df,
            ref_pop_col=ref_pc.poplabel,
            ref_train_col="Unrelated",
            target_df=target_df,
            n_pcs=ref_pc.npcs_popcomp,
            method=args.method_compare,
            p_threshold=assignment_threshold_p,
        )
106
+
5
107
6
108
class PolygenicScore :
7
109
"""Represents the output of plink2 --score written to a file
8
110
9
111
>>> from ._config import Config
112
+ >>> import reprlib
10
113
>>> score1 = Config.ROOT_DIR / "tests" / "cineca_22_additive_0.sscore.zst"
11
114
>>> pgs1 = PolygenicScore(sampleset="test", path=score1) # doctest: +ELLIPSIS
12
115
>>> pgs1
13
116
PolygenicScore(sampleset='test', path=PosixPath('.../cineca_22_additive_0.sscore.zst'))
14
117
>>> pgs2 = PolygenicScore(sampleset="test", path=score1)
15
- >>> pgs1.read().to_dict() # doctest: +ELLIPSIS
16
- {'DENOM': ...}, 'PGS001229_22_SUM': {('test', 'HG00096'): 0.54502, ('test', 'HG00097'): 0.674401, ('test', 'HG00099'): 0.63727, ('test', 'HG00100'): 0.863944, ...}}
118
+ >>> reprlib.repr( pgs1.read().to_dict()) # doctest: +ELLIPSIS
119
+ " {'DENOM': {('test', 'HG00096'): 1564, ('test', 'HG00097'): 1564, ('test', 'HG00099'): 1564, ('test', 'HG00100'): 1564, ...}, 'PGS001229_22_SUM': {('test', 'HG00096'): 0.54502, ('test', 'HG00097'): 0.674401, ('test', 'HG00099'): 0.63727, ('test', 'HG00100'): 0.863944, ...}}"
17
120
18
121
It's often helpful to combine PGS that were split per chromosome or by effect type:
19
122
@@ -23,8 +126,8 @@ class PolygenicScore:
23
126
24
127
Once a score has been fully aggregated it can be helpful to recalculate an average:
25
128
26
- >>> aggregated_score.average().to_dict() # doctest: +ELLIPSIS
27
- {'DENOM': ...}, 'PGS001229_22_SUM ': {('test', 'HG00096'): 1.09004, ...}, 'PGS001229_22_AVG': { ('test', 'HG00096 '): 0.000348 ...
129
+ >>> reprlib.repr( aggregated_score.average().to_dict() ) # doctest: +ELLIPSIS
130
+ " {'DENOM': {('test', 'HG00096'): 3128, ('test', 'HG00097'): 3128, ('test', 'HG00099'): 3128, ('test', 'HG00100'): 3128, ...}, 'PGS001229_22_AVG ': {('test', 'HG00096'): 0.0003484782608695652, ('test', 'HG00097'): 0.00043120268542199493, ('test', 'HG00099'): 0.0004074616368286445, ('test', 'HG00100 '): 0.0005523938618925831, ...}}"
28
131
29
132
Scores can be written to a TSV file:
30
133
@@ -100,7 +203,9 @@ def lazy_read(self):
100
203
if self .path is None :
101
204
raise ValueError ("Missing path" )
102
205
103
- for chunk in pd .read_csv (self .path , chunksize = self ._chunksize , sep = "\t " ):
206
+ for chunk in pd .read_csv (
207
+ self .path , chunksize = self ._chunksize , sep = "\t " , converters = {"IID" : str }
208
+ ):
104
209
df = chunk .assign (sampleset = self .sampleset ).set_index (["sampleset" , "#IID" ])
105
210
106
211
df .index .names = ["sampleset" , "IID" ]
@@ -113,7 +218,7 @@ def read(self):
113
218
raise ValueError ("Missing path" )
114
219
115
220
df = (
116
- pd .read_csv (self .path , sep = "\t " )
221
+ pd .read_csv (self .path , sep = "\t " , converters = { "IID" : str } )
117
222
.assign (sampleset = self .sampleset )
118
223
.set_index (["sampleset" , "#IID" ])
119
224
)
@@ -144,13 +249,19 @@ def average(self):
144
249
"""
145
250
chunk_list = []
146
251
for chunk in self :
147
- avgs = chunk .loc [:, chunk . columns . str . endswith ( "_SUM" )]. divide (
148
- chunk [ " DENOM" ] , axis = 0
149
- )
252
+ avgs = chunk .filter ( regex = "SUM$" )
253
+ avgs = avgs . divide ( chunk . DENOM , axis = 0 )
254
+ avgs . insert ( 0 , "DENOM" , chunk . DENOM )
150
255
avgs .columns = avgs .columns .str .replace ("_SUM" , "_AVG" )
151
- chunk_list .append (pd . concat ([ chunk , avgs ], axis = 1 ) )
256
+ chunk_list .append (avgs )
152
257
return pd .concat (chunk_list , axis = 0 )
153
258
259
+ def melt (self ):
260
+ """Melt dataframe from wide format to long format"""
261
+ sum_df = _melt (pd .concat (self .df , axis = 0 ), value_name = "SUM" )
262
+ avg_df = _melt (self .average (), value_name = "AVG" )
263
+ return pd .concat ([sum_df , avg_df .AVG ], axis = 1 )
264
+
154
265
155
266
def _select_agg_cols (cols ):
156
267
"""Select aggregatable columns"""
@@ -163,7 +274,6 @@ def _select_agg_cols(cols):
163
274
164
275
165
276
def _melt (df , value_name ):
166
- """Melt the score dataframe from wide format to long format"""
167
277
df = df .melt (
168
278
id_vars = ["DENOM" ],
169
279
value_name = value_name ,
0 commit comments