/* Copyright 2021 The Dapr Authors Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package consul import ( "context" "fmt" "net" "strconv" "sync/atomic" "testing" "time" consul "github.com/hashicorp/consul/api" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" nr "github.com/dapr/components-contrib/nameresolution" "github.com/dapr/kit/logger" ) type mockClient struct { mockAgent mockHealth initClientCalled int initClientErr error } func (m *mockClient) InitClient(config *consul.Config) error { m.initClientCalled++ return m.initClientErr } func (m *mockClient) Health() healthInterface { return &m.mockHealth } func (m *mockClient) Agent() agentInterface { return &m.mockAgent } type mockHealth struct { serviceCalled int serviceErr *error serviceBehavior func(service, tag string, passingOnly bool, q *consul.QueryOptions) serviceResult []*consul.ServiceEntry serviceMeta *consul.QueryMeta stateCallStarted atomic.Int32 stateCalled int stateError *error stateBehaviour func(state string, q *consul.QueryOptions) stateResult consul.HealthChecks stateMeta *consul.QueryMeta } func (m *mockHealth) State(state string, q *consul.QueryOptions) (consul.HealthChecks, *consul.QueryMeta, error) { m.stateCallStarted.Add(1) if m.stateBehaviour != nil { m.stateBehaviour(state, q) } m.stateCalled++ if m.stateError == nil { return m.stateResult, m.stateMeta, nil } return m.stateResult, m.stateMeta, *m.stateError } func (m *mockHealth) Service(service, tag string, passingOnly bool, q *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error) { if m.serviceBehavior != nil { m.serviceBehavior(service, tag, passingOnly, q) } m.serviceCalled++ if m.serviceErr == nil { return m.serviceResult, m.serviceMeta, nil } return m.serviceResult, m.serviceMeta, *m.serviceErr } type mockAgent struct { selfCalled int selfErr error selfResult map[string]map[string]interface{} serviceRegisterCalled int serviceRegisterErr error serviceDeregisterCalled int serviceDeregisterErr error } func (m *mockAgent) Self() (map[string]map[string]interface{}, error) { m.selfCalled++ return m.selfResult, m.selfErr } func (m *mockAgent) ServiceRegister(service *consul.AgentServiceRegistration) error { m.serviceRegisterCalled++ return m.serviceRegisterErr } func (m *mockAgent) ServiceDeregister(serviceID string) error { m.serviceDeregisterCalled++ return m.serviceDeregisterErr } type mockRegistry struct { getKeysCalled atomic.Int32 getKeysResult *[]string getKeysBehaviour func() addOrUpdateCalled atomic.Int32 addOrUpdateBehaviour func(service string, services []*consul.ServiceEntry) expireCalled int expireAllCalled int removeCalled int removeAllCalled atomic.Int32 getCalled int getResult *registryEntry registerChannelResult chan string } func (m *mockRegistry) registrationChannel() chan string { return m.registerChannelResult } func (m *mockRegistry) getKeys() []string { if m.getKeysBehaviour != nil { m.getKeysBehaviour() } m.getKeysCalled.Add(1) return *m.getKeysResult } func (m *mockRegistry) expireAll() { m.expireAllCalled++ } func (m *mockRegistry) removeAll() { m.removeAllCalled.Add(1) } func (m *mockRegistry) addOrUpdate(service string, services []*consul.ServiceEntry) { if m.addOrUpdateBehaviour != nil { m.addOrUpdateBehaviour(service, services) } m.addOrUpdateCalled.Add(1) } func (m *mockRegistry) expire(service string) { m.expireCalled++ } func (m *mockRegistry) remove(service string) { m.removeCalled++ } func (m *mockRegistry) get(service string) *registryEntry { m.getCalled++ return m.getResult } func TestInit(t *testing.T) { tests := []struct { testName string metadata nr.Metadata test func(*testing.T, nr.Metadata) }{ { "given no configuration don't register service just check agent", nr.Metadata{Instance: getInstanceInfoWithoutKey(""), Configuration: nil}, func(t *testing.T, metadata nr.Metadata) { var mock mockClient resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock, ®istry{}, make(chan struct{})) _ = resolver.Init(context.Background(), metadata) assert.Equal(t, 1, mock.initClientCalled) assert.Equal(t, 0, mock.mockAgent.serviceRegisterCalled) assert.Equal(t, 1, mock.mockAgent.selfCalled) }, }, { "given SelfRegister true then register service", nr.Metadata{ Instance: getInstanceInfoWithoutKey(""), Configuration: configSpec{ SelfRegister: true, }, }, func(t *testing.T, metadata nr.Metadata) { var mock mockClient resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock, ®istry{}, make(chan struct{})) _ = resolver.Init(context.Background(), metadata) assert.Equal(t, 1, mock.initClientCalled) assert.Equal(t, 1, mock.mockAgent.serviceRegisterCalled) assert.Equal(t, 0, mock.mockAgent.selfCalled) }, }, { "given AdvancedRegistraion then register service", nr.Metadata{ Instance: getInstanceInfoWithoutKey(""), Configuration: configSpec{ AdvancedRegistration: &consul.AgentServiceRegistration{}, QueryOptions: &consul.QueryOptions{}, }, }, func(t *testing.T, metadata nr.Metadata) { var mock mockClient resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock, ®istry{}, make(chan struct{})) _ = resolver.Init(context.Background(), metadata) assert.Equal(t, 1, mock.initClientCalled) assert.Equal(t, 1, mock.mockAgent.serviceRegisterCalled) assert.Equal(t, 0, mock.mockAgent.selfCalled) }, }, } for _, tt := range tests { tt := tt t.Run(tt.testName, func(t *testing.T) { tt.test(t, tt.metadata) }) } } func TestResolveID(t *testing.T) { t.Parallel() testConfig := resolverConfig{ DaprPortMetaKey: "DAPR_PORT", QueryOptions: &consul.QueryOptions{}, } tests := []struct { testName string req nr.ResolveRequest test func(*testing.T, nr.ResolveRequest) }{ { "should use cache when enabled", nr.ResolveRequest{ ID: "test-app", }, func(t *testing.T, req nr.ResolveRequest) { blockingCall := make(chan uint64) meta := &consul.QueryMeta{ LastIndex: 0, } serviceEntries := []*consul.ServiceEntry{ { Service: &consul.AgentService{ Address: "10.3.245.137", Port: 8600, Meta: map[string]string{ "DAPR_PORT": "50005", }, }, }, } cachedEntries := []*consul.ServiceEntry{ { Service: &consul.AgentService{ Address: "10.3.245.137", Port: 8600, Meta: map[string]string{ "DAPR_PORT": "70007", }, }, }, } healthChecks := consul.HealthChecks{ &consul.HealthCheck{ Node: "0e1234", ServiceID: "test-app-10.3.245.137-3500", ServiceName: "test-app", Status: consul.HealthPassing, }, } mock := &mockClient{ mockHealth: mockHealth{ // Service() serviceResult: serviceEntries, serviceMeta: meta, serviceBehavior: func(service, tag string, passingOnly bool, q *consul.QueryOptions) { }, serviceErr: nil, // State() stateResult: healthChecks, stateMeta: meta, stateBehaviour: func(state string, q *consul.QueryOptions) { meta.LastIndex = <-blockingCall }, stateError: nil, }, } cfg := resolverConfig{ DaprPortMetaKey: "DAPR_PORT", UseCache: true, QueryOptions: &consul.QueryOptions{}, } serviceKeys := make([]string, 0, 10) mockReg := &mockRegistry{ registerChannelResult: make(chan string, 100), getKeysResult: &serviceKeys, addOrUpdateBehaviour: func(service string, services []*consul.ServiceEntry) { if services == nil { serviceKeys = append(serviceKeys, service) } }, } resolver := newResolver(logger.NewLogger("test"), cfg, mock, mockReg, make(chan struct{})) addr, _ := resolver.ResolveID(context.Background(), req) // no apps in registry - cache miss, call agent directly assert.Equal(t, 1, mockReg.getCalled) waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.getKeysCalled.Load() == 2 }) assert.Equal(t, 1, mock.mockHealth.serviceCalled) assert.Equal(t, "10.3.245.137:50005", addr) // watcher adds app to registry assert.Equal(t, int32(1), mockReg.addOrUpdateCalled.Load()) assert.Equal(t, int32(2), mockReg.getKeysCalled.Load()) mockReg.registerChannelResult <- "test-app" mockReg.getResult = ®istryEntry{ services: cachedEntries, } // blocking query - return new index blockingCall <- 2 waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted.Load() == 2 }) assert.Equal(t, 1, mock.mockHealth.stateCalled) // get healthy nodes and update registry for service in result assert.Equal(t, 2, mock.mockHealth.serviceCalled) assert.Equal(t, int32(2), mockReg.addOrUpdateCalled.Load()) // resolve id should only hit cache now addr, _ = resolver.ResolveID(context.Background(), req) assert.Equal(t, "10.3.245.137:70007", addr) addr, _ = resolver.ResolveID(context.Background(), req) assert.Equal(t, "10.3.245.137:70007", addr) addr, _ = resolver.ResolveID(context.Background(), req) assert.Equal(t, "10.3.245.137:70007", addr) assert.Equal(t, 2, mock.mockHealth.serviceCalled) assert.Equal(t, 4, mockReg.getCalled) // no update when no change in index and payload blockingCall <- 2 waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted.Load() == 3 }) assert.Equal(t, 2, mock.mockHealth.stateCalled) assert.Equal(t, 2, mock.mockHealth.serviceCalled) assert.Equal(t, int32(2), mockReg.addOrUpdateCalled.Load()) // no update when no change in payload blockingCall <- 3 waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted.Load() == 4 }) assert.Equal(t, 3, mock.mockHealth.stateCalled) assert.Equal(t, 2, mock.mockHealth.serviceCalled) assert.Equal(t, int32(2), mockReg.addOrUpdateCalled.Load()) // update when change in index and payload mock.mockHealth.stateResult[0].Status = consul.HealthCritical blockingCall <- 4 waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted.Load() == 5 }) assert.Equal(t, 4, mock.mockHealth.stateCalled) assert.Equal(t, 3, mock.mockHealth.serviceCalled) assert.Equal(t, int32(3), mockReg.addOrUpdateCalled.Load()) }, }, { "should only update cache on change", nr.ResolveRequest{ ID: "test-app", }, func(t *testing.T, req nr.ResolveRequest) { blockingCall := make(chan uint64) meta := &consul.QueryMeta{} var err error // Node 1 all checks healthy node1check1 := &consul.HealthCheck{ Node: "0e1234", ServiceID: "test-app-10.3.245.137-3500", ServiceName: "test-app", Status: consul.HealthPassing, CheckID: "1", } node1check2 := &consul.HealthCheck{ Node: "0e1234", ServiceID: "test-app-10.3.245.137-3500", ServiceName: "test-app", Status: consul.HealthPassing, CheckID: "2", } // Node 2 all checks unhealthy node2check1 := &consul.HealthCheck{ Node: "0e9878", ServiceID: "test-app-10.3.245.127-3500", ServiceName: "test-app", Status: consul.HealthCritical, CheckID: "1", } node2check2 := &consul.HealthCheck{ Node: "0e9878", ServiceID: "test-app-10.3.245.127-3500", ServiceName: "test-app", Status: consul.HealthCritical, CheckID: "2", } mock := mockClient{ mockHealth: mockHealth{ // Service() serviceResult: []*consul.ServiceEntry{ { Service: &consul.AgentService{ Address: "10.3.245.137", Port: 8600, Meta: map[string]string{ "DAPR_PORT": "50005", }, }, }, }, serviceMeta: meta, serviceBehavior: nil, serviceErr: &err, // State() stateResult: consul.HealthChecks{ node1check1, node1check2, node2check1, node2check2, }, stateMeta: meta, stateBehaviour: func(state string, q *consul.QueryOptions) { meta.LastIndex = <-blockingCall }, stateError: nil, }, } cfg := resolverConfig{ DaprPortMetaKey: "DAPR_PORT", UseCache: true, QueryOptions: &consul.QueryOptions{ WaitIndex: 1, }, } serviceKeys := make([]string, 0, 10) mockReg := &mockRegistry{ registerChannelResult: make(chan string, 100), getKeysResult: &serviceKeys, addOrUpdateBehaviour: func(service string, services []*consul.ServiceEntry) { if services == nil { serviceKeys = append(serviceKeys, service) } }, } resolver := newResolver(logger.NewLogger("test"), cfg, &mock, mockReg, make(chan struct{})) addr, _ := resolver.ResolveID(context.Background(), req) // no apps in registry - cache miss, call agent directly assert.Equal(t, 1, mockReg.getCalled) waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled.Load() == 1 }) assert.Equal(t, 1, mock.mockHealth.serviceCalled) assert.Equal(t, "10.3.245.137:50005", addr) // watcher adds app to registry assert.Equal(t, int32(1), mockReg.addOrUpdateCalled.Load()) assert.Equal(t, int32(2), mockReg.getKeysCalled.Load()) // add key to mock registry - trigger watcher mockReg.registerChannelResult <- "test-app" mockReg.getResult = ®istryEntry{ services: mock.mockHealth.serviceResult, } // blocking query - return new index blockingCall <- 2 waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled.Load() == 2 }) assert.Equal(t, 1, mock.mockHealth.stateCalled) // get healthy nodes and update registry for service in result assert.Equal(t, 2, mock.mockHealth.serviceCalled) assert.Equal(t, int32(2), mockReg.addOrUpdateCalled.Load()) // resolve id should only hit cache now _, _ = resolver.ResolveID(context.Background(), req) _, _ = resolver.ResolveID(context.Background(), req) _, _ = resolver.ResolveID(context.Background(), req) assert.Equal(t, 2, mock.mockHealth.serviceCalled) // change one check for node1 app to critical node1check1.Status = consul.HealthCritical // blocking query - return new index - node1 app is now unhealthy blockingCall <- 3 waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted.Load() == 3 }) assert.Equal(t, 2, mock.mockHealth.stateCalled) assert.Equal(t, 3, mock.mockHealth.serviceCalled) assert.Equal(t, int32(3), mockReg.addOrUpdateCalled.Load()) // change remaining check for node1 app to critical node1check2.Status = consul.HealthCritical // blocking query - return new index - node1 app is still unhealthy, no change blockingCall <- 4 waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted.Load() == 4 }) assert.Equal(t, 3, mock.mockHealth.stateCalled) assert.Equal(t, 3, mock.mockHealth.serviceCalled) assert.Equal(t, int32(3), mockReg.addOrUpdateCalled.Load()) // change one check for node2 app to healthy node2check1.Status = consul.HealthPassing // blocking query - return new index - node2 app is still unhealthy, no change blockingCall <- 4 waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted.Load() == 5 }) assert.Equal(t, 4, mock.mockHealth.stateCalled) assert.Equal(t, 3, mock.mockHealth.serviceCalled) assert.Equal(t, int32(3), mockReg.addOrUpdateCalled.Load()) // change remaining check for node2 app to healthy node2check2.Status = consul.HealthPassing // blocking query - return new index - node2 app is now healthy blockingCall <- 5 waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted.Load() == 6 }) assert.Equal(t, 5, mock.mockHealth.stateCalled) assert.Equal(t, 4, mock.mockHealth.serviceCalled) assert.Equal(t, int32(4), mockReg.addOrUpdateCalled.Load()) }, }, { "should expire cache upon blocking call error", nr.ResolveRequest{ ID: "test-app", }, func(t *testing.T, req nr.ResolveRequest) { blockingCall := make(chan uint64) meta := &consul.QueryMeta{ LastIndex: 0, } err := fmt.Errorf("oh no") serviceEntries := []*consul.ServiceEntry{ { Service: &consul.AgentService{ Address: "10.3.245.137", Port: 8600, Meta: map[string]string{ "DAPR_PORT": "50005", }, }, }, } healthChecks := consul.HealthChecks{ &consul.HealthCheck{ Node: "0e1234", ServiceID: "test-app-10.3.245.137-3500", ServiceName: "test-app", Status: consul.HealthPassing, }, } mock := &mockClient{ mockHealth: mockHealth{ // Service() serviceResult: serviceEntries, serviceMeta: meta, serviceBehavior: func(service, tag string, passingOnly bool, q *consul.QueryOptions) { }, serviceErr: nil, // State() stateResult: healthChecks, stateMeta: meta, stateBehaviour: func(state string, q *consul.QueryOptions) { meta.LastIndex = <-blockingCall }, stateError: nil, }, } cfg := resolverConfig{ DaprPortMetaKey: "DAPR_PORT", UseCache: true, QueryOptions: &consul.QueryOptions{}, } serviceKeys := make([]string, 0, 10) mockReg := &mockRegistry{ registerChannelResult: make(chan string, 100), getKeysResult: &serviceKeys, addOrUpdateBehaviour: func(service string, services []*consul.ServiceEntry) { if services == nil { serviceKeys = append(serviceKeys, service) } }, } resolver := newResolver(logger.NewLogger("test"), cfg, mock, mockReg, make(chan struct{})) addr, _ := resolver.ResolveID(context.Background(), req) // Cache miss pass through assert.Equal(t, 1, mockReg.getCalled) waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled.Load() == 1 }) assert.Equal(t, 1, mock.mockHealth.serviceCalled) assert.Equal(t, int32(1), mockReg.addOrUpdateCalled.Load()) assert.Equal(t, "10.3.245.137:50005", addr) waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted.Load() == 1 }) mockReg.getKeysResult = &serviceKeys mockReg.registerChannelResult <- "test-app" mockReg.getResult = ®istryEntry{ services: serviceEntries, } blockingCall <- 2 waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled.Load() == 2 }) assert.Equal(t, 1, mock.mockHealth.stateCalled) assert.Equal(t, 2, mock.mockHealth.serviceCalled) assert.Equal(t, int32(2), mockReg.addOrUpdateCalled.Load()) mock.mockHealth.stateError = &err blockingCall <- 3 blockingCall <- 3 waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted.Load() == 2 }) assert.Equal(t, 1, mockReg.expireAllCalled) }, }, { "should stop watcher on close", nr.ResolveRequest{ ID: "test-app", }, func(t *testing.T, req nr.ResolveRequest) { blockingCall := make(chan uint64) meta := &consul.QueryMeta{ LastIndex: 0, } serviceEntries := []*consul.ServiceEntry{ { Service: &consul.AgentService{ Address: "10.3.245.137", Port: 8600, Meta: map[string]string{ "DAPR_PORT": "50005", }, }, }, } healthChecks := consul.HealthChecks{ &consul.HealthCheck{ Node: "0e1234", ServiceID: "test-app-10.3.245.137-3500", ServiceName: "test-app", Status: consul.HealthPassing, }, } mock := &mockClient{ mockHealth: mockHealth{ // Service() serviceResult: serviceEntries, serviceMeta: meta, serviceBehavior: func(service, tag string, passingOnly bool, q *consul.QueryOptions) { }, serviceErr: nil, // State() stateResult: healthChecks, stateMeta: meta, stateBehaviour: func(state string, q *consul.QueryOptions) { select { case meta.LastIndex = <-blockingCall: case <-q.Context().Done(): } }, stateError: nil, }, } cfg := resolverConfig{ DaprPortMetaKey: "DAPR_PORT", UseCache: true, QueryOptions: &consul.QueryOptions{}, } serviceKeys := make([]string, 0, 10) mockReg := &mockRegistry{ registerChannelResult: make(chan string, 100), getKeysResult: &serviceKeys, addOrUpdateBehaviour: func(service string, services []*consul.ServiceEntry) { if services == nil { serviceKeys = append(serviceKeys, service) } }, } resolver := newResolver(logger.NewLogger("test"), cfg, mock, mockReg, make(chan struct{})).(*resolver) addr, _ := resolver.ResolveID(context.Background(), req) // Cache miss pass through assert.Equal(t, 1, mockReg.getCalled) waitTillTrueOrTimeout(time.Second, func() bool { return mockReg.addOrUpdateCalled.Load() == 1 }) assert.Equal(t, 1, mock.mockHealth.serviceCalled) assert.Equal(t, int32(1), mockReg.addOrUpdateCalled.Load()) assert.Equal(t, "10.3.245.137:50005", addr) waitTillTrueOrTimeout(time.Second, func() bool { return mock.mockHealth.stateCallStarted.Load() == 1 }) mockReg.getKeysResult = &serviceKeys mockReg.registerChannelResult <- "test-app" mockReg.getResult = ®istryEntry{ services: serviceEntries, } resolver.Close() waitTillTrueOrTimeout(time.Second*1, func() bool { return mockReg.removeAllCalled.Load() == 1 }) assert.Equal(t, int32(1), mockReg.removeAllCalled.Load()) assert.False(t, resolver.watcherStarted.Load()) }, }, { "error if no healthy services found", nr.ResolveRequest{ ID: "test-app", }, func(t *testing.T, req nr.ResolveRequest) { mock := mockClient{ mockHealth: mockHealth{ serviceResult: []*consul.ServiceEntry{}, }, } resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}, make(chan struct{})) _, err := resolver.ResolveID(context.Background(), req) assert.Equal(t, 1, mock.mockHealth.serviceCalled) require.Error(t, err) }, }, { "should get address from service", nr.ResolveRequest{ ID: "test-app", }, func(t *testing.T, req nr.ResolveRequest) { mock := mockClient{ mockHealth: mockHealth{ serviceResult: []*consul.ServiceEntry{ { Service: &consul.AgentService{ Address: "10.3.245.137", Port: 8600, Meta: map[string]string{ "DAPR_PORT": "50005", }, }, }, }, }, } resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}, make(chan struct{})) addr, _ := resolver.ResolveID(context.Background(), req) assert.Equal(t, "10.3.245.137:50005", addr) }, }, { "should get ipv6 address from service", nr.ResolveRequest{ ID: "test-app", }, func(t *testing.T, req nr.ResolveRequest) { mock := mockClient{ mockHealth: mockHealth{ serviceResult: []*consul.ServiceEntry{ { Service: &consul.AgentService{ Address: "2001:db8:3333:4444:5555:6666:7777:8888", Port: 8600, Meta: map[string]string{ "DAPR_PORT": "50005", }, }, }, }, }, } resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}, make(chan struct{})) addr, _ := resolver.ResolveID(context.Background(), req) assert.Equal(t, "[2001:db8:3333:4444:5555:6666:7777:8888]:50005", addr) }, }, { "should get localhost (hostname) from service", nr.ResolveRequest{ ID: "test-app", }, func(t *testing.T, req nr.ResolveRequest) { t.Helper() mock := mockClient{ mockHealth: mockHealth{ serviceResult: []*consul.ServiceEntry{ { Service: &consul.AgentService{ Address: "localhost", Port: 8600, Meta: map[string]string{ "DAPR_PORT": "50005", }, }, }, }, }, } resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}, make(chan struct{})) addr, _ := resolver.ResolveID(context.Background(), req) assert.Equal(t, "localhost:50005", addr) }, }, { "should get random address from service", nr.ResolveRequest{ ID: "test-app", }, func(t *testing.T, req nr.ResolveRequest) { mock := mockClient{ mockHealth: mockHealth{ serviceResult: []*consul.ServiceEntry{ { Service: &consul.AgentService{ Address: "10.3.245.137", Port: 8600, Meta: map[string]string{ "DAPR_PORT": "50005", }, }, }, { Service: &consul.AgentService{ Address: "234.245.255.228", Port: 8600, Meta: map[string]string{ "DAPR_PORT": "50005", }, }, }, }, }, } resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}, make(chan struct{})) total1 := 0 total2 := 0 for i := 0; i < 100; i++ { addr, _ := resolver.ResolveID(context.Background(), req) if addr == "10.3.245.137:50005" { total1++ } else if addr == "234.245.255.228:50005" { total2++ } else { t.Fatalf("Received unexpected address: %s", addr) } } // Because of the random nature of the address being returned, we just check to make sure we get at least 20 of each (and a total of 100) assert.Equal(t, 100, total1+total2) assert.Greater(t, total1, 20) assert.Greater(t, total2, 20) }, }, { "should get address from node if not on service", nr.ResolveRequest{ ID: "test-app", }, func(t *testing.T, req nr.ResolveRequest) { mock := mockClient{ mockHealth: mockHealth{ serviceResult: []*consul.ServiceEntry{ { Node: &consul.Node{ Address: "10.3.245.137", }, Service: &consul.AgentService{ Address: "", Port: 8600, Meta: map[string]string{ "DAPR_PORT": "50005", }, }, }, { Node: &consul.Node{ Address: "10.3.245.137", }, Service: &consul.AgentService{ Address: "", Port: 8600, Meta: map[string]string{ "DAPR_PORT": "50005", }, }, }, }, }, } resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}, make(chan struct{})) addr, _ := resolver.ResolveID(context.Background(), req) assert.Equal(t, "10.3.245.137:50005", addr) }, }, { "error if no address found on service", nr.ResolveRequest{ ID: "test-app", }, func(t *testing.T, req nr.ResolveRequest) { mock := mockClient{ mockHealth: mockHealth{ serviceResult: []*consul.ServiceEntry{ { Node: &consul.Node{}, Service: &consul.AgentService{ Port: 8600, Meta: map[string]string{ "DAPR_PORT": "50005", }, }, }, }, }, } resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}, make(chan struct{})) _, err := resolver.ResolveID(context.Background(), req) require.Error(t, err) }, }, { "error if consul service missing DaprPortMetaKey", nr.ResolveRequest{ ID: "test-app", }, func(t *testing.T, req nr.ResolveRequest) { mock := mockClient{ mockHealth: mockHealth{ serviceResult: []*consul.ServiceEntry{ { Service: &consul.AgentService{ Address: "123.234.145.155", Port: 8600, }, }, }, }, } resolver := newResolver(logger.NewLogger("test"), testConfig, &mock, ®istry{}, make(chan struct{})) _, err := resolver.ResolveID(context.Background(), req) require.Error(t, err) }, }, } for _, tt := range tests { tt := tt t.Run(tt.testName, func(t *testing.T) { tt.test(t, tt.req) }) } } func TestClose(t *testing.T) { tests := []struct { testName string metadata nr.Metadata test func(*testing.T, nr.Metadata) }{ { "should deregister", nr.Metadata{Instance: getInstanceInfoWithoutKey(""), Configuration: nil}, func(t *testing.T, metadata nr.Metadata) { var mock mockClient cfg := resolverConfig{ Registration: &consul.AgentServiceRegistration{}, DeregisterOnClose: true, } resolver := newResolver(logger.NewLogger("test"), cfg, &mock, ®istry{}, make(chan struct{})).(*resolver) resolver.Close() assert.Equal(t, 1, mock.mockAgent.serviceDeregisterCalled) }, }, { "should not deregister", nr.Metadata{Instance: getInstanceInfoWithoutKey(""), Configuration: nil}, func(t *testing.T, metadata nr.Metadata) { var mock mockClient cfg := resolverConfig{ Registration: &consul.AgentServiceRegistration{}, DeregisterOnClose: false, } resolver := newResolver(logger.NewLogger("test"), cfg, &mock, ®istry{}, make(chan struct{})).(*resolver) resolver.Close() assert.Equal(t, 0, mock.mockAgent.serviceDeregisterCalled) }, }, { "should not deregister when no registration", nr.Metadata{Instance: getInstanceInfoWithoutKey(""), Configuration: nil}, func(t *testing.T, metadata nr.Metadata) { var mock mockClient cfg := resolverConfig{ Registration: nil, DeregisterOnClose: true, } resolver := newResolver(logger.NewLogger("test"), cfg, &mock, ®istry{}, make(chan struct{})).(*resolver) resolver.Close() assert.Equal(t, 0, mock.mockAgent.serviceDeregisterCalled) }, }, { "should stop watcher if started", nr.Metadata{Instance: getInstanceInfoWithoutKey(""), Configuration: nil}, func(t *testing.T, metadata nr.Metadata) { var mock mockClient resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock, ®istry{}, make(chan struct{})).(*resolver) resolver.watcherStarted.Store(true) go resolver.Close() sleepTimer := time.NewTimer(time.Second) watcherStoppedInItem := false select { case <-sleepTimer.C: case <-resolver.watcherStopChannel: watcherStoppedInItem = true } assert.True(t, watcherStoppedInItem) }, }, } for _, tt := range tests { tt := tt t.Run(tt.testName, func(t *testing.T) { t.Parallel() tt.test(t, tt.metadata) }) } } func TestRegistry(t *testing.T) { appID := "myService" tests := []struct { testName string test func(*testing.T) }{ { "should add and update entry", func(t *testing.T) { registry := ®istry{} result := []*consul.ServiceEntry{ { Service: &consul.AgentService{ Address: "10.3.245.137", Port: 8600, }, }, } registry.addOrUpdate(appID, result) entry, _ := registry.entries.Load(appID) assert.Equal(t, result, entry.(*registryEntry).services) update := []*consul.ServiceEntry{ { Service: &consul.AgentService{ Address: "random", Port: 123, }, }, } registry.addOrUpdate(appID, update) entry, _ = registry.entries.Load(appID) assert.Equal(t, update, entry.(*registryEntry).services) }, }, { "should expire entries", func(t *testing.T) { registry := ®istry{} registry.entries.Store( "A", ®istryEntry{ services: []*consul.ServiceEntry{ { Service: &consul.AgentService{ Address: "10.3.245.137", Port: 8600, }, }, }, }) registry.entries.Store( "B", ®istryEntry{ services: []*consul.ServiceEntry{ { Service: &consul.AgentService{ Address: "10.3.245.137", Port: 8600, }, }, }, }) registry.entries.Store( "C", ®istryEntry{ services: []*consul.ServiceEntry{ { Service: &consul.AgentService{ Address: "10.3.245.137", Port: 8600, }, }, }, }) result, _ := registry.entries.Load("A") assert.NotNil(t, result.(*registryEntry).services) registry.expire("A") result, _ = registry.entries.Load("A") assert.Nil(t, result.(*registryEntry).services) registry.expireAll() count := 0 nilCount := 0 registry.entries.Range(func(key, value any) bool { count++ if value.(*registryEntry).services == nil { nilCount++ } return true }) assert.Equal(t, 3, count) assert.Equal(t, 3, nilCount) }, }, { "should remove entry", func(t *testing.T) { registry := ®istry{} entry := ®istryEntry{ services: []*consul.ServiceEntry{ { Service: &consul.AgentService{ Address: "10.3.245.137", Port: 8600, }, }, }, } registry.entries.Store("A", entry) registry.entries.Store("B", entry) registry.entries.Store("C", entry) registry.entries.Store("D", entry) registry.remove("A") result, _ := registry.entries.Load("A") assert.Nil(t, result) result, _ = registry.entries.Load("B") assert.NotNil(t, result) registry.removeAll() count := 0 registry.entries.Range(func(key, value any) bool { count++ return true }) assert.Equal(t, 0, count) }, }, } for _, tt := range tests { tt := tt t.Run(tt.testName, func(t *testing.T) { t.Parallel() tt.test(t) }) } } func TestParseConfig(t *testing.T) { tests := []struct { testName string shouldParse bool input interface{} expected configSpec }{ { "valid configuration in metadata", true, map[any]any{ "Checks": []interface{}{ map[any]any{ "Name": "test-app health check name", "CheckID": "test-app health check id", "Interval": "15s", "HTTP": "http://127.0.0.1:3500/health", }, }, "Tags": []interface{}{ "dapr", "test", }, "Meta": map[any]any{ "APP_PORT": "123", "DAPR_HTTP_PORT": "3500", "DAPR_GRPC_PORT": "50005", }, "QueryOptions": map[any]any{ "UseCache": true, "Filter": "Checks.ServiceTags contains dapr", }, "DaprPortMetaKey": "DAPR_PORT", "UseCache": false, }, configSpec{ Checks: []*consul.AgentServiceCheck{ { Name: "test-app health check name", CheckID: "test-app health check id", Interval: "15s", HTTP: "http://127.0.0.1:3500/health", }, }, Tags: []string{ "dapr", "test", }, Meta: map[string]string{ "APP_PORT": "123", "DAPR_HTTP_PORT": "3500", "DAPR_GRPC_PORT": "50005", }, QueryOptions: &consul.QueryOptions{ UseCache: true, Filter: "Checks.ServiceTags contains dapr", }, DaprPortMetaKey: "DAPR_PORT", UseCache: false, }, }, { "empty configuration in metadata", true, nil, configSpec{ DaprPortMetaKey: defaultDaprPortMetaKey, }, }, { "fail on unsupported map key", false, map[any]any{ 1000: map[any]any{ "DAPR_HTTP_PORT": "3500", "DAPR_GRPC_PORT": "50005", }, }, configSpec{}, }, } for _, tt := range tests { tt := tt t.Run(tt.testName, func(t *testing.T) { actual, err := parseConfig(tt.input) if tt.shouldParse { require.NoError(t, err) assert.Equal(t, tt.expected, actual) } else { require.Error(t, err) } }) } } func TestGetConfig(t *testing.T) { tests := []struct { testName string metadata nr.Metadata test func(*testing.T, nr.Metadata) }{ { "empty configuration should only return Client, QueryOptions and DaprPortMetaKey", nr.Metadata{ Instance: getInstanceInfoWithoutKey(""), Configuration: nil, }, func(t *testing.T, metadata nr.Metadata) { actual, _ := getConfig(metadata) // Client assert.Equal(t, consul.DefaultConfig().Address, actual.Client.Address) // Registration assert.Nil(t, actual.Registration) // QueryOptions assert.NotNil(t, actual.QueryOptions) assert.True(t, actual.QueryOptions.UseCache) // DaprPortMetaKey assert.Equal(t, defaultDaprPortMetaKey, actual.DaprPortMetaKey) // Cache assert.False(t, actual.UseCache) }, }, { "empty configuration with SelfRegister should default correctly", nr.Metadata{ Instance: getInstanceInfoWithoutKey(""), Configuration: map[any]any{ "SelfRegister": true, }, }, func(t *testing.T, metadata nr.Metadata) { actual, _ := getConfig(metadata) // Client assert.Equal(t, consul.DefaultConfig().Address, actual.Client.Address) // Checks assert.Len(t, actual.Registration.Checks, 1) check := actual.Registration.Checks[0] assert.Equal(t, "Dapr Health Status", check.Name) assert.Equal(t, "daprHealth:test-app-"+metadata.Instance.Address+"-"+strconv.Itoa(metadata.Instance.DaprHTTPPort), check.CheckID) assert.Equal(t, "15s", check.Interval) assert.Equal(t, fmt.Sprintf("http://%s/v1.0/healthz?appid=%s", net.JoinHostPort(metadata.Instance.Address, strconv.Itoa(metadata.Instance.DaprHTTPPort)), metadata.Instance.AppID), check.HTTP) // Metadata assert.Len(t, actual.Registration.Meta, 1) assert.Equal(t, "50001", actual.Registration.Meta[actual.DaprPortMetaKey]) // QueryOptions assert.True(t, actual.QueryOptions.UseCache) // DaprPortMetaKey assert.Equal(t, defaultDaprPortMetaKey, actual.DaprPortMetaKey) // Cache assert.False(t, actual.UseCache) }, }, { "DaprPortMetaKey should set registration meta and config used for resolve", nr.Metadata{ Instance: getInstanceInfoWithoutKey(""), Configuration: map[any]any{ "SelfRegister": true, "DaprPortMetaKey": "random_key", }, }, func(t *testing.T, metadata nr.Metadata) { actual, _ := getConfig(metadata) daprPort := strconv.Itoa(metadata.Instance.DaprInternalPort) assert.Equal(t, "random_key", actual.DaprPortMetaKey) assert.Equal(t, daprPort, actual.Registration.Meta["random_key"]) }, }, { "SelfDeregister should set DeregisterOnClose", nr.Metadata{ Instance: getInstanceInfoWithoutKey(""), Configuration: map[any]any{ "SelfRegister": true, "SelfDeregister": true, }, }, func(t *testing.T, metadata nr.Metadata) { actual, _ := getConfig(metadata) assert.True(t, actual.DeregisterOnClose) }, }, { "missing AppID property should error when SelfRegister true", nr.Metadata{ Instance: getInstanceInfoWithoutKey("AppID"), Configuration: map[any]any{ "SelfRegister": true, }, }, func(t *testing.T, metadata nr.Metadata) { _, err := getConfig(metadata) require.Error(t, err) assert.Contains(t, err.Error(), nr.AppID) metadata.Configuration = configSpec{ SelfRegister: false, } _, err = getConfig(metadata) require.NoError(t, err) metadata.Configuration = configSpec{ AdvancedRegistration: &consul.AgentServiceRegistration{}, QueryOptions: &consul.QueryOptions{}, } _, err = getConfig(metadata) require.NoError(t, err) }, }, { "missing AppPort property should error when SelfRegister true", nr.Metadata{ Instance: getInstanceInfoWithoutKey("AppPort"), Configuration: map[any]any{ "SelfRegister": true, }, }, func(t *testing.T, metadata nr.Metadata) { _, err := getConfig(metadata) require.Error(t, err) assert.Contains(t, err.Error(), nr.AppPort) metadata.Configuration = configSpec{ SelfRegister: false, } _, err = getConfig(metadata) require.NoError(t, err) metadata.Configuration = configSpec{ AdvancedRegistration: &consul.AgentServiceRegistration{}, QueryOptions: &consul.QueryOptions{}, } _, err = getConfig(metadata) require.NoError(t, err) }, }, { "missing Address property should error when SelfRegister true", nr.Metadata{ Instance: getInstanceInfoWithoutKey("Address"), Configuration: map[any]any{ "SelfRegister": true, }, }, func(t *testing.T, metadata nr.Metadata) { _, err := getConfig(metadata) require.Error(t, err) assert.Contains(t, err.Error(), "HOST_ADDRESS") metadata.Configuration = configSpec{ SelfRegister: false, } _, err = getConfig(metadata) require.NoError(t, err) metadata.Configuration = configSpec{ AdvancedRegistration: &consul.AgentServiceRegistration{}, QueryOptions: &consul.QueryOptions{}, } _, err = getConfig(metadata) require.NoError(t, err) }, }, { "missing DaprHTTPPort property should error only when SelfRegister true", nr.Metadata{ Instance: getInstanceInfoWithoutKey("DaprHTTPPort"), Configuration: map[any]any{ "SelfRegister": true, }, }, func(t *testing.T, metadata nr.Metadata) { _, err := getConfig(metadata) require.Error(t, err) assert.Contains(t, err.Error(), "DAPR_HTTP_PORT") metadata.Configuration = configSpec{ SelfRegister: false, } _, err = getConfig(metadata) require.NoError(t, err) metadata.Configuration = configSpec{ AdvancedRegistration: &consul.AgentServiceRegistration{}, QueryOptions: &consul.QueryOptions{}, } _, err = getConfig(metadata) require.NoError(t, err) }, }, { "missing DaprInternalPort property should always error", nr.Metadata{ Instance: getInstanceInfoWithoutKey("DaprInternalPort"), }, func(t *testing.T, metadata nr.Metadata) { metadata.Configuration = configSpec{ SelfRegister: false, } _, err := getConfig(metadata) require.Error(t, err) assert.Contains(t, err.Error(), "DAPR_PORT") metadata.Configuration = configSpec{ SelfRegister: true, } _, err = getConfig(metadata) require.Error(t, err) assert.Contains(t, err.Error(), "DAPR_PORT") metadata.Configuration = configSpec{ AdvancedRegistration: &consul.AgentServiceRegistration{}, QueryOptions: &consul.QueryOptions{}, } _, err = getConfig(metadata) require.Error(t, err) assert.Contains(t, err.Error(), "DAPR_PORT") }, }, { "registration should configure correctly", nr.Metadata{ Instance: getInstanceInfoWithoutKey(""), Configuration: map[any]any{ "Checks": []interface{}{ map[any]any{ "Name": "test-app health check name", "CheckID": "test-app health check id", "Interval": "15s", "HTTP": "http://127.0.0.1:3500/health", }, }, "Tags": []interface{}{ "test", }, "Meta": map[any]any{ "APP_PORT": "8650", "DAPR_GRPC_PORT": "50005", }, "QueryOptions": map[any]any{ "UseCache": false, "Filter": "Checks.ServiceTags contains something", }, "SelfRegister": true, "DaprPortMetaKey": "PORT", "UseCache": false, }, }, func(t *testing.T, metadata nr.Metadata) { actual, _ := getConfig(metadata) // Enabled Registration assert.NotNil(t, actual.Registration) assert.Equal(t, metadata.Instance.AppID, actual.Registration.Name) assert.Equal(t, metadata.Instance.Address, actual.Registration.Address) assert.Equal(t, metadata.Instance.AppPort, actual.Registration.Port) assert.Equal(t, "test-app health check name", actual.Registration.Checks[0].Name) assert.Equal(t, "test-app health check id", actual.Registration.Checks[0].CheckID) assert.Equal(t, "15s", actual.Registration.Checks[0].Interval) assert.Equal(t, "http://127.0.0.1:3500/health", actual.Registration.Checks[0].HTTP) assert.Equal(t, "test", actual.Registration.Tags[0]) assert.Equal(t, "8650", actual.Registration.Meta["APP_PORT"]) assert.Equal(t, "50005", actual.Registration.Meta["DAPR_GRPC_PORT"]) assert.Equal(t, strconv.Itoa(metadata.Instance.DaprInternalPort), actual.Registration.Meta["PORT"]) assert.False(t, actual.QueryOptions.UseCache) assert.Equal(t, "Checks.ServiceTags contains something", actual.QueryOptions.Filter) assert.Equal(t, "PORT", actual.DaprPortMetaKey) assert.False(t, actual.UseCache) }, }, { "advanced registration should override/ignore other configs", nr.Metadata{ Instance: getInstanceInfoWithoutKey(""), Configuration: map[any]any{ "AdvancedRegistration": map[any]any{ "Name": "random-app-id", "Port": 0o00, "Address": "123.345.678", "Tags": []string{"random-tag"}, "Meta": map[string]string{ "APP_PORT": "000", }, "Checks": []interface{}{ map[any]any{ "Name": "random health check name", "CheckID": "random health check id", "Interval": "15s", "HTTP": "http://127.0.0.1:3500/health", }, }, }, "Checks": []interface{}{ map[any]any{ "Name": "test-app health check name", "CheckID": "test-app health check id", "Interval": "15s", "HTTP": "http://127.0.0.1:3500/health", }, }, "Tags": []string{ "dapr", "test", }, "Meta": map[string]string{ "APP_PORT": "123", "DAPR_HTTP_PORT": "3500", "DAPR_GRPC_PORT": "50005", }, "SelfRegister": false, }, }, func(t *testing.T, metadata nr.Metadata) { actual, _ := getConfig(metadata) // Enabled Registration assert.NotNil(t, actual.Registration) assert.Equal(t, "random-app-id", actual.Registration.Name) assert.Equal(t, "123.345.678", actual.Registration.Address) assert.Equal(t, 0o00, actual.Registration.Port) assert.Equal(t, "random health check name", actual.Registration.Checks[0].Name) assert.Equal(t, "000", actual.Registration.Meta["APP_PORT"]) assert.Equal(t, "random-tag", actual.Registration.Tags[0]) }, }, } for _, tt := range tests { tt := tt t.Run(tt.testName, func(t *testing.T) { tt.test(t, tt.metadata) }) } } func TestMapConfig(t *testing.T) { t.Run("should map full configuration", func(t *testing.T) { expected := intermediateConfig{ Client: &Config{ Address: "Address", Scheme: "Scheme", Datacenter: "Datacenter", HTTPAuth: &HTTPBasicAuth{ Username: "Username", Password: "Password", }, WaitTime: 10, Token: "Token", TokenFile: "TokenFile", TLSConfig: TLSConfig{ Address: "Address", CAFile: "CAFile", CAPath: "CAPath", CertFile: "CertFile", KeyFile: "KeyFile", InsecureSkipVerify: true, }, }, Checks: []*AgentServiceCheck{ { Args: []string{ "arg1", "arg2", }, CheckID: "CheckID", Name: "Name", DockerContainerID: "DockerContainerID", Shell: "Shell", Interval: "Interval", Timeout: "Timeout", TTL: "TTL", HTTP: "HTTP", Method: "Method", TCP: "TCP", Status: "Status", Notes: "Notes", GRPC: "GRPC", AliasNode: "AliasNode", AliasService: "AliasService", DeregisterCriticalServiceAfter: "DeregisterCriticalServiceAfter", Header: map[string][]string{ "M": {"Key", "Value"}, "M2": {"Key2", "Value2"}, }, TLSSkipVerify: true, GRPCUseTLS: true, }, { Args: []string{ "arg1", "arg2", }, CheckID: "CheckID2", Name: "Name2", DockerContainerID: "DockerContainerID2", Shell: "Shell2", Interval: "Interval2", Timeout: "Timeout2", TTL: "TTL2", HTTP: "HTTP2", Method: "Method2", TCP: "TCP2", Status: "Status2", Notes: "Notes2", GRPC: "GRPC2", AliasNode: "AliasNode2", AliasService: "AliasService2", DeregisterCriticalServiceAfter: "DeregisterCriticalServiceAfter2", Header: map[string][]string{ "M": {"Key", "Value"}, "M2": {"Key2", "Value2"}, }, TLSSkipVerify: true, GRPCUseTLS: true, }, }, Tags: []string{ "tag1", "tag2", }, Meta: map[string]string{ "M": "Value", "M2": "Value2", }, QueryOptions: &QueryOptions{ Datacenter: "Datacenter", WaitHash: "WaitHash", Token: "Token", Near: "Near", Filter: "Filter", MaxAge: 11, StaleIfError: 22, WaitIndex: 33, WaitTime: 44, NodeMeta: map[string]string{ "M": "Value", "M2": "Value2", }, AllowStale: true, RequireConsistent: true, UseCache: true, RelayFactor: 55, LocalOnly: true, Connect: true, }, AdvancedRegistration: &AgentServiceRegistration{ Kind: "Kind", ID: "ID", Name: "Name", Tags: []string{ "tag1", "tag2", }, Meta: map[string]string{ "M": "Value", "M2": "Value2", }, Port: 123456, Address: "Address", TaggedAddresses: map[string]ServiceAddress{ "T1": { Address: "Address", Port: 234567, }, "T2": { Address: "Address", Port: 345678, }, }, EnableTagOverride: true, Weights: &AgentWeights{ Passing: 123, Warning: 321, }, Check: &AgentServiceCheck{ Args: []string{ "arg1", "arg2", }, CheckID: "CheckID2", Name: "Name2", DockerContainerID: "DockerContainerID2", Shell: "Shell2", Interval: "Interval2", Timeout: "Timeout2", TTL: "TTL2", HTTP: "HTTP2", Method: "Method2", TCP: "TCP2", Status: "Status2", Notes: "Notes2", GRPC: "GRPC2", AliasNode: "AliasNode2", AliasService: "AliasService2", DeregisterCriticalServiceAfter: "DeregisterCriticalServiceAfter2", Header: map[string][]string{ "M": {"Key", "Value"}, "M2": {"Key2", "Value2"}, }, TLSSkipVerify: true, GRPCUseTLS: true, }, Checks: AgentServiceChecks{ { Args: []string{ "arg1", "arg2", }, CheckID: "CheckID", Name: "Name", DockerContainerID: "DockerContainerID", Shell: "Shell", Interval: "Interval", Timeout: "Timeout", TTL: "TTL", HTTP: "HTTP", Method: "Method", TCP: "TCP", Status: "Status", Notes: "Notes", GRPC: "GRPC", AliasNode: "AliasNode", AliasService: "AliasService", DeregisterCriticalServiceAfter: "DeregisterCriticalServiceAfter", Header: map[string][]string{ "M": {"Key", "Value"}, "M2": {"Key2", "Value2"}, }, TLSSkipVerify: true, GRPCUseTLS: true, }, { Args: []string{ "arg1", "arg2", }, CheckID: "CheckID2", Name: "Name2", DockerContainerID: "DockerContainerID2", Shell: "Shell2", Interval: "Interval2", Timeout: "Timeout2", TTL: "TTL2", HTTP: "HTTP2", Method: "Method2", TCP: "TCP2", Status: "Status2", Notes: "Notes2", GRPC: "GRPC2", AliasNode: "AliasNode2", AliasService: "AliasService2", DeregisterCriticalServiceAfter: "DeregisterCriticalServiceAfter2", Header: map[string][]string{ "M": {"Key", "Value"}, "M2": {"Key2", "Value2"}, }, TLSSkipVerify: true, GRPCUseTLS: true, }, }, Proxy: &AgentServiceConnectProxyConfig{ DestinationServiceName: "DestinationServiceName", DestinationServiceID: "DestinationServiceID", LocalServiceAddress: "LocalServiceAddress", LocalServicePort: 123456, Config: map[string]interface{}{ "Random": 123123, }, Upstreams: []Upstream{ { DestinationType: "DestinationType", DestinationNamespace: "DestinationNamespace", DestinationName: "DestinationName", Datacenter: "Datacenter", LocalBindAddress: "LocalBindAddress", LocalBindPort: 234567, Config: map[string]interface{}{ "Random": 321321, }, MeshGateway: MeshGatewayConfig{ Mode: "Mode", }, }, }, MeshGateway: MeshGatewayConfig{ Mode: "Mode2", }, Expose: ExposeConfig{ Checks: true, Paths: []ExposePath{ { ListenerPort: 123456, Path: "Path", LocalPathPort: 234567, Protocol: "Protocol", ParsedFromCheck: true, }, }, }, }, Connect: &AgentServiceConnect{ Native: true, SidecarService: nil, }, }, SelfRegister: true, DaprPortMetaKey: "SOMETHINGSOMETHING", UseCache: false, } actual := mapConfig(expected) compareQueryOptions(t, expected.QueryOptions, actual.QueryOptions) compareRegistration(t, expected.AdvancedRegistration, actual.AdvancedRegistration) compareClientConfig(t, expected.Client, actual.Client) for i := 0; i < len(expected.Checks); i++ { compareCheck(t, expected.Checks[i], actual.Checks[i]) } assert.Equal(t, expected.Tags, actual.Tags) assert.Equal(t, expected.Meta, actual.Meta) assert.Equal(t, expected.SelfRegister, actual.SelfRegister) assert.Equal(t, expected.DaprPortMetaKey, actual.DaprPortMetaKey) assert.Equal(t, expected.UseCache, actual.UseCache) }) t.Run("should map empty configuration", func(t *testing.T) { expected := intermediateConfig{} actual := mapConfig(expected) assert.Equal(t, configSpec{}, actual) }) } func compareQueryOptions(t *testing.T, expected *QueryOptions, actual *consul.QueryOptions) { assert.Equal(t, expected.Datacenter, actual.Datacenter) assert.Equal(t, expected.AllowStale, actual.AllowStale) assert.Equal(t, expected.RequireConsistent, actual.RequireConsistent) assert.Equal(t, expected.UseCache, actual.UseCache) assert.Equal(t, expected.MaxAge, actual.MaxAge) assert.Equal(t, expected.StaleIfError, actual.StaleIfError) assert.Equal(t, expected.WaitIndex, actual.WaitIndex) assert.Equal(t, expected.WaitHash, actual.WaitHash) assert.Equal(t, expected.WaitTime, actual.WaitTime) assert.Equal(t, expected.Token, actual.Token) assert.Equal(t, expected.Near, actual.Near) assert.Equal(t, expected.NodeMeta, actual.NodeMeta) assert.Equal(t, expected.RelayFactor, actual.RelayFactor) assert.Equal(t, expected.LocalOnly, actual.LocalOnly) assert.Equal(t, expected.Connect, actual.Connect) assert.Equal(t, expected.Filter, actual.Filter) } func compareClientConfig(t *testing.T, expected *Config, actual *consul.Config) { assert.Equal(t, expected.Address, actual.Address) assert.Equal(t, expected.Datacenter, actual.Datacenter) if expected.HTTPAuth != nil { assert.Equal(t, expected.HTTPAuth.Username, actual.HttpAuth.Username) assert.Equal(t, expected.HTTPAuth.Password, actual.HttpAuth.Password) } assert.Equal(t, expected.Scheme, actual.Scheme) assert.Equal(t, expected.TLSConfig.Address, actual.TLSConfig.Address) assert.Equal(t, expected.TLSConfig.CAFile, actual.TLSConfig.CAFile) assert.Equal(t, expected.TLSConfig.CAPath, actual.TLSConfig.CAPath) assert.Equal(t, expected.TLSConfig.CertFile, actual.TLSConfig.CertFile) assert.Equal(t, expected.TLSConfig.InsecureSkipVerify, actual.TLSConfig.InsecureSkipVerify) assert.Equal(t, expected.TLSConfig.KeyFile, actual.TLSConfig.KeyFile) assert.Equal(t, expected.Token, actual.Token) assert.Equal(t, expected.TokenFile, actual.TokenFile) assert.Equal(t, expected.WaitTime, actual.WaitTime) } func compareRegistration(t *testing.T, expected *AgentServiceRegistration, actual *consul.AgentServiceRegistration) { assert.Equal(t, expected.Kind, string(actual.Kind)) assert.Equal(t, expected.ID, actual.ID) assert.Equal(t, expected.Name, actual.Name) assert.Equal(t, expected.Tags, actual.Tags) assert.Equal(t, expected.Port, actual.Port) assert.Equal(t, expected.Address, actual.Address) if expected.TaggedAddresses != nil { for k := range expected.TaggedAddresses { assert.Equal(t, expected.TaggedAddresses[k].Address, actual.TaggedAddresses[k].Address) assert.Equal(t, expected.TaggedAddresses[k].Port, actual.TaggedAddresses[k].Port) } } assert.Equal(t, expected.EnableTagOverride, actual.EnableTagOverride) assert.Equal(t, expected.Meta, actual.Meta) if expected.Weights != nil { assert.Equal(t, expected.Weights.Passing, actual.Weights.Passing) assert.Equal(t, expected.Weights.Warning, actual.Weights.Warning) } compareCheck(t, expected.Check, actual.Check) for i := 0; i < len(expected.Checks); i++ { compareCheck(t, expected.Checks[i], actual.Checks[i]) } if expected.Proxy != nil { assert.Equal(t, expected.Proxy.DestinationServiceName, actual.Proxy.DestinationServiceName) assert.Equal(t, expected.Proxy.DestinationServiceID, actual.Proxy.DestinationServiceID) assert.Equal(t, expected.Proxy.LocalServiceAddress, actual.Proxy.LocalServiceAddress) assert.Equal(t, expected.Proxy.LocalServicePort, actual.Proxy.LocalServicePort) assert.Equal(t, expected.Proxy.Config, actual.Proxy.Config) for i := 0; i < len(expected.Proxy.Upstreams); i++ { assert.Equal(t, string(expected.Proxy.Upstreams[i].DestinationType), string(actual.Proxy.Upstreams[i].DestinationType)) assert.Equal(t, expected.Proxy.Upstreams[i].DestinationNamespace, actual.Proxy.Upstreams[i].DestinationNamespace) assert.Equal(t, expected.Proxy.Upstreams[i].DestinationName, actual.Proxy.Upstreams[i].DestinationName) assert.Equal(t, expected.Proxy.Upstreams[i].Datacenter, actual.Proxy.Upstreams[i].Datacenter) assert.Equal(t, expected.Proxy.Upstreams[i].LocalBindAddress, actual.Proxy.Upstreams[i].LocalBindAddress) assert.Equal(t, expected.Proxy.Upstreams[i].LocalBindPort, actual.Proxy.Upstreams[i].LocalBindPort) assert.Equal(t, expected.Proxy.Upstreams[i].Config, actual.Proxy.Upstreams[i].Config) assert.Equal(t, string(expected.Proxy.Upstreams[i].MeshGateway.Mode), string(actual.Proxy.Upstreams[i].MeshGateway.Mode)) } assert.Equal(t, string(expected.Proxy.MeshGateway.Mode), string(actual.Proxy.MeshGateway.Mode)) assert.Equal(t, expected.Proxy.Expose.Checks, actual.Proxy.Expose.Checks) for i := 0; i < len(expected.Proxy.Expose.Paths); i++ { assert.Equal(t, expected.Proxy.Expose.Paths[i].ListenerPort, actual.Proxy.Expose.Paths[i].ListenerPort) assert.Equal(t, expected.Proxy.Expose.Paths[i].LocalPathPort, actual.Proxy.Expose.Paths[i].LocalPathPort) assert.Equal(t, expected.Proxy.Expose.Paths[i].ParsedFromCheck, actual.Proxy.Expose.Paths[i].ParsedFromCheck) assert.Equal(t, expected.Proxy.Expose.Paths[i].Path, actual.Proxy.Expose.Paths[i].Path) assert.Equal(t, expected.Proxy.Expose.Paths[i].Protocol, actual.Proxy.Expose.Paths[i].Protocol) } } assert.Equal(t, expected.Connect.Native, actual.Connect.Native) if expected.Connect.SidecarService != nil { compareRegistration(t, expected.Connect.SidecarService, actual.Connect.SidecarService) } } func compareCheck(t *testing.T, expected *AgentServiceCheck, actual *consul.AgentServiceCheck) { assert.Equal(t, expected.Args, actual.Args) assert.Equal(t, expected.CheckID, actual.CheckID) assert.Equal(t, expected.Name, actual.Name) assert.Equal(t, expected.DockerContainerID, actual.DockerContainerID) assert.Equal(t, expected.Shell, actual.Shell) assert.Equal(t, expected.Interval, actual.Interval) assert.Equal(t, expected.Timeout, actual.Timeout) assert.Equal(t, expected.TTL, actual.TTL) assert.Equal(t, expected.HTTP, actual.HTTP) assert.Equal(t, expected.Method, actual.Method) assert.Equal(t, expected.TCP, actual.TCP) assert.Equal(t, expected.Status, actual.Status) assert.Equal(t, expected.Notes, actual.Notes) assert.Equal(t, expected.GRPC, actual.GRPC) assert.Equal(t, expected.AliasNode, actual.AliasNode) assert.Equal(t, expected.AliasService, actual.AliasService) assert.Equal(t, expected.DeregisterCriticalServiceAfter, actual.DeregisterCriticalServiceAfter) assert.Equal(t, expected.Header, actual.Header) assert.Equal(t, expected.TLSSkipVerify, actual.TLSSkipVerify) assert.Equal(t, expected.GRPCUseTLS, actual.GRPCUseTLS) } func getInstanceInfoWithoutKey(removeKey string) nr.Instance { res := nr.Instance{ AppID: "test-app", AppPort: 8650, DaprInternalPort: 50001, DaprHTTPPort: 3500, Address: "127.0.0.1", } switch removeKey { case "AppID": res.AppID = "" case "AppPort": res.AppPort = 0 case "DaprInternalPort": res.DaprInternalPort = 0 case "DaprHTTPPort": res.DaprHTTPPort = 0 case "Address": res.Address = "" } return res } func waitTillTrueOrTimeout(d time.Duration, condition func() bool) { for i := 0; i < 100; i++ { if condition() { return } time.Sleep(d / 100) } }