From 0961efb54f1f585b86bac4de4641e7bd94509923 Mon Sep 17 00:00:00 2001 From: Tigran Najaryan <4194920+tigrannajaryan@users.noreply.github.com> Date: Tue, 2 Mar 2021 13:43:58 -0500 Subject: [PATCH] Fix Shutdown behavior for batchprocessor (#2537) I added a Shutdown() test that does basic verification of the behavior of the Shutdown() function. More verifications can be added later. The test revealed a bug in batchprocessor Shutdown() function which would not wait until all pending data was drained. --- component/componenttest/shutdown_verifier.go | 69 +++++++++++++++++++ processor/batchprocessor/batch_processor.go | 3 + .../batchprocessor/batch_processor_test.go | 5 ++ 3 files changed, 77 insertions(+) create mode 100644 component/componenttest/shutdown_verifier.go diff --git a/component/componenttest/shutdown_verifier.go b/component/componenttest/shutdown_verifier.go new file mode 100644 index 00000000000..5a6d97481a1 --- /dev/null +++ b/component/componenttest/shutdown_verifier.go @@ -0,0 +1,69 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package componenttest + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configerror" + "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/internal/testdata" +) + +func verifyTraceProcessorDoesntProduceAfterShutdown(t *testing.T, factory component.ProcessorFactory, cfg configmodels.Processor) { + // Create a processor and output its produce to a sink. + nextSink := new(consumertest.TracesSink) + processor, err := factory.CreateTracesProcessor( + context.Background(), + component.ProcessorCreateParams{Logger: zap.NewNop()}, + cfg, + nextSink, + ) + if err != nil { + if err == configerror.ErrDataTypeIsNotSupported { + return + } + require.NoError(t, err) + } + err = processor.Start(context.Background(), NewNopHost()) + assert.NoError(t, err) + + // Send some traces to the processor. + const generatedCount = 10 + for i := 0; i < generatedCount; i++ { + processor.ConsumeTraces(context.Background(), testdata.GenerateTraceDataOneSpan()) + } + + // Now shutdown the processor. + err = processor.Shutdown(context.Background()) + assert.NoError(t, err) + + // The Shutdown() is done. It means the processor must have sent everything we + // gave it to the next sink. + assert.EqualValues(t, generatedCount, nextSink.SpansCount()) +} + +func VerifyProcessorShutdown(t *testing.T, factory component.ProcessorFactory, cfg configmodels.Processor) { + verifyTraceProcessorDoesntProduceAfterShutdown(t, factory, cfg) + // TODO: add metrics and logs verification. + // TODO: add other shutdown verifications. +} diff --git a/processor/batchprocessor/batch_processor.go b/processor/batchprocessor/batch_processor.go index 66fbdcf53b8..4374f4837ed 100644 --- a/processor/batchprocessor/batch_processor.go +++ b/processor/batchprocessor/batch_processor.go @@ -108,6 +108,8 @@ func (bp *batchProcessor) Start(context.Context, component.Host) error { // Shutdown is invoked during service shutdown. func (bp *batchProcessor) Shutdown(context.Context) error { bp.cancel() + + // Wait until current batch is drained. <-bp.done return nil } @@ -132,6 +134,7 @@ func (bp *batchProcessor) startProcessingCycle() { // make it cancellable using the context that Shutdown gets as a parameter bp.sendItems(statTimeoutTriggerSend) } + // Indicate that we finished draining. close(bp.done) return case item := <-bp.newItem: diff --git a/processor/batchprocessor/batch_processor_test.go b/processor/batchprocessor/batch_processor_test.go index b21bbe3dff8..9a6b95ec3c7 100644 --- a/processor/batchprocessor/batch_processor_test.go +++ b/processor/batchprocessor/batch_processor_test.go @@ -689,3 +689,8 @@ func logsReceivedByName(lds []pdata.Logs) map[string]pdata.LogRecord { } return logsReceivedByName } + +func TestShutdown(t *testing.T) { + factory := NewFactory() + componenttest.VerifyProcessorShutdown(t, factory, factory.CreateDefaultConfig()) +}