@@ -2,7 +2,7 @@
 
 import logging
 import re
-from typing import Dict, List, Union
+from typing import Dict, List, Union, Tuple, Optional, Any
 
 import pandas as pd
 
@@ -15,6 +15,7 @@
     reorder_columns,
     reorder_rows,
     split_index_column,
+    filter_and_rename_columns,
 )
 
 logger = logging.getLogger()
@@ -113,7 +114,7 @@ def parse_annotation_data(annotation_str):
     return annotation_dict
 
 
-def reformat_custom_df(df):
+def reformat_custom_df(df: pd.DataFrame, cluster_df: pd.DataFrame) -> pd.DataFrame:
     """
     Reformat the custom dataframe.
     """
@@ -124,6 +125,10 @@ def reformat_custom_df(df):
 
     df = split_index_column(df)
 
+    if not cluster_df.empty:
+        df = pd.merge(df, cluster_df, on=["sample", "cluster"], how="left")
+        df.index = df["index"]
+
     # Reorder the columns
     logger.info("Reordering columns")
     final_columns = ["index", "sample", "cluster", "step"] + [
@@ -133,27 +138,28 @@ def reformat_custom_df(df):
             "mash-screen",
             "blast",
             "checkv",
-            "QC check",
+            "cluster",
             "quast",
         ]
         for column in df.columns
         if group in column
     ]
-    return reorder_columns(df, final_columns)
+    return reorder_columns(df.dropna(subset=["step"]), list(dict.fromkeys(final_columns)))
 
 
-def filter_constrain(df, column, value):
+def filter_constrain(dataframe, column, value):
     """
     Filter a dataframe based on a column and a regex value.
 
     Args:
-        df (pd.DataFrame): The dataframe to be filtered.
+        dataframe (pd.DataFrame): The dataframe to be filtered.
         column (str): The column to filter on.
         regex_value (str): The regex value to filter on.
 
     Returns:
         pd.DataFrame, pd.DataFrame: The filtered dataframe with the regex value and the filtered dataframe without the regex value.
     """
+    df = dataframe.copy()
     # Find rows with the regex value
     locations = df[column].str.contains(value) | df["step"].str.contains("constrain")
 
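Two details in the reworked return statement above: dropna(subset=["step"]) discards rows that never received a step label, and list(dict.fromkeys(final_columns)) deduplicates the column list while preserving first-seen order, which a plain set() would not (the bare "cluster" entry now appears both in the base list and via the group match). Likewise, str.contains in filter_constrain interprets value as a regular expression by default. A small order-preserving dedup sketch with hypothetical names:

    cols = ["index", "sample", "cluster", "cluster (blast)", "cluster"]
    print(list(dict.fromkeys(cols)))
    # ['index', 'sample', 'cluster', 'cluster (blast)']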
@@ -188,8 +194,12 @@ def create_constrain_summary(df_constrain: pd.DataFrame, file_columns: List[Unio
         else:
             dic_columns[item] = item
 
+    logger.debug("dic_columns: %s", dic_columns)
+
     columns_of_interest = [dic_columns.get(key, key) for key in CONSTRAIN_GENERAL_STATS_COLUMNS]
 
+    logger.debug("columns_of_interest: %s", columns_of_interest)
+
     if not columns_of_interest:
         logger.warning("No columns of interest were found to create the constrain summary table!")
         return pd.DataFrame()
@@ -219,6 +229,7 @@ def create_constrain_summary(df_constrain: pd.DataFrame, file_columns: List[Unio
     df_constrain = df_constrain[present_columns]
 
     if df_constrain.empty:
+        logger.warning("The constrain DataFrame is empty.")
         return df_constrain
 
     df_constrain = df_constrain.rename(columns=COLUMN_MAPPING)
@@ -260,7 +271,8 @@ def reformat_constrain_df(df, file_columns, args):
     """
     # Separate table for mapping constrains
     if df.empty:
-        return df
+        logger.warning("The constrain DataFrame is empty.")
+        return df, df
 
     # Add constrain metadata to the mapping constrain table
     constrain_meta = filelist_to_df([args.mapping_constrains])
@@ -296,14 +308,12 @@ def generate_ignore_samples(dataframe: pd.DataFrame) -> pd.Series:
     Generate a Series of indices that are not part of the df_snip dataframe.
 
     Parameters:
-        dataframe (pd.DataFrame): The input DataFrame to ocess.
+        dataframe (pd.DataFrame): The input DataFrame to process.
 
     Returns:
         pd.Series: A Series containing the indices that are not in df_snip.
     """
     df = dataframe.copy()
-    df = drop_columns(df, ["index"])
-    df["index"] = df.index
     df = split_index_column(df)
 
     df = reorder_rows(df)
@@ -322,3 +332,77 @@ def add_prefix_to_values_dict(data: List[Union[str, Dict[str, str]]], prefix: st
         else:
             updated_items.extend({key: f"({prefix}) {value}"} for key, value in item.items())
     return updated_items
+
+
+def check_section_exists(module_data: Dict, section_key: str) -> bool:
+    """Check if a section exists in the module data."""
+    return any(section_key in key for key in module_data.keys())
+
+
+def extract_mqc_from_simple_section(all_module_data: Dict, section: Optional[str], module: str) -> Tuple[List[pd.DataFrame], List[Any]]:
+    """Handle simple string or None section cases."""
+    logger.debug("Extracting data from simple str %s", module)
+    if not section:
+        # Return all data if no specific section is specified
+        return [pd.DataFrame.from_dict(all_module_data, orient="index")], []
+
+    # Check if specific section exists
+    if check_section_exists(all_module_data, section):
+        return [pd.DataFrame.from_dict(all_module_data[section], orient="index")], []
+
+    logger.warning(f"Section {section} not found in module {module}")
+    return [pd.DataFrame()], []
+
+
+def extract_mqc_from_list_section(all_module_data: Dict, section: List, module: str) -> Tuple[List[pd.DataFrame], List[Any]]:
+    """Handle list-based section specifications."""
+    logger.debug("Extracting data from list %s: %s", module, section)
+    # Case for list of column names
+    if all(not isinstance(item, dict) or not isinstance(list(item.values())[0], list) for item in section):
+        full_df = pd.DataFrame.from_dict(all_module_data, orient="index")
+        return [filter_and_rename_columns(full_df, section)], section
+
+    # Handle nested section lists
+    result_dfs = []
+    result_columns = []
+    for subsection in section:
+        # Handle different types of subsections
+        if isinstance(subsection, str):
+            # Simple section name
+            subsection_dfs, subsection_columns = extract_mqc_from_simple_section(all_module_data, subsection, module)
+        elif isinstance(subsection, list):
+            # Nested list of section specifications
+            subsection_dfs, subsection_columns = extract_mqc_from_list_section(all_module_data, subsection, module)
+        elif isinstance(subsection, dict):
+            # Dictionary-based section specification
+            subsection_dfs, subsection_columns = extract_mqc_from_dict_section(all_module_data, subsection, module)
+        else:
+            # Unsupported subsection type
+            logger.warning(f"Unsupported subsection type: {type(subsection)}")
+            continue
+
+        result_dfs.extend(subsection_dfs)
+        result_columns.extend(subsection_columns)
+
+    return result_dfs, result_columns
+
+
+def extract_mqc_from_dict_section(all_module_data: Dict, section: Dict, module: str) -> Tuple[List[pd.DataFrame], List[Any]]:
+    """Handle dictionary-based section specifications."""
+    logger.debug("Extracting data from dict %s, %s", module, section)
+    # Extract section name and column specifications
+    section_name, columns = next(iter(section.items()))
+
+    # Check if section exists
+    if check_section_exists(all_module_data, section_name):
+        # Find the matching section data
+        section_data = next((data for key, data in all_module_data.items() if section_name in key), None)
+
+        if section_data:
+            # Convert to DataFrame and filter columns
+            data = pd.DataFrame.from_dict(section_data, orient="index")
+            filtered_data = filter_and_rename_columns(data, columns)
+            return [filtered_data], columns
+
+    logger.warning(f"Section '{section_name}' not found in module '{module}'")
+    return [pd.DataFrame()], []
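Taken together, the three extract_mqc_* helpers dispatch on the type of a section spec: None or a plain string goes to the simple extractor, a list is walked item by item (strings, nested lists, and dicts each routed to the matching extractor), and a single-key dict maps a section name to the columns handed to filter_and_rename_columns. A hedged usage sketch, assuming a MultiQC-style payload of {section: {sample: {metric: value}}} and that filter_and_rename_columns accepts a plain list of column names (neither shape is pinned down by this diff):

    # hypothetical module payload
    all_module_data = {"section_a": {"sample1": {"reads": 100}, "sample2": {"reads": 80}}}

    # str spec: substring existence check, then exact-key lookup
    dfs, _ = extract_mqc_from_simple_section(all_module_data, "section_a", "demo_module")

    # dict spec: substring match on the section name, then column filtering
    dfs, cols = extract_mqc_from_dict_section(all_module_data, {"section_a": ["reads"]}, "demo_module")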