快速开始-数据并行¶
数据并行(data parallelism)是大规模深度学习训练中常用的并行模式,它会在每个进程(设备)中维护完整的模型和参数, 但在每个进程上处理不同的数据。因此,数据并行非常适合单卡已经能够放得下完整的模型和参数,但希望通过并行来增大 全局数据(global batch)大小来提升训练的吞吐量。
本节将采用自定义卷积网络和 Paddle 内置的 CIFAR-10 数据集来介绍如何使用 paddle.distributed (paddle.distributed.fleet) 进行数据并行训练。
1.1 版本要求¶
在编写分布式训练程序之前,用户需要确保已经安装 GPU 版的 PaddlePaddle 2.3.0 及以上版本。
1.2 具体步骤¶
与单机单卡的普通模型训练相比,数据并行训练只需要按照如下 5 个步骤对代码进行简单调整即可:
导入分布式训练依赖包
初始化 Fleet 环境
构建分布式训练使用的网络模型
构建分布式训练使用的优化器
构建分布式训练使用的数据加载器
下面将逐一进行讲解。
1.2.1 导入分布式训练依赖包¶
导入飞桨分布式训练专用包 Fleet。
# 导入分布式专用 Fleet API
from paddle.distributed import fleet
# 导入分布式训练数据所需 API
from paddle.io import DataLoader, DistributedBatchSampler
# 设置 GPU 环境
paddle.set_device('gpu')
1.2.2 初始化 Fleet 环境¶
分布式初始化需要:
设置 is_collective 为 True,表示分布式训练采用 Collective 模式。
[可选] 设置分布式策略 cn_api_distributed_fleet_DistributedStrategy ,跳过将使用缺省配置。
# 选择不设置分布式策略
fleet.init(is_collective=True)
# 选择设置分布式策略
strategy = fleet.DistributedStrategy()
fleet.init(is_collective=True, strategy=strategy)
1.2.3 构建分布式训练使用的网络模型¶
只需要使用 fleet.distributed_model
对原始串行网络模型进行封装。
# 等号右边 model 为原始串行网络模型
model = fleet.distributed_model(model)
1.2.4 构建分布式训练使用的优化器¶
只需要使用 fleet.distributed_optimizer
对原始串行优化器进行封装。
# 等号右边 optimizer 为原始串行网络模型
optimizer = fleet.distributed_optimizer(optimizer)
1.2.5 构建分布式训练使用的数据加载器¶
由于分布式训练过程中每个进程可能读取不同数据,所以需要对数据集进行合理拆分后再进行加载。这里只需要在构建 cn_api_fluid_io_DataLoader 时, 设置分布式数据采样器 cn_api_io_cn_DistributedBatchSampler 即可。
# 构建分布式数据采样器
# 注意:需要保证 batch 中每个样本数据 shape 相同,若原尺寸不一,需进行预处理
train_sampler = DistributedBatchSampler(train_dataset, 32, shuffle=True, drop_last=True)
val_sampler = DistributedBatchSampler(val_dataset, 32)
# 构建分布式数据加载器
train_loader = DataLoader(train_dataset, batch_sampler=train_sampler, num_workers=2)
valid_loader = DataLoader(val_dataset, batch_sampler=val_sampler, num_workers=2)
1.3 完整示例代码¶
# -*- coding: UTF-8 -*-
import numpy as np
import matplotlib.pyplot as plt
import paddle
import paddle.nn.functional as F
from paddle.vision.transforms import ToTensor
# 一、导入分布式专用 Fleet API
from paddle.distributed import fleet
# 构建分布式数据加载器所需 API
from paddle.io import DataLoader, DistributedBatchSampler
# 设置 GPU 环境
paddle.set_device('gpu')
class MyNet(paddle.nn.Layer):
def __init__(self, num_classes=1):
super().__init__()
self.conv1 = paddle.nn.Conv2D(in_channels=3, out_channels=32, kernel_size=(3, 3))
self.pool1 = paddle.nn.MaxPool2D(kernel_size=2, stride=2)
self.conv2 = paddle.nn.Conv2D(in_channels=32, out_channels=64, kernel_size=(3,3))
self.pool2 = paddle.nn.MaxPool2D(kernel_size=2, stride=2)
self.conv3 = paddle.nn.Conv2D(in_channels=64, out_channels=64, kernel_size=(3,3))
self.flatten = paddle.nn.Flatten()
self.linear1 = paddle.nn.Linear(in_features=1024, out_features=64)
self.linear2 = paddle.nn.Linear(in_features=64, out_features=num_classes)
def forward(self, x):
x = self.conv1(x)
x = F.relu(x)
x = self.pool1(x)
x = self.conv2(x)
x = F.relu(x)
x = self.pool2(x)
x = self.conv3(x)
x = F.relu(x)
x = self.flatten(x)
x = self.linear1(x)
x = F.relu(x)
x = self.linear2(x)
return x
epoch_num = 10
batch_size = 32
learning_rate = 0.001
val_acc_history = []
val_loss_history = []
def train():
# 二、初始化 Fleet 环境
fleet.init(is_collective=True)
model = MyNet(num_classes=10)
# 三、构建分布式训练使用的网络模型
model = fleet.distributed_model(model)
opt = paddle.optimizer.Adam(learning_rate=learning_rate,parameters=model.parameters())
# 四、构建分布式训练使用的优化器
opt = fleet.distributed_optimizer(opt)
transform = ToTensor()
cifar10_train = paddle.vision.datasets.Cifar10(mode='train',
transform=transform)
cifar10_test = paddle.vision.datasets.Cifar10(mode='test',
transform=transform)
# 五、构建分布式训练使用的数据集
train_sampler = DistributedBatchSampler(cifar10_train, 32, shuffle=True, drop_last=True)
train_loader = DataLoader(cifar10_train, batch_sampler=train_sampler, num_workers=2)
valid_sampler = DistributedBatchSampler(cifar10_test, 32, drop_last=True)
valid_loader = DataLoader(cifar10_test, batch_sampler=valid_sampler, num_workers=2)
for epoch in range(epoch_num):
model.train()
for batch_id, data in enumerate(train_loader()):
x_data = data[0]
y_data = paddle.to_tensor(data[1])
y_data = paddle.unsqueeze(y_data, 1)
logits = model(x_data)
loss = F.cross_entropy(logits, y_data)
if batch_id % 1000 == 0:
print("epoch: {}, batch_id: {}, loss is: {}".format(epoch, batch_id, loss.numpy()))
loss.backward()
opt.step()
opt.clear_grad()
model.eval()
accuracies = []
losses = []
for batch_id, data in enumerate(valid_loader()):
x_data = data[0]
y_data = paddle.to_tensor(data[1])
y_data = paddle.unsqueeze(y_data, 1)
logits = model(x_data)
loss = F.cross_entropy(logits, y_data)
acc = paddle.metric.accuracy(logits, y_data)
accuracies.append(acc.numpy())
losses.append(loss.numpy())
avg_acc, avg_loss = np.mean(accuracies), np.mean(losses)
print("[validation] accuracy/loss: {}/{}".format(avg_acc, avg_loss))
val_acc_history.append(avg_acc)
val_loss_history.append(avg_loss)
if __name__ == "__main__":
train()
1.4 分布式启动¶
准备好分布式训练脚本后,就可以通过 paddle.distributed.launch 在集群上启动分布式训练:
-
- 单机多卡训练
-
假设只使用集群的一个节点,节点上可使用的 GPU 卡数为 4,那么只需要在节点终端运行如下命令:
python -m paddle.distributed.launch --gpus=0,1,2,3 train_with_fleet.py
-
- 多机多卡训练
-
假设集群包含两个节点,每个节点上可使用的 GPU 卡数为 4,IP 地址分别为 192.168.1.2 和 192.168.1.3,那么需要在两个节点的终端上分别运行如下命令:
在 192.168.1.2 节点运行:
python -m paddle.distributed.launch \ --gpus=0,1,2,3 \ --ips=192.168.1.2,192.168.1.3 \ train_with_fleet.py
在 192.168.1.3 节点运行相同命令:
python -m paddle.distributed.launch \ --gpus=0,1,2,3 \ --ips=192.168.1.2,192.168.1.3 \ train_with_fleet.py
相关启动问题,可参考 cn_api_distributed_launch 。