xds: Accept a net.Listener in Serve(). (#4052)

This commit is contained in:
Easwar Swaminathan 2020-11-20 12:42:06 -08:00 committed by GitHub
parent 83af853381
commit 8736dcd05b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 62 additions and 162 deletions

View File

@ -92,15 +92,14 @@ func (s) TestServerSideXDS(t *testing.T) {
testpb.RegisterTestServiceServer(server, &testService{})
defer server.Stop()
localAddr, err := testutils.AvailableHostPort()
lis, err := testutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.AvailableHostPort() failed: %v", err)
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
go func() {
opts := xds.ServeOptions{Address: localAddr}
if err := server.Serve(opts); err != nil {
t.Errorf("Serve(%+v) failed: %v", opts, err)
if err := server.Serve(lis); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()
@ -108,7 +107,7 @@ func (s) TestServerSideXDS(t *testing.T) {
go func() {
listener := &v3listenerpb.Listener{
// This needs to match the name we are querying for.
Name: fmt.Sprintf("grpc/server?udpa.resource.listening_address=%s", localAddr),
Name: fmt.Sprintf("grpc/server?udpa.resource.listening_address=%s", lis.Addr().String()),
ApiListener: &v3listenerpb.ApiListener{
ApiListener: &anypb.Any{
TypeUrl: version.V2HTTPConnManagerURL,
@ -138,7 +137,7 @@ func (s) TestServerSideXDS(t *testing.T) {
}()
// Create a ClientConn and make a successful RPC.
cc, err := grpc.DialContext(ctx, localAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
cc, err := grpc.DialContext(ctx, lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}

View File

@ -1,40 +0,0 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package testutils
import "net"
// AvailableHostPort returns a local address to listen on. This will be of the
// form "host:port", where the host will be a literal IP address, and port
// must be a literal port number. If the host is a literal IPv6 address it
// will be enclosed in square brackets, as in "[2001:db8::1]:80.
//
// This is useful for tests which need to call the Serve() method on
// xds.GRPCServer which needs to be passed an IP:Port to listen on, where the IP
// must be a literal IP and not localhost. This approach will work on support
// one or both of IPv4 or IPv6.
func AvailableHostPort() (string, error) {
l, err := net.Listen("tcp", "localhost:0")
if err != nil {
return "", err
}
addr := l.Addr().String()
l.Close()
return addr, nil
}

View File

@ -0,0 +1,26 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package testutils
import "net"
// LocalTCPListener returns a net.Listener listening on local address and port.
func LocalTCPListener() (net.Listener, error) {
return net.Listen("tcp", ":0")
}

View File

@ -22,7 +22,6 @@ import (
"context"
"fmt"
"net"
"strconv"
"sync"
"google.golang.org/grpc"
@ -75,34 +74,6 @@ type grpcServerInterface interface {
GracefulStop()
}
// ServeOptions contains parameters to configure the Serve() method.
//
// Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type ServeOptions struct {
// Address contains the local address to listen on. This should be of the
// form "host:port", where the host must be a literal IP address, and port
// must be a literal port number. If the host is a literal IPv6 address it
// must be enclosed in square brackets, as in "[2001:db8::1]:80.
Address string
}
func (so *ServeOptions) validate() error {
addr, port, err := net.SplitHostPort(so.Address)
if err != nil {
return fmt.Errorf("xds: unsupported address %q for server listener", so.Address)
}
if net.ParseIP(addr) == nil {
return fmt.Errorf("xds: failed to parse %q as a valid literal IP address", addr)
}
if _, err := strconv.Atoi(port); err != nil {
return fmt.Errorf("%q is not a valid listener port", port)
}
return nil
}
// GRPCServer wraps a gRPC server and provides server-side xDS functionality, by
// communication with a management server using xDS APIs. It implements the
// grpc.ServiceRegistrar interface and can be passed to service registration
@ -173,16 +144,16 @@ func (s *GRPCServer) initXDSClient() error {
}
// Serve gets the underlying gRPC server to accept incoming connections on the
// listening address in opts. A connection to the management server, to receive
// xDS configuration, is initiated here.
// listener lis, which is expected to be listening on a TCP port.
//
// A connection to the management server, to receive xDS configuration, is
// initiated here.
//
// Serve will return a non-nil error unless Stop or GracefulStop is called.
func (s *GRPCServer) Serve(opts ServeOptions) error {
s.logger.Infof("Serve() called with options: %+v", opts)
// Validate the listening address in opts.
if err := opts.validate(); err != nil {
return err
func (s *GRPCServer) Serve(lis net.Listener) error {
s.logger.Infof("Serve() passed a net.Listener on %s", lis.Addr().String())
if _, ok := lis.Addr().(*net.TCPAddr); !ok {
return fmt.Errorf("xds: GRPCServer expects listener to return a net.TCPAddr. Got %T", lis.Addr())
}
// If this is the first time Serve() is being called, we need to initialize
@ -190,7 +161,7 @@ func (s *GRPCServer) Serve(opts ServeOptions) error {
if err := s.initXDSClient(); err != nil {
return err
}
lw, err := s.newListenerWrapper(opts)
lw, err := s.newListenerWrapper(lis)
if lw == nil {
// Error returned can be nil (when Stop/GracefulStop() is called). So,
// we need to check the returned listenerWrapper instead.
@ -199,20 +170,15 @@ func (s *GRPCServer) Serve(opts ServeOptions) error {
return s.gs.Serve(lw)
}
// newListenerWrapper starts a net.Listener on the address specified in opts. It
// then registers a watch for a Listener resource and blocks until a good
// newListenerWrapper creates and returns a listenerWrapper, which is a thin
// wrapper around the passed in listener lis, that can be passed to
// grpcServer.Serve().
//
// It then registers a watch for a Listener resource and blocks until a good
// response is received or the server is stopped by a call to
// Stop/GracefulStop().
//
// Returns a listenerWrapper, which implements the net.Listener interface, that
// can be passed to grpcServer.Serve().
func (s *GRPCServer) newListenerWrapper(opts ServeOptions) (*listenerWrapper, error) {
lis, err := net.Listen("tcp", opts.Address)
if err != nil {
return nil, fmt.Errorf("xds: failed to listen on %+v: %v", opts, err)
}
func (s *GRPCServer) newListenerWrapper(lis net.Listener) (*listenerWrapper, error) {
lw := &listenerWrapper{Listener: lis}
s.logger.Infof("Started a net.Listener on %s", lis.Addr().String())
// This is used to notify that a good update has been received and that
// Serve() can be invoked on the underlying gRPC server. Using a
@ -224,7 +190,7 @@ func (s *GRPCServer) newListenerWrapper(opts ServeOptions) (*listenerWrapper, er
// Register an LDS watch using our xdsClient, and specify the listening
// address as the resource name.
// TODO(easwars): Check if literal IPv6 addresses need an enclosing [].
name := fmt.Sprintf(listenerResourceNameFormat, opts.Address)
name := fmt.Sprintf(listenerResourceNameFormat, lis.Addr().String())
cancelWatch := s.xdsC.WatchListener(name, func(update xdsclient.ListenerUpdate, err error) {
if err != nil {
// We simply log an error here and hope we get a successful update

View File

@ -50,57 +50,6 @@ func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
func (s) TestServeOptions_Validate(t *testing.T) {
tests := []struct {
desc string
opts ServeOptions
wantErr bool
}{
{
desc: "empty options",
opts: ServeOptions{},
wantErr: true,
},
{
desc: "bad address",
opts: ServeOptions{Address: "I'm a bad IP address"},
wantErr: true,
},
{
desc: "no port",
opts: ServeOptions{Address: "1.2.3.4"},
wantErr: true,
},
{
desc: "empty hostname",
opts: ServeOptions{Address: ":1234"},
wantErr: true,
},
{
desc: "localhost",
opts: ServeOptions{Address: "localhost:1234"},
wantErr: true,
},
{
desc: "ipv4",
opts: ServeOptions{Address: "1.2.3.4:1234"},
},
{
desc: "ipv6",
opts: ServeOptions{Address: "[1:2::3:4]:1234"},
},
}
for _, test := range tests {
t.Run(test.desc, func(t *testing.T) {
err := test.opts.validate()
if (err != nil) != test.wantErr {
t.Errorf("ServeOptions.validate(%+v) returned err %v, wantErr: %v", test.opts, err, test.wantErr)
}
})
}
}
type fakeGRPCServer struct {
done chan struct{}
registerServiceCh *testutils.Channel
@ -229,15 +178,15 @@ func (s) TestServeSuccess(t *testing.T) {
server := NewGRPCServer()
defer server.Stop()
localAddr, err := xdstestutils.AvailableHostPort()
lis, err := xdstestutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.AvailableHostPort() failed: %v", err)
t.Fatalf("xdstestutils.LocalTCPListener() failed: %v", err)
}
// Call Serve() in a goroutine, and push on a channel when Serve returns.
serveDone := testutils.NewChannel()
go func() {
if err := server.Serve(ServeOptions{Address: localAddr}); err != nil {
if err := server.Serve(lis); err != nil {
t.Error(err)
}
serveDone.Send(nil)
@ -257,7 +206,7 @@ func (s) TestServeSuccess(t *testing.T) {
if err != nil {
t.Fatalf("error when waiting for a ListenerWatch: %v", err)
}
wantName := fmt.Sprintf("grpc/server?udpa.resource.listening_address=%s", localAddr)
wantName := fmt.Sprintf("grpc/server?udpa.resource.listening_address=%s", lis.Addr().String())
if name != wantName {
t.Fatalf("LDS watch registered for name %q, want %q", name, wantName)
}
@ -290,15 +239,15 @@ func (s) TestServeWithStop(t *testing.T) {
// it after the LDS watch has been registered.
server := NewGRPCServer()
localAddr, err := xdstestutils.AvailableHostPort()
lis, err := xdstestutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.AvailableHostPort() failed: %v", err)
t.Fatalf("xdstestutils.LocalTCPListener() failed: %v", err)
}
// Call Serve() in a goroutine, and push on a channel when Serve returns.
serveDone := testutils.NewChannel()
go func() {
if err := server.Serve(ServeOptions{Address: localAddr}); err != nil {
if err := server.Serve(lis); err != nil {
t.Error(err)
}
serveDone.Send(nil)
@ -319,7 +268,7 @@ func (s) TestServeWithStop(t *testing.T) {
server.Stop()
t.Fatalf("error when waiting for a ListenerWatch: %v", err)
}
wantName := fmt.Sprintf("grpc/server?udpa.resource.listening_address=%s", localAddr)
wantName := fmt.Sprintf("grpc/server?udpa.resource.listening_address=%s", lis.Addr().String())
if name != wantName {
server.Stop()
t.Fatalf("LDS watch registered for name %q, wantPrefix %q", name, wantName)
@ -349,14 +298,14 @@ func (s) TestServeBootstrapFailure(t *testing.T) {
server := NewGRPCServer()
defer server.Stop()
localAddr, err := xdstestutils.AvailableHostPort()
lis, err := xdstestutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.AvailableHostPort() failed: %v", err)
t.Fatalf("xdstestutils.LocalTCPListener() failed: %v", err)
}
serveDone := testutils.NewChannel()
go func() {
err := server.Serve(ServeOptions{Address: localAddr})
err := server.Serve(lis)
serveDone.Send(err)
}()
@ -393,14 +342,14 @@ func (s) TestServeNewClientFailure(t *testing.T) {
server := NewGRPCServer()
defer server.Stop()
localAddr, err := xdstestutils.AvailableHostPort()
lis, err := xdstestutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.AvailableHostPort() failed: %v", err)
t.Fatalf("xdstestutils.LocalTCPListener() failed: %v", err)
}
serveDone := testutils.NewChannel()
go func() {
err := server.Serve(ServeOptions{Address: localAddr})
err := server.Serve(lis)
serveDone.Send(err)
}()