Quick start for distributed training¶
Distributed training with Fleet API¶
Since PaddlePaddle Release 1.5.1, the Fleet API has been the officially recommended way to run distributed training.
Preparation¶
[x] Install PaddlePaddle. If it is not installed yet, please refer to the Beginner’s Guide.
[x] Master the most basic single-node training method. Please refer to the single-card training described in Single-node training.
Click-through rate prediction¶
Here we use a simple example, a click-through rate (CTR) prediction task, to illustrate how to configure the Fleet API for distributed training, and show how to simulate a distributed environment on a single node.
To make the example easier to follow, the single-node and multi-node code paths are mixed in one script; you can start a single-node or a multi-node job with different launch commands.
from args import parse_args
import os
import sys
import paddle
import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.distributed.fleet as fleet
from network_conf import ctr_dnn_model_dataset

# The example uses the static graph mode.
paddle.enable_static()

dense_feature_dim = 13
sparse_feature_dim = 10000001
batch_size = 100
thread_num = 10
embedding_size = 10
args = parse_args()

def main_function(is_local):
    # Code shared by local and distributed training: declare the inputs,
    # build the dataset reader and the CTR-DNN network.
    dense_input = paddle.static.data(
        name="dense_input", shape=[dense_feature_dim], dtype='float32')
    sparse_input_ids = [
        paddle.static.data(name="C" + str(i), shape=[1], lod_level=1,
                           dtype="int64") for i in range(1, 27)]
    label = paddle.static.data(name="label", shape=[1], dtype="int64")

    dataset = paddle.distributed.QueueDataset()
    dataset.init(
        batch_size=batch_size,
        thread_num=thread_num,
        input_type=0,
        pipe_command="python criteo_reader.py %d" % sparse_feature_dim,
        use_var=[dense_input] + sparse_input_ids + [label])
    whole_filelist = ["raw_data/part-%d" % x
                      for x in range(len(os.listdir("raw_data")))]
    dataset.set_filelist(whole_filelist)

    loss, auc_var, batch_auc_var = ctr_dnn_model_dataset(
        dense_input, sparse_input_ids, label, embedding_size,
        sparse_feature_dim)

    exe = paddle.static.Executor(paddle.CPUPlace())

    def train_loop(epoch=20):
        # Feed the dataset to the executor directly, one pass per epoch.
        for i in range(epoch):
            exe.train_from_dataset(program=paddle.static.default_main_program(),
                                   dataset=dataset,
                                   fetch_list=[auc_var],
                                   fetch_info=["auc"],
                                   debug=False)

    # local training
    def local_train():
        optimizer = paddle.optimizer.SGD(learning_rate=1e-4)
        optimizer.minimize(loss)
        exe.run(paddle.static.default_startup_program())
        train_loop()

    # distributed training
    def dist_train():
        # PaddleCloudRoleMaker reads the environment variables set by the
        # launcher to decide whether this process is a server or a worker.
        role = role_maker.PaddleCloudRoleMaker()
        fleet.init(role)

        # a_sync=True selects asynchronous parameter-server training.
        strategy = paddle.distributed.fleet.DistributedStrategy()
        strategy.a_sync = True
        optimizer = paddle.optimizer.SGD(learning_rate=1e-4)
        optimizer = fleet.distributed_optimizer(optimizer, strategy)
        optimizer.minimize(loss)

        if fleet.is_server():
            fleet.init_server()
            fleet.run_server()
        elif fleet.is_worker():
            fleet.init_worker()
            exe.run(paddle.static.default_startup_program())
            train_loop()

    if is_local:
        local_train()
    else:
        dist_train()

if __name__ == '__main__':
    main_function(args.is_local)
Note: The IO method used in this example is dataset; please refer to the Dataset API for detailed documentation and usage.
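For example, if the training data fits in memory and you want to shuffle it between passes, the QueueDataset used above can be swapped for an in-memory variant. The snippet below is a minimal sketch, assuming your Paddle version exposes paddle.distributed.InMemoryDataset with the same init arguments as QueueDataset:

dataset = paddle.distributed.InMemoryDataset()
dataset.init(
    batch_size=batch_size,
    thread_num=thread_num,
    input_type=0,
    pipe_command="python criteo_reader.py %d" % sparse_feature_dim,
    use_var=[dense_input] + sparse_input_ids + [label])
dataset.set_filelist(whole_filelist)
dataset.load_into_memory()   # read all listed files into memory first
dataset.local_shuffle()      # shuffle the loaded samples within this process
# ... then call exe.train_from_dataset(...) as in train_loop(), and finally
dataset.release_memory()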
Start command for single-node training¶
python train.py --is_local 1
Start command for simulating distributed training on a single machine¶
Here we use launch_ps, a built-in launcher of Paddle, with which users can specify the number of workers and servers to start the parameter-server tasks.
python -m paddle.distributed.launch_ps --worker_num 2 --server_num 2 train.py
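Behind the scenes, the launcher starts each server and worker as a separate local process and tells it its role through environment variables, which is what PaddleCloudRoleMaker in the script above reads to decide between fleet.is_server() and fleet.is_worker(). As an illustration only (the exact variable names are an assumption and may differ between Paddle versions), a worker process would see settings along the lines of:

TRAINING_ROLE=TRAINER
PADDLE_TRAINER_ID=0
PADDLE_TRAINERS_NUM=2
PADDLE_PSERVERS_IP_PORT_LIST=127.0.0.1:6170,127.0.0.1:6171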
The logs of the running tasks can be viewed in the logs directory under the working directory. Once you can simulate distributed training on a single machine, you can move on to true multi-node distributed training.
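To inspect the logs mentioned above while a job is running, you can, for example, follow a worker's output with a command like the one below (the exact log file names, such as workerlog.0, are an assumption and may differ between Paddle versions):

tail -f logs/workerlog.0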