Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@
_validate_timeouts,
_WarmedInstanceKey,
)
from google.cloud.bigtable.data._metrics import BigtableClientSideMetricsController
from google.cloud.bigtable.data._metrics import (
BigtableClientSideMetricsController,
OperationType,
tracked_retry,
)
from google.cloud.bigtable.data.exceptions import (
FailedQueryShardError,
ShardedReadRowsExceptionGroup,
Expand Down Expand Up @@ -1431,26 +1435,28 @@ async def sample_row_keys(
retryable_excs = _get_retryable_errors(retryable_errors, self)
predicate = retries.if_exception_type(*retryable_excs)

sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)

@CrossSync.convert
async def execute_rpc():
results = await self.client._gapic_client.sample_row_keys(
request=SampleRowKeysRequest(
app_profile_id=self.app_profile_id, **self._request_path
),
timeout=next(attempt_timeout_gen),
retry=None,
with self._metrics.create_operation(
OperationType.SAMPLE_ROW_KEYS
) as operation_metric:

@CrossSync.convert
async def execute_rpc():
results = await self.client._gapic_client.sample_row_keys(
request=SampleRowKeysRequest(
app_profile_id=self.app_profile_id, **self._request_path
),
timeout=next(attempt_timeout_gen),
retry=None,
)
return [(s.row_key, s.offset_bytes) async for s in results]

return await tracked_retry(
retry_fn=CrossSync.retry_target,
operation=operation_metric,
target=execute_rpc,
predicate=predicate,
timeout=operation_timeout,
)
Comment thread
daniel-sanche marked this conversation as resolved.
return [(s.row_key, s.offset_bytes) async for s in results]

return await CrossSync.retry_target(
execute_rpc,
predicate,
sleep_generator,
operation_timeout,
exception_factory=_retry_exception_factory,
)

@CrossSync.convert(replace_symbols={"MutationsBatcherAsync": "MutationsBatcher"})
def mutations_batcher(
Expand Down Expand Up @@ -1561,28 +1567,29 @@ async def mutate_row(
# mutations should not be retried
predicate = retries.if_exception_type()

sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)

target = partial(
self.client._gapic_client.mutate_row,
request=MutateRowRequest(
row_key=row_key.encode("utf-8")
if isinstance(row_key, str)
else row_key,
mutations=[mutation._to_pb() for mutation in mutations_list],
app_profile_id=self.app_profile_id,
**self._request_path,
),
timeout=attempt_timeout,
retry=None,
)
return await CrossSync.retry_target(
target,
predicate,
sleep_generator,
operation_timeout,
exception_factory=_retry_exception_factory,
)
with self._metrics.create_operation(
OperationType.MUTATE_ROW
) as operation_metric:
target = partial(
self.client._gapic_client.mutate_row,
request=MutateRowRequest(
row_key=row_key.encode("utf-8")
if isinstance(row_key, str)
else row_key,
mutations=[mutation._to_pb() for mutation in mutations_list],
app_profile_id=self.app_profile_id,
**self._request_path,
),
timeout=attempt_timeout,
retry=None,
)
return await tracked_retry(
retry_fn=CrossSync.retry_target,
operation=operation_metric,
target=target,
predicate=predicate,
timeout=operation_timeout,
)
Comment thread
daniel-sanche marked this conversation as resolved.

@CrossSync.convert
async def bulk_mutate_rows(
Expand Down Expand Up @@ -1693,21 +1700,25 @@ async def check_and_mutate_row(
):
false_case_mutations = [false_case_mutations]
false_case_list = [m._to_pb() for m in false_case_mutations or []]
result = await self.client._gapic_client.check_and_mutate_row(
request=CheckAndMutateRowRequest(
true_mutations=true_case_list,
false_mutations=false_case_list,
predicate_filter=predicate._to_pb() if predicate is not None else None,
row_key=row_key.encode("utf-8")
if isinstance(row_key, str)
else row_key,
app_profile_id=self.app_profile_id,
**self._request_path,
),
timeout=operation_timeout,
retry=None,
)
return result.predicate_matched

with self._metrics.create_operation(OperationType.CHECK_AND_MUTATE):
result = await self.client._gapic_client.check_and_mutate_row(
request=CheckAndMutateRowRequest(
true_mutations=true_case_list,
false_mutations=false_case_list,
predicate_filter=predicate._to_pb()
if predicate is not None
else None,
row_key=row_key.encode("utf-8")
if isinstance(row_key, str)
else row_key,
app_profile_id=self.app_profile_id,
**self._request_path,
),
timeout=operation_timeout,
retry=None,
)
return result.predicate_matched

@CrossSync.convert
async def read_modify_write_row(
Expand Down Expand Up @@ -1747,20 +1758,22 @@ async def read_modify_write_row(
rules = [rules]
if not rules:
raise ValueError("rules must contain at least one item")
result = await self.client._gapic_client.read_modify_write_row(
request=ReadModifyWriteRowRequest(
rules=[rule._to_pb() for rule in rules],
row_key=row_key.encode("utf-8")
if isinstance(row_key, str)
else row_key,
app_profile_id=self.app_profile_id,
**self._request_path,
),
timeout=operation_timeout,
retry=None,
)
# construct Row from result
return Row._from_pb(result.row)

with self._metrics.create_operation(OperationType.READ_MODIFY_WRITE):
result = await self.client._gapic_client.read_modify_write_row(
request=ReadModifyWriteRowRequest(
rules=[rule._to_pb() for rule in rules],
row_key=row_key.encode("utf-8")
if isinstance(row_key, str)
else row_key,
app_profile_id=self.app_profile_id,
**self._request_path,
),
timeout=operation_timeout,
retry=None,
)
# construct Row from result
return Row._from_pb(result.row)

@CrossSync.convert
async def close(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ def _retry_exception_factory(
tuple[Exception, Exception|None]:
tuple of the exception to raise, and a cause exception if applicable
"""
exc_list = exc_list.copy()
if reason == RetryFailureReason.TIMEOUT:
timeout_val_str = f"of {timeout_val:0.1f}s " if timeout_val is not None else ""
# if failed due to timeout, raise deadline exceeded as primary exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@
_validate_timeouts,
_WarmedInstanceKey,
)
from google.cloud.bigtable.data._metrics import BigtableClientSideMetricsController
from google.cloud.bigtable.data._metrics import (
BigtableClientSideMetricsController,
OperationType,
tracked_retry,
)
from google.cloud.bigtable.data.exceptions import (
FailedQueryShardError,
ShardedReadRowsExceptionGroup,
Expand Down Expand Up @@ -1182,25 +1186,27 @@ def sample_row_keys(
)
retryable_excs = _get_retryable_errors(retryable_errors, self)
predicate = retries.if_exception_type(*retryable_excs)
sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)

def execute_rpc():
results = self.client._gapic_client.sample_row_keys(
request=SampleRowKeysRequest(
app_profile_id=self.app_profile_id, **self._request_path
),
timeout=next(attempt_timeout_gen),
retry=None,
with self._metrics.create_operation(
OperationType.SAMPLE_ROW_KEYS
) as operation_metric:

def execute_rpc():
results = self.client._gapic_client.sample_row_keys(
request=SampleRowKeysRequest(
app_profile_id=self.app_profile_id, **self._request_path
),
timeout=next(attempt_timeout_gen),
retry=None,
)
return [(s.row_key, s.offset_bytes) for s in results]

return tracked_retry(
retry_fn=CrossSync._Sync_Impl.retry_target,
operation=operation_metric,
target=execute_rpc,
predicate=predicate,
timeout=operation_timeout,
)
Comment thread
daniel-sanche marked this conversation as resolved.
return [(s.row_key, s.offset_bytes) for s in results]

return CrossSync._Sync_Impl.retry_target(
execute_rpc,
predicate,
sleep_generator,
operation_timeout,
exception_factory=_retry_exception_factory,
)

def mutations_batcher(
self,
Expand Down Expand Up @@ -1301,27 +1307,29 @@ def mutate_row(
)
else:
predicate = retries.if_exception_type()
sleep_generator = retries.exponential_sleep_generator(0.01, 2, 60)
target = partial(
self.client._gapic_client.mutate_row,
request=MutateRowRequest(
row_key=row_key.encode("utf-8")
if isinstance(row_key, str)
else row_key,
mutations=[mutation._to_pb() for mutation in mutations_list],
app_profile_id=self.app_profile_id,
**self._request_path,
),
timeout=attempt_timeout,
retry=None,
)
return CrossSync._Sync_Impl.retry_target(
target,
predicate,
sleep_generator,
operation_timeout,
exception_factory=_retry_exception_factory,
)
with self._metrics.create_operation(
OperationType.MUTATE_ROW
) as operation_metric:
target = partial(
self.client._gapic_client.mutate_row,
request=MutateRowRequest(
row_key=row_key.encode("utf-8")
if isinstance(row_key, str)
else row_key,
mutations=[mutation._to_pb() for mutation in mutations_list],
app_profile_id=self.app_profile_id,
**self._request_path,
),
timeout=attempt_timeout,
retry=None,
)
return tracked_retry(
retry_fn=CrossSync._Sync_Impl.retry_target,
operation=operation_metric,
target=target,
predicate=predicate,
timeout=operation_timeout,
)
Comment thread
daniel-sanche marked this conversation as resolved.

def bulk_mutate_rows(
self,
Expand Down Expand Up @@ -1425,21 +1433,24 @@ def check_and_mutate_row(
):
false_case_mutations = [false_case_mutations]
false_case_list = [m._to_pb() for m in false_case_mutations or []]
result = self.client._gapic_client.check_and_mutate_row(
request=CheckAndMutateRowRequest(
true_mutations=true_case_list,
false_mutations=false_case_list,
predicate_filter=predicate._to_pb() if predicate is not None else None,
row_key=row_key.encode("utf-8")
if isinstance(row_key, str)
else row_key,
app_profile_id=self.app_profile_id,
**self._request_path,
),
timeout=operation_timeout,
retry=None,
)
return result.predicate_matched
with self._metrics.create_operation(OperationType.CHECK_AND_MUTATE):
result = self.client._gapic_client.check_and_mutate_row(
request=CheckAndMutateRowRequest(
true_mutations=true_case_list,
false_mutations=false_case_list,
predicate_filter=predicate._to_pb()
if predicate is not None
else None,
row_key=row_key.encode("utf-8")
if isinstance(row_key, str)
else row_key,
app_profile_id=self.app_profile_id,
**self._request_path,
),
timeout=operation_timeout,
retry=None,
)
return result.predicate_matched

def read_modify_write_row(
self,
Expand Down Expand Up @@ -1476,19 +1487,20 @@ def read_modify_write_row(
rules = [rules]
if not rules:
raise ValueError("rules must contain at least one item")
result = self.client._gapic_client.read_modify_write_row(
request=ReadModifyWriteRowRequest(
rules=[rule._to_pb() for rule in rules],
row_key=row_key.encode("utf-8")
if isinstance(row_key, str)
else row_key,
app_profile_id=self.app_profile_id,
**self._request_path,
),
timeout=operation_timeout,
retry=None,
)
return Row._from_pb(result.row)
with self._metrics.create_operation(OperationType.READ_MODIFY_WRITE):
result = self.client._gapic_client.read_modify_write_row(
request=ReadModifyWriteRowRequest(
rules=[rule._to_pb() for rule in rules],
row_key=row_key.encode("utf-8")
if isinstance(row_key, str)
else row_key,
app_profile_id=self.app_profile_id,
**self._request_path,
),
timeout=operation_timeout,
retry=None,
)
return Row._from_pb(result.row)

def close(self):
"""Called to close the Table instance and release any resources held by it."""
Expand Down
4 changes: 0 additions & 4 deletions packages/google-cloud-bigtable/tests/system/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@
script_path = os.path.dirname(os.path.realpath(__file__))
sys.path.append(script_path)

pytest_plugins = [
"data.setup_fixtures",
]


@pytest.fixture(scope="session")
def event_loop():
Expand Down
Loading
Loading