Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
298edba
handled cardinality violation and added tests
Pranjali-2501 Jun 6, 2025
4c3a18a
Merge branch 'master' into server_streaming
Pranjali-2501 Jun 6, 2025
fd5d614
remove vet errors
Pranjali-2501 Jun 6, 2025
7e4206c
modified tests
Pranjali-2501 Jun 6, 2025
324566b
modified tests
Pranjali-2501 Jun 6, 2025
9fbf6b7
Merge branch 'master' into server_streaming
Pranjali-2501 Jun 19, 2025
9b9cddf
change server.recvmsg() to catch cardinality violation
Pranjali-2501 Jun 23, 2025
63565f2
Merge branch 'master' into server_streaming
Pranjali-2501 Jun 23, 2025
67549e7
replace srv with _
Pranjali-2501 Jun 23, 2025
6030b90
resolving comments
Pranjali-2501 Jul 1, 2025
dff08b3
resolving vets
Pranjali-2501 Jul 1, 2025
f4f1b61
addressed comments
Pranjali-2501 Jul 8, 2025
83e8664
resolving vets
Pranjali-2501 Jul 8, 2025
5acd9ab
resolving vets
Pranjali-2501 Jul 8, 2025
8f4f39b
minor change
Pranjali-2501 Jul 15, 2025
6de922d
minor change
Pranjali-2501 Jul 15, 2025
5f6c715
minor change
Pranjali-2501 Jul 22, 2025
990efe1
resolving comments
Pranjali-2501 Jul 24, 2025
7f6d31f
resolving nits
Pranjali-2501 Jul 24, 2025
41d8328
vet changes
Pranjali-2501 Jul 24, 2025
59ad122
added comment
Pranjali-2501 Jul 24, 2025
0f5248a
added comment
Pranjali-2501 Jul 25, 2025
1ff8868
resolving comments
Pranjali-2501 Jul 28, 2025
68fd0f8
update comment
Pranjali-2501 Jul 28, 2025
19e9e71
nits
Pranjali-2501 Jul 29, 2025
0d30b18
modifying test
Pranjali-2501 Jul 30, 2025
83fd598
resolving vet
Pranjali-2501 Jul 30, 2025
efa8e5a
remove comment
Pranjali-2501 Jul 30, 2025
29f6657
modifying tests
Pranjali-2501 Jul 31, 2025
183b1da
resolving comments
Pranjali-2501 Aug 3, 2025
749a52c
resolving comments
Pranjali-2501 Aug 4, 2025
1b1800d
resolving comments
Pranjali-2501 Aug 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1598,6 +1598,7 @@ func (s *Server) processStreamingRPC(ctx context.Context, stream *transport.Serv
s: stream,
p: &parser{r: stream, bufferPool: s.opts.bufferPool},
codec: s.getCodec(stream.ContentSubtype()),
desc: sd,
maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
maxSendMessageSize: s.opts.maxSendMessageSize,
trInfo: trInfo,
Expand Down
22 changes: 21 additions & 1 deletion stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -1580,6 +1580,7 @@ type serverStream struct {
s *transport.ServerStream
p *parser
codec baseCodec
desc *StreamDesc

compressorV0 Compressor
compressorV1 encoding.Compressor
Expand All @@ -1588,6 +1589,8 @@ type serverStream struct {

sendCompressorName string

recvFirstMsg bool // set after the first message is received

maxReceiveMessageSize int
maxSendMessageSize int
trInfo *traceInfo
Expand Down Expand Up @@ -1774,13 +1777,18 @@ func (ss *serverStream) RecvMsg(m any) (err error) {
binlog.Log(ss.ctx, chc)
}
}
// Received no request msg for non-client streaming rpcs.
if !ss.desc.ClientStreams && !ss.recvFirstMsg {
return status.Error(codes.Internal, "cardinality violation: received no request message from non-client-streaming RPC")
}
return err
}
if err == io.ErrUnexpectedEOF {
err = status.Error(codes.Internal, io.ErrUnexpectedEOF.Error())
}
return toRPCErr(err)
}
ss.recvFirstMsg = true
if len(ss.statsHandler) != 0 {
for _, sh := range ss.statsHandler {
sh.HandleRPC(ss.s.Context(), &stats.InPayload{
Expand All @@ -1800,7 +1808,19 @@ func (ss *serverStream) RecvMsg(m any) (err error) {
binlog.Log(ss.ctx, cm)
}
}
return nil

if ss.desc.ClientStreams {
// Subsequent messages should be received by subsequent RecvMsg calls.
return nil
}
// Special handling for non-client-stream rpcs.
// This recv expects EOF or errors, so we don't collect inPayload.
if err := recv(ss.p, ss.codec, ss.s, ss.decompressorV0, m, ss.maxReceiveMessageSize, nil, ss.decompressorV1, true); err == io.EOF {
return nil
} else if err != nil {
return err
}
return status.Error(codes.Internal, "cardinality violation: received multiple request messages for non-client-streaming RPC")
}

// MethodFromServerStream returns the method string for the input stream.
Expand Down
240 changes: 240 additions & 0 deletions test/end2end_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/credentials/local"
"google.golang.org/grpc/health"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/binarylog"
Expand Down Expand Up @@ -3740,6 +3741,245 @@ func (s) TestClientStreaming_ReturnErrorAfterSendAndClose(t *testing.T) {
}
}

// Tests the behavior for server-side streaming when client calls SendMsg twice.
// Second call to SendMsg should fail with Internal error and result in closing
// the connection with a RST_STREAM.
func (s) TestServerStreaming_ClientCallSendMsgTwice(t *testing.T) {
// To ensure initial call to server.recvMsg() made by the generated code is successfully
// completed. Otherwise, if the client attempts to send a second request message, that
// will trigger a RST_STREAM from the client due to the application violating the RPC's
// protocol. The RST_STREAM could cause the server’s first RecvMsg to fail and will prevent
// the method handler from being called.
recvDoneOnServer := make(chan struct{})
// To ensure goroutine for test does not end before RPC handler performs error
// checking.
handlerDone := make(chan struct{})
ss := stubserver.StubServer{
StreamingOutputCallF: func(_ *testpb.StreamingOutputCallRequest, stream testgrpc.TestService_StreamingOutputCallServer) error {
close(recvDoneOnServer)
// Block until the stream’s context is done. Second call to client.SendMsg
// triggers a RST_STREAM which cancels the stream context on the server.
<-stream.Context().Done()
if err := stream.SendMsg(&testpb.StreamingOutputCallRequest{}); status.Code(err) != codes.Canceled {
t.Errorf("stream.SendMsg() = %v, want error %v", err, codes.Canceled)
}
close(handlerDone)
return nil
},
}
if err := ss.Start(nil); err != nil {
t.Fatal("Error starting server:", err)
}
defer ss.Stop()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(local.NewCredentials()))
if err != nil {
t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err)
}
defer cc.Close()

