IterableDataset

class paddle.io.IterableDataset [source]

An abstract class to encapsulate methods and behaviors of iterable datasets.

All iterable-style datasets (datasets whose samples can only be fetched one by one, sequentially, like a Python iterator) should be subclasses of IterableDataset. All subclasses should implement the following methods:

__iter__: yields samples sequentially. DataLoader requires this method in order to read samples from the dataset.

Note

Do not implement __getitem__ and __len__ in an IterableDataset; these methods should not be called either.

See DataLoader.

Examples

>>> import numpy as np
>>> from paddle.io import IterableDataset

>>> # define a random dataset
>>> class RandomDataset(IterableDataset):
...     def __init__(self, num_samples):
...         self.num_samples = num_samples
...
...     def __iter__(self):
...         for i in range(self.num_samples):
...             image = np.random.random([784]).astype('float32')
...             label = np.random.randint(0, 9, (1, )).astype('int64')
...             yield image, label
...
>>> dataset = RandomDataset(10)
>>> for img, label in dataset:
...     # do something
...     ...

When num_workers > 0, each worker process has its own copy of the dataset object and will yield the whole dataset, which means every sample will be repeated num_workers times. If each sample must be yielded only once, there are two ways to configure the dataset copy in each worker process so that the workers do not produce duplicate data, as shown below. Both methods need the worker information, which can be obtained inside a worker process by calling paddle.io.get_worker_info.

Splitting the data among the worker copies in __iter__:

>>> import math
>>> import paddle
>>> import numpy as np
>>> from paddle.io import IterableDataset, DataLoader, get_worker_info

>>> class SplitedIterableDataset(IterableDataset):
...     def __init__(self, start, end):
...         self.start = start
...         self.end = end
...
...     def __iter__(self):
...         worker_info = get_worker_info()
...         if worker_info is None:
...             iter_start = self.start
...             iter_end = self.end
...         else:
...             per_worker = int(
...                 math.ceil((self.end - self.start) / float(
...                     worker_info.num_workers)))
...             worker_id = worker_info.id
...             iter_start = self.start + worker_id * per_worker
...             iter_end = min(iter_start + per_worker, self.end)
...
...         for i in range(iter_start, iter_end):
...             yield np.array([i])
...
>>> dataset = SplitedIterableDataset(start=2, end=9)
>>> dataloader = DataLoader(
...     dataset,
...     num_workers=2,
...     batch_size=1,
...     drop_last=True)
...
>>> for data in dataloader:
...     print(data) 
Tensor(shape=[1, 1], dtype=int64, place=Place(cpu), stop_gradient=True,
    [[2]])
Tensor(shape=[1, 1], dtype=int64, place=Place(cpu), stop_gradient=True,
    [[3]])
Tensor(shape=[1, 1], dtype=int64, place=Place(cpu), stop_gradient=True,
    [[4]])
Tensor(shape=[1, 1], dtype=int64, place=Place(cpu), stop_gradient=True,
    [[5]])
Tensor(shape=[1, 1], dtype=int64, place=Place(cpu), stop_gradient=True,
    [[6]])
Tensor(shape=[1, 1], dtype=int64, place=Place(cpu), stop_gradient=True,
    [[7]])
Tensor(shape=[1, 1], dtype=int64, place=Place(cpu), stop_gradient=True,
    [[8]])

Splitting the data among the worker copies with worker_init_fn:

>>> import math
>>> import paddle
>>> import numpy as np
>>> from paddle.io import IterableDataset, DataLoader, get_worker_info

>>> class RangeIterableDataset(IterableDataset):
...     def __init__(self, start, end):
...         self.start = start
...         self.end = end
...
...     def __iter__(self):
...         for i in range(self.start, self.end):
...             yield np.array([i])
...
>>> dataset = RangeIterableDataset(start=2, end=9)

>>> def worker_init_fn(worker_id):
...     worker_info = get_worker_info()
...
...     dataset = worker_info.dataset
...     start = dataset.start
...     end = dataset.end
...     num_per_worker = int(
...         math.ceil((end - start) / float(worker_info.num_workers)))
...
...     worker_id = worker_info.id
...     dataset.start = start + worker_id * num_per_worker
...     dataset.end = min(dataset.start + num_per_worker, end)
...
>>> dataloader = DataLoader(
...     dataset,
...     num_workers=2,
...     batch_size=1,
...     drop_last=True,
...     worker_init_fn=worker_init_fn)
...
>>> for data in dataloader:
...     print(data) 
Tensor(shape=[1, 1], dtype=int64, place=Place(cpu), stop_gradient=True,
    [[2]])
Tensor(shape=[1, 1], dtype=int64, place=Place(cpu), stop_gradient=True,
    [[3]])
Tensor(shape=[1, 1], dtype=int64, place=Place(cpu), stop_gradient=True,
    [[4]])
Tensor(shape=[1, 1], dtype=int64, place=Place(cpu), stop_gradient=True,
    [[5]])
Tensor(shape=[1, 1], dtype=int64, place=Place(cpu), stop_gradient=True,
    [[6]])
Tensor(shape=[1, 1], dtype=int64, place=Place(cpu), stop_gradient=True,
    [[7]])
Tensor(shape=[1, 1], dtype=int64, place=Place(cpu), stop_gradient=True,
    [[8]])