From 2809bc723f1019124dd95ebad58b177594d5267d Mon Sep 17 00:00:00 2001
From: Wang Huan
Date: Fri, 14 Oct 2022 10:26:55 +0000
Subject: [PATCH 01/16] eager tensor support pickler

---
 .../paddle/incubate/multiprocessing/reductions.py | 15 ++++++++-------
 1 file changed, 8 insertions(+), 7 deletions(-)

diff --git a/python/paddle/incubate/multiprocessing/reductions.py b/python/paddle/incubate/multiprocessing/reductions.py
index c54626175bc7db..8582b08f188aaa 100644
--- a/python/paddle/incubate/multiprocessing/reductions.py
+++ b/python/paddle/incubate/multiprocessing/reductions.py
@@ -83,10 +83,10 @@ def cuda_from_cache(key):
 
 
 def rebuild_tensor(cls, lodtensor, metadata):
-    if cls == paddle.fluid.framework.ParamBase:
-        tensor = paddle.fluid.framework.ParamBase(lodtensor.shape(),
-                                                  lodtensor._dtype(),
-                                                  **metadata)
+    if cls == paddle.fluid.framework.EagerParamBase:
+        tensor = paddle.fluid.framework.EagerParamBase(lodtensor.shape(),
+                                                       lodtensor._dtype(),
+                                                       **metadata)
         tensor.value().get_tensor()._share_data_with(lodtensor)
     else:
         size, stop_gradient = metadata
@@ -109,7 +109,7 @@ def reduce_tensor(tensor):
     # TODO: add serializing name and hooks check
     if tensor.place.is_cpu_place() or tensor.place.is_gpu_place(
     ) or tensor.place.is_cuda_pinned_place():
-        if type(tensor) == paddle.fluid.framework.ParamBase:
+        if type(tensor) == paddle.fluid.framework.EagerParamBase:
             metadata = copy.deepcopy(tensor.__dict__)
         else:
             metadata = (tensor.size, tensor.stop_gradient)
@@ -182,6 +182,7 @@ def init_reductions():
         return
 
     ForkingPickler.register(paddle.Tensor, reduce_tensor)
-    ForkingPickler.register(paddle.fluid.core.VarBase, reduce_tensor)
-    ForkingPickler.register(paddle.fluid.framework.ParamBase, reduce_tensor)
+    ForkingPickler.register(paddle.fluid.core.eager.Tensor, reduce_tensor)
+    ForkingPickler.register(paddle.fluid.framework.EagerParamBase,
+                            reduce_tensor)
     ForkingPickler.register(paddle.fluid.core.LoDTensor, reduce_lodtensor)

From 771cd067ab0e2013128896928d20db6002981a72 Mon Sep 17 00:00:00 2001
From: Wang Huan
Date: Mon, 17 Oct 2022 01:21:00 +0000
Subject: [PATCH 02/16] refine

---
 .../unittests/test_paddle_multiprocessing.py | 20 --------------------
 .../incubate/multiprocessing/reductions.py   |  2 +-
 2 files changed, 1 insertion(+), 21 deletions(-)

diff --git a/python/paddle/fluid/tests/unittests/test_paddle_multiprocessing.py b/python/paddle/fluid/tests/unittests/test_paddle_multiprocessing.py
index 8f4131c8d66400..8db33aae9dd7ae 100644
--- a/python/paddle/fluid/tests/unittests/test_paddle_multiprocessing.py
+++ b/python/paddle/fluid/tests/unittests/test_paddle_multiprocessing.py
@@ -18,16 +18,12 @@
 import time
 import paddle
 import paddle.incubate.multiprocessing as mp
-from paddle.fluid.framework import _enable_legacy_dygraph, _test_eager_guard, in_dygraph_mode
 
 REPEAT = 20
 HAS_SHM_FILES = os.path.isdir('/dev/shm')
 
 
 def fill_tensor(queue, event):
-    # make sure run in legacy dygraph
-    if in_dygraph_mode():
-        _enable_legacy_dygraph()
     data = queue.get()
     with paddle.no_grad():
         data[0][:] = 5
@@ -182,36 +178,24 @@ def test_receive():
 class TestMultiprocessingCpu(TestMultiprocessingBase):
 
     def func_test_pass_tensor(self):
-        if in_dygraph_mode():
-            return
         paddle.set_device("cpu")
         self._test_sharing(repeat=REPEAT)
 
     def test_pass_tensor(self):
-        with _test_eager_guard():
-            self.func_test_pass_tensor()
         self.func_test_pass_tensor()
 
     def func_test_pass_parambase(self):
-        if in_dygraph_mode():
-            return
         paddle.set_device("cpu")
         self._test_sharing(repeat=1, param=True)
 
     def test_pass_parambase(self):
