
Commit 1858cac

【Hackathon No.93】Add a file_descriptor transfer scheme for CPU tensors on Linux (#369)

* 【Hackathon No.93】Add a file_descriptor transfer scheme for CPU tensors on Linux
* Update 20230222_design_for_cpu_tensor_file_descriptor.md
* Update 20230222_design_for_cpu_tensor_file_descriptor.md
1 parent 22f408e commit 1858cac

# Design doc: paddle cpu tensor file_descriptor

Add a file_descriptor transfer scheme for CPU tensors on Linux.

| API name | New API names |
| ------------ | ------------------------------------------------- |
| Author | 核心已转储 |
| Submission date | 2023-02-21 |
| Version | V1.0 |
| Paddle version dependency | develop |
| File name | 20230222_design_for_cpu_tensor_file_descriptor.md |

# I. Overview

## 1. Background

This document describes the design for improving the `paddle.multiprocessing` module. By customizing how Tensors are serialized and deserialized, and by building on shared memory, Paddle CPU Tensors can be transferred and shared between processes quickly.

## 2. Goals

Improve the `paddle.multiprocessing` module so that Tensors can be transferred between processes conveniently and efficiently, as sketched below.
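
For a concrete picture of the goal, here is a minimal usage sketch. It assumes `paddle.incubate.multiprocessing` re-exports the standard `multiprocessing` primitives (as `torch.multiprocessing` does); the tensor values are illustrative only:

```python
import paddle
import paddle.incubate.multiprocessing as mp

def worker(q):
    # The tensor arrives through shared memory; only metadata and an
    # fd/filename were pickled, not the underlying data buffer.
    t = q.get()
    print(t.numpy().sum())

if __name__ == '__main__':
    q = mp.Queue()
    p = mp.Process(target=worker, args=(q,))
    p.start()
    q.put(paddle.to_tensor([1.0, 2.0, 3.0], place=paddle.CPUPlace()))
    p.join()
```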

## 3. Significance

multiprocessing is a way to support transferring Tensors between processes.

# II. Current State in Paddle

- See the [paddle.multiprocessing design doc](https://github.com/PaddlePaddle/Paddle/wiki/paddle进程间tensor传输设计文档-paddle.multiprocessing).
- Paddle currently supports the file_system CPU transfer strategy, which stores the intermediate state of a transferred tensor as a file on disk. With file_descriptor, the file is deleted immediately after its handle is opened, which is safer and avoids leaving stale files behind.
- PR [#37302](https://github.com/PaddlePaddle/Paddle/pull/37302) added initial support for inter-process tensor transfer in Paddle; it still needs further work.

# III. Research on Existing Solutions

In PyTorch, once a tensor or storage has been moved to shared memory (`share_memory_`), it can be sent to other processes without any further copying.

1. **Passing the file descriptor**

```python
def reduce_storage(storage):
    ...
    # file_descriptor strategy
    fd, size = storage._share_fd_cpu_()
    df = multiprocessing.reduction.DupFd(fd)

    metadata = (df, size)
    rebuild = rebuild_storage_fd  # type: ignore[assignment]
    ...
    return (rebuild, (type(storage),) + metadata)

def rebuild_storage_fd(cls, df, size):
    fd = df.detach()
    try:
        ...
        storage = cls._new_shared_fd_cpu(fd, size)
        ...
        return storage
    finally:
        os.close(fd)
```

Shared memory is requested through a C++ interface that returns an fd and a size. Since a raw fd value is meaningless in another process, the Python multiprocessing module is used to actually transfer the fd.
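
The primitive underneath `DupFd` is SCM_RIGHTS ancillary data on a Unix domain socket: the kernel installs a duplicate of the descriptor in the receiving process. A self-contained sketch of just that mechanism (pure stdlib, Python 3.9+, both endpoints in one process for brevity):

```python
import os
import socket

# A connected pair of Unix domain sockets.
left, right = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM)

fd = os.open('/tmp/fd_demo.txt', os.O_CREAT | os.O_RDWR)
os.write(fd, b'hello')

# send_fds()/recv_fds() wrap SCM_RIGHTS; the receiver gets a *new* fd
# number that refers to the same open file description.
socket.send_fds(left, [b'x'], [fd])
msg, fds, flags, addr = socket.recv_fds(right, 1, 1)

os.lseek(fds[0], 0, os.SEEK_SET)
print(os.read(fds[0], 5))  # b'hello'
```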

2. **During serialization**

```c++
static PyObject* THPStorage_shareFd(PyObject* _self, PyObject* noargs) {
  auto self = (THPStorage*)_self;

  c10::StorageImpl* storage = self->cdata;

  // analogous to phi::Allocation in paddle
  at::MapAllocator* ctx;

  if ((ctx = at::MapAllocator::fromDataPtr(storage->data_ptr()))) {
    // the data already lives in shmem
  } else {
    at::Storage new_storage(THPStorage_newFdStorage(storage->nbytes()));
    at::Storage _self_aten = torch::createStorage(_self);
    {
      // Copying into shared memory can be slow, so release the GIL
      pybind11::gil_scoped_release no_gil;
      storage_copy(new_storage, _self_aten);
    }

    std::swap(*storage, *new_storage.unsafeGetStorageImpl());
    ctx = at::MapAllocator::fromDataPtr(storage->data_ptr());
    AT_ASSERT(ctx);
  }

  // pseudocode:
  // return tuple(ctx->fd(), size);
}
```

3. **During deserialization**

```c++
static PyObject* THPStorage_newSharedFd(PyObject* _unused, PyObject* args) {
  ...
  PyObject* _tmp_fd = PyTuple_GET_ITEM(args, 0);
  PyObject* _size = PyTuple_GET_ITEM(args, 1);

  int fd;
  int tmp_fd = (int)THPUtils_unpackLong(_tmp_fd);
  int64_t size = THPUtils_unpackLong(_size);
  if ((fd = dup(tmp_fd)) == -1) {
    THPUtils_setError("could not duplicate a shared memory file descriptor");
    return nullptr;
  }

  int flags = at::ALLOCATOR_MAPPED_SHAREDMEM | at::ALLOCATOR_MAPPED_NOCREATE |
      at::ALLOCATOR_MAPPED_KEEPFD | at::ALLOCATOR_MAPPED_FROMFD;
  return THPStorage_New(c10::make_intrusive<at::StorageImpl>(
      c10::StorageImpl::use_byte_size_t(),
      size,
      at::MapAllocator::makeDataPtr(at::WITH_FD, "", fd, flags, size, nullptr),
      /*allocator=*/nullptr,
      /*resizable=*/false));
  END_HANDLE_TH_ERRORS
}
```

# IV. Comparative Analysis

We plan to implement the file_descriptor strategy with an approach similar to PyTorch's.

# V. Design and Implementation

## Naming and Parameter Design

See: [Paddle API design and naming guidelines](https://www.paddlepaddle.org.cn/documentation/docs/zh/develop/dev_guides/api_contributing_guides/api_design_guidelines_standard_cn.html)

## API Implementation

1. Add set_sharing_strategy and get_sharing_strategy to python/paddle/incubate/multiprocessing/__init__.py:

```python
import sys

# file_descriptor relies on fd passing over Unix sockets, so it is only
# the default on Linux.
if sys.platform == 'darwin' or sys.platform == 'win32':
    _sharing_strategy = 'file_system'
else:
    _sharing_strategy = 'file_descriptor'

def set_sharing_strategy(sharing_strategy):
    global _sharing_strategy  # rebind the module-level variable, not a local
    if sharing_strategy != "file_descriptor" and sharing_strategy != "file_system":
        raise RuntimeError("We only support file_system mode and file_descriptor mode")
    else:
        _sharing_strategy = sharing_strategy

def get_sharing_strategy():
    return _sharing_strategy
```
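
Intended usage of the two functions (a sketch based on the design above). Defaulting to file_descriptor on Linux mirrors PyTorch; file_system remains available as a fallback, e.g. when a workload shares so many tensors that the per-process fd limit (`ulimit -n`) becomes a concern:

```python
import paddle.incubate.multiprocessing as mp

# Opt out of the fd-based strategy explicitly.
mp.set_sharing_strategy("file_system")
assert mp.get_sharing_strategy() == "file_system"
```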

2. Add _reduce_lodtensor_fd and _rebuild_lodtensor_fd to python/paddle/incubate/multiprocessing/reductions.py to support fd-based serialization. _reduce_lodtensor dispatches on the current sharing strategy:

```python
def _reduce_lodtensor(lodtensor):
    if get_sharing_strategy() == "file_descriptor":
        return _reduce_lodtensor_fd(lodtensor)
    else:
        return _reduce_lodtensor_fs(lodtensor)

def _reduce_lodtensor_fs(lodtensor):
    # the original file_system implementation
    ...

def _reduce_lodtensor_fd(lodtensor):
    if lodtensor._place().is_cpu_place():
        for dim in lodtensor.shape():
            if dim == 0:
                # Empty tensors have nothing to be mmapped.
                return (_rebuild_lodtensor_empty, (type(lodtensor),))

        metadata = list(lodtensor._share_file_descriptor())  # fd, size, type_idx, dims, lod
        metadata[0] = multiprocessing.reduction.DupFd(metadata[0])  # transfer the fd via multiprocessing
    else:
        raise RuntimeError("We only support passing a cpu lodtensor with the file_descriptor strategy for now!")

    return (_rebuild_lodtensor_fd, (type(lodtensor),) + tuple(metadata))


def _rebuild_lodtensor_fd(cls, df, size, type_idx, dims, lod):
    fd = df.detach()
    lodtensor = cls._new_shared_file_descriptor((fd, size, type_idx, dims, lod))
    os.close(fd)
    return lodtensor
```
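
For `_reduce_lodtensor` to take effect it has to be registered as the custom reducer, as the existing file_system path already does. A sketch of the registration (the exact tensor class and registration hook in reductions.py are assumptions):

```python
from multiprocessing.reduction import ForkingPickler

import paddle

def init_reductions():
    # From now on, pickling a LoDTensor for another process goes through
    # _reduce_lodtensor, which selects fd- or file-based sharing at call time.
    ForkingPickler.register(paddle.fluid.core.LoDTensor, _reduce_lodtensor)
```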

3. Modify MemoryMapAllocation in paddle/fluid/memory/allocation/mmap_allocator.h and mmap_allocator.cc:

```c++
class MemoryMapAllocation : public Allocation {
 public:
  ...
  inline const int &fd() const { return fd_; }
  ...
};

void AllocateMemoryMap(
    std::string filename, int flags, size_t size, void **map_ptr_, int *fd_) {
  ...
  // The shm_open step is the same for both the FD and the FS strategy:
  if (flags & MAPPED_SHAREDMEM) {
    fd = shm_open(filename.c_str(), file_flags, (mode_t)0600);
    PADDLE_ENFORCE_NE(
        fd,
        -1,
        platform::errors::Unavailable(
            "File descriptor %s open failed, unable in read-write mode",
            filename.c_str()));
    VLOG(6) << "shm_open: " << filename;
  }

  ...
  // The fd-based strategy additionally needs the MAPPED_KEEPFD flag.
  ...
  if (flags & MAPPED_FROMFD) {
    PADDLE_ENFORCE_NE(shm_unlink(filename.c_str()),
                      -1,
                      platform::errors::Unavailable(
                          "Could not unlink the shared memory file <", filename, ">"));
  }
}

// During serialization fd is -1: allocate a new shmem segment.
// During deserialization fd is the received descriptor: simply mmap it.
std::shared_ptr<MemoryMapAllocation>
AllocateMemoryMapAllocationAndUnlink(int flags,
                                     size_t size,
                                     int fd) {
  void *ptr = nullptr;
  if (-1 == fd) {
    std::string handle = memory::allocation::GetIPCName();
    AllocateMemoryMap(handle, flags, size, &ptr, &fd);
  } else {
    AllocateMemoryMap("", flags, size, &ptr, &fd);
  }
  // Construct a wrapper around the shmem segment.
  return std::make_shared<MemoryMapAllocation>(
      ptr, size, "", flags, fd);
}
```

4. Add bindings for _share_file_descriptor and _new_shared_file_descriptor in paddle/fluid/pybind/tensor.cc for serialization and deserialization, passing the fd:

* After shm_open returns a handle, shm_unlink is called right away to delete the inode; only the handle is transferred between processes.
* In practice, the memory segment is deleted only once every process accessing it has exited (see the sketch after the code block below).
* Once all handles are closed, the file system releases the storage.

```c++
.def("_share_file_descriptor",
     [](phi::DenseTensor &self) {
       ...
       auto *mmap_allocation = dynamic_cast<
           memory::allocation::MemoryMapAllocation *>(
           holder.get());

       // If this tensor has already been shared, its metadata can be
       // returned directly; otherwise a new region must be carved out
       // of shmem and the data copied into it.
       if (mmap_allocation == nullptr) {
         ...
         int flags = memory::allocation::MAPPED_SHAREDMEM |
                     memory::allocation::MAPPED_EXCLUSIVE |
                     memory::allocation::MAPPED_FROMFD |
                     memory::allocation::MAPPED_KEEPFD;

         auto shared_holder =
             memory::allocation::AllocateMemoryMapAllocationAndUnlink(
                 flags, data_size, -1);

         memory::Copy(platform::CPUPlace(), shared_holder->ptr(),
                      platform::CPUPlace(), data_ptr, data_size);
         self.ResetHolder(shared_holder);
         mmap_allocation = shared_holder.get();
       }
       ...

       return py::make_tuple(mmap_allocation->fd(),
                             mmap_allocation->size(), type_idx,
                             vectorize(self.dims()), self.lod());
     },
.def("_new_shared_file_descriptor",
     [](py::tuple t) {
       ...
       phi::DenseTensor tensor;

       const int &fd = t[0].cast<int>();
       size_t size = t[1].cast<size_t>();
       int flags = memory::allocation::MAPPED_SHAREDMEM |
                   memory::allocation::MAPPED_NOCREATE |
                   memory::allocation::MAPPED_FROMFD;

       auto shared_holder =
           memory::allocation::AllocateMemoryMapAllocationAndUnlink(
               flags, size, fd);

       tensor.ResetHolderWithType(
           shared_holder,
           static_cast<paddle::experimental::DataType>(t[2].cast<int>()));
       tensor.Resize(phi::make_ddim(t[3].cast<std::vector<int>>()));
       tensor.set_lod(t[4].cast<framework::LoD>());

       return tensor;
     },
```
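
The lifetime rules in the bullet list above are plain POSIX semantics and can be observed without any Paddle code; a minimal sketch using an ordinary file (a segment created by shm_open behaves the same way, it just lives on tmpfs):

```python
import mmap
import os
import tempfile

fd, path = tempfile.mkstemp()
os.ftruncate(fd, 4096)
os.unlink(path)           # the name (inode link) is gone immediately ...
m = mmap.mmap(fd, 4096)   # ... but the open fd can still be mmapped
m[:5] = b'alive'
print(m[:5])              # b'alive'
m.close()
os.close(fd)              # storage is reclaimed with the last reference
```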

# VI. Testing and Acceptance Criteria

The following test cases are considered:

1. The APIs execute correctly.
2. Tensors are shared correctly and numerical computations on them give correct results.
3. No stale files are left behind (see the sketch after this list).
4. Results are correct when the input Tensor's `dtype` is `float32`, `float64`, `int32`, `int64`, and so on.
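
A sketch of the residue check in case 3 (/dev/shm is where shm_open-backed segments appear on Linux; everything else is illustrative):

```python
import glob

import paddle
import paddle.incubate.multiprocessing as mp

def test_no_file_residue():
    before = set(glob.glob('/dev/shm/*'))

    q = mp.Queue()
    q.put(paddle.ones([4], dtype='float32'))
    got = q.get()
    assert (got.numpy() == 1.0).all()

    # With file_descriptor the segment was shm_unlink-ed right after
    # creation, so no new /dev/shm entries may remain.
    del got, q
    assert set(glob.glob('/dev/shm/*')) <= before
```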

# VII. Feasibility and Schedule

Development can be completed within the current release cycle.

# VIII. Impact

This is an independent new API and has no impact on other modules.

# Glossary

# Appendix and References
