-
Notifications
You must be signed in to change notification settings - Fork 5.7k
Implement selectedrows serialize and deserialize #7042
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
f91e769
842d881
7dd129f
bedbc14
cb6aeff
8ea49bd
57218ea
d279283
75aff5a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -152,6 +152,45 @@ void LoDTensor::ShrinkInLevel(size_t level, size_t elem_begin, | |
ShareDataWith(Slice(begin, end)); | ||
} | ||
|
||
void LoDTensor::SerializeToStream( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe we already have a There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, that |
||
std::ostream &os, const platform::DeviceContext &dev_ctx) const { | ||
Tensor::SerializeToStream(os, dev_ctx); | ||
{ // serialize lod information | ||
// uint64_t lod_level | ||
// uint64_t lod_level_1 size in byte. | ||
// int* lod_level_1 data | ||
// ... | ||
auto lod = this->lod(); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. lod information should never be Serialized, except for checkpoint(We will not do checkpoint at current stage).
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have a PR implement the checkpoint, but our current design cannot fit it well.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The conclusion is we will not touch that at the current stage. Leaving that in the future. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree with
So mabye serialize the Lod information is a better way. |
||
uint64_t size = lod.size(); | ||
os.write(reinterpret_cast<const char *>(&size), sizeof(size)); | ||
|
||
for (auto &each : lod) { | ||
size = each.size() * sizeof(framework::LoD::value_type::value_type); | ||
os.write(reinterpret_cast<const char *>(&size), sizeof(size)); | ||
os.write(reinterpret_cast<const char *>(each.data()), | ||
static_cast<std::streamsize>(size)); | ||
} | ||
} | ||
} | ||
|
||
void LoDTensor::DeserializeFromStream(std::istream &is) { | ||
Tensor::DeserializeFromStream(is); | ||
{ // read lod | ||
uint64_t lod_level; | ||
is.read(reinterpret_cast<char *>(&lod_level), sizeof(lod_level)); | ||
// auto &lod = this->mutable_lod(); | ||
this->lod_.resize(lod_level); | ||
for (uint64_t i = 0; i < lod_level; ++i) { | ||
uint64_t size; | ||
is.read(reinterpret_cast<char *>(&size), sizeof(size)); | ||
std::vector<size_t> tmp(size / sizeof(size_t)); | ||
is.read(reinterpret_cast<char *>(tmp.data()), | ||
static_cast<std::streamsize>(size)); | ||
this->lod_[i] = tmp; | ||
} | ||
} | ||
} | ||
|
||
using LoDAndOffset = std::pair<LoD, std::pair<size_t, size_t>>; | ||
LoDAndOffset GetSubLoDAndAbsoluteOffset(const LoD &lod, size_t start_idx, | ||
size_t end_idx, size_t start_level) { | ||
|
@@ -187,132 +226,5 @@ void AppendLoD(LoD *lod, const LoD &lod_length) { | |
} | ||
} | ||
|
||
void SerializeToStream(std::ostream &os, const LoDTensor &tensor, | ||
const platform::DeviceContext &dev_ctx) { | ||
// TODO(typhoonzero): serialize to ostream | ||
{ // the 1st field, uint32_t version | ||
constexpr uint32_t version = 0; | ||
os.write(reinterpret_cast<const char *>(&version), sizeof(version)); | ||
} | ||
{ // the 2nd field, tensor description | ||
// int32_t size | ||
// void* protobuf message | ||
proto::TensorDesc desc; | ||
desc.set_data_type(framework::ToDataType(tensor.type())); | ||
auto dims = framework::vectorize(tensor.dims()); | ||
auto *pb_dims = desc.mutable_dims(); | ||
pb_dims->Resize(static_cast<int>(dims.size()), 0); | ||
std::copy(dims.begin(), dims.end(), pb_dims->begin()); | ||
int32_t size = desc.ByteSize(); | ||
os.write(reinterpret_cast<const char *>(&size), sizeof(size)); | ||
auto out = desc.SerializeAsString(); | ||
os.write(out.data(), size); | ||
} | ||
{ // the 3rd field, tensor data | ||
uint64_t size = tensor.memory_size(); | ||
auto *data_ptr = tensor.data<void>(); | ||
PADDLE_ENFORCE(size < std::numeric_limits<std::streamsize>::max(), | ||
"Index overflow when writing tensor"); | ||
if (platform::is_gpu_place(tensor.place())) { | ||
#ifdef PADDLE_WITH_CUDA | ||
constexpr size_t kBufSize = 1024 * 1024 * 64; // 64MB | ||
std::unique_ptr<char[]> buf(new char[kBufSize]); | ||
auto &gpu_dev_ctx = | ||
static_cast<const platform::CUDADeviceContext &>(dev_ctx); | ||
platform::CPUPlace cpu; | ||
uintptr_t data = reinterpret_cast<uintptr_t>(data_ptr); | ||
while (size != 0) { | ||
size_t size_to_write = std::min(kBufSize, static_cast<size_t>(size)); | ||
memory::Copy(cpu, buf.get(), | ||
boost::get<platform::CUDAPlace>(tensor.place()), | ||
reinterpret_cast<const void *>(data), size_to_write, | ||
gpu_dev_ctx.stream()); | ||
gpu_dev_ctx.Wait(); | ||
os.write(buf.get(), size_to_write); | ||
data += size_to_write; | ||
size -= size_to_write; | ||
} | ||
#else | ||
PADDLE_THROW("Unexpected branch"); | ||
#endif | ||
} else { | ||
os.write(static_cast<const char *>(data_ptr), | ||
static_cast<std::streamsize>(size)); | ||
} | ||
} | ||
{ // the 4th field, lod information | ||
// uint64_t lod_level | ||
// uint64_t lod_level_1 size in byte. | ||
// int* lod_level_1 data | ||
// ... | ||
auto lod = tensor.lod(); | ||
uint64_t size = lod.size(); | ||
os.write(reinterpret_cast<const char *>(&size), sizeof(size)); | ||
|
||
for (auto &each : lod) { | ||
size = each.size() * sizeof(framework::LoD::value_type::value_type); | ||
os.write(reinterpret_cast<const char *>(&size), sizeof(size)); | ||
os.write(reinterpret_cast<const char *>(each.data()), | ||
static_cast<std::streamsize>(size)); | ||
} | ||
} | ||
} | ||
|
||
void DeserializeFromStream(std::istream &is, LoDTensor *tensor) { | ||
uint32_t version; | ||
is.read(reinterpret_cast<char *>(&version), sizeof(version)); | ||
PADDLE_ENFORCE_EQ(version, 0U, "Only version 0 is supported"); | ||
proto::TensorDesc desc; | ||
{ // int32_t size | ||
// proto buffer | ||
int32_t size; | ||
is.read(reinterpret_cast<char *>(&size), sizeof(size)); | ||
std::unique_ptr<char[]> buf(new char[size]); | ||
is.read(reinterpret_cast<char *>(buf.get()), size); | ||
PADDLE_ENFORCE(desc.ParseFromArray(buf.get(), size), | ||
"Cannot parse tensor desc"); | ||
} | ||
{ // read tensor | ||
std::vector<int64_t> dims; | ||
dims.reserve(static_cast<size_t>(desc.dims().size())); | ||
std::copy(desc.dims().begin(), desc.dims().end(), std::back_inserter(dims)); | ||
tensor->Resize(framework::make_ddim(dims)); | ||
|
||
void *buf; | ||
platform::Place cpu = platform::CPUPlace(); | ||
switch (desc.data_type()) { | ||
case proto::FP32: | ||
buf = tensor->mutable_data<float>(cpu); | ||
break; | ||
case proto::FP64: | ||
buf = tensor->mutable_data<double>(cpu); | ||
break; | ||
case proto::INT32: | ||
buf = tensor->mutable_data<int>(cpu); | ||
break; | ||
case proto::INT64: | ||
buf = tensor->mutable_data<int64_t>(cpu); | ||
break; | ||
default: | ||
PADDLE_THROW("DataType %d not supported", desc.data_type()); | ||
} | ||
is.read(static_cast<char *>(buf), tensor->memory_size()); | ||
} | ||
{ // read lod | ||
uint64_t lod_level; | ||
is.read(reinterpret_cast<char *>(&lod_level), sizeof(lod_level)); | ||
auto &lod = *tensor->mutable_lod(); | ||
lod.resize(lod_level); | ||
for (uint64_t i = 0; i < lod_level; ++i) { | ||
uint64_t size; | ||
is.read(reinterpret_cast<char *>(&size), sizeof(size)); | ||
std::vector<size_t> tmp(size / sizeof(size_t)); | ||
is.read(reinterpret_cast<char *>(tmp.data()), | ||
static_cast<std::streamsize>(size)); | ||
lod[i] = tmp; | ||
} | ||
} | ||
} | ||
|
||
} // namespace framework | ||
} // namespace paddle |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,5 +12,45 @@ limitations under the License. */ | |
#include "paddle/framework/selected_rows.h" | ||
|
||
namespace paddle { | ||
namespace framework {} // namespace framework | ||
namespace framework { | ||
void SelectedRows::SerializeToStream(std::ostream &os, | ||
const platform::DeviceContext &dev_ctx) { | ||
PADDLE_ENFORCE_NOT_NULL( | ||
value_, "serialize SelectedRows failed since Tensor is nullptr."); | ||
value_->SerializeToStream(os, dev_ctx); | ||
{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. version field There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It has also been serialized in the |
||
// serialize rows information | ||
uint64_t size = rows_.size(); | ||
os.write(reinterpret_cast<const char *>(&size), sizeof(size)); | ||
for (uint64_t i = 0; i < size; ++i) { | ||
os.write(reinterpret_cast<const char *>(&rows_[i]), sizeof(rows_[i])); | ||
} | ||
} | ||
{ | ||
// serialize height field | ||
os.write(reinterpret_cast<const char *>(&this->height_), sizeof(int64_t)); | ||
} | ||
} | ||
|
||
void SelectedRows::DeserializeFromStream(std::istream &is) { | ||
value_.reset(new Tensor()); | ||
value_->DeserializeFromStream(is); | ||
{ | ||
// deserialize rows information | ||
uint64_t size; | ||
is.read(reinterpret_cast<char *>(&size), sizeof(size)); | ||
rows_.resize(size); | ||
for (uint64_t i = 0; i < size; ++i) { | ||
int64_t tmp; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any better way to read an int from istream? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done, delete the temporary variable |
||
is.read(reinterpret_cast<char *>(&tmp), sizeof(int64_t)); | ||
rows_[i] = tmp; | ||
} | ||
} | ||
{ | ||
// deserialize height field | ||
is.read(reinterpret_cast<char *>(&this->height_), sizeof(int64_t)); | ||
} | ||
} | ||
|
||
} // namespace framework | ||
} // namespace paddle |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -43,5 +43,19 @@ TEST_F(SelectedRowsTester, complete_dims) { | |
ASSERT_EQ(selected_rows_->GetCompleteDims(), make_ddim({10, 100})); | ||
} | ||
|
||
// Round-trips the fixture's SelectedRows through an in-memory stream and
// checks that rows and height survive serialization.
TEST_F(SelectedRowsTester, SerializeAndDeseralize) {
  SelectedRows dst_tensor;
  // Brace-initialization sidesteps the most-vexing-parse without the extra
  // parentheses the paren form needed.
  platform::CPUDeviceContext cpu_ctx{platform::CPUPlace()};

  std::ostringstream oss;
  selected_rows_->SerializeToStream(oss, cpu_ctx);

  std::istringstream iss(oss.str());
  dst_tensor.DeserializeFromStream(iss);

  ASSERT_EQ(selected_rows_->rows(), dst_tensor.rows());
  ASSERT_EQ(selected_rows_->height(), dst_tensor.height());
}
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Need GPU Test and SerializeToStream should copy to CPU if element is in GPU There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add the test in tensor_util_test. |
||
} // namespace framework | ||
} // namespace paddle |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -99,6 +99,16 @@ class Tensor { | |
*/ | ||
inline Tensor Slice(int begin_idx, int end_idx) const; | ||
|
||
/* | ||
* Serialize/Deserialize Tensor to std::ostream | ||
* You can pass ofstream or ostringstream to serialize to a file | ||
* or to an in-memory string. GPU tensor will be copied to CPU. | ||
*/ | ||
inline void SerializeToStream(std::ostream& os, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Put it into tensor_util.h |
||
const platform::DeviceContext& dev_ctx) const; | ||
|
||
inline void DeserializeFromStream(std::istream& is); | ||
|
||
platform::Place place() const { | ||
PADDLE_ENFORCE_NOT_NULL( | ||
holder_, "Tensor not initialized yet when Tensor::place() is called."); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tensor is decoupled from proto. Why we need the link with framework_proto?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Serialize need the Tensor description, and serialize to a proto message is a simple way, so link the framework_proto.