mirror of https://github.com/grpc/grpc-go.git
client: fix bug with cancellation propagation for unary RPCs (#3106)
This commit is contained in:
parent
5e3ef93a5f
commit
7c3115d8bb
|
|
@ -318,8 +318,7 @@ func (s *Stream) waitOnHeader() {
|
||||||
case <-s.ctx.Done():
|
case <-s.ctx.Done():
|
||||||
// Close the stream to prevent headers/trailers from changing after
|
// Close the stream to prevent headers/trailers from changing after
|
||||||
// this function returns.
|
// this function returns.
|
||||||
err := ContextErr(s.ctx.Err())
|
s.ct.CloseStream(s, ContextErr(s.ctx.Err()))
|
||||||
s.ct.closeStream(s, err, false, 0, status.Convert(err), nil, false)
|
|
||||||
// headerChan could possibly not be closed yet if closeStream raced
|
// headerChan could possibly not be closed yet if closeStream raced
|
||||||
// with operateHeaders; wait until it is closed explicitly here.
|
// with operateHeaders; wait until it is closed explicitly here.
|
||||||
<-s.headerChan
|
<-s.headerChan
|
||||||
|
|
|
||||||
|
|
@ -7482,3 +7482,47 @@ func (s) TestGRPCMethodAccessibleToCredsViaContextRequestInfo(t *testing.T) {
|
||||||
t.Fatalf("ss.client.EmptyCall(_, _) = _, %v; want _, _.Message()=%q", err, wantMethod)
|
t.Fatalf("ss.client.EmptyCall(_, _) = _, %v; want _, _.Message()=%q", err, wantMethod)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s) TestClientCancellationPropagatesUnary(t *testing.T) {
|
||||||
|
wg := &sync.WaitGroup{}
|
||||||
|
called, done := make(chan struct{}), make(chan struct{})
|
||||||
|
ss := &stubServer{
|
||||||
|
emptyCall: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
|
||||||
|
close(called)
|
||||||
|
<-ctx.Done()
|
||||||
|
err := ctx.Err()
|
||||||
|
if err != context.Canceled {
|
||||||
|
t.Errorf("ctx.Err() = %v; want context.Canceled", err)
|
||||||
|
}
|
||||||
|
close(done)
|
||||||
|
return nil, err
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := ss.Start(nil); err != nil {
|
||||||
|
t.Fatalf("Error starting endpoint server: %v", err)
|
||||||
|
}
|
||||||
|
defer ss.Stop()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
if _, err := ss.client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Canceled {
|
||||||
|
t.Errorf("ss.client.EmptyCall() = _, %v; want _, Code()=codes.Canceled", err)
|
||||||
|
}
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-called:
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatalf("failed to perform EmptyCall after 10s")
|
||||||
|
}
|
||||||
|
cancel()
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(5 * time.Second):
|
||||||
|
t.Fatalf("server failed to close done chan due to cancellation propagation")
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue