多 GPU 工作负荷

重要

此功能在 Beta 版中。 工作区管理员可以从 预览 页控制对此功能的访问。 请参阅 Manage Azure Databricks 预览版

可以使用 无服务器 GPU Python API 在单个节点上跨多个 GPU 启动分布式工作负荷。 API 提供了一个简单的统一接口,用于抽象化 GPU 预配、环境设置和工作负载分发的详细信息。 通过最少的代码更改,可以从同一笔记本无缝地从单 GPU 训练迁移到多 GPU 分布式执行。

注释

分布式训练需要 8xH100 加速器,该加速器预配具有 8 个 GPU 的单个节点。 使用 @distributed 修饰器时,请设置 gpus=8。 该 gpu_type 参数是可选的,并且从笔记本连接到的加速器中自动检测到该参数。

支持的框架

API @distributed 与主要的分布式训练库集成:

  • PyTorch 分布式数据并行(DDP):标准多 GPU 数据并行度。
  • 完全分片数据并行(FSDP):面向大型模型的节省内存训练。
  • DeepSpeed:Microsoft用于大型模型训练的优化库。

serverless_gpu API 与 TorchDistributor

下表将 API 与 serverless_gpu 进行比较@distributed

功能 serverless_gpu @distributed API 火炬分配器
基础结构 完全无服务器,无群集管理 需要具有 GPU 工作节点的 Spark 群集
Setup 单个修饰器,最小配置 需要 Spark 群集和 TorchDistributor 设置
框架支持 PyTorch DDP、FSDP、DeepSpeed 主要是 PyTorch DDP
数据加载 在修饰器中,使用 Unity 目录卷(UCVolumeDataset 用于流式处理文件数据) 通过 Spark 或文件系统

Databricks 上的新深度学习工作负载推荐使用serverless_gpuAPI。 TorchDistributor 仍可用于与 Spark 群集紧密耦合的工作负载。

快速入门

当在 Databricks 笔记本和作业中连接到无服务器 GPU 时,用于分布式训练的无服务器 GPU API 会被预安装。 建议 使用 GPU 环境 4 及更高版本。 若要将其用于分布式训练,请导入并使用 distributed 修饰器来分配训练函数。

将模型训练代码包装在函数中,并使用修饰器修饰函数 @distributed 。 修饰的函数将成为分布式执行的入口点,因此应在此函数内定义所有训练逻辑、数据加载和模型初始化。

若要启动分布式执行,请使用 train_function.distributed() 调用经装饰的函数。 每次调用都会自动创建一个 MLflow 实验运行;如果当前已有活动运行,则会创建一个嵌套的子运行。

警告

如果在 gpu_type 中设置 @distributed,请确保它与笔记本实例所连接的加速器类型匹配("H100""A10")。 指定错误的加速器类型将导致工作负荷失败。

下面的代码片段显示了@distributed的基本用法:

from serverless_gpu import distributed

# Decorate your training function with @distributed and specify the number of GPUs.
# gpu_type='H100' is optional and will be auto-detected if not set.
@distributed(gpus=8, gpu_type='H100')
def run_train():
    ...

run_train.distributed()

