diff --git a/.github/workflows/pytest.yaml b/.github/workflows/pytest.yaml
index 9a21fbc..2c904a1 100644
--- a/.github/workflows/pytest.yaml
+++ b/.github/workflows/pytest.yaml
@@ -29,6 +29,10 @@ jobs:
   pytest:
     runs-on: ubuntu-latest
+    permissions:
+      id-token: write
+      contents: read
+
     steps:
       - uses: actions/checkout@v4
 
@@ -52,8 +56,9 @@ jobs:
         working-directory: ${{ inputs.package-directory }}
 
       - name: Upload coverage reports to Codecov
-        uses: codecov/codecov-action@v4.0.1
+        uses: codecov/codecov-action@v4
        with:
-          token: ${{ secrets.CODECOV_TOKEN }}
+          use_oidc: true
+          fail_ci_if_error: true
           slug: PGScatalog/pygscatalog
           flags: ${{ inputs.package-directory }}
diff --git a/.github/workflows/ruff.yml b/.github/workflows/ruff.yml
index 87a0780..7421606 100644
--- a/.github/workflows/ruff.yml
+++ b/.github/workflows/ruff.yml
@@ -11,5 +11,5 @@ jobs:
   ruff:
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@v3
-      - uses: chartboost/ruff-action@v1
\ No newline at end of file
+      - uses: actions/checkout@v4
+      - uses: chartboost/ruff-action@v1
diff --git a/README.md b/README.md
index a6b2ba9..5bdf6c1 100644
--- a/README.md
+++ b/README.md
@@ -14,6 +14,7 @@ that has been converted to namespace packages for modularity and re-use.
 ## User applications
 
 [![install with bioconda](https://img.shields.io/badge/install%20with-bioconda-brightgreen.svg?style=flat)](http://bioconda.github.io/recipes/pgscatalog-utils/README.html)
+[![install with pypi](https://img.shields.io/pypi/v/pgscatalog-utils)](https://pypi.org/project/pgscatalog-utils/)
 
 These CLI applications are used internally by the [PGS Catalog Calculator (`pgsc_calc`)](https://github.com/PGScatalog/pgsc_calc) workflow for calculating PGS and performing common adjustments for genetic ancestry.
diff --git a/codecov.yml b/codecov.yml
index 085f461..90b115a 100644
--- a/codecov.yml
+++ b/codecov.yml
@@ -3,10 +3,9 @@ flag_management:
     carryforward: true
     statuses:
       - type: project
-        target: auto
-        threshold: 1%
+        target: 80%
       - type: patch
-        target: 90%
+        target: 80%
   individual_flags: # exceptions to the default rules above, stated flag by flag
     - name: pgscatalog.core #fill in your own flag name
       paths:
diff --git a/pgscatalog.match/pyproject.toml b/pgscatalog.match/pyproject.toml
index 32adfa4..54bee6b 100644
--- a/pgscatalog.match/pyproject.toml
+++ b/pgscatalog.match/pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "pgscatalog.match"
-version = "0.2.3"
+version = "0.3.0"
 description = "Tools for matching variants in PGS scoring files and target variant information files"
 authors = ["Benjamin Wingfield ", "Samuel Lambert ", "Laurent Gil "]
 readme = "README.md"
diff --git a/pgscatalog.match/src/pgscatalog/match/cli/intersect_cli.py b/pgscatalog.match/src/pgscatalog/match/cli/intersect_cli.py
index feeb5b9..56bcbc5 100644
--- a/pgscatalog.match/src/pgscatalog/match/cli/intersect_cli.py
+++ b/pgscatalog.match/src/pgscatalog/match/cli/intersect_cli.py
@@ -49,7 +49,7 @@ def run_intersect():
                 ([key, v["ID"], v["REF"]], [IS_INDEL, STRANDAMB, IS_MA_REF])
             )
 
-            if count_var_r % 500000 == 0:
+            if count_var_r % args.batch_size == 0:
                 heapq.heapify(ref_heap)
                 tmppath = tempfile.NamedTemporaryFile(dir=tmpdir, delete=False)
                 with open(tmppath.name, "wt") as outf:
@@ -134,7 +134,7 @@ def run_intersect():
                 )
             )
 
-            if count_var_t % 500000 == 0:
+            if count_var_t % args.batch_size == 0:
                 heapq.heapify(target_heap)
                 tmppath = tempfile.NamedTemporaryFile(dir=tmpdir, delete=False)
                 with open(tmppath.name, "wt") as outf:
@@ -397,6 +397,14 @@ def parse_args(args=None):
     parser.add_argument(
         "--outdir", dest="outdir", required=True, help=" Output directory"
     )
+    parser.add_argument(
+        "--batch_size",
+        dest="batch_size",
+        type=int,
+        default=500000,
+        required=False,
+        help=" Number of variants processed per batch",
+    )
     return parser.parse_args(args)
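Editor's note: the `--batch_size` change above makes the flush interval in the intersect CLI configurable instead of hard-coding 500000 variants per flush. For readers unfamiliar with the pattern, here is a minimal, self-contained sketch of the general "sort in batches, flush to temporary files, then merge" idea (an external sort). It is not the project's implementation, and the function and variable names are invented for illustration.

```python
import heapq
import tempfile


def batched_external_sort(records, batch_size=500_000, tmpdir=None):
    """Yield records in sorted order without holding them all in memory.

    Sorted batches are flushed to temporary files every `batch_size` records,
    then merged with heapq. A smaller batch size trades speed for lower memory use.
    """
    batch, chunk_paths = [], []
    for n, record in enumerate(records, start=1):
        batch.append(record)
        if n % batch_size == 0:  # the interval a flag like --batch_size would control
            batch.sort()
            with tempfile.NamedTemporaryFile(
                mode="wt", dir=tmpdir, delete=False, suffix=".txt"
            ) as tmp:
                tmp.writelines(f"{x}\n" for x in batch)
                chunk_paths.append(tmp.name)
            batch = []
    batch.sort()  # leftover records after the last full batch

    chunk_files = [open(p) for p in chunk_paths]
    try:
        stripped = ((line.rstrip("\n") for line in f) for f in chunk_files)
        # k-way merge of the sorted on-disk chunks plus the in-memory remainder
        yield from heapq.merge(*stripped, iter(batch))
    finally:
        for f in chunk_files:
            f.close()


if __name__ == "__main__":
    print(list(batched_external_sort(["b", "c", "a", "e", "d"], batch_size=2)))
    # ['a', 'b', 'c', 'd', 'e']
```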
"--outdir", dest="outdir", required=True, help=" Output directory" ) + parser.add_argument( + "--batch_size", + dest="batch_size", + type=int, + default=500000, + required=False, + help=" Number of variants processed per batch", + ) return parser.parse_args(args) diff --git a/pgscatalog.match/src/pgscatalog/match/lib/_match/label.py b/pgscatalog.match/src/pgscatalog/match/lib/_match/label.py index e7ac80c..2b048c7 100644 --- a/pgscatalog.match/src/pgscatalog/match/lib/_match/label.py +++ b/pgscatalog.match/src/pgscatalog/match/lib/_match/label.py @@ -221,6 +221,13 @@ def _label_duplicate_id(df: pl.LazyFrame, keep_first_match: bool) -> pl.LazyFram def _label_biallelic_ambiguous(df: pl.LazyFrame, remove_ambiguous) -> pl.LazyFrame: + """ + Identify ambiguous variants (A/T & C/G SNPs) + + Often scores and genotypes will be on different builds and including these SNPs + would involve tallying incorrect dosages if there has been a strand-flip across builds. + Even on the same build you can get improperly strand-oriented data. + """ logger.debug("Labelling ambiguous variants") ambig = ( df.with_columns( @@ -259,7 +266,7 @@ def _label_biallelic_ambiguous(df: pl.LazyFrame, remove_ambiguous) -> pl.LazyFra ) else: return ( - ambig.with_column(pl.lit(False).alias("exclude_ambiguous")) + ambig.with_columns(pl.lit(False).alias("exclude_ambiguous")) .with_columns(max=pl.max_horizontal("exclude", "exclude_ambiguous")) .drop(["exclude", "exclude_ambiguous"]) .rename({"max": "exclude"}) @@ -301,7 +308,7 @@ def _label_flips(df: pl.LazyFrame, skip_flip: bool) -> pl.LazyFrame: ) else: logger.debug("Not excluding flipped matches") - return df.with_columns(match_IDs=pl.lit("NA")) + return df def _label_filter(df: pl.LazyFrame, filter_IDs: pathlib.Path) -> pl.LazyFrame: @@ -323,4 +330,4 @@ def _label_filter(df: pl.LazyFrame, filter_IDs: pathlib.Path) -> pl.LazyFrame: ) else: logger.info("--filter_IDs not set, skipping filtering") - return df + return df.with_columns(match_IDs=pl.lit("NA")) diff --git a/pgscatalog.match/src/pgscatalog/match/lib/_match/plink.py b/pgscatalog.match/src/pgscatalog/match/lib/_match/plink.py index 5e05705..fd371f6 100644 --- a/pgscatalog.match/src/pgscatalog/match/lib/_match/plink.py +++ b/pgscatalog.match/src/pgscatalog/match/lib/_match/plink.py @@ -14,8 +14,43 @@ def plinkify(df): Variants with the same ID and different effect alleles must be split into different files Input df must contain match candidates filtered to best matched variant + + Some fake match data where effect_allele is the same but other_allele is different: + + >>> data = {'row_nr': [1, 2, 3], 'chr_name': ['11', '11', '11'], 'chr_position': [69331418, 69331418, 69331419], 'effect_allele': ['A', 'C', 'T'], 'other_allele': ['C', 'C', 'T'], 'effect_weight': ['1', '1.5', '2'], 'effect_type': ['additive', 'additive', 'additive'], 'accession': ['pgs1', 'pgs2', 'pgs3'],'ID': ['11:69331418:A:C', '11:69331418:A:C', '11:69331419:T:T'],'matched_effect_allele': ['A', 'C', 'T']} + >>> plinked = plinkify(pl.DataFrame(data).lazy()) + >>> dfs = sorted((x.collect() for x in plinked[EffectType.ADDITIVE]), key= lambda x: len(x)) + >>> assert len(dfs) == 2 + >>> dfs[0].select(["row_nr", "accession", "ID", "matched_effect_allele"]) + shape: (1, 4) + ┌────────┬───────────┬─────────────────┬───────────────────────┐ + │ row_nr ┆ accession ┆ ID ┆ matched_effect_allele │ + │ --- ┆ --- ┆ --- ┆ --- │ + │ i64 ┆ str ┆ str ┆ str │ + ╞════════╪═══════════╪═════════════════╪═══════════════════════╡ + │ 2 ┆ pgs2 ┆ 11:69331418:A:C ┆ C │ + 
└────────┴───────────┴─────────────────┴───────────────────────┘ + + >>> dfs[1].select(["row_nr", "accession", "ID", "matched_effect_allele"]) + shape: (2, 4) + ┌────────┬───────────┬─────────────────┬───────────────────────┐ + │ row_nr ┆ accession ┆ ID ┆ matched_effect_allele │ + │ --- ┆ --- ┆ --- ┆ --- │ + │ i64 ┆ str ┆ str ┆ str │ + ╞════════╪═══════════╪═════════════════╪═══════════════════════╡ + │ 1 ┆ pgs1 ┆ 11:69331418:A:C ┆ A │ + │ 3 ┆ pgs3 ┆ 11:69331419:T:T ┆ T │ + └────────┴───────────┴─────────────────┴───────────────────────┘ + + When merging a lot of scoring files, sometimes a variant might be duplicated + this can happen when the matched effect allele differs at the same position, e.g.: + - chr1: chr2:20003:A:C A 0.3 NA + - chr1: chr2:20003:A:C C NA 0.7 + where the last two columns represent different scores. plink demands + unique identifiers! so need to split, score, and sum later. """ min_cols = [ + "row_nr", "accession", "effect_type", "chr_name", @@ -36,7 +71,7 @@ def plinkify(df): deduped = {} for effect_type, effect_frame in effect_frames: deduped.setdefault(effect_type, []) - deduped[effect_type].append(effect_frame) + deduped[effect_type] = _deduplicate_variants(effect_frame) return deduped @@ -94,7 +129,7 @@ def _split_effect_type( return additive, dominant, recessive -def _deduplicate_variants(df: pl.LazyFrame) -> list[pl.LazyFrame | None]: +def _deduplicate_variants(df: pl.LazyFrame) -> list[pl.LazyFrame]: """Find variant matches that have duplicate identifiers When merging a lot of scoring files, sometimes a variant might be duplicated this can happen when the matched effect allele differs at the same position, e.g.: @@ -108,44 +143,44 @@ def _deduplicate_variants(df: pl.LazyFrame) -> list[pl.LazyFrame | None]: Returns: A list of dataframes, with unique ID - matched effect allele combinations """ - if df.select("ID").head().collect().is_empty(): + if df.limit(1).collect().is_empty(): logger.info("Empty input: skipping deduplication") - return [None] - else: - logger.debug("Deduplicating variants") - - # 1. unique ID - EA is important because normal duplicates are already - # handled by pivoting, and it's pointless to split them unnecessarily - # 2. use cumcount to number duplicate IDs - # 3. 
join cumcount data on original DF, use this data for splitting - # note: effect_allele should be equivalent to matched_effect_allele - ea_count: pl.LazyFrame = ( - df.select(["ID", "matched_effect_allele"]) - .unique() - .with_columns( + return [df] + logger.debug("Deduplicating variants (splitting across files)") + ea_count: pl.LazyFrame = df.unique( + ["ID", "matched_effect_allele"], maintain_order=True + ).with_columns([pl.col(["ID"]).cum_count().over(["ID"]).alias("cum_count")]) + + # now get groups for each different cumulative count (1 instance, 2, ..., n) + groups = ea_count.collect().group_by(["cum_count"], maintain_order=True) + group_dfs = (x.lazy() for _, x in groups) + ldf_lst = [] + + for x in group_dfs: + tempdf = df.join( + x, on=["row_nr", "ID", "matched_effect_allele"], how="inner" + ).select( [ - pl.col("ID").cumcount().over(["ID"]).alias("cumcount"), - pl.col("ID").count().over(["ID"]).alias("count"), + "row_nr", + "accession", + "effect_type", + "chr_name", + "ID", + "matched_effect_allele", + "effect_weight", ] ) - ) - - dup_label: pl.LazyFrame = df.join( - ea_count, on=["ID", "matched_effect_allele"], how="left" - ) + ldf_lst.append(tempdf) - # now split the matched variants, and make sure we don't lose any - # cumcount = ngroup-1, so add 1 - n_splits: int = ea_count.select("cumcount").max().collect().item(0, 0) + 1 - df_lst: list = [] + # double check to help me sleep at night + original_length = df.select(pl.len()).collect().item(0, 0) + new_length = sum(x.select(pl.len()).collect().item(0, 0) for x in ldf_lst) - for i in range(0, n_splits): - x: pl.LazyFrame = dup_label.filter(pl.col("cumcount") == i).drop( - ["cumcount", "count"] + if original_length != new_length: + raise ValueError( + f"Some variants went missing during deduplication ({original_length} != {new_length}) " + "This should never happen" ) - if (df := x.collect()).is_empty(): - continue - else: - df_lst.append(df.lazy()) - return df_lst + logger.info("Dataframe lengths are OK after deduplicating :)") + return ldf_lst diff --git a/pgscatalog.match/tests/data/duplicated_scorefile.txt b/pgscatalog.match/tests/data/duplicated_scorefile.txt new file mode 100644 index 0000000..563a965 --- /dev/null +++ b/pgscatalog.match/tests/data/duplicated_scorefile.txt @@ -0,0 +1,5 @@ +chr_name chr_position effect_allele other_allele effect_weight effect_type is_duplicated accession row_nr +11 69331418 C T 0.210422987 additive FALSE test 0 +11 69331418 T C 0.210422987 additive FALSE test2 1 +11 69379161 A C 0.018723614 additive FALSE test2 2 +5 1279790 T C -0.005213567 additive FALSE test3 4 diff --git a/pgscatalog.match/tests/test_intersect_cli.py b/pgscatalog.match/tests/test_intersect_cli.py index 9ff54fc..d03a4f8 100644 --- a/pgscatalog.match/tests/test_intersect_cli.py +++ b/pgscatalog.match/tests/test_intersect_cli.py @@ -30,6 +30,7 @@ def target(request): def test_intersect_cli(tmp_path_factory, ref, target, afreq, vmiss): outdir = tmp_path_factory.mktemp("outdir") + # using a low batch size to test batching behaviour works as expected args = [ ( "pgscatalog-intersect", @@ -39,6 +40,8 @@ def test_intersect_cli(tmp_path_factory, ref, target, afreq, vmiss): str(target), "--outdir", str(outdir), + "--batch_size", + "100", ) ] flargs = list(itertools.chain(*args)) diff --git a/pgscatalog.match/tests/test_match_cli.py b/pgscatalog.match/tests/test_match_cli.py index e057810..178b11f 100644 --- a/pgscatalog.match/tests/test_match_cli.py +++ b/pgscatalog.match/tests/test_match_cli.py @@ -1,6 +1,8 @@ import csv +import 
gzip import itertools import os +import pathlib from unittest.mock import patch import pytest @@ -13,6 +15,11 @@ def good_scorefile(request): return request.path.parent / "data" / "good_match_scorefile.txt" +@pytest.fixture(scope="module") +def duplicated_scorefile(request): + return request.path.parent / "data" / "duplicated_scorefile.txt" + + @pytest.fixture(scope="module") def bad_scorefile(request): return request.path.parent / "data" / "bad_match_scorefile.txt.gz" @@ -28,6 +35,107 @@ def multiallelic_variants(request): return request.path.parent / "data" / "multiallelic.pvar" +@pytest.fixture() +def filter_ids(request, tmp_path, good_scorefile): + with open(good_scorefile) as f: + scorefile = list(csv.DictReader(f, delimiter="\t")) + # build variant IDs from the scoring file + ids = [ + ":".join( + [x["chr_name"], x["chr_position"], x["effect_allele"], x["other_allele"]] + ) + for x in scorefile + ] + # grab half of them, use these to filter matches + filter_list = ids[: int(len(ids) / 2)] + outf = tmp_path / "filter_ids.txt" + with open(outf, mode="wt") as f: + [f.write(x + "\n") for x in filter_list] + + return pathlib.Path(outf) + + +def test_filter_match(tmp_path_factory, good_variants, good_scorefile, filter_ids): + outdir = tmp_path_factory.mktemp("outdir") + + args = [ + ( + "pgscatalog-match", + "-d", + "test", + "-s", + str(good_scorefile), + "-t", + str(good_variants), + "--outdir", + str(outdir), + "--min_overlap", + "0.75", + "--filter_IDs", + str(filter_ids), + ) + ] + flargs = list(itertools.chain(*args)) + + with patch("sys.argv", flargs): + run_match() + + with gzip.open( + outdir / "test_ALL_additive_0.scorefile.gz", mode="rt" + ) as out_f, open(good_scorefile) as f1, open(filter_ids) as f2: + n_out_score = len(list(csv.DictReader(out_f, delimiter="\t"))) + n_scorefile = sum(1 for _ in f1) + n_filt = sum(1 for _ in f2) + + # matched output from scorefile has been filtered + assert n_out_score < n_filt < n_scorefile + + +def test_duplicated(tmp_path_factory, good_variants, duplicated_scorefile): + """A scoring file where the same variant ID has different effect alleles from different scores + Instead of pivoting wider, these variants must be split across different files + (plink2 doesn't like duplicated variants). 
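Editor's note: the `_deduplicate_variants` rewrite above leans on `cum_count().over("ID")` to number repeated variant IDs and then splits the frame by that number, so each output scoring file contains each ID at most once. Below is a toy, self-contained polars sketch of that idea; the data and variable names are invented for illustration and are not taken from the patch.

```python
import polars as pl

# Invented example data: the same variant ID matched to two different effect alleles
df = pl.DataFrame(
    {
        "ID": ["11:69331418:A:C", "11:69331418:A:C", "11:69331419:T:T"],
        "matched_effect_allele": ["A", "C", "T"],
    }
)

# Number each occurrence of an ID within its window (1, 2, ... in recent polars releases)
numbered = df.with_columns(pl.col("ID").cum_count().over("ID").alias("cum_count"))
print(numbered)

# Splitting by cum_count yields frames where every variant ID appears at most once,
# which is what allows each frame to be written as a separate scoring file
for _, group in numbered.group_by("cum_count", maintain_order=True):
    print(group)
```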
+ """ + outdir = tmp_path_factory.mktemp("outdir") + + args = [ + ( + "pgscatalog-match", + "-d", + "test", + "-s", + str(duplicated_scorefile), + "-t", + str(good_variants), + "--outdir", + str(outdir), + "--min_overlap", + "0.75", + ) + ] + flargs = list(itertools.chain(*args)) + + with patch("sys.argv", flargs): + run_match() + + # scoring files seem to be split properly + assert (outdir / "test_ALL_additive_0.scorefile.gz").exists() + assert (outdir / "test_ALL_additive_1.scorefile.gz").exists() + + # test the split happened + with gzip.open( + outdir / "test_ALL_additive_0.scorefile.gz", mode="rt" + ) as f1, gzip.open(outdir / "test_ALL_additive_1.scorefile.gz", mode="rt") as f2: + f1variants = list(csv.DictReader(f1, delimiter="\t")) + f2variants = list(csv.DictReader(f2, delimiter="\t")) + + # variants were split correctly across the files + assert len(f1variants) == 3 and len(f2variants) == 1 + # test2 and test3 PGS have been pivoted + assert all("test2" in x and "test3" in x for x in f1variants) + assert all("test" in x for x in f2variants) + + def test_multiallelic(tmp_path_factory, multiallelic_variants, good_scorefile): outdir = tmp_path_factory.mktemp("outdir") @@ -66,8 +174,30 @@ def test_multiallelic(tmp_path_factory, multiallelic_variants, good_scorefile): assert (outdir / "test_ALL_additive_0.scorefile.gz").exists() -def test_match(tmp_path_factory, good_scorefile, good_variants): - """Test matching runs without errors with good data""" +@pytest.mark.parametrize( + "ambiguous,multiallelic,skipflip,keep_first_match", + [ + ( + "--keep_ambiguous", + "--keep_multiallelic", + "--ignore_strand_flips", + "--keep_first_match", + ), + (None, None, None, None), + ], +) +def test_match( + tmp_path_factory, + good_scorefile, + good_variants, + ambiguous, + multiallelic, + skipflip, + keep_first_match, +): + """Test matching runs without errors with good data + + Parameterised to run twice: with default CLI match args and optional matching arguments""" outdir = tmp_path_factory.mktemp("outdir") args = [ @@ -83,9 +213,14 @@ def test_match(tmp_path_factory, good_scorefile, good_variants): str(outdir), "--min_overlap", "0.75", + ambiguous, + multiallelic, + skipflip, + keep_first_match, ) ] flargs = list(itertools.chain(*args)) + flargs = [x for x in flargs if x] # drop parameterised None with patch("sys.argv", flargs): run_match()