InMemoryDataset¶
- class paddle.distributed. InMemoryDataset [source]
-
- Api_attr
-
Static Graph
It will load data into memory and shuffle data before training.
Examples
>>> import paddle >>> paddle.enable_static() >>> dataset = paddle.distributed.InMemoryDataset()
-
update_settings
(
**kwargs
)
update_settings¶
-
- Api_attr
-
Static Graph
should be called in user’s python scripts to update setings of dataset instance.
- Parameters
-
kwargs – Keyword arguments. Currently, we support following keys in **kwargs, including single node settings and advanced distributed related settings:
batch_size (int) – batch size. It will be effective during training. default is 1.
thread_num (int) – thread num, it is the num of readers. default is 1.
use_var (list) – list of variables. Variables which you will use. default is [].
input_type (int) – the input type of generated input. 0 is for one sample, 1 is for one batch. default is 0.
fs_name (str) – fs name. default is “”.
fs_ugi (str) – fs ugi. default is “”.
pipe_command (str) – pipe command of current dataset. A pipe command is a UNIX pipeline command that can be used only. default is “cat”
download_cmd (str) – customized download command. default is “cat”
data_feed_type (str) – data feed type used in c++ code. default is “MultiSlotInMemoryDataFeed”.
queue_num (int) – Dataset output queue num, training threads get data from queues. default is-1, which is set same as thread number in c++.
merge_size (int) – ins size to merge, if merge_size > 0, set merge by line id, instances of same line id will be merged after shuffle, you should parse line id in data generator. default is -1.
parse_ins_id (bool) – Set if Dataset need to parse ins_id. default is False.
parse_content (bool) – Set if Dataset need to parse content. default is False.
fleet_send_batch_size (int) – Set fleet send batch size in one rpc, default is 1024
fleet_send_sleep_seconds (int) – Set fleet send sleep time, default is 0
fea_eval (bool) – Set if Dataset need to do feature importance evaluation using slots shuffle. default is False.
candidate_size (int) – if fea_eval is set True, set the candidate size used in slots shuffle.
Examples
>>> import paddle >>> paddle.enable_static() >>> dataset = paddle.distributed.InMemoryDataset() >>> dataset.init( ... batch_size=1, ... thread_num=2, ... input_type=1, ... pipe_command="cat", ... use_var=[]) >>> dataset._init_distributed_settings( ... parse_ins_id=True, ... parse_content=True, ... fea_eval=True, ... candidate_size=10000) >>> dataset.update_settings(batch_size=2)
-
init
(
**kwargs
)
init¶
-
- Api_attr
-
Static Graph
should be called only once in user’s python scripts to initialize setings of dataset instance
- Parameters
-
kwargs – Keyword arguments. Currently, we support following keys in **kwargs:
batch_size (int) – batch size. It will be effective during training. default is 1.
thread_num (int) – thread num, it is the num of readers. default is 1.
use_var (list) – list of variables. Variables which you will use. default is [].
input_type (int) – the input type of generated input. 0 is for one sample, 1 is for one batch. default is 0.
fs_name (str) – fs name. default is “”.
fs_ugi (str) – fs ugi. default is “”.
pipe_command (str) – pipe command of current dataset. A pipe command is a UNIX pipeline command that can be used only. default is “cat”
download_cmd (str) – customized download command. default is “cat”
data_feed_type (str) – data feed type used in c++ code. default is “MultiSlotInMemoryDataFeed”.
queue_num (int) – Dataset output queue num, training threads get data from queues. default is -1, which is set same as thread number in c++.
Examples
>>> >>> import paddle >>> import os >>> paddle.enable_static() >>> with open("test_queue_dataset_run_a.txt", "w") as f: ... data = "2 1 2 2 5 4 2 2 7 2 1 3" ... f.write(data) >>> with open("test_queue_dataset_run_b.txt", "w") as f: ... data = "2 1 2 2 5 4 2 2 7 2 1 3" ... f.write(data) >>> slots = ["slot1", "slot2", "slot3", "slot4"] >>> slots_vars = [] >>> for slot in slots: ... var = paddle.static.data( ... name=slot, shape=[None, 1], dtype="int64", lod_level=1) ... slots_vars.append(var) >>> dataset = paddle.distributed.InMemoryDataset() >>> dataset.init( ... batch_size=1, ... thread_num=2, ... input_type=1, ... pipe_command="cat", ... use_var=slots_vars) >>> dataset.set_filelist( ... ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"]) >>> dataset.load_into_memory() >>> place = paddle.CPUPlace() >>> exe = paddle.static.Executor(place) >>> startup_program = paddle.static.Program() >>> main_program = paddle.static.Program() >>> exe.run(startup_program) >>> exe.train_from_dataset(main_program, dataset) >>> os.remove("./test_queue_dataset_run_a.txt") >>> os.remove("./test_queue_dataset_run_b.txt")
-
set_date
(
date
)
set_date¶
-
- Api_attr
-
Static Graph
Set training date for pull sparse parameters, saving and loading model. Only used in psgpu
- Parameters
-
date (str) – training date(format : YYMMDD). eg.20211111
Examples
>>> import paddle >>> paddle.enable_static() >>> dataset = paddle.distributed.InMemoryDataset() >>> slots = ["slot1", "slot2", "slot3", "slot4"] >>> slots_vars = [] >>> for slot in slots: ... var = paddle.static.data( ... name=slot, shape=[None, 1], dtype="int64", lod_level=1) ... slots_vars.append(var) >>> dataset.init( ... batch_size=1, ... thread_num=2, ... input_type=1, ... pipe_command="cat", ... use_var=slots_vars) >>> dataset.set_date("20211111")
-
load_into_memory
(
is_shuffle=False
)
load_into_memory¶
-
- Api_attr
-
Static Graph
Load data into memory
- Parameters
-
is_shuffle (bool) – whether to use local shuffle, default is False
Examples
>>> >>> import paddle >>> paddle.enable_static() >>> dataset = paddle.distributed.InMemoryDataset() >>> slots = ["slot1", "slot2", "slot3", "slot4"] >>> slots_vars = [] >>> for slot in slots: ... var = paddle.static.data( ... name=slot, shape=[None, 1], dtype="int64", lod_level=1) ... slots_vars.append(var) >>> dataset.init( ... batch_size=1, ... thread_num=2, ... input_type=1, ... pipe_command="cat", ... use_var=slots_vars) >>> filelist = ["a.txt", "b.txt"] >>> dataset.set_filelist(filelist) >>> dataset.load_into_memory()
-
preload_into_memory
(
thread_num=None
)
preload_into_memory¶
-
- Api_attr
-
Static Graph
Load data into memory in async mode
- Parameters
-
thread_num (int) – preload thread num
Examples
>>> >>> import paddle >>> paddle.enable_static() >>> dataset = paddle.distributed.InMemoryDataset() >>> slots = ["slot1", "slot2", "slot3", "slot4"] >>> slots_vars = [] >>> for slot in slots: ... var = paddle.static.data( ... name=slot, shape=[None, 1], dtype="int64", lod_level=1) ... slots_vars.append(var) >>> dataset.init( ... batch_size=1, ... thread_num=2, ... input_type=1, ... pipe_command="cat", ... use_var=slots_vars) >>> filelist = ["a.txt", "b.txt"] >>> dataset.set_filelist(filelist) >>> dataset.preload_into_memory() >>> dataset.wait_preload_done()
-
wait_preload_done
(
)
wait_preload_done¶
-
- Api_attr
-
Static Graph
Wait preload_into_memory done
Examples
>>> >>> import paddle >>> paddle.enable_static() >>> dataset = paddle.distributed.InMemoryDataset() >>> slots = ["slot1", "slot2", "slot3", "slot4"] >>> slots_vars = [] >>> for slot in slots: ... var = paddle.static.data( ... name=slot, shape=[None, 1], dtype="int64", lod_level=1) ... slots_vars.append(var) >>> dataset.init( ... batch_size=1, ... thread_num=2, ... input_type=1, ... pipe_command="cat", ... use_var=slots_vars) >>> filelist = ["a.txt", "b.txt"] >>> dataset.set_filelist(filelist) >>> dataset.preload_into_memory() >>> dataset.wait_preload_done()
-
local_shuffle
(
)
local_shuffle¶
-
- Api_attr
-
Static Graph
Local shuffle
Examples
>>> >>> import paddle >>> paddle.enable_static() >>> dataset = paddle.distributed.InMemoryDataset() >>> slots = ["slot1", "slot2", "slot3", "slot4"] >>> slots_vars = [] >>> for slot in slots: ... var = paddle.static.data( ... name=slot, shape=[None, 1], dtype="int64", lod_level=1) ... slots_vars.append(var) >>> dataset.init( ... batch_size=1, ... thread_num=2, ... input_type=1, ... pipe_command="cat", ... use_var=slots_vars) >>> filelist = ["a.txt", "b.txt"] >>> dataset.set_filelist(filelist) >>> dataset.load_into_memory() >>> dataset.local_shuffle()
-
global_shuffle
(
fleet=None,
thread_num=12
)
global_shuffle¶
-
- Api_attr
-
Static Graph
Global shuffle. Global shuffle can be used only in distributed mode. i.e. multiple processes on single machine or multiple machines training together. If you run in distributed mode, you should pass fleet instead of None.
Examples
>>> >>> import paddle >>> paddle.enable_static() >>> dataset = paddle.distributed.InMemoryDataset() >>> slots = ["slot1", "slot2", "slot3", "slot4"] >>> slots_vars = [] >>> for slot in slots: ... var = paddle.static.data( ... name=slot, shape=[None, 1], dtype="int64", lod_level=1) ... slots_vars.append(var) >>> dataset.init( ... batch_size=1, ... thread_num=2, ... input_type=1, ... pipe_command="cat", ... use_var=slots_vars) >>> filelist = ["a.txt", "b.txt"] >>> dataset.set_filelist(filelist) >>> dataset.load_into_memory() >>> dataset.global_shuffle()
- Parameters
-
fleet (Fleet) – fleet singleton. Default None.
thread_num (int) – shuffle thread num. Default is 12.
-
release_memory
(
)
release_memory¶
-
- Api_attr
-
Static Graph
Release InMemoryDataset memory data, when data will not be used again.
Examples
>>> >>> import paddle >>> paddle.enable_static() >>> dataset = paddle.distributed.InMemoryDataset() >>> slots = ["slot1", "slot2", "slot3", "slot4"] >>> slots_vars = [] >>> for slot in slots: ... var = paddle.static.data( ... name=slot, shape=[None, 1], dtype="int64", lod_level=1) ... slots_vars.append(var) >>> dataset.init( ... batch_size=1, ... thread_num=2, ... input_type=1, ... pipe_command="cat", ... use_var=slots_vars) >>> filelist = ["a.txt", "b.txt"] >>> dataset.set_filelist(filelist) >>> dataset.load_into_memory() >>> dataset.global_shuffle() >>> exe = paddle.static.Executor(paddle.CPUPlace()) >>> startup_program = paddle.static.Program() >>> main_program = paddle.static.Program() >>> exe.run(startup_program) >>> exe.train_from_dataset(main_program, dataset) >>> dataset.release_memory()
-
get_memory_data_size
(
fleet=None
)
get_memory_data_size¶
-
- Api_attr
-
Static Graph
Get memory data size, user can call this function to know the num of ins in all workers after load into memory.
Note
This function may cause bad performance, because it has barrier
- Parameters
-
fleet (Fleet) – Fleet Object.
- Returns
-
The size of memory data.
Examples
>>> >>> import paddle >>> paddle.enable_static() >>> dataset = paddle.distributed.InMemoryDataset() >>> slots = ["slot1", "slot2", "slot3", "slot4"] >>> slots_vars = [] >>> for slot in slots: ... var = paddle.static.data( ... name=slot, shape=[None, 1], dtype="int64", lod_level=1) ... slots_vars.append(var) >>> dataset.init( ... batch_size=1, ... thread_num=2, ... input_type=1, ... pipe_command="cat", ... use_var=slots_vars) >>> filelist = ["a.txt", "b.txt"] >>> dataset.set_filelist(filelist) >>> dataset.load_into_memory() >>> print(dataset.get_memory_data_size())
-
get_shuffle_data_size
(
fleet=None
)
get_shuffle_data_size¶
-
- Api_attr
-
Static Graph
Get shuffle data size, user can call this function to know the num of ins in all workers after local/global shuffle.
Note
This function may cause bad performance to local shuffle, because it has barrier. It does not affect global shuffle.
- Parameters
-
fleet (Fleet) – Fleet Object.
- Returns
-
The size of shuffle data.
Examples
>>> >>> import paddle >>> paddle.enable_static() >>> dataset = paddle.distributed.InMemoryDataset() >>> dataset = paddle.distributed.InMemoryDataset() >>> slots = ["slot1", "slot2", "slot3", "slot4"] >>> slots_vars = [] >>> for slot in slots: ... var = paddle.static.data( ... name=slot, shape=[None, 1], dtype="int64", lod_level=1) ... slots_vars.append(var) >>> dataset.init( ... batch_size=1, ... thread_num=2, ... input_type=1, ... pipe_command="cat", ... use_var=slots_vars) >>> filelist = ["a.txt", "b.txt"] >>> dataset.set_filelist(filelist) >>> dataset.load_into_memory() >>> dataset.global_shuffle() >>> print(dataset.get_shuffle_data_size())
-
slots_shuffle
(
slots
)
slots_shuffle¶
-
Slots Shuffle Slots Shuffle is a shuffle method in slots level, which is usually used in sparse feature with large scale of instances. To compare the metric, i.e. auc while doing slots shuffle on one or several slots with baseline to evaluate the importance level of slots(features).
- Parameters
-
slots (list[string]) – the set of slots(string) to do slots shuffle.
Examples
>>> >>> import paddle >>> paddle.enable_static() >>> dataset = paddle.distributed.InMemoryDataset() >>> dataset._init_distributed_settings(fea_eval=True) >>> slots = ["slot1", "slot2", "slot3", "slot4"] >>> slots_vars = [] >>> for slot in slots: ... var = paddle.static.data( ... name=slot, shape=[None, 1], dtype="int64", lod_level=1) ... slots_vars.append(var) >>> dataset.init( ... batch_size=1, ... thread_num=2, ... input_type=1, ... pipe_command="cat", ... use_var=slots_vars) >>> filelist = ["a.txt", "b.txt"] >>> dataset.set_filelist(filelist) >>> dataset.load_into_memory() >>> dataset.slots_shuffle(['slot1'])
-
set_filelist
(
filelist
)
set_filelist¶
-
Set file list in current worker. The filelist is indicated by a list of file names (string).
Examples
>>> import paddle >>> dataset = paddle.distributed.fleet.DatasetBase() >>> dataset.set_filelist(['a.txt', 'b.txt'])
- Parameters
-
filelist (list[str]) – list of file names of inputs.