diff --git a/bigframes/core/compile/scalar_op_compiler.py b/bigframes/core/compile/scalar_op_compiler.py
index a65ff6fe0c..90025b3994 100644
--- a/bigframes/core/compile/scalar_op_compiler.py
+++ b/bigframes/core/compile/scalar_op_compiler.py
@@ -1298,22 +1298,36 @@ def coalesce_impl(
     return ibis.coalesce(x, y)
 
 
-@scalar_op_compiler.register_binary_op(ops.cliplower_op)
-def clip_lower(
+@scalar_op_compiler.register_binary_op(ops.maximum_op)
+def maximum_impl(
     value: ibis_types.Value,
     lower: ibis_types.Value,
 ):
+    # Note: propagates nulls
     return ibis.case().when(lower.isnull() | (value < lower), lower).else_(value).end()
 
 
-@scalar_op_compiler.register_binary_op(ops.clipupper_op)
-def clip_upper(
+@scalar_op_compiler.register_binary_op(ops.minimum_op)
+def minimum_impl(
     value: ibis_types.Value,
     upper: ibis_types.Value,
 ):
+    # Note: propagates nulls
     return ibis.case().when(upper.isnull() | (value > upper), upper).else_(value).end()
 
 
+@scalar_op_compiler.register_binary_op(ops.BinaryRemoteFunctionOp, pass_op=True)
+def binary_remote_function_op_impl(
+    x: ibis_types.Value, y: ibis_types.Value, op: ops.BinaryRemoteFunctionOp
+):
+    if not hasattr(op.func, "bigframes_remote_function"):
+        raise TypeError(
+            f"only a bigframes remote function is supported as a callable. {constants.FEEDBACK_LINK}"
+        )
+    x_transformed = op.func(x, y)
+    return x_transformed
+
+
 # Ternary Operations
 @scalar_op_compiler.register_ternary_op(ops.where_op)
 def where_op(
diff --git a/bigframes/operations/__init__.py b/bigframes/operations/__init__.py
index e52f488d38..6f99f71013 100644
--- a/bigframes/operations/__init__.py
+++ b/bigframes/operations/__init__.py
@@ -545,8 +545,8 @@ def output_type(self, *input_types):
 
 # Binary Ops
 fillna_op = create_binary_op(name="fillna", type_signature=op_typing.COERCE)
-cliplower_op = create_binary_op(name="clip_lower", type_signature=op_typing.COERCE)
-clipupper_op = create_binary_op(name="clip_upper", type_signature=op_typing.COERCE)
+maximum_op = create_binary_op(name="maximum", type_signature=op_typing.COERCE)
+minimum_op = create_binary_op(name="minimum", type_signature=op_typing.COERCE)
 coalesce_op = create_binary_op(name="coalesce", type_signature=op_typing.COERCE)
 
 
@@ -587,6 +587,16 @@ def output_type(self, *input_types):
         raise TypeError(f"Cannot subtract dtypes {left_type} and {right_type}")
 
 
+@dataclasses.dataclass(frozen=True)
+class BinaryRemoteFunctionOp(BinaryOp):
+    name: typing.ClassVar[str] = "binary_remote_function"
+    func: typing.Callable
+
+    def output_type(self, *input_types):
+        # This property should be set to a valid Dtype by the @remote_function decorator or read_gbq_function method
+        return self.func.output_dtype
+
+
 add_op = AddOp()
 sub_op = SubOp()
 mul_op = create_binary_op(name="mul", type_signature=op_typing.BINARY_NUMERIC)
@@ -713,4 +723,6 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT
     np.divide: div_op,
     np.power: pow_op,
     np.arctan2: arctan2_op,
+    np.maximum: maximum_op,
+    np.minimum: minimum_op,
 }
diff --git a/bigframes/series.py b/bigframes/series.py
index 313380e4a4..d1fb0d679b 100644
--- a/bigframes/series.py
+++ b/bigframes/series.py
@@ -1031,9 +1031,9 @@ def clip(self, lower, upper):
         if lower is None and upper is None:
             return self
         if lower is None:
-            return self._apply_binary_op(upper, ops.clipupper_op, alignment="left")
+            return self._apply_binary_op(upper, ops.minimum_op, alignment="left")
         if upper is None:
-            return self._apply_binary_op(lower, ops.cliplower_op, alignment="left")
+            return self._apply_binary_op(lower, ops.maximum_op, alignment="left")
         value_id, lower_id, upper_id, block = self._align3(lower, upper)
         block, result_id = block.apply_ternary_op(
             value_id, lower_id, upper_id, ops.clip_op
@@ -1374,6 +1374,38 @@ def apply(
         materialized_series = result_series._cached()
         return materialized_series
 
+    def combine(
+        self,
+        other,
+        func,
+    ) -> Series:
+        if not callable(func):
+            raise ValueError(
+                "Only a ufunc (a function that applies to the entire Series) or a remote function that only works on single values are supported."
+            )
+
+        if not hasattr(func, "bigframes_remote_function"):
+            # Keep this in sync with .apply
+            try:
+                return func(self, other)
+            except Exception as ex:
+                # This could happen if any of the operators in func is not
+                # supported on a Series. Let's guide the customer to use a
+                # remote function instead
+                if hasattr(ex, "message"):
+                    ex.message += f"\n{_remote_function_recommendation_message}"
+                raise
+
+        reprojected_series = Series(self._block._force_reproject())
+        result_series = reprojected_series._apply_binary_op(
+            other, ops.BinaryRemoteFunctionOp(func=func)
+        )
+
+        # return Series with materialized result so that any error in the remote
+        # function is caught early
+        materialized_series = result_series._cached()
+        return materialized_series
+
     def add_prefix(self, prefix: str, axis: int | str | None = None) -> Series:
         return Series(self._get_block().add_prefix(prefix))
 
diff --git a/tests/system/large/test_remote_function.py b/tests/system/large/test_remote_function.py
index eb2a0884fe..9df59c1b9b 100644
--- a/tests/system/large/test_remote_function.py
+++ b/tests/system/large/test_remote_function.py
@@ -219,6 +219,41 @@ def stringify(x):
     )
 
 
+# @pytest.mark.flaky(retries=2, delay=120)
+def test_remote_function_binop(session, scalars_dfs, dataset_id, bq_cf_connection):
+    try:
+
+        def func(x, y):
+            return x * abs(y % 4)
+
+        remote_func = session.remote_function(
+            [str, int],
+            str,
+            dataset_id,
+            bq_cf_connection,
+            reuse=False,
+        )(func)
+
+        scalars_df, scalars_pandas_df = scalars_dfs
+
+        scalars_df = scalars_df.dropna()
+        scalars_pandas_df = scalars_pandas_df.dropna()
+        bf_result = (
+            scalars_df["string_col"]
+            .combine(scalars_df["int64_col"], remote_func)
+            .to_pandas()
+        )
+        pd_result = scalars_pandas_df["string_col"].combine(
+            scalars_pandas_df["int64_col"], func
+        )
+        pandas.testing.assert_series_equal(bf_result, pd_result)
+    finally:
+        # clean up the gcp assets created for the remote function
+        cleanup_remote_function_assets(
+            session.bqclient, session.cloudfunctionsclient, remote_func
+        )
+
+
 @pytest.mark.flaky(retries=2, delay=120)
 def test_remote_function_decorator_with_bigframes_series(
     session, scalars_dfs, dataset_id, bq_cf_connection
diff --git a/tests/system/small/test_numpy.py b/tests/system/small/test_numpy.py
index 8e349e472a..8f62d9628c 100644
--- a/tests/system/small/test_numpy.py
+++ b/tests/system/small/test_numpy.py
@@ -73,27 +73,6 @@ def test_df_ufuncs(scalars_dfs, opname):
     pd.testing.assert_frame_equal(bf_result, pd_result)
 
 
-@pytest.mark.parametrize(
-    ("opname",),
-    [
-        ("add",),
-        ("subtract",),
-        ("multiply",),
-        ("divide",),
-        ("power",),
-        ("arctan2",),
-    ],
-)
-def test_series_binary_ufuncs(floats_product_pd, floats_product_bf, opname):
-    bf_result = getattr(np, opname)(
-        floats_product_bf.float64_col_x, floats_product_bf.float64_col_y
-    ).to_pandas()
-    pd_result = getattr(np, opname)(
-        floats_product_pd.float64_col_x, floats_product_pd.float64_col_y
-    )
-    pd.testing.assert_series_equal(bf_result, pd_result)
-
-
 @pytest.mark.parametrize(
("opname",), [ @@ -106,17 +85,16 @@ def test_series_binary_ufuncs(floats_product_pd, floats_product_bf, opname): ) def test_df_binary_ufuncs(scalars_dfs, opname): scalars_df, scalars_pandas_df = scalars_dfs + op = getattr(np, opname) - bf_result = getattr(np, opname)( - scalars_df[["float64_col", "int64_col"]], 5.1 - ).to_pandas() - pd_result = getattr(np, opname)( - scalars_pandas_df[["float64_col", "int64_col"]], 5.1 - ) + bf_result = op(scalars_df[["float64_col", "int64_col"]], 5.1).to_pandas() + pd_result = op(scalars_pandas_df[["float64_col", "int64_col"]], 5.1) pd.testing.assert_frame_equal(bf_result, pd_result) +# Operations tested here don't work on full dataframe in numpy+pandas +# Maybe because of nullable dtypes? @pytest.mark.parametrize( ("x", "y"), [ @@ -124,12 +102,25 @@ def test_df_binary_ufuncs(scalars_dfs, opname): ("float64_col", "int64_col"), ], ) -def test_series_atan2(scalars_dfs, x, y): - # Test atan2 separately as pandas errors when passing entire df as input, so pass only series +@pytest.mark.parametrize( + ("opname",), + [ + ("add",), + ("subtract",), + ("multiply",), + ("divide",), + ("arctan2",), + ("minimum",), + ("maximum",), + ], +) +def test_series_binary_ufuncs(scalars_dfs, x, y, opname): scalars_df, scalars_pandas_df = scalars_dfs - bf_result = np.arctan2(scalars_df[x], scalars_df[y]).to_pandas() - pd_result = np.arctan2(scalars_pandas_df[x], scalars_pandas_df[y]) + op = getattr(np, opname) + + bf_result = op(scalars_df[x], scalars_df[y]).to_pandas() + pd_result = op(scalars_pandas_df[x], scalars_pandas_df[y]) pd.testing.assert_series_equal(bf_result, pd_result) diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index beb99b1ada..fa514784c0 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -3509,6 +3509,41 @@ def test_apply_numpy_ufunc(scalars_dfs, ufunc): assert_series_equal(bf_result, pd_result) +@pytest.mark.parametrize( + ("ufunc",), + [ + pytest.param(numpy.add), + pytest.param(numpy.divide), + ], + ids=[ + "add", + "divide", + ], +) +def test_combine_series_ufunc(scalars_dfs, ufunc): + scalars_df, scalars_pandas_df = scalars_dfs + + bf_col = scalars_df["int64_col"].dropna() + bf_result = bf_col.combine(bf_col, ufunc).to_pandas() + + pd_col = scalars_pandas_df["int64_col"].dropna() + pd_result = pd_col.combine(pd_col, ufunc) + + assert_series_equal(bf_result, pd_result, check_dtype=False) + + +def test_combine_scalar_ufunc(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + + bf_col = scalars_df["int64_col"].dropna() + bf_result = bf_col.combine(2.5, numpy.add).to_pandas() + + pd_col = scalars_pandas_df["int64_col"].dropna() + pd_result = pd_col.combine(2.5, numpy.add) + + assert_series_equal(bf_result, pd_result, check_dtype=False) + + def test_apply_simple_udf(scalars_dfs): scalars_df, scalars_pandas_df = scalars_dfs diff --git a/third_party/bigframes_vendored/pandas/core/series.py b/third_party/bigframes_vendored/pandas/core/series.py index e155fb073a..585e20275c 100644 --- a/third_party/bigframes_vendored/pandas/core/series.py +++ b/third_party/bigframes_vendored/pandas/core/series.py @@ -1279,6 +1279,62 @@ def apply( """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def combine( + self, + other: Series | Hashable, + func, + ) -> Series: + """ + Combine the Series with a Series or scalar according to `func`. + + Combine the Series and `other` using `func` to perform elementwise + selection for combined Series. 
+        selection for combined Series. If a value is missing at some index
+        in one of the two objects being combined, the result at that index
+        is also missing, as in the example below.
+
+        **Examples:**
+
+            >>> import bigframes.pandas as bpd
+            >>> import numpy as np
+            >>> bpd.options.display.progress_bar = None
+
+        Consider 2 Datasets ``s1`` and ``s2`` containing
+        highest clocked speeds of different birds.
+
+            >>> s1 = bpd.Series({'falcon': 330.0, 'eagle': 160.0})
+            >>> s1
+            falcon    330.0
+            eagle     160.0
+            dtype: Float64
+            >>> s2 = bpd.Series({'falcon': 345.0, 'eagle': 200.0, 'duck': 30.0})
+            >>> s2
+            falcon    345.0
+            eagle     200.0
+            duck       30.0
+            dtype: Float64
+
+        Now, to combine the two datasets and view the highest speeds
+        of the birds across the two datasets:
+
+            >>> s1.combine(s2, np.maximum)
+            falcon    345.0
+            eagle     200.0
+            duck       <NA>
+            dtype: Float64
+
+        Args:
+            other (Series or scalar):
+                The value(s) to be combined with the `Series`.
+            func (function):
+                BigFrames DataFrames ``remote_function`` to apply.
+                Takes two scalars as inputs and returns an element.
+                Also accepts some numpy binary functions.
+
+        Returns:
+            Series: The result of combining the Series with the other object.
+        """
+        raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE)
+
     def groupby(
         self,
         by=None,
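
Usage sketch (illustrative, not part of the patch): the snippet below shows how the pieces this change introduces — `np.maximum`/`np.minimum` dispatch on a BigFrames Series and the new `Series.combine` — would be exercised together. The series values are invented for demonstration, and the remote-function path is only referenced in a comment because it requires a configured BigQuery connection.

```python
# Illustrative sketch only; the data below is made up.
import numpy as np

import bigframes.pandas as bpd

s1 = bpd.Series([1.5, None, 3.0])
s2 = bpd.Series([2.0, 4.0, 2.5])

# np.maximum / np.minimum now route to the new maximum_op / minimum_op,
# so the comparison runs in BigQuery; missing values propagate as <NA>.
print(np.maximum(s1, s2).to_pandas())

# Series.combine accepts a numpy binary ufunc...
print(s1.combine(s2, np.minimum).to_pandas())

# ...or a function deployed via bigframes' remote_function decorator
# (needs a BigQuery connection), applied value by value:
#     s1.combine(s2, my_remote_func)
```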