Skip to content

Commit

Permalink
Remove Receive from obsreport.Receiver funcs (open-telemetry#3326)
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored May 27, 2021
1 parent 6e1137c commit 474c0ed
Show file tree
Hide file tree
Showing 14 changed files with 82 additions and 112 deletions.
16 changes: 11 additions & 5 deletions obsreport/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,26 @@
// Receivers should use the respective start and end according to the data type
// being received, ie.:
//
// * TraceData receive operations should use the pair:
// StartTraceDataReceiveOp/EndTraceDataReceiveOp
// * Traces receive operations should use the pair:
// StartTracesOp/EndTracesOp
//
// * Metrics receive operations should use the pair:
// StartMetricsReceiveOp/EndMetricsReceiveOp
// StartMetricsOp/EndMetricsOp
//
// * Logs receive operations should use the pair:
// StartLogsOp/EndLogsOp
//
// Similar for exporters:
//
// * TraceData export operations should use the pair:
// StartTraceDataExportOp/EndTraceDataExportOp
// * Traces export operations should use the pair:
// StartTracesExportOp/EndTracesExportOp
//
// * Metrics export operations should use the pair:
// StartMetricsExportOp/EndMetricsExportOp
//
// * Metrics export operations should use the pair:
// StartLogsExportOp/EndLogsExportOp
//
// The package is capable of generating legacy metrics by using the
// observability package allowing a controlled transition from legacy to the
// new metrics. The goal is to eventually remove the legacy metrics and use only
Expand Down
92 changes: 28 additions & 64 deletions obsreport/obsreport_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type StartReceiveOption func(*StartReceiveOptions)
// for {
// // Since the context outlives the individual receive operations call obsreport using
// // WithLongLivedCtx().
// ctx := obsreport.StartTraceDataReceiveOp(
// ctx := obsreport.StartTracesOp(
// longLivedCtx,
// r.config.Name(),
// r.transport,
Expand All @@ -62,7 +62,7 @@ type StartReceiveOption func(*StartReceiveOptions)
// if ok {
// err = r.nextConsumer.ConsumeTraces(ctx, td)
// }
// obsreport.EndTraceDataReceiveOp(
// obsreport.EndTracesOp(
// ctx,
// r.format,
// len(td.Spans),
Expand Down Expand Up @@ -99,94 +99,58 @@ func NewReceiver(cfg ReceiverSettings) *Receiver {
}
}

