更新时间:2025-03-04 gmt 08:00

预置框架启动文件的启动流程说明-九游平台

modelarts standard训练服务预置了多种ai框架,并对不同的框架提供了针对性适配,用户在使用这些预置框架进行模型训练时,训练的启动命令也需要做相应适配。

本章节详细介绍基于不同的预置框架创建训练作业时,如何修改训练的启动文件。

ascend-powered-engine框架启动原理

在modelarts创建训练作业界面选择ai框架时,有一个ai框架是“ascend-powered-engine”,它既不是一个ai框架(如:pytorch、tensorflow)也不是一个并行执行框架(如:mpi),而是适配加速芯片ascend的一组ai框架 运行环境 启动方式的集合。

由于主流的snt9系列ascend加速卡都跑在arm cpu规格的机器上,因此上层docker镜像也都是arm镜像。相对于gpu场景的镜像中安装了与gpu驱动适配的cuda(由英伟达推出的统一计算架构)计算库,ascend-powered-engine引擎的镜像中安装了与ascend驱动适配的cann(华为针对ai场景推出的异构计算架构)计算库。

提交训练作业后,modelarts standard平台会自动运行训练作业的启动文件。

ascend-powered-engine框架单机启动命令和分布式启动命令无区别。

ascend-powered-engine框架支持3种启动方式来启动“启动文件”,默认是基于“rank_table_file”启动,也可以通过配置环境变量“ma_run_method”使用其他方式来启动。ma_run_method环境变量支持torchrun和msrun。

  • 启动方式一:使用rtf文件启动训练作业

    在没有配置环境变量“ma_run_method”时,modelarts standard平台默认使用rank table file(rtf)文件启动训练作业的“启动文件”。

    每个训练作业的启动文件的运行次数取决于任务卡数,即在训练作业运行时,有n个任务卡数训练作业内就会运行n次启动文件。例如,单机1卡,则worker-0任务的启动文件会被运行1次;单机8卡,则worker-0任务的启动文件会被运行8次。因此需要避免在启动文件中进行端口监听。

    启动文件会被自动设置如下环境变量:

    • rank_table_file:rtf文件路径。
    • ascend_device_id:逻辑device_id,例如单卡训练,该值始终为 0。
    • rank_id:可以理解为训练作业级的device逻辑(顺序)编号。
    • rank_size:根据rtf中device的数目设置该值,例如“4 * snt9b”,则该值即为4。

    当需要启动文件仍然在逻辑上仅运行1次时,则可以在启动文件中判断“ascend_device_id”的值,当值为“0”则执行逻辑,当值为非0则直接退出。

    当需要启用ranktable动态路由进行训练网络加速时,则可以添加环境变量“route_plan=true”。同时,训练作业需要满足如下要求才能正常实现ranktable动态路由加速。
    • 联系九游平台的技术支持检查集群的cabinet插件是否开启。仅部分存量集群需要手动开启cabinet插件,大部分集群都是默认开启的。
    • 训练使用的python版本是3.7、3.8或3.9。
    • 训练作业的实例数要大于或等于3。
    • 路由加速的原理是改变rank编号,所以代码中对rank的使用要统一。
    • 需要确保训练作业的实例规格是节点满卡,如节点8卡场景,每个节点都必须是8卡,不能出现2卡或4卡场景。

    ascend-powered-engine框架对应的代码示例“mindspore-verification.py”,请参见训练mindspore-verification.py文件

  • 启动方式二:使用torchrun命令启动训练作业
    当环境变量“ma_run_method=torchrun”时,表示modelarts standard平台使用torchrun命令启动训练作业的“启动文件”。

    要求pytorch版本大于等于1.11.0。

    • 单机时,modelarts standard平台使用如下命令启动训练作业的“启动文件”。
      torchrun --standalone --nnodes=${ma_num_hosts} --nproc_per_node=${ma_num_gpus} ${ma_extra_torchrun_params} "启动文件" {arg1} {arg2} ...
    • 多机时,modelarts standard平台使用如下命令启动训练作业的“启动文件”。
      torchrun --nnodes=${ma_num_hosts} --nproc_per_node=${ma_num_gpus} --node_rank=${vc_task_index} --master_addr={master_addr} --master_port=${ma_torchrun_master_port} --rdzv_id={ma_job_name} --rdzv_backend=static ${ma_extra_torchrun_params} "启动文件" {arg1} {arg2} ...

    参数说明如下:

    • standalone:标识为单任务实例作业。
    • nnodes:任务实例个数。
    • nproc_per_node:每个任务实例启动的主进程数,设置为任务分配的npu数相同。
    • node_rank:任务rank,用于多任务分布式训练。
    • master_addr:主任务(rank 0)的地址,设置为任务worker-0的通信域名。
    • master_port:在主任务(rank 0)上,用于分布式训练期间通信的端口。默认设置为18888端口。当遇到master_port冲突问题时,可通过设置ma_torchrun_master_port环境变量值修改端口配置。
    • rdzv_id:rendezvous标识,设置为带有训练作业id的值。
    • rdzv_backend:rendezvous后端,固定设置为static,即不使用rendezvous,而是使用master_addr和master_port配置。另外,可通过设置ma_extra_torchrun_params环境变量值,以增加额外的torchrun命令参数,或是覆盖预设的torchrun命令参数。例如配置torchrun命令中rdzv_conf参数的训练作业api环境变量的部分示例如下:
      "environments": {
      "ma_run_method": "torchrun",
      "ma_extra_torchrun_params": "--rdzv_conf=timeout=7200"
      }

    如果在torchrun初始化分布式一致性协商阶段出现“runtimeerror:socket timeout”错误时,可以通过增加如下环境变量再次创建训练作业以查看torchrun初始化阶段的详细信息,进一步排查问题。

    • loglevel=info
    • torch_cpp_log_level=info
    • torch_distributed_debug=detail

    出现“runtimeerror: socket timeout”错误,一般是因为不同任务执行torchrun命令的时机差距过大导致的。torchrun命令执行时机差距过大,又多是因为在torchrun命令被执行之前任务还有一些初始化动作,例如下载训练数据集、ckpt等。这些初始化动作执行耗时差距过大会直接导致出现socket timeout错误。所以遇到socket timeout问题时首先需要排查的是各个任务执行torchrun的时间点差距是否在合理范围内,如果时间点差距过大,需要优化执行torchrun命令之前的初始化动作,使其时间点差距在合理范围内。

  • 启动方式三:使用msrun命令启动训练作业
    当环境变量“ma_run_method=msrun”时,表示modelarts standard平台使用msrun命令启动训练作业的“启动文件”。

    要求mindspore版本大于等于2.3.0。

    该方案支持动态组网和基于rank table file文件组网两种方式。当配置了环境变量ms_ranktable_enable="true",则msrun会读取rank table file文件内容进行组网。否则默认使用动态组网。

    msrun使用如下命令启动训练作业的“启动文件”。

    msrun --worker_num=${msrun_worker_num} --local_worker_num=${ma_num_gpus} --master_addr=${msrun_master_addr} --node_rank=${vc_task_index} --master_port=${msrun_master_port} --log_dir=${msrun_log_dir} --join=true --cluster_time_out=${msrun_cluster_time_out} --rank_table_file=${msrun_rank_table_file} "启动文件" {arg1} {arg2} ...

    参数说明如下:

    • worker_num:所有进程个数。因为一个卡起一个进程,所以也表示使用总卡数。
    • local_worker_num:当前节点进程个数,即当前节点使用的卡数。
    • master_addr:msrun组网调度进程所在节点的ip地址,单机场景无需配置。
    • master_port:msrun组网调度进程的端口。
    • node_rank:当前节点的编号。
    • log_dir:msrun组网和各个进程的日志输出地址。
    • join:训练进程拉起后,msrun进程是否仍存在,默认配置为“true”,等待所有进程退出后再退出。
    • cluster_time_out:集群组网超时时间,默认是“600s”,可通过环境变量“msrun_cluster_time_out”控制。
    • rank_table_file:rank table file文件地址,如果配置了环境变量“ms_ranktable_enable="true"”,启动时会增加该参数。

