From df3e4f2593f025cad6584c48c11b6f00a9c951c4 Mon Sep 17 00:00:00 2001 From: Wenjun Si Date: Tue, 26 Apr 2022 16:47:16 +0800 Subject: [PATCH 1/5] Fix apply --- mars/dataframe/groupby/aggregation.py | 4 +++- mars/dataframe/reduction/aggregation.py | 1 + mars/dataframe/reduction/core.py | 4 +++- mars/dataframe/reduction/tests/test_reduction_execution.py | 6 ++++++ 4 files changed, 13 insertions(+), 2 deletions(-) diff --git a/mars/dataframe/groupby/aggregation.py b/mars/dataframe/groupby/aggregation.py index 7d593814c5..7039e517b8 100644 --- a/mars/dataframe/groupby/aggregation.py +++ b/mars/dataframe/groupby/aggregation.py @@ -94,6 +94,7 @@ def get(self): "skew": lambda x, bias=False: x.skew(bias=bias), "kurt": lambda x, bias=False: x.kurt(bias=bias), "kurtosis": lambda x, bias=False: x.kurtosis(bias=bias), + "nunique": lambda x: x.nunique(), } _series_col_name = "col_name" @@ -720,7 +721,8 @@ def _do_custom_agg(op, custom_reduction, *input_objs): result = (result,) if out.ndim == 2: - result = tuple(r.to_frame().T for r in result) + if result[0].ndim == 1: + result = tuple(r.to_frame().T for r in result) if op.stage == OperandStage.agg: result = tuple(r.astype(out.dtypes) for r in result) else: diff --git a/mars/dataframe/reduction/aggregation.py b/mars/dataframe/reduction/aggregation.py index 6945ecec9a..4248dcdcd9 100644 --- a/mars/dataframe/reduction/aggregation.py +++ b/mars/dataframe/reduction/aggregation.py @@ -78,6 +78,7 @@ def where_function(cond, var1, var2): "skew": lambda x, skipna=True, bias=False: x.skew(skipna=skipna, bias=bias), "kurt": lambda x, skipna=True, bias=False: x.kurt(skipna=skipna, bias=bias), "kurtosis": lambda x, skipna=True, bias=False: x.kurtosis(skipna=skipna, bias=bias), + "nunique": lambda x: x.nunique(), } diff --git a/mars/dataframe/reduction/core.py b/mars/dataframe/reduction/core.py index 14d4050038..5ba42af40f 100644 --- a/mars/dataframe/reduction/core.py +++ b/mars/dataframe/reduction/core.py @@ -972,13 +972,15 @@ def _compile_function(self, func, func_name=None, ndim=1) -> ReductionSteps: else: map_func_name, agg_func_name = step_func_name, step_func_name + op_custom_reduction = getattr(t.op, "custom_reduction", None) + # build agg description agg_funcs.append( ReductionAggStep( agg_input_key, map_func_name, agg_func_name, - custom_reduction, + op_custom_reduction or custom_reduction, t.key, output_limit, t.op.get_reduction_args(axis=self._axis), diff --git a/mars/dataframe/reduction/tests/test_reduction_execution.py b/mars/dataframe/reduction/tests/test_reduction_execution.py index 81d4a88e27..be6ca29815 100644 --- a/mars/dataframe/reduction/tests/test_reduction_execution.py +++ b/mars/dataframe/reduction/tests/test_reduction_execution.py @@ -671,6 +671,12 @@ def test_nunique(setup, check_ref_counts): expected = data1.nunique(axis=1) pd.testing.assert_series_equal(result, expected) + # test with agg func + df = md.DataFrame(data1, chunk_size=3) + result = df.agg("nunique").execute().fetch() + expected = data1.agg("nunique") + pd.testing.assert_series_equal(result, expected) + @pytest.mark.skipif(pa is None, reason="pyarrow not installed") def test_use_arrow_dtype_nunique(setup, check_ref_counts): From cf14894fa285d216b54b5c295e148be709d3af05 Mon Sep 17 00:00:00 2001 From: Wenjun Si Date: Tue, 26 Apr 2022 19:49:16 +0800 Subject: [PATCH 2/5] Fix tests --- mars/dataframe/groupby/aggregation.py | 56 ++++++++++++------- mars/dataframe/groupby/tests/test_groupby.py | 21 +++++++ .../groupby/tests/test_groupby_execution.py | 19 ++++--- 
mars/dataframe/reduction/nunique.py | 44 ++++++--------- 4 files changed, 86 insertions(+), 54 deletions(-) diff --git a/mars/dataframe/groupby/aggregation.py b/mars/dataframe/groupby/aggregation.py index 7039e517b8..e173929ba2 100644 --- a/mars/dataframe/groupby/aggregation.py +++ b/mars/dataframe/groupby/aggregation.py @@ -701,34 +701,36 @@ def _do_custom_agg(op, custom_reduction, *input_objs): out = op.outputs[0] for group_key in input_objs[0].groups.keys(): group_objs = [o.get_group(group_key) for o in input_objs] + agg_done = False if op.stage == OperandStage.map: - result = custom_reduction.pre(group_objs[0]) + res_tuple = custom_reduction.pre(group_objs[0]) agg_done = custom_reduction.pre_with_agg - if not isinstance(result, tuple): - result = (result,) + if not isinstance(res_tuple, tuple): + res_tuple = (res_tuple,) else: - result = group_objs + res_tuple = group_objs if not agg_done: - result = custom_reduction.agg(*result) - if not isinstance(result, tuple): - result = (result,) + res_tuple = custom_reduction.agg(*res_tuple) + if not isinstance(res_tuple, tuple): + res_tuple = (res_tuple,) if op.stage == OperandStage.agg: - result = custom_reduction.post(*result) - if not isinstance(result, tuple): - result = (result,) - - if out.ndim == 2: - if result[0].ndim == 1: - result = tuple(r.to_frame().T for r in result) - if op.stage == OperandStage.agg: - result = tuple(r.astype(out.dtypes) for r in result) - else: - result = tuple(xdf.Series(r) for r in result) + res_tuple = custom_reduction.post(*res_tuple) + if not isinstance(res_tuple, tuple): + res_tuple = (res_tuple,) + + new_res_list = [] + for r in res_tuple: + if out.ndim == 2 and r.ndim == 1: + r = r.to_frame().T + elif out.ndim < 2: + if getattr(r, "ndim", 0) == 2: + r = r.iloc[0, :] + else: + r = xdf.Series(r) - for r in result: if len(input_objs[0].grouper.names) == 1: r.index = xdf.Index( [group_key], name=input_objs[0].grouper.names[0] @@ -737,7 +739,21 @@ def _do_custom_agg(op, custom_reduction, *input_objs): r.index = xdf.MultiIndex.from_tuples( [group_key], names=input_objs[0].grouper.names ) - results.append(result) + + if op.groupby_params.get("selection"): + # correct columns for groupby-selection-agg paradigms + selection = op.groupby_params["selection"] + r.columns = [selection] if input_objs[0].ndim == 1 else selection + + if out.ndim == 2 and op.stage == OperandStage.agg: + dtype_cols = set(out.dtypes.index) & set(r.columns) + conv_dtypes = { + k: v for k, v in out.dtypes.items() if k in dtype_cols + } + r = r.astype(conv_dtypes) + new_res_list.append(r) + + results.append(tuple(new_res_list)) if not results and op.stage == OperandStage.agg: empty_df = pd.DataFrame( [], columns=out.dtypes.index, index=out.index_value.to_pandas()[:0] diff --git a/mars/dataframe/groupby/tests/test_groupby.py b/mars/dataframe/groupby/tests/test_groupby.py index 50a5041da5..83960da53f 100644 --- a/mars/dataframe/groupby/tests/test_groupby.py +++ b/mars/dataframe/groupby/tests/test_groupby.py @@ -476,3 +476,24 @@ def test_groupby_fill(): assert len(r.chunks) == 4 assert r.shape == (len(s1),) assert r.chunks[0].shape == (np.nan,) + + +def test_groupby_nunique(): + df1 = pd.DataFrame( + [ + [1, 1, 10], + [1, 1, np.nan], + [1, 1, np.nan], + [1, 2, np.nan], + [1, 2, 20], + [1, 2, np.nan], + [1, 3, np.nan], + [1, 3, np.nan], + ], + columns=["one", "two", "three"], + ) + mdf = md.DataFrame(df1, chunk_size=3) + + r = tile(mdf.groupby(["one", "two"]).nunique()) + assert len(r.chunks) == 1 + assert isinstance(r.chunks[0].op, 
DataFrameGroupByAgg) diff --git a/mars/dataframe/groupby/tests/test_groupby_execution.py b/mars/dataframe/groupby/tests/test_groupby_execution.py index c1208e2194..2cace472b9 100644 --- a/mars/dataframe/groupby/tests/test_groupby_execution.py +++ b/mars/dataframe/groupby/tests/test_groupby_execution.py @@ -1241,13 +1241,16 @@ def test_groupby_nunique(setup): # test with as_index=False mdf = md.DataFrame(df1, chunk_size=13) if _agg_size_as_frame: + res = mdf.groupby("b", as_index=False)["a"].nunique().execute().fetch() + expected = df1.groupby("b", as_index=False)["a"].nunique() pd.testing.assert_frame_equal( - mdf.groupby("b", as_index=False)["a"] - .nunique() - .execute() - .fetch() - .sort_values(by="b", ignore_index=True), - df1.groupby("b", as_index=False)["a"] - .nunique() - .sort_values(by="b", ignore_index=True), + res.sort_values(by="b", ignore_index=True), + expected.sort_values(by="b", ignore_index=True), + ) + + res = mdf.groupby("b", as_index=False)[["a", "c"]].nunique().execute().fetch() + expected = df1.groupby("b", as_index=False)[["a", "c"]].nunique() + pd.testing.assert_frame_equal( + res.sort_values(by="b", ignore_index=True), + expected.sort_values(by="b", ignore_index=True), ) diff --git a/mars/dataframe/reduction/nunique.py b/mars/dataframe/reduction/nunique.py index 07d6abf243..73cece906c 100644 --- a/mars/dataframe/reduction/nunique.py +++ b/mars/dataframe/reduction/nunique.py @@ -41,61 +41,53 @@ def __init__( self._dropna = dropna self._use_arrow_dtype = use_arrow_dtype - @staticmethod - def _drop_duplicates_to_arrow(v, explode=False): + def _drop_duplicates(self, xdf, value, explode=False): if explode: - v = v.explode() - try: - return ArrowListArray([v.drop_duplicates().to_numpy()]) - except pa.ArrowInvalid: - # fallback due to diverse dtypes - return [v.drop_duplicates().to_list()] + value = value.explode() + + if not self._use_arrow_dtype or xdf is cudf: + return [value.drop_duplicates().to_numpy()] + else: + try: + return ArrowListArray([value.drop_duplicates().to_numpy()]) + except pa.ArrowInvalid: + # fallback due to diverse dtypes + return [value.drop_duplicates().to_numpy()] def pre(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ xdf = cudf if self.is_gpu() else pd if isinstance(in_data, xdf.Series): - unique_values = in_data.drop_duplicates() + unique_values = self._drop_duplicates(xdf, in_data) return xdf.Series(unique_values, name=in_data.name) else: if self._axis == 0: data = dict() for d, v in in_data.iteritems(): - if not self._use_arrow_dtype or xdf is cudf: - data[d] = [v.drop_duplicates().to_list()] - else: - data[d] = self._drop_duplicates_to_arrow(v) + data[d] = self._drop_duplicates(xdf, v) df = xdf.DataFrame(data) else: df = xdf.DataFrame(columns=[0]) for d, v in in_data.iterrows(): - if not self._use_arrow_dtype or xdf is cudf: - df.loc[d] = [v.drop_duplicates().to_list()] - else: - df.loc[d] = self._drop_duplicates_to_arrow(v) + df.loc[d] = self._drop_duplicates(xdf, v) return df def agg(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ xdf = cudf if self.is_gpu() else pd if isinstance(in_data, xdf.Series): - unique_values = in_data.explode().drop_duplicates() + unique_values = self._drop_duplicates(xdf, in_data, explode=True) return xdf.Series(unique_values, name=in_data.name) else: if self._axis == 0: data = dict() for d, v in in_data.iteritems(): - if not self._use_arrow_dtype or xdf is cudf: - data[d] = [v.explode().drop_duplicates().to_list()] - else: + if self._use_arrow_dtype and xdf is not cudf: v = 
pd.Series(v.to_numpy()) - data[d] = self._drop_duplicates_to_arrow(v, explode=True) + data[d] = self._drop_duplicates(xdf, v, explode=True) df = xdf.DataFrame(data) else: df = xdf.DataFrame(columns=[0]) for d, v in in_data.iterrows(): - if not self._use_arrow_dtype or xdf is cudf: - df.loc[d] = [v.explode().drop_duplicates().to_list()] - else: - df.loc[d] = self._drop_duplicates_to_arrow(v, explode=True) + df.loc[d] = self._drop_duplicates(xdf, v, explode=True) return df def post(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ From 2c6e530d91e2cabd94bfb4bc048ad5501fc5ee58 Mon Sep 17 00:00:00 2001 From: Wenjun Si Date: Thu, 5 May 2022 20:22:21 +0800 Subject: [PATCH 3/5] Simplify agg with single return --- mars/dataframe/groupby/aggregation.py | 50 +++++++++++++++++++++++---- 1 file changed, 43 insertions(+), 7 deletions(-) diff --git a/mars/dataframe/groupby/aggregation.py b/mars/dataframe/groupby/aggregation.py index e173929ba2..6ae14d75f0 100644 --- a/mars/dataframe/groupby/aggregation.py +++ b/mars/dataframe/groupby/aggregation.py @@ -695,7 +695,28 @@ def _pack_inputs(agg_funcs: List[ReductionAggStep], in_data): return out_dict @staticmethod - def _do_custom_agg(op, custom_reduction, *input_objs): + def _do_custom_agg_single(op, custom_reduction, input_obj): + if op.stage == OperandStage.map: + if custom_reduction.pre_with_agg: + apply_fun = custom_reduction.pre + else: + + def apply_fun(obj): + return custom_reduction.agg(custom_reduction.pre(obj)) + + elif op.stage == OperandStage.agg: + + def apply_fun(obj): + return custom_reduction.post(custom_reduction.agg(obj)) + + else: + apply_fun = custom_reduction.agg + + res = input_obj.apply(apply_fun) + return (res,) + + @staticmethod + def _do_custom_agg_multiple(op, custom_reduction, *input_objs): xdf = cudf if op.gpu else pd results = [] out = op.outputs[0] @@ -763,6 +784,13 @@ def _do_custom_agg(op, custom_reduction, *input_objs): concat_result = tuple(xdf.concat(parts) for parts in zip(*results)) return concat_result + @classmethod + def _do_custom_agg(cls, op, custom_reduction, *input_objs, output_limit: int = 1): + if output_limit == 1: + return cls._do_custom_agg_single(op, custom_reduction, input_objs[0]) + else: + return cls._do_custom_agg_multiple(op, custom_reduction, *input_objs) + @staticmethod def _do_predefined_agg(input_obj, agg_func, single_func=False, **kwds): ndim = getattr(input_obj, "ndim", None) or input_obj.obj.ndim @@ -857,12 +885,16 @@ def _wrapped_func(col): _agg_func_name, custom_reduction, _output_key, - _output_limit, + output_limit, kwds, ) in op.agg_funcs: input_obj = ret_map_groupbys[input_key] if map_func_name == "custom_reduction": - agg_dfs.extend(cls._do_custom_agg(op, custom_reduction, input_obj)) + agg_dfs.extend( + cls._do_custom_agg( + op, custom_reduction, input_obj, output_limit=output_limit + ) + ) else: single_func = map_func_name == op.raw_func agg_dfs.append( @@ -903,12 +935,16 @@ def _execute_combine(cls, ctx, op: "DataFrameGroupByAgg"): agg_func_name, custom_reduction, output_key, - _output_limit, + output_limit, kwds, ) in op.agg_funcs: input_obj = in_data_dict[output_key] if agg_func_name == "custom_reduction": - combines.extend(cls._do_custom_agg(op, custom_reduction, *input_obj)) + combines.extend( + cls._do_custom_agg( + op, custom_reduction, *input_obj, output_limit=output_limit + ) + ) else: combines.append( cls._do_predefined_agg(input_obj, agg_func_name, **kwds) @@ -943,7 +979,7 @@ def _execute_agg(cls, ctx, op: "DataFrameGroupByAgg"): agg_func_name, 
custom_reduction, output_key, - _output_limit, + output_limit, kwds, ) in op.agg_funcs: if agg_func_name == "custom_reduction": @@ -951,7 +987,7 @@ def _execute_agg(cls, ctx, op: "DataFrameGroupByAgg"): cls._get_grouped(op, o, ctx) for o in in_data_dict[output_key] ) in_data_dict[output_key] = cls._do_custom_agg( - op, custom_reduction, *input_obj + op, custom_reduction, *input_obj, output_limit=output_limit )[0] else: input_obj = cls._get_grouped(op, in_data_dict[output_key], ctx) From f833a7b30a8bb71cc60b0aa7fc7a75870614274d Mon Sep 17 00:00:00 2001 From: wjsi Date: Fri, 6 May 2022 00:52:09 +0800 Subject: [PATCH 4/5] Use numpy methods to accelerate --- mars/dataframe/groupby/aggregation.py | 12 +++-- mars/dataframe/reduction/core.py | 2 + mars/dataframe/reduction/nunique.py | 76 +++++++++++++++++---------- 3 files changed, 57 insertions(+), 33 deletions(-) diff --git a/mars/dataframe/groupby/aggregation.py b/mars/dataframe/groupby/aggregation.py index 6ae14d75f0..31018729dc 100644 --- a/mars/dataframe/groupby/aggregation.py +++ b/mars/dataframe/groupby/aggregation.py @@ -51,6 +51,7 @@ ReductionCompiler, ReductionSteps, ReductionAggStep, + CustomReduction, ) from ..reduction.aggregation import is_funcs_aggregate, normalize_reduction_funcs from ..utils import parse_index, build_concatenated_rows_frame, is_cudf @@ -695,7 +696,7 @@ def _pack_inputs(agg_funcs: List[ReductionAggStep], in_data): return out_dict @staticmethod - def _do_custom_agg_single(op, custom_reduction, input_obj): + def _do_custom_agg_single(op, custom_reduction: CustomReduction, input_obj): if op.stage == OperandStage.map: if custom_reduction.pre_with_agg: apply_fun = custom_reduction.pre @@ -705,9 +706,12 @@ def apply_fun(obj): return custom_reduction.agg(custom_reduction.pre(obj)) elif op.stage == OperandStage.agg: + if custom_reduction.post_with_agg: + apply_fun = custom_reduction.post + else: - def apply_fun(obj): - return custom_reduction.post(custom_reduction.agg(obj)) + def apply_fun(obj): + return custom_reduction.post(custom_reduction.agg(obj)) else: apply_fun = custom_reduction.agg @@ -716,7 +720,7 @@ def apply_fun(obj): return (res,) @staticmethod - def _do_custom_agg_multiple(op, custom_reduction, *input_objs): + def _do_custom_agg_multiple(op, custom_reduction: CustomReduction, *input_objs): xdf = cudf if op.gpu else pd results = [] out = op.outputs[0] diff --git a/mars/dataframe/reduction/core.py b/mars/dataframe/reduction/core.py index 5ba42af40f..f668b71fda 100644 --- a/mars/dataframe/reduction/core.py +++ b/mars/dataframe/reduction/core.py @@ -655,6 +655,8 @@ class CustomReduction: # set to True when pre() already performs aggregation pre_with_agg = False + # set to True when post() already performs aggregation + post_with_agg = False def __init__(self, name=None, is_gpu=None): self.name = name or "" diff --git a/mars/dataframe/reduction/nunique.py b/mars/dataframe/reduction/nunique.py index 73cece906c..77dfd871cc 100644 --- a/mars/dataframe/reduction/nunique.py +++ b/mars/dataframe/reduction/nunique.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import numpy as np import pandas as pd try: @@ -27,82 +28,99 @@ from ..arrays import ArrowListArray, ArrowListDtype from .core import DataFrameReductionOperand, DataFrameReductionMixin, CustomReduction +cp = lazy_import("cupy", globals=globals(), rename="cp") cudf = lazy_import("cudf", globals=globals()) class NuniqueReduction(CustomReduction): pre_with_agg = True + post_with_agg = True def __init__( - self, name="unique", axis=0, dropna=True, use_arrow_dtype=False, is_gpu=False + self, name="nunique", axis=0, dropna=True, use_arrow_dtype=False, is_gpu=False ): super().__init__(name, is_gpu=is_gpu) self._axis = axis self._dropna = dropna self._use_arrow_dtype = use_arrow_dtype - def _drop_duplicates(self, xdf, value, explode=False): + def _get_modules(self): + if not self.is_gpu(): + return np, pd + else: # pragma: no cover + return cp, cudf + + def _drop_duplicates(self, value, explode=False, agg=False): + xp, xdf = self._get_modules() + if self._use_arrow_dtype and xp is not cp and hasattr(value, "to_numpy"): + value = value.to_numpy() + else: + value = value.values + if explode: - value = value.explode() + value = xp.concatenate(value) - if not self._use_arrow_dtype or xdf is cudf: - return [value.drop_duplicates().to_numpy()] + value = xdf.unique(value) + + if not agg: + if not self._use_arrow_dtype or xp is cp: + return [value] + else: + try: + return ArrowListArray([value]) + except pa.ArrowInvalid: + # fallback due to diverse dtypes + return [value] else: - try: - return ArrowListArray([value.drop_duplicates().to_numpy()]) - except pa.ArrowInvalid: - # fallback due to diverse dtypes - return [value.drop_duplicates().to_numpy()] + if self._dropna: + return xp.sum(xdf.notna(value)) + return len(value) def pre(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ - xdf = cudf if self.is_gpu() else pd + xp, xdf = self._get_modules() if isinstance(in_data, xdf.Series): - unique_values = self._drop_duplicates(xdf, in_data) - return xdf.Series(unique_values, name=in_data.name) + unique_values = self._drop_duplicates(in_data) + return xdf.Series(unique_values, name=in_data.name, dtype=object) else: if self._axis == 0: data = dict() for d, v in in_data.iteritems(): - data[d] = self._drop_duplicates(xdf, v) - df = xdf.DataFrame(data) + data[d] = self._drop_duplicates(v) + df = xdf.DataFrame(data, copy=False, dtype=object) else: df = xdf.DataFrame(columns=[0]) for d, v in in_data.iterrows(): - df.loc[d] = self._drop_duplicates(xdf, v) + df.loc[d] = self._drop_duplicates(v) return df def agg(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ - xdf = cudf if self.is_gpu() else pd + xp, xdf = self._get_modules() if isinstance(in_data, xdf.Series): - unique_values = self._drop_duplicates(xdf, in_data, explode=True) - return xdf.Series(unique_values, name=in_data.name) + unique_values = self._drop_duplicates(in_data, explode=True) + return xdf.Series(unique_values, name=in_data.name, dtype=object) else: if self._axis == 0: data = dict() for d, v in in_data.iteritems(): - if self._use_arrow_dtype and xdf is not cudf: - v = pd.Series(v.to_numpy()) - data[d] = self._drop_duplicates(xdf, v, explode=True) - df = xdf.DataFrame(data) + data[d] = self._drop_duplicates(v, explode=True) + df = xdf.DataFrame(data, copy=False, dtype=object) else: df = xdf.DataFrame(columns=[0]) for d, v in in_data.iterrows(): - df.loc[d] = self._drop_duplicates(xdf, v, explode=True) + df.loc[d] = self._drop_duplicates(v, explode=True) return df def post(self, in_data): # noqa: W0221 # pylint: 
disable=arguments-differ - xdf = cudf if self.is_gpu() else pd + xp, xdf = self._get_modules() if isinstance(in_data, xdf.Series): - return in_data.explode().nunique(dropna=self._dropna) + return self._drop_duplicates(in_data, explode=True, agg=True) else: in_data_iter = ( in_data.iteritems() if self._axis == 0 else in_data.iterrows() ) data = dict() for d, v in in_data_iter: - if isinstance(v.dtype, ArrowListDtype): - v = xdf.Series(v.to_numpy()) - data[d] = v.explode().nunique(dropna=self._dropna) + data[d] = self._drop_duplicates(v, explode=True, agg=True) return xdf.Series(data) From 5c6758078186e5786fff6cad691adef09aad9258 Mon Sep 17 00:00:00 2001 From: Wenjun Si Date: Fri, 6 May 2022 16:12:57 +0800 Subject: [PATCH 5/5] Use shuffle when nunique is calculated --- mars/dataframe/groupby/aggregation.py | 62 ++++++++++++++++---- mars/dataframe/groupby/tests/test_groupby.py | 21 ------- mars/dataframe/merge/concat.py | 5 +- mars/dataframe/reduction/nunique.py | 20 +++++-- 4 files changed, 70 insertions(+), 38 deletions(-) diff --git a/mars/dataframe/groupby/aggregation.py b/mars/dataframe/groupby/aggregation.py index 31018729dc..f9ee600ed7 100644 --- a/mars/dataframe/groupby/aggregation.py +++ b/mars/dataframe/groupby/aggregation.py @@ -25,7 +25,7 @@ from ... import opcodes as OperandDef from ...config import options from ...core.custom_log import redirect_custom_log -from ...core import ENTITY_TYPE, OutputType +from ...core import ENTITY_TYPE, OutputType, recursive_tile from ...core.context import get_context from ...core.operand import OperandStage from ...serialization.serializables import ( @@ -64,6 +64,8 @@ _support_get_group_without_as_index = pd_release_version[:2] > (1, 0) +_FUNCS_PREFER_SHUFFLE = {"nunique"} + class SizeRecorder: def __init__(self): @@ -163,6 +165,8 @@ class DataFrameGroupByAgg(DataFrameOperand, DataFrameOperandMixin): method = StringField("method") use_inf_as_na = BoolField("use_inf_as_na") + map_on_shuffle = AnyField("map_on_shuffle") + # for chunk combine_size = Int32Field("combine_size") chunk_store_limit = Int64Field("chunk_store_limit") @@ -421,10 +425,29 @@ def _tile_with_shuffle( in_df: TileableType, out_df: TileableType, func_infos: ReductionSteps, + agg_chunks: List[ChunkType] = None, ): - # First, perform groupby and aggregation on each chunk. - agg_chunks = cls._gen_map_chunks(op, in_df.chunks, out_df, func_infos) - return cls._perform_shuffle(op, agg_chunks, in_df, out_df, func_infos) + if op.map_on_shuffle is None: + op.map_on_shuffle = all( + agg_fun.custom_reduction is None for agg_fun in func_infos.agg_funcs + ) + + if not op.map_on_shuffle: + groupby_params = op.groupby_params.copy() + selection = groupby_params.pop("selection", None) + groupby = in_df.groupby(**groupby_params) + if selection: + groupby = groupby[selection] + result = groupby.transform( + op.raw_func, _call_agg=True, index=out_df.index_value + ) + return (yield from recursive_tile(result)) + else: + # First, perform groupby and aggregation on each chunk. 
+            agg_chunks = agg_chunks or cls._gen_map_chunks(
+                op, in_df.chunks, out_df, func_infos
+            )
+            return cls._perform_shuffle(op, agg_chunks, in_df, out_df, func_infos)

     @classmethod
     def _perform_shuffle(
@@ -624,8 +647,10 @@ def _tile_auto(
         else:
             # otherwise, use shuffle
             logger.debug("Choose shuffle method for groupby operand %s", op)
-            return cls._perform_shuffle(
-                op, chunks + left_chunks, in_df, out_df, func_infos
+            return (
+                yield from cls._tile_with_shuffle(
+                    op, in_df, out_df, func_infos, chunks + left_chunks
+                )
             )

     @classmethod
@@ -638,12 +663,16 @@ def tile(cls, op: "DataFrameGroupByAgg"):
         func_infos = cls._compile_funcs(op, in_df)

         if op.method == "auto":
-            if len(in_df.chunks) <= op.combine_size:
+            if set(op.func) & _FUNCS_PREFER_SHUFFLE:
+                return (
+                    yield from cls._tile_with_shuffle(op, in_df, out_df, func_infos)
+                )
+            elif len(in_df.chunks) <= op.combine_size:
                 return cls._tile_with_tree(op, in_df, out_df, func_infos)
             else:
                 return (yield from cls._tile_auto(op, in_df, out_df, func_infos))
         if op.method == "shuffle":
-            return cls._tile_with_shuffle(op, in_df, out_df, func_infos)
+            return (yield from cls._tile_with_shuffle(op, in_df, out_df, func_infos))
         elif op.method == "tree":
             return cls._tile_with_tree(op, in_df, out_df, func_infos)
         else:  # pragma: no cover
@@ -1075,7 +1104,15 @@ def execute(cls, ctx, op: "DataFrameGroupByAgg"):
             pd.reset_option("mode.use_inf_as_na")


-def agg(groupby, func=None, method="auto", combine_size=None, *args, **kwargs):
+def agg(
+    groupby,
+    func=None,
+    method="auto",
+    combine_size=None,
+    map_on_shuffle=None,
+    *args,
+    **kwargs,
+):
     """
     Aggregate using one or more operations on grouped data.

@@ -1091,7 +1128,11 @@ def agg(groupby, func=None, method="auto", combine_size=None, *args, **kwargs):
         in distributed mode and use 'tree' in local mode.
     combine_size : int
         The number of chunks to combine when method is 'tree'
-
+    map_on_shuffle : bool
+        Whether to pre-aggregate data on the map stage of the shuffle. When
+        not specified, the choice is made automatically: pre-aggregation is
+        currently skipped when the aggregation functions contain custom
+        reductions. Otherwise the given value is used directly.
Returns ------- @@ -1138,5 +1179,6 @@ def agg(groupby, func=None, method="auto", combine_size=None, *args, **kwargs): combine_size=combine_size or options.combine_size, chunk_store_limit=options.chunk_store_limit, use_inf_as_na=use_inf_as_na, + map_on_shuffle=map_on_shuffle, ) return agg_op(groupby) diff --git a/mars/dataframe/groupby/tests/test_groupby.py b/mars/dataframe/groupby/tests/test_groupby.py index 83960da53f..50a5041da5 100644 --- a/mars/dataframe/groupby/tests/test_groupby.py +++ b/mars/dataframe/groupby/tests/test_groupby.py @@ -476,24 +476,3 @@ def test_groupby_fill(): assert len(r.chunks) == 4 assert r.shape == (len(s1),) assert r.chunks[0].shape == (np.nan,) - - -def test_groupby_nunique(): - df1 = pd.DataFrame( - [ - [1, 1, 10], - [1, 1, np.nan], - [1, 1, np.nan], - [1, 2, np.nan], - [1, 2, 20], - [1, 2, np.nan], - [1, 3, np.nan], - [1, 3, np.nan], - ], - columns=["one", "two", "three"], - ) - mdf = md.DataFrame(df1, chunk_size=3) - - r = tile(mdf.groupby(["one", "two"]).nunique()) - assert len(r.chunks) == 1 - assert isinstance(r.chunks[0].op, DataFrameGroupByAgg) diff --git a/mars/dataframe/merge/concat.py b/mars/dataframe/merge/concat.py index 7bb3cab721..e82d986b61 100644 --- a/mars/dataframe/merge/concat.py +++ b/mars/dataframe/merge/concat.py @@ -324,7 +324,10 @@ def _auto_concat_dataframe_chunks(chunk, inputs): ) if chunk.op.axis is not None: - return xdf.concat(inputs, axis=op.axis) + try: + return xdf.concat(inputs, axis=op.axis) + except: + raise # auto generated concat when executing a DataFrame if len(inputs) == 1: diff --git a/mars/dataframe/reduction/nunique.py b/mars/dataframe/reduction/nunique.py index 77dfd871cc..3ff1854e56 100644 --- a/mars/dataframe/reduction/nunique.py +++ b/mars/dataframe/reduction/nunique.py @@ -25,7 +25,7 @@ from ...config import options from ...serialization.serializables import BoolField from ...utils import lazy_import -from ..arrays import ArrowListArray, ArrowListDtype +from ..arrays import ArrowListArray from .core import DataFrameReductionOperand, DataFrameReductionMixin, CustomReduction cp = lazy_import("cupy", globals=globals(), rename="cp") @@ -52,18 +52,24 @@ def _get_modules(self): def _drop_duplicates(self, value, explode=False, agg=False): xp, xdf = self._get_modules() + use_arrow_dtype = self._use_arrow_dtype and xp is not cp if self._use_arrow_dtype and xp is not cp and hasattr(value, "to_numpy"): value = value.to_numpy() else: value = value.values if explode: + if len(value) == 0: + if not use_arrow_dtype: + return [xp.array([], dtype=object)] + else: + return [ArrowListArray([])] value = xp.concatenate(value) value = xdf.unique(value) if not agg: - if not self._use_arrow_dtype or xp is cp: + if not use_arrow_dtype: return [value] else: try: @@ -78,15 +84,16 @@ def _drop_duplicates(self, value, explode=False, agg=False): def pre(self, in_data): # noqa: W0221 # pylint: disable=arguments-differ xp, xdf = self._get_modules() + out_dtype = object if not self._use_arrow_dtype or xp is cp else None if isinstance(in_data, xdf.Series): unique_values = self._drop_duplicates(in_data) - return xdf.Series(unique_values, name=in_data.name, dtype=object) + return xdf.Series(unique_values, name=in_data.name, dtype=out_dtype) else: if self._axis == 0: data = dict() for d, v in in_data.iteritems(): data[d] = self._drop_duplicates(v) - df = xdf.DataFrame(data, copy=False, dtype=object) + df = xdf.DataFrame(data, copy=False, dtype=out_dtype) else: df = xdf.DataFrame(columns=[0]) for d, v in in_data.iterrows(): @@ -95,15 +102,16 @@ def 
pre(self, in_data):  # noqa: W0221  # pylint: disable=arguments-differ
     def agg(self, in_data):  # noqa: W0221  # pylint: disable=arguments-differ
         xp, xdf = self._get_modules()
+        out_dtype = object if not self._use_arrow_dtype or xp is cp else None
         if isinstance(in_data, xdf.Series):
             unique_values = self._drop_duplicates(in_data, explode=True)
-            return xdf.Series(unique_values, name=in_data.name, dtype=object)
+            return xdf.Series(unique_values, name=in_data.name, dtype=out_dtype)
         else:
             if self._axis == 0:
                 data = dict()
                 for d, v in in_data.iteritems():
                     data[d] = self._drop_duplicates(v, explode=True)
-                df = xdf.DataFrame(data, copy=False, dtype=object)
+                df = xdf.DataFrame(data, copy=False, dtype=out_dtype)
             else:
                 df = xdf.DataFrame(columns=[0])
                 for d, v in in_data.iterrows():
                     df.loc[d] = self._drop_duplicates(v, explode=True)
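
For reference, a minimal usage sketch of the behavior this series adds up to. It is not part of the patches themselves: the frame and chunk size below are made up for illustration, and the calls mirror the tests added above.

    import pandas as pd
    import mars.dataframe as md

    raw = pd.DataFrame({"a": [1, 1, 2, 2], "b": [3, 3, 4, 5]})
    mdf = md.DataFrame(raw, chunk_size=2)

    # patch 1: "nunique" is accepted as a reduction by DataFrame.agg
    print(mdf.agg("nunique").execute().fetch())

    # patch 5: groupby-nunique prefers the shuffle method when tiling;
    # map_on_shuffle=False disables pre-aggregation on the shuffle map stage
    print(mdf.groupby("a").agg("nunique", map_on_shuffle=False).execute().fetch())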