Implement selectedrows serialize and deserialize #7042


Merged
9 commits, merged on Dec 28, 2017
Changes from 1 commit
2 changes: 1 addition & 1 deletion paddle/framework/CMakeLists.txt
@@ -5,7 +5,7 @@ cc_library(ddim SRCS ddim.cc DEPS eigen3)
cc_test(ddim_test SRCS ddim_test.cc DEPS ddim)
nv_test(dim_test SRCS dim_test.cu DEPS ddim)

cc_library(tensor SRCS tensor.cc DEPS ddim place paddle_memory device_context)
cc_library(tensor SRCS tensor.cc DEPS ddim place paddle_memory device_context framework_proto)
Contributor

Tensor is decoupled from proto. Why do we need to link against framework_proto?

Contributor Author

Serialization needs the Tensor description, and writing it as a proto message is a simple way to do that, so we link framework_proto.
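
For context, the stream that SerializeToStream produces begins with a uint32_t version, an int32_t byte count, and then the serialized proto::TensorDesc (data type plus dims). A minimal sketch of reading that header back, with an illustrative helper name (not part of this PR):

#include <cstdint>
#include <istream>
#include <string>

// Minimal sketch: read back the header fields that SerializeToStream
// writes -- a uint32_t version, then an int32_t byte count, then the
// serialized proto::TensorDesc bytes.
inline std::string ReadTensorDescBytes(std::istream &is) {
  uint32_t version = 0;
  is.read(reinterpret_cast<char *>(&version), sizeof(version));
  int32_t desc_size = 0;
  is.read(reinterpret_cast<char *>(&desc_size), sizeof(desc_size));
  std::string desc(static_cast<size_t>(desc_size), '\0');
  is.read(&desc[0], desc_size);
  return desc;  // a caller would ParseFromArray() this into a proto::TensorDesc
}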


cc_test(tensor_test SRCS tensor_test.cc DEPS tensor)
cc_test(tensor_util_test SRCS tensor_util_test.cc DEPS tensor)
166 changes: 39 additions & 127 deletions paddle/framework/lod_tensor.cc
@@ -152,6 +152,45 @@ void LoDTensor::ShrinkInLevel(size_t level, size_t elem_begin,
ShareDataWith(Slice(begin, end));
}

void LoDTensor::SerializeToStream(
Contributor

I believe we already have a SerializeToStream interface. Furthermore, SerializeToStream should be a global function; Tensor and LoDTensor should only describe a math computing library.

Contributor Author (@Yancey0623, Dec 27, 2017)

Yes, that SerializeToStream only supports LoDTensor. Per the discussion just now, I believe making SerializeToStream a global interface that supports all kinds of variables is a good choice.

std::ostream &os, const platform::DeviceContext &dev_ctx) const {
Tensor::SerializeToStream(os, dev_ctx);
{ // serialize lod information
// uint64_t lod_level
// uint64_t lod_level_1 size in byte.
// int* lod_level_1 data
// ...
auto lod = this->lod();
Contributor

LoD information should never be serialized, except for checkpoints (we will not do checkpoints at the current stage).

  1. When saving the model, LoD information is useless.
  2. When sending a Tensor to the ParameterServer, the optimizer doesn't need the LoD information.

Contributor

I have a PR implementing checkpointing, but our current design cannot accommodate it well:

  1. Checkpointing must be runnable asynchronously.
  2. Checkpointing needs a way to specify which parameters to synchronize across GPUs and nodes.
  3. Will a checkpoint snapshot every persistable object in the Scope? What about the Communicator in NCCL?

Contributor

The conclusion is that we will not touch this at the current stage; we leave it for the future.

Contributor Author (@Yancey0623, Dec 27, 2017)

I agree that checkpointing will be left for the future, and that we will use serialization for saving the model and for the send/recv Op.

> lod information should never be Serialized

  • It's not used in data parallelism, but it may be used in model parallelism.
  • send/recv is a general Op, so I believe it will send/recv all kinds of variables with all of their information.

So maybe serializing the LoD information is the better way.

uint64_t size = lod.size();
os.write(reinterpret_cast<const char *>(&size), sizeof(size));

for (auto &each : lod) {
size = each.size() * sizeof(framework::LoD::value_type::value_type);
os.write(reinterpret_cast<const char *>(&size), sizeof(size));
os.write(reinterpret_cast<const char *>(each.data()),
static_cast<std::streamsize>(size));
}
}
}

void LoDTensor::DeserializeFromStream(std::istream &is) {
Tensor::DeserializeFromStream(is);
{ // read lod
uint64_t lod_level;
is.read(reinterpret_cast<char *>(&lod_level), sizeof(lod_level));
this->lod_.resize(lod_level);
for (uint64_t i = 0; i < lod_level; ++i) {
uint64_t size;
is.read(reinterpret_cast<char *>(&size), sizeof(size));
std::vector<size_t> tmp(size / sizeof(size_t));
is.read(reinterpret_cast<char *>(tmp.data()),
static_cast<std::streamsize>(size));
this->lod_[i] = tmp;
}
}
}
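
Taken together, the two methods above give a simple round trip. A usage sketch under the assumption of a CPU-resident tensor (the file name and free function are illustrative, not part of this PR):

#include <fstream>

#include "paddle/framework/lod_tensor.h"

// Illustrative round trip: write a LoDTensor to a binary file and restore
// it; dims, data, and LoD all survive. GPU data is staged through a CPU
// buffer inside SerializeToStream.
void SaveAndRestore(const paddle::framework::LoDTensor &src,
                    const paddle::platform::DeviceContext &dev_ctx) {
  {
    std::ofstream fout("tensor.bin", std::ios::binary);
    src.SerializeToStream(fout, dev_ctx);
  }  // close the file before reading it back
  paddle::framework::LoDTensor dst;
  std::ifstream fin("tensor.bin", std::ios::binary);
  dst.DeserializeFromStream(fin);
}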

using LoDAndOffset = std::pair<LoD, std::pair<size_t, size_t>>;
LoDAndOffset GetSubLoDAndAbsoluteOffset(const LoD &lod, size_t start_idx,
size_t end_idx, size_t start_level) {
@@ -187,132 +226,5 @@ void AppendLoD(LoD *lod, const LoD &lod_length) {
}
}

void SerializeToStream(std::ostream &os, const LoDTensor &tensor,
const platform::DeviceContext &dev_ctx) {
// TODO(typhoonzero): serialize to ostream
{ // the 1st field, uint32_t version
constexpr uint32_t version = 0;
os.write(reinterpret_cast<const char *>(&version), sizeof(version));
}
{ // the 2nd field, tensor description
// int32_t size
// void* protobuf message
proto::TensorDesc desc;
desc.set_data_type(framework::ToDataType(tensor.type()));
auto dims = framework::vectorize(tensor.dims());
auto *pb_dims = desc.mutable_dims();
pb_dims->Resize(static_cast<int>(dims.size()), 0);
std::copy(dims.begin(), dims.end(), pb_dims->begin());
int32_t size = desc.ByteSize();
os.write(reinterpret_cast<const char *>(&size), sizeof(size));
auto out = desc.SerializeAsString();
os.write(out.data(), size);
}
{ // the 3rd field, tensor data
uint64_t size = tensor.memory_size();
auto *data_ptr = tensor.data<void>();
PADDLE_ENFORCE(size < std::numeric_limits<std::streamsize>::max(),
"Index overflow when writing tensor");
if (platform::is_gpu_place(tensor.place())) {
#ifdef PADDLE_WITH_CUDA
constexpr size_t kBufSize = 1024 * 1024 * 64; // 64MB
std::unique_ptr<char[]> buf(new char[kBufSize]);
auto &gpu_dev_ctx =
static_cast<const platform::CUDADeviceContext &>(dev_ctx);
platform::CPUPlace cpu;
uintptr_t data = reinterpret_cast<uintptr_t>(data_ptr);
while (size != 0) {
size_t size_to_write = std::min(kBufSize, static_cast<size_t>(size));
memory::Copy(cpu, buf.get(),
boost::get<platform::CUDAPlace>(tensor.place()),
reinterpret_cast<const void *>(data), size_to_write,
gpu_dev_ctx.stream());
gpu_dev_ctx.Wait();
os.write(buf.get(), size_to_write);
data += size_to_write;
size -= size_to_write;
}
#else
PADDLE_THROW("Unexpected branch");
#endif
} else {
os.write(static_cast<const char *>(data_ptr),
static_cast<std::streamsize>(size));
}
}
{ // the 4th field, lod information
// uint64_t lod_level
// uint64_t lod_level_1 size in byte.
// int* lod_level_1 data
// ...
auto lod = tensor.lod();
uint64_t size = lod.size();
os.write(reinterpret_cast<const char *>(&size), sizeof(size));

for (auto &each : lod) {
size = each.size() * sizeof(framework::LoD::value_type::value_type);
os.write(reinterpret_cast<const char *>(&size), sizeof(size));
os.write(reinterpret_cast<const char *>(each.data()),
static_cast<std::streamsize>(size));
}
}
}

void DeserializeFromStream(std::istream &is, LoDTensor *tensor) {
uint32_t version;
is.read(reinterpret_cast<char *>(&version), sizeof(version));
PADDLE_ENFORCE_EQ(version, 0U, "Only version 0 is supported");
proto::TensorDesc desc;
{ // int32_t size
// proto buffer
int32_t size;
is.read(reinterpret_cast<char *>(&size), sizeof(size));
std::unique_ptr<char[]> buf(new char[size]);
is.read(reinterpret_cast<char *>(buf.get()), size);
PADDLE_ENFORCE(desc.ParseFromArray(buf.get(), size),
"Cannot parse tensor desc");
}
{ // read tensor
std::vector<int64_t> dims;
dims.reserve(static_cast<size_t>(desc.dims().size()));
std::copy(desc.dims().begin(), desc.dims().end(), std::back_inserter(dims));
tensor->Resize(framework::make_ddim(dims));

void *buf;
platform::Place cpu = platform::CPUPlace();
switch (desc.data_type()) {
case proto::FP32:
buf = tensor->mutable_data<float>(cpu);
break;
case proto::FP64:
buf = tensor->mutable_data<double>(cpu);
break;
case proto::INT32:
buf = tensor->mutable_data<int>(cpu);
break;
case proto::INT64:
buf = tensor->mutable_data<int64_t>(cpu);
break;
default:
PADDLE_THROW("DataType %d not supported", desc.data_type());
}
is.read(static_cast<char *>(buf), tensor->memory_size());
}
{ // read lod
uint64_t lod_level;
is.read(reinterpret_cast<char *>(&lod_level), sizeof(lod_level));
auto &lod = *tensor->mutable_lod();
lod.resize(lod_level);
for (uint64_t i = 0; i < lod_level; ++i) {
uint64_t size;
is.read(reinterpret_cast<char *>(&size), sizeof(size));
std::vector<size_t> tmp(size / sizeof(size_t));
is.read(reinterpret_cast<char *>(tmp.data()),
static_cast<std::streamsize>(size));
lod[i] = tmp;
}
}
}

} // namespace framework
} // namespace paddle
13 changes: 4 additions & 9 deletions paddle/framework/lod_tensor.h
@@ -144,6 +144,10 @@ class LoDTensor : public Tensor {
*/
void ShrinkInLevel(size_t level, size_t elem_begin, size_t elem_end);

void SerializeToStream(std::ostream& os,
const platform::DeviceContext& dev_ctx) const;
void DeserializeFromStream(std::istream& is);

private:
LoD lod_;
};
@@ -201,14 +205,5 @@ std::pair<LoD, std::pair<size_t, size_t>> GetSubLoDAndAbsoluteOffset(

void AppendLoD(LoD* lod, const LoD& lod_length);

/*
* Serialize/Deserialize a LoDTensor to/from a std::ostream.
* You can pass an ofstream or ostringstream to serialize to a file
* or to an in-memory string. GPU tensors will be copied to CPU.
*/
void SerializeToStream(std::ostream& os, const LoDTensor& tensor,
const platform::DeviceContext& dev_ctx);
void DeserializeFromStream(std::istream& is, LoDTensor* tensor);

} // namespace framework
} // namespace paddle
15 changes: 15 additions & 0 deletions paddle/framework/lod_tensor_test.cc
@@ -126,6 +126,21 @@ TEST_F(LoDTensorTester, ShrinkInLevel) {
EXPECT_NE(t1.data<float>(), lod_tensor_.data<float>());
}

TEST_F(LoDTensorTester, SerializeAndDeserialize) {
LoDTensor dst_tensor;
platform::CPUDeviceContext cpu_ctx((platform::CPUPlace()));
std::ostringstream oss;
lod_tensor_.SerializeToStream(oss, cpu_ctx);

std::istringstream iss(oss.str());
dst_tensor.DeserializeFromStream(iss);
float* dst_ptr = dst_tensor.mutable_data<float>(platform::CPUPlace());
for (int i = 0; i < kLodTensorSize; ++i) {
EXPECT_EQ(dst_ptr[i], i);
}
EXPECT_EQ(dst_tensor.lod(), lod_tensor_.lod());
}

TEST(LodExpand, test) {
LoD lod{{0, 2}};
LoDTensor tensor;
42 changes: 41 additions & 1 deletion paddle/framework/selected_rows.cc
@@ -12,5 +12,45 @@ limitations under the License. */
#include "paddle/framework/selected_rows.h"

namespace paddle {
namespace framework {} // namespace framework
namespace framework {
void SelectedRows::SerializeToStream(std::ostream &os,
const platform::DeviceContext &dev_ctx) {
PADDLE_ENFORCE_NOT_NULL(
value_, "serialize SelectedRows failed since Tensor is nullptr.");
value_->SerializeToStream(os, dev_ctx);
{
Contributor

Where is the version field?

Contributor Author

The version field has already been serialized in Tensor::SerializeToStream.

// serialize rows information
uint64_t size = rows_.size();
os.write(reinterpret_cast<const char *>(&size), sizeof(size));
for (uint64_t i = 0; i < size; ++i) {
os.write(reinterpret_cast<const char *>(&rows_[i]), sizeof(rows_[i]));
}
}
{
// serialize height field
os.write(reinterpret_cast<const char *>(&this->height_), sizeof(int64_t));
}
}

void SelectedRows::DeserializeFromStream(std::istream &is) {
value_.reset(new Tensor());
value_->DeserializeFromStream(is);
{
// deserialize rows information
uint64_t size;
is.read(reinterpret_cast<char *>(&size), sizeof(size));
rows_.resize(size);
for (uint64_t i = 0; i < size; ++i) {
int64_t tmp;
Contributor

Is there a better way to read an int from an istream? tmp seems like a poor choice.

Contributor Author

Done, deleted the temporary variable tmp.
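
The tmp-free follow-up presumably reads each index straight into the vector element, along these lines (a sketch; the final commit may differ):

// Assuming rows_ is a std::vector<int64_t>, matching the sizeof(int64_t)
// written by the serializer, read directly into the element:
is.read(reinterpret_cast<char *>(&rows_[i]), sizeof(rows_[i]));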

is.read(reinterpret_cast<char *>(&tmp), sizeof(int64_t));
rows_[i] = tmp;
}
}
{
// deserialize height field
is.read(reinterpret_cast<char *>(&this->height_), sizeof(int64_t));
}
}

} // namespace framework
} // namespace paddle
5 changes: 5 additions & 0 deletions paddle/framework/selected_rows.h
@@ -50,6 +50,11 @@ class SelectedRows {
return make_ddim(dims);
}

void SerializeToStream(std::ostream& os,
const platform::DeviceContext& dev_ctx);

void DeserializeFromStream(std::istream& is);

private:
// Notice: rows can be duplicate. We can have {0, 4, 7, 0, 5, 7, 9} here.
// SelectedRows are simply concatenated when added together. Until a
14 changes: 14 additions & 0 deletions paddle/framework/selected_rows_test.cc
@@ -43,5 +43,19 @@ TEST_F(SelectedRowsTester, complete_dims) {
ASSERT_EQ(selected_rows_->GetCompleteDims(), make_ddim({10, 100}));
}

TEST_F(SelectedRowsTester, SerializeAndDeserialize) {
SelectedRows dst_tensor;
platform::CPUDeviceContext cpu_ctx((platform::CPUPlace()));
Contributor

Delete the extra ().

Contributor Author

Done.

std::ostringstream oss;

selected_rows_->SerializeToStream(oss, cpu_ctx);

std::istringstream iss(oss.str());
dst_tensor.DeserializeFromStream(iss);

ASSERT_EQ(selected_rows_->rows(), dst_tensor.rows());
ASSERT_EQ(selected_rows_->height(), dst_tensor.height());
}

Contributor

We need a GPU test, and SerializeToStream should copy the data to the CPU if the elements are on the GPU.

Contributor Author

Added the test in tensor_util_test.
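
A hedged sketch of what such a GPU round trip could look like (the actual test landed in tensor_util_test; reusing this fixture and the CUDA setup here are assumptions):

#ifdef PADDLE_WITH_CUDA
TEST_F(SelectedRowsTester, SerializeAndDeserializeGPU) {
  // Assumes the fixture's value tensor has first been copied to the GPU;
  // SerializeToStream then stages the data through a CPU buffer internally.
  SelectedRows dst_tensor;
  platform::CUDADeviceContext gpu_ctx(platform::CUDAPlace(0));
  std::ostringstream oss;

  selected_rows_->SerializeToStream(oss, gpu_ctx);

  std::istringstream iss(oss.str());
  dst_tensor.DeserializeFromStream(iss);

  ASSERT_EQ(selected_rows_->rows(), dst_tensor.rows());
  ASSERT_EQ(selected_rows_->height(), dst_tensor.height());
}
#endif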

} // namespace framework
} // namespace paddle
10 changes: 10 additions & 0 deletions paddle/framework/tensor.h
@@ -99,6 +99,16 @@ class Tensor {
*/
inline Tensor Slice(int begin_idx, int end_idx) const;

/*
* Serialize/Deserialize a Tensor to/from a std::ostream.
* You can pass an ofstream or ostringstream to serialize to a file
* or to an in-memory string. GPU tensors will be copied to CPU.
*/
inline void SerializeToStream(std::ostream& os,
Contributor

Put it into tensor_util.h

const platform::DeviceContext& dev_ctx) const;

inline void DeserializeFromStream(std::istream& is);

platform::Place place() const {
PADDLE_ENFORCE_NOT_NULL(
holder_, "Tensor not initialized yet when Tensor::place() is called.");