Skip to content
Prev Previous commit
Next Next commit
fixed typing issues
  • Loading branch information
daniel-sanche committed Oct 26, 2023
commit e31d765fd03bf4495c7cbdbe57a8c04ca7339bb7
12 changes: 5 additions & 7 deletions google/cloud/bigtable/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

from typing import List, Tuple

from google.cloud.bigtable import gapic_version as package_version

from google.cloud.bigtable.data._async.client import BigtableDataClientAsync
Expand Down Expand Up @@ -44,10 +41,10 @@
from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup
from google.cloud.bigtable.data.exceptions import ShardedReadRowsExceptionGroup

# Type alias for the output of sample_keys
RowKeySamples = List[Tuple[bytes, int]]
# type alias for the output of query.shard()
ShardedQuery = List[ReadRowsQuery]
from google.cloud.bigtable.data._helpers import TABLE_DEFAULT
from google.cloud.bigtable.data._helpers import RowKeySamples
from google.cloud.bigtable.data._helpers import ShardedQuery


__version__: str = package_version.__version__

Expand All @@ -74,4 +71,5 @@
"MutationsExceptionGroup",
"ShardedReadRowsExceptionGroup",
"ShardedQuery",
"TABLE_DEFAULT",
)
32 changes: 9 additions & 23 deletions google/cloud/bigtable/data/_async/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
AsyncIterable,
Optional,
Set,
Literal,
TYPE_CHECKING,
)

Expand All @@ -32,9 +31,7 @@
import sys
import random
import os
import enum

from collections import namedtuple

from google.cloud.bigtable_v2.services.bigtable.client import BigtableClientMeta
from google.cloud.bigtable_v2.services.bigtable.async_client import BigtableAsyncClient
Expand All @@ -61,37 +58,26 @@

from google.cloud.bigtable.data.mutations import Mutation, RowMutationEntry
from google.cloud.bigtable.data._async._mutate_rows import _MutateRowsOperationAsync
from google.cloud.bigtable.data._helpers import TABLE_DEFAULT
from google.cloud.bigtable.data._helpers import _WarmedInstanceKey
from google.cloud.bigtable.data._helpers import _CONCURRENCY_LIMIT
from google.cloud.bigtable.data._helpers import _make_metadata
from google.cloud.bigtable.data._helpers import _convert_retry_deadline
from google.cloud.bigtable.data._helpers import _validate_timeouts
from google.cloud.bigtable.data._helpers import _get_timeouts
from google.cloud.bigtable.data._helpers import _attempt_timeout_generator
from google.cloud.bigtable.data._async.mutations_batcher import MutationsBatcherAsync
from google.cloud.bigtable.data._async.mutations_batcher import _MB_SIZE
from google.cloud.bigtable.data._helpers import _attempt_timeout_generator

from google.cloud.bigtable.data.read_modify_write_rules import ReadModifyWriteRule
from google.cloud.bigtable.data.row_filters import RowFilter
from google.cloud.bigtable.data.row_filters import StripValueTransformerFilter
from google.cloud.bigtable.data.row_filters import CellsRowLimitFilter
from google.cloud.bigtable.data.row_filters import RowFilterChain

if TYPE_CHECKING:
from google.cloud.bigtable.data import RowKeySamples
from google.cloud.bigtable.data import ShardedQuery

# used by read_rows_sharded to limit how many requests are attempted in parallel
_CONCURRENCY_LIMIT = 10

# used to register instance data with the client for channel warming
_WarmedInstanceKey = namedtuple(
"_WarmedInstanceKey", ["instance_name", "table_name", "app_profile_id"]
)


class TABLE_DEFAULT(enum.Enum):
DEFAULT = "DEFAULT"
READ_ROWS = "READ_ROWS_DEFAULT"
MUTATE_ROWS = "MUTATE_ROWS_DEFAULT"
if TYPE_CHECKING:
from google.cloud.bigtable.data._types import RowKeySamples
from google.cloud.bigtable.data._types import ShardedQuery


class BigtableDataClientAsync(ClientWithProject):
Expand Down Expand Up @@ -1077,7 +1063,7 @@ async def check_and_mutate_row(
Raises:
- GoogleAPIError exceptions from grpc call
"""
operation_timeout._ = _get_timeouts(operation_timeout, None, self)
operation_timeout, _ = _get_timeouts(operation_timeout, None, self)
row_key = row_key.encode("utf-8") if isinstance(row_key, str) else row_key
if true_case_mutations is not None and not isinstance(
true_case_mutations, list
Expand Down Expand Up @@ -1136,7 +1122,7 @@ async def read_modify_write_row(
Raises:
- GoogleAPIError exceptions from grpc call
"""
operation_timeout._ = _get_timeouts(operation_timeout, None, self)
operation_timeout, _ = _get_timeouts(operation_timeout, None, self)
row_key = row_key.encode("utf-8") if isinstance(row_key, str) else row_key
if operation_timeout <= 0:
raise ValueError("operation_timeout must be greater than 0")
Expand Down
7 changes: 4 additions & 3 deletions google/cloud/bigtable/data/_async/mutations_batcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#
from __future__ import annotations

from typing import Any, Literal, TYPE_CHECKING
from typing import Any, TYPE_CHECKING
import asyncio
import atexit
import warnings
Expand All @@ -24,6 +24,7 @@
from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup
from google.cloud.bigtable.data.exceptions import FailedMutationEntryError
from google.cloud.bigtable.data._helpers import _get_timeouts
from google.cloud.bigtable.data._helpers import TABLE_DEFAULT

from google.cloud.bigtable.data._async._mutate_rows import _MutateRowsOperationAsync
from google.cloud.bigtable.data._async._mutate_rows import (
Expand Down Expand Up @@ -189,8 +190,8 @@ def __init__(
flush_limit_bytes: int = 20 * _MB_SIZE,
flow_control_max_mutation_count: int = 100_000,
flow_control_max_bytes: int = 100 * _MB_SIZE,
batch_operation_timeout: float | "_TABLE_DEFAULT" = "MUTATE_ROWS_DEFAULT",
batch_attempt_timeout: float | None | "_TABLE_DEFAULT" = "MUTATE_ROWS_DEFAULT",
batch_operation_timeout: float | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS,
batch_attempt_timeout: float | None | TABLE_DEFAULT = TABLE_DEFAULT.MUTATE_ROWS,
):
"""
Args:
Expand Down
38 changes: 32 additions & 6 deletions google/cloud/bigtable/data/_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@
#
from __future__ import annotations

from typing import Callable, Literal, Any
from typing import Callable, List, Tuple, Any
import time
import enum
from collections import namedtuple
from google.cloud.bigtable.data.read_rows_query import ReadRowsQuery

from google.api_core import exceptions as core_exceptions
from google.cloud.bigtable.data.exceptions import RetryExceptionGroup
Expand All @@ -23,6 +26,30 @@
Helper functions used in various places in the library.
"""

# Type alias for the output of sample_keys
RowKeySamples = List[Tuple[bytes, int]]

# Type alias for the output of query.shard()
ShardedQuery = List[ReadRowsQuery]

# used by read_rows_sharded to limit how many requests are attempted in parallel
_CONCURRENCY_LIMIT = 10

# used to register instance data with the client for channel warming
_WarmedInstanceKey = namedtuple(
"_WarmedInstanceKey", ["instance_name", "table_name", "app_profile_id"]
)


# enum used on method calls when table defaults should be used
class TABLE_DEFAULT(enum.Enum):
    """Sentinel enum passed in place of a timeout value to indicate that the
    table's configured default timeout should be used.

    The three members map to the three groups of default timeout attributes
    consulted by ``_get_timeouts`` (operation/attempt defaults, read_rows
    defaults, and mutate_rows defaults).
    """

    # default for mutate_row, sample_row_keys, check_and_mutate_row, and read_modify_write_row
    DEFAULT = "DEFAULT"
    # default for read_rows, read_rows_stream, read_rows_sharded, row_exists, and read_row
    READ_ROWS = "READ_ROWS_DEFAULT"
    # default for bulk_mutate_rows and mutations_batcher
    MUTATE_ROWS = "MUTATE_ROWS_DEFAULT"


def _make_metadata(
table_name: str, app_profile_id: str | None
Expand Down Expand Up @@ -115,15 +142,14 @@ def wrapper(*args, **kwargs):


def _get_timeouts(
operation: float | "_TABLE_DEFAULT", attempt: float | None | "_TABLE_DEFAULT", table
operation: float | TABLE_DEFAULT, attempt: float | None | TABLE_DEFAULT, table
) -> tuple[float, float]:
# TODO: docstring
# TODO: use enum for _TABLE_DEFAULT
if operation == "DEFAULT":
if operation == TABLE_DEFAULT.DEFAULT:
final_operation = table.default_operation_timeout
elif operation == "READ_ROWS_DEFAULT":
elif operation == TABLE_DEFAULT.READ_ROWS:
final_operation = table.default_read_rows_operation_timeout
elif operation == "MUTATE_ROWS_DEFAULT":
elif operation == TABLE_DEFAULT.MUTATE_ROWS:
final_operation = table.default_mutate_rows_operation_timeout
else:
final_operation = operation
Expand Down