From 05aae56dca2bddb2982216ca05c1050d0a6dd562 Mon Sep 17 00:00:00 2001
From: WhiteSource Renovate
Date: Wed, 17 Aug 2022 16:29:57 +0200
Subject: [PATCH 1/4] chore(deps): update dependency google-cloud-monitoring to
 v2.11.1 (#635)

---
 samples/metricscaler/requirements.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/samples/metricscaler/requirements.txt b/samples/metricscaler/requirements.txt
index b4ba5b8d1..dc8a48fb2 100644
--- a/samples/metricscaler/requirements.txt
+++ b/samples/metricscaler/requirements.txt
@@ -1,2 +1,2 @@
 google-cloud-bigtable==2.11.1
-google-cloud-monitoring==2.11.0
+google-cloud-monitoring==2.11.1

From e8624600c57972fcb6a4d0fb2ed7805db4f5046b Mon Sep 17 00:00:00 2001
From: Igor Bernstein
Date: Wed, 17 Aug 2022 13:48:32 -0400
Subject: [PATCH 2/4] chore: move row value classes out of row_data (#633)

* chore: move row value classes out of row_data

This is in preparation for extracting row merging into a separate class.
See https://github.com/googleapis/python-bigtable/pull/628

Co-authored-by: Anthonios Partheniou
---
 google/cloud/bigtable/row.py      | 252 +++++++++++++++++++++++++++++
 google/cloud/bigtable/row_data.py | 257 +-----------------------------
 2 files changed, 257 insertions(+), 252 deletions(-)

diff --git a/google/cloud/bigtable/row.py b/google/cloud/bigtable/row.py
index 9127a1aae..752458a08 100644
--- a/google/cloud/bigtable/row.py
+++ b/google/cloud/bigtable/row.py
@@ -28,6 +28,15 @@
 MAX_MUTATIONS = 100000
 """The maximum number of mutations that a row can accumulate."""
 
+_MISSING_COLUMN_FAMILY = "Column family {} is not among the cells stored in this row."
+_MISSING_COLUMN = (
+    "Column {} is not among the cells stored in this row in the column family {}."
+)
+_MISSING_INDEX = (
+    "Index {!r} is not valid for the cells stored in this row for column {} "
+    "in the column family {}. There are {} such cells."
+)
+
 
 class Row(object):
     """Base representation of a Google Cloud Bigtable Row.
@@ -1013,3 +1022,246 @@ def _parse_family_pb(family_pb):
         cells.append(val_pair)
 
     return family_pb.name, result
+
+
+class PartialRowData(object):
+    """Representation of partial row in a Google Cloud Bigtable Table.
+
+    These are expected to be updated directly from a
+    :class:`._generated.bigtable_service_messages_pb2.ReadRowsResponse`
+
+    :type row_key: bytes
+    :param row_key: The key for the row holding the (partial) data.
+    """
+
+    def __init__(self, row_key):
+        self._row_key = row_key
+        self._cells = {}
+
+    def __eq__(self, other):
+        if not isinstance(other, self.__class__):
+            return NotImplemented
+        return other._row_key == self._row_key and other._cells == self._cells
+
+    def __ne__(self, other):
+        return not self == other
+
+    def to_dict(self):
+        """Convert the cells to a dictionary.
+
+        This is intended to be used with HappyBase, so the column family and
+        column qualifiers are combined (with ``:``).
+
+        :rtype: dict
+        :returns: Dictionary containing all the data in the cells of this row.
+        """
+        result = {}
+        for column_family_id, columns in self._cells.items():
+            for column_qual, cells in columns.items():
+                key = _to_bytes(column_family_id) + b":" + _to_bytes(column_qual)
+                result[key] = cells
+        return result
+
+    @property
+    def cells(self):
+        """Property returning all the cells accumulated on this partial row.
+
+        For example:
+
+        .. 
literalinclude:: snippets_table.py + :start-after: [START bigtable_api_row_data_cells] + :end-before: [END bigtable_api_row_data_cells] + :dedent: 4 + + :rtype: dict + :returns: Dictionary of the :class:`Cell` objects accumulated. This + dictionary has two-levels of keys (first for column families + and second for column names/qualifiers within a family). For + a given column, a list of :class:`Cell` objects is stored. + """ + return self._cells + + @property + def row_key(self): + """Getter for the current (partial) row's key. + + :rtype: bytes + :returns: The current (partial) row's key. + """ + return self._row_key + + def find_cells(self, column_family_id, column): + """Get a time series of cells stored on this instance. + + For example: + + .. literalinclude:: snippets_table.py + :start-after: [START bigtable_api_row_find_cells] + :end-before: [END bigtable_api_row_find_cells] + :dedent: 4 + + Args: + column_family_id (str): The ID of the column family. Must be of the + form ``[_a-zA-Z0-9][-_.a-zA-Z0-9]*``. + column (bytes): The column within the column family where the cells + are located. + + Returns: + List[~google.cloud.bigtable.row_data.Cell]: The cells stored in the + specified column. + + Raises: + KeyError: If ``column_family_id`` is not among the cells stored + in this row. + KeyError: If ``column`` is not among the cells stored in this row + for the given ``column_family_id``. + """ + try: + column_family = self._cells[column_family_id] + except KeyError: + raise KeyError(_MISSING_COLUMN_FAMILY.format(column_family_id)) + + try: + cells = column_family[column] + except KeyError: + raise KeyError(_MISSING_COLUMN.format(column, column_family_id)) + + return cells + + def cell_value(self, column_family_id, column, index=0): + """Get a single cell value stored on this instance. + + For example: + + .. literalinclude:: snippets_table.py + :start-after: [START bigtable_api_row_cell_value] + :end-before: [END bigtable_api_row_cell_value] + :dedent: 4 + + Args: + column_family_id (str): The ID of the column family. Must be of the + form ``[_a-zA-Z0-9][-_.a-zA-Z0-9]*``. + column (bytes): The column within the column family where the cell + is located. + index (Optional[int]): The offset within the series of values. If + not specified, will return the first cell. + + Returns: + ~google.cloud.bigtable.row_data.Cell value: The cell value stored + in the specified column and specified index. + + Raises: + KeyError: If ``column_family_id`` is not among the cells stored + in this row. + KeyError: If ``column`` is not among the cells stored in this row + for the given ``column_family_id``. + IndexError: If ``index`` cannot be found within the cells stored + in this row for the given ``column_family_id``, ``column`` + pair. + """ + cells = self.find_cells(column_family_id, column) + + try: + cell = cells[index] + except (TypeError, IndexError): + num_cells = len(cells) + msg = _MISSING_INDEX.format(index, column, column_family_id, num_cells) + raise IndexError(msg) + + return cell.value + + def cell_values(self, column_family_id, column, max_count=None): + """Get a time series of cells stored on this instance. + + For example: + + .. literalinclude:: snippets_table.py + :start-after: [START bigtable_api_row_cell_values] + :end-before: [END bigtable_api_row_cell_values] + :dedent: 4 + + Args: + column_family_id (str): The ID of the column family. Must be of the + form ``[_a-zA-Z0-9][-_.a-zA-Z0-9]*``. + column (bytes): The column within the column family where the cells + are located. 
+            max_count (int): The maximum number of cells to use.
+
+        Returns:
+            A generator which provides: cell.value, cell.timestamp_micros
+                for each cell in the list of cells
+
+        Raises:
+            KeyError: If ``column_family_id`` is not among the cells stored
+                in this row.
+            KeyError: If ``column`` is not among the cells stored in this row
+                for the given ``column_family_id``.
+        """
+        cells = self.find_cells(column_family_id, column)
+        if max_count is None:
+            max_count = len(cells)
+
+        for index, cell in enumerate(cells):
+            if index == max_count:
+                break
+
+            yield cell.value, cell.timestamp_micros
+
+
+class Cell(object):
+    """Representation of a Google Cloud Bigtable Cell.
+
+    :type value: bytes
+    :param value: The value stored in the cell.
+
+    :type timestamp_micros: int
+    :param timestamp_micros: The timestamp_micros when the cell was stored.
+
+    :type labels: list
+    :param labels: (Optional) List of strings. Labels applied to the cell.
+    """
+
+    def __init__(self, value, timestamp_micros, labels=None):
+        self.value = value
+        self.timestamp_micros = timestamp_micros
+        self.labels = list(labels) if labels is not None else []
+
+    @classmethod
+    def from_pb(cls, cell_pb):
+        """Create a new cell from a Cell protobuf.
+
+        :type cell_pb: :class:`._generated.data_pb2.Cell`
+        :param cell_pb: The protobuf to convert.
+
+        :rtype: :class:`Cell`
+        :returns: The cell corresponding to the protobuf.
+        """
+        if cell_pb.labels:
+            return cls(cell_pb.value, cell_pb.timestamp_micros, labels=cell_pb.labels)
+        else:
+            return cls(cell_pb.value, cell_pb.timestamp_micros)
+
+    @property
+    def timestamp(self):
+        return _datetime_from_microseconds(self.timestamp_micros)
+
+    def __eq__(self, other):
+        if not isinstance(other, self.__class__):
+            return NotImplemented
+        return (
+            other.value == self.value
+            and other.timestamp_micros == self.timestamp_micros
+            and other.labels == self.labels
+        )
+
+    def __ne__(self, other):
+        return not self == other
+
+    def __repr__(self):
+        return "<{name} value={value!r} timestamp={timestamp}>".format(
+            name=self.__class__.__name__, value=self.value, timestamp=self.timestamp
+        )
+
+
+class InvalidChunk(RuntimeError):
+    """Exception raised for invalid chunk data from back-end."""

diff --git a/google/cloud/bigtable/row_data.py b/google/cloud/bigtable/row_data.py
index 62ef5a201..e7d3d5bd4 100644
--- a/google/cloud/bigtable/row_data.py
+++ b/google/cloud/bigtable/row_data.py
@@ -21,74 +21,15 @@
 
 from google.api_core import exceptions
 from google.api_core import retry
-from google.cloud._helpers import _datetime_from_microseconds  # type: ignore
 from google.cloud._helpers import _to_bytes  # type: ignore
 from google.cloud.bigtable_v2.types import bigtable as data_messages_v2_pb2
 from google.cloud.bigtable_v2.types import data as data_v2_pb2
+from google.cloud.bigtable.row import Cell, InvalidChunk, PartialRowData
 
-_MISSING_COLUMN_FAMILY = "Column family {} is not among the cells stored in this row."
-_MISSING_COLUMN = (
-    "Column {} is not among the cells stored in this row in the " "column family {}."
-)
-_MISSING_INDEX = (
-    "Index {!r} is not valid for the cells stored in this row for column {} "
-    "in the column family {}. There are {} such cells."
-)
-
-
-class Cell(object):
-    """Representation of a Google Cloud Bigtable Cell.
-
-    :type value: bytes
-    :param value: The value stored in the cell.
-
-    :type timestamp_micros: int
-    :param timestamp_micros: The timestamp_micros when the cell was stored.
-
-    :type labels: list
-    :param labels: (Optional) List of strings. Labels applied to the cell. 
- """ - - def __init__(self, value, timestamp_micros, labels=None): - self.value = value - self.timestamp_micros = timestamp_micros - self.labels = list(labels) if labels is not None else [] - - @classmethod - def from_pb(cls, cell_pb): - """Create a new cell from a Cell protobuf. - - :type cell_pb: :class:`._generated.data_pb2.Cell` - :param cell_pb: The protobuf to convert. - - :rtype: :class:`Cell` - :returns: The cell corresponding to the protobuf. - """ - if cell_pb.labels: - return cls(cell_pb.value, cell_pb.timestamp_micros, labels=cell_pb.labels) - else: - return cls(cell_pb.value, cell_pb.timestamp_micros) - - @property - def timestamp(self): - return _datetime_from_microseconds(self.timestamp_micros) - - def __eq__(self, other): - if not isinstance(other, self.__class__): - return NotImplemented - return ( - other.value == self.value - and other.timestamp_micros == self.timestamp_micros - and other.labels == self.labels - ) - - def __ne__(self, other): - return not self == other - - def __repr__(self): - return "<{name} value={value!r} timestamp={timestamp}>".format( - name=self.__class__.__name__, value=self.value, timestamp=self.timestamp - ) +# Some classes need to be re-exported here to keep backwards +# compatibility. Those classes were moved to row_merger, but we dont want to +# break enduser's imports. This hack, ensures they don't get marked as unused. +_ = (Cell, InvalidChunk, PartialRowData) class PartialCellData(object): @@ -136,198 +77,10 @@ def append_value(self, value): self.value += value -class PartialRowData(object): - """Representation of partial row in a Google Cloud Bigtable Table. - - These are expected to be updated directly from a - :class:`._generated.bigtable_service_messages_pb2.ReadRowsResponse` - - :type row_key: bytes - :param row_key: The key for the row holding the (partial) data. - """ - - def __init__(self, row_key): - self._row_key = row_key - self._cells = {} - - def __eq__(self, other): - if not isinstance(other, self.__class__): - return NotImplemented - return other._row_key == self._row_key and other._cells == self._cells - - def __ne__(self, other): - return not self == other - - def to_dict(self): - """Convert the cells to a dictionary. - - This is intended to be used with HappyBase, so the column family and - column qualiers are combined (with ``:``). - - :rtype: dict - :returns: Dictionary containing all the data in the cells of this row. - """ - result = {} - for column_family_id, columns in self._cells.items(): - for column_qual, cells in columns.items(): - key = _to_bytes(column_family_id) + b":" + _to_bytes(column_qual) - result[key] = cells - return result - - @property - def cells(self): - """Property returning all the cells accumulated on this partial row. - - For example: - - .. literalinclude:: snippets_table.py - :start-after: [START bigtable_api_row_data_cells] - :end-before: [END bigtable_api_row_data_cells] - :dedent: 4 - - :rtype: dict - :returns: Dictionary of the :class:`Cell` objects accumulated. This - dictionary has two-levels of keys (first for column families - and second for column names/qualifiers within a family). For - a given column, a list of :class:`Cell` objects is stored. - """ - return self._cells - - @property - def row_key(self): - """Getter for the current (partial) row's key. - - :rtype: bytes - :returns: The current (partial) row's key. - """ - return self._row_key - - def find_cells(self, column_family_id, column): - """Get a time series of cells stored on this instance. - - For example: - - .. 
literalinclude:: snippets_table.py - :start-after: [START bigtable_api_row_find_cells] - :end-before: [END bigtable_api_row_find_cells] - :dedent: 4 - - Args: - column_family_id (str): The ID of the column family. Must be of the - form ``[_a-zA-Z0-9][-_.a-zA-Z0-9]*``. - column (bytes): The column within the column family where the cells - are located. - - Returns: - List[~google.cloud.bigtable.row_data.Cell]: The cells stored in the - specified column. - - Raises: - KeyError: If ``column_family_id`` is not among the cells stored - in this row. - KeyError: If ``column`` is not among the cells stored in this row - for the given ``column_family_id``. - """ - try: - column_family = self._cells[column_family_id] - except KeyError: - raise KeyError(_MISSING_COLUMN_FAMILY.format(column_family_id)) - - try: - cells = column_family[column] - except KeyError: - raise KeyError(_MISSING_COLUMN.format(column, column_family_id)) - - return cells - - def cell_value(self, column_family_id, column, index=0): - """Get a single cell value stored on this instance. - - For example: - - .. literalinclude:: snippets_table.py - :start-after: [START bigtable_api_row_cell_value] - :end-before: [END bigtable_api_row_cell_value] - :dedent: 4 - - Args: - column_family_id (str): The ID of the column family. Must be of the - form ``[_a-zA-Z0-9][-_.a-zA-Z0-9]*``. - column (bytes): The column within the column family where the cell - is located. - index (Optional[int]): The offset within the series of values. If - not specified, will return the first cell. - - Returns: - ~google.cloud.bigtable.row_data.Cell value: The cell value stored - in the specified column and specified index. - - Raises: - KeyError: If ``column_family_id`` is not among the cells stored - in this row. - KeyError: If ``column`` is not among the cells stored in this row - for the given ``column_family_id``. - IndexError: If ``index`` cannot be found within the cells stored - in this row for the given ``column_family_id``, ``column`` - pair. - """ - cells = self.find_cells(column_family_id, column) - - try: - cell = cells[index] - except (TypeError, IndexError): - num_cells = len(cells) - msg = _MISSING_INDEX.format(index, column, column_family_id, num_cells) - raise IndexError(msg) - - return cell.value - - def cell_values(self, column_family_id, column, max_count=None): - """Get a time series of cells stored on this instance. - - For example: - - .. literalinclude:: snippets_table.py - :start-after: [START bigtable_api_row_cell_values] - :end-before: [END bigtable_api_row_cell_values] - :dedent: 4 - - Args: - column_family_id (str): The ID of the column family. Must be of the - form ``[_a-zA-Z0-9][-_.a-zA-Z0-9]*``. - column (bytes): The column within the column family where the cells - are located. - max_count (int): The maximum number of cells to use. - - Returns: - A generator which provides: cell.value, cell.timestamp_micros - for each cell in the list of cells - - Raises: - KeyError: If ``column_family_id`` is not among the cells stored - in this row. - KeyError: If ``column`` is not among the cells stored in this row - for the given ``column_family_id``. 
- """ - cells = self.find_cells(column_family_id, column) - if max_count is None: - max_count = len(cells) - - for index, cell in enumerate(cells): - if index == max_count: - break - - yield cell.value, cell.timestamp_micros - - class InvalidReadRowsResponse(RuntimeError): """Exception raised to invalid response data from back-end.""" -class InvalidChunk(RuntimeError): - """Exception raised to invalid chunk data from back-end.""" - - class InvalidRetryRequest(RuntimeError): """Exception raised when retry request is invalid.""" From c71ec70e55f6e236e46127870a9ed4717eee5da5 Mon Sep 17 00:00:00 2001 From: Igor Bernstein Date: Wed, 17 Aug 2022 16:04:52 -0400 Subject: [PATCH 3/4] perf: optimize row merging (#628) This PR rewrites the row merging logic to be more correct and improve performance: - extract row merging logic into its own class to simplify complexity of ReadRows handling - Use OrderedDict instead of dict() for `{family: { qualifier: [] }}` data, this should maintain serverside ordering (family in creation order and qualifier in lexiographical). - define an explicit state machine with states implemented as methods - add various optimizations like: - __slots__ on hot objects to avoid dict lookups - avoiding dict lookups for contiguous family and qualifier keys Overall this improves performance by 20% and in my opinion is a lot more readable --- google/cloud/bigtable/row_data.py | 206 +++++------------------ google/cloud/bigtable/row_merger.py | 250 ++++++++++++++++++++++++++++ tests/unit/test_row_data.py | 141 +++------------- tests/unit/test_row_merger.py | 152 +++++++++++++++++ 4 files changed, 470 insertions(+), 279 deletions(-) create mode 100644 google/cloud/bigtable/row_merger.py diff --git a/google/cloud/bigtable/row_data.py b/google/cloud/bigtable/row_data.py index e7d3d5bd4..a50fab1ee 100644 --- a/google/cloud/bigtable/row_data.py +++ b/google/cloud/bigtable/row_data.py @@ -18,45 +18,25 @@ import copy import grpc # type: ignore - +import warnings from google.api_core import exceptions from google.api_core import retry from google.cloud._helpers import _to_bytes # type: ignore + +from google.cloud.bigtable.row_merger import _RowMerger, _State from google.cloud.bigtable_v2.types import bigtable as data_messages_v2_pb2 from google.cloud.bigtable_v2.types import data as data_v2_pb2 from google.cloud.bigtable.row import Cell, InvalidChunk, PartialRowData + # Some classes need to be re-exported here to keep backwards # compatibility. Those classes were moved to row_merger, but we dont want to # break enduser's imports. This hack, ensures they don't get marked as unused. _ = (Cell, InvalidChunk, PartialRowData) -class PartialCellData(object): - """Representation of partial cell in a Google Cloud Bigtable Table. - - These are expected to be updated directly from a - :class:`._generated.bigtable_service_messages_pb2.ReadRowsResponse` - - :type row_key: bytes - :param row_key: The key for the row holding the (partial) cell. - - :type family_name: str - :param family_name: The family name of the (partial) cell. - - :type qualifier: bytes - :param qualifier: The column qualifier of the (partial) cell. - - :type timestamp_micros: int - :param timestamp_micros: The timestamp (in microsecods) of the - (partial) cell. - - :type labels: list of str - :param labels: labels assigned to the (partial) cell - - :type value: bytes - :param value: The (accumulated) value of the (partial) cell. 
- """ +class PartialCellData(object): # pragma: NO COVER + """This class is no longer used and will be removed in the future""" def __init__( self, row_key, family_name, qualifier, timestamp_micros, labels=(), value=b"" @@ -69,11 +49,6 @@ def __init__( self.value = value def append_value(self, value): - """Append bytes from a new chunk to value. - - :type value: bytes - :param value: bytes to append - """ self.value += value @@ -168,14 +143,7 @@ class PartialRowsData(object): def __init__(self, read_method, request, retry=DEFAULT_RETRY_READ_ROWS): # Counter for rows returned to the user self._counter = 0 - # In-progress row, unset until first response, after commit/reset - self._row = None - # Last complete row, unset until first commit - self._previous_row = None - # In-progress cell, unset until first response, after completion - self._cell = None - # Last complete cell, unset until first completion, after new row - self._previous_cell = None + self._row_merger = _RowMerger() # May be cached from previous response self.last_scanned_row_key = None @@ -192,20 +160,35 @@ def __init__(self, read_method, request, retry=DEFAULT_RETRY_READ_ROWS): self.response_iterator = read_method(request, timeout=self.retry._deadline + 1) self.rows = {} - self._state = self.STATE_NEW_ROW # Flag to stop iteration, for any reason not related to self.retry() self._cancelled = False @property - def state(self): - """State machine state. - - :rtype: str - :returns: name of state corresponding to current row / chunk - processing. + def state(self): # pragma: NO COVER + """ + DEPRECATED: this property is deprecated and will be removed in the + future. """ - return self.read_states[self._state] + warnings.warn( + "`PartialRowsData#state()` is deprecated and will be removed in the future", + DeprecationWarning, + stacklevel=2, + ) + + # Best effort: try to map internal RowMerger states to old strings for + # backwards compatibility + internal_state = self._row_merger.state + if internal_state == _State.ROW_START: + return self.NEW_ROW + # note: _State.CELL_START, _State.CELL_COMPLETE are transient states + # and will not be visible in between chunks + elif internal_state == _State.CELL_IN_PROGRESS: + return self.CELL_IN_PROGRESS + elif internal_state == _State.ROW_COMPLETE: + return self.NEW_ROW + else: + raise RuntimeError("unexpected internal state: " + self._) def cancel(self): """Cancels the iterator, closing the stream.""" @@ -241,6 +224,7 @@ def _on_error(self, exc): if self.last_scanned_row_key: retry_request = self._create_retry_request() + self._row_merger = _RowMerger(self._row_merger.last_seen_row_key) self.response_iterator = self.read_method(retry_request) def _read_next(self): @@ -266,125 +250,23 @@ def __iter__(self): try: response = self._read_next_response() except StopIteration: - if self.state != self.NEW_ROW: - raise ValueError("The row remains partial / is not committed.") + self._row_merger.finalize() break except InvalidRetryRequest: self._cancelled = True break - for chunk in response.chunks: + for row in self._row_merger.process_chunks(response): + self.last_scanned_row_key = self._row_merger.last_seen_row_key + self._counter += 1 + + yield row + if self._cancelled: break - self._process_chunk(chunk) - if chunk.commit_row: - self.last_scanned_row_key = self._previous_row.row_key - self._counter += 1 - yield self._previous_row - - resp_last_key = response.last_scanned_row_key - if resp_last_key and resp_last_key > self.last_scanned_row_key: - self.last_scanned_row_key = resp_last_key - - def 
_process_chunk(self, chunk): - if chunk.reset_row: - self._validate_chunk_reset_row(chunk) - self._row = None - self._cell = self._previous_cell = None - self._state = self.STATE_NEW_ROW - return - - self._update_cell(chunk) - - if self._row is None: - if ( - self._previous_row is not None - and self._cell.row_key <= self._previous_row.row_key - ): - raise InvalidChunk() - self._row = PartialRowData(self._cell.row_key) - - if chunk.value_size == 0: - self._state = self.STATE_ROW_IN_PROGRESS - self._save_current_cell() - else: - self._state = self.STATE_CELL_IN_PROGRESS - - if chunk.commit_row: - if chunk.value_size > 0: - raise InvalidChunk() - - self._previous_row = self._row - self._row = None - self._previous_cell = None - self._state = self.STATE_NEW_ROW - - def _update_cell(self, chunk): - if self._cell is None: - qualifier = None - if chunk.HasField("qualifier"): - qualifier = chunk.qualifier.value - - family = None - if chunk.HasField("family_name"): - family = chunk.family_name.value - - self._cell = PartialCellData( - chunk.row_key, - family, - qualifier, - chunk.timestamp_micros, - chunk.labels, - chunk.value, - ) - self._copy_from_previous(self._cell) - self._validate_cell_data_new_cell() - else: - self._cell.append_value(chunk.value) - - def _validate_cell_data_new_cell(self): - cell = self._cell - if not cell.row_key or not cell.family_name or cell.qualifier is None: - raise InvalidChunk() - - prev = self._previous_cell - if prev and prev.row_key != cell.row_key: - raise InvalidChunk() - - def _validate_chunk_reset_row(self, chunk): - # No reset for new row - _raise_if(self._state == self.STATE_NEW_ROW) - - # No reset with other keys - _raise_if(chunk.row_key) - _raise_if(chunk.HasField("family_name")) - _raise_if(chunk.HasField("qualifier")) - _raise_if(chunk.timestamp_micros) - _raise_if(chunk.labels) - _raise_if(chunk.value_size) - _raise_if(chunk.value) - _raise_if(chunk.commit_row) - - def _save_current_cell(self): - """Helper for :meth:`consume_next`.""" - row, cell = self._row, self._cell - family = row._cells.setdefault(cell.family_name, {}) - qualified = family.setdefault(cell.qualifier, []) - complete = Cell.from_pb(cell) - qualified.append(complete) - self._cell, self._previous_cell = None, cell - - def _copy_from_previous(self, cell): - """Helper for :meth:`consume_next`.""" - previous = self._previous_cell - if previous is not None: - if not cell.row_key: - cell.row_key = previous.row_key - if not cell.family_name: - cell.family_name = previous.family_name - # NOTE: ``cell.qualifier`` **can** be empty string. 
- if cell.qualifier is None: - cell.qualifier = previous.qualifier + # The last response might not have generated any rows, but it + # could've updated last_scanned_row_key + self.last_scanned_row_key = self._row_merger.last_seen_row_key class _ReadRowsRequestManager(object): @@ -494,9 +376,3 @@ def _start_key_set(row_range): def _end_key_set(row_range): """Helper for :meth:`_filter_row_ranges`""" return row_range.end_key_open or row_range.end_key_closed - - -def _raise_if(predicate, *args): - """Helper for validation methods.""" - if predicate: - raise InvalidChunk(*args) diff --git a/google/cloud/bigtable/row_merger.py b/google/cloud/bigtable/row_merger.py new file mode 100644 index 000000000..515b91df7 --- /dev/null +++ b/google/cloud/bigtable/row_merger.py @@ -0,0 +1,250 @@ +from enum import Enum +from collections import OrderedDict +from google.cloud.bigtable.row import Cell, PartialRowData, InvalidChunk + +_MISSING_COLUMN_FAMILY = "Column family {} is not among the cells stored in this row." +_MISSING_COLUMN = ( + "Column {} is not among the cells stored in this row in the column family {}." +) +_MISSING_INDEX = ( + "Index {!r} is not valid for the cells stored in this row for column {} " + "in the column family {}. There are {} such cells." +) + + +class _State(Enum): + ROW_START = "ROW_START" + CELL_START = "CELL_START" + CELL_IN_PROGRESS = "CELL_IN_PROGRESS" + CELL_COMPLETE = "CELL_COMPLETE" + ROW_COMPLETE = "ROW_COMPLETE" + + +class _PartialRow(object): + __slots__ = [ + "row_key", + "cells", + "last_family", + "last_family_cells", + "last_qualifier", + "last_qualifier_cells", + "cell", + ] + + def __init__(self, row_key): + self.row_key = row_key + self.cells = OrderedDict() + + self.last_family = None + self.last_family_cells = OrderedDict() + self.last_qualifier = None + self.last_qualifier_cells = [] + + self.cell = None + + +class _PartialCell(object): + __slots__ = ["family", "qualifier", "timestamp", "labels", "value", "value_index"] + + def __init__(self): + self.family = None + self.qualifier = None + self.timestamp = None + self.labels = None + self.value = None + self.value_index = 0 + + +class _RowMerger(object): + """ + State machine to merge chunks from a response stream into logical rows. + + The implementation is a fairly linear state machine that is implemented as + a method for every state in the _State enum. In general the states flow + from top to bottom with some repetition. Each state handler will do some + sanity checks, update in progress data and set the next state. + + There can be multiple state transitions for each chunk, i.e. a single chunk + row will flow from ROW_START -> CELL_START -> CELL_COMPLETE -> ROW_COMPLETE + in a single iteration. + """ + + __slots__ = ["state", "last_seen_row_key", "row"] + + def __init__(self, last_seen_row=b""): + self.last_seen_row_key = last_seen_row + self.state = _State.ROW_START + self.row = None + + def process_chunks(self, response): + """ + Process the chunks in the given response and yield logical rows. + This class will maintain state across multiple response protos. 
+ """ + if response.last_scanned_row_key: + if self.last_seen_row_key >= response.last_scanned_row_key: + raise InvalidChunk("Last scanned row key is out of order") + self.last_seen_row_key = response.last_scanned_row_key + + for chunk in response.chunks: + if chunk.reset_row: + self._handle_reset(chunk) + continue + + if self.state == _State.ROW_START: + self._handle_row_start(chunk) + + if self.state == _State.CELL_START: + self._handle_cell_start(chunk) + + if self.state == _State.CELL_IN_PROGRESS: + self._handle_cell_in_progress(chunk) + + if self.state == _State.CELL_COMPLETE: + self._handle_cell_complete(chunk) + + if self.state == _State.ROW_COMPLETE: + yield self._handle_row_complete(chunk) + elif chunk.commit_row: + raise InvalidChunk( + f"Chunk tried to commit row in wrong state (${self.state})" + ) + + def _handle_reset(self, chunk): + if self.state == _State.ROW_START: + raise InvalidChunk("Bare reset") + if chunk.row_key: + raise InvalidChunk("Reset chunk has a row key") + if chunk.HasField("family_name"): + raise InvalidChunk("Reset chunk has family_name") + if chunk.HasField("qualifier"): + raise InvalidChunk("Reset chunk has qualifier") + if chunk.timestamp_micros: + raise InvalidChunk("Reset chunk has a timestamp") + if chunk.labels: + raise InvalidChunk("Reset chunk has labels") + if chunk.value: + raise InvalidChunk("Reset chunk has a value") + + self.state = _State.ROW_START + self.row = None + + def _handle_row_start(self, chunk): + if not chunk.row_key: + raise InvalidChunk("New row is missing a row key") + if self.last_seen_row_key and self.last_seen_row_key >= chunk.row_key: + raise InvalidChunk("Out of order row keys") + + self.row = _PartialRow(chunk.row_key) + self.state = _State.CELL_START + + def _handle_cell_start(self, chunk): + # Ensure that all chunks after the first one either are missing a row + # key or the row is the same + if self.row.cells and chunk.row_key and chunk.row_key != self.row.row_key: + raise InvalidChunk("row key changed mid row") + + if not self.row.cell: + self.row.cell = _PartialCell() + + # Cells can inherit family/qualifier from previous cells + # However if the family changes, then qualifier must be specified as well + if chunk.HasField("family_name"): + self.row.cell.family = chunk.family_name.value + self.row.cell.qualifier = None + if not self.row.cell.family: + raise InvalidChunk("missing family for a new cell") + + if chunk.HasField("qualifier"): + self.row.cell.qualifier = chunk.qualifier.value + if self.row.cell.qualifier is None: + raise InvalidChunk("missing qualifier for a new cell") + + self.row.cell.timestamp = chunk.timestamp_micros + self.row.cell.labels = chunk.labels + + if chunk.value_size > 0: + # explicitly avoid pre-allocation as it seems that bytearray + # concatenation performs better than slice copies. + self.row.cell.value = bytearray() + self.state = _State.CELL_IN_PROGRESS + else: + self.row.cell.value = chunk.value + self.state = _State.CELL_COMPLETE + + def _handle_cell_in_progress(self, chunk): + # if this isn't the first cell chunk, make sure that everything except + # the value stayed constant. 
+ if self.row.cell.value_index > 0: + if chunk.row_key: + raise InvalidChunk("found row key mid cell") + if chunk.HasField("family_name"): + raise InvalidChunk("In progress cell had a family name") + if chunk.HasField("qualifier"): + raise InvalidChunk("In progress cell had a qualifier") + if chunk.timestamp_micros: + raise InvalidChunk("In progress cell had a timestamp") + if chunk.labels: + raise InvalidChunk("In progress cell had labels") + + self.row.cell.value += chunk.value + self.row.cell.value_index += len(chunk.value) + + if chunk.value_size > 0: + self.state = _State.CELL_IN_PROGRESS + else: + self.row.cell.value = bytes(self.row.cell.value) + self.state = _State.CELL_COMPLETE + + def _handle_cell_complete(self, chunk): + # since we are guaranteed that all family & qualifier cells are + # contiguous, we can optimize away the dict lookup by caching the last + # family/qualifier and simply comparing and appending + family_changed = False + if self.row.last_family != self.row.cell.family: + family_changed = True + self.row.last_family = self.row.cell.family + self.row.cells[ + self.row.cell.family + ] = self.row.last_family_cells = OrderedDict() + + if family_changed or self.row.last_qualifier != self.row.cell.qualifier: + self.row.last_qualifier = self.row.cell.qualifier + self.row.last_family_cells[ + self.row.cell.qualifier + ] = self.row.last_qualifier_cells = [] + + self.row.last_qualifier_cells.append( + Cell( + self.row.cell.value, + self.row.cell.timestamp, + self.row.cell.labels, + ) + ) + + self.row.cell.timestamp = 0 + self.row.cell.value = None + self.row.cell.value_index = 0 + + if not chunk.commit_row: + self.state = _State.CELL_START + else: + self.state = _State.ROW_COMPLETE + + def _handle_row_complete(self, chunk): + new_row = PartialRowData(self.row.row_key) + new_row._cells = self.row.cells + + self.last_seen_row_key = new_row.row_key + self.row = None + self.state = _State.ROW_START + + return new_row + + def finalize(self): + """ + Must be called at the end of the stream to ensure there are no unmerged + rows. 
+ """ + if self.row or self.state != _State.ROW_START: + raise ValueError("The row remains partial / is not committed.") diff --git a/tests/unit/test_row_data.py b/tests/unit/test_row_data.py index 9175bf479..382a81ef1 100644 --- a/tests/unit/test_row_data.py +++ b/tests/unit/test_row_data.py @@ -630,78 +630,6 @@ def test_partial_rows_data_cancel_between_chunks(): assert list(yrd) == [] -# 'consume_next' tested via 'TestPartialRowsData_JSON_acceptance_tests' - - -def test_partial_rows_data__copy_from_previous_unset(): - client = _Client() - client._data_stub = mock.MagicMock() - request = object() - yrd = _make_partial_rows_data(client._data_stub.read_rows, request) - cell = _PartialCellData() - yrd._copy_from_previous(cell) - assert cell.row_key == b"" - assert cell.family_name == "" - assert cell.qualifier is None - assert cell.timestamp_micros == 0 - assert cell.labels == [] - - -def test_partial_rows_data__copy_from_previous_blank(): - ROW_KEY = "RK" - FAMILY_NAME = "A" - QUALIFIER = b"C" - TIMESTAMP_MICROS = 100 - LABELS = ["L1", "L2"] - client = _Client() - client._data_stub = mock.MagicMock() - request = object() - yrd = _make_partial_rows_data(client._data_stub.ReadRows, request) - cell = _PartialCellData( - row_key=ROW_KEY, - family_name=FAMILY_NAME, - qualifier=QUALIFIER, - timestamp_micros=TIMESTAMP_MICROS, - labels=LABELS, - ) - yrd._previous_cell = _PartialCellData() - yrd._copy_from_previous(cell) - assert cell.row_key == ROW_KEY - assert cell.family_name == FAMILY_NAME - assert cell.qualifier == QUALIFIER - assert cell.timestamp_micros == TIMESTAMP_MICROS - assert cell.labels == LABELS - - -def test_partial_rows_data__copy_from_previous_filled(): - from google.cloud.bigtable_v2.services.bigtable import BigtableClient - - ROW_KEY = "RK" - FAMILY_NAME = "A" - QUALIFIER = b"C" - TIMESTAMP_MICROS = 100 - LABELS = ["L1", "L2"] - client = _Client() - data_api = mock.create_autospec(BigtableClient) - client._data_stub = data_api - request = object() - yrd = _make_partial_rows_data(client._data_stub.read_rows, request) - yrd._previous_cell = _PartialCellData( - row_key=ROW_KEY, - family_name=FAMILY_NAME, - qualifier=QUALIFIER, - timestamp_micros=TIMESTAMP_MICROS, - labels=LABELS, - ) - cell = _PartialCellData() - yrd._copy_from_previous(cell) - assert cell.row_key == ROW_KEY - assert cell.family_name == FAMILY_NAME - assert cell.qualifier == QUALIFIER - assert cell.timestamp_micros == 0 - assert cell.labels == [] - - def test_partial_rows_data_valid_last_scanned_row_key_on_start(): client = _Client() response = _ReadRowsResponseV2([], last_scanned_row_key=b"2.AFTER") @@ -732,38 +660,36 @@ def test_partial_rows_data_invalid_empty_chunk(): def test_partial_rows_data_state_cell_in_progress(): - from google.cloud.bigtable_v2.services.bigtable import BigtableClient - from google.cloud.bigtable_v2.types import bigtable as messages_v2_pb2 - - LABELS = ["L1", "L2"] - - request = object() - client = _Client() - client._data_stub = mock.create_autospec(BigtableClient) - yrd = _make_partial_rows_data(client._data_stub.read_rows, request) - - chunk = _ReadRowsResponseCellChunkPB( - row_key=ROW_KEY, - family_name=FAMILY_NAME, - qualifier=QUALIFIER, - timestamp_micros=TIMESTAMP_MICROS, - value=VALUE, - labels=LABELS, + labels = ["L1", "L2"] + resp = _ReadRowsResponseV2( + [ + _ReadRowsResponseCellChunkPB( + row_key=ROW_KEY, + family_name=FAMILY_NAME, + qualifier=QUALIFIER, + timestamp_micros=TIMESTAMP_MICROS, + value=VALUE, + value_size=(2 * len(VALUE)), + labels=labels, + ), + 
_ReadRowsResponseCellChunkPB(value=VALUE, commit_row=True),
+        ]
     )
 
-    # _update_cell expects to be called after the protoplus wrapper has been
-    # shucked
-    chunk = messages_v2_pb2.ReadRowsResponse.CellChunk.pb(chunk)
-    yrd._update_cell(chunk)
+    def fake_read(*args, **kwargs):
+        return iter([resp])
 
-    more_cell_data = _ReadRowsResponseCellChunkPB(value=VALUE)
-    yrd._update_cell(more_cell_data)
+    yrd = _make_partial_rows_data(fake_read, None)
+    yrd.consume_all()
 
-    assert yrd._cell.row_key == ROW_KEY
-    assert yrd._cell.family_name == FAMILY_NAME
-    assert yrd._cell.qualifier == QUALIFIER
-    assert yrd._cell.timestamp_micros == TIMESTAMP_MICROS
-    assert yrd._cell.labels == LABELS
-    assert yrd._cell.value == VALUE + VALUE
+    expected_row = _make_partial_row_data(ROW_KEY)
+    expected_row._cells = {
+        FAMILY_NAME: {
+            QUALIFIER: [
+                _make_cell(
+                    value=(VALUE + VALUE), timestamp_micros=TIMESTAMP_MICROS, labels=labels
+                )
+            ]
+        }
+    }
+
+    assert yrd.rows == {ROW_KEY: expected_row}
 
 
 def test_partial_rows_data_yield_rows_data():
@@ -1215,19 +1141,6 @@ def next(self):
     __next__ = next
 
 
-class _PartialCellData(object):
-
-    row_key = b""
-    family_name = ""
-    qualifier = None
-    timestamp_micros = 0
-    last_scanned_row_key = ""
-
-    def __init__(self, **kw):
-        self.labels = kw.pop("labels", [])
-        self.__dict__.update(kw)
-
-
 def _ReadRowsResponseV2(chunks, last_scanned_row_key=b""):
     from google.cloud.bigtable_v2.types import bigtable as messages_v2_pb2
 
diff --git a/tests/unit/test_row_merger.py b/tests/unit/test_row_merger.py
index f336a82ff..483c04536 100644
--- a/tests/unit/test_row_merger.py
+++ b/tests/unit/test_row_merger.py
@@ -7,6 +7,7 @@
 from google.cloud.bigtable.row_data import PartialRowsData, PartialRowData, InvalidChunk
 from google.cloud.bigtable_v2.types.bigtable import ReadRowsResponse
+from google.cloud.bigtable.row_merger import _RowMerger
 
 
 # TODO: autogenerate protos from
@@ -76,3 +77,154 @@ def fake_read(*args, **kwargs):
 
     for expected, actual in zip_longest(test_case.results, actual_results):
         assert actual == expected
+
+
+def test_out_of_order_rows():
+    row_merger = _RowMerger(last_seen_row=b"z")
+    with pytest.raises(InvalidChunk):
+        list(row_merger.process_chunks(ReadRowsResponse(last_scanned_row_key=b"a")))
+
+
+def test_bare_reset():
+    first_chunk = ReadRowsResponse.CellChunk(
+        ReadRowsResponse.CellChunk(
+            row_key=b"a", family_name="f", qualifier=b"q", value=b"v"
+        )
+    )
+    with pytest.raises(InvalidChunk):
+        _process_chunks(
+            first_chunk,
+            ReadRowsResponse.CellChunk(
+                ReadRowsResponse.CellChunk(reset_row=True, row_key=b"a")
+            ),
+        )
+    with pytest.raises(InvalidChunk):
+        _process_chunks(
+            first_chunk,
+            ReadRowsResponse.CellChunk(
+                ReadRowsResponse.CellChunk(reset_row=True, family_name="f")
+            ),
+        )
+    with pytest.raises(InvalidChunk):
+        _process_chunks(
+            first_chunk,
+            ReadRowsResponse.CellChunk(
+                ReadRowsResponse.CellChunk(reset_row=True, qualifier=b"q")
+            ),
+        )
+    with pytest.raises(InvalidChunk):
+        _process_chunks(
+            first_chunk,
+            ReadRowsResponse.CellChunk(
+                ReadRowsResponse.CellChunk(reset_row=True, timestamp_micros=1000)
+            ),
+        )
+    with pytest.raises(InvalidChunk):
+        _process_chunks(
+            first_chunk,
+            ReadRowsResponse.CellChunk(
+                ReadRowsResponse.CellChunk(reset_row=True, labels=["a"])
+            ),
+        )
+    with pytest.raises(InvalidChunk):
+        _process_chunks(
+            first_chunk,
+            ReadRowsResponse.CellChunk(
+                ReadRowsResponse.CellChunk(reset_row=True, value=b"v")
+            ),
+        )
+
+
+def test_missing_family():
+    with pytest.raises(InvalidChunk):
+        _process_chunks(
+            ReadRowsResponse.CellChunk(
+                row_key=b"a",
+                qualifier=b"q",
+                timestamp_micros=1000,
+                value=b"v", 
+ commit_row=True, + ) + ) + + +def test_mid_cell_row_key_change(): + with pytest.raises(InvalidChunk): + _process_chunks( + ReadRowsResponse.CellChunk( + row_key=b"a", + family_name="f", + qualifier=b"q", + timestamp_micros=1000, + value_size=2, + value=b"v", + ), + ReadRowsResponse.CellChunk(row_key=b"b", value=b"v", commit_row=True), + ) + + +def test_mid_cell_family_change(): + with pytest.raises(InvalidChunk): + _process_chunks( + ReadRowsResponse.CellChunk( + row_key=b"a", + family_name="f", + qualifier=b"q", + timestamp_micros=1000, + value_size=2, + value=b"v", + ), + ReadRowsResponse.CellChunk(family_name="f2", value=b"v", commit_row=True), + ) + + +def test_mid_cell_qualifier_change(): + with pytest.raises(InvalidChunk): + _process_chunks( + ReadRowsResponse.CellChunk( + row_key=b"a", + family_name="f", + qualifier=b"q", + timestamp_micros=1000, + value_size=2, + value=b"v", + ), + ReadRowsResponse.CellChunk(qualifier=b"q2", value=b"v", commit_row=True), + ) + + +def test_mid_cell_timestamp_change(): + with pytest.raises(InvalidChunk): + _process_chunks( + ReadRowsResponse.CellChunk( + row_key=b"a", + family_name="f", + qualifier=b"q", + timestamp_micros=1000, + value_size=2, + value=b"v", + ), + ReadRowsResponse.CellChunk( + timestamp_micros=2000, value=b"v", commit_row=True + ), + ) + + +def test_mid_cell_labels_change(): + with pytest.raises(InvalidChunk): + _process_chunks( + ReadRowsResponse.CellChunk( + row_key=b"a", + family_name="f", + qualifier=b"q", + timestamp_micros=1000, + value_size=2, + value=b"v", + ), + ReadRowsResponse.CellChunk(labels=["b"], value=b"v", commit_row=True), + ) + + +def _process_chunks(*chunks): + req = ReadRowsResponse.pb(ReadRowsResponse(chunks=chunks)) + return list(_RowMerger().process_chunks(req)) From bdc7929b948f40b83dccb88ddb57312df230989a Mon Sep 17 00:00:00 2001 From: "release-please[bot]" <55107282+release-please[bot]@users.noreply.github.com> Date: Wed, 17 Aug 2022 13:50:26 -0700 Subject: [PATCH 4/4] chore(main): release 2.11.3 (#638) Co-authored-by: release-please[bot] <55107282+release-please[bot]@users.noreply.github.com> --- CHANGELOG.md | 7 +++++++ setup.py | 2 +- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4b635e9c2..5783517a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,13 @@ [1]: https://pypi.org/project/google-cloud-bigtable/#history +## [2.11.3](https://github.com/googleapis/python-bigtable/compare/v2.11.2...v2.11.3) (2022-08-17) + + +### Performance Improvements + +* optimize row merging ([#628](https://github.com/googleapis/python-bigtable/issues/628)) ([c71ec70](https://github.com/googleapis/python-bigtable/commit/c71ec70e55f6e236e46127870a9ed4717eee5da5)) + ## [2.11.2](https://github.com/googleapis/python-bigtable/compare/v2.11.1...v2.11.2) (2022-08-11) diff --git a/setup.py b/setup.py index af5b7359b..2c98c154b 100644 --- a/setup.py +++ b/setup.py @@ -22,7 +22,7 @@ name = "google-cloud-bigtable" description = "Google Cloud Bigtable API client library" -version = "2.11.2" +version = "2.11.3" # Should be one of: # 'Development Status :: 3 - Alpha' # 'Development Status :: 4 - Beta'
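
Usage sketch (not part of the patch series): a minimal, illustrative example of
how the `_RowMerger` introduced in PATCH 3/4 turns a stream of ReadRows chunks
into PartialRowData rows, modeled on the `_process_chunks` helper in
tests/unit/test_row_merger.py. `_RowMerger` is a private helper, so its import
path and behavior may change; the row key, family, qualifier, and values below
are made up for illustration.

    from google.cloud.bigtable.row_merger import _RowMerger
    from google.cloud.bigtable_v2.types.bigtable import ReadRowsResponse

    # One logical cell split across two chunks: a non-zero value_size on the
    # first chunk signals that the cell's value continues in the next chunk.
    chunks = [
        ReadRowsResponse.CellChunk(
            row_key=b"row-1",
            family_name="cf1",
            qualifier=b"greeting",
            timestamp_micros=1000,
            value=b"hello ",
            value_size=11,
        ),
        ReadRowsResponse.CellChunk(value=b"world", commit_row=True),
    ]

    # process_chunks() expects the raw protobuf form of the response, so
    # unwrap the proto-plus object the same way the unit tests do.
    response = ReadRowsResponse.pb(ReadRowsResponse(chunks=chunks))

    merger = _RowMerger()
    for row in merger.process_chunks(response):
        # row is a PartialRowData; cells is {family: {qualifier: [Cell, ...]}}
        for family, columns in row.cells.items():
            for qualifier, cells in columns.items():
                for cell in cells:
                    print(family, qualifier, cell.value)  # cf1 b'greeting' b'hello world'

    # Raises ValueError if the stream ended in the middle of a row.
    merger.finalize()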
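
Backwards-compatibility sketch (likewise illustrative): PATCH 2/4 re-exports the
moved classes from row_data, so pre-existing imports keep resolving to the same
objects after the move.

    from google.cloud.bigtable.row_data import Cell as legacy_cell
    from google.cloud.bigtable.row import Cell as moved_cell

    # row_data now imports Cell from row, so both names refer to one class.
    assert legacy_cell is moved_cell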