Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Populate client.Info from config helpers #4423

Merged
merged 7 commits into from
Dec 8, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Wrap stream to override context
Signed-off-by: Juraci Paixão Kröhling <[email protected]>
  • Loading branch information
jpkrohling committed Dec 7, 2021
commit d30814d0588fb8ae5b257a786b062b5ab954748d
8 changes: 5 additions & 3 deletions config/configauth/serverauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

Expand Down Expand Up @@ -100,11 +101,12 @@ func DefaultGRPCStreamServerInterceptor(srv interface{}, stream grpc.ServerStrea
return errMetadataNotFound
}

// TODO: propagate the context down the stream
_, err := authenticate(ctx, headers)
ctx, err := authenticate(ctx, headers)
if err != nil {
return err
}

return handler(srv, stream)
wrapped := grpc_middleware.WrapServerStream(stream)
wrapped.WrappedContext = ctx
return handler(srv, wrapped)
}
19 changes: 17 additions & 2 deletions config/configauth/serverauth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ package configauth
import (
"context"
"fmt"
"net"
"testing"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"go.opentelemetry.io/collector/client"
)

func TestDefaultUnaryInterceptorAuthSucceeded(t *testing.T) {
Expand All @@ -30,10 +33,16 @@ func TestDefaultUnaryInterceptorAuthSucceeded(t *testing.T) {
authCalled := false
authFunc := func(context.Context, map[string][]string) (context.Context, error) {
authCalled = true
return context.Background(), nil
ctx := client.NewContext(context.Background(), client.Info{
Addr: &net.IPAddr{IP: net.IPv4(1, 2, 3, 4)},
})

return ctx, nil
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
handlerCalled = true
cl := client.FromContext(ctx)
assert.Equal(t, "1.2.3.4", cl.Addr.String())
return nil, nil
}
ctx := metadata.NewIncomingContext(context.Background(), metadata.Pairs("authorization", "some-auth-data"))
Expand Down Expand Up @@ -96,9 +105,15 @@ func TestDefaultStreamInterceptorAuthSucceeded(t *testing.T) {
authCalled := false
authFunc := func(context.Context, map[string][]string) (context.Context, error) {
authCalled = true
return context.Background(), nil
ctx := client.NewContext(context.Background(), client.Info{
Addr: &net.IPAddr{IP: net.IPv4(1, 2, 3, 4)},
})
return ctx, nil
}
handler := func(srv interface{}, stream grpc.ServerStream) error {
// ensure that the client information is propagated down to the underlying stream
cl := client.FromContext(stream.Context())
assert.Equal(t, "1.2.3.4", cl.Addr.String())
handlerCalled = true
return nil
}
Expand Down
10 changes: 6 additions & 4 deletions config/configgrpc/configgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strings"
"time"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/mostynb/go-grpc-compression/snappy"
"github.com/mostynb/go-grpc-compression/zstd"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
Expand Down Expand Up @@ -375,13 +376,14 @@ func GetGRPCCompressionKey(compressionType string) string {

// enhanceWithClientInformation intercepts the incoming RPC, replacing the incoming context with one that includes
// a client.Info, potentially with the peer's address.
func enhanceWithClientInformation(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
func enhanceWithClientInformation(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return handler(contextWithClient(ctx), req)
}

func enhanceStreamWithClientInformation(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// TODO: how to change the context in this case?
return nil
func enhanceStreamWithClientInformation(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
wrapped := grpc_middleware.WrapServerStream(ss)
wrapped.WrappedContext = contextWithClient(ss.Context())
return handler(srv, wrapped)
}

// contextWithClient attempts to add the peer address to the client.Info from the context. When no
Expand Down
127 changes: 125 additions & 2 deletions config/configgrpc/configgrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"testing"
"time"

pb_testproto "github.com/grpc-ecosystem/go-grpc-middleware/testing/testproto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
Expand Down Expand Up @@ -631,12 +632,134 @@ func TestContextWithClient(t *testing.T) {
}
}

type grpcTraceServer struct{}
func TestClientInfoInterceptors(t *testing.T) {
testCases := []struct {
desc string
tester func(context.Context, pb_testproto.TestServiceClient)
}{
{
desc: "stream",
tester: func(ctx context.Context, cl pb_testproto.TestServiceClient) {
stream, err := cl.PingList(ctx, &pb_testproto.PingRequest{})
require.NoError(t, err)

_, err = stream.Recv()
require.NoError(t, err)
},
},
{
desc: "unary",
tester: func(ctx context.Context, cl pb_testproto.TestServiceClient) {
resp, errResp := cl.Ping(ctx, &pb_testproto.PingRequest{})
require.NoError(t, errResp)
require.NotNil(t, resp)
},
},
}
for _, tC := range testCases {
t.Run(tC.desc, func(t *testing.T) {
mock := &pingService{}
var l net.Listener

// prepare the server
{
gss := &GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: "localhost:0",
Transport: "tcp",
},
}
opts, err := gss.ToServerOption(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
require.NoError(t, err)
srv := grpc.NewServer(opts...)
pb_testproto.RegisterTestServiceServer(srv, mock)

defer srv.Stop()

l, err = gss.ToListener()
require.NoError(t, err)

go func() {
_ = srv.Serve(l)
}()
}

// prepare the client and execute a RPC
{
gcs := &GRPCClientSettings{
Endpoint: l.Addr().String(),
TLSSetting: configtls.TLSClientSetting{
Insecure: true,
},
}

tt, err := obsreporttest.SetupTelemetry()
require.NoError(t, err)
defer func() {
require.NoError(t, tt.Shutdown(context.Background()))
}()

clientOpts, errClient := gcs.ToDialOptions(componenttest.NewNopHost(), tt.TelemetrySettings)
require.NoError(t, errClient)

grpcClientConn, errDial := grpc.Dial(gcs.Endpoint, clientOpts...)
require.NoError(t, errDial)

func (gts *grpcTraceServer) Export(context.Context, otlpgrpc.TracesRequest) (otlpgrpc.TracesResponse, error) {
cl := pb_testproto.NewTestServiceClient(grpcClientConn)
ctx, cancelFunc := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFunc()

// test
tC.tester(ctx, cl)
}

// verify
cl := client.FromContext(mock.recordedContext)

// the client address is something like 127.0.0.1:41086
assert.Contains(t, cl.Addr.String(), "127.0.0.1")
})
}
}

type grpcTraceServer struct {
}

func (gts *grpcTraceServer) Export(ctx context.Context, _ otlpgrpc.TracesRequest) (otlpgrpc.TracesResponse, error) {
return otlpgrpc.NewTracesResponse(), nil
}

// pingService comes from the grpc-middleware project and is made available to test
// interceptors and other gRPC middleware code. We use it here as it provides a
// streaming interface, which we don't have for OTLP services.
type pingService struct {
recordedContext context.Context
}

func (s *pingService) PingEmpty(ctx context.Context, _ *pb_testproto.Empty) (*pb_testproto.PingResponse, error) {
s.recordedContext = ctx
return &pb_testproto.PingResponse{}, nil
}

func (s *pingService) Ping(ctx context.Context, _ *pb_testproto.PingRequest) (*pb_testproto.PingResponse, error) {
s.recordedContext = ctx
return &pb_testproto.PingResponse{}, nil
}

func (s *pingService) PingError(ctx context.Context, _ *pb_testproto.PingRequest) (*pb_testproto.Empty, error) {
s.recordedContext = ctx
return &pb_testproto.Empty{}, nil
}

func (s *pingService) PingList(req *pb_testproto.PingRequest, stream pb_testproto.TestService_PingListServer) error {
s.recordedContext = stream.Context()
return stream.Send(&pb_testproto.PingResponse{})
}

func (s *pingService) PingStream(stream pb_testproto.TestService_PingStreamServer) error {
return nil
}

// tempSocketName provides a temporary Unix socket name for testing.
func tempSocketName(t *testing.T) string {
tmpfile, err := ioutil.TempFile("", "sock")
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/knadh/koanf v1.3.3
github.com/magiconair/properties v1.8.5
github.com/mitchellh/mapstructure v1.4.3
Expand Down
7 changes: 7 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw=
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
Expand Down Expand Up @@ -326,6 +328,7 @@ github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQ
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
jpkrohling marked this conversation as resolved.
Show resolved Hide resolved
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE=
Expand Down Expand Up @@ -455,14 +458,17 @@ go.opentelemetry.io/otel/sdk/metric v0.25.0/go.mod h1:G4xzj4LvC6xDDSsVXpvRVclQCb
go.opentelemetry.io/otel/trace v1.2.0 h1:Ys3iqbqZhcf28hHzrm5WAquMkDHNZTUkw7KHbuNjej0=
go.opentelemetry.io/otel/trace v1.2.0/go.mod h1:N5FLswTubnxKxOJHM7XZC074qpeEdLy3CgAVsdMucK0=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723 h1:sHOAIxRGBp443oHZIPB+HsUGaksVCXVQENPxwTfQdH4=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec=
go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo=
go.uber.org/zap v1.19.1 h1:ue41HOKd1vGURxrmeKIgELGb3jPW9DMUDGtsinblHwI=
go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI=
Expand Down Expand Up @@ -761,6 +767,7 @@ google.golang.org/genproto v0.0.0-20200228133532-8c2c7df3a383/go.mod h1:55QSHmfG
google.golang.org/genproto v0.0.0-20200305110556-506484158171/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200312145019-da6875a35672/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200331122359-1ee6d9798940/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200430143042-b979b6f78d84/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200511104702-f5ebc3bea380/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
Expand Down