Skip to content

Commit f65f899

Browse files
committed
docs(distributed/collective): revision & add stream apis
1 parent 9b53b4c commit f65f899

26 files changed

+501
-136
lines changed

docs/api/paddle/distributed/Overview_cn.rst

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@ paddle.distributed 目录包含的 API 支撑飞桨框架大规模分布式训
88
- :ref:`Fleet 分布式高层 API <01>`
99
- :ref:`环境配置和训练启动管理 <02>`
1010
- :ref:`数据加载 <03>`
11-
- :ref:`集合通信算法 API <04>`
11+
- :ref:`集合通信 API <04>`
12+
- :ref:`Stream 集合通信高级 API <05>`
1213

1314
.. _01:
1415

@@ -70,29 +71,50 @@ paddle.distributed.fleet 是分布式训练的统一入口 API,用于配置分
7071

7172
.. _04:
7273

73-
集合通信算法 API
74+
集合通信 API
7475
::::::::::::::::::::::
7576

76-
在集群上,对多设备的进程组的参数数据 tensor 或 object 进行计算处理。
77+
在集群上,对多设备的进程组的参数数据 tensor 或 object 进行计算处理,包括规约、聚合、广播、分发等
7778

7879
.. csv-table::
7980
:header: "API 名称", "API 功能"
8081
:widths: 20, 50
8182