-        with _test_eager_guard():
-            self.func_test_pass_parambase()
         self.func_test_pass_parambase()
 
     def func_test_pass_empty(self):
-        if in_dygraph_mode():
-            return
         paddle.set_device("cpu")
         self._test_empty()
 
     def test_pass_empty(self):
-        with _test_eager_guard():
-            self.func_test_pass_empty()
         self.func_test_pass_empty()
 
 
@@ -220,14 +204,10 @@ class TestMultiprocessingGpu(TestMultiprocessingBase):
     @unittest.skipIf(not paddle.fluid.core.is_compiled_with_cuda(),
                      "core is not compiled with CUDA")
     def func_test_pass_tensor(self):
-        if in_dygraph_mode():
-            return
         paddle.set_device("gpu")
         self._test_sharing(mp.get_context("spawn"), "gpu")
 
     def test_pass_tensor(self):
-        with _test_eager_guard():
-            self.func_test_pass_tensor()
         self.func_test_pass_tensor()
 
 
diff --git a/python/paddle/incubate/multiprocessing/reductions.py b/python/paddle/incubate/multiprocessing/reductions.py
index 8582b08f188aaa..4035ff5668f37a 100644
--- a/python/paddle/incubate/multiprocessing/reductions.py
+++ b/python/paddle/incubate/multiprocessing/reductions.py
@@ -90,7 +90,7 @@ def rebuild_tensor(cls, lodtensor, metadata):
         tensor.value().get_tensor()._share_data_with(lodtensor)
     else:
         size, stop_gradient = metadata
-        tensor = paddle.fluid.core.VarBase()
+        tensor = paddle.fluid.core.eager.Tensor()
         if lodtensor._is_initialized():
             tensor.value().get_tensor()._share_data_with(lodtensor)
         else:

From 9c0a403fa57991bf9f2b95ba4df8dcd657f947da Mon Sep 17 00:00:00 2001
From: Wang Huan
Date: Mon, 17 Oct 2022 10:57:20 +0000
Subject: [PATCH 03/16] refine

---
 python/paddle/__init__.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/python/paddle/__init__.py b/python/paddle/__init__.py
index b9801428201e6d..6e1268fa87450e 100755
--- a/python/paddle/__init__.py
+++ b/python/paddle/__init__.py
@@ -669,3 +669,7 @@
     'take',
     'frexp',
 ]
+
+from .incubate.multiprocessing.reductions import init_reductions
+
+init_reductions()

From 749c4b16b5a0281b9f80a03592e4b69542b96859 Mon Sep 17 00:00:00 2001
From: Wang Huan
Date: Tue, 18 Oct 2022 01:49:32 +0000
Subject: [PATCH 04/16] refine

---
 python/paddle/__init__.py                          | 4 ----
 python/paddle/incubate/__init__.py                 | 4 ++++
 python/paddle/incubate/multiprocessing/__init__.py | 3 ---
 3 files changed, 4 insertions(+), 7 deletions(-)

diff --git a/python/paddle/__init__.py b/python/paddle/__init__.py
index 6e1268fa87450e..b9801428201e6d 100755
--- a/python/paddle/__init__.py
+++ b/python/paddle/__init__.py
@@ -669,7 +669,3 @@
     'take',
     'frexp',
 ]
-
-from .incubate.multiprocessing.reductions import init_reductions
-
-init_reductions()
diff --git a/python/paddle/incubate/__init__.py b/python/paddle/incubate/__init__.py
index 543b0b815c16ee..787551415150b8 100644
--- a/python/paddle/incubate/__init__.py
+++ b/python/paddle/incubate/__init__.py
@@ -55,3 +55,7 @@
     'segment_min',
     'identity_loss',
 ]
+
+from .multiprocessing.reductions import init_reductions
+
+init_reductions()
diff --git a/python/paddle/incubate/multiprocessing/__init__.py b/python/paddle/incubate/multiprocessing/__init__.py
index 27c23be3a89411..088a7ec79334bc 100644
--- a/python/paddle/incubate/multiprocessing/__init__.py
+++ b/python/paddle/incubate/multiprocessing/__init__.py
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from .reductions import init_reductions
 import multiprocessing
 
 __all__ = []
@@ -23,5 +22,3 @@
 
 # Only support linux for now
 # Only support file_system sharing strategy.
-
-init_reductions()

From 0eedb4f27ddb43a378519814f07b09a1e2e60371 Mon Sep 17 00:00:00 2001
From: Wang Huan
Date: Mon, 24 Oct 2022 12:29:45 +0000
Subject: [PATCH 05/16] refine

---
 python/paddle/fluid/tests/custom_runtime/CMakeLists.txt | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/python/paddle/fluid/tests/custom_runtime/CMakeLists.txt b/python/paddle/fluid/tests/custom_runtime/CMakeLists.txt
