Skip to content

Commit

Permalink
Wrap stream to override context
Browse files Browse the repository at this point in the history
Signed-off-by: Juraci Paixão Kröhling <[email protected]>
  • Loading branch information
jpkrohling committed Nov 29, 2021
1 parent a8620f5 commit 279c255
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 12 deletions.
8 changes: 5 additions & 3 deletions config/configauth/serverauth.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

Expand Down Expand Up @@ -100,11 +101,12 @@ func DefaultGRPCStreamServerInterceptor(srv interface{}, stream grpc.ServerStrea
return errMetadataNotFound
}

// TODO: propagate the context down the stream
_, err := authenticate(ctx, headers)
ctx, err := authenticate(ctx, headers)
if err != nil {
return err
}

return handler(srv, stream)
wrapped := grpc_middleware.WrapServerStream(stream)
wrapped.WrappedContext = ctx
return handler(srv, wrapped)
}
19 changes: 17 additions & 2 deletions config/configauth/serverauth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,14 @@ package configauth
import (
"context"
"fmt"
"net"
"testing"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"go.opentelemetry.io/collector/client"
)

func TestDefaultUnaryInterceptorAuthSucceeded(t *testing.T) {
Expand All @@ -30,10 +33,16 @@ func TestDefaultUnaryInterceptorAuthSucceeded(t *testing.T) {
authCalled := false
authFunc := func(context.Context, map[string][]string) (context.Context, error) {
authCalled = true
return context.Background(), nil
ctx := client.NewContext(context.Background(), client.Info{
Addr: &net.IPAddr{IP: net.IPv4(1, 2, 3, 4)},
})

return ctx, nil
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
handlerCalled = true
cl := client.FromContext(ctx)
assert.Equal(t, "1.2.3.4", cl.Addr.String())
return nil, nil
}
ctx := metadata.NewIncomingContext(context.Background(), metadata.Pairs("authorization", "some-auth-data"))
Expand Down Expand Up @@ -96,9 +105,15 @@ func TestDefaultStreamInterceptorAuthSucceeded(t *testing.T) {
authCalled := false
authFunc := func(context.Context, map[string][]string) (context.Context, error) {
authCalled = true
return context.Background(), nil
ctx := client.NewContext(context.Background(), client.Info{
Addr: &net.IPAddr{IP: net.IPv4(1, 2, 3, 4)},
})
return ctx, nil
}
handler := func(srv interface{}, stream grpc.ServerStream) error {
// ensure that the client information is propagated down to the underlying stream
cl := client.FromContext(stream.Context())
assert.Equal(t, "1.2.3.4", cl.Addr.String())
handlerCalled = true
return nil
}
Expand Down
10 changes: 6 additions & 4 deletions config/configgrpc/configgrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"strings"
"time"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
"github.com/mostynb/go-grpc-compression/snappy"
"github.com/mostynb/go-grpc-compression/zstd"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
Expand Down Expand Up @@ -375,13 +376,14 @@ func GetGRPCCompressionKey(compressionType string) string {

// enhanceWithClientInformation intercepts the incoming RPC, replacing the incoming context with one that includes
// a client.Info, potentially with the peer's address.
func enhanceWithClientInformation(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
func enhanceWithClientInformation(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
return handler(contextWithClient(ctx), req)
}

func enhanceStreamWithClientInformation(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// TODO: how to change the context in this case?
return nil
func enhanceStreamWithClientInformation(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
wrapped := grpc_middleware.WrapServerStream(ss)
wrapped.WrappedContext = contextWithClient(ss.Context())
return handler(srv, wrapped)
}

// contextWithClient attempts to add the peer address to the client.Info from the context. When no
Expand Down
127 changes: 125 additions & 2 deletions config/configgrpc/configgrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"testing"
"time"

pb_testproto "github.com/grpc-ecosystem/go-grpc-middleware/testing/testproto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
Expand Down Expand Up @@ -631,12 +632,134 @@ func TestContextWithClient(t *testing.T) {
}
}

type grpcTraceServer struct{}
func TestClientInfoInterceptors(t *testing.T) {
testCases := []struct {
desc string
tester func(context.Context, pb_testproto.TestServiceClient)
}{
{
desc: "stream",
tester: func(ctx context.Context, cl pb_testproto.TestServiceClient) {
stream, err := cl.PingList(ctx, &pb_testproto.PingRequest{})
require.NoError(t, err)

_, err = stream.Recv()
require.NoError(t, err)
},
},
{
desc: "unary",
tester: func(ctx context.Context, cl pb_testproto.TestServiceClient) {
resp, errResp := cl.Ping(ctx, &pb_testproto.PingRequest{})
require.NoError(t, errResp)
require.NotNil(t, resp)
},
},
}
for _, tC := range testCases {
t.Run(tC.desc, func(t *testing.T) {
mock := &pingService{}
var l net.Listener

// prepare the server
{
gss := &GRPCServerSettings{
NetAddr: confignet.NetAddr{
Endpoint: "localhost:0",
Transport: "tcp",
},
}
opts, err := gss.ToServerOption(componenttest.NewNopHost(), componenttest.NewNopTelemetrySettings())
require.NoError(t, err)
srv := grpc.NewServer(opts...)
pb_testproto.RegisterTestServiceServer(srv, mock)

defer srv.Stop()

l, err = gss.ToListener()
require.NoError(t, err)

go func() {
_ = srv.Serve(l)
}()
}

// prepare the client and execute a RPC
{
gcs := &GRPCClientSettings{
Endpoint: l.Addr().String(),
TLSSetting: configtls.TLSClientSetting{
Insecure: true,
},
}

tt, err := obsreporttest.SetupTelemetry()
require.NoError(t, err)
defer func() {
require.NoError(t, tt.Shutdown(context.Background()))
}()

clientOpts, errClient := gcs.ToDialOptions(componenttest.NewNopHost(), tt.TelemetrySettings)
require.NoError(t, errClient)

grpcClientConn, errDial := grpc.Dial(gcs.Endpoint, clientOpts...)
require.NoError(t, errDial)

func (gts *grpcTraceServer) Export(context.Context, otlpgrpc.TracesRequest) (otlpgrpc.TracesResponse, error) {
cl := pb_testproto.NewTestServiceClient(grpcClientConn)
ctx, cancelFunc := context.WithTimeout(context.Background(), 2*time.Second)
defer cancelFunc()

// test
tC.tester(ctx, cl)
}

// verify
cl := client.FromContext(mock.recordedContext)

// the client address is something like 127.0.0.1:41086
assert.Contains(t, cl.Addr.String(), "127.0.0.1")
})
}
}

type grpcTraceServer struct {
}

func (gts *grpcTraceServer) Export(ctx context.Context, _ otlpgrpc.TracesRequest) (otlpgrpc.TracesResponse, error) {
return otlpgrpc.NewTracesResponse(), nil
}

// pingService comes from the grpc-middleware project and is made available to test
// interceptors and other gRPC middleware code. We use it here as it provides a
// streaming interface, which we don't have for OTLP services.
type pingService struct {
recordedContext context.Context
}

func (s *pingService) PingEmpty(ctx context.Context, _ *pb_testproto.Empty) (*pb_testproto.PingResponse, error) {
s.recordedContext = ctx
return &pb_testproto.PingResponse{}, nil
}

func (s *pingService) Ping(ctx context.Context, _ *pb_testproto.PingRequest) (*pb_testproto.PingResponse, error) {
s.recordedContext = ctx
return &pb_testproto.PingResponse{}, nil
}

func (s *pingService) PingError(ctx context.Context, _ *pb_testproto.PingRequest) (*pb_testproto.Empty, error) {
s.recordedContext = ctx
return &pb_testproto.Empty{}, nil
}

func (s *pingService) PingList(req *pb_testproto.PingRequest, stream pb_testproto.TestService_PingListServer) error {
s.recordedContext = stream.Context()
return stream.Send(&pb_testproto.PingResponse{})
}

func (s *pingService) PingStream(stream pb_testproto.TestService_PingStreamServer) error {
return nil
}

// tempSocketName provides a temporary Unix socket name for testing.
func tempSocketName(t *testing.T) string {
tmpfile, err := ioutil.TempFile("", "sock")
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/google/uuid v1.3.0
github.com/gorilla/mux v1.8.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/knadh/koanf v1.3.2
github.com/magiconair/properties v1.8.5
github.com/mitchellh/mapstructure v1.4.2
Expand Down
8 changes: 7 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 h1:+9834+KizmvFV7pXQGSXQTsaWhq2GjuNUt0aUU0YBYw=
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0/go.mod h1:z0ButlSOZa5vEBq9m2m2hlwIgKw+rp3sdCBRoJY+30Y=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
github.com/hashicorp/consul/sdk v0.1.1/go.mod h1:VKf9jXwCTEY1QZP2MOLRhb5i/I/ssyNV1vwHyQBF0x8=
Expand Down Expand Up @@ -328,6 +330,7 @@ github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQ
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE=
Expand Down Expand Up @@ -450,18 +453,20 @@ go.opentelemetry.io/otel/sdk/export/metric v0.25.0 h1:6UjAFmVB5Fza3K5qUJpYWGrk8Q
go.opentelemetry.io/otel/sdk/export/metric v0.25.0/go.mod h1:Ej7NOa+WpN49EIcr1HMUYRvxXXCCnQCg2+ovdt2z8Pk=
go.opentelemetry.io/otel/sdk/metric v0.25.0 h1:J+Ta+4IAA5W9AdWhGQLfciEpavBqqSkBzTDeYvJLFNU=
go.opentelemetry.io/otel/sdk/metric v0.25.0/go.mod h1:G4xzj4LvC6xDDSsVXpvRVclQCbofGGg4ZU2VKKtDRfg=
go.opentelemetry.io/otel/trace v1.1.0/go.mod h1:i47XtdcBQiktu5IsrPqOHe8w+sBmnLwwHt8wiUsWGTI=
go.opentelemetry.io/otel/trace v1.2.0 h1:Ys3iqbqZhcf28hHzrm5WAquMkDHNZTUkw7KHbuNjej0=
go.opentelemetry.io/otel/trace v1.2.0/go.mod h1:N5FLswTubnxKxOJHM7XZC074qpeEdLy3CgAVsdMucK0=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723 h1:sHOAIxRGBp443oHZIPB+HsUGaksVCXVQENPxwTfQdH4=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.7.0 h1:zaiO/rmgFjbmCXdSYJWQcdvOCsthmdaHfr3Gm2Kx4Ec=
go.uber.org/multierr v1.7.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo=
go.uber.org/zap v1.19.1 h1:ue41HOKd1vGURxrmeKIgELGb3jPW9DMUDGtsinblHwI=
go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI=
Expand Down Expand Up @@ -759,6 +764,7 @@ google.golang.org/genproto v0.0.0-20200228133532-8c2c7df3a383/go.mod h1:55QSHmfG
google.golang.org/genproto v0.0.0-20200305110556-506484158171/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200312145019-da6875a35672/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200331122359-1ee6d9798940/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200423170343-7949de9c1215/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200430143042-b979b6f78d84/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200511104702-f5ebc3bea380/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
Expand Down

0 comments on commit 279c255

Please sign in to comment.