internal: remove TestingUseHandlerImpl (#2253)

This commit is contained in:
dfawley 2018-08-03 09:35:00 -07:00 committed by GitHub
parent 3b859c04c1
commit a344a35754
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 110 additions and 62 deletions

View File

@ -21,14 +21,6 @@
package internal
var (
// TestingUseHandlerImpl enables the http.Handler-based server implementation.
// It must be called before Serve and requires TLS credentials.
//
// The provided grpcServer must be of type *grpc.Server. It is untyped
// for circular dependency reasons.
TestingUseHandlerImpl func(grpcServer interface{})
// WithContextDialer is exported by clientconn.go
WithContextDialer interface{} // func(context.Context, string) (net.Conn, error) grpc.DialOption
// WithResolverBuilder is exported by clientconn.go

View File

@ -237,9 +237,9 @@ func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) erro
if ht.stats != nil {
ht.stats.HandleRPC(s.Context(), &stats.OutTrailer{})
}
ht.Close()
close(ht.writes)
}
ht.Close()
return err
}
@ -326,11 +326,11 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
go func() {
select {
case <-requestOver:
return
case <-ht.closedCh:
case <-clientGone:
}
cancel()
ht.Close()
}()
req := ht.req
@ -442,5 +442,8 @@ func mapRecvMsgError(err error) error {
return status.Error(code, se.Error())
}
}
if strings.Contains(err.Error(), "body closed by handler") {
return status.Error(codes.Canceled, err.Error())
}
return connectionErrorf(true, err, err.Error())
}

View File

