Skip to content

Commit

Permalink
[extension/zpages] Register span processor within zpages extension co…
Browse files Browse the repository at this point in the history
…de (open-telemetry#5294)

* [extension/zpages] Register span processor within zpages extension code

* Improve zpages test to test RegisterSpanProcessor method

* Add comment about sampler

* `make fmt`

* Solve `make lint` issues (fingers crossed)

* Address review comments
  • Loading branch information
mx-psi authored May 12, 2022
1 parent 40443ec commit 85c26ea
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 50 deletions.
2 changes: 1 addition & 1 deletion extension/zpagesextension/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,5 @@ func createDefaultConfig() config.Extension {

// createExtension creates the extension based on this config.
func createExtension(_ context.Context, set component.ExtensionCreateSettings, cfg config.Extension) (component.Extension, error) {
return newServer(cfg.(*Config), set.Logger), nil
return newServer(cfg.(*Config), set.TelemetrySettings), nil
}
62 changes: 52 additions & 10 deletions extension/zpagesextension/zpagesextension.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,62 @@ package zpagesextension // import "go.opentelemetry.io/collector/extension/zpage
import (
"context"
"net/http"
"path"

"go.opentelemetry.io/contrib/zpages"
"go.opentelemetry.io/otel/sdk/trace"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
)

const (
tracezPath = "tracez"
)

type zpagesExtension struct {
config *Config
logger *zap.Logger
server http.Server
stopCh chan struct{}
config *Config
telemetry component.TelemetrySettings
zpagesSpanProcessor *zpages.SpanProcessor
server http.Server
stopCh chan struct{}
}

// registerableTracerProvider is a tracer that supports
// the SDK methods RegisterSpanProcessor and UnregisterSpanProcessor.
//
// We use an interface instead of casting to the SDK tracer type to support tracer providers
// that extend the SDK.
type registerableTracerProvider interface {
// RegisterSpanProcessor adds the given SpanProcessor to the list of SpanProcessors.
// https://pkg.go.dev/go.opentelemetry.io/otel/sdk/trace#TracerProvider.RegisterSpanProcessor.
RegisterSpanProcessor(SpanProcessor trace.SpanProcessor)

// UnregisterSpanProcessor removes the given SpanProcessor from the list of SpanProcessors.
// https://pkg.go.dev/go.opentelemetry.io/otel/sdk/trace#TracerProvider.UnregisterSpanProcessor.
UnregisterSpanProcessor(SpanProcessor trace.SpanProcessor)
}

func (zpe *zpagesExtension) Start(_ context.Context, host component.Host) error {
zPagesMux := http.NewServeMux()

sdktracer, ok := zpe.telemetry.TracerProvider.(registerableTracerProvider)
if ok {
sdktracer.RegisterSpanProcessor(zpe.zpagesSpanProcessor)
zPagesMux.Handle(path.Join("/debug", tracezPath), zpages.NewTracezHandler(zpe.zpagesSpanProcessor))
zpe.telemetry.Logger.Info("Registered zPages span processor on tracer provider")
} else {
zpe.telemetry.Logger.Warn("zPages span processor registration is not available")
}

hostZPages, ok := host.(interface {
RegisterZPages(mux *http.ServeMux, pathPrefix string)
})
if ok {
zpe.logger.Info("Register Host's zPages")
hostZPages.RegisterZPages(zPagesMux, "/debug")
zpe.telemetry.Logger.Info("Registered Host's zPages")
} else {
zpe.logger.Info("Host's zPages not available")
zpe.telemetry.Logger.Warn("Host's zPages not available")
}

// Start the listener here so we can have earlier failure if port is
Expand All @@ -50,7 +82,7 @@ func (zpe *zpagesExtension) Start(_ context.Context, host component.Host) error
return err
}

zpe.logger.Info("Starting zPages extension", zap.Any("config", zpe.config))
zpe.telemetry.Logger.Info("Starting zPages extension", zap.Any("config", zpe.config))
zpe.server = http.Server{Handler: zPagesMux}
zpe.stopCh = make(chan struct{})
go func() {
Expand All @@ -69,12 +101,22 @@ func (zpe *zpagesExtension) Shutdown(context.Context) error {
if zpe.stopCh != nil {
<-zpe.stopCh
}

sdktracer, ok := zpe.telemetry.TracerProvider.(registerableTracerProvider)
if ok {
sdktracer.UnregisterSpanProcessor(zpe.zpagesSpanProcessor)
zpe.telemetry.Logger.Info("Unregistered zPages span processor on tracer provider")
} else {
zpe.telemetry.Logger.Warn("zPages span processor registration is not available")
}

return err
}

func newServer(config *Config, logger *zap.Logger) *zpagesExtension {
func newServer(config *Config, telemetry component.TelemetrySettings) *zpagesExtension {
return &zpagesExtension{
config: config,
logger: logger,
config: config,
telemetry: telemetry,
zpagesSpanProcessor: zpages.NewSpanProcessor(),
}
}
34 changes: 23 additions & 11 deletions extension/zpagesextension/zpagesextension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ import (
"context"
"net"
"net/http"
"path"
"runtime"
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
Expand All @@ -39,10 +39,22 @@ func newZPagesHost() *zpagesHost {
return &zpagesHost{Host: componenttest.NewNopHost()}
}

func (*zpagesHost) RegisterZPages(mux *http.ServeMux, pathPrefix string) {
mux.HandleFunc(path.Join(pathPrefix, "tracez"), func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
})
func (*zpagesHost) RegisterZPages(mux *http.ServeMux, pathPrefix string) {}

var _ registerableTracerProvider = (*registerableProvider)(nil)
var _ registerableTracerProvider = sdktrace.NewTracerProvider()

type registerableProvider struct {
trace.TracerProvider
}

func (*registerableProvider) RegisterSpanProcessor(sdktrace.SpanProcessor) {}
func (*registerableProvider) UnregisterSpanProcessor(sdktrace.SpanProcessor) {}

func newZpagesTelemetrySettings() component.TelemetrySettings {
set := componenttest.NewNopTelemetrySettings()
set.TracerProvider = &registerableProvider{set.TracerProvider}
return set
}

func TestZPagesExtensionUsage(t *testing.T) {
Expand All @@ -52,7 +64,7 @@ func TestZPagesExtensionUsage(t *testing.T) {
},
}

zpagesExt := newServer(cfg, zap.NewNop())
zpagesExt := newServer(cfg, newZpagesTelemetrySettings())
require.NotNil(t, zpagesExt)

require.NoError(t, zpagesExt.Start(context.Background(), newZPagesHost()))
Expand Down Expand Up @@ -83,7 +95,7 @@ func TestZPagesExtensionPortAlreadyInUse(t *testing.T) {
Endpoint: endpoint,
},
}
zpagesExt := newServer(cfg, zap.NewNop())
zpagesExt := newServer(cfg, newZpagesTelemetrySettings())
require.NotNil(t, zpagesExt)

require.Error(t, zpagesExt.Start(context.Background(), componenttest.NewNopHost()))
Expand All @@ -96,7 +108,7 @@ func TestZPagesMultipleStarts(t *testing.T) {
},
}

zpagesExt := newServer(cfg, zap.NewNop())
zpagesExt := newServer(cfg, newZpagesTelemetrySettings())
require.NotNil(t, zpagesExt)

require.NoError(t, zpagesExt.Start(context.Background(), componenttest.NewNopHost()))
Expand All @@ -113,7 +125,7 @@ func TestZPagesMultipleShutdowns(t *testing.T) {
},
}

zpagesExt := newServer(cfg, zap.NewNop())
zpagesExt := newServer(cfg, newZpagesTelemetrySettings())
require.NotNil(t, zpagesExt)

require.NoError(t, zpagesExt.Start(context.Background(), componenttest.NewNopHost()))
Expand All @@ -128,7 +140,7 @@ func TestZPagesShutdownWithoutStart(t *testing.T) {
},
}

zpagesExt := newServer(cfg, zap.NewNop())
zpagesExt := newServer(cfg, newZpagesTelemetrySettings())
require.NotNil(t, zpagesExt)

require.NoError(t, zpagesExt.Shutdown(context.Background()))
Expand Down
21 changes: 9 additions & 12 deletions service/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"runtime"
"syscall"

"go.opentelemetry.io/contrib/zpages"
"go.opentelemetry.io/otel/metric/nonrecording"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
Expand Down Expand Up @@ -76,9 +75,8 @@ func (s State) String() string {

// Collector represents a server providing the OpenTelemetry Collector service.
type Collector struct {
set CollectorSettings
telemetry component.TelemetrySettings
zPagesSpanProcessor *zpages.SpanProcessor
set CollectorSettings
telemetry component.TelemetrySettings

service *service
state *atomic.Int32
Expand Down Expand Up @@ -210,12 +208,11 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
}

col.service, err = newService(&svcSettings{
BuildInfo: col.set.BuildInfo,
Factories: col.set.Factories,
Config: cfg,
Telemetry: col.telemetry,
ZPagesSpanProcessor: col.zPagesSpanProcessor,
AsyncErrorChannel: col.asyncErrorChannel,
BuildInfo: col.set.BuildInfo,
Factories: col.set.Factories,
Config: cfg,
Telemetry: col.telemetry,
AsyncErrorChannel: col.asyncErrorChannel,
})
if err != nil {
return err
Expand All @@ -238,10 +235,10 @@ func (col *Collector) setupConfigurationComponents(ctx context.Context) error {
// Run starts the collector according to the given configuration given, and waits for it to complete.
// Consecutive calls to Run are not allowed, Run shouldn't be called once a collector is shut down.
func (col *Collector) Run(ctx context.Context) error {
col.zPagesSpanProcessor = zpages.NewSpanProcessor()
col.telemetry.TracerProvider = sdktrace.NewTracerProvider(
// needed for supporting the zpages extension
sdktrace.WithSampler(internal.AlwaysRecord()),
sdktrace.WithSpanProcessor(col.zPagesSpanProcessor))
)

if err := col.setupConfigurationComponents(ctx); err != nil {
col.setCollectorState(Closed)
Expand Down
7 changes: 2 additions & 5 deletions service/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
package service // import "go.opentelemetry.io/collector/service"

import (
"go.opentelemetry.io/contrib/zpages"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/service/internal/builder"
Expand All @@ -26,9 +24,8 @@ import (
var _ component.Host = (*serviceHost)(nil)

type serviceHost struct {
asyncErrorChannel chan error
factories component.Factories
zPagesSpanProcessor *zpages.SpanProcessor
asyncErrorChannel chan error
factories component.Factories

builtExporters builder.Exporters
builtReceivers builder.Receivers
Expand Down
5 changes: 2 additions & 3 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,8 @@ func newService(set *svcSettings) (*service, error) {
config: set.Config,
telemetry: set.Telemetry,
host: &serviceHost{
factories: set.Factories,
zPagesSpanProcessor: set.ZPagesSpanProcessor,
asyncErrorChannel: set.AsyncErrorChannel,
factories: set.Factories,
asyncErrorChannel: set.AsyncErrorChannel,
},
}

Expand Down
4 changes: 0 additions & 4 deletions service/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package service // import "go.opentelemetry.io/collector/service"

import (
"go.opentelemetry.io/contrib/zpages"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
Expand All @@ -36,9 +35,6 @@ type svcSettings struct {
// Telemetry represents the service configured telemetry for all the components.
Telemetry component.TelemetrySettings

// ZPagesSpanProcessor represents the SpanProcessor for tracez page.
ZPagesSpanProcessor *zpages.SpanProcessor

// AsyncErrorChannel is the channel that is used to report fatal errors.
AsyncErrorChannel chan error
}
Expand Down
4 changes: 0 additions & 4 deletions service/zpages.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,13 @@ import (
"path"
"sort"

otelzpages "go.opentelemetry.io/contrib/zpages"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/internal/version"
"go.opentelemetry.io/collector/service/featuregate"
"go.opentelemetry.io/collector/service/internal/zpages"
)

const (
tracezPath = "tracez"
servicezPath = "servicez"
pipelinezPath = "pipelinez"
extensionzPath = "extensionz"
Expand All @@ -41,7 +38,6 @@ const (
)

func (host *serviceHost) RegisterZPages(mux *http.ServeMux, pathPrefix string) {
mux.Handle(path.Join(pathPrefix, tracezPath), otelzpages.NewTracezHandler(host.zPagesSpanProcessor))
mux.HandleFunc(path.Join(pathPrefix, servicezPath), host.handleServicezRequest)
mux.HandleFunc(path.Join(pathPrefix, pipelinezPath), host.handlePipelinezRequest)
mux.HandleFunc(path.Join(pathPrefix, featurezPath), handleFeaturezRequest)
Expand Down

0 comments on commit 85c26ea

Please sign in to comment.