desc := &grpc.StreamDesc{
StreamName: "StreamingOutputCall",
ServerStreams: true,
ClientStreams: false,
}

stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/StreamingOutputCall")
if err != nil {
t.Fatalf("cc.NewStream() failed unexpectedly: %v", err)
}

if err := stream.SendMsg(&testpb.Empty{}); err != nil {
t.Errorf("stream.SendMsg() = %v, want <nil>", err)
}

<-recvDoneOnServer
if err := stream.SendMsg(&testpb.Empty{}); status.Code(err) != codes.Internal {
t.Errorf("stream.SendMsg() = %v, want error %v", err, codes.Internal)
}
<-handlerDone
}

// TODO(i/7286) : Add tests to check server-side behavior for Unary RPC.
// Tests the behavior for unary RPC when client calls SendMsg twice. Second call
// to SendMsg should fail with Internal error.
func (s) TestUnaryRPC_ClientCallSendMsgTwice(t *testing.T) {
ss := stubserver.StubServer{
UnaryCallF: func(context.Context, *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
}
if err := ss.Start(nil); err != nil {
t.Fatal("Error starting server:", err)
}
defer ss.Stop()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(local.NewCredentials()))
if err != nil {
t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err)
}
defer cc.Close()

desc := &grpc.StreamDesc{
StreamName: "UnaryCall",
ServerStreams: false,
ClientStreams: false,
}

stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/UnaryCall")
if err != nil {
t.Fatalf("cc.NewStream() failed unexpectedly: %v", err)
}

if err := stream.SendMsg(&testpb.Empty{}); err != nil {
t.Errorf("stream.SendMsg() = %v, want <nil>", err)
}

if err := stream.SendMsg(&testpb.Empty{}); status.Code(err) != codes.Internal {
t.Errorf("stream.SendMsg() = %v, want error %v", status.Code(err), codes.Internal)
}
}

// Tests the behavior for server-side streaming RPC when client misbehaves as Bidi-streaming
// and sends multiple messages.
func (s) TestServerStreaming_ClientSendsMultipleMessages(t *testing.T) {
// The initial call to recvMsg made by the generated code, will return the error.
ss := stubserver.StubServer{}
if err := ss.Start(nil); err != nil {
t.Fatal("Error starting server:", err)
}
defer ss.Stop()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(local.NewCredentials()))
if err != nil {
t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err)
}
defer cc.Close()

// Making the client bi-di to bypass the client side checks that stop a non-streaming client
// from sending multiple messages.
desc := &grpc.StreamDesc{
StreamName: "StreamingOutputCall",
ServerStreams: true,
ClientStreams: true,
}

stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/StreamingOutputCall")
if err != nil {
t.Fatalf("cc.NewStream() failed unexpectedly: %v", err)
}

if err := stream.SendMsg(&testpb.Empty{}); err != nil {
t.Errorf("stream.SendMsg() = %v, want <nil>", err)
}

if err := stream.SendMsg(&testpb.Empty{}); err != nil {
t.Errorf("stream.SendMsg() = %v, want <nil>", err)
}

if err := stream.RecvMsg(&testpb.Empty{}); status.Code(err) != codes.Internal {
t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), codes.Internal)
}
}

// Tests the behavior of server for server-side streaming RPC when client sends zero request messages.
func (s) TestServerStreaming_ServerRecvZeroRequests(t *testing.T) {
testCases := []struct {
name string
desc *grpc.StreamDesc
wantCode codes.Code
}{
{
name: "BidiStreaming",
desc: &grpc.StreamDesc{
StreamName: "StreamingOutputCall",
ServerStreams: true,
ClientStreams: true,
},
wantCode: codes.Internal,
},
{
name: "ClientStreaming",
desc: &grpc.StreamDesc{
StreamName: "StreamingOutputCall",
ServerStreams: false,
ClientStreams: true,
},
wantCode: codes.Internal,
},
}

for _, tc := range testCases {
// The initial call to recvMsg made by the generated code, will return the error.
ss := stubserver.StubServer{}
if err := ss.Start(nil); err != nil {
t.Fatal("Error starting server:", err)
}
defer ss.Stop()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(local.NewCredentials()))
if err != nil {
t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err)
}
defer cc.Close()

stream, err := cc.NewStream(ctx, tc.desc, "/grpc.testing.TestService/StreamingOutputCall")
if err != nil {
t.Fatalf("cc.NewStream() failed unexpectedly: %v", err)
}

if err := stream.CloseSend(); err != nil {
t.Errorf("stream.CloseSend() = %v, want <nil>", err)
}

if err := stream.RecvMsg(&testpb.Empty{}); status.Code(err) != tc.wantCode {
t.Errorf("stream.RecvMsg() = %v, want error %v", status.Code(err), tc.wantCode)
}
}
}

// Tests the behavior of client for server-side streaming RPC when client sends zero request messages.
func (s) TestServerStreaming_ClientSendsZeroRequests(t *testing.T) {
t.Skip("blocked on i/7286")
// The initial call to recvMsg made by the generated code, will return the error.
ss := stubserver.StubServer{}
if err := ss.Start(nil); err != nil {
t.Fatal("Error starting server:", err)
}
defer ss.Stop()

ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc, err := grpc.NewClient(ss.Address, grpc.WithTransportCredentials(local.NewCredentials()))
if err != nil {
t.Fatalf("grpc.NewClient(%q) failed unexpectedly: %v", ss.Address, err)
}
defer cc.Close()

desc := &grpc.StreamDesc{
StreamName: "StreamingOutputCall",
ServerStreams: true,
ClientStreams: false,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like we should have a test case that sets this true, also, and ensures that the server errors? This is theoretically catching a client-side check that errors if zero requests are sent before CloseSend is called when the client knows it is not client-streaming.

(And if we don't have such a check we may want to add one.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to clarify — you're suggesting me to add a test where the client behaves as client/bidi-streaming and sends zero request, while server behave as server-streaming, and then assert that it fails on the server side due to a cardinality violation. Is that correct?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct. This exact test, pretty much, but set this field to true.

Copy link
Contributor Author

@Pranjali-2501 Pranjali-2501 Jul 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have modified the test to a table-driven test, where server will run against multiple streamdesc including client-streaming, server-streaming and bidi-streaming.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The key difference between the two cases that I'd like to see is whether the client knows it's required to send a message.

In the case where the client knows (ClientStreams: false), we should detect the error locally and send a RST_STREAM to the server, but if the client doesn't know (ClientStreams: true), the server should detect the error and end the stream with an INTERNAL error. Can we confirm these things are happening? (And AFACT the test will fail since the client doesn't check whether it has sent a message in CloseSend, so it's fine to make that test case Skip until we fix it in another PR.)

}

stream, err := cc.NewStream(ctx, desc, "/grpc.testing.TestService/StreamingOutputCall")
if err != nil {
t.Fatalf("cc.NewStream() failed unexpectedly: %v", err)
}

if err := stream.CloseSend(); status.Code(err) != codes.Internal {
t.Errorf("stream.CloseSend() = %v, want error %v", status.Code(err), codes.Internal)
}
}

// Tests that a client receives a cardinality violation error for client-streaming
// RPCs if the server call SendMsg multiple times.
func (s) TestClientStreaming_ServerHandlerSendMsgAfterSendMsg(t *testing.T) {
Expand Down