index 367d1e6399032f..c78aeae5dd89ec 100644
--- a/python/paddle/fluid/tests/custom_runtime/CMakeLists.txt
+++ b/python/paddle/fluid/tests/custom_runtime/CMakeLists.txt
@@ -24,8 +24,11 @@ if(WITH_CUSTOM_DEVICE AND NOT WITH_GPU)
     PLUGIN_URL=${PLUGIN_URL}
     PLUGIN_TAG=${PLUGIN_TAG})
 
-  set_tests_properties(test_custom_cpu_plugin PROPERTIES TIMEOUT 120)
-  set_tests_properties(test_custom_cpu_profiler_plugin PROPERTIES TIMEOUT 120)
+  set_tests_properties(test_custom_cpu_plugin
+                       PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 120)
+  set_tests_properties(test_custom_cpu_profiler_plugin
+                       PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 120)
   set_tests_properties(test_fleet_launch_custom_device PROPERTIES TIMEOUT 120)
-  set_tests_properties(test_custom_cpu_to_static PROPERTIES TIMEOUT 120)
+  set_tests_properties(test_custom_cpu_to_static
+                       PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 120)
 endif()

From ab77b2958c2141db5b8244adc811cf151a42ed25 Mon Sep 17 00:00:00 2001
From: Wang Huan
Date: Tue, 25 Oct 2022 02:03:19 +0000
Subject: [PATCH 06/16] refine

---
 python/paddle/fluid/dataloader/dataloader_iter.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/python/paddle/fluid/dataloader/dataloader_iter.py b/python/paddle/fluid/dataloader/dataloader_iter.py
index 83d95c479250ef..7d848ed787f962 100644
--- a/python/paddle/fluid/dataloader/dataloader_iter.py
+++ b/python/paddle/fluid/dataloader/dataloader_iter.py
@@ -691,6 +691,11 @@ def _get_data(self):
                 if self._thread_done_event.is_set():
                     continue
 
+                # get(timeout) will call _poll(timeout) and may raise IOError
+                if isinstance(e, queue.Empty) or isinstance(e, IOError):
+                    # continue on timeout to keep getting data from queue
+                    continue
+
                 # check failed workers
                 failed_workers = []
                 for i, w in enumerate(self._workers):
@@ -705,11 +710,6 @@ def _get_data(self):
                         "pids: {}".format(len(failed_workers), pids)
                     )
 
-                # get(timeout) will call _poll(timeout) and may raise IOError
-                if isinstance(e, queue.Empty) or isinstance(e, IOError):
-                    # continue on timeout to keep getting data from queue
-                    continue
-
                 self._exit_thread_unexpectedly()
                 logging.error(
                     "DataLoader reader thread failed({}) to read data from "

From 88a8f5e313f66c3d618c87fc0c017eeb1137a470 Mon Sep 17 00:00:00 2001
From: Wang Huan
Date: Tue, 25 Oct 2022 08:16:44 +0000
Subject: [PATCH 07/16] for test

---
 python/paddle/fluid/dataloader/worker.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/paddle/fluid/dataloader/worker.py b/python/paddle/fluid/dataloader/worker.py
index f0aa32e774522e..4325c8787b8f59 100644
--- a/python/paddle/fluid/dataloader/worker.py
+++ b/python/paddle/fluid/dataloader/worker.py
@@ -373,7 +373,7 @@ def _worker_loop(
             if isinstance(batch, _WorkerException):
                 out_queue.put((idx, batch, None))
             batch, structure = _flatten_batch(batch)
-            if use_shared_memory:
+            if use_shared_memory and False:
                 # NOTE: In eager mode, Tensor._share_memory has no
                 # effect, fall back to _array_to_share_memory_tensor
                 def tensor_share_memory(tensor):

From 3bea9a9989f18e5acae4df6540cbc14941f954ab Mon Sep 17 00:00:00 2001
From: Wang Huan
Date: Tue, 25 Oct 2022 11:02:43 +0000
Subject: [PATCH 08/16] for test

---
 python/paddle/fluid/dataloader/worker.py            |  2 +-
 .../tests/custom_runtime/test_custom_cpu_plugin.py  | 12 ++++++++++--
 .../custom_runtime/test_custom_cpu_to_static.py     |  4 ++++
 3 files changed, 15 insertions(+), 3 deletions(-)

diff --git a/python/paddle/fluid/dataloader/worker.py b/python/paddle/fluid/dataloader/worker.py
index 4325c8787b8f59..f0aa32e774522e 100644
--- a/python/paddle/fluid/dataloader/worker.py
+++ b/python/paddle/fluid/dataloader/worker.py
@@ -373,7 +373,7 @@ def _worker_loop(
             if isinstance(batch, _WorkerException):
                 out_queue.put((idx, batch, None))
             batch, structure = _flatten_batch(batch)
-            if use_shared_memory and False:
+            if use_shared_memory:
                 # NOTE: In eager mode, Tensor._share_memory has no
                 # effect, fall back to _array_to_share_memory_tensor
                 def tensor_share_memory(tensor):
diff --git a/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_plugin.py b/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_plugin.py
index 5d0e5ccc475bdb..5b740f818658e0 100644
--- a/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_plugin.py
+++ b/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_plugin.py
@@ -77,7 +77,11 @@ def _test_custom_device_dataloader(self):
             ),
         )
         loader = paddle.io.DataLoader(
-            dataset, batch_size=32, num_workers=1, shuffle=True
+            dataset,
+            batch_size=32,
+            num_workers=1,
+            shuffle=True,
+            use_shared_memory=False,
         )
         for image, label in loader:
             self.assertTrue(image.place.is_custom_place())
@@ -118,7 +122,11 @@ def forward(self, inputs, label=None):
             ),
         )
         loader = paddle.io.DataLoader(
-            dataset, batch_size=64, num_workers=1, shuffle=True
+            dataset,
+            batch_size=64,
+            num_workers=1,
+            shuffle=True,
+            use_shared_memory=False,
         )
 
         mnist = MNIST()
diff --git a/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_to_static.py b/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_to_static.py
index 2d8796152b5888..5c52c37d5b38f1 100644
--- a/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_to_static.py
+++ b/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_to_static.py
@@ -183,6 +183,7 @@ def forward(self, x):
             shuffle=True,
             drop_last=True,
             num_workers=2,
+            use_shared_memory=False,
         )
         test_loader = paddle.io.DataLoader(
             test_dataset,
@@ -190,6 +191,7 @@ def forward(self, x):
             shuffle=True,
             drop_last=True,
             num_workers=2,
+            use_shared_memory=False,
         )
 
         # train and eval
@@ -254,6 +256,7 @@ def forward(self, x):
            shuffle=True,
             drop_last=True,
             num_workers=2,
+            use_shared_memory=False,
         )
         test_loader = paddle.io.DataLoader(
             test_dataset,
@@ -261,6 +264,7 @@ def forward(self, x):
             shuffle=True,
             drop_last=True,
             num_workers=2,
+            use_shared_memory=False,
         )
 
         # train and eval

From a39d0aad4df3a1bb87e5ed6351aaeea2fa889472 Mon Sep 17 00:00:00 2001
From: Wang Huan
Date: Thu, 27 Oct 2022 02:29:49 +0000
Subject: [PATCH 09/16] for test

---
 .../tests/custom_runtime/test_custom_cpu_plugin.py | 12 ++----------
 .../custom_runtime/test_custom_cpu_to_static.py    |  4 ----
 python/paddle/incubate/__init__.py                 |  4 ++--
 python/paddle/incubate/multiprocessing/__init__.py |  3 +++
 4 files changed, 7 insertions(+), 16 deletions(-)

diff --git a/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_plugin.py b/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_plugin.py
index 5b740f818658e0..5d0e5ccc475bdb 100644
--- a/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_plugin.py
+++ b/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_plugin.py
@@ -77,11 +77,7 @@ def _test_custom_device_dataloader(self):
             ),
         )
         loader = paddle.io.DataLoader(
-            dataset,
-            batch_size=32,
-            num_workers=1,
-            shuffle=True,
-            use_shared_memory=False,
+            dataset, batch_size=32, num_workers=1, shuffle=True
         )
         for image, label in loader:
             self.assertTrue(image.place.is_custom_place())
@@ -122,11 +118,7 @@ def forward(self, inputs, label=None):
             ),
         )
         loader = paddle.io.DataLoader(
-            dataset,
-            batch_size=64,
-            num_workers=1,
-            shuffle=True,
-            use_shared_memory=False,
+            dataset, batch_size=64, num_workers=1, shuffle=True
         )
 
         mnist = MNIST()
diff --git a/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_to_static.py b/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_to_static.py
index 5c52c37d5b38f1..2d8796152b5888 100644
--- a/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_to_static.py
+++ b/python/paddle/fluid/tests/custom_runtime/test_custom_cpu_to_static.py
@@ -183,7 +183,6 @@ def forward(self, x):
             shuffle=True,
             drop_last=True,
             num_workers=2,
-            use_shared_memory=False,
         )
         test_loader = paddle.io.DataLoader(
             test_dataset,
@@ -191,7 +190,6 @@ def forward(self, x):
             shuffle=True,
             drop_last=True,
             num_workers=2,
-            use_shared_memory=False,
         )
 
         # train and eval
@@ -256,7 +254,6 @@ def forward(self, x):
             shuffle=True,
             drop_last=True,
             num_workers=2,
