xds: Minor improvements to xDS server API and test. (#4026)

This commit is contained in:
Easwar Swaminathan 2020-11-11 10:12:52 -08:00 committed by GitHub
parent b5d479d642
commit 28c130fe3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 108 additions and 38 deletions

View File

@ -41,14 +41,13 @@ import (
testpb "google.golang.org/grpc/test/grpc_testing"
"google.golang.org/grpc/xds"
"google.golang.org/grpc/xds/internal/env"
"google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/fakeserver"
"google.golang.org/grpc/xds/internal/version"
)
const (
defaultTestTimeout = 10 * time.Second
localAddress = "localhost:9999"
listenerName = "grpc/server?udpa.resource.listening_address=localhost:9999"
)
type s struct {
@ -68,7 +67,7 @@ func setupListenerResponse(respCh chan *fakeserver.Response, name string) {
Value: func() []byte {
l := &v3listenerpb.Listener{
// This needs to match the name we are querying for.
Name: listenerName,
Name: name,
ApiListener: &v3listenerpb.ApiListener{
ApiListener: &anypb.Any{
TypeUrl: version.V2HTTPConnManagerURL,
@ -151,8 +150,13 @@ func (s) TestServerSideXDS(t *testing.T) {
testpb.RegisterTestServiceServer(server, &testService{})
defer server.Stop()
localAddr, err := testutils.AvailableHostPort()
if err != nil {
t.Fatalf("testutils.AvailableHostPort() failed: %v", err)
}
go func() {
opts := xds.ServeOptions{Network: "tcp", Address: localAddress}
opts := xds.ServeOptions{Address: localAddr}
if err := server.Serve(opts); err != nil {
t.Errorf("Serve(%+v) failed: %v", opts, err)
}
@ -167,10 +171,10 @@ func (s) TestServerSideXDS(t *testing.T) {
if _, err := fs.XDSRequestChan.Receive(ctx); err != nil {
t.Errorf("timeout when waiting for listener request: %v", err)
}
setupListenerResponse(fs.XDSResponseChan, listenerName)
setupListenerResponse(fs.XDSResponseChan, fmt.Sprintf("grpc/server?udpa.resource.listening_address=%s", localAddr))
}()
cc, err := grpc.DialContext(ctx, localAddress, grpc.WithTransportCredentials(insecure.NewCredentials()))
cc, err := grpc.DialContext(ctx, localAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}

View File

@ -0,0 +1,40 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package testutils
import "net"
// AvailableHostPort returns a local address to listen on. This will be of the
// form "host:port", where the host will be a literal IP address, and port
// must be a literal port number. If the host is a literal IPv6 address it
// will be enclosed in square brackets, as in "[2001:db8::1]:80.
//
// This is useful for tests which need to call the Serve() method on
// xds.GRPCServer which needs to be passed an IP:Port to listen on, where the IP
// must be a literal IP and not localhost. This approach will work on support
// one or both of IPv4 or IPv6.
func AvailableHostPort() (string, error) {
l, err := net.Listen("tcp", "localhost:0")
if err != nil {
return "", err
}
addr := l.Addr().String()
l.Close()
return addr, nil
}

View File

@ -22,6 +22,7 @@ import (
"context"
"fmt"
"net"
"strconv"
"sync"
"google.golang.org/grpc"
@ -81,24 +82,24 @@ type grpcServerInterface interface {
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type ServeOptions struct {
// Network identifies the local network to listen on. The network must be
// "tcp", "tcp4", "tcp6".
Network string
// Address contains the local address to listen on. This should be of the
// form "host:port", where the host must be a literal IP address, and port
// must be a literal port number. If the host is a literal IPv6 address it
// must be enclosed in square brackets, as in "[2001:db8::1]:80. The host
// portion can be left unspecified.
// must be enclosed in square brackets, as in "[2001:db8::1]:80.
Address string
}
func (so *ServeOptions) validate() error {
if net := so.Network; net != "tcp" && net != "tcp4" && net != "tcp6" {
return fmt.Errorf("xds: unsupported network type %q for server listener", net)
}
if _, _, err := net.SplitHostPort(so.Address); err != nil {
addr, port, err := net.SplitHostPort(so.Address)
if err != nil {
return fmt.Errorf("xds: unsupported address %q for server listener", so.Address)
}
if net.ParseIP(addr) == nil {
return fmt.Errorf("xds: failed to parse %q as a valid literal IP address", addr)
}
if _, err := strconv.Atoi(port); err != nil {
return fmt.Errorf("%q is not a valid listener port", port)
}
return nil
}
@ -211,7 +212,7 @@ func (s *GRPCServer) Serve(opts ServeOptions) error {
// Returns a listenerWrapper, which implements the net.Listener interface, that
// can be passed to grpcServer.Serve().
func (s *GRPCServer) newListenerWrapper(opts ServeOptions) (*listenerWrapper, error) {
lis, err := net.Listen(opts.Network, opts.Address)
lis, err := net.Listen("tcp", opts.Address)
if err != nil {
return nil, fmt.Errorf("xds: failed to listen on %+v: %v", opts, err)
}

View File

@ -21,9 +21,9 @@ package xds
import (
"context"
"errors"
"fmt"
"net"
"reflect"
"strings"
"testing"
"time"
@ -61,32 +61,33 @@ func (s) TestServeOptions_Validate(t *testing.T) {
opts: ServeOptions{},
wantErr: true,
},
{
desc: "unsupported network",
opts: ServeOptions{Network: "foo"},
wantErr: true,
},
{
desc: "bad address",
opts: ServeOptions{Network: "tcp", Address: "I'm a bad IP address"},
opts: ServeOptions{Address: "I'm a bad IP address"},
wantErr: true,
},
{
desc: "no port",
opts: ServeOptions{Network: "tcp", Address: "1.2.3.4"},
opts: ServeOptions{Address: "1.2.3.4"},
wantErr: true,
},
{
desc: "empty hostname",
opts: ServeOptions{Network: "tcp", Address: ":1234"},
desc: "empty hostname",
opts: ServeOptions{Address: ":1234"},
wantErr: true,
},
{
desc: "localhost",
opts: ServeOptions{Address: "localhost:1234"},
wantErr: true,
},
{
desc: "ipv4",
opts: ServeOptions{Network: "tcp", Address: "1.2.3.4:1234"},
opts: ServeOptions{Address: "1.2.3.4:1234"},
},
{
desc: "ipv6",
opts: ServeOptions{Network: "tcp", Address: "[1:2::3:4]:1234"},
opts: ServeOptions{Address: "[1:2::3:4]:1234"},
},
}
@ -228,10 +229,17 @@ func (s) TestServeSuccess(t *testing.T) {
server := NewGRPCServer()
defer server.Stop()
localAddr, err := xdstestutils.AvailableHostPort()
if err != nil {
t.Fatalf("testutils.AvailableHostPort() failed: %v", err)
}
// Call Serve() in a goroutine, and push on a channel when Serve returns.
serveDone := testutils.NewChannel()
go func() {
server.Serve(ServeOptions{Network: "tcp", Address: "localhost:0"})
if err := server.Serve(ServeOptions{Address: localAddr}); err != nil {
t.Error(err)
}
serveDone.Send(nil)
}()
@ -249,9 +257,9 @@ func (s) TestServeSuccess(t *testing.T) {
if err != nil {
t.Fatalf("error when waiting for a ListenerWatch: %v", err)
}
wantPrefix := "grpc/server?udpa.resource.listening_address=localhost:"
if !strings.HasPrefix(name, wantPrefix) {
t.Fatalf("LDS watch registered for name %q, wantPrefix %q", name, wantPrefix)
wantName := fmt.Sprintf("grpc/server?udpa.resource.listening_address=%s", localAddr)
if name != wantName {
t.Fatalf("LDS watch registered for name %q, want %q", name, wantName)
}
// Push an error to the registered listener watch callback and make sure
@ -282,10 +290,17 @@ func (s) TestServeWithStop(t *testing.T) {
// it after the LDS watch has been registered.
server := NewGRPCServer()
localAddr, err := xdstestutils.AvailableHostPort()
if err != nil {
t.Fatalf("testutils.AvailableHostPort() failed: %v", err)
}
// Call Serve() in a goroutine, and push on a channel when Serve returns.
serveDone := testutils.NewChannel()
go func() {
server.Serve(ServeOptions{Network: "tcp", Address: "localhost:0"})
if err := server.Serve(ServeOptions{Address: localAddr}); err != nil {
t.Error(err)
}
serveDone.Send(nil)
}()
@ -304,10 +319,10 @@ func (s) TestServeWithStop(t *testing.T) {
server.Stop()
t.Fatalf("error when waiting for a ListenerWatch: %v", err)
}
wantPrefix := "grpc/server?udpa.resource.listening_address=localhost:"
if !strings.HasPrefix(name, wantPrefix) {
wantName := fmt.Sprintf("grpc/server?udpa.resource.listening_address=%s", localAddr)
if name != wantName {
server.Stop()
t.Fatalf("LDS watch registered for name %q, wantPrefix %q", name, wantPrefix)
t.Fatalf("LDS watch registered for name %q, wantPrefix %q", name, wantName)
}
// Call Stop() on the server before a listener update is received, and
@ -334,9 +349,14 @@ func (s) TestServeBootstrapFailure(t *testing.T) {
server := NewGRPCServer()
defer server.Stop()
localAddr, err := xdstestutils.AvailableHostPort()
if err != nil {
t.Fatalf("testutils.AvailableHostPort() failed: %v", err)
}
serveDone := testutils.NewChannel()
go func() {
err := server.Serve(ServeOptions{Network: "tcp", Address: "localhost:0"})
err := server.Serve(ServeOptions{Address: localAddr})
serveDone.Send(err)
}()
@ -373,9 +393,14 @@ func (s) TestServeNewClientFailure(t *testing.T) {
server := NewGRPCServer()
defer server.Stop()
localAddr, err := xdstestutils.AvailableHostPort()
if err != nil {
t.Fatalf("testutils.AvailableHostPort() failed: %v", err)
}
serveDone := testutils.NewChannel()
go func() {
err := server.Serve(ServeOptions{Network: "tcp", Address: "localhost:0"})
err := server.Serve(ServeOptions{Address: localAddr})
serveDone.Send(err)
}()