Skip to content

Commit

Permalink
[service] Split component.Host functionality into separate `service…
Browse files Browse the repository at this point in the history
…Host` struct (#5292)

* [service] Split `component.Host` functionality into separate `service.Host`

* [service] Move `serviceHost` to its own file
  • Loading branch information
mx-psi authored Apr 29, 2022
1 parent 8255219 commit 041f398
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 85 deletions.
2 changes: 1 addition & 1 deletion service/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func TestCollectorReportError(t *testing.T) {
return Running == col.GetState()
}, 2*time.Second, 200*time.Millisecond)

col.service.ReportFatalError(errors.New("err2"))
col.service.host.ReportFatalError(errors.New("err2"))

wg.Wait()
assert.Equal(t, Closed, col.GetState())
Expand Down
66 changes: 66 additions & 0 deletions service/host.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package service // import "go.opentelemetry.io/collector/service"

import (
"go.opentelemetry.io/contrib/zpages"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
"go.opentelemetry.io/collector/service/internal/builder"
"go.opentelemetry.io/collector/service/internal/extensions"
)

var _ component.Host = (*serviceHost)(nil)

type serviceHost struct {
asyncErrorChannel chan error
factories component.Factories
zPagesSpanProcessor *zpages.SpanProcessor

builtExporters builder.Exporters
builtReceivers builder.Receivers
builtPipelines builder.BuiltPipelines
builtExtensions extensions.Extensions
}

// ReportFatalError is used to report to the host that the receiver encountered
// a fatal error (i.e.: an error that the instance can't recover from) after
// its start function has already returned.
func (host *serviceHost) ReportFatalError(err error) {
host.asyncErrorChannel <- err
}

func (host *serviceHost) GetFactory(kind component.Kind, componentType config.Type) component.Factory {
switch kind {
case component.KindReceiver:
return host.factories.Receivers[componentType]
case component.KindProcessor:
return host.factories.Processors[componentType]
case component.KindExporter:
return host.factories.Exporters[componentType]
case component.KindExtension:
return host.factories.Extensions[componentType]
}
return nil
}

func (host *serviceHost) GetExtensions() map[config.ComponentID]component.Extension {
return host.builtExtensions.ToMap()
}

func (host *serviceHost) GetExporters() map[config.DataType]map[config.ComponentID]component.Exporter {
return host.builtExporters.ToMapByDataType()
}
87 changes: 26 additions & 61 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"

"go.opentelemetry.io/contrib/zpages"
"go.uber.org/multierr"

"go.opentelemetry.io/collector/component"
Expand All @@ -29,49 +28,44 @@ import (

// service represents the implementation of a component.Host.
type service struct {
factories component.Factories
buildInfo component.BuildInfo
config *config.Config
telemetry component.TelemetrySettings
zPagesSpanProcessor *zpages.SpanProcessor
asyncErrorChannel chan error

builtExporters builder.Exporters
builtReceivers builder.Receivers
builtPipelines builder.BuiltPipelines
builtExtensions extensions.Extensions
buildInfo component.BuildInfo
config *config.Config
telemetry component.TelemetrySettings
host *serviceHost
}

func newService(set *svcSettings) (*service, error) {
srv := &service{
factories: set.Factories,
buildInfo: set.BuildInfo,
config: set.Config,
telemetry: set.Telemetry,
zPagesSpanProcessor: set.ZPagesSpanProcessor,
asyncErrorChannel: set.AsyncErrorChannel,
buildInfo: set.BuildInfo,
config: set.Config,
telemetry: set.Telemetry,
host: &serviceHost{
factories: set.Factories,
zPagesSpanProcessor: set.ZPagesSpanProcessor,
asyncErrorChannel: set.AsyncErrorChannel,
},
}

var err error
if srv.builtExtensions, err = extensions.Build(srv.telemetry, srv.buildInfo, srv.config, srv.factories.Extensions); err != nil {
if srv.host.builtExtensions, err = extensions.Build(srv.telemetry, srv.buildInfo, srv.config, srv.host.factories.Extensions); err != nil {
return nil, fmt.Errorf("cannot build extensions: %w", err)
}

// Pipeline is built backwards, starting from exporters, so that we create objects
// which are referenced before objects which reference them.

// First create exporters.
if srv.builtExporters, err = builder.BuildExporters(srv.telemetry, srv.buildInfo, srv.config, srv.factories.Exporters); err != nil {
if srv.host.builtExporters, err = builder.BuildExporters(srv.telemetry, srv.buildInfo, srv.config, srv.host.factories.Exporters); err != nil {
return nil, fmt.Errorf("cannot build exporters: %w", err)
}

// Create pipelines and their processors and plug exporters to the end of the pipelines.
if srv.builtPipelines, err = builder.BuildPipelines(srv.telemetry, srv.buildInfo, srv.config, srv.builtExporters, srv.factories.Processors); err != nil {
if srv.host.builtPipelines, err = builder.BuildPipelines(srv.telemetry, srv.buildInfo, srv.config, srv.host.builtExporters, srv.host.factories.Processors); err != nil {
return nil, fmt.Errorf("cannot build pipelines: %w", err)
}

// Create receivers and plug them into the start of the pipelines.
if srv.builtReceivers, err = builder.BuildReceivers(srv.telemetry, srv.buildInfo, srv.config, srv.builtPipelines, srv.factories.Receivers); err != nil {
if srv.host.builtReceivers, err = builder.BuildReceivers(srv.telemetry, srv.buildInfo, srv.config, srv.host.builtPipelines, srv.host.factories.Receivers); err != nil {
return nil, fmt.Errorf("cannot build receivers: %w", err)
}

Expand All @@ -80,33 +74,33 @@ func newService(set *svcSettings) (*service, error) {

func (srv *service) Start(ctx context.Context) error {
srv.telemetry.Logger.Info("Starting extensions...")
if err := srv.builtExtensions.StartAll(ctx, srv); err != nil {
if err := srv.host.builtExtensions.StartAll(ctx, srv.host); err != nil {
return fmt.Errorf("failed to start extensions: %w", err)
}

srv.telemetry.Logger.Info("Starting exporters...")
if err := srv.builtExporters.StartAll(ctx, srv); err != nil {
if err := srv.host.builtExporters.StartAll(ctx, srv.host); err != nil {
return fmt.Errorf("cannot start exporters: %w", err)
}

srv.telemetry.Logger.Info("Starting processors...")
if err := srv.builtPipelines.StartProcessors(ctx, srv); err != nil {
if err := srv.host.builtPipelines.StartProcessors(ctx, srv.host); err != nil {
return fmt.Errorf("cannot start processors: %w", err)
}

srv.telemetry.Logger.Info("Starting receivers...")
if err := srv.builtReceivers.StartAll(ctx, srv); err != nil {
if err := srv.host.builtReceivers.StartAll(ctx, srv.host); err != nil {
return fmt.Errorf("cannot start receivers: %w", err)
}

return srv.builtExtensions.NotifyPipelineReady()
return srv.host.builtExtensions.NotifyPipelineReady()
}

func (srv *service) Shutdown(ctx context.Context) error {
// Accumulate errors and proceed with shutting down remaining components.
var errs error

if err := srv.builtExtensions.NotifyPipelineNotReady(); err != nil {
if err := srv.host.builtExtensions.NotifyPipelineNotReady(); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to notify that pipeline is not ready: %w", err))
}

Expand All @@ -115,53 +109,24 @@ func (srv *service) Shutdown(ctx context.Context) error {
// time should be part of configuration.

srv.telemetry.Logger.Info("Stopping receivers...")
if err := srv.builtReceivers.ShutdownAll(ctx); err != nil {
if err := srv.host.builtReceivers.ShutdownAll(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown receivers: %w", err))
}

srv.telemetry.Logger.Info("Stopping processors...")
if err := srv.builtPipelines.ShutdownProcessors(ctx); err != nil {
if err := srv.host.builtPipelines.ShutdownProcessors(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown processors: %w", err))
}

srv.telemetry.Logger.Info("Stopping exporters...")
if err := srv.builtExporters.ShutdownAll(ctx); err != nil {
if err := srv.host.builtExporters.ShutdownAll(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown exporters: %w", err))
}

srv.telemetry.Logger.Info("Stopping extensions...")
if err := srv.builtExtensions.ShutdownAll(ctx); err != nil {
if err := srv.host.builtExtensions.ShutdownAll(ctx); err != nil {
errs = multierr.Append(errs, fmt.Errorf("failed to shutdown extensions: %w", err))
}

return errs
}

// ReportFatalError is used to report to the host that the receiver encountered
// a fatal error (i.e.: an error that the instance can't recover from) after
// its start function has already returned.
func (srv *service) ReportFatalError(err error) {
srv.asyncErrorChannel <- err
}

func (srv *service) GetFactory(kind component.Kind, componentType config.Type) component.Factory {
switch kind {
case component.KindReceiver:
return srv.factories.Receivers[componentType]
case component.KindProcessor:
return srv.factories.Processors[componentType]
case component.KindExporter:
return srv.factories.Exporters[componentType]
case component.KindExtension:
return srv.factories.Extensions[componentType]
}
return nil
}

func (srv *service) GetExtensions() map[config.ComponentID]component.Extension {
return srv.builtExtensions.ToMap()
}

func (srv *service) GetExporters() map[config.DataType]map[config.ComponentID]component.Exporter {
return srv.builtExporters.ToMapByDataType()
}
22 changes: 11 additions & 11 deletions service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,20 +38,20 @@ func TestService_GetFactory(t *testing.T) {
assert.NoError(t, srv.Shutdown(context.Background()))
})

assert.Nil(t, srv.GetFactory(component.KindReceiver, "wrongtype"))
assert.Equal(t, factories.Receivers["nop"], srv.GetFactory(component.KindReceiver, "nop"))
assert.Nil(t, srv.host.GetFactory(component.KindReceiver, "wrongtype"))
assert.Equal(t, factories.Receivers["nop"], srv.host.GetFactory(component.KindReceiver, "nop"))

assert.Nil(t, srv.GetFactory(component.KindProcessor, "wrongtype"))
assert.Equal(t, factories.Processors["nop"], srv.GetFactory(component.KindProcessor, "nop"))
assert.Nil(t, srv.host.GetFactory(component.KindProcessor, "wrongtype"))
assert.Equal(t, factories.Processors["nop"], srv.host.GetFactory(component.KindProcessor, "nop"))

assert.Nil(t, srv.GetFactory(component.KindExporter, "wrongtype"))
assert.Equal(t, factories.Exporters["nop"], srv.GetFactory(component.KindExporter, "nop"))
assert.Nil(t, srv.host.GetFactory(component.KindExporter, "wrongtype"))
assert.Equal(t, factories.Exporters["nop"], srv.host.GetFactory(component.KindExporter, "nop"))

assert.Nil(t, srv.GetFactory(component.KindExtension, "wrongtype"))
assert.Equal(t, factories.Extensions["nop"], srv.GetFactory(component.KindExtension, "nop"))
assert.Nil(t, srv.host.GetFactory(component.KindExtension, "wrongtype"))
assert.Equal(t, factories.Extensions["nop"], srv.host.GetFactory(component.KindExtension, "nop"))

// Try retrieve non existing component.Kind.
assert.Nil(t, srv.GetFactory(42, "nop"))
assert.Nil(t, srv.host.GetFactory(42, "nop"))
}

func TestService_GetExtensions(t *testing.T) {
Expand All @@ -64,7 +64,7 @@ func TestService_GetExtensions(t *testing.T) {
assert.NoError(t, srv.Shutdown(context.Background()))
})

extMap := srv.GetExtensions()
extMap := srv.host.GetExtensions()

assert.Len(t, extMap, 1)
assert.Contains(t, extMap, config.NewComponentID("nop"))
Expand All @@ -80,7 +80,7 @@ func TestService_GetExporters(t *testing.T) {
assert.NoError(t, srv.Shutdown(context.Background()))
})

expMap := srv.GetExporters()
expMap := srv.host.GetExporters()
assert.Len(t, expMap, 3)
assert.Len(t, expMap[config.TracesDataType], 1)
assert.Contains(t, expMap[config.TracesDataType], config.NewComponentID("nop"))
Expand Down
2 changes: 1 addition & 1 deletion service/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func (tel *colTelemetry) initOnce(col *Collector) error {
}

func (tel *colTelemetry) initOpenCensus(col *Collector, instanceID string) (http.Handler, error) {
processMetricsViews, err := telemetry2.NewProcessMetricsViews(getBallastSize(col.service))
processMetricsViews, err := telemetry2.NewProcessMetricsViews(getBallastSize(col.service.host))
if err != nil {
return nil, err
}
Expand Down
22 changes: 11 additions & 11 deletions service/zpages.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,17 @@ const (
zExtensionName = "zextensionname"
)

func (srv *service) RegisterZPages(mux *http.ServeMux, pathPrefix string) {
mux.Handle(path.Join(pathPrefix, tracezPath), otelzpages.NewTracezHandler(srv.zPagesSpanProcessor))
mux.HandleFunc(path.Join(pathPrefix, servicezPath), srv.handleServicezRequest)
mux.HandleFunc(path.Join(pathPrefix, pipelinezPath), srv.handlePipelinezRequest)
func (host *serviceHost) RegisterZPages(mux *http.ServeMux, pathPrefix string) {
mux.Handle(path.Join(pathPrefix, tracezPath), otelzpages.NewTracezHandler(host.zPagesSpanProcessor))
mux.HandleFunc(path.Join(pathPrefix, servicezPath), host.handleServicezRequest)
mux.HandleFunc(path.Join(pathPrefix, pipelinezPath), host.handlePipelinezRequest)
mux.HandleFunc(path.Join(pathPrefix, featurezPath), handleFeaturezRequest)
mux.HandleFunc(path.Join(pathPrefix, extensionzPath), func(w http.ResponseWriter, r *http.Request) {
handleExtensionzRequest(srv, w, r)
handleExtensionzRequest(host, w, r)
})
}

func (srv *service) handleServicezRequest(w http.ResponseWriter, r *http.Request) {
func (host *serviceHost) handleServicezRequest(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
zpages.WriteHTMLPageHeader(w, zpages.HeaderData{Title: "service"})
zpages.WriteHTMLComponentHeader(w, zpages.ComponentHeaderData{
Expand All @@ -72,15 +72,15 @@ func (srv *service) handleServicezRequest(w http.ResponseWriter, r *http.Request
zpages.WriteHTMLPageFooter(w)
}

func (srv *service) handlePipelinezRequest(w http.ResponseWriter, r *http.Request) {
func (host *serviceHost) handlePipelinezRequest(w http.ResponseWriter, r *http.Request) {
qValues := r.URL.Query()
pipelineName := qValues.Get(zPipelineName)
componentName := qValues.Get(zComponentName)
componentKind := qValues.Get(zComponentKind)

w.Header().Set("Content-Type", "text/html; charset=utf-8")
zpages.WriteHTMLPageHeader(w, zpages.HeaderData{Title: "Pipelines"})
zpages.WriteHTMLPipelinesSummaryTable(w, srv.getPipelinesSummaryTableData())
zpages.WriteHTMLPipelinesSummaryTable(w, host.getPipelinesSummaryTableData())
if pipelineName != "" && componentName != "" && componentKind != "" {
fullName := componentName
if componentKind == "processor" {
Expand All @@ -94,11 +94,11 @@ func (srv *service) handlePipelinezRequest(w http.ResponseWriter, r *http.Reques
zpages.WriteHTMLPageFooter(w)
}

func (srv *service) getPipelinesSummaryTableData() zpages.SummaryPipelinesTableData {
func (host *serviceHost) getPipelinesSummaryTableData() zpages.SummaryPipelinesTableData {
data := zpages.SummaryPipelinesTableData{}

data.Rows = make([]zpages.SummaryPipelinesTableRowData, 0, len(srv.builtPipelines))
for c, p := range srv.builtPipelines {
data.Rows = make([]zpages.SummaryPipelinesTableRowData, 0, len(host.builtPipelines))
for c, p := range host.builtPipelines {
// TODO: Change the template to use ID.
var recvs []string
for _, recvID := range p.Config.Receivers {
Expand Down

0 comments on commit 041f398

Please sign in to comment.