Skip to content

Commit d1363c7

Browse files
committed
add merge cli
1 parent 229bfe1 commit d1363c7

File tree

9 files changed

+355
-41
lines changed

9 files changed

+355
-41
lines changed

pgscatalog.matchapp/pyproject.toml

+1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ pytest = "^8.0.0"
2020

2121
[tool.poetry.scripts]
2222
pgscatalog-match = 'pgscatalog.matchapp.match_cli:run_match'
23+
pgscatalog-matchmerge = 'pgscatalog.matchapp.merge_cli:run_merge'
2324

2425
[build-system]
2526
requires = ["poetry-core"]
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
from .match_cli import run_match
2+
from .merge_cli import run_merge
23

3-
__all__ = ["run_match"]
4+
__all__ = ["run_match", "run_merge"]

pgscatalog.matchapp/src/pgscatalog/matchapp/_config.py

+1
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,4 @@ class Config:
2828
"remove_multiallelic",
2929
]
3030
MATCH_PARAMS = None
31+
MIN_OVERLAP = 0.75
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
""" This module contains internal functions for writing match results to scoring files and logs
2+
3+
It expects Config class attributes to be set up before being called
4+
"""
5+
import gzip
6+
7+
from ._config import Config
8+
9+
10+
def write_matches(matchresults, score_df):
11+
"""Write matchresults out to scoring files and logs"""
12+
match (Config.SPLIT, Config.COMBINED):
13+
case (True, True):
14+
# requires extra work: first write split, then re-combine without recomputing matches
15+
raise NotImplementedError
16+
case (True, False) | (False, True):
17+
# split parameter can handle this case OK
18+
matchresults.write_scorefiles(
19+
directory=Config.OUTDIR,
20+
split=Config.SPLIT,
21+
score_df=score_df,
22+
min_overlap=Config.MIN_OVERLAP,
23+
**Config.MATCH_PARAMS,
24+
)
25+
case _:
26+
raise ValueError
27+
28+
write_log(matchresults=matchresults, score_df=score_df)
29+
# returns labelled and filtered data for checking after merging
30+
return matchresults.df
31+
32+
33+
def write_log(matchresults, score_df):
    """Write the summary log and the full variant log into ``Config.OUTDIR``.

    Two files are produced, both named after ``Config.DATASET``: a plain CSV
    summary and a gzip-compressed CSV containing the full variant log.

    Args:
        matchresults: Object providing ``summary_log`` and
            ``full_variant_log()``.
        score_df: Scoring file data passed through to ``full_variant_log()``.
    """
    full_log_path = Config.OUTDIR / f"{Config.DATASET}_log.csv.gz"
    summary_path = Config.OUTDIR / f"{Config.DATASET}_summary.csv"

    # the summary is small, so it is written uncompressed
    matchresults.summary_log.write_csv(summary_path)

    # the full log can get big: gzip is slow, but available everywhere
    with gzip.open(full_log_path, "wb") as gz_handle:
        variant_log = matchresults.full_variant_log(score_df=score_df)
        variant_log.collect().write_csv(gz_handle)

pgscatalog.matchapp/src/pgscatalog/matchapp/combine_cli.py

Whitespace-only changes.

pgscatalog.matchapp/src/pgscatalog/matchapp/match_cli.py

+9-40
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import argparse
22
import atexit
3-
import gzip
43
import logging
54
import pathlib
65
import sys
@@ -18,6 +17,7 @@
1817
)
1918

2019
from ._config import Config
20+
from ._write import write_matches
2121

2222
logger = logging.getLogger(__name__)
2323

@@ -53,6 +53,12 @@ def run_match():
5353
Config.SPLIT = args.split
5454
Config.COMBINED = args.combined
5555

