diff --git a/bigframes/constants.py b/bigframes/constants.py index 0751501085..c6d8f3acc2 100644 --- a/bigframes/constants.py +++ b/bigframes/constants.py @@ -92,3 +92,6 @@ LEP_ENABLED_BIGQUERY_LOCATIONS = frozenset( ALL_BIGQUERY_LOCATIONS - REP_ENABLED_BIGQUERY_LOCATIONS ) + +# BigQuery default is 10000, leave 100 for overhead +MAX_COLUMNS = 9900 diff --git a/bigframes/core/block_transforms.py b/bigframes/core/block_transforms.py index c789b2a69c..1eae73014c 100644 --- a/bigframes/core/block_transforms.py +++ b/bigframes/core/block_transforms.py @@ -15,6 +15,7 @@ import functools import typing +from typing import Sequence import pandas as pd @@ -105,6 +106,39 @@ def indicate_duplicates( ) +def quantile( + block: blocks.Block, + columns: Sequence[str], + qs: Sequence[float], + grouping_column_ids: Sequence[str] = (), +) -> blocks.Block: + # TODO: handle windowing and more interpolation methods + window = core.WindowSpec( + grouping_keys=tuple(grouping_column_ids), + ) + quantile_cols = [] + labels = [] + if len(columns) * len(qs) > constants.MAX_COLUMNS: + raise NotImplementedError("Too many aggregates requested.") + for col in columns: + for q in qs: + label = block.col_id_to_label[col] + new_label = (*label, q) if isinstance(label, tuple) else (label, q) + labels.append(new_label) + block, quantile_col = block.apply_window_op( + col, + agg_ops.QuantileOp(q), + window_spec=window, + ) + quantile_cols.append(quantile_col) + block, results = block.aggregate( + grouping_column_ids, + tuple((col, agg_ops.AnyValueOp()) for col in quantile_cols), + dropna=True, + ) + return block.select_columns(results).with_column_labels(labels) + + def interpolate(block: blocks.Block, method: str = "linear") -> blocks.Block: supported_methods = [ "linear", diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 5b411e5416..f6850020df 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -1498,12 +1498,17 @@ def stack(self, how="left", levels: int = 1): row_label_tuples = utils.index_as_tuples(row_labels) - if col_labels is not None: + if col_labels is None: + result_index: pd.Index = pd.Index([None]) + result_col_labels: Sequence[Tuple] = list([()]) + elif (col_labels.nlevels == 1) and all( + col_labels.isna() + ): # isna not implemented for MultiIndex for newer pandas versions + result_index = pd.Index([None]) + result_col_labels = utils.index_as_tuples(col_labels.drop_duplicates()) + else: result_index = col_labels.drop_duplicates().dropna(how="all") result_col_labels = utils.index_as_tuples(result_index) - else: - result_index = pd.Index([None]) - result_col_labels = list([()]) # Get matching columns unpivot_columns: List[Tuple[str, List[str]]] = [] diff --git a/bigframes/core/compile/aggregate_compiler.py b/bigframes/core/compile/aggregate_compiler.py index ae21243506..98d296c779 100644 --- a/bigframes/core/compile/aggregate_compiler.py +++ b/bigframes/core/compile/aggregate_compiler.py @@ -148,6 +148,14 @@ def _( return cast(ibis_types.NumericValue, value) +@compile_unary_agg.register +@numeric_op +def _( + op: agg_ops.QuantileOp, column: ibis_types.NumericColumn, window=None +) -> ibis_types.NumericValue: + return _apply_window_if_present(column.quantile(op.q), window) + + @compile_unary_agg.register @numeric_op def _( diff --git a/bigframes/core/groupby/__init__.py b/bigframes/core/groupby/__init__.py index e2b28553c6..0f53342352 100644 --- a/bigframes/core/groupby/__init__.py +++ b/bigframes/core/groupby/__init__.py @@ -15,6 +15,7 @@ from __future__ import annotations import typing +from typing import Sequence, Union import bigframes_vendored.pandas.core.groupby as vendored_pandas_groupby import pandas as pd @@ -115,14 +116,35 @@ def mean(self, numeric_only: bool = False, *args) -> df.DataFrame: def median( self, numeric_only: bool = False, *, exact: bool = False ) -> df.DataFrame: - if exact: - raise NotImplementedError( - f"Only approximate median is supported. {constants.FEEDBACK_LINK}" - ) if not numeric_only: self._raise_on_non_numeric("median") + if exact: + return self.quantile(0.5) return self._aggregate_all(agg_ops.median_op, numeric_only=True) + def quantile( + self, q: Union[float, Sequence[float]] = 0.5, *, numeric_only: bool = False + ) -> df.DataFrame: + if not numeric_only: + self._raise_on_non_numeric("quantile") + q_cols = tuple( + col + for col in self._selected_cols + if self._column_type(col) in dtypes.NUMERIC_BIGFRAMES_TYPES_PERMISSIVE + ) + multi_q = utils.is_list_like(q) + result = block_ops.quantile( + self._block, + q_cols, + qs=tuple(q) if multi_q else (q,), # type: ignore + grouping_column_ids=self._by_col_ids, + ) + result_df = df.DataFrame(result) + if multi_q: + return result_df.stack() + else: + return result_df.droplevel(-1, 1) + def min(self, numeric_only: bool = False, *args) -> df.DataFrame: return self._aggregate_all(agg_ops.min_op, numeric_only=numeric_only) @@ -466,8 +488,31 @@ def sum(self, *args) -> series.Series: def mean(self, *args) -> series.Series: return self._aggregate(agg_ops.mean_op) - def median(self, *args, **kwargs) -> series.Series: - return self._aggregate(agg_ops.mean_op) + def median( + self, + *args, + exact: bool = False, + **kwargs, + ) -> series.Series: + if exact: + return self.quantile(0.5) + else: + return self._aggregate(agg_ops.median_op) + + def quantile( + self, q: Union[float, Sequence[float]] = 0.5, *, numeric_only: bool = False + ) -> series.Series: + multi_q = utils.is_list_like(q) + result = block_ops.quantile( + self._block, + (self._value_column,), + qs=tuple(q) if multi_q else (q,), # type: ignore + grouping_column_ids=self._by_col_ids, + ) + if multi_q: + return series.Series(result.stack()) + else: + return series.Series(result.stack()).droplevel(-1) def std(self, *args, **kwargs) -> series.Series: return self._aggregate(agg_ops.std_op) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 2deef95277..953a89c34f 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -2009,8 +2009,34 @@ def median( frame = self._raise_on_non_numeric("median") else: frame = self._drop_non_numeric() - block = frame._block.aggregate_all_and_stack(agg_ops.median_op) - return bigframes.series.Series(block.select_column("values")) + if exact: + return self.quantile() + else: + block = frame._block.aggregate_all_and_stack(agg_ops.median_op) + return bigframes.series.Series(block.select_column("values")) + + def quantile( + self, q: Union[float, Sequence[float]] = 0.5, *, numeric_only: bool = False + ): + if not numeric_only: + frame = self._raise_on_non_numeric("median") + else: + frame = self._drop_non_numeric() + multi_q = utils.is_list_like(q) + result = block_ops.quantile( + frame._block, frame._block.value_columns, qs=tuple(q) if multi_q else (q,) # type: ignore + ) + if multi_q: + return DataFrame(result.stack()).droplevel(0) + else: + result_df = ( + DataFrame(result) + .stack(list(range(0, frame.columns.nlevels))) + .droplevel(0) + ) + result_series = bigframes.series.Series(result_df._block) + result_series.name = q + return result_series def std( self, axis: typing.Union[str, int] = 0, *, numeric_only: bool = False diff --git a/bigframes/operations/aggregations.py b/bigframes/operations/aggregations.py index f33dc16e30..0d27d1d75d 100644 --- a/bigframes/operations/aggregations.py +++ b/bigframes/operations/aggregations.py @@ -109,6 +109,18 @@ def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionT return input_types[0] +@dataclasses.dataclass(frozen=True) +class QuantileOp(UnaryAggregateOp): + q: float + + @property + def name(self): + return f"{int(self.q*100)}%" + + def output_type(self, *input_types: dtypes.ExpressionType) -> dtypes.ExpressionType: + return signatures.UNARY_REAL_NUMERIC.output_type(input_types[0]) + + @dataclasses.dataclass(frozen=True) class ApproxQuartilesOp(UnaryAggregateOp): quartile: int diff --git a/bigframes/series.py b/bigframes/series.py index 2f9123f9a3..b834411bce 100644 --- a/bigframes/series.py +++ b/bigframes/series.py @@ -23,7 +23,7 @@ import os import textwrap import typing -from typing import Any, Literal, Mapping, Optional, Sequence, Tuple, Union +from typing import Any, cast, Literal, Mapping, Optional, Sequence, Tuple, Union import bigframes_vendored.pandas.core.series as vendored_pandas_series import google.cloud.bigquery as bigquery @@ -968,10 +968,19 @@ def mean(self) -> float: def median(self, *, exact: bool = False) -> float: if exact: - raise NotImplementedError( - f"Only approximate median is supported. {constants.FEEDBACK_LINK}" - ) - return typing.cast(float, self._apply_aggregation(agg_ops.median_op)) + return typing.cast(float, self.quantile(0.5)) + else: + return typing.cast(float, self._apply_aggregation(agg_ops.median_op)) + + def quantile(self, q: Union[float, Sequence[float]] = 0.5) -> Union[Series, float]: + qs = tuple(q) if utils.is_list_like(q) else (q,) + result = block_ops.quantile(self._block, (self._value_column,), qs=qs) + if utils.is_list_like(q): + result = result.stack() + result = result.drop_levels([result.index_columns[0]]) + return Series(result) + else: + return cast(float, Series(result).to_pandas().squeeze()) def sum(self) -> float: return typing.cast(float, self._apply_aggregation(agg_ops.sum_op)) diff --git a/tests/system/small/test_dataframe.py b/tests/system/small/test_dataframe.py index e70764fcc0..7fef7a9dc7 100644 --- a/tests/system/small/test_dataframe.py +++ b/tests/system/small/test_dataframe.py @@ -2504,7 +2504,10 @@ def test_df_melt_default(scalars_dfs): # Pandas produces int64 index, Bigframes produces Int64 (nullable) pd.testing.assert_frame_equal( - bf_result, pd_result, check_index_type=False, check_dtype=False + bf_result, + pd_result, + check_index_type=False, + check_dtype=False, ) @@ -3029,6 +3032,31 @@ def test_dataframe_aggregates_median(scalars_df_index, scalars_pandas_df_index): ) +def test_dataframe_aggregates_quantile_mono(scalars_df_index, scalars_pandas_df_index): + q = 0.45 + col_names = ["int64_too", "int64_col", "float64_col"] + bf_result = scalars_df_index[col_names].quantile(q=q).to_pandas() + pd_result = scalars_pandas_df_index[col_names].quantile(q=q) + + # Pandas may produce narrower numeric types, but bigframes always produces Float64 + pd_result = pd_result.astype("Float64") + + pd.testing.assert_series_equal(bf_result, pd_result, check_index_type=False) + + +def test_dataframe_aggregates_quantile_multi(scalars_df_index, scalars_pandas_df_index): + q = [0, 0.33, 0.67, 1.0] + col_names = ["int64_too", "int64_col", "float64_col"] + bf_result = scalars_df_index[col_names].quantile(q=q).to_pandas() + pd_result = scalars_pandas_df_index[col_names].quantile(q=q) + + # Pandas may produce narrower numeric types, but bigframes always produces Float64 + pd_result = pd_result.astype("Float64") + pd_result.index = pd_result.index.astype("Float64") + + pd.testing.assert_frame_equal(bf_result, pd_result) + + @pytest.mark.parametrize( ("op"), [ diff --git a/tests/system/small/test_groupby.py b/tests/system/small/test_groupby.py index ba79ba1ab1..7b36a06f49 100644 --- a/tests/system/small/test_groupby.py +++ b/tests/system/small/test_groupby.py @@ -65,6 +65,24 @@ def test_dataframe_groupby_median(scalars_df_index, scalars_pandas_df_index): assert ((pd_min <= bf_result_computed) & (bf_result_computed <= pd_max)).all().all() +@pytest.mark.parametrize( + ("q"), + [ + ([0.2, 0.4, 0.6, 0.8]), + (0.11), + ], +) +def test_dataframe_groupby_quantile(scalars_df_index, scalars_pandas_df_index, q): + col_names = ["int64_too", "float64_col", "int64_col", "string_col"] + bf_result = ( + scalars_df_index[col_names].groupby("string_col").quantile(q) + ).to_pandas() + pd_result = scalars_pandas_df_index[col_names].groupby("string_col").quantile(q) + pd.testing.assert_frame_equal( + pd_result, bf_result, check_dtype=False, check_index_type=False + ) + + @pytest.mark.parametrize( ("operator"), [ @@ -389,3 +407,20 @@ def test_dataframe_groupby_nonnumeric_with_mean(): pd.testing.assert_frame_equal( pd_result, bf_result, check_index_type=False, check_dtype=False ) + + +@pytest.mark.parametrize( + ("q"), + [ + ([0.2, 0.4, 0.6, 0.8]), + (0.11), + ], +) +def test_series_groupby_quantile(scalars_df_index, scalars_pandas_df_index, q): + bf_result = ( + scalars_df_index.groupby("string_col")["int64_col"].quantile(q) + ).to_pandas() + pd_result = scalars_pandas_df_index.groupby("string_col")["int64_col"].quantile(q) + pd.testing.assert_series_equal( + pd_result, bf_result, check_dtype=False, check_index_type=False + ) diff --git a/tests/system/small/test_series.py b/tests/system/small/test_series.py index d27cd0a236..87267696ba 100644 --- a/tests/system/small/test_series.py +++ b/tests/system/small/test_series.py @@ -1320,6 +1320,27 @@ def test_median(scalars_dfs): assert pd_min < bf_result < pd_max +def test_median_exact(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + col_name = "int64_col" + bf_result = scalars_df[col_name].median(exact=True) + pd_result = scalars_pandas_df[col_name].median() + assert math.isclose(pd_result, bf_result) + + +def test_series_quantile(scalars_dfs): + scalars_df, scalars_pandas_df = scalars_dfs + col_name = "int64_col" + bf_series = scalars_df[col_name] + pd_series = scalars_pandas_df[col_name] + + pd_result = pd_series.quantile([0.0, 0.4, 0.6, 1.0]) + bf_result = bf_series.quantile([0.0, 0.4, 0.6, 1.0]) + pd.testing.assert_series_equal( + pd_result, bf_result.to_pandas(), check_dtype=False, check_index_type=False + ) + + def test_numeric_literal(scalars_dfs): scalars_df, _ = scalars_dfs col_name = "numeric_col" diff --git a/third_party/bigframes_vendored/ibis/backends/bigquery/registry.py b/third_party/bigframes_vendored/ibis/backends/bigquery/registry.py index 88826b31ce..fddeab19a2 100644 --- a/third_party/bigframes_vendored/ibis/backends/bigquery/registry.py +++ b/third_party/bigframes_vendored/ibis/backends/bigquery/registry.py @@ -3,6 +3,7 @@ import bigframes_vendored.ibis.expr.operations as vendored_ibis_ops from ibis.backends.bigquery.registry import OPERATION_REGISTRY +import ibis.expr.operations.reductions as ibis_reductions def _approx_quantiles(translator, op: vendored_ibis_ops.ApproximateMultiQuantile): @@ -31,12 +32,19 @@ def _generate_array(translator, op: vendored_ibis_ops.GenerateArray): return f"GENERATE_ARRAY(0, {arg})" +def _quantile(translator, op: ibis_reductions.Quantile): + arg = translator.translate(op.arg) + quantile = translator.translate(op.quantile) + return f"PERCENTILE_CONT({arg}, {quantile})" + + patched_ops = { vendored_ibis_ops.ApproximateMultiQuantile: _approx_quantiles, # type:ignore vendored_ibis_ops.FirstNonNullValue: _first_non_null_value, # type:ignore vendored_ibis_ops.LastNonNullValue: _last_non_null_value, # type:ignore vendored_ibis_ops.ToJsonString: _to_json_string, # type:ignore vendored_ibis_ops.GenerateArray: _generate_array, # type:ignore + ibis_reductions.Quantile: _quantile, # type:ignore } OPERATION_REGISTRY.update(patched_ops) diff --git a/third_party/bigframes_vendored/pandas/core/frame.py b/third_party/bigframes_vendored/pandas/core/frame.py index 6707dc1403..e894900646 100644 --- a/third_party/bigframes_vendored/pandas/core/frame.py +++ b/third_party/bigframes_vendored/pandas/core/frame.py @@ -4509,13 +4509,51 @@ def median(self, *, numeric_only: bool = False, exact: bool = False): Default False. Include only float, int, boolean columns. exact (bool. default False): Default False. Get the exact median instead of an approximate - one. Note: ``exact=True`` not yet supported. + one. Returns: bigframes.series.Series: Series with the median of values. """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def quantile( + self, q: Union[float, Sequence[float]] = 0.5, *, numeric_only: bool = False + ): + """ + Return values at the given quantile over requested axis. + + **Examples:** + >>> import bigframes.pandas as bpd + >>> bpd.options.display.progress_bar = None + >>> df = bpd.DataFrame(np.array([[1, 1], [2, 10], [3, 100], [4, 100]]), + ... columns=['a', 'b']) + >>> df.quantile(.1) + a 1.3 + b 3.7 + Name: 0.1, dtype: Float64 + >>> df.quantile([.1, .5]) + a b + 0.1 1.3 3.7 + 0.5 2.5 55.0 + + [2 rows x 2 columns] + + Args: + q (float or array-like, default 0.5 (50% quantile)): + Value between 0 <= q <= 1, the quantile(s) to compute. + numeric_only (bool, default False): + Include only `float`, `int` or `boolean` data. + + Returns: + Series or DataFrame: + If ``q`` is an array, a DataFrame will be returned where the + index is ``q``, the columns are the columns of self, and the + values are the quantiles. + If ``q`` is a float, a Series will be returned where the + index is the columns of self and the values are the quantiles. + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def var(self, axis=0, *, numeric_only: bool = False): """Return unbiased variance over requested axis. diff --git a/third_party/bigframes_vendored/pandas/core/groupby/__init__.py b/third_party/bigframes_vendored/pandas/core/groupby/__init__.py index ed4ca66f38..6310d7e271 100644 --- a/third_party/bigframes_vendored/pandas/core/groupby/__init__.py +++ b/third_party/bigframes_vendored/pandas/core/groupby/__init__.py @@ -85,6 +85,36 @@ def median( """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def quantile(self, q=0.5, *, numeric_only: bool = False): + """ + Return group values at the given quantile, a la numpy.percentile. + + **Examples:** + >>> import bigframes.pandas as bpd + >>> bpd.options.display.progress_bar = None + >>> df = bpd.DataFrame([ + ... ['a', 1], ['a', 2], ['a', 3], + ... ['b', 1], ['b', 3], ['b', 5] + ... ], columns=['key', 'val']) + >>> df.groupby('key').quantile() + val + key + a 2.0 + b 3.0 + + [2 rows x 1 columns] + + Args: + q (float or array-like, default 0.5 (50% quantile)): + Value(s) between 0 and 1 providing the quantile(s) to compute. + numeric_only (bool, default False): + Include only `float`, `int` or `boolean` data. + + Returns: + Series or DataFrame: Return type determined by caller of GroupBy object. + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def std( self, *, diff --git a/third_party/bigframes_vendored/pandas/core/series.py b/third_party/bigframes_vendored/pandas/core/series.py index 46bc9714f8..5e3b4c46ef 100644 --- a/third_party/bigframes_vendored/pandas/core/series.py +++ b/third_party/bigframes_vendored/pandas/core/series.py @@ -3,7 +3,16 @@ """ from __future__ import annotations -from typing import Hashable, IO, Literal, Mapping, Optional, Sequence, TYPE_CHECKING +from typing import ( + Hashable, + IO, + Literal, + Mapping, + Optional, + Sequence, + TYPE_CHECKING, + Union, +) from bigframes_vendored.pandas.core.generic import NDFrame import numpy @@ -3151,6 +3160,37 @@ def median(self, *, exact: bool = False): """ raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def quantile( + self, + q: Union[float, Sequence[float]] = 0.5, + ) -> Union[Series, float]: + """ + Return value at the given quantile. + + **Examples:** + >>> import bigframes.pandas as bpd + >>> bpd.options.display.progress_bar = None + >>> s = bpd.Series([1, 2, 3, 4]) + >>> s.quantile(.5) + 2.5 + >>> s.quantile([.25, .5, .75]) + 0.25 1.75 + 0.5 2.5 + 0.75 3.25 + dtype: Float64 + + Args: + q (float or array-like, default 0.5 (50% quantile)): + The quantile(s) to compute, which can lie in range: 0 <= q <= 1. + + Returns: + float or Series: + If ``q`` is an array, a Series will be returned where the + index is ``q`` and the values are the quantiles, otherwise + a float will be returned. + """ + raise NotImplementedError(constants.ABSTRACT_METHOD_ERROR_MESSAGE) + def prod(self): """Return the product of the values over the requested axis. diff --git a/third_party/bigframes_vendored/pandas/plotting/_core.py b/third_party/bigframes_vendored/pandas/plotting/_core.py index 19f56965df..bf016357a6 100644 --- a/third_party/bigframes_vendored/pandas/plotting/_core.py +++ b/third_party/bigframes_vendored/pandas/plotting/_core.py @@ -11,6 +11,7 @@ class PlotAccessor: For Series: >>> import bigframes.pandas as bpd + >>> bpd.options.display.progress_bar = None >>> ser = bpd.Series([1, 2, 3, 3]) >>> plot = ser.plot(kind='hist', title="My plot") @@ -57,6 +58,7 @@ def hist( >>> import bigframes.pandas as bpd >>> import numpy as np + >>> bpd.options.display.progress_bar = None >>> df = bpd.DataFrame(np.random.randint(1, 7, 6000), columns=['one']) >>> df['two'] = np.random.randint(1, 7, 6000) + np.random.randint(1, 7, 6000) >>> ax = df.plot.hist(bins=12, alpha=0.5) @@ -93,6 +95,7 @@ def line( **Examples:** >>> import bigframes.pandas as bpd + >>> bpd.options.display.progress_bar = None >>> df = bpd.DataFrame( ... { ... 'one': [1, 2, 3, 4], @@ -160,6 +163,7 @@ def area( Draw an area plot based on basic business metrics: >>> import bigframes.pandas as bpd + >>> bpd.options.display.progress_bar = None >>> df = bpd.DataFrame( ... { ... 'sales': [3, 2, 3, 9, 10, 6],