82-
83-
" :ref:`reduce <cn_api_distributed_reduce>` ", "规约,规约进程组内的 tensor,返回结果至指定进程"
84-
" :ref:`ReduceOp <cn_api_distributed_ReduceOp>` ", "规约,指定逐元素规约操作"
85-
" :ref:`all_reduce <cn_api_distributed_all_reduce>` ", "组规约,规约进程组内的 tensor,结果广播至每个进程"
86-
" :ref:`all_gather <cn_api_distributed_all_gather>` ", "组聚合,聚合进程组内的 tensor,结果广播至每个进程"
87-
" :ref:`all_gather_object <cn_api_distributed_all_gather_object>` ", "组聚合,聚合进程组内的 object,结果广播至每个进程"
88-
" :ref:`alltoall <cn_api_distributed_alltoall>` ", "分发 tensor 列表到每个进程并进行聚合"
89-
" :ref:`alltoall_single <cn_api_distributed_alltoall_single>` ", "分发单个 tensor 到每个进程并聚合至目标 tensor"
90-
" :ref:`broadcast <cn_api_distributed_broadcast>` ", "广播一个 tensor 到每个进程"
91-
" :ref:`scatter <cn_api_distributed_scatter>` ", "分发 tensor 到每个进程"
92-
" :ref:`split <cn_api_distributed_split>` ", "切分参数到多个设备"
93-
" :ref:`barrier <cn_api_distributed_barrier>` ", "同步路障,进行阻塞操作,实现组内所有进程的同步"
94-
" :ref:`send <cn_api_distributed_send>` ", "发送一个 tensor 到指定的进程"
83+
" :ref:`ReduceOp <cn_api_distributed_ReduceOp>` ", "规约操作的类型"
84+
" :ref:`reduce <cn_api_distributed_reduce>` ", "规约,规约进程组内的一个 tensor,随后将结果发送到指定进程"
85+
" :ref:`all_reduce <cn_api_distributed_all_reduce>` ", "组规约,规约进程组内的 tensor,随后将结果发送到每个进程"
86+
" :ref:`all_gather <cn_api_distributed_all_gather>` ", "组聚合,聚合进程组内的 tensor,随后将结果发送到每个进程"
87+
" :ref:`all_gather_object <cn_api_distributed_all_gather_object>` ", "组聚合,聚合进程组内的 object,随后将结果发送到每个进程"
88+
" :ref:`alltoall <cn_api_distributed_alltoall>` ", "将一组 tensor 分发到每个进程并进行聚合"
89+
" :ref:`alltoall_single <cn_api_distributed_alltoall_single>` ", "将一个 tensor 分发到每个进程并聚合到目标 tensor"
90+
" :ref:`broadcast <cn_api_distributed_broadcast>` ", "将一个 tensor 发送到每个进程"
91+
" :ref:`scatter <cn_api_distributed_scatter>` ", "将一组 tensor 分发到每个进程"
92+
" :ref:`reduce_scatter <cn_api_distributed_reduce_scatter>` ", "规约一组 tensor,随后将规约结果分发到每个进程"
93+
" :ref:`isend <cn_api_distributed_isend>` ", "异步发送一个 tensor 到指定进程"
94+
" :ref:`irecv <cn_api_distributed_irecv>` ", "异步接收一个来自指定进程的 tensor"
95+
" :ref:`send <cn_api_distributed_send>` ", "发送一个 tensor 到指定进程"
9596
" :ref:`recv <cn_api_distributed_recv>` ", "接收一个来自指定进程的 tensor"
96-
" :ref:`isend <cn_api_paddle_distributed_isend>` ", "异步发送一个 tensor 到指定的进程"
97-
" :ref:`irecv <cn_api_paddle_distributed_irecv>` ", "异步接收一个来自指定进程的 tensor"
98-
" :ref:`reduce_scatter <cn_api_paddle_distributed_reduce_scatter>` ", "规约,然后将 tensor 列表分散到组中的所有进程上"
97+
" :ref:`barrier <cn_api_distributed_barrier>` ", "同步路障,阻塞操作以实现组内进程同步"
98+
99+
.. _05:
100+
101+
Stream 集合通信高级 API
102+
::::::::::::::::::::::
103+
104+
paddle.distributed.stream 在集合通信 API 的基础上,提供更统一的语义和对计算流的更精细的控制能力,有助于在特定场景下提高性能。
105+
106+
.. csv-table::
107+
:header: "API 名称", "API 功能"
108+
:widths: 30, 50
109+
110+
111+
" :ref:`stream.reduce <cn_api_distributed_stream_reduce>` ", "规约,规约进程组内的 tensor,随后将结果发送到指定进程"
112+
" :ref:`stream.all_reduce <cn_api_distributed_stream_all_reduce>` ", "组规约,规约进程组内的 tensor,随后将结果发送到每个进程"
113+
" :ref:`stream.all_gather <cn_api_distributed_stream_all_gather>` ", "组聚合,聚合进程组内的 tensor,随后将结果发送到每个进程"
114+
" :ref:`stream.alltoall <cn_api_distributed_stream_alltoall>` ", "分发一组 tensor 到每个进程并进行聚合"
115+
" :ref:`stream.alltoall_single <cn_api_distributed_stream_alltoall_single>` ", "分发一个 tensor 到每个进程并聚合到目标 tensor"
116+
" :ref:`stream.broadcast <cn_api_distributed_stream_broadcast>` ", "发送一个 tensor 到每个进程"
117+
" :ref:`stream.scatter <cn_api_distributed_stream_scatter>` ", "分发一个 tensor 到每个进程"
118+
" :ref:`stream.reduce_scatter <cn_api_distributed_stream_reduce_scatter>` ", "规约一组 tensor,随后将规约结果分发到每个进程"
119+
" :ref:`stream.send <cn_api_distributed_stream_send>` ", "发送一个 tensor 到指定进程"
120+
" :ref:`stream.recv <cn_api_distributed_stream_recv>` ", "接收一个来自指定进程的 tensor"

docs/api/paddle/distributed/ReduceOp_cn.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ ReduceOp
55

66
.. py:class:: paddle.distributed.ReduceOp()
77
8-
指定规约类操作的逐元素操作类型,需要是下述值之一
8+
指定规约操作的类型,必须是下述值之一
99

1010
ReduceOp.SUM
1111

docs/api/paddle/distributed/all_gather_cn.rst

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ all_gather
44
-------------------------------
55

66

