UtilBase

class paddle.distributed.fleet. UtilBase [source]
all_reduce ( input, mode='sum', comm_world='worker' ) [source]

all_reduce

All reduce input between specified collection. This is a distributed API.

Parameters
  • input (list|tuple|numpy.array) – The input variable to do all_reduce between specified collection.

  • mode (str) – “sum” or “min” or “max”.

  • comm_world (str, optional) – Collection used to execute all_reduce operation. Supported collections include worker , server and all . The default is worker .

Returns

A numpy array with the same shape as the input .

Return type

output(Numpy.array|None)

Examples

>>> 
>>> # Save the following code in `train.py` , and then execute the command `fleetrun --server_num 2 --worker_num 2 train.py` .
>>> import paddle.distributed.fleet as fleet
>>> from paddle.distributed.fleet import PaddleCloudRoleMaker
>>> import sys
>>> import numpy as np
>>> import os

>>> os.environ["PADDLE_WITH_GLOO"] = "2"

>>> def train():
...     role = PaddleCloudRoleMaker(
...         is_collective=False,
...         init_gloo=True,
...         path="./tmp_gloo")
...     fleet.init(role)
...
...     if fleet.is_server():
...         input = np.array([1, 2])
...         output = fleet.util.all_reduce(input, "sum", "server")
...         print(output) # [2, 4]
...     elif fleet.is_worker():
...         input = np.array([3, 4])
...         output = fleet.util.all_reduce(input, "sum", "worker")
...         print(output) # [6, 8]
...     output = fleet.util.all_reduce(input, "sum", "all")
...     print(output) # [8, 12]

>>> if __name__ == "__main__":
...     train()
barrier ( comm_world='worker' )

barrier

Barrier between specified collection.

Parameters

comm_world (str, optional) – Collection used to execute barrier operation. Supported collections include worker , server and all . The default is worker .

Examples

>>> 
>>> # Save the following code in `train.py` , and then execute the command `fleetrun --server_num 2 --worker_num 2 train.py` .
>>> import paddle.distributed.fleet as fleet
>>> from paddle.distributed.fleet import PaddleCloudRoleMaker
>>> import sys
>>> import os

>>> os.environ["PADDLE_WITH_GLOO"] = "2"

>>> def train():
...     role = PaddleCloudRoleMaker(
...         is_collective=False,
...         init_gloo=True,
...         path="./tmp_gloo")
...     fleet.init(role)
...
...     if fleet.is_server():
...         fleet.util.barrier("server")
...         print("all server arrive here") # all server arrive here
...     elif fleet.is_worker():
...         fleet.util.barrier("worker")
...         print("all server arrive here") # all server arrive here
...     fleet.util.barrier("all")
...     print("all servers and workers arrive here") #all servers and workers arrive here

>>> if __name__ == "__main__":
...     train()
all_gather ( input, comm_world='worker' )

all_gather

All gather input between specified collection.

Parameters
  • input (Int|Float) – The input variable to do all_gather between specified collection.

  • comm_world (str, optional) – Collection used to execute all_reduce operation. Supported collections include worker , server and all . The default is worker .

Returns

A list of gathered values.

Return type

output (List)

Examples

>>> 
>>> # Save the following code in `train.py` , and then execute the command `fleetrun --server_num 2 --worker_num 2 train.py` .
>>> import paddle.distributed.fleet as fleet
>>> from paddle.distributed.fleet import PaddleCloudRoleMaker
>>> import sys
>>> import os

>>> os.environ["PADDLE_WITH_GLOO"] = "2"

>>> def train():
...     role = PaddleCloudRoleMaker(
...         is_collective=False,
...         init_gloo=True,
...         path="./tmp_gloo")
...     fleet.init(role)
...
...     if fleet.is_server():
...         input = fleet.server_index()
...         output = fleet.util.all_gather(input, "server")
...         print(output) # [0, 1]
...     elif fleet.is_worker():
...         input = fleet.worker_index()
...         output = fleet.util.all_gather(input, "worker")
...         print(output) # [0, 1]
...     output = fleet.util.all_gather(input, "all")
...     print(output) # [0, 1, 0, 1]

>>> if __name__ == "__main__":
...     train()
get_file_shard ( files )

get_file_shard

Split files before distributed training, and return filelist assigned to the current trainer.

example 1: files is [a, b, c ,d, e]  and trainer_num = 2, then trainer
        0 gets [a, b, c] and trainer 1 gets [d, e].
example 2: files is [a, b], and trainer_num = 3, then trainer 0 gets
        [a], trainer 1 gets [b],  trainer 2 gets []
Parameters

files (list) – File list need to be read.

Returns

Files belong to this worker.

Return type

List

Examples

>>> 
>>> import paddle.distributed.fleet as fleet
>>> from paddle.distributed.fleet import UserDefinedRoleMaker

>>> role = UserDefinedRoleMaker(
...     is_collective=False,
...     init_gloo=False,
...     current_id=0,
...     role=fleet.Role.WORKER,
...     worker_endpoints=["127.0.0.1:6003", "127.0.0.1:6004"],
...     server_endpoints=["127.0.0.1:6001", "127.0.0.1:6002"])
>>> fleet.init(role)

>>> files = fleet.util.get_file_shard(["file1", "file2", "file3"])
>>> print(files)
["file1", "file2"]
print_on_rank ( message, rank_id )

print_on_rank

Worker of rank rank_id print some message.

Parameters
  • message (str) – Log to be printed.

  • rank_id (int) – trainer id.

Examples

>>> 
>>> import paddle.distributed.fleet as fleet
>>> from paddle.distributed.fleet import UserDefinedRoleMaker

>>> role = UserDefinedRoleMaker(
...     is_collective=False,
...     init_gloo=False,
...     current_id=0,
...     role=fleet.Role.WORKER,
...     worker_endpoints=["127.0.0.1:6003", "127.0.0.1:6004"],
...     server_endpoints=["127.0.0.1:6001", "127.0.0.1:6002"])
>>> fleet.init(role)

>>> fleet.util.print_on_rank("I'm worker 0", 0)
I'm worker 0