[auto config] Resume from history csv file #60417

Merged (1 commit, Dec 29, 2023)
77 changes: 77 additions & 0 deletions python/paddle/distributed/auto_tuner/tuner.py
@@ -12,6 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import csv
import os

from .utils import default_candidates, gbs_default_candidates

@@ -54,6 +56,8 @@ def __init__(self, tuner_cfg):
raise NotImplementedError()

self.history_cfgs = []
self.resume_cfgs = []
self.tuner_cfg = tuner_cfg

def search_once(self):
"""Return a new task config."""
@@ -67,3 +71,76 @@ def search_once(self):
def add_cfg(self, cfg):
"""Add cfg into history cfgs"""
self.history_cfgs.append(cfg)

def resume_form_history(self, history_csv_path="./history.csv"):
"""Resume form history csv file"""
# The breakpoint resume function does not start when the resume csv file does not exist.
if not os.path.exists(history_csv_path):
return
resume_csv_path = os.path.join(
os.path.dirname(history_csv_path),
f'{os.path.basename(history_csv_path).split(".")[0]}_copy.csv',
)
with open(history_csv_path, "r") as fread:
reader = csv.reader(fread)
data_list = list(reader)
with open(resume_csv_path, "w") as fwrite:
writer = csv.writer(fwrite)
for row in data_list:
writer.writerow(row)
# change str values to their real (int/float) types
for row in data_list:
for i, value in enumerate(row):
try:
row[i] = int(value)
except ValueError:
try:
row[i] = float(value)
except ValueError:
pass

data_dict = []
keys = data_list[0]
values = data_list[1:]
for val in values:
val = [x if x != '' else None for x in val]
val = [True if x == 'True' else x for x in val]
val = [False if x == 'False' else x for x in val]
dictionary = dict(zip(keys, val))
time_val = -1
target_key = self.tuner_cfg["metric_cfg"]["name"]
if dictionary[target_key]:
time_val = dictionary[target_key]
dictionary["time"] = time_val
data_dict.append(dictionary)
self.resume_cfgs = data_dict

def get_cfg_from_resume(self, cur_cfg):
"""Get cfg from resume cfgs"""
keys_to_compare = [
'mp_degree',
'sharding_degree',
'pp_degree',
'dp_degree',
'sharding_stage',
'micro_batch_size',
'vpp_degree',
'use_recompute',
'recompute_granularity',
'num_gpus',
'nodes',
'global_batch_size',
'sharding_overlap',
'acc_steps',
]
for cfg in self.resume_cfgs:
ret_is_same = True
for key in keys_to_compare:
if not cfg.get(key) and not cur_cfg.get(key):
continue
else:
is_same = str(cfg.get(key)) == str(cur_cfg.get(key))
ret_is_same = ret_is_same and is_same
if ret_is_same:
return cfg
return None
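
For readers skimming the diff, here is a minimal, self-contained sketch of the resume flow these two methods implement. Everything below is illustrative: the file name, the column names, the "tokens_per_sec" metric, and the find_resumed helper are assumptions for the example, not Paddle APIs; the real wiring is in the main.py changes further down.

import csv

# Illustrative history written by a previous auto-tune run; the column names and
# the "tokens_per_sec" metric column are assumptions, not the real schema.
history_rows = [
    ["dp_degree", "mp_degree", "pp_degree", "micro_batch_size", "use_recompute", "tokens_per_sec"],
    ["2", "4", "1", "2", "True", "1530.5"],
    ["4", "2", "1", "4", "False", ""],  # empty metric column: run that produced no result
]
with open("history.csv", "w", newline="") as f:
    csv.writer(f).writerows(history_rows)

# Roughly what resume_form_history() does: read the csv, normalize '', 'True' and
# 'False', and record the metric value under "time" (-1 when the run has no metric).
with open("history.csv") as f:
    data_list = list(csv.reader(f))
keys, values = data_list[0], data_list[1:]
resume_cfgs = []
for val in values:
    val = [None if x == "" else x for x in val]
    val = [True if x == "True" else False if x == "False" else x for x in val]
    cfg = dict(zip(keys, val))
    cfg["time"] = cfg["tokens_per_sec"] if cfg["tokens_per_sec"] else -1
    resume_cfgs.append(cfg)

# Roughly what get_cfg_from_resume() does: string-compare a fixed set of keys and
# return the stored row for a config that already ran, so the task can be skipped.
def find_resumed(cur_cfg, compare_keys=("dp_degree", "mp_degree", "pp_degree", "micro_batch_size")):
    for cfg in resume_cfgs:
        if all(str(cfg.get(k)) == str(cur_cfg.get(k)) for k in compare_keys):
            return cfg
    return None

print(find_resumed({"dp_degree": 2, "mp_degree": 4, "pp_degree": 1, "micro_batch_size": 2}))
# -> the first stored row, including its recorded "time" value
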
71 changes: 71 additions & 0 deletions python/paddle/distributed/launch/main.py
@@ -587,6 +591,10 @@ def launch():
logger.info(
f"Launch {len(auto_tuner.algo.all_tasks)} tasks by auto tuner: "
)
resume_csv_file_path = tuner_cfg.get(
"resume_csv_file_path", history_file_path
)
auto_tuner.resume_form_history(resume_csv_file_path)
cur_cfg = auto_tuner.search_once()
auto_tuner.add_cfg(cur_cfg)
assert cur_cfg is not None, "No config can run."
@@ -658,6 +662,73 @@ def launch():
)
logger.info(f"Launch task: job_id {task_job_id}, log_dir {log_dir}")

cur_resume_cfg = auto_tuner.get_cfg_from_resume(cur_cfg)
if cur_resume_cfg:
cur_cfg = cur_resume_cfg
cur_cfg['job_id'] = job_id
auto_tuner.history_cfgs.pop(-1)
auto_tuner.add_cfg(cur_cfg)
recorder.add_cfg(**cur_cfg)
cur_best_cfgs, err = recorder.get_best(
metric=tuner_cfg['metric_cfg']['name'],
direction=tuner_cfg['metric_cfg']['OptimizationDirection'],
)
if not err:
ctx.logger.info(f"Current best config: {cur_best_cfgs}")
logger.info(f"Current best config: {cur_best_cfgs}")
else:
ctx.logger.info(
"Get best config failed. Currently no config can be run."
)
logger.info(
"Get best config failed. Currently no config can be run."
)
if (
"sharding_overlap" in cur_cfg
and cur_cfg["sharding_overlap"]
):
add_overlap_performance(
cur_cfg, tuner_cfg, recorder.history
)

if cur_cfg["error_info"]:
error_task_nums += 1
error_info = cur_cfg["error_info"]
task_nums = len(auto_tuner.algo.all_tasks)
cur_task_id = auto_tuner.algo.idx
ctx.logger.info(
"Auto Tuner Schedule: [{}/{}], Pruned nums {}, Error nums {}, Error info {}, Remaining time {} min".format(
cur_task_id,
task_nums,
cur_task_id - job_id,
error_task_nums,
error_info,
round(
(task_nums - cur_task_id) * max_time_per_task / 60,
2,
),
)
)
logger.info(
"Auto Tuner Schedule: [{}/{}], Pruned nums {}, Error nums {}, Error info {}, Remaining time {} min".format(
cur_task_id,
task_nums,
cur_task_id - job_id,
error_task_nums,
error_info,
round(
(task_nums - cur_task_id) * max_time_per_task / 60,
2,
),
)
)
recorder.store_history(history_file_path)
# generate a new config
new_cfg = auto_tuner.search_once()
cur_cfg = copy.deepcopy(new_cfg)
auto_tuner.add_cfg(cur_cfg)
continue

# in the single-dp estimation scene, only some nodes (not all) run
ctx = gen_new_ctx(ctx, cur_cfg, tuner_cfg)
actual_nnodes = int(ctx.args.nnodes.split(":")[0])
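
As the `tuner_cfg.get("resume_csv_file_path", history_file_path)` call above suggests, a user can point the tuner at a previous run's history file through the tuner config. A hedged sketch of such a config fragment follows; the metric name, the "Maximize" value, and the path are assumptions, not a documented schema.

# Illustrative auto-tuner config fragment; only "resume_csv_file_path" is new in this PR.
tuner_cfg = {
    "metric_cfg": {
        "name": "tokens_per_sec",             # assumed metric name
        "OptimizationDirection": "Maximize",  # assumed value; the key is read by recorder.get_best() above
    },
    # Path to a previous run's history csv; when absent, the launcher falls back to
    # history_file_path, so resuming also works after an interrupted run.
    "resume_csv_file_path": "./output/history.csv",
}
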