InMemoryDataset

class paddle.distributed.InMemoryDataset
Api_attr

Static Graph

It will load data into memory and shuffle data before training.

Examples

>>> import paddle
>>> paddle.enable_static()
>>> dataset = paddle.distributed.InMemoryDataset()
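The methods documented below typically combine into a load, shuffle, train, release workflow. A minimal sketch (a hedged illustration; "a.txt" and "b.txt" are placeholder files that must already contain data in the format expected by the configured data feed):

>>> import paddle
>>> paddle.enable_static()

>>> dataset = paddle.distributed.InMemoryDataset()
>>> x = paddle.static.data(name="x", shape=[None, 1], dtype="int64", lod_level=1)
>>> dataset.init(
...     batch_size=1,
...     thread_num=2,
...     input_type=1,
...     pipe_command="cat",
...     use_var=[x])
>>> dataset.set_filelist(["a.txt", "b.txt"])
>>> dataset.load_into_memory()
>>> dataset.local_shuffle()
>>> # ... train via exe.train_from_dataset(main_program, dataset) ...
>>> dataset.release_memory()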
update_settings(**kwargs)

update_settings

Api_attr

Static Graph

Should be called in the user's Python script to update the settings of a dataset instance.

Parameters
  • kwargs – Keyword arguments. Currently, the following keys are supported in **kwargs, covering both single-node settings and advanced distributed settings:

  • batch_size (int) – batch size used during training. Default is 1.

  • thread_num (int) – number of reader threads. Default is 1.

  • use_var (list) – list of variables that will be used. Default is [].

  • input_type (int) – input type of the generated input: 0 for one sample, 1 for one batch. Default is 0.

  • fs_name (str) – file system name. Default is "".

  • fs_ugi (str) – file system UGI. Default is "".

  • pipe_command (str) – pipe command of the current dataset; a UNIX pipeline command through which each input file is streamed (see the sketch after the example below). Default is "cat".

  • download_cmd (str) – customized download command. Default is "cat".

  • data_feed_type (str) – data feed type used in the C++ code. Default is "MultiSlotInMemoryDataFeed".

  • queue_num (int) – number of dataset output queues from which training threads fetch data. Default is -1, which sets it equal to the thread number in C++.

  • merge_size (int) – instance size to merge. If merge_size > 0, instances with the same line id are merged after shuffle (merge by line id), in which case the line id should be parsed in the data generator. Default is -1.

  • parse_ins_id (bool) – whether the dataset needs to parse ins_id. Default is False.

  • parse_content (bool) – whether the dataset needs to parse content. Default is False.

  • fleet_send_batch_size (int) – fleet send batch size per RPC. Default is 1024.

  • fleet_send_sleep_seconds (int) – fleet send sleep time in seconds. Default is 0.

  • fea_eval (bool) – whether the dataset performs feature importance evaluation using slots shuffle. Default is False.

  • candidate_size (int) – candidate size used in slots shuffle when fea_eval is True.

Examples

>>> import paddle
>>> paddle.enable_static()

>>> dataset = paddle.distributed.InMemoryDataset()
>>> dataset.init(
...     batch_size=1,
...     thread_num=2,
...     input_type=1,
...     pipe_command="cat",
...     use_var=[])
>>> dataset._init_distributed_settings(
...     parse_ins_id=True,
...     parse_content=True,
...     fea_eval=True,
...     candidate_size=10000)
>>> dataset.update_settings(batch_size=2)
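The examples above keep pipe_command="cat". Any UNIX filter that reads raw file lines on stdin and writes records in the configured data feed format on stdout can be used instead. A hedged sketch (my_preprocess.py is a hypothetical script, not part of the Paddle API):

>>> # Write a hypothetical preprocessor that drops a leading header
>>> # column and passes the remaining fields through unchanged.
>>> script = r'''
... import sys
... for line in sys.stdin:
...     fields = line.split()
...     sys.stdout.write(" ".join(fields[1:]) + "\n")
... '''
>>> with open("my_preprocess.py", "w") as f:
...     f.write(script)
>>> dataset.update_settings(pipe_command="python my_preprocess.py")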
init(**kwargs)

init

Api_attr

Static Graph

Should be called only once in the user's Python script to initialize the settings of a dataset instance.

Parameters
  • kwargs – Keyword arguments. Currently, the following keys are supported in **kwargs:

  • batch_size (int) – batch size used during training. Default is 1.

  • thread_num (int) – number of reader threads. Default is 1.

  • use_var (list) – list of variables that will be used. Default is [].

  • input_type (int) – input type of the generated input: 0 for one sample, 1 for one batch. Default is 0.

  • fs_name (str) – file system name. Default is "".

  • fs_ugi (str) – file system UGI. Default is "".

  • pipe_command (str) – pipe command of the current dataset; a UNIX pipeline command through which each input file is streamed. Default is "cat".

  • download_cmd (str) – customized download command. Default is "cat".

  • data_feed_type (str) – data feed type used in the C++ code. Default is "MultiSlotInMemoryDataFeed".

  • queue_num (int) – number of dataset output queues from which training threads fetch data. Default is -1, which sets it equal to the thread number in C++.

Examples

>>> import paddle
>>> import os
>>> paddle.enable_static()

>>> with open("test_queue_dataset_run_a.txt", "w") as f:
...     data = "2 1 2 2 5 4 2 2 7 2 1 3"
...     f.write(data)
>>> with open("test_queue_dataset_run_b.txt", "w") as f:
...     data = "2 1 2 2 5 4 2 2 7 2 1 3"
...     f.write(data)
>>> slots = ["slot1", "slot2", "slot3", "slot4"]
>>> slots_vars = []
>>> for slot in slots:
...     var = paddle.static.data(
...         name=slot, shape=[None, 1], dtype="int64", lod_level=1)
...     slots_vars.append(var)
>>> dataset = paddle.distributed.InMemoryDataset()
>>> dataset.init(
...     batch_size=1,
...     thread_num=2,
...     input_type=1,
...     pipe_command="cat",
...     use_var=slots_vars)
>>> dataset.set_filelist(
...     ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"])
>>> dataset.load_into_memory()

>>> place = paddle.CPUPlace()
>>> exe = paddle.static.Executor(place)
>>> startup_program = paddle.static.Program()
>>> main_program = paddle.static.Program()
>>> exe.run(startup_program)

>>> exe.train_from_dataset(main_program, dataset)

>>> os.remove("./test_queue_dataset_run_a.txt")
>>> os.remove("./test_queue_dataset_run_b.txt")
set_date(date)

set_date

Api_attr

Static Graph

Set the training date for pulling sparse parameters and for saving and loading the model. Only used in PSGPU mode.

Parameters

date (str) – training date (format: YYYYMMDD), e.g. 20211111.

Examples

>>> import paddle
>>> paddle.enable_static()

>>> dataset = paddle.distributed.InMemoryDataset()
>>> slots = ["slot1", "slot2", "slot3", "slot4"]
>>> slots_vars = []
>>> for slot in slots:
...     var = paddle.static.data(
...         name=slot, shape=[None, 1], dtype="int64", lod_level=1)
...     slots_vars.append(var)
>>> dataset.init(
...     batch_size=1,
...     thread_num=2,
...     input_type=1,
...     pipe_command="cat",
...     use_var=slots_vars)
>>> dataset.set_date("20211111")
load_into_memory(is_shuffle=False)

load_into_memory

Api_attr

Static Graph

Load data into memory

Parameters

is_shuffle (bool) – whether to perform a local shuffle while loading (see the one-liner after the example below). Default is False.

Examples

>>> import paddle
>>> paddle.enable_static()

>>> dataset = paddle.distributed.InMemoryDataset()
>>> slots = ["slot1", "slot2", "slot3", "slot4"]
>>> slots_vars = []
>>> for slot in slots:
...     var = paddle.static.data(
...         name=slot, shape=[None, 1], dtype="int64", lod_level=1)
...     slots_vars.append(var)
>>> dataset.init(
...     batch_size=1,
...     thread_num=2,
...     input_type=1,
...     pipe_command="cat",
...     use_var=slots_vars)
>>> filelist = ["a.txt", "b.txt"]
>>> dataset.set_filelist(filelist)
>>> dataset.load_into_memory()
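To shuffle while loading instead of calling local_shuffle separately, set is_shuffle:

>>> dataset.load_into_memory(is_shuffle=True)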
preload_into_memory(thread_num=None)

preload_into_memory

Api_attr

Static Graph

Load data into memory in async mode

Parameters

thread_num (int) – number of preload threads. Default is None.

Examples

>>> import paddle
>>> paddle.enable_static()

>>> dataset = paddle.distributed.InMemoryDataset()
>>> slots = ["slot1", "slot2", "slot3", "slot4"]
>>> slots_vars = []
>>> for slot in slots:
...     var = paddle.static.data(
...         name=slot, shape=[None, 1], dtype="int64", lod_level=1)
...     slots_vars.append(var)
>>> dataset.init(
...     batch_size=1,
...     thread_num=2,
...     input_type=1,
...     pipe_command="cat",
...     use_var=slots_vars)
>>> filelist = ["a.txt", "b.txt"]
>>> dataset.set_filelist(filelist)
>>> dataset.preload_into_memory()
>>> dataset.wait_preload_done()
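Because preloading is asynchronous, training must not start before wait_preload_done() returns. The preload thread count can also be set explicitly (a hedged one-liner; 12 is an arbitrary illustration):

>>> dataset.preload_into_memory(thread_num=12)
>>> dataset.wait_preload_done()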
wait_preload_done()

wait_preload_done

Api_attr

Static Graph

Wait until preload_into_memory is done.

Examples

>>> import paddle
>>> paddle.enable_static()

>>> dataset = paddle.distributed.InMemoryDataset()
>>> slots = ["slot1", "slot2", "slot3", "slot4"]
>>> slots_vars = []
>>> for slot in slots:
...     var = paddle.static.data(
...         name=slot, shape=[None, 1], dtype="int64", lod_level=1)
...     slots_vars.append(var)
>>> dataset.init(
...     batch_size=1,
...     thread_num=2,
...     input_type=1,
...     pipe_command="cat",
...     use_var=slots_vars)
>>> filelist = ["a.txt", "b.txt"]
>>> dataset.set_filelist(filelist)
>>> dataset.preload_into_memory()
>>> dataset.wait_preload_done()
local_shuffle()

local_shuffle

Api_attr

Static Graph

Local shuffle

Examples

>>> import paddle
>>> paddle.enable_static()

>>> dataset = paddle.distributed.InMemoryDataset()
>>> slots = ["slot1", "slot2", "slot3", "slot4"]
>>> slots_vars = []
>>> for slot in slots:
...     var = paddle.static.data(
...         name=slot, shape=[None, 1], dtype="int64", lod_level=1)
...     slots_vars.append(var)
>>> dataset.init(
...     batch_size=1,
...     thread_num=2,
...     input_type=1,
...     pipe_command="cat",
...     use_var=slots_vars)
>>> filelist = ["a.txt", "b.txt"]
>>> dataset.set_filelist(filelist)
>>> dataset.load_into_memory()
>>> dataset.local_shuffle()
global_shuffle(fleet=None, thread_num=12)

global_shuffle

Api_attr

Static Graph

Global shuffle. Global shuffle can only be used in distributed mode, i.e. multiple processes on a single machine or multiple machines training together. When running in distributed mode, pass the fleet singleton instead of None (see the sketch after the example below).

Parameters
  • fleet (Fleet) – fleet singleton. Default is None.

  • thread_num (int) – shuffle thread num. Default is 12.

Examples

>>> import paddle
>>> paddle.enable_static()

>>> dataset = paddle.distributed.InMemoryDataset()
>>> slots = ["slot1", "slot2", "slot3", "slot4"]
>>> slots_vars = []
>>> for slot in slots:
...     var = paddle.static.data(
...         name=slot, shape=[None, 1], dtype="int64", lod_level=1)
...     slots_vars.append(var)
>>> dataset.init(
...     batch_size=1,
...     thread_num=2,
...     input_type=1,
...     pipe_command="cat",
...     use_var=slots_vars)
>>> filelist = ["a.txt", "b.txt"]
>>> dataset.set_filelist(filelist)
>>> dataset.load_into_memory()
>>> dataset.global_shuffle()
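A minimal sketch of the distributed case, assuming the process was launched with a properly configured distributed environment (the exact fleet flavor expected here depends on the training mode):

>>> # Hedged sketch: assumes paddle.distributed.fleet can be initialized
>>> # in this process and is accepted by global_shuffle.
>>> from paddle.distributed import fleet
>>> fleet.init()
>>> dataset.global_shuffle(fleet, thread_num=12)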

release_memory()

release_memory

Api_attr

Static Graph

Release the memory data held by InMemoryDataset when the data will not be used again.

Examples

>>> import paddle
>>> paddle.enable_static()

>>> dataset = paddle.distributed.InMemoryDataset()
>>> slots = ["slot1", "slot2", "slot3", "slot4"]
>>> slots_vars = []
>>> for slot in slots:
...     var = paddle.static.data(
...         name=slot, shape=[None, 1], dtype="int64", lod_level=1)
...     slots_vars.append(var)
>>> dataset.init(
...     batch_size=1,
...     thread_num=2,
...     input_type=1,
...     pipe_command="cat",
...     use_var=slots_vars)
>>> filelist = ["a.txt", "b.txt"]
>>> dataset.set_filelist(filelist)
>>> dataset.load_into_memory()
>>> dataset.global_shuffle()
>>> exe = paddle.static.Executor(paddle.CPUPlace())
>>> startup_program = paddle.static.Program()
>>> main_program = paddle.static.Program()
>>> exe.run(startup_program)
>>> exe.train_from_dataset(main_program, dataset)
>>> dataset.release_memory()
get_memory_data_size(fleet=None)

get_memory_data_size

Api_attr

Static Graph

Get the memory data size. Users can call this function to get the number of instances across all workers after the data has been loaded into memory.

Note

This function may hurt performance because it contains a barrier.

Parameters

fleet (Fleet) – fleet object. Default is None.

Returns

The size of memory data.

Examples

>>> import paddle
>>> paddle.enable_static()

>>> dataset = paddle.distributed.InMemoryDataset()
>>> slots = ["slot1", "slot2", "slot3", "slot4"]
>>> slots_vars = []
>>> for slot in slots:
...     var = paddle.static.data(
...         name=slot, shape=[None, 1], dtype="int64", lod_level=1)
...     slots_vars.append(var)

>>> dataset.init(
...     batch_size=1,
...     thread_num=2,
...     input_type=1,
...     pipe_command="cat",
...     use_var=slots_vars)

>>> filelist = ["a.txt", "b.txt"]
>>> dataset.set_filelist(filelist)
>>> dataset.load_into_memory()
>>> print(dataset.get_memory_data_size())
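In distributed training, the fleet object can be passed so that the instance count is aggregated across all workers (a hedged variant; assumes an initialized fleet singleton named fleet):

>>> # print(dataset.get_memory_data_size(fleet))  # distributed variant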
get_shuffle_data_size(fleet=None)

get_shuffle_data_size

Api_attr

Static Graph

Get the shuffle data size. Users can call this function to get the number of instances across all workers after a local or global shuffle.

Note

This function may hurt the performance of local shuffle because it contains a barrier; it does not affect global shuffle.

Parameters

fleet (Fleet) – fleet object. Default is None.

Returns

The size of shuffle data.

Examples

>>> import paddle
>>> paddle.enable_static()

>>> dataset = paddle.distributed.InMemoryDataset()
>>> slots = ["slot1", "slot2", "slot3", "slot4"]
>>> slots_vars = []
>>> for slot in slots:
...     var = paddle.static.data(
...         name=slot, shape=[None, 1], dtype="int64", lod_level=1)
...     slots_vars.append(var)

>>> dataset.init(
...     batch_size=1,
...     thread_num=2,
...     input_type=1,
...     pipe_command="cat",
...     use_var=slots_vars)

>>> filelist = ["a.txt", "b.txt"]
>>> dataset.set_filelist(filelist)
>>> dataset.load_into_memory()
>>> dataset.global_shuffle()
>>> print(dataset.get_shuffle_data_size())
slots_shuffle(slots)

slots_shuffle

Slots shuffle is a shuffle method at the slot level, usually used for sparse features with a large number of instances. By comparing a metric such as AUC after shuffling one or several slots against the baseline, the importance of those slots (features) can be evaluated (see the usage note after the example below).

Parameters

slots (list[str]) – the set of slot names (strings) on which to perform slots shuffle.

Examples

>>> import paddle
>>> paddle.enable_static()

>>> dataset = paddle.distributed.InMemoryDataset()
>>> dataset._init_distributed_settings(fea_eval=True)
>>> slots = ["slot1", "slot2", "slot3", "slot4"]
>>> slots_vars = []
>>> for slot in slots:
...     var = paddle.static.data(
...         name=slot, shape=[None, 1], dtype="int64", lod_level=1)
...     slots_vars.append(var)
>>> dataset.init(
...     batch_size=1,
...     thread_num=2,
...     input_type=1,
...     pipe_command="cat",
...     use_var=slots_vars)
>>> filelist = ["a.txt", "b.txt"]
>>> dataset.set_filelist(filelist)
>>> dataset.load_into_memory()
>>> dataset.slots_shuffle(['slot1'])
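A usage note: a typical feature-importance check compares a metric before and after shuffling a slot. A hedged sketch (training and metric computation are application-specific and omitted):

>>> # baseline: train / evaluate on the unshuffled data -> baseline_auc
>>> dataset.slots_shuffle(["slot1"])
>>> # retrain / re-evaluate -> shuffled_auc; a large metric drop
>>> # suggests "slot1" carries important signal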
set_filelist(filelist)

set_filelist

Set the file list for the current worker. The file list is given as a list of file names (strings).

Parameters

filelist (list[str]) – list of input file names.

Examples

>>> import paddle
>>> dataset = paddle.distributed.fleet.DatasetBase()
>>> dataset.set_filelist(['a.txt', 'b.txt'])