7-
.. py:function:: paddle.distributed.all_gather(tensor_list, tensor, group=0)
7+
.. py:function:: paddle.distributed.all_gather(tensor_list, tensor, group=None, sync_op=True)
88
9-
进程组内所有进程的指定 tensor 进行聚合操作,并返回给所有进程聚合的结果。
10-
如下图所示,4 个 GPU 分别开启 1 个进程,每张卡上的数据用卡号代表,
11-
经过 all_gather 算子后,每张卡都会拥有所有卡的数据。
9+
组聚合,聚合进程组内的指定 tensor,随后将聚合后的 tensor 列表发送到每个进程。
10+
11+
如下图所示,4 个 GPU 分别开启 1 个进程,进程拥有的数据用其在组内的 rank 表示。
12+
聚合操作后,每个进程都会得到所有进程拥有的数据。
1213

1314
.. image:: ./img/allgather.png
1415
:width: 800
@@ -17,13 +18,14 @@ all_gather
1718

1819
参数
1920
:::::::::
20-
- **tensor_list** (list) - 操作的输出 Tensor 列表。列表中的每个元素均为 Tensor,每个 Tensor 的数据类型为:float16、float32、float64、int32、int64、int8、uint8、bool、bfloat16、complex64、complex128。
21-
- **tensor** (Tensor) - 操作的输入 Tensor。Tensor 的数据类型为:float16、float32、float64、int32、int64、int8、uint8、bool、bfloat16、complex64、complex128。
22-
- **group** (int,可选) - 工作的进程组编号,默认为 0。
21+
- **tensor_list** (List[Tensor]) - 用于保存聚合结果的 tensor 列表。若不为空,其中每个 tensor 的数据类型必须与输入的 tensor 保持一致。
22+
- **tensor** (Tensor) - 待聚合的 tensor。支持的数据类型包括:float16、float32、float64、int32、int64、int8、uint8、bool、bfloat16、complex64、complex128。
23+
- **group** (Group,可选) - 执行该操作的进程组实例(通过 ``new_group`` 创建)。默认为 None,即使用全局默认进程组。
24+
- **sync_op** (bool,可选) - 该操作是否为同步操作。默认为 True,即同步操作。
2325

2426
返回
2527
:::::::::
26-
28+
无返回值。
2729

2830
代码示例
2931
:::::::::

docs/api/paddle/distributed/all_gather_object_cn.rst

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,23 @@ all_gather_object
44
-------------------------------
55

66

7-
.. py:function:: paddle.distributed.all_gather_object(object_list, object, group=0)
8-
9-
进程组内所有进程指定的 picklable 对象进行聚合操作,并返回给所有进程聚合的结果。和 all_gather 类似,但可以传入自定义的 python 对象。
7+
.. py:function:: paddle.distributed.all_gather_object(object_list, obj, group=None)
108
119
.. warning::
1210
该 API 只支持动态图模式。
1311

12+
组聚合,聚合进程组内指定的 picklable 对象,随后将聚合后的对象列表发送到每个进程。
13+
过程与 ``all_gather`` 类似,但可以传入自定义的 python 对象。
14+
1415
参数
1516
:::::::::
16-
- **object_list** (list) - 操作的输出 Object 列表
17-
- **object** (Any) - 操作的输入 Object,需要保证输入自定义的 Object 是 picklable 的。
18-
- **group** (int,可选) - 工作的进程组编号,默认为 0
17+
- **object_list** (List[Any]) - 用于保存聚合结果的列表
18+
- **object** (Any) - 待聚合的对象。需要保证该对象是 picklable 的。
19+
- **group** (Group,可选) - 执行该操作的进程组实例(通过 ``new_group`` 创建)。默认为 None,即使用全局默认进程组
1920

2021
返回
2122
:::::::::
22-
23+
无返回值。
2324

2425
代码示例
2526
:::::::::

docs/api/paddle/distributed/all_reduce_cn.rst

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,12 @@ all_reduce
44
-------------------------------
55

66

