Skip to content

Commit

Permalink
refactor: gRPC Invoke's refactor
Browse files Browse the repository at this point in the history
Moving things around and renaming, as a pre-requisition for the async
Invoke, it also moves a timeout inicialisation logic close to the moment
where it should be used.
  • Loading branch information
olegbespalov committed Apr 24, 2024
1 parent b866c10 commit a98a065
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 61 deletions.
44 changes: 29 additions & 15 deletions js/modules/k6/grpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,28 +278,44 @@ func (c *Client) Invoke(
method string,
req goja.Value,
params goja.Value,
) (*grpcext.Response, error) {
) (*grpcext.InvokeResponse, error) {
grpcReq, err := c.buildInvokeRequest(method, req, params)
if err != nil {
return nil, err
}

return c.conn.Invoke(c.vu.Context(), grpcReq)
}

// buildInvokeRequest creates a new InvokeRequest from the given method name, request object and parameters
func (c *Client) buildInvokeRequest(
method string,
req goja.Value,
params goja.Value,
) (grpcext.InvokeRequest, error) {
grpcReq := grpcext.InvokeRequest{}

state := c.vu.State()
if state == nil {
return nil, common.NewInitContextError("invoking RPC methods in the init context is not supported")
return grpcReq, common.NewInitContextError("invoking RPC methods in the init context is not supported")
}
if c.conn == nil {
return nil, errors.New("no gRPC connection, you must call connect first")
return grpcReq, errors.New("no gRPC connection, you must call connect first")
}
if method == "" {
return nil, errors.New("method to invoke cannot be empty")
return grpcReq, errors.New("method to invoke cannot be empty")
}
if method[0] != '/' {
method = "/" + method
}
methodDesc := c.mds[method]
if methodDesc == nil {
return nil, fmt.Errorf("method %q not found in file descriptors", method)
return grpcReq, fmt.Errorf("method %q not found in file descriptors", method)
}

p, err := newCallParams(c.vu, params)
if err != nil {
return nil, fmt.Errorf("invalid GRPC's client.invoke() parameters: %w", err)
return grpcReq, fmt.Errorf("invalid GRPC's client.invoke() parameters: %w", err)
}

// k6 GRPC Invoke's default timeout is 2 minutes
Expand All @@ -308,25 +324,23 @@ func (c *Client) Invoke(
}

if req == nil {
return nil, errors.New("request cannot be nil")
return grpcReq, errors.New("request cannot be nil")
}
b, err := req.ToObject(c.vu.Runtime()).MarshalJSON()
if err != nil {
return nil, fmt.Errorf("unable to serialise request object: %w", err)
return grpcReq, fmt.Errorf("unable to serialise request object: %w", err)
}

ctx, cancel := context.WithTimeout(c.vu.Context(), p.Timeout)
defer cancel()

p.SetSystemTags(state, c.addr, method)

reqmsg := grpcext.Request{
return grpcext.InvokeRequest{
Method: method,
MethodDescriptor: methodDesc,
Timeout: p.Timeout,
Message: b,
TagsAndMeta: &p.TagsAndMeta,
}

return c.conn.Invoke(ctx, method, p.Metadata, reqmsg)
Metadata: p.Metadata,
}, nil
}

// Close will close the client gRPC connection
Expand Down
48 changes: 29 additions & 19 deletions lib/netext/grpcext/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"net"
"strconv"
"strings"
"time"

"github.com/sirupsen/logrus"
"go.k6.io/k6/lib"
Expand All @@ -27,30 +28,34 @@ import (
"google.golang.org/protobuf/types/dynamicpb"
)

// Request represents a gRPC request.
type Request struct {
MethodDescriptor protoreflect.MethodDescriptor
TagsAndMeta *metrics.TagsAndMeta
Message []byte
}

// StreamRequest represents a gRPC stream request.
type StreamRequest struct {
// InvokeRequest represents a unary gRPC request.
type InvokeRequest struct {
Method string
MethodDescriptor protoreflect.MethodDescriptor
Timeout time.Duration
TagsAndMeta *metrics.TagsAndMeta
Message []byte
Metadata metadata.MD
}

// Response represents a gRPC response.
type Response struct {
// InvokeResponse represents a gRPC response.
type InvokeResponse struct {
Message interface{}
Error interface{}
Headers map[string][]string
Trailers map[string][]string
Status codes.Code
}

// StreamRequest represents a gRPC stream request.
type StreamRequest struct {
Method string
MethodDescriptor protoreflect.MethodDescriptor
Timeout time.Duration
TagsAndMeta *metrics.TagsAndMeta
Metadata metadata.MD
}

type clientConnCloser interface {
grpc.ClientConnInterface
Close() error
Expand Down Expand Up @@ -97,22 +102,27 @@ func (c *Conn) Reflect(ctx context.Context) (*descriptorpb.FileDescriptorSet, er
// Invoke executes a unary gRPC request.
func (c *Conn) Invoke(
ctx context.Context,
url string,
md metadata.MD,
req Request,
req InvokeRequest,
opts ...grpc.CallOption,
) (*Response, error) {
if url == "" {
) (*InvokeResponse, error) {
if req.Method == "" {
return nil, fmt.Errorf("url is required")
}

if req.MethodDescriptor == nil {
return nil, fmt.Errorf("request method descriptor is required")
}
if len(req.Message) == 0 {
return nil, fmt.Errorf("request message is required")
}

ctx = metadata.NewOutgoingContext(ctx, md)
if req.Timeout != time.Duration(0) {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, req.Timeout)
defer cancel()
}

ctx = metadata.NewOutgoingContext(ctx, req.Metadata)

reqdm := dynamicpb.NewMessage(req.MethodDescriptor.Input())
if err := protojson.Unmarshal(req.Message, reqdm); err != nil {
Expand All @@ -128,9 +138,9 @@ func (c *Conn) Invoke(
copts = append(copts, opts...)
copts = append(copts, grpc.Header(&header), grpc.Trailer(&trailer))

err := c.raw.Invoke(ctx, url, reqdm, resp, copts...)
err := c.raw.Invoke(ctx, req.Method, reqdm, resp, copts...)

response := Response{
response := InvokeResponse{
Headers: header,
Trailers: trailer,
}
Expand Down
45 changes: 18 additions & 27 deletions lib/netext/grpcext/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@ func TestInvoke(t *testing.T) {
}

c := Conn{raw: invokemock(helloReply)}
r := Request{
r := InvokeRequest{
Method: "/hello.HelloService/SayHello",
MethodDescriptor: methodFromProto("SayHello"),
Message: []byte(`{"greeting":"text request"}`),
Metadata: metadata.New(nil),
}
res, err := c.Invoke(context.Background(), "/hello.HelloService/SayHello", metadata.New(nil), r)
res, err := c.Invoke(context.Background(), r)
require.NoError(t, err)

assert.Equal(t, codes.OK, res.Status)
Expand All @@ -51,11 +53,13 @@ func TestInvokeWithCallOptions(t *testing.T) {
}

c := Conn{raw: invokemock(reply)}
r := Request{
r := InvokeRequest{
Method: "/hello.HelloService/NoOp",
MethodDescriptor: methodFromProto("NoOp"),
Message: []byte(`{}`),
Metadata: metadata.New(nil),
}
res, err := c.Invoke(context.Background(), "/hello.HelloService/NoOp", metadata.New(nil), r, grpc.UseCompressor("fakeone"))
res, err := c.Invoke(context.Background(), r, grpc.UseCompressor("fakeone"))
require.NoError(t, err)
assert.NotNil(t, res)
}
Expand All @@ -68,11 +72,13 @@ func TestInvokeReturnError(t *testing.T) {
}

c := Conn{raw: invokemock(helloReply)}
r := Request{
r := InvokeRequest{
Method: "/hello.HelloService/SayHello",
MethodDescriptor: methodFromProto("SayHello"),
Message: []byte(`{"greeting":"text request"}`),
Metadata: metadata.New(nil),
}
res, err := c.Invoke(context.Background(), "/hello.HelloService/SayHello", metadata.New(nil), r)
res, err := c.Invoke(context.Background(), r)
require.NoError(t, err)

assert.Equal(t, codes.Unknown, res.Status)
Expand All @@ -92,49 +98,34 @@ func TestConnInvokeInvalid(t *testing.T) {
payload = []byte(`{"greeting":"test"}`)
)

req := Request{
MethodDescriptor: methodDesc,
Message: payload,
}

tests := []struct {
name string
ctx context.Context
md metadata.MD
url string
req Request
req InvokeRequest
experr string
}{
{
name: "EmptyMethod",
ctx: ctx,
url: "",
md: md,
req: req,
req: InvokeRequest{MethodDescriptor: methodDesc, Message: payload, Metadata: md, Method: ""},
experr: "url is required",
},
{
name: "NullMethodDescriptor",
ctx: ctx,
url: url,
md: nil,
req: Request{Message: payload},
req: InvokeRequest{Message: payload, Metadata: nil, Method: url},
experr: "method descriptor is required",
},
{
name: "NullMessage",
ctx: ctx,
url: url,
md: nil,
req: Request{MethodDescriptor: methodDesc},
req: InvokeRequest{MethodDescriptor: methodDesc, Metadata: nil, Method: url},
experr: "message is required",
},
{
name: "EmptyMessage",
ctx: ctx,
url: url,
md: nil,
req: Request{MethodDescriptor: methodDesc, Message: []byte{}},
req: InvokeRequest{MethodDescriptor: methodDesc, Message: []byte{}, Metadata: nil, Method: url},
experr: "message is required",
},
}
Expand All @@ -145,7 +136,7 @@ func TestConnInvokeInvalid(t *testing.T) {
t.Parallel()

c := Conn{}
res, err := c.Invoke(tt.ctx, tt.url, tt.md, tt.req)
res, err := c.Invoke(tt.ctx, tt.req)
require.Error(t, err)
require.Nil(t, res)
assert.Contains(t, err.Error(), tt.experr)
Expand Down

0 comments on commit a98a065

Please sign in to comment.