mirror of https://github.com/linkerd/linkerd2.git
134 lines
3.4 KiB
Go
134 lines
3.4 KiB
Go
package destination
|
|
|
|
import (
|
|
"errors"
|
|
"net/http"
|
|
"testing"
|
|
"time"
|
|
|
|
pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
|
|
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
|
|
consts "github.com/linkerd/linkerd2/pkg/k8s"
|
|
logging "github.com/sirupsen/logrus"
|
|
corev1 "k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
)
|
|
|
|
func TestEndpointProfileTranslator(t *testing.T) {
|
|
// logging.SetLevel(logging.TraceLevel)
|
|
// defer logging.SetLevel(logging.PanicLevel)
|
|
|
|
addr := &watcher.Address{
|
|
IP: "10.10.11.11",
|
|
Port: 8080,
|
|
}
|
|
podAddr := &watcher.Address{
|
|
IP: "10.10.11.11",
|
|
Port: 8080,
|
|
Pod: &corev1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Annotations: map[string]string{
|
|
consts.ProxyOpaquePortsAnnotation: "8080",
|
|
},
|
|
},
|
|
},
|
|
}
|
|
|
|
t.Run("Sends update", func(t *testing.T) {
|
|
mockGetProfileServer := &mockDestinationGetProfileServer{
|
|
profilesReceived: make(chan *pb.DestinationProfile), // UNBUFFERED
|
|
}
|
|
log := logging.WithField("test", t.Name())
|
|
translator := newEndpointProfileTranslator(
|
|
true, true, "cluster", "identity", make(map[uint32]struct{}), nil,
|
|
mockGetProfileServer,
|
|
nil,
|
|
log,
|
|
)
|
|
translator.Start()
|
|
defer translator.Stop()
|
|
|
|
if err := translator.Update(addr); err != nil {
|
|
t.Fatal("Expected update")
|
|
}
|
|
select {
|
|
case p := <-mockGetProfileServer.profilesReceived:
|
|
log.Debugf("Received update: %v", p)
|
|
case <-time.After(1 * time.Second):
|
|
t.Fatal("No update received")
|
|
}
|
|
|
|
if err := translator.Update(addr); err != nil {
|
|
t.Fatal("Unexpected update")
|
|
}
|
|
select {
|
|
case p := <-mockGetProfileServer.profilesReceived:
|
|
t.Fatalf("Duplicate update sent: %v", p)
|
|
case <-time.After(1 * time.Second):
|
|
}
|
|
|
|
if err := translator.Update(podAddr); err != nil {
|
|
t.Fatal("Expected update")
|
|
}
|
|
select {
|
|
case p := <-mockGetProfileServer.profilesReceived:
|
|
log.Debugf("Received update: %v", p)
|
|
case <-time.After(1 * time.Second):
|
|
}
|
|
})
|
|
|
|
t.Run("Handles overflow", func(t *testing.T) {
|
|
mockGetProfileServer := &mockDestinationGetProfileServer{
|
|
profilesReceived: make(chan *pb.DestinationProfile, 1),
|
|
}
|
|
log := logging.WithField("test", t.Name())
|
|
endStream := make(chan struct{})
|
|
translator := newEndpointProfileTranslator(
|
|
true, true, "cluster", "identity", make(map[uint32]struct{}), nil,
|
|
mockGetProfileServer,
|
|
endStream,
|
|
log,
|
|
)
|
|
|
|
// We avoid starting the translator so that it doesn't drain its update
|
|
// queue and we can test the overflow behavior.
|
|
|
|
for i := 0; i < updateQueueCapacity/2; i++ {
|
|
if err := translator.Update(podAddr); err != nil {
|
|
t.Fatal("Expected update")
|
|
}
|
|
select {
|
|
case <-endStream:
|
|
t.Fatal("Stream ended prematurely")
|
|
default:
|
|
}
|
|
|
|
if err := translator.Update(addr); err != nil {
|
|
t.Fatal("Expected update")
|
|
}
|
|
select {
|
|
case <-endStream:
|
|
t.Fatal("Stream ended prematurely")
|
|
default:
|
|
}
|
|
}
|
|
|
|
// The queue should be full and the next update should fail.
|
|
t.Logf("Queue length=%d capacity=%d", translator.queueLen(), updateQueueCapacity)
|
|
if err := translator.Update(podAddr); err == nil {
|
|
if !errors.Is(err, http.ErrServerClosed) {
|
|
t.Fatalf("Expected update to fail; queue=%d; capacity=%d", translator.queueLen(), updateQueueCapacity)
|
|
}
|
|
}
|
|
|
|
select {
|
|
case <-endStream:
|
|
default:
|
|
t.Fatal("Stream should have ended")
|
|
}
|
|
|
|
// XXX We should assert that endpointProfileUpdatesQueueOverflowCounter
|
|
// == 1 but we can't read counter values.
|
|
})
|
|
}
|