7-
.. py:function:: paddle.distributed.all_reduce(tensor, op=ReduceOp.SUM, group=0)
7+
.. py:function:: paddle.distributed.all_reduce(tensor, op=ReduceOp.SUM, group=None, sync_op=True)
88
9-
进程组内所有进程的指定 tensor 进行归约操作,并返回给所有进程归约的结果。
10-
如下图所示,4 个 GPU 分别开启 1 个进程,每张卡上的数据用卡号代表,规约操作为求和,
11-
经过 all_reduce 算子后,每张卡都会拥有所有卡数据的总和。
9+
规约进程组内的一个 tensor,随后将结果发送到每个进程。
10+
11+
如下图所示,4 个 GPU 分别开启 1 个进程,进程拥有的数据用其在组内的 rank 表示,规约操作为求和。
12+
规约操作后,每个进程都会得到所有进程数据的总和。
1213

1314
.. image:: ./img/allreduce.png
1415
:width: 800
@@ -17,13 +18,14 @@ all_reduce
1718

1819
参数
1920
:::::::::
20-
- **tensor** (Tensor) - 操作的输入 Tensor,同时也会将归约结果返回至此 Tensor 中。Tensor 的数据类型为:float16、float32、float64、int32、int64、int8、uint8、bool、bfloat16。
21-
- **op** (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD,可选) - 归约的具体操作,比如求和,取最大值,取最小值和求乘积,默认为求和归约。
22-
- **group** (int,可选) - 工作的进程组编号,默认为 0。
21+
- **tensor** (Tensor) - 输入的 tensor。返回结果也将保存到该 tensor 中。支持的数据类型包括:float16、float32、float64、int32、int64、int8、uint8、bool、bfloat16。
22+
- **op** (ReduceOp.SUM|ReduceOp.MAX|ReduceOp.MIN|ReduceOp.PROD,可选) - 归约的操作类型,包括求和、取最大值、取最小值和求乘积。默认为求和。
23+
- **group** (Group,可选) - 执行该操作的进程组实例(通过 ``new_group`` 创建)。默认为 None,即使用全局默认进程组。
24+
- **sync_op** (bool,可选) - 该操作是否为同步操作。默认为 True,即同步操作。
2325

2426
返回
2527
:::::::::
26-
28+
返回 Task 实例。
2729

2830
代码示例
2931
:::::::::

docs/api/paddle/distributed/alltoall_cn.rst

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@ alltoall
44
-------------------------------
55

66

7-
.. py:function:: paddle.distributed.alltoall(in_tensor_list, out_tensor_list, group=None, use_calc_stream=True)
7+
.. py:function:: paddle.distributed.alltoall(in_tensor_list, out_tensor_list, group=None, sync_op=True)
88
9-
将 in_tensor_list 里面的 tensors 按照卡数均分并按照卡的顺序分发到所有参与的卡并将结果 tensors 汇总到 out_tensor_list。
10-
如下图所示,GPU0 卡的 in_tensor_list 会按照两张卡拆分成 0_0 和 0_1, GPU1 卡的 in_tensor_list 同样拆分成 1_0 和 1_1,经过 alltoall 算子后,
11-
GPU0 卡的 0_0 会发送给 GPU0,GPU0 卡的 0_1 会发送给 GPU1,GPU1 卡的 1_0 会发送给 GPU0,GPU1 卡的 1_1 会发送给 GPU1,所以 GPU0 卡的 out_tensor_list 包含 0_0 和 1_0,
12-
GPU1 卡的 out_tensor_list 包含 0_1 和 1_1。
9+
将 in_tensor_list 中的一组 tensor 分发到每个进程,随后在每个进程上将分发结果聚合到 out_tensor_list。
10+
11+
如下图所示,2 个 GPU 分别开启 1 个进程,rank=0 的进程的 in_tensor_list 包含 0_0 和 0_1 两个 tensor,rank=1 的进程的 in_tensor_list 包含 1_0 和 1_1 两个 tensor。
12+
操作后,rank=0 的进程的 out_tensor_list 会包含 0_0 和 1_0 两个 tensor,rank=1 的进程的 out_tensor_list 会包含 0_0 和 1_1 两个 tensor。
13+
14+
简单来说,该操作类似于 scatter + gather。更直观地,如果将全部进程上的数据看作一个矩阵,该操作类似于对矩阵进行转置。
1315