下面是从笔记本中训练 8 H100 GPU 上的多层感知器(MLP)模型的完整示例:

  1. 设置模型并定义实用工具函数。

    
    # Define the model
    import os
    import torch
    import torch.distributed as dist
    import torch.nn as nn
    
    def setup():
        torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
        dist.init_process_group("nccl")
    
    def cleanup():
        dist.destroy_process_group()
    
    class SimpleMLP(nn.Module):
        def __init__(self, input_dim=10, hidden_dim=64, output_dim=1):
            super().__init__()
            self.net = nn.Sequential(
                nn.Linear(input_dim, hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.2),
                nn.Linear(hidden_dim, hidden_dim),
                nn.ReLU(),
                nn.Dropout(0.2),
                nn.Linear(hidden_dim, output_dim)
            )
    
        def forward(self, x):
            return self.net(x)
    
  2. 导入 serverless_gpu 库和 分布式 模块。

    import serverless_gpu
    from serverless_gpu import distributed
    
  3. 将模型训练代码包装在函数中,并使用修饰器修饰函数 @distributed

    @distributed(gpus=8, gpu_type='H100')
    def run_train(num_epochs: int, batch_size: int) -> None:
        import mlflow
        import torch.optim as optim
        from torch.nn.parallel import DistributedDataParallel as DDP
        from torch.utils.data import DataLoader, DistributedSampler, TensorDataset
    
        # 1. Set up multi-GPU environment
        setup()
        device = torch.device(f"cuda:{int(os.environ['LOCAL_RANK'])}")
    
        # 2. Apply the Torch distributed data parallel (DDP) library for data-parellel training.
        model = SimpleMLP().to(device)
        model = DDP(model, device_ids=[device])
    
        # 3. Create and load dataset.
        x = torch.randn(5000, 10)
        y = torch.randn(5000, 1)
    
        dataset = TensorDataset(x, y)
        sampler = DistributedSampler(dataset)
        dataloader = DataLoader(dataset, sampler=sampler, batch_size=batch_size)
    
        # 4. Define the training loop.
        optimizer = optim.Adam(model.parameters(), lr=0.001)
        loss_fn = nn.MSELoss()
    
        for epoch in range(num_epochs):
            sampler.set_epoch(epoch)
            model.train()
            total_loss = 0.0
            for step, (xb, yb) in enumerate(dataloader):
                xb, yb = xb.to(device), yb.to(device)
                optimizer.zero_grad()
                loss = loss_fn(model(xb), yb)
                # Log loss to MLflow metric
                mlflow.log_metric("loss", loss.item(), step=step)
    
                loss.backward()
                optimizer.step()
                total_loss += loss.item() * xb.size(0)
    
            mlflow.log_metric("total_loss", total_loss)
            print(f"Total loss for epoch {epoch}: {total_loss}")
    
        cleanup()
    
  4. 使用用户定义的参数调用分布式函数来执行分布式训练。

    run_train.distributed(num_epochs=3, batch_size=1)
    
  5. 执行时,会在笔记本单元输出中生成 MLflow 运行链接。 单击 MLflow 运行链接或在 “试验 ”面板中找到它以查看运行结果。 有关自定义试验名称、跟踪指标和恢复运行的详细信息,请参阅 试验跟踪和可观测性

分布式执行详细信息

无服务器 GPU API 由多个关键组件组成:

  • 计算管理器:处理资源分配和管理
  • 运行时环境:管理Python环境和依赖项
  • 启动器:管理和协调作业执行和监控

在分布式模式下运行时:

  • 函数序列化并分布在指定数量的 GPU 中
  • 每个 GPU 都使用相同的参数运行函数的副本
  • 环境在所有 GPU 上保持同步
  • 从所有 GPU 收集并返回结果
  • 生命周期管理:分布式执行在笔记本的生命周期内运行。 当笔记本结束运行时,执行也会随之终止。 @distributed修饰器的默认超时时间为 3 小时。 若要设置自定义超时时间,请传入以秒为单位的 timeout;若要禁用超时,请传入 timeout=None。 GPU 环境 v5 及以上版本支持设置超时时间。

该 API 支持常用的并行训练库,例如 分布式数据并行 (DDP)、 完全分片数据并行FSDP)、DeepSpeed

可以使用 笔记本示例中的各种库找到更真实的分布式训练方案。

常见问题

应在何处放置数据加载代码?

使用 无服务器 GPU API 进行分布式训练时,在 @distributed 修饰器内移动数据加载代码。 数据集大小可以超出 pickle 允许的最大大小,因此建议在装饰器内生成数据集,如下所示:

from serverless_gpu import distributed

# this may cause pickle error
dataset = get_dataset(file_path)
@distributed(gpus=8, gpu_type='H100')
def run_train():
  # good practice
  dataset = get_dataset(file_path)
  ....

对于存储在 Unity Catalog 卷中的基于文件的数据,请使用来自 UCVolumeDatasetserverless_gpu.data,它可借助本地缓存流式传输文件,并自动在各个 rank 和工作进程之间对文件进行分区。 若要将分布式训练的检查点保存到卷中,请使用 UCVolumeWriterUCVolumeReader。 请参阅在 AI Runtime 上加载数据模型检查点

了解详细信息

有关 API 参考,请参阅 无服务器 GPU Python API 文档。