@@ -151,18 +151,14 @@ def _deduplicate_variants(
151
151
152
152
if df_len == 0 :
153
153
logger .info (f"No variants with { effect_type = } , skipping deduplication" )
154
- return [df ]
154
+ # best to return an empty list (which will be skipped if iterated over)
155
+ return []
155
156
156
157
logger .debug (
157
158
f"Splitting { effect_type } variants with different effect alleles and the same ID"
158
159
)
159
160
160
- # make sure input is sorted by ID, _MUST_ be chr:pos:ea:oa
161
- # row_nr resets to 0 for each accession so add a temporary index (for left join)
162
- df = df .sort ("ID" ).with_row_index ()
163
-
164
161
# count the number of _unique_ matched alleles per ID (this operation drops rows)
165
- # the cumulative count will be used to group and split the dfs
166
162
counts = (
167
163
df .unique (["ID" , "matched_effect_allele" ], maintain_order = True )
168
164
.with_columns (
@@ -171,25 +167,25 @@ def _deduplicate_variants(
171
167
.over ("ID" )
172
168
.alias ("allele_cum_count" )
173
169
)
174
- .select ("index " , "allele_cum_count" )
170
+ .select ("ID" , "matched_effect_allele " , "allele_cum_count" )
175
171
)
176
172
177
- # after joining the count variants that were dropped by unique()
178
- # will have a null cumulative count
179
- # forward fill (repeats last seen value) the null values to populate groups
180
- # this is important because the dropped variants are OK to get pivoted wide later
181
- groups = (
182
- df .join (counts , on = "index" , how = "left" )
183
- .with_columns (pl .col ("allele_cum_count" ).forward_fill ().alias ("group" ))
184
- .collect ()
185
- .group_by (["group" ], maintain_order = True )
186
- )
173
+ # now calculate the number of splits required
174
+ n_splits : int = counts .select ("allele_cum_count" ).max ().collect ().item ()
175
+
176
+ # add the count data back to the original df
177
+ df = df .join (counts , on = ["ID" , "matched_effect_allele" ], how = "left" )
187
178
188
179
ldf_lst = []
189
- for group , tempdf in groups :
190
- logger .info (f"Deduplicating { group = } " )
191
- # now we have split dfs, restore a sort order that's friendly for humans
192
- tempdf = tempdf .lazy ().select (pl .exclude ("index" )).sort (["accession" , "row_nr" ])
180
+ # start iteration at index 1 (the smallest possible cumulative count)
181
+ # iteration must include max value, so + 1
182
+ for i in range (1 , n_splits + 1 ):
183
+ logger .info (f"Splitting variants into group { i } " )
184
+ tempdf = (
185
+ df .filter (pl .col ("allele_cum_count" ) == i )
186
+ .select (pl .exclude ("allele_cum_count" ))
187
+ .sort (["accession" , "row_nr" ])
188
+ )
193
189
ldf_lst .append (tempdf )
194
190
195
191
# let's double-check the number of variants before and after splitting
0 commit comments