1416
.. image:: ./img/alltoall.png
1517
:width: 800
@@ -18,14 +20,14 @@ GPU1 卡的 out_tensor_list 包含 0_1 和 1_1。
1820

1921
参数
2022
:::::::::
21-
- **in_tensor_list** (list) - 包含所有输入 Tensors 的一个列表。在列表里面的所有元素都必须是一个 Tensor,Tensor 的数据类型必须是 float16、float32、float64、int32、int64、int8、uint8、bool、bfloat16。
22-
- **out_tensor_list** (Tensor) - 包含所有输出 Tensors 的一个列表。在列表里面的所有元素数据类型要和输入的 Tensors 数据类型一致
23-
- **group** (Group,可选) - new_group 返回的 Group 实例,或者设置为 None 表示默认地全局组。默认值:None
24-
- **use_calc_stream** (bool,可选) - 标识使用计算流还是通信流。默认值:True。
23+
- **in_tensor_list** (List[Tensor]) - 输入的 tensor 列表。支持的数据类型包括:float16、float32、float64、int32、int64、int8、uint8、bool、bfloat16。
24+
- **out_tensor_list** (List[Tensor]) - 用于保存操作结果的 tensor 列表。其中每个 tensor 的数据类型必须与输入的 tensor 保持一致
25+
- **group** (Group,可选) - 执行该操作的进程组实例(通过 ``new_group`` 创建)。默认为 None,即使用全局默认进程组
26+
- **sync_op** (bool,可选) - 该操作是否为同步操作。默认为 True,即同步操作
2527

2628
返回
2729
:::::::::
28-
30+
无返回值。
2931

3032
代码示例
3133
:::::::::

docs/api/paddle/distributed/alltoall_single_cn.rst

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,25 @@ alltoall_single
44
-------------------------------
55

66

7-
.. py:function:: alltoall_single(in_tensor, out_tensor, in_split_sizes=None, out_split_sizes=None, group=None, use_calc_stream=True)
7+
.. py:function:: paddle.distributed.alltoall_single(in_tensor, out_tensor, in_split_sizes=None, out_split_sizes=None, group=None, sync_op=True)
88
9-
将输入的 tensor 分发到所有进程,并将接收到的 tensor 聚合到 out_tensor 中。
9+
.. warning::
10+
该 API 只支持动态图模式。
11+
12+
将输入的 tensor 分发到每个进程,随后在每个进程上将分发结果聚合到 out_tensor 中。
1013

1114
参数
1215
:::::::::
13-
- in_tensor (Tensor): 输入的 tensor,其数据类型必须是 float16、float32、float64、int32、int64、int8、uint8、bool、bfloat16。
14-
- out_tensor (Tensor): 输出的 tensor,其数据类型与输入的 tensor 一致。
15-
- in_split_sizes (list[int],可选): 对 in_tensor 的 dim[0] 进行切分的大小。若该参数未指定,in_tensor 将被均匀切分到各个进程中(需要确保 in_tensor 的大小能够被组中的进程数整除)。默认值:None。
16-
- out_split_sizes (list[int],可选): 对 out_tensor 的 dim[0] 进行切分的大小。若该参数未指定,out_tensor 将均匀地聚合来自各个进程的数据(需要确保 out_tensor 的大小能够被组中的进程数整除)。默认值:None。
17-
- use_calc_stream (bool,可选) - 标识使用计算流(若为 True)还是通信流。默认值:True。
16+
- **in_tensor** (Tensor): 输入的 tensor。支持的数据类型包括:float16、float32、float64、int32、int64、int8、uint8、bool、bfloat16。
17+
- **out_tensor** (Tensor): 用于保存操作结果的 tensor,数据类型必须与输入的 tensor 保持一致。
18+
- **in_split_sizes** (List[int],可选): 对 in_tensor 的 dim[0] 进行切分的大小。默认为 None,即将 in_tensor 均匀地分发到各个进程中(需要确保 in_tensor 的大小能够被组中的进程数整除)。
19+
- **out_split_sizes** (List[int],可选): 对 out_tensor 的 dim[0] 进行切分的大小。默认为 None,即 out_tensor 将均匀地聚合来自各个进程的数据(需要确保 out_tensor 的大小能够被组中的进程数整除)。
20+
- **group** (Group,可选) - 执行该操作的进程组实例(通过 ``new_group`` 创建)。默认为 None,即使用全局默认进程组。
21+
- **sync_op** (bool,可选) - 该操作是否为同步操作。默认为 True,即同步操作。
1822

