diff --git a/xds/internal/test/xds_server_integration_test.go b/xds/internal/test/xds_server_integration_test.go index 294ded07a..a934067a4 100644 --- a/xds/internal/test/xds_server_integration_test.go +++ b/xds/internal/test/xds_server_integration_test.go @@ -92,15 +92,14 @@ func (s) TestServerSideXDS(t *testing.T) { testpb.RegisterTestServiceServer(server, &testService{}) defer server.Stop() - localAddr, err := testutils.AvailableHostPort() + lis, err := testutils.LocalTCPListener() if err != nil { - t.Fatalf("testutils.AvailableHostPort() failed: %v", err) + t.Fatalf("testutils.LocalTCPListener() failed: %v", err) } go func() { - opts := xds.ServeOptions{Address: localAddr} - if err := server.Serve(opts); err != nil { - t.Errorf("Serve(%+v) failed: %v", opts, err) + if err := server.Serve(lis); err != nil { + t.Errorf("Serve() failed: %v", err) } }() @@ -108,7 +107,7 @@ func (s) TestServerSideXDS(t *testing.T) { go func() { listener := &v3listenerpb.Listener{ // This needs to match the name we are querying for. - Name: fmt.Sprintf("grpc/server?udpa.resource.listening_address=%s", localAddr), + Name: fmt.Sprintf("grpc/server?udpa.resource.listening_address=%s", lis.Addr().String()), ApiListener: &v3listenerpb.ApiListener{ ApiListener: &anypb.Any{ TypeUrl: version.V2HTTPConnManagerURL, @@ -138,7 +137,7 @@ func (s) TestServerSideXDS(t *testing.T) { }() // Create a ClientConn and make a successful RPC. - cc, err := grpc.DialContext(ctx, localAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + cc, err := grpc.DialContext(ctx, lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { t.Fatalf("failed to dial local test server: %v", err) } diff --git a/xds/internal/testutils/local_address.go b/xds/internal/testutils/local_address.go deleted file mode 100644 index 5ada73682..000000000 --- a/xds/internal/testutils/local_address.go +++ /dev/null @@ -1,40 +0,0 @@ -/* - * - * Copyright 2020 gRPC authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package testutils - -import "net" - -// AvailableHostPort returns a local address to listen on. This will be of the -// form "host:port", where the host will be a literal IP address, and port -// must be a literal port number. If the host is a literal IPv6 address it -// will be enclosed in square brackets, as in "[2001:db8::1]:80. -// -// This is useful for tests which need to call the Serve() method on -// xds.GRPCServer which needs to be passed an IP:Port to listen on, where the IP -// must be a literal IP and not localhost. This approach will work on support -// one or both of IPv4 or IPv6. -func AvailableHostPort() (string, error) { - l, err := net.Listen("tcp", "localhost:0") - if err != nil { - return "", err - } - addr := l.Addr().String() - l.Close() - return addr, nil -} diff --git a/xds/internal/testutils/local_listener.go b/xds/internal/testutils/local_listener.go new file mode 100644 index 000000000..7da1bc1ce --- /dev/null +++ b/xds/internal/testutils/local_listener.go @@ -0,0 +1,26 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package testutils + +import "net" + +// LocalTCPListener returns a net.Listener listening on local address and port. +func LocalTCPListener() (net.Listener, error) { + return net.Listen("tcp", ":0") +} diff --git a/xds/server.go b/xds/server.go index 51acf08a8..fc499848b 100644 --- a/xds/server.go +++ b/xds/server.go @@ -22,7 +22,6 @@ import ( "context" "fmt" "net" - "strconv" "sync" "google.golang.org/grpc" @@ -75,34 +74,6 @@ type grpcServerInterface interface { GracefulStop() } -// ServeOptions contains parameters to configure the Serve() method. -// -// Experimental -// -// Notice: This type is EXPERIMENTAL and may be changed or removed in a -// later release. -type ServeOptions struct { - // Address contains the local address to listen on. This should be of the - // form "host:port", where the host must be a literal IP address, and port - // must be a literal port number. If the host is a literal IPv6 address it - // must be enclosed in square brackets, as in "[2001:db8::1]:80. - Address string -} - -func (so *ServeOptions) validate() error { - addr, port, err := net.SplitHostPort(so.Address) - if err != nil { - return fmt.Errorf("xds: unsupported address %q for server listener", so.Address) - } - if net.ParseIP(addr) == nil { - return fmt.Errorf("xds: failed to parse %q as a valid literal IP address", addr) - } - if _, err := strconv.Atoi(port); err != nil { - return fmt.Errorf("%q is not a valid listener port", port) - } - return nil -} - // GRPCServer wraps a gRPC server and provides server-side xDS functionality, by // communication with a management server using xDS APIs. It implements the // grpc.ServiceRegistrar interface and can be passed to service registration @@ -173,16 +144,16 @@ func (s *GRPCServer) initXDSClient() error { } // Serve gets the underlying gRPC server to accept incoming connections on the -// listening address in opts. A connection to the management server, to receive -// xDS configuration, is initiated here. +// listener lis, which is expected to be listening on a TCP port. +// +// A connection to the management server, to receive xDS configuration, is +// initiated here. // // Serve will return a non-nil error unless Stop or GracefulStop is called. -func (s *GRPCServer) Serve(opts ServeOptions) error { - s.logger.Infof("Serve() called with options: %+v", opts) - - // Validate the listening address in opts. - if err := opts.validate(); err != nil { - return err +func (s *GRPCServer) Serve(lis net.Listener) error { + s.logger.Infof("Serve() passed a net.Listener on %s", lis.Addr().String()) + if _, ok := lis.Addr().(*net.TCPAddr); !ok { + return fmt.Errorf("xds: GRPCServer expects listener to return a net.TCPAddr. Got %T", lis.Addr()) } // If this is the first time Serve() is being called, we need to initialize @@ -190,7 +161,7 @@ func (s *GRPCServer) Serve(opts ServeOptions) error { if err := s.initXDSClient(); err != nil { return err } - lw, err := s.newListenerWrapper(opts) + lw, err := s.newListenerWrapper(lis) if lw == nil { // Error returned can be nil (when Stop/GracefulStop() is called). So, // we need to check the returned listenerWrapper instead. @@ -199,20 +170,15 @@ func (s *GRPCServer) Serve(opts ServeOptions) error { return s.gs.Serve(lw) } -// newListenerWrapper starts a net.Listener on the address specified in opts. It -// then registers a watch for a Listener resource and blocks until a good +// newListenerWrapper creates and returns a listenerWrapper, which is a thin +// wrapper around the passed in listener lis, that can be passed to +// grpcServer.Serve(). +// +// It then registers a watch for a Listener resource and blocks until a good // response is received or the server is stopped by a call to // Stop/GracefulStop(). -// -// Returns a listenerWrapper, which implements the net.Listener interface, that -// can be passed to grpcServer.Serve(). -func (s *GRPCServer) newListenerWrapper(opts ServeOptions) (*listenerWrapper, error) { - lis, err := net.Listen("tcp", opts.Address) - if err != nil { - return nil, fmt.Errorf("xds: failed to listen on %+v: %v", opts, err) - } +func (s *GRPCServer) newListenerWrapper(lis net.Listener) (*listenerWrapper, error) { lw := &listenerWrapper{Listener: lis} - s.logger.Infof("Started a net.Listener on %s", lis.Addr().String()) // This is used to notify that a good update has been received and that // Serve() can be invoked on the underlying gRPC server. Using a @@ -224,7 +190,7 @@ func (s *GRPCServer) newListenerWrapper(opts ServeOptions) (*listenerWrapper, er // Register an LDS watch using our xdsClient, and specify the listening // address as the resource name. // TODO(easwars): Check if literal IPv6 addresses need an enclosing []. - name := fmt.Sprintf(listenerResourceNameFormat, opts.Address) + name := fmt.Sprintf(listenerResourceNameFormat, lis.Addr().String()) cancelWatch := s.xdsC.WatchListener(name, func(update xdsclient.ListenerUpdate, err error) { if err != nil { // We simply log an error here and hope we get a successful update diff --git a/xds/server_test.go b/xds/server_test.go index 189edd880..91583becc 100644 --- a/xds/server_test.go +++ b/xds/server_test.go @@ -50,57 +50,6 @@ func Test(t *testing.T) { grpctest.RunSubTests(t, s{}) } -func (s) TestServeOptions_Validate(t *testing.T) { - tests := []struct { - desc string - opts ServeOptions - wantErr bool - }{ - { - desc: "empty options", - opts: ServeOptions{}, - wantErr: true, - }, - { - desc: "bad address", - opts: ServeOptions{Address: "I'm a bad IP address"}, - wantErr: true, - }, - { - desc: "no port", - opts: ServeOptions{Address: "1.2.3.4"}, - wantErr: true, - }, - { - desc: "empty hostname", - opts: ServeOptions{Address: ":1234"}, - wantErr: true, - }, - { - desc: "localhost", - opts: ServeOptions{Address: "localhost:1234"}, - wantErr: true, - }, - { - desc: "ipv4", - opts: ServeOptions{Address: "1.2.3.4:1234"}, - }, - { - desc: "ipv6", - opts: ServeOptions{Address: "[1:2::3:4]:1234"}, - }, - } - - for _, test := range tests { - t.Run(test.desc, func(t *testing.T) { - err := test.opts.validate() - if (err != nil) != test.wantErr { - t.Errorf("ServeOptions.validate(%+v) returned err %v, wantErr: %v", test.opts, err, test.wantErr) - } - }) - } -} - type fakeGRPCServer struct { done chan struct{} registerServiceCh *testutils.Channel @@ -229,15 +178,15 @@ func (s) TestServeSuccess(t *testing.T) { server := NewGRPCServer() defer server.Stop() - localAddr, err := xdstestutils.AvailableHostPort() + lis, err := xdstestutils.LocalTCPListener() if err != nil { - t.Fatalf("testutils.AvailableHostPort() failed: %v", err) + t.Fatalf("xdstestutils.LocalTCPListener() failed: %v", err) } // Call Serve() in a goroutine, and push on a channel when Serve returns. serveDone := testutils.NewChannel() go func() { - if err := server.Serve(ServeOptions{Address: localAddr}); err != nil { + if err := server.Serve(lis); err != nil { t.Error(err) } serveDone.Send(nil) @@ -257,7 +206,7 @@ func (s) TestServeSuccess(t *testing.T) { if err != nil { t.Fatalf("error when waiting for a ListenerWatch: %v", err) } - wantName := fmt.Sprintf("grpc/server?udpa.resource.listening_address=%s", localAddr) + wantName := fmt.Sprintf("grpc/server?udpa.resource.listening_address=%s", lis.Addr().String()) if name != wantName { t.Fatalf("LDS watch registered for name %q, want %q", name, wantName) } @@ -290,15 +239,15 @@ func (s) TestServeWithStop(t *testing.T) { // it after the LDS watch has been registered. server := NewGRPCServer() - localAddr, err := xdstestutils.AvailableHostPort() + lis, err := xdstestutils.LocalTCPListener() if err != nil { - t.Fatalf("testutils.AvailableHostPort() failed: %v", err) + t.Fatalf("xdstestutils.LocalTCPListener() failed: %v", err) } // Call Serve() in a goroutine, and push on a channel when Serve returns. serveDone := testutils.NewChannel() go func() { - if err := server.Serve(ServeOptions{Address: localAddr}); err != nil { + if err := server.Serve(lis); err != nil { t.Error(err) } serveDone.Send(nil) @@ -319,7 +268,7 @@ func (s) TestServeWithStop(t *testing.T) { server.Stop() t.Fatalf("error when waiting for a ListenerWatch: %v", err) } - wantName := fmt.Sprintf("grpc/server?udpa.resource.listening_address=%s", localAddr) + wantName := fmt.Sprintf("grpc/server?udpa.resource.listening_address=%s", lis.Addr().String()) if name != wantName { server.Stop() t.Fatalf("LDS watch registered for name %q, wantPrefix %q", name, wantName) @@ -349,14 +298,14 @@ func (s) TestServeBootstrapFailure(t *testing.T) { server := NewGRPCServer() defer server.Stop() - localAddr, err := xdstestutils.AvailableHostPort() + lis, err := xdstestutils.LocalTCPListener() if err != nil { - t.Fatalf("testutils.AvailableHostPort() failed: %v", err) + t.Fatalf("xdstestutils.LocalTCPListener() failed: %v", err) } serveDone := testutils.NewChannel() go func() { - err := server.Serve(ServeOptions{Address: localAddr}) + err := server.Serve(lis) serveDone.Send(err) }() @@ -393,14 +342,14 @@ func (s) TestServeNewClientFailure(t *testing.T) { server := NewGRPCServer() defer server.Stop() - localAddr, err := xdstestutils.AvailableHostPort() + lis, err := xdstestutils.LocalTCPListener() if err != nil { - t.Fatalf("testutils.AvailableHostPort() failed: %v", err) + t.Fatalf("xdstestutils.LocalTCPListener() failed: %v", err) } serveDone := testutils.NewChannel() go func() { - err := server.Serve(ServeOptions{Address: localAddr}) + err := server.Serve(lis) serveDone.Send(err) }()