Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix problems with ancestry aggregation & scaling on biobank-data #19

Merged
merged 13 commits into from
Jun 12, 2024
Merged
19 changes: 10 additions & 9 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,20 @@ WORKDIR /app

RUN pip install poetry

COPY . .
COPY pgscatalog.core /app/pgscatalog.core

COPY pgscatalog.calc /app/pgscatalog.calc

COPY pgscatalog.match /app/pgscatalog.match

COPY pgscatalog.utils /app/pgscatalog.utils

WORKDIR /app/pgscatalog.utils

RUN poetry install --no-root && rm -rf $POETRY_CACHE_DIR

FROM python:3.11-slim-bullseye

ENV VIRTUAL_ENV=/app/pgscatalog.utils/.venv \
PATH="/app/pgscatalog.utils/.venv/bin:$PATH"

COPY --from=builder ${VIRTUAL_ENV} ${VIRTUAL_ENV}

RUN apt-get update && apt-get install -y procps && rm -rf /var/lib/apt/lists/*

ENV PATH="/venv/bin:${PATH}"
ENV PATH="/app/pgscatalog.utils/.venv/bin:$PATH"


495 changes: 257 additions & 238 deletions pgscatalog.calc/poetry.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pgscatalog.calc/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pgscatalog.calc"
version = "0.1.1"
version = "0.2.0"
description = "Libraries and applications for working with calculated polygenic scores"
authors = ["Benjamin Wingfield <bwingfield@ebi.ac.uk>", "Samuel Lambert <sl925@medschl.cam.ac.uk>", "Laurent Gil <lg10@sanger.ac.uk>"]
readme = "README.md"
Expand All @@ -10,7 +10,7 @@ packages = [

[tool.poetry.dependencies]
python = "^3.11"
"pgscatalog.core" = "^0.1.0"
"pgscatalog.core" = {path = "../pgscatalog.core", develop = true}
numpy = "^1.26.4"
pandas = "^2.2.0"
pyarrow = "^15.0.0"
Expand Down
2 changes: 1 addition & 1 deletion pgscatalog.calc/src/pgscatalog/calc/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@
"AdjustResults",
]

__version__ = "0.1.1"
__version__ = "0.2.0"
4 changes: 1 addition & 3 deletions pgscatalog.calc/src/pgscatalog/calc/cli/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1 @@
from .aggregate_cli import run_aggregate

__all__ = ["run_aggregate"]
__all__ = []
50 changes: 43 additions & 7 deletions pgscatalog.calc/src/pgscatalog/calc/cli/aggregate_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
import logging
import pathlib
import textwrap
import operator
import functools
from collections import deque
from typing import Optional

from ..lib.polygenicscore import PolygenicScore
from ..lib import PolygenicScore
from pgscatalog.core import chrom_keyfunc

logger = logging.getLogger(__name__)

Expand All @@ -21,15 +22,50 @@ def run_aggregate():

if args.verbose:
logger.setLevel(logging.INFO)
logging.getLogger("pgscatalog.core").setLevel(logging.INFO)
logging.getLogger("pgscatalog.calc").setLevel(logging.INFO)

if not (outdir := pathlib.Path(args.outdir)).exists():
raise FileNotFoundError(f"--outdir {outdir.name} doesn't exist")

score_paths = [pathlib.Path(x) for x in args.scores]
pgs = [PolygenicScore(path=x) for x in score_paths]
# call __add__ a lot
aggregated = functools.reduce(operator.add, pgs)
score_paths = sorted([pathlib.Path(x) for x in args.scores], key=chrom_keyfunc())
# dfs are only read into memory after accessing them explicitly e.g. pgs[0].df
pgs = deque(PolygenicScore(path=x) for x in score_paths)

observed_columns = set()
aggregated: Optional[PolygenicScore] = None

# first, use PolygenicScore's __add__ method, which implements df.add(fill_value=0)
while pgs:
# popleft ensures that dfs are removed from memory after each aggregation
score: PolygenicScore = pgs.popleft()
if aggregated is None:
logger.info(f"Initialising aggregation with {score}")
aggregated: PolygenicScore = score
else:
logger.info(f"Adding {score}")
aggregated += score
observed_columns.update(set(score.df.columns))

# check to make sure that every column we saw in the dataframes is in the output
if (dfcols := set(aggregated.df.columns)) != observed_columns:
raise ValueError(
f"Missing columns in aggregated file!. "
f"Observed: {observed_columns}. "
f"In aggregated: {dfcols}"
)
else:
logger.info("Aggregated columns match observed columns")

# next, melt the plink2 scoring files from wide (many columns) format to long format
aggregated.melt()

# recalculate PGS average using aggregated SUM and DENOM
aggregated.average()

logger.info("Aggregation finished! Writing to a file")
aggregated.write(outdir=args.outdir, split=args.split)
logger.info("all done. bye :)")


def _description_text() -> str:
Expand Down
2 changes: 2 additions & 0 deletions pgscatalog.calc/src/pgscatalog/calc/cli/ancestry_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ def run_ancestry():

if args.verbose:
logger.setLevel(logging.INFO)
logging.getLogger("pgscatalog.core").setLevel(logging.INFO)
logging.getLogger("pgscatalog.calc").setLevel(logging.INFO)
logger.info("Starting ancestry adjustment")
logger.info("Verbose mode enabled")

Expand Down
6 changes: 3 additions & 3 deletions pgscatalog.calc/src/pgscatalog/calc/lib/_ancestry/read.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ def read_pgs(loc_aggscore):
index_col=["sampleset", "IID"],
converters={"IID": str},
header=0,
).pivot(columns=["PGS"], values=["SUM", "AVG"])
# join column levels ({PGS}_{VALUE})
df.columns = [f"{j}_{i}" for i, j in df.columns]
).pivot(columns=["PGS"], values=["SUM"])
# rename to PGS only
df.columns = [f"{j}" for i, j in df.columns]

return df
Loading
Loading