diff --git a/python/paddle/distributed/auto_tuner/tuner.py b/python/paddle/distributed/auto_tuner/tuner.py
index b3b6cbf3cdc528..6a6a0ba4e082ff 100644
--- a/python/paddle/distributed/auto_tuner/tuner.py
+++ b/python/paddle/distributed/auto_tuner/tuner.py
@@ -12,6 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import csv
+import os
 
 from .utils import default_candidates, gbs_default_candidates
 
@@ -54,6 +56,8 @@ def __init__(self, tuner_cfg):
             raise NotImplementedError()
 
         self.history_cfgs = []
+        self.resume_cfgs = []
+        self.tuner_cfg = tuner_cfg
 
     def search_once(self):
         """Return a new task config."""
@@ -67,3 +71,76 @@ def search_once(self):
     def add_cfg(self, cfg):
         """Add cfg into history cfgs"""
        self.history_cfgs.append(cfg)
+
+    def resume_from_history(self, history_csv_path="./history.csv"):
+        """Resume from a history csv file."""
+        # Resume-from-breakpoint is skipped when the history csv file does not exist.
+        if not os.path.exists(history_csv_path):
+            return
+        resume_csv_path = os.path.join(
+            os.path.dirname(history_csv_path),
+            f'{os.path.basename(history_csv_path).split(".")[0]}_copy.csv',
+        )
+        with open(history_csv_path, "r") as fread:
+            reader = csv.reader(fread)
+            data_list = list(reader)
+        with open(resume_csv_path, "w") as fwrite:
+            writer = csv.writer(fwrite)
+            for row in data_list:
+                writer.writerow(row)
+        # convert string values back to their real types
+        for row in data_list:
+            for i, value in enumerate(row):
+                try:
+                    row[i] = int(value)
+                except ValueError:
+                    try:
+                        row[i] = float(value)
+                    except ValueError:
+                        pass
+
+        data_dict = []
+        keys = data_list[0]
+        values = data_list[1:]
+        for val in values:
+            val = [x if x != '' else None for x in val]
+            val = [True if x == 'True' else x for x in val]
+            val = [False if x == 'False' else x for x in val]
+            dictionary = dict(zip(keys, val))
+            time_val = -1
+            target_key = self.tuner_cfg["metric_cfg"]["name"]
+            if dictionary[target_key]:
+                time_val = dictionary[target_key]
+            dictionary["time"] = time_val
+            data_dict.append(dictionary)
+        self.resume_cfgs = data_dict
+
+    def get_cfg_from_resume(self, cur_cfg):
+        """Get cfg from resume cfgs."""
+        keys_to_compare = [
+            'mp_degree',
+            'sharding_degree',
+            'pp_degree',
+            'dp_degree',
+            'sharding_stage',
+            'micro_batch_size',
+            'vpp_degree',
+            'use_recompute',
+            'recompute_granularity',
+            'num_gpus',
+            'nodes',
+            'global_batch_size',
+            'sharding_overlap',
+            'acc_steps',
+        ]
+        for cfg in self.resume_cfgs:
+            ret_is_same = True
+            for key in keys_to_compare:
+                # missing/empty values on both sides count as equal
+                if not cfg.get(key) and not cur_cfg.get(key):
+                    continue
+                is_same = str(cfg.get(key)) == str(cur_cfg.get(key))
+                ret_is_same = ret_is_same and is_same
+            if ret_is_same:
+                return cfg
+        return None
diff --git a/python/paddle/distributed/launch/main.py b/python/paddle/distributed/launch/main.py
index 6d821860c0fb58..3562d38d9a92c0 100644
--- a/python/paddle/distributed/launch/main.py
+++ b/python/paddle/distributed/launch/main.py
@@ -587,6 +587,10 @@ def launch():
         logger.info(
             f"Launch {len(auto_tuner.algo.all_tasks)} tasks by auto tuner: "
         )
+        resume_csv_file_path = tuner_cfg.get(
+            "resume_csv_file_path", history_file_path
+        )
+        auto_tuner.resume_from_history(resume_csv_file_path)
         cur_cfg = auto_tuner.search_once()
         auto_tuner.add_cfg(cur_cfg)
         assert cur_cfg is not None, "No config can run."
@@ -658,6 +662,73 @@ def launch():
             )
             logger.info(f"Launch task: job_id {task_job_id}, log_dir {log_dir}")
 
+            cur_resume_cfg = auto_tuner.get_cfg_from_resume(cur_cfg)
+            if cur_resume_cfg:
+                cur_cfg = cur_resume_cfg
+                cur_cfg['job_id'] = job_id
+                auto_tuner.history_cfgs.pop(-1)
+                auto_tuner.add_cfg(cur_cfg)
+                recorder.add_cfg(**cur_cfg)
+                cur_best_cfgs, err = recorder.get_best(
+                    metric=tuner_cfg['metric_cfg']['name'],
+                    direction=tuner_cfg['metric_cfg']['OptimizationDirection'],
+                )
+                if not err:
+                    ctx.logger.info(f"Current best config: {cur_best_cfgs}")
+                    logger.info(f"Current best config: {cur_best_cfgs}")
+                else:
+                    ctx.logger.info(
+                        "Get best config failed. Currently no config can be run."
+                    )
+                    logger.info(
+                        "Get best config failed. Currently no config can be run."
+                    )
+                if (
+                    "sharding_overlap" in cur_cfg
+                    and cur_cfg["sharding_overlap"]
+                ):
+                    add_overlap_performance(
+                        cur_cfg, tuner_cfg, recorder.history
+                    )
+
+                if cur_cfg.get("error_info"):
+                    error_task_nums += 1
+                error_info = cur_cfg.get("error_info")
+                task_nums = len(auto_tuner.algo.all_tasks)
+                cur_task_id = auto_tuner.algo.idx
+                ctx.logger.info(
+                    "Auto Tuner Schedule: [{}/{}], Pruned nums {}, Error nums {}, Error info {}, Remaining time {} min".format(
+                        cur_task_id,
+                        task_nums,
+                        cur_task_id - job_id,
+                        error_task_nums,
+                        error_info,
+                        round(
+                            (task_nums - cur_task_id) * max_time_per_task / 60,
+                            2,
+                        ),
+                    )
+                )
+                logger.info(
+                    "Auto Tuner Schedule: [{}/{}], Pruned nums {}, Error nums {}, Error info {}, Remaining time {} min".format(
+                        cur_task_id,
+                        task_nums,
+                        cur_task_id - job_id,
+                        error_task_nums,
+                        error_info,
+                        round(
+                            (task_nums - cur_task_id) * max_time_per_task / 60,
+                            2,
+                        ),
+                    )
+                )
+                recorder.store_history(history_file_path)
+                # generate a new config and skip relaunching this task
+                new_cfg = auto_tuner.search_once()
+                cur_cfg = copy.deepcopy(new_cfg)
+                auto_tuner.add_cfg(cur_cfg)
+                continue
+
             # in single dp estimation scene, just some nodes not all nodes run
             ctx = gen_new_ctx(ctx, cur_cfg, tuner_cfg)
             actual_nnodes = int(ctx.args.nnodes.split(":")[0])
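Note on the resume flow: when a candidate config matches a row of the history csv, its recorded result is replayed and the task is not relaunched, so an interrupted auto-tuning run picks up where the csv left off. Below is a minimal standalone sketch of that round-trip (reading the csv back, recovering types, and matching a candidate config); the helper names `load_history` and `match_resumed`, the "history.csv" path, and the compared keys are illustrative assumptions, not part of the patch:

    import csv

    def load_history(path="history.csv"):
        """Read a history csv back into dicts, recovering real types."""
        with open(path) as f:
            rows = list(csv.reader(f))
        keys, records = rows[0], []
        for raw in rows[1:]:
            rec = {}
            for k, v in zip(keys, raw):
                if v == '':
                    rec[k] = None           # empty cell -> missing value
                elif v in ('True', 'False'):
                    rec[k] = (v == 'True')  # booleans round-trip as text
                else:
                    try:
                        rec[k] = int(v)
                    except ValueError:
                        try:
                            rec[k] = float(v)
                        except ValueError:
                            rec[k] = v      # leave plain strings alone
            records.append(rec)
        return records

    def match_resumed(cur_cfg, records, keys=('mp_degree', 'pp_degree', 'dp_degree')):
        """Return the first record whose compared keys all agree with cur_cfg."""
        for rec in records:
            if all(
                (not rec.get(k) and not cur_cfg.get(k))  # both empty -> equal
                or str(rec.get(k)) == str(cur_cfg.get(k))
                for k in keys
            ):
                return rec
        return None

    # e.g. skip relaunching when a previous run already covered this config
    history = load_history()
    hit = match_resumed({'mp_degree': 2, 'pp_degree': 4, 'dp_degree': 1}, history)

The string comparison mirrors the patch's `get_cfg_from_resume`: values are compared as text because they may arrive either from the live search (typed) or from the csv (partially typed), and treating empty values on both sides as equal keeps older history files with fewer columns usable.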