pgscatalog-match v0.3.0 #36

Merged · 15 commits · Aug 2, 2024
9 changes: 7 additions & 2 deletions .github/workflows/pytest.yaml
@@ -29,6 +29,10 @@ jobs:
pytest:
runs-on: ubuntu-latest

permissions:
id-token: write
contents: read

steps:
- uses: actions/checkout@v4

@@ -52,8 +56,9 @@ jobs:
working-directory: ${{ inputs.package-directory }}

- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v4.0.1
uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
use_oidc: true
fail_ci_if_error: true
slug: PGScatalog/pygscatalog
flags: ${{ inputs.package-directory }}
4 changes: 2 additions & 2 deletions .github/workflows/ruff.yml
@@ -11,5 +11,5 @@ jobs:
ruff:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: chartboost/ruff-action@v1
- uses: actions/checkout@v4
- uses: chartboost/ruff-action@v1
1 change: 1 addition & 0 deletions README.md
@@ -14,6 +14,7 @@ that has been converted to namespace packages for modularity and re-use.
## User applications

[![install with bioconda](https://img.shields.io/badge/install%20with-bioconda-brightgreen.svg?style=flat)](http://bioconda.github.io/recipes/pgscatalog-utils/README.html)
[![install with pypi](https://img.shields.io/pypi/v/pgscatalog-utils)](https://pypi.org/project/pgscatalog-utils/)

These CLI applications are used internally by the [PGS Catalog Calculator (`pgsc_calc`)](https://github.com/PGScatalog/pgsc_calc)
workflow for calculating PGS and performing common adjustments for genetic ancestry.
5 changes: 2 additions & 3 deletions codecov.yml
@@ -3,10 +3,9 @@ flag_management:
carryforward: true
statuses:
- type: project
target: auto
threshold: 1%
target: 80%
- type: patch
target: 90%
target: 80%
individual_flags: # exceptions to the default rules above, stated flag by flag
- name: pgscatalog.core #fill in your own flag name
paths:
2 changes: 1 addition & 1 deletion pgscatalog.match/pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pgscatalog.match"
version = "0.2.3"
version = "0.3.0"
description = "Tools for matching variants in PGS scoring files and target variant information files"
authors = ["Benjamin Wingfield <bwingfield@ebi.ac.uk>", "Samuel Lambert <sl925@medschl.cam.ac.uk>", "Laurent Gil <lg10@sanger.ac.uk>"]
readme = "README.md"
12 changes: 10 additions & 2 deletions pgscatalog.match/src/pgscatalog/match/cli/intersect_cli.py
@@ -49,7 +49,7 @@ def run_intersect():
([key, v["ID"], v["REF"]], [IS_INDEL, STRANDAMB, IS_MA_REF])
)

if count_var_r % 500000 == 0:
if count_var_r % args.batch_size == 0:
heapq.heapify(ref_heap)
tmppath = tempfile.NamedTemporaryFile(dir=tmpdir, delete=False)
with open(tmppath.name, "wt") as outf:
@@ -134,7 +134,7 @@ def run_intersect():
)
)

if count_var_t % 500000 == 0:
if count_var_t % args.batch_size == 0:
heapq.heapify(target_heap)
tmppath = tempfile.NamedTemporaryFile(dir=tmpdir, delete=False)
with open(tmppath.name, "wt") as outf:
@@ -397,6 +397,14 @@ def parse_args(args=None):
parser.add_argument(
"--outdir", dest="outdir", required=True, help="<Required> Output directory"
)
parser.add_argument(
"--batch_size",
dest="batch_size",
type=int,
default=500000,
required=False,
help="<Optional> Number of variants processed per batch",
)
return parser.parse_args(args)


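
The hard-coded flush every 500,000 variants is now a configurable `--batch_size`. As a rough sketch of the batching pattern used above (accumulate records, heapify the batch, then spill it to a temporary file), with `spill_in_batches` and its arguments invented for illustration rather than taken from the PR:

```python
import heapq
import tempfile


def spill_in_batches(variants, batch_size=500_000, tmpdir=None):
    """Accumulate records and spill a sorted batch to a temp file every batch_size items."""
    heap, spilled_paths = [], []
    for count, variant in enumerate(variants, start=1):
        heap.append(variant)
        if count % batch_size == 0:
            heapq.heapify(heap)  # order the batch before writing it out
            tmp = tempfile.NamedTemporaryFile(dir=tmpdir, delete=False)
            with open(tmp.name, "wt") as outf:
                while heap:
                    outf.write(f"{heapq.heappop(heap)}\n")
            spilled_paths.append(tmp.name)
    return heap, spilled_paths  # leftover records plus paths of the spilled batches


# a small batch size forces several spills without needing huge inputs
leftover, paths = spill_in_batches(range(250), batch_size=100)
```

Smaller batches keep memory use flat at the cost of more temporary files, which is also why the test change below passes a small `--batch_size` to exercise the spilling path.
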
13 changes: 10 additions & 3 deletions pgscatalog.match/src/pgscatalog/match/lib/_match/label.py
@@ -221,6 +221,13 @@ def _label_duplicate_id(df: pl.LazyFrame, keep_first_match: bool) -> pl.LazyFram


def _label_biallelic_ambiguous(df: pl.LazyFrame, remove_ambiguous) -> pl.LazyFrame:
"""
Identify ambiguous variants (A/T & C/G SNPs)

Scores and genotypes are often on different genome builds, and including these SNPs
would tally incorrect dosages if a strand flip has occurred across builds.
Even on the same build, data can be improperly strand-oriented.
"""
logger.debug("Labelling ambiguous variants")
ambig = (
df.with_columns(
@@ -259,7 +266,7 @@ def _label_biallelic_ambiguous(df: pl.LazyFrame, remove_ambiguous) -> pl.LazyFra
)
else:
return (
ambig.with_column(pl.lit(False).alias("exclude_ambiguous"))
ambig.with_columns(pl.lit(False).alias("exclude_ambiguous"))
.with_columns(max=pl.max_horizontal("exclude", "exclude_ambiguous"))
.drop(["exclude", "exclude_ambiguous"])
.rename({"max": "exclude"})
@@ -301,7 +308,7 @@ def _label_flips(df: pl.LazyFrame, skip_flip: bool) -> pl.LazyFrame:
)
else:
logger.debug("Not excluding flipped matches")
return df.with_columns(match_IDs=pl.lit("NA"))
return df


def _label_filter(df: pl.LazyFrame, filter_IDs: pathlib.Path) -> pl.LazyFrame:
@@ -323,4 +330,4 @@ def _label_filter(df: pl.LazyFrame, filter_IDs: pathlib.Path) -> pl.LazyFrame:
)
else:
logger.info("--filter_IDs not set, skipping filtering")
return df
return df.with_columns(match_IDs=pl.lit("NA"))
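
The new docstring above spells out why A/T and C/G (palindromic) SNPs are labelled ambiguous: after a strand flip their alleles are indistinguishable, so dosages cannot be trusted. A minimal polars sketch of that check on toy data, with the `ambiguous` column name chosen for illustration rather than taken from the library:

```python
import polars as pl

# Toy variants: the A/T pair is palindromic (ambiguous), A/G and C/G behave differently.
df = pl.DataFrame(
    {
        "effect_allele": ["A", "A", "C"],
        "other_allele": ["T", "G", "G"],
    }
)

# Flag allele pairs that read the same after complementing both alleles.
labelled = df.with_columns(
    ambiguous=pl.concat_str(["effect_allele", "other_allele"]).is_in(
        ["AT", "TA", "CG", "GC"]
    )
)
print(labelled)  # the A/T and C/G rows are flagged True
```
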
105 changes: 70 additions & 35 deletions pgscatalog.match/src/pgscatalog/match/lib/_match/plink.py
@@ -14,8 +14,43 @@
Variants with the same ID and different effect alleles must be split into different files

Input df must contain match candidates filtered to best matched variant

Some fake match data where the ID is the same but the matched effect allele is different:

>>> data = {'row_nr': [1, 2, 3], 'chr_name': ['11', '11', '11'], 'chr_position': [69331418, 69331418, 69331419], 'effect_allele': ['A', 'C', 'T'], 'other_allele': ['C', 'C', 'T'], 'effect_weight': ['1', '1.5', '2'], 'effect_type': ['additive', 'additive', 'additive'], 'accession': ['pgs1', 'pgs2', 'pgs3'],'ID': ['11:69331418:A:C', '11:69331418:A:C', '11:69331419:T:T'],'matched_effect_allele': ['A', 'C', 'T']}
>>> plinked = plinkify(pl.DataFrame(data).lazy())
>>> dfs = sorted((x.collect() for x in plinked[EffectType.ADDITIVE]), key= lambda x: len(x))
>>> assert len(dfs) == 2
>>> dfs[0].select(["row_nr", "accession", "ID", "matched_effect_allele"])
shape: (1, 4)
┌────────┬───────────┬─────────────────┬───────────────────────┐
│ row_nr ┆ accession ┆ ID ┆ matched_effect_allele │
│ --- ┆ --- ┆ --- ┆ --- │
│ i64 ┆ str ┆ str ┆ str │
╞════════╪═══════════╪═════════════════╪═══════════════════════╡
│ 2 ┆ pgs2 ┆ 11:69331418:A:C ┆ C │
└────────┴───────────┴─────────────────┴───────────────────────┘

>>> dfs[1].select(["row_nr", "accession", "ID", "matched_effect_allele"])
shape: (2, 4)
┌────────┬───────────┬─────────────────┬───────────────────────┐
│ row_nr ┆ accession ┆ ID ┆ matched_effect_allele │
│ --- ┆ --- ┆ --- ┆ --- │
│ i64 ┆ str ┆ str ┆ str │
╞════════╪═══════════╪═════════════════╪═══════════════════════╡
│ 1 ┆ pgs1 ┆ 11:69331418:A:C ┆ A │
│ 3 ┆ pgs3 ┆ 11:69331419:T:T ┆ T │
└────────┴───────────┴─────────────────┴───────────────────────┘

When merging a lot of scoring files, sometimes a variant might be duplicated.
This can happen when the matched effect allele differs at the same position, e.g.:
- chr1: chr2:20003:A:C A 0.3 NA
- chr1: chr2:20003:A:C C NA 0.7
where the last two columns represent different scores. plink demands
unique identifiers, so variants need to be split, scored, and summed later.
"""
min_cols = [
"row_nr",
"accession",
"effect_type",
"chr_name",
@@ -36,7 +71,7 @@
deduped = {}
for effect_type, effect_frame in effect_frames:
deduped.setdefault(effect_type, [])
deduped[effect_type].append(effect_frame)
deduped[effect_type] = _deduplicate_variants(effect_frame)

return deduped

@@ -94,7 +129,7 @@
return additive, dominant, recessive


def _deduplicate_variants(df: pl.LazyFrame) -> list[pl.LazyFrame | None]:
def _deduplicate_variants(df: pl.LazyFrame) -> list[pl.LazyFrame]:
"""Find variant matches that have duplicate identifiers
When merging a lot of scoring files, sometimes a variant might be duplicated.
This can happen when the matched effect allele differs at the same position, e.g.:
@@ -108,44 +143,44 @@
Returns:
A list of dataframes, with unique ID - matched effect allele combinations
"""
if df.select("ID").head().collect().is_empty():
if df.limit(1).collect().is_empty():
logger.info("Empty input: skipping deduplication")
return [None]
else:
logger.debug("Deduplicating variants")

# 1. unique ID - EA is important because normal duplicates are already
# handled by pivoting, and it's pointless to split them unnecessarily
# 2. use cumcount to number duplicate IDs
# 3. join cumcount data on original DF, use this data for splitting
# note: effect_allele should be equivalent to matched_effect_allele
ea_count: pl.LazyFrame = (
df.select(["ID", "matched_effect_allele"])
.unique()
.with_columns(
return [df]
logger.debug("Deduplicating variants (splitting across files)")
ea_count: pl.LazyFrame = df.unique(
["ID", "matched_effect_allele"], maintain_order=True
).with_columns([pl.col(["ID"]).cum_count().over(["ID"]).alias("cum_count")])

# now get groups for each different cumulative count (1 instance, 2, ..., n)
groups = ea_count.collect().group_by(["cum_count"], maintain_order=True)
group_dfs = (x.lazy() for _, x in groups)
ldf_lst = []

for x in group_dfs:
tempdf = df.join(
x, on=["row_nr", "ID", "matched_effect_allele"], how="inner"
).select(
[
pl.col("ID").cumcount().over(["ID"]).alias("cumcount"),
pl.col("ID").count().over(["ID"]).alias("count"),
"row_nr",
"accession",
"effect_type",
"chr_name",
"ID",
"matched_effect_allele",
"effect_weight",
]
)
)

dup_label: pl.LazyFrame = df.join(
ea_count, on=["ID", "matched_effect_allele"], how="left"
)
ldf_lst.append(tempdf)

# now split the matched variants, and make sure we don't lose any
# cumcount = ngroup-1, so add 1
n_splits: int = ea_count.select("cumcount").max().collect().item(0, 0) + 1
df_lst: list = []
# double check to help me sleep at night
original_length = df.select(pl.len()).collect().item(0, 0)
new_length = sum(x.select(pl.len()).collect().item(0, 0) for x in ldf_lst)

for i in range(0, n_splits):
x: pl.LazyFrame = dup_label.filter(pl.col("cumcount") == i).drop(
["cumcount", "count"]
if original_length != new_length:
raise ValueError(
f"Some variants went missing during deduplication ({original_length} != {new_length}) "
"This should never happen"
)
if (df := x.collect()).is_empty():
continue
else:
df_lst.append(df.lazy())

return df_lst
logger.info("Dataframe lengths are OK after deduplicating :)")
return ldf_lst
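
The rewritten `_deduplicate_variants` numbers repeated IDs with a cumulative count over `ID`, splits the frame into one group per count so that no single output file hands plink a duplicate identifier, and then verifies that no rows were lost. A rough standalone sketch of that splitting idea (column names follow the PR, the data is invented, and a recent polars with `cum_count` is assumed):

```python
import polars as pl

# The same ID appears twice with different matched effect alleles.
df = pl.LazyFrame(
    {
        "ID": ["1:100:A:C", "1:100:A:C", "1:200:T:G"],
        "matched_effect_allele": ["A", "C", "T"],
        "effect_weight": [0.3, 0.7, 0.1],
    }
)

# Number repeated IDs: first occurrence -> 1, second -> 2, and so on.
counted = df.with_columns(pl.col("ID").cum_count().over("ID").alias("cum_count"))

# One frame per cumulative count; within each frame every ID is unique.
splits = [
    group.drop("cum_count")
    for _, group in counted.collect().group_by("cum_count", maintain_order=True)
]

# Sanity check mirroring the PR: no rows should go missing during the split.
assert sum(len(frame) for frame in splits) == counted.collect().height
```
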
5 changes: 5 additions & 0 deletions pgscatalog.match/tests/data/duplicated_scorefile.txt
@@ -0,0 +1,5 @@
chr_name chr_position effect_allele other_allele effect_weight effect_type is_duplicated accession row_nr
11 69331418 C T 0.210422987 additive FALSE test 0
11 69331418 T C 0.210422987 additive FALSE test2 1
11 69379161 A C 0.018723614 additive FALSE test2 2
5 1279790 T C -0.005213567 additive FALSE test3 4
3 changes: 3 additions & 0 deletions pgscatalog.match/tests/test_intersect_cli.py
@@ -30,6 +30,7 @@ def target(request):
def test_intersect_cli(tmp_path_factory, ref, target, afreq, vmiss):
outdir = tmp_path_factory.mktemp("outdir")

# use a low batch size to check that batching behaves as expected
args = [
(
"pgscatalog-intersect",
@@ -39,6 +40,8 @@ def test_intersect_cli(tmp_path_factory, ref, target, afreq, vmiss):
str(target),
"--outdir",
str(outdir),
"--batch_size",
"100",
)
]
flargs = list(itertools.chain(*args))