shard_dataloader
paddle.distributed.shard_dataloader(dataloader, meshes, input_keys=None, shard_dims=None, is_dataset_splitted=False)
Converts a dataloader written from the single-card (single-device) perspective into one with a distributed perspective. Compared with an ordinary dataloader, it provides two capabilities: 1. If shard_dims is not None, the dataloader is split along shard_dims for data parallelism. 2. The outputs of the dataloader are given distributed attributes, i.e. ordinary tensors become distributed tensors.
Parameters
dataloader (paddle.io.DataLoader) - The dataloader from the single-card perspective.
meshes (ProcessMesh|list|tuple) - The mesh(es) used to shard the dataloader. It can be a ProcessMesh or a list; if it is a list, different inputs are placed on different meshes.
input_keys (list|tuple, optional) - If the dataloader yields a dict of tensors, input_keys are the keys of that dict, indicating which tensor is placed on which mesh, in one-to-one correspondence with meshes (see the additional sketch after the code example). Default: None, meaning the dataloader does not yield a dict.
shard_dims (str|int|list|tuple, optional) - The mesh dimension(s) along which the dataloader is sharded. Default: None, meaning the dataloader is not sharded; when data parallelism is used, this argument normally must be set.
is_dataset_splitted (bool, optional) - Whether the dataset has already been split according to the data-parallel ranks. Default: False.
Returns
ShardDataloader: a dataloader object with a distributed perspective.
Code Example
>>> import os
>>> import numpy as np
>>> import paddle
>>> import paddle.distributed as dist
>>> from paddle.io import BatchSampler, DataLoader, Dataset
>>> mesh0 = dist.ProcessMesh([[0, 1], [2, 3]], dim_names=['x', 'y'])
>>> mesh1 = dist.ProcessMesh([[4, 5], [6, 7]], dim_names=['x', 'y'])
>>> paddle.seed(1024)
>>> np.random.seed(1024)
>>> class RandomDataset(Dataset):
...     def __init__(self, seq_len, hidden, num_samples=8):
...         super().__init__()
...         self.seq_len = seq_len
...         self.hidden = hidden
...         self.num_samples = num_samples
...         self.inputs = [np.random.uniform(size=[self.seq_len, self.hidden]).astype("float32") for _ in range(num_samples)]
...         self.labels = [np.array(index, dtype="float32") for index in range(num_samples)]
...     def __getitem__(self, index):
...         return self.inputs[index], self.labels[index]
...     def __len__(self):
...         return self.num_samples
>>> class MlpModel(paddle.nn.Layer):
...     def __init__(self):
...         super(MlpModel, self).__init__()
...         self.w0 = dist.shard_tensor(
...             self.create_parameter(shape=[8, 8]),
...             mesh0, [dist.Replicate(), dist.Shard(1)])
...         self.w1 = dist.shard_tensor(
...             self.create_parameter(shape=[8, 8]),
...             mesh1, [dist.Replicate(), dist.Shard(0)])
...     def forward(self, x):
...         y = paddle.matmul(x, self.w0)
...         y = dist.reshard(y, mesh1, [dist.Shard(0), dist.Shard(2)])
...         z = paddle.matmul(y, self.w1)
...         return z
>>> model = MlpModel()
>>> dataset = RandomDataset(4, 8)
>>> sampler = BatchSampler(
...     dataset,
...     batch_size=2,
... )
>>> dataloader = DataLoader(
...     dataset,
...     batch_sampler=sampler,
... )
>>> dist_dataloader = dist.shard_dataloader(
...     dataloader=dataloader,
...     meshes=[mesh0, mesh1],
...     shard_dims="x"
... )
>>> opt = paddle.optimizer.AdamW(learning_rate=0.001, parameters=model.parameters())
>>> dist_opt = dist.shard_optimizer(opt)
>>> def loss_fn(logits, label):
...     # logits: [bs, seq_len, hidden], label: [bs]
...     loss = paddle.nn.MSELoss(reduction="sum")
...     logits = paddle.sum(logits, axis=[1, 2])
...     return loss(logits, label)
>>> RUN_STATIC = int(os.environ.get("RUN_STATIC", "0"))
>>> def run_dynamic():
...     for step, (input, label) in enumerate(dist_dataloader()):
...         logits = model(input)
...         loss = loss_fn(logits, label)
...         print("step:{}, loss:{}".format(step, loss))
...         loss.backward()
...         dist_opt.step()
...         dist_opt.clear_grad()
>>> def run_static():
...     dist_model = dist.to_static(
...         model, dist_dataloader, loss_fn, opt
...     )
...     dist_model.train()
...     for step, (input, label) in enumerate(dist_dataloader()):
...         print("label:", label)
...         loss = dist_model(input, label)
...         print("step:{}, loss:{}".format(step, loss))
>>> if RUN_STATIC == 0:
...     run_dynamic()
... else:
...     run_static()
>>> # This example needs to be executed in a multi-card environment, e.g.:
>>> # export CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7
>>> # RUN_STATIC=1 python -u -m paddle.distributed.launch --gpus "0,1,2,3,4,5,6,7" {test_case}.py
>>> # RUN_STATIC=0 python -u -m paddle.distributed.launch --gpus "0,1,2,3,4,5,6,7" {test_case}.py
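The example above covers the common case where each batch is a tuple. The following is a minimal, hypothetical sketch of the dict case described by the input_keys parameter. It reuses the imports, mesh0, and mesh1 from the example above; the dataset, the key names "inputs" and "label", and the mesh placement are illustrative assumptions, and it further assumes that the distributed dataloader yields batches with the same dict structure as the underlying dataloader.

>>> # Hypothetical sketch: a dataset whose iteration result is a dict, so
>>> # input_keys tells shard_dataloader which key is placed on which mesh.
>>> class RandomDictDataset(Dataset):
...     def __init__(self, seq_len, hidden, num_samples=8):
...         super().__init__()
...         self.num_samples = num_samples
...         self.inputs = [np.random.uniform(size=[seq_len, hidden]).astype("float32") for _ in range(num_samples)]
...         self.labels = [np.array(i, dtype="float32") for i in range(num_samples)]
...     def __getitem__(self, index):
...         return {"inputs": self.inputs[index], "label": self.labels[index]}
...     def __len__(self):
...         return self.num_samples
>>> dict_dataloader = DataLoader(RandomDictDataset(4, 8), batch_size=2)
>>> dist_dict_dataloader = dist.shard_dataloader(
...     dataloader=dict_dataloader,
...     meshes=[mesh0, mesh1],           # "inputs" placed on mesh0, "label" on mesh1
...     input_keys=["inputs", "label"],  # one key per entry of meshes
...     shard_dims="x",                  # split along the data-parallel mesh dimension
... )
>>> for step, batch in enumerate(dist_dict_dataloader()):
...     inputs, label = batch["inputs"], batch["label"]

As noted in the parameter list, shard_dims also accepts a mesh axis index (e.g. 0 instead of "x"), or a list/tuple when the entries of meshes should be sharded along different dimensions.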