pytorch-gpu框架启动原理

单机多卡场景下平台会为启动文件额外拼接 --init_method "tcp://:" 参数。

多机多卡场景下平台会为启动文件额外拼接 --init_method "tcp://:" --rank --world_size 参数。

启动文件需要解析上述参数。

pytorch-gpu框架的代码示例,请参见示例:创建ddp分布式训练(pytorch gpu)中的方式一

tensorflow-gpu框架启动原理

单机场景下(即选择的实例数为1),modelarts只会在一个节点上启动一个训练容器,该训练容器独享节点规格的可使用资源。

多机场景下(即选择的实例数大于1),modelarts会优先在相同节点上启动一个parameter server(以下简称ps)和一个worker,平台会自动一比一分配ps与worker任务。例如,双机场景会分配2个ps和2个worker任务,并为启动文件额外注入如下参数。

--task_index  --ps_hosts  --worker_hosts  --job_name  

启动文件需要解析如下参数。

  • vc_task_index:task序号,如0、1、2。
  • tf_ps_hosts :ps节点地址数组,如“[xx-ps-0.xx:tcp_port,xx-ps-1.xx:tcp_port]”,tcp_port是一个在5000~10000的随机端口。
  • tf_worker_hosts:worker节点地址数组,如“[xx-worker-0.xx:tcp_port,xx-worker-1.xx:tcp_port]”,tcp_port是一个在5000~10000的随机端口。
  • ma_task_name:任务名称,取值是ps或worker。

