在 AI 运行时上加载数据

重要

用于单节点任务的 AI 运行时为 公共预览版。 多 GPU 工作负载的分布式训练 API 仍为 Beta 版。

在 AI Runtime 上加载用于机器学习和深度学习工作负载的训练数据。 所有数据访问都通过 Unity 目录:使用 Spark Connect 从 Delta 表读取表格数据,以及大型数据集和非结构化文件的 Unity 目录卷,例如图像、音频和文本。 对于多轮训练,请将数据缓存到本地的 /tmp,以加快访问速度。 若要了解如何使用 Spark Python API 加载和转换数据,请参阅本教程

注释

需要 Unity Catalog。 AI 运行时上的所有数据访问都通过 Unity 目录。 表和卷必须在 Unity 目录中注册,并可供用户或服务主体访问。

加载表格数据

使用 Spark Connect 从 Delta 表加载表格机器学习数据。

对于单节点训练,可以使用 PySpark 方法toPandas()将 Apache Spark 数据帧转换为 pandas 数据帧,然后使用 PySpark 方法to_numpy()选择性地转换为 NumPy 格式。

注释

Spark Connect 将分析和名称解析推迟到执行时间,这可能会更改代码的行为。 请参阅 “将 Spark 连接与 Spark 经典版进行比较”。

Spark Connect 支持大多数 PySpark API,包括 Spark SQL、Spark 上的 Pandas API、结构化流式处理和 MLlib(基于数据帧)。 有关最新支持的 API,请参阅 PySpark API 参考文档

有关其他限制,请参阅 无服务器计算限制

使用卷加载大型Delta表

对于太大而无法通过toPandas()转换的大型 Delta 表,您可以将数据导出到 Unity Catalog 卷,并直接使用 PyTorch 或 Hugging Face 进行加载:

# Step 1: Export the Delta table to Parquet files in a UC volume
output_path = "/Volumes/catalog/schema/my_volume/training_data"
spark.table("catalog.schema.my_table").write.mode("overwrite").parquet(output_path)
# Step 2: Load the exported data directly using Hugging Face datasets
from datasets import load_dataset

dataset = load_dataset("parquet", data_files="/Volumes/catalog/schema/my_volume/training_data/*.parquet")

此方法可避免在训练期间产生 Spark 开销,并且适用于单 GPU 和分布式训练工作流。

通过 UCVolumeDataset 从卷中加载非结构化数据

对于存储在 Unity 目录卷中的非结构化数据(如图像、音频和文本文件),请使用 UCVolumeDataset 包中的 serverless_gpu.data 文件。 UCVolumeDataset 是一个 PyTorch IterableDataset,用于在首次访问时将每个文件从卷复制到高速本地缓存,并返回缓存后的本地文件路径。 它处理了原本需要你手动实现的性能和分发相关问题:

  • 本地缓存。 文件在首次访问时会从 FUSE 挂载点复制到本地缓存目录,之后从缓存中提供,因此多轮次训练不会重新读取该卷。
  • 自动分区。 当初始化 torch.distributed 时,文件会先在各个 rank 之间进行划分,再进一步分配给 DataLoader 个工作进程,因此每个 (rank, worker) 配对都会收到一个互不重叠的数据切片,无需额外设置。

注释

UCVolumeDatasetserverless_gpu.data.DataLoader 需要 GPU 环境 5 或更高版本。

UCVolumeDataset 生成原始本地文件路径。 若要将这些文件解码为张量,请再用第二个 IterableDataset 将其包裹起来,使其接收路径流并应用你的解析逻辑。 这样可以将 I/O 和解析相关的关注点分开。

from serverless_gpu.data import UCVolumeDataset
from torch.utils.data import IterableDataset
from PIL import Image
import torchvision.transforms.functional as TF

class ImageDataset(IterableDataset):
    """Decodes each cached file path from UCVolumeDataset into a tensor."""

    def __init__(self, path_dataset: UCVolumeDataset):
        self._path_dataset = path_dataset

    def __iter__(self):
        for local_path in self._path_dataset:
            image = Image.open(local_path).convert("RGB")
            yield TF.to_tensor(image)

path_dataset = UCVolumeDataset("/Volumes/catalog/schema/my_volume/images")
dataset = ImageDataset(path_dataset)

封装器接收的是已经缓存的本地路径,因此解析步骤绝不会访问 FUSE 挂载点。 可以链接其他包装器进行扩充、标记化或筛选。

为获得最佳性能,请将 UCVolumeDatasetserverless_gpu.data.DataLoader 搭配使用,而不是使用默认的 PyTorch DataLoader。 它针对无服务器 GPU I/O 进行优化,并在 GPU 计算时同时提取和缓存文件。 请参阅 数据加载性能

在修饰器中 @distributed 加载数据

使用 无服务器 GPU API 进行分布式训练时,在 @distributed 修饰器内移动数据加载代码。 数据集大小可以超出 pickle 允许的最大大小,因此建议在装饰器内生成数据集,如下所示:

from serverless_gpu import distributed

# This may cause a pickle error if the dataset is too large
dataset = get_dataset(file_path)

@distributed(gpus=8, gpu_type='H100')
def run_train():
    # Load data inside the decorator to avoid pickle serialization issues
    dataset = get_dataset(file_path)
    ...

当你在修饰器内部构造一个 UCVolumeDataset 时,它会在迭代时读取 torch.distributed 的 rank 信息,并自动将文件划分到各个 rank,因此对于基于文件的体数据,你不需要使用 DistributedSampler

数据加载性能

/Workspace/Volumes 目录托管在远程 Unity 目录存储上。 如果数据集存储在 Unity 目录中,则数据加载速度受可用网络带宽的限制。 如果要训练多个轮次,推荐的方法是使用 UCVolumeDataset,它会为你执行这种缓存:首次访问时,它会将每个文件复制到本地存储,后续读取则直接从本地副本提供。 对于存储卷中的数据集,优先使用这种方式,而不是手动使用 shutil.copytree,因为后者会预先复制整个目录树,即使训练只用到其中的一部分。

如果数据集很大,以下技术可以提高吞吐量:

  • 使用 serverless_gpu.data.DataLoader 并行获取。 这是一个针对无服务器 GPU I/O 调优的 torch DataLoader 可直接替换使用的子类:num_workers 的默认值为 6,prefetch_factor 的默认值为 4(相比之下,PyTorch 中分别为 0 和 2),因此文件会在 GPU 计算的同时并发获取并缓存。 它还将每批提取计时记录到活动 MLflow 运行,这有助于发现数据加载瓶颈。

    from serverless_gpu.data import DataLoader
    
    loader = DataLoader(
        dataset,
        batch_size=32,
        pin_memory=True,
        # num_workers=6, by default
        # prefetch_factor=4, by default
        # raise num_workers to increase parallel reads, or prefetch_factor to deepen each worker's queue.
    )
    

    所有 rank 都必须使用相同的 num_workers 值,因为 UCVolumeDataset 会使用跨越 world_size × num_workers 个槽位的全局步长对文件进行分区。 不匹配的值会导致文件重复或跳过。

  • 增加批大小。 较大的批量可以将每批次的数据加载开销分摊到更多样本上,并减少每一步的文件获取操作次数。 如果 GPU 内存是限制因素,请将更大的批大小与渐变累积相结合,以保留有效的批大小。

流式数据集

对于内存中不适合的非常大的数据集,请使用流式处理方法: