Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,13 @@
_RESUME_THRESHOLD = 0.8
"""The load threshold below which to resume the incoming message stream."""

_DEFAULT_STREAM_ACK_DEADLINE = 60
"""The default message acknowledge deadline in seconds for incoming message stream.

This default deadline is dynamically modified for the messages that are added
to the lease management.
"""


def _maybe_wrap_exception(exception):
"""Wraps a gRPC exception class, if needed."""
Expand Down Expand Up @@ -384,8 +391,17 @@ def open(self, callback, on_callback_error):
)

# Create the RPC
subscription = self._client.api.get_subscription(self._subscription)
stream_ack_deadline_seconds = subscription.ack_deadline_seconds

# We must use a fixed value for the ACK deadline, as we cannot read it
# from the subscription. The latter would require `pubsub.subscriptions.get`
# permission, which is not granted to the default subscriber role
# `roles/pubsub.subscriber`.
# See also https://github.com/googleapis/google-cloud-python/issues/9339
#
# When dynamic lease management is enabled for the "on hold" messages,
# the default stream ACK deadline should again be set based on the
# historic ACK timing data, i.e. `self.ack_histogram.percentile(99)`.
stream_ack_deadline_seconds = _DEFAULT_STREAM_ACK_DEADLINE

get_initial_request = functools.partial(
self._get_initial_request, stream_ack_deadline_seconds
Expand Down
16 changes: 10 additions & 6 deletions pubsub/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,10 @@ class CallbackError(Exception):
with pytest.raises(CallbackError):
future.result(timeout=30)

@pytest.mark.xfail(
reason="The default stream ACK deadline is static and received messages "
"exceeding FlowControl.max_messages are currently not lease managed."
)
def test_streaming_pull_ack_deadline(
self, publisher, subscriber, project, topic_path, subscription_path, cleanup
):
Expand All @@ -395,29 +399,29 @@ def test_streaming_pull_ack_deadline(
# Subscribe to the topic. This must happen before the messages
# are published.
subscriber.create_subscription(
subscription_path, topic_path, ack_deadline_seconds=60
subscription_path, topic_path, ack_deadline_seconds=240
)

# publish some messages and wait for completion
self._publish_messages(publisher, topic_path, batch_sizes=[2])

# subscribe to the topic
callback = StreamingPullCallback(
processing_time=15, # more than the default ACK deadline of 10 seconds
processing_time=70, # more than the default stream ACK deadline (60s)
resolve_at_msg_count=3, # one more than the published messages count
)
flow_control = types.FlowControl(max_messages=1)
subscription_future = subscriber.subscribe(
subscription_path, callback, flow_control=flow_control
)

# We expect to process the first two messages in 2 * 15 seconds, and
# We expect to process the first two messages in 2 * 70 seconds, and
# any duplicate message that is re-sent by the backend in additional
# 15 seconds, totalling 45 seconds (+ overhead) --> if there have been
# no duplicates in 60 seconds, we can reasonably assume that there
# 70 seconds, totalling 210 seconds (+ overhead) --> if there have been
# no duplicates in 240 seconds, we can reasonably assume that there
# won't be any.
try:
callback.done_future.result(timeout=60)
callback.done_future.result(timeout=240)
except exceptions.TimeoutError:
# future timed out, because we received no excessive messages
assert sorted(callback.seen_message_ids) == [1, 2]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,12 +404,9 @@ def test_heartbeat_inactive():
"google.cloud.pubsub_v1.subscriber._protocol.heartbeater.Heartbeater", autospec=True
)
def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bidi_rpc):
stream_ack_deadline = streaming_pull_manager._DEFAULT_STREAM_ACK_DEADLINE

manager = make_manager()
manager._client.api.get_subscription.return_value = types.Subscription(
name="projects/foo/subscriptions/bar",
topic="projects/foo/topics/baz",
ack_deadline_seconds=123,
)

manager.open(mock.sentinel.callback, mock.sentinel.on_callback_error)

Expand Down Expand Up @@ -437,7 +434,8 @@ def test_open(heartbeater, dispatcher, leaser, background_consumer, resumable_bi
)
initial_request_arg = resumable_bidi_rpc.call_args.kwargs["initial_request"]
assert initial_request_arg.func == manager._get_initial_request
assert initial_request_arg.args[0] == 123
assert initial_request_arg.args[0] == stream_ack_deadline
assert not manager._client.api.get_subscription.called

resumable_bidi_rpc.return_value.add_done_callback.assert_called_once_with(
manager._on_rpc_done
Expand Down