mirror of https://github.com/grpc/grpc-go.git
balancer: add trailer metadata to DoneInfo (#2359)
This commit is contained in:
parent
55cdff2adc
commit
c195587d96
|
@ -28,6 +28,7 @@ import (
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
"google.golang.org/grpc/connectivity"
|
"google.golang.org/grpc/connectivity"
|
||||||
"google.golang.org/grpc/credentials"
|
"google.golang.org/grpc/credentials"
|
||||||
|
"google.golang.org/grpc/metadata"
|
||||||
"google.golang.org/grpc/resolver"
|
"google.golang.org/grpc/resolver"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -160,6 +161,8 @@ type PickOptions struct {
|
||||||
type DoneInfo struct {
|
type DoneInfo struct {
|
||||||
// Err is the rpc error the RPC finished with. It could be nil.
|
// Err is the rpc error the RPC finished with. It could be nil.
|
||||||
Err error
|
Err error
|
||||||
|
// Trailer contains the metadata from the RPC's trailer, if present.
|
||||||
|
Trailer metadata.MD
|
||||||
// BytesSent indicates if any bytes have been sent to the server.
|
// BytesSent indicates if any bytes have been sent to the server.
|
||||||
BytesSent bool
|
BytesSent bool
|
||||||
// BytesReceived indicates if any byte has been received from the server.
|
// BytesReceived indicates if any byte has been received from the server.
|
||||||
|
|
|
@ -682,7 +682,9 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
|
||||||
func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
|
func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
|
||||||
// Set stream status to done.
|
// Set stream status to done.
|
||||||
if s.swapState(streamDone) == streamDone {
|
if s.swapState(streamDone) == streamDone {
|
||||||
// If it was already done, return.
|
// If it was already done, return. If multiple closeStream calls
|
||||||
|
// happen simultaneously, wait for the first to finish.
|
||||||
|
<-s.done
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// status and trailers can be updated here without any synchronization because the stream goroutine will
|
// status and trailers can be updated here without any synchronization because the stream goroutine will
|
||||||
|
@ -696,8 +698,6 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
|
||||||
// This will unblock reads eventually.
|
// This will unblock reads eventually.
|
||||||
s.write(recvMsg{err: err})
|
s.write(recvMsg{err: err})
|
||||||
}
|
}
|
||||||
// This will unblock write.
|
|
||||||
close(s.done)
|
|
||||||
// If headerChan isn't closed, then close it.
|
// If headerChan isn't closed, then close it.
|
||||||
if atomic.SwapUint32(&s.headerDone, 1) == 0 {
|
if atomic.SwapUint32(&s.headerDone, 1) == 0 {
|
||||||
s.noHeaders = true
|
s.noHeaders = true
|
||||||
|
@ -733,6 +733,8 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
|
t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
|
||||||
|
// This will unblock write.
|
||||||
|
close(s.done)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close kicks off the shutdown process of the transport. This should be called
|
// Close kicks off the shutdown process of the transport. This should be called
|
||||||
|
|
|
@ -816,11 +816,14 @@ func (a *csAttempt) finish(err error) {
|
||||||
|
|
||||||
if a.done != nil {
|
if a.done != nil {
|
||||||
br := false
|
br := false
|
||||||
|
var tr metadata.MD
|
||||||
if a.s != nil {
|
if a.s != nil {
|
||||||
br = a.s.BytesReceived()
|
br = a.s.BytesReceived()
|
||||||
|
tr = a.s.Trailer()
|
||||||
}
|
}
|
||||||
a.done(balancer.DoneInfo{
|
a.done(balancer.DoneInfo{
|
||||||
Err: err,
|
Err: err,
|
||||||
|
Trailer: tr,
|
||||||
BytesSent: a.s != nil,
|
BytesSent: a.s != nil,
|
||||||
BytesReceived: br,
|
BytesReceived: br,
|
||||||
})
|
})
|
||||||
|
|
|
@ -152,7 +152,7 @@ func testDoneInfo(t *testing.T, e env) {
|
||||||
grpc.WithBalancerName(testBalancerName),
|
grpc.WithBalancerName(testBalancerName),
|
||||||
}
|
}
|
||||||
te.userAgent = failAppUA
|
te.userAgent = failAppUA
|
||||||
te.startServer(&testServer{security: e.security, unaryCallSleepTime: time.Second})
|
te.startServer(&testServer{security: e.security})
|
||||||
defer te.tearDown()
|
defer te.tearDown()
|
||||||
|
|
||||||
cc := te.clientConn()
|
cc := te.clientConn()
|
||||||
|
@ -164,7 +164,14 @@ func testDoneInfo(t *testing.T, e env) {
|
||||||
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); !reflect.DeepEqual(err, wantErr) {
|
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); !reflect.DeepEqual(err, wantErr) {
|
||||||
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %v", err, wantErr)
|
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %v", err, wantErr)
|
||||||
}
|
}
|
||||||
if len(b.doneInfo) != 1 || !reflect.DeepEqual(b.doneInfo[0].Err, wantErr) {
|
if _, err := tc.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
|
||||||
|
t.Fatalf("TestService.UnaryCall(%v, _, _, _) = _, %v; want _, <nil>", ctx, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(b.doneInfo) < 1 || !reflect.DeepEqual(b.doneInfo[0].Err, wantErr) {
|
||||||
t.Fatalf("b.doneInfo = %v; want b.doneInfo[0].Err = %v", b.doneInfo, wantErr)
|
t.Fatalf("b.doneInfo = %v; want b.doneInfo[0].Err = %v", b.doneInfo, wantErr)
|
||||||
}
|
}
|
||||||
|
if len(b.doneInfo) < 2 || !reflect.DeepEqual(b.doneInfo[1].Trailer, testTrailerMetadata) {
|
||||||
|
t.Fatalf("b.doneInfo = %v; want b.doneInfo[1].Trailer = %v", b.doneInfo, testTrailerMetadata)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue