1. Launching the training script
1.1 Training script
pip install deepspeed
git clone https://github.com/microsoft/DeepSpeedExamples.git
cd DeepSpeedExamples/training/cifar
bash run_ds.sh
1.2 Environment dependencies
- the gcc version must be lower than 10;
- it is enough to point /usr/bin/gcc at /usr/bin/gcc-7 (e.g. re-link it with `ln -sf /usr/bin/gcc-7 /usr/bin/gcc`);
2 Source-level walkthrough of the launch command
deepspeed --bind_cores_to_rank cifar10_deepspeed.py --deepspeed $@
Where:
- deepspeed is the executable script miniconda3/envs/pytorch2.0/bin/deepspeed:
#!/home/mtn/miniconda3/envs/pytorch2.0/bin/python
# EASY-INSTALL-DEV-SCRIPT: 'deepspeed==0.14.1+ffb53c25','deepspeed'
__requires__ = 'deepspeed==0.14.1+ffb53c25'
__import__('pkg_resources').require('deepspeed==0.14.1+ffb53c25')
__file__ = '/home/mtn/DeepSpeed/bin/deepspeed'
with open(__file__) as f:
    exec(compile(f.read(), __file__, 'exec'))
- the file that actually gets executed is /home/mtn/DeepSpeed/bin/deepspeed:
#!/usr/bin/env python3
from deepspeed.launcher.runner import main
if __name__ == '__main__':
    main()
# The main work inside main():
device_count = get_accelerator().device_count() # number of available devices
args.master_addr = "127.0.0.1" # master address for the run
active_resources = parse_inclusion_exclusion(resource_pool, args.include, args.exclude) # filter devices via --include/--exclude
world_info_base64 = encode_world_info(active_resources) # base64-encode the active resource info
multi_node_exec = args.force_multi or len(active_resources) > 1 # whether this is a multi-node run
# next, build the command line for the launcher
if not multi_node_exec:
deepspeed_launch = [
sys.executable, "-u", "-m", "deepspeed.launcher.launch", f"--world_info={world_info_base64}",
f"--master_addr={args.master_addr}", f"--master_port={args.master_port}"
]
if args.no_python:
deepspeed_launch.append("--no_python")
if args.module:
deepspeed_launch.append("--module")
if args.no_local_rank:
deepspeed_launch.append("--no_local_rank")
if args.save_pid:
deepspeed_launch += ["--save_pid", f"{os.getpid()}"]
if args.enable_each_rank_log:
deepspeed_launch.append(f"--enable_each_rank_log={args.enable_each_rank_log}")
if args.elastic_training:
deepspeed_launch.append("--enable_elastic_training")
deepspeed_launch.append(f"--max_elastic_nodes={args.max_elastic_nodes}")
deepspeed_launch.append(f"--min_elastic_nodes={args.min_elastic_nodes}")
if args.bind_cores_to_rank:
deepspeed_launch.append("--bind_cores_to_rank")
if args.bind_core_list is not None:
deepspeed_launch.append(f"--bind_core_list={args.bind_core_list}")
cmd = deepspeed_launch + [args.user_script] + args.user_args # append the user script and its arguments to the launcher command list
else:
args.launcher = args.launcher.lower()
if args.launcher == PDSH_LAUNCHER:
runner = PDSHRunner(args, world_info_base64)
elif args.launcher == OPENMPI_LAUNCHER:
runner = OpenMPIRunner(args, world_info_base64, resource_pool)
elif args.launcher == MPICH_LAUNCHER:
runner = MPICHRunner(args, world_info_base64, resource_pool)
elif args.launcher == IMPI_LAUNCHER:
runner = IMPIRunner(args, world_info_base64, resource_pool)
elif args.launcher == MVAPICH_LAUNCHER:
runner = MVAPICHRunner(args, world_info_base64, resource_pool)
elif args.launcher == SLURM_LAUNCHER:
runner = SlurmRunner(args, world_info_base64, resource_pool)
else:
raise NotImplementedError(f"Unknown launcher {args.launcher}")
if not runner.backend_exists():
raise RuntimeError(f"launcher '{args.launcher}' not installed.")
curr_path = os.path.abspath('.')
if 'PYTHONPATH' in env:
env['PYTHONPATH'] = curr_path + ":" + env['PYTHONPATH']
else:
env['PYTHONPATH'] = curr_path
excluded_vars = []
for exclude_key, var_list in EXCLUDE_ENVS.items():
if exclude_key in env.keys():
# key exists in launcher env -> var list should be used
excluded_vars += var_list
# load envs from accelerator
exports = EXPORT_ENVS + get_accelerator().export_envs()
for var in env.keys():
if any([var.startswith(name) for name in exports]):
if not any([var == name for name in excluded_vars]):
runner.add_export(var, env[var])
for environ_path in DEEPSPEED_ENVIRONMENT_PATHS:
environ_file = os.path.join(environ_path, DEEPSPEED_ENVIRONMENT_NAME)
if os.path.isfile(environ_file):
logger.info(f"deepspeed_env file = {environ_file}")
with open(environ_file, 'r') as fd:
for var in fd.readlines():
key, val = var.split('=', maxsplit=1)
runner.add_export(key, val)
if args.launcher == PDSH_LAUNCHER:
cmd, kill_cmd, env = runner.get_cmd(env, active_resources)
else:
cmd = runner.get_cmd(env, active_resources)
# cmd contents:
# ['/home/mtn/miniconda3/envs/pytorch2.0/bin/python', '-u', '-m', 'deepspeed.launcher.launch', '--world_info=eyJsb2NhbGhvc3QiOiBbMCwgMV19',
# '--master_addr=127.0.0.1', '--master_port=29500', '--enable_each_rank_log=None', '--bind_cores_to_rank', 'cifar10_deepspeed.py', '--deepspeed']
result = subprocess.Popen(cmd, env=env) # spawn a child process that runs the command above
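The `--world_info` payload in the command above is just the active resource dict serialized to JSON and base64-encoded. A minimal sketch of the round trip (this mirrors what encode_world_info produces and what launch.py later decodes; it is not the DeepSpeed source itself):

```python
# Sketch of the world_info round trip (assumption: mirrors encode_world_info and
# the base64+json decode done in deepspeed.launcher.launch, not copied from source).
import base64
import json

active_resources = {"localhost": [0, 1]}   # hostname -> list of local GPU ids

encoded = base64.urlsafe_b64encode(json.dumps(active_resources).encode()).decode()
print(encoded)   # eyJsb2NhbGhvc3QiOiBbMCwgMV19  -- the value seen in --world_info above

decoded = json.loads(base64.urlsafe_b64decode(encoded))
assert decoded == active_resources
```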
Multi-process debugging: insert the snippet below into the training script so that every rank opens its own debugpy port
import os, debugpy
base_port = 5001
rank = int(os.getenv("LOCAL_RANK"))
debugpy.listen(("0.0.0.0", base_port + rank))
print("Waiting for debugger to attach...", os.getpid())
debugpy.wait_for_client()
debugpy.breakpoint()
# Open the project directory in VS Code, go to Run and Debug, add the launch.json below, and start debugging (the Python extension must be installed):
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python: Attach 5001",
"type": "python",
"request": "attach",
"connect": {
"host": "0.0.0.0",
"port": 5001
},
"justMyCode": false
},
{
"name": "Python: Attach 5002",
"type": "python",
"request": "attach",
"connect": {
"host": "0.0.0.0",
"port": 5002
},
"justMyCode": true
},
]
}
Contents of deepspeed.launcher.launch (the debugpy lines at the top of main() below were inserted manually for debugging, following the pattern above)
def main():
args = parse_args()
import os, debugpy
base_port = 29500
rank = args.node_rank
debugpy.listen(("127.0.0.1", base_port + rank))
print("Waiting for debugger to attach...", os.getpid())
debugpy.wait_for_client()
debugpy.breakpoint()
current_env = os.environ.copy()
for k in current_env.keys():
if "NCCL" in k:
logger.info(f"{args.node_rank} {k}={current_env[k]}")
if args.world_info == "None":
raise ValueError("world_info can not be None")
world_info = base64.urlsafe_b64decode(args.world_info)
world_info = json.loads(world_info)
logger.info(f"WORLD INFO DICT: {world_info}")
node_list = list(world_info.keys())
args.nnodes = len(node_list)
local_node = node_list[args.node_rank]
local_gpu_ids = world_info[local_node]
num_local_procs = len(local_gpu_ids)
logger.info(f"nnodes={args.nnodes}, num_local_procs={num_local_procs}, node_rank={args.node_rank}")
global_rank_mapping = defaultdict(list)
curr_global_rank = 0
dist_world_size = 0
for node_id in node_list:
gids = world_info[node_id]
dist_world_size += len(gids)
for gid in gids:
global_rank_mapping[node_id].append(curr_global_rank)
curr_global_rank += 1
logger.info(f"global_rank_mapping={global_rank_mapping}")
logger.info(f"dist_world_size={dist_world_size}")
current_env["CUDA_VISIBLE_DEVICES"] = ",".join(map(str, local_gpu_ids))
logger.info(f"Setting CUDA_VISIBLE_DEVICES={current_env['CUDA_VISIBLE_DEVICES']}")
# set PyTorch distributed related environmental variables
current_env["MASTER_ADDR"] = args.master_addr
current_env["MASTER_PORT"] = str(args.master_port)
current_env["WORLD_SIZE"] = str(dist_world_size)
current_env["CROSS_RANK"] = str(args.node_rank)
current_env["CROSS_SIZE"] = str(args.nnodes)
current_env["LOCAL_SIZE"] = str(num_local_procs)
if args.save_pid:
print(f"launcher pid: {os.getpid()}")
pid_file = None
if args.save_pid:
launcher_pid = os.getpid()
pid_file = os.path.join(PID_FILE_BASEPATH, f"{args.save_pid}.deepspeed")
assert not os.path.isfile(pid_file), "pid file exists but shouldn't"
with open(pid_file, 'w') as fd:
fd.write(f"{launcher_pid}")
if not is_torch_elastic_compatible():
if args.enable_elastic_training:
logger.info(f"Disabling elastic training support as \
PyTorch version should be greater than 1.11.x")
args.enable_elastic_training = False
if os.path.exists(DLTS_POD_ENV_PATH):
with open(DLTS_POD_ENV_PATH) as file:
lines = file.readlines()
lines = [line.rstrip() for line in lines]
for line in lines:
if line.startswith('export FC_TASKROLE_NAME') or line.startswith('export FC_TASK_INDEX'):
key_val = line.split()[1]
key, val = key_val.split('=')
current_env[key] = val
processes = []
cmd = []
if not args.enable_elastic_training:
if args.enable_each_rank_log != "None":
# prepare the log path and the file name prefix
if os.path.isfile(args.enable_each_rank_log):
raise ValueError(f"{args.enable_each_rank_log} should not be a file, it should be a directory.")
if not os.path.exists(args.enable_each_rank_log):
try:
os.makedirs(args.enable_each_rank_log)
except Exception as e:
print(e)
raise ValueError(f"unable to create directory {args.enable_each_rank_log} for each rank log.")
log_name_prefix = time.strftime("%Y%m%d%H%M%S", time.localtime())
for local_proc in range(0, num_local_procs):
# each process's rank
dist_rank = global_rank_mapping[local_node][local_proc]
local_rank = dist_rank % num_local_procs
current_env["RANK"] = str(dist_rank)
current_env["LOCAL_RANK"] = str(local_rank)
# spawn the processes
cmd = []
if args.bind_cores_to_rank:
cores_per_rank, numactl_cmd = get_numactl_cmd(args.bind_core_list, num_local_procs, local_rank)
current_env["OMP_NUM_THREADS"] = f"{cores_per_rank}"
cmd = cmd + numactl_cmd
if not args.no_python:
cmd.append(sys.executable)
cmd.append("-u")
if args.module:
cmd.append("-m")
else:
if args.module:
raise ValueError("Don't use both the '--no_python' flag"
" and the '--module' flag at the same time.")
cmd.append(args.training_script)
# A user may not want to pass local_rank as a keyword arg so we make this optional.
if not args.no_local_rank:
cmd.append(f"--local_rank={local_rank}")
cmd += args.training_script_args
if args.enable_each_rank_log != "None":
log_file = os.path.join(args.enable_each_rank_log, f"{log_name_prefix}_rank{dist_rank}.log")
log_fd = open(log_file, 'w')
process = subprocess.Popen(cmd, env=current_env, stdout=log_fd, stderr=log_fd)
else:
process = subprocess.Popen(cmd, env=current_env)
# logs the command from processes
logger.info(f"process {process.pid} spawned with command: {cmd}")
processes.append(process) # keep track of every spawned worker process
else:
from ..elasticity import DSElasticAgent
from torch.distributed.elastic.rendezvous import RendezvousParameters
from torch.distributed.elastic.agent.server.api import WorkerSpec
import torch.distributed.elastic.rendezvous.registry as rdzv_registry
from torch.distributed.elastic.multiprocessing import Std
if args.min_elastic_nodes == -1:
args.min_elastic_nodes = 1
if args.max_elastic_nodes == -1:
args.max_elastic_nodes = args.nnodes
assert args.max_elastic_nodes > 0 and args.min_elastic_nodes > 0, "Max and Min nodes should be positive"
current_env["NCCL_ASYNC_ERROR_HANDLING"] = str(1)
# Get config and arguments
cmd = []
if not args.no_python:
cmd = [sys.executable, "-u"]
if args.module:
cmd.append("-m")
else:
if args.module:
raise ValueError("Don't use both the '--no_python' flag"
" and the '--module' flag at the same time.")
cmd.append(args.training_script)
cmd += args.training_script_args
cmd_args = cmd[1:]
rdzv_configs: Dict[str, str] = {'timeout': 100}
run_id = os.environ.get("ELASTIC_RUN_ID", ELASTIC_TRAINING_ID_DEFAULT)
# Creating config for rendezvous class
rdzv_parameters = RendezvousParameters(backend='c10d',
endpoint=args.master_addr + ":" + str(args.master_port),
run_id=run_id,
min_nodes=args.min_elastic_nodes,
max_nodes=args.max_elastic_nodes,
**rdzv_configs)
spec = WorkerSpec(
role='trainer',
local_world_size=num_local_procs,
entrypoint=cmd[0],
args=cmd[1:],
rdzv_handler=rdzv_registry.get_rendezvous_handler(rdzv_parameters),
max_restarts=100,
monitor_interval=5,
redirects=Std.from_str("0"),
tee=Std.from_str("0"),
master_addr=None,
master_port=None,
)
agent = DSElasticAgent(spec, current_env)
agent.run()
sig_names = {2: "SIGINT", 15: "SIGTERM"}
last_return_code = None
def sigkill_handler(signum, frame):
for process in processes:
logger.info(f"Killing subprocess {process.pid}")
try:
terminate_process_tree(process.pid)
except Exception:
pass
if last_return_code is not None:
logger.error(f"{cmd} exits with return code = {last_return_code}")
sys.exit(last_return_code)
if signum in sig_names:
logger.info(f"Main process received {sig_names[signum]}, exiting")
if args.save_pid:
if os.path.isfile(pid_file):
os.remove(pid_file)
sys.exit(1)
# pass SIGINT/SIGTERM to children if the parent is being terminated
signal.signal(signal.SIGINT, sigkill_handler)
signal.signal(signal.SIGTERM, sigkill_handler)
alive_processes = set(processes)
while len(alive_processes):
finished_processes = []
for process in alive_processes:
if process.poll() is None:
# the process is still running
continue
else:
if process.returncode != 0:
last_return_code = process.returncode # for sigkill_handler
sigkill_handler(signal.SIGTERM, None) # not coming back
else:
# exited cleanly
logger.info(f"Process {process.pid} exits successfully.")
finished_processes.append(process)
alive_processes = set(alive_processes) - set(finished_processes)
time.sleep(1)
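Everything the launcher hands to each worker travels through environment variables (MASTER_ADDR, MASTER_PORT, RANK, LOCAL_RANK, WORLD_SIZE, CUDA_VISIBLE_DEVICES, ...) plus the optional --local_rank argument. A minimal worker-side sketch of how a plain PyTorch script would consume them (assuming the NCCL backend and one GPU per rank):

```python
# Worker-side sketch (an assumption for illustration, not DeepSpeed source):
# consuming the environment variables set by deepspeed.launcher.launch.
import os

import torch
import torch.distributed as dist

local_rank = int(os.environ["LOCAL_RANK"])   # per-process, set by the launcher
rank = int(os.environ["RANK"])               # global rank of this process
world_size = int(os.environ["WORLD_SIZE"])   # total number of ranks

torch.cuda.set_device(local_rank)            # one visible GPU per local rank
# init_method defaults to env://, so MASTER_ADDR / MASTER_PORT are read automatically
dist.init_process_group(backend="nccl", rank=rank, world_size=world_size)
print(f"rank {rank}/{world_size} ready on cuda:{local_rank}")
```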
The CUDA accelerator class (device-side configuration)
# /home/mtn/DeepSpeed/deepspeed/accelerator/cuda_accelerator.py
class CUDA_Accelerator(DeepSpeedAccelerator):
def __init__(self):
self._name = 'cuda'
self._communication_backend_name = 'nccl'
if pynvml is None:
self._init_pynvml()
def _init_pynvml(self):
global pynvml
try:
import pynvml
except ImportError:
return
try:
pynvml.nvmlInit()
except pynvml.NVMLError:
pynvml = None
return
def is_synchronized_device(self):
return False
def use_host_timers(self):
return self.is_synchronized_device()
def resolves_data_dependency(self):
return self.is_synchronized_device()
def handles_memory_backpressure(self):
return self.is_synchronized_device()
# Device APIs
def device_name(self, device_index=None):
if device_index is None:
return 'cuda'
return 'cuda:{}'.format(device_index)
def device(self, device_index=None):
return torch.cuda.device(device_index)
def set_device(self, device_index):
torch.cuda.set_device(device_index)
def current_device(self):
return torch.cuda.current_device()
def current_device_name(self):
return 'cuda:{}'.format(torch.cuda.current_device())
def device_count(self):
return torch.cuda.device_count()
def synchronize(self, device_index=None):
return torch.cuda.synchronize(device_index)
# RNG APIs
def random(self):
return torch.random
def set_rng_state(self, new_state, device_index=None):
if device_index is None:
return torch.cuda.set_rng_state(new_state)
return torch.cuda.set_rng_state(new_state, device_index)
def get_rng_state(self, device_index=None):
if device_index is None:
return torch.cuda.get_rng_state()
return torch.cuda.get_rng_state(device_index)
def manual_seed(self, seed):
return torch.cuda.manual_seed(seed)
def manual_seed_all(self, seed):
return torch.cuda.manual_seed_all(seed)
def initial_seed(self, seed):
return torch.cuda.initial_seed(seed)
def default_generator(self, device_index):
return torch.cuda.default_generators[device_index]
...
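The rest of DeepSpeed does not call torch.cuda directly; it goes through the accelerator object returned by get_accelerator() (as already seen in runner.main above). A short usage sketch:

```python
# Usage sketch of the accelerator abstraction; the printed values assume a CUDA machine.
from deepspeed.accelerator import get_accelerator

acc = get_accelerator()
print(acc.device_count())                 # number of visible devices
print(acc.device_name(0))                 # 'cuda:0' for the CUDA accelerator
print(acc.communication_backend_name())   # 'nccl'
```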
3 Training script walkthrough
3.1 Single-process data loading
In distributed training, PyTorch programs typically let the main process pre-read and cache the dataset while the other processes read from that cache afterwards. Synchronization between processes is implemented with torch.distributed.barrier(): the non-main processes are blocked at the barrier until everyone is ready.
torch.distributed.barrier() sets up a blocking barrier: each process that reaches it waits until all processes have arrived (including the main process, once it has finished preparing the data). If the process running create_dataloader() is the main process, it reads and processes the data directly and only afterwards hits torch.distributed.barrier(); at that moment every process has reached the barrier, so they are all synchronized and released together.
The key point is that different processes may reach the barrier at different places in the code, as the sketch below shows.
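A minimal sketch of this pattern (assuming the process group is already initialized and LOCAL_RANK is set by the launcher; the dataset call is only an example, not the exact code in cifar10_deepspeed.py):

```python
# Sketch of the "rank 0 prepares the data, the others wait at the barrier" pattern.
import os

import torch.distributed as dist
import torchvision

def create_dataset():
    local_rank = int(os.environ["LOCAL_RANK"])
    if local_rank != 0:
        dist.barrier()   # non-zero ranks block here until rank 0 has finished
    # rank 0 reaches this line first and downloads/caches the dataset
    trainset = torchvision.datasets.CIFAR10(root="./data", train=True, download=True)
    if local_rank == 0:
        dist.barrier()   # rank 0 arrives at its barrier only after the download,
                         # releasing all waiting ranks at the same time
    return trainset
```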
3.2 DeepSpeed configuration
{'train_batch_size': 16,
 'steps_per_print': 2000,
 'optimizer': {'type': 'Adam', 'params': {...}},
 'scheduler': {'type': 'WarmupLR', 'params': {...}},
 'gradient_clipping': 1.0,
 'prescale_gradients': False,
 'bf16': {'enabled': False},
 'fp16': {'enabled': True, 'fp16_master_weights_and_grads': False, 'loss_scale': 0, 'loss_scale_window': 500,
          'hysteresis': 2, 'min_loss_scale': 1, 'initial_scale_power': 15},
 'wall_clock_breakdown': False,
 'zero_optimization': {'stage': 0, 'allgather_partitions': True, 'reduce_scatter': True,
                       'allgather_bucket_size': 50000000, 'reduce_bucket_size': 50000000,
                       'overlap_comm': True, 'contiguous_gradients': True, 'cpu_offload': False}}
3.3 deepspeed.initialize
- /home/mtn/DeepSpeed/deepspeed/__init__.py
3.3.1 User-facing API
It first initializes the DeepSpeed configuration (DeepSpeedConfig) and then the DeepSpeed engine (DeepSpeedHybridEngine / DeepSpeedEngine / PipelineEngine).
It returns the engine, the engine's optimizer, the engine's training_dataloader and the engine's lr_scheduler:
model_engine, optimizer, trainloader, __ = deepspeed.initialize(
args=args,
model=net,
model_parameters=parameters,
training_data=trainset,
config=ds_config,
)
3.3.2 Communication backend initialization inside initialize
Initialize the accelerator and store it in the global variable ds_accelerator (see get_accelerator); then initialize the distributed communication backend (used so that multiple compute nodes can cooperate to accelerate training, handling parameter/gradient synchronization and other communication; it is stored in the global variable ccl_backend as a CCLBackend instance, see init_deepspeed_backend).
Initialization is then completed by the parent class TorchBackend (which in effect calls torch.distributed.init_process_group to create the process group of the torch distributed environment).
Through ccl_comm_op (i.e. the CCLCommBuilder), get_kvs_addr creates the main key-value store and its address is broadcast (via TorchBackend's broadcast) to the other machines; ccl_comm_op (the CCLCommBuilder class) is then initialized with the current machine's rank, the key-value store address, and so on.
- available_coll: the list of supported collective names (broadcast, all_reduce, inference_all_reduce, all_reduce_caching, barrier), see ccl_comm_op's get_available_coll;
# cdb = TorchBackend(dist_backend, timeout, init_method, rank, world_size)
# TorchBackend wraps PyTorch's torch.distributed primitives
from deepspeed import comm as dist
dist_backend = get_accelerator().communication_backend_name()
dist.init_distributed(dist_backend=dist_backend, dist_init_required=dist_init_required)
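After init_distributed returns, the deepspeed.comm wrapper can be used much like torch.distributed. A tiny usage sketch (assumes the process group is already up):

```python
# Usage sketch of the deepspeed.comm wrapper around torch.distributed.
from deepspeed import comm as dist

print(dist.get_rank(), dist.get_world_size())
dist.barrier()   # same collective names as torch.distributed
```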
3.3.3 DeepSpeedConfig initialization
# DeepSpeedConfig initialization
# _param_dict: the configuration dict that was passed in;
# The following fields are initialized from _param_dict (see _initialize_params):
train_batch_size:
train_micro_batch_size_per_gpu:
gradient_accumulation_steps:
steps_per_print: taken from the config (2000 in the config above);
prescale_gradients: False in this config;
gradient_clipping: 1.0 in this config;
zero_config: the zero_optimization settings;
zero_optimization_stage: the "stage" field of zero_optimization;
zero_enabled: whether zero_optimization_stage is greater than 0;
...
activation_checkpointing_config: a DeepSpeedActivationCheckpointingConfig instance;
comms_config: a DeepSpeedCommsConfig instance;
flops_profiler_config: a DeepSpeedFlopsProfilerConfig instance;
autotuning_config: a DeepSpeedAutotuningConfig instance;
nebula_config: a DeepSpeedNebulaConfig instance;
weight_quantization_config: a WeightQuantConfig instance;
Then the batch-size related parameters are set (whatever is missing is derived from the values that are configured), see _configure_train_batch_size:
gradient_accumulation_steps = train_batch_size / (train_micro_batch_size_per_gpu * number of distributed processes);
train_micro_batch_size_per_gpu = train_batch_size / (gradient_accumulation_steps * number of distributed processes);
train_batch_size = train_micro_batch_size_per_gpu * number of distributed processes * gradient_accumulation_steps;
if only train_batch_size is configured: train_micro_batch_size_per_gpu = train_batch_size / number of distributed processes, and gradient_accumulation_steps is set to 1;
if only train_micro_batch_size_per_gpu is configured: train_batch_size = train_micro_batch_size_per_gpu * number of distributed processes, and gradient_accumulation_steps is set to 1;
Finally, a consistency check is performed, see _do_sanity_check; a quick numeric check of the batch-size relation is sketched below.
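Using the 3.2 config on a single node with two GPUs (the derived values here are assumptions for illustration), the relation the sanity check enforces works out as follows:

```python
# Quick check of: train_batch_size == micro_batch * gradient_accumulation_steps * world_size
train_batch_size = 16                 # from ds_config in 3.2
world_size = 2                        # e.g. two local GPUs, as in the world_info above
gradient_accumulation_steps = 1       # not set in the config, so it defaults to 1
train_micro_batch_size_per_gpu = train_batch_size // (gradient_accumulation_steps * world_size)

assert train_batch_size == train_micro_batch_size_per_gpu * gradient_accumulation_steps * world_size
print(train_micro_batch_size_per_gpu)  # 8
```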