
Commit 9b31676: add calcapp
1 parent: 27d52da

32 files changed: +1001 −52 lines

.github/workflows/calcapp-pytest.yml (+13)

on:
  push:
    paths:
      - 'pgscatalog.calcapp/**.py'
  pull_request:
    paths:
      - 'pgscatalog.calcapp/**.py'

jobs:
  calcapp-pytest:
    uses: ./.github/workflows/pytest.yaml
    with:
      package-directory: "pgscatalog.calcapp"

pgscatalog.calcapp/README.md

Whitespace-only changes.

pgscatalog.calcapp/poetry.lock (+720)

Generated file; contents not rendered.

pgscatalog.calcapp/poetry.toml (+3)

[virtualenvs]
create = true
in-project = true

pgscatalog.calcapp/pyproject.toml (+23)

[tool.poetry]
name = "pgscatalog-calcapp"
version = "0.1.0"
description = ""
authors = ["Benjamin Wingfield <bwingfield@ebi.ac.uk>"]
readme = "README.md"
packages = [
    { include = "pgscatalog", from = "src" },
]

[tool.poetry.dependencies]
python = "^3.11"
"pgscatalog.calclib" = { path = "../pgscatalog.calclib", develop = true }

[tool.poetry.group.dev.dependencies]
pytest = "^8.0.0"

[tool.poetry.scripts]
pgscatalog-aggregate = 'pgscatalog.calcapp.aggregate_cli:run_aggregate'

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
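For reference, the [tool.poetry.scripts] entry wires a pgscatalog-aggregate command to the run_aggregate function defined below. The console script generated at install time behaves roughly like this wrapper (a sketch, not the actual generated shim):

import sys

from pgscatalog.calcapp.aggregate_cli import run_aggregate

if __name__ == "__main__":
    # console scripts call the target function and exit with its return value
    sys.exit(run_aggregate())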
pgscatalog.calcapp/src/pgscatalog/calcapp/__init__.py (+3)

from .aggregate_cli import run_aggregate

__all__ = ["run_aggregate"]
pgscatalog.calcapp/src/pgscatalog/calcapp/aggregate_cli.py (+75)

import argparse
import pathlib
import textwrap
import operator
import functools

from pgscatalog.calclib.polygenicscore import PolygenicScore


def run_aggregate():
    args = _parse_args()
    score_paths = [pathlib.Path(x) for x in args.scores]
    pgs = [PolygenicScore(path=x) for x in score_paths]
    # call PolygenicScore.__add__ repeatedly to sum the scores
    aggregated = functools.reduce(operator.add, pgs)
    aggregated.write(outdir=args.outdir, split=args.split)


def _description_text() -> str:
    return textwrap.dedent(
        """
        Aggregate plink .sscore files into a combined TSV table.

        This aggregation sums scores that were calculated from plink
        .scorefiles. Scorefiles may be split to calculate scores over different
        chromosomes or effect types. The PGS Catalog calculator automatically splits
        scorefiles where appropriate, and uses this script to combine them.

        Input .sscore files can optionally be compressed with zstd or gzip.

        The aggregated output scores are compressed with gzip.
        """
    )


def _parse_args(args=None):
    parser = argparse.ArgumentParser(
        description=_description_text(),
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "-s",
        "--scores",
        dest="scores",
        required=True,
        nargs="+",
        help="<Required> List of scorefile paths. Use a wildcard (*) to select multiple files.",
    )
    parser.add_argument(
        "-o",
        "--outdir",
        dest="outdir",
        required=True,
        default="scores/",
        help="<Required> Output directory to store aggregated files",
    )
    parser.add_argument(
        "--split",
        dest="split",
        required=True,
        action=argparse.BooleanOptionalAction,
        help="<Required> Make one aggregated file per sampleset",
    )
    parser.add_argument(
        "-v",
        "--verbose",
        dest="verbose",
        action="store_true",
        help="<Optional> Extra logging information",
    )
    return parser.parse_args(args)


if __name__ == "__main__":
    run_aggregate()
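As an aside, functools.reduce(operator.add, pgs) folds the score list left to right, so the aggregation above is equivalent to ((pgs[0] + pgs[1]) + pgs[2]) + ... with one PolygenicScore as the result. A minimal sketch of the same pattern using toy pandas dataframes (hypothetical data, standing in for split per-chromosome score tables):

import functools
import operator

import pandas as pd

# two toy score tables sharing an index, standing in for split scorefiles
chr21 = pd.DataFrame({"PGS_SUM": [0.1, 0.2]}, index=["HG00096", "HG00097"])
chr22 = pd.DataFrame({"PGS_SUM": [0.3, 0.4]}, index=["HG00096", "HG00097"])

# reduce folds the list with +: equivalent to chr21 + chr22 here;
# pandas aligns on the index and sums element-wise
total = functools.reduce(operator.add, [chr21, chr22])
print(total["PGS_SUM"].tolist())  # HG00096 -> 0.4, HG00097 -> 0.6 (approximately)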
pgscatalog.calcapp tests (+67)

import itertools
from unittest.mock import patch

from pgscatalog.calcapp.aggregate_cli import run_aggregate

import pytest
import pandas as pd


@pytest.fixture(scope="module")
def scorefiles(request):
    return [str(x) for x in list(request.path.parent.glob("testdata/*.zst"))]


def test_split_aggregate(tmp_path_factory, scorefiles):
    """Test aggregating HGDP PGS001229"""
    outdir = tmp_path_factory.mktemp("outdir")

    args = [
        ("pgscatalog-aggregate", "-s", *scorefiles, "--outdir", str(outdir), "--split")
    ]
    flargs = list(itertools.chain(*args))

    with patch("sys.argv", flargs):
        run_aggregate()

    outf = list(outdir.glob("*.txt.gz"))
    assert [x.name for x in outf] == ["hgdp_pgs.txt.gz"]
    outdf = pd.read_csv(outf[0], sep="\t")
    assert list(outdf.columns) == [
        "sampleset",
        "IID",
        "DENOM",
        "PGS001229_hmPOS_GRCh38_SUM",
    ]
    assert outdf.shape == (929, 4)


def test_nosplit_aggregate(tmp_path_factory, scorefiles):
    """Test aggregating HGDP PGS001229 without splitting per sampleset"""
    outdir = tmp_path_factory.mktemp("outdir")

    args = [
        (
            "pgscatalog-aggregate",
            "-s",
            *scorefiles,
            "--outdir",
            str(outdir),
            "--no-split",
        )
    ]
    flargs = list(itertools.chain(*args))

    with patch("sys.argv", flargs):
        run_aggregate()

    outf = list(outdir.glob("*.txt.gz"))
    assert [x.name for x in outf] == ["aggregated_scores.txt.gz"]
    outdf = pd.read_csv(outf[0], sep="\t")
    assert list(outdf.columns) == [
        "sampleset",
        "IID",
        "DENOM",
        "PGS001229_hmPOS_GRCh38_SUM",
    ]
    assert outdf.shape == (929, 4)
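The two tests can pass either --split or --no-split because the flag is declared with argparse.BooleanOptionalAction, which auto-generates the negated form. A minimal standalone sketch of that behaviour (toy parser, not the package's CLI):

import argparse

parser = argparse.ArgumentParser()
# BooleanOptionalAction (Python 3.9+) also registers a --no-split variant
parser.add_argument("--split", required=True, action=argparse.BooleanOptionalAction)

print(parser.parse_args(["--split"]).split)     # True
print(parser.parse_args(["--no-split"]).split)  # False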
23 binary files added (testdata/*.zst score fixtures; not shown).

pgscatalog.calclib/src/pgscatalog/calclib/polygenicscore.py (+97, −52)

@@ -1,31 +1,36 @@
 import pathlib
 
 import pandas as pd
-import reprlib
 
 
 class PolygenicScore:
     """Represents the output of plink2 --score written to a file
 
     >>> from ._config import Config
     >>> score1 = Config.ROOT_DIR / "tests" / "cineca_22_additive_0.sscore.zst"
-    >>> pgs1 = PolygenicScore(sampleset="test", path=score1)  # doctest: +ELLIPSIS
+    >>> pgs1 = PolygenicScore(path=score1)  # doctest: +ELLIPSIS
     >>> pgs1
-    PolygenicScore(sampleset='test', path=PosixPath('.../cineca_22_additive_0.sscore.zst'), df=None)
-    >>> pgs2 = PolygenicScore(sampleset="test", path=score1)
-    >>> pgs1.read().to_dict()  # doctest: +ELLIPSIS
-    {'DENOM': ...}, 'PGS001229_22_SUM': {('test', 'HG00096'): 0.54502, ('test', 'HG00097'): 0.674401, ('test', 'HG00099'): 0.63727, ('test', 'HG00100'): 0.863944, ...}}
+    PolygenicScore(sampleset='cineca', path=PosixPath('.../cineca_22_additive_0.sscore.zst'))
+    >>> pgs2 = PolygenicScore(path=score1)
 
     It's often helpful to combine PGS that were split per chromosome or by effect type:
 
     >>> aggregated_score = pgs1 + pgs2
     >>> aggregated_score  # doctest: +ELLIPSIS
-    PolygenicScore(sampleset='test', path=None, df={'DENOM': ...}, 'PGS001229_22_SUM': {('test', 'HG00096'): 1.09004, ('test', 'HG00097'): 1.348802, ('test', 'HG00099'): 1.27454, ('test', 'HG00100'): 1.727888, ...}})
+    PolygenicScore(sampleset='cineca', path=None)
+
+    The backing dataframe is loaded lazily in chunks:
+
+    >>> for chunk in aggregated_score:
+    ...     chunk.to_dict()
+    ...     break
+    {'DENOM': {('cineca', 'HG00096'): 3128, ...}, 'PGS001229_22_SUM': {('cineca', 'HG00096'): 1.09004, ...}}
+
 
     Once a score has been fully aggregated it can be helpful to recalculate an average:
 
     >>> aggregated_score.average().to_dict()  # doctest: +ELLIPSIS
-    {'DENOM': ...}, 'PGS001229_22_SUM': {('test', 'HG00096'): 1.09004, ...}, 'PGS001229_22_AVG': {('test', 'HG00096'): 0.000348...
+    {'DENOM': ...}, 'PGS001229_22_SUM': {('cineca', 'HG00096'): 1.09004, ...}, 'PGS001229_22_AVG': {('cineca', 'HG00096'): 0.000348...
 
     Scores can be written to a TSV file:
 
@@ -40,10 +45,10 @@ class PolygenicScore:
     >>> splitoutd = tempfile.mkdtemp()
     >>> aggregated_score.write(splitoutd, split=True)
     >>> sorted(os.listdir(splitoutd), key = lambda x: x.split("_")[0])
-    ['test_pgs.txt.gz']
+    ['cineca_pgs.txt.gz']
     """
 
-    def __init__(self, *, sampleset, path=None, df=None):
+    def __init__(self, *, path=None, df=None, sampleset=None):
         match (path, df):
             case (None, None):
                 raise ValueError("init with path or df")
@@ -52,62 +57,102 @@ def __init__(self, *, sampleset, path=None, df=None):
             case _:
                 pass
 
-        self.path = path
-        self.df = df
-        self.sampleset = sampleset
+        try:
+            self.path = pathlib.Path(path)
+        except TypeError:
+            self.path = None
+
+        if sampleset is None:
+            self.sampleset = self.path.stem.split("_")[0]
+        else:
+            self.sampleset = sampleset
+
+        self._chunksize = 50000
+
+        if df is not None:
+            # _bigdf is an in-memory pandas dataframe
+            self._bigdf = df
+        else:
+            self._bigdf = None
+        self._df = None
 
     def __repr__(self):
-        if self.df is not None:
-            df = reprlib.repr(self.df.to_dict())
+        return f"{type(self).__name__}(sampleset={repr(self.sampleset)}, path={repr(self.path)})"
+
+    def __iter__(self):
+        yield from self.df
+
+    def __add__(self, other):
+        if isinstance(other, PolygenicScore):
+            dfs = []
+            for df1, df2 in zip(self, other, strict=True):
+                sumdf = df1.add(df2, fill_value=0)
+                dfs.append(sumdf)
+            return PolygenicScore(sampleset=self.sampleset, df=pd.concat(dfs, axis=0))
         else:
-            df = reprlib.repr(None)
+            return NotImplemented
 
-        return f"{type(self).__name__}(sampleset={repr(self.sampleset)}, path={repr(self.path)}, df={df})"
+    @property
+    def df(self):
+        if self.path is not None:
+            self._df = self.lazy_read()
+        elif self._bigdf is not None:
+            # wrap the in-memory dataframe in a single-item generator
+            self._df = (x for x in [self._bigdf])
+        return self._df
 
-    def read(self):
-        """Read a PGS file as a pandas dataframe"""
-        if self.df is None:
-            df = (
-                pd.read_table(self.path)
-                .assign(sampleset=self.sampleset)
-                .set_index(["sampleset", "#IID"])
-            )
+    def lazy_read(self):
+        """Lazily read a PGS in chunks"""
+        if self.path is None:
+            raise ValueError("Missing path")
+
+        for chunk in pd.read_csv(self.path, chunksize=self._chunksize, sep="\t"):
+            df = chunk.assign(sampleset=self.sampleset).set_index(["sampleset", "#IID"])
 
             df.index.names = ["sampleset", "IID"]
             df = df[_select_agg_cols(df.columns)]
-            self.df = df
-        return self.df
+            yield df
 
-    def average(self):
-        avgs = self.df.loc[:, self.df.columns.str.endswith("_SUM")].divide(
-            self.df["DENOM"], axis=0
+    def read(self):
+        """Eagerly load a PGS into a pandas dataframe"""
+        if self.path is None:
+            raise ValueError("Missing path")
+
+        df = (
+            pd.read_csv(self.path, sep="\t")
+            .assign(sampleset=self.sampleset)
+            .set_index(["sampleset", "#IID"])
         )
-        avgs.columns = avgs.columns.str.replace("_SUM", "_AVG")
-        self.df = pd.concat([self.df, avgs], axis=1)
-        return self.df
+
+        df.index.names = ["sampleset", "IID"]
+        df = df[_select_agg_cols(df.columns)]
+        return df
 
     def write(self, outdir, split=False):
-        """Write a PGS to a compressed TSV"""
+        """Write PGS to a compressed TSV"""
         outdir = pathlib.Path(outdir)
-        if split:
-            for sampleset, group in self.df.groupby("sampleset"):
-                fout = outdir / f"{sampleset}_pgs.txt.gz"
-                group.to_csv(fout, sep="\t", compression="gzip")
-        else:
-            fout = outdir / "aggregated_scores.txt.gz"
-            self.df.to_csv(fout, sep="\t", compression="gzip")
+        for chunk in self:
+            if split:
+                for sampleset, group in chunk.groupby("sampleset"):
+                    fout = outdir / f"{sampleset}_pgs.txt.gz"
+                    group.to_csv(fout, sep="\t", compression="gzip", mode="a")
+            else:
+                fout = outdir / "aggregated_scores.txt.gz"
+                chunk.to_csv(fout, sep="\t", compression="gzip", mode="a")
 
-    def __add__(self, other):
-        if isinstance(other, PolygenicScore):
-            if self.sampleset != other.sampleset:
-                raise ValueError("Can't add PolygenicScore with different samplesets")
-
-            df1 = self.read()
-            df2 = other.read()
-            sumdf = df1.add(df2, fill_value=0)
-            return PolygenicScore(sampleset=self.sampleset, df=sumdf)
-        else:
-            return NotImplemented
+    def average(self):
+        """Recalculate average.
+
+        This is an eager operation, and immediately returns a dataframe
+        """
+        chunk_list = []
+        for chunk in self:
+            avgs = chunk.loc[:, chunk.columns.str.endswith("_SUM")].divide(
+                chunk["DENOM"], axis=0
+            )
+            avgs.columns = avgs.columns.str.replace("_SUM", "_AVG")
+            chunk_list.append(pd.concat([chunk, avgs], axis=1))
        return pd.concat(chunk_list, axis=0)
 
 
 def _select_agg_cols(cols):
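For context, the new lazy_read generator streams each .sscore table through pandas' chunksize interface, so only one chunk is held in memory at a time. A minimal standalone sketch of the same pattern outside the class (hypothetical path and score column; pandas infers zstd or gzip compression from the file extension):

import pandas as pd

def lazy_read(path, sampleset, chunksize=50000):
    """Yield indexed chunks of a tab-separated plink2 .sscore table."""
    for chunk in pd.read_csv(path, chunksize=chunksize, sep="\t"):
        # label each chunk with its sampleset, matching the class above
        df = chunk.assign(sampleset=sampleset).set_index(["sampleset", "#IID"])
        df.index.names = ["sampleset", "IID"]
        yield df

# hypothetical usage: a constant-memory column sum across all chunks
# total = sum(
#     chunk["PGS001229_22_SUM"].sum()
#     for chunk in lazy_read("cineca_22.sscore.zst", "cineca")
# )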
