在 AI Runtime 上加载用于机器学习和深度学习工作负载的训练数据。 所有数据访问都通过 Unity 目录:使用 Spark Connect 从 Delta 表读取表格数据,以及大型数据集和非结构化文件的 Unity 目录卷,例如图像、音频和文本。 对于多轮训练,请将数据缓存到本地的 /tmp,以加快访问速度。 若要了解如何使用 Spark Python API 加载和转换数据,请参阅本教程。
注释
需要 Unity Catalog。 AI 运行时上的所有数据访问都通过 Unity 目录。 表和卷必须在 Unity 目录中注册,并可供用户或服务主体访问。
加载表格数据
使用 Spark Connect 从 Delta 表加载表格机器学习数据。
对于单节点训练,可以使用 PySpark 方法toPandas()将 Apache Spark 数据帧转换为 pandas 数据帧,然后使用 PySpark 方法to_numpy()选择性地转换为 NumPy 格式。
注释
Spark Connect 将分析和名称解析推迟到执行时间,这可能会更改代码的行为。 请参阅 “将 Spark 连接与 Spark 经典版进行比较”。
Spark Connect 支持大多数 PySpark API,包括 Spark SQL、Spark 上的 Pandas API、结构化流式处理和 MLlib(基于数据帧)。 有关最新支持的 API,请参阅 PySpark API 参考文档 。
有关其他限制,请参阅 无服务器计算限制。
使用卷加载大型Delta表
对于太大而无法通过toPandas()转换的大型 Delta 表,您可以将数据导出到 Unity Catalog 卷,并直接使用 PyTorch 或 Hugging Face 进行加载:
# Step 1: Export the Delta table to Parquet files in a UC volume
output_path = "/Volumes/catalog/schema/my_volume/training_data"
spark.table("catalog.schema.my_table").write.mode("overwrite").parquet(output_path)
# Step 2: Load the exported data directly using Hugging Face datasets
from datasets import load_dataset
dataset = load_dataset("parquet", data_files="/Volumes/catalog/schema/my_volume/training_data/*.parquet")
此方法可避免在训练期间产生 Spark 开销,并且适用于单 GPU 和分布式训练工作流。
通过 UCVolumeDataset 从卷中加载非结构化数据
对于存储在 Unity 目录卷中的非结构化数据(如图像、音频和文本文件),请使用 UCVolumeDataset 包中的 serverless_gpu.data 文件。
UCVolumeDataset 是一个 PyTorch IterableDataset,用于在首次访问时将每个文件从卷复制到高速本地缓存,并返回缓存后的本地文件路径。 它处理了原本需要你手动实现的性能和分发相关问题:
- 本地缓存。 文件在首次访问时会从 FUSE 挂载点复制到本地缓存目录,之后从缓存中提供,因此多轮次训练不会重新读取该卷。
- 自动分区。 当初始化
torch.distributed时,文件会先在各个 rank 之间进行划分,再进一步分配给DataLoader个工作进程,因此每个(rank, worker)配对都会收到一个互不重叠的数据切片,无需额外设置。
注释
UCVolumeDataset 和 serverless_gpu.data.DataLoader 需要 GPU 环境 5 或更高版本。
UCVolumeDataset 生成原始本地文件路径。 若要将这些文件解码为张量,请再用第二个 IterableDataset 将其包裹起来,使其接收路径流并应用你的解析逻辑。 这样可以将 I/O 和解析相关的关注点分开。
from serverless_gpu.data import UCVolumeDataset
from torch.utils.data import IterableDataset
from PIL import Image
import torchvision.transforms.functional as TF
class ImageDataset(IterableDataset):
"""Decodes each cached file path from UCVolumeDataset into a tensor."""
def __init__(self, path_dataset: UCVolumeDataset):
self._path_dataset = path_dataset
def __iter__(self):
for local_path in self._path_dataset:
image = Image.open(local_path).convert("RGB")
yield TF.to_tensor(image)
path_dataset = UCVolumeDataset("/Volumes/catalog/schema/my_volume/images")
dataset = ImageDataset(path_dataset)
封装器接收的是已经缓存的本地路径,因此解析步骤绝不会访问 FUSE 挂载点。 可以链接其他包装器进行扩充、标记化或筛选。
为获得最佳性能,请将 UCVolumeDataset 与 serverless_gpu.data.DataLoader 搭配使用,而不是使用默认的 PyTorch DataLoader。 它针对无服务器 GPU I/O 进行优化,并在 GPU 计算时同时提取和缓存文件。 请参阅 数据加载性能。
在修饰器中 @distributed 加载数据
使用 无服务器 GPU API 进行分布式训练时,在 @distributed 修饰器内移动数据加载代码。 数据集大小可以超出 pickle 允许的最大大小,因此建议在装饰器内生成数据集,如下所示:
from serverless_gpu import distributed
# This may cause a pickle error if the dataset is too large
dataset = get_dataset(file_path)
@distributed(gpus=8, gpu_type='H100')
def run_train():
# Load data inside the decorator to avoid pickle serialization issues
dataset = get_dataset(file_path)
...
当你在修饰器内部构造一个 UCVolumeDataset 时,它会在迭代时读取 torch.distributed 的 rank 信息,并自动将文件划分到各个 rank,因此对于基于文件的体数据,你不需要使用 DistributedSampler。
数据加载性能
/Workspace 和 /Volumes 目录托管在远程 Unity 目录存储上。 如果数据集存储在 Unity 目录中,则数据加载速度受可用网络带宽的限制。 如果要训练多个轮次,推荐的方法是使用 UCVolumeDataset,它会为你执行这种缓存:首次访问时,它会将每个文件复制到本地存储,后续读取则直接从本地副本提供。 对于存储卷中的数据集,优先使用这种方式,而不是手动使用 shutil.copytree,因为后者会预先复制整个目录树,即使训练只用到其中的一部分。
如果数据集很大,以下技术可以提高吞吐量:
使用
serverless_gpu.data.DataLoader并行获取。 这是一个针对无服务器 GPU I/O 调优的 torchDataLoader可直接替换使用的子类:num_workers的默认值为 6,prefetch_factor的默认值为 4(相比之下,PyTorch 中分别为 0 和 2),因此文件会在 GPU 计算的同时并发获取并缓存。 它还将每批提取计时记录到活动 MLflow 运行,这有助于发现数据加载瓶颈。from serverless_gpu.data import DataLoader loader = DataLoader( dataset, batch_size=32, pin_memory=True, # num_workers=6, by default # prefetch_factor=4, by default # raise num_workers to increase parallel reads, or prefetch_factor to deepen each worker's queue. )所有 rank 都必须使用相同的
num_workers值,因为UCVolumeDataset会使用跨越world_size × num_workers个槽位的全局步长对文件进行分区。 不匹配的值会导致文件重复或跳过。增加批大小。 较大的批量可以将每批次的数据加载开销分摊到更多样本上,并减少每一步的文件获取操作次数。 如果 GPU 内存是限制因素,请将更大的批大小与渐变累积相结合,以保留有效的批大小。
流式数据集
对于内存中不适合的非常大的数据集,请使用流式处理方法:
-
UCVolumeDataset来自serverless_gpu.data,用于从 Unity Catalog 卷中流式传输文件,并提供本地缓存和自动分布式分区。 请参阅使用UCVolumeDataset从卷加载非结构化数据。 - 用于自定义流式处理逻辑的 PyTorch IterableDataset。
- Hugging Face 数据集与 Hub 或卷上托管的数据集进行流式处理。
- 用于分布式批处理数据处理的 Ray 数据。