// StartTraceDataReceiveOp is called when a request is received from a client.
// StartTracesOp is called when a request is received from a client.
// The returned context should be used in other calls to the obsreport functions
// dealing with the same receive operation.
func (rec *Receiver) StartTraceDataReceiveOp(
operationCtx context.Context,
opt ...StartReceiveOption,
) context.Context {
return rec.traceReceiveOp(
operationCtx,
obsmetrics.ReceiveTraceDataOperationSuffix,
opt...)
func (rec *Receiver) StartTracesOp(operationCtx context.Context, opt ...StartReceiveOption) context.Context {
return rec.startOp(operationCtx, obsmetrics.ReceiveTraceDataOperationSuffix, opt...)
}

// EndTraceDataReceiveOp completes the receive operation that was started with
// StartTraceDataReceiveOp.
func (rec *Receiver) EndTraceDataReceiveOp(
// EndTracesOp completes the receive operation that was started with
// StartTracesOp.
func (rec *Receiver) EndTracesOp(
receiverCtx context.Context,
format string,
numReceivedSpans int,
err error,
) {
rec.endReceiveOp(
receiverCtx,
format,
numReceivedSpans,
err,
config.TracesDataType,
)
rec.endOp(receiverCtx, format, numReceivedSpans, err, config.TracesDataType)
}

// StartLogsReceiveOp is called when a request is received from a client.
// StartLogsOp is called when a request is received from a client.
// The returned context should be used in other calls to the obsreport functions
// dealing with the same receive operation.
func (rec *Receiver) StartLogsReceiveOp(
operationCtx context.Context,
opt ...StartReceiveOption,
) context.Context {
return rec.traceReceiveOp(
operationCtx,
obsmetrics.ReceiverLogsOperationSuffix,
opt...)
func (rec *Receiver) StartLogsOp(operationCtx context.Context, opt ...StartReceiveOption) context.Context {
return rec.startOp(operationCtx, obsmetrics.ReceiverLogsOperationSuffix, opt...)
}

// EndLogsReceiveOp completes the receive operation that was started with
// StartLogsReceiveOp.
func (rec *Receiver) EndLogsReceiveOp(
// EndLogsOp completes the receive operation that was started with
// StartLogsOp.
func (rec *Receiver) EndLogsOp(
receiverCtx context.Context,
format string,
numReceivedLogRecords int,
err error,
) {
rec.endReceiveOp(
receiverCtx,
format,
numReceivedLogRecords,
err,
config.LogsDataType,
)
rec.endOp(receiverCtx, format, numReceivedLogRecords, err, config.LogsDataType)
}

// StartMetricsReceiveOp is called when a request is received from a client.
// StartMetricsOp is called when a request is received from a client.
// The returned context should be used in other calls to the obsreport functions
// dealing with the same receive operation.
func (rec *Receiver) StartMetricsReceiveOp(
operationCtx context.Context,
opt ...StartReceiveOption,
) context.Context {
return rec.traceReceiveOp(
operationCtx,
obsmetrics.ReceiverMetricsOperationSuffix,
opt...)
func (rec *Receiver) StartMetricsOp(operationCtx context.Context, opt ...StartReceiveOption) context.Context {
return rec.startOp(operationCtx, obsmetrics.ReceiverMetricsOperationSuffix, opt...)
}

// EndMetricsReceiveOp completes the receive operation that was started with
// StartMetricsReceiveOp.
func (rec *Receiver) EndMetricsReceiveOp(
// EndMetricsOp completes the receive operation that was started with
// StartMetricsOp.
func (rec *Receiver) EndMetricsOp(
receiverCtx context.Context,
format string,
numReceivedPoints int,
err error,
) {
rec.endReceiveOp(
receiverCtx,
format,
numReceivedPoints,
err,
config.MetricsDataType,
)
rec.endOp(receiverCtx, format, numReceivedPoints, err, config.MetricsDataType)
}

// ReceiverContext adds the keys used when recording observability metrics to
Expand All @@ -205,9 +169,9 @@ func ReceiverContext(
return ctx
}

// traceReceiveOp creates the span used to trace the operation. Returning
// startOp creates the span used to trace the operation. Returning
// the updated context with the created span.
func (rec *Receiver) traceReceiveOp(
func (rec *Receiver) startOp(
receiverCtx context.Context,
operationSuffix string,
opt ...StartReceiveOption,
Expand All @@ -224,7 +188,7 @@ func (rec *Receiver) traceReceiveOp(
ctx, span = trace.StartSpan(receiverCtx, spanName)
} else {
// Since the receiverCtx is long lived do not use it to start the span.
// This way this trace ends when the EndTraceDataReceiveOp is called.
// This way this trace ends when the EndTracesOp is called.
// Here is safe to ignore the returned context since it is not used below.
_, span = trace.StartSpan(context.Background(), spanName)

Expand All @@ -240,8 +204,8 @@ func (rec *Receiver) traceReceiveOp(
return ctx
}

// endReceiveOp records the observability signals at the end of an operation.
func (rec *Receiver) endReceiveOp(
// endOp records the observability signals at the end of an operation.
func (rec *Receiver) endOp(
receiverCtx context.Context,
format string,
numReceivedItems int,
Expand Down
16 changes: 8 additions & 8 deletions obsreport/obsreport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ func TestReceiveTraceDataOp(t *testing.T) {
rcvdSpans := []int{13, 42}
for i, param := range params {
rec := NewReceiver(ReceiverSettings{ReceiverID: receiver, Transport: param.transport})
ctx := rec.StartTraceDataReceiveOp(receiverCtx)
ctx := rec.StartTracesOp(receiverCtx)
assert.NotNil(t, ctx)

rec.EndTraceDataReceiveOp(
rec.EndTracesOp(
ctx,
format,
rcvdSpans[i],
Expand Down Expand Up @@ -133,10 +133,10 @@ func TestReceiveLogsOp(t *testing.T) {
rcvdLogRecords := []int{13, 42}
for i, param := range params {
rec := NewReceiver(ReceiverSettings{ReceiverID: receiver, Transport: param.transport})
ctx := rec.StartLogsReceiveOp(receiverCtx)
ctx := rec.StartLogsOp(receiverCtx)
assert.NotNil(t, ctx)

rec.EndLogsReceiveOp(
rec.EndLogsOp(
ctx,
format,
rcvdLogRecords[i],
Expand Down Expand Up @@ -194,10 +194,10 @@ func TestReceiveMetricsOp(t *testing.T) {
rcvdMetricPts := []int{23, 29}
for i, param := range params {
rec := NewReceiver(ReceiverSettings{ReceiverID: receiver, Transport: param.transport})
ctx := rec.StartMetricsReceiveOp(receiverCtx)
ctx := rec.StartMetricsOp(receiverCtx)
assert.NotNil(t, ctx)

rec.EndMetricsReceiveOp(
rec.EndMetricsOp(
ctx,
format,
rcvdMetricPts[i],
Expand Down Expand Up @@ -465,12 +465,12 @@ func TestReceiveWithLongLivedCtx(t *testing.T) {
// Use a new context on each operation to simulate distinct operations
// under the same long lived context.
rec := NewReceiver(ReceiverSettings{ReceiverID: receiver, Transport: transport})
ctx := rec.StartTraceDataReceiveOp(
ctx := rec.StartTracesOp(
longLivedCtx,
WithLongLivedCtx())
assert.NotNil(t, ctx)

rec.EndTraceDataReceiveOp(
rec.EndTracesOp(
ctx,
format,
op.numSpans,
Expand Down
12 changes: 6 additions & 6 deletions obsreport/obsreporttest/obsreporttest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ func TestCheckReceiverTracesViews(t *testing.T) {

receiverCtx := obsreport.ReceiverContext(context.Background(), receiver, transport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: transport})
ctx := rec.StartTraceDataReceiveOp(receiverCtx)
ctx := rec.StartTracesOp(receiverCtx)
assert.NotNil(t, ctx)
rec.EndTraceDataReceiveOp(
rec.EndTracesOp(
ctx,
format,
7,
Expand All @@ -62,9 +62,9 @@ func TestCheckReceiverMetricsViews(t *testing.T) {

receiverCtx := obsreport.ReceiverContext(context.Background(), receiver, transport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: transport})
ctx := rec.StartMetricsReceiveOp(receiverCtx)
ctx := rec.StartMetricsOp(receiverCtx)
assert.NotNil(t, ctx)
rec.EndMetricsReceiveOp(ctx, format, 7, nil)
rec.EndMetricsOp(ctx, format, 7, nil)

obsreporttest.CheckReceiverMetrics(t, receiver, transport, 7, 0)
}
Expand All @@ -76,9 +76,9 @@ func TestCheckReceiverLogsViews(t *testing.T) {

receiverCtx := obsreport.ReceiverContext(context.Background(), receiver, transport)
rec := obsreport.NewReceiver(obsreport.ReceiverSettings{ReceiverID: receiver, Transport: transport})
ctx := rec.StartLogsReceiveOp(receiverCtx)
ctx := rec.StartLogsOp(receiverCtx)
assert.NotNil(t, ctx)
rec.EndLogsReceiveOp(ctx, format, 7, nil)
rec.EndLogsOp(ctx, format, 7, nil)

obsreporttest.CheckReceiverLogs(t, receiver, transport, 7, 0)
}
Expand Down
14 changes: 7 additions & 7 deletions receiver/jaegerreceiver/trace_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,10 @@ func (h *agentHandler) EmitZipkinBatch(context.Context, []*zipkincore.Span) (err
// Jaeger spans received by the Jaeger agent processor.
func (h *agentHandler) EmitBatch(ctx context.Context, batch *jaeger.Batch) error {
ctx = obsreport.ReceiverContext(ctx, h.id, h.transport)
ctx = h.obsrecv.StartTraceDataReceiveOp(ctx)
ctx = h.obsrecv.StartTracesOp(ctx)

numSpans, err := consumeTraces(ctx, batch, h.nextConsumer)
h.obsrecv.EndTraceDataReceiveOp(ctx, thriftFormat, numSpans, err)
h.obsrecv.EndTracesOp(ctx, thriftFormat, numSpans, err)
return err
}

Expand All @@ -273,12 +273,12 @@ func (jr *jReceiver) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest)
}

ctx = obsreport.ReceiverContext(ctx, jr.id, grpcTransport)
ctx = jr.grpcObsrecv.StartTraceDataReceiveOp(ctx)
ctx = jr.grpcObsrecv.StartTracesOp(ctx)

td := jaegertranslator.ProtoBatchToInternalTraces(r.GetBatch())

err := jr.nextConsumer.ConsumeTraces(ctx, td)
jr.grpcObsrecv.EndTraceDataReceiveOp(ctx, protobufFormat, len(r.GetBatch().Spans), err)
jr.grpcObsrecv.EndTracesOp(ctx, protobufFormat, len(r.GetBatch().Spans), err)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -423,12 +423,12 @@ func (jr *jReceiver) HandleThriftHTTPBatch(w http.ResponseWriter, r *http.Reques
}

ctx = obsreport.ReceiverContext(ctx, jr.id, collectorHTTPTransport)
ctx = jr.httpObsrecv.StartTraceDataReceiveOp(ctx)
ctx = jr.httpObsrecv.StartTracesOp(ctx)

batch, hErr := jr.decodeThriftHTTPBody(r)
if hErr != nil {
http.Error(w, html.EscapeString(hErr.msg), hErr.statusCode)
jr.httpObsrecv.EndTraceDataReceiveOp(ctx, thriftFormat, 0, hErr)
jr.httpObsrecv.EndTracesOp(ctx, thriftFormat, 0, hErr)
return
}

Expand All @@ -438,7 +438,7 @@ func (jr *jReceiver) HandleThriftHTTPBatch(w http.ResponseWriter, r *http.Reques
} else {
w.WriteHeader(http.StatusAccepted)
}
jr.httpObsrecv.EndTraceDataReceiveOp(ctx, thriftFormat, numSpans, err)
jr.httpObsrecv.EndTracesOp(ctx, thriftFormat, numSpans, err)
}

func (jr *jReceiver) startCollector(host component.Host) error {
Expand Down
8 changes: 4 additions & 4 deletions receiver/kafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe
session.MarkMessage(message, "")

ctx := obsreport.ReceiverContext(session.Context(), c.id, transport)
ctx = c.obsrecv.StartTraceDataReceiveOp(ctx)
ctx = c.obsrecv.StartTracesOp(ctx)
statsTags := []tag.Mutator{tag.Insert(tagInstanceName, c.id.String())}
_ = stats.RecordWithTags(ctx, statsTags,
statMessageCount.M(1),
Expand All @@ -275,7 +275,7 @@ func (c *tracesConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSe

spanCount := traces.SpanCount()
err = c.nextConsumer.ConsumeTraces(session.Context(), traces)
c.obsrecv.EndTraceDataReceiveOp(ctx, c.unmarshaler.Encoding(), spanCount, err)
c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(), spanCount, err)
if err != nil {
return err
}
Expand Down Expand Up @@ -312,7 +312,7 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess
session.MarkMessage(message, "")

ctx := obsreport.ReceiverContext(session.Context(), c.id, transport)
ctx = c.obsrecv.StartTraceDataReceiveOp(ctx)
ctx = c.obsrecv.StartTracesOp(ctx)
_ = stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Insert(tagInstanceName, c.id.String())},
Expand All @@ -328,7 +328,7 @@ func (c *logsConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSess

err = c.nextConsumer.ConsumeLogs(session.Context(), logs)
// TODO
c.obsrecv.EndTraceDataReceiveOp(ctx, c.unmarshaler.Encoding(), logs.LogRecordCount(), err)
c.obsrecv.EndTracesOp(ctx, c.unmarshaler.Encoding(), logs.LogRecordCount(), err)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions receiver/opencensusreceiver/ocmetrics/opencensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (ocr *Receiver) processReceivedMsg(
}

func (ocr *Receiver) sendToNextConsumer(longLivedRPCCtx context.Context, node *commonpb.Node, resource *resourcepb.Resource, metrics []*ocmetrics.Metric) error {
ctx := ocr.obsrecv.StartMetricsReceiveOp(
ctx := ocr.obsrecv.StartMetricsOp(
longLivedRPCCtx,
obsreport.WithLongLivedCtx())

Expand All @@ -141,7 +141,7 @@ func (ocr *Receiver) sendToNextConsumer(longLivedRPCCtx context.Context, node *c
consumerErr = ocr.nextConsumer.ConsumeMetrics(ctx, internaldata.OCToMetrics(node, resource, metrics))
}

ocr.obsrecv.EndMetricsReceiveOp(
ocr.obsrecv.EndMetricsOp(
ctx,
receiverDataFormat,
numPoints,
Expand Down
Loading

0 comments on commit 474c0ed

Please sign in to comment.