InMemoryDataset¶
InMemoryDataset,它将数据加载到内存中,并在训练前随机整理数据。
代码示例¶
>>> import paddle
>>> paddle.enable_static()
>>> dataset = paddle.distributed.InMemoryDataset()
方法¶
init(**kwargs)¶
注意:
1. 该 API 只在非 Dygraph 模式下生效
对 InMemoryDataset 的实例进行配置初始化。
参数
kwargs - 可选的关键字参数,由调用者提供,目前支持以下关键字配置。
batch_size (int) - batch size 的大小。默认值为 1。
thread_num (int) - 用于训练的线程数,默认值为 1。
use_var (list) - 用于输入的 variable 列表,默认值为[]。
input_type (int) - 输入到模型训练样本的类型。0 代表一条样本,1 代表一个 batch。默认值为 0。
fs_name (str) - hdfs 名称。默认值为""。
fs_ugi (str) - hdfs 的 ugi。默认值为""。
pipe_command (str) - 在当前的
dataset
中设置的 pipe 命令用于数据的预处理。pipe 命令只能使用 UNIX 的 pipe 命令,默认为"cat"。download_cmd (str) - 数据下载 pipe 命令。pipe 命令只能使用 UNIX 的 pipe 命令,默认为"cat"。
返回 None。
代码示例
>>> import paddle
>>> import os
>>> paddle.enable_static()
>>> with open("test_queue_dataset_run_a.txt", "w") as f:
... data = "2 1 2 2 5 4 2 2 7 2 1 3"
... f.write(data)
>>> with open("test_queue_dataset_run_b.txt", "w") as f:
... data = "2 1 2 2 5 4 2 2 7 2 1 3"
... f.write(data)
>>> slots = ["slot1", "slot2", "slot3", "slot4"]
>>> slots_vars = []
>>> for slot in slots:
... var = paddle.static.data(
... name=slot, shape=[None, 1], dtype="int64", lod_level=1)
... slots_vars.append(var)
>>> dataset = paddle.distributed.InMemoryDataset()
>>> dataset.init(
... batch_size=1,
... thread_num=2,
... input_type=1,
... pipe_command="cat",
... use_var=slots_vars)
>>> dataset.set_filelist(
... ["test_queue_dataset_run_a.txt", "test_queue_dataset_run_b.txt"])
>>> dataset.load_into_memory()
>>> place = paddle.CPUPlace()
>>> exe = paddle.static.Executor(place)
>>> startup_program = paddle.static.Program()
>>> main_program = paddle.static.Program()
>>> exe.run(startup_program)
>>> exe.train_from_dataset(main_program, dataset)
>>> os.remove("./test_queue_dataset_run_a.txt")
>>> os.remove("./test_queue_dataset_run_b.txt")
update_settings(**kwargs)¶
注意:
1. 该 API 只在非 Dygraph 模式下生效
对 InMemoryDataset 的实例通过 init 和_init_distributed_settings 初始化的配置进行更新。
参数
kwargs - 可选的关键字参数,由调用者提供,目前支持以下关键字配置。
batch_size (int) - batch size 的大小。默认值为 1。
thread_num (int) - 用于训练的线程数,默认值为 1。
use_var (list) - 用于输入的 variable 列表,默认值为[]。
input_type (int) - 输入到模型训练样本的类型。0 代表一条样本,1 代表一个 batch。默认值为 0。
fs_name (str) - hdfs 名称。默认值为""。
fs_ugi (str) - hdfs 的 ugi。默认值为""。
pipe_command (str) - 在当前的
dataset
中设置的 pipe 命令用于数据的预处理。pipe 命令只能使用 UNIX 的 pipe 命令,默认为"cat"。download_cmd (str) - 数据下载 pipe 命令。pipe 命令只能使用 UNIX 的 pipe 命令,默认为"cat"。
merge_size (int) - 通过样本 id 来设置合并,相同 id 的样本将会在 shuffle 之后进行合并,你应该在一个 data 生成器里面解析样本 id。merge_size 表示合并的最小数量,默认值为-1,表示不做合并。
parse_ins_id (bool) - 是否需要解析每条样的 id,默认值为 False。
parse_content (bool) 是否需要解析每条样本的 content,默认值为 False。
fleet_send_batch_size (int) - 设置发送 batch 的大小,默认值为 1024。
fleet_send_sleep_seconds (int) - 设置发送 batch 后的睡眠时间,默认值为 0。
fea_eval (bool) - 设置特征打乱特征验证模式,来修正特征级别的重要性,特征打乱需要
fea_eval
被设置为 True。默认值为 False。candidate_size (int) - 特征打乱特征验证模式下,用于随机化特征的候选池大小。默认值为 10000。
返回 None。
代码示例
>>> import paddle
>>> paddle.enable_static()
>>> dataset = paddle.distributed.InMemoryDataset()
>>> dataset.init(
... batch_size=1,
... thread_num=2,
... input_type=1,
... pipe_command="cat",
... use_var=[])
>>> dataset._init_distributed_settings(
... parse_ins_id=True,
... parse_content=True,
... fea_eval=True,
... candidate_size=10000)
>>> dataset.update_settings(batch_size=2)
load_into_memory()¶
注意:
1. 该 API 只在非 Dygraph 模式下生效
向内存中加载数据。
代码示例
>>> import paddle
>>> paddle.enable_static()
>>> dataset = paddle.distributed.InMemoryDataset()
>>> slots = ["slot1", "slot2", "slot3", "slot4"]
>>> slots_vars = []
>>> for slot in slots:
... var = paddle.static.data(
... name=slot, shape=[None, 1], dtype="int64", lod_level=1)
... slots_vars.append(var)
>>> dataset.init(
... batch_size=1,
... thread_num=2,
... input_type=1,
... pipe_command="cat",
... use_var=slots_vars)
>>> filelist = ["a.txt", "b.txt"]
>>> dataset.set_filelist(filelist)
>>> dataset.load_into_memory()
preload_into_memory(thread_num=None)¶
向内存中以异步模式加载数据。
参数
thread_num (int) - 异步加载数据时的线程数。
代码示例
>>> import paddle
>>> paddle.enable_static()
>>> dataset = paddle.distributed.InMemoryDataset()
>>> slots = ["slot1", "slot2", "slot3", "slot4"]
>>> slots_vars = []
>>> for slot in slots:
... var = paddle.static.data(
... name=slot, shape=[None, 1], dtype="int64", lod_level=1)
... slots_vars.append(var)
>>> dataset.init(
... batch_size=1,
... thread_num=2,
... input_type=1,
... pipe_command="cat",
... use_var=slots_vars)
>>> filelist = ["a.txt", "b.txt"]
>>> dataset.set_filelist(filelist)
>>> dataset.preload_into_memory()
>>> dataset.wait_preload_done()
wait_preload_done()¶
等待 preload_into_memory
完成。
代码示例
>>> import paddle
>>> paddle.enable_static()
>>> dataset = paddle.distributed.InMemoryDataset()
>>> slots = ["slot1", "slot2", "slot3", "slot4"]
>>> slots_vars = []
>>> for slot in slots:
... var = paddle.static.data(
... name=slot, shape=[None, 1], dtype="int64", lod_level=1)
... slots_vars.append(var)
>>> dataset.init(
... batch_size=1,
... thread_num=2,
... input_type=1,
... pipe_command="cat",
... use_var=slots_vars)
>>> filelist = ["a.txt", "b.txt"]
>>> dataset.set_filelist(filelist)
>>> dataset.preload_into_memory()
>>> dataset.wait_preload_done()
local_shuffle()¶
局部 shuffle。加载到内存的训练样本进行单机节点内部的打乱
代码示例
>>> import paddle
>>> paddle.enable_static()
>>> dataset = paddle.distributed.InMemoryDataset()
>>> slots = ["slot1", "slot2", "slot3", "slot4"]
>>> slots_vars = []
>>> for slot in slots:
... var = paddle.static.data(
... name=slot, shape=[None, 1], dtype="int64", lod_level=1)
... slots_vars.append(var)
>>> dataset.init(
... batch_size=1,
... thread_num=2,
... input_type=1,
... pipe_command="cat",
... use_var=slots_vars)
>>> filelist = ["a.txt", "b.txt"]
>>> dataset.set_filelist(filelist)
>>> dataset.load_into_memory()
>>> dataset.local_shuffle()
global_shuffle(fleet=None, thread_num=12)¶
全局 shuffle。只能用在分布式模式(单机多进程或多机多进程)中。您如果在分布式模式中运行,应当传递 fleet 而非 None。
代码示例
>>> import paddle
>>> paddle.enable_static()
>>> dataset = paddle.distributed.InMemoryDataset()
>>> slots = ["slot1", "slot2", "slot3", "slot4"]
>>> slots_vars = []
>>> for slot in slots:
... var = paddle.static.data(
... name=slot, shape=[None, 1], dtype="int64", lod_level=1)
... slots_vars.append(var)
>>> dataset.init(
... batch_size=1,
... thread_num=2,
... input_type=1,
... pipe_command="cat",
... use_var=slots_vars)
>>> filelist = ["a.txt", "b.txt"]
>>> dataset.set_filelist(filelist)
>>> dataset.load_into_memory()
>>> dataset.global_shuffle()
参数
fleet (Fleet) – fleet 单例。默认为 None。
thread_num (int) - 全局 shuffle 时的线程数。
release_memory()¶
当数据不再使用时,释放 InMemoryDataset 内存数据。
>>> import paddle
>>> paddle.enable_static()
>>> dataset = paddle.distributed.InMemoryDataset()
>>> slots = ["slot1", "slot2", "slot3", "slot4"]
>>> slots_vars = []
>>> for slot in slots:
... var = paddle.static.data(
... name=slot, shape=[None, 1], dtype="int64", lod_level=1)
... slots_vars.append(var)
>>> dataset.init(
... batch_size=1,
... thread_num=2,
... input_type=1,
... pipe_command="cat",
... use_var=slots_vars)
>>> filelist = ["a.txt", "b.txt"]
>>> dataset.set_filelist(filelist)
>>> dataset.load_into_memory()
>>> dataset.global_shuffle()
>>> exe = paddle.static.Executor(paddle.CPUPlace())
>>> startup_program = paddle.static.Program()
>>> main_program = paddle.static.Program()
>>> exe.run(startup_program)
>>> exe.train_from_dataset(main_program, dataset)
>>> dataset.release_memory()
get_memory_data_size(fleet=None)¶
用户可以调用此函数以了解加载进内存后所有 workers 中的样本数量。
注解
该函数可能会导致性能不佳,因为它具有 barrier。
参数
fleet (Fleet) – fleet 对象。
返回 内存数据的大小。
代码示例
>>> import paddle
>>> paddle.enable_static()
>>> dataset = paddle.distributed.InMemoryDataset()
>>> slots = ["slot1", "slot2", "slot3", "slot4"]
>>> slots_vars = []
>>> for slot in slots:
... var = paddle.static.data(
... name=slot, shape=[None, 1], dtype="int64", lod_level=1)
... slots_vars.append(var)
>>> dataset.init(
... batch_size=1,
... thread_num=2,
... input_type=1,
... pipe_command="cat",
... use_var=slots_vars)
>>> filelist = ["a.txt", "b.txt"]
>>> dataset.set_filelist(filelist)
>>> dataset.load_into_memory()
>>> print(dataset.get_memory_data_size())
get_shuffle_data_size(fleet=None)¶
获取 shuffle 数据大小,用户可以调用此函数以了解局域/全局 shuffle 后所有 workers 中的样本数量。
注解
该函数可能会导致局域 shuffle 性能不佳,因为它具有 barrier。但其不影响局域 shuffle。
参数
fleet (Fleet) – fleet 对象。
返回 shuffle 数据的大小。
代码示例
>>> import paddle
>>> paddle.enable_static()
>>> dataset = paddle.distributed.InMemoryDataset()
>>> dataset = paddle.distributed.InMemoryDataset()
>>> slots = ["slot1", "slot2", "slot3", "slot4"]
>>> slots_vars = []
>>> for slot in slots:
... var = paddle.static.data(
... name=slot, shape=[None, 1], dtype="int64", lod_level=1)
... slots_vars.append(var)
>>> dataset.init(
... batch_size=1,
... thread_num=2,
... input_type=1,
... pipe_command="cat",
... use_var=slots_vars)
>>> filelist = ["a.txt", "b.txt"]
>>> dataset.set_filelist(filelist)
>>> dataset.load_into_memory()
>>> dataset.global_shuffle()
>>> print(dataset.get_shuffle_data_size())
slots_shuffle(slots)¶
该方法是在特征层次上的一个打乱方法,经常被用在有着较大缩放率实例的稀疏矩阵上,为了比较 metric,比如 auc,在一个或者多个有着 baseline 的特征上做特征打乱来验证特征 level 的重要性。
参数
slots (list[string]) - 要打乱特征的集合
代码示例
>>> import paddle
>>> paddle.enable_static()
>>> dataset = paddle.distributed.InMemoryDataset()
>>> dataset._init_distributed_settings(fea_eval=True)
>>> slots = ["slot1", "slot2", "slot3", "slot4"]
>>> slots_vars = []
>>> for slot in slots:
... var = paddle.static.data(
... name=slot, shape=[None, 1], dtype="int64", lod_level=1)
... slots_vars.append(var)
>>> dataset.init(
... batch_size=1,
... thread_num=2,
... input_type=1,
... pipe_command="cat",
... use_var=slots_vars)
>>> filelist = ["a.txt", "b.txt"]
>>> dataset.set_filelist(filelist)
>>> dataset.load_into_memory()
>>> dataset.slots_shuffle(['slot1'])