Skip to content

Commit 4d3ad62

Browse files
committed
Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into rewrite_multi_gru
2 parents 0246039 + 84fe2de commit 4d3ad62

File tree

104 files changed

+4141
-1017
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

104 files changed

+4141
-1017
lines changed

cmake/experiments/cuda_module_loading_lazy.cmake

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,11 @@ if(LINUX)
3131
message("cuda 11.7+ already support lazy module loading")
3232
return()
3333
endif()
34+
if(${CUDA_VERSION} VERSION_LESS "11.2" AND ${CMAKE_CXX_COMPILER_VERSION}
35+
VERSION_GREATER_EQUAL 12.0)
36+
message("cuda less than 11.2 doesn't support gcc12")
37+
return()
38+
endif()
3439

3540
message(
3641
"for cuda before 11.7, libcudart.so must be used for the lazy module loading trick to work, instead of libcudart_static.a"

cmake/external/gloo.cmake

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,17 @@ set(GLOO_LIBRARIES
3131
"${GLOO_INSTALL_DIR}/lib/libgloo.a"
3232
CACHE FILEPATH "gloo library." FORCE)
3333

34+
set(GLOO_PATCH_COMMAND "")
35+
if(WITH_GPU)
36+
if(${CMAKE_CUDA_COMPILER_VERSION} LESS 12.0 AND ${CMAKE_CXX_COMPILER_VERSION}
37+
VERSION_GREATER 12.0)
38+
file(TO_NATIVE_PATH ${PADDLE_SOURCE_DIR}/patches/gloo/device.cc.patch
39+
native_dst)
40+
set(GLOO_PATCH_COMMAND patch -d ${GLOO_SOURCE_DIR}/gloo/transport/tcp <
41+
${native_dst})
42+
endif()
43+
endif()
44+
3445
include_directories(${GLOO_INCLUDE_DIR})
3546

3647
if(WITH_ASCEND OR WITH_ASCEND_CL)
@@ -59,6 +70,7 @@ else()
5970
GIT_TAG ${GLOO_TAG}
6071
PREFIX "${GLOO_PREFIX_DIR}"
6172
UPDATE_COMMAND ""
73+
PATCH_COMMAND ${GLOO_PATCH_COMMAND}
6274
CONFIGURE_COMMAND ""
6375
BUILD_COMMAND
6476
mkdir -p ${GLOO_SOURCE_DIR}/build && cd ${GLOO_SOURCE_DIR}/build && cmake

cmake/external/protobuf.cmake

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -250,6 +250,12 @@ function(build_protobuf TARGET_NAME BUILD_FOR_HOST)
250250
else()
251251
set(PROTOBUF_REPOSITORY ${GIT_URL}/protocolbuffers/protobuf.git)
252252
set(PROTOBUF_TAG 9f75c5aa851cd877fb0d93ccc31b8567a6706546)
253+
if(WITH_GPU)
254+
if(${CMAKE_CUDA_COMPILER_VERSION} LESS 12.0
255+
AND ${CMAKE_CXX_COMPILER_VERSION} VERSION_GREATER 12.0)
256+
set(PROTOBUF_TAG 2dc747c574b68a808ea4699d26942c8132fe2b09)
257+
endif()
258+
endif()
253259
endif()
254260
if(WITH_ARM_BRPC)
255261
set(ARM_PROTOBUF_URL
@@ -322,6 +328,12 @@ elseif(WITH_ARM_BRPC)
322328
set(PROTOBUF_VERSION 3.7.1-baidu-ee-common)
323329
else()
324330
set(PROTOBUF_VERSION 3.1.0)
331+
if(WITH_GPU)
332+
if(${CMAKE_CUDA_COMPILER_VERSION} LESS 12.0
333+
AND ${CMAKE_CXX_COMPILER_VERSION} VERSION_GREATER 12.0)
334+
set(PROTOBUF_VERSION 3.16.0)
335+
endif()
336+
endif()
325337
endif()
326338

327339
if(NOT PROTOBUF_FOUND)

cmake/external/warpctc.cmake

Lines changed: 16 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,19 @@ set(WARPCTC_INSTALL_DIR ${THIRD_PARTY_PATH}/install/warpctc)
2525
set(WARPCTC_REPOSITORY ${GIT_URL}/baidu-research/warp-ctc.git)
2626
set(WARPCTC_TAG bdc2b4550453e0ef2d3b5190f9c6103a84eff184)
2727

28+
set(WARPCTC_SOURCE_DIR ${THIRD_PARTY_PATH}/warpctc/src/extern_warpctc)
29+
set(WARPCTC_PATCH_COMMAND "")
30+
set(WARPCTC_CCBIN_OPTION "")
31+
if(NOT WIN32 AND WITH_GPU)
32+
if(${CMAKE_CUDA_COMPILER_VERSION} LESS 12.0 AND ${CMAKE_CXX_COMPILER_VERSION}
33+
VERSION_GREATER 12.0)
34+
file(TO_NATIVE_PATH
35+
${PADDLE_SOURCE_DIR}/patches/warpctc/CMakeLists.txt.patch native_src)
36+
set(WARPCTC_PATCH_COMMAND patch -d ${WARPCTC_SOURCE_DIR} < ${native_src})
37+
set(WARPCTC_CCBIN_OPTION -DCCBIN_COMPILER=${CCBIN_COMPILER})
38+
endif()
39+
endif()
40+
2841
set(WARPCTC_INCLUDE_DIR
2942
"${WARPCTC_INSTALL_DIR}/include"
3043
CACHE PATH "Warp-ctc Directory" FORCE)
@@ -112,7 +125,7 @@ else()
112125
GIT_TAG ${WARPCTC_TAG}
113126
PREFIX ${WARPCTC_PREFIX_DIR}
114127
UPDATE_COMMAND ""
115-
PATCH_COMMAND ""
128+
PATCH_COMMAND ${WARPCTC_PATCH_COMMAND}
116129
#BUILD_ALWAYS 1
117130
CMAKE_ARGS -DCMAKE_CXX_COMPILER=${CMAKE_CXX_COMPILER}
118131
-DCMAKE_C_COMPILER=${CMAKE_C_COMPILER}
@@ -132,7 +145,9 @@ else()
132145
-DBUILD_TESTS=OFF
133146
-DCMAKE_POSITION_INDEPENDENT_CODE=ON
134147
-DCMAKE_BUILD_TYPE=${THIRD_PARTY_BUILD_TYPE}
148+
-DCUDA_TOOLKIT_ROOT_DIR=${CUDA_TOOLKIT_ROOT_DIR}
135149
${EXTERNAL_OPTIONAL_ARGS}
150+
${WARPCTC_CCBIN_OPTION}
136151
CMAKE_CACHE_ARGS
137152
-DCMAKE_BUILD_TYPE:STRING=${THIRD_PARTY_BUILD_TYPE}
138153
-DCMAKE_POSITION_INDEPENDENT_CODE:BOOL=ON

paddle/fluid/distributed/fleet_executor/CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -37,6 +37,7 @@ cc_library(
3737
compute_interceptor.cc
3838
amplifier_interceptor.cc
3939
cond_interceptor.cc
40+
start_interceptor.cc
4041
source_interceptor.cc
4142
sink_interceptor.cc
4243
message_service.cc
@@ -69,6 +70,8 @@ if(WITH_DISTRIBUTE)
6970
${DISTRIBUTE_COMPILE_FLAGS})
7071
set_source_files_properties(
7172
cond_interceptor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
73+
set_source_files_properties(
74+
start_interceptor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
7275
set_source_files_properties(
7376
source_interceptor.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS})
7477
set_source_files_properties(

paddle/fluid/distributed/fleet_executor/carrier.cc

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -36,6 +36,7 @@ USE_INTERCEPTOR(Compute);
3636
USE_INTERCEPTOR(Amplifier);
3737
USE_INTERCEPTOR(Sink);
3838
USE_INTERCEPTOR(Cond);
39+
USE_INTERCEPTOR(Start);
3940

4041
void Carrier::Init(
4142
int64_t rank,

paddle/fluid/distributed/fleet_executor/compute_interceptor.cc

Lines changed: 43 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -18,6 +18,7 @@
1818
#include "paddle/fluid/distributed/fleet_executor/task_node.h"
1919
#include "paddle/fluid/framework/executor_gc_helper.h"
2020
#include "paddle/fluid/framework/operator.h"
21+
#include "paddle/phi/core/errors.h"
2122

2223
namespace paddle {
2324
namespace distributed {
@@ -33,23 +34,30 @@ void ComputeInterceptor::PrepareDeps() {
3334
auto& downstream = node_->downstream();
3435

3536
for (auto up : upstream) {
36-
in_readys_.emplace(up.first, std::make_pair(up.second, 0));
37+
std::map<int64_t, int64_t> ready_size_map;
38+
for (int64_t i = 0; i < node_->max_run_times(); ++i) {
39+
ready_size_map.emplace(i, 0);
40+
}
41+
in_readys_.emplace(up.first, std::make_pair(up.second, ready_size_map));
3742
}
3843
for (auto down : downstream) {
3944
out_buffs_.emplace(down.first, std::make_pair(down.second, 0));
4045
}
4146
}
4247

43-
void ComputeInterceptor::IncreaseReady(int64_t up_id) {
48+
void ComputeInterceptor::IncreaseReady(int64_t up_id, int64_t scope_id) {
4449
auto it = in_readys_.find(up_id);
4550
PADDLE_ENFORCE_NE(it,
4651
in_readys_.end(),
4752
platform::errors::NotFound(
4853
"Cannot find upstream=%lld in in_readys.", up_id));
4954

5055
auto max_ready_size = it->second.first;
51-
auto ready_size = it->second.second;
52-
ready_size += 1;
56+
const auto& ready_scope_map = it->second.second;
57+
int64_t ready_size = 0;
58+
for (auto& scope_iter : ready_scope_map) {
59+
ready_size += scope_iter.second;
60+
}
5361
if (max_ready_size != INFINITE_BUFFER_SIZE) {
5462
PADDLE_ENFORCE_LE(
5563
ready_size,
@@ -61,7 +69,14 @@ void ComputeInterceptor::IncreaseReady(int64_t up_id) {
6169
ready_size,
6270
max_ready_size));
6371
}
64-
it->second.second = ready_size;
72+
PADDLE_ENFORCE_NE(
73+
it->second.second.find(scope_id),
74+
it->second.second.end(),
75+
platform::errors::OutOfRange(
76+
"Interceptor %lld can not find scope %lld in upstream ready map",
77+
interceptor_id_,
78+
scope_id));
79+
it->second.second.at(scope_id) = ready_scope_map.at(scope_id) + 1;
6580
}
6681

6782
void ComputeInterceptor::DecreaseBuff(int64_t down_id) {
@@ -83,16 +98,21 @@ void ComputeInterceptor::DecreaseBuff(int64_t down_id) {
8398
}
8499

85100
bool ComputeInterceptor::IsInputReady() {
86-
for (auto& ins : in_readys_) {
87-
auto ready_size = ins.second.second;
88-
// not ready, return false
89-
if (ready_size == 0) {
90-
VLOG(3) << "Interceptor " << GetInterceptorId()
101+
for (int64_t i = 0; i < node_->max_run_times(); ++i) {
102+
bool flag = true;
103+
for (auto& ins : in_readys_) {
104+
auto ready_size_map = ins.second.second;
105+
flag = flag && (ready_size_map.at(i) != 0);
106+
}
107+
if (flag) {
108+
cur_scope_id_ = i;
109+
return true;
110+
} else {
111+
VLOG(3) << "Interceptor " << GetInterceptorId() << " in scope " << i
91112
<< "'s upstreams aren't all ready.";
92-
return false;
93113
}
94114
}
95-
return true;
115+
return false;
96116
}
97117

98118
bool ComputeInterceptor::CanWriteOutput() {
@@ -144,7 +164,7 @@ void ComputeInterceptor::SendDataReadyToDownStream() {
144164
void ComputeInterceptor::ReplyCompletedToUpStream() {
145165
for (auto& ins : in_readys_) {
146166
auto up_id = ins.first;
147-
auto ready_size = ins.second.second;
167+
auto ready_size = ins.second.second.at(cur_scope_id_);
148168
ready_size -= 1;
149169
PADDLE_ENFORCE_GE(
150170
ready_size,
@@ -153,7 +173,7 @@ void ComputeInterceptor::ReplyCompletedToUpStream() {
153173
"upstream=%lld ready_size must >= 0, but now got %lld",
154174
up_id,
155175
ready_size));
156-
ins.second.second = ready_size;
176+
ins.second.second[cur_scope_id_] = ready_size;
157177

158178
VLOG(3) << "ComputeInterceptor " << interceptor_id_
159179
<< " Reply data_is_useless msg to " << up_id
@@ -187,11 +207,8 @@ void ComputeInterceptor::RunOps() {
187207

188208
void ComputeInterceptor::Run() {
189209
while (IsInputReady() && CanWriteOutput()) {
190-
VLOG(3) << "id=" << GetInterceptorId() << " ComputeInterceptor running";
191-
192-
// get the ready scope id from queue
193-
cur_scope_id_ = ready_queue_.front();
194-
ready_queue_.pop();
210+
VLOG(0) << "id=" << GetInterceptorId()
211+
<< " ComputeInterceptor running in scope " << cur_scope_id_;
195212

196213
RunOps();
197214

@@ -204,10 +221,15 @@ void ComputeInterceptor::Run() {
204221

205222
void ComputeInterceptor::Compute(const InterceptorMessage& msg) {
206223
if (msg.message_type() == DATA_IS_READY) {
207-
IncreaseReady(msg.src_id());
208-
ready_queue_.push(msg.scope_idx());
224+
VLOG(3) << "Compute interceptor " << interceptor_id_
225+
<< " receive data_is_ready " << msg.src_id() << " "
226+
<< msg.scope_idx() << " ";
227+
IncreaseReady(msg.src_id(), msg.scope_idx());
209228
Run();
210229
} else if (msg.message_type() == DATA_IS_USELESS) {
230+
VLOG(3) << "Compute interceptor " << interceptor_id_
231+
<< " receive data_is_useless " << msg.src_id() << " "
232+
<< msg.scope_idx() << " ";
211233
DecreaseBuff(msg.src_id());
212234
Run();
213235
}

paddle/fluid/distributed/fleet_executor/compute_interceptor.h

Lines changed: 10 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -32,25 +32,24 @@ class ComputeInterceptor : public Interceptor {
3232
virtual void RunOps();
3333
virtual void SendDataReadyToDownStream();
3434
virtual void ReplyCompletedToUpStream();
35+
virtual void Compute(const InterceptorMessage& msg);
36+
void Run();
37+
void IncreaseReady(int64_t up_id, int64_t scope_id);
38+
void DecreaseBuff(int64_t down_id);
3539

36-
std::queue<int64_t> ready_queue_;
3740
int64_t cur_scope_id_;
3841

42+
// upstream_id-->(max_ready_size, scope-->ready_size)
43+
std::map<int64_t, std::pair<int64_t, std::map<int64_t, int64_t>>>
44+
in_readys_{};
45+
// downstream_id-->(max_buffer_size, used_size)
46+
std::map<int64_t, std::pair<int64_t, int64_t>> out_buffs_{};
47+
3948
private:
4049
void PrepareDeps();
4150

42-
void IncreaseReady(int64_t up_id);
43-
void DecreaseBuff(int64_t down_id);
4451
bool IsInputReady();
4552
bool CanWriteOutput();
46-
47-
void Run();
48-
void Compute(const InterceptorMessage& msg);
49-
50-
// upstream_id-->(max_ready_size, ready_size)
51-
std::map<int64_t, std::pair<int64_t, int64_t>> in_readys_{};
52-
// downstream_id-->(max_buffer_size, used_size)
53-
std::map<int64_t, std::pair<int64_t, int64_t>> out_buffs_{};
5453
};
5554

5655
} // namespace distributed

paddle/fluid/distributed/fleet_executor/cond_interceptor.cc

Lines changed: 2 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -98,8 +98,6 @@ void CondInterceptor::ReplyDataIsUseless(int64_t up_id) {
9898
}
9999

100100
void CondInterceptor::Compute() {
101-
cur_scope_id_ = ready_queue_.front();
102-
ready_queue_.pop();
103101
bool cond = GetCondResult();
104102
VLOG(3) << "Cond interceptor get condition var " << node_->cond_var()
105103
<< " with value " << cond;
@@ -109,14 +107,14 @@ void CondInterceptor::Compute() {
109107
SendDataReady(down_id);
110108
}
111109
} else {
112-
VLOG(3) << "Finish loop in scope " << cur_scope_id_;
110+
VLOG(0) << "Finish loop in scope " << cur_scope_id_;
113111
SendDataReady(stop_loop_id_);
114112
}
115113
}
116114

117115
void CondInterceptor::Run(const InterceptorMessage& msg) {
118116
if (msg.message_type() == DATA_IS_READY) {
119-
ready_queue_.push(msg.scope_idx());
117+
cur_scope_id_ = msg.scope_idx();
120118
Compute();
121119
} else if (msg.message_type() == DATA_IS_USELESS) {
122120
if (node_->id_to_dep_type().at(msg.src_id()) == DependType::STOP_LOOP) {

paddle/fluid/distributed/fleet_executor/cond_interceptor.h

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -39,7 +39,6 @@ class CondInterceptor final : public Interceptor {
3939
void SendDataReady(int64_t down_id);
4040
void ReplyDataIsUseless(int64_t up_id);
4141

42-
std::queue<int64_t> ready_queue_;
4342
int64_t cur_scope_id_;
4443

4544
std::set<int64_t> normal_in_id_;

0 commit comments

Comments
 (0)