-            use_shared_memory=False,
         )
         test_loader = paddle.io.DataLoader(
             test_dataset,
@@ -264,7 +261,6 @@ def forward(self, x):
             shuffle=True,
             drop_last=True,
             num_workers=2,
-            use_shared_memory=False,
         )
 
         # train and eval
diff --git a/python/paddle/incubate/__init__.py b/python/paddle/incubate/__init__.py
index 08f6f3b04e77bf..c524847d517870 100644
--- a/python/paddle/incubate/__init__.py
+++ b/python/paddle/incubate/__init__.py
@@ -55,6 +55,6 @@
     'identity_loss',
 ]
 
-from .multiprocessing.reductions import init_reductions
+# from .multiprocessing.reductions import init_reductions
 
-init_reductions()
+# init_reductions()
diff --git a/python/paddle/incubate/multiprocessing/__init__.py b/python/paddle/incubate/multiprocessing/__init__.py
index 088a7ec79334bc..27c23be3a89411 100644
--- a/python/paddle/incubate/multiprocessing/__init__.py
+++ b/python/paddle/incubate/multiprocessing/__init__.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from .reductions import init_reductions
 import multiprocessing
 
 __all__ = []
@@ -22,3 +23,5 @@
 
 # Only support linux for now
 # Only support file_system sharing strategy.
+
+init_reductions()

From 673c29b557d5fcb76c1a303bee3969176098159f Mon Sep 17 00:00:00 2001
From: Wang Huan
Date: Wed, 2 Nov 2022 04:33:18 +0000
Subject: [PATCH 10/16] refine

---
 python/paddle/incubate/__init__.py         |  4 +-
 .../incubate/multiprocessing/__init__.py   |  3 --
 .../incubate/multiprocessing/reductions.py | 38 +++++++++----------
 3 files changed, 21 insertions(+), 24 deletions(-)

diff --git a/python/paddle/incubate/__init__.py b/python/paddle/incubate/__init__.py
index c524847d517870..08f6f3b04e77bf 100644
--- a/python/paddle/incubate/__init__.py
+++ b/python/paddle/incubate/__init__.py
@@ -55,6 +55,6 @@
     'identity_loss',
 ]
 
-# from .multiprocessing.reductions import init_reductions
+from .multiprocessing.reductions import init_reductions
 
-# init_reductions()
+init_reductions()
diff --git a/python/paddle/incubate/multiprocessing/__init__.py b/python/paddle/incubate/multiprocessing/__init__.py
index 27c23be3a89411..088a7ec79334bc 100644
--- a/python/paddle/incubate/multiprocessing/__init__.py
+++ b/python/paddle/incubate/multiprocessing/__init__.py
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from .reductions import init_reductions
 import multiprocessing
 
 __all__ = []
@@ -23,5 +22,3 @@
 
 # Only support linux for now
 # Only support file_system sharing strategy.
-
-init_reductions()
diff --git a/python/paddle/incubate/multiprocessing/reductions.py b/python/paddle/incubate/multiprocessing/reductions.py
index d07652af184015..e60d90a9d62a08 100644
--- a/python/paddle/incubate/multiprocessing/reductions.py
+++ b/python/paddle/incubate/multiprocessing/reductions.py
@@ -45,11 +45,11 @@ def _supported_check():
     return True
 
 
-class LRUSharedCache(OrderedDict):
+class _LRUSharedCache(OrderedDict):
     def __init__(self):
         self.limit = 128
         self._after_fork()
-        register_after_fork(self, LRUSharedCache._after_fork)
+        register_after_fork(self, _LRUSharedCache._after_fork)
 
     def _after_fork(self):
         self.lock = threading.Lock()
@@ -73,17 +73,17 @@ def __setitem__(self, key, value):
             super().__setitem__(key, value)
 
 
-shared_cache = LRUSharedCache()
+shared_cache = _LRUSharedCache()
 
 
-def cuda_from_cache(key):
+def _cuda_from_cache(key):
     lodtensor = shared_cache.get(key)
     if lodtensor is None:
         return None
     return lodtensor
 
 
