def run_job(job):
    """Run the command described by ``job`` without a shell and assert success.

    The parts returned by ``job.cmd()`` are joined with spaces and re-tokenized
    with ``shlex.split`` so that a part such as ``"--flag value"`` becomes two
    argv entries. The process is started with the stdout/stderr/cwd supplied by
    the job object.

    Args:
        job: object providing ``cmd()``, ``open_output_file()``,
            ``get_stdout()``, ``get_stderr()``, ``get_cwd()`` and
            ``close_output_file()``.

    Raises:
        AssertionError: if the process exits with a non-zero return code.
        OSError: propagated from ``subprocess.run`` when the program cannot be
            started (e.g. the executable does not exist).
    """
    args = shlex.split(' '.join(job.cmd()))
    print(f'args = {args}')
    job.open_output_file()
    try:
        proc = subprocess.run(args=args, stdout=job.get_stdout(), stderr=job.get_stderr(), cwd=job.get_cwd())
    finally:
        # Without shell=True a missing executable raises instead of yielding
        # exit status 127, so the output file must be closed on that path too.
        job.close_output_file()
    assert proc.returncode == 0, \
        f"This command failed: {job.cmd()}"
result.decode('utf-8').split()[0] master_addr = comm.bcast(master_addr, root=0) diff --git a/deepspeed/elasticity/elastic_agent.py b/deepspeed/elasticity/elastic_agent.py index 039b999dfeca..c6a69dd2a49f 100644 --- a/deepspeed/elasticity/elastic_agent.py +++ b/deepspeed/elasticity/elastic_agent.py @@ -54,7 +54,9 @@ def _set_master_addr_port(store: Store, if master_addr is None: # master_addr = _get_fq_hostname() - result = subprocess.check_output("hostname -I", shell=True) + import shlex + safe_cmd = shlex.split("hostname -I") + result = subprocess.check_output(safe_cmd) master_addr = result.decode('utf-8').split()[0] store.set("MASTER_ADDR", master_addr.encode(encoding="UTF-8")) diff --git a/deepspeed/launcher/multinode_runner.py b/deepspeed/launcher/multinode_runner.py index f8d1e00203fa..a816b5685760 100644 --- a/deepspeed/launcher/multinode_runner.py +++ b/deepspeed/launcher/multinode_runner.py @@ -406,7 +406,7 @@ def backend_exists(self): if not mpiname_exists: warnings.warn("mpiname does not exist, mvapich is not installed properly") else: - results = subprocess.check_output('mpiname', shell=True) + results = subprocess.check_output(['mpiname']) mpiname_results = results.decode('utf-8').strip() if "MVAPICH2-GDR" in mpiname_results: exists = True diff --git a/deepspeed/launcher/runner.py b/deepspeed/launcher/runner.py index f140d73ee0d0..076f65b201b3 100755 --- a/deepspeed/launcher/runner.py +++ b/deepspeed/launcher/runner.py @@ -20,6 +20,7 @@ from copy import deepcopy import signal import time +import shlex from .multinode_runner import PDSHRunner, OpenMPIRunner, MVAPICHRunner, SlurmRunner, MPICHRunner, IMPIRunner from .constants import PDSH_LAUNCHER, OPENMPI_LAUNCHER, MVAPICH_LAUNCHER, SLURM_LAUNCHER, MPICH_LAUNCHER, IMPI_LAUNCHER @@ -445,7 +446,8 @@ def main(args=None): if args.ssh_port is not None: ssh_check_cmd += f"-p {args.ssh_port} " ssh_check_cmd += f"{first_host} hostname" - subprocess.check_call(ssh_check_cmd, stderr=subprocess.DEVNULL, 
stdout=subprocess.DEVNULL, shell=True) + safe_ssh_cmd = shlex.split(ssh_check_cmd) + subprocess.check_call(safe_ssh_cmd, stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL) except subprocess.CalledProcessError: raise RuntimeError( f"Using hostfile at {args.hostfile} but host={first_host} was not reachable via ssh. If you are running with a single node please remove {args.hostfile} or setup passwordless ssh." @@ -458,9 +460,9 @@ def main(args=None): if args.ssh_port is not None: ssh_check_cmd += f" -p {args.ssh_port}" ssh_check_cmd += f" {first_host} hostname -I" - hostname_cmd = [ssh_check_cmd] + hostname_cmd = shlex.split(ssh_check_cmd) try: - result = subprocess.check_output(hostname_cmd, shell=True) + result = subprocess.check_output(hostname_cmd) except subprocess.CalledProcessError as err: logger.error( "Unable to detect suitable master address via `hostname -I`, please manually specify one via --master_addr" diff --git a/op_builder/builder.py b/op_builder/builder.py index 807cbee7aa70..e935a179f6af 100644 --- a/op_builder/builder.py +++ b/op_builder/builder.py @@ -253,7 +253,8 @@ def get_rocm_gpu_arch(): rocm_info = Path("rocminfo") rocm_gpu_arch_cmd = str(rocm_info) + " | grep -o -m 1 'gfx.*'" try: - result = subprocess.check_output(rocm_gpu_arch_cmd, shell=True) + safe_cmd = shlex.split(rocm_gpu_arch_cmd) + result = subprocess.check_output(safe_cmd) rocm_gpu_arch = result.decode('utf-8').strip() except subprocess.CalledProcessError: rocm_gpu_arch = "" @@ -271,7 +272,8 @@ def get_rocm_wavefront_size(): rocm_wavefront_size_cmd = str( rocm_info) + " | grep -Eo -m1 'Wavefront Size:[[:space:]]+[0-9]+' | grep -Eo '[0-9]+'" try: - result = subprocess.check_output(rocm_wavefront_size_cmd, shell=True) + safe_cmd = shlex.split(rocm_wavefront_size_cmd) + result = subprocess.check_output(rocm_wavefront_size_cmd) rocm_wavefront_size = result.decode('utf-8').strip() except subprocess.CalledProcessError: rocm_wavefront_size = "32" @@ -432,7 +434,7 @@ def 
def command_exists(cmd):
    """Return True if ``cmd`` names a command that can be run on this system.

    On Windows the command is looked up on PATH. Elsewhere bash's ``type``
    builtin is used, falling back to a PATH lookup when bash itself cannot be
    started.

    Args:
        cmd: command name to look for (e.g. ``"git"``).

    Returns:
        bool: True when the command was found, False otherwise.
    """
    import shutil  # local import: only this function needs it

    if sys.platform == "win32":
        # Executing the command and comparing its exit status to 1 cannot tell
        # "missing" from "present"; without a shell it raises for a missing one.
        return shutil.which(cmd) is not None

    try:
        # ``bash -c STRING ARG0 ARG1``: only STRING is executed, the remaining
        # words become $0, $1, ... Splitting "bash -c type <cmd>" therefore ran
        # a bare ``type`` (exit status 0 for any cmd). Passing cmd as "$1" keeps
        # it a single operand and prevents it from being parsed as shell code.
        result = subprocess.run(["bash", "-c", 'type -- "$1"', "bash", cmd], stdout=subprocess.DEVNULL)
    except FileNotFoundError:
        # bash is not installed; a PATH lookup is the best remaining check.
        return shutil.which(cmd) is not None
    return result.returncode == 0
def clean_test_env(self):
    """Run ``dlts_ssh pkill -9 -f /usr/bin/python`` and then wait 20 seconds.

    The command is best-effort: its exit status is ignored, and a failure to
    start it is reported on stdout instead of aborting the test.
    """
    cmd = shlex.split("dlts_ssh pkill -9 -f /usr/bin/python")
    print(cmd)
    try:
        # No ``executable='/bin/bash'``: with shell=False that argument replaces
        # the program to execute, so bash would be started with these tokens as
        # its argv and dlts_ssh would never run.
        subprocess.run(cmd, check=False)
    except OSError as err:
        # The shell-based version only produced a non-zero exit status when the
        # tool was unavailable; keep that non-fatal behavior.
        print(f"clean_test_env: could not run {cmd[0]}: {err}")
    time.sleep(20)
def remove_file(test_id, filename):
    """Delete ``filename`` if it is an existing regular file; otherwise do nothing.

    This replaces the shell snippet ``if [ -f F ] ; then rm -v F; fi``, which
    cannot be executed once it is tokenized and run without a shell.

    Args:
        test_id: identifier used as a prefix for the log lines.
        filename: path of the file to remove.

    Returns:
        None. Removal failures are printed, not raised (best-effort cleanup).
    """
    print(f"{test_id} cmd: rm -v {filename} (if it exists)")
    if not os.path.isfile(filename):
        return
    try:
        os.remove(filename)
    except OSError as err:
        # The shell version ran with check=False, so a failed removal never
        # raised; report it and carry on.
        print(f"{test_id} failed to remove {filename}: {err}")
    else:
        print(f"removed '{filename}'")
def clean_test_env(self):
    """Run ``dlts_ssh pkill -9 -f /usr/bin/python`` and then wait 20 seconds.

    The command is best-effort: its exit status is ignored, and a failure to
    start it is reported on stdout instead of aborting the test.
    """
    cmd = shlex.split("dlts_ssh pkill -9 -f /usr/bin/python")
    print(cmd)
    try:
        # No ``executable='/bin/bash'``: with shell=False that argument replaces
        # the program to execute, so bash would be started with these tokens as
        # its argv and dlts_ssh would never run.
        subprocess.run(cmd, check=False)
    except OSError as err:
        # The shell-based version only produced a non-zero exit status when the
        # tool was unavailable; keep that non-fatal behavior.
        print(f"clean_test_env: could not run {cmd[0]}: {err}")
    time.sleep(20)
def _get_cpu_socket_count(cpuinfo_path="/proc/cpuinfo"):
    """Count the distinct ``physical id`` lines in a cpuinfo file.

    Equivalent to ``cat FILE | grep "physical id" | sort -u | wc -l`` but done
    in-process, so no child processes are spawned or left unreaped.

    Args:
        cpuinfo_path: file to scan; defaults to ``/proc/cpuinfo``.

    Returns:
        int: number of unique lines containing ``physical id``; 0 when there
        are none or the file cannot be read (the pipeline also yielded 0 then).
    """
    try:
        with open(cpuinfo_path, encoding="utf-8", errors="replace") as cpuinfo:
            # Strip the newline so a final unterminated line still compares
            # equal to an identical terminated one, as ``sort -u`` treats them.
            unique_ids = {line.rstrip("\n") for line in cpuinfo if "physical id" in line}
    except OSError:
        return 0
    return len(unique_ids)