mirror of https://github.com/grpc/grpc-go.git
Export changes to OSS. (#1962)
This commit is contained in:
parent
291de7f0ab
commit
d0a21a3347
|
|
@ -1241,7 +1241,20 @@ func (ac *addrConn) transportMonitor() {
|
||||||
// Block until we receive a goaway or an error occurs.
|
// Block until we receive a goaway or an error occurs.
|
||||||
select {
|
select {
|
||||||
case <-t.GoAway():
|
case <-t.GoAway():
|
||||||
|
done := t.Error()
|
||||||
|
cleanup := t.Close
|
||||||
|
// Since this transport will be orphaned (won't have a transportMonitor)
|
||||||
|
// we need to launch a goroutine to keep track of clientConn.Close()
|
||||||
|
// happening since it might not be noticed by any other goroutine for a while.
|
||||||
|
go func() {
|
||||||
|
<-done
|
||||||
|
cleanup()
|
||||||
|
}()
|
||||||
case <-t.Error():
|
case <-t.Error():
|
||||||
|
// In case this is triggered because clientConn.Close()
|
||||||
|
// was called, we want to immeditately close the transport
|
||||||
|
// since no other goroutine might notice it for a while.
|
||||||
|
t.Close()
|
||||||
case <-cdeadline:
|
case <-cdeadline:
|
||||||
ac.mu.Lock()
|
ac.mu.Lock()
|
||||||
// This implies that client received server preface.
|
// This implies that client received server preface.
|
||||||
|
|
|
||||||
|
|
@ -748,7 +748,6 @@ type lazyConn struct {
|
||||||
|
|
||||||
func (l *lazyConn) Write(b []byte) (int, error) {
|
func (l *lazyConn) Write(b []byte) (int, error) {
|
||||||
if atomic.LoadInt32(&(l.beLazy)) == 1 {
|
if atomic.LoadInt32(&(l.beLazy)) == 1 {
|
||||||
// The sleep duration here needs to less than the leakCheck deadline.
|
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Second)
|
||||||
}
|
}
|
||||||
return l.Conn.Write(b)
|
return l.Conn.Write(b)
|
||||||
|
|
@ -963,7 +962,7 @@ func testServerGoAwayPendingRPC(t *testing.T, e env) {
|
||||||
}
|
}
|
||||||
// The existing RPC should be still good to proceed.
|
// The existing RPC should be still good to proceed.
|
||||||
if err := stream.Send(req); err != nil {
|
if err := stream.Send(req); err != nil {
|
||||||
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
|
t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
|
||||||
}
|
}
|
||||||
if _, err := stream.Recv(); err != nil {
|
if _, err := stream.Recv(); err != nil {
|
||||||
t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
|
t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
|
||||||
|
|
@ -3053,7 +3052,6 @@ func testMultipleSetHeaderStreamingRPCError(t *testing.T, e env) {
|
||||||
if !reflect.DeepEqual(header, expectedHeader) {
|
if !reflect.DeepEqual(header, expectedHeader) {
|
||||||
t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
|
t.Fatalf("Received header metadata %v, want %v", header, expectedHeader)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := stream.CloseSend(); err != nil {
|
if err := stream.CloseSend(); err != nil {
|
||||||
t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
|
t.Fatalf("%v.CloseSend() got %v, want %v", stream, err, nil)
|
||||||
}
|
}
|
||||||
|
|
@ -3156,44 +3154,6 @@ func testRetry(t *testing.T, e env) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestRPCTimeout(t *testing.T) {
|
|
||||||
defer leakcheck.Check(t)
|
|
||||||
for _, e := range listTestEnv() {
|
|
||||||
testRPCTimeout(t, e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO(zhaoq): Have a better test coverage of timeout and cancellation mechanism.
|
|
||||||
func testRPCTimeout(t *testing.T, e env) {
|
|
||||||
te := newTest(t, e)
|
|
||||||
te.startServer(&testServer{security: e.security, unaryCallSleepTime: 50 * time.Millisecond})
|
|
||||||
defer te.tearDown()
|
|
||||||
|
|
||||||
cc := te.clientConn()
|
|
||||||
tc := testpb.NewTestServiceClient(cc)
|
|
||||||
|
|
||||||
const argSize = 2718
|
|
||||||
const respSize = 314
|
|
||||||
|
|
||||||
payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
req := &testpb.SimpleRequest{
|
|
||||||
ResponseType: testpb.PayloadType_COMPRESSABLE,
|
|
||||||
ResponseSize: respSize,
|
|
||||||
Payload: payload,
|
|
||||||
}
|
|
||||||
for i := -1; i <= 10; i++ {
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(i)*time.Millisecond)
|
|
||||||
if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.DeadlineExceeded {
|
|
||||||
t.Fatalf("TestService/UnaryCallv(_, _) = _, %v; want <nil>, error code: %s", err, codes.DeadlineExceeded)
|
|
||||||
}
|
|
||||||
cancel()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestCancel(t *testing.T) {
|
func TestCancel(t *testing.T) {
|
||||||
defer leakcheck.Check(t)
|
defer leakcheck.Check(t)
|
||||||
for _, e := range listTestEnv() {
|
for _, e := range listTestEnv() {
|
||||||
|
|
@ -3687,7 +3647,7 @@ func testClientStreaming(t *testing.T, e env, sizes []int) {
|
||||||
Payload: payload,
|
Payload: payload,
|
||||||
}
|
}
|
||||||
if err := stream.Send(req); err != nil {
|
if err := stream.Send(req); err != nil {
|
||||||
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
|
t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
|
||||||
}
|
}
|
||||||
sum += s
|
sum += s
|
||||||
}
|
}
|
||||||
|
|
@ -5078,7 +5038,7 @@ func TestTapTimeout(t *testing.T) {
|
||||||
ss := &stubServer{
|
ss := &stubServer{
|
||||||
emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
|
emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
return &testpb.Empty{}, nil
|
return nil, status.Errorf(codes.Canceled, ctx.Err().Error())
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
if err := ss.Start(sopts); err != nil {
|
if err := ss.Start(sopts); err != nil {
|
||||||
|
|
@ -6218,3 +6178,40 @@ func TestFailFastRPCErrorOnBadCertificates(t *testing.T) {
|
||||||
}
|
}
|
||||||
te.t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want err.Error() contains %q", err, clientAlwaysFailCredErrorMsg)
|
te.t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want err.Error() contains %q", err, clientAlwaysFailCredErrorMsg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRPCTimeout(t *testing.T) {
|
||||||
|
defer leakcheck.Check(t)
|
||||||
|
for _, e := range listTestEnv() {
|
||||||
|
testRPCTimeout(t, e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func testRPCTimeout(t *testing.T, e env) {
|
||||||
|
te := newTest(t, e)
|
||||||
|
te.startServer(&testServer{security: e.security, unaryCallSleepTime: 500 * time.Millisecond})
|
||||||
|
defer te.tearDown()
|
||||||
|
|
||||||
|
cc := te.clientConn()
|
||||||
|
tc := testpb.NewTestServiceClient(cc)
|
||||||
|
|
||||||
|
const argSize = 2718
|
||||||
|
const respSize = 314
|
||||||
|
|
||||||
|
payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
req := &testpb.SimpleRequest{
|
||||||
|
ResponseType: testpb.PayloadType_COMPRESSABLE,
|
||||||
|
ResponseSize: respSize,
|
||||||
|
Payload: payload,
|
||||||
|
}
|
||||||
|
for i := -1; i <= 10; i++ {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(i)*time.Millisecond)
|
||||||
|
if _, err := tc.UnaryCall(ctx, req); status.Code(err) != codes.DeadlineExceeded {
|
||||||
|
t.Fatalf("TestService/UnaryCallv(_, _) = _, %v; want <nil>, error code: %s", err, codes.DeadlineExceeded)
|
||||||
|
}
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,758 @@
|
||||||
|
/*
|
||||||
|
*
|
||||||
|
* Copyright 2014 gRPC authors.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
package transport
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"runtime"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"golang.org/x/net/http2"
|
||||||
|
"golang.org/x/net/http2/hpack"
|
||||||
|
)
|
||||||
|
|
||||||
|
type itemNode struct {
|
||||||
|
it interface{}
|
||||||
|
next *itemNode
|
||||||
|
}
|
||||||
|
|
||||||
|
type itemList struct {
|
||||||
|
head *itemNode
|
||||||
|
tail *itemNode
|
||||||
|
}
|
||||||
|
|
||||||
|
func (il *itemList) enqueue(i interface{}) {
|
||||||
|
n := &itemNode{it: i}
|
||||||
|
if il.tail == nil {
|
||||||
|
il.head, il.tail = n, n
|
||||||
|
return
|
||||||
|
}
|
||||||
|
il.tail.next = n
|
||||||
|
il.tail = n
|
||||||
|
}
|
||||||
|
|
||||||
|
// peek returns the first item in the list without removing it from the
|
||||||
|
// list.
|
||||||
|
func (il *itemList) peek() interface{} {
|
||||||
|
return il.head.it
|
||||||
|
}
|
||||||
|
|
||||||
|
func (il *itemList) dequeue() interface{} {
|
||||||
|
if il.head == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
i := il.head.it
|
||||||
|
il.head = il.head.next
|
||||||
|
if il.head == nil {
|
||||||
|
il.tail = nil
|
||||||
|
}
|
||||||
|
return i
|
||||||
|
}
|
||||||
|
|
||||||
|
func (il *itemList) dequeueAll() *itemNode {
|
||||||
|
h := il.head
|
||||||
|
il.head, il.tail = nil, nil
|
||||||
|
return h
|
||||||
|
}
|
||||||
|
|
||||||
|
func (il *itemList) isEmpty() bool {
|
||||||
|
return il.head == nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// The following defines various control items which could flow through
|
||||||
|
// the control buffer of transport. They represent different aspects of
|
||||||
|
// control tasks, e.g., flow control, settings, streaming resetting, etc.
|
||||||
|
|
||||||
|
type headerFrame struct {
|
||||||
|
streamID uint32
|
||||||
|
hf []hpack.HeaderField
|
||||||
|
endStream bool // Valid on server side.
|
||||||
|
initStream func(uint32) (bool, error) // Used only on the client side.
|
||||||
|
onWrite func()
|
||||||
|
wq *writeQuota // write quota for the stream created.
|
||||||
|
cleanup *cleanupStream // Valid on the server side.
|
||||||
|
onOrphaned func(error) // Valid on client-side
|
||||||
|
}
|
||||||
|
|
||||||
|
type cleanupStream struct {
|
||||||
|
streamID uint32
|
||||||
|
idPtr *uint32
|
||||||
|
rst bool
|
||||||
|
rstCode http2.ErrCode
|
||||||
|
onWrite func()
|
||||||
|
}
|
||||||
|
|
||||||
|
type dataFrame struct {
|
||||||
|
streamID uint32
|
||||||
|
endStream bool
|
||||||
|
h []byte
|
||||||
|
d []byte
|
||||||
|
// onEachWrite is called every time
|
||||||
|
// a part of d is written out.
|
||||||
|
onEachWrite func()
|
||||||
|
}
|
||||||
|
|
||||||
|
type incomingWindowUpdate struct {
|
||||||
|
streamID uint32
|
||||||
|
increment uint32
|
||||||
|
}
|
||||||
|
|
||||||
|
type outgoingWindowUpdate struct {
|
||||||
|
streamID uint32
|
||||||
|
increment uint32
|
||||||
|
}
|
||||||
|
|
||||||
|
type incomingSettings struct {
|
||||||
|
ss []http2.Setting
|
||||||
|
}
|
||||||
|
|
||||||
|
type outgoingSettings struct {
|
||||||
|
ss []http2.Setting
|
||||||
|
}
|
||||||
|
|
||||||
|
type settingsAck struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
type incomingGoAway struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
type goAway struct {
|
||||||
|
code http2.ErrCode
|
||||||
|
debugData []byte
|
||||||
|
headsUp bool
|
||||||
|
closeConn bool
|
||||||
|
}
|
||||||
|
|
||||||
|
type ping struct {
|
||||||
|
ack bool
|
||||||
|
data [8]byte
|
||||||
|
}
|
||||||
|
|
||||||
|
type outStreamState int
|
||||||
|
|
||||||
|
const (
|
||||||
|
active outStreamState = iota
|
||||||
|
empty
|
||||||
|
waitingOnStreamQuota
|
||||||
|
)
|
||||||
|
|
||||||
|
type outStream struct {
|
||||||
|
id uint32
|
||||||
|
state outStreamState
|
||||||
|
itl *itemList
|
||||||
|
bytesOutStanding int
|
||||||
|
wq *writeQuota
|
||||||
|
|
||||||
|
next *outStream
|
||||||
|
prev *outStream
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *outStream) deleteSelf() {
|
||||||
|
if s.prev != nil {
|
||||||
|
s.prev.next = s.next
|
||||||
|
}
|
||||||
|
if s.next != nil {
|
||||||
|
s.next.prev = s.prev
|
||||||
|
}
|
||||||
|
s.next, s.prev = nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type outStreamList struct {
|
||||||
|
// Following are sentinal objects that mark the
|
||||||
|
// beginning and end of the list. They do not
|
||||||
|
// contain any item lists. All valid objects are
|
||||||
|
// inserted in between them.
|
||||||
|
// This is needed so that an outStream object can
|
||||||
|
// deleteSelf() in O(1) time without knowing which
|
||||||
|
// list it belongs to.
|
||||||
|
head *outStream
|
||||||
|
tail *outStream
|
||||||
|
}
|
||||||
|
|
||||||
|
func newOutStreamList() *outStreamList {
|
||||||
|
head, tail := new(outStream), new(outStream)
|
||||||
|
head.next = tail
|
||||||
|
tail.prev = head
|
||||||
|
return &outStreamList{
|
||||||
|
head: head,
|
||||||
|
tail: tail,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *outStreamList) enqueue(s *outStream) {
|
||||||
|
e := l.tail.prev
|
||||||
|
e.next = s
|
||||||
|
s.prev = e
|
||||||
|
s.next = l.tail
|
||||||
|
l.tail.prev = s
|
||||||
|
}
|
||||||
|
|
||||||
|
// remove from the beginning of the list.
|
||||||
|
func (l *outStreamList) dequeue() *outStream {
|
||||||
|
b := l.head.next
|
||||||
|
if b == l.tail {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
b.deleteSelf()
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
|
||||||
|
type controlBuffer struct {
|
||||||
|
ch chan struct{}
|
||||||
|
done <-chan struct{}
|
||||||
|
mu sync.Mutex
|
||||||
|
consumerWaiting bool
|
||||||
|
list *itemList
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
func newControlBuffer(done <-chan struct{}) *controlBuffer {
|
||||||
|
return &controlBuffer{
|
||||||
|
ch: make(chan struct{}, 1),
|
||||||
|
list: &itemList{},
|
||||||
|
done: done,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *controlBuffer) put(it interface{}) error {
|
||||||
|
_, err := c.executeAndPut(nil, it)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it interface{}) (bool, error) {
|
||||||
|
var wakeUp bool
|
||||||
|
c.mu.Lock()
|
||||||
|
if c.err != nil {
|
||||||
|
c.mu.Unlock()
|
||||||
|
return false, c.err
|
||||||
|
}
|
||||||
|
if f != nil {
|
||||||
|
if !f(it) { // f wasn't successful
|
||||||
|
c.mu.Unlock()
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if c.consumerWaiting {
|
||||||
|
wakeUp = true
|
||||||
|
c.consumerWaiting = false
|
||||||
|
}
|
||||||
|
c.list.enqueue(it)
|
||||||
|
c.mu.Unlock()
|
||||||
|
if wakeUp {
|
||||||
|
select {
|
||||||
|
case c.ch <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *controlBuffer) get(block bool) (interface{}, error) {
|
||||||
|
for {
|
||||||
|
c.mu.Lock()
|
||||||
|
if c.err != nil {
|
||||||
|
c.mu.Unlock()
|
||||||
|
return nil, c.err
|
||||||
|
}
|
||||||
|
if !c.list.isEmpty() {
|
||||||
|
h := c.list.dequeue()
|
||||||
|
c.mu.Unlock()
|
||||||
|
return h, nil
|
||||||
|
}
|
||||||
|
if !block {
|
||||||
|
c.mu.Unlock()
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
c.consumerWaiting = true
|
||||||
|
c.mu.Unlock()
|
||||||
|
select {
|
||||||
|
case <-c.ch:
|
||||||
|
case <-c.done:
|
||||||
|
c.finish()
|
||||||
|
return nil, ErrConnClosing
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *controlBuffer) finish() {
|
||||||
|
c.mu.Lock()
|
||||||
|
if c.err != nil {
|
||||||
|
c.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.err = ErrConnClosing
|
||||||
|
// There may be headers for streams in the control buffer.
|
||||||
|
// These streams need to be cleaned out since the transport
|
||||||
|
// is still not aware of these yet.
|
||||||
|
for head := c.list.dequeueAll(); head != nil; head = head.next {
|
||||||
|
hdr, ok := head.it.(*headerFrame)
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if hdr.onOrphaned != nil { // It will be nil on the server-side.
|
||||||
|
hdr.onOrphaned(ErrConnClosing)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
c.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
type side int
|
||||||
|
|
||||||
|
const (
|
||||||
|
clientSide side = iota
|
||||||
|
serverSide
|
||||||
|
)
|
||||||
|
|
||||||
|
type loopyWriter struct {
|
||||||
|
side side
|
||||||
|
cbuf *controlBuffer
|
||||||
|
sendQuota uint32
|
||||||
|
oiws uint32 // outbound initial window size.
|
||||||
|
estdStreams map[uint32]*outStream // Established streams.
|
||||||
|
activeStreams *outStreamList // Streams that are sending data.
|
||||||
|
framer *framer
|
||||||
|
hBuf *bytes.Buffer // The buffer for HPACK encoding.
|
||||||
|
hEnc *hpack.Encoder // HPACK encoder.
|
||||||
|
bdpEst *bdpEstimator
|
||||||
|
draining bool
|
||||||
|
|
||||||
|
// Side-specific handlers
|
||||||
|
ssGoAwayHandler func(*goAway) (bool, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimator) *loopyWriter {
|
||||||
|
var buf bytes.Buffer
|
||||||
|
l := &loopyWriter{
|
||||||
|
side: s,
|
||||||
|
cbuf: cbuf,
|
||||||
|
sendQuota: defaultWindowSize,
|
||||||
|
oiws: defaultWindowSize,
|
||||||
|
estdStreams: make(map[uint32]*outStream),
|
||||||
|
activeStreams: newOutStreamList(),
|
||||||
|
framer: fr,
|
||||||
|
hBuf: &buf,
|
||||||
|
hEnc: hpack.NewEncoder(&buf),
|
||||||
|
bdpEst: bdpEst,
|
||||||
|
}
|
||||||
|
return l
|
||||||
|
}
|
||||||
|
|
||||||
|
const minBatchSize = 1000
|
||||||
|
|
||||||
|
// run should be run in a separate goroutine.
|
||||||
|
func (l *loopyWriter) run() {
|
||||||
|
var (
|
||||||
|
it interface{}
|
||||||
|
err error
|
||||||
|
isEmpty bool
|
||||||
|
)
|
||||||
|
defer func() {
|
||||||
|
errorf("transport: loopyWriter.run returning. Err: %v", err)
|
||||||
|
}()
|
||||||
|
for {
|
||||||
|
it, err = l.cbuf.get(true)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err = l.handle(it); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if _, err = l.processData(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
gosched := true
|
||||||
|
hasdata:
|
||||||
|
for {
|
||||||
|
it, err = l.cbuf.get(false)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if it != nil {
|
||||||
|
if err = l.handle(it); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if _, err = l.processData(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
continue hasdata
|
||||||
|
}
|
||||||
|
if isEmpty, err = l.processData(); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !isEmpty {
|
||||||
|
continue hasdata
|
||||||
|
}
|
||||||
|
if gosched {
|
||||||
|
gosched = false
|
||||||
|
if l.framer.writer.offset < minBatchSize {
|
||||||
|
runtime.Gosched()
|
||||||
|
continue hasdata
|
||||||
|
}
|
||||||
|
}
|
||||||
|
l.framer.writer.Flush()
|
||||||
|
break hasdata
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *loopyWriter) outgoingWindowUpdateHandler(w *outgoingWindowUpdate) error {
|
||||||
|
return l.framer.fr.WriteWindowUpdate(w.streamID, w.increment)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *loopyWriter) incomingWindowUpdateHandler(w *incomingWindowUpdate) error {
|
||||||
|
// Otherwise update the quota.
|
||||||
|
if w.streamID == 0 {
|
||||||
|
l.sendQuota += w.increment
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// Find the stream and update it.
|
||||||
|
if str, ok := l.estdStreams[w.streamID]; ok {
|
||||||
|
str.bytesOutStanding -= int(w.increment)
|
||||||
|
if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota > 0 && str.state == waitingOnStreamQuota {
|
||||||
|
str.state = active
|
||||||
|
l.activeStreams.enqueue(str)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *loopyWriter) outgoingSettingsHandler(s *outgoingSettings) error {
|
||||||
|
return l.framer.fr.WriteSettings(s.ss...)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *loopyWriter) incomingSettingsHandler(s *incomingSettings) error {
|
||||||
|
if err := l.applySettings(s.ss); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return l.framer.fr.WriteSettingsAck()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *loopyWriter) headerHandler(h *headerFrame) error {
|
||||||
|
if l.side == serverSide {
|
||||||
|
if h.endStream { // Case 1.A: Server wants to close stream.
|
||||||
|
// Make sure it's not a trailers only response.
|
||||||
|
if str, ok := l.estdStreams[h.streamID]; ok {
|
||||||
|
if str.state != empty { // either active or waiting on stream quota.
|
||||||
|
// add it str's list of items.
|
||||||
|
str.itl.enqueue(h)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if err := l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return l.cleanupStreamHandler(h.cleanup)
|
||||||
|
}
|
||||||
|
// Case 1.B: Server is responding back with headers.
|
||||||
|
str := &outStream{
|
||||||
|
state: empty,
|
||||||
|
itl: &itemList{},
|
||||||
|
wq: h.wq,
|
||||||
|
}
|
||||||
|
l.estdStreams[h.streamID] = str
|
||||||
|
return l.writeHeader(h.streamID, h.endStream, h.hf, h.onWrite)
|
||||||
|
}
|
||||||
|
// Case 2: Client wants to originate stream.
|
||||||
|
str := &outStream{
|
||||||
|
id: h.streamID,
|
||||||
|
state: empty,
|
||||||
|
itl: &itemList{},
|
||||||
|
wq: h.wq,
|
||||||
|
}
|
||||||
|
str.itl.enqueue(h)
|
||||||
|
return l.originateStream(str)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *loopyWriter) originateStream(str *outStream) error {
|
||||||
|
hdr := str.itl.dequeue().(*headerFrame)
|
||||||
|
sendPing, err := hdr.initStream(str.id)
|
||||||
|
if err != nil {
|
||||||
|
if err == ErrConnClosing {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// Other errors(errStreamDrain) need not close transport.
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err = l.writeHeader(str.id, hdr.endStream, hdr.hf, hdr.onWrite); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
l.estdStreams[str.id] = str
|
||||||
|
if sendPing {
|
||||||
|
return l.pingHandler(&ping{data: [8]byte{}})
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *loopyWriter) writeHeader(streamID uint32, endStream bool, hf []hpack.HeaderField, onWrite func()) error {
|
||||||
|
if onWrite != nil {
|
||||||
|
onWrite()
|
||||||
|
}
|
||||||
|
l.hBuf.Reset()
|
||||||
|
for _, f := range hf {
|
||||||
|
if err := l.hEnc.WriteField(f); err != nil {
|
||||||
|
warningf("transport: loopyWriter.writeHeader encountered error while encoding headers:", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
var (
|
||||||
|
err error
|
||||||
|
endHeaders, first bool
|
||||||
|
)
|
||||||
|
first = true
|
||||||
|
for !endHeaders {
|
||||||
|
size := l.hBuf.Len()
|
||||||
|
if size > http2MaxFrameLen {
|
||||||
|
size = http2MaxFrameLen
|
||||||
|
} else {
|
||||||
|
endHeaders = true
|
||||||
|
}
|
||||||
|
if first {
|
||||||
|
first = false
|
||||||
|
err = l.framer.fr.WriteHeaders(http2.HeadersFrameParam{
|
||||||
|
StreamID: streamID,
|
||||||
|
BlockFragment: l.hBuf.Next(size),
|
||||||
|
EndStream: endStream,
|
||||||
|
EndHeaders: endHeaders,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
err = l.framer.fr.WriteContinuation(
|
||||||
|
streamID,
|
||||||
|
endHeaders,
|
||||||
|
l.hBuf.Next(size),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *loopyWriter) preprocessData(df *dataFrame) error {
|
||||||
|
str, ok := l.estdStreams[df.streamID]
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// If we got data for a stream it means that
|
||||||
|
// stream was originated and the headers were sent out.
|
||||||
|
str.itl.enqueue(df)
|
||||||
|
if str.state == empty {
|
||||||
|
str.state = active
|
||||||
|
l.activeStreams.enqueue(str)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *loopyWriter) pingHandler(p *ping) error {
|
||||||
|
if !p.ack {
|
||||||
|
l.bdpEst.timesnap(p.data)
|
||||||
|
}
|
||||||
|
return l.framer.fr.WritePing(p.ack, p.data)
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *loopyWriter) cleanupStreamHandler(c *cleanupStream) error {
|
||||||
|
c.onWrite()
|
||||||
|
if str, ok := l.estdStreams[c.streamID]; ok {
|
||||||
|
// On the server side it could be a trailers-only response or
|
||||||
|
// a RST_STREAM before stream initialization thus the stream might
|
||||||
|
// not be established yet.
|
||||||
|
delete(l.estdStreams, c.streamID)
|
||||||
|
str.deleteSelf()
|
||||||
|
}
|
||||||
|
if c.rst { // If RST_STREAM needs to be sent.
|
||||||
|
if err := l.framer.fr.WriteRSTStream(c.streamID, c.rstCode); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if l.side == clientSide && l.draining && len(l.estdStreams) == 0 {
|
||||||
|
return ErrConnClosing
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *loopyWriter) incomingGoAwayHandler(*incomingGoAway) error {
|
||||||
|
if l.side == clientSide {
|
||||||
|
l.draining = true
|
||||||
|
if len(l.estdStreams) == 0 {
|
||||||
|
return ErrConnClosing
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *loopyWriter) goAwayHandler(g *goAway) error {
|
||||||
|
// Handling of outgoing GoAway is very specific to side.
|
||||||
|
if l.ssGoAwayHandler != nil {
|
||||||
|
draining, err := l.ssGoAwayHandler(g)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
l.draining = draining
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *loopyWriter) handle(i interface{}) error {
|
||||||
|
switch i := i.(type) {
|
||||||
|
case *incomingWindowUpdate:
|
||||||
|
return l.incomingWindowUpdateHandler(i)
|
||||||
|
case *outgoingWindowUpdate:
|
||||||
|
return l.outgoingWindowUpdateHandler(i)
|
||||||
|
case *incomingSettings:
|
||||||
|
return l.incomingSettingsHandler(i)
|
||||||
|
case *outgoingSettings:
|
||||||
|
return l.outgoingSettingsHandler(i)
|
||||||
|
case *headerFrame:
|
||||||
|
return l.headerHandler(i)
|
||||||
|
case *cleanupStream:
|
||||||
|
return l.cleanupStreamHandler(i)
|
||||||
|
case *incomingGoAway:
|
||||||
|
return l.incomingGoAwayHandler(i)
|
||||||
|
case *dataFrame:
|
||||||
|
return l.preprocessData(i)
|
||||||
|
case *ping:
|
||||||
|
return l.pingHandler(i)
|
||||||
|
case *goAway:
|
||||||
|
return l.goAwayHandler(i)
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("transport: unknown control message type %T", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *loopyWriter) applySettings(ss []http2.Setting) error {
|
||||||
|
for _, s := range ss {
|
||||||
|
switch s.ID {
|
||||||
|
case http2.SettingInitialWindowSize:
|
||||||
|
o := l.oiws
|
||||||
|
l.oiws = s.Val
|
||||||
|
if o < l.oiws {
|
||||||
|
// If the new limit is greater make all depleted streams active.
|
||||||
|
for _, stream := range l.estdStreams {
|
||||||
|
if stream.state == waitingOnStreamQuota {
|
||||||
|
stream.state = active
|
||||||
|
l.activeStreams.enqueue(stream)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *loopyWriter) processData() (bool, error) {
|
||||||
|
if l.sendQuota == 0 {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
str := l.activeStreams.dequeue()
|
||||||
|
if str == nil {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
dataItem := str.itl.peek().(*dataFrame)
|
||||||
|
if len(dataItem.h) == 0 && len(dataItem.d) == 0 {
|
||||||
|
// Client sends out empty data frame with endStream = true
|
||||||
|
if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
str.itl.dequeue()
|
||||||
|
if str.itl.isEmpty() {
|
||||||
|
str.state = empty
|
||||||
|
} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
|
||||||
|
if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
l.activeStreams.enqueue(str)
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
var (
|
||||||
|
idx int
|
||||||
|
buf []byte
|
||||||
|
)
|
||||||
|
if len(dataItem.h) != 0 { // data header has not been written out yet.
|
||||||
|
buf = dataItem.h
|
||||||
|
} else {
|
||||||
|
idx = 1
|
||||||
|
buf = dataItem.d
|
||||||
|
}
|
||||||
|
size := http2MaxFrameLen
|
||||||
|
if len(buf) < size {
|
||||||
|
size = len(buf)
|
||||||
|
}
|
||||||
|
if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 {
|
||||||
|
str.state = waitingOnStreamQuota
|
||||||
|
return false, nil
|
||||||
|
} else if strQuota < size {
|
||||||
|
size = strQuota
|
||||||
|
}
|
||||||
|
|
||||||
|
if l.sendQuota < uint32(size) {
|
||||||
|
size = int(l.sendQuota)
|
||||||
|
}
|
||||||
|
// Now that outgoing flow controls are checked we can replenish str's write quota
|
||||||
|
str.wq.replenish(size)
|
||||||
|
var endStream bool
|
||||||
|
// This last data message on this stream and all
|
||||||
|
// of it can be written in this go.
|
||||||
|
if dataItem.endStream && size == len(buf) {
|
||||||
|
// buf contains either data or it contains header but data is empty.
|
||||||
|
if idx == 1 || len(dataItem.d) == 0 {
|
||||||
|
endStream = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if dataItem.onEachWrite != nil {
|
||||||
|
dataItem.onEachWrite()
|
||||||
|
}
|
||||||
|
if err := l.framer.fr.WriteData(dataItem.streamID, endStream, buf[:size]); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
buf = buf[size:]
|
||||||
|
str.bytesOutStanding += size
|
||||||
|
l.sendQuota -= uint32(size)
|
||||||
|
if idx == 0 {
|
||||||
|
dataItem.h = buf
|
||||||
|
} else {
|
||||||
|
dataItem.d = buf
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // All the data from that message was written out.
|
||||||
|
str.itl.dequeue()
|
||||||
|
}
|
||||||
|
if str.itl.isEmpty() {
|
||||||
|
str.state = empty
|
||||||
|
} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // The next item is trailers.
|
||||||
|
if err := l.writeHeader(trailer.streamID, trailer.endStream, trailer.hf, trailer.onWrite); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
if err := l.cleanupStreamHandler(trailer.cleanup); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
} else if int(l.oiws)-str.bytesOutStanding <= 0 { // Ran out of stream quota.
|
||||||
|
str.state = waitingOnStreamQuota
|
||||||
|
} else { // Otherwise add it back to the list of active streams.
|
||||||
|
l.activeStreams.enqueue(str)
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
@ -20,13 +20,10 @@ package transport
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"math"
|
"math"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"golang.org/x/net/http2"
|
|
||||||
"golang.org/x/net/http2/hpack"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|
@ -46,192 +43,86 @@ const (
|
||||||
defaultKeepalivePolicyMinTime = time.Duration(5 * time.Minute)
|
defaultKeepalivePolicyMinTime = time.Duration(5 * time.Minute)
|
||||||
// max window limit set by HTTP2 Specs.
|
// max window limit set by HTTP2 Specs.
|
||||||
maxWindowSize = math.MaxInt32
|
maxWindowSize = math.MaxInt32
|
||||||
// defaultLocalSendQuota sets is default value for number of data
|
// defaultWriteQuota is the default value for number of data
|
||||||
// bytes that each stream can schedule before some of it being
|
// bytes that each stream can schedule before some of it being
|
||||||
// flushed out.
|
// flushed out.
|
||||||
defaultLocalSendQuota = 128 * 1024
|
defaultWriteQuota = 64 * 1024
|
||||||
)
|
)
|
||||||
|
|
||||||
// The following defines various control items which could flow through
|
// writeQuota is a soft limit on the amount of data a stream can
|
||||||
// the control buffer of transport. They represent different aspects of
|
// schedule before some of it is written out.
|
||||||
// control tasks, e.g., flow control, settings, streaming resetting, etc.
|
type writeQuota struct {
|
||||||
|
quota int32
|
||||||
type headerFrame struct {
|
// get waits on read from when quota goes less than or equal to zero.
|
||||||
streamID uint32
|
// replenish writes on it when quota goes positive again.
|
||||||
hf []hpack.HeaderField
|
ch chan struct{}
|
||||||
endStream bool
|
// done is triggered in error case.
|
||||||
|
done <-chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (*headerFrame) item() {}
|
func newWriteQuota(sz int32, done <-chan struct{}) *writeQuota {
|
||||||
|
return &writeQuota{
|
||||||
type continuationFrame struct {
|
quota: sz,
|
||||||
streamID uint32
|
ch: make(chan struct{}, 1),
|
||||||
endHeaders bool
|
done: done,
|
||||||
headerBlockFragment []byte
|
|
||||||
}
|
|
||||||
|
|
||||||
type dataFrame struct {
|
|
||||||
streamID uint32
|
|
||||||
endStream bool
|
|
||||||
d []byte
|
|
||||||
f func()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (*dataFrame) item() {}
|
|
||||||
|
|
||||||
func (*continuationFrame) item() {}
|
|
||||||
|
|
||||||
type windowUpdate struct {
|
|
||||||
streamID uint32
|
|
||||||
increment uint32
|
|
||||||
}
|
|
||||||
|
|
||||||
func (*windowUpdate) item() {}
|
|
||||||
|
|
||||||
type settings struct {
|
|
||||||
ss []http2.Setting
|
|
||||||
}
|
|
||||||
|
|
||||||
func (*settings) item() {}
|
|
||||||
|
|
||||||
type settingsAck struct {
|
|
||||||
}
|
|
||||||
|
|
||||||
func (*settingsAck) item() {}
|
|
||||||
|
|
||||||
type resetStream struct {
|
|
||||||
streamID uint32
|
|
||||||
code http2.ErrCode
|
|
||||||
}
|
|
||||||
|
|
||||||
func (*resetStream) item() {}
|
|
||||||
|
|
||||||
type goAway struct {
|
|
||||||
code http2.ErrCode
|
|
||||||
debugData []byte
|
|
||||||
headsUp bool
|
|
||||||
closeConn bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func (*goAway) item() {}
|
|
||||||
|
|
||||||
type flushIO struct {
|
|
||||||
closeTr bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func (*flushIO) item() {}
|
|
||||||
|
|
||||||
type ping struct {
|
|
||||||
ack bool
|
|
||||||
data [8]byte
|
|
||||||
}
|
|
||||||
|
|
||||||
func (*ping) item() {}
|
|
||||||
|
|
||||||
// quotaPool is a pool which accumulates the quota and sends it to acquire()
|
|
||||||
// when it is available.
|
|
||||||
type quotaPool struct {
|
|
||||||
mu sync.Mutex
|
|
||||||
c chan struct{}
|
|
||||||
version uint32
|
|
||||||
quota int
|
|
||||||
}
|
|
||||||
|
|
||||||
// newQuotaPool creates a quotaPool which has quota q available to consume.
|
|
||||||
func newQuotaPool(q int) *quotaPool {
|
|
||||||
qb := "aPool{
|
|
||||||
quota: q,
|
|
||||||
c: make(chan struct{}, 1),
|
|
||||||
}
|
|
||||||
return qb
|
|
||||||
}
|
|
||||||
|
|
||||||
// add cancels the pending quota sent on acquired, incremented by v and sends
|
|
||||||
// it back on acquire.
|
|
||||||
func (qb *quotaPool) add(v int) {
|
|
||||||
qb.mu.Lock()
|
|
||||||
defer qb.mu.Unlock()
|
|
||||||
qb.lockedAdd(v)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (qb *quotaPool) lockedAdd(v int) {
|
|
||||||
var wakeUp bool
|
|
||||||
if qb.quota <= 0 {
|
|
||||||
wakeUp = true // Wake up potential waiters.
|
|
||||||
}
|
|
||||||
qb.quota += v
|
|
||||||
if wakeUp && qb.quota > 0 {
|
|
||||||
select {
|
|
||||||
case qb.c <- struct{}{}:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qb *quotaPool) addAndUpdate(v int) {
|
func (w *writeQuota) get(sz int32) error {
|
||||||
qb.mu.Lock()
|
|
||||||
qb.lockedAdd(v)
|
|
||||||
qb.version++
|
|
||||||
qb.mu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (qb *quotaPool) get(v int, wc waiters) (int, uint32, error) {
|
|
||||||
qb.mu.Lock()
|
|
||||||
if qb.quota > 0 {
|
|
||||||
if v > qb.quota {
|
|
||||||
v = qb.quota
|
|
||||||
}
|
|
||||||
qb.quota -= v
|
|
||||||
ver := qb.version
|
|
||||||
qb.mu.Unlock()
|
|
||||||
return v, ver, nil
|
|
||||||
}
|
|
||||||
qb.mu.Unlock()
|
|
||||||
for {
|
for {
|
||||||
select {
|
if atomic.LoadInt32(&w.quota) > 0 {
|
||||||
case <-wc.ctx.Done():
|
atomic.AddInt32(&w.quota, -sz)
|
||||||
return 0, 0, ContextErr(wc.ctx.Err())
|
return nil
|
||||||
case <-wc.tctx.Done():
|
|
||||||
return 0, 0, ErrConnClosing
|
|
||||||
case <-wc.done:
|
|
||||||
return 0, 0, io.EOF
|
|
||||||
case <-wc.goAway:
|
|
||||||
return 0, 0, errStreamDrain
|
|
||||||
case <-qb.c:
|
|
||||||
qb.mu.Lock()
|
|
||||||
if qb.quota > 0 {
|
|
||||||
if v > qb.quota {
|
|
||||||
v = qb.quota
|
|
||||||
}
|
}
|
||||||
qb.quota -= v
|
|
||||||
ver := qb.version
|
|
||||||
if qb.quota > 0 {
|
|
||||||
select {
|
select {
|
||||||
case qb.c <- struct{}{}:
|
case <-w.ch:
|
||||||
|
continue
|
||||||
|
case <-w.done:
|
||||||
|
return errStreamDone
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *writeQuota) replenish(n int) {
|
||||||
|
sz := int32(n)
|
||||||
|
a := atomic.AddInt32(&w.quota, sz)
|
||||||
|
b := a - sz
|
||||||
|
if b <= 0 && a > 0 {
|
||||||
|
select {
|
||||||
|
case w.ch <- struct{}{}:
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
qb.mu.Unlock()
|
|
||||||
return v, ver, nil
|
|
||||||
|
|
||||||
}
|
|
||||||
qb.mu.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qb *quotaPool) compareAndExecute(version uint32, success, failure func()) bool {
|
type trInFlow struct {
|
||||||
qb.mu.Lock()
|
limit uint32
|
||||||
if version == qb.version {
|
unacked uint32
|
||||||
success()
|
|
||||||
qb.mu.Unlock()
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
failure()
|
|
||||||
qb.mu.Unlock()
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *trInFlow) newLimit(n uint32) uint32 {
|
||||||
|
d := n - f.limit
|
||||||
|
f.limit = n
|
||||||
|
return d
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *trInFlow) onData(n uint32) uint32 {
|
||||||
|
f.unacked += n
|
||||||
|
if f.unacked >= f.limit/4 {
|
||||||
|
w := f.unacked
|
||||||
|
f.unacked = 0
|
||||||
|
return w
|
||||||
|
}
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func (f *trInFlow) reset() uint32 {
|
||||||
|
w := f.unacked
|
||||||
|
f.unacked = 0
|
||||||
|
return w
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(mmukhi): Simplify this code.
|
||||||
// inFlow deals with inbound flow control
|
// inFlow deals with inbound flow control
|
||||||
type inFlow struct {
|
type inFlow struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
|
|
@ -252,9 +143,9 @@ type inFlow struct {
|
||||||
// It assumes that n is always greater than the old limit.
|
// It assumes that n is always greater than the old limit.
|
||||||
func (f *inFlow) newLimit(n uint32) uint32 {
|
func (f *inFlow) newLimit(n uint32) uint32 {
|
||||||
f.mu.Lock()
|
f.mu.Lock()
|
||||||
defer f.mu.Unlock()
|
|
||||||
d := n - f.limit
|
d := n - f.limit
|
||||||
f.limit = n
|
f.limit = n
|
||||||
|
f.mu.Unlock()
|
||||||
return d
|
return d
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -263,7 +154,6 @@ func (f *inFlow) maybeAdjust(n uint32) uint32 {
|
||||||
n = uint32(math.MaxInt32)
|
n = uint32(math.MaxInt32)
|
||||||
}
|
}
|
||||||
f.mu.Lock()
|
f.mu.Lock()
|
||||||
defer f.mu.Unlock()
|
|
||||||
// estSenderQuota is the receiver's view of the maximum number of bytes the sender
|
// estSenderQuota is the receiver's view of the maximum number of bytes the sender
|
||||||
// can send without a window update.
|
// can send without a window update.
|
||||||
estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
|
estSenderQuota := int32(f.limit - (f.pendingData + f.pendingUpdate))
|
||||||
|
|
@ -275,7 +165,7 @@ func (f *inFlow) maybeAdjust(n uint32) uint32 {
|
||||||
// for this message. Therefore we must send an update over the limit since there's an active read
|
// for this message. Therefore we must send an update over the limit since there's an active read
|
||||||
// request from the application.
|
// request from the application.
|
||||||
if estUntransmittedData > estSenderQuota {
|
if estUntransmittedData > estSenderQuota {
|
||||||
// Sender's window shouldn't go more than 2^31 - 1 as speecified in the HTTP spec.
|
// Sender's window shouldn't go more than 2^31 - 1 as specified in the HTTP spec.
|
||||||
if f.limit+n > maxWindowSize {
|
if f.limit+n > maxWindowSize {
|
||||||
f.delta = maxWindowSize - f.limit
|
f.delta = maxWindowSize - f.limit
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -284,19 +174,24 @@ func (f *inFlow) maybeAdjust(n uint32) uint32 {
|
||||||
// is padded; We will fallback on the current available window(at least a 1/4th of the limit).
|
// is padded; We will fallback on the current available window(at least a 1/4th of the limit).
|
||||||
f.delta = n
|
f.delta = n
|
||||||
}
|
}
|
||||||
|
f.mu.Unlock()
|
||||||
return f.delta
|
return f.delta
|
||||||
}
|
}
|
||||||
|
f.mu.Unlock()
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// onData is invoked when some data frame is received. It updates pendingData.
|
// onData is invoked when some data frame is received. It updates pendingData.
|
||||||
func (f *inFlow) onData(n uint32) error {
|
func (f *inFlow) onData(n uint32) error {
|
||||||
f.mu.Lock()
|
f.mu.Lock()
|
||||||
defer f.mu.Unlock()
|
|
||||||
f.pendingData += n
|
f.pendingData += n
|
||||||
if f.pendingData+f.pendingUpdate > f.limit+f.delta {
|
if f.pendingData+f.pendingUpdate > f.limit+f.delta {
|
||||||
return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate, f.limit)
|
limit := f.limit
|
||||||
|
rcvd := f.pendingData + f.pendingUpdate
|
||||||
|
f.mu.Unlock()
|
||||||
|
return fmt.Errorf("received %d-bytes data exceeding the limit %d bytes", rcvd, limit)
|
||||||
}
|
}
|
||||||
|
f.mu.Unlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -304,8 +199,8 @@ func (f *inFlow) onData(n uint32) error {
|
||||||
// to be sent to the peer.
|
// to be sent to the peer.
|
||||||
func (f *inFlow) onRead(n uint32) uint32 {
|
func (f *inFlow) onRead(n uint32) uint32 {
|
||||||
f.mu.Lock()
|
f.mu.Lock()
|
||||||
defer f.mu.Unlock()
|
|
||||||
if f.pendingData == 0 {
|
if f.pendingData == 0 {
|
||||||
|
f.mu.Unlock()
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
f.pendingData -= n
|
f.pendingData -= n
|
||||||
|
|
@ -320,15 +215,9 @@ func (f *inFlow) onRead(n uint32) uint32 {
|
||||||
if f.pendingUpdate >= f.limit/4 {
|
if f.pendingUpdate >= f.limit/4 {
|
||||||
wu := f.pendingUpdate
|
wu := f.pendingUpdate
|
||||||
f.pendingUpdate = 0
|
f.pendingUpdate = 0
|
||||||
|
f.mu.Unlock()
|
||||||
return wu
|
return wu
|
||||||
}
|
}
|
||||||
|
f.mu.Unlock()
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func (f *inFlow) resetPendingUpdate() uint32 {
|
|
||||||
f.mu.Lock()
|
|
||||||
defer f.mu.Unlock()
|
|
||||||
n := f.pendingUpdate
|
|
||||||
f.pendingUpdate = 0
|
|
||||||
return n
|
|
||||||
}
|
|
||||||
|
|
@ -365,7 +365,7 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
|
||||||
ht.stats.HandleRPC(s.ctx, inHeader)
|
ht.stats.HandleRPC(s.ctx, inHeader)
|
||||||
}
|
}
|
||||||
s.trReader = &transportReader{
|
s.trReader = &transportReader{
|
||||||
reader: &recvBufferReader{ctx: s.ctx, recv: s.buf},
|
reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf},
|
||||||
windowHandler: func(int) {},
|
windowHandler: func(int) {},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
File diff suppressed because it is too large
Load Diff
|
|
@ -52,27 +52,24 @@ var ErrIllegalHeaderWrite = errors.New("transport: the stream is done or WriteHe
|
||||||
// http2Server implements the ServerTransport interface with HTTP2.
|
// http2Server implements the ServerTransport interface with HTTP2.
|
||||||
type http2Server struct {
|
type http2Server struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
ctxDone <-chan struct{} // Cache the context.Done() chan
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
conn net.Conn
|
conn net.Conn
|
||||||
|
loopy *loopyWriter
|
||||||
|
readerDone chan struct{} // sync point to enable testing.
|
||||||
|
writerDone chan struct{} // sync point to enable testing.
|
||||||
remoteAddr net.Addr
|
remoteAddr net.Addr
|
||||||
localAddr net.Addr
|
localAddr net.Addr
|
||||||
maxStreamID uint32 // max stream ID ever seen
|
maxStreamID uint32 // max stream ID ever seen
|
||||||
authInfo credentials.AuthInfo // auth info about the connection
|
authInfo credentials.AuthInfo // auth info about the connection
|
||||||
inTapHandle tap.ServerInHandle
|
inTapHandle tap.ServerInHandle
|
||||||
framer *framer
|
framer *framer
|
||||||
hBuf *bytes.Buffer // the buffer for HPACK encoding
|
|
||||||
hEnc *hpack.Encoder // HPACK encoder
|
|
||||||
// The max number of concurrent streams.
|
// The max number of concurrent streams.
|
||||||
maxStreams uint32
|
maxStreams uint32
|
||||||
// controlBuf delivers all the control related tasks (e.g., window
|
// controlBuf delivers all the control related tasks (e.g., window
|
||||||
// updates, reset streams, and various settings) to the controller.
|
// updates, reset streams, and various settings) to the controller.
|
||||||
controlBuf *controlBuffer
|
controlBuf *controlBuffer
|
||||||
fc *inFlow
|
fc *trInFlow
|
||||||
// sendQuotaPool provides flow control to outbound message.
|
|
||||||
sendQuotaPool *quotaPool
|
|
||||||
// localSendQuota limits the amount of data that can be scheduled
|
|
||||||
// for writing before it is actually written out.
|
|
||||||
localSendQuota *quotaPool
|
|
||||||
stats stats.Handler
|
stats stats.Handler
|
||||||
// Flag to keep track of reading activity on transport.
|
// Flag to keep track of reading activity on transport.
|
||||||
// 1 is true and 0 is false.
|
// 1 is true and 0 is false.
|
||||||
|
|
@ -104,8 +101,6 @@ type http2Server struct {
|
||||||
drainChan chan struct{}
|
drainChan chan struct{}
|
||||||
state transportState
|
state transportState
|
||||||
activeStreams map[uint32]*Stream
|
activeStreams map[uint32]*Stream
|
||||||
// the per-stream outbound flow control window size set by the peer.
|
|
||||||
streamSendQuota uint32
|
|
||||||
// idle is the time instant when the connection went idle.
|
// idle is the time instant when the connection went idle.
|
||||||
// This is either the beginning of the connection or when the number of
|
// This is either the beginning of the connection or when the number of
|
||||||
// RPCs go down to 0.
|
// RPCs go down to 0.
|
||||||
|
|
@ -185,33 +180,30 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
|
||||||
if kep.MinTime == 0 {
|
if kep.MinTime == 0 {
|
||||||
kep.MinTime = defaultKeepalivePolicyMinTime
|
kep.MinTime = defaultKeepalivePolicyMinTime
|
||||||
}
|
}
|
||||||
var buf bytes.Buffer
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
t := &http2Server{
|
t := &http2Server{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
|
ctxDone: ctx.Done(),
|
||||||
conn: conn,
|
conn: conn,
|
||||||
remoteAddr: conn.RemoteAddr(),
|
remoteAddr: conn.RemoteAddr(),
|
||||||
localAddr: conn.LocalAddr(),
|
localAddr: conn.LocalAddr(),
|
||||||
authInfo: config.AuthInfo,
|
authInfo: config.AuthInfo,
|
||||||
framer: framer,
|
framer: framer,
|
||||||
hBuf: &buf,
|
readerDone: make(chan struct{}),
|
||||||
hEnc: hpack.NewEncoder(&buf),
|
writerDone: make(chan struct{}),
|
||||||
maxStreams: maxStreams,
|
maxStreams: maxStreams,
|
||||||
inTapHandle: config.InTapHandle,
|
inTapHandle: config.InTapHandle,
|
||||||
controlBuf: newControlBuffer(),
|
fc: &trInFlow{limit: uint32(icwz)},
|
||||||
fc: &inFlow{limit: uint32(icwz)},
|
|
||||||
sendQuotaPool: newQuotaPool(defaultWindowSize),
|
|
||||||
localSendQuota: newQuotaPool(defaultLocalSendQuota),
|
|
||||||
state: reachable,
|
state: reachable,
|
||||||
activeStreams: make(map[uint32]*Stream),
|
activeStreams: make(map[uint32]*Stream),
|
||||||
streamSendQuota: defaultWindowSize,
|
|
||||||
stats: config.StatsHandler,
|
stats: config.StatsHandler,
|
||||||
kp: kp,
|
kp: kp,
|
||||||
idle: time.Now(),
|
idle: time.Now(),
|
||||||
kep: kep,
|
kep: kep,
|
||||||
initialWindowSize: iwz,
|
initialWindowSize: iwz,
|
||||||
}
|
}
|
||||||
|
t.controlBuf = newControlBuffer(t.ctxDone)
|
||||||
if dynamicWindow {
|
if dynamicWindow {
|
||||||
t.bdpEst = &bdpEstimator{
|
t.bdpEst = &bdpEstimator{
|
||||||
bdp: initialWindowSize,
|
bdp: initialWindowSize,
|
||||||
|
|
@ -258,8 +250,11 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
|
||||||
t.handleSettings(sf)
|
t.handleSettings(sf)
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
loopyWriter(t.ctx, t.controlBuf, t.itemHandler)
|
t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
|
||||||
|
t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
|
||||||
|
t.loopy.run()
|
||||||
t.conn.Close()
|
t.conn.Close()
|
||||||
|
close(t.writerDone)
|
||||||
}()
|
}()
|
||||||
go t.keepalive()
|
go t.keepalive()
|
||||||
return t, nil
|
return t, nil
|
||||||
|
|
@ -268,12 +263,16 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
|
||||||
// operateHeader takes action on the decoded headers.
|
// operateHeader takes action on the decoded headers.
|
||||||
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) {
|
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (close bool) {
|
||||||
streamID := frame.Header().StreamID
|
streamID := frame.Header().StreamID
|
||||||
|
|
||||||
var state decodeState
|
var state decodeState
|
||||||
for _, hf := range frame.Fields {
|
for _, hf := range frame.Fields {
|
||||||
if err := state.processHeaderField(hf); err != nil {
|
if err := state.processHeaderField(hf); err != nil {
|
||||||
if se, ok := err.(StreamError); ok {
|
if se, ok := err.(StreamError); ok {
|
||||||
t.controlBuf.put(&resetStream{streamID, statusCodeConvTab[se.Code]})
|
t.controlBuf.put(&cleanupStream{
|
||||||
|
streamID: streamID,
|
||||||
|
rst: true,
|
||||||
|
rstCode: statusCodeConvTab[se.Code],
|
||||||
|
onWrite: func() {},
|
||||||
|
})
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -325,7 +324,12 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||||
s.ctx, err = t.inTapHandle(s.ctx, info)
|
s.ctx, err = t.inTapHandle(s.ctx, info)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
|
warningf("transport: http2Server.operateHeaders got an error from InTapHandle: %v", err)
|
||||||
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
|
t.controlBuf.put(&cleanupStream{
|
||||||
|
streamID: s.id,
|
||||||
|
rst: true,
|
||||||
|
rstCode: http2.ErrCodeRefusedStream,
|
||||||
|
onWrite: func() {},
|
||||||
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -336,7 +340,12 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||||
}
|
}
|
||||||
if uint32(len(t.activeStreams)) >= t.maxStreams {
|
if uint32(len(t.activeStreams)) >= t.maxStreams {
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
t.controlBuf.put(&resetStream{streamID, http2.ErrCodeRefusedStream})
|
t.controlBuf.put(&cleanupStream{
|
||||||
|
streamID: streamID,
|
||||||
|
rst: true,
|
||||||
|
rstCode: http2.ErrCodeRefusedStream,
|
||||||
|
onWrite: func() {},
|
||||||
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if streamID%2 != 1 || streamID <= t.maxStreamID {
|
if streamID%2 != 1 || streamID <= t.maxStreamID {
|
||||||
|
|
@ -346,7 +355,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
t.maxStreamID = streamID
|
t.maxStreamID = streamID
|
||||||
s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
|
|
||||||
t.activeStreams[streamID] = s
|
t.activeStreams[streamID] = s
|
||||||
if len(t.activeStreams) == 1 {
|
if len(t.activeStreams) == 1 {
|
||||||
t.idle = time.Time{}
|
t.idle = time.Time{}
|
||||||
|
|
@ -367,19 +375,18 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||||
}
|
}
|
||||||
t.stats.HandleRPC(s.ctx, inHeader)
|
t.stats.HandleRPC(s.ctx, inHeader)
|
||||||
}
|
}
|
||||||
|
s.ctxDone = s.ctx.Done()
|
||||||
|
s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
|
||||||
s.trReader = &transportReader{
|
s.trReader = &transportReader{
|
||||||
reader: &recvBufferReader{
|
reader: &recvBufferReader{
|
||||||
ctx: s.ctx,
|
ctx: s.ctx,
|
||||||
|
ctxDone: s.ctxDone,
|
||||||
recv: s.buf,
|
recv: s.buf,
|
||||||
},
|
},
|
||||||
windowHandler: func(n int) {
|
windowHandler: func(n int) {
|
||||||
t.updateWindow(s, uint32(n))
|
t.updateWindow(s, uint32(n))
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
s.waiters = waiters{
|
|
||||||
ctx: s.ctx,
|
|
||||||
tctx: t.ctx,
|
|
||||||
}
|
|
||||||
handle(s)
|
handle(s)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
@ -388,18 +395,26 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||||
// typically run in a separate goroutine.
|
// typically run in a separate goroutine.
|
||||||
// traceCtx attaches trace to ctx and returns the new context.
|
// traceCtx attaches trace to ctx and returns the new context.
|
||||||
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
|
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
|
||||||
|
defer close(t.readerDone)
|
||||||
for {
|
for {
|
||||||
frame, err := t.framer.fr.ReadFrame()
|
frame, err := t.framer.fr.ReadFrame()
|
||||||
atomic.StoreUint32(&t.activity, 1)
|
atomic.StoreUint32(&t.activity, 1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if se, ok := err.(http2.StreamError); ok {
|
if se, ok := err.(http2.StreamError); ok {
|
||||||
|
warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
s := t.activeStreams[se.StreamID]
|
s := t.activeStreams[se.StreamID]
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
if s != nil {
|
if s != nil {
|
||||||
t.closeStream(s)
|
t.closeStream(s, true, se.Code, nil)
|
||||||
|
} else {
|
||||||
|
t.controlBuf.put(&cleanupStream{
|
||||||
|
streamID: se.StreamID,
|
||||||
|
rst: true,
|
||||||
|
rstCode: se.Code,
|
||||||
|
onWrite: func() {},
|
||||||
|
})
|
||||||
}
|
}
|
||||||
t.controlBuf.put(&resetStream{se.StreamID, se.Code})
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
if err == io.EOF || err == io.ErrUnexpectedEOF {
|
||||||
|
|
@ -453,33 +468,20 @@ func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
|
||||||
// of stream if the application is requesting data larger in size than
|
// of stream if the application is requesting data larger in size than
|
||||||
// the window.
|
// the window.
|
||||||
func (t *http2Server) adjustWindow(s *Stream, n uint32) {
|
func (t *http2Server) adjustWindow(s *Stream, n uint32) {
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
if s.state == streamDone {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if w := s.fc.maybeAdjust(n); w > 0 {
|
if w := s.fc.maybeAdjust(n); w > 0 {
|
||||||
if cw := t.fc.resetPendingUpdate(); cw > 0 {
|
t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
|
||||||
t.controlBuf.put(&windowUpdate{0, cw})
|
|
||||||
}
|
|
||||||
t.controlBuf.put(&windowUpdate{s.id, w})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateWindow adjusts the inbound quota for the stream and the transport.
|
// updateWindow adjusts the inbound quota for the stream and the transport.
|
||||||
// Window updates will deliver to the controller for sending when
|
// Window updates will deliver to the controller for sending when
|
||||||
// the cumulative quota exceeds the corresponding threshold.
|
// the cumulative quota exceeds the corresponding threshold.
|
||||||
func (t *http2Server) updateWindow(s *Stream, n uint32) {
|
func (t *http2Server) updateWindow(s *Stream, n uint32) {
|
||||||
s.mu.Lock()
|
|
||||||
defer s.mu.Unlock()
|
|
||||||
if s.state == streamDone {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if w := s.fc.onRead(n); w > 0 {
|
if w := s.fc.onRead(n); w > 0 {
|
||||||
if cw := t.fc.resetPendingUpdate(); cw > 0 {
|
t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
|
||||||
t.controlBuf.put(&windowUpdate{0, cw})
|
increment: w,
|
||||||
}
|
})
|
||||||
t.controlBuf.put(&windowUpdate{s.id, w})
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -493,8 +495,11 @@ func (t *http2Server) updateFlowControl(n uint32) {
|
||||||
}
|
}
|
||||||
t.initialWindowSize = int32(n)
|
t.initialWindowSize = int32(n)
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
t.controlBuf.put(&windowUpdate{0, t.fc.newLimit(n)})
|
t.controlBuf.put(&outgoingWindowUpdate{
|
||||||
t.controlBuf.put(&settings{
|
streamID: 0,
|
||||||
|
increment: t.fc.newLimit(n),
|
||||||
|
})
|
||||||
|
t.controlBuf.put(&outgoingSettings{
|
||||||
ss: []http2.Setting{
|
ss: []http2.Setting{
|
||||||
{
|
{
|
||||||
ID: http2.SettingInitialWindowSize,
|
ID: http2.SettingInitialWindowSize,
|
||||||
|
|
@ -519,23 +524,22 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
|
||||||
// Decoupling the connection flow control will prevent other
|
// Decoupling the connection flow control will prevent other
|
||||||
// active(fast) streams from starving in presence of slow or
|
// active(fast) streams from starving in presence of slow or
|
||||||
// inactive streams.
|
// inactive streams.
|
||||||
//
|
if w := t.fc.onData(uint32(size)); w > 0 {
|
||||||
// Furthermore, if a bdpPing is being sent out we can piggyback
|
t.controlBuf.put(&outgoingWindowUpdate{
|
||||||
// connection's window update for the bytes we just received.
|
streamID: 0,
|
||||||
|
increment: w,
|
||||||
|
})
|
||||||
|
}
|
||||||
if sendBDPPing {
|
if sendBDPPing {
|
||||||
if size != 0 { // Could be an empty frame.
|
// Avoid excessive ping detection (e.g. in an L7 proxy)
|
||||||
t.controlBuf.put(&windowUpdate{0, uint32(size)})
|
// by sending a window update prior to the BDP ping.
|
||||||
|
if w := t.fc.reset(); w > 0 {
|
||||||
|
t.controlBuf.put(&outgoingWindowUpdate{
|
||||||
|
streamID: 0,
|
||||||
|
increment: w,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
t.controlBuf.put(bdpPing)
|
t.controlBuf.put(bdpPing)
|
||||||
} else {
|
|
||||||
if err := t.fc.onData(uint32(size)); err != nil {
|
|
||||||
errorf("transport: http2Server %v", err)
|
|
||||||
t.Close()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if w := t.fc.onRead(uint32(size)); w > 0 {
|
|
||||||
t.controlBuf.put(&windowUpdate{0, w})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
// Select the right stream to dispatch.
|
// Select the right stream to dispatch.
|
||||||
s, ok := t.getStream(f)
|
s, ok := t.getStream(f)
|
||||||
|
|
@ -543,23 +547,15 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if size > 0 {
|
if size > 0 {
|
||||||
s.mu.Lock()
|
|
||||||
if s.state == streamDone {
|
|
||||||
s.mu.Unlock()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
if err := s.fc.onData(uint32(size)); err != nil {
|
if err := s.fc.onData(uint32(size)); err != nil {
|
||||||
s.mu.Unlock()
|
t.closeStream(s, true, http2.ErrCodeFlowControl, nil)
|
||||||
t.closeStream(s)
|
|
||||||
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if f.Header().Flags.Has(http2.FlagDataPadded) {
|
if f.Header().Flags.Has(http2.FlagDataPadded) {
|
||||||
if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
|
if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
|
||||||
t.controlBuf.put(&windowUpdate{s.id, w})
|
t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
s.mu.Unlock()
|
|
||||||
// TODO(bradfitz, zhaoq): A copy is required here because there is no
|
// TODO(bradfitz, zhaoq): A copy is required here because there is no
|
||||||
// guarantee f.Data() is consumed before the arrival of next frame.
|
// guarantee f.Data() is consumed before the arrival of next frame.
|
||||||
// Can this copy be eliminated?
|
// Can this copy be eliminated?
|
||||||
|
|
@ -571,11 +567,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
|
||||||
}
|
}
|
||||||
if f.Header().Flags.Has(http2.FlagDataEndStream) {
|
if f.Header().Flags.Has(http2.FlagDataEndStream) {
|
||||||
// Received the end of stream from the client.
|
// Received the end of stream from the client.
|
||||||
s.mu.Lock()
|
s.compareAndSwapState(streamActive, streamReadDone)
|
||||||
if s.state != streamDone {
|
|
||||||
s.state = streamReadDone
|
|
||||||
}
|
|
||||||
s.mu.Unlock()
|
|
||||||
s.write(recvMsg{err: io.EOF})
|
s.write(recvMsg{err: io.EOF})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -585,50 +577,21 @@ func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
|
||||||
if !ok {
|
if !ok {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
t.closeStream(s)
|
t.closeStream(s, false, 0, nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
|
func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
|
||||||
if f.IsAck() {
|
if f.IsAck() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
var rs []http2.Setting
|
var ss []http2.Setting
|
||||||
var ps []http2.Setting
|
|
||||||
f.ForeachSetting(func(s http2.Setting) error {
|
f.ForeachSetting(func(s http2.Setting) error {
|
||||||
if t.isRestrictive(s) {
|
ss = append(ss, s)
|
||||||
rs = append(rs, s)
|
|
||||||
} else {
|
|
||||||
ps = append(ps, s)
|
|
||||||
}
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
t.applySettings(rs)
|
t.controlBuf.put(&incomingSettings{
|
||||||
t.controlBuf.put(&settingsAck{})
|
ss: ss,
|
||||||
t.applySettings(ps)
|
})
|
||||||
}
|
|
||||||
|
|
||||||
func (t *http2Server) isRestrictive(s http2.Setting) bool {
|
|
||||||
switch s.ID {
|
|
||||||
case http2.SettingInitialWindowSize:
|
|
||||||
// Note: we don't acquire a lock here to read streamSendQuota
|
|
||||||
// because the same goroutine updates it later.
|
|
||||||
return s.Val < t.streamSendQuota
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
func (t *http2Server) applySettings(ss []http2.Setting) {
|
|
||||||
for _, s := range ss {
|
|
||||||
if s.ID == http2.SettingInitialWindowSize {
|
|
||||||
t.mu.Lock()
|
|
||||||
for _, stream := range t.activeStreams {
|
|
||||||
stream.sendQuotaPool.addAndUpdate(int(s.Val) - int(t.streamSendQuota))
|
|
||||||
}
|
|
||||||
t.streamSendQuota = s.Val
|
|
||||||
t.mu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|
@ -687,30 +650,15 @@ func (t *http2Server) handlePing(f *http2.PingFrame) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
|
func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
|
||||||
id := f.Header().StreamID
|
t.controlBuf.put(&incomingWindowUpdate{
|
||||||
incr := f.Increment
|
streamID: f.Header().StreamID,
|
||||||
if id == 0 {
|
increment: f.Increment,
|
||||||
t.sendQuotaPool.add(int(incr))
|
})
|
||||||
return
|
|
||||||
}
|
|
||||||
if s, ok := t.getStream(f); ok {
|
|
||||||
s.sendQuotaPool.add(int(incr))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// WriteHeader sends the header metedata md back to the client.
|
// WriteHeader sends the header metedata md back to the client.
|
||||||
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
|
func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
|
||||||
select {
|
if s.headerOk || s.getState() == streamDone {
|
||||||
case <-s.ctx.Done():
|
|
||||||
return ContextErr(s.ctx.Err())
|
|
||||||
case <-t.ctx.Done():
|
|
||||||
return ErrConnClosing
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
s.mu.Lock()
|
|
||||||
if s.headerOk || s.state == streamDone {
|
|
||||||
s.mu.Unlock()
|
|
||||||
return ErrIllegalHeaderWrite
|
return ErrIllegalHeaderWrite
|
||||||
}
|
}
|
||||||
s.headerOk = true
|
s.headerOk = true
|
||||||
|
|
@ -722,7 +670,6 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
md = s.header
|
md = s.header
|
||||||
s.mu.Unlock()
|
|
||||||
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
|
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
|
||||||
// first and create a slice of that exact size.
|
// first and create a slice of that exact size.
|
||||||
headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
|
headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
|
||||||
|
|
@ -744,6 +691,10 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
|
||||||
streamID: s.id,
|
streamID: s.id,
|
||||||
hf: headerFields,
|
hf: headerFields,
|
||||||
endStream: false,
|
endStream: false,
|
||||||
|
onWrite: func() {
|
||||||
|
atomic.StoreUint32(&t.resetPingStrikes, 1)
|
||||||
|
},
|
||||||
|
wq: s.wq,
|
||||||
})
|
})
|
||||||
if t.stats != nil {
|
if t.stats != nil {
|
||||||
// Note: WireLength is not set in outHeader.
|
// Note: WireLength is not set in outHeader.
|
||||||
|
|
@ -759,35 +710,19 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
|
||||||
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
|
// TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
|
||||||
// OK is adopted.
|
// OK is adopted.
|
||||||
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
|
func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
|
||||||
select {
|
if !s.headerOk && s.header.Len() > 0 {
|
||||||
case <-t.ctx.Done():
|
if err := t.WriteHeader(s, nil); err != nil {
|
||||||
return ErrConnClosing
|
return err
|
||||||
default:
|
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
var headersSent, hasHeader bool
|
if s.getState() == streamDone {
|
||||||
s.mu.Lock()
|
|
||||||
if s.state == streamDone {
|
|
||||||
s.mu.Unlock()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if s.headerOk {
|
|
||||||
headersSent = true
|
|
||||||
}
|
}
|
||||||
if s.header.Len() > 0 {
|
|
||||||
hasHeader = true
|
|
||||||
}
|
|
||||||
s.mu.Unlock()
|
|
||||||
|
|
||||||
if !headersSent && hasHeader {
|
|
||||||
t.WriteHeader(s, nil)
|
|
||||||
headersSent = true
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
|
// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
|
||||||
// first and create a slice of that exact size.
|
// first and create a slice of that exact size.
|
||||||
headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
|
headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
|
||||||
if !headersSent {
|
if !s.headerOk {
|
||||||
headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
|
headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
|
||||||
headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
|
headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)})
|
||||||
}
|
}
|
||||||
|
|
@ -814,108 +749,66 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
|
||||||
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
|
headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
t.controlBuf.put(&headerFrame{
|
trailer := &headerFrame{
|
||||||
streamID: s.id,
|
streamID: s.id,
|
||||||
hf: headerFields,
|
hf: headerFields,
|
||||||
endStream: true,
|
endStream: true,
|
||||||
})
|
onWrite: func() {
|
||||||
|
atomic.StoreUint32(&t.resetPingStrikes, 1)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
t.closeStream(s, false, 0, trailer)
|
||||||
if t.stats != nil {
|
if t.stats != nil {
|
||||||
t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
|
t.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
|
||||||
}
|
}
|
||||||
t.closeStream(s)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
|
// Write converts the data into HTTP2 data frame and sends it out. Non-nil error
|
||||||
// is returns if it fails (e.g., framing error, transport error).
|
// is returns if it fails (e.g., framing error, transport error).
|
||||||
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
|
func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
|
||||||
|
if !s.headerOk { // Headers haven't been written yet.
|
||||||
|
if err := t.WriteHeader(s, nil); err != nil {
|
||||||
|
// TODO(mmukhi, dfawley): Make sure this is the right code to return.
|
||||||
|
return streamErrorf(codes.Internal, "transport: %v", err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Writing headers checks for this condition.
|
||||||
|
if s.getState() == streamDone {
|
||||||
|
// TODO(mmukhi, dfawley): Should the server write also return io.EOF?
|
||||||
|
s.cancel()
|
||||||
select {
|
select {
|
||||||
case <-s.ctx.Done():
|
|
||||||
return ContextErr(s.ctx.Err())
|
|
||||||
case <-t.ctx.Done():
|
case <-t.ctx.Done():
|
||||||
return ErrConnClosing
|
return ErrConnClosing
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
return ContextErr(s.ctx.Err())
|
||||||
var writeHeaderFrame bool
|
|
||||||
s.mu.Lock()
|
|
||||||
if !s.headerOk {
|
|
||||||
writeHeaderFrame = true
|
|
||||||
}
|
}
|
||||||
s.mu.Unlock()
|
|
||||||
if writeHeaderFrame {
|
|
||||||
t.WriteHeader(s, nil)
|
|
||||||
}
|
}
|
||||||
// Add data to header frame so that we can equally distribute data across frames.
|
// Add some data to header frame so that we can equally distribute bytes across frames.
|
||||||
emptyLen := http2MaxFrameLen - len(hdr)
|
emptyLen := http2MaxFrameLen - len(hdr)
|
||||||
if emptyLen > len(data) {
|
if emptyLen > len(data) {
|
||||||
emptyLen = len(data)
|
emptyLen = len(data)
|
||||||
}
|
}
|
||||||
hdr = append(hdr, data[:emptyLen]...)
|
hdr = append(hdr, data[:emptyLen]...)
|
||||||
data = data[emptyLen:]
|
data = data[emptyLen:]
|
||||||
var (
|
df := &dataFrame{
|
||||||
streamQuota int
|
streamID: s.id,
|
||||||
streamQuotaVer uint32
|
h: hdr,
|
||||||
err error
|
d: data,
|
||||||
)
|
onEachWrite: func() {
|
||||||
for _, r := range [][]byte{hdr, data} {
|
atomic.StoreUint32(&t.resetPingStrikes, 1)
|
||||||
for len(r) > 0 {
|
},
|
||||||
size := http2MaxFrameLen
|
|
||||||
if size > len(r) {
|
|
||||||
size = len(r)
|
|
||||||
}
|
}
|
||||||
if streamQuota == 0 { // Used up all the locally cached stream quota.
|
if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
|
||||||
// Get all the stream quota there is.
|
select {
|
||||||
streamQuota, streamQuotaVer, err = s.sendQuotaPool.get(math.MaxInt32, s.waiters)
|
case <-t.ctx.Done():
|
||||||
if err != nil {
|
return ErrConnClosing
|
||||||
return err
|
default:
|
||||||
}
|
}
|
||||||
|
return ContextErr(s.ctx.Err())
|
||||||
}
|
}
|
||||||
if size > streamQuota {
|
return t.controlBuf.put(df)
|
||||||
size = streamQuota
|
|
||||||
}
|
|
||||||
// Get size worth quota from transport.
|
|
||||||
tq, _, err := t.sendQuotaPool.get(size, s.waiters)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if tq < size {
|
|
||||||
size = tq
|
|
||||||
}
|
|
||||||
ltq, _, err := t.localSendQuota.get(size, s.waiters)
|
|
||||||
if err != nil {
|
|
||||||
// Add the acquired quota back to transport.
|
|
||||||
t.sendQuotaPool.add(tq)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// even if ltq is smaller than size we don't adjust size since,
|
|
||||||
// ltq is only a soft limit.
|
|
||||||
streamQuota -= size
|
|
||||||
p := r[:size]
|
|
||||||
success := func() {
|
|
||||||
ltq := ltq
|
|
||||||
t.controlBuf.put(&dataFrame{streamID: s.id, endStream: false, d: p, f: func() {
|
|
||||||
t.localSendQuota.add(ltq)
|
|
||||||
}})
|
|
||||||
r = r[size:]
|
|
||||||
}
|
|
||||||
failure := func() { // The stream quota version must have changed.
|
|
||||||
// Our streamQuota cache is invalidated now, so give it back.
|
|
||||||
s.sendQuotaPool.lockedAdd(streamQuota + size)
|
|
||||||
}
|
|
||||||
if !s.sendQuotaPool.compareAndExecute(streamQuotaVer, success, failure) {
|
|
||||||
// Couldn't send this chunk out.
|
|
||||||
t.sendQuotaPool.add(size)
|
|
||||||
t.localSendQuota.add(ltq)
|
|
||||||
streamQuota = 0
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if streamQuota > 0 {
|
|
||||||
// ADd the left over quota back to stream.
|
|
||||||
s.sendQuotaPool.add(streamQuota)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// keepalive running in a separate goroutine does the following:
|
// keepalive running in a separate goroutine does the following:
|
||||||
|
|
@ -998,136 +891,6 @@ func (t *http2Server) keepalive() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
|
|
||||||
|
|
||||||
// TODO(mmukhi): A lot of this code(and code in other places in the tranpsort layer)
|
|
||||||
// is duplicated between the client and the server.
|
|
||||||
// The transport layer needs to be refactored to take care of this.
|
|
||||||
func (t *http2Server) itemHandler(i item) error {
|
|
||||||
switch i := i.(type) {
|
|
||||||
case *dataFrame:
|
|
||||||
// Reset ping strikes when sending data since this might cause
|
|
||||||
// the peer to send ping.
|
|
||||||
atomic.StoreUint32(&t.resetPingStrikes, 1)
|
|
||||||
if err := t.framer.fr.WriteData(i.streamID, i.endStream, i.d); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
i.f()
|
|
||||||
return nil
|
|
||||||
case *headerFrame:
|
|
||||||
t.hBuf.Reset()
|
|
||||||
for _, f := range i.hf {
|
|
||||||
t.hEnc.WriteField(f)
|
|
||||||
}
|
|
||||||
first := true
|
|
||||||
endHeaders := false
|
|
||||||
for !endHeaders {
|
|
||||||
size := t.hBuf.Len()
|
|
||||||
if size > http2MaxFrameLen {
|
|
||||||
size = http2MaxFrameLen
|
|
||||||
} else {
|
|
||||||
endHeaders = true
|
|
||||||
}
|
|
||||||
var err error
|
|
||||||
if first {
|
|
||||||
first = false
|
|
||||||
err = t.framer.fr.WriteHeaders(http2.HeadersFrameParam{
|
|
||||||
StreamID: i.streamID,
|
|
||||||
BlockFragment: t.hBuf.Next(size),
|
|
||||||
EndStream: i.endStream,
|
|
||||||
EndHeaders: endHeaders,
|
|
||||||
})
|
|
||||||
} else {
|
|
||||||
err = t.framer.fr.WriteContinuation(
|
|
||||||
i.streamID,
|
|
||||||
endHeaders,
|
|
||||||
t.hBuf.Next(size),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
atomic.StoreUint32(&t.resetPingStrikes, 1)
|
|
||||||
return nil
|
|
||||||
case *windowUpdate:
|
|
||||||
return t.framer.fr.WriteWindowUpdate(i.streamID, i.increment)
|
|
||||||
case *settings:
|
|
||||||
return t.framer.fr.WriteSettings(i.ss...)
|
|
||||||
case *settingsAck:
|
|
||||||
return t.framer.fr.WriteSettingsAck()
|
|
||||||
case *resetStream:
|
|
||||||
return t.framer.fr.WriteRSTStream(i.streamID, i.code)
|
|
||||||
case *goAway:
|
|
||||||
t.mu.Lock()
|
|
||||||
if t.state == closing {
|
|
||||||
t.mu.Unlock()
|
|
||||||
// The transport is closing.
|
|
||||||
return fmt.Errorf("transport: Connection closing")
|
|
||||||
}
|
|
||||||
sid := t.maxStreamID
|
|
||||||
if !i.headsUp {
|
|
||||||
// Stop accepting more streams now.
|
|
||||||
t.state = draining
|
|
||||||
if len(t.activeStreams) == 0 {
|
|
||||||
i.closeConn = true
|
|
||||||
}
|
|
||||||
t.mu.Unlock()
|
|
||||||
if err := t.framer.fr.WriteGoAway(sid, i.code, i.debugData); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if i.closeConn {
|
|
||||||
// Abruptly close the connection following the GoAway (via
|
|
||||||
// loopywriter). But flush out what's inside the buffer first.
|
|
||||||
t.controlBuf.put(&flushIO{closeTr: true})
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
t.mu.Unlock()
|
|
||||||
// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
|
|
||||||
// Follow that with a ping and wait for the ack to come back or a timer
|
|
||||||
// to expire. During this time accept new streams since they might have
|
|
||||||
// originated before the GoAway reaches the client.
|
|
||||||
// After getting the ack or timer expiration send out another GoAway this
|
|
||||||
// time with an ID of the max stream server intends to process.
|
|
||||||
if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
go func() {
|
|
||||||
timer := time.NewTimer(time.Minute)
|
|
||||||
defer timer.Stop()
|
|
||||||
select {
|
|
||||||
case <-t.drainChan:
|
|
||||||
case <-timer.C:
|
|
||||||
case <-t.ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
t.controlBuf.put(&goAway{code: i.code, debugData: i.debugData})
|
|
||||||
}()
|
|
||||||
return nil
|
|
||||||
case *flushIO:
|
|
||||||
if err := t.framer.writer.Flush(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if i.closeTr {
|
|
||||||
return ErrConnClosing
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
case *ping:
|
|
||||||
if !i.ack {
|
|
||||||
t.bdpEst.timesnap(i.data)
|
|
||||||
}
|
|
||||||
return t.framer.fr.WritePing(i.ack, i.data)
|
|
||||||
default:
|
|
||||||
err := status.Errorf(codes.Internal, "transport: http2Server.controller got unexpected item type %t", i)
|
|
||||||
errorf("%v", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close starts shutting down the http2Server transport.
|
// Close starts shutting down the http2Server transport.
|
||||||
// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
|
// TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
|
||||||
// could cause some resource issue. Revisit this later.
|
// could cause some resource issue. Revisit this later.
|
||||||
|
|
@ -1141,6 +904,7 @@ func (t *http2Server) Close() error {
|
||||||
streams := t.activeStreams
|
streams := t.activeStreams
|
||||||
t.activeStreams = nil
|
t.activeStreams = nil
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
|
t.controlBuf.finish()
|
||||||
t.cancel()
|
t.cancel()
|
||||||
err := t.conn.Close()
|
err := t.conn.Close()
|
||||||
// Cancel all active streams.
|
// Cancel all active streams.
|
||||||
|
|
@ -1156,27 +920,36 @@ func (t *http2Server) Close() error {
|
||||||
|
|
||||||
// closeStream clears the footprint of a stream when the stream is not needed
|
// closeStream clears the footprint of a stream when the stream is not needed
|
||||||
// any more.
|
// any more.
|
||||||
func (t *http2Server) closeStream(s *Stream) {
|
func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame) {
|
||||||
t.mu.Lock()
|
if s.swapState(streamDone) == streamDone {
|
||||||
delete(t.activeStreams, s.id)
|
// If the stream was already done, return.
|
||||||
if len(t.activeStreams) == 0 {
|
return
|
||||||
t.idle = time.Now()
|
|
||||||
}
|
}
|
||||||
if t.state == draining && len(t.activeStreams) == 0 {
|
|
||||||
defer t.controlBuf.put(&flushIO{closeTr: true})
|
|
||||||
}
|
|
||||||
t.mu.Unlock()
|
|
||||||
// In case stream sending and receiving are invoked in separate
|
// In case stream sending and receiving are invoked in separate
|
||||||
// goroutines (e.g., bi-directional streaming), cancel needs to be
|
// goroutines (e.g., bi-directional streaming), cancel needs to be
|
||||||
// called to interrupt the potential blocking on other goroutines.
|
// called to interrupt the potential blocking on other goroutines.
|
||||||
s.cancel()
|
s.cancel()
|
||||||
s.mu.Lock()
|
cleanup := &cleanupStream{
|
||||||
if s.state == streamDone {
|
streamID: s.id,
|
||||||
s.mu.Unlock()
|
rst: rst,
|
||||||
return
|
rstCode: rstCode,
|
||||||
|
onWrite: func() {
|
||||||
|
t.mu.Lock()
|
||||||
|
if t.activeStreams != nil {
|
||||||
|
delete(t.activeStreams, s.id)
|
||||||
|
if len(t.activeStreams) == 0 {
|
||||||
|
t.idle = time.Now()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
t.mu.Unlock()
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if hdr != nil {
|
||||||
|
hdr.cleanup = cleanup
|
||||||
|
t.controlBuf.put(hdr)
|
||||||
|
} else {
|
||||||
|
t.controlBuf.put(cleanup)
|
||||||
}
|
}
|
||||||
s.state = streamDone
|
|
||||||
s.mu.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *http2Server) RemoteAddr() net.Addr {
|
func (t *http2Server) RemoteAddr() net.Addr {
|
||||||
|
|
@ -1197,6 +970,63 @@ func (t *http2Server) drain(code http2.ErrCode, debugData []byte) {
|
||||||
t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
|
t.controlBuf.put(&goAway{code: code, debugData: debugData, headsUp: true})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
|
||||||
|
|
||||||
|
// Handles outgoing GoAway and returns true if loopy needs to put itself
|
||||||
|
// in draining mode.
|
||||||
|
func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
|
||||||
|
t.mu.Lock()
|
||||||
|
if t.state == closing { // TODO(mmukhi): This seems unnecessary.
|
||||||
|
t.mu.Unlock()
|
||||||
|
// The transport is closing.
|
||||||
|
return false, ErrConnClosing
|
||||||
|
}
|
||||||
|
sid := t.maxStreamID
|
||||||
|
if !g.headsUp {
|
||||||
|
// Stop accepting more streams now.
|
||||||
|
t.state = draining
|
||||||
|
if len(t.activeStreams) == 0 {
|
||||||
|
g.closeConn = true
|
||||||
|
}
|
||||||
|
t.mu.Unlock()
|
||||||
|
if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
if g.closeConn {
|
||||||
|
// Abruptly close the connection following the GoAway (via
|
||||||
|
// loopywriter). But flush out what's inside the buffer first.
|
||||||
|
t.framer.writer.Flush()
|
||||||
|
return false, fmt.Errorf("transport: Connection closing")
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
t.mu.Unlock()
|
||||||
|
// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
|
||||||
|
// Follow that with a ping and wait for the ack to come back or a timer
|
||||||
|
// to expire. During this time accept new streams since they might have
|
||||||
|
// originated before the GoAway reaches the client.
|
||||||
|
// After getting the ack or timer expiration send out another GoAway this
|
||||||
|
// time with an ID of the max stream server intends to process.
|
||||||
|
if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, []byte{}); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
go func() {
|
||||||
|
timer := time.NewTimer(time.Minute)
|
||||||
|
defer timer.Stop()
|
||||||
|
select {
|
||||||
|
case <-t.drainChan:
|
||||||
|
case <-timer.C:
|
||||||
|
case <-t.ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
|
||||||
|
}()
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
var rgen = rand.New(rand.NewSource(time.Now().UnixNano()))
|
var rgen = rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||||
|
|
||||||
func getJitter(v time.Duration) time.Duration {
|
func getJitter(v time.Duration) time.Duration {
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,6 @@ import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"encoding/base64"
|
"encoding/base64"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
@ -509,19 +508,63 @@ func decodeGrpcMessageUnchecked(msg string) string {
|
||||||
return buf.String()
|
return buf.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type bufWriter struct {
|
||||||
|
buf []byte
|
||||||
|
offset int
|
||||||
|
batchSize int
|
||||||
|
conn net.Conn
|
||||||
|
err error
|
||||||
|
|
||||||
|
onFlush func()
|
||||||
|
}
|
||||||
|
|
||||||
|
func newBufWriter(conn net.Conn, batchSize int) *bufWriter {
|
||||||
|
return &bufWriter{
|
||||||
|
buf: make([]byte, batchSize*2),
|
||||||
|
batchSize: batchSize,
|
||||||
|
conn: conn,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *bufWriter) Write(b []byte) (n int, err error) {
|
||||||
|
if w.err != nil {
|
||||||
|
return 0, w.err
|
||||||
|
}
|
||||||
|
n = copy(w.buf[w.offset:], b)
|
||||||
|
w.offset += n
|
||||||
|
if w.offset >= w.batchSize {
|
||||||
|
err = w.Flush()
|
||||||
|
}
|
||||||
|
return n, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *bufWriter) Flush() error {
|
||||||
|
if w.err != nil {
|
||||||
|
return w.err
|
||||||
|
}
|
||||||
|
if w.offset == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if w.onFlush != nil {
|
||||||
|
w.onFlush()
|
||||||
|
}
|
||||||
|
_, w.err = w.conn.Write(w.buf[:w.offset])
|
||||||
|
w.offset = 0
|
||||||
|
return w.err
|
||||||
|
}
|
||||||
|
|
||||||
type framer struct {
|
type framer struct {
|
||||||
numWriters int32
|
writer *bufWriter
|
||||||
reader io.Reader
|
|
||||||
writer *bufio.Writer
|
|
||||||
fr *http2.Framer
|
fr *http2.Framer
|
||||||
}
|
}
|
||||||
|
|
||||||
func newFramer(conn net.Conn, writeBufferSize, readBufferSize int) *framer {
|
func newFramer(conn net.Conn, writeBufferSize, readBufferSize int) *framer {
|
||||||
|
r := bufio.NewReaderSize(conn, readBufferSize)
|
||||||
|
w := newBufWriter(conn, writeBufferSize)
|
||||||
f := &framer{
|
f := &framer{
|
||||||
reader: bufio.NewReaderSize(conn, readBufferSize),
|
writer: w,
|
||||||
writer: bufio.NewWriterSize(conn, writeBufferSize),
|
fr: http2.NewFramer(w, r),
|
||||||
}
|
}
|
||||||
f.fr = http2.NewFramer(f.writer, f.reader)
|
|
||||||
// Opt-in to Frame reuse API on framer to reduce garbage.
|
// Opt-in to Frame reuse API on framer to reduce garbage.
|
||||||
// Frames aren't safe to read from after a subsequent call to ReadFrame.
|
// Frames aren't safe to read from after a subsequent call to ReadFrame.
|
||||||
f.fr.SetReuseFrames()
|
f.fr.SetReuseFrames()
|
||||||
|
|
|
||||||
|
|
@ -19,16 +19,17 @@
|
||||||
// Package transport defines and implements message oriented communication
|
// Package transport defines and implements message oriented communication
|
||||||
// channel to complete various transactions (e.g., an RPC). It is meant for
|
// channel to complete various transactions (e.g., an RPC). It is meant for
|
||||||
// grpc-internal usage and is not intended to be imported directly by users.
|
// grpc-internal usage and is not intended to be imported directly by users.
|
||||||
package transport // import "google.golang.org/grpc/transport"
|
package transport // externally used as import "google.golang.org/grpc/transport"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
"golang.org/x/net/http2"
|
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/credentials"
|
"google.golang.org/grpc/credentials"
|
||||||
"google.golang.org/grpc/keepalive"
|
"google.golang.org/grpc/keepalive"
|
||||||
|
|
@ -57,6 +58,7 @@ type recvBuffer struct {
|
||||||
c chan recvMsg
|
c chan recvMsg
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
backlog []recvMsg
|
backlog []recvMsg
|
||||||
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
func newRecvBuffer() *recvBuffer {
|
func newRecvBuffer() *recvBuffer {
|
||||||
|
|
@ -68,6 +70,13 @@ func newRecvBuffer() *recvBuffer {
|
||||||
|
|
||||||
func (b *recvBuffer) put(r recvMsg) {
|
func (b *recvBuffer) put(r recvMsg) {
|
||||||
b.mu.Lock()
|
b.mu.Lock()
|
||||||
|
if b.err != nil {
|
||||||
|
b.mu.Unlock()
|
||||||
|
// An error had occurred earlier, don't accept more
|
||||||
|
// data or errors.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
b.err = r.err
|
||||||
if len(b.backlog) == 0 {
|
if len(b.backlog) == 0 {
|
||||||
select {
|
select {
|
||||||
case b.c <- r:
|
case b.c <- r:
|
||||||
|
|
@ -101,11 +110,12 @@ func (b *recvBuffer) get() <-chan recvMsg {
|
||||||
return b.c
|
return b.c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//
|
||||||
// recvBufferReader implements io.Reader interface to read the data from
|
// recvBufferReader implements io.Reader interface to read the data from
|
||||||
// recvBuffer.
|
// recvBuffer.
|
||||||
type recvBufferReader struct {
|
type recvBufferReader struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
goAway chan struct{}
|
ctxDone <-chan struct{} // cache of ctx.Done() (for performance).
|
||||||
recv *recvBuffer
|
recv *recvBuffer
|
||||||
last []byte // Stores the remaining data in the previous calls.
|
last []byte // Stores the remaining data in the previous calls.
|
||||||
err error
|
err error
|
||||||
|
|
@ -130,10 +140,8 @@ func (r *recvBufferReader) read(p []byte) (n int, err error) {
|
||||||
return copied, nil
|
return copied, nil
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case <-r.ctx.Done():
|
case <-r.ctxDone:
|
||||||
return 0, ContextErr(r.ctx.Err())
|
return 0, ContextErr(r.ctx.Err())
|
||||||
case <-r.goAway:
|
|
||||||
return 0, errStreamDrain
|
|
||||||
case m := <-r.recv.get():
|
case m := <-r.recv.get():
|
||||||
r.recv.load()
|
r.recv.load()
|
||||||
if m.err != nil {
|
if m.err != nil {
|
||||||
|
|
@ -145,61 +153,7 @@ func (r *recvBufferReader) read(p []byte) (n int, err error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// All items in an out of a controlBuffer should be the same type.
|
type streamState uint32
|
||||||
type item interface {
|
|
||||||
item()
|
|
||||||
}
|
|
||||||
|
|
||||||
// controlBuffer is an unbounded channel of item.
|
|
||||||
type controlBuffer struct {
|
|
||||||
c chan item
|
|
||||||
mu sync.Mutex
|
|
||||||
backlog []item
|
|
||||||
}
|
|
||||||
|
|
||||||
func newControlBuffer() *controlBuffer {
|
|
||||||
b := &controlBuffer{
|
|
||||||
c: make(chan item, 1),
|
|
||||||
}
|
|
||||||
return b
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *controlBuffer) put(r item) {
|
|
||||||
b.mu.Lock()
|
|
||||||
if len(b.backlog) == 0 {
|
|
||||||
select {
|
|
||||||
case b.c <- r:
|
|
||||||
b.mu.Unlock()
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
b.backlog = append(b.backlog, r)
|
|
||||||
b.mu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (b *controlBuffer) load() {
|
|
||||||
b.mu.Lock()
|
|
||||||
if len(b.backlog) > 0 {
|
|
||||||
select {
|
|
||||||
case b.c <- b.backlog[0]:
|
|
||||||
b.backlog[0] = nil
|
|
||||||
b.backlog = b.backlog[1:]
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
b.mu.Unlock()
|
|
||||||
}
|
|
||||||
|
|
||||||
// get returns the channel that receives an item in the buffer.
|
|
||||||
//
|
|
||||||
// Upon receipt of an item, the caller should call load to send another
|
|
||||||
// item onto the channel if there is any.
|
|
||||||
func (b *controlBuffer) get() <-chan item {
|
|
||||||
return b.c
|
|
||||||
}
|
|
||||||
|
|
||||||
type streamState uint8
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
streamActive streamState = iota
|
streamActive streamState = iota
|
||||||
|
|
@ -214,8 +168,8 @@ type Stream struct {
|
||||||
st ServerTransport // nil for client side Stream
|
st ServerTransport // nil for client side Stream
|
||||||
ctx context.Context // the associated context of the stream
|
ctx context.Context // the associated context of the stream
|
||||||
cancel context.CancelFunc // always nil for client side Stream
|
cancel context.CancelFunc // always nil for client side Stream
|
||||||
done chan struct{} // closed when the final status arrives
|
done chan struct{} // closed at the end of stream to unblock writers. On the client side.
|
||||||
goAway chan struct{} // closed when a GOAWAY control message is received
|
ctxDone <-chan struct{} // same as done chan but for server side. Cache of ctx.Done() (for performance)
|
||||||
method string // the associated RPC method of the stream
|
method string // the associated RPC method of the stream
|
||||||
recvCompress string
|
recvCompress string
|
||||||
sendCompress string
|
sendCompress string
|
||||||
|
|
@ -223,47 +177,51 @@ type Stream struct {
|
||||||
trReader io.Reader
|
trReader io.Reader
|
||||||
fc *inFlow
|
fc *inFlow
|
||||||
recvQuota uint32
|
recvQuota uint32
|
||||||
waiters waiters
|
wq *writeQuota
|
||||||
|
|
||||||
// Callback to state application's intentions to read data. This
|
// Callback to state application's intentions to read data. This
|
||||||
// is used to adjust flow control, if needed.
|
// is used to adjust flow control, if needed.
|
||||||
requestRead func(int)
|
requestRead func(int)
|
||||||
|
|
||||||
sendQuotaPool *quotaPool
|
|
||||||
headerChan chan struct{} // closed to indicate the end of header metadata.
|
headerChan chan struct{} // closed to indicate the end of header metadata.
|
||||||
headerDone bool // set when headerChan is closed. Used to avoid closing headerChan multiple times.
|
headerDone uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
|
||||||
header metadata.MD // the received header metadata.
|
header metadata.MD // the received header metadata.
|
||||||
trailer metadata.MD // the key-value map of trailer metadata.
|
trailer metadata.MD // the key-value map of trailer metadata.
|
||||||
|
|
||||||
mu sync.RWMutex // guard the following
|
|
||||||
headerOk bool // becomes true from the first header is about to send
|
headerOk bool // becomes true from the first header is about to send
|
||||||
state streamState
|
state streamState
|
||||||
|
|
||||||
status *status.Status // the status error received from the server
|
status *status.Status // the status error received from the server
|
||||||
|
|
||||||
rstStream bool // indicates whether a RST_STREAM frame needs to be sent
|
bytesReceived uint32 // indicates whether any bytes have been received on this stream
|
||||||
rstError http2.ErrCode // the error that needs to be sent along with the RST_STREAM frame
|
unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream
|
||||||
|
|
||||||
bytesReceived bool // indicates whether any bytes have been received on this stream
|
|
||||||
unprocessed bool // set if the server sends a refused stream or GOAWAY including this stream
|
|
||||||
|
|
||||||
// contentSubtype is the content-subtype for requests.
|
// contentSubtype is the content-subtype for requests.
|
||||||
// this must be lowercase or the behavior is undefined.
|
// this must be lowercase or the behavior is undefined.
|
||||||
contentSubtype string
|
contentSubtype string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Stream) swapState(st streamState) streamState {
|
||||||
|
return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st)))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Stream) compareAndSwapState(oldState, newState streamState) bool {
|
||||||
|
return atomic.CompareAndSwapUint32((*uint32)(&s.state), uint32(oldState), uint32(newState))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Stream) getState() streamState {
|
||||||
|
return streamState(atomic.LoadUint32((*uint32)(&s.state)))
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Stream) waitOnHeader() error {
|
func (s *Stream) waitOnHeader() error {
|
||||||
if s.headerChan == nil {
|
if s.headerChan == nil {
|
||||||
// On the server headerChan is always nil since a stream originates
|
// On the server headerChan is always nil since a stream originates
|
||||||
// only after having received headers.
|
// only after having received headers.
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
wc := s.waiters
|
|
||||||
select {
|
select {
|
||||||
case <-wc.ctx.Done():
|
case <-s.ctx.Done():
|
||||||
return ContextErr(wc.ctx.Err())
|
return ContextErr(s.ctx.Err())
|
||||||
case <-wc.goAway:
|
|
||||||
return errStreamDrain
|
|
||||||
case <-s.headerChan:
|
case <-s.headerChan:
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
@ -289,12 +247,6 @@ func (s *Stream) Done() <-chan struct{} {
|
||||||
return s.done
|
return s.done
|
||||||
}
|
}
|
||||||
|
|
||||||
// GoAway returns a channel which is closed when the server sent GoAways signal
|
|
||||||
// before this stream was initiated.
|
|
||||||
func (s *Stream) GoAway() <-chan struct{} {
|
|
||||||
return s.goAway
|
|
||||||
}
|
|
||||||
|
|
||||||
// Header acquires the key-value pairs of header metadata once it
|
// Header acquires the key-value pairs of header metadata once it
|
||||||
// is available. It blocks until i) the metadata is ready or ii) there is no
|
// is available. It blocks until i) the metadata is ready or ii) there is no
|
||||||
// header metadata or iii) the stream is canceled/expired.
|
// header metadata or iii) the stream is canceled/expired.
|
||||||
|
|
@ -303,6 +255,9 @@ func (s *Stream) Header() (metadata.MD, error) {
|
||||||
// Even if the stream is closed, header is returned if available.
|
// Even if the stream is closed, header is returned if available.
|
||||||
select {
|
select {
|
||||||
case <-s.headerChan:
|
case <-s.headerChan:
|
||||||
|
if s.header == nil {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
return s.header.Copy(), nil
|
return s.header.Copy(), nil
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
@ -312,10 +267,10 @@ func (s *Stream) Header() (metadata.MD, error) {
|
||||||
// Trailer returns the cached trailer metedata. Note that if it is not called
|
// Trailer returns the cached trailer metedata. Note that if it is not called
|
||||||
// after the entire stream is done, it could return an empty MD. Client
|
// after the entire stream is done, it could return an empty MD. Client
|
||||||
// side only.
|
// side only.
|
||||||
|
// It can be safely read only after stream has ended that is either read
|
||||||
|
// or write have returned io.EOF.
|
||||||
func (s *Stream) Trailer() metadata.MD {
|
func (s *Stream) Trailer() metadata.MD {
|
||||||
s.mu.RLock()
|
|
||||||
c := s.trailer.Copy()
|
c := s.trailer.Copy()
|
||||||
s.mu.RUnlock()
|
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -345,24 +300,23 @@ func (s *Stream) Method() string {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Status returns the status received from the server.
|
// Status returns the status received from the server.
|
||||||
|
// Status can be read safely only after the stream has ended,
|
||||||
|
// that is, read or write has returned io.EOF.
|
||||||
func (s *Stream) Status() *status.Status {
|
func (s *Stream) Status() *status.Status {
|
||||||
return s.status
|
return s.status
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetHeader sets the header metadata. This can be called multiple times.
|
// SetHeader sets the header metadata. This can be called multiple times.
|
||||||
// Server side only.
|
// Server side only.
|
||||||
|
// This should not be called in parallel to other data writes.
|
||||||
func (s *Stream) SetHeader(md metadata.MD) error {
|
func (s *Stream) SetHeader(md metadata.MD) error {
|
||||||
s.mu.Lock()
|
|
||||||
if s.headerOk || s.state == streamDone {
|
|
||||||
s.mu.Unlock()
|
|
||||||
return ErrIllegalHeaderWrite
|
|
||||||
}
|
|
||||||
if md.Len() == 0 {
|
if md.Len() == 0 {
|
||||||
s.mu.Unlock()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
if s.headerOk || atomic.LoadUint32((*uint32)(&s.state)) == uint32(streamDone) {
|
||||||
|
return ErrIllegalHeaderWrite
|
||||||
|
}
|
||||||
s.header = metadata.Join(s.header, md)
|
s.header = metadata.Join(s.header, md)
|
||||||
s.mu.Unlock()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -376,13 +330,12 @@ func (s *Stream) SendHeader(md metadata.MD) error {
|
||||||
|
|
||||||
// SetTrailer sets the trailer metadata which will be sent with the RPC status
|
// SetTrailer sets the trailer metadata which will be sent with the RPC status
|
||||||
// by the server. This can be called multiple times. Server side only.
|
// by the server. This can be called multiple times. Server side only.
|
||||||
|
// This should not be called parallel to other data writes.
|
||||||
func (s *Stream) SetTrailer(md metadata.MD) error {
|
func (s *Stream) SetTrailer(md metadata.MD) error {
|
||||||
if md.Len() == 0 {
|
if md.Len() == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
s.mu.Lock()
|
|
||||||
s.trailer = metadata.Join(s.trailer, md)
|
s.trailer = metadata.Join(s.trailer, md)
|
||||||
s.mu.Unlock()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -422,29 +375,15 @@ func (t *transportReader) Read(p []byte) (n int, err error) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// finish sets the stream's state and status, and closes the done channel.
|
|
||||||
// s.mu must be held by the caller. st must always be non-nil.
|
|
||||||
func (s *Stream) finish(st *status.Status) {
|
|
||||||
s.status = st
|
|
||||||
s.state = streamDone
|
|
||||||
close(s.done)
|
|
||||||
}
|
|
||||||
|
|
||||||
// BytesReceived indicates whether any bytes have been received on this stream.
|
// BytesReceived indicates whether any bytes have been received on this stream.
|
||||||
func (s *Stream) BytesReceived() bool {
|
func (s *Stream) BytesReceived() bool {
|
||||||
s.mu.Lock()
|
return atomic.LoadUint32(&s.bytesReceived) == 1
|
||||||
br := s.bytesReceived
|
|
||||||
s.mu.Unlock()
|
|
||||||
return br
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Unprocessed indicates whether the server did not process this stream --
|
// Unprocessed indicates whether the server did not process this stream --
|
||||||
// i.e. it sent a refused stream or GOAWAY including this stream ID.
|
// i.e. it sent a refused stream or GOAWAY including this stream ID.
|
||||||
func (s *Stream) Unprocessed() bool {
|
func (s *Stream) Unprocessed() bool {
|
||||||
s.mu.Lock()
|
return atomic.LoadUint32(&s.unprocessed) == 1
|
||||||
br := s.unprocessed
|
|
||||||
s.mu.Unlock()
|
|
||||||
return br
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GoString is implemented by Stream so context.String() won't
|
// GoString is implemented by Stream so context.String() won't
|
||||||
|
|
@ -694,6 +633,9 @@ var (
|
||||||
// connection is draining. This could be caused by goaway or balancer
|
// connection is draining. This could be caused by goaway or balancer
|
||||||
// removing the address.
|
// removing the address.
|
||||||
errStreamDrain = streamErrorf(codes.Unavailable, "the connection is draining")
|
errStreamDrain = streamErrorf(codes.Unavailable, "the connection is draining")
|
||||||
|
// errStreamDone is returned from write at the client side to indiacte application
|
||||||
|
// layer of an error.
|
||||||
|
errStreamDone = errors.New("tne stream is done")
|
||||||
// StatusGoAway indicates that the server sent a GOAWAY that included this
|
// StatusGoAway indicates that the server sent a GOAWAY that included this
|
||||||
// stream's ID in unprocessed RPCs.
|
// stream's ID in unprocessed RPCs.
|
||||||
statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
|
statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
|
||||||
|
|
@ -711,15 +653,6 @@ func (e StreamError) Error() string {
|
||||||
return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc)
|
return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc)
|
||||||
}
|
}
|
||||||
|
|
||||||
// waiters are passed to quotaPool get methods to
|
|
||||||
// wait on in addition to waiting on quota.
|
|
||||||
type waiters struct {
|
|
||||||
ctx context.Context
|
|
||||||
tctx context.Context
|
|
||||||
done chan struct{}
|
|
||||||
goAway chan struct{}
|
|
||||||
}
|
|
||||||
|
|
||||||
// GoAwayReason contains the reason for the GoAway frame received.
|
// GoAwayReason contains the reason for the GoAway frame received.
|
||||||
type GoAwayReason uint8
|
type GoAwayReason uint8
|
||||||
|
|
||||||
|
|
@ -733,39 +666,3 @@ const (
|
||||||
// "too_many_pings".
|
// "too_many_pings".
|
||||||
GoAwayTooManyPings GoAwayReason = 2
|
GoAwayTooManyPings GoAwayReason = 2
|
||||||
)
|
)
|
||||||
|
|
||||||
// loopyWriter is run in a separate go routine. It is the single code path that will
|
|
||||||
// write data on wire.
|
|
||||||
func loopyWriter(ctx context.Context, cbuf *controlBuffer, handler func(item) error) {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case i := <-cbuf.get():
|
|
||||||
cbuf.load()
|
|
||||||
if err := handler(i); err != nil {
|
|
||||||
errorf("transport: Error while handling item. Err: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
hasData:
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case i := <-cbuf.get():
|
|
||||||
cbuf.load()
|
|
||||||
if err := handler(i); err != nil {
|
|
||||||
errorf("transport: Error while handling item. Err: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
if err := handler(&flushIO{}); err != nil {
|
|
||||||
errorf("transport: Error while flushing. Err: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
break hasData
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
||||||
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue