From e82bce4a460ccdd2d8d05f30873157fe7e396f2c Mon Sep 17 00:00:00 2001 From: abbrowne126 <81702808+abbrowne126@users.noreply.github.com> Date: Tue, 15 Jul 2025 16:12:13 -0400 Subject: [PATCH 01/11] fix: if otel is enabled, run user callback under process span --- .../subscriber/_protocol/streaming_pull_manager.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 486a728b4..552afc035 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -148,8 +148,10 @@ def _wrap_callback_errors( try: if message.opentelemetry_data: message.opentelemetry_data.end_subscribe_concurrency_control_span() - message.opentelemetry_data.start_process_span() - callback(message) + with message.opentelemetry_data.start_process_span(): + callback(message) + else: + callback(message) except BaseException as exc: # Note: the likelihood of this failing is extremely low. This just adds # a message to a queue, so if this doesn't work the world is in an From b55b5e432f526aef6df08bf03c107de908f213e4 Mon Sep 17 00:00:00 2001 From: abbrowne126 <81702808+abbrowne126@users.noreply.github.com> Date: Tue, 15 Jul 2025 16:25:34 -0400 Subject: [PATCH 02/11] Update subscribe_opentelemetry.py --- .../cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py b/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py index 88870be60..595734d14 100644 --- a/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py +++ b/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py @@ -160,7 +160,7 @@ def end_subscribe_scheduler_span(self) -> None: assert self._scheduler_span is not None self._scheduler_span.end() - def start_process_span(self) -> None: + def start_process_span(self) -> trace.Span: assert self._subscribe_span is not None tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME) publish_create_span_link: Optional[trace.Link] = None @@ -186,6 +186,7 @@ def start_process_span(self) -> None: end_on_exit=False, ) as process_span: self._process_span = process_span + return process_span def end_process_span(self) -> None: assert self._process_span is not None From a265032a037aaf3328eafe126ddc5f9d77f8c399 Mon Sep 17 00:00:00 2001 From: abbrowne126 <81702808+abbrowne126@users.noreply.github.com> Date: Tue, 15 Jul 2025 17:24:11 -0400 Subject: [PATCH 03/11] Update test_streaming_pull_manager.py --- tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 86d2461e7..1069d77d2 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -2956,7 +2956,7 @@ def test_opentelemetry_subscriber_concurrency_control_span_end(span_exporter): streaming_pull_manager._wrap_callback_errors(mock.Mock(), mock.Mock(), msg) spans = span_exporter.get_finished_spans() - assert len(spans) == 1 + assert len(spans) == 2 concurrency_control_span = spans[0] concurrency_control_span.name == "subscriber concurrency control" From 87f1826094c8e3db058e4bb896b0da8ca6a2aa3f Mon Sep 17 00:00:00 2001 From: abbrowne126 <81702808+abbrowne126@users.noreply.github.com> Date: Tue, 15 Jul 2025 17:27:13 -0400 Subject: [PATCH 04/11] Update test_streaming_pull_manager.py --- tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 1069d77d2..f45959637 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -2959,7 +2959,7 @@ def test_opentelemetry_subscriber_concurrency_control_span_end(span_exporter): assert len(spans) == 2 concurrency_control_span = spans[0] - concurrency_control_span.name == "subscriber concurrency control" + assert concurrency_control_span.name == "subscriber concurrency control" def test_opentelemetry_wrap_callback_error(span_exporter): From 0dd9115644cf8e694f75c320719a922adac70c96 Mon Sep 17 00:00:00 2001 From: abbrowne126 <81702808+abbrowne126@users.noreply.github.com> Date: Tue, 22 Jul 2025 14:06:44 -0400 Subject: [PATCH 05/11] use context manager to allow child spans of the message processing span --- .../pubsub_v1/open_telemetry/subscribe_opentelemetry.py | 6 ++++++ .../subscriber/_protocol/streaming_pull_manager.py | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py b/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py index 595734d14..6c9a016a7 100644 --- a/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py +++ b/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py @@ -201,6 +201,12 @@ def add_process_span_event(self, event: str) -> None: }, ) + def __enter__(self) -> trace.Span: + return self.start_process_span() + + def __exit__(self, exc_type, exc_val, traceback): + if self._process_span: + self.end_process_span() def start_modack_span( subscribe_span_links: List[trace.Link], diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 552afc035..618c5a6d0 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -148,7 +148,7 @@ def _wrap_callback_errors( try: if message.opentelemetry_data: message.opentelemetry_data.end_subscribe_concurrency_control_span() - with message.opentelemetry_data.start_process_span(): + with message.opentelemetry_data as otel_span: callback(message) else: callback(message) From 77e6057e5cff33f072422c91dd5055c14f28480c Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Tue, 22 Jul 2025 18:08:58 +0000 Subject: [PATCH 06/11] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py | 1 + 1 file changed, 1 insertion(+) diff --git a/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py b/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py index 6c9a016a7..5a6abd21b 100644 --- a/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py +++ b/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py @@ -208,6 +208,7 @@ def __exit__(self, exc_type, exc_val, traceback): if self._process_span: self.end_process_span() + def start_modack_span( subscribe_span_links: List[trace.Link], subscription_id: Optional[str], From 8f71624e3d9264362a0a85d936cc6157d25946bc Mon Sep 17 00:00:00 2001 From: abbrowne126 <81702808+abbrowne126@users.noreply.github.com> Date: Tue, 22 Jul 2025 14:09:56 -0400 Subject: [PATCH 07/11] Update subscribe_opentelemetry.py --- .../pubsub_v1/open_telemetry/subscribe_opentelemetry.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py b/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py index 6c9a016a7..12c562184 100644 --- a/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py +++ b/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py @@ -168,9 +168,9 @@ def start_process_span(self) -> trace.Span: publish_create_span: trace.Span = trace.get_current_span( self._publisher_create_span_context ) - span_context: Optional[ - trace.SpanContext - ] = publish_create_span.get_span_context() + span_context: Optional[trace.SpanContext] = ( + publish_create_span.get_span_context() + ) publish_create_span_link = ( trace.Link(span_context) if span_context else None ) @@ -208,6 +208,7 @@ def __exit__(self, exc_type, exc_val, traceback): if self._process_span: self.end_process_span() + def start_modack_span( subscribe_span_links: List[trace.Link], subscription_id: Optional[str], From 8768bff4adae2441ef978cf6818e93a4c76be555 Mon Sep 17 00:00:00 2001 From: Owl Bot Date: Tue, 22 Jul 2025 18:12:14 +0000 Subject: [PATCH 08/11] =?UTF-8?q?=F0=9F=A6=89=20Updates=20from=20OwlBot=20?= =?UTF-8?q?post-processor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md --- .../pubsub_v1/open_telemetry/subscribe_opentelemetry.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py b/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py index 12c562184..5a6abd21b 100644 --- a/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py +++ b/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py @@ -168,9 +168,9 @@ def start_process_span(self) -> trace.Span: publish_create_span: trace.Span = trace.get_current_span( self._publisher_create_span_context ) - span_context: Optional[trace.SpanContext] = ( - publish_create_span.get_span_context() - ) + span_context: Optional[ + trace.SpanContext + ] = publish_create_span.get_span_context() publish_create_span_link = ( trace.Link(span_context) if span_context else None ) From 17b7c034d98ae84b27f3aa464bc7d00820c17a8c Mon Sep 17 00:00:00 2001 From: abbrowne126 <81702808+abbrowne126@users.noreply.github.com> Date: Tue, 22 Jul 2025 14:16:18 -0400 Subject: [PATCH 09/11] Update noxfile.py --- noxfile.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/noxfile.py b/noxfile.py index dd182f105..e1701ef1b 100644 --- a/noxfile.py +++ b/noxfile.py @@ -34,7 +34,7 @@ MYPY_VERSION = "mypy==1.10.0" -DEFAULT_PYTHON_VERSION = "3.8" +DEFAULT_PYTHON_VERSION = "3.13" UNIT_TEST_PYTHON_VERSIONS: List[str] = [ "3.7", From 8daa2d1f98277fc9cab87cee71b4babbe2685265 Mon Sep 17 00:00:00 2001 From: abbrowne126 <81702808+abbrowne126@users.noreply.github.com> Date: Tue, 22 Jul 2025 14:16:32 -0400 Subject: [PATCH 10/11] Update noxfile.py --- noxfile.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/noxfile.py b/noxfile.py index e1701ef1b..dd182f105 100644 --- a/noxfile.py +++ b/noxfile.py @@ -34,7 +34,7 @@ MYPY_VERSION = "mypy==1.10.0" -DEFAULT_PYTHON_VERSION = "3.13" +DEFAULT_PYTHON_VERSION = "3.8" UNIT_TEST_PYTHON_VERSIONS: List[str] = [ "3.7", From a3d308dc2ab5f621c393e8f761f7bd1b0aa8dcda Mon Sep 17 00:00:00 2001 From: abbrowne126 <81702808+abbrowne126@users.noreply.github.com> Date: Thu, 24 Jul 2025 16:34:49 -0400 Subject: [PATCH 11/11] Update streaming_pull_manager.py --- .../pubsub_v1/subscriber/_protocol/streaming_pull_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 618c5a6d0..1307f6b5d 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -1281,7 +1281,7 @@ def _on_fatal_exception(self, exception: BaseException) -> None: Called whenever `self.consumer` receives a non-retryable exception. We close the manager on such non-retryable cases. """ - _LOGGER.exception( + _LOGGER.info( "Streaming pull terminating after receiving non-recoverable error: %s", exception, ) @@ -1326,7 +1326,7 @@ def _should_terminate(self, exception: BaseException) -> bool: is_api_error = isinstance(exception, exceptions.GoogleAPICallError) # Terminate any non-API errors, or non-retryable errors (permission denied, unauthorized, etc.) if not is_api_error or isinstance(exception, _TERMINATING_STREAM_ERRORS): - _LOGGER.error("Observed terminating stream error %s", exception) + _LOGGER.debug("Observed terminating stream error %s", exception) return True _LOGGER.debug("Observed non-terminating stream error %s", exception) return False