-
Notifications
You must be signed in to change notification settings - Fork 62
feat: read_modify_write and check_and_mutate_row #780
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
daniel-sanche
merged 62 commits into
googleapis:v3
from
daniel-sanche:mutate_rows_other_rpcs
Jun 16, 2023
Merged
Changes from 60 commits
Commits
Show all changes
62 commits
Select commit
Hold shift + click to select a range
1d02154
added initial implementation of mutate_rows
daniel-sanche ab63cba
implemented mutation models
daniel-sanche cf9daa5
added retries to mutate_row
daniel-sanche 1247da4
return exception group if possible
daniel-sanche 3b3ed8c
check for idempotence
daniel-sanche 5d20037
initial implementation for bulk_mutations
daniel-sanche 3d322a1
include successes in bulk mutation error message
daniel-sanche a31232b
fixed style checks
daniel-sanche 8da2d65
added basic system tests
daniel-sanche 2b89d9c
added unit tests for mutate_row
daniel-sanche 47c5985
ran blacken
daniel-sanche 38fdcd7
improved exceptions
daniel-sanche 504d2d8
added bulk_mutate_rows unit tests
daniel-sanche b16067f
ran blacken
daniel-sanche 3ab1405
support __new___ for exceptions for python3.11+
daniel-sanche 0a6c0c6
added exception unit tests
daniel-sanche ec043cf
makde exceptions tuple
daniel-sanche 518530e
got exceptions to print consistently across versions
daniel-sanche 9624729
added test for 311 rich traceback
daniel-sanche 3087081
moved retryable row mutations to new file
daniel-sanche 9df588f
use index map
daniel-sanche 7ed8be3
added docstring
daniel-sanche 2536cc4
added predicate check to failed mutations
daniel-sanche 1f6875c
added _mutate_rows tests
daniel-sanche 1ea24e6
improved client tests
daniel-sanche 25ca2d2
refactored to loop by raising exception
daniel-sanche c0787db
refactored retry deadline logic into shared wrapper
daniel-sanche 3ed5c3d
ran black
daniel-sanche a91fbcb
pulled in table default timeouts
daniel-sanche df8a058
added tests for shared deadline parsing function
daniel-sanche b866b57
added tests for mutation models
daniel-sanche 54a4d43
fixed linter errors
daniel-sanche bd51dc4
added tests for BulkMutationsEntry
daniel-sanche 921b05a
improved mutations documentation
daniel-sanche 82ea61f
refactored mutate_rows logic into helper function
daniel-sanche fa42b86
implemented callbacks for mutate_rows
daniel-sanche 01a16f3
made exceptions into a tuple
daniel-sanche e6df77e
improved and tested read_modify_write_rules models
daniel-sanche 2d8ee3f
implemented read_modify_write
daniel-sanche af77dc3
added unit tests
daniel-sanche ebe2f94
added system test
daniel-sanche 8af5c71
added test for large values
daniel-sanche 1242836
allow string for append value rule
daniel-sanche afe839c
added append value system test
daniel-sanche d0781d0
added chained value system test
daniel-sanche ef30977
support creating SetValueMutation with int
daniel-sanche 6140acb
remove aborted from retryable errors
daniel-sanche 36ba2b6
improved SetCell mutation
daniel-sanche b3c9017
fixed mutations tests
daniel-sanche cac9e2d
SetCell timestamps use millisecond precision
daniel-sanche 34b051f
renamed BulkMutationsEntry to RowMutationEntry
daniel-sanche baf3378
implemented check_and_mutate
daniel-sanche bad11e5
added system tests
daniel-sanche 1d79202
fixed test issues
daniel-sanche 63ac35c
Merge branch 'v3' into mutate_rows
daniel-sanche 4138c89
Merge branch 'mutate_rows' into mutate_rows_other_rpcs
daniel-sanche 3c27fb7
Merge branch 'v3' into mutate_rows_other_rpcs
daniel-sanche b9b9dac
adjusted tests; require kwargs for check_and_mutate
daniel-sanche 234ea6c
added metadata
daniel-sanche fb818d4
clean up
daniel-sanche c9cebc2
changed timeout values
daniel-sanche ef8879e
Merge branch 'v3' into mutate_rows_other_rpcs
daniel-sanche File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -55,11 +55,12 @@ | |
| from google.cloud.bigtable._helpers import _make_metadata | ||
| from google.cloud.bigtable._helpers import _convert_retry_deadline | ||
|
|
||
| from google.cloud.bigtable.read_modify_write_rules import ReadModifyWriteRule | ||
| from google.cloud.bigtable.row_filters import RowFilter | ||
|
|
||
| if TYPE_CHECKING: | ||
| from google.cloud.bigtable.mutations_batcher import MutationsBatcher | ||
| from google.cloud.bigtable import RowKeySamples | ||
| from google.cloud.bigtable.row_filters import RowFilter | ||
| from google.cloud.bigtable.read_modify_write_rules import ReadModifyWriteRule | ||
|
|
||
|
|
||
| class BigtableDataClient(ClientWithProject): | ||
|
|
@@ -742,10 +743,11 @@ async def bulk_mutate_rows( | |
| async def check_and_mutate_row( | ||
| self, | ||
| row_key: str | bytes, | ||
| predicate: RowFilter | None, | ||
| predicate: RowFilter | dict[str, Any] | None, | ||
| *, | ||
| true_case_mutations: Mutation | list[Mutation] | None = None, | ||
| false_case_mutations: Mutation | list[Mutation] | None = None, | ||
| operation_timeout: int | float | None = 60, | ||
| operation_timeout: int | float | None = 600, | ||
| ) -> bool: | ||
| """ | ||
| Mutates a row atomically based on the output of a predicate filter | ||
|
|
@@ -779,17 +781,43 @@ async def check_and_mutate_row( | |
| Raises: | ||
| - GoogleAPIError exceptions from grpc call | ||
| """ | ||
| raise NotImplementedError | ||
| operation_timeout = operation_timeout or self.default_operation_timeout | ||
| if operation_timeout <= 0: | ||
| raise ValueError("operation_timeout must be greater than 0") | ||
| row_key = row_key.encode("utf-8") if isinstance(row_key, str) else row_key | ||
| if true_case_mutations is not None and not isinstance( | ||
| true_case_mutations, list | ||
| ): | ||
| true_case_mutations = [true_case_mutations] | ||
| true_case_dict = [m._to_dict() for m in true_case_mutations or []] | ||
| if false_case_mutations is not None and not isinstance( | ||
| false_case_mutations, list | ||
| ): | ||
| false_case_mutations = [false_case_mutations] | ||
| false_case_dict = [m._to_dict() for m in false_case_mutations or []] | ||
| if predicate is not None and not isinstance(predicate, dict): | ||
| predicate = predicate.to_dict() | ||
| metadata = _make_metadata(self.table_name, self.app_profile_id) | ||
| result = await self.client._gapic_client.check_and_mutate_row( | ||
| request={ | ||
| "predicate_filter": predicate, | ||
| "true_mutations": true_case_dict, | ||
| "false_mutations": false_case_dict, | ||
| "table_name": self.table_name, | ||
| "row_key": row_key, | ||
| "app_profile_id": self.app_profile_id, | ||
| }, | ||
| metadata=metadata, | ||
| timeout=operation_timeout, | ||
| ) | ||
| return result.predicate_matched | ||
|
|
||
| async def read_modify_write_row( | ||
| self, | ||
| row_key: str | bytes, | ||
| rules: ReadModifyWriteRule | ||
| | list[ReadModifyWriteRule] | ||
| | dict[str, Any] | ||
| | list[dict[str, Any]], | ||
| rules: ReadModifyWriteRule | list[ReadModifyWriteRule], | ||
| *, | ||
| operation_timeout: int | float | None = 60, | ||
| operation_timeout: int | float | None = 600, | ||
|
||
| ) -> Row: | ||
| """ | ||
| Reads and modifies a row atomically according to input ReadModifyWriteRules, | ||
|
|
@@ -813,7 +841,29 @@ async def read_modify_write_row( | |
| Raises: | ||
| - GoogleAPIError exceptions from grpc call | ||
| """ | ||
| raise NotImplementedError | ||
| operation_timeout = operation_timeout or self.default_operation_timeout | ||
| row_key = row_key.encode("utf-8") if isinstance(row_key, str) else row_key | ||
| if operation_timeout <= 0: | ||
| raise ValueError("operation_timeout must be greater than 0") | ||
| if rules is not None and not isinstance(rules, list): | ||
| rules = [rules] | ||
| if not rules: | ||
| raise ValueError("rules must contain at least one item") | ||
| # concert to dict representation | ||
| rules_dict = [rule._to_dict() for rule in rules] | ||
| metadata = _make_metadata(self.table_name, self.app_profile_id) | ||
| result = await self.client._gapic_client.read_modify_write_row( | ||
| request={ | ||
| "rules": rules_dict, | ||
| "table_name": self.table_name, | ||
| "row_key": row_key, | ||
| "app_profile_id": self.app_profile_id, | ||
| }, | ||
| metadata=metadata, | ||
| timeout=operation_timeout, | ||
| ) | ||
| # construct Row from result | ||
| return Row._from_pb(result.row) | ||
|
|
||
| async def close(self): | ||
| """ | ||
|
|
||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
10 minutes seems too long.. the java default is only 20 seconds. https://cloud.google.com/java/docs/reference/google-cloud-bigtable/latest/com.google.cloud.bigtable.data.v2.stub.EnhancedBigtableStubSettings#com_google_cloud_bigtable_data_v2_stub_EnhancedBigtableStubSettings_checkAndMutateRowSettings__
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed, but FYI I will likely be changing the timeout structure after all the rpc PRs are merged: #782