GPU 分布式训练——PyTorch 版
type
Post
date
Mar 31, 2025
summary
在当今 AI 飞速发展的时代,模型变得越来越庞大复杂,单块 GPU 的计算能力已逐渐无法满足训练需求。这时,GPU 分布式训练应运而生,它像一个强大的引擎,通过协同多块 GPU 甚至多台机器上的 GPU,将原本漫长的训练过程大幅缩短。这不仅能让我们在更短的时间内迭代模型,探索更广阔的参数空间,也为训练超大规模模型(如大型语言模型和高分辨率图像模型)提供了可能
category
学习笔记
tags
分布式训练
DP
DDP
password
URL
Property
Jun 20, 2025 06:39 AM
分布式训练方式
- 数据并行 (Data Parallelism):
- 模型在每个 GPU 上都完整复制一份,然后将每个批次的训练数据分成多份给不同的 GPU,每个 GPU 计算出自己部分的梯度后,所有其他 GPU 将梯度提交汇总到主 GPU(通常是 0 号 GPU) 进行计算(通常是求平均),然后主 GPU 再将更新好的梯度和参数分发同步给其他 GPU,继续下一批次的训练。
- 优点: 实现相对简单,扩展性好,适用于模型大小适中但数据量很大的情况。
- 缺点: 当模型非常大,无法完全放入单个 GPU 的显存时,就无法使用这种方法。通信开销主要集中在梯度同步。
- 模型并行 (Model Parallelism):
- 当模型太大导致单个 GPU 的显存无法容纳时,可以将模型的不同层或不同部分放置在不同的 GPU 上(进行模型拆分),将同一份数据分别输入不同的 GPU 进行模型训练。
- 优点: 可以训练非常大的模型,突破单个 GPU 的显存限制。
- 缺点: 实现复杂,需要仔细划分模型,以确保负载均衡和最小化通信开销。通信开销通常发生在层与层之间的数据传输。
分布式训练框架
DeepSpeed(主流)
DeepSpeed 是由微软开发的一个深度学习优化库,旨在让训练和部署万亿参数级别的模型变得更简单、更高效。它的核心目标是解决大规模模型训练中的显存瓶颈和计算效率问题。DeepSpeed 核心特点和技术如下。
- ZeRO (Zero Redundancy Optimizer) 优化器: 是 DeepSpeed 解决显存问题的核心。
- ZeRO-DP (ZeRO Data Parallelism): 将优化器状态 (optimizer states)、梯度 (gradients) 和模型参数 (model parameters) 分片(sharding)到不同的 GPU 上,而不是在每个 GPU 上都保留一份完整的副本。
- ZeRO Stage 1: 仅分片优化器状态。
- ZeRO Stage 2: 分片优化器状态和梯度。
- ZeRO Stage 3: 分片优化器状态、梯度和模型参数。最激进,节省最多显存。
- ZeRO-Offload: 将部分优化器状态和梯度卸载到 CPU 内存甚至 NVMe 硬盘,进一步释放 GPU 显存。
- ZeRO-Infinity: 结合了 ZeRO Stage 3 和 NVMe Offload,旨在支持训练前所未有的超大模型(如万亿参数)。
对比图待更新
- 混合精度训练 (Mixed Precision Training): 支持使用 FP16 (半精度浮点数) 进行训练,在不损失模型精度的情况下,加速训练并减少显存占用。
- 大批量训练优化: 提供多种策略来支持更大的有效批量大小,如梯度累积 (Gradient Accumulation) 和 AdaScale。
- DeepSpeed MoE (Mixture of Experts): 专门为 Mixture-of-Experts (MoE) 模型设计的优化,这些模型通过稀疏激活来扩展模型规模,DeepSpeed 提供了高效的路由和通信机制来训练这类模型。
- 内存高效的注意力机制 (Memory-efficient Attention): 针对 Transformer 模型中的注意力机制进行优化,降低其显存消耗。
Megatron-LM
Megatron-LM 是由 NVIDIA 开发的一个项目,专注于高效地训练超大型 Transformer 语言模型。它通过高度优化的并行策略来解决训练这些巨型模型所带来的计算和显存挑战。Megatron-LM 不仅仅是一个框架,它更像是一个用于大模型训练的“基础设施”和方法论。核心特点和技术如下。
- 张量并行 (Tensor Parallelism):
- 这是 Megatron-LM 的标志性技术之一。它不将整个层分割到不同设备上,而是将**单个操作(如矩阵乘法)**的输入或输出张量沿某个维度进行分割。每个设备只计算这个操作的一部分结果。例如,一个大的矩阵乘法 C=A×B 可以被分割成多个小的矩阵乘法,每个 GPU 处理 A 的一部分和 B 的一部分,最终将各自的结果合并得到完整的 C。
- 流水线并行 (Pipeline Parallelism):
- 是模型并行的一种特殊形式。它将模型分成多个阶段(或层组),每个阶段分配给一个 GPU。数据(或微批次)以流水线的方式流经这些阶段,从而允许不同 GPU 同时处理不同阶段的不同微批次,减少 GPU 的空闲时间。
- 数据并行 (Data Parallelism):
- Megatron-LM 也支持传统的数据并行,通常与张量并行和流水线并行结合使用,以进一步扩展到更多的 GPU。例如,可以在每个节点内使用张量并行和流水线并行,然后在节点之间使用数据并行。
- 高效的通信优化: 为各种并行策略设计了高度优化的通信模式,以最小化数据传输延迟。
- 混合精度训练: 同样支持 FP16 和 BF16 混合精度训练,以提高计算速度和减少显存。
如何选择
训练方式如何选择
- 对于小模型训练(参数量通常在亿级以下,单卡显存足够存放整个模型),最常见且推荐的分布式训练方式是数据并行 (Data Parallelism)。
- 对于大模型训练(参数量从数十亿到万亿级,单个 GPU 无法存放整个模型),由于单个 GPU 无法容纳完整的模型参数、梯度和优化器状态,因此需要采用更复杂的混合并行策略,将数据并行与模型并行相结合。
- 数据并行 + 流水线并行:流水线并行将模型的不同层切分到不同的 GPU 组(或设备)上。每个 GPU 组只负责模型的一部分计算,数据以流水线的方式流经这些 GPU 组。在此基础上,数据并行可以应用于每个流水线阶段的副本之间,进一步扩展训练规模。例如,可以将模型分成 4 个流水线阶段,每个阶段在 8 个 GPU 上进行数据并行。
- 数据并行 + ZeRO 优化器
- 数据并行 + 流水线并行 + 张量并行 + ZeRO 优化:这是训练万亿参数级别超大模型的最完整和复杂的组合。
训练框架如何选择
训练框架通常用于大模型训练。
DeepSpeed 和 Megatron-LM 并非相互排斥的竞争者,而是可以相互补充的。
- Megatron-LM 提供了基础的并行策略(张量并行、流水线并行),并专注于如何有效地切分模型和数据来运行 Transformer。
- DeepSpeed 提供了强大的内存优化(ZeRO)和额外的优化功能。
因此,常见的做法是将两者结合使用:
- 使用 Megatron-LM 的张量并行和流水线并行来结构化模型的计算流。
- 在每个并行组内部(例如,在 Megatron-LM 的数据并行维度),使用 DeepSpeed 的 ZeRO 优化来进一步减少显存占用,从而允许训练更大的模型或使用更大的批量。
PyTorch + DeepSpeed/Megatron-LM/FairScale/FSDP: PyTorch 配合这些优化库,是大模型训练的主流选择。DeepSpeed 提供了 ZeRO 等先进的内存优化,而 Megatron-LM 则在张量并行和流水线并行方面表现出色。PyTorch 原生的 Fully Sharded Data Parallel (FSDP) 也是一种类似 ZeRO 的参数分片数据并行方案。
小模型数据并行实现(PyThorch 版)
先来回顾一下数据并行的核心思想:
- 模型复制: 在每个可用的 GPU 上复制一份完整的模型。
- 数据分发: 将一个大的训练批次 (batch) 分割成多个小批次 (mini-batches),每个小批次被发送到一个不同的 GPU 进行处理。
- 独立计算: 每个 GPU 独立地对分到的数据执行前向传播和反向传播,计算出各自的梯度。
- 梯度同步: 所有 GPU 计算出的梯度需要被聚合(通常是求平均),然后用这个聚合后的梯度来更新所有模型副本的参数,确保所有 GPU 上的模型参数保持一致。
PyThorch 实现并行数据并行主要有两种方式:DataParallel(DP)、DistributedDataParallel(DDP)
DataParallel(DP)
DataParallel 是 PyTorch 提供的一种单机多卡数据并行方案。它实现起来非常简单,但有一些显著的性能限制。
工作原理
- 主 GPU 负载: DataParallel 会将整个模型放到一个主 GPU (默认是 cuda:0) 上,然后在这个主 GPU 上进行计算和汇总。
- 数据分发: 输入数据被切分后发送到所有可见的 GPU(包括主 GPU)。
- 子 GPU 计算: 其他 GPU(子 GPU)从主 GPU 接收模型副本和数据,进行前向传播和反向传播,然后将计算出的梯度发送回主 GPU。
- 主 GPU 聚合和更新: 主 GPU 负责接收所有子 GPU 传回的梯度,进行聚合,更新模型参数,然后将更新后的参数广播给所有子 GPU。
示例代码
import torch import torch.nn as nn from torch.optim import SGD # 1. 定义一个简单的模型 class SimpleModel(nn.Module): def __init__(self): super(SimpleModel, self).__init__() self.linear = nn.Linear(10, 1) # 输入特征10,输出特征1 def forward(self, x): return self.linear(x) # 确保有多个 GPU 可用,否则 DP 不会有效果 if torch.cuda.device_count() > 1: print(f"检测到 {torch.cuda.device_count()} 个 GPU,使用 DataParallel。") # 2. 实例化模型 model = SimpleModel() # 3. 使用 DataParallel 包装模型 model = nn.DataParallel(model) # 默认使用所有可见的 GPU # 将模型移动到第一个 GPU (主 GPU) # 虽然 DataParallel 会自动处理,但为了明确,我们通常会将其放到设备上 model.to('cuda:0') # 4. 定义损失函数和优化器 criterion = nn.MSELoss() optimizer = SGD(model.parameters(), lr=0.01) # 5. 模拟训练循环 for epoch in range(10): # 模拟生成一个大批次数据 # 注意:整个批次数据仍然需要先放在主 GPU 上,然后 DataParallel 会自动切分 input_data = torch.randn(64, 10).to('cuda:0') # 假设 batch_size = 64 labels = torch.randn(64, 1).to('cuda:0') optimizer.zero_grad() outputs = model(input_data) # 前向传播 loss = criterion(outputs, labels) loss.backward() # 反向传播 optimizer.step() # 参数更新 print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}") else: print("GPU 数量不足,无法演示 DataParallel。请确保至少有2个GPU。")
最后总结
- 优点:简单易用,只需一行代码
model = nn.DataParallel(model)
即可实现多 GPU 训练。
- 缺点 (为何不推荐):
- 负载不均衡: 主 GPU (通常是
cuda:0
) 承担了所有梯度的聚合、参数更新和将更新后的参数广播给所有子 GPU的任务。这导致主 GPU 的显存和计算负载远高于其他 GPU,使得cuda:0
很容易成为性能瓶颈,甚至出现 OOM (显存溢出)。 - 效率低下:
- GIL (Global Interpreter Lock) 限制:
DataParallel
内部使用了 Python 的多线程,受到 GIL 的限制,无法真正实现并行计算,尤其是在数据传输和模型更新阶段。 - 通信效率低: 每次梯度聚合和模型更新,主 GPU 都需要等待所有子 GPU 完成计算并将梯度传回,然后再将更新后的参数广播出去,导致大量数据传输和同步开销。
- 无法跨机器:
DataParallel
仅支持单机多 GPU,无法扩展到多台机器进行分布式训练。
DistributedDataParallel(DDP)
DistributedDataParallel 是 PyTorch 官方推荐的分布式训练方式,它解决了 DataParallel 的所有主要缺点,支持单机多卡和多机多卡训练,并且具有更高的效率和更好的扩展性。
工作原理
- 进程级别并行: DDP 为每个 GPU 启动一个独立的进程。每个进程都维护一个独立的模型副本。
- 数据分发: 每个进程只负责处理自己分到的那一部分数据。
- 独立计算: 每个进程独立地进行前向传播和反向传播,计算各自的梯度。
- 分布式梯度同步:
- 在反向传播过程中,当一个参数的梯度计算完成后,DDP 会立即启动一个异步的 All-Reduce 操作,将所有进程中该参数的梯度进行汇总(求平均)。
- 这个 All-Reduce 操作是去中心化的,每个进程都参与梯度的传输和聚合,没有主 GPU 瓶颈。
- 由于是异步进行,通信和计算可以部分重叠,大大提高了效率。
- 参数更新: 每个进程收到聚合后的梯度后,独立地使用自己的优化器更新自己模型副本的参数。由于所有进程都使用相同的聚合梯度进行更新,因此可以保证所有模型副本的参数保持同步。
示例代码
DDP
的使用稍微复杂一些,因为它涉及到多进程的设置,需要初始化进程组 (Process Group)。# train_ddp.py (这是一个Python脚本) import os import torch import torch.nn as nn import torch.optim as optim import torch.distributed as dist from torch.nn.parallel import DistributedDataParallel as DDP from torch.utils.data import DataLoader, Dataset from torch.utils.data.distributed import DistributedSampler # 0. 定义一个简单的Dataset class SimpleDataset(Dataset): def __init__(self, num_samples): self.data = torch.randn(num_samples, 10) self.labels = torch.randn(num_samples, 1) def __len__(self): return len(self.data) def __getitem__(self, idx): return self.data[idx], self.labels[idx] # 1. 定义模型 class SimpleModel(nn.Module): def __init__(self): super(SimpleModel, self).__init__() self.linear = nn.Linear(10, 1) def forward(self, x): return self.linear(x) # 2. DDP 训练函数 def train_ddp(rank, world_size): # 2.1 初始化进程组 # 'env://' 表示从环境变量中获取 master_addr 和 master_port # 这通常由 torchrun/torch.distributed.launch 自动设置 dist.init_process_group(backend='nccl', rank=rank, world_size=world_size) # 对于每个进程,设置其对应的 GPU 设备 torch.cuda.set_device(rank) device = torch.device(f"cuda:{rank}") # 2.2 实例化模型并包装为 DDP model = SimpleModel().to(device) model = DDP(model, device_ids=[rank]) # 告知 DDP 这个进程使用哪个 GPU # 2.3 定义损失函数和优化器 criterion = nn.MSELoss() optimizer = optim.SGD(model.parameters(), lr=0.01) # 2.4 数据加载器 (使用 DistributedSampler) dataset = SimpleDataset(num_samples=1000) # 假设有1000个样本 # DistributedSampler 会确保每个进程只看到一部分数据,且数据不重复 sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=True) dataloader = DataLoader(dataset, batch_size=32, sampler=sampler) # 2.5 训练循环 print(f"Rank {rank} 启动训练...") for epoch in range(10): # 在每个 epoch 开始时,调用 set_epoch 来 shuffle 数据 sampler.set_epoch(epoch) total_loss = 0 for batch_idx, (inputs, labels) in enumerate(dataloader): inputs, labels = inputs.to(device), labels.to(device) optimizer.zero_grad() outputs = model(inputs) loss = criterion(outputs, labels) loss.backward() optimizer.step() total_loss += loss.item() avg_loss = total_loss / len(dataloader) if rank == 0: # 通常只在主进程打印日志 print(f"Epoch {epoch+1}, Rank {rank}, Avg Loss: {avg_loss:.4f}") # 2.6 清理进程组 dist.destroy_process_group() print(f"Rank {rank} 训练完成并清理进程组。") if __name__ == "__main__": # DDP 训练通常由外部工具(如 torchrun)启动, # 这些工具会设置 LOCAL_RANK, RANK, WORLD_SIZE 等环境变量 # 我们这里只是演示,实际运行时这些值会由 torchrun 传递 rank = int(os.environ.get('RANK', '0')) world_size = int(os.environ.get('WORLD_SIZE', '1')) # 默认单卡模式 # 如果是本地直接运行,且有多个GPU,可以手动设置这些变量进行测试 # 注意:直接运行需要手动管理进程,不推荐用于实际训练 if world_size > 1: train_ddp(rank, world_size) else: print("请使用 torchrun (或 torch.distributed.launch) 启动 DDP 训练。") print("示例命令: torchrun --nproc_per_node=2 train_ddp.py") # 也可以手动调用,但需要在代码里处理多进程启动逻辑 # from multiprocessing import Process # world_size = torch.cuda.device_count() # processes = [] # for i in range(world_size): # p = Process(target=train_ddp, args=(i, world_size)) # p.start() # processes.append(p) # for p in processes: # p.join()
一些参数解释:
# 初始化进程组 dist.init_process_group( backend='nccl', # 进程间通信的后端; # 'nccl':(默认值)性能最好,利用 NVIDIA NCCL 库; # 'gloo':CPU 后端,或者当 NCCL 不可用时的备选方案。 # 'mpi':多用途后端,但配置可能更复杂。 init_method='env://', # 初始化方法:指定如何发现集群中所有进程的地址和端口 # 'env://':(默认值)最常用。从环境变量(MASTER_ADDR, MASTER_PORT, RANK, WORLD_SIZE)中获取信息。torchrun 默认使用这种方式。 # 'tcp://<master_addr>:<master_port>':手动指定主节点的地址和端口。 # 'file:///path/to/shared_file':通过共享文件进行同步。 world_size=len(self.gpu_ids), # 参与分布式训练的进程总数;由环境变量 WORLD_SIZE 提供 rank=rank # 当前进程在整个分布式训练任务中的全局唯一排名;由环境变量 RANK 提供 ) # 环境变量 MASTER_ADDR: 主节点的 IP 地址。默认值:localhost (当 init_method='env://' 且未显式设置时)。 MASTER_PORT: 主节点的 IP 端口号。默认值:29500 (当 init_method='env://' 且未显式设置时)。 WORLD_SIZE: 进程总数;通常等于用于训练的 GPU 数量。默认值:1。 RANK: 当前进程在整个集群中的全局排名。默认值:0。 LOCAL_RANK: 当前进程在当前节点(机器)上的本地排名。默认值:通常为 0。 NODE_RANK: 当前节点在整个集群中的排名。默认值:0。 NCCL_DEBUG: 设置 NCCL 的调试级别。默认值:空(无调试输出)。 NCCL_IB_DISABLE: 禁用 InfiniBand 网络。默认值:0 (不禁用)。 NCCL_P2P_DISABLE: 禁用 GPU 之间的 Peer-to-Peer (P2P) 通信。默认值:0 (不禁用)。 NCCL_TOPO_DUMP_DIR: 指定 NCCL 拓扑文件输出目录。默认值:空(不生成拓扑文件)。 GLOO_SOCKET_IFNAME: 当使用 Gloo 后端时,指定使用的网络接口名称。无默认值,需要根据系统配置指定。 # torchrun 命令行参数 --nproc_per_node: 每个节点(机器)上启动的进程(通常是 GPU)数量。默认值:1。 --nnodes: 参与训练的总节点(机器)数量。默认值:1。 --node_rank: 当前节点在整个集群中的排名(从 0 开始)。默认值:0。 --master_addr: 主节点(用于通信协调的节点)的 IP 地址。默认值:127.0.0.1 (仅限单机)。 --master_port: 主节点的 IP 端口号。默认值:29500。 --rdzv_backend: rendezvous(集合点)后端,用于多节点进程发现和同步。默认值:static (或 c10d,具体取决于 PyTorch 版本)。 --rdzv_endpoint: rendezvous 后端使用的地址和端口,格式通常为 "<master_addr>:<master_port>"。默认值:空(与 master_addr 和 master_port 配合使用或由 rdzv_backend 决定)。 --rdzv_id: rendezvous 唯一 ID,用于区分不同的分布式训练任务。默认值:自动生成。 --standalone: 启用独立模式,所有 rendezvous 信息在本地处理(通常用于单节点多 GPU 训练)。默认值:False (即不启用)。 --max_restarts: 允许进程失败后最大重启次数。默认值:0。 --log_dir: 指定日志文件保存目录。默认值:./ (当前目录)。 命令行参数是用户设置分布式训练配置的主要方式,而这些参数的大部分最终会转化或映射为环境变量,供 PyTorch 进程内部读取和使用。
运行命令:
# 单机多卡 (例如,2个GPU) torchrun --nproc_per_node=2 train_ddp.py # 或者 (旧版本) # python -m torch.distributed.launch --nproc_per_node=2 train_ddp.py # 多机多卡 (例如,两台机器,每台4个GPU,总共8个GPU) # 假设机器A的IP是 192.168.1.100 # 在机器A上运行: # master_addr 指向主进程的IP # master_port 通常是一个未被占用的端口 # node_rank=0 表示这是第一台机器 torchrun \ --nnodes=2 \ --node_rank=0 \ --master_addr="192.168.1.100" \ --master_port="29500" \ --nproc_per_node=4 \ train_ddp.py # 在机器B上运行: # node_rank=1 表示这是第二台机器 torchrun \ --nnodes=2 \ --node_rank=1 \ --master_addr="192.168.1.100" \ --master_port="29500" \ --nproc_per_node=4 \ train_ddp.py
最后总结
- 优点 (为何推荐)
- 性能高效:
- 无主 GPU 瓶颈: 每个进程独立运行,梯度聚合使用去中心化的 All-Reduce 算法,所有 GPU 平衡参与计算和通信,避免了单点瓶颈。
- 异步通信: 梯度聚合可以与反向传播的计算重叠,减少等待时间。
- 多进程: 避免了 Python GIL 的限制,实现真正的并行。
- 扩展性强: 支持单机多卡和多机多卡训练,可以轻松扩展到数百甚至数千个 GPU。
- 内存效率更高 (相对 DP): 每个 GPU 上的进程只维护自己的模型副本,而不需要主进程在 CPU 或单个 GPU 上维护所有子进程的中间状态。虽然每个 GPU 仍然需要完整的模型副本,但在处理大数据量时,DDP 对每个 GPU 的显存压力更均衡。
- 社区和生态: DDP 是 PyTorch 官方和社区的主流推荐,有更活跃的开发和支持。
- 缺点
- 实现稍复杂: 需要手动设置进程组,并使用 DistributedSampler 来处理数据加载,相比 DataParallel 增加了初始化代码。
- 调试相对复杂: 多进程环境下的调试可能比单进程多线程的 DataParallel 更具挑战性。
对比选择
特性 | torch.nn.DataParallel (DP) | torch.nn.parallel.DistributedDataParallel (DDP) |
易用性 | 极简单,一行代码 | 稍复杂,需要进程组初始化和 DistributedSampler |
并行方式 | 单进程多线程 | 多进程多 GPU |
通信方式 | 主 GPU 汇总/广播 | 去中心化 All-Reduce |
性能 | 低效,主 GPU 瓶颈 | 高效,负载均衡,通信与计算重叠 |
扩展性 | 仅限单机多卡 | 单机多卡和多机多卡 |
推荐程度 | 不推荐使用 | 强烈推荐使用 |
使用场景 | 极少使用,仅限代码快速验证 | 绝大多数分布式训练场景 |
对于小模型的数据并行训练,甚至所有 PyTorch 的分布式训练,torch.nn.parallel.DistributedDataParallel (DDP) 都是首选和标准做法。虽然它在代码上比 DataParallel 多了几行初始化,但这些额外的努力会换来显著的性能提升、更好的可扩展性以及更稳定的训练体验。