forked from open-telemetry/opentelemetry-collector
-
Notifications
You must be signed in to change notification settings - Fork 0
/
processor.go
233 lines (199 loc) · 8.95 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//       http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package component // import "go.opentelemetry.io/collector/component"
import (
"context"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/consumer"
)
// Processor defines the common functions that must be implemented by
// TracesProcessor, MetricsProcessor and LogsProcessor.
type Processor interface {
	// Component is embedded, so every Processor is also a Component.
	Component
}
// TracesProcessor is a processor that can consume traces.
// It combines the Processor interface with consumer.Traces.
type TracesProcessor interface {
	Processor
	consumer.Traces
}
// MetricsProcessor is a processor that can consume metrics.
// It combines the Processor interface with consumer.Metrics.
type MetricsProcessor interface {
	Processor
	consumer.Metrics
}
// LogsProcessor is a processor that can consume logs.
// It combines the Processor interface with consumer.Logs.
type LogsProcessor interface {
	Processor
	consumer.Logs
}
// ProcessorCreateSettings is passed to Create* functions in ProcessorFactory.
type ProcessorCreateSettings struct {
	// TelemetrySettings is embedded; its members are promoted onto
	// ProcessorCreateSettings.
	TelemetrySettings
	// BuildInfo can be used by components for informational purposes.
	BuildInfo BuildInfo
}
// ProcessorFactory is the Factory interface for processors.
//
// This interface cannot be directly implemented. Implementations must
// be obtained through NewProcessorFactory.
type ProcessorFactory interface {
	Factory

	// CreateDefaultConfig returns the default configuration for the Processor.
	// It may be invoked several times, depending on the pipeline
	// configuration, so it must not cause side effects that would prevent
	// multiple instances of the Processor from being created.
	// The returned object has to pass the checks implemented by
	// 'configtest.CheckConfigStruct'; running those checks in the tests of
	// every implementation of the Factory interface is recommended.
	CreateDefaultConfig() config.Processor

	// CreateTracesProcessor builds a traces processor from the given config.
	// An error is returned instead when the processor type does not support
	// tracing or when the config is not valid.
	CreateTracesProcessor(ctx context.Context, set ProcessorCreateSettings, cfg config.Processor, nextConsumer consumer.Traces) (TracesProcessor, error)

	// CreateMetricsProcessor builds a metrics processor from the given config.
	// An error is returned instead when the processor type does not support
	// metrics or when the config is not valid.
	CreateMetricsProcessor(ctx context.Context, set ProcessorCreateSettings, cfg config.Processor, nextConsumer consumer.Metrics) (MetricsProcessor, error)

	// CreateLogsProcessor builds a logs processor from the given config.
	// An error is returned instead when the processor type does not support
	// logs or when the config is not valid.
	CreateLogsProcessor(ctx context.Context, set ProcessorCreateSettings, cfg config.Processor, nextConsumer consumer.Logs) (LogsProcessor, error)
}
// ProcessorCreateDefaultConfigFunc is the equivalent of ProcessorFactory.CreateDefaultConfig().
// It is a plain function type that takes no arguments and returns a config.Processor.
type ProcessorCreateDefaultConfigFunc func() config.Processor
// CreateDefaultConfig implements ProcessorFactory.CreateDefaultConfig() by
// invoking the underlying function and handing back whatever it returns.
// There is no nil guard here: calling this method on a nil function panics.
func (fn ProcessorCreateDefaultConfigFunc) CreateDefaultConfig() config.Processor {
	defaultCfg := fn()
	return defaultCfg
}
// ProcessorFactoryOption applies changes to the processorFactory that is
// being configured. Its only method is unexported, so it can only be
// implemented inside this package.
type ProcessorFactoryOption interface {
	// applyProcessorFactoryOption applies the option to the given factory.
	applyProcessorFactoryOption(o *processorFactory)
}
// Compile-time check that *processorFactoryOptionFunc implements
// ProcessorFactoryOption (the value-receiver method is part of the pointer
// type's method set as well).
var _ ProcessorFactoryOption = (*processorFactoryOptionFunc)(nil)
// processorFactoryOptionFunc is a ProcessorFactoryOption created through a function.
// The function receives the factory that the option should modify.
type processorFactoryOptionFunc func(*processorFactory)
// applyProcessorFactoryOption implements ProcessorFactoryOption by calling
// the wrapped function with the factory being configured.
func (fn processorFactoryOptionFunc) applyProcessorFactoryOption(factory *processorFactory) {
	fn(factory)
}
// CreateTracesProcessorFunc is the function-typed counterpart of
// ProcessorFactory.CreateTracesProcessor(); its parameters and results
// mirror that method one to one.
type CreateTracesProcessorFunc func(ctx context.Context, set ProcessorCreateSettings, cfg config.Processor, nextConsumer consumer.Traces) (TracesProcessor, error)
// CreateTracesProcessor implements ProcessorFactory.CreateTracesProcessor().
// It forwards all arguments to the underlying function and returns its
// results unchanged. When the function is nil it returns a nil processor
// together with ErrDataTypeIsNotSupported.
func (fn CreateTracesProcessorFunc) CreateTracesProcessor(
	ctx context.Context,
	set ProcessorCreateSettings,
	cfg config.Processor,
	nextConsumer consumer.Traces,
) (TracesProcessor, error) {
	// A nil function value stands for "traces are not supported".
	if fn == nil {
		return nil, ErrDataTypeIsNotSupported
	}

	return fn(ctx, set, cfg, nextConsumer)
}
// CreateMetricsProcessorFunc is the function-typed counterpart of
// ProcessorFactory.CreateMetricsProcessor(); its parameters and results
// mirror that method one to one.
type CreateMetricsProcessorFunc func(ctx context.Context, set ProcessorCreateSettings, cfg config.Processor, nextConsumer consumer.Metrics) (MetricsProcessor, error)
// CreateMetricsProcessor implements ProcessorFactory.CreateMetricsProcessor().
// It forwards all arguments to the underlying function and returns its
// results unchanged. When the function is nil it returns a nil processor
// together with ErrDataTypeIsNotSupported.
func (fn CreateMetricsProcessorFunc) CreateMetricsProcessor(
	ctx context.Context,
	set ProcessorCreateSettings,
	cfg config.Processor,
	nextConsumer consumer.Metrics,
) (MetricsProcessor, error) {
	// A nil function value stands for "metrics are not supported".
	if fn == nil {
		return nil, ErrDataTypeIsNotSupported
	}

	return fn(ctx, set, cfg, nextConsumer)
}
// CreateLogsProcessorFunc is the function-typed counterpart of
// ProcessorFactory.CreateLogsProcessor(); its parameters and results
// mirror that method one to one.
type CreateLogsProcessorFunc func(ctx context.Context, set ProcessorCreateSettings, cfg config.Processor, nextConsumer consumer.Logs) (LogsProcessor, error)
// CreateLogsProcessor implements ProcessorFactory.CreateLogsProcessor().
// It forwards all arguments to the underlying function and returns its
// results unchanged. When the function is nil it returns a nil processor
// together with ErrDataTypeIsNotSupported.
func (fn CreateLogsProcessorFunc) CreateLogsProcessor(
	ctx context.Context,
	set ProcessorCreateSettings,
	cfg config.Processor,
	nextConsumer consumer.Logs,
) (LogsProcessor, error) {
	// A nil function value stands for "logs are not supported".
	if fn == nil {
		return nil, ErrDataTypeIsNotSupported
	}

	return fn(ctx, set, cfg, nextConsumer)
}
// processorFactory is the concrete type returned by NewProcessorFactory.
// It is assembled purely from embedded types: the methods of each embedded
// function type (CreateDefaultConfig, CreateTracesProcessor,
// CreateMetricsProcessor, CreateLogsProcessor) are promoted onto the struct.
// A Create*ProcessorFunc field left nil makes the corresponding method
// return ErrDataTypeIsNotSupported.
type processorFactory struct {
	// baseFactory carries the config type and the per-data-type stability map.
	baseFactory
	ProcessorCreateDefaultConfigFunc
	CreateTracesProcessorFunc
	CreateMetricsProcessorFunc
	CreateLogsProcessorFunc
}
// WithTracesProcessor overrides the default "error not supported" implementation for CreateTracesProcessor.
// The stability level for the traces data type is set to StabilityLevelUndefined.
//
// Deprecated: [v0.55.0] Use WithTracesProcessorAndStabilityLevel instead.
func WithTracesProcessor(createTracesProcessor CreateTracesProcessorFunc) ProcessorFactoryOption {
	return WithTracesProcessorAndStabilityLevel(createTracesProcessor, StabilityLevelUndefined)
}
// WithTracesProcessorAndStabilityLevel returns an option that replaces the
// default "error not supported" implementation of CreateTracesProcessor and
// records sl, instead of the default "undefined", as the stability level of
// the traces data type.
func WithTracesProcessorAndStabilityLevel(createTracesProcessor CreateTracesProcessorFunc, sl StabilityLevel) ProcessorFactoryOption {
	configure := func(factory *processorFactory) {
		factory.stability[config.TracesDataType] = sl
		factory.CreateTracesProcessorFunc = createTracesProcessor
	}
	return processorFactoryOptionFunc(configure)
}
// WithMetricsProcessor overrides the default "error not supported" implementation for CreateMetricsProcessor.
// The stability level for the metrics data type is set to StabilityLevelUndefined.
//
// Deprecated: [v0.55.0] Use WithMetricsProcessorAndStabilityLevel instead.
func WithMetricsProcessor(createMetricsProcessor CreateMetricsProcessorFunc) ProcessorFactoryOption {
	return WithMetricsProcessorAndStabilityLevel(createMetricsProcessor, StabilityLevelUndefined)
}
// WithMetricsProcessorAndStabilityLevel returns an option that replaces the
// default "error not supported" implementation of CreateMetricsProcessor and
// records sl, instead of the default "undefined", as the stability level of
// the metrics data type.
func WithMetricsProcessorAndStabilityLevel(createMetricsProcessor CreateMetricsProcessorFunc, sl StabilityLevel) ProcessorFactoryOption {
	configure := func(factory *processorFactory) {
		factory.stability[config.MetricsDataType] = sl
		factory.CreateMetricsProcessorFunc = createMetricsProcessor
	}
	return processorFactoryOptionFunc(configure)
}
// WithLogsProcessor overrides the default "error not supported" implementation for CreateLogsProcessor.
// The stability level for the logs data type is set to StabilityLevelUndefined.
//
// Deprecated: [v0.55.0] Use WithLogsProcessorAndStabilityLevel instead.
func WithLogsProcessor(createLogsProcessor CreateLogsProcessorFunc) ProcessorFactoryOption {
	return WithLogsProcessorAndStabilityLevel(createLogsProcessor, StabilityLevelUndefined)
}
// WithLogsProcessorAndStabilityLevel returns an option that replaces the
// default "error not supported" implementation of CreateLogsProcessor and
// records sl, instead of the default "undefined", as the stability level of
// the logs data type.
func WithLogsProcessorAndStabilityLevel(createLogsProcessor CreateLogsProcessorFunc, sl StabilityLevel) ProcessorFactoryOption {
	configure := func(factory *processorFactory) {
		factory.stability[config.LogsDataType] = sl
		factory.CreateLogsProcessorFunc = createLogsProcessor
	}
	return processorFactoryOptionFunc(configure)
}
// NewProcessorFactory returns a ProcessorFactory for the given config type.
// createDefaultConfig backs the factory's CreateDefaultConfig method, and
// every option is applied to the new factory, in the order given, before it
// is returned. Data types for which no create function has been installed
// report ErrDataTypeIsNotSupported.
func NewProcessorFactory(cfgType config.Type, createDefaultConfig ProcessorCreateDefaultConfigFunc, options ...ProcessorFactoryOption) ProcessorFactory {
	// The stability map starts out empty but non-nil so that options can
	// write into it.
	factory := &processorFactory{
		baseFactory: baseFactory{
			cfgType:   cfgType,
			stability: map[config.DataType]StabilityLevel{},
		},
		ProcessorCreateDefaultConfigFunc: createDefaultConfig,
	}
	for i := range options {
		options[i].applyProcessorFactoryOption(factory)
	}
	return factory
}