Distributed Training with the Fleet API¶
Fleet API Design Overview¶
Fleet is PaddlePaddle's high-level API for distributed training. The name evokes a fleet of paddle boats rowing in concert, much as multiple nodes train one model together. Fleet balances ease of use against algorithmic extensibility: a single-machine training program can be switched to distributed training by adding only a few lines of code, while distributed training algorithms remain flexibly configurable through the Fleet API.
Fleet API Quick-Start Examples¶
The following covers the two most common Fleet API usage scenarios with a single model, giving users a template for getting started quickly.
Suppose we define an MLP network as follows:
import paddle


def mlp(input_x, input_y, hid_dim=128, label_dim=2):
    fc_1 = paddle.static.nn.fc(x=input_x, size=hid_dim, activation='tanh')
    fc_2 = paddle.static.nn.fc(x=fc_1, size=hid_dim, activation='tanh')
    prediction = paddle.static.nn.fc(x=[fc_2], size=label_dim, activation='softmax')
    # prediction already contains probabilities, so skip the built-in softmax
    cost = paddle.nn.functional.cross_entropy(
        input=prediction, label=input_y, reduction='none', use_softmax=False)
    avg_cost = paddle.mean(x=cost)
    return avg_cost
Define a reader that generates data in memory as follows:
import numpy as np


def gen_data():
    return {"x": np.random.random(size=(128, 32)).astype('float32'),
            "y": np.random.randint(2, size=(128, 1)).astype('int64')}
Single-Machine Trainer
import paddle
paddle.enable_static()

from nets import mlp
from utils import gen_data

input_x = paddle.static.data(name="x", shape=[None, 32], dtype='float32')
input_y = paddle.static.data(name="y", shape=[None, 1], dtype='int64')

cost = mlp(input_x, input_y)
optimizer = paddle.optimizer.SGD(learning_rate=0.01)
optimizer.minimize(cost)

place = paddle.CUDAPlace(0)
exe = paddle.static.Executor(place)
exe.run(paddle.static.default_startup_program())

step = 1001
for i in range(step):
    cost_val = exe.run(feed=gen_data(), fetch_list=[cost.name])
    print("step%d cost=%f" % (i, cost_val[0]))
Parameter Server Training
The parameter server approach is well suited to parallel training of simple models on large-scale data. Building on the single-machine model defined above, the following example trains with a Parameter Server:
import paddle
paddle.enable_static()

import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.distributed.fleet as fleet
from nets import mlp
from utils import gen_data

input_x = paddle.static.data(name="x", shape=[None, 32], dtype='float32')
input_y = paddle.static.data(name="y", shape=[None, 1], dtype='int64')

cost = mlp(input_x, input_y)
optimizer = paddle.optimizer.SGD(learning_rate=0.01)

role = role_maker.PaddleCloudRoleMaker()
fleet.init(role)

strategy = paddle.distributed.fleet.DistributedStrategy()
strategy.a_sync = True

optimizer = fleet.distributed_optimizer(optimizer, strategy)
optimizer.minimize(cost)

if fleet.is_server():
    fleet.init_server()
    fleet.run_server()
elif fleet.is_worker():
    place = paddle.CPUPlace()
    exe = paddle.static.Executor(place)
    exe.run(paddle.static.default_startup_program())
    # connect this worker to the parameter servers before training
    fleet.init_worker()

    step = 1001
    for i in range(step):
        cost_val = exe.run(
            program=paddle.static.default_main_program(),
            feed=gen_data(),
            fetch_list=[cost.name])
        print("worker_index: %d, step%d cost = %f" %
              (fleet.worker_index(), i, cost_val[0]))

    # shut down the worker once training finishes
    fleet.stop_worker()
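The strategy.a_sync = True line above selects asynchronous parameter updates. Below is a minimal sketch of the related knobs on the same paddle.distributed.fleet.DistributedStrategy object; the a_sync_configs dictionary and the k_steps value are illustrative assumptions, not taken from this guide:

import paddle.distributed.fleet as fleet

strategy = fleet.DistributedStrategy()

# fully synchronous parameter-server updates
strategy.a_sync = False

# asynchronous updates, as used in the example above
strategy.a_sync = True

# assumed GEO-asynchronous variant: workers exchange parameters with the
# servers only every k_steps local steps
strategy.a_sync_configs = {"k_steps": 400}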
Collective Training
Collective training is typically used for multi-node, multi-GPU training and is common for complex models. Building on the single-machine model above, the following example runs distributed training the Collective way:
import paddle
paddle.enable_static()

import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.distributed.fleet as fleet
from nets import mlp
from utils import gen_data

input_x = paddle.static.data(name="x", shape=[None, 32], dtype='float32')
input_y = paddle.static.data(name="y", shape=[None, 1], dtype='int64')

cost = mlp(input_x, input_y)
optimizer = paddle.optimizer.SGD(learning_rate=0.01)

role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)

optimizer = fleet.distributed_optimizer(optimizer)
optimizer.minimize(cost)

place = paddle.CUDAPlace(0)
exe = paddle.static.Executor(place)
exe.run(paddle.static.default_startup_program())

step = 1001
for i in range(step):
    cost_val = exe.run(
        program=paddle.static.default_main_program(),
        feed=gen_data(),
        fetch_list=[cost.name])
    print("worker_index: %d, step%d cost = %f" %
          (fleet.worker_index(), i, cost_val[0]))
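The collective example passes no strategy to fleet.distributed_optimizer. As noted in the interface reference below, a DistributedStrategy can also be supplied here to switch on additional optimizations; a hedged sketch, assuming the amp and recompute switches of DistributedStrategy:

import paddle
import paddle.distributed.fleet as fleet

strategy = fleet.DistributedStrategy()
strategy.amp = True         # enable automatic mixed-precision training
strategy.recompute = False  # leave gradient recomputation off

optimizer = paddle.optimizer.SGD(learning_rate=0.01)
optimizer = fleet.distributed_optimizer(optimizer, strategy)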
Notes on Fleet API Interfaces¶
Fleet API Interfaces¶
init(role_maker=None)
Initializes fleet. Must be called before any other fleet interface; it sets up the multi-node environment configuration.
is_worker()
Used in Parameter Server training. Returns True if the current node is a worker node, otherwise False.
is_server()
Used in Parameter Server training. Returns True if the current node is a server node, otherwise False.
init_server(model_dir=None)
Used in Parameter Server training. Initializes the parameter server; if model_dir is given, fleet loads the model parameters saved there (see the sketch after this list).
run_server()
Used in Parameter Server training. Starts the server-side service.
init_worker()
Used in Parameter Server training. Starts the worker-side service.
stop_worker()
Stops the worker after training finishes.
distributed_optimizer(optimizer, strategy=None)
Decorator for distributed optimization algorithms. Takes a single-machine optimizer and a distributed training strategy, and returns a distributed optimizer.
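As a small illustration of init_server(model_dir=None), a parameter server can be warm-started from previously saved parameters; the ./warm_start_model path below is hypothetical:

if fleet.is_server():
    # load previously saved parameters into this parameter server (path is illustrative)
    fleet.init_server("./warm_start_model")
    fleet.run_server()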
RoleMaker¶
PaddleCloudRoleMaker
Description: PaddleCloudRoleMaker is a high-level wrapper that supports launching scripts with paddle.distributed.launch or paddle.distributed.launch_ps.
Parameter Server training example:
import paddle
paddle.enable_static()

import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.distributed.fleet as fleet

role = role_maker.PaddleCloudRoleMaker()
fleet.init(role)
Launch command:
python -m paddle.distributed.launch_ps --worker_num 2 --server_num 2 trainer.py
Collective training example:
import paddle
paddle.enable_static()

import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.distributed.fleet as fleet

role = role_maker.PaddleCloudRoleMaker(is_collective=True)
fleet.init(role)
Launch command:
python -m paddle.distributed.launch trainer.py
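To run on a specific subset of the machine's GPUs, the launcher can also be given a card list; the devices below are an example, and this assumes the --gpus option of paddle.distributed.launch:

python -m paddle.distributed.launch --gpus 0,1,2,3 trainer.py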
UserDefinedRoleMaker
Description: lets the user specify each node's role, IP, and port information directly.
Example:
import paddle
paddle.enable_static()

import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.distributed.fleet as fleet

role = role_maker.UserDefinedRoleMaker(
    current_id=0,
    role=role_maker.Role.SERVER,
    worker_num=2,
    server_endpoints=["127.0.0.1:36011", "127.0.0.1:36012"])
fleet.init(role)
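The example above configures a server node. Under the same assumptions, a worker node in that cluster would mirror the configuration, switching role to WORKER and setting current_id to the worker's own index:

import paddle
paddle.enable_static()

import paddle.distributed.fleet.base.role_maker as role_maker
import paddle.distributed.fleet as fleet

role = role_maker.UserDefinedRoleMaker(
    current_id=0,  # index of this worker
    role=role_maker.Role.WORKER,
    worker_num=2,
    server_endpoints=["127.0.0.1:36011", "127.0.0.1:36012"])
fleet.init(role)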