1923
返回
2024
:::::::::
21-
若 use_calc_stream=True,无返回值;若 use_calc_stream=False,返回一个 Task。
25+
若为同步操作,无返回值;若为异步操作,返回 Task 实例
2226

2327
代码示例
2428
:::::::::

docs/api/paddle/distributed/barrier_cn.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@ barrier
44
-------------------------------
55

66

7-
.. py:function:: paddle.distributed.barrier(group=0)
7+
.. py:function:: paddle.distributed.barrier(group=None)
88
99
同步进程组内的所有进程。
1010

1111
参数
1212
:::::::::
13-
- **group** (int,可选) - 工作的进程组编号,默认为 0
13+
- **group** (Group,可选) - 执行该操作的进程组实例(通过 ``new_group`` 创建)。默认为 None,即使用全局默认进程组
1414

1515
返回
1616
:::::::::

docs/api/paddle/distributed/broadcast_cn.rst

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ broadcast
44
-------------------------------
55

66

7-
.. py:function:: paddle.distributed.broadcast(tensor, src, group=0)
7+
.. py:function:: paddle.distributed.broadcast(tensor, src, group=None, sync_op=True)
88
9-
广播一个 Tensor 给其他所有进程。
10-
如下图所示,4 个 GPU 分别开启 1 个进程,GPU0 卡拥有数据,经过 broadcast 算子后,会将这个数据传播到所有卡上。
9+
将一个 tensor 发送到每个进程。
10+
11+
如下图所示,4 个 GPU 分别开启 1 个进程,rank=0 的进程拥有数据 0。
12+
广播操作后,数据 0 会被发送到所有进程上。
1113

1214
.. image:: ./img/broadcast.png
1315
:width: 800
@@ -16,13 +18,16 @@ broadcast
1618

1719
参数
1820
:::::::::
19-
- **tensor** (Tensor) - 如果当前进程编号是源,那么这个 Tensor 变量将被发送给其他进程,否则这个 Tensor 将接收源发送过来的数据。Tensor 的数据类型为:float16、float32、float64、int32、int64、int8、uint8、bool、bfloat16。
20-
- **src** (int) - 发送源的进程编号。
21-
- **group** (int,可选) - 工作的进程组编号,默认为 0。
21+
- **tensor** (Tensor) - 在目标进程上为待广播的 tensor,在其他进程上为用于接收广播结果的 tensor。支持的数据类型包括:float16、float32、float64、int32、int64、int8、uint8、bool、bfloat16。
22+
- **src** (int) - 目标进程的 rank,该进程传入的 tensor 将被发送到其他进程上。
23+
- **group** (Group,可选) - 执行该操作的进程组实例(通过 ``new_group`` 创建)。默认为 None,即使用全局默认进程组。
24+
- **sync_op** (bool,可选) - 该操作是否为同步操作。默认为 True,即同步操作。
2225

2326
返回
2427
:::::::::
25-
28+
动态图模式下,若为同步操作,返回 None;若为异步操作,返回 Task 实例。
29+
30+
静态图模式下,返回 None。
2631

2732
代码示例
2833
:::::::::

0 commit comments

Comments
 (0)