From 46e15ea5a27738912ea48fbc2eb89fee0b81c9ef Mon Sep 17 00:00:00 2001 From: spwoodcock Date: Fri, 20 Sep 2024 18:48:31 +0100 Subject: [PATCH] refactor: refactor update_xlsform for readability + minor bugfixes --- osm_fieldwork/update_xlsform.py | 95 ++++++++++++++++++++------------- 1 file changed, 58 insertions(+), 37 deletions(-) diff --git a/osm_fieldwork/update_xlsform.py b/osm_fieldwork/update_xlsform.py index 69148782c..dfcfa0e45 100644 --- a/osm_fieldwork/update_xlsform.py +++ b/osm_fieldwork/update_xlsform.py @@ -16,11 +16,14 @@ FEATURE_COLUMN = "feature" NAME_COLUMN = "name" SURVEY_GROUP_NAME = "survey_questions" -DIGITISATION_GROUP_NAME = "verification" -def filter_df_empty_rows(df, column=NAME_COLUMN): - """Remove rows with None values in the specified column, but retain group rows if they exist.""" +def filter_df_empty_rows(df: pd.DataFrame, column: str = NAME_COLUMN): + """Remove rows with None values in the specified column. + + NOTE We retain 'end group' and 'end group' rows even if they have no name. + NOTE A generic df.dropna(how="all") would not catch accidental spaces etc. + """ if column in df.columns: # Only retain 'begin group' and 'end group' if 'type' column exists if "type" in df.columns: @@ -30,58 +33,76 @@ def filter_df_empty_rows(df, column=NAME_COLUMN): return df -def merge_dataframes(mandatory_df, custom_df, digitisation_df, is_survey_sheet=False): - """Merge multiple Pandas dataframes together, keeping common fields in custom_df.""" - # Filter dataframes (remove rows with None in 'name' column) +def merge_dataframes(mandatory_df: pd.DataFrame, user_question_df: pd.DataFrame, digitisation_df: pd.DataFrame): + """Merge multiple Pandas dataframes together, removing duplicate fields.""" + # Remove empty rows from dataframes mandatory_df = filter_df_empty_rows(mandatory_df) - custom_df = filter_df_empty_rows(custom_df) + user_question_df = filter_df_empty_rows(user_question_df) digitisation_df = filter_df_empty_rows(digitisation_df) - # Find common fields between custom_df and mandatory_df or digitisation_df - common_fields = set(custom_df[NAME_COLUMN]).intersection( + # Find common fields between user_question_df and mandatory_df or digitisation_df + # We use this to remove duplicates from the survey, giving our fields priority + duplicate_fields = set(user_question_df[NAME_COLUMN]).intersection( set(mandatory_df[NAME_COLUMN]).union(set(digitisation_df[NAME_COLUMN])) ) - # Split custom DataFrame into common and non-common fields - custom_common_df = custom_df[custom_df[NAME_COLUMN].isin(common_fields)] - custom_non_common_df = custom_df[~custom_df[NAME_COLUMN].isin(common_fields)] - - # Filter out common fields from mandatory and digitisation DataFrames - mandatory_df_filtered = mandatory_df[~mandatory_df[NAME_COLUMN].isin(common_fields)] - digitisation_df_filtered = digitisation_df[~digitisation_df[NAME_COLUMN].isin(common_fields)] + # Is choices sheet, return ordered merged choices + if "list_name" in user_question_df.columns: + user_question_df_filtered = user_question_df[~user_question_df[NAME_COLUMN].isin(duplicate_fields)] - if not is_survey_sheet: return pd.concat( - [custom_common_df, mandatory_df_filtered, custom_non_common_df, digitisation_df_filtered], + [ + mandatory_df, + user_question_df_filtered, + digitisation_df, + ], ignore_index=True, ) - # Create groups for survey and digitisation - survey_group = create_group(SURVEY_GROUP_NAME) - digitisation_group = create_group(DIGITISATION_GROUP_NAME) - digitisation_group["start"]["relevant"] = ["(${new_feature} = 'yes') or (${building_exists} = 'yes')"] + # Else we are processing the survey sheet, continue + + # NOTE filter out 'end group' from duplicate check as they have empty NAME_COLUMN + end_group_rows = user_question_df[user_question_df["type"].isin(["end group", "end_group"])] + user_question_df_filtered = user_question_df[ + (~user_question_df[NAME_COLUMN].isin(duplicate_fields)) | (user_question_df.index.isin(end_group_rows.index)) + ] + + # Create survey group wrapper + survey_group = create_survey_group(SURVEY_GROUP_NAME) # Concatenate dataframes in the desired order return pd.concat( [ - custom_common_df, - mandatory_df_filtered, - survey_group["start"], - custom_non_common_df, + mandatory_df, + # Wrap the survey question in a group + survey_group["begin"], + user_question_df_filtered, survey_group["end"], - digitisation_group["start"], - digitisation_df_filtered, - digitisation_group["end"], + digitisation_df, ], ignore_index=True, ) -def create_group(name: str) -> dict[str, pd.DataFrame]: - """Helper function to create a start and end group for XLSForm.""" - start_group = pd.DataFrame({"type": ["begin group"], "name": [name]}) - end_group = pd.DataFrame({"type": ["end group"], "name": [f"end of {name}"]}) - return {"start": start_group, "end": end_group} +def create_survey_group(name: str) -> dict[str, pd.DataFrame]: + """Helper function to create a begin and end group for XLSForm.""" + begin_group = pd.DataFrame( + { + "type": ["begin group"], + "name": [name], + "label::English(en)": [name], + "label::Swahili(sw)": [name], + "label::French(fr)": [name], + "label::Spanish(es)": [name], + "relevant": "(${new_feature} = 'yes') or (${building_exists} = 'yes')", + } + ) + end_group = pd.DataFrame( + { + "type": ["end group"], + } + ) + return {"begin": begin_group, "end": end_group} def append_select_one_from_file_row(df: pd.DataFrame, entity_name: str) -> pd.DataFrame: @@ -168,16 +189,16 @@ async def append_mandatory_fields( # Merge 'survey' and 'choices' sheets if "survey" in custom_sheets: custom_sheets["survey"] = merge_dataframes( - mandatory_sheets.get("survey"), custom_sheets.get("survey"), digitisation_sheets.get("survey"), True + mandatory_sheets.get("survey"), custom_sheets.get("survey"), digitisation_sheets.get("survey") ) # Hardcode the form_category value for the start instructions if form_category.endswith("s"): # Plural to singular - form_category_singular = form_category[:-1] + form_category = form_category[:-1] form_category_row = custom_sheets["survey"].loc[custom_sheets["survey"]["name"] == "form_category"] if not form_category_row.empty: custom_sheets["survey"].loc[custom_sheets["survey"]["name"] == "form_category", "calculation"] = ( - f"once('{form_category_singular}')" + f"once('{form_category}')" ) if "choices" in custom_sheets: