xds: generic xDS client transport channel and ads stream implementation (#8144)

This commit is contained in:
Purnesh Dixit 2025-03-18 12:33:18 +05:30 committed by GitHub
parent c27e6dc312
commit 1703656ba5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
23 changed files with 4183 additions and 0 deletions

View File

@ -0,0 +1,124 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package backoff implements the backoff strategy for clients.
//
// This is kept in internal until the clients project decides whether or not to
// allow alternative backoff strategies.
package backoff
import (
"context"
"errors"
rand "math/rand/v2"
"time"
)
// config defines the configuration options for backoff.
type config struct {
// baseDelay is the amount of time to backoff after the first failure.
baseDelay time.Duration
// multiplier is the factor with which to multiply backoffs after a
// failed retry. Should ideally be greater than 1.
multiplier float64
// jitter is the factor with which backoffs are randomized.
jitter float64
// maxDelay is the upper bound of backoff delay.
maxDelay time.Duration
}
// defaultConfig is a backoff configuration with the default values specified
// at https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md.
//
// This should be useful for callers who want to configure backoff with
// non-default values only for a subset of the options.
var defaultConfig = config{
baseDelay: 1.0 * time.Second,
multiplier: 1.6,
jitter: 0.2,
maxDelay: 120 * time.Second,
}
// DefaultExponential is an exponential backoff implementation using the
// default values for all the configurable knobs defined in
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md.
var DefaultExponential = exponential{config: defaultConfig}
// exponential implements exponential backoff algorithm as defined in
// https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md.
type exponential struct {
// Config contains all options to configure the backoff algorithm.
config config
}
// Backoff returns the amount of time to wait before the next retry given the
// number of retries.
func (bc exponential) Backoff(retries int) time.Duration {
if retries == 0 {
return bc.config.baseDelay
}
backoff, max := float64(bc.config.baseDelay), float64(bc.config.maxDelay)
for backoff < max && retries > 0 {
backoff *= bc.config.multiplier
retries--
}
if backoff > max {
backoff = max
}
// Randomize backoff delays so that if a cluster of requests start at
// the same time, they won't operate in lockstep.
backoff *= 1 + bc.config.jitter*(rand.Float64()*2-1)
if backoff < 0 {
return 0
}
return time.Duration(backoff)
}
// ErrResetBackoff is the error to be returned by the function executed by RunF,
// to instruct the latter to reset its backoff state.
var ErrResetBackoff = errors.New("reset backoff state")
// RunF provides a convenient way to run a function f repeatedly until the
// context expires or f returns a non-nil error that is not ErrResetBackoff.
// When f returns ErrResetBackoff, RunF continues to run f, but resets its
// backoff state before doing so. backoff accepts an integer representing the
// number of retries, and returns the amount of time to backoff.
func RunF(ctx context.Context, f func() error, backoff func(int) time.Duration) {
attempt := 0
timer := time.NewTimer(0)
for ctx.Err() == nil {
select {
case <-timer.C:
case <-ctx.Done():
timer.Stop()
return
}
err := f()
if errors.Is(err, ErrResetBackoff) {
timer.Reset(0)
attempt = 0
continue
}
if err != nil {
return
}
timer.Reset(backoff(attempt))
attempt++
}
}

View File

@ -0,0 +1,116 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package buffer provides an implementation of an unbounded buffer.
package buffer
import (
"errors"
"sync"
)
// Unbounded is an implementation of an unbounded buffer which does not use
// extra goroutines. This is typically used for passing updates from one entity
// to another within gRPC.
//
// All methods on this type are thread-safe and don't block on anything except
// the underlying mutex used for synchronization.
//
// Unbounded supports values of any type to be stored in it by using a channel
// of `any`. This means that a call to Put() incurs an extra memory allocation,
// and also that users need a type assertion while reading. For performance
// critical code paths, using Unbounded is strongly discouraged and defining a
// new type specific implementation of this buffer is preferred. See
// internal/transport/transport.go for an example of this.
type Unbounded struct {
c chan any
closed bool
closing bool
mu sync.Mutex
backlog []any
}
// NewUnbounded returns a new instance of Unbounded.
func NewUnbounded() *Unbounded {
return &Unbounded{c: make(chan any, 1)}
}
var errBufferClosed = errors.New("Put() called on closed buffer.Unbounded")
// Put adds t to the unbounded buffer.
func (b *Unbounded) Put(t any) error {
b.mu.Lock()
defer b.mu.Unlock()
if b.closing {
return errBufferClosed
}
if len(b.backlog) == 0 {
select {
case b.c <- t:
return nil
default:
}
}
b.backlog = append(b.backlog, t)
return nil
}
// Load sends the earliest buffered data, if any, onto the read channel returned
// by Get(). Users are expected to call this every time they successfully read a
// value from the read channel.
func (b *Unbounded) Load() {
b.mu.Lock()
defer b.mu.Unlock()
if len(b.backlog) > 0 {
select {
case b.c <- b.backlog[0]:
b.backlog[0] = nil
b.backlog = b.backlog[1:]
default:
}
} else if b.closing && !b.closed {
close(b.c)
}
}
// Get returns a read channel on which values added to the buffer, via Put(),
// are sent on.
//
// Upon reading a value from this channel, users are expected to call Load() to
// send the next buffered value onto the channel if there is any.
//
// If the unbounded buffer is closed, the read channel returned by this method
// is closed after all data is drained.
func (b *Unbounded) Get() <-chan any {
return b.c
}
// Close closes the unbounded buffer. No subsequent data may be Put(), and the
// channel returned from Get() will be closed after all the data is read and
// Load() is called for the final time.
func (b *Unbounded) Close() {
b.mu.Lock()
defer b.mu.Unlock()
if b.closing {
return
}
b.closing = true
if len(b.backlog) == 0 {
b.closed = true
close(b.c)
}
}

View File

@ -0,0 +1,148 @@
/*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package buffer
import (
"sort"
"sync"
"testing"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/internal/grpctest"
)
const (
numWriters = 10
numWrites = 10
)
type s struct {
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
// wantReads contains the set of values expected to be read by the reader
// goroutine in the tests.
var wantReads []int
func init() {
for i := 0; i < numWriters; i++ {
for j := 0; j < numWrites; j++ {
wantReads = append(wantReads, i)
}
}
}
// TestSingleWriter starts one reader and one writer goroutine and makes sure
// that the reader gets all the values added to the buffer by the writer.
func (s) TestSingleWriter(t *testing.T) {
ub := NewUnbounded()
reads := []int{}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
ch := ub.Get()
for i := 0; i < numWriters*numWrites; i++ {
r := <-ch
reads = append(reads, r.(int))
ub.Load()
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < numWriters; i++ {
for j := 0; j < numWrites; j++ {
ub.Put(i)
}
}
}()
wg.Wait()
if !cmp.Equal(reads, wantReads) {
t.Errorf("reads: %#v, wantReads: %#v", reads, wantReads)
}
}
// TestMultipleWriters starts multiple writers and one reader goroutine and
// makes sure that the reader gets all the data written by all writers.
func (s) TestMultipleWriters(t *testing.T) {
ub := NewUnbounded()
reads := []int{}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
ch := ub.Get()
for i := 0; i < numWriters*numWrites; i++ {
r := <-ch
reads = append(reads, r.(int))
ub.Load()
}
}()
wg.Add(numWriters)
for i := 0; i < numWriters; i++ {
go func(index int) {
defer wg.Done()
for j := 0; j < numWrites; j++ {
ub.Put(index)
}
}(i)
}
wg.Wait()
sort.Ints(reads)
if !cmp.Equal(reads, wantReads) {
t.Errorf("reads: %#v, wantReads: %#v", reads, wantReads)
}
}
// TestClose closes the buffer and makes sure that nothing is sent after the
// buffer is closed.
func (s) TestClose(t *testing.T) {
ub := NewUnbounded()
if err := ub.Put(1); err != nil {
t.Fatalf("Unbounded.Put() = %v; want nil", err)
}
ub.Close()
if err := ub.Put(1); err == nil {
t.Fatalf("Unbounded.Put() = <nil>; want non-nil error")
}
if v, ok := <-ub.Get(); !ok {
t.Errorf("Unbounded.Get() = %v, %v, want %v, %v", v, ok, 1, true)
}
if err := ub.Put(1); err == nil {
t.Fatalf("Unbounded.Put() = <nil>; want non-nil error")
}
ub.Load()
if v, ok := <-ub.Get(); ok {
t.Errorf("Unbounded.Get() = %v, want closed channel", v)
}
if err := ub.Put(1); err == nil {
t.Fatalf("Unbounded.Put() = <nil>; want non-nil error")
}
ub.Close() // ignored
}

View File

@ -0,0 +1,73 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package pretty defines helper functions to pretty-print structs for logging.
package pretty
import (
"bytes"
"encoding/json"
"fmt"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/protoadapt"
)
const jsonIndent = " "
// ToJSON marshals the input into a json string.
//
// If marshal fails, it falls back to fmt.Sprintf("%+v").
func ToJSON(e any) string {
if ee, ok := e.(protoadapt.MessageV1); ok {
e = protoadapt.MessageV2Of(ee)
}
if ee, ok := e.(protoadapt.MessageV2); ok {
mm := protojson.MarshalOptions{
Indent: jsonIndent,
Multiline: true,
}
ret, err := mm.Marshal(ee)
if err != nil {
// This may fail for proto.Anys, e.g. for xDS v2, LDS, the v2
// messages are not imported, and this will fail because the message
// is not found.
return fmt.Sprintf("%+v", ee)
}
return string(ret)
}
ret, err := json.MarshalIndent(e, "", jsonIndent)
if err != nil {
return fmt.Sprintf("%+v", e)
}
return string(ret)
}
// FormatJSON formats the input json bytes with indentation.
//
// If Indent fails, it returns the unchanged input as string.
func FormatJSON(b []byte) string {
var out bytes.Buffer
err := json.Indent(&out, b, "", jsonIndent)
if err != nil {
return string(b)
}
return out.String()
}

View File

@ -0,0 +1,112 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package syncutil
import (
"context"
"google.golang.org/grpc/xds/internal/clients/internal/buffer"
)
// CallbackSerializer provides a mechanism to schedule callbacks in a
// synchronized manner. It provides a FIFO guarantee on the order of execution
// of scheduled callbacks. New callbacks can be scheduled by invoking the
// Schedule() method.
//
// This type is safe for concurrent access.
type CallbackSerializer struct {
// done is closed once the serializer is shut down completely, i.e all
// scheduled callbacks are executed and the serializer has deallocated all
// its resources.
done chan struct{}
callbacks *buffer.Unbounded
}
// NewCallbackSerializer returns a new CallbackSerializer instance. The provided
// context will be passed to the scheduled callbacks. Users should cancel the
// provided context to shutdown the CallbackSerializer. It is guaranteed that no
// callbacks will be added once this context is canceled, and any pending un-run
// callbacks will be executed before the serializer is shut down.
func NewCallbackSerializer(ctx context.Context) *CallbackSerializer {
cs := &CallbackSerializer{
done: make(chan struct{}),
callbacks: buffer.NewUnbounded(),
}
go cs.run(ctx)
return cs
}
// TrySchedule tries to schedule the provided callback function f to be
// executed in the order it was added. This is a best-effort operation. If the
// context passed to NewCallbackSerializer was canceled before this method is
// called, the callback will not be scheduled.
//
// Callbacks are expected to honor the context when performing any blocking
// operations, and should return early when the context is canceled.
func (cs *CallbackSerializer) TrySchedule(f func(ctx context.Context)) {
cs.callbacks.Put(f)
}
// ScheduleOr schedules the provided callback function f to be executed in the
// order it was added. If the context passed to NewCallbackSerializer has been
// canceled before this method is called, the onFailure callback will be
// executed inline instead.
//
// Callbacks are expected to honor the context when performing any blocking
// operations, and should return early when the context is canceled.
func (cs *CallbackSerializer) ScheduleOr(f func(ctx context.Context), onFailure func()) {
if cs.callbacks.Put(f) != nil {
onFailure()
}
}
func (cs *CallbackSerializer) run(ctx context.Context) {
defer close(cs.done)
// TODO: when Go 1.21 is the oldest supported version, this loop and Close
// can be replaced with:
//
// context.AfterFunc(ctx, cs.callbacks.Close)
for ctx.Err() == nil {
select {
case <-ctx.Done():
// Do nothing here. Next iteration of the for loop will not happen,
// since ctx.Err() would be non-nil.
case cb := <-cs.callbacks.Get():
cs.callbacks.Load()
cb.(func(context.Context))(ctx)
}
}
// Close the buffer to prevent new callbacks from being added.
cs.callbacks.Close()
// Run all pending callbacks.
for cb := range cs.callbacks.Get() {
cs.callbacks.Load()
cb.(func(context.Context))(ctx)
}
}
// Done returns a channel that is closed after the context passed to
// NewCallbackSerializer is canceled and all callbacks have been executed.
func (cs *CallbackSerializer) Done() <-chan struct{} {
return cs.done
}

View File

@ -0,0 +1,206 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package syncutil
import (
"context"
"sync"
"testing"
"time"
"github.com/google/go-cmp/cmp"
)
const (
defaultTestTimeout = 5 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
)
// TestCallbackSerializer_Schedule_FIFO verifies that callbacks are executed in
// the same order in which they were scheduled.
func (s) TestCallbackSerializer_Schedule_FIFO(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
cs := NewCallbackSerializer(ctx)
defer cancel()
// We have two channels, one to record the order of scheduling, and the
// other to record the order of execution. We spawn a bunch of goroutines
// which record the order of scheduling and call the actual Schedule()
// method as well. The callbacks record the order of execution.
//
// We need to grab a lock to record order of scheduling to guarantee that
// the act of recording and the act of calling Schedule() happen atomically.
const numCallbacks = 100
var mu sync.Mutex
scheduleOrderCh := make(chan int, numCallbacks)
executionOrderCh := make(chan int, numCallbacks)
for i := 0; i < numCallbacks; i++ {
go func(id int) {
mu.Lock()
defer mu.Unlock()
scheduleOrderCh <- id
cs.TrySchedule(func(ctx context.Context) {
select {
case <-ctx.Done():
return
case executionOrderCh <- id:
}
})
}(i)
}
// Spawn a couple of goroutines to capture the order or scheduling and the
// order of execution.
scheduleOrder := make([]int, numCallbacks)
executionOrder := make([]int, numCallbacks)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
for i := 0; i < numCallbacks; i++ {
select {
case <-ctx.Done():
return
case id := <-scheduleOrderCh:
scheduleOrder[i] = id
}
}
}()
go func() {
defer wg.Done()
for i := 0; i < numCallbacks; i++ {
select {
case <-ctx.Done():
return
case id := <-executionOrderCh:
executionOrder[i] = id
}
}
}()
wg.Wait()
if diff := cmp.Diff(executionOrder, scheduleOrder); diff != "" {
t.Fatalf("Callbacks are not executed in scheduled order. diff(-want, +got):\n%s", diff)
}
}
// TestCallbackSerializer_Schedule_Concurrent verifies that all concurrently
// scheduled callbacks get executed.
func (s) TestCallbackSerializer_Schedule_Concurrent(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
cs := NewCallbackSerializer(ctx)
defer cancel()
// Schedule callbacks concurrently by calling Schedule() from goroutines.
// The execution of the callbacks call Done() on the waitgroup, which
// eventually unblocks the test and allows it to complete.
const numCallbacks = 100
var wg sync.WaitGroup
wg.Add(numCallbacks)
for i := 0; i < numCallbacks; i++ {
go func() {
cs.TrySchedule(func(context.Context) {
wg.Done()
})
}()
}
// We call Wait() on the waitgroup from a goroutine so that we can select on
// the Wait() being unblocked and the overall test deadline expiring.
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-ctx.Done():
t.Fatal("Timeout waiting for all scheduled callbacks to be executed")
case <-done:
}
}
// TestCallbackSerializer_Schedule_Close verifies that callbacks in the queue
// are not executed once Close() returns.
func (s) TestCallbackSerializer_Schedule_Close(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
serializerCtx, serializerCancel := context.WithTimeout(context.Background(), defaultTestTimeout)
cs := NewCallbackSerializer(serializerCtx)
// Schedule a callback which blocks until the context passed to it is
// canceled. It also closes a channel to signal that it has started.
firstCallbackStartedCh := make(chan struct{})
cs.TrySchedule(func(ctx context.Context) {
close(firstCallbackStartedCh)
<-ctx.Done()
})
// Schedule a bunch of callbacks. These should be executed since they are
// scheduled before the serializer is closed.
const numCallbacks = 10
callbackCh := make(chan int, numCallbacks)
for i := 0; i < numCallbacks; i++ {
num := i
callback := func(context.Context) { callbackCh <- num }
onFailure := func() { t.Fatal("Schedule failed to accept a callback when the serializer is yet to be closed") }
cs.ScheduleOr(callback, onFailure)
}
// Ensure that none of the newer callbacks are executed at this point.
select {
case <-time.After(defaultTestShortTimeout):
case <-callbackCh:
t.Fatal("Newer callback executed when older one is still executing")
}
// Wait for the first callback to start before closing the scheduler.
<-firstCallbackStartedCh
// Cancel the context which will unblock the first callback. All of the
// other callbacks (which have not started executing at this point) should
// be executed after this.
serializerCancel()
// Ensure that the newer callbacks are executed.
for i := 0; i < numCallbacks; i++ {
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for callback scheduled before close to be executed")
case num := <-callbackCh:
if num != i {
t.Fatalf("Executing callback %d, want %d", num, i)
}
}
}
<-cs.Done()
// Ensure that a callback cannot be scheduled after the serializer is
// closed.
done := make(chan struct{})
callback := func(context.Context) { t.Fatal("Scheduled a callback after closing the serializer") }
onFailure := func() { close(done) }
cs.ScheduleOr(callback, onFailure)
select {
case <-time.After(defaultTestTimeout):
t.Fatal("Successfully scheduled callback after serializer is closed")
case <-done:
}
}

View File

@ -0,0 +1,61 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package syncutil implements additional synchronization primitives built upon
// the sync package.
package syncutil
import (
"sync"
"sync/atomic"
)
// Event represents a one-time event that may occur in the future.
type Event struct {
fired int32
c chan struct{}
o sync.Once
}
// Fire causes e to complete. It is safe to call multiple times, and
// concurrently. It returns true iff this call to Fire caused the signaling
// channel returned by Done to close.
func (e *Event) Fire() bool {
ret := false
e.o.Do(func() {
atomic.StoreInt32(&e.fired, 1)
close(e.c)
ret = true
})
return ret
}
// Done returns a channel that will be closed when Fire is called.
func (e *Event) Done() <-chan struct{} {
return e.c
}
// HasFired returns true if Fire has been called.
func (e *Event) HasFired() bool {
return atomic.LoadInt32(&e.fired) == 1
}
// NewEvent returns a new, ready-to-use Event.
func NewEvent() *Event {
return &Event{c: make(chan struct{})}
}

View File

@ -0,0 +1,81 @@
/*
*
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package syncutil
import (
"testing"
"google.golang.org/grpc/internal/grpctest"
)
type s struct {
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
func (s) TestEventHasFired(t *testing.T) {
e := NewEvent()
if e.HasFired() {
t.Fatal("e.HasFired() = true; want false")
}
if !e.Fire() {
t.Fatal("e.Fire() = false; want true")
}
if !e.HasFired() {
t.Fatal("e.HasFired() = false; want true")
}
}
func (s) TestEventDoneChannel(t *testing.T) {
e := NewEvent()
select {
case <-e.Done():
t.Fatal("e.HasFired() = true; want false")
default:
}
if !e.Fire() {
t.Fatal("e.Fire() = false; want true")
}
select {
case <-e.Done():
default:
t.Fatal("e.HasFired() = false; want true")
}
}
func (s) TestEventMultipleFires(t *testing.T) {
e := NewEvent()
if e.HasFired() {
t.Fatal("e.HasFired() = true; want false")
}
if !e.Fire() {
t.Fatal("e.Fire() = false; want true")
}
for i := 0; i < 3; i++ {
if !e.HasFired() {
t.Fatal("e.HasFired() = false; want true")
}
if e.Fire() {
t.Fatal("e.Fire() = true; want false")
}
}
}

View File

@ -0,0 +1,37 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Package testutils contains testing helpers for xDS and LRS clients.
package testutils
// Channel wraps a generic channel and provides a timed receive operation.
type Channel struct {
// C is the underlying channel on which values sent using the SendXxx()
// methods are delivered. Tests which cannot use ReceiveXxx() for whatever
// reasons can use C to read the values.
C chan any
}
// Send sends value on the underlying channel.
func (c *Channel) Send(value any) {
c.C <- value
}
// NewChannelWithSize returns a new Channel with a buffer of bufSize.
func NewChannelWithSize(bufSize int) *Channel {
return &Channel{C: make(chan any, bufSize)}
}

View File

@ -0,0 +1,84 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package e2e
import (
"fmt"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routerpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
)
const (
// ClientSideCertProviderInstance is the certificate provider instance name
// used in the Cluster resource on the client side.
ClientSideCertProviderInstance = "client-side-certificate-provider-instance"
)
// RouterHTTPFilter is the HTTP Filter configuration for the Router filter.
var RouterHTTPFilter = HTTPFilter("router", &v3routerpb.Router{})
// DefaultClientListener returns a basic xds Listener resource to be used on
// the client side.
func DefaultClientListener(target, routeName string) *v3listenerpb.Listener {
hcm := marshalAny(&v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{Rds: &v3httppb.Rds{
ConfigSource: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}},
},
RouteConfigName: routeName,
}},
HttpFilters: []*v3httppb.HttpFilter{HTTPFilter("router", &v3routerpb.Router{})}, // router fields are unused by grpc
})
return &v3listenerpb.Listener{
Name: target,
ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm},
FilterChains: []*v3listenerpb.FilterChain{{
Name: "filter-chain-name",
Filters: []*v3listenerpb.Filter{{
Name: wellknown.HTTPConnectionManager,
ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm},
}},
}},
}
}
func marshalAny(m proto.Message) *anypb.Any {
a, err := anypb.New(m)
if err != nil {
panic(fmt.Sprintf("anypb.New(%+v) failed: %v", m, err))
}
return a
}
// HTTPFilter constructs an xds HttpFilter with the provided name and config.
func HTTPFilter(name string, config proto.Message) *v3httppb.HttpFilter {
return &v3httppb.HttpFilter{
Name: name,
ConfigType: &v3httppb.HttpFilter_TypedConfig{
TypedConfig: marshalAny(config),
},
}
}

View File

@ -0,0 +1,40 @@
/*
*
* Copyright 2022 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package e2e
// serverLogger implements the Logger interface defined at
// envoyproxy/go-control-plane/pkg/log. This is passed to the Snapshot cache.
type serverLogger struct {
logger interface {
Logf(format string, args ...any)
}
}
func (l serverLogger) Debugf(format string, args ...any) {
l.logger.Logf(format, args...)
}
func (l serverLogger) Infof(format string, args ...any) {
l.logger.Logf(format, args...)
}
func (l serverLogger) Warnf(format string, args ...any) {
l.logger.Logf(format, args...)
}
func (l serverLogger) Errorf(format string, args ...any) {
l.logger.Logf(format, args...)
}

View File

@ -0,0 +1,257 @@
/*
*
* Copyright 2020 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package e2e provides utilities for end2end testing of xDS and LRS clients
// functionalities.
package e2e
import (
"context"
"fmt"
"net"
"reflect"
"strconv"
"testing"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"google.golang.org/grpc"
"google.golang.org/grpc/xds/internal/clients/internal/testutils/fakeserver"
v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3endpointpb "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
v3lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3"
v3cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
v3resource "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
v3server "github.com/envoyproxy/go-control-plane/pkg/server/v3"
)
// ManagementServer is a thin wrapper around the xDS control plane
// implementation provided by envoyproxy/go-control-plane.
type ManagementServer struct {
// Address is the host:port on which the management server is listening for
// new connections.
Address string
// LRSServer points to the fake LRS server implementation. Set only if the
// SupportLoadReportingService option was set to true when creating this
// management server.
LRSServer *fakeserver.Server
cancel context.CancelFunc // To stop the v3 ADS service.
xs v3server.Server // v3 implementation of ADS.
gs *grpc.Server // gRPC server which exports the ADS service.
cache v3cache.SnapshotCache // Resource snapshot.
version int // Version of resource snapshot.
// A logging interface, usually supplied from *testing.T.
logger interface {
Logf(format string, args ...any)
}
}
// ManagementServerOptions contains options to be passed to the management
// server during creation.
type ManagementServerOptions struct {
// Listener to accept connections on. If nil, a TPC listener on a local port
// will be created and used.
Listener net.Listener
// SupportLoadReportingService, if set, results in the load reporting
// service being registered on the same port as that of ADS.
SupportLoadReportingService bool
// AllowResourceSubSet allows the management server to respond to requests
// before all configured resources are explicitly named in the request. The
// default behavior that we want is for the management server to wait for
// all configured resources to be requested before responding to any of
// them, since this is how we have run our tests historically, and should be
// set to true only for tests which explicitly require the other behavior.
AllowResourceSubset bool
// ServerFeaturesIgnoreResourceDeletion, if set, results in a bootstrap config
// where the server features list contains `ignore_resource_deletion`. This
// results in gRPC ignoring resource deletions from the management server, as
// per A53.
ServerFeaturesIgnoreResourceDeletion bool
// The callbacks defined below correspond to the state of the world (sotw)
// version of the xDS API on the management server.
// OnStreamOpen is called when an xDS stream is opened. The callback is
// invoked with the assigned stream ID and the type URL from the incoming
// request (or "" for ADS).
//
// Returning an error from this callback will end processing and close the
// stream. OnStreamClosed will still be called.
OnStreamOpen func(context.Context, int64, string) error
// OnStreamClosed is called immediately prior to closing an xDS stream. The
// callback is invoked with the stream ID of the stream being closed.
OnStreamClosed func(int64, *v3corepb.Node)
// OnStreamRequest is called when a request is received on the stream. The
// callback is invoked with the stream ID of the stream on which the request
// was received and the received request.
//
// Returning an error from this callback will end processing and close the
// stream. OnStreamClosed will still be called.
OnStreamRequest func(int64, *v3discoverypb.DiscoveryRequest) error
// OnStreamResponse is called immediately prior to sending a response on the
// stream. The callback is invoked with the stream ID of the stream on which
// the response is being sent along with the incoming request and the outgoing
// response.
OnStreamResponse func(context.Context, int64, *v3discoverypb.DiscoveryRequest, *v3discoverypb.DiscoveryResponse)
}
// StartManagementServer initializes a management server which implements the
// AggregatedDiscoveryService endpoint. The management server is initialized
// with no resources. Tests should call the Update() method to change the
// resource snapshot held by the management server, as per by the test logic.
//
// Registers a cleanup function on t to stop the management server.
func StartManagementServer(t *testing.T, opts ManagementServerOptions) *ManagementServer {
t.Helper()
// Create a snapshot cache. The first parameter to NewSnapshotCache()
// controls whether the server should wait for all resources to be
// explicitly named in the request before responding to any of them.
wait := !opts.AllowResourceSubset
cache := v3cache.NewSnapshotCache(wait, v3cache.IDHash{}, serverLogger{t})
t.Logf("Created new snapshot cache...")
lis := opts.Listener
if lis == nil {
var err error
lis, err = net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Failed to listen on localhost:0: %v", err)
}
}
// Cancelling the context passed to the server is the only way of stopping it
// at the end of the test.
ctx, cancel := context.WithCancel(context.Background())
callbacks := v3server.CallbackFuncs{
StreamOpenFunc: opts.OnStreamOpen,
StreamClosedFunc: opts.OnStreamClosed,
StreamRequestFunc: opts.OnStreamRequest,
StreamResponseFunc: opts.OnStreamResponse,
}
// Create an xDS management server and register the ADS implementation
// provided by it on a gRPC server.
xs := v3server.NewServer(ctx, cache, callbacks)
gs := grpc.NewServer()
v3discoverygrpc.RegisterAggregatedDiscoveryServiceServer(gs, xs)
t.Logf("Registered Aggregated Discovery Service (ADS)...")
mgmtServer := &ManagementServer{
Address: lis.Addr().String(),
cancel: cancel,
version: 0,
gs: gs,
xs: xs,
cache: cache,
logger: t,
}
if opts.SupportLoadReportingService {
lrs := fakeserver.NewServer(lis.Addr().String())
v3lrsgrpc.RegisterLoadReportingServiceServer(gs, lrs)
mgmtServer.LRSServer = lrs
t.Logf("Registered Load Reporting Service (LRS)...")
}
// Start serving.
go gs.Serve(lis)
t.Logf("xDS management server serving at: %v...", lis.Addr().String())
t.Cleanup(mgmtServer.Stop)
return mgmtServer
}
// UpdateOptions wraps parameters to be passed to the Update() method.
type UpdateOptions struct {
// NodeID is the id of the client to which this update is to be pushed.
NodeID string
// Endpoints, Clusters, Routes, and Listeners are the updated list of xds
// resources for the server. All must be provided with each Update.
Endpoints []*v3endpointpb.ClusterLoadAssignment
Clusters []*v3clusterpb.Cluster
Routes []*v3routepb.RouteConfiguration
Listeners []*v3listenerpb.Listener
// SkipValidation indicates whether we want to skip validation (by not
// calling snapshot.Consistent()). It can be useful for negative tests,
// where we send updates that the client will NACK.
SkipValidation bool
}
// Update changes the resource snapshot held by the management server, which
// updates connected clients as required.
func (s *ManagementServer) Update(ctx context.Context, opts UpdateOptions) error {
s.version++
// Create a snapshot with the passed in resources.
resources := map[v3resource.Type][]types.Resource{
v3resource.ListenerType: resourceSlice(opts.Listeners),
v3resource.RouteType: resourceSlice(opts.Routes),
v3resource.ClusterType: resourceSlice(opts.Clusters),
v3resource.EndpointType: resourceSlice(opts.Endpoints),
}
snapshot, err := v3cache.NewSnapshot(strconv.Itoa(s.version), resources)
if err != nil {
return fmt.Errorf("failed to create new snapshot cache: %v", err)
}
if !opts.SkipValidation {
if err := snapshot.Consistent(); err != nil {
return fmt.Errorf("failed to create new resource snapshot: %v", err)
}
}
s.logger.Logf("Created new resource snapshot...")
// Update the cache with the new resource snapshot.
if err := s.cache.SetSnapshot(ctx, opts.NodeID, snapshot); err != nil {
return fmt.Errorf("failed to update resource snapshot in management server: %v", err)
}
s.logger.Logf("Updated snapshot cache with resource snapshot...")
return nil
}
// Stop stops the management server.
func (s *ManagementServer) Stop() {
if s.cancel != nil {
s.cancel()
}
s.gs.Stop()
}
// resourceSlice accepts a slice of any type of proto messages and returns a
// slice of types.Resource. Will panic if there is an input type mismatch.
func resourceSlice(i any) []types.Resource {
v := reflect.ValueOf(i)
rs := make([]types.Resource, v.Len())
for i := 0; i < v.Len(); i++ {
rs[i] = v.Index(i).Interface().(types.Resource)
}
return rs
}

View File

@ -0,0 +1,254 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package fakeserver provides a fake implementation of the management server.
//
// This package is recommended only for scenarios which cannot be tested using
// the xDS management server (which uses envoy-go-control-plane) provided by the
// `internal/testutils/e2e` package.
package fakeserver
import (
"fmt"
"io"
"net"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/grpc/xds/internal/clients/internal/testutils"
"google.golang.org/protobuf/proto"
v3discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
v3lrsgrpc "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3"
v3lrspb "github.com/envoyproxy/go-control-plane/envoy/service/load_stats/v3"
)
const (
// TODO: Make this a var or a field in the server if there is a need to use a
// value other than this default.
defaultChannelBufferSize = 50
defaultDialTimeout = 5 * time.Second
)
// Request wraps the request protobuf (xds/LRS) and error received by the
// Server in a call to stream.Recv().
type Request struct {
Req proto.Message
Err error
}
// Response wraps the response protobuf (xds/LRS) and error that the Server
// should send out to the client through a call to stream.Send()
type Response struct {
Resp proto.Message
Err error
}
// Server is a fake implementation of xDS and LRS protocols. It listens on the
// same port for both services and exposes a bunch of channels to send/receive
// messages.
//
// This server is recommended only for scenarios which cannot be tested using
// the xDS management server (which uses envoy-go-control-plane) provided by the
// `internal/testutils/xds/e2e` package.
type Server struct {
// XDSRequestChan is a channel on which received xDS requests are made
// available to the users of this Server.
XDSRequestChan *testutils.Channel
// XDSResponseChan is a channel on which the Server accepts xDS responses
// to be sent to the client.
XDSResponseChan chan *Response
// LRSRequestChan is a channel on which received LRS requests are made
// available to the users of this Server.
LRSRequestChan *testutils.Channel
// LRSResponseChan is a channel on which the Server accepts the LRS
// response to be sent to the client.
LRSResponseChan chan *Response
// LRSStreamOpenChan is a channel on which the Server sends notifications
// when a new LRS stream is created.
LRSStreamOpenChan *testutils.Channel
// LRSStreamCloseChan is a channel on which the Server sends notifications
// when an existing LRS stream is closed.
LRSStreamCloseChan *testutils.Channel
// NewConnChan is a channel on which the fake server notifies receipt of new
// connection attempts. Tests can gate on this event before proceeding to
// other actions which depend on a connection to the fake server being up.
NewConnChan *testutils.Channel
// Address is the host:port on which the Server is listening for requests.
Address string
// The underlying fake implementation of xDS and LRS.
*xdsServer
*lrsServer
}
type wrappedListener struct {
net.Listener
server *Server
}
func (wl *wrappedListener) Accept() (net.Conn, error) {
c, err := wl.Listener.Accept()
if err != nil {
return nil, err
}
wl.server.NewConnChan.Send(struct{}{})
return c, err
}
// StartServer makes a new Server and gets it to start listening on the given
// net.Listener. If the given net.Listener is nil, a new one is created on a
// local port for gRPC requests. The returned cancel function should be invoked
// by the caller upon completion of the test.
func StartServer(lis net.Listener) (*Server, func(), error) {
if lis == nil {
var err error
lis, err = net.Listen("tcp", "localhost:0")
if err != nil {
return nil, func() {}, fmt.Errorf("net.Listen() failed: %v", err)
}
}
s := NewServer(lis.Addr().String())
wp := &wrappedListener{
Listener: lis,
server: s,
}
server := grpc.NewServer()
v3lrsgrpc.RegisterLoadReportingServiceServer(server, s)
v3discoverygrpc.RegisterAggregatedDiscoveryServiceServer(server, s)
go server.Serve(wp)
return s, func() { server.Stop() }, nil
}
// NewServer returns a new instance of Server, set to accept requests on addr.
// It is the responsibility of the caller to register the exported ADS and LRS
// services on an appropriate gRPC server. Most usages should prefer
// StartServer() instead of this.
func NewServer(addr string) *Server {
s := &Server{
XDSRequestChan: testutils.NewChannelWithSize(defaultChannelBufferSize),
LRSRequestChan: testutils.NewChannelWithSize(defaultChannelBufferSize),
NewConnChan: testutils.NewChannelWithSize(defaultChannelBufferSize),
XDSResponseChan: make(chan *Response, defaultChannelBufferSize),
LRSResponseChan: make(chan *Response, 1), // The server only ever sends one response.
LRSStreamOpenChan: testutils.NewChannelWithSize(defaultChannelBufferSize),
LRSStreamCloseChan: testutils.NewChannelWithSize(defaultChannelBufferSize),
Address: addr,
}
s.xdsServer = &xdsServer{reqChan: s.XDSRequestChan, respChan: s.XDSResponseChan}
s.lrsServer = &lrsServer{reqChan: s.LRSRequestChan, respChan: s.LRSResponseChan, streamOpenChan: s.LRSStreamOpenChan, streamCloseChan: s.LRSStreamCloseChan}
return s
}
type xdsServer struct {
reqChan *testutils.Channel
respChan chan *Response
}
func (xdsS *xdsServer) StreamAggregatedResources(s v3discoverygrpc.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
errCh := make(chan error, 2)
go func() {
for {
req, err := s.Recv()
if err != nil {
errCh <- err
return
}
xdsS.reqChan.Send(&Request{req, err})
}
}()
go func() {
var retErr error
defer func() {
errCh <- retErr
}()
for {
select {
case r := <-xdsS.respChan:
if r.Err != nil {
retErr = r.Err
return
}
if err := s.Send(r.Resp.(*v3discoverypb.DiscoveryResponse)); err != nil {
retErr = err
return
}
case <-s.Context().Done():
retErr = s.Context().Err()
return
}
}
}()
if err := <-errCh; err != nil {
return err
}
return nil
}
func (xdsS *xdsServer) DeltaAggregatedResources(v3discoverygrpc.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error {
return status.Error(codes.Unimplemented, "")
}
type lrsServer struct {
reqChan *testutils.Channel
respChan chan *Response
streamOpenChan *testutils.Channel
streamCloseChan *testutils.Channel
}
func (lrsS *lrsServer) StreamLoadStats(s v3lrsgrpc.LoadReportingService_StreamLoadStatsServer) error {
lrsS.streamOpenChan.Send(nil)
defer lrsS.streamCloseChan.Send(nil)
req, err := s.Recv()
lrsS.reqChan.Send(&Request{req, err})
if err != nil {
return err
}
select {
case r := <-lrsS.respChan:
if r.Err != nil {
return r.Err
}
if err := s.Send(r.Resp.(*v3lrspb.LoadStatsResponse)); err != nil {
return err
}
case <-s.Context().Done():
return s.Context().Err()
}
for {
req, err := s.Recv()
lrsS.reqChan.Send(&Request{req, err})
if err != nil {
if err == io.EOF {
return nil
}
return err
}
}
}

View File

@ -0,0 +1,37 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package testutils
import (
"testing"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
)
// MarshalAny is a convenience function to marshal protobuf messages into any
// protos. function will fail the test with a fatal error if the marshaling fails.
func MarshalAny(t *testing.T, m proto.Message) *anypb.Any {
t.Helper()
a, err := anypb.New(m)
if err != nil {
t.Fatalf("Failed to marshal proto %+v into an Any: %v", m, err)
}
return a
}

View File

@ -0,0 +1,106 @@
/*
*
* Copyright 2019 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package testutils
import (
"net"
"sync"
"google.golang.org/grpc/grpclog"
)
var logger = grpclog.Component("testutils")
type tempError struct{}
func (*tempError) Error() string {
return "restartable listener temporary error"
}
func (*tempError) Temporary() bool {
return true
}
// RestartableListener wraps a net.Listener and supports stopping and restarting
// the latter.
type RestartableListener struct {
lis net.Listener
mu sync.Mutex
stopped bool
conns []net.Conn
}
// NewRestartableListener returns a new RestartableListener wrapping l.
func NewRestartableListener(l net.Listener) *RestartableListener {
return &RestartableListener{lis: l}
}
// Accept waits for and returns the next connection to the listener.
//
// If the listener is currently not accepting new connections, because `Stop`
// was called on it, the connection is immediately closed after accepting
// without any bytes being sent on it.
func (l *RestartableListener) Accept() (net.Conn, error) {
conn, err := l.lis.Accept()
if err != nil {
return nil, err
}
l.mu.Lock()
defer l.mu.Unlock()
if l.stopped {
conn.Close()
return nil, &tempError{}
}
l.conns = append(l.conns, conn)
return conn, nil
}
// Close closes the listener.
func (l *RestartableListener) Close() error {
return l.lis.Close()
}
// Addr returns the listener's network address.
func (l *RestartableListener) Addr() net.Addr {
return l.lis.Addr()
}
// Stop closes existing connections on the listener and prevents new connections
// from being accepted.
func (l *RestartableListener) Stop() {
logger.Infof("Stopping restartable listener %q", l.Addr())
l.mu.Lock()
l.stopped = true
for _, conn := range l.conns {
conn.Close()
}
l.conns = nil
l.mu.Unlock()
}
// Restart gets a previously stopped listener to start accepting connections.
func (l *RestartableListener) Restart() {
logger.Infof("Restarting listener %q", l.Addr())
l.mu.Lock()
l.stopped = false
l.mu.Unlock()
}

View File

@ -0,0 +1,785 @@
/*
*
* Copyright 2025 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xdsclient
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"google.golang.org/grpc/grpclog"
igrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/xds/internal/clients"
"google.golang.org/grpc/xds/internal/clients/internal/backoff"
"google.golang.org/grpc/xds/internal/clients/internal/buffer"
"google.golang.org/grpc/xds/internal/clients/internal/pretty"
"google.golang.org/grpc/xds/internal/clients/xdsclient/internal/xdsresource"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
cpb "google.golang.org/genproto/googleapis/rpc/code"
statuspb "google.golang.org/genproto/googleapis/rpc/status"
)
const (
// Any per-RPC level logs which print complete request or response messages
// should be gated at this verbosity level. Other per-RPC level logs which print
// terse output should be at `INFO` and verbosity 2.
perRPCVerbosityLevel = 9
)
// response represents a response received on the ADS stream. It contains the
// type URL, version, and resources for the response.
type response struct {
typeURL string
version string
resources []*anypb.Any
}
// dataAndErrTuple is a struct that holds a resource and an error. It is used to
// return a resource and any associated error from a function.
type dataAndErrTuple struct {
Resource ResourceData
Err error
}
// adsStreamEventHandler is an interface that defines the callbacks for events that
// occur on the ADS stream. Methods on this interface may be invoked
// concurrently and implementations need to handle them in a thread-safe manner.
type adsStreamEventHandler interface {
onStreamError(error) // Called when the ADS stream breaks.
onWatchExpiry(ResourceType, string) // Called when the watch timer expires for a resource.
onResponse(response, func()) ([]string, error) // Called when a response is received on the ADS stream.
}
// watchState is a enum that describes the watch state of a particular
// resource.
type watchState int
const (
// resourceWatchStateStarted is the state where a watch for a resource was
// started, but a request asking for that resource is yet to be sent to the
// management server.
resourceWatchStateStarted watchState = iota
// resourceWatchStateRequested is the state when a request has been sent for
// the resource being watched.
resourceWatchStateRequested
// ResourceWatchStateReceived is the state when a response has been received
// for the resource being watched.
resourceWatchStateReceived
// resourceWatchStateTimeout is the state when the watch timer associated
// with the resource expired because no response was received.
resourceWatchStateTimeout
)
// resourceWatchState is the state corresponding to a resource being watched.
type resourceWatchState struct {
State watchState // Watch state of the resource.
ExpiryTimer *time.Timer // Timer for the expiry of the watch.
}
// state corresponding to a resource type.
type resourceTypeState struct {
version string // Last acked version. Should not be reset when the stream breaks.
nonce string // Last received nonce. Should be reset when the stream breaks.
bufferedRequests chan struct{} // Channel to buffer requests when writing is blocked.
subscribedResources map[string]*resourceWatchState // Map of subscribed resource names to their state.
pendingWrite bool // True if there is a pending write for this resource type.
}
// adsStreamImpl provides the functionality associated with an ADS (Aggregated
// Discovery Service) stream on the client side. It manages the lifecycle of the
// ADS stream, including creating the stream, sending requests, and handling
// responses. It also handles flow control and retries for the stream.
type adsStreamImpl struct {
// The following fields are initialized from arguments passed to the
// constructor and are read-only afterwards, and hence can be accessed
// without a mutex.
transport clients.Transport // Transport to use for ADS stream.
eventHandler adsStreamEventHandler // Callbacks into the xdsChannel.
backoff func(int) time.Duration // Backoff for retries, after stream failures.
nodeProto *v3corepb.Node // Identifies the gRPC application.
watchExpiryTimeout time.Duration // Resource watch expiry timeout
logger *igrpclog.PrefixLogger
// The following fields are initialized in the constructor and are not
// written to afterwards, and hence can be accessed without a mutex.
streamCh chan clients.Stream // New ADS streams are pushed here.
requestCh *buffer.Unbounded // Subscriptions and unsubscriptions are pushed here.
runnerDoneCh chan struct{} // Notify completion of runner goroutine.
cancel context.CancelFunc // To cancel the context passed to the runner goroutine.
// Guards access to the below fields (and to the contents of the map).
mu sync.Mutex
resourceTypeState map[ResourceType]*resourceTypeState // Map of resource types to their state.
fc *adsFlowControl // Flow control for ADS stream.
firstRequest bool // False after the first request is sent out.
}
// adsStreamOpts contains the options for creating a new ADS Stream.
type adsStreamOpts struct {
transport clients.Transport // xDS transport to create the stream on.
eventHandler adsStreamEventHandler // Callbacks for stream events.
backoff func(int) time.Duration // Backoff for retries, after stream failures.
nodeProto *v3corepb.Node // Node proto to identify the gRPC application.
watchExpiryTimeout time.Duration // Resource watch expiry timeout.
logPrefix string // Prefix to be used for log messages.
}
// newADSStreamImpl initializes a new adsStreamImpl instance using the given
// parameters. It also launches goroutines responsible for managing reads and
// writes for messages of the underlying stream.
func newADSStreamImpl(opts adsStreamOpts) *adsStreamImpl {
s := &adsStreamImpl{
transport: opts.transport,
eventHandler: opts.eventHandler,
backoff: opts.backoff,
nodeProto: opts.nodeProto,
watchExpiryTimeout: opts.watchExpiryTimeout,
streamCh: make(chan clients.Stream, 1),
requestCh: buffer.NewUnbounded(),
runnerDoneCh: make(chan struct{}),
resourceTypeState: make(map[ResourceType]*resourceTypeState),
}
l := grpclog.Component("xds")
s.logger = igrpclog.NewPrefixLogger(l, opts.logPrefix+fmt.Sprintf("[ads-stream %p] ", s))
ctx, cancel := context.WithCancel(context.Background())
s.cancel = cancel
go s.runner(ctx)
return s
}
// Stop blocks until the stream is closed and all spawned goroutines exit.
func (s *adsStreamImpl) Stop() {
s.cancel()
s.requestCh.Close()
<-s.runnerDoneCh
s.logger.Infof("Shutdown ADS stream")
}
// subscribe subscribes to the given resource. It is assumed that multiple
// subscriptions for the same resource is deduped at the caller. A discovery
// request is sent out on the underlying stream for the resource type when there
// is sufficient flow control quota.
func (s *adsStreamImpl) subscribe(typ ResourceType, name string) {
if s.logger.V(2) {
s.logger.Infof("Subscribing to resource %q of type %q", name, typ.TypeName)
}
s.mu.Lock()
defer s.mu.Unlock()
state, ok := s.resourceTypeState[typ]
if !ok {
// An entry in the type state map is created as part of the first
// subscription request for this type.
state = &resourceTypeState{
subscribedResources: make(map[string]*resourceWatchState),
bufferedRequests: make(chan struct{}, 1),
}
s.resourceTypeState[typ] = state
}
// Create state for the newly subscribed resource. The watch timer will
// be started when a request for this resource is actually sent out.
state.subscribedResources[name] = &resourceWatchState{State: resourceWatchStateStarted}
state.pendingWrite = true
// Send a request for the resource type with updated subscriptions.
s.requestCh.Put(typ)
}
// Unsubscribe cancels the subscription to the given resource. It is a no-op if
// the given resource does not exist. The watch expiry timer associated with the
// resource is stopped if one is active. A discovery request is sent out on the
// stream for the resource type when there is sufficient flow control quota.
func (s *adsStreamImpl) Unsubscribe(typ ResourceType, name string) {
if s.logger.V(2) {
s.logger.Infof("Unsubscribing to resource %q of type %q", name, typ.TypeName)
}
s.mu.Lock()
defer s.mu.Unlock()
state, ok := s.resourceTypeState[typ]
if !ok {
return
}
rs, ok := state.subscribedResources[name]
if !ok {
return
}
if rs.ExpiryTimer != nil {
rs.ExpiryTimer.Stop()
}
delete(state.subscribedResources, name)
state.pendingWrite = true
// Send a request for the resource type with updated subscriptions.
s.requestCh.Put(typ)
}
// runner is a long-running goroutine that handles the lifecycle of the ADS
// stream. It spwans another goroutine to handle writes of discovery request
// messages on the stream. Whenever an existing stream fails, it performs
// exponential backoff (if no messages were received on that stream) before
// creating a new stream.
func (s *adsStreamImpl) runner(ctx context.Context) {
defer close(s.runnerDoneCh)
go s.send(ctx)
runStreamWithBackoff := func() error {
stream, err := s.transport.NewStream(ctx, "/envoy.service.discovery.v3.AggregatedDiscoveryService/StreamAggregatedResources")
if err != nil {
s.logger.Warningf("Failed to create a new ADS streaming RPC: %v", err)
s.onError(err, false)
return nil
}
if s.logger.V(2) {
s.logger.Infof("ADS stream created")
}
s.mu.Lock()
// Flow control is a property of the underlying streaming RPC call and
// needs to be initialized everytime a new one is created.
s.fc = newADSFlowControl(s.logger)
s.firstRequest = true
s.mu.Unlock()
// Ensure that the most recently created stream is pushed on the
// channel for the `send` goroutine to consume.
select {
case <-s.streamCh:
default:
}
s.streamCh <- stream
// Backoff state is reset upon successful receipt of at least one
// message from the server.
if s.recv(ctx, stream) {
return backoff.ErrResetBackoff
}
return nil
}
backoff.RunF(ctx, runStreamWithBackoff, s.backoff)
}
// send is a long running goroutine that handles sending discovery requests for
// two scenarios:
// - a new subscription or unsubscription request is received
// - a new stream is created after the previous one failed
func (s *adsStreamImpl) send(ctx context.Context) {
// Stores the most recent stream instance received on streamCh.
var stream clients.Stream
for {
select {
case <-ctx.Done():
return
case stream = <-s.streamCh:
if err := s.sendExisting(stream); err != nil {
// Send failed, clear the current stream. Attempt to resend will
// only be made after a new stream is created.
stream = nil
continue
}
case req, ok := <-s.requestCh.Get():
if !ok {
return
}
s.requestCh.Load()
typ := req.(ResourceType)
if err := s.sendNew(stream, typ); err != nil {
stream = nil
continue
}
}
}
}
// sendNew attempts to send a discovery request based on a new subscription or
// unsubscription. If there is no flow control quota, the request is buffered
// and will be sent later. This method also starts the watch expiry timer for
// resources that were sent in the request for the first time, i.e. their watch
// state is `watchStateStarted`.
func (s *adsStreamImpl) sendNew(stream clients.Stream, typ ResourceType) error {
s.mu.Lock()
defer s.mu.Unlock()
// If there's no stream yet, skip the request. This request will be resent
// when a new stream is created. If no stream is created, the watcher will
// timeout (same as server not sending response back).
if stream == nil {
return nil
}
// If local processing of the most recently received response is not yet
// complete, i.e. fc.pending == true, queue this write and return early.
// This allows us to batch writes for requests which are generated as part
// of local processing of a received response.
state := s.resourceTypeState[typ]
if s.fc.pending.Load() {
select {
case state.bufferedRequests <- struct{}{}:
default:
}
return nil
}
return s.sendMessageIfWritePendingLocked(stream, typ, state)
}
// sendExisting sends out discovery requests for existing resources when
// recovering from a broken stream.
//
// The stream argument is guaranteed to be non-nil.
func (s *adsStreamImpl) sendExisting(stream clients.Stream) error {
s.mu.Lock()
defer s.mu.Unlock()
for typ, state := range s.resourceTypeState {
// Reset only the nonces map when the stream restarts.
//
// xDS spec says the following. See section:
// https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#ack-nack-and-resource-type-instance-version
//
// Note that the version for a resource type is not a property of an
// individual xDS stream but rather a property of the resources
// themselves. If the stream becomes broken and the client creates a new
// stream, the clients initial request on the new stream should
// indicate the most recent version seen by the client on the previous
// stream
state.nonce = ""
if len(state.subscribedResources) == 0 {
continue
}
state.pendingWrite = true
if err := s.sendMessageIfWritePendingLocked(stream, typ, state); err != nil {
return err
}
}
return nil
}
// sendBuffered sends out discovery requests for resources that were buffered
// when they were subscribed to, because local processing of the previously
// received response was not yet complete.
//
// The stream argument is guaranteed to be non-nil.
func (s *adsStreamImpl) sendBuffered(stream clients.Stream) error {
s.mu.Lock()
defer s.mu.Unlock()
for typ, state := range s.resourceTypeState {
select {
case <-state.bufferedRequests:
if err := s.sendMessageIfWritePendingLocked(stream, typ, state); err != nil {
return err
}
default:
// No buffered request.
continue
}
}
return nil
}
// sendMessageIfWritePendingLocked attempts to sends a discovery request to the
// server, if there is a pending write for the given resource type.
//
// If the request is successfully sent, the pending write field is cleared and
// watch timers are started for the resources in the request.
//
// Caller needs to hold c.mu.
func (s *adsStreamImpl) sendMessageIfWritePendingLocked(stream clients.Stream, typ ResourceType, state *resourceTypeState) error {
if !state.pendingWrite {
if s.logger.V(2) {
s.logger.Infof("Skipping sending request for type %q, because all subscribed resources were already sent", typ.TypeURL)
}
return nil
}
names := resourceNames(state.subscribedResources)
if err := s.sendMessageLocked(stream, names, typ.TypeURL, state.version, state.nonce, nil); err != nil {
return err
}
state.pendingWrite = false
// Drain the buffered requests channel because we just sent a request for this
// resource type.
select {
case <-state.bufferedRequests:
default:
}
s.startWatchTimersLocked(typ, names)
return nil
}
// sendMessageLocked sends a discovery request to the server, populating the
// different fields of the message with the given parameters. Returns a non-nil
// error if the request could not be sent.
//
// Caller needs to hold c.mu.
func (s *adsStreamImpl) sendMessageLocked(stream clients.Stream, names []string, url, version, nonce string, nackErr error) error {
req := &v3discoverypb.DiscoveryRequest{
ResourceNames: names,
TypeUrl: url,
VersionInfo: version,
ResponseNonce: nonce,
}
// The xDS protocol only requires that we send the node proto in the first
// discovery request on every stream. Sending the node proto in every
// request wastes CPU resources on the client and the server.
if s.firstRequest {
req.Node = s.nodeProto
}
if nackErr != nil {
req.ErrorDetail = &statuspb.Status{
Code: int32(cpb.Code_INVALID_ARGUMENT), Message: nackErr.Error(),
}
}
msg, err := proto.Marshal(req)
if err != nil {
s.logger.Warningf("Failed to marshal DiscoveryRequest: %v", err)
return err
}
if err := stream.Send(msg); err != nil {
s.logger.Warningf("Sending ADS request for type %q, resources: %v, version: %q, nonce: %q failed: %v", url, names, version, nonce, err)
return err
}
s.firstRequest = false
if s.logger.V(perRPCVerbosityLevel) {
s.logger.Infof("ADS request sent: %v", pretty.ToJSON(req))
} else if s.logger.V(2) {
s.logger.Warningf("ADS request sent for type %q, resources: %v, version: %q, nonce: %q", url, names, version, nonce)
}
return nil
}
// recv is responsible for receiving messages from the ADS stream.
//
// It performs the following actions:
// - Waits for local flow control to be available before sending buffered
// requests, if any.
// - Receives a message from the ADS stream. If an error is encountered here,
// it is handled by the onError method which propagates the error to all
// watchers.
// - Invokes the event handler's OnADSResponse method to process the message.
// - Sends an ACK or NACK to the server based on the response.
//
// It returns a boolean indicating whether at least one message was received
// from the server.
func (s *adsStreamImpl) recv(ctx context.Context, stream clients.Stream) bool {
msgReceived := false
for {
// Wait for ADS stream level flow control to be available, and send out
// a request if anything was buffered while we were waiting for local
// processing of the previous response to complete.
if !s.fc.wait(ctx) {
if s.logger.V(2) {
s.logger.Infof("ADS stream context canceled")
}
return msgReceived
}
s.sendBuffered(stream)
resources, url, version, nonce, err := s.recvMessage(stream)
if err != nil {
s.onError(err, msgReceived)
s.logger.Warningf("ADS stream closed: %v", err)
return msgReceived
}
msgReceived = true
// Invoke the onResponse event handler to parse the incoming message and
// decide whether to send an ACK or NACK.
resp := response{
resources: resources,
typeURL: url,
version: version,
}
var resourceNames []string
var nackErr error
s.fc.setPending()
resourceNames, nackErr = s.eventHandler.onResponse(resp, s.fc.onDone)
if xdsresource.ErrType(nackErr) == xdsresource.ErrorTypeResourceTypeUnsupported {
// A general guiding principle is that if the server sends
// something the client didn't actually subscribe to, then the
// client ignores it. Here, we have received a response with
// resources of a type that we don't know about.
//
// Sending a NACK doesn't really seem appropriate here, since we're
// not actually validating what the server sent and therefore don't
// know that it's invalid. But we shouldn't ACK either, because we
// don't know that it is valid.
s.logger.Warningf("%v", nackErr)
continue
}
s.onRecv(stream, resourceNames, url, version, nonce, nackErr)
}
}
func (s *adsStreamImpl) recvMessage(stream clients.Stream) (resources []*anypb.Any, url, version, nonce string, err error) {
r, err := stream.Recv()
if err != nil {
return nil, "", "", "", err
}
var resp v3discoverypb.DiscoveryResponse
if err := proto.Unmarshal(r, &resp); err != nil {
s.logger.Infof("Failed to unmarshal response to DiscoveryResponse: %v", err)
return nil, "", "", "", fmt.Errorf("unexpected message type %T", r)
}
if s.logger.V(perRPCVerbosityLevel) {
s.logger.Infof("ADS response received: %v", pretty.ToJSON(&resp))
} else if s.logger.V(2) {
s.logger.Infof("ADS response received for type %q, version %q, nonce %q", resp.GetTypeUrl(), resp.GetVersionInfo(), resp.GetNonce())
}
return resp.GetResources(), resp.GetTypeUrl(), resp.GetVersionInfo(), resp.GetNonce(), nil
}
// onRecv is invoked when a response is received from the server. The arguments
// passed to this method correspond to the most recently received response.
//
// It performs the following actions:
// - updates resource type specific state
// - updates resource specific state for resources in the response
// - sends an ACK or NACK to the server based on the response
func (s *adsStreamImpl) onRecv(stream clients.Stream, names []string, url, version, nonce string, nackErr error) {
s.mu.Lock()
defer s.mu.Unlock()
// Lookup the resource type specific state based on the type URL.
var typ ResourceType
for t := range s.resourceTypeState {
if t.TypeURL == url {
typ = t
break
}
}
typeState, ok := s.resourceTypeState[typ]
if !ok {
s.logger.Warningf("ADS stream received a response for type %q, but no state exists for it", url)
return
}
// Update the resource type specific state. This includes:
// - updating the nonce unconditionally
// - updating the version only if the response is to be ACKed
previousVersion := typeState.version
typeState.nonce = nonce
if nackErr == nil {
typeState.version = version
}
// Update the resource specific state. For all resources received as
// part of this response that are in state `started` or `requested`,
// this includes:
// - setting the watch state to watchstateReceived
// - stopping the expiry timer, if one exists
for _, name := range names {
rs, ok := typeState.subscribedResources[name]
if !ok {
s.logger.Warningf("ADS stream received a response for resource %q, but no state exists for it", name)
continue
}
if ws := rs.State; ws == resourceWatchStateStarted || ws == resourceWatchStateRequested {
rs.State = resourceWatchStateReceived
if rs.ExpiryTimer != nil {
rs.ExpiryTimer.Stop()
rs.ExpiryTimer = nil
}
}
}
// Send an ACK or NACK.
subscribedResourceNames := resourceNames(typeState.subscribedResources)
if nackErr != nil {
s.logger.Warningf("Sending NACK for resource type: %q, version: %q, nonce: %q, reason: %v", url, version, nonce, nackErr)
s.sendMessageLocked(stream, subscribedResourceNames, url, previousVersion, nonce, nackErr)
return
}
if s.logger.V(2) {
s.logger.Infof("Sending ACK for resource type: %q, version: %q, nonce: %q", url, version, nonce)
}
s.sendMessageLocked(stream, subscribedResourceNames, url, version, nonce, nil)
}
// onError is called when an error occurs on the ADS stream. It stops any
// outstanding resource timers and resets the watch state to started for any
// resources that were in the requested state. It also handles the case where
// the ADS stream was closed after receiving a response, which is not
// considered an error.
func (s *adsStreamImpl) onError(err error, msgReceived bool) {
// For resources that been requested but not yet responded to by the
// management server, stop the resource timers and reset the watch state to
// watchStateStarted. This is because we don't want the expiry timer to be
// running when we don't have a stream open to the management server.
s.mu.Lock()
for _, state := range s.resourceTypeState {
for _, rs := range state.subscribedResources {
if rs.State != resourceWatchStateRequested {
continue
}
if rs.ExpiryTimer != nil {
rs.ExpiryTimer.Stop()
rs.ExpiryTimer = nil
}
rs.State = resourceWatchStateStarted
}
}
s.mu.Unlock()
// Note that we do not consider it an error if the ADS stream was closed
// after having received a response on the stream. This is because there
// are legitimate reasons why the server may need to close the stream during
// normal operations, such as needing to rebalance load or the underlying
// connection hitting its max connection age limit.
// (see [gRFC A9](https://github.com/grpc/proposal/blob/master/A9-server-side-conn-mgt.md)).
if msgReceived {
err = xdsresource.NewError(xdsresource.ErrTypeStreamFailedAfterRecv, err.Error())
}
s.eventHandler.onStreamError(err)
}
// startWatchTimersLocked starts the expiry timers for the given resource names
// of the specified resource type. For each resource name, if the resource
// watch state is in the "started" state, it transitions the state to
// "requested" and starts an expiry timer. When the timer expires, the resource
// watch state is set to "timeout" and the event handler callback is called.
//
// The caller must hold the s.mu lock.
func (s *adsStreamImpl) startWatchTimersLocked(typ ResourceType, names []string) {
typeState := s.resourceTypeState[typ]
for _, name := range names {
resourceState, ok := typeState.subscribedResources[name]
if !ok {
continue
}
if resourceState.State != resourceWatchStateStarted {
continue
}
resourceState.State = resourceWatchStateRequested
rs := resourceState
resourceState.ExpiryTimer = time.AfterFunc(s.watchExpiryTimeout, func() {
s.mu.Lock()
rs.State = resourceWatchStateTimeout
rs.ExpiryTimer = nil
s.mu.Unlock()
s.eventHandler.onWatchExpiry(typ, name)
})
}
}
func resourceNames(m map[string]*resourceWatchState) []string {
ret := make([]string, len(m))
idx := 0
for name := range m {
ret[idx] = name
idx++
}
return ret
}
// adsFlowControl implements ADS stream level flow control that enables the
// transport to block the reading of the next message off of the stream until
// the previous update is consumed by all watchers.
//
// The lifetime of the flow control is tied to the lifetime of the stream.
type adsFlowControl struct {
logger *igrpclog.PrefixLogger
// Whether the most recent update is pending consumption by all watchers.
pending atomic.Bool
// Channel used to notify when all the watchers have consumed the most
// recent update. Wait() blocks on reading a value from this channel.
readyCh chan struct{}
}
// newADSFlowControl returns a new adsFlowControl.
func newADSFlowControl(logger *igrpclog.PrefixLogger) *adsFlowControl {
return &adsFlowControl{
logger: logger,
readyCh: make(chan struct{}, 1),
}
}
// setPending changes the internal state to indicate that there is an update
// pending consumption by all watchers.
func (fc *adsFlowControl) setPending() {
fc.pending.Store(true)
}
// wait blocks until all the watchers have consumed the most recent update and
// returns true. If the context expires before that, it returns false.
func (fc *adsFlowControl) wait(ctx context.Context) bool {
// If there is no pending update, there is no need to block.
if !fc.pending.Load() {
// If all watchers finished processing the most recent update before the
// `recv` goroutine made the next call to `Wait()`, there would be an
// entry in the readyCh channel that needs to be drained to ensure that
// the next call to `Wait()` doesn't unblock before it actually should.
select {
case <-fc.readyCh:
default:
}
return true
}
select {
case <-ctx.Done():
return false
case <-fc.readyCh:
return true
}
}
// onDone indicates that all watchers have consumed the most recent update.
func (fc *adsFlowControl) onDone() {
select {
// Writes to the readyCh channel should not block ideally. The default
// branch here is to appease the paranoid mind.
case fc.readyCh <- struct{}{}:
default:
if fc.logger.V(2) {
fc.logger.Infof("ADS stream flow control readyCh is full")
}
}
fc.pending.Store(false)
}

View File

@ -0,0 +1,316 @@
/*
*
* Copyright 2025 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xdsclient
import (
"errors"
"fmt"
"strings"
"time"
"google.golang.org/grpc/grpclog"
igrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/xds/internal/clients"
"google.golang.org/grpc/xds/internal/clients/internal"
"google.golang.org/grpc/xds/internal/clients/internal/backoff"
"google.golang.org/grpc/xds/internal/clients/internal/syncutil"
"google.golang.org/grpc/xds/internal/clients/xdsclient/internal/xdsresource"
)
const (
clientFeatureNoOverprovisioning = "envoy.lb.does_not_support_overprovisioning"
clientFeatureResourceWrapper = "xds.config.resource-in-sotw"
)
// xdsChannelEventHandler wraps callbacks used to notify the xDS client about
// events on the xdsChannel. Methods in this interface may be invoked
// concurrently and the xDS client implementation needs to handle them in a
// thread-safe manner.
type xdsChannelEventHandler interface {
// adsStreamFailure is called when the xdsChannel encounters an ADS stream
// failure.
adsStreamFailure(error)
// adsResourceUpdate is called when the xdsChannel receives an ADS response
// from the xDS management server. The callback is provided with the
// following:
// - the resource type of the resources in the response
// - a map of resources in the response, keyed by resource name
// - the metadata associated with the response
// - a callback to be invoked when the updated is processed
adsResourceUpdate(ResourceType, map[string]dataAndErrTuple, xdsresource.UpdateMetadata, func())
// adsResourceDoesNotExist is called when the xdsChannel determines that a
// requested ADS resource does not exist.
adsResourceDoesNotExist(ResourceType, string)
}
// xdsChannelOpts holds the options for creating a new xdsChannel.
type xdsChannelOpts struct {
transport clients.Transport // Takes ownership of this transport.
serverConfig *ServerConfig // Configuration of the server to connect to.
clientConfig *Config // Complete xDS client configuration, used to decode resources.
eventHandler xdsChannelEventHandler // Callbacks for ADS stream events.
backoff func(int) time.Duration // Backoff function to use for stream retries. Defaults to exponential backoff, if unset.
watchExpiryTimeout time.Duration // Timeout for ADS resource watch expiry.
logPrefix string // Prefix to use for logging.
}
// newXDSChannel creates a new xdsChannel instance with the provided options.
// It performs basic validation on the provided options and initializes the
// xdsChannel with the necessary components.
func newXDSChannel(opts xdsChannelOpts) (*xdsChannel, error) {
switch {
case opts.transport == nil:
return nil, errors.New("xdsclient: transport is nil")
case opts.serverConfig == nil:
return nil, errors.New("xdsclient: serverConfig is nil")
case opts.clientConfig == nil:
return nil, errors.New("xdsclient: clientConfig is nil")
case opts.eventHandler == nil:
return nil, errors.New("xdsclient: eventHandler is nil")
}
xc := &xdsChannel{
transport: opts.transport,
serverConfig: opts.serverConfig,
clientConfig: opts.clientConfig,
eventHandler: opts.eventHandler,
closed: syncutil.NewEvent(),
}
l := grpclog.Component("xds")
logPrefix := opts.logPrefix + fmt.Sprintf("[xds-channel %p] ", xc)
xc.logger = igrpclog.NewPrefixLogger(l, logPrefix)
if opts.backoff == nil {
opts.backoff = backoff.DefaultExponential.Backoff
}
np := internal.NodeProto(opts.clientConfig.Node)
np.ClientFeatures = []string{clientFeatureNoOverprovisioning, clientFeatureResourceWrapper}
xc.ads = newADSStreamImpl(adsStreamOpts{
transport: opts.transport,
eventHandler: xc,
backoff: opts.backoff,
nodeProto: np,
watchExpiryTimeout: opts.watchExpiryTimeout,
logPrefix: logPrefix,
})
if xc.logger.V(2) {
xc.logger.Infof("xdsChannel is created for ServerConfig %v", opts.serverConfig)
}
return xc, nil
}
// xdsChannel represents a client channel to a management server, and is
// responsible for managing the lifecycle of the ADS and LRS streams. It invokes
// callbacks on the registered event handler for various ADS stream events.
//
// It is safe for concurrent use.
type xdsChannel struct {
// The following fields are initialized at creation time and are read-only
// after that, and hence need not be guarded by a mutex.
transport clients.Transport // Takes ownership of this transport (used to make streaming calls).
ads *adsStreamImpl // An ADS stream to the management server.
serverConfig *ServerConfig // Configuration of the server to connect to.
clientConfig *Config // Complete xDS client configuration, used to decode resources.
eventHandler xdsChannelEventHandler // Callbacks for ADS stream events.
logger *igrpclog.PrefixLogger // Logger to use for logging.
closed *syncutil.Event // Fired when the channel is closed.
}
func (xc *xdsChannel) close() {
xc.closed.Fire()
xc.ads.Stop()
xc.transport.Close()
xc.logger.Infof("Shutdown")
}
// subscribe adds a subscription for the given resource name of the given
// resource type on the ADS stream.
func (xc *xdsChannel) subscribe(typ ResourceType, name string) {
if xc.closed.HasFired() {
if xc.logger.V(2) {
xc.logger.Infof("Attempt to subscribe to an xDS resource of type %s and name %q on a closed channel", typ.TypeName, name)
}
return
}
xc.ads.subscribe(typ, name)
}
// unsubscribe removes the subscription for the given resource name of the given
// resource type from the ADS stream.
func (xc *xdsChannel) unsubscribe(typ ResourceType, name string) {
if xc.closed.HasFired() {
if xc.logger.V(2) {
xc.logger.Infof("Attempt to unsubscribe to an xDS resource of type %s and name %q on a closed channel", typ.TypeName, name)
}
return
}
xc.ads.Unsubscribe(typ, name)
}
// The following onADSXxx() methods implement the StreamEventHandler interface
// and are invoked by the ADS stream implementation.
// onStreamError is invoked when an error occurs on the ADS stream. It
// propagates the update to the xDS client.
func (xc *xdsChannel) onStreamError(err error) {
if xc.closed.HasFired() {
if xc.logger.V(2) {
xc.logger.Infof("Received ADS stream error on a closed xdsChannel: %v", err)
}
return
}
xc.eventHandler.adsStreamFailure(err)
}
// onWatchExpiry is invoked when a watch for a resource expires. It
// propagates the update to the xDS client.
func (xc *xdsChannel) onWatchExpiry(typ ResourceType, name string) {
if xc.closed.HasFired() {
if xc.logger.V(2) {
xc.logger.Infof("Received ADS resource watch expiry for resource %q on a closed xdsChannel", name)
}
return
}
xc.eventHandler.adsResourceDoesNotExist(typ, name)
}
// onResponse is invoked when a response is received on the ADS stream. It
// decodes the resources in the response, and propagates the updates to the xDS
// client.
//
// It returns the list of resource names in the response and any errors
// encountered during decoding.
func (xc *xdsChannel) onResponse(resp response, onDone func()) ([]string, error) {
if xc.closed.HasFired() {
if xc.logger.V(2) {
xc.logger.Infof("Received an update from the ADS stream on closed ADS stream")
}
return nil, errors.New("xdsChannel is closed")
}
// Lookup the resource parser based on the resource type.
rType, ok := xc.clientConfig.ResourceTypes[resp.typeURL]
if !ok {
return nil, xdsresource.NewErrorf(xdsresource.ErrorTypeResourceTypeUnsupported, "Resource type URL %q unknown in response from server", resp.typeURL)
}
// Decode the resources and build the list of resource names to return.
opts := &DecodeOptions{
Config: xc.clientConfig,
ServerConfig: xc.serverConfig,
}
updates, md, err := decodeResponse(opts, &rType, resp)
var names []string
for name := range updates {
names = append(names, name)
}
xc.eventHandler.adsResourceUpdate(rType, updates, md, onDone)
return names, err
}
// decodeResponse decodes the resources in the given ADS response.
//
// The opts parameter provides configuration options for decoding the resources.
// The rType parameter specifies the resource type parser to use for decoding
// the resources.
//
// The returned map contains a key for each resource in the response, with the
// value being either the decoded resource data or an error if decoding failed.
// The returned metadata includes the version of the response, the timestamp of
// the update, and the status of the update (ACKed or NACKed).
//
// If there are any errors decoding the resources, the metadata will indicate
// that the update was NACKed, and the returned error will contain information
// about all errors encountered by this function.
func decodeResponse(opts *DecodeOptions, rType *ResourceType, resp response) (map[string]dataAndErrTuple, xdsresource.UpdateMetadata, error) {
timestamp := time.Now()
md := xdsresource.UpdateMetadata{
Version: resp.version,
Timestamp: timestamp,
}
topLevelErrors := make([]error, 0) // Tracks deserialization errors, where we don't have a resource name.
perResourceErrors := make(map[string]error) // Tracks resource validation errors, where we have a resource name.
ret := make(map[string]dataAndErrTuple) // Return result, a map from resource name to either resource data or error.
for _, r := range resp.resources {
result, err := rType.Decoder.Decode(r.GetValue(), *opts)
// Name field of the result is left unpopulated only when resource
// deserialization fails.
name := ""
if result != nil {
name = xdsresource.ParseName(result.Name).String()
}
if err == nil {
ret[name] = dataAndErrTuple{Resource: result.Resource}
continue
}
if name == "" {
topLevelErrors = append(topLevelErrors, err)
continue
}
perResourceErrors[name] = err
// Add place holder in the map so we know this resource name was in
// the response.
ret[name] = dataAndErrTuple{Err: xdsresource.NewError(xdsresource.ErrorTypeNACKed, err.Error())}
}
if len(topLevelErrors) == 0 && len(perResourceErrors) == 0 {
md.Status = xdsresource.ServiceStatusACKed
return ret, md, nil
}
md.Status = xdsresource.ServiceStatusNACKed
errRet := combineErrors(rType.TypeName, topLevelErrors, perResourceErrors)
md.ErrState = &xdsresource.UpdateErrorMetadata{
Version: resp.version,
Err: xdsresource.NewError(xdsresource.ErrorTypeNACKed, errRet.Error()),
Timestamp: timestamp,
}
return ret, md, errRet
}
func combineErrors(rType string, topLevelErrors []error, perResourceErrors map[string]error) error {
var errStrB strings.Builder
errStrB.WriteString(fmt.Sprintf("error parsing %q response: ", rType))
if len(topLevelErrors) > 0 {
errStrB.WriteString("top level errors: ")
for i, err := range topLevelErrors {
if i != 0 {
errStrB.WriteString(";\n")
}
errStrB.WriteString(err.Error())
}
}
if len(perResourceErrors) > 0 {
var i int
for name, err := range perResourceErrors {
if i != 0 {
errStrB.WriteString(";\n")
}
i++
errStrB.WriteString(fmt.Sprintf("resource %q: %v", name, err.Error()))
}
}
return errors.New(errStrB.String())
}

View File

@ -0,0 +1,783 @@
/*
*
* Copyright 2025 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xdsclient
import (
"context"
"fmt"
"net"
"strings"
"testing"
"time"
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/xds/internal/clients"
"google.golang.org/grpc/xds/internal/clients/grpctransport"
"google.golang.org/grpc/xds/internal/clients/internal/testutils"
"google.golang.org/grpc/xds/internal/clients/internal/testutils/e2e"
"google.golang.org/grpc/xds/internal/clients/internal/testutils/fakeserver"
"google.golang.org/grpc/xds/internal/clients/xdsclient/internal/xdsresource"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/anypb"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3routerpb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/router/v3"
v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
)
// xdsChannelForTest creates an xdsChannel to the specified serverURI for
// testing purposes.
func xdsChannelForTest(t *testing.T, serverURI, nodeID string, watchExpiryTimeout time.Duration) *xdsChannel {
t.Helper()
// Create a grpc transport to the above management server.
si := clients.ServerIdentifier{
ServerURI: serverURI,
Extensions: grpctransport.ServerIdentifierExtension{Credentials: insecure.NewBundle()},
}
tr, err := (&grpctransport.Builder{}).Build(si)
if err != nil {
t.Fatalf("Failed to create a transport for server config %v: %v", si, err)
}
serverCfg := ServerConfig{
ServerIdentifier: si,
}
clientConfig := Config{
Servers: []ServerConfig{serverCfg},
Node: clients.Node{ID: nodeID},
ResourceTypes: map[string]ResourceType{xdsresource.V3ListenerURL: listenerType},
}
// Create an xdsChannel that uses everything set up above.
xc, err := newXDSChannel(xdsChannelOpts{
transport: tr,
serverConfig: &serverCfg,
clientConfig: &clientConfig,
eventHandler: newTestEventHandler(),
watchExpiryTimeout: watchExpiryTimeout,
})
if err != nil {
t.Fatalf("Failed to create xdsChannel: %v", err)
}
t.Cleanup(func() { xc.close() })
return xc
}
// verifyUpdateAndMetadata verifies that the event handler received the expected
// updates and metadata. It checks that the received resource type matches the
// expected type, and that the received updates and metadata match the expected
// values. The function ignores the timestamp fields in the metadata, as those
// are expected to be different.
func verifyUpdateAndMetadata(ctx context.Context, t *testing.T, eh *testEventHandler, wantUpdates map[string]dataAndErrTuple, wantMD xdsresource.UpdateMetadata) {
t.Helper()
gotTyp, gotUpdates, gotMD, err := eh.waitForUpdate(ctx)
if err != nil {
t.Fatalf("Timeout when waiting for update callback to be invoked on the event handler")
}
if gotTyp != listenerType {
t.Fatalf("Got resource type %v, want %v", gotTyp, listenerType)
}
opts := cmp.Options{
protocmp.Transform(),
cmpopts.EquateEmpty(),
cmpopts.EquateErrors(),
cmpopts.IgnoreFields(xdsresource.UpdateMetadata{}, "Timestamp"),
cmpopts.IgnoreFields(xdsresource.UpdateErrorMetadata{}, "Timestamp"),
}
if diff := cmp.Diff(wantUpdates, gotUpdates, opts); diff != "" {
t.Fatalf("Got unexpected diff in update (-want +got):\n%s\n want: %+v\n got: %+v", diff, wantUpdates, gotUpdates)
}
if diff := cmp.Diff(wantMD, gotMD, opts); diff != "" {
t.Fatalf("Got unexpected diff in update (-want +got):\n%s\n want: %v\n got: %v", diff, wantMD, gotMD)
}
}
// Tests different failure cases when creating a new xdsChannel. It checks that
// the xdsChannel creation fails when any of the required options (transport,
// serverConfig, bootstrapConfig, or resourceTypeGetter) are missing or nil.
func (s) TestChannel_New_FailureCases(t *testing.T) {
type fakeTransport struct {
clients.Transport
}
tests := []struct {
name string
opts xdsChannelOpts
wantErrStr string
}{
{
name: "emptyTransport",
opts: xdsChannelOpts{},
wantErrStr: "transport is nil",
},
{
name: "emptyServerConfig",
opts: xdsChannelOpts{transport: &fakeTransport{}},
wantErrStr: "serverConfig is nil",
},
{
name: "emptyCConfig",
opts: xdsChannelOpts{
transport: &fakeTransport{},
serverConfig: &ServerConfig{},
},
wantErrStr: "clientConfig is nil",
},
{
name: "emptyEventHandler",
opts: xdsChannelOpts{
transport: &fakeTransport{},
serverConfig: &ServerConfig{},
clientConfig: &Config{},
},
wantErrStr: "eventHandler is nil",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if _, err := newXDSChannel(test.opts); err == nil || !strings.Contains(err.Error(), test.wantErrStr) {
t.Fatalf("newXDSChannel() = %v, want %q", err, test.wantErrStr)
}
})
}
}
// Tests different scenarios of the xdsChannel receiving a response from the
// management server. In all scenarios, the xdsChannel is expected to pass the
// received responses as-is to the resource parsing functionality specified by
// the resourceTypeGetter.
func (s) TestChannel_ADS_HandleResponseFromManagementServer(t *testing.T) {
const (
listenerName1 = "listener-name-1"
listenerName2 = "listener-name-2"
routeName = "route-name"
clusterName = "cluster-name"
)
var (
badlyMarshaledResource = &anypb.Any{
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
Value: []byte{1, 2, 3, 4},
}
apiListener = &v3listenerpb.ApiListener{
ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
RouteConfig: &v3routepb.RouteConfiguration{
Name: routeName},
},
}),
}
listener1 = testutils.MarshalAny(t, &v3listenerpb.Listener{
Name: listenerName1,
ApiListener: apiListener,
})
listener2 = testutils.MarshalAny(t, &v3listenerpb.Listener{
Name: listenerName2,
ApiListener: apiListener,
})
)
tests := []struct {
desc string
resourceNamesToRequest []string
managementServerResponse *v3discoverypb.DiscoveryResponse
wantUpdates map[string]dataAndErrTuple
wantMD xdsresource.UpdateMetadata
wantErr error
}{
{
desc: "one bad resource - deserialization failure",
resourceNamesToRequest: []string{listenerName1},
managementServerResponse: &v3discoverypb.DiscoveryResponse{
VersionInfo: "0",
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
Resources: []*anypb.Any{badlyMarshaledResource},
},
wantUpdates: nil, // No updates expected as the response runs into unmarshaling errors.
wantMD: xdsresource.UpdateMetadata{
Status: xdsresource.ServiceStatusNACKed,
Version: "0",
ErrState: &xdsresource.UpdateErrorMetadata{
Version: "0",
Err: cmpopts.AnyError,
},
},
wantErr: cmpopts.AnyError,
},
{
desc: "one bad resource - validation failure",
resourceNamesToRequest: []string{listenerName1},
managementServerResponse: &v3discoverypb.DiscoveryResponse{
VersionInfo: "0",
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
Resources: []*anypb.Any{testutils.MarshalAny(t, &v3listenerpb.Listener{
Name: listenerName1,
ApiListener: &v3listenerpb.ApiListener{
ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_ScopedRoutes{},
}),
},
})},
},
wantUpdates: map[string]dataAndErrTuple{
listenerName1: {
Err: cmpopts.AnyError,
},
},
wantMD: xdsresource.UpdateMetadata{
Status: xdsresource.ServiceStatusNACKed,
Version: "0",
ErrState: &xdsresource.UpdateErrorMetadata{
Version: "0",
Err: cmpopts.AnyError,
},
},
},
{
desc: "two bad resources",
resourceNamesToRequest: []string{listenerName1, listenerName2},
managementServerResponse: &v3discoverypb.DiscoveryResponse{
VersionInfo: "0",
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
Resources: []*anypb.Any{
badlyMarshaledResource,
testutils.MarshalAny(t, &v3listenerpb.Listener{
Name: listenerName2,
ApiListener: &v3listenerpb.ApiListener{
ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_ScopedRoutes{},
}),
},
}),
},
},
wantUpdates: map[string]dataAndErrTuple{
listenerName2: {
Err: cmpopts.AnyError,
},
},
wantMD: xdsresource.UpdateMetadata{
Status: xdsresource.ServiceStatusNACKed,
Version: "0",
ErrState: &xdsresource.UpdateErrorMetadata{
Version: "0",
Err: cmpopts.AnyError,
},
},
},
{
desc: "one good resource",
resourceNamesToRequest: []string{listenerName1},
managementServerResponse: &v3discoverypb.DiscoveryResponse{
VersionInfo: "0",
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
Resources: []*anypb.Any{listener1},
},
wantUpdates: map[string]dataAndErrTuple{
listenerName1: {
Resource: &listenerResourceData{Resource: listenerUpdate{
RouteConfigName: routeName,
Raw: listener1.GetValue(),
}},
},
},
wantMD: xdsresource.UpdateMetadata{
Status: xdsresource.ServiceStatusACKed,
Version: "0",
},
},
{
desc: "one good and one bad - deserialization failure",
resourceNamesToRequest: []string{listenerName1, listenerName2},
managementServerResponse: &v3discoverypb.DiscoveryResponse{
VersionInfo: "0",
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
Resources: []*anypb.Any{
badlyMarshaledResource,
listener2,
},
},
wantUpdates: map[string]dataAndErrTuple{
listenerName2: {
Resource: &listenerResourceData{Resource: listenerUpdate{
RouteConfigName: routeName,
Raw: listener2.GetValue(),
}},
},
},
wantMD: xdsresource.UpdateMetadata{
Status: xdsresource.ServiceStatusNACKed,
Version: "0",
ErrState: &xdsresource.UpdateErrorMetadata{
Version: "0",
Err: cmpopts.AnyError,
},
},
},
{
desc: "one good and one bad - validation failure",
resourceNamesToRequest: []string{listenerName1, listenerName2},
managementServerResponse: &v3discoverypb.DiscoveryResponse{
VersionInfo: "0",
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
Resources: []*anypb.Any{
testutils.MarshalAny(t, &v3listenerpb.Listener{
Name: listenerName1,
ApiListener: &v3listenerpb.ApiListener{
ApiListener: testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_ScopedRoutes{},
}),
},
}),
listener2,
},
},
wantUpdates: map[string]dataAndErrTuple{
listenerName1: {Err: cmpopts.AnyError},
listenerName2: {
Resource: &listenerResourceData{Resource: listenerUpdate{
RouteConfigName: routeName,
Raw: listener2.GetValue(),
}},
},
},
wantMD: xdsresource.UpdateMetadata{
Status: xdsresource.ServiceStatusNACKed,
Version: "0",
ErrState: &xdsresource.UpdateErrorMetadata{
Version: "0",
Err: cmpopts.AnyError,
},
},
},
{
desc: "two good resources",
resourceNamesToRequest: []string{listenerName1, listenerName2},
managementServerResponse: &v3discoverypb.DiscoveryResponse{
VersionInfo: "0",
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
Resources: []*anypb.Any{listener1, listener2},
},
wantUpdates: map[string]dataAndErrTuple{
listenerName1: {
Resource: &listenerResourceData{Resource: listenerUpdate{
RouteConfigName: routeName,
Raw: listener1.GetValue(),
}},
},
listenerName2: {
Resource: &listenerResourceData{Resource: listenerUpdate{
RouteConfigName: routeName,
Raw: listener2.GetValue(),
}},
},
},
wantMD: xdsresource.UpdateMetadata{
Status: xdsresource.ServiceStatusACKed,
Version: "0",
},
},
{
desc: "two resources when we requested one",
resourceNamesToRequest: []string{listenerName1},
managementServerResponse: &v3discoverypb.DiscoveryResponse{
VersionInfo: "0",
TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener",
Resources: []*anypb.Any{listener1, listener2},
},
wantUpdates: map[string]dataAndErrTuple{
listenerName1: {
Resource: &listenerResourceData{Resource: listenerUpdate{
RouteConfigName: routeName,
Raw: listener1.GetValue(),
}},
},
listenerName2: {
Resource: &listenerResourceData{Resource: listenerUpdate{
RouteConfigName: routeName,
Raw: listener2.GetValue(),
}},
},
},
wantMD: xdsresource.UpdateMetadata{
Status: xdsresource.ServiceStatusACKed,
Version: "0",
},
},
}
for _, test := range tests {
t.Run(test.desc, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Start a fake xDS management server and configure the response it
// would send to its client.
mgmtServer, cleanup, err := fakeserver.StartServer(nil)
if err != nil {
t.Fatalf("Failed to start fake xDS server: %v", err)
}
defer cleanup()
t.Logf("Started xDS management server on %s", mgmtServer.Address)
mgmtServer.XDSResponseChan <- &fakeserver.Response{Resp: test.managementServerResponse}
// Create an xdsChannel for the test with a long watch expiry timer
// to ensure that watches don't expire for the duration of the test.
nodeID := uuid.New().String()
xc := xdsChannelForTest(t, mgmtServer.Address, nodeID, 2*defaultTestTimeout)
defer xc.close()
// Subscribe to the resources specified in the test table.
for _, name := range test.resourceNamesToRequest {
xc.subscribe(listenerType, name)
}
// Wait for an update callback on the event handler and verify the
// contents of the update and the metadata.
verifyUpdateAndMetadata(ctx, t, xc.eventHandler.(*testEventHandler), test.wantUpdates, test.wantMD)
})
}
}
// Tests that the xdsChannel correctly handles the expiry of a watch for a
// resource by ensuring that the watch expiry callback is invoked on the event
// handler with the expected resource type and name.
func (s) TestChannel_ADS_HandleResponseWatchExpiry(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Start an xDS management server, but do not configure any resources on it.
// This will result in the watch for a resource to timeout.
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})
// Create an xdsChannel for the test with a short watch expiry timer to
// ensure that the test does not run very long, as it needs to wait for the
// watch to expire.
nodeID := uuid.New().String()
xc := xdsChannelForTest(t, mgmtServer.Address, nodeID, 2*defaultTestShortTimeout)
defer xc.close()
// Subscribe to a listener resource.
const listenerName = "listener-name"
xc.subscribe(listenerType, listenerName)
// Wait for the watch expiry callback on the authority to be invoked and
// verify that the watch expired for the expected resource name and type.
eventHandler := xc.eventHandler.(*testEventHandler)
gotTyp, gotName, err := eventHandler.waitForResourceDoesNotExist(ctx)
if err != nil {
t.Fatal("Timeout when waiting for the watch expiry callback to be invoked on the xDS client")
}
if gotTyp != listenerType {
t.Fatalf("Got type %v, want %v", gotTyp, listenerType)
}
if gotName != listenerName {
t.Fatalf("Got name %v, want %v", gotName, listenerName)
}
}
// Tests that the xdsChannel correctly handles stream failures by ensuring that
// the stream failure callback is invoked on the event handler.
func (s) TestChannel_ADS_StreamFailure(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 20000*defaultTestTimeout)
defer cancel()
// Start an xDS management server with a restartable listener to simulate
// connection failures.
l, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("net.Listen() failed: %v", err)
}
lis := testutils.NewRestartableListener(l)
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{Listener: lis})
// Configure a listener resource on the management server.
const listenerResourceName = "test-listener-resource"
const routeConfigurationName = "test-route-configuration-resource"
nodeID := uuid.New().String()
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerResourceName, routeConfigurationName)},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
// Create an xdsChannel for the test with a long watch expiry timer
// to ensure that watches don't expire for the duration of the test.
xc := xdsChannelForTest(t, mgmtServer.Address, nodeID, 2000*defaultTestTimeout)
defer xc.close()
// Subscribe to the resource created above.
xc.subscribe(listenerType, listenerResourceName)
// Wait for an update callback on the event handler and verify the
// contents of the update and the metadata.
hcm := testutils.MarshalAny(t, &v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{Rds: &v3httppb.Rds{
ConfigSource: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}},
},
RouteConfigName: routeConfigurationName,
}},
HttpFilters: []*v3httppb.HttpFilter{e2e.HTTPFilter("router", &v3routerpb.Router{})},
})
listenerResource, err := anypb.New(&v3listenerpb.Listener{
Name: listenerResourceName,
ApiListener: &v3listenerpb.ApiListener{ApiListener: hcm},
FilterChains: []*v3listenerpb.FilterChain{{
Name: "filter-chain-name",
Filters: []*v3listenerpb.Filter{{
Name: wellknown.HTTPConnectionManager,
ConfigType: &v3listenerpb.Filter_TypedConfig{TypedConfig: hcm},
}},
}},
})
if err != nil {
t.Fatalf("Failed to create listener resource: %v", err)
}
wantUpdates := map[string]dataAndErrTuple{
listenerResourceName: {
Resource: &listenerResourceData{
Resource: listenerUpdate{
RouteConfigName: routeConfigurationName,
Raw: listenerResource.GetValue(),
},
},
},
}
wantMD := xdsresource.UpdateMetadata{
Status: xdsresource.ServiceStatusACKed,
Version: "1",
}
eventHandler := xc.eventHandler.(*testEventHandler)
verifyUpdateAndMetadata(ctx, t, eventHandler, wantUpdates, wantMD)
lis.Stop()
if err := eventHandler.waitForStreamFailure(ctx); err != nil {
t.Fatalf("Timeout when waiting for the stream failure callback to be invoked on the xDS client: %v", err)
}
}
// Tests the behavior of the xdsChannel when a resource is unsubscribed.
// Verifies that when a previously subscribed resource is unsubscribed, a
// request is sent without the previously subscribed resource name.
func (s) TestChannel_ADS_ResourceUnsubscribe(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Start an xDS management server that uses a channel to inform the test
// about the specific LDS resource names being requested.
ldsResourcesCh := make(chan []string, 1)
mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
t.Logf("Received request for resources: %v of type %s", req.GetResourceNames(), req.GetTypeUrl())
if req.TypeUrl != xdsresource.V3ListenerURL {
return fmt.Errorf("unexpected resource type URL: %q", req.TypeUrl)
}
// Make the most recently requested names available to the test.
ldsResourcesCh <- req.GetResourceNames()
return nil
},
})
// Configure two listener resources on the management server.
const listenerResourceName1 = "test-listener-resource-1"
const routeConfigurationName1 = "test-route-configuration-resource-1"
const listenerResourceName2 = "test-listener-resource-2"
const routeConfigurationName2 = "test-route-configuration-resource-2"
nodeID := uuid.New().String()
resources := e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{
e2e.DefaultClientListener(listenerResourceName1, routeConfigurationName1),
e2e.DefaultClientListener(listenerResourceName2, routeConfigurationName2),
},
SkipValidation: true,
}
if err := mgmtServer.Update(ctx, resources); err != nil {
t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
}
// Create an xdsChannel for the test with a long watch expiry timer
// to ensure that watches don't expire for the duration of the test.
xc := xdsChannelForTest(t, mgmtServer.Address, nodeID, 2*defaultTestTimeout)
defer xc.close()
// Subscribe to the resources created above and verify that a request is
// sent for the same.
xc.subscribe(listenerType, listenerResourceName1)
xc.subscribe(listenerType, listenerResourceName2)
if err := waitForResourceNames(ctx, ldsResourcesCh, []string{listenerResourceName1, listenerResourceName2}); err != nil {
t.Fatal(err)
}
// Wait for the above resources to be ACKed.
if err := waitForResourceNames(ctx, ldsResourcesCh, []string{listenerResourceName1, listenerResourceName2}); err != nil {
t.Fatal(err)
}
// Unsubscribe to one of the resources created above, and ensure that the
// other resource is still being requested.
xc.unsubscribe(listenerType, listenerResourceName1)
if err := waitForResourceNames(ctx, ldsResourcesCh, []string{listenerResourceName2}); err != nil {
t.Fatal(err)
}
// Since the version on the management server for the above resource is not
// changed, we will not receive an update from it for the one resource that
// we are still requesting.
// Unsubscribe to the remaining resource, and ensure that no more resources
// are being requested.
xc.unsubscribe(listenerType, listenerResourceName2)
if err := waitForResourceNames(ctx, ldsResourcesCh, []string{}); err != nil {
t.Fatal(err)
}
}
// waitForResourceNames waits for the wantNames to be received on namesCh.
// Returns a non-nil error if the context expires before that.
func waitForResourceNames(ctx context.Context, namesCh chan []string, wantNames []string) error {
var lastRequestedNames []string
for ; ; <-time.After(defaultTestShortTimeout) {
select {
case <-ctx.Done():
return fmt.Errorf("timeout waiting for resources %v to be requested from the management server. Last requested resources: %v", wantNames, lastRequestedNames)
case gotNames := <-namesCh:
if cmp.Equal(gotNames, wantNames, cmpopts.EquateEmpty(), cmpopts.SortSlices(func(s1, s2 string) bool { return s1 < s2 })) {
return nil
}
lastRequestedNames = gotNames
}
}
}
// newTestEventHandler creates a new testEventHandler instance with the
// necessary channels for testing the xdsChannel.
func newTestEventHandler() *testEventHandler {
return &testEventHandler{
typeCh: make(chan ResourceType, 1),
updateCh: make(chan map[string]dataAndErrTuple, 1),
mdCh: make(chan xdsresource.UpdateMetadata, 1),
nameCh: make(chan string, 1),
connErrCh: make(chan error, 1),
}
}
// testEventHandler is a struct that implements the xdsChannelEventhandler
// interface. It is used to receive events from an xdsChannel, and has multiple
// channels on which it makes these events available to the test.
type testEventHandler struct {
typeCh chan ResourceType // Resource type of an update or resource-does-not-exist error.
updateCh chan map[string]dataAndErrTuple // Resource updates.
mdCh chan xdsresource.UpdateMetadata // Metadata from an update.
nameCh chan string // Name of the non-existent resource.
connErrCh chan error // Connectivity error.
}
func (ta *testEventHandler) adsStreamFailure(err error) {
ta.connErrCh <- err
}
func (ta *testEventHandler) waitForStreamFailure(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-ta.connErrCh:
}
return nil
}
func (ta *testEventHandler) adsResourceUpdate(typ ResourceType, updates map[string]dataAndErrTuple, md xdsresource.UpdateMetadata, onDone func()) {
ta.typeCh <- typ
ta.updateCh <- updates
ta.mdCh <- md
onDone()
}
// waitForUpdate waits for the next resource update event from the xdsChannel.
// It returns the resource type, the resource updates, and the update metadata.
// If the context is canceled, it returns an error.
func (ta *testEventHandler) waitForUpdate(ctx context.Context) (ResourceType, map[string]dataAndErrTuple, xdsresource.UpdateMetadata, error) {
var typ ResourceType
var updates map[string]dataAndErrTuple
var md xdsresource.UpdateMetadata
select {
case typ = <-ta.typeCh:
case <-ctx.Done():
return ResourceType{}, nil, xdsresource.UpdateMetadata{}, ctx.Err()
}
select {
case updates = <-ta.updateCh:
case <-ctx.Done():
return ResourceType{}, nil, xdsresource.UpdateMetadata{}, ctx.Err()
}
select {
case md = <-ta.mdCh:
case <-ctx.Done():
return ResourceType{}, nil, xdsresource.UpdateMetadata{}, ctx.Err()
}
return typ, updates, md, nil
}
func (ta *testEventHandler) adsResourceDoesNotExist(typ ResourceType, name string) {
ta.typeCh <- typ
ta.nameCh <- name
}
// waitForResourceDoesNotExist waits for the next resource-does-not-exist event
// from the xdsChannel. It returns the resource type and the resource name. If
// the context is canceled, it returns an error.
func (ta *testEventHandler) waitForResourceDoesNotExist(ctx context.Context) (ResourceType, string, error) {
var typ ResourceType
var name string
select {
case typ = <-ta.typeCh:
case <-ctx.Done():
return ResourceType{}, "", ctx.Err()
}
select {
case name = <-ta.nameCh:
case <-ctx.Done():
return ResourceType{}, "", ctx.Err()
}
return typ, name, nil
}

View File

@ -0,0 +1,235 @@
/*
*
* Copyright 2025 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xdsclient
import (
"bytes"
"errors"
"fmt"
"strconv"
"testing"
"time"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/xds/internal/clients/internal/pretty"
"google.golang.org/grpc/xds/internal/clients/xdsclient/internal/xdsresource"
"google.golang.org/protobuf/proto"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
)
type s struct {
grpctest.Tester
}
func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
const (
defaultTestWatchExpiryTimeout = 100 * time.Millisecond
defaultTestTimeout = 5 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond // For events expected to *not* happen.
// listenerResourceTypeName represents the transport agnostic name for the
// listener resource.
listenerResourceTypeName = "ListenerResource"
)
var (
// Singleton instantiation of the resource type implementation.
listenerType = ResourceType{
TypeURL: xdsresource.V3ListenerURL,
TypeName: listenerResourceTypeName,
AllResourcesRequiredInSotW: true,
Decoder: listenerDecoder{},
}
)
func unmarshalListenerResource(r []byte) (string, listenerUpdate, error) {
lis := &v3listenerpb.Listener{}
if err := proto.Unmarshal(r, lis); err != nil {
return "", listenerUpdate{}, fmt.Errorf("failed to unmarshal resource: %v", err)
}
lu, err := processListener(lis)
if err != nil {
return lis.GetName(), listenerUpdate{}, err
}
lu.Raw = r
return lis.GetName(), *lu, nil
}
func processListener(lis *v3listenerpb.Listener) (*listenerUpdate, error) {
if lis.GetApiListener() != nil {
return processClientSideListener(lis)
}
return processServerSideListener(lis)
}
// processClientSideListener checks if the provided Listener proto meets
// the expected criteria. If so, it returns a non-empty routeConfigName.
func processClientSideListener(lis *v3listenerpb.Listener) (*listenerUpdate, error) {
update := &listenerUpdate{}
apiLisAny := lis.GetApiListener().GetApiListener()
if !xdsresource.IsHTTPConnManagerResource(apiLisAny.GetTypeUrl()) {
return nil, fmt.Errorf("unexpected http connection manager resource type: %q", apiLisAny.GetTypeUrl())
}
apiLis := &v3httppb.HttpConnectionManager{}
if err := proto.Unmarshal(apiLisAny.GetValue(), apiLis); err != nil {
return nil, fmt.Errorf("failed to unmarshal api_listener: %v", err)
}
// "HttpConnectionManager.xff_num_trusted_hops must be unset or zero and
// HttpConnectionManager.original_ip_detection_extensions must be empty. If
// either field has an incorrect value, the Listener must be NACKed." - A41
if apiLis.XffNumTrustedHops != 0 {
return nil, fmt.Errorf("xff_num_trusted_hops must be unset or zero %+v", apiLis)
}
if len(apiLis.OriginalIpDetectionExtensions) != 0 {
return nil, fmt.Errorf("original_ip_detection_extensions must be empty %+v", apiLis)
}
switch apiLis.RouteSpecifier.(type) {
case *v3httppb.HttpConnectionManager_Rds:
if configsource := apiLis.GetRds().GetConfigSource(); configsource.GetAds() == nil && configsource.GetSelf() == nil {
return nil, fmt.Errorf("LDS's RDS configSource is not ADS or Self: %+v", lis)
}
name := apiLis.GetRds().GetRouteConfigName()
if name == "" {
return nil, fmt.Errorf("empty route_config_name: %+v", lis)
}
update.RouteConfigName = name
case *v3httppb.HttpConnectionManager_RouteConfig:
routeU := apiLis.GetRouteConfig()
if routeU == nil {
return nil, fmt.Errorf("empty inline RDS resp:: %+v", lis)
}
if routeU.Name == "" {
return nil, fmt.Errorf("empty route_config_name in inline RDS resp: %+v", lis)
}
update.RouteConfigName = routeU.Name
case nil:
return nil, fmt.Errorf("no RouteSpecifier: %+v", apiLis)
default:
return nil, fmt.Errorf("unsupported type %T for RouteSpecifier", apiLis.RouteSpecifier)
}
return update, nil
}
func processServerSideListener(lis *v3listenerpb.Listener) (*listenerUpdate, error) {
if n := len(lis.ListenerFilters); n != 0 {
return nil, fmt.Errorf("unsupported field 'listener_filters' contains %d entries", n)
}
if useOrigDst := lis.GetUseOriginalDst(); useOrigDst != nil && useOrigDst.GetValue() {
return nil, errors.New("unsupported field 'use_original_dst' is present and set to true")
}
addr := lis.GetAddress()
if addr == nil {
return nil, fmt.Errorf("no address field in LDS response: %+v", lis)
}
sockAddr := addr.GetSocketAddress()
if sockAddr == nil {
return nil, fmt.Errorf("no socket_address field in LDS response: %+v", lis)
}
lu := &listenerUpdate{
InboundListenerCfg: &inboundListenerConfig{
Address: sockAddr.GetAddress(),
Port: strconv.Itoa(int(sockAddr.GetPortValue())),
},
}
return lu, nil
}
type listenerDecoder struct{}
// Decode deserializes and validates an xDS resource serialized inside the
// provided `Any` proto, as received from the xDS management server.
func (listenerDecoder) Decode(resource []byte, _ DecodeOptions) (*DecodeResult, error) {
name, listener, err := unmarshalListenerResource(resource)
switch {
case name == "":
// Name is unset only when protobuf deserialization fails.
return nil, err
case err != nil:
// Protobuf deserialization succeeded, but resource validation failed.
return &DecodeResult{Name: name, Resource: &listenerResourceData{Resource: listenerUpdate{}}}, err
}
return &DecodeResult{Name: name, Resource: &listenerResourceData{Resource: listener}}, nil
}
// listenerResourceData wraps the configuration of a Listener resource as
// received from the management server.
//
// Implements the ResourceData interface.
type listenerResourceData struct {
ResourceData
Resource listenerUpdate
}
// Equal returns true if other is equal to l.
func (l *listenerResourceData) Equal(other ResourceData) bool {
if l == nil && other == nil {
return true
}
if (l == nil) != (other == nil) {
return false
}
return bytes.Equal(l.Resource.Raw, other.Bytes())
}
// ToJSON returns a JSON string representation of the resource data.
func (l *listenerResourceData) ToJSON() string {
return pretty.ToJSON(l.Resource)
}
// Bytes returns the underlying raw protobuf form of the listener resource.
func (l *listenerResourceData) Bytes() []byte {
return l.Resource.Raw
}
// ListenerUpdate contains information received in an LDS response, which is of
// interest to the registered LDS watcher.
type listenerUpdate struct {
// RouteConfigName is the route configuration name corresponding to the
// target which is being watched through LDS.
RouteConfigName string
// InboundListenerCfg contains inbound listener configuration.
InboundListenerCfg *inboundListenerConfig
// Raw is the resource from the xds response.
Raw []byte
}
// InboundListenerConfig contains information about the inbound listener, i.e
// the server-side listener.
type inboundListenerConfig struct {
// Address is the local address on which the inbound listener is expected to
// accept incoming connections.
Address string
// Port is the local port on which the inbound listener is expected to
// accept incoming connections.
Port string
}

View File

@ -0,0 +1,78 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package xdsresource
import (
"errors"
"fmt"
)
// ErrorType is the type of the error that the watcher will receive from the xds
// client.
type ErrorType int
const (
// ErrorTypeUnknown indicates the error doesn't have a specific type. It is
// the default value, and is returned if the error is not an xds error.
ErrorTypeUnknown ErrorType = iota
// ErrorTypeConnection indicates a connection error from the gRPC client.
ErrorTypeConnection
// ErrorTypeResourceNotFound indicates a resource is not found from the xds
// response. It's typically returned if the resource is removed in the xds
// server.
ErrorTypeResourceNotFound
// ErrorTypeResourceTypeUnsupported indicates the receipt of a message from
// the management server with resources of an unsupported resource type.
ErrorTypeResourceTypeUnsupported
// ErrTypeStreamFailedAfterRecv indicates an ADS stream error, after
// successful receipt of at least one message from the server.
ErrTypeStreamFailedAfterRecv
// ErrorTypeNACKed indicates that configuration provided by the xDS management
// server was NACKed.
ErrorTypeNACKed
)
type xdsClientError struct {
t ErrorType
desc string
}
func (e *xdsClientError) Error() string {
return e.desc
}
// NewErrorf creates an xDS client error. The callbacks are called with this
// error, to pass additional information about the error.
func NewErrorf(t ErrorType, format string, args ...any) error {
return &xdsClientError{t: t, desc: fmt.Sprintf(format, args...)}
}
// NewError creates an xDS client error. The callbacks are called with this
// error, to pass additional information about the error.
func NewError(t ErrorType, message string) error {
return NewErrorf(t, "%s", message)
}
// ErrType returns the error's type.
func ErrType(e error) ErrorType {
var xe *xdsClientError
if ok := errors.As(e, &xe); ok {
return xe.t
}
return ErrorTypeUnknown
}

View File

@ -0,0 +1,127 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package xdsresource
import (
"net/url"
"sort"
"strings"
)
// FederationScheme is the scheme of a federation resource name.
const FederationScheme = "xdstp"
// Name contains the parsed component of an xDS resource name.
//
// An xDS resource name is in the format of
// xdstp://[{authority}]/{resource type}/{id/*}?{context parameters}{#processing directive,*}
//
// See
// https://github.com/cncf/xds/blob/main/proposals/TP1-xds-transport-next.md#uri-based-xds-resource-names
// for details, and examples.
type Name struct {
Scheme string
Authority string
Type string
ID string
ContextParams map[string]string
processingDirective string
}
// ParseName splits the name and returns a struct representation of the Name.
//
// If the name isn't a valid new-style xDS name, field ID is set to the input.
// Note that this is not an error, because we still support the old-style
// resource names (those not starting with "xdstp:").
//
// The caller can tell if the parsing is successful by checking the returned
// Scheme.
func ParseName(name string) *Name {
if !strings.Contains(name, "://") {
// Only the long form URL, with ://, is valid.
return &Name{ID: name}
}
parsed, err := url.Parse(name)
if err != nil {
return &Name{ID: name}
}
ret := &Name{
Scheme: parsed.Scheme,
Authority: parsed.Host,
}
split := strings.SplitN(parsed.Path, "/", 3)
if len(split) < 3 {
// Path is in the format of "/type/id". There must be at least 3
// segments after splitting.
return &Name{ID: name}
}
ret.Type = split[1]
ret.ID = split[2]
if len(parsed.Query()) != 0 {
ret.ContextParams = make(map[string]string)
for k, vs := range parsed.Query() {
if len(vs) > 0 {
// We only keep one value of each key. Behavior for multiple values
// is undefined.
ret.ContextParams[k] = vs[0]
}
}
}
// TODO: processing directive (the part comes after "#" in the URL, stored
// in parsed.RawFragment) is kept but not processed. Add support for that
// when it's needed.
ret.processingDirective = parsed.RawFragment
return ret
}
// String returns a canonicalized string of name. The context parameters are
// sorted by the keys.
func (n *Name) String() string {
if n.Scheme == "" {
return n.ID
}
// Sort and build query.
keys := make([]string, 0, len(n.ContextParams))
for k := range n.ContextParams {
keys = append(keys, k)
}
sort.Strings(keys)
var pairs []string
for _, k := range keys {
pairs = append(pairs, strings.Join([]string{k, n.ContextParams[k]}, "="))
}
rawQuery := strings.Join(pairs, "&")
path := n.Type
if n.ID != "" {
path = "/" + path + "/" + n.ID
}
tempURL := &url.URL{
Scheme: n.Scheme,
Host: n.Authority,
Path: path,
RawQuery: rawQuery,
RawFragment: n.processingDirective,
}
return tempURL.String()
}

View File

@ -0,0 +1,93 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package xdsresource
import (
"time"
"google.golang.org/protobuf/types/known/anypb"
)
// UpdateMetadata contains the metadata for each update, including timestamp,
// raw message, and so on.
type UpdateMetadata struct {
// Status is the status of this resource, e.g. ACKed, NACKed, or
// Not_exist(removed).
Status ServiceStatus
// Version is the version of the xds response. Note that this is the version
// of the resource in use (previous ACKed). If a response is NACKed, the
// NACKed version is in ErrState.
Version string
// Timestamp is when the response is received.
Timestamp time.Time
// ErrState is set when the update is NACKed.
ErrState *UpdateErrorMetadata
}
// IsListenerResource returns true if the provider URL corresponds to an xDS
// Listener resource.
func IsListenerResource(url string) bool {
return url == V3ListenerURL
}
// IsHTTPConnManagerResource returns true if the provider URL corresponds to an xDS
// HTTPConnManager resource.
func IsHTTPConnManagerResource(url string) bool {
return url == V3HTTPConnManagerURL
}
// ServiceStatus is the status of the update.
type ServiceStatus int
const (
// ServiceStatusUnknown is the default state, before a watch is started for
// the resource.
ServiceStatusUnknown ServiceStatus = iota
// ServiceStatusRequested is when the watch is started, but before and
// response is received.
ServiceStatusRequested
// ServiceStatusNotExist is when the resource doesn't exist in
// state-of-the-world responses (e.g. LDS and CDS), which means the resource
// is removed by the management server.
ServiceStatusNotExist // Resource is removed in the server, in LDS/CDS.
// ServiceStatusACKed is when the resource is ACKed.
ServiceStatusACKed
// ServiceStatusNACKed is when the resource is NACKed.
ServiceStatusNACKed
)
// UpdateErrorMetadata is part of UpdateMetadata. It contains the error state
// when a response is NACKed.
type UpdateErrorMetadata struct {
// Version is the version of the NACKed response.
Version string
// Err contains why the response was NACKed.
Err error
// Timestamp is when the NACKed response was received.
Timestamp time.Time
}
// UpdateWithMD contains the raw message of the update and the metadata,
// including version, raw message, timestamp.
//
// This is to be used for config dump and CSDS, not directly by users (like
// resolvers/balancers).
type UpdateWithMD struct {
MD UpdateMetadata
Raw *anypb.Any
}

View File

@ -0,0 +1,30 @@
/*
*
* Copyright 2025 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package xdsresource defines constants to distinguish between supported xDS
// API versions.
package xdsresource
// Resource URLs. We need to be able to accept either version of the resource
// regardless of the version of the transport protocol in use.
const (
googleapiPrefix = "type.googleapis.com/"
V3ListenerURL = googleapiPrefix + "envoy.config.listener.v3.Listener"
V3HTTPConnManagerURL = googleapiPrefix + "envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager"
)