Commit 7ea3f62 (initial commit, 0 parents): add Team Convalesco submission code

File tree: 102 files changed, +5517 −0 lines


README.md (+93)
# NIH Long Covid Challenge Solution

This repository contains a winning submission for the NIH Long Covid Computational Challenge ([L3C](https://www.challenge.gov/?challenge=l3c)) developed by [Team Convalesco](https://www.linkedin.com/pulse/announcing-nih-long-covid-computational/). The objective of the challenge was to develop machine learning models that predict which patients are susceptible to developing PASC/Long COVID, using structured medical records from up to 28 days after COVID onset.

## Overview

Our solution leverages the rich clinical data available in the [N3C environment](https://ncats.nih.gov/n3c/about/data-overview), including condition occurrences, lab measurements, drug exposures, and doctor notes. With model generalizability and robustness in mind, we focus on creating a small number of meaningful features by curating and expanding concept sets. A key idea in our feature engineering is to use the temporal information in the medical records to create features that are more predictive of Long COVID risk. The original submission consists of ~100 workflow cells operating on Spark dataframes in the N3C enclave. All the transform code is included in this repository so it can be tested and run locally on synthetic data.
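As a toy illustration of this temporal idea (the column names below are hypothetical, not the actual N3C schema or our transforms), one such feature counts a patient's events inside a window anchored on the COVID index date:

```python
import pandas as pd

# Hypothetical event and index tables (illustrative column names only)
events = pd.DataFrame({
    "person_id": [1, 1, 2],
    "event_date": pd.to_datetime(["2021-01-05", "2021-01-20", "2021-02-01"]),
})
covid_index = pd.DataFrame({
    "person_id": [1, 2],
    "covid_index": pd.to_datetime(["2021-01-01", "2021-01-25"]),
})

df = events.merge(covid_index, on="person_id")
df["days_from_index"] = (df["event_date"] - df["covid_index"]).dt.days
# Temporal feature: number of events within 28 days of COVID onset
feature = (df[df["days_from_index"].between(0, 28)]
           .groupby("person_id").size().rename("events_within_28d"))
```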
## Installation

1. Clone the repository:
```
git clone https://github.com/levinas/long-covid-prediction.git
cd long-covid-prediction
```

2. Create a virtual environment (optional):
```
conda create -n l3c python=3.10
conda activate l3c
```

3. Install the required packages:
```
pip install -r requirements.txt
```

4. Ensure Java, a [PySpark dependency](https://spark.apache.org/docs/latest/api/python/getting_started/install.html), is installed and the JAVA_HOME environment variable is set.

For example, on an Ubuntu Linux machine, you can run the following commands (or use another package manager, such as Homebrew, to avoid sudo):
```
sudo apt-get install openjdk-17-jdk
export JAVA_HOME=/usr/lib/jvm/java-17-openjdk-amd64
```
## Running the Code on Synthetic Data
37+
38+
1. Download the synthetic data:
39+
40+
Download [synthetic_data.zip](https://www.dropbox.com/s/krrw6ydutf6j98p/synthetic_data.zip?dl=0) (1.5GB). Extract the zip file and place the folder in the root directory of the repo. Make sure the directory structure looks like `synthetic_data/training/person.csv`. A command line example to do this is:
41+
```
42+
cd long-covid-prediction
43+
wget https://www.dropbox.com/s/krrw6ydutf6j98p/synthetic_data.zip
44+
unzip synthetic_data.zip
45+
```
46+
47+
2. Run the demo script from the root directory of the repo:
48+
```
49+
./run_all.sh
50+
```
51+
This will run the entire workflow on the synthetic data. The final output will be saved as `Convalesco_predictions.csv` in the root directory of this repo; the outputs of all intermediate datasets will be saved in the `output/` folder.
52+
53+
The test run on the synthetic data could take 1-2 hours on a typical linux machine with 64 GB memory. PySpark may generate `RowBasedKeyValueBatch` warnings that could be safely avoided.
54+
55+
Th final output is a patient-level table with prediction results for the testing data with 8 columns:
```python
# Key columns:
# person_id
# outcome_likelihoods: final prediction on patient PASC probability
# confidence_estimate: a proxy estimate based on patient data completeness
# likelihood_3month: predicted probability of PASC within 3 months after COVID index
# likelihood_6month: predicted probability of PASC within 6 months after COVID index
# Additional columns:
# model100_pred: prediction of Model_100 with 100 temporal features
# model36_pred: prediction of Model_36, a simple model with 36 temporal features
# model_z_pred: prediction of Model_Z, an aspiring "zero-bias" model
```
In this example, since we are using synthetic data, the predictions will not be meaningful.
## Models and Features

We created four models with different emphases; our submission is an ensemble of the first three.

![Table 1](figs/table1-model-description.png)
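For intuition, an equal-weight average of three model outputs looks like the sketch below (the model names and weights are placeholders; the actual ensembling is defined in the transform code):

```python
import numpy as np

# Placeholder predictions from three models for two patients
preds = {
    "model_a": np.array([0.8, 0.2]),
    "model_b": np.array([0.6, 0.4]),
    "model_c": np.array([0.7, 0.3]),
}
# Equal-weight ensemble: average the predicted probabilities
ensemble = np.mean(list(preds.values()), axis=0)
```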
The model features are grouped into seven categories, and the population-level feature utilization scores on the real data are shown in the figure below.

![Fig. 2](figs/fig2-feature-categories.png)

## Documentation

The key components of the repository are as follows:

- `src/`: Contains all the source code, including the ~100 transforms and the global code.
- `src/global_code.py`: Global Python code shared by the transforms.
- `utils/execution_engine.py`: Execution engine for running the transforms locally.

The original submission was developed in the N3C environment in the form of a [Palantir Code Workbook](https://www.palantir.com/docs/foundry/code-workbook/overview/). We used global Python code extensively to simplify the transforms and make the reusable blocks more readable. The execution engine is a Python module we developed after the challenge to enable local execution of the original code with minimal modifications.
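The core idea behind such an engine can be sketched in a few lines (a simplified illustration, not the actual `utils/execution_engine.py`): each transform's parameter names refer to upstream datasets or raw inputs, so any output can be computed by resolving its dependencies recursively:

```python
import inspect

def run(name, transforms, inputs, cache=None):
    """Resolve and execute transform `name`, memoizing intermediate results."""
    cache = {} if cache is None else cache
    if name in cache:
        return cache[name]
    if name in inputs:            # a raw input dataset
        return inputs[name]
    fn = transforms[name]
    # each parameter name refers to another transform or input dataset
    args = [run(p, transforms, inputs, cache)
            for p in inspect.signature(fn).parameters]
    cache[name] = fn(*args)
    return cache[name]

# tiny example: c depends on a (raw input) and b (a transform of a)
transforms = {"b": lambda a: a + 1, "c": lambda a, b: a + b}
result = run("c", transforms, {"a": 1})
```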
For more details, please refer to the [DOCUMENTATION](DOCUMENTATION.md).
src/Convalesco_predictions.py (+39)

# Key columns in this submission:
# person_id
# outcome_likelihoods: final prediction on patient PASC probability
# confidence_estimate: proxy quality estimate based on data completeness
# likelihood_3month: predicted probability of PASC within 3 months after COVID index
# likelihood_6month: predicted probability of PASC within 6 months after COVID index

# Additional columns:
# model100_pred: prediction of Model_100 with 100 temporal features
# model36_pred: prediction of Model_36, a simple model with 36 temporal features
# model_z_pred: prediction of Model_Z, an aspiring "zero-bias" model

import pandas as pd

# `spark`, `F`, and `col` are expected to come from the shared global code
# (src/global_code.py) and the execution environment, not local imports.
def Convalesco_predictions(train_test_model: pd.DataFrame,
                           person_data_completeness_test):
    df = spark.createDataFrame(train_test_model)

    # add confidence estimate; patients without a completeness score default to 0
    df_quality = person_data_completeness_test \
        .select('person_id', 'completeness_score') \
        .join(df.select('person_id'), on='person_id', how='right') \
        .fillna(0)

    df = df.join(df_quality, on='person_id', how='left')

    # round numbers for better display
    df = df.select('person_id',
                   F.round(col('outcome_likelihoods'), 8).alias('outcome_likelihoods'),
                   F.round(col('completeness_score'), 3).alias('confidence_estimate'),
                   F.round(col('model_t_3month'), 6).alias('likelihood_3month'),
                   F.round(col('model_t_6month'), 6).alias('likelihood_6month'),
                   F.round(col('model100'), 6).alias('model100_pred'),
                   F.round(col('model36'), 6).alias('model36_pred'),
                   F.round(col('model_z'), 6).alias('model_z_pred'),
                   )

    return df
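The completeness join above (a right join on the prediction rows, then `fillna(0)`) has a direct pandas analogue; a minimal sketch on made-up data:

```python
import pandas as pd

preds = pd.DataFrame({"person_id": [1, 2, 3]})
quality = pd.DataFrame({"person_id": [1, 3], "completeness_score": [0.9, 0.5]})

# right join keeps every predicted patient; missing scores default to 0
df_quality = quality.merge(preds, on="person_id", how="right").fillna(0)
```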

src/concept_bundles.py (+27)

def concept_bundles(raw_concept_bundles,
                    concept_set_members,
                    curated_bundles):
    df1 = raw_concept_bundles.select(
        'tag_name',
        col('concept_set_name').alias('raw_concept_set_name'),
        col('best_version_id'))
    df2 = concept_set_members.drop('concept_id', 'concept_name').distinct()
    df2_current = df2.where(col('is_most_recent_version') & ~col('archived'))
    df = df1.join(df2, df1.best_version_id == df2.codeset_id, how='left')
    # concept sets whose pinned version is outdated or archived
    df_outdated = df.where(~col('is_most_recent_version') | col('archived')) \
        .drop('is_most_recent_version', 'archived') \
        .select('tag_name', 'raw_concept_set_name', 'concept_set_name',
                col('codeset_id').alias('old_codeset_id'),
                col('version').alias('old_version'))
    df_current = df2_current.join(df.select('tag_name', 'codeset_id'),
                                  on='codeset_id')
    # remap outdated sets to their most recent version by concept set name
    df_updated = df_outdated.join(df2_current, on='concept_set_name')
    cols = [
        'tag_name', 'codeset_id', 'concept_set_name', 'version',
        'is_most_recent_version', 'archived'
    ]
    df = df_current.select(cols).union(df_updated.select(cols)) \
        .withColumnRenamed('tag_name', 'bundle_name') \
        .join(curated_bundles, on='bundle_name', how='left') \
        .orderBy('bundle_name', 'concept_set_name')
    return df

src/concept_sets.py (+12)

# typical runtime: 20s; output shape: 10 x 4848231

def concept_sets(concept_set_members,
                 concept_bundles):
    df = concept_set_members.where(col('is_most_recent_version') & ~col('archived'))
    df = df.join(concept_bundles.select('bundle_name', 'bundle_id', 'codeset_id'),
                 on='codeset_id',
                 how='left')
    # attach the member count of each concept set to every row
    df_count = df.groupBy('codeset_id').count().withColumnRenamed('count', 'member_count')
    df = df.join(df_count, on='codeset_id', how='left')
    df = df.orderBy(col('version').desc(), col('codeset_id'))
    return df
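The member-count step (group, count, join back) is a common pattern; a pandas analogue on toy data:

```python
import pandas as pd

members = pd.DataFrame({"codeset_id": [10, 10, 20],
                        "concept_id": [101, 102, 201]})

# count members per concept set, then join the count back onto each row
counts = members.groupby("codeset_id").size().rename("member_count").reset_index()
df = members.merge(counts, on="codeset_id", how="left")
```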

src/concept_to_feature.py (+66)

# typical runtime: 1m; output shape: 17 x 57010

# `col`, `lit`, `format_string`, `regexp_extract`, `reduce`, `DataFrame`, and
# `move_cols_to_front` come from the shared global code (src/global_code.py).
def concept_to_feature(event_stats, concept_sets):
    df_concept = event_stats  # assumes columns: concept_id, concept_name
    df_bundle = concept_sets  # assumes columns: concept_id, concept_name, codeset_id, concept_set_name, bundle_id, bundle_name

    selected_concept_ids = get_selected_concept_ids()
    selected_set_names = get_selected_concept_set_names()
    custom_sets = create_custom_concept_sets(df_bundle, df_concept)

    null = lit(None)

    # case 1: individually selected concepts
    case1 = col('concept_id').isin(selected_concept_ids)
    df1 = df_concept.where(case1) \
        .withColumn('feature_source', lit('concept_id')) \
        .withColumn('feature_id', format_string('c%d', 'concept_id')) \
        .withColumn('feature_name', format_string('C: %s', 'concept_name'))

    df1.count()  # force evaluation

    # case 2: selected concept sets
    case2 = col('concept_set_name').isin(selected_set_names)
    df2 = df_bundle.where(case2) \
        .withColumn('feature_source', lit('codeset_id')) \
        .withColumn('feature_id', format_string('s%d', 'codeset_id')) \
        .withColumn('feature_name', format_string('S: %s', 'concept_set_name'))

    df2.count()

    # case 3: ARIScience concept sets; strip the prefix and trailing tag from the name
    case3 = col('concept_set_name').startswith('ARIScience')
    df3 = df_bundle.where(case3) \
        .withColumn('feature_source', lit('codeset_id')) \
        .withColumn('feature_id', format_string('s%d', 'codeset_id')) \
        .withColumn('feature_name', format_string('A: %s',
            regexp_extract('concept_set_name', r'ARIScience\s+[-–]\s+(.*?)\s*[-–]*\s*[A-Z]*$', 1)))

    df3.count()

    # case 4: curated bundles
    df4 = df_bundle.where(col('bundle_id').isNotNull()) \
        .withColumn('feature_source', lit('bundle_id')) \
        .withColumn('feature_id', col('bundle_id')) \
        .withColumn('feature_name', format_string('B: %s', 'bundle_name'))

    df4.count()

    # case 5: custom concept sets
    df_custom_sets = custom_sets.select('concept_id', 'custom_set_id', 'custom_set_name')
    df5 = df_concept.join(df_custom_sets, on='concept_id') \
        .withColumn('feature_source', lit('custom_set_id')) \
        .withColumn('feature_id', col('custom_set_id')) \
        .withColumn('feature_name', format_string('X: %s', 'custom_set_name'))

    df5.count()

    dfs = [df1, df2, df3, df4, df5]
    cols = ['concept_id', 'feature_id', 'feature_name', 'feature_source']
    df_union = reduce(DataFrame.union, [d.select(cols) for d in dfs]).distinct()

    df_union.count()

    df = df_concept.join(df_union, on='concept_id', how='left')

    df = move_cols_to_front(df, ['concept_id', 'concept_name', 'domain_id',
                                 'feature_id', 'feature_name', 'feature_source'])
    df = df.orderBy(col('cmi').desc())

    return df
src/covid_dates.py (+15)

def covid_dates(concept,
                measurement,
                merged_events,
                person_table,
                concept_set_members):
    df1 = covid_index_from_measurement(concept, measurement, person_table, concept_set_members, mark_all=True)
    df2 = covid_index_from_concepts(merged_events, person_table, use_custom_covid_set=True, mark_all=True)

    df = df1.unionByName(df2, allowMissingColumns=True)
    df = df.select('person_id', 'date',
                   'covid_test_positive', 'covid_concept_positive',
                   'concept_id', 'concept_name')
    df = df.orderBy('person_id', 'date')

    return df
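`unionByName(..., allowMissingColumns=True)` aligns the two frames by column name and fills absent columns with nulls; the pandas analogue is `concat`, sketched here with toy frames:

```python
import pandas as pd

# one frame carries only the test-positive flag, the other only the concept flag
a = pd.DataFrame({"person_id": [1], "date": ["2021-01-01"],
                  "covid_test_positive": [True]})
b = pd.DataFrame({"person_id": [2], "date": ["2021-02-01"],
                  "covid_concept_positive": [True]})

# concat aligns by column name; missing columns become NaN (Spark: null)
df = pd.concat([a, b], ignore_index=True)
```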

src/covid_dates_test.py (+15)

def covid_dates_test(concept,
                     measurement_test,
                     merged_events_test,
                     person_test,
                     concept_set_members):
    df1 = covid_index_from_measurement(concept, measurement_test, person_test, concept_set_members, mark_all=True)
    df2 = covid_index_from_concepts(merged_events_test, person_test, use_custom_covid_set=True, mark_all=True)

    df = df1.unionByName(df2, allowMissingColumns=True)
    df = df.select('person_id', 'date',
                   'covid_test_positive', 'covid_concept_positive',
                   'concept_id', 'concept_name')
    df = df.orderBy('person_id', 'date')

    return df

src/covid_episodes.py (+16)

# typical runtime: 30s; output shape: 18 x 57672

def covid_episodes(covid_dates,
                   silver):
    df = compute_covid_diagnostic_windows(covid_dates, silver)
    cols = ['person_id', 'time_to_pasc', 'covid_index',
            'num_covid_episodes', 'total_episode_length', 'max_episode_length',
            'months_from_covid_index', 'months_from_first_covid']
    # keep only the first episode's window columns (episodes 2-5 disabled)
    # for i in range(1, 6):
    for i in range(1, 2):
        cols.append(f'covid_{i}_first')
        cols.append(f'covid_{i}_last')
    df = silver.select('person_id', 'time_to_pasc') \
        .join(df, on='person_id', how='left')
    df = df.select(cols).distinct().orderBy('person_id')
    return df

src/covid_episodes_test.py (+13)

def covid_episodes_test(covid_dates_test,
                        silver_test):
    df = compute_covid_diagnostic_windows(covid_dates_test, silver_test)
    cols = ['person_id', 'covid_index',
            'num_covid_episodes', 'total_episode_length', 'max_episode_length',
            'months_from_covid_index', 'months_from_first_covid']
    # keep only the first episode's window columns (episodes 2-5 disabled)
    # for i in range(1, 6):
    for i in range(1, 2):
        if f'covid_{i}_first' in df.columns:
            cols.append(f'covid_{i}_first')
            cols.append(f'covid_{i}_last')
    df = df.select(cols).distinct().orderBy('person_id')
    return df

src/curated_bundles.py (+7)

# typical runtime: 4m; output shape: 2 x 32

# `pd` and `spark` come from the shared global code / execution environment.
def curated_bundles():
    selected_bundle_dict = get_selected_bundle_dict()
    pandas_bundle = pd.DataFrame.from_dict(selected_bundle_dict, orient='index').reset_index()
    pandas_bundle.columns = ['bundle_id', 'bundle_name']
    return spark.createDataFrame(pandas_bundle)
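The dict-to-DataFrame step can be previewed in plain pandas (the bundle ids and names here are made up, not the actual `get_selected_bundle_dict` contents):

```python
import pandas as pd

bundle_dict = {"b01": "Diabetes", "b02": "Kidney Disease"}  # hypothetical values

# dict keys become the index, then a regular column after reset_index
df = pd.DataFrame.from_dict(bundle_dict, orient="index").reset_index()
df.columns = ["bundle_id", "bundle_name"]
```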

src/demographics.py (+21)

# typical runtime: 10s; output shape: 17 x 57672

# def demographics(person_table, location):
def demographics(person_table):
    cols = ['year_of_birth', 'gender_concept_id', 'race_concept_id', 'ethnicity_concept_id']  # , 'data_partner_id'
    df = person_table.select('person_id', *cols)

    # df_dp = data_partner_id_to_onehot(df)
    # df = df.join(df_dp, on='person_id', how='left')

    # df_loc = location.dropDuplicates(['location_id'])
    # df_zip = person_table.select('person_id', 'location_id') \
    #     .join(df_loc, on='location_id', how='left') \
    #     .select('person_id', 'zip') \
    #     .withColumn('zip_id', col('zip').astype(IntegerType())) \
    #     .drop('zip')

    # df = df.join(df_zip, on='person_id', how='left').fillna(0)
    df = df.fillna(0)

    return df

src/demographics_test.py (+19)

# def demographics_test(person_test, location_test):
def demographics_test(person_test):
    cols = ['year_of_birth', 'gender_concept_id', 'race_concept_id', 'ethnicity_concept_id']  # , 'data_partner_id'
    df = person_test.select('person_id', *cols)

    # df_dp = data_partner_id_to_onehot(df)
    # df = df.join(df_dp, on='person_id', how='left')

    # df_loc = location_test.dropDuplicates(['location_id'])
    # df_zip = person_test.select('person_id', 'location_id') \
    #     .join(df_loc, on='location_id', how='left') \
    #     .select('person_id', 'zip') \
    #     .withColumn('zip_id', col('zip').astype(IntegerType())) \
    #     .drop('zip')

    # df = df.join(df_zip, on='person_id', how='left').fillna(0)
    df = df.fillna(0)

    return df