@ -35,7 +35,6 @@ import (
"io/ioutil"
"golang.org/x/net/context"
"golang.org/x/net/http2"
"golang.org/x/net/trace"
"google.golang.org/grpc/codes"
@ -43,7 +42,6 @@ import (
"google.golang.org/grpc/encoding"
"google.golang.org/grpc/encoding/proto"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
@ -126,7 +124,6 @@ type options struct {
maxConcurrentStreams uint32
maxReceiveMessageSize int
maxSendMessageSize int
useHandlerImpl bool // use http.Handler-based server
unknownStreamDesc *StreamDesc
keepaliveParams keepalive.ServerParameters
keepalivePolicy keepalive.EnforcementPolicy
@ -635,27 +632,19 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
}
s.mu.Unlock()
var serve func()
c := conn.(io.Closer)
if s.opts.useHandlerImpl {
serve = func() { s.serveUsingHandler(conn) }
} else {
// Finish handshaking (HTTP2)
st := s.newHTTP2Transport(conn, authInfo)
if st == nil {
return
}
c = st
serve = func() { s.serveStreams(st) }
// Finish handshaking (HTTP2)
st := s.newHTTP2Transport(conn, authInfo)
if st == nil {
return
}
rawConn.SetDeadline(time.Time{})
if !s.addConn(c) {
if !s.addConn(st) {
return
}
go func() {
serve()
s.removeConn(c)
s.serveStreams(st)
s.removeConn(st)
}()
}
@ -710,27 +699,6 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
var _ http.Handler = (*Server)(nil)
// serveUsingHandler is called from handleRawConn when s is configured
// to handle requests via the http.Handler interface. It sets up a
// net/http.Server to handle the just-accepted conn. The http.Server
// is configured to route all incoming requests (all HTTP/2 streams)
// to ServeHTTP, which creates a new ServerTransport for each stream.
// serveUsingHandler blocks until conn closes.
//
// This codepath is only used when Server.TestingUseHandlerImpl has
// been configured. This lets the end2end tests exercise the ServeHTTP
// method as one of the environment types.
//
// conn is the *tls.Conn that's already been authenticated.
func (s *Server) serveUsingHandler(conn net.Conn) {
h2s := &http2.Server{
MaxConcurrentStreams: s.opts.maxConcurrentStreams,
}
h2s.ServeConn(conn, &http2.ServeConnOpts{
Handler: s,
})
}
// ServeHTTP implements the Go standard library's http.Handler
// interface by responding to the gRPC request r, by looking up
// the requested gRPC method in the gRPC server s.
@ -1413,12 +1381,6 @@ func (s *Server) GracefulStop() {
s.mu.Unlock()
}
func init() {
internal.TestingUseHandlerImpl = func(arg interface{}) {
arg.(*Server).opts.useHandlerImpl = true
}
}
// contentSubtype must be lowercase
// cannot return nil
func (s *Server) getCodec(contentSubtype string) baseCodec {

View File

@ -42,7 +42,7 @@ import (
func (te *test) startServers(ts testpb.TestServiceServer, num int) {
for i := 0; i < num; i++ {
te.startServer(ts)
te.srvs = append(te.srvs, te.srv)
te.srvs = append(te.srvs, te.srv.(*grpc.Server))
te.srvAddrs = append(te.srvAddrs, te.srvAddr)
te.srv = nil
te.srvAddr = ""

View File

@ -30,6 +30,7 @@ import (
"io"
"math"
"net"
"net/http"
"os"
"reflect"
"runtime"
@ -55,7 +56,6 @@ import (
"google.golang.org/grpc/health"
healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/leakcheck"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
@ -485,7 +485,7 @@ type test struct {
nonBlockingDial bool
// srv and srvAddr are set once startServer is called.
srv *grpc.Server
srv stopper
srvAddr string
// srvs and srvAddrs are set once startServers is called.
@ -496,6 +496,11 @@ type test struct {
restoreLogs func() // nil unless declareLogNoise is used
}
type stopper interface {
Stop()
GracefulStop()
}
func (te *test) tearDown() {
if te.cancel != nil {
te.cancel()
@ -603,9 +608,6 @@ func (te *test) listenAndServe(ts testpb.TestServiceServer, listen func(network,
}
s := grpc.NewServer(sopts...)
te.srv = s
if te.e.httpHandler {
internal.TestingUseHandlerImpl(s)
}
if te.healthServer != nil {
healthgrpc.RegisterHealthServer(s, te.healthServer)
}
@ -623,11 +625,100 @@ func (te *test) listenAndServe(ts testpb.TestServiceServer, listen func(network,
addr = "localhost:" + port
}
go s.Serve(lis)
te.srvAddr = addr
if te.e.httpHandler {
if te.e.security != "tls" {
te.t.Fatalf("unsupported environment settings")
}
cert, err := tls.LoadX509KeyPair(testdata.Path("server1.pem"), testdata.Path("server1.key"))
if err != nil {
te.t.Fatal("Error creating TLS certificate: ", err)
}
hs := &http.Server{
Handler: s,
}
err = http2.ConfigureServer(hs, &http2.Server{
MaxConcurrentStreams: te.maxStream,
})
if err != nil {
te.t.Fatal("error starting http2 server: ", err)
}
hs.TLSConfig.Certificates = []tls.Certificate{cert}
tlsListener := tls.NewListener(lis, hs.TLSConfig)
whs := &wrapHS{Listener: tlsListener, s: hs, conns: make(map[net.Conn]bool)}
te.srv = whs
go hs.Serve(whs)
return lis
}
go s.Serve(lis)
return lis
}
// TODO: delete wrapHS and wrapConn when Go1.6 and Go1.7 support are gone and
// call s.Close and s.Shutdown instead.
type wrapHS struct {
sync.Mutex
net.Listener
s *http.Server
conns map[net.Conn]bool
}
func (w *wrapHS) Accept() (net.Conn, error) {
c, err := w.Listener.Accept()
if err != nil {
return nil, err
}
w.Lock()
if w.conns == nil {
w.Unlock()
c.Close()
return nil, errors.New("connection after listener closed")
}
w.conns[&wrapConn{Conn: c, hs: w}] = true
w.Unlock()
return c, nil
}
func (w *wrapHS) Stop() {
w.Listener.Close()
w.Lock()
conns := w.conns
w.conns = nil
w.Unlock()
for c := range conns {
c.Close()
}
}
// Poll for now..
func (w *wrapHS) GracefulStop() {
w.Listener.Close()
for {
w.Lock()
l := len(w.conns)
w.Unlock()
if l == 0 {
return
}
time.Sleep(50 * time.Millisecond)
}
}
type wrapConn struct {
net.Conn
hs *wrapHS
}
func (w *wrapConn) Close() error {
w.hs.Lock()
delete(w.hs.conns, w.Conn)
w.hs.Unlock()
return w.Conn.Close()
}
func (te *test) startServerWithConnControl(ts testpb.TestServiceServer) *listenerWrapper {
l := te.listenAndServe(ts, listenWithConnControl)
return l.(*listenerWrapper)