|
| 1 | +import csv |
| 2 | +import functools |
| 3 | +import gzip |
| 4 | +import itertools |
| 5 | +import logging |
| 6 | +import operator |
| 7 | +import pathlib |
| 8 | +from collections import namedtuple |
| 9 | + |
| 10 | +from xopen import xopen |
| 11 | + |
| 12 | +logger = logging.getLogger(__name__) |
| 13 | + |
| 14 | + |
| 15 | +def _read_map(in_map, col_from, col_to): |
| 16 | + """Read a column from a mapping file into a dictionary""" |
| 17 | + mapping = {} |
| 18 | + logger.debug(f"Reading map {in_map}") |
| 19 | + with xopen(in_map) as map_f: |
| 20 | + reader = csv.DictReader(map_f, delimiter=" ") |
| 21 | + for line in reader: |
| 22 | + mapping[line[col_from]] = line[col_to] |
| 23 | + |
| 24 | + return mapping |
| 25 | + |
| 26 | + |
| 27 | +def _read_maps(map_list, col_from, col_to): |
| 28 | + """Read mapping files and return one big dictionary""" |
| 29 | + maps = (_read_map(in_map=x, col_from=col_from, col_to=col_to) for x in map_list) |
| 30 | + return functools.reduce(operator.ior, maps) |
| 31 | + |
| 32 | + |
| 33 | +def _get_outf_path(chrom, dataset): |
| 34 | + return pathlib.Path(f"{dataset}_{chrom}_relabelled.gz") |
| 35 | + |
| 36 | + |
| 37 | +def _grab_header(in_target, comment_char): |
| 38 | + """Grab header from a TSV file which contains comment lines that are skipped""" |
| 39 | + for line in in_target: |
| 40 | + if line.startswith(comment_char): |
| 41 | + continue |
| 42 | + return line.strip().split() |
| 43 | + |
| 44 | + |
| 45 | +def _relabel(in_path, mapping, target_col, comment_char): |
| 46 | + """Revalue a target column in a file based on the mapping dictionary""" |
| 47 | + with xopen(in_path) as in_f: |
| 48 | + h = _grab_header(in_f, comment_char) |
| 49 | + reader = csv.DictReader(in_f, delimiter="\t", fieldnames=h) |
| 50 | + for line in reader: |
| 51 | + line[target_col] = mapping[line[target_col]] |
| 52 | + yield line |
| 53 | + |
| 54 | + |
| 55 | +def _parse_chrom(x): |
| 56 | + """Grab chromosome from the ID key of a dictionary |
| 57 | +
|
| 58 | + (Some file types may not contain a CHROM column)""" |
| 59 | + return x["ID"].split(":")[0] |
| 60 | + |
| 61 | + |
| 62 | +def relabel_write(relabelled, dataset, split_output, combined_output, out_dir): |
| 63 | + """Write relabelled data to a new file, optionally splitting by chromosome |
| 64 | +
|
| 65 | + >>> from ._config import Config |
| 66 | + >>> import tempfile, os |
| 67 | + >>> maps = [Config.ROOT_DIR / "tests" / "relabel_map_chr1.txt.gz", Config.ROOT_DIR / "tests" / "relabel_map_chr8.txt.gz"] |
| 68 | + >>> in_target = Config.ROOT_DIR / "tests" / "hgdp_ALL_additive_0.scorefile" |
| 69 | + >>> args = RelabelArgs() |
| 70 | + >>> x = relabel(in_path=in_target, map_paths=maps, relabel_args=args) |
| 71 | + >>> with tempfile.TemporaryDirectory() as tmp_dir: |
| 72 | + ... relabel_write(x, dataset="test", split_output=True, combined_output=True, out_dir=tmp_dir) |
| 73 | + ... os.listdir(tmp_dir) |
| 74 | + ['test_ALL_relabelled.gz', 'test_1_relabelled.gz', 'test_8_relabelled.gz'] |
| 75 | + """ |
| 76 | + if combined_output: |
| 77 | + combined_output = out_dir / pathlib.Path( |
| 78 | + _get_outf_path(chrom="ALL", dataset=dataset) |
| 79 | + ) |
| 80 | + if combined_output.exists(): |
| 81 | + raise FileExistsError(f"{combined_output}") |
| 82 | + |
| 83 | + for i, (chrom, group) in enumerate(itertools.groupby(relabelled, _parse_chrom)): |
| 84 | + # grab a batch of relabelled lines, grouped by chromosome |
| 85 | + batch = list(group) |
| 86 | + fieldnames = batch[0].keys() |
| 87 | + |
| 88 | + if split_output: |
| 89 | + outf_path = out_dir / _get_outf_path(chrom=chrom, dataset=dataset) |
| 90 | + logger.debug(f"Writing chrom {chrom} to {outf_path}") |
| 91 | + |
| 92 | + with gzip.open(outf_path, "wt") as csv_file: |
| 93 | + writer = csv.DictWriter(csv_file, delimiter="\t", fieldnames=fieldnames) |
| 94 | + writer.writeheader() |
| 95 | + writer.writerows(batch) |
| 96 | + |
| 97 | + if combined_output: |
| 98 | + logger.debug(f"Appending to {combined_output}") |
| 99 | + with gzip.open(combined_output, "at") as csv_file: |
| 100 | + writer = csv.DictWriter(csv_file, delimiter="\t", fieldnames=fieldnames) |
| 101 | + if i == 0: |
| 102 | + writer.writeheader() |
| 103 | + |
| 104 | + writer.writerows(batch) |
| 105 | + |
| 106 | + |
| 107 | +RelabelArgs = namedtuple( |
| 108 | + "RelabelArgs", |
| 109 | + ["comment_char", "dataset", "target_col", "map_col_from", "map_col_to"], |
| 110 | + defaults=["##", "DATASET", "ID", "ID_TARGET", "ID_REF"], |
| 111 | +) |
| 112 | + |
| 113 | + |
| 114 | +def relabel(*, in_path, map_paths, relabel_args): |
| 115 | + """Revalue a target column in a file based on mapping files |
| 116 | +
|
| 117 | + Returns a generator of relabelled rows (one row = one dictionary). |
| 118 | +
|
| 119 | + >>> from ._config import Config |
| 120 | + >>> relabel_args = RelabelArgs() |
| 121 | + >>> maps = [Config.ROOT_DIR / "tests" / "relabel_map_chr1.txt.gz", Config.ROOT_DIR / "tests" / "relabel_map_chr8.txt.gz"] |
| 122 | +
|
| 123 | + The maps show that the variant on chromosome 8 has different ID across target and reference: |
| 124 | +
|
| 125 | + >>> _read_maps(maps, col_from=relabel_args.map_col_from, col_to=relabel_args.map_col_to) |
| 126 | + {'1:11796321:G:A': '1:11796321:G:A', '8:127401060:G:T': '8:127401060:T:G'} |
| 127 | +
|
| 128 | + >>> in_target = Config.ROOT_DIR / "tests" / "hgdp_ALL_additive_0.scorefile" |
| 129 | + >>> with open(in_target) as f: |
| 130 | + ... for x in f: |
| 131 | + ... x.split() |
| 132 | + ['ID', 'effect_allele', 'PGS000802_hmPOS_GRCh38'] |
| 133 | + ['1:11796321:G:A', 'A', '0.16'] |
| 134 | + ['8:127401060:G:T', 'G', '0.217'] |
| 135 | +
|
| 136 | + The variant on chromosome 8 will get relabelled: |
| 137 | +
|
| 138 | + >>> x = relabel(in_path=in_target, map_paths=maps, relabel_args=relabel_args) |
| 139 | + >>> for row in x: |
| 140 | + ... row |
| 141 | + {'ID': '1:11796321:G:A', 'effect_allele': 'A', 'PGS000802_hmPOS_GRCh38': '0.16'} |
| 142 | + {'ID': '8:127401060:T:G', 'effect_allele': 'G', 'PGS000802_hmPOS_GRCh38': '0.217'} |
| 143 | + """ |
| 144 | + mapping = _read_maps( |
| 145 | + map_list=map_paths, |
| 146 | + col_from=relabel_args.map_col_from, |
| 147 | + col_to=relabel_args.map_col_to, |
| 148 | + ) |
| 149 | + yield from _relabel( |
| 150 | + in_path=in_path, |
| 151 | + mapping=mapping, |
| 152 | + target_col=relabel_args.target_col, |
| 153 | + comment_char=relabel_args.comment_char, |
| 154 | + ) |
0 commit comments