具体示例请参见:tensorflow-gpu框架的代码示例mnist.py(单机)

horovod/mpi/mindspore-gpu

使用horovod/mpi/mindspore-gpu预置框架来运行的启动文件,平台自动以mpirun命令启动之。使用modelarts standard训练相应预置引擎,用户仅需关注启动文件(即训练脚本)的编写;mpirun命令和训练作业集群的构建都由平台自动完成。平台不会为启动文件额外拼接参数。

“pytorch_synthetic_benchmark.py”文件示例如下:

import argparse
import torch.backends.cudnn as cudnn
import torch.nn.functional as f
import torch.optim as optim
import torch.utils.data.distributed
from torchvision import models
import horovod.torch as hvd
import timeit
import numpy as np
# benchmark settings
parser = argparse.argumentparser(description='pytorch synthetic benchmark',
                                 formatter_class=argparse.argumentdefaultshelpformatter)
parser.add_argument('--fp16-allreduce', action='store_true', default=false,
                    help='use fp16 compression during allreduce')
parser.add_argument('--model', type=str, default='resnet50',
                    help='model to benchmark')
parser.add_argument('--batch-size', type=int, default=32,
                    help='input batch size')
parser.add_argument('--num-warmup-batches', type=int, default=10,
                    help='number of warm-up batches that don\'t count towards benchmark')
parser.add_argument('--num-batches-per-iter', type=int, default=10,
                    help='number of batches per benchmark iteration')
parser.add_argument('--num-iters', type=int, default=10,
                    help='number of benchmark iterations')
parser.add_argument('--no-cuda', action='store_true', default=false,
                    help='disables cuda training')
parser.add_argument('--use-adasum', action='store_true', default=false,
                    help='use adasum algorithm to do reduction')
args = parser.parse_args()
args.cuda = not args.no_cuda and torch.cuda.is_available()
hvd.init()
if args.cuda:
    # horovod: pin gpu to local rank.
    torch.cuda.set_device(hvd.local_rank())
cudnn.benchmark = true
# set up standard model.
model = getattr(models, args.model)()
# by default, adasum doesn't need scaling up learning rate.
lr_scaler = hvd.size() if not args.use_adasum else 1
if args.cuda:
    # move model to gpu.
    model.cuda()
    # if using gpu adasum allreduce, scale learning rate by local_size.
    if args.use_adasum and hvd.nccl_built():
        lr_scaler = hvd.local_size()
optimizer = optim.sgd(model.parameters(), lr=0.01 * lr_scaler)
# horovod: (optional) compression algorithm.
compression = hvd.compression.fp16 if args.fp16_allreduce else hvd.compression.none
# horovod: wrap optimizer with distributedoptimizer.
optimizer = hvd.distributedoptimizer(optimizer,
                                     named_parameters=model.named_parameters(),
                                     compression=compression,
                                     op=hvd.adasum if args.use_adasum else hvd.average)
# horovod: broadcast parameters & optimizer state.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)
# set up fixed fake data
data = torch.randn(args.batch_size, 3, 224, 224)
target = torch.longtensor(args.batch_size).random_() % 1000
if args.cuda:
    data, target = data.cuda(), target.cuda()
def benchmark_step():
    optimizer.zero_grad()
    output = model(data)
    loss = f.cross_entropy(output, target)
    loss.backward()
    optimizer.step()
def log(s, nl=true):
    if hvd.rank() != 0:
        return
    print(s, end='\n' if nl else '')
log('model: %s' % args.model)
log('batch size: %d' % args.batch_size)
device = 'gpu' if args.cuda else 'cpu'
log('number of %ss: %d' % (device, hvd.size()))
# warm-up
log('running warmup...')
timeit.timeit(benchmark_step, number=args.num_warmup_batches)
# benchmark
log('running benchmark...')
img_secs = []
for x in range(args.num_iters):
    time = timeit.timeit(benchmark_step, number=args.num_batches_per_iter)
    img_sec = args.batch_size * args.num_batches_per_iter / time
    log('iter #%d: %.1f img/sec per %s' % (x, img_sec, device))
    img_secs.append(img_sec)
# results
img_sec_mean = np.mean(img_secs)
img_sec_conf = 1.96 * np.std(img_secs)
log('img/sec per %s: %.1f  -%.1f' % (device, img_sec_mean, img_sec_conf))
log('total img/sec on %d %s(s): %.1f  -%.1f' %
    (hvd.size(), device, hvd.size() * img_sec_mean, hvd.size() * img_sec_conf))

run_mpi.sh文件内容如下:

