Commit de73bf5

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into rewrite_multi_gru

2 parents: 59ef696 + 8d3457f
321 files changed: +6050 -3741 lines


cmake/external/brpc.cmake (+1 -1)

@@ -47,7 +47,7 @@ ExternalProject_Add(
   extern_brpc
   ${EXTERNAL_PROJECT_LOG_ARGS}
   GIT_REPOSITORY "https://github.com/apache/incubator-brpc"
-  GIT_TAG 1.2.0
+  GIT_TAG 1.4.0
   PREFIX ${BRPC_PREFIX_DIR}
   UPDATE_COMMAND ""
   CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}

cmake/external/gloo.cmake (+17)

@@ -42,6 +42,23 @@ if(WITH_GPU)
   endif()
 endif()
 
+if(CMAKE_COMPILER_IS_GNUCC)
+  execute_process(COMMAND ${CMAKE_C_COMPILER} -dumpfullversion -dumpversion
+                  OUTPUT_VARIABLE GCC_VERSION)
+  string(REGEX MATCHALL "[0-9]+" GCC_VERSION_COMPONENTS ${GCC_VERSION})
+  list(GET GCC_VERSION_COMPONENTS 0 GCC_MAJOR)
+  list(GET GCC_VERSION_COMPONENTS 1 GCC_MINOR)
+  set(GCC_VERSION "${GCC_MAJOR}.${GCC_MINOR}")
+  if(GCC_VERSION GREATER_EQUAL "12.0")
+    file(TO_NATIVE_PATH ${PADDLE_SOURCE_DIR}/patches/gloo/device.cc.patch
+         native_dst)
+    set(GLOO_PATCH_COMMAND patch -d ${GLOO_SOURCE_DIR}/gloo/transport/tcp <
+        ${native_dst})
+    file(TO_NATIVE_PATH ${PADDLE_SOURCE_DIR}/patches/gloo/types.h.patch
+         types_header)
+    set(GLOO_PATCH_COMMAND patch -d ${GLOO_SOURCE_DIR}/gloo/ < ${types_header})
+  endif()
+endif()
 include_directories(${GLOO_INCLUDE_DIR})
 
 if(WITH_ASCEND OR WITH_ASCEND_CL)

paddle/fluid/distributed/collective/CMakeLists.txt (+3 -1)

@@ -61,5 +61,7 @@ if(WITH_CUSTOM_DEVICE)
     place
     enforce
     collective_helper
-    device_context)
+    device_context
+    comm_static_check
+    dense_tensor)
 endif()

paddle/fluid/distributed/collective/process_group_custom.cc (+111 -67)

@@ -16,11 +16,13 @@
 
 #include "paddle/fluid/distributed/collective/common.h"
 #include "paddle/fluid/distributed/collective/custom_ccl_tools.h"
+#include "paddle/fluid/distributed/collective/utils.h"
 #include "paddle/fluid/memory/malloc.h"
 #include "paddle/fluid/platform/device_context.h"
 #include "paddle/fluid/platform/place.h"
 #include "paddle/phi/api/lib/utils/allocator.h"
 #include "paddle/phi/common/place.h"
+#include "paddle/phi/core/distributed/check/static_check.h"
 
 DECLARE_bool(xccl_blocking_wait);
 
@@ -234,10 +236,21 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupCustom::AllGather(
     const phi::DenseTensor& in_tensor,
     int64_t offset,
     int64_t numel,
-    bool sync_op  // for compatibility, no use now
-) {
-  std::vector<phi::DenseTensor> in_wrapper{in_tensor};
+    bool sync_op,  // for compatibility, no use now
+    bool use_calc_stream) {
+  // numel > 0 indicates the tensor need to be sliced
+  const phi::DenseTensor& in_tensor_maybe_partial =
+      numel > 0
+          ? paddle::distributed::GetPartialTensor(in_tensor, offset, numel)
+          : in_tensor;
+  phi::distributed::CommStaticCheck::GatherLikeShape(*out_tensor,
+                                                     in_tensor_maybe_partial,
+                                                     /*dst_rank*/ rank_,
+                                                     /*cur_rank*/ rank_,
+                                                     size_);
+  std::vector<phi::DenseTensor> in_wrapper{in_tensor_maybe_partial};
   std::vector<phi::DenseTensor> out_wrapper{*out_tensor};
+
   return Collective(
       in_wrapper,
       out_wrapper,
@@ -247,80 +260,23 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupCustom::AllGather(
          const phi::stream::Stream& stream) {
         return phi::DeviceManager::CCLAllGather(
             device_type_,
-            XcclGetPointerByOffset(input.data(), offset, input.dtype()),
+            input.data(),
             output.data(),
-            numel,
+            input.numel(),
             phi::ccl::ToCCLDataType(input.dtype()),
             comm,
             stream);
       },
       CommType::ALLGATHER);
 }
 
-std::shared_ptr<ProcessGroup::Task> ProcessGroupCustom::AllReduce(
-    phi::DenseTensor* out_tensor,
-    const phi::DenseTensor& in_tensor,
-    const AllreduceOptions& opts,
-    bool sync_op  // for compatibility, no use now
-) {
-  std::vector<phi::DenseTensor> in_wrapper{in_tensor};
-  std::vector<phi::DenseTensor> out_wrapper{*out_tensor};
-  return AllReduce(in_wrapper, out_wrapper, opts);
-}
-
-std::shared_ptr<ProcessGroup::Task> ProcessGroupCustom::Broadcast(
+std::shared_ptr<ProcessGroup::Task> ProcessGroupCustom::AllGather(
     phi::DenseTensor* out_tensor,
     const phi::DenseTensor& in_tensor,
-    const BroadcastOptions& opts,
-    bool sync_op  // for compatibility, no use now
-) {
-  std::vector<phi::DenseTensor> in_wrapper{in_tensor};
-  std::vector<phi::DenseTensor> out_wrapper{*out_tensor};
-  return Broadcast(in_wrapper, out_wrapper, opts);
-}
-
-std::shared_ptr<ProcessGroup::Task> ProcessGroupCustom::Barrier(
-    const BarrierOptions& opts) {
-  // Only support single card single process
-  PADDLE_ENFORCE_GE(opts.device_id,
-                    0,
-                    platform::errors::PreconditionNotMet(
-                        "The barrier device id must greater or equal than 0."));
-  platform::CustomPlace place(device_type_, opts.device_id);
-  auto allocator = std::unique_ptr<phi::Allocator>(
-      new paddle::experimental::DefaultAllocator(place));
-  phi::DenseTensorMeta meta(phi::DataType::FLOAT32, phi::DDim{1});
-  phi::DenseTensor barrier_tensor{allocator.get(), meta};
-
-  auto task = ProcessGroupCustom::AllReduce(&barrier_tensor,
-                                            barrier_tensor,
-                                            {},
-                                            /*sync_op*/ true);
-  auto xccl_task = dynamic_cast<ProcessGroupCustom::CustomTask*>(task.get());
-  xccl_task->barrierTensors_ = {barrier_tensor};
-  return task;
-}
-
-phi::DeviceContext* ProcessGroupCustom::GetDeviceContext(
-    const Place& place) const {
-  const std::string key = GetKeyFromPlace(place);
-  const auto& iter = places_to_ctx_.find(key);
-  PADDLE_ENFORCE_NE(
-      iter,
-      places_to_ctx_.end(),
-      platform::errors::NotFound(
-          "Cannot find the device context in this process group."));
-  return iter->second[0].get();
-}
-
-phi::ccl::CCLComm ProcessGroupCustom::CustomCCLComm(const Place& place) const {
-  std::vector<Place> places = {place};
-  const auto& iter = places_to_customcomm_.find(GetKeyFromPlaces(places));
-  PADDLE_ENFORCE_NE(iter,
-                    places_to_customcomm_.end(),
-                    platform::errors::InvalidArgument(
-                        "Cannot find nccl comm in process group."));
-  return iter->second[0]->GetCustomCCLComm();
+    int64_t offset,
+    int64_t numel,
+    bool sync_op) {
+  return AllGather(out_tensor, in_tensor, offset, numel, sync_op);
 }
 
 // TODO(sunyilun): methods below will be removed later
@@ -356,6 +312,28 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupCustom::AllGather(
       CommType::ALLGATHER);
 }
 
+std::shared_ptr<ProcessGroup::Task> ProcessGroupCustom::AllReduce(
+    phi::DenseTensor* out_tensor,
+    const phi::DenseTensor& in_tensor,
+    const AllreduceOptions& opts,
+    bool sync_op,  // for compatibility, no use now
+    bool use_calc_stream) {
+  std::vector<phi::DenseTensor> in_wrapper{in_tensor};
+  std::vector<phi::DenseTensor> out_wrapper{*out_tensor};
+  return AllReduce(in_wrapper, out_wrapper, opts);
+}
+
+std::shared_ptr<ProcessGroup::Task> ProcessGroupCustom::AllReduce(
+    phi::DenseTensor* out_tensor,
+    const phi::DenseTensor& in_tensor,
+    const AllreduceOptions& opts,
+    bool sync_op  // for compatibility, no use now
+) {
+  std::vector<phi::DenseTensor> in_wrapper{in_tensor};
+  std::vector<phi::DenseTensor> out_wrapper{*out_tensor};
+  return AllReduce(in_wrapper, out_wrapper, opts);
+}
+
 std::shared_ptr<ProcessGroup::Task> ProcessGroupCustom::AllReduce(
     std::vector<phi::DenseTensor>& in_tensors,   // NOLINT
     std::vector<phi::DenseTensor>& out_tensors,  // NOLINT
@@ -390,6 +368,72 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupCustom::AllReduce(
       CommType::ALLREDUCE);
 }
 
+std::shared_ptr<ProcessGroup::Task> ProcessGroupCustom::Broadcast(
+    phi::DenseTensor* out_tensor,
+    const phi::DenseTensor& in_tensor,
+    const BroadcastOptions& opts,
+    bool sync_op,  // for compatibility, no use now
+    bool use_calc_stream) {
+  std::vector<phi::DenseTensor> in_wrapper{in_tensor};
+  std::vector<phi::DenseTensor> out_wrapper{*out_tensor};
+  return Broadcast(in_wrapper, out_wrapper, opts);
+}
+
+std::shared_ptr<ProcessGroup::Task> ProcessGroupCustom::Broadcast(
+    phi::DenseTensor* out_tensor,
+    const phi::DenseTensor& in_tensor,
+    const BroadcastOptions& opts,
+    bool sync_op) {
+  std::vector<phi::DenseTensor> in_wrapper{in_tensor};
+  std::vector<phi::DenseTensor> out_wrapper{*out_tensor};
+  return Broadcast(in_wrapper, out_wrapper, opts);
+}
+
+std::shared_ptr<ProcessGroup::Task> ProcessGroupCustom::Barrier(
+    const BarrierOptions& opts) {
+  // Only support single card single process
+  PADDLE_ENFORCE_GE(opts.device_id,
+                    0,
+                    platform::errors::PreconditionNotMet(
+                        "The barrier device id must greater or equal than 0."));
+  platform::CustomPlace place(device_type_, opts.device_id);
+  auto allocator = std::unique_ptr<phi::Allocator>(
+      new paddle::experimental::DefaultAllocator(place));
+  phi::DenseTensorMeta meta(phi::DataType::FLOAT32, phi::DDim{1});
+  phi::DenseTensor barrier_tensor{allocator.get(), meta};
+
+  auto task = ProcessGroupCustom::AllReduce(&barrier_tensor,
+                                            barrier_tensor,
+                                            {},
+                                            /*sync_op*/ true,
+                                            false);
+  auto xccl_task = dynamic_cast<ProcessGroupCustom::CustomTask*>(task.get());
+  xccl_task->barrierTensors_ = {barrier_tensor};
+  return task;
+}
+
+phi::DeviceContext* ProcessGroupCustom::GetDeviceContext(
+    const Place& place) const {
+  const std::string key = GetKeyFromPlace(place);
+  const auto& iter = places_to_ctx_.find(key);
+  PADDLE_ENFORCE_NE(
+      iter,
+      places_to_ctx_.end(),
+      platform::errors::NotFound(
+          "Cannot find the device context in this process group."));
+  return iter->second[0].get();
+}
+
+phi::ccl::CCLComm ProcessGroupCustom::CustomCCLComm(const Place& place) const {
+  std::vector<Place> places = {place};
+  const auto& iter = places_to_customcomm_.find(GetKeyFromPlaces(places));
+  PADDLE_ENFORCE_NE(iter,
+                    places_to_customcomm_.end(),
+                    platform::errors::InvalidArgument(
+                        "Cannot find nccl comm in process group."));
+  return iter->second[0]->GetCustomCCLComm();
+}
+
 std::shared_ptr<ProcessGroup::Task> ProcessGroupCustom::Broadcast(
     std::vector<phi::DenseTensor>& in_tensors,   // NOLINT
     std::vector<phi::DenseTensor>& out_tensors,  // NOLINT
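
Note on the AllGather change above: instead of offsetting the input pointer inside the CCL call (the removed XcclGetPointerByOffset), the new overload first materializes a "maybe partial" input when numel > 0 and runs a static shape check, then gathers the whole (possibly sliced) buffer. The following is a minimal, self-contained C++ sketch of that slicing rule only; it is not Paddle code, and maybe_partial_slice / simulate_all_gather are hypothetical helpers used for illustration.

// Illustrative only: the "maybe partial" rule applied before gathering.
#include <cassert>
#include <cstdint>
#include <iostream>
#include <vector>

// numel > 0 means "gather only the slice [offset, offset + numel)";
// numel <= 0 means "gather the whole buffer".
std::vector<float> maybe_partial_slice(const std::vector<float>& in,
                                       int64_t offset,
                                       int64_t numel) {
  if (numel <= 0) return in;
  assert(offset >= 0 && offset + numel <= static_cast<int64_t>(in.size()));
  return std::vector<float>(in.begin() + offset, in.begin() + offset + numel);
}

// Single-process stand-in for all_gather: each "rank" contributes its
// (possibly sliced) buffer and results are concatenated in rank order.
std::vector<float> simulate_all_gather(
    const std::vector<std::vector<float>>& per_rank_inputs,
    int64_t offset,
    int64_t numel) {
  std::vector<float> out;
  for (const auto& in : per_rank_inputs) {
    const auto part = maybe_partial_slice(in, offset, numel);
    out.insert(out.end(), part.begin(), part.end());
  }
  return out;
}

int main() {
  // Two "ranks", four elements each; gather only elements [1, 3).
  std::vector<std::vector<float>> ranks = {{0, 1, 2, 3}, {10, 11, 12, 13}};
  const auto gathered = simulate_all_gather(ranks, /*offset=*/1, /*numel=*/2);
  for (float v : gathered) std::cout << v << ' ';  // prints: 1 2 11 12
  std::cout << '\n';
  return 0;
}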

paddle/fluid/distributed/collective/process_group_custom.h (+41 -19)

@@ -80,25 +80,6 @@ class ProcessGroupCustom : public ProcessGroupWithoutStream {
 
   std::string GetBackendName() const override { return "XCCL_" + device_type_; }
 
-  std::shared_ptr<ProcessGroup::Task> AllGather(
-      phi::DenseTensor* out_tensor,
-      const phi::DenseTensor& in_tensor,
-      int64_t offset,
-      int64_t numel,
-      bool sync_op) override;
-
-  std::shared_ptr<ProcessGroup::Task> AllReduce(
-      phi::DenseTensor* out_tensor,
-      const phi::DenseTensor& in_tensor,
-      const AllreduceOptions& opts,
-      bool sync_op) override;
-
-  std::shared_ptr<ProcessGroup::Task> Broadcast(
-      phi::DenseTensor* out_tensor,
-      const phi::DenseTensor& in_tensor,
-      const BroadcastOptions& opts,
-      bool sync_op) override;
-
   std::shared_ptr<ProcessGroup::Task> Barrier(
       const BarrierOptions& = BarrierOptions()) override;
 
@@ -111,16 +92,57 @@ class ProcessGroupCustom : public ProcessGroupWithoutStream {
       std::vector<phi::DenseTensor>& in_tensors,
       std::vector<phi::DenseTensor>& out_tensors) override;
 
+  std::shared_ptr<ProcessGroup::Task> AllGather(
+      phi::DenseTensor* out_tensor,
+      const phi::DenseTensor& in_tensor,
+      int64_t offset,
+      int64_t numel,
+      bool sync_op,
+      bool use_calc_stream) override;
+
+  std::shared_ptr<ProcessGroup::Task> AllGather(
+      phi::DenseTensor* out_tensor,
+      const phi::DenseTensor& in_tensor,
+      int64_t offset,
+      int64_t numel,
+      bool sync_op) override;
+
   std::shared_ptr<ProcessGroup::Task> AllReduce(
       std::vector<phi::DenseTensor>& in_tensors,
       std::vector<phi::DenseTensor>& out_tensors,
       const AllreduceOptions& = AllreduceOptions()) override;
 
+  std::shared_ptr<ProcessGroup::Task> AllReduce(
+      phi::DenseTensor* out_tensor,
+      const phi::DenseTensor& in_tensor,
+      const AllreduceOptions& opts,
+      bool sync_op,
+      bool use_calc_stream) override;
+
+  std::shared_ptr<ProcessGroup::Task> AllReduce(
+      phi::DenseTensor* out_tensor,
+      const phi::DenseTensor& in_tensor,
+      const AllreduceOptions& opts,
+      bool sync_op) override;
+
   std::shared_ptr<ProcessGroup::Task> Broadcast(
       std::vector<phi::DenseTensor>& in_tensors,
      std::vector<phi::DenseTensor>& out_tensors,
      const BroadcastOptions& = BroadcastOptions()) override;
 
+  std::shared_ptr<ProcessGroup::Task> Broadcast(
+      phi::DenseTensor* out_tensor,
+      const phi::DenseTensor& in_tensor,
+      const BroadcastOptions& opts,
+      bool sync_op,
+      bool use_calc_stream) override;
+
+  std::shared_ptr<ProcessGroup::Task> Broadcast(
+      phi::DenseTensor* out_tensor,
+      const phi::DenseTensor& in_tensor,
+      const BroadcastOptions& opts,
+      bool sync_op) override;
+
  protected:
   virtual std::shared_ptr<ProcessGroupCustom::CustomTask> CreateTask(
       std::vector<Place> places,
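
The header now declares each tensor-level collective twice: a full variant that also takes use_calc_stream, and a compatibility variant that takes only sync_op. A minimal, self-contained C++ sketch of that overload-forwarding pattern follows; it is not Paddle's ProcessGroup API, and FakeTask / FakeProcessGroup are hypothetical names used only for illustration.

// Illustrative only: a sync_op-only overload forwarding to the full variant.
#include <iostream>
#include <memory>

struct FakeTask {};

class FakeProcessGroup {
 public:
  // Full variant: callers that manage streams pass use_calc_stream explicitly.
  std::shared_ptr<FakeTask> AllReduce(bool sync_op, bool use_calc_stream) {
    std::cout << "sync_op=" << sync_op
              << " use_calc_stream=" << use_calc_stream << '\n';
    return std::make_shared<FakeTask>();
  }

  // Compatibility variant: keeps older call sites working by defaulting to
  // the communication stream.
  std::shared_ptr<FakeTask> AllReduce(bool sync_op) {
    return AllReduce(sync_op, /*use_calc_stream=*/false);
  }
};

int main() {
  FakeProcessGroup pg;
  pg.AllReduce(/*sync_op=*/true);                            // old-style call
  pg.AllReduce(/*sync_op=*/true, /*use_calc_stream=*/true);  // new-style call
  return 0;
}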

paddle/fluid/eager/auto_code_generator/generator/eager_gen.py (+2 -2)

@@ -1840,7 +1840,7 @@ def GenerateHigherOrderNodeCreationCode(self):
 
         if is_composite_grad_api and next_grad_node_creation_str != '':
             next_grad_node_creation_str = f"""
-  if (!paddle::prim::PrimCommonUtils::IsBwdPrimEnabled()) {{
+  if (!paddle::prim::PrimCommonUtils::IsEagerPrimEnabled()) {{
  {next_grad_node_creation_str}
  }}
 """
@@ -2260,7 +2260,7 @@ def GenerateNodeDefinition(
         # TODO(Ruting):using composite only when we don't have backward kernel in the future.
         elif is_composite_grad_api:
             grad_function_call_str = f"""
-  if (paddle::prim::PrimCommonUtils::IsBwdPrimEnabled()) {{
+  if (paddle::prim::PrimCommonUtils::IsEagerPrimEnabled()) {{
  {indent}{composite_grad_api_namespace}{composite_grad_api_name}{composite_template_name}({composite_grad_api_args_str});
  VLOG(4) << "Composite api {composite_grad_api_name} is called ";
  }}else{{
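
The eager_gen.py change only swaps the runtime switch the generated C++ consults (IsBwdPrimEnabled becomes IsEagerPrimEnabled); the guarded control flow itself is unchanged. The following hand-written C++ sketch shows the shape of the code that gets generated; it is illustrative only, with a plain global flag and dummy grad functions standing in for the real PrimCommonUtils check and the real composite/kernel backward calls.

// Illustrative only: the guard pattern emitted into each generated grad node.
#include <iostream>

bool g_eager_prim_enabled = false;  // stand-in for the runtime switch

void composite_tanh_grad() { std::cout << "composite (prim) backward\n"; }
void kernel_tanh_grad() { std::cout << "handwritten kernel backward\n"; }

// Prefer the composite rule only when the eager-mode prim switch is on,
// otherwise fall back to the registered backward kernel.
void tanh_grad_node() {
  if (g_eager_prim_enabled) {
    composite_tanh_grad();
  } else {
    kernel_tanh_grad();
  }
}

int main() {
  tanh_grad_node();              // handwritten kernel backward
  g_eager_prim_enabled = true;
  tanh_grad_node();              // composite (prim) backward
  return 0;
}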
