add some comments

This commit is contained in:
iamqizhao 2015-03-17 16:49:43 -07:00
parent 902316edc6
commit 80ae01071c
2 changed files with 14 additions and 0 deletions

View File

@ -258,6 +258,9 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
EndStream: false,
EndHeaders: endHeaders,
}
// Do a force flush for the buffered frames iff it is the last headers frame
// and there is header metadata to be sent. Otherwise, there is flushing until
// the corresponding data frame is written.
err = t.framer.writeHeaders(hasMD && endHeaders, p)
first = false
} else {
@ -391,15 +394,22 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
if opts.Last && r.Len() == 0 {
endStream = true
}
// Indicate there is a writer who is about to write a data frame.
t.framer.adjustNumWriters(1)
// Got some quota. Try to acquire writing privilege on the transport.
if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
if t.framer.adjustNumWriters(-1) == 0 {
// This writer is the last one in this batch and has the
// responsibility to flush the buffered frames. It queues
// a flush request to controlBuf instead of flushing directly
// in order to avoid the race with other writing or flushing.
t.controlBuf.put(&flushIO{})
}
return err
}
if r.Len() == 0 && t.framer.adjustNumWriters(0) == 1 {
// Do a force flush iff this is last frame for the entire gRPC message
// and the caller is the only writer at this moment.
forceFlush = true
}
// If WriteData fails, all the pending streams will be handled

View File

@ -523,6 +523,10 @@ func (t *http2Server) Write(s *Stream, data []byte, opts *Options) error {
// transport.
if _, err := wait(s.ctx, t.shutdownChan, t.writableChan); err != nil {
if t.framer.adjustNumWriters(-1) == 0 {
// This writer is the last one in this batch and has the
// responsibility to flush the buffered frames. It queues
// a flush request to controlBuf instead of flushing directly
// in order to avoid the race with other writing or flushing.
t.controlBuf.put(&flushIO{})
}
return err