#!/bin/bash
my_home=/home/ma-user
my_sshd_port=${my_sshd_port:-"36666"}
my_mpi_btl_tcp_if=${my_mpi_btl_tcp_if:-"eth0,bond0"}
my_task_index=${ma_task_index:-${vc_task_index:-${vk_task_index}}}
my_mpi_slots=${my_mpi_slots:-"${ma_num_gpus}"}
my_mpi_tune_file="${my_home}/env_for_user_process"
if [ -z ${my_mpi_slots} ]; then
    echo "[run_mpi] my_mpi_slots is empty, set it be 1"
    my_mpi_slots="1"
fi
printf "my_home: ${my_home}\nmy_sshd_port: ${my_sshd_port}\nmy_mpi_btl_tcp_if: ${my_mpi_btl_tcp_if}\nmy_task_index: ${my_task_index}\nmy_mpi_slots: ${my_mpi_slots}\n"
env | grep -e '^ma_|shared_|^s3_|^path|^vc_worker_|^scc|^cred' | grep -v '=$' > ${my_mpi_tune_file}
# add -x to each line
sed -i 's/^/-x /' ${my_mpi_tune_file}
sed -i "s|{{my_sshd_port}}|${my_sshd_port}|g" ${my_home}/etc/ssh/sshd_config
# start sshd service
bash -c "$(which sshd) -f ${my_home}/etc/ssh/sshd_config"
# confirm the sshd is up
netstat -anp | grep lis | grep ${my_sshd_port}
if [ $my_task_index -eq 0 ]; then
    # generate the hostfile of mpi
    for ((i=0; i<$ma_num_hosts; i  ))
    do
        eval hostname=${ma_vj_name}-${ma_task_name}-${i}.${ma_vj_name}
        echo "[run_mpi] hostname: ${hostname}"
        ip=""
        while [ -z "$ip" ]; do
            ip=$(ping -c 1 ${hostname} | grep "ping" | sed -e 's/ping .* .([0-9.] ). .*/\1/g')
            sleep 1
        done
        echo "[run_mpi] resolved ip: ${ip}"
        # test the sshd is up
        while :
        do
            if [ cat < /dev/null >/dev/tcp/${ip}/${my_sshd_port} ]; then
                break
            fi
            sleep 1
        done
        echo "[run_mpi] the sshd of ip ${ip} is up"
        echo "${ip} slots=$my_mpi_slots" >> ${my_home}/hostfile
    done
    printf "[run_mpi] hostfile:\n`cat ${my_home}/hostfile`\n"
fi
ret_code=0
if [ $my_task_index -eq 0 ]; then
    echo "[run_mpi] start exec command time: "$(date  "%y-%m-%d-%h:%m:%s")
    np=$(( ${ma_num_hosts} * ${my_mpi_slots} ))
    echo "[run_mpi] command: mpirun -np ${np} -hostfile ${my_home}/hostfile -mca plm_rsh_args \"-p ${my_sshd_port}\" -tune ${my_mpi_tune_file} ... $@"
    # execute mpirun at worker-0
    # mpirun
    mpirun \
        -np ${np} \
        -hostfile ${my_home}/hostfile \
        -mca plm_rsh_args "-p ${my_sshd_port}" \
        -tune ${my_mpi_tune_file} \
        -bind-to none -map-by slot \
        -x nccl_debug=info -x nccl_socket_ifname=${my_mpi_btl_tcp_if} -x nccl_socket_family=af_inet \
        -x horovod_mpi_threads_disable=1 \
        -x ld_library_path \
        -mca pml ob1 -mca btl ^openib -mca plm_rsh_no_tree_spawn true \
        "$@"
    ret_code=$?
    if [ $ret_code -ne 0 ]; then
        echo "[run_mpi] exec command failed, exited with $ret_code"
    else
        echo "[run_mpi] exec command successfully, exited with $ret_code"
    fi
    # stop 1...n worker by killing the sleep proc
    sed -i '1d' ${my_home}/hostfile
    if [ `cat ${my_home}/hostfile | wc -l` -ne 0 ]; then
        echo "[run_mpi] stop 1 to (n - 1) worker by killing the sleep proc"
        sed -i 's/${my_mpi_slots}/1/g' ${my_home}/hostfile
        printf "[run_mpi] hostfile:\n`cat ${my_home}/hostfile`\n"
        mpirun \
        --hostfile ${my_home}/hostfile \
        --mca btl_tcp_if_include ${my_mpi_btl_tcp_if} \
        --mca plm_rsh_args "-p ${my_sshd_port}" \
        -x path -x ld_library_path \
        pkill sleep \
        > /dev/null 2>&1
    fi
    echo "[run_mpi] exit time: "$(date  "%y-%m-%d-%h:%m:%s")
else
    echo "[run_mpi] the training log is in worker-0"
    sleep 365d
    echo "[run_mpi] exit time: "$(date  "%y-%m-%d-%h:%m:%s")
fi
exit $ret_code

相关文档

网站地图