This commit is contained in:
iamqizhao 2016-07-19 16:29:15 -07:00
commit 0e86f69ef3
2 changed files with 22 additions and 3 deletions

View File

@ -257,7 +257,17 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
if err != nil { if err != nil {
cs.finish(err) cs.finish(err)
} }
if err == nil || err == io.EOF { if err == nil {
return
}
if err == io.EOF {
// Specialize the process for server streaming. SendMesg is only called
// once when creating the stream object. io.EOF needs to be skipped when
// the rpc is early finished (before the stream object is created.).
// TODO: It is probably better to move this into the generated code.
if !cs.desc.ClientStreams && cs.desc.ServerStreams {
err = nil
}
return return
} }
if _, ok := err.(transport.ConnectionError); !ok { if _, ok := err.(transport.ConnectionError); !ok {

View File

@ -433,14 +433,21 @@ func TestMaxStreams(t *testing.T) {
} }
done := make(chan struct{}) done := make(chan struct{})
ch := make(chan int) ch := make(chan int)
ready := make(chan struct{})
go func() { go func() {
for { for {
select { select {
case <-time.After(5 * time.Millisecond): case <-time.After(5 * time.Millisecond):
ch <- 0 select {
case ch <- 0:
case <-ready:
return
}
case <-time.After(5 * time.Second): case <-time.After(5 * time.Second):
close(done) close(done)
return return
case <-ready:
return
} }
} }
}() }()
@ -467,6 +474,7 @@ func TestMaxStreams(t *testing.T) {
} }
cc.mu.Unlock() cc.mu.Unlock()
} }
close(ready)
// Close the pending stream so that the streams quota becomes available for the next new stream. // Close the pending stream so that the streams quota becomes available for the next new stream.
ct.CloseStream(s, nil) ct.CloseStream(s, nil)
select { select {
@ -690,7 +698,8 @@ func TestClientWithMisbehavedServer(t *testing.T) {
Host: "localhost", Host: "localhost",
Method: "foo.MaxFrame", Method: "foo.MaxFrame",
} }
for i := 0; i < int(initialConnWindowSize/initialWindowSize+10); i++ { // Make the server flood the traffic to violate flow control window size of the connection.
for {
s, err := ct.NewStream(context.Background(), callHdr) s, err := ct.NewStream(context.Background(), callHdr)
if err != nil { if err != nil {
break break