Skip to content

Commit

Permalink
Populate client.Info from config helpers (open-telemetry#4423)
Browse files Browse the repository at this point in the history
* Populate client.Info from config helpers

Fixes open-telemetry#4419

Signed-off-by: Juraci Paixão Kröhling <[email protected]>

* Add changelog entries

Signed-off-by: Juraci Paixão Kröhling <[email protected]>

* Wrap stream to override context

Signed-off-by: Juraci Paixão Kröhling <[email protected]>

* Use latest go-grpc-middleware

Signed-off-by: Juraci Paixão Kröhling <[email protected]>

* Remove opentracing-go from go.sum

Signed-off-by: Juraci Paixão Kröhling <[email protected]>

* Remove dependency on go-grpc-middleware

Signed-off-by: Juraci Paixão Kröhling <[email protected]>

* nolint context keys in tests

Signed-off-by: Juraci Paixão Kröhling <[email protected]>
  • Loading branch information
jpkrohling authored Dec 8, 2021
1 parent 5733742 commit d505629
Show file tree
Hide file tree
Showing 17 changed files with 502 additions and 97 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@

- Fix handling of corrupted records by persistent buffer (experimental) (#4475)

## 💡 Enhancements 💡

- `client.Info` pre-populated for all receivers using common helpers like `confighttp` and `configgrpc` (#4423)

## v0.40.0 Beta

## 🛑 Breaking changes 🛑
Expand Down
33 changes: 0 additions & 33 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,6 @@ package client // import "go.opentelemetry.io/collector/client"
import (
"context"
"net"
"net/http"

"google.golang.org/grpc/peer"
)

type ctxKey struct{}
Expand Down Expand Up @@ -140,33 +137,3 @@ func FromContext(ctx context.Context) Info {
}
return c
}

// FromGRPC takes a GRPC context and tries to extract client information from it
func FromGRPC(ctx context.Context) (Info, bool) {
if p, ok := peer.FromContext(ctx); ok {
ip := parseIP(p.Addr.String())
if ip != nil {
return Info{Addr: ip}, true
}
}
return Info{}, false
}

// FromHTTP takes a net/http Request object and tries to extract client information from it
func FromHTTP(r *http.Request) (Info, bool) {
ip := parseIP(r.RemoteAddr)
if ip == nil {
return Info{}, false
}
return Info{Addr: ip}, true
}

func parseIP(source string) net.Addr {
ipstr, _, err := net.SplitHostPort(source)
if err == nil {
source = ipstr
}
return &net.IPAddr{
IP: net.ParseIP(source),
}
}
23 changes: 0 additions & 23 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ package client
import (
"context"
"net"
"net/http"
"testing"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc/peer"
)

func TestNewContext(t *testing.T) {
Expand Down Expand Up @@ -88,24 +86,3 @@ func TestFromContext(t *testing.T) {
})
}
}

func TestParsingGRPC(t *testing.T) {
grpcCtx := peer.NewContext(context.Background(), &peer.Peer{
Addr: &net.TCPAddr{
IP: net.ParseIP("192.168.1.1"),
Port: 80,
},
})

client, ok := FromGRPC(grpcCtx)
assert.True(t, ok)
assert.NotNil(t, client)
assert.Equal(t, client.Addr.String(), "192.168.1.1")
}

func TestParsingHTTP(t *testing.T) {
client, ok := FromHTTP(&http.Request{RemoteAddr: "192.168.1.2"})
assert.True(t, ok)
assert.NotNil(t, client)
assert.Equal(t, client.Addr.String(), "192.168.1.2")
}
8 changes: 5 additions & 3 deletions config/configauth/serverauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"google.golang.org/grpc/metadata"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/internal/middleware"
)

var (
Expand Down Expand Up @@ -100,11 +101,12 @@ func DefaultGRPCStreamServerInterceptor(srv interface{}, stream grpc.ServerStrea
return errMetadataNotFound
}

// TODO: propagate the context down the stream
_, err := authenticate(ctx, headers)
ctx, err := authenticate(ctx, headers)
if err != nil {
return err
}

return handler(srv, stream)
wrapped := middleware.WrapServerStream(stream)
wrapped.WrappedContext = ctx
return handler(srv, wrapped)
}
19 changes: 17 additions & 2 deletions config/configauth/serverauth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ package configauth
import (
"context"
"fmt"
"net"
"testing"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"go.opentelemetry.io/collector/client"
)

func TestDefaultUnaryInterceptorAuthSucceeded(t *testing.T) {
Expand All @@ -30,10 +33,16 @@ func TestDefaultUnaryInterceptorAuthSucceeded(t *testing.T) {
authCalled := false
authFunc := func(context.Context, map[string][]string) (context.Context, error) {
authCalled = true
return context.Background(), nil
ctx := client.NewContext(context.Background(), client.Info{
Addr: &net.IPAddr{IP: net.IPv4(1, 2, 3, 4)},
})

return ctx, nil
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
handlerCalled = true
cl := client.FromContext(ctx)
assert.Equal(t, "1.2.3.4", cl.Addr.String())
return nil, nil
}
ctx := metadata.NewIncomingContext(context.Background(), metadata.Pairs("authorization", "some-auth-data"))
Expand Down Expand Up @@ -96,9 +105,15 @@ func TestDefaultStreamInterceptorAuthSucceeded(t *testing.T) {
authCalled := false
authFunc := func(context.Context, map[string][]string) (context.Context, error) {
authCalled = true
return context.Background(), nil
ctx := client.NewContext(context.Background(), client.Info{
Addr: &net.IPAddr{IP: net.IPv4(1, 2, 3, 4)},
})
return ctx, nil
}
handler := func(srv interface{}, stream grpc.ServerStream) error {
// ensure that the client information is propagated down to the underlying stream
cl := client.FromContext(stream.Context())
assert.Equal(t, "1.2.3.4", cl.Addr.String())
handlerCalled = true
return nil
}
Expand Down
29 changes: 29 additions & 0 deletions config/configgrpc/configgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package configgrpc // import "go.opentelemetry.io/collector/config/configgrpc"

import (
"context"
"crypto/tls"
"fmt"
"net"
Expand All @@ -30,11 +31,14 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding/gzip"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/peer"

"go.opentelemetry.io/collector/client"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configauth"
"go.opentelemetry.io/collector/config/confignet"
"go.opentelemetry.io/collector/config/configtls"
"go.opentelemetry.io/collector/internal/middleware"
)

// Compression gRPC keys for supported compression types within collector.
Expand Down Expand Up @@ -352,6 +356,9 @@ func (gss *GRPCServerSettings) ToServerOption(host component.Host, settings comp
otelgrpc.WithPropagators(otel.GetTextMapPropagator()),
))

uInterceptors = append(uInterceptors, enhanceWithClientInformation)
sInterceptors = append(sInterceptors, enhanceStreamWithClientInformation)

opts = append(opts, grpc.ChainUnaryInterceptor(uInterceptors...), grpc.ChainStreamInterceptor(sInterceptors...))

return opts, nil
Expand All @@ -366,3 +373,25 @@ func GetGRPCCompressionKey(compressionType string) string {
}
return CompressionUnsupported
}

// enhanceWithClientInformation intercepts the incoming RPC, replacing the incoming context with one that includes
// a client.Info, potentially with the peer's address.
func enhanceWithClientInformation(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return handler(contextWithClient(ctx), req)
}

func enhanceStreamWithClientInformation(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
wrapped := middleware.WrapServerStream(ss)
wrapped.WrappedContext = contextWithClient(ss.Context())
return handler(srv, wrapped)
}

// contextWithClient attempts to add the peer address to the client.Info from the context. When no
// client.Info exists in the context, one is created.
func contextWithClient(ctx context.Context) context.Context {
cl := client.FromContext(ctx)
if p, ok := peer.FromContext(ctx); ok {
cl.Addr = p.Addr
}
return client.NewContext(ctx, cl)
}
Loading

0 comments on commit d505629

Please sign in to comment.