[auto config] Resume from history csv file
xysheng-baidu committed Dec 27, 2023
1 parent de7b288 commit 7817574
Showing 2 changed files with 153 additions and 0 deletions.
82 changes: 82 additions & 0 deletions python/paddle/distributed/auto_tuner/tuner.py
@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import csv
import os

from .utils import default_candidates, gbs_default_candidates

@@ -54,6 +56,8 @@ def __init__(self, tuner_cfg):
raise NotImplementedError()

self.history_cfgs = []
self.resume_cfgs = []
self.tuner_cfg = tuner_cfg

def search_once(self):
"""Return a new task config."""
@@ -67,3 +71,81 @@ def search_once(self):
def add_cfg(self, cfg):
"""Add cfg into history cfgs"""
self.history_cfgs.append(cfg)

def resume_form_history(self, history_csv_path="./history.csv"):
"""Resume form history csv file"""
# The breakpoint resume function does not start when the resume csv file does not exist.
if not os.path.exists(history_csv_path):
return
resume_csv_path = os.path.join(
os.path.dirname(history_csv_path),
f'{os.path.basename(history_csv_path).split(".")[0]}_copy.csv',
)
with open(history_csv_path, "r") as fread:
reader = csv.reader(fread)
data_list = list(reader)
with open(resume_csv_path, "w") as fwrite:
writer = csv.writer(fwrite)
for row in data_list:
writer.writerow(row)
# change str values back to their real types (int/float)
for row in data_list:
for i, value in enumerate(row):
try:
row[i] = int(value)
except ValueError:
try:
row[i] = float(value)
except ValueError:
pass
# if value == "False":
# row[i] = bool(False)
# if value == "True":
# row[i] = bool(True)
# row[i] = value

data_dict = []
keys = data_list[0]
values = data_list[1:]
for val in values:
val = [x if x != '' else None for x in val]
val = [True if x == 'True' else x for x in val]
val = [False if x == 'False' else x for x in val]
dictionary = dict(zip(keys, val))
time_val = -1
target_key = self.tuner_cfg["metric_cfg"]["name"]
if dictionary[target_key]:
time_val = dictionary[target_key]
dictionary["time"] = time_val
data_dict.append(dictionary)
self.resume_cfgs = data_dict

def get_cfg_from_resume(self, cur_cfg):
"""Get cfg from resume cfgs"""
keys_to_compare = [
'mp_degree',
'sharding_degree',
'pp_degree',
'dp_degree',
'sharding_stage',
'micro_batch_size',
'vpp_degree',
'use_recompute',
'recompute_granularity',
'num_gpus',
'nodes',
'global_batch_size',
'sharding_overlap',
'acc_steps',
]
for cfg in self.resume_cfgs:
ret_is_same = True
for key in keys_to_compare:
if not cfg.get(key) and not cur_cfg.get(key):
continue
else:
is_same = str(cfg.get(key)) == str(cur_cfg.get(key))
ret_is_same = ret_is_same and is_same
if ret_is_same:
return cfg
return None
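As a quick illustration of the csv round-trip that resume_form_history performs, here is a minimal standalone sketch (not part of the commit) that coerces a history.csv row back into a typed config dict; the column names and values below are invented for the example.

import csv
import io

# A made-up two-line history file: header row plus one recorded run.
SAMPLE_CSV = "dp_degree,mp_degree,use_recompute,acc_steps,time\n2,4,True,,12.5\n"

def coerce(value):
    # Turn a csv string back into None/bool/int/float where possible.
    if value == '':
        return None
    if value == 'True':
        return True
    if value == 'False':
        return False
    for cast in (int, float):
        try:
            return cast(value)
        except ValueError:
            pass
    return value

rows = list(csv.reader(io.StringIO(SAMPLE_CSV)))
keys, values = rows[0], rows[1:]
cfgs = [{k: coerce(v) for k, v in zip(keys, row)} for row in values]
print(cfgs)
# [{'dp_degree': 2, 'mp_degree': 4, 'use_recompute': True, 'acc_steps': None, 'time': 12.5}]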
71 changes: 71 additions & 0 deletions python/paddle/distributed/launch/main.py
@@ -587,6 +587,10 @@ def launch():
logger.info(
f"Launch {len(auto_tuner.algo.all_tasks)} tasks by auto tuner: "
)
resume_csv_file_path = tuner_cfg.get(
"resume_csv_file_path", history_file_path
)
auto_tuner.resume_form_history(resume_csv_file_path)
cur_cfg = auto_tuner.search_once()
auto_tuner.add_cfg(cur_cfg)
assert cur_cfg is not None, "No config can run."
@@ -658,6 +662,73 @@ def launch():
)
logger.info(f"Launch task: job_id {task_job_id}, log_dir {log_dir}")

cur_resume_cfg = auto_tuner.get_cfg_from_resume(cur_cfg)
if cur_resume_cfg:
cur_cfg = cur_resume_cfg
cur_cfg['job_id'] = job_id
auto_tuner.history_cfgs.pop(-1)
auto_tuner.add_cfg(cur_cfg)
recorder.add_cfg(**cur_cfg)
cur_best_cfgs, err = recorder.get_best(
metric=tuner_cfg['metric_cfg']['name'],
direction=tuner_cfg['metric_cfg']['OptimizationDirection'],
)
if not err:
ctx.logger.info(f"Current best config: {cur_best_cfgs}")
logger.info(f"Current best config: {cur_best_cfgs}")
else:
ctx.logger.info(
"Get best config failed. Currently no config can be run."
)
logger.info(
"Get best config failed. Currently no config can be run."
)
if (
"sharding_overlap" in cur_cfg
and cur_cfg["sharding_overlap"]
):
add_overlap_performance(
cur_cfg, tuner_cfg, recorder.history
)

if cur_cfg["error_info"]:
error_task_nums += 1
error_info = cur_cfg["error_info"]
task_nums = len(auto_tuner.algo.all_tasks)
cur_task_id = auto_tuner.algo.idx
ctx.logger.info(
"Auto Tuner Schedule: [{}/{}], Pruned nums {}, Error nums {}, Error info {}, Remaining time {} min".format(
cur_task_id,
task_nums,
cur_task_id - job_id,
error_task_nums,
error_info,
round(
(task_nums - cur_task_id) * max_time_per_task / 60,
2,
),
)
)
logger.info(
"Auto Tuner Schedule: [{}/{}], Pruned nums {}, Error nums {}, Error info {}, Remaining time {} min".format(
cur_task_id,
task_nums,
cur_task_id - job_id,
error_task_nums,
error_info,
round(
(task_nums - cur_task_id) * max_time_per_task / 60,
2,
),
)
)
recorder.store_history(history_file_path)
# generate a new config
new_cfg = auto_tuner.search_once()
cur_cfg = copy.deepcopy(new_cfg)
auto_tuner.add_cfg(cur_cfg)
continue

# in the single-dp estimation scene, only some nodes (not all) run
ctx = gen_new_ctx(ctx, cur_cfg, tuner_cfg)
actual_nnodes = int(ctx.args.nnodes.split(":")[0])
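The launch() changes above pick up resume_csv_file_path from the tuner config (falling back to the regular history file) and use metric_cfg to rank runs. Below is a hedged sketch of the relevant keys in such a config; the metric name, direction, and path are placeholders, and the rest of the tuner settings are assumed to be defined elsewhere.

tuner_cfg = {
    # ... model / search-space settings elided ...
    "metric_cfg": {
        "name": "throughput",  # placeholder: metric column looked up in the history csv
        "OptimizationDirection": "Maximize",  # passed to recorder.get_best(direction=...)
    },
    # hypothetical path; when omitted, launch() falls back to history_file_path
    "resume_csv_file_path": "./history.csv",
}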
