Skip to content

feature/parallel_gpu #7293

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Jan 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 29 additions & 32 deletions paddle/framework/lod_tensor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,19 @@ std::ostream &operator<<(std::ostream &os, const LoD &lod) {
}

std::ostream &operator<<(std::ostream &os, const LoDTensor &t) {
PADDLE_ENFORCE(platform::is_cpu_place(t.place()));
PADDLE_ENFORCE(t.type().hash_code() == typeid(float).hash_code());

if (!platform::is_cpu_place(t.place())) {
LoDTensor tt;
framework::Copy(t, platform::CPUPlace(), &tt);
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();
auto &dev_ctx = *pool.Get(t.place());
dev_ctx.Wait();

os << tt;
return os;
}

os << "dim: " << t.dims() << "\n";
os << "lod: " << t.lod() << "\n";

Expand Down Expand Up @@ -211,67 +221,54 @@ void DeserializeFromStream(std::istream &is, LoDTensor *tensor,
DeserializeFromStream(is, static_cast<Tensor *>(tensor), dev_ctx);
}

// TODO(tonyyang-svail): make this function support LoD
std::vector<LoDTensor> LoDTensor::SplitLoDTensor(
const std::vector<platform::Place> places) const {
check_memory_size();
// PADDLE_ENFORCE(lod().empty() || (lod().size() == 1 && lod()[0].empty())
// , "Disable parallel lod for now");
PADDLE_ENFORCE(lod().empty(), "Disable parallel lod for now");
PADDLE_ENFORCE(dims()[0] % places.size() == 0,
"Batch size should be divided by places size");

std::vector<LoDTensor> lods;
for (size_t place_idx = 0; place_idx < places.size(); ++place_idx) {
size_t begin = place_idx * dims()[0] / places.size();
size_t end = (place_idx + 1) * dims()[0] / places.size();
auto src = Slice(static_cast<int>(begin), static_cast<int>(end));
int begin = place_idx * dims()[0] / places.size();
int end = (place_idx + 1) * dims()[0] / places.size();

LoDTensor dst;
dst.Resize(src.dims());
auto src = Slice(begin, end);
auto &dst_place = places[place_idx];
auto dst_ptr = dst.mutable_data(dst_place, src.type());

// TODO(tonyyang-svail):
// change the following to framework::Copy
auto src_place = src.place();
auto src_ptr = src.data<void>();
auto size = src.numel() * SizeOfType(src.type());
if (platform::is_cpu_place(src_place) &&
platform::is_cpu_place(dst_place)) {
memory::Copy(boost::get<platform::CPUPlace>(dst_place), dst_ptr,
boost::get<platform::CPUPlace>(src_place), src_ptr, size);
} else {
PADDLE_THROW("Not Implemented");
}
LoDTensor dst;
framework::Copy(src, dst_place, &dst);

lods.emplace_back(dst);
}

return lods;
}

// TODO(tonyyang-svail): make this function support LoD
void LoDTensor::MergeLoDTensor(
const std::vector<const LoDTensor *> &lod_tensors, platform::Place place) {
PADDLE_ENFORCE(platform::is_cpu_place(place));
const std::vector<const LoDTensor *> &lod_tensors,
platform::Place dst_place) {
PADDLE_ENFORCE(!lod_tensors.empty());

framework::DDim new_dim = lod_tensors[0]->dims();
std::type_index new_type = lod_tensors[0]->type();
auto new_layout = lod_tensors[0]->layout();
for (auto *lod : lod_tensors) {
PADDLE_ENFORCE(new_dim == lod->dims());
PADDLE_ENFORCE(new_type == lod->type());
PADDLE_ENFORCE(platform::is_cpu_place(lod->place()));
PADDLE_ENFORCE(new_layout == lod->layout());
}
new_dim[0] *= lod_tensors.size();
Resize(new_dim);
set_layout(new_layout);

auto *dst_ptr = reinterpret_cast<uint8_t *>(mutable_data(place, new_type));
mutable_data(dst_place, new_type);
int begin = 0;
for (auto *src : lod_tensors) {
auto size = src->numel() * SizeOfType(src->type());
memory::Copy(boost::get<platform::CPUPlace>(place), dst_ptr,
boost::get<platform::CPUPlace>(src->place()),
src->data<void>(), size);
dst_ptr += size;
int end = begin + src->dims()[0];
auto dst = Slice(begin, end);
framework::Copy(*src, dst_place, &dst);
begin = end;
}
}

Expand Down
36 changes: 18 additions & 18 deletions paddle/framework/tensor_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ namespace framework {
*
* @note Copy supports CPU <-> GPU, GPU <-> GPU.
*/

inline void Copy(const Tensor& src, const platform::Place& dst_place,
const platform::DeviceContext& ctx, Tensor* dst) {
VLOG(3) << "Copy " << src.dims() << " from " << src.place() << " to "
<< dst_place;
src.check_memory_size();

dst->Resize(src.dims());
Expand Down Expand Up @@ -88,26 +89,25 @@ inline void Copy(const Tensor& src, const platform::Place& dst_place,
}

/**
* @brief Copy supports CPU <-> CPU
* @brief Wrapper on
* Copy(const Tensor& src, const platform::Place& dst_place,
* const platform::DeviceContext& ctx, Tensor* dst);
*
* @param[in] src The external tensor.
* @param[in] dst_place The dst place.
*
* @note Copy supports CPU <-> GPU, GPU <-> GPU.
*/
inline void Copy(const Tensor& src, const platform::Place& dst_place,
Tensor* dst) {
src.check_memory_size();
dst->Resize(src.dims());
dst->set_layout(src.layout());

auto src_place = src.place();
auto src_ptr = src.data<void>();

auto dst_ptr = dst->mutable_data(dst_place, src.type());

auto size = src.numel() * SizeOfType(src.type());

PADDLE_ENFORCE(platform::is_cpu_place(src_place) &&
platform::is_cpu_place(dst_place));

memory::Copy(boost::get<platform::CPUPlace>(dst_place), dst_ptr,
boost::get<platform::CPUPlace>(src_place), src_ptr, size);
platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
const platform::DeviceContext* dev_ctx;
if (platform::is_gpu_place(src.place())) {
dev_ctx = pool.Get(src.place());
} else {
dev_ctx = pool.Get(dst_place);
}
Copy(src, dst_place, *dev_ctx, dst);
}

/**
Expand Down
2 changes: 1 addition & 1 deletion paddle/framework/var_desc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ const proto::TensorDesc &VarDesc::tensor_desc() const {
case proto::VarDesc::LOD_TENSOR_ARRAY:
return desc_.tensor_array().tensor();
default:
PADDLE_THROW("The type of var '", this->Name(), "' is unsupported.");
PADDLE_THROW("The type of var %s is unsupported.", this->Name());
}
}

Expand Down
3 changes: 2 additions & 1 deletion paddle/operators/get_places_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,4 +111,5 @@ class GetPlacesInferShape : public framework::InferShapeBase {
namespace ops = paddle::operators;

REGISTER_OPERATOR(get_places, ops::GetPlacesOp, ops::GetPlacesOpProtoMaker,
ops::GetPlacesInferVarType, ops::GetPlacesInferShape);
ops::GetPlacesInferVarType, ops::GetPlacesInferShape,
paddle::framework::EmptyGradOpMaker);
56 changes: 38 additions & 18 deletions paddle/operators/parallel_do_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ void SplitTensorAndMoveTensorToScopes(
const std::vector<framework::Scope *> &sub_scopes,
const std::vector<platform::Place> &places,
const std::vector<std::string> &names) {
PADDLE_ENFORCE_EQ(sub_scopes.size(), places.size());
for (auto &argu : names) {
auto *var = scope.FindVar(argu);
const auto &tensor = var->Get<LoDTensor>();
Expand All @@ -54,6 +55,15 @@ void SplitTensorAndMoveTensorToScopes(
}
}

void WaitOnPlaces(const std::vector<platform::Place> places) {
platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance();

for (auto &place : places) {
auto &dev_ctx = *pool.Get(place);
dev_ctx.Wait();
}
}

class ParallelDoOp : public framework::OperatorBase {
public:
ParallelDoOp(const std::string &type,
Expand All @@ -71,19 +81,30 @@ class ParallelDoOp : public framework::OperatorBase {
auto *block = Attr<framework::BlockDesc *>(kParallelBlock);
auto *program = block->Program();

// TODO(tonyyang-svail): get places from input
std::vector<platform::Place> places;
places.emplace_back(platform::CPUPlace());
places.emplace_back(platform::CPUPlace());
auto &places = scope.FindVar(Input(kPlaces))->Get<platform::PlaceList>();

auto &sub_scopes = *scope.FindVar(Output(kParallelScopes))
->GetMutable<std::vector<framework::Scope *>>();
for (size_t place_idx = 0; place_idx < places.size(); ++place_idx) {
sub_scopes.push_back(&scope.NewScope());
}

// split input
SplitTensorAndMoveTensorToScopes(scope, sub_scopes, places,
Inputs(kInputs));
// copy parameter
for (auto &param : Inputs(kParameters)) {
PADDLE_ENFORCE(scope.FindVar(param)->IsType<LoDTensor>(),
"Only support parameter type as LoDTensor");
auto &src = scope.FindVar(param)->Get<LoDTensor>();
for (size_t i = 0; i < places.size(); ++i) {
auto &place = places[i];
auto *sub_scope = sub_scopes[i];
auto *dst = sub_scope->Var(param)->GetMutable<LoDTensor>();
framework::Copy(src, place, dst);
}
}
WaitOnPlaces(places);

std::vector<std::future<void>> workers;
workers.reserve(places.size());
Expand All @@ -93,12 +114,6 @@ class ParallelDoOp : public framework::OperatorBase {
auto &place = places[place_idx];
auto *cur_scope = sub_scopes[place_idx];

// copy parameter
// some version of boost lacks != for boost::variant
if (!(dev_ctx.GetPlace() == place)) {
PADDLE_THROW("Not Implemented");
}

workers.emplace_back(framework::Async([program, cur_scope, place, block] {
framework::Executor executor(place);
executor.Run(*program, cur_scope, block->ID(),
Expand All @@ -108,6 +123,7 @@ class ParallelDoOp : public framework::OperatorBase {
for (auto &worker : workers) {
worker.wait();
}
WaitOnPlaces(places);

// merge output
for (auto &o_name : Outputs(kOutputs)) {
Expand All @@ -121,6 +137,7 @@ class ParallelDoOp : public framework::OperatorBase {
scope.FindVar(o_name)->GetMutable<LoDTensor>();
lod_tensor_to_be_merged->MergeLoDTensor(lod_tensors, dev_ctx.GetPlace());
}
WaitOnPlaces(places);
}
};

Expand Down Expand Up @@ -161,15 +178,14 @@ class ParallelDoGradOp : public OperatorBase {
auto &sub_scopes = scope.FindVar(Input(kParallelScopes))
->Get<std::vector<framework::Scope *>>();

// TODO(tonyyang-svail): get places from input
std::vector<platform::Place> places;
places.emplace_back(platform::CPUPlace());
places.emplace_back(platform::CPUPlace());
auto &places = scope.FindVar(Input(kPlaces))->Get<platform::PlaceList>();

// feed output@grad
SplitTensorAndMoveTensorToScopes(scope, sub_scopes, places,
Inputs(framework::GradVarName(kOutputs)));
WaitOnPlaces(places);

// for debugging
for (auto &s : Inputs(framework::GradVarName(kOutputs))) {
VLOG(3) << s;
VLOG(3) << scope.FindVar(s)->Get<LoDTensor>();
Expand All @@ -196,10 +212,11 @@ class ParallelDoGradOp : public OperatorBase {
for (auto &worker : workers) {
worker.wait();
}
WaitOnPlaces(places);

// merge grad
for (auto &s : Outputs(framework::GradVarName(kParameters))) {
VLOG(3) << s;
VLOG(3) << "merge grad " << s;

auto &t = sub_scopes[0]->FindVar(s)->Get<LoDTensor>();
VLOG(3) << t;
Expand All @@ -216,7 +233,8 @@ class ParallelDoGradOp : public OperatorBase {
auto sum_op = framework::OpRegistry::CreateOp(
"sum", {{"X", {s, s_buf}}}, {{"Out", {s}}},
framework::AttributeMap{});
sum_op->Run(*sub_scopes[0], place);
sum_op->Run(*sub_scopes[0], places[0]);
WaitOnPlaces(places);
}

VLOG(3) << t;
Expand All @@ -236,8 +254,10 @@ class ParallelDoGradOpDescMaker : public framework::SingleGradOpDescMaker {
for (auto &input_param : this->InputNames()) {
VLOG(3) << input_param;
grad->SetInput(input_param, this->Input(input_param));
grad->SetOutput(framework::GradVarName(input_param),
this->InputGrad(input_param, false));
if (input_param != kPlaces) {
grad->SetOutput(framework::GradVarName(input_param),
this->InputGrad(input_param, false));
}
}

for (auto &output_param : this->OutputNames()) {
Expand Down
2 changes: 1 addition & 1 deletion python/paddle/v2/fluid/tests/test_parallel_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def setUp(self):
append_batch_size=False,
stop_gradient=False)

places = fluid.default_main_program().global_block().create_var()
places = layers.get_places(device_count=4)
pd = layers.ParallelDo(places=places)

with pd.do():
Expand Down