 import paddle
 from paddle.fluid import core
 from paddle.fluid.dygraph.parallel import ParallelEnv
-from paddle.fluid.framework import _test_eager_guard


 class TestProcessGroupFp32(unittest.TestCase):
@@ -35,154 +34,151 @@ def config(self):
         self.shape = (2, 10, 5)

     def test_create_process_group_gloo(self):
-        with _test_eager_guard():
-            nranks = ParallelEnv().nranks
-            rank = ParallelEnv().local_rank
-            is_master = True if rank == 0 else False
-            store = paddle.fluid.core.TCPStore(
-                "127.0.0.1", 6272, is_master, nranks, 30
-            )
-            pg = paddle.fluid.core.ProcessGroupGloo.create(store, rank, nranks)
-
-            # test allreduce sum
-            # rank 0
-            paddle.device.set_device('cpu')
-            x = np.random.random(self.shape).astype(self.dtype)
-            tensor_x = paddle.to_tensor(x)
-            # rank 1
-            y = np.random.random(self.shape).astype(self.dtype)
-            tensor_y = paddle.to_tensor(y)
-
-            sum_result = x + y
-            if rank == 0:
-                task = pg.allreduce(tensor_x)
-                task.wait()
-                np.testing.assert_equal(tensor_x, sum_result)
-            else:
-                task = pg.allreduce(tensor_y)
-                task.wait()
-                np.testing.assert_equal(tensor_y, sum_result)
-
-            print("test allreduce sum api ok")
-
-            # test allreduce max
-            # rank 0
-            x = np.random.random(self.shape).astype(self.dtype)
-            tensor_x = paddle.to_tensor(x)
-            # rank 1
-            y = np.random.random(self.shape).astype(self.dtype)
-            tensor_y = paddle.to_tensor(y)
-
-            max_result = paddle.maximum(tensor_x, tensor_y)
-
-            if rank == 0:
-                task = pg.allreduce(tensor_x, core.ReduceOp.MAX)
-                task.wait()
-                assert np.array_equal(tensor_x, max_result)
-            else:
-                task = pg.allreduce(tensor_y, core.ReduceOp.MAX)
-                task.wait()
-                assert np.array_equal(tensor_y, max_result)
-
-            print("test allreduce max api ok")
-
-            # test broadcast
-            # rank 0
-            x = np.random.random(self.shape).astype(self.dtype)
-            tensor_x = paddle.to_tensor(x)
-            # rank 1
-            y = np.random.random(self.shape).astype(self.dtype)
-            tensor_y = paddle.to_tensor(y)
-
-            broadcast_result = paddle.assign(tensor_x)
-            if rank == 0:
-                task = pg.broadcast(tensor_x, 0)
-                assert np.array_equal(broadcast_result, tensor_x)
-            else:
-                task = pg.broadcast(tensor_y, 0)
-                assert np.array_equal(broadcast_result, tensor_y)
-            print("test broadcast api ok")
-
-            # test barrier
-            # rank 0
-            if pg.rank() == 0:
-                task = pg.barrier()
-                task.wait()
-            # rank 1
-            else:
-                task = pg.barrier()
-                task.wait()
-
-            print("test barrier api ok\n")
-
-            # test allgather
-            # rank 0
-            x = np.random.random(self.shape).astype(self.dtype)
-            y = np.random.random(self.shape).astype(self.dtype)
-            tensor_x = paddle.to_tensor(x)
-            tensor_y = paddle.to_tensor(y)
-            out_shape = list(self.shape)
-            out_shape[0] *= 2
-            out = np.random.random(out_shape).astype(self.dtype)
-            tensor_out = paddle.to_tensor(out)
-            if pg.rank() == 0:
-                task = pg.all_gather(tensor_x, tensor_out)
-                task.wait()
-                paddle.device.cuda.synchronize()
-            # rank 1
-            else:
-                task = pg.all_gather(tensor_y, tensor_out)
-                task.wait()
-            out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2])
-            out_2 = paddle.slice(
-                tensor_out, [0], [out_shape[0] // 2], [out_shape[0]]
-            )
-            assert np.array_equal(tensor_x, out_1)
-            assert np.array_equal(tensor_y, out_2)
-            print("test allgather api ok\n")
-
-            # test Reduce
-            # rank 0
-            x = np.random.random(self.shape).astype(self.dtype)
-            y = np.random.random(self.shape).astype(self.dtype)
-            tensor_x = paddle.to_tensor(x)
-            tensor_y = paddle.to_tensor(y)
-            sum_result = tensor_x + tensor_y
-            if pg.rank() == 0:
-                task = pg.reduce(tensor_x, 0)
-                task.wait()
-            # rank 1
-            else:
-                task = pg.reduce(tensor_y, 0)
-                task.wait()
-            if pg.rank() == 0:
-                assert np.array_equal(tensor_x, sum_result)
-            print("test reduce sum api ok\n")
-
-            # test Scatter
-            # rank 0
-            in_shape = list(self.shape)
-            in_shape[0] *= 2
-            x = np.random.random(in_shape).astype(self.dtype)
-            y = np.random.random(self.shape).astype(self.dtype)
-            tensor_x = paddle.to_tensor(x)
-            tensor_y = paddle.to_tensor(y)
-            if pg.rank() == 0:
-                task = pg.scatter(tensor_x, tensor_y, 0)
-                task.wait()
-            # rank 1
-            else:
-                task = pg.scatter(tensor_x, tensor_y, 0)
-                task.wait()
-            out1 = paddle.slice(tensor_x, [0], [0], [self.shape[0]])
-            out2 = paddle.slice(
-                tensor_x, [0], [self.shape[0]], [self.shape[0] * 2]
-            )
-            if pg.rank() == 0:
-                assert np.array_equal(tensor_y, out1)
-            else:
-                assert np.array_equal(tensor_y, out2)
-            print("test scatter api ok\n")
+        nranks = ParallelEnv().nranks
+        rank = ParallelEnv().local_rank
+        is_master = True if rank == 0 else False
+        store = paddle.fluid.core.TCPStore(
+            "127.0.0.1", 6272, is_master, nranks, 30
+        )
+        pg = paddle.fluid.core.ProcessGroupGloo.create(store, rank, nranks)
+
+        # test allreduce sum
+        # rank 0
+        paddle.device.set_device('cpu')
+        x = np.random.random(self.shape).astype(self.dtype)
+        tensor_x = paddle.to_tensor(x)
+        # rank 1
+        y = np.random.random(self.shape).astype(self.dtype)
+        tensor_y = paddle.to_tensor(y)
+
+        sum_result = x + y
+        if rank == 0:
+            task = pg.allreduce(tensor_x)
+            task.wait()
+            np.testing.assert_equal(tensor_x, sum_result)
+        else:
+            task = pg.allreduce(tensor_y)
+            task.wait()
+            np.testing.assert_equal(tensor_y, sum_result)
+
+        print("test allreduce sum api ok")
+
+        # test allreduce max
+        # rank 0
+        x = np.random.random(self.shape).astype(self.dtype)
+        tensor_x = paddle.to_tensor(x)
+        # rank 1
+        y = np.random.random(self.shape).astype(self.dtype)
+        tensor_y = paddle.to_tensor(y)
+
+        max_result = paddle.maximum(tensor_x, tensor_y)
+
+        if rank == 0:
+            task = pg.allreduce(tensor_x, core.ReduceOp.MAX)
+            task.wait()
+            assert np.array_equal(tensor_x, max_result)
+        else:
+            task = pg.allreduce(tensor_y, core.ReduceOp.MAX)
+            task.wait()
+            assert np.array_equal(tensor_y, max_result)
+
+        print("test allreduce max api ok")
+
+        # test broadcast
+        # rank 0
+        x = np.random.random(self.shape).astype(self.dtype)
+        tensor_x = paddle.to_tensor(x)
+        # rank 1
+        y = np.random.random(self.shape).astype(self.dtype)
+        tensor_y = paddle.to_tensor(y)
+
+        broadcast_result = paddle.assign(tensor_x)
+        if rank == 0:
+            task = pg.broadcast(tensor_x, 0)
+            assert np.array_equal(broadcast_result, tensor_x)
+        else:
+            task = pg.broadcast(tensor_y, 0)
+            assert np.array_equal(broadcast_result, tensor_y)
+        print("test broadcast api ok")
+
+        # test barrier
+        # rank 0
+        if pg.rank() == 0:
+            task = pg.barrier()
+            task.wait()
+        # rank 1
+        else:
+            task = pg.barrier()
+            task.wait()
+
+        print("test barrier api ok\n")
+
+        # test allgather
+        # rank 0
+        x = np.random.random(self.shape).astype(self.dtype)
+        y = np.random.random(self.shape).astype(self.dtype)
+        tensor_x = paddle.to_tensor(x)
+        tensor_y = paddle.to_tensor(y)
+        out_shape = list(self.shape)
+        out_shape[0] *= 2
+        out = np.random.random(out_shape).astype(self.dtype)
+        tensor_out = paddle.to_tensor(out)
+        if pg.rank() == 0:
+            task = pg.all_gather(tensor_x, tensor_out)
+            task.wait()
+            paddle.device.cuda.synchronize()
+        # rank 1
+        else:
+            task = pg.all_gather(tensor_y, tensor_out)
+            task.wait()
+        out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2])
+        out_2 = paddle.slice(
+            tensor_out, [0], [out_shape[0] // 2], [out_shape[0]]
+        )
+        assert np.array_equal(tensor_x, out_1)
+        assert np.array_equal(tensor_y, out_2)
+        print("test allgather api ok\n")
+
+        # test Reduce
+        # rank 0
+        x = np.random.random(self.shape).astype(self.dtype)
+        y = np.random.random(self.shape).astype(self.dtype)
+        tensor_x = paddle.to_tensor(x)
+        tensor_y = paddle.to_tensor(y)
+        sum_result = tensor_x + tensor_y
+        if pg.rank() == 0:
+            task = pg.reduce(tensor_x, 0)
+            task.wait()
+        # rank 1
+        else:
+            task = pg.reduce(tensor_y, 0)
+            task.wait()
+        if pg.rank() == 0:
+            assert np.array_equal(tensor_x, sum_result)
+        print("test reduce sum api ok\n")
+
+        # test Scatter
+        # rank 0
+        in_shape = list(self.shape)
+        in_shape[0] *= 2
+        x = np.random.random(in_shape).astype(self.dtype)
+        y = np.random.random(self.shape).astype(self.dtype)
+        tensor_x = paddle.to_tensor(x)
+        tensor_y = paddle.to_tensor(y)
+        if pg.rank() == 0:
+            task = pg.scatter(tensor_x, tensor_y, 0)
+            task.wait()
+        # rank 1
+        else:
+            task = pg.scatter(tensor_x, tensor_y, 0)
+            task.wait()
+        out1 = paddle.slice(tensor_x, [0], [0], [self.shape[0]])
+        out2 = paddle.slice(tensor_x, [0], [self.shape[0]], [self.shape[0] * 2])
+        if pg.rank() == 0:
+            assert np.array_equal(tensor_y, out1)
+        else:
+            assert np.array_equal(tensor_y, out2)
+        print("test scatter api ok\n")


 if __name__ == "__main__":
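For reference, below is a minimal sketch (not part of this patch) of how the Gloo process group exercised by the test above can be driven locally with two CPU processes. It reuses only the calls shown in the diff (TCPStore, ProcessGroupGloo.create, allreduce, task.wait); the use of multiprocessing as the launcher and the reuse of port 6272 with a 30-second store timeout are illustrative assumptions, not taken from the patch.

# Sketch only: spawn two ranks and run a SUM allreduce over Gloo on CPU.
# Assumes a local, free port 6272; rank 0 acts as the TCPStore master,
# mirroring the is_master logic in the test.
import multiprocessing as mp

import numpy as np
import paddle
from paddle.fluid import core


def run(rank, nranks):
    paddle.device.set_device('cpu')
    # Same store/group construction as in the test body above.
    store = core.TCPStore("127.0.0.1", 6272, rank == 0, nranks, 30)
    pg = core.ProcessGroupGloo.create(store, rank, nranks)

    # Rank r contributes a tensor filled with r + 1; the default op is SUM.
    data = np.full((2, 10, 5), rank + 1, dtype='float32')
    tensor = paddle.to_tensor(data)
    task = pg.allreduce(tensor)
    task.wait()
    # With two ranks contributing 1s and 2s, every element should be 3.
    np.testing.assert_allclose(tensor.numpy(), np.full_like(data, 3.0))


if __name__ == "__main__":
    procs = [mp.Process(target=run, args=(r, 2)) for r in range(2)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()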