shard_dataloader

paddle.distributed.shard_dataloader(dataloader, meshes, input_keys=None, shard_dims=None, is_dataset_splitted=False) [source]

Converts a dataloader written from the single-card point of view into one with a distributed view. Compared with an ordinary dataloader, it provides two capabilities: 1. if shard_dims is not None, the dataloader is split along shard_dims for data parallelism; 2. distributed attributes are attached to the dataloader's outputs, i.e. ordinary tensors become distributed tensors.
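For the simplest case of pure data parallelism on a one-dimensional mesh, the call reduces to the following minimal sketch (the two-card mesh, the dim name "dp", and the dataset object are illustrative assumptions, not part of the example later in this page):

>>> import paddle.distributed as dist
>>> from paddle.io import DataLoader
>>> mesh = dist.ProcessMesh([0, 1], dim_names=["dp"])  # assumed 2-card mesh
>>> loader = DataLoader(dataset, batch_size=2)         # "dataset" is assumed to exist
>>> dist_loader = dist.shard_dataloader(loader, meshes=mesh, shard_dims="dp")
>>> for input, label in dist_loader():
...     pass  # each tensor yielded here carries distributed attributes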

Parameters

  • dataloader (paddle.io.DataLoader) - The dataloader from the single-card point of view.

  • meshes (ProcessMesh|list|tuple) - The mesh(es) used to shard the dataloader. It can be a ProcessMesh or a list/tuple; if a list is given, different inputs are placed on different meshes.

  • input_keys (list|tuple, optional) - If the dataloader yields a dict of tensors, input_keys are the keys of that dict and identify which tensor is placed on which mesh, corresponding one-to-one with meshes. Default None, meaning the dataloader does not yield a dict (see the sketch after this list).

  • shard_dims (str|int|list|tuple, optional) - The mesh dimension(s) along which the dataloader is sharded. Default None, meaning the dataloader is not split; this argument normally must be set when data parallelism is used.

  • is_dataset_splitted (bool, optional) - Whether the dataset has already been split according to the data-parallel rank. Default False.
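When the dataloader yields a dict instead of a tuple, input_keys maps each key to a mesh in meshes. A hedged sketch of that case (the key names "images"/"labels", the dim name "dp", and the loader/mesh objects are assumptions for illustration):

>>> dist_loader = dist.shard_dataloader(
...     dataloader=loader,                # assumed to yield {"images": ..., "labels": ...}
...     meshes=[mesh0, mesh1],            # one mesh per key, in the same order as input_keys
...     input_keys=["images", "labels"],  # "images" goes to mesh0, "labels" to mesh1
...     shard_dims="dp",                  # both meshes are split along their "dp" dimension
... )
>>> for batch in dist_loader():
...     images, labels = batch["images"], batch["labels"]

If the dataset has already been split per data-parallel rank, pass is_dataset_splitted=True so the loader does not split it again.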

Returns

ShardDataloader: a dataloader object with a distributed view.
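Each tensor yielded by the returned object is a distributed tensor. A small sketch for inspecting that, building on the first sketch above (process_mesh and placements are assumed here to be the usual distributed-tensor attributes, used only for illustration):

>>> for input, label in dist_loader():
...     print(input.process_mesh)  # the mesh this batch was placed on
...     print(input.placements)    # how the batch is sharded/replicated on that mesh
...     break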

Code Example

>>> import os
>>> import numpy as np
>>> import paddle
>>> import paddle.distributed as dist
>>> from paddle.io import BatchSampler, DataLoader, Dataset

>>> mesh0 = dist.ProcessMesh([[0, 1], [2, 3]], dim_names=['x', 'y'])
>>> mesh1 = dist.ProcessMesh([[4, 5], [6, 7]], dim_names=['x', 'y'])

>>> paddle.seed(1024)
>>> np.random.seed(1024)
>>> class RandomDataset(Dataset):
...     def __init__(self, seq_len, hidden, num_samples=8):
...         super().__init__()
...         self.seq_len = seq_len
...         self.hidden = hidden
...         self.num_samples = num_samples
...         self.inputs = [np.random.uniform(size=[self.seq_len, self.hidden]).astype("float32") for _ in range(num_samples)]
...         self.labels = [np.array(index, dtype="float32") for index in range(num_samples)]
...     def __getitem__(self, index):
...         return self.inputs[index], self.labels[index]
...     def __len__(self):
...         return self.num_samples

>>> class MlpModel(paddle.nn.Layer):
...     def __init__(self):
...         super(MlpModel, self).__init__()
...         self.w0 = dist.shard_tensor(
...             self.create_parameter(shape=[8, 8]),
...             mesh0, [dist.Replicate(), dist.Shard(1)])
...         self.w1 = dist.shard_tensor(
...             self.create_parameter(shape=[8, 8]),
...             mesh1, [dist.Replicate(), dist.Shard(0)])
...     def forward(self, x):
...         y = paddle.matmul(x, self.w0)
...         y = dist.reshard(y, mesh1, [dist.Shard(0), dist.Shard(2)])
...         z = paddle.matmul(y, self.w1)
...         return z

>>> model = MlpModel()
>>> dataset = RandomDataset(4, 8)
>>> sampler = BatchSampler(
...     dataset,
...     batch_size=2,
... )
>>> dataloader = DataLoader(
...     dataset,
...     batch_sampler=sampler,
... )
>>> dist_dataloader = dist.shard_dataloader(
...     dataloader=dataloader,
...     meshes=[mesh0, mesh1],
...     shard_dims="x"
... )
>>> opt = paddle.optimizer.AdamW(learning_rate=0.001, parameters=model.parameters())
>>> dist_opt = dist.shard_optimizer(opt)
>>> def loss_fn(logits, label):
...     # logits: [bs, seq_len, hidden], label: [bs]
...     loss = paddle.nn.MSELoss(reduction="sum")
...     logits = paddle.sum(logits, axis=[1, 2])
...     return loss(logits, label)

>>> RUN_STATIC = eval(os.environ['RUN_STATIC'])
>>> def run_dynamic():
...     for step, (input, label) in enumerate(dist_dataloader()):
...         logits = model(input)
...         loss = loss_fn(logits, label)
...         print("step:{}, loss:{}".format(step, loss))
...         loss.backward()
...         dist_opt.step()
...         dist_opt.clear_grad()

>>> def run_static():
...     dist_model = dist.to_static(
...         model, dist_dataloader, loss_fn, opt
...     )
...     dist_model.train()
...     for step, (input, label) in enumerate(dist_dataloader()):
...         print("label:", label)
...         loss = dist_model(input, label)
...         print("step:{}, loss:{}".format(step, loss))

>>> if RUN_STATIC == 0:
...     run_dynamic()
... else:
...     run_static()

>>> # This case needs to be executed in a multi-card environment
>>> # export CUDA_VISIBLE_DEVICES=0,1,2,3,4,5,6,7
>>> # RUN_STATIC=1 python -u -m paddle.distributed.launch --gpus "0,1,2,3,4,5,6,7" {test_case}.py
>>> # RUN_STATIC=0 python -u -m paddle.distributed.launch --gpus "0,1,2,3,4,5,6,7" {test_case}.py

Tutorials that use this API