mirror of https://github.com/grpc/grpc-go.git
Update flow control test to have multiple concurrent streams. (#2170)
This commit is contained in:
parent
92d38b03b1
commit
8e18752766
|
@ -1812,47 +1812,69 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig)
|
|||
}
|
||||
server.mu.Unlock()
|
||||
ct := client.(*http2Client)
|
||||
cstream, err := client.NewStream(context.Background(), &CallHdr{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create stream. Err: %v", err)
|
||||
}
|
||||
msg := make([]byte, msgSize)
|
||||
buf := make([]byte, msgSize+5)
|
||||
buf[0] = byte(0)
|
||||
binary.BigEndian.PutUint32(buf[1:], uint32(msgSize))
|
||||
copy(buf[5:], msg)
|
||||
opts := Options{}
|
||||
header := make([]byte, 5)
|
||||
for i := 1; i <= 10; i++ {
|
||||
if err := ct.Write(cstream, nil, buf, &opts); err != nil {
|
||||
t.Fatalf("Error on client while writing message: %v", err)
|
||||
}
|
||||
if _, err := cstream.Read(header); err != nil {
|
||||
t.Fatalf("Error on client while reading data frame header: %v", err)
|
||||
}
|
||||
sz := binary.BigEndian.Uint32(header[1:])
|
||||
recvMsg := make([]byte, int(sz))
|
||||
if _, err := cstream.Read(recvMsg); err != nil {
|
||||
t.Fatalf("Error on client while reading data: %v", err)
|
||||
}
|
||||
if len(recvMsg) != len(msg) {
|
||||
t.Fatalf("Length of message received by client: %v, want: %v", len(recvMsg), len(msg))
|
||||
const numStreams = 10
|
||||
clientStreams := make([]*Stream, numStreams)
|
||||
for i := 0; i < numStreams; i++ {
|
||||
var err error
|
||||
clientStreams[i], err = client.NewStream(context.Background(), &CallHdr{})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create stream. Err: %v", err)
|
||||
}
|
||||
}
|
||||
var sstream *Stream
|
||||
var wg sync.WaitGroup
|
||||
// For each stream send pingpong messages to the server.
|
||||
for _, stream := range clientStreams {
|
||||
wg.Add(1)
|
||||
go func(stream *Stream) {
|
||||
defer wg.Done()
|
||||
buf := make([]byte, msgSize+5)
|
||||
buf[0] = byte(0)
|
||||
binary.BigEndian.PutUint32(buf[1:], uint32(msgSize))
|
||||
opts := Options{}
|
||||
header := make([]byte, 5)
|
||||
for i := 1; i <= 10; i++ {
|
||||
if err := ct.Write(stream, nil, buf, &opts); err != nil {
|
||||
t.Errorf("Error on client while writing message: %v", err)
|
||||
return
|
||||
}
|
||||
if _, err := stream.Read(header); err != nil {
|
||||
t.Errorf("Error on client while reading data frame header: %v", err)
|
||||
return
|
||||
}
|
||||
sz := binary.BigEndian.Uint32(header[1:])
|
||||
recvMsg := make([]byte, int(sz))
|
||||
if _, err := stream.Read(recvMsg); err != nil {
|
||||
t.Errorf("Error on client while reading data: %v", err)
|
||||
return
|
||||
}
|
||||
if len(recvMsg) != msgSize {
|
||||
t.Errorf("Length of message received by client: %v, want: %v", len(recvMsg), msgSize)
|
||||
return
|
||||
}
|
||||
}
|
||||
}(stream)
|
||||
}
|
||||
wg.Wait()
|
||||
serverStreams := map[uint32]*Stream{}
|
||||
loopyClientStreams := map[uint32]*outStream{}
|
||||
loopyServerStreams := map[uint32]*outStream{}
|
||||
// Get all the streams from server reader and writer and client writer.
|
||||
st.mu.Lock()
|
||||
for _, v := range st.activeStreams {
|
||||
sstream = v
|
||||
for _, stream := range clientStreams {
|
||||
id := stream.id
|
||||
serverStreams[id] = st.activeStreams[id]
|
||||
loopyServerStreams[id] = st.loopy.estdStreams[id]
|
||||
loopyClientStreams[id] = ct.loopy.estdStreams[id]
|
||||
|
||||
}
|
||||
st.mu.Unlock()
|
||||
loopyServerStream := st.loopy.estdStreams[sstream.id]
|
||||
loopyClientStream := ct.loopy.estdStreams[cstream.id]
|
||||
ct.Write(cstream, nil, nil, &Options{Last: true}) // Close the stream.
|
||||
if _, err := cstream.Read(header); err != io.EOF {
|
||||
t.Fatalf("Client expected an EOF from the server. Got: %v", err)
|
||||
// Close all streams
|
||||
for _, stream := range clientStreams {
|
||||
ct.Write(stream, nil, nil, &Options{Last: true})
|
||||
if _, err := stream.Read(make([]byte, 5)); err != io.EOF {
|
||||
t.Fatalf("Client expected an EOF from the server. Got: %v", err)
|
||||
}
|
||||
}
|
||||
// Sleep for a little to make sure both sides flush out their buffers.
|
||||
time.Sleep(time.Millisecond * 500)
|
||||
// Close down both server and client so that their internals can be read without data
|
||||
// races.
|
||||
ct.Close()
|
||||
|
@ -1861,6 +1883,19 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig)
|
|||
<-st.writerDone
|
||||
<-ct.readerDone
|
||||
<-ct.writerDone
|
||||
for _, cstream := range clientStreams {
|
||||
id := cstream.id
|
||||
sstream := serverStreams[id]
|
||||
loopyServerStream := loopyServerStreams[id]
|
||||
loopyClientStream := loopyClientStreams[id]
|
||||
// Check stream flow control.
|
||||
if int(cstream.fc.limit+cstream.fc.delta-cstream.fc.pendingData-cstream.fc.pendingUpdate) != int(st.loopy.oiws)-loopyServerStream.bytesOutStanding {
|
||||
t.Fatalf("Account mismatch: client stream inflow limit(%d) + delta(%d) - pendingData(%d) - pendingUpdate(%d) != server outgoing InitialWindowSize(%d) - outgoingStream.bytesOutStanding(%d)", cstream.fc.limit, cstream.fc.delta, cstream.fc.pendingData, cstream.fc.pendingUpdate, st.loopy.oiws, loopyServerStream.bytesOutStanding)
|
||||
}
|
||||
if int(sstream.fc.limit+sstream.fc.delta-sstream.fc.pendingData-sstream.fc.pendingUpdate) != int(ct.loopy.oiws)-loopyClientStream.bytesOutStanding {
|
||||
t.Fatalf("Account mismatch: server stream inflow limit(%d) + delta(%d) - pendingData(%d) - pendingUpdate(%d) != client outgoing InitialWindowSize(%d) - outgoingStream.bytesOutStanding(%d)", sstream.fc.limit, sstream.fc.delta, sstream.fc.pendingData, sstream.fc.pendingUpdate, ct.loopy.oiws, loopyClientStream.bytesOutStanding)
|
||||
}
|
||||
}
|
||||
// Check transport flow control.
|
||||
if ct.fc.limit != ct.fc.unacked+st.loopy.sendQuota {
|
||||
t.Fatalf("Account mismatch: client transport inflow(%d) != client unacked(%d) + server sendQuota(%d)", ct.fc.limit, ct.fc.unacked, st.loopy.sendQuota)
|
||||
|
@ -1868,13 +1903,6 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig)
|
|||
if st.fc.limit != st.fc.unacked+ct.loopy.sendQuota {
|
||||
t.Fatalf("Account mismatch: server transport inflow(%d) != server unacked(%d) + client sendQuota(%d)", st.fc.limit, st.fc.unacked, ct.loopy.sendQuota)
|
||||
}
|
||||
// Check stream flow control.
|
||||
if int(cstream.fc.limit+cstream.fc.delta-cstream.fc.pendingData-cstream.fc.pendingUpdate) != int(st.loopy.oiws)-loopyServerStream.bytesOutStanding {
|
||||
t.Fatalf("Account mismatch: client stream inflow limit(%d) + delta(%d) - pendingData(%d) - pendingUpdate(%d) != server outgoing InitialWindowSize(%d) - outgoingStream.bytesOutStanding(%d)", cstream.fc.limit, cstream.fc.delta, cstream.fc.pendingData, cstream.fc.pendingUpdate, st.loopy.oiws, loopyServerStream.bytesOutStanding)
|
||||
}
|
||||
if int(sstream.fc.limit+sstream.fc.delta-sstream.fc.pendingData-sstream.fc.pendingUpdate) != int(ct.loopy.oiws)-loopyClientStream.bytesOutStanding {
|
||||
t.Fatalf("Account mismatch: server stream inflow limit(%d) + delta(%d) - pendingData(%d) - pendingUpdate(%d) != client outgoing InitialWindowSize(%d) - outgoingStream.bytesOutStanding(%d)", sstream.fc.limit, sstream.fc.delta, sstream.fc.pendingData, sstream.fc.pendingUpdate, ct.loopy.oiws, loopyClientStream.bytesOutStanding)
|
||||
}
|
||||
}
|
||||
|
||||
func waitWhileTrue(t *testing.T, condition func() (bool, error)) {
|
||||
|
|
Loading…
Reference in New Issue