-
Notifications
You must be signed in to change notification settings - Fork 4.6k
grpc: Fix cardinality violations in non-client streaming RPCs. #8385
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 29 commits
298edba
4c3a18a
fd5d614
7e4206c
324566b
9fbf6b7
9b9cddf
63565f2
67549e7
6030b90
dff08b3
f4f1b61
83e8664
5acd9ab
8f4f39b
6de922d
5f6c715
990efe1
7f6d31f
41d8328
59ad122
0f5248a
1ff8868
68fd0f8
19e9e71
0d30b18
83fd598
efa8e5a
29f6657
183b1da
749a52c
1b1800d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -3740,6 +3740,238 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) { | |||||||||||||
} | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
// Tests the behavior for server-side streaming when client calls SendMsg twice. | ||||||||||||||
// Second call to SendMsg should fail with Internal error and result in closing | ||||||||||||||
// the connection with a RST_STREAM. | ||||||||||||||
func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) { | ||||||||||||||
// To ensure server.recvMsg() is successfully completed. | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this actually needed? Why? If the client were to attempt to send its second message immediately, and that caused a RST_STREAM, that would all happen after the server processed the headers, and the handler should get invoked regardless, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The first call to server.RecvMsg is made by the generated handler before the test's server handler is invoked. If this call fails, the server handler in the test will not be executed at all. To handle this, I synchronise the calls in the following way:
An alternative approach would be to use a fake server implementation, which would allow us to directly observe and assert the error returned from server.RecvMsg(). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How does that help? The first There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, it will not hung. Line 1102 in ac13172
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh yes, since the client knows it's not client-streaming, too, then it will do END_STREAM along with the first call to SendMsg. So then the reason for the synchronization here is that, if the client does a RST_STREAM, that might propagate to the server and cause its first recv to fail -- even though the first request message was received perfectly normally -- because we cancel the stream's context upon RST_STREAM receipt? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, correct. So, should we keep the test as it is or change it with alternative approach?
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No, but can you explain more like how I did, to say why the synchronization is needed? Because there would be a race between client cancellation and the server reading the first request message. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please leave the function-level comment to just a description of the test, and put the explanation of the various steps in intra-function comments. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What I was hoping for was more like this:
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||||||||||
recvDoneOnServer := make(chan struct{}) | ||||||||||||||
// To ensure goroutine for test does not end before RPC handler performs error | ||||||||||||||
// checking. | ||||||||||||||
handlerDone := make(chan struct{}) | ||||||||||||||
ss := stubserver.StubServer{ | ||||||||||||||
StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error { | ||||||||||||||
// The initial call to recvMsg is made by the generated code. | ||||||||||||||
close(recvDoneOnServer) | ||||||||||||||
<-stream.Context().Done() | ||||||||||||||
if err := stream.SendMsg(&testpb.StreamingOutputCallRequest{}); status.Code(err) != codes.Canceled { | ||||||||||||||
t.Errorf("stream.SendMsg() = %v, want error %v", err, codes.Canceled) | ||||||||||||||
} | ||||||||||||||
close(handlerDone) | ||||||||||||||
return nil | ||||||||||||||
}, | ||||||||||||||
} | ||||||||||||||
if err := ss.Start(nil); err != nil { | ||||||||||||||
t.Fatal("Error starting server:", err) | ||||||||||||||
} | ||||||||||||||
defer ss.Stop() | ||||||||||||||
|
||||||||||||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) | ||||||||||||||
defer cancel() | ||||||||||||||
cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) | ||||||||||||||
if err != nil { | ||||||||||||||
t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err) | ||||||||||||||
} | ||||||||||||||
defer cc.Close() | ||||||||||||||
|
||||||||||||||
desc := &grpc.StreamDesc{ | ||||||||||||||
StreamName: "StreamingOutputCall", | ||||||||||||||
ServerStreams: true, | ||||||||||||||
ClientStreams: false, | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/StreamingOutputCall") | ||||||||||||||
if err != nil { | ||||||||||||||
t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
if err := stream.SendMsg(&testpb.Empty{}); err != nil { | ||||||||||||||
t.Errorf("stream.SendMsg() = %v, want <nil>", err) | ||||||||||||||
} | ||||||||||||||
<-recvDoneOnServer | ||||||||||||||
if err := stream.SendMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { | ||||||||||||||
t.Errorf("stream.SendMsg() = %v, want error %v", err, codes.Internal) | ||||||||||||||
} | ||||||||||||||
<-handlerDone | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
// Tests the behavior for unary RPC when client calls SendMsg twice. Second call | ||||||||||||||
// to SendMsg should fail with Internal error. | ||||||||||||||
func (s) TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) { | ||||||||||||||
dfawley marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||
ss := stubserver.StubServer{ | ||||||||||||||
UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { | ||||||||||||||
return &testpb.SimpleResponse{}, nil | ||||||||||||||
}, | ||||||||||||||
} | ||||||||||||||
if err := ss.Start(nil); err != nil { | ||||||||||||||
t.Fatal("Error starting server:", err) | ||||||||||||||
} | ||||||||||||||
defer ss.Stop() | ||||||||||||||
|
||||||||||||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) | ||||||||||||||
defer cancel() | ||||||||||||||
cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) | ||||||||||||||
if err != nil { | ||||||||||||||
t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err) | ||||||||||||||
} | ||||||||||||||
defer cc.Close() | ||||||||||||||
|
||||||||||||||
desc := &grpc.StreamDesc{ | ||||||||||||||
StreamName: "UnaryCall", | ||||||||||||||
ServerStreams: false, | ||||||||||||||
ClientStreams: false, | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/UnaryCall") | ||||||||||||||
if err != nil { | ||||||||||||||
t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
if err := stream.SendMsg(&testpb.Empty{}); err != nil { | ||||||||||||||
t.Errorf("stream.SendMsg() = %v, want <nil>", err) | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
if err := stream.SendMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { | ||||||||||||||
t.Errorf("stream.SendMsg() = %v, want error %v", status.Code(err), codes.Internal) | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
// Tests the behavior for server-side streaming RPC when client misbehaves as Bidi-streaming | ||||||||||||||
// and sends multiple messages. | ||||||||||||||
func (s) TestServerStreaming_ClientSendsMultipleMessages(t *testing.T) { | ||||||||||||||
// The initial call to recvMsg made by the generated code, will return the error. | ||||||||||||||
ss := stubserver.StubServer{} | ||||||||||||||
if err := ss.Start(nil); err != nil { | ||||||||||||||
t.Fatal("Error starting server:", err) | ||||||||||||||
} | ||||||||||||||
defer ss.Stop() | ||||||||||||||
|
||||||||||||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) | ||||||||||||||
defer cancel() | ||||||||||||||
cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) | ||||||||||||||
if err != nil { | ||||||||||||||
t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err) | ||||||||||||||
} | ||||||||||||||
defer cc.Close() | ||||||||||||||
|
||||||||||||||
// Making the client bi-di to bypass the client side checks that stop a non-streaming client | ||||||||||||||
// from sending multiple messages. | ||||||||||||||
desc := &grpc.StreamDesc{ | ||||||||||||||
StreamName: "StreamingOutputCall", | ||||||||||||||
ServerStreams: true, | ||||||||||||||
ClientStreams: true, | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/StreamingOutputCall") | ||||||||||||||
if err != nil { | ||||||||||||||
t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
if err := stream.SendMsg(&testpb.Empty{}); err != nil { | ||||||||||||||
t.Errorf("stream.SendMsg() = %v, want <nil>", err) | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
if err := stream.SendMsg(&testpb.Empty{}); err != nil { | ||||||||||||||
t.Errorf("stream.SendMsg() = %v, want <nil>", err) | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
if err := stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal { | ||||||||||||||
t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal) | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
// Tests the behavior of server for server-side streaming RPC when client sends zero request messages. | ||||||||||||||
func (s) TestServerStreaming_ServerRecvZeroRequests(t *testing.T) { | ||||||||||||||
testCases := []struct { | ||||||||||||||
name string | ||||||||||||||
desc *grpc.StreamDesc | ||||||||||||||
wantCode codes.Code | ||||||||||||||
}{ | ||||||||||||||
{ | ||||||||||||||
name: "BidiStreaming", | ||||||||||||||
desc: &grpc.StreamDesc{ | ||||||||||||||
StreamName: "StreamingOutputCall", | ||||||||||||||
ServerStreams: true, | ||||||||||||||
ClientStreams: true, | ||||||||||||||
}, | ||||||||||||||
wantCode: codes.Internal, | ||||||||||||||
}, | ||||||||||||||
{ | ||||||||||||||
name: "ClientStreaming", | ||||||||||||||
desc: &grpc.StreamDesc{ | ||||||||||||||
StreamName: "StreamingOutputCall", | ||||||||||||||
ServerStreams: false, | ||||||||||||||
ClientStreams: true, | ||||||||||||||
}, | ||||||||||||||
wantCode: codes.Internal, | ||||||||||||||
}, | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
for _, tc := range testCases { | ||||||||||||||
// The initial call to recvMsg made by the generated code, will return the error. | ||||||||||||||
ss := stubserver.StubServer{} | ||||||||||||||
if err := ss.Start(nil); err != nil { | ||||||||||||||
t.Fatal("Error starting server:", err) | ||||||||||||||
} | ||||||||||||||
defer ss.Stop() | ||||||||||||||
|
||||||||||||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) | ||||||||||||||
defer cancel() | ||||||||||||||
cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) | ||||||||||||||
if err != nil { | ||||||||||||||
t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err) | ||||||||||||||
} | ||||||||||||||
defer cc.Close() | ||||||||||||||
|
||||||||||||||
stream, err := cc.NewStream(ctx, tc.desc, "/grpc.testing.TestService/StreamingOutputCall") | ||||||||||||||
if err != nil { | ||||||||||||||
t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
if err := stream.CloseSend(); err != nil { | ||||||||||||||
t.Errorf("stream.CloseSend() = %v, want <nil>", err) | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
if err := stream.RecvMsg(&testpb.Empty{}); status.Code(err) != tc.wantCode { | ||||||||||||||
t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), tc.wantCode) | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
// Tests the behavior of client for server-side streaming RPC when client sends zero request messages. | ||||||||||||||
func (s) TestServerStreaming_ClientSendsZeroRequests(t *testing.T) { | ||||||||||||||
t.Skip() | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please add a string here and include the issue number (#7286) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||||||||||
// The initial call to recvMsg made by the generated code, will return the error. | ||||||||||||||
ss := stubserver.StubServer{} | ||||||||||||||
if err := ss.Start(nil); err != nil { | ||||||||||||||
t.Fatal("Error starting server:", err) | ||||||||||||||
} | ||||||||||||||
defer ss.Stop() | ||||||||||||||
|
||||||||||||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) | ||||||||||||||
defer cancel() | ||||||||||||||
cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(insecure.NewCredentials())) | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can you switch all these tests to use local credentials instead of insecure? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||||||||||||||
if err != nil { | ||||||||||||||
t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err) | ||||||||||||||
} | ||||||||||||||
defer cc.Close() | ||||||||||||||
|
||||||||||||||
desc := &grpc.StreamDesc{ | ||||||||||||||
arjan-bal marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||
StreamName: "StreamingOutputCall", | ||||||||||||||
ServerStreams: true, | ||||||||||||||
ClientStreams: false, | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems like we should have a test case that sets this (And if we don't have such a check we may want to add one.) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just to clarify — you're suggesting me to add a test where the client behaves as client/bidi-streaming and sends zero request, while server behave as server-streaming, and then assert that it fails on the server side due to a cardinality violation. Is that correct? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correct. This exact test, pretty much, but set this field to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have modified the test to a table-driven test, where server will run against multiple streamdesc including client-streaming, server-streaming and bidi-streaming. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The key difference between the two cases that I'd like to see is whether the client knows it's required to send a message. In the case where the client knows ( |
||||||||||||||
} | ||||||||||||||
|
||||||||||||||
stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/StreamingOutputCall") | ||||||||||||||
if err != nil { | ||||||||||||||
t.Fatalf("cc.NewStream() failed unexpectedly: %v", err) | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
if err := stream.CloseSend(); status.Code(err) != codes.Internal { | ||||||||||||||
t.Errorf("stream.CloseSend() = %v, want error %v", status.Code(err), codes.Internal) | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
// Tests that a client receives a cardinality violation error for client-streaming | ||||||||||||||
// RPCs if the server call SendMsg multiple times. | ||||||||||||||
func (s) TestClientStreaming_ServerHandlerSendMsgAfterSendMsg(t *testing.T) { | ||||||||||||||
|
Uh oh!
There was an error while loading. Please reload this page.