Merge pull request #633 from iamqizhao/master

Fix window update counting for the canceled streams
This commit is contained in:
Menghan Li 2016-04-12 13:44:11 -07:00
commit 326d66361a
3 changed files with 41 additions and 2 deletions

View File

@ -196,6 +196,28 @@ func (f *inFlow) onData(n uint32) error {
return nil
}
// adjustConnPendingUpdate increments the connection level pending updates by n.
// This is called to make the proper connection level window updates when
// receiving data frame targeting the canceled RPCs.
func (f *inFlow) adjustConnPendingUpdate(n uint32) (uint32, error) {
if n == 0 || f.conn != nil {
return 0, nil
}
f.mu.Lock()
defer f.mu.Unlock()
if f.pendingData+f.pendingUpdate+n > f.limit {
return 0, ConnectionErrorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate+n, f.limit)
}
f.pendingUpdate += n
if f.pendingUpdate >= f.limit/4 {
ret := f.pendingUpdate
f.pendingUpdate = 0
return ret, nil
}
return 0, nil
}
// connOnRead updates the connection level states when the application consumes data.
func (f *inFlow) connOnRead(n uint32) uint32 {
if n == 0 || f.conn != nil {

View File

@ -571,11 +571,19 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
func (t *http2Client) handleData(f *http2.DataFrame) {
// Select the right stream to dispatch.
size := len(f.Data())
s, ok := t.getStream(f)
if !ok {
cwu, err := t.fc.adjustConnPendingUpdate(uint32(size))
if err != nil {
t.notifyError(err)
return
}
if cwu > 0 {
t.controlBuf.put(&windowUpdate{0, cwu})
}
return
}
size := len(f.Data())
if size > 0 {
if err := s.fc.onData(uint32(size)); err != nil {
if _, ok := err.(ConnectionError); ok {

View File

@ -318,11 +318,20 @@ func (t *http2Server) updateWindow(s *Stream, n uint32) {
func (t *http2Server) handleData(f *http2.DataFrame) {
// Select the right stream to dispatch.
size := len(f.Data())
s, ok := t.getStream(f)
if !ok {
cwu, err := t.fc.adjustConnPendingUpdate(uint32(size))
if err != nil {
grpclog.Printf("transport: http2Server %v", err)
t.Close()
return
}
if cwu > 0 {
t.controlBuf.put(&windowUpdate{0, cwu})
}
return
}
size := len(f.Data())
if size > 0 {
if err := s.fc.onData(uint32(size)); err != nil {
if _, ok := err.(ConnectionError); ok {