py_reader

paddle.fluid.layers.io.py_reader(capacity, shapes, dtypes, lod_levels=None, name=None, use_double_buffer=True)
api_attr

Static Graph

Creates a Python reader for feeding data from Python.

This operator returns a Reader Variable. The Reader provides decorate_paddle_reader() and decorate_tensor_provider() to set a Python generator as the data source and to feed data from that source into the Reader Variable. When Executor::Run() is invoked on the C++ side, data from the generator is read automatically. Unlike DataFeeder.feed(), py_reader lets data reading and Executor::Run() run in parallel. Call the Reader's start() method at the beginning of each pass, and call reset() at the end of the pass, when fluid.core.EOFException is raised.
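The parallel reading and the start()/reset() protocol described above can be pictured with a small pure-Python sketch. This is not Paddle's implementation: ToyReader, ToyEOF, read() and collect_passes() are hypothetical names invented for illustration, and a standard-library bounded queue stands in for the buffer whose size the capacity argument sets.

```python
import queue
import threading


class ToyEOF(Exception):
    """Hypothetical stand-in for fluid.core.EOFException."""


class ToyReader:
    """Hypothetical bounded-buffer reader; an illustration, not Paddle code."""

    _END = object()  # sentinel marking the end of one pass

    def __init__(self, capacity, generator_fn):
        self._capacity = capacity
        self._generator_fn = generator_fn
        self._queue = None
        self._thread = None

    def start(self):
        # Begin a pass: a background thread fills the buffer while the
        # consumer keeps running, so reading and computing overlap.
        self._queue = queue.Queue(maxsize=self._capacity)
        self._thread = threading.Thread(
            target=self._fill, args=(self._queue,), daemon=True)
        self._thread.start()

    def _fill(self, q):
        for batch in self._generator_fn():
            q.put(batch)  # blocks while the buffer already holds `capacity` items
        q.put(self._END)

    def read(self):
        # Consumer side: take the next batch, or signal the end of the pass.
        batch = self._queue.get()
        if batch is self._END:
            raise ToyEOF()
        return batch

    def reset(self):
        # End a pass. Intended to be called after ToyEOF was raised,
        # at which point the producer thread has finished.
        self._thread.join()
        self._queue = None
        self._thread = None


def collect_passes(reader, num_passes):
    """Read every batch of `num_passes` passes using start()/reset()."""
    seen = []
    for _ in range(num_passes):
        reader.start()
        try:
            while True:
                seen.append(reader.read())
        except ToyEOF:
            reader.reset()
    return seen


def toy_batches():
    # Hypothetical data source: five tiny "batches" per pass.
    for i in range(5):
        yield [i, i + 1]


batches_seen = collect_passes(ToyReader(capacity=2, generator_fn=toy_batches), 2)
```

Because the queue is bounded, the producer never runs more than capacity batches ahead of the consumer, which is the role the capacity parameter plays in this sketch.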

Note

The Program.clone() method cannot clone py_reader. Refer to api_fluid_Program for more details.

The read_file call must be in the same program block as py_reader. Refer to api_fluid_layers_read_file for more details.

Parameters
  • capacity (int) – The buffer capacity maintained by py_reader.

  • shapes (list|tuple) – List of tuples declaring the data shapes. shapes[i] is the shape of the i-th data item.

  • dtypes (list|tuple) – List of strings declaring the data types. Supported dtypes: bool, float16, float32, float64, int8, int16, int32, int64, uint8.

  • lod_levels (list|tuple) – List of ints declaring the lod_level of each data item.

  • name (basestring) – The default value is None. Normally users do not need to set this property. For more information, please refer to Name.

  • use_double_buffer (bool) – Whether to use a double buffer. The double buffer pre-reads the data of the next batch and copies it asynchronously from CPU to GPU. Default is True.
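As a rough illustration of how the shapes argument relates to the batches a generator produces, the sketch below checks a batch shape against a declared shape. It assumes that -1 marks a dimension of unspecified size, as the batch dimension does in the examples on this page. matches_declared_shape is a hypothetical helper, not part of Paddle.

```python
def matches_declared_shape(actual, declared):
    """Return True if `actual` fits `declared`; -1 in `declared` matches any size."""
    if len(actual) != len(declared):
        return False
    return all(d == -1 or a == d for a, d in zip(actual, declared))


# Declared shapes as passed to py_reader in the examples on this page.
declared_shapes = [(-1, 1, 28, 28), (-1, 1)]

# Shapes of one hypothetical batch of 5 images and 5 labels.
batch_shapes = [(5, 1, 28, 28), (5, 1)]

batch_is_consistent = all(
    matches_declared_shape(actual, declared)
    for actual, declared in zip(batch_shapes, declared_shapes))
```

Here shapes[0] describes the image tensor and shapes[1] the label tensor, in the same order as the entries of dtypes.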

Returns

A Reader from which we can get feeding data.

Return Type

Variable

Examples

1. The basic usage of py_reader is as follows:

import paddle
import paddle.fluid as fluid
import paddle.dataset.mnist as mnist

def network(image, label):
    # User-defined network; here, a softmax regression example
    predict = fluid.layers.fc(input=image, size=10, act='softmax')
    return fluid.layers.cross_entropy(input=predict, label=label)

reader = fluid.layers.py_reader(capacity=64,
                                shapes=[(-1, 1, 28, 28), (-1, 1)],
                                dtypes=['float32', 'int64'])
reader.decorate_paddle_reader(
    paddle.reader.shuffle(paddle.batch(mnist.train(), batch_size=5),
                          buf_size=1000))

img, label = fluid.layers.read_file(reader)
loss = network(img, label)

fluid.Executor(fluid.CUDAPlace(0)).run(fluid.default_startup_program())
exe = fluid.ParallelExecutor(use_cuda=True)
for epoch_id in range(10):
    reader.start()
    try:
        while True:
            exe.run(fetch_list=[loss.name])
    except fluid.core.EOFException:
        reader.reset()

fluid.io.save_inference_model(dirname='./model',
                              feeded_var_names=[img.name, label.name],
                              target_vars=[loss],
                              executor=fluid.Executor(fluid.CUDAPlace(0)))

2. When training and testing are both performed, two different py_reader instances should be created with different names, e.g.:

import paddle
import paddle.fluid as fluid
import paddle.dataset.mnist as mnist

def network(reader):
    img, label = fluid.layers.read_file(reader)
    # User-defined network; here, a softmax regression as an example
    predict = fluid.layers.fc(input=img, size=10, act='softmax')
    loss = fluid.layers.cross_entropy(input=predict, label=label)
    return fluid.layers.mean(loss)

# Create train_main_prog and train_startup_prog
train_main_prog = fluid.Program()
train_startup_prog = fluid.Program()
with fluid.program_guard(train_main_prog, train_startup_prog):
    # Use fluid.unique_name.guard() to share parameters with test program
    with fluid.unique_name.guard():
        train_reader = fluid.layers.py_reader(capacity=64,
                                              shapes=[(-1, 1, 28, 28),
                                                      (-1, 1)],
                                              dtypes=['float32', 'int64'],
                                              name='train_reader')
        train_reader.decorate_paddle_reader(
            paddle.reader.shuffle(paddle.batch(mnist.train(), batch_size=5),
                                  buf_size=500))
        train_loss = network(train_reader)  # some network definition
        adam = fluid.optimizer.Adam(learning_rate=0.01)
        adam.minimize(train_loss)

# Create test_main_prog and test_startup_prog
test_main_prog = fluid.Program()
test_startup_prog = fluid.Program()
with fluid.program_guard(test_main_prog, test_startup_prog):
    # Use fluid.unique_name.guard() to share parameters with train program
    with fluid.unique_name.guard():
        test_reader = fluid.layers.py_reader(capacity=32,
                                             shapes=[(-1, 1, 28, 28), (-1, 1)],
                                             dtypes=['float32', 'int64'],
                                             name='test_reader')
        test_reader.decorate_paddle_reader(paddle.batch(mnist.test(), 512))
        test_loss = network(test_reader)

fluid.Executor(fluid.CUDAPlace(0)).run(train_startup_prog)
fluid.Executor(fluid.CUDAPlace(0)).run(test_startup_prog)

train_exe = fluid.ParallelExecutor(use_cuda=True,
                                   loss_name=train_loss.name,
                                   main_program=train_main_prog)
test_exe = fluid.ParallelExecutor(use_cuda=True,
                                  loss_name=test_loss.name,
                                  main_program=test_main_prog)
for epoch_id in range(10):
    train_reader.start()
    try:
        while True:
            train_exe.run(fetch_list=[train_loss.name])
    except fluid.core.EOFException:
        train_reader.reset()

test_reader.start()
try:
    while True:
        test_exe.run(fetch_list=[test_loss.name])
except fluid.core.EOFException:
    test_reader.reset()
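The start/run/reset pattern repeated in the examples above could be factored into a small helper. The following is a minimal sketch under stated assumptions: run_one_pass, FakeEOF and CountingReader are hypothetical names, and the end-of-data exception is passed in as an argument so that the sketch runs without Paddle installed.

```python
def run_one_pass(reader, step, eof_exception):
    """Call `step()` until `eof_exception` is raised; return the completed step count.

    `reader` only needs start() and reset() methods, as the Reader
    described on this page provides.
    """
    steps = 0
    reader.start()
    try:
        while True:
            step()
            steps += 1
    except eof_exception:
        reader.reset()
    return steps


class FakeEOF(Exception):
    """Hypothetical stand-in for fluid.core.EOFException."""


class CountingReader:
    """Hypothetical stand-in reader that serves `num_batches` batches per pass."""

    def __init__(self, num_batches):
        self.num_batches = num_batches
        self.remaining = 0
        self.resets = 0

    def start(self):
        self.remaining = self.num_batches

    def reset(self):
        self.resets += 1

    def next_batch(self):
        if self.remaining == 0:
            raise FakeEOF()
        self.remaining -= 1


demo_reader = CountingReader(num_batches=3)
steps_done = run_one_pass(demo_reader, demo_reader.next_batch, FakeEOF)
```

With the training example above, the equivalent call would presumably be run_one_pass(train_reader, lambda: train_exe.run(fetch_list=[train_loss.name]), fluid.core.EOFException).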