Updated on 2025-03-04 GMT+08:00

Creating a Custom Image from Scratch for a Training Job (MindSpore + Ascend)

This case describes how to build an Ascend container image from scratch and use it to run a training job on ModelArts. The AI engine in the image is MindSpore, and the training resources are Ascend chips in a dedicated resource pool.

Scenario

Objective: Build a container image with the following software installed, and use Ascend flavors on ModelArts to run a training job with it.

  • ubuntu-18.04
  • CANN 6.3.RC2 (commercial edition)
  • python-3.7.13
  • mindspore-2.1.1

  • This tutorial uses CANN 6.3.RC2 and MindSpore 2.1.1 as an example.
  • This example only illustrates the process of building an Ascend container image. It has been verified on a dedicated resource pool with matching Ascend driver/firmware versions.

Process

Creating a training job with a custom image requires familiarity with Docker and some development experience. The procedure is as follows:

  1. Step 1 Creating an OBS Bucket and Folders
  2. Step 2 Preparing Script Files and Uploading Them to OBS
  3. Step 3 Building a Custom Image
  4. Step 4 Uploading the Image to SWR
  5. Step 5 Creating and Debugging a Notebook on ModelArts
  6. Step 6 Creating a Training Job on ModelArts

Constraints

  • This case requires downloading the commercial edition of CANN, so it applies only to channel users with download permission. Non-channel users should refer to other custom image tutorials.
  • The MindSpore version must match the CANN version, and the CANN version must match the Ascend driver/firmware version. Mismatched versions cause training to fail.

Prerequisites

You have registered a Huawei ID and enabled Huawei Cloud services. Before using ModelArts, check your account status: the account must not be in arrears or frozen.

Step 1 Creating an OBS Bucket and Folders

Create a bucket and folders in OBS to store the sample dataset and training code. In this example, create a bucket named "test-modelarts" and the folders listed in Table 1.

For how to create an OBS bucket and folders, see Creating a Bucket and Creating a Folder.

Make sure that the OBS bucket you use is in the same region as ModelArts.

Table 1 OBS folders

  • obs://test-modelarts/ascend/demo-code/ : stores the Ascend training script.
  • obs://test-modelarts/ascend/demo-code/run_ascend/ : stores the boot scripts of the Ascend training script.
  • obs://test-modelarts/ascend/log/ : stores training log files.

Step 2 Preparing Script Files and Uploading Them to OBS

  1. Prepare the training script mindspore-verification.py and the five Ascend boot script files required by this case.

    mindspore-verification.py and run_ascend.py are invoked in the "Boot Command" parameter when the training job is created. For details, see Boot Command.

    At runtime, run_ascend.py calls the common.py, rank_table.py, manager.py, and fmk.py scripts.

  2. Upload the training script mindspore-verification.py to the "obs://test-modelarts/ascend/demo-code/" folder of the OBS bucket.
  3. Upload the five Ascend boot script files to the "obs://test-modelarts/ascend/demo-code/run_ascend/" folder of the OBS bucket.

Step 3 Building a Custom Image

This section describes how to build a custom image by writing a Dockerfile.

Objective: Build a container image with the following software installed, and run it with the ModelArts training service.

  • ubuntu-18.04
  • CANN 6.3.RC2 (commercial edition)
  • python-3.7.13
  • mindspore-2.1.1

The MindSpore version must match the CANN version, and the CANN version must match the Ascend driver/firmware version. Mismatched versions cause training to fail.

This example only illustrates the process of building an Ascend container image. It has been verified on a dedicated resource pool with matching Ascend driver/firmware versions.

  1. Prepare a Linux aarch64 host running Ubuntu 18.04. You can use an Elastic Cloud Server (ECS) of matching specifications or an existing local host to build the custom image.

    For how to buy an ECS, see Purchasing and Logging In to a Linux ECS. Set "CPU Architecture" to "Kunpeng" (Arm-based, to match the aarch64 build host) and "Image" to "Public image"; an Ubuntu 18.04 image is recommended.

  2. Install Docker.

    The following commands install Docker on a Linux aarch64 system. For more installation guidance, see the official Docker documentation.

    curl -fsSL get.docker.com -o get-docker.sh
    sh get-docker.sh

    If the docker images command runs successfully, Docker is already installed and this step can be skipped.

    Start Docker.
    systemctl start docker
  3. Check the Docker Engine version by running the following command.
    docker version | grep -A 1 Engine
    The command output looks like this.
     Engine:
      Version:          18.09.0

    Use Docker Engine of this version or later to build the custom image.
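As a quick illustration (not part of the official procedure), the dotted version string printed by `docker version` should be compared numerically, not lexically; a minimal Python sketch:

```python
def parse_version(text):
    """Parse a dotted version string such as '18.09.0' into a tuple of ints."""
    return tuple(int(part) for part in text.split('.'))

def engine_is_recent_enough(installed, required='18.09.0'):
    """Return True when the installed Docker Engine is >= the required version."""
    return parse_version(installed) >= parse_version(required)

print(engine_is_recent_enough('18.09.0'))  # True
print(engine_is_recent_enough('17.12.1'))  # False
```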

  4. Create a folder named context.
    mkdir -p context
  5. Prepare a usable pip source file pip.conf. This example uses the pip source provided by the Huawei Mirrors site, with the following pip.conf content.
    [global]
    index-url = https://repo.huaweicloud.com/repository/pypi/simple
    trusted-host = repo.huaweicloud.com
    timeout = 120

    On the Huawei Mirrors site https://mirrors.huaweicloud.com/home, search for "pypi" to view the pip.conf content.

  6. Prepare a usable APT source file Ubuntu-Ports-bionic.list. This example uses the APT source provided by the Huawei Mirrors site. Run the following command to obtain it.
    wget -O Ubuntu-Ports-bionic.list https://repo.huaweicloud.com/repository/conf/Ubuntu-Ports-bionic.list

    On the Huawei Mirrors site https://mirrors.huaweicloud.com/home, search for "Ubuntu-Ports" to view the command for obtaining the APT source file.

  7. Download the CANN 6.3.RC2 (linux-aarch64) and mindspore-2.1.1-cp37-cp37m-linux_aarch64.whl installation files.
    • Download the run file "Ascend-cann-nnae_6.3.RC2_linux-aarch64.run".
    • Download the whl file "mindspore-2.1.1-cp37-cp37m-linux_aarch64.whl".

    ModelArts currently supports only the commercial edition of CANN, not the community edition.

  8. Download the Miniconda3 installation file.

    Download the Miniconda3-py37-4.10.3 installation file (for Python 3.7.10) from https://repo.anaconda.com/miniconda/Miniconda3-py37_4.10.3-Linux-aarch64.sh.

  9. Place the pip source file, the *.run file, the *.whl file, and the Miniconda3 installation file in the context folder, which then looks like this.
    context
    ├── Ascend-cann-nnae_6.3.RC2_linux-aarch64.run
    ├── mindspore-2.1.1-cp37-cp37m-linux_aarch64.whl
    ├── Miniconda3-py37_4.10.3-Linux-aarch64.sh
    ├── pip.conf
    └── Ubuntu-Ports-bionic.list
  10. Write the Dockerfile for the container image.
    Create an empty file named Dockerfile in the context folder and copy the following content into it.
    # The host that builds the container image must be able to access the public network.
    FROM arm64v8/ubuntu:18.04 AS builder

    # The default user of the base container image is already root.
    # USER root

    # Install OS dependencies (from the Huawei Mirrors site).
    COPY Ubuntu-Ports-bionic.list /tmp
    RUN cp -a /etc/apt/sources.list /etc/apt/sources.list.bak && \
        mv /tmp/Ubuntu-Ports-bionic.list /etc/apt/sources.list && \
        echo > /etc/apt/apt.conf.d/00skip-verify-peer.conf "Acquire { https::Verify-Peer false }" && \
        apt-get update && \
        apt-get install -y \
        # utils
        ca-certificates vim curl \
        # CANN 6.3.RC2
        gcc-7 g++ make cmake zlib1g zlib1g-dev openssl libsqlite3-dev libssl-dev libffi-dev unzip pciutils net-tools libblas-dev gfortran libblas3 && \
        apt-get clean && \
        mv /etc/apt/sources.list.bak /etc/apt/sources.list && \
        # Make the parent directory of the CANN 6.3.RC2 install path writable by ma-user.
        chmod o+w /usr/local

    RUN useradd -m -d /home/ma-user -s /bin/bash -g 100 -u 1000 ma-user

    # Set the default user and working directory of the container image.
    USER ma-user
    WORKDIR /home/ma-user

    # Use the PyPI configuration provided by the Huawei Mirrors site.
    RUN mkdir -p /home/ma-user/.pip/
    COPY --chown=ma-user:100 pip.conf /home/ma-user/.pip/pip.conf

    # Copy the installation files to the /tmp directory of the base container image.
    COPY --chown=ma-user:100 Miniconda3-py37_4.10.3-Linux-aarch64.sh /tmp

    # https://conda.io/projects/conda/en/latest/user-guide/install/linux.html#installing-on-linux
    # Install Miniconda3 into /home/ma-user/miniconda3 of the base container image.
    RUN bash /tmp/Miniconda3-py37_4.10.3-Linux-aarch64.sh -b -p /home/ma-user/miniconda3
    ENV PATH=$PATH:/home/ma-user/miniconda3/bin

    # Install the CANN 6.3.RC2 Python package dependencies.
    RUN pip install numpy~=1.14.3 decorator~=4.4.0 sympy~=1.4 cffi~=1.12.3 protobuf~=3.11.3 \
        attrs pyyaml pathlib2 scipy requests psutil absl-py

    # Install CANN 6.3.RC2 into /usr/local/Ascend.
    COPY --chown=ma-user:100 Ascend-cann-nnae_6.3.RC2_linux-aarch64.run /tmp
    RUN chmod +x /tmp/Ascend-cann-nnae_6.3.RC2_linux-aarch64.run && \
        /tmp/Ascend-cann-nnae_6.3.RC2_linux-aarch64.run --install --install-path=/usr/local/Ascend

    # Install MindSpore 2.1.1.
    COPY --chown=ma-user:100 mindspore-2.1.1-cp37-cp37m-linux_aarch64.whl /tmp
    RUN chmod +x /tmp/mindspore-2.1.1-cp37-cp37m-linux_aarch64.whl && \
        pip install /tmp/mindspore-2.1.1-cp37-cp37m-linux_aarch64.whl

    # Build the final container image.
    FROM arm64v8/ubuntu:18.04

    # Install OS dependencies (from the Huawei Mirrors site).
    COPY Ubuntu-Ports-bionic.list /tmp
    RUN cp -a /etc/apt/sources.list /etc/apt/sources.list.bak && \
        mv /tmp/Ubuntu-Ports-bionic.list /etc/apt/sources.list && \
        echo > /etc/apt/apt.conf.d/00skip-verify-peer.conf "Acquire { https::Verify-Peer false }" && \
        apt-get update && \
        apt-get install -y \
        # utils
        ca-certificates vim curl \
        # CANN 6.3.RC2
        gcc-7 g++ make cmake zlib1g zlib1g-dev openssl libsqlite3-dev libssl-dev libffi-dev unzip pciutils net-tools libblas-dev gfortran libblas3 && \
        apt-get clean && \
        mv /etc/apt/sources.list.bak /etc/apt/sources.list

    RUN useradd -m -d /home/ma-user -s /bin/bash -g 100 -u 1000 ma-user

    # Copy the directories from the builder stage above into the same paths of the current container image.
    COPY --chown=ma-user:100 --from=builder /home/ma-user/miniconda3 /home/ma-user/miniconda3
    COPY --chown=ma-user:100 --from=builder /home/ma-user/Ascend /home/ma-user/Ascend
    COPY --chown=ma-user:100 --from=builder /home/ma-user/var /home/ma-user/var
    COPY --chown=ma-user:100 --from=builder /usr/local/Ascend /usr/local/Ascend

    # Preset environment variables of the container image.
    # Be sure to set the CANN environment variables.
    # Be sure to set the Ascend driver environment variables.
    # Be sure to set PYTHONUNBUFFERED=1 to avoid losing logs.
    ENV PATH=$PATH:/usr/local/Ascend/nnae/latest/bin:/usr/local/Ascend/nnae/latest/compiler/ccec_compiler/bin:/home/ma-user/miniconda3/bin \
        LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/Ascend/driver/lib64:/usr/local/Ascend/driver/lib64/common:/usr/local/Ascend/driver/lib64/driver:/usr/local/Ascend/nnae/latest/lib64:/usr/local/Ascend/nnae/latest/lib64/plugin/opskernel:/usr/local/Ascend/nnae/latest/lib64/plugin/nnengine \
        PYTHONPATH=$PYTHONPATH:/usr/local/Ascend/nnae/latest/python/site-packages:/usr/local/Ascend/nnae/latest/opp/built-in/op_impl/ai_core/tbe \
        ASCEND_AICPU_PATH=/usr/local/Ascend/nnae/latest \
        ASCEND_OPP_PATH=/usr/local/Ascend/nnae/latest/opp \
        ASCEND_HOME_PATH=/usr/local/Ascend/nnae/latest \
        PYTHONUNBUFFERED=1

    # Set the default user and working directory of the container image.
    USER ma-user
    WORKDIR /home/ma-user

    For more guidance on writing Dockerfiles, see the official Docker documentation.

  11. Verify that the Dockerfile has been created. The context folder now looks like this.
    context
    ├── Ascend-cann-nnae_6.3.RC2_linux-aarch64.run
    ├── Dockerfile
    ├── mindspore-2.1.1-cp37-cp37m-linux_aarch64.whl
    ├── Miniconda3-py37_4.10.3-Linux-aarch64.sh
    ├── pip.conf
    └── Ubuntu-Ports-bionic.list
  12. Build the container image. Run the following command in the directory containing the Dockerfile.
    docker build . -t mindspore:2.1.1-cann6.3.rc2

    The following log at the end of the build process indicates that the image was built successfully.
    Successfully tagged mindspore:2.1.1-cann6.3.rc2
  13. Upload the built image to SWR. For details, see Step 4 Uploading the Image to SWR.

Step 4 Uploading the Image to SWR

This section describes how to upload the built image to SWR so that it can be used later when creating training jobs on ModelArts.

  1. Log in to the SWR console and select the same region as ModelArts; otherwise, the image cannot be selected.
  2. Click "Create Organization" in the upper right corner and enter an organization name to create an organization. This example uses "deep-learning"; replace the organization name "deep-learning" in the commands below with your own value.
  3. Click "Generate Login Command" in the upper right corner to obtain a login command. This example copies a temporary login command.
  4. Log in to the local environment as the root user and run the copied SWR temporary login command.
  5. Upload the image to the SWR image repository.
    1. Tag the image to be uploaded with the docker tag command.
      # Replace {region} and {domain} with the actual values, and replace the organization name deep-learning with your own value.
      sudo docker tag mindspore:2.1.1-cann6.3.rc2 swr.{region}.{domain}/deep-learning/mindspore:2.1.1-cann6.3.rc2
      # Example for CN North-Beijing4:
      sudo docker tag mindspore:2.1.1-cann6.3.rc2 swr.cn-north-4.myhuaweicloud.com/deep-learning/mindspore:2.1.1-cann6.3.rc2
    2. Upload the image with the docker push command.
      # Replace {region} and {domain} with the actual values, and replace the organization name deep-learning with your own value.
      sudo docker push swr.{region}.{domain}/deep-learning/mindspore:2.1.1-cann6.3.rc2
      # Example for CN North-Beijing4:
      sudo docker push swr.cn-north-4.myhuaweicloud.com/deep-learning/mindspore:2.1.1-cann6.3.rc2
  6. After the upload completes, you can view the custom image on the "My Images" page of the SWR console.

    "swr.cn-north-4.myhuaweicloud.com/deep-learning/mindspore:2.1.1-cann6.3.rc2" is the SWR URL of this custom image.
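The SWR URL is simply composed from the region, domain, organization, and image name. A small sketch (hypothetical helper, for illustration only) of how the address above is assembled:

```python
def build_swr_url(region, domain, organization, image, tag):
    """Compose an SWR image address of the form swr.{region}.{domain}/{org}/{image}:{tag}."""
    return 'swr.%s.%s/%s/%s:%s' % (region, domain, organization, image, tag)

url = build_swr_url('cn-north-4', 'myhuaweicloud.com', 'deep-learning',
                    'mindspore', '2.1.1-cann6.3.rc2')
print(url)  # swr.cn-north-4.myhuaweicloud.com/deep-learning/mindspore:2.1.1-cann6.3.rc2
```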

Step 5 Creating and Debugging a Notebook on ModelArts

  1. Register the image uploaded to SWR with ModelArts image management.

    Log in to the ModelArts console, choose "Image Management" in the left navigation pane, click "Register Image", and register the image as prompted. A registered image can be used to create notebooks.

  2. Create a notebook from the custom image and debug it. After debugging succeeds, save the image.
    1. For how to create a notebook from a custom image, see Creating a Notebook Instance from a Custom Image.
    2. For how to save a notebook image, see Saving a Notebook Image Environment.
  3. After the image has been debugged successfully, create a training job with the ModelArts training module.

Step 6 Creating a Training Job on ModelArts

  1. Log in to the ModelArts console and choose "Model Training > Training Jobs" in the left navigation pane to open the training job list.
  2. On the "Create Training Job" page, set the parameters and click "Submit".
    • Created By: select "Custom algorithm"
    • Boot Mode: select "Custom image"
    • Image Path: "swr.cn-north-4.myhuaweicloud.com/deep-learning/mindspore:2.1.1-cann6.3.rc2"
    • Code Directory: the OBS directory containing the boot script files, for example "obs://test-modelarts/ascend/demo-code/"
    • Boot Command: "python ${MA_JOB_DIR}/demo-code/run_ascend/run_ascend.py python ${MA_JOB_DIR}/demo-code/mindspore-verification.py"
    • Resource Pool: select a dedicated resource pool
    • Type: select an Ascend flavor of a dedicated resource pool with matching driver/firmware versions
    • Job Log Path: the OBS path for storing training logs, for example "obs://test-modelarts/ascend/log/"
  3. On the confirmation page, check the parameters of the training job and click "Submit".
  4. After the training job is created, the backend automatically downloads the container image and code directory and runs the boot command.

    A training job generally runs for a while, from tens of minutes to several hours, depending on your training logic and the selected resources. After the job succeeds, the log shown in Figure 1 is printed.

    Figure 1 Run logs of an Ascend flavor in a dedicated resource pool
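A note on the boot command: run_ascend.py treats everything after its own path as the training command to launch once per Ascend device (it reads `sys.argv[1:]`). A tiny sketch with hypothetical argv values:

```python
# Simulate the argv that run_ascend.py receives from the boot command above
# (paths shortened for illustration).
argv = ['run_ascend.py', 'python', 'mindspore-verification.py']
train_command = argv[1:]  # mirrors sys.argv[1:] in run_ascend.py
print(train_command)  # ['python', 'mindspore-verification.py']
```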

The training script mindspore-verification.py

The content of mindspore-verification.py is as follows:

import os

import numpy as np
import mindspore.context as context
import mindspore.ops as ops
from mindspore import Tensor

print('Ascend Envs')
print('------')
print('JOB_ID: ', os.environ['JOB_ID'])
print('RANK_TABLE_FILE: ', os.environ['RANK_TABLE_FILE'])
print('RANK_SIZE: ', os.environ['RANK_SIZE'])
print('ASCEND_DEVICE_ID: ', os.environ['ASCEND_DEVICE_ID'])
print('DEVICE_ID: ', os.environ['DEVICE_ID'])
print('RANK_ID: ', os.environ['RANK_ID'])
print('------')

context.set_context(device_target="Ascend")
x = Tensor(np.ones([1, 3, 3, 4]).astype(np.float32))
y = Tensor(np.ones([1, 3, 3, 4]).astype(np.float32))
print(ops.add(x, y))
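Without an Ascend device, you can still sanity-check the arithmetic: `ops.add` on two all-ones tensors should yield a (1, 3, 3, 4) tensor filled with 2.0. An equivalent NumPy-only sketch:

```python
import numpy as np

# Same shapes and dtype as the MindSpore script above, element-wise addition.
x = np.ones([1, 3, 3, 4], dtype=np.float32)
y = np.ones([1, 3, 3, 4], dtype=np.float32)
z = x + y
print(z.shape)  # (1, 3, 3, 4)
print(float(z[0, 0, 0, 0]))  # 2.0
```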

The Ascend boot script files

  • run_ascend.py
    import sys
    import os

    from common import RunAscendLog
    from common import RankTableEnv
    from rank_table import RankTable, RankTableTemplate1, RankTableTemplate2
    from manager import FMKManager

    if __name__ == '__main__':
        log = RunAscendLog.setup_run_ascend_logger()

        if len(sys.argv) <= 1:
            log.error('there are not enough args')
            sys.exit(1)

        train_command = sys.argv[1:]
        log.info('training command')
        log.info(train_command)

        if os.environ.get(RankTableEnv.RANK_TABLE_FILE_V1) is not None:
            # new format rank table file
            rank_table_path = os.environ.get(RankTableEnv.RANK_TABLE_FILE_V1)
            RankTable.wait_for_available(rank_table_path)
            rank_table = RankTableTemplate1(rank_table_path)
        else:
            # old format rank table file
            rank_table_path_origin = RankTableEnv.get_rank_table_template2_file_path()
            RankTable.wait_for_available(rank_table_path_origin)
            rank_table = RankTableTemplate2(rank_table_path_origin)

        if rank_table.get_device_num() >= 1:
            log.info('set rank table %s env to %s' % (RankTableEnv.RANK_TABLE_FILE, rank_table.get_rank_table_path()))
            RankTableEnv.set_rank_table_env(rank_table.get_rank_table_path())
        else:
            log.info('device num < 1, unset rank table %s env' % RankTableEnv.RANK_TABLE_FILE)
            RankTableEnv.unset_rank_table_env()

        instance = rank_table.get_current_instance()
        server = rank_table.get_server(instance.server_id)
        current_instance = RankTable.convert_server_to_instance(server)

        fmk_manager = FMKManager(current_instance)
        fmk_manager.run(rank_table.get_device_num(), train_command)
        return_code = fmk_manager.monitor()

        fmk_manager.destroy()

        sys.exit(return_code)
    
  • common.py
    import logging
    import os

    logo = 'Training'

    # Rank Table Constants
    class RankTableEnv:
        RANK_TABLE_FILE = 'RANK_TABLE_FILE'
        RANK_TABLE_FILE_V1 = 'RANK_TABLE_FILE_V_1_0'
        HCCL_CONNECT_TIMEOUT = 'HCCL_CONNECT_TIMEOUT'

        # jobstart_hccl.json is provided by the volcano controller of Cloud-Container-Engine (CCE)
        HCCL_JSON_FILE_NAME = 'jobstart_hccl.json'
        RANK_TABLE_FILE_DEFAULT_VALUE = '/user/config/%s' % HCCL_JSON_FILE_NAME

        @staticmethod
        def get_rank_table_template1_file_dir():
            parent_dir = os.environ[ModelArts.MA_MOUNT_PATH_ENV]
            return os.path.join(parent_dir, 'rank_table')

        @staticmethod
        def get_rank_table_template2_file_path():
            rank_table_file_path = os.environ.get(RankTableEnv.RANK_TABLE_FILE)
            if rank_table_file_path is None:
                return RankTableEnv.RANK_TABLE_FILE_DEFAULT_VALUE

            return os.path.join(os.path.normpath(rank_table_file_path), RankTableEnv.HCCL_JSON_FILE_NAME)

        @staticmethod
        def set_rank_table_env(path):
            os.environ[RankTableEnv.RANK_TABLE_FILE] = path

        @staticmethod
        def unset_rank_table_env():
            del os.environ[RankTableEnv.RANK_TABLE_FILE]

    class ModelArts:
        MA_MOUNT_PATH_ENV = 'MA_MOUNT_PATH'
        MA_CURRENT_INSTANCE_NAME_ENV = 'MA_CURRENT_INSTANCE_NAME'
        MA_VJ_NAME = 'MA_VJ_NAME'
        MA_CURRENT_HOST_IP = 'MA_CURRENT_HOST_IP'

        CACHE_DIR = '/cache'
        TMP_LOG_DIR = '/tmp/log/'
        FMK_WORKSPACE = 'workspace'

        @staticmethod
        def get_current_instance_name():
            return os.environ[ModelArts.MA_CURRENT_INSTANCE_NAME_ENV]

        @staticmethod
        def get_current_host_ip():
            return os.environ.get(ModelArts.MA_CURRENT_HOST_IP)

        @staticmethod
        def get_job_id():
            ma_vj_name = os.environ[ModelArts.MA_VJ_NAME]
            return ma_vj_name.replace('ma-job', 'modelarts-job', 1)

        @staticmethod
        def get_parent_working_dir():
            if ModelArts.MA_MOUNT_PATH_ENV in os.environ:
                return os.path.join(os.environ.get(ModelArts.MA_MOUNT_PATH_ENV), ModelArts.FMK_WORKSPACE)

            return ModelArts.CACHE_DIR

    class RunAscendLog:
        @staticmethod
        def setup_run_ascend_logger():
            name = logo
            formatter = logging.Formatter(fmt='[run ascend] %(asctime)s - %(levelname)s - %(message)s')

            handler = logging.StreamHandler()
            handler.setFormatter(formatter)

            logger = logging.getLogger(name)
            logger.setLevel(logging.INFO)
            logger.addHandler(handler)
            logger.propagate = False
            return logger

        @staticmethod
        def get_run_ascend_logger():
            return logging.getLogger(logo)
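A quick sketch showing the effect of the formatter used by RunAscendLog: every record is prefixed with "[run ascend]" and the level name. The logger name here is arbitrary; only the Formatter pattern is taken from the script.

```python
import io
import logging

# Same Formatter pattern as in RunAscendLog.setup_run_ascend_logger.
formatter = logging.Formatter(fmt='[run ascend] %(asctime)s - %(levelname)s - %(message)s')
stream = io.StringIO()
handler = logging.StreamHandler(stream)
handler.setFormatter(formatter)

logger = logging.getLogger('sketch-training')
logger.setLevel(logging.INFO)
logger.addHandler(handler)
logger.propagate = False

logger.info('training command')
print(stream.getvalue().startswith('[run ascend]'))  # True
```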
    
  • rank_table.py
    import json
    import time
    import os

    from common import ModelArts
    from common import RunAscendLog
    from common import RankTableEnv

    log = RunAscendLog.get_run_ascend_logger()

    class Device:
        def __init__(self, device_id, device_ip, rank_id):
            self.device_id = device_id
            self.device_ip = device_ip
            self.rank_id = rank_id

    class Instance:
        def __init__(self, pod_name, server_id, devices):
            self.pod_name = pod_name
            self.server_id = server_id
            self.devices = self.parse_devices(devices)

        @staticmethod
        def parse_devices(devices):
            if devices is None:
                return []
            device_object_list = []
            for device in devices:
                device_object_list.append(Device(device['device_id'], device['device_ip'], ''))

            return device_object_list

        def set_devices(self, devices):
            self.devices = devices

    class Group:
        def __init__(self, group_name, device_count, instance_count, instance_list):
            self.group_name = group_name
            self.device_count = int(device_count)
            self.instance_count = int(instance_count)
            self.instance_list = self.parse_instance_list(instance_list)

        @staticmethod
        def parse_instance_list(instance_list):
            instance_object_list = []
            for instance in instance_list:
                instance_object_list.append(
                    Instance(instance['pod_name'], instance['server_id'], instance['devices']))

            return instance_object_list

    class RankTable:
        STATUS_FIELD = 'status'
        COMPLETED_STATUS = 'completed'

        def __init__(self):
            self.rank_table_path = ""
            self.rank_table = {}

        @staticmethod
        def read_from_file(file_path):
            with open(file_path) as json_file:
                return json.load(json_file)

        @staticmethod
        def wait_for_available(rank_table_file, period=1):
            log.info('Wait for Rank table file at %s ready' % rank_table_file)
            complete_flag = False
            while not complete_flag:
                with open(rank_table_file) as json_file:
                    data = json.load(json_file)
                    if data[RankTable.STATUS_FIELD] == RankTable.COMPLETED_STATUS:
                        log.info('Rank table file is ready for read')
                        log.info('\n' + json.dumps(data, indent=4))
                        return True

                time.sleep(period)

            return False

        @staticmethod
        def convert_server_to_instance(server):
            device_list = []
            for device in server['device']:
                device_list.append(
                    Device(device_id=device['device_id'], device_ip=device['device_ip'], rank_id=device['rank_id']))

            ins = Instance(pod_name='', server_id=server['server_id'], devices=[])
            ins.set_devices(device_list)
            return ins

        def get_rank_table_path(self):
            return self.rank_table_path

        def get_server(self, server_id):
            for server in self.rank_table['server_list']:
                if server['server_id'] == server_id:
                    log.info('Current server')
                    log.info('\n' + json.dumps(server, indent=4))
                    return server

            log.error('server [%s] is not found' % server_id)
            return None

    class RankTableTemplate2(RankTable):
        def __init__(self, rank_table_template2_path):
            super().__init__()

            json_data = self.read_from_file(file_path=rank_table_template2_path)
            self.status = json_data[RankTableTemplate2.STATUS_FIELD]
            if self.status != RankTableTemplate2.COMPLETED_STATUS:
                return

            # sorted instance list by the index of instance
            # assert there is only one group
            json_data["group_list"][0]["instance_list"] = sorted(json_data["group_list"][0]["instance_list"],
                                                                 key=RankTableTemplate2.get_index)

            self.group_count = int(json_data['group_count'])
            self.group_list = self.parse_group_list(json_data['group_list'])

            self.rank_table_path, self.rank_table = self.convert_template2_to_template1_format_file()

        @staticmethod
        def parse_group_list(group_list):
            group_object_list = []
            for group in group_list:
                group_object_list.append(
                    Group(group['group_name'], group['device_count'], group['instance_count'], group['instance_list']))

            return group_object_list

        @staticmethod
        def get_index(instance):
            # pod_name example: job94dc1dbf-job-bj4-yolov4-15
            pod_name = instance["pod_name"]
            return int(pod_name[pod_name.rfind("-") + 1:])

        def get_current_instance(self):
            """
            get instance by pod name
            specially, return the first instance when the pod name is None
            :return:
            """
            pod_name = ModelArts.get_current_instance_name()

            if pod_name is None:
                if len(self.group_list) > 0:
                    if len(self.group_list[0].instance_list) > 0:
                        return self.group_list[0].instance_list[0]
                return None

            for group in self.group_list:
                for instance in group.instance_list:
                    if instance.pod_name == pod_name:
                        return instance

            return None

        def convert_template2_to_template1_format_file(self):
            rank_table_template1_file = {
                'status': 'completed',
                'version': '1.0',
                'server_count': '0',
                'server_list': []
            }

            logic_index = 0
            server_map = {}
            # collect all devices in all groups
            for group in self.group_list:
                if group.device_count == 0:
                    continue

                for instance in group.instance_list:
                    if instance.server_id not in server_map:
                        server_map[instance.server_id] = []

                    for device in instance.devices:
                        template1_device = {
                            'device_id': device.device_id,
                            'device_ip': device.device_ip,
                            'rank_id': str(logic_index)
                        }
                        logic_index += 1
                        server_map[instance.server_id].append(template1_device)

            server_count = 0
            for server_id in server_map:
                rank_table_template1_file['server_list'].append({
                    'server_id': server_id,
                    'device': server_map[server_id]
                })
                server_count += 1

            rank_table_template1_file['server_count'] = str(server_count)

            log.info('Rank table file (Template1)')
            log.info('\n' + json.dumps(rank_table_template1_file, indent=4))

            if not os.path.exists(RankTableEnv.get_rank_table_template1_file_dir()):
                os.makedirs(RankTableEnv.get_rank_table_template1_file_dir())

            path = os.path.join(RankTableEnv.get_rank_table_template1_file_dir(), RankTableEnv.HCCL_JSON_FILE_NAME)
            with open(path, 'w') as f:
                f.write(json.dumps(rank_table_template1_file))
                log.info('Rank table file (Template1) is generated at %s', path)

            return path, rank_table_template1_file

        def get_device_num(self):
            total_device_num = 0
            for group in self.group_list:
                total_device_num += group.device_count
            return total_device_num

    class RankTableTemplate1(RankTable):
        def __init__(self, rank_table_template1_path):
            super().__init__()
            self.rank_table_path = rank_table_template1_path
            self.rank_table = self.read_from_file(file_path=rank_table_template1_path)

        def get_current_instance(self):
            current_server = None
            server_list = self.rank_table['server_list']

            if len(server_list) == 1:
                current_server = server_list[0]
            elif len(server_list) > 1:
                host_ip = ModelArts.get_current_host_ip()
                if host_ip is not None:
                    for server in server_list:
                        if server['server_id'] == host_ip:
                            current_server = server
                            break
                else:
                    current_server = server_list[0]

            if current_server is None:
                log.error('server is not found')
                return None
            return self.convert_server_to_instance(current_server)

        def get_device_num(self):
            server_list = self.rank_table['server_list']
            device_num = 0
            for server in server_list:
                device_num += len(server['device'])
            return device_num
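The core of RankTableTemplate2 is the template2-to-template1 conversion: devices from all instances are flattened per server and assigned consecutive logical rank_ids. A simplified, self-contained sketch of that logic (it skips the device_count check and file output of the real method):

```python
def convert(group_list):
    """Flatten template2-style groups into a template1-style rank table dict."""
    server_map = {}
    logic_index = 0
    for group in group_list:
        for instance in group['instance_list']:
            server_map.setdefault(instance['server_id'], [])
            for device in instance['devices']:
                server_map[instance['server_id']].append({
                    'device_id': device['device_id'],
                    'device_ip': device['device_ip'],
                    'rank_id': str(logic_index),  # consecutive logical rank
                })
                logic_index += 1
    server_list = [{'server_id': sid, 'device': devs} for sid, devs in server_map.items()]
    return {'status': 'completed', 'version': '1.0',
            'server_count': str(len(server_list)), 'server_list': server_list}

# Hypothetical single-server template2 input with two devices.
template2 = [{'instance_list': [
    {'server_id': '192.168.0.1',
     'devices': [{'device_id': '0', 'device_ip': '10.0.0.1'},
                 {'device_id': '1', 'device_ip': '10.0.0.2'}]}]}]
result = convert(template2)
print(result['server_count'])  # 1
```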
    
  • manager.py
    import time
    import os
    import os.path
    import signal

    from common import RunAscendLog
    from fmk import FMK

    log = RunAscendLog.get_run_ascend_logger()

    class FMKManager:
        # max destroy time: ~20 (15 + 5)
        # ~ 15 (1 + 2 + 4 + 8)
        MAX_TEST_PROC_CNT = 4

        def __init__(self, instance):
            self.instance = instance
            self.fmk = []
            self.fmk_processes = []
            self.get_sigterm = False
            self.max_test_proc_cnt = FMKManager.MAX_TEST_PROC_CNT

        # break the monitor and destroy processes when getting the terminate signal
        def term_handle(func):
            def receive_term(signum, stack):
                log.info('Received terminate signal %d, try to destroy all processes' % signum)
                stack.f_locals['self'].get_sigterm = True

            def handle_func(self, *args, **kwargs):
                origin_handle = signal.getsignal(signal.SIGTERM)
                signal.signal(signal.SIGTERM, receive_term)
                res = func(self, *args, **kwargs)
                signal.signal(signal.SIGTERM, origin_handle)
                return res

            return handle_func

        def run(self, rank_size, command):
            for index, device in enumerate(self.instance.devices):
                fmk_instance = FMK(index, device)
                self.fmk.append(fmk_instance)
                self.fmk_processes.append(fmk_instance.run(rank_size, command))

        @term_handle
        def monitor(self, period=1):
            # busy waiting for all fmk processes to exit with zero,
            # or for one process to exit with a non-zero code
            fmk_cnt = len(self.fmk_processes)
            zero_ret_cnt = 0
            while zero_ret_cnt != fmk_cnt:
                zero_ret_cnt = 0
                for index in range(fmk_cnt):
                    fmk = self.fmk[index]
                    fmk_process = self.fmk_processes[index]
                    if fmk_process.poll() is not None:
                        if fmk_process.returncode != 0:
                            log.error('proc-rank-%s-device-%s (pid: %d) has exited with non-zero code: %d'
                                      % (fmk.rank_id, fmk.device_id, fmk_process.pid, fmk_process.returncode))
                            return fmk_process.returncode

                        zero_ret_cnt += 1
                if self.get_sigterm:
                    break
                time.sleep(period)

            return 0

        def destroy(self, base_period=1):
            log.info('Begin destroy training processes')
            self.send_sigterm_to_fmk_process()
            self.wait_fmk_process_end(base_period)
            log.info('End destroy training processes')

        def send_sigterm_to_fmk_process(self):
            # send SIGTERM to fmk processes (and process groups)
            for r_index in range(len(self.fmk_processes) - 1, -1, -1):
                fmk = self.fmk[r_index]
                fmk_process = self.fmk_processes[r_index]
                if fmk_process.poll() is not None:
                    log.info('proc-rank-%s-device-%s (pid: %d) has exited before receiving the term signal',
                             fmk.rank_id, fmk.device_id, fmk_process.pid)
                    del self.fmk_processes[r_index]
                    del self.fmk[r_index]

                try:
                    os.killpg(fmk_process.pid, signal.SIGTERM)
                except ProcessLookupError:
                    pass

        def wait_fmk_process_end(self, base_period):
            test_cnt = 0
            period = base_period
            while len(self.fmk_processes) > 0 and test_cnt < self.max_test_proc_cnt:
                for r_index in range(len(self.fmk_processes) - 1, -1, -1):
                    fmk = self.fmk[r_index]
                    fmk_process = self.fmk_processes[r_index]
                    if fmk_process.poll() is not None:
                        log.info('proc-rank-%s-device-%s (pid: %d) has exited',
                                 fmk.rank_id, fmk.device_id, fmk_process.pid)
                        del self.fmk_processes[r_index]
                        del self.fmk[r_index]
                if not self.fmk_processes:
                    break

                time.sleep(period)
                period *= 2
                test_cnt += 1

            if len(self.fmk_processes) > 0:
                for r_index in range(len(self.fmk_processes) - 1, -1, -1):
                    fmk = self.fmk[r_index]
                    fmk_process = self.fmk_processes[r_index]
                    if fmk_process.poll() is None:
                        log.warn('proc-rank-%s-device-%s (pid: %d) has not exited within the max waiting time, '
                                 'send kill signal',
                                 fmk.rank_id, fmk.device_id, fmk_process.pid)
                        os.killpg(fmk_process.pid, signal.SIGKILL)
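The destroy phase in FMKManager polls exited processes with a doubling wait period, which is where the "~15 (1 + 2 + 4 + 8)" in the comment comes from. A quick sketch of that schedule:

```python
def total_wait(base_period=1, rounds=4):
    """Sum the doubling sleep periods used by wait_fmk_process_end."""
    period, total = base_period, 0
    for _ in range(rounds):
        total += period
        period *= 2
    return total

print(total_wait())  # 15
```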
    
  • fmk.py
    import os
    import subprocess
    import pathlib
    from contextlib import contextmanager

    from common import RunAscendLog
    from common import RankTableEnv
    from common import ModelArts

    log = RunAscendLog.get_run_ascend_logger()

    class FMK:
        def __init__(self, index, device):
            self.job_id = ModelArts.get_job_id()
            self.rank_id = device.rank_id
            self.device_id = str(index)

        def gen_env_for_fmk(self, rank_size):
            current_envs = os.environ.copy()

            current_envs['JOB_ID'] = self.job_id

            current_envs['ASCEND_DEVICE_ID'] = self.device_id
            current_envs['DEVICE_ID'] = self.device_id

            current_envs['RANK_ID'] = self.rank_id
            current_envs['RANK_SIZE'] = str(rank_size)

            FMK.set_env_if_not_exist(current_envs, RankTableEnv.HCCL_CONNECT_TIMEOUT, str(1800))

            log_dir = FMK.get_log_dir()
            process_log_path = os.path.join(log_dir, self.job_id, 'ascend', 'process_log', 'rank_' + self.rank_id)
            FMK.set_env_if_not_exist(current_envs, 'ASCEND_PROCESS_LOG_PATH', process_log_path)
            pathlib.Path(current_envs['ASCEND_PROCESS_LOG_PATH']).mkdir(parents=True, exist_ok=True)

            return current_envs

        @contextmanager
        def switch_directory(self, directory):
            owd = os.getcwd()
            try:
                os.chdir(directory)
                yield directory
            finally:
                os.chdir(owd)

        def get_working_dir(self):
            fmk_workspace_prefix = ModelArts.get_parent_working_dir()
            return os.path.join(os.path.normpath(fmk_workspace_prefix), 'device%s' % self.device_id)

        @staticmethod
        def get_log_dir():
            parent_path = os.getenv(ModelArts.MA_MOUNT_PATH_ENV)
            if parent_path:
                log_path = os.path.join(parent_path, 'log')
                if os.path.exists(log_path):
                    return log_path

            return ModelArts.TMP_LOG_DIR

        @staticmethod
        def set_env_if_not_exist(envs, env_name, env_value):
            if env_name in os.environ:
                log.info('env already exists. env_name: %s, env_value: %s ' % (env_name, env_value))
                return

            envs[env_name] = env_value

        def run(self, rank_size, command):
            envs = self.gen_env_for_fmk(rank_size)
            log.info('bootstrap proc-rank-%s-device-%s' % (self.rank_id, self.device_id))

            log_dir = FMK.get_log_dir()
            if not os.path.exists(log_dir):
                os.makedirs(log_dir)
            log_file = '%s-proc-rank-%s-device-%s.txt' % (self.job_id, self.rank_id, self.device_id)
            log_file_path = os.path.join(log_dir, log_file)

            working_dir = self.get_working_dir()
            if not os.path.exists(working_dir):
                os.makedirs(working_dir)

            with self.switch_directory(working_dir):
                # os.setsid: change the process (forked) group id to itself
                training_proc = subprocess.Popen(command, env=envs, preexec_fn=os.setsid,
                                                 stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
                log.info('proc-rank-%s-device-%s (pid: %d)', self.rank_id, self.device_id, training_proc.pid)

                # https://docs.python.org/3/library/subprocess.html#subprocess.Popen.wait
                subprocess.Popen(['tee', log_file_path], stdin=training_proc.stdout)

                return training_proc
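For reference, the per-device environment assembled by FMK.gen_env_for_fmk can be reduced to the following sketch (hypothetical values; the real method also copies os.environ and sets HCCL and log-path variables):

```python
def gen_env(job_id, device_index, rank_id, rank_size):
    """Minimal subset of the per-process environment set for each Ascend device."""
    return {
        'JOB_ID': job_id,
        'ASCEND_DEVICE_ID': str(device_index),
        'DEVICE_ID': str(device_index),
        'RANK_ID': rank_id,
        'RANK_SIZE': str(rank_size),
    }

envs = gen_env('modelarts-job-demo', 0, '0', 8)  # hypothetical job id
print(envs['RANK_SIZE'])  # 8
```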
    