56+
# parameters that control how the best match candidate is chosen
57+
# missing parameters will be set to defaults specified in matchlib
58+
Config.MATCH_PARAMS = {
59+
k: v for k in Config.MATCH_KWARGS if (v := getattr(args, k)) is not None
60+
}
61+
5662
# start doing the work
5763
with ScoringFileFrame(
5864
path=Config.SCOREFILE,
@@ -83,46 +89,9 @@ def run_match():
8389
f"You'll need to combine match candidates using the Arrow IPC files in {Config.OUTDIR}"
8490
)
8591
sys.exit()
86-
87-
matchresults = MatchResults(*matchresults)
88-
89-
# parameters that control how the best match candidate is chosen
90-
# missing parameters will be set to defaults specified in matchlib
91-
Config.MATCH_PARAMS = {
92-
k: v for k in Config.MATCH_KWARGS if (v := getattr(args, k)) is not None
93-
}
94-
95-
match (Config.SPLIT, Config.COMBINED):
96-
case (True, True):
97-
# requires extra work: first write split, then re-combine without recomputing matches
98-
raise NotImplementedError
99-
case (True, False) | (False, True):
100-
# split parameter can handle this case OK
101-
matchresults.write_scorefiles(
102-
directory=Config.OUTDIR,
103-
split=Config.SPLIT,
104-
score_df=score_df,
105-
min_overlap=args.min_overlap,
106-
**Config.MATCH_PARAMS,
107-
)
108-
case _:
109-
raise ValueError
110-
111-
# write logs:
112-
if Config.CHROM is None:
113-
logfname = Config.OUTDIR / f"{Config.DATASET}_log.csv.gz"
11492
else:
115-
logfname = (
116-
Config.OUTDIR / f"{Config.DATASET}_chrom{Config.CHROM}_log.csv.gz"
117-
)
118-
# summary log is smol
119-
matchresults.summary_log.write_csv(
120-
Config.OUTDIR / f"{Config.DATASET}_summary.csv"
121-
)
122-
123-
# this one can get big. gzip is slow, but everywhere
124-
with gzip.open(logfname, "wb") as f:
125-
matchresults.full_variant_log(score_df=score_df).collect().write_csv(f)
93+
matchresults = MatchResults(*matchresults)
94+
_ = write_matches(matchresults=matchresults, score_df=score_df)
12695

12796

12897
def get_match_candidates(target, score_df, chrom, dataset, **kwargs):
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,188 @@
1+
import argparse
2+
import logging
3+
import pathlib
4+
5+
from ._config import Config
6+
from ._write import write_matches
7+
8+
from pgscatalog.matchlib import ScoringFileFrame, MatchResult, MatchResults
9+
10+
import polars as pl
11+
12+
logger = logging.getLogger(__name__)
13+
14+
15+
def run_merge():
    """Entry point for the merge CLI.

    Parses command line arguments, copies them onto ``Config``, loads match
    candidates from the given IPC files, writes scoring files and logs via
    ``write_matches()``, then checks the written matches for duplicates.

    Raises:
        FileNotFoundError: If the output directory does not exist.
    """
    cli = parse_args()

    Config.DATASET = cli.dataset
    Config.CLEANUP = False
    Config.OUTDIR = pathlib.Path(cli.outdir)
    Config.SCOREFILE = pathlib.Path(cli.scorefile)

    if not Config.OUTDIR.exists():
        raise FileNotFoundError(f"{Config.OUTDIR} does not exist")

    # exist_ok=False: creating the temporary directory fails if it is
    # already present
    Config.TMPDIR = Config.OUTDIR / "tmp"
    Config.TMPDIR.mkdir(exist_ok=False)

    # parameters that control how the best match candidate is chosen;
    # anything left as None falls back to the defaults specified in matchlib
    selected_params = {}
    for key in Config.MATCH_KWARGS:
        value = getattr(cli, key)
        if value is not None:
            selected_params[key] = value
    Config.MATCH_PARAMS = selected_params

    Config.MIN_OVERLAP = cli.min_overlap
    Config.SPLIT = cli.split
    Config.COMBINED = cli.combined

    with ScoringFileFrame(
        path=Config.SCOREFILE,
        chrom=None,  # when merging, scoring files can't be filtered
        cleanup=Config.CLEANUP,
        tmpdir=Config.TMPDIR,
    ) as score_df, pl.StringCache():
        candidates = [
            MatchResult.from_ipc(ipc_path, dataset=Config.DATASET)
            for ipc_path in cli.matches
        ]
        merged = MatchResults(*candidates)
        matchdf = write_matches(matchresults=merged, score_df=score_df)
        _check_duplicate_vars(matchdf)
