diff --git a/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py b/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py index 88870be60..5a6abd21b 100644 --- a/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py +++ b/google/cloud/pubsub_v1/open_telemetry/subscribe_opentelemetry.py @@ -160,7 +160,7 @@ def end_subscribe_scheduler_span(self) -> None: assert self._scheduler_span is not None self._scheduler_span.end() - def start_process_span(self) -> None: + def start_process_span(self) -> trace.Span: assert self._subscribe_span is not None tracer = trace.get_tracer(_OPEN_TELEMETRY_TRACER_NAME) publish_create_span_link: Optional[trace.Link] = None @@ -186,6 +186,7 @@ def start_process_span(self) -> None: end_on_exit=False, ) as process_span: self._process_span = process_span + return process_span def end_process_span(self) -> None: assert self._process_span is not None @@ -200,6 +201,13 @@ def add_process_span_event(self, event: str) -> None: }, ) + def __enter__(self) -> trace.Span: + return self.start_process_span() + + def __exit__(self, exc_type, exc_val, traceback): + if self._process_span: + self.end_process_span() + def start_modack_span( subscribe_span_links: List[trace.Link], diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index 486a728b4..1307f6b5d 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -148,8 +148,10 @@ def _wrap_callback_errors( try: if message.opentelemetry_data: message.opentelemetry_data.end_subscribe_concurrency_control_span() - message.opentelemetry_data.start_process_span() - callback(message) + with message.opentelemetry_data as otel_span: + callback(message) + else: + callback(message) except BaseException as exc: # Note: the likelihood of this failing is extremely low. This just adds # a message to a queue, so if this doesn't work the world is in an @@ -1279,7 +1281,7 @@ def _on_fatal_exception(self, exception: BaseException) -> None: Called whenever `self.consumer` receives a non-retryable exception. We close the manager on such non-retryable cases. """ - _LOGGER.exception( + _LOGGER.info( "Streaming pull terminating after receiving non-recoverable error: %s", exception, ) @@ -1324,7 +1326,7 @@ def _should_terminate(self, exception: BaseException) -> bool: is_api_error = isinstance(exception, exceptions.GoogleAPICallError) # Terminate any non-API errors, or non-retryable errors (permission denied, unauthorized, etc.) if not is_api_error or isinstance(exception, _TERMINATING_STREAM_ERRORS): - _LOGGER.error("Observed terminating stream error %s", exception) + _LOGGER.debug("Observed terminating stream error %s", exception) return True _LOGGER.debug("Observed non-terminating stream error %s", exception) return False diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 86d2461e7..f45959637 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -2956,10 +2956,10 @@ def test_opentelemetry_subscriber_concurrency_control_span_end(span_exporter): streaming_pull_manager._wrap_callback_errors(mock.Mock(), mock.Mock(), msg) spans = span_exporter.get_finished_spans() - assert len(spans) == 1 + assert len(spans) == 2 concurrency_control_span = spans[0] - concurrency_control_span.name == "subscriber concurrency control" + assert concurrency_control_span.name == "subscriber concurrency control" def test_opentelemetry_wrap_callback_error(span_exporter):