Merged
Changes from 1 commit
Commits
59 commits
b5b62c8
feat!: add new v3.0.0 API skeleton (#745)
daniel-sanche Mar 14, 2023
7d51eeb
chore: merge branch 'main' into v3
daniel-sanche Mar 15, 2023
507da99
feat: improve rows filters (#751)
daniel-sanche Mar 23, 2023
71b0312
feat: read rows query model class (#752)
daniel-sanche Apr 3, 2023
c55099f
feat: implement row and cell model classes (#753)
daniel-sanche Apr 5, 2023
f9a1907
feat: add pooled grpc transport (#748)
daniel-sanche Apr 24, 2023
3de7a68
feat: implement read_rows (#762)
daniel-sanche May 24, 2023
9b81289
feat: implement mutate rows (#769)
daniel-sanche Jun 6, 2023
ec3fd01
feat: literal value filter (#767)
daniel-sanche Jun 6, 2023
5d65703
feat: row_exists and read_row (#778)
daniel-sanche Jun 16, 2023
432d159
feat: read_modify_write and check_and_mutate_row (#780)
daniel-sanche Jun 16, 2023
ec2b983
feat: sharded read rows (#766)
daniel-sanche Jun 23, 2023
ceaf598
feat: ping and warm with metadata (#810)
daniel-sanche Jun 26, 2023
1ecf65f
feat: mutate rows batching (#770)
daniel-sanche Jun 26, 2023
eedde1e
chore: restructure module paths (#816)
daniel-sanche Jun 28, 2023
07438ca
feat: improve timeout structure (#819)
daniel-sanche Jun 29, 2023
0d92a84
fix: api errors apply to all bulk mutations
daniel-sanche Jul 24, 2023
a8cdf7c
chore: reduce public api surface (#820)
daniel-sanche Aug 1, 2023
aa760b2
feat: improve error group tracebacks on < py11 (#825)
daniel-sanche Aug 16, 2023
0323dde
feat: optimize read_rows (#852)
daniel-sanche Aug 17, 2023
0b3606f
chore: add user agent suffix (#842)
daniel-sanche Aug 17, 2023
b6d232a
feat: optimize retries (#854)
daniel-sanche Aug 17, 2023
8708a25
feat: add test proxy (#836)
daniel-sanche Aug 18, 2023
1d3a7c1
chore(tests): add conformance tests to CI for v3 (#870)
daniel-sanche Oct 16, 2023
50531e5
chore(tests): turn off fast fail for conformance tets (#882)
daniel-sanche Oct 26, 2023
8ff1216
feat: add TABLE_DEFAULTS enum for table method arguments (#880)
daniel-sanche Oct 26, 2023
94bfe66
fix: pass None for retry in gapic calls (#881)
daniel-sanche Oct 27, 2023
3ac80a9
feat: replace internal dictionaries with protos in gapic calls (#875)
daniel-sanche Nov 22, 2023
b191451
chore: optimize gapic calls (#863)
daniel-sanche Dec 1, 2023
285cdd3
feat: expose retryable error codes to users (#879)
daniel-sanche Dec 1, 2023
9342e27
chore: update api_core submodule (#897)
daniel-sanche Dec 15, 2023
858b93a
chore: merge main into experimental_v3 (#900)
daniel-sanche Dec 15, 2023
cc79d8c
chore: pin conformance tests to v0.0.2 (#903)
daniel-sanche Dec 15, 2023
f0d75de
fix: bulk mutation eventual success (#909)
daniel-sanche Dec 19, 2023
cdfcc2a
chore: merge branch 'main'
daniel-sanche Jan 18, 2024
5e11180
chore: move batcher file
daniel-sanche Jan 18, 2024
35996a8
chore: Merge branch 'main'
daniel-sanche Jan 18, 2024
54dbc08
chore(tests): fixed failing test
daniel-sanche Jan 18, 2024
fd4fba3
chore: downgraded system_emulated
daniel-sanche Jan 18, 2024
f4ac54f
fix: use default project in emulator mode
daniel-sanche Jan 18, 2024
79157c9
chore(tests): system tests support emulator
daniel-sanche Jan 18, 2024
b5c533b
chore(tests): use legacy client to spin up instance in system tests
daniel-sanche Jan 18, 2024
54462e2
chore(tests): revert back to gapic clients
daniel-sanche Jan 18, 2024
aeea5c6
fixed support for system_emulated
daniel-sanche Jan 18, 2024
ef29512
chore: ran blacken
daniel-sanche Jan 19, 2024
3b5b033
chore(tests): use emulator mode in github actions unit tests
daniel-sanche Jan 19, 2024
73171eb
run unit tests in emulator mode by default
daniel-sanche Jan 19, 2024
6fa49ab
iterating on tests
daniel-sanche Jan 19, 2024
52ec52a
use the same client init function in unit tests
daniel-sanche Jan 19, 2024
b2bf56f
use _make_client in more places
daniel-sanche Jan 19, 2024
7931069
iterating on tests
daniel-sanche Jan 19, 2024
8b886b3
changed cover requirement
daniel-sanche Jan 19, 2024
c3ed5aa
updated README
daniel-sanche Jan 19, 2024
158ceed
updated owlbot file
daniel-sanche Jan 19, 2024
22e1947
updated api_core version in setup.py
daniel-sanche Jan 19, 2024
9e60999
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Jan 19, 2024
1a32de2
Revert "updated README"
daniel-sanche Jan 27, 2024
cab3694
fixed versions
daniel-sanche Jan 29, 2024
952d91c
remove conformance from kokoro main run
daniel-sanche Jan 29, 2024
feat: improve timeout structure (#819)
daniel-sanche authored Jun 29, 2023
commit 07438ca301f2bdbb3945b6330c5bb4c7c687747d
4 changes: 2 additions & 2 deletions .github/.OwlBot.lock.yaml
@@ -13,5 +13,5 @@
# limitations under the License.
docker:
image: gcr.io/cloud-devrel-public-resources/owlbot-python:latest
digest: sha256:240b5bcc2bafd450912d2da2be15e62bc6de2cf839823ae4bf94d4f392b451dc
# created: 2023-06-03T21:25:37.968717478Z
digest: sha256:ddf4551385d566771dc713090feb7b4c1164fb8a698fe52bbe7670b24236565b
# created: 2023-06-27T13:04:21.96690344Z
8 changes: 4 additions & 4 deletions google/cloud/bigtable/data/_async/_mutate_rows.py
@@ -52,15 +52,15 @@ def __init__(
table: "TableAsync",
mutation_entries: list["RowMutationEntry"],
operation_timeout: float,
per_request_timeout: float | None,
attempt_timeout: float | None,
):
"""
Args:
- gapic_client: the client to use for the mutate_rows call
- table: the table associated with the request
- mutation_entries: a list of RowMutationEntry objects to send to the server
- operation_timeout: the timeout t o use for the entire operation, in seconds.
- per_request_timeout: the timeoutto use for each mutate_rows attempt, in seconds.
- operation_timeout: the timeout to use for the entire operation, in seconds.
- attempt_timeout: the timeout to use for each mutate_rows attempt, in seconds.
If not specified, the request will run until operation_timeout is reached.
"""
# check that mutations are within limits
@@ -99,7 +99,7 @@ def __init__(
self._operation = _convert_retry_deadline(retry_wrapped, operation_timeout)
# initialize state
self.timeout_generator = _attempt_timeout_generator(
per_request_timeout, operation_timeout
attempt_timeout, operation_timeout
)
self.mutations = mutation_entries
self.remaining_indices = list(range(len(self.mutations)))
6 changes: 3 additions & 3 deletions google/cloud/bigtable/data/_async/_read_rows.py
@@ -63,14 +63,14 @@ def __init__(
client: BigtableAsyncClient,
*,
operation_timeout: float = 600.0,
per_request_timeout: float | None = None,
attempt_timeout: float | None = None,
):
"""
Args:
- request: the request dict to send to the Bigtable API
- client: the Bigtable client to use to make the request
- operation_timeout: the timeout to use for the entire operation, in seconds
- per_request_timeout: the timeout to use when waiting for each individual grpc request, in seconds
- attempt_timeout: the timeout to use when waiting for each individual grpc request, in seconds
If not specified, defaults to operation_timeout
"""
self._last_emitted_row_key: bytes | None = None
@@ -79,7 +79,7 @@
self.operation_timeout = operation_timeout
# use generator to lower per-attempt timeout as we approach operation_timeout deadline
attempt_timeout_gen = _attempt_timeout_generator(
per_request_timeout, operation_timeout
attempt_timeout, operation_timeout
)
row_limit = request.get("rows_limit", 0)
# lock in paramters for retryable wrapper
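The `_attempt_timeout_generator` used in the hunk above shrinks the per-attempt timeout as the overall operation deadline approaches, so a late retry can never outlive the operation. A minimal standalone sketch of that pattern (the function name and signature here are illustrative, not the library's internal implementation):

```python
import time
from typing import Generator, Optional


def attempt_timeout_generator(
    attempt_timeout: Optional[float], operation_timeout: float
) -> Generator[float, None, None]:
    """Yield a per-attempt timeout, capped by the time remaining
    before the overall operation deadline."""
    if attempt_timeout is None:
        # mirror the documented default: fall back to the operation timeout
        attempt_timeout = operation_timeout
    deadline = time.monotonic() + operation_timeout
    while True:
        # each yielded value shrinks as the operation deadline approaches,
        # and never goes below zero once the deadline has passed
        yield max(0.0, min(attempt_timeout, deadline - time.monotonic()))
```

Each retry attempt pulls the next value from the generator, so the final attempt is automatically clipped to whatever budget is left.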
368 changes: 233 additions & 135 deletions google/cloud/bigtable/data/_async/client.py

Large diffs are not rendered by default.

31 changes: 13 additions & 18 deletions google/cloud/bigtable/data/_async/mutations_batcher.py
@@ -23,6 +23,7 @@
from google.cloud.bigtable.data.mutations import RowMutationEntry
from google.cloud.bigtable.data.exceptions import MutationsExceptionGroup
from google.cloud.bigtable.data.exceptions import FailedMutationEntryError
from google.cloud.bigtable.data._helpers import _validate_timeouts

from google.cloud.bigtable.data._async._mutate_rows import _MutateRowsOperationAsync
from google.cloud.bigtable.data._async._mutate_rows import (
@@ -189,7 +190,7 @@ def __init__(
flow_control_max_mutation_count: int = 100_000,
flow_control_max_bytes: int = 100 * _MB_SIZE,
batch_operation_timeout: float | None = None,
batch_per_request_timeout: float | None = None,
batch_attempt_timeout: float | None = None,
):
"""
Args:
@@ -202,26 +203,20 @@
- flow_control_max_mutation_count: Maximum number of inflight mutations.
- flow_control_max_bytes: Maximum number of inflight bytes.
- batch_operation_timeout: timeout for each mutate_rows operation, in seconds. If None,
table default_operation_timeout will be used
- batch_per_request_timeout: timeout for each individual request, in seconds. If None,
table default_per_request_timeout will be used
table default_mutate_rows_operation_timeout will be used
- batch_attempt_timeout: timeout for each individual request, in seconds. If None,
table default_mutate_rows_attempt_timeout will be used, or batch_operation_timeout
if that is also None.
"""
self._operation_timeout: float = (
batch_operation_timeout or table.default_operation_timeout
batch_operation_timeout or table.default_mutate_rows_operation_timeout
)
self._per_request_timeout: float = (
batch_per_request_timeout
or table.default_per_request_timeout
self._attempt_timeout: float = (
batch_attempt_timeout
or table.default_mutate_rows_attempt_timeout
or self._operation_timeout
)
if self._operation_timeout <= 0:
raise ValueError("batch_operation_timeout must be greater than 0")
if self._per_request_timeout <= 0:
raise ValueError("batch_per_request_timeout must be greater than 0")
if self._per_request_timeout > self._operation_timeout:
raise ValueError(
"batch_per_request_timeout must be less than batch_operation_timeout"
)
_validate_timeouts(self._operation_timeout, self._attempt_timeout)
self.closed: bool = False
self._table = table
self._staged_entries: list[RowMutationEntry] = []
@@ -346,7 +341,7 @@ async def _execute_mutate_rows(

Args:
- batch: list of RowMutationEntry objects to send to server
- timeout: timeout in seconds. Used as operation_timeout and per_request_timeout.
- timeout: timeout in seconds. Used as operation_timeout and attempt_timeout.
If not given, will use table defaults
Returns:
- list of FailedMutationEntryError objects for mutations that failed.
@@ -361,7 +356,7 @@
self._table,
batch,
operation_timeout=self._operation_timeout,
per_request_timeout=self._per_request_timeout,
attempt_timeout=self._attempt_timeout,
)
await operation.start()
except MutationsExceptionGroup as e:
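The batcher hunk above replaces two independent timeout defaults with a fallback chain: an explicit argument wins, then the table's mutate-rows default, and for the attempt timeout finally the resolved operation timeout. That chain, extracted into a standalone helper purely for illustration (the function name is not part of the library):

```python
def resolve_batch_timeouts(
    batch_operation_timeout,
    batch_attempt_timeout,
    table_default_operation_timeout,
    table_default_attempt_timeout,
):
    """Resolve batcher timeouts using the fallback chain from the diff above."""
    # operation timeout: explicit argument, else the table default
    operation = batch_operation_timeout or table_default_operation_timeout
    # attempt timeout: explicit argument, else table default,
    # else fall back to the resolved operation timeout
    attempt = batch_attempt_timeout or table_default_attempt_timeout or operation
    return operation, attempt
```

Note that `or` treats `None` (and `0`) as "unset" here, which is why the subsequent validation step must still reject non-positive values.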
23 changes: 23 additions & 0 deletions google/cloud/bigtable/data/_helpers.py
@@ -109,3 +109,26 @@ def wrapper(*args, **kwargs):
handle_error()

return wrapper_async if iscoroutinefunction(func) else wrapper


def _validate_timeouts(
operation_timeout: float, attempt_timeout: float | None, allow_none: bool = False
):
"""
Helper function that will verify that timeout values are valid, and raise
an exception if they are not.

Args:
- operation_timeout: The timeout value to use for the entire operation, in seconds.
- attempt_timeout: The timeout value to use for each attempt, in seconds.
- allow_none: If True, attempt_timeout can be None. If False, None values will raise an exception.
Raises:
- ValueError if operation_timeout or attempt_timeout are invalid.
"""
if operation_timeout <= 0:
raise ValueError("operation_timeout must be greater than 0")
if not allow_none and attempt_timeout is None:
raise ValueError("attempt_timeout must not be None")
elif attempt_timeout is not None:
if attempt_timeout <= 0:
raise ValueError("attempt_timeout must be greater than 0")
3 changes: 1 addition & 2 deletions noxfile.py
@@ -366,10 +366,9 @@ def docfx(session):

session.install("-e", ".")
session.install(
"sphinx==4.0.1",
"gcp-sphinx-docfx-yaml",
"alabaster",
"recommonmark",
"gcp-sphinx-docfx-yaml",
)

shutil.rmtree(os.path.join("docs", "_build"), ignore_errors=True)
4 changes: 2 additions & 2 deletions tests/unit/data/_async/test__mutate_rows.py
@@ -48,7 +48,7 @@ def _make_one(self, *args, **kwargs):
kwargs["table"] = kwargs.pop("table", AsyncMock())
kwargs["mutation_entries"] = kwargs.pop("mutation_entries", [])
kwargs["operation_timeout"] = kwargs.pop("operation_timeout", 5)
kwargs["per_request_timeout"] = kwargs.pop("per_request_timeout", 0.1)
kwargs["attempt_timeout"] = kwargs.pop("attempt_timeout", 0.1)
return self._target_class()(*args, **kwargs)

async def _mock_stream(self, mutation_list, error_dict):
@@ -267,7 +267,7 @@ async def test_run_attempt_single_entry_success(self):
mock_gapic_fn = self._make_mock_gapic({0: mutation})
instance = self._make_one(
mutation_entries=[mutation],
per_request_timeout=expected_timeout,
attempt_timeout=expected_timeout,
)
with mock.patch.object(instance, "_gapic_fn", mock_gapic_fn):
await instance._run_attempt()
2 changes: 1 addition & 1 deletion tests/unit/data/_async/test__read_rows.py
@@ -89,7 +89,7 @@ def test_ctor(self):
request,
client,
operation_timeout=expected_operation_timeout,
per_request_timeout=expected_request_timeout,
attempt_timeout=expected_request_timeout,
)
assert time_gen_mock.call_count == 1
time_gen_mock.assert_called_once_with(
Expand Down