diff --git a/oneflow/api/python/framework/tensor_functions.cpp b/oneflow/api/python/framework/tensor_functions.cpp
index 724bc06e739..41d5e74fcc9 100644
--- a/oneflow/api/python/framework/tensor_functions.cpp
+++ b/oneflow/api/python/framework/tensor_functions.cpp
@@ -694,14 +694,17 @@ static PyObject* PyTensorObject_local_to_global(PyObject* self, PyObject* args,
   PyObject* placement_obj = Py_None;
   PyObject* sbp_obj = Py_None;
   PyObject* check_meta_obj = Py_True;
+  PyObject* sync_data_obj = Py_True;
   PyObject* copy_obj = Py_False;
-  static const char* keywords[5] = {"placement", "sbp", "check_meta", "copy", NULL};
-  if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OO$O!O!:local_to_global",
+  static const char* keywords[6] = {"placement", "sbp", "check_meta", "sync_data", "copy", NULL};
+  if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OO$O!O!O!:local_to_global",
                                    const_cast<char**>(keywords), &placement_obj, &sbp_obj,
-                                   &PyBool_Type, &check_meta_obj, &PyBool_Type, &copy_obj)) {
+                                   &PyBool_Type, &check_meta_obj, &PyBool_Type, &sync_data_obj,
+                                   &PyBool_Type, &copy_obj)) {
     return NULL;
   }
   const bool check_meta = (check_meta_obj == Py_True);
+  const bool sync_data = (sync_data_obj == Py_True);
   const bool copy = (copy_obj == Py_True);

   CHECK_OR_THROW(placement_obj != Py_None && sbp_obj != Py_None)
@@ -720,8 +723,9 @@ static PyObject* PyTensorObject_local_to_global(PyObject* self, PyObject* args,
         << functional::PyStringAsString(PyObject_Str((PyObject*)Py_TYPE(sbp_obj)));
     sbp = functional::PyUnpackSbpParallelSequence(sbp_obj);
   }
-  return PyTensor_New(ASSERT_PTR(functional::ToGlobal(
-      tensor, functional::PyUnpackParallelDesc(placement_obj), sbp, {}, check_meta, copy)));
+  return PyTensor_New(
+      ASSERT_PTR(functional::ToGlobal(tensor, functional::PyUnpackParallelDesc(placement_obj), sbp,
+                                      {}, check_meta, sync_data, copy)));
   END_HANDLE_ERRORS
 }

@@ -736,14 +740,18 @@ static PyObject* PyTensorObject_global_to_global(PyObject* self, PyObject* args,
   std::vector<Symbol<SbpParallel>> sbp;
   std::vector<Symbol<SbpParallel>> grad_sbp;
   PyObject* check_meta_obj = Py_False;
+  PyObject* sync_data_obj = Py_True;
   PyObject* copy_obj = Py_False;
-  static const char* keywords[6] = {"placement", "sbp", "grad_sbp", "check_meta", "copy", NULL};
-  if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OO$OO!O!:global_to_global",
+  static const char* keywords[7] = {"placement", "sbp", "grad_sbp", "check_meta",
+                                    "sync_data", "copy", NULL};
+  if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OO$OO!O!O!:global_to_global",
                                    const_cast<char**>(keywords), &placement_obj, &sbp_obj,
-                                   &grad_sbp_obj, &PyBool_Type, &check_meta_obj, &copy_obj)) {
+                                   &grad_sbp_obj, &PyBool_Type, &check_meta_obj, &PyBool_Type,
+                                   &sync_data_obj, &PyBool_Type, &copy_obj)) {
     return NULL;
   }
   const bool check_meta = (check_meta_obj == Py_True);
+  const bool sync_data = (sync_data_obj == Py_True);
   const bool copy = (copy_obj == Py_True);

   // sbp
@@ -780,8 +788,8 @@ static PyObject* PyTensorObject_global_to_global(PyObject* self, PyObject* args,
   } else if (functional::PySbpParallelSequenceCheck(grad_sbp_obj)) {
     grad_sbp = functional::PyUnpackSbpParallelSequence(grad_sbp_obj);
   }
-  return PyTensor_New(
-      ASSERT_PTR(functional::ToGlobal(tensor, placement, sbp, grad_sbp, check_meta, copy)));
+  return PyTensor_New(ASSERT_PTR(
+      functional::ToGlobal(tensor, placement, sbp, grad_sbp, check_meta, sync_data, copy)));
   END_HANDLE_ERRORS
 }

@@ -845,8 +853,8 @@ static PyObject* PyTensorObject_type_as(PyObject* self, PyObject* args, PyObject
   for (int32_t i = 0; i < ndsbp->sbp_parallel_size(); i++) {
     sbp.emplace_back(ndsbp->sbp_parallel(i));
   }
-  return PyTensor_New(
-      ASSERT_PTR(functional::ToGlobal(value_tensor, placement, sbp, {}, true, /*copy=*/false)));
+  return PyTensor_New(ASSERT_PTR(
+      functional::ToGlobal(value_tensor, placement, sbp, {}, true, true, /*copy=*/false)));
   END_HANDLE_ERRORS
 }

@@ -945,7 +953,7 @@ int PyTensorObject_setitem(PyObject* self, PyObject* item, PyObject* value) {
           << Error::RuntimeError()
           << "tensor_setitem(): value must be a global tensor when self is global";
       value_tensor = ASSERT_PTR(
-          functional::ToGlobal(value_tensor, placement, sbp, {}, true, /*copy=*/false));
+          functional::ToGlobal(value_tensor, placement, sbp, {}, true, true, /*copy=*/false));
     }
   } else {
     if (functional::PyScalarCheck(value)) {
diff --git a/oneflow/api/python/utils/tensor_utils.cpp b/oneflow/api/python/utils/tensor_utils.cpp
index e14cbeefa2c..a0e6ae0320a 100644
--- a/oneflow/api/python/utils/tensor_utils.cpp
+++ b/oneflow/api/python/utils/tensor_utils.cpp
@@ -266,7 +266,7 @@ Maybe<Tensor> MakeGlobalTensorFromData(PyObject* data, const Optional<Symbol<DType>>&
   std::vector<Symbol<SbpParallel>> grad_sbp_tuple;
   auto global_tensor =
       JUST(functional::ToGlobal(broadcast_tensor, placement, sbp_tuple, grad_sbp_tuple,
-                                /* check_meta */ false, /*copy=*/false));
+                                /* check_meta */ false, /* sync_data */ true, /*copy=*/false));
   JUST(global_tensor->set_requires_grad(requires_grad));
   return global_tensor;
 }
@@ -282,7 +282,7 @@ Maybe<Tensor> MakeTensorFromOtherTensor(const std::shared_ptr<Tensor>& other,
     std::vector<Symbol<SbpParallel>> grad_sbp_tuple;
     // TODO:(zhaoluyang) global case support pin_memory
     return functional::ToGlobal(other, JUST(other->parallel_desc()), sbp_tuple, grad_sbp_tuple,
-                                /* check_meta */ false, /*copy=*/false);
+                                /* check_meta */ false, /* sync_data */ true, /*copy=*/false);
   }
 }

@@ -318,8 +318,9 @@ Maybe<Tensor> MakeTensorFromOtherTensor(const std::shared_ptr<Tensor>& other,
                                         const bool requires_grad) {
   std::vector<Symbol<SbpParallel>> grad_sbp_tuple;
   bool check_meta = other->is_global() ? false : true;
-  std::shared_ptr<Tensor> tensor = JUST(functional::ToGlobal(
-      other, placement, sbp_tuple, grad_sbp_tuple, check_meta, /*copy=*/false));
+  std::shared_ptr<Tensor> tensor =
+      JUST(functional::ToGlobal(other, placement, sbp_tuple, grad_sbp_tuple, check_meta,
+                                /* sync_data */ true, /*copy=*/false));
   if (dtype) {
     const Symbol<DType>& dtype_ = JUST(dtype);
     if (tensor->dtype() != dtype_) {
diff --git a/oneflow/core/autograd/autograd_engine.cpp b/oneflow/core/autograd/autograd_engine.cpp
index 3812c1007f3..f5ca9d21524 100644
--- a/oneflow/core/autograd/autograd_engine.cpp
+++ b/oneflow/core/autograd/autograd_engine.cpp
@@ -169,7 +169,8 @@ Maybe<void> FunctionNode::AccGrad4LeafTensor(bool create_graph) {
         const auto& nd_sbp = JUST(tensor_info.sbp());
         JUST(out->set_acc_grad(
             JUST(functional::ToGlobal(acc_grad, placement, *JUST(GetSbpList(nd_sbp)),
-                                      GetNoneSbpList(), /* check_meta */ false, /*copy=*/false))));
+                                      GetNoneSbpList(), /* check_meta */ false, /*sync_data*/ true,
+                                      /*copy=*/false))));
       }
     }
   }
diff --git a/oneflow/core/autograd/gradient_funcs/global_cast.cpp b/oneflow/core/autograd/gradient_funcs/global_cast.cpp
index 781456fd520..5c3f9b3f599 100644
--- a/oneflow/core/autograd/gradient_funcs/global_cast.cpp
+++ b/oneflow/core/autograd/gradient_funcs/global_cast.cpp
@@ -60,9 +60,10 @@ class LocalToGlobal : public OpExprGradFunction<CastGlobalCaptureState> {
     {
       Symbol<NdSbp> nd_sbp_constraint = ctx->nd_sbp;
       Symbol<ParallelDesc> parallel_desc_constraint = ctx->parallel_desc;
-      out_grad = JUST(functional::ToGlobal(out_grad, parallel_desc_constraint,
-                                           *JUST(GetSbpList(nd_sbp_constraint)), GetNoneSbpList(),
-                                           /* check_meta */ false, /*copy=*/false));
+      out_grad =
+          JUST(functional::ToGlobal(out_grad, parallel_desc_constraint,
+                                    *JUST(GetSbpList(nd_sbp_constraint)), GetNoneSbpList(),
+                                    /* check_meta */ false, /* sync_data */ true, /*copy=*/false));
     }
     in_grads->at(0) = JUST(OpInterpUtil::Dispatch<Tensor>(*grad_op_, {out_grad}));
     return Maybe<void>::Ok();
   }
diff --git a/oneflow/core/autograd/gradient_funcs/global_to_global.cpp b/oneflow/core/autograd/gradient_funcs/global_to_global.cpp
index 10a8e409699..482713d0116 100644
--- a/oneflow/core/autograd/gradient_funcs/global_to_global.cpp
+++ b/oneflow/core/autograd/gradient_funcs/global_to_global.cpp
@@ -60,13 +60,15 @@ class GlobalToGlobalGradFunction : public OpExprGradFunction<GlobalToGlobalState> {
-      (*in_grads)[0] = JUST(one::functional::ToGlobal(out_grad, ctx->parallel_desc, *grad_sbp_list,
-                                                      {}, /* check_meta */ false, /*copy=*/false));
+      (*in_grads)[0] =
+          JUST(one::functional::ToGlobal(out_grad, ctx->parallel_desc, *grad_sbp_list, {},
+                                         /* check_meta */ false, /* sync_data */ true,
+                                         /*copy=*/false));
     } else {
       const auto& grad_grad_sbp_list = JUST(GetSbpList(ctx->nd_sbp));
       (*in_grads)[0] = JUST(one::functional::ToGlobal(out_grad, ctx->parallel_desc, *grad_sbp_list,
                                                       *grad_grad_sbp_list, /* check_meta */ false,
-                                                      /*copy=*/false));
+                                                      /* sync_data */ true, /*copy=*/false));
     }
     return Maybe<void>::Ok();
   }
diff --git a/oneflow/core/framework/nn_graph.cpp b/oneflow/core/framework/nn_graph.cpp
index 698466f6a06..d639f027026 100644
--- a/oneflow/core/framework/nn_graph.cpp
+++ b/oneflow/core/framework/nn_graph.cpp
@@ -747,7 +747,8 @@ Maybe<void> NNGraph::GetVariableRealBlobAfterSyncPlan() {
         // To consistent from a local or global tensor.
         bool check_meta = load_tensor_iter->second->is_global() ? false : true;
         tensor = JUST(one::functional::ToGlobal(load_tensor_iter->second, placement, *sbp_tuple,
-                                                grad_sbp_tuple, check_meta, /*copy=*/false));
+                                                grad_sbp_tuple, check_meta, /* sync_data */ true,
+                                                /*copy=*/false));
         JUST(vm::CurrentRankSync());
         VLOG(2) << "Lazy nn.Graph name " << name_ << " op: " << op_attribute.op_conf().name()
                 << " created in JobPass, nn.Graph has loaded the tensor from state dict for this "
@@ -782,7 +783,7 @@ Maybe<void> NNGraph::GetVariableRealBlobAfterSyncPlan() {
       auto lazy_mode_disabled_guard = LazyMode::Guard(/* is_enabled */ false);
       const auto& new_tensor = JUST(one::functional::ToGlobal(
           tensor, JUST(tensor->parallel_desc()), optimized_sbp_parallels, {},
-          /* check_meta */ false, /*copy=*/false));
+          /* check_meta */ false, /* sync_data */ true, /*copy=*/false));
       JUST(vm::CurrentRankSync());
       // Use tensor.set_data inferface and make new TensorImpl instead of the old one.
       JUST(tensor->set_data(new_tensor));
diff --git a/oneflow/core/framework/tensor.cpp b/oneflow/core/framework/tensor.cpp
index 02a8b581a8b..c08951ef1ef 100644
--- a/oneflow/core/framework/tensor.cpp
+++ b/oneflow/core/framework/tensor.cpp
@@ -226,7 +226,8 @@ Maybe<Tensor> GlobalTensor::clone() const {
   std::shared_ptr<Tensor> input = std::const_pointer_cast<Tensor>(shared_from_this());
   DisableCheckGlobalTensorMetaScope disable_meta_check{};
   return JUST(functional::ToGlobal(input, JUST(parallel_desc()), *JUST(GetSbpList(JUST(nd_sbp()))),
-                                   /*grad_sbp_parallels=*/{}, /* sync_data */ true, /*copy=*/true));
+                                   /*grad_sbp_parallels=*/{}, /* check_meta */ true,
+                                   /* sync_data */ true, /*copy=*/true));
 }

 Maybe<GlobalTensor> GlobalTensor::MakeTensor(const std::shared_ptr<const Shape>& shape,
diff --git a/oneflow/core/framework/tensor_util.cpp b/oneflow/core/framework/tensor_util.cpp
index c4ef23554cc..5a3c93e90c1 100644
--- a/oneflow/core/framework/tensor_util.cpp
+++ b/oneflow/core/framework/tensor_util.cpp
@@ -94,7 +94,7 @@ Maybe<void> GetItemInScalarTensor(const std::shared_ptr<Tensor>& scalar_tensor,
     }
     const auto& broadcast_sbp = JUST(MakeBroadcastSbpParallel());
     tensor = JUST(functional::ToGlobal(tensor, parallel_desc, {broadcast_sbp}, /*grad_sbp=*/{},
-                                       /*check_meta=*/false, /*copy=*/false));
+                                       /*check_meta=*/false, /*sync_data=*/true, /*copy=*/false));
     tensor = JUST(functional::GlobalToLocal(tensor, /*copy=*/false));
   }
   local_tensor = JUST(tensor->AsLocalTensor());
diff --git a/oneflow/core/functional/functional_api.yaml b/oneflow/core/functional/functional_api.yaml
index 3323db0e93c..3b25f79dd39 100644
--- a/oneflow/core/functional/functional_api.yaml
+++ b/oneflow/core/functional/functional_api.yaml
@@ -2404,7 +2404,7 @@
   bind_python: False

 - name: "to_global"
-  signature: "Tensor (Tensor x, Placement placement, SbpList sbp, SbpList grad_sbp, Bool check_meta, Bool copy=False) => ToGlobal"
+  signature: "Tensor (Tensor x, Placement placement, SbpList sbp, SbpList grad_sbp, Bool check_meta, Bool sync_data, Bool copy=False) => ToGlobal"
   bind_python: True

 - name: "to_local"
diff --git a/oneflow/core/functional/impl/array_functor.cpp b/oneflow/core/functional/impl/array_functor.cpp
index aef7ef62a3b..07e14c2058f 100644
--- a/oneflow/core/functional/impl/array_functor.cpp
+++ b/oneflow/core/functional/impl/array_functor.cpp
@@ -146,7 +146,7 @@ class GlobalTensorConstantFunctor {
       const auto& fixed_sbp_tuple = JUST(NdSbpReplacePartialByBroadcast(sbp_tuple));
       const auto& tensor = JUST(dispatch_constant(*fixed_sbp_tuple));
       return functional::ToGlobal(tensor, placement, sbp_tuple, {}, /* check_meta */ false,
-                                  /*copy*/ false);
+                                  /* sync_data */ true, /*copy*/ false);
     } else {
       return dispatch_constant(sbp_tuple);
     }
@@ -237,7 +237,7 @@ class GlobalConstantFunctor {
       const auto& fixed_sbp_tuple = JUST(NdSbpReplacePartialByBroadcast(sbp_tuple));
       const auto& tensor = JUST(dispatch_constant(*fixed_sbp_tuple));
       return functional::ToGlobal(tensor, placement, sbp_tuple, {}, /* check_meta */ false,
-                                  /*copy*/ false);
+                                  /* sync_data */ true, /*copy*/ false);
     } else {
       return dispatch_constant(sbp_tuple);
     }
diff --git a/oneflow/core/functional/impl/global_cast.cpp b/oneflow/core/functional/impl/global_cast.cpp
index 86d2e3255d2..f5cc09930d0 100644
--- a/oneflow/core/functional/impl/global_cast.cpp
+++ b/oneflow/core/functional/impl/global_cast.cpp
@@ -520,7 +520,7 @@ class ToGlobalFunctor {
                            Symbol<ParallelDesc> parallel_desc,
                            const std::vector<Symbol<SbpParallel>>& sbp_parallels,
                            const std::vector<Symbol<SbpParallel>>& grad_sbp_parallels,
-                           bool check_meta, bool copy) const {
+                           bool check_meta, bool sync_data, bool copy) const {
     JUST(CheckDeviceIdsIsValid(parallel_desc));
     NonRecursiveMetaInfoConsistencyCheckScope scope;
     JUST(MetaInfoConsistencyCheck(parallel_desc, sbp_parallels, grad_sbp_parallels, 1,
@@ -531,8 +531,9 @@ class ToGlobalFunctor {
     } else {
       DeviceType device_type = parallel_desc->device_type();
       if (ccl::IsBroadcastRegistered(device_type)) {
-        tensor = JUST(LocalToGlobal(x, parallel_desc, sbp_parallels, NullOpt, NullOpt,
-                                    local_to_global_op_, check_meta, /* sync_data */ true, copy));
+        tensor =
+            JUST(LocalToGlobal(x, parallel_desc, sbp_parallels, NullOpt, NullOpt,
+                               local_to_global_op_, check_meta, /* sync_data */ sync_data, copy));
       } else {
         // Assuming that the newly adapted hardware device does not support collective
         // communication, since local to global may need to synchronize data (through the
@@ -543,7 +544,7 @@ class ToGlobalFunctor {
            JUST(ReplaceDeviceType(parallel_desc, DeviceType::kCPU));
        std::shared_ptr<Tensor> cpu_tensor =
            JUST(LocalToGlobal(x, cpu_parallel_desc, sbp_parallels, NullOpt, NullOpt,
-                              local_to_global_op_, check_meta, /* sync_data */ true, copy));
+                              local_to_global_op_, check_meta, /* sync_data */ sync_data, copy));
        tensor = JUST(GlobalToGlobal(cpu_tensor, parallel_desc, sbp_parallels,
                                     GetNoneSbpList(), copy));
       }
diff --git a/oneflow/core/functional/impl/math_functor.cpp b/oneflow/core/functional/impl/math_functor.cpp
index 97d0f2b69dd..059f3323944 100644
--- a/oneflow/core/functional/impl/math_functor.cpp
+++ b/oneflow/core/functional/impl/math_functor.cpp
@@ -1625,7 +1625,8 @@ class GlobalHannWindowFunctor {
         result = JUST(ScalarDiv(JUST(ScalarSub(1, JUST(Cos(div_result)), 1)), 2));
       }
     }
-    result = JUST(ToGlobal(result, placement, sbp, {}, true, /*copy=*/false));
+    result = JUST(ToGlobal(result, placement, sbp, {}, /* check_meta */ true,
+                           /* sync_data */ true, /*copy=*/false));
     JUST(result->set_requires_grad(requires_grad));
     return result;
   }
diff --git a/oneflow/core/functional/impl/nn_functor.cpp b/oneflow/core/functional/impl/nn_functor.cpp
index ef4f2f92070..d5d0398d754 100644
--- a/oneflow/core/functional/impl/nn_functor.cpp
+++ b/oneflow/core/functional/impl/nn_functor.cpp
@@ -2316,10 +2316,10 @@ class SparseSoftmaxCrossEntropyFunctor {
       s0s1_sbp_parallels.emplace_back(logits_nd_sbp.sbp_parallel(1));
       max_global_stage_input0 = JUST(functional::ToGlobal(
           (*max_device_stage)[0], JUST((*max_device_stage)[0]->parallel_desc()), new_sbp_parallels,
-          s0s1_sbp_parallels, /* check_meta */ false, /*copy=*/false));
+          s0s1_sbp_parallels, /* check_meta */ false, /* sync_data */ true, /*copy=*/false));
       max_global_stage_input1 = JUST(functional::ToGlobal(
           (*max_device_stage)[2], JUST((*max_device_stage)[0]->parallel_desc()), new_sbp_parallels,
-          s0s1_sbp_parallels, /* check_meta */ false, /*copy=*/false));
+          s0s1_sbp_parallels, /* check_meta */ false, /* sync_data */ true, /*copy=*/false));
     }
     // op_reduce_max_global_stage_
     auto& reduce_max_global_attrs = THREAD_CACHED_MUTABLE_ATTR_MAP("axis", "keepdims");
@@ -2331,7 +2331,7 @@ class SparseSoftmaxCrossEntropyFunctor {
     if (logits_nd_sbp.sbp_parallel_size() == 2) {
       broadcast_sub_input = JUST(functional::ToGlobal(
           broadcast_sub_input, JUST((*max_device_stage)[0]->parallel_desc()), new_sbp_parallels,
-          new_sbp_parallels, /* check_meta */ false, /*copy=*/false));
+          new_sbp_parallels, /* check_meta */ false, /* sync_data */ true, /*copy=*/false));
     }
     // op_broadcast_sub_
     const auto& output_broadcast_sub = JUST(
@@ -2349,7 +2349,8 @@ class SparseSoftmaxCrossEntropyFunctor {
       std::vector<Symbol<SbpParallel>> empty_grad_sbp_parallels;
       broadcast_div_input1 = JUST(functional::ToGlobal(
           (*output_reduce_sum)[0], JUST((*output_reduce_sum)[0]->parallel_desc()),
-          new_sbp_parallels, new_sbp_parallels, /* check_meta */ false, /*copy=*/false));
+          new_sbp_parallels, new_sbp_parallels, /* check_meta */ false, /* sync_data */ true,
+          /*copy=*/false));
     }
     // op_broadcast_div_
     const auto& predictions = JUST(OpInterpUtil::Dispatch<Tensor>(
diff --git a/oneflow/core/functional/tensor_index.cpp b/oneflow/core/functional/tensor_index.cpp
index 1d0fc108e36..cae684d1fff 100644
--- a/oneflow/core/functional/tensor_index.cpp
+++ b/oneflow/core/functional/tensor_index.cpp
@@ -567,7 +567,8 @@ Maybe<void> UnifyInputAndIndicesOnDevice(const std::shared_ptr<Tensor>& x,
       LazyMode::Guard lazy_mode_disabled_guard(/*is_enabled*/ false);
       tensor_indices[i] = JUST(ToGlobal(tensor_index, placement,
                                         std::vector<Symbol<SbpParallel>>(n, broadcast_sbp),
-                                        grad_sbp_tuple, /*check_meta=*/false, /*copy=*/false));
+                                        grad_sbp_tuple, /*check_meta=*/false, /*sync_data*/ true,
+                                        /*copy=*/false));
     }
   }
 }
diff --git a/python/oneflow/framework/docstr/tensor.py b/python/oneflow/framework/docstr/tensor.py
index 82d001aec74..7eb96c3c754 100644
--- a/python/oneflow/framework/docstr/tensor.py
+++ b/python/oneflow/framework/docstr/tensor.py
@@ -395,7 +395,7 @@ add_docstr(
     oneflow.Tensor.local_to_global,
     """
-    Tensor.local_to_global(placement=None, sbp=None, *, check_meta=True, copy=False) -> Tensor
+    Tensor.local_to_global(placement=None, sbp=None, *, check_meta=True, sync_data=True, copy=False) -> Tensor

     Creates a global tensor from a local tensor.

@@ -426,7 +426,7 @@
        >>> # Run on 2 ranks respectively
        >>> import oneflow as flow
        >>> input = flow.tensor([0., 1.], dtype=flow.float32) # doctest: +SKIP
-        >>> output = input.local_to_global(placement=flow.placement("cpu", ranks=[0, 1]), sbp=[flow.sbp.split(0)], check_meta=False) # doctest: +SKIP
+        >>> output = input.local_to_global(placement=flow.placement("cpu", ranks=[0, 1]), sbp=[flow.sbp.split(0)], check_meta=False, sync_data=True) # doctest: +SKIP
        >>> print(output.size()) # doctest: +SKIP
        >>> print(output) # doctest: +SKIP

@@ -539,7 +539,7 @@
        >>> # Run on 2 ranks respectively
        >>> import oneflow as flow
        >>> input = flow.tensor([0., 1.], dtype=flow.float32) # doctest: +SKIP
-        >>> output = input.to_global(placement=flow.placement("cpu", ranks=[0, 1]), sbp=[flow.sbp.split(0)], check_meta=False) # doctest: +SKIP
+        >>> output = input.to_global(placement=flow.placement("cpu", ranks=[0, 1]), sbp=[flow.sbp.split(0)], check_meta=False, sync_data=True) # doctest: +SKIP
        >>> print(output.size()) # doctest: +SKIP
        >>> print(output) # doctest: +SKIP
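For reviewers, a minimal sketch of how the new `sync_data` keyword would be exercised from Python, based only on the signatures and docstring examples changed in this diff. The launch command, the two-rank setup, and the assumption that `Tensor.to_global` forwards `sync_data` to the bindings above are illustrative; the Python wrapper itself is not part of this diff.

```python
# Hypothetical two-rank usage of the sync_data flag added in this diff.
# Launch (assumed): python3 -m oneflow.distributed.launch --nproc_per_node 2 demo.py
import oneflow as flow

# Each rank holds its own local shard of the data.
x = flow.tensor([0.0, 1.0], dtype=flow.float32)

# sync_data=True is the default and keeps the existing behavior: data is
# synchronized across ranks while the local tensor is cast to a global one.
y = x.to_global(
    placement=flow.placement("cpu", ranks=[0, 1]),
    sbp=[flow.sbp.split(0)],
    check_meta=False,
    sync_data=True,  # new keyword introduced by this change
)
print(y.size())  # expected: oneflow.Size([4]) with split(0) over 2 ranks
```

Passing `sync_data=False` would presumably skip that synchronization step for callers that already know their local data is consistent, which is the knob the C++ `ToGlobal`/`LocalToGlobal` plumbing above now exposes.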