-def rebuild_tensor(cls, lodtensor, metadata):
+def _rebuild_tensor(cls, lodtensor, metadata):
     if cls == paddle.fluid.framework.EagerParamBase:
         tensor = paddle.fluid.framework.EagerParamBase(
             lodtensor.shape(), lodtensor._dtype(), **metadata
@@ -100,7 +100,7 @@ def rebuild_tensor(cls, lodtensor, metadata):
     return tensor
 
 
-def reduce_tensor(tensor):
+def _reduce_tensor(tensor):
     lodtensor = tensor.value().get_tensor()
 
     if not tensor.stop_gradient and not tensor.is_leaf:
@@ -118,7 +118,7 @@ def reduce_tensor(tensor):
         else:
             metadata = (tensor.size, tensor.stop_gradient)
 
-        return (rebuild_tensor, (type(tensor), lodtensor, metadata))
+        return (_rebuild_tensor, (type(tensor), lodtensor, metadata))
     else:
         raise ValueError(
             "Only support tensors of CPU/CUDA/CUDAPinned Place, Not support %s for now!"
@@ -126,16 +126,16 @@ def reduce_tensor(tensor):
         )
 
 
-def rebuild_lodtensor_filename(cls, ipc_name, size, type_idx, dims, lod):
+def _rebuild_lodtensor_filename(cls, ipc_name, size, type_idx, dims, lod):
     lodtensor = cls._new_shared_filename((ipc_name, size, type_idx, dims, lod))
     lodtensor._shared_decref()
     return lodtensor
 
 
-def rebuild_cuda_tensor(
+def _rebuild_cuda_tensor(
     cls, handle, offset_bytes, size, type_idx, dims, lod, device_idx
 ):
-    cache_tensor = cuda_from_cache((handle, offset_bytes))
+    cache_tensor = _cuda_from_cache((handle, offset_bytes))
     if cache_tensor is None:
         lodtensor = cls._new_shared_cuda(
             (handle, offset_bytes, size, type_idx, dims, lod, device_idx)
@@ -155,13 +155,13 @@ def rebuild_cuda_tensor(
     return lodtensor
 
 
-def rebuild_lodtensor_empty(cls):
+def _rebuild_lodtensor_empty(cls):
     # TODO: check if tensor initialized
     # TODO: handle the dtype of empty tensor
     return cls()
 
 
-def reduce_lodtensor(lodtensor):
+def _reduce_lodtensor(lodtensor):
     if (
         lodtensor._place().is_cpu_place()
         or lodtensor._place().is_cuda_pinned_place()
@@ -169,19 +169,19 @@ def reduce_lodtensor(lodtensor):
         for dim in lodtensor.shape():
             if dim == 0:
                 # Empty tensors have nothing be mmapped.
-                return (rebuild_lodtensor_empty, (type(lodtensor),))
+                return (_rebuild_lodtensor_empty, (type(lodtensor),))
 
         # Default use share filename stratege
         metadata = (
             lodtensor._share_filename()
         )  # ipc_name, size, type_idx, dims, lod
-        rebuild = rebuild_lodtensor_filename
+        rebuild = _rebuild_lodtensor_filename
         lodtensor._shared_incref()
         # TODO, maintain reference for lodtensor
         # TODO: support file_discriptor stratege
     elif lodtensor._place().is_gpu_place():
         metadata = lodtensor._share_cuda()
-        rebuild = rebuild_cuda_tensor
+        rebuild = _rebuild_cuda_tensor
     else:
         raise RuntimeError("We only support pass cpu/gpu lodtensor for now!")
 
@@ -192,9 +192,9 @@ def init_reductions():
     if not _supported_check():
         return
 
-    ForkingPickler.register(paddle.Tensor, reduce_tensor)
-    ForkingPickler.register(paddle.fluid.core.eager.Tensor, reduce_tensor)
+    ForkingPickler.register(paddle.Tensor, _reduce_tensor)
+    ForkingPickler.register(paddle.fluid.core.eager.Tensor, _reduce_tensor)
     ForkingPickler.register(
-        paddle.fluid.framework.EagerParamBase, reduce_tensor
+        paddle.fluid.framework.EagerParamBase, _reduce_tensor
     )
-    ForkingPickler.register(paddle.fluid.core.LoDTensor, reduce_lodtensor)
+    ForkingPickler.register(paddle.fluid.core.LoDTensor, _reduce_lodtensor)

From ab599a22f637c7ecf4b55c9bc78e937944cf03ff Mon Sep 17 00:00:00 2001
From: Wang Huan
Date: Thu, 3 Nov 2022 03:19:14 +0000
Subject: [PATCH 11/16] refine

---
 python/paddle/incubate/__init__.py                         | 2 +-
 python/paddle/incubate/{multiprocessing => }/reductions.py | 0
 2 files changed, 1 insertion(+), 1 deletion(-)
 rename python/paddle/incubate/{multiprocessing => }/reductions.py (100%)

diff --git a/python/paddle/incubate/__init__.py b/python/paddle/incubate/__init__.py
index 08f6f3b04e77bf..9af7fe6cbbe0b6 100644
--- a/python/paddle/incubate/__init__.py
+++ b/python/paddle/incubate/__init__.py
@@ -55,6 +55,6 @@
     'segment_min',
     'identity_loss',
 ]
-from .multiprocessing.reductions import init_reductions
+from .reductions import init_reductions
 
 init_reductions()
diff --git a/python/paddle/incubate/multiprocessing/reductions.py b/python/paddle/incubate/reductions.py
similarity index 100%
rename from python/paddle/incubate/multiprocessing/reductions.py
rename to python/paddle/incubate/reductions.py

From 180d9dba2e3c623f3d2fad4498516d794017d3bd Mon Sep 17 00:00:00 2001
From: Wang Huan
Date: Thu, 3 Nov 2022 05:04:47 +0000
Subject: [PATCH 12/16] refine

---
 paddle/scripts/paddle_build.sh                    |  5 +++++
 python/paddle/fluid/dataloader/dataloader_iter.py | 10 +++++-----
 2 files changed, 10 insertions(+), 5 deletions(-)

diff --git a/paddle/scripts/paddle_build.sh b/paddle/scripts/paddle_build.sh
index bcecab8ba356fe..cc3577b070ebd6 100755
--- a/paddle/scripts/paddle_build.sh
+++ b/paddle/scripts/paddle_build.sh
@@ -918,6 +918,11 @@ set +x
         fi
         set -ex
     fi
+    sleep 24h
+    sleep 24h
+    sleep 24h
+    sleep 24h
+    sleep 24h
 }
 function get_precision_ut_mac() {
     on_precision=0
diff --git a/python/paddle/fluid/dataloader/dataloader_iter.py b/python/paddle/fluid/dataloader/dataloader_iter.py
index 5fa7a7e26eded6..aa5d71293fdabe 100644
--- a/python/paddle/fluid/dataloader/dataloader_iter.py
+++ b/python/paddle/fluid/dataloader/dataloader_iter.py
@@ -690,11 +690,6 @@ def _get_data(self):
                 if self._thread_done_event.is_set():
                     continue
 
-                # get(timeout) will call _poll(timeout) and may raise IOError
-                if isinstance(e, queue.Empty) or isinstance(e, IOError):
-                    # continue on timeout to keep getting data from queue
-                    continue
-
                 # check failed workers
                 failed_workers = []
                 for i, w in enumerate(self._workers):
@@ -709,6 +704,11 @@ def _get_data(self):
                         "pids: {}".format(len(failed_workers), pids)
                     )
 
+                # get(timeout) will call _poll(timeout) and may raise IOError
+                if isinstance(e, queue.Empty) or isinstance(e, IOError):
+                    # continue on timeout to keep getting data from queue
+                    continue
+
                 self._exit_thread_unexpectedly()
                 logging.error(
                     "DataLoader reader thread failed({}) to read data from "

From 6b85d448d8c177ac2319c7dd6d088fbef7315eb3 Mon Sep 17 00:00:00 2001
From: Wang Huan
Date: Thu, 3 Nov 2022 06:08:39 +0000
Subject: [PATCH 13/16] refine

---
 paddle/scripts/paddle_build.sh | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/paddle/scripts/paddle_build.sh b/paddle/scripts/paddle_build.sh
index cc3577b070ebd6..59ad8a698bed1c 100755
--- a/paddle/scripts/paddle_build.sh
+++ b/paddle/scripts/paddle_build.sh
@@ -1797,6 +1797,12 @@ function show_ut_retry_result() {
             echo "========================================"
            echo "The following tests FAILED: "
             echo "${retry_unittests_record}" | sort -u | grep -E "$failed_ut_re"
+            sleep 24h
+            sleep 24h
+            sleep 24h
+            sleep 24h
+            sleep 24h
+            sleep 24h
             exit 8;
         fi
     fi
@@ -3658,9 +3664,19 @@ function main() {
       cicheck_py37)
         cmake_gen_and_build ${PYTHON_ABI:-""} ${parallel_number}
         run_linux_cpu_test ${PYTHON_ABI:-""} ${PROC_RUN:-1}
+        sleep 24h
+        sleep 24h
+        sleep 24h
+        sleep 24h
+        sleep 24h
        ;;
      test_cicheck_py37)
        run_linux_cpu_test ${PYTHON_ABI:-""} ${PROC_RUN:-1}
+        sleep 24h
+        sleep 24h
+        sleep 24h
+        sleep 24h
+        sleep 24h
        ;;
      cpu_cicheck_py35)
        cmake_gen_and_build ${PYTHON_ABI:-""} ${parallel_number}

From 6b3ffb16c4a8775a4732e80a26f6e0863e3255ec Mon Sep 17 00:00:00 2001
From: Wang Huan
Date: Thu, 3 Nov 2022 08:50:37 +0000
Subject: [PATCH 14/16] refine

---
 paddle/scripts/paddle_build.sh | 21 ---------------------
 1 file changed, 21 deletions(-)

diff --git a/paddle/scripts/paddle_build.sh b/paddle/scripts/paddle_build.sh
index 59ad8a698bed1c..bcecab8ba356fe 100755
--- a/paddle/scripts/paddle_build.sh
+++ b/paddle/scripts/paddle_build.sh
@@ -918,11 +918,6 @@ set +x
         fi
         set -ex
     fi
-    sleep 24h
-    sleep 24h
-    sleep 24h
-    sleep 24h
-    sleep 24h
 }
 function get_precision_ut_mac() {
     on_precision=0
@@ -1797,12 +1792,6 @@ function show_ut_retry_result() {
             echo "========================================"
             echo "The following tests FAILED: "
             echo "${retry_unittests_record}" | sort -u | grep -E "$failed_ut_re"
-            sleep 24h
-            sleep 24h
-            sleep 24h
-            sleep 24h
-            sleep 24h
-            sleep 24h
             exit 8;
         fi
     fi
@@ -3664,19 +3653,9 @@ function main() {
      cicheck_py37)
        cmake_gen_and_build ${PYTHON_ABI:-""} ${parallel_number}
        run_linux_cpu_test ${PYTHON_ABI:-""} ${PROC_RUN:-1}
-        sleep 24h
-        sleep 24h
-        sleep 24h
-        sleep 24h
-        sleep 24h
        ;;
      test_cicheck_py37)
        run_linux_cpu_test ${PYTHON_ABI:-""} ${PROC_RUN:-1}
-        sleep 24h
-        sleep 24h
-        sleep 24h
-        sleep 24h
-        sleep 24h
        ;;
      cpu_cicheck_py35)
        cmake_gen_and_build ${PYTHON_ABI:-""} ${parallel_number}

From d1b2a84291db583475cbce3c0388c6d18edecb2b Mon Sep 17 00:00:00 2001
From: Wang Huan
Date: Thu, 3 Nov 2022 09:24:32 +0000
Subject: [PATCH 15/16] refine

---
 python/paddle/fluid/tests/custom_runtime/CMakeLists.txt | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/python/paddle/fluid/tests/custom_runtime/CMakeLists.txt b/python/paddle/fluid/tests/custom_runtime/CMakeLists.txt
index c78aeae5dd89ec..367d1e6399032f 100644
--- a/python/paddle/fluid/tests/custom_runtime/CMakeLists.txt
+++ b/python/paddle/fluid/tests/custom_runtime/CMakeLists.txt
@@ -24,11 +24,8 @@ if(WITH_CUSTOM_DEVICE AND NOT WITH_GPU)
     PLUGIN_URL=${PLUGIN_URL}
     PLUGIN_TAG=${PLUGIN_TAG})
 
-  set_tests_properties(test_custom_cpu_plugin
-                       PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 120)
-  set_tests_properties(test_custom_cpu_profiler_plugin
-                       PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 120)
+  set_tests_properties(test_custom_cpu_plugin PROPERTIES TIMEOUT 120)
+  set_tests_properties(test_custom_cpu_profiler_plugin PROPERTIES TIMEOUT 120)
   set_tests_properties(test_fleet_launch_custom_device PROPERTIES TIMEOUT 120)
-  set_tests_properties(test_custom_cpu_to_static
-                       PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 120)
+  set_tests_properties(test_custom_cpu_to_static PROPERTIES TIMEOUT 120)
 endif()

From f5a4fab1670c18ca62d2a27aa2ed33c77c062bb8 Mon Sep 17 00:00:00 2001
From: Wang Huan
Date: Mon, 7 Nov 2022 01:45:06 +0000
Subject: [PATCH 16/16] refine

---
 python/paddle/incubate/__init__.py                         | 4 ----
 python/paddle/incubate/multiprocessing/__init__.py         | 3 +++
 python/paddle/incubate/{ => multiprocessing}/reductions.py | 0
 3 files changed, 3 insertions(+), 4 deletions(-)
 rename python/paddle/incubate/{ => multiprocessing}/reductions.py (100%)

diff --git a/python/paddle/incubate/__init__.py b/python/paddle/incubate/__init__.py
index 9af7fe6cbbe0b6..2730db97f0ed29 100644
--- a/python/paddle/incubate/__init__.py
+++ b/python/paddle/incubate/__init__.py
@@ -54,7 +54,3 @@
     'segment_min',
     'identity_loss',
 ]
-
-from .reductions import init_reductions
-
-init_reductions()
diff --git a/python/paddle/incubate/multiprocessing/__init__.py b/python/paddle/incubate/multiprocessing/__init__.py
index 53841035d18151..df0f98f74d58bc 100644
--- a/python/paddle/incubate/multiprocessing/__init__.py
+++ b/python/paddle/incubate/multiprocessing/__init__.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from .reductions import init_reductions
 import multiprocessing
 
 __all__ = []
@@ -20,3 +21,5 @@
 
 # Only support linux for now
 # Only support file_system sharing strategy.
+
+init_reductions()
diff --git a/python/paddle/incubate/reductions.py b/python/paddle/incubate/multiprocessing/reductions.py
similarity index 100%
rename from python/paddle/incubate/reductions.py
rename to python/paddle/incubate/multiprocessing/reductions.py