49+
50+
51+
def _check_duplicate_vars(matches):
    """Verify that no matched variant occurs more than once per scoring file.

    Rows with ``match_status == "matched"`` are counted per
    ``(accession, ID)`` pair and the largest count is inspected.

    Args:
        matches: Lazy polars frame (it is collected here) with
            ``match_status``, ``accession`` and ``ID`` columns.

    Raises:
        ValueError: If there are no matched variants at all, or if any
            ``(accession, ID)`` pair is matched more than once.
    """
    max_occurrence = (
        matches.filter(pl.col("match_status") == "matched")
        .group_by(["accession", "ID"])
        .len()
        .select("len")
        .max()
        .collect()
        .item(0, 0)
    )

    # a None maximum is treated as "nothing matched"
    if max_occurrence is None:
        logger.critical("No variant matches found")
        logger.critical(
            "Did you set the correct genome build? Did you impute your genomes?"
        )
        raise ValueError("No variant matches found")

    if max_occurrence > 1:
        logger.critical("Duplicate IDs in final matches")
        logger.critical(
            "Please double check your genomes for duplicates and try again"
        )
        raise ValueError("Duplicate IDs in final matches")

    logger.info("Scoring files are valid (no duplicate variants found)")
77+
78+
79+
def parse_args(args=None):
    """Parse command line arguments for the merge CLI.

    Args:
        args: Optional list of argument strings. When ``None``, argparse
            reads the arguments from ``sys.argv``.

    Returns:
        The parsed namespace after it has been passed through
        ``_check_args()``.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "-d",
        "--dataset",
        dest="dataset",
        required=True,
        help="<Required> Label for target genomic dataset",
    )
    parser.add_argument(
        "-s",
        "--scorefile",
        dest="scorefile",
        required=True,
        help="<Required> Path to scorefile",
    )
    parser.add_argument(
        "-m",
        "--matches",
        dest="matches",
        required=True,
        nargs="+",
        help="<Required> List of match files",
    )
    # previously required=True *and* default=0.75, which made the default
    # unreachable; the option is now optional so the default applies
    parser.add_argument(
        "--min_overlap",
        dest="min_overlap",
        required=False,
        type=float,
        default=0.75,
        help="<Optional> Minimum proportion of variants to match before error "
        "(default: 0.75)",
    )
    parser.add_argument(
        "-IDs",
        "--filter_IDs",
        dest="filter_IDs",
        help="<Optional> Path to file containing list of variant IDs that can be included in the final scorefile. "
        "[useful for limiting scoring files to variants present in multiple datasets]",
    )
    parser.add_argument(
        "--outdir", dest="outdir", required=True, help="<Required> Output directory"
    )
    parser.add_argument(
        "--split",
        dest="split",
        default=False,
        action="store_true",
        help="<Optional> Write scorefiles split per chromosome?",
    )
    parser.add_argument(
        "--combined",
        dest="combined",
        default=False,
        action="store_true",
        help="<Optional> Write scorefiles in combined format?",
    )
    parser.add_argument(
        "-v",
        "--verbose",
        dest="verbose",
        action="store_true",
        help="<Optional> Extra logging information",
    )
    # variant matching arguments -------------------------------------------------------
    # note: the --keep_* flags are store_false on a remove_* destination, so
    # the destination is True unless the flag is given
    parser.add_argument(
        "--keep_ambiguous",
        dest="remove_ambiguous",
        action="store_false",
        help="""<Optional> Flag to force the program to keep variants with
        ambiguous alleles, (e.g. A/T and G/C SNPs), which are normally
        excluded (default: false). In this case the program proceeds
        assuming that the genotype data is on the same strand as the
        GWAS whose summary statistics were used to construct the score.
        """,
    )
    parser.add_argument(
        "--keep_multiallelic",
        dest="remove_multiallelic",
        action="store_false",
        help="<Optional> Flag to allow matching to multiallelic variants (default: false).",
    )
    parser.add_argument(
        "--ignore_strand_flips",
        dest="skip_flip",
        action="store_true",
        help="""<Optional> Flag to not consider matched variants that may be reported
        on the opposite strand. Default behaviour is to flip/complement unmatched variants and check if
        they match.""",
    )
    parser.add_argument(
        "--keep_first_match",
        dest="keep_first_match",
        action="store_true",
        help="""<Optional> If multiple match candidates for a variant exist that can't be prioritised,
        keep the first match candidate (default: drop all candidates)""",
    )

    return _check_args(parser.parse_args(args))
177+
178+
179+
def _check_args(args):
180+
if args.combined is False and args.split is False:
181+
logger.warning("No output format specified, writing to combined scoring file")
182+
args.combined = True
183+
184+
return args
185+
186+
187+
if __name__ == "__main__":
188+
run_merge()

pgscatalog.matchapp/tests/match.ipc

28.4 KB
Binary file not shown.

0 commit comments

Comments
 (0)