mDNS round robin (#540)

* WIP mdns rr

* add callback mechanism

* add initial testing

* fix lint

* add debug logs

* register as unique instance

* update comments

* add continue when no address

* fix lint

* add test for empty address list next

* refresh on new app id in the background

* fix lint

* update comment

* add way to control mDNS instance ID

* update test case

* add guard for empty refreshes

* update log message to include app id

* update log message to include app id

* update comment

Co-authored-by: Yaron Schneider <yaronsc@microsoft.com>
This commit is contained in:
Joni Collinge 2020-12-14 20:50:21 +00:00 committed by GitHub
parent a4884f1d4b
commit e881e4ba28
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 840 additions and 44 deletions

View File

@ -9,9 +9,11 @@ import (
"context"
"errors"
"fmt"
"math"
"os"
"os/signal"
"strconv"
"sync"
"syscall"
"time"
@ -20,26 +22,150 @@ import (
"github.com/grandcat/zeroconf"
)
const browseTimeout = time.Second * 1
const (
// firstOnlyTimeout is the timeout used when
// browsing for the first response to a single app id.
firstOnlyTimeout = time.Second * 1
// refreshTimeout is the timeout used when
// browsing for any responses to a single app id.
refreshTimeout = time.Second * 3
// refreshInterval is the duration between
// background address refreshes.
refreshInterval = time.Second * 30
// addressTTL is the duration an address has before
// becoming stale and being evicted.
addressTTL = time.Second * 45
)
// address is used to store an ip address along with
// an expiry time at which point the address is considered
// too stale to trust.
type address struct {
ip string
expiresAt time.Time
}
// addressList represents a set of addresses along with
// data used to control and access said addresses.
type addressList struct {
addresses []*address
counter uint32
mu sync.RWMutex
}
// expire removes any addresses with an expiry time earlier
// than the current time.
func (a *addressList) expire() {
a.mu.Lock()
defer a.mu.Unlock()
i := 0
for _, addr := range a.addresses {
if time.Now().Before(addr.expiresAt) {
a.addresses[i] = addr
i++
}
}
for j := i; j < len(a.addresses); j++ {
a.addresses[j] = nil // clear truncated pointers
}
a.addresses = a.addresses[:i] // resize slice
}
// add adds a new address to the address list with a
// maximum expiry time. For existing addresses, the
// expiry time is updated to the maximum.
// TODO: Consider enforcing a maximum address list size.
func (a *addressList) add(ip string) {
a.mu.Lock()
defer a.mu.Unlock()
for _, addr := range a.addresses {
if addr.ip == ip {
addr.expiresAt = time.Now().Add(addressTTL)
return
}
}
a.addresses = append(a.addresses, &address{
ip: ip,
expiresAt: time.Now().Add(addressTTL),
})
}
// next gets the next address from the list given
// the current round robin implementation.
// There are no guarantees on the selection
// beyond best effort linear iteration.
func (a *addressList) next() *string {
a.mu.RLock()
defer a.mu.RUnlock()
if len(a.addresses) == 0 {
return nil
}
if a.counter == math.MaxUint32 {
a.counter = 0
}
index := a.counter % uint32(len(a.addresses))
addr := a.addresses[index]
a.counter++
return &addr.ip
}
// NewResolver creates the instance of mDNS name resolver.
func NewResolver(logger logger.Logger) nameresolution.Resolver {
return &resolver{logger: logger}
r := &resolver{
appAddressesIPv4: make(map[string]*addressList),
appAddressesIPv6: make(map[string]*addressList),
refreshChan: make(chan string),
logger: logger,
}
// refresh app addresses on demand.
go func() {
for appID := range r.refreshChan {
if err := r.refreshApp(context.Background(), appID); err != nil {
r.logger.Warnf(err.Error())
}
}
}()
// refresh all app addresses periodically.
go func() {
for {
time.Sleep(refreshInterval)
if err := r.refreshAllApps(context.Background()); err != nil {
r.logger.Warnf(err.Error())
}
}
}()
return r
}
type resolver struct {
logger logger.Logger
ipv4Mu sync.RWMutex
appAddressesIPv4 map[string]*addressList
ipv6Mu sync.RWMutex
appAddressesIPv6 map[string]*addressList
refreshChan chan string
logger logger.Logger
}
// Init registers service for mDNS.
func (m *resolver) Init(metadata nameresolution.Metadata) error {
var id string
var appID string
var hostAddress string
var ok bool
var instanceID string
props := metadata.Properties
if id, ok = props[nameresolution.MDNSInstanceName]; !ok {
if appID, ok = props[nameresolution.MDNSInstanceName]; !ok {
return errors.New("name is missing")
}
if hostAddress, ok = props[nameresolution.MDNSInstanceAddress]; !ok {
@ -56,15 +182,19 @@ func (m *resolver) Init(metadata nameresolution.Metadata) error {
return errors.New("port is invalid")
}
err = m.registerMDNS(id, []string{hostAddress}, int(port))
if instanceID, ok = props[nameresolution.MDNSInstanceID]; !ok {
instanceID = ""
}
err = m.registerMDNS(instanceID, appID, []string{hostAddress}, int(port))
if err == nil {
m.logger.Infof("local service entry announced: %s -> %s:%d", id, hostAddress, port)
m.logger.Infof("local service entry announced: %s -> %s:%d", appID, hostAddress, port)
}
return err
}
func (m *resolver) registerMDNS(id string, ips []string, port int) error {
func (m *resolver) registerMDNS(instanceID string, appID string, ips []string, port int) error {
started := make(chan bool, 1)
var err error
@ -72,12 +202,17 @@ func (m *resolver) registerMDNS(id string, ips []string, port int) error {
var server *zeroconf.Server
host, _ := os.Hostname()
info := []string{id}
info := []string{appID}
// default instance id is unique to the process.
if instanceID == "" {
instanceID = fmt.Sprintf("%s-%d", host, syscall.Getpid())
}
if len(ips) > 0 {
server, err = zeroconf.RegisterProxy(host, id, "local.", port, host, ips, info, nil)
server, err = zeroconf.RegisterProxy(instanceID, appID, "local.", port, host, ips, info, nil)
} else {
server, err = zeroconf.Register(host, id, "local.", port, info, nil)
server, err = zeroconf.Register(instanceID, appID, "local.", port, info, nil)
}
if err != nil {
@ -103,52 +238,333 @@ func (m *resolver) registerMDNS(id string, ips []string, port int) error {
// ResolveID resolves name to address via mDNS.
func (m *resolver) ResolveID(req nameresolution.ResolveRequest) (string, error) {
resolver, err := zeroconf.NewResolver(nil)
if err != nil {
return "", fmt.Errorf("failed to initialize resolver: %e", err)
// check for cached IPv4 addresses for this app id first.
if addr := m.nextIPv4Address(req.ID); addr != nil {
return *addr, nil
}
port := -1
// check for cached IPv6 addresses for this app id second.
if addr := m.nextIPv6Address(req.ID); addr != nil {
return *addr, nil
}
// cache miss, fallback to browsing the network for addresses.
m.logger.Debugf("no mDNS address found in cache, browsing network for app id %s", req.ID)
// get the first address we receive...
addr, err := m.browseFirstOnly(context.Background(), req.ID)
if err == nil {
// ...and trigger a background refresh for any additional addresses.
m.refreshChan <- req.ID
}
return addr, err
}
// browseFirstOnly will perform a mDNS network browse for an address
// matching the provided app id. It will return the first address it
// receives and stop browsing for any more.
func (m *resolver) browseFirstOnly(ctx context.Context, appID string) (string, error) {
var addr string
ctx, cancel := context.WithTimeout(ctx, firstOnlyTimeout)
defer cancel()
// onFirst will be invoked on the first address received.
// Due to the asynchronous nature of cancel() there
// is no guarantee that this will ONLY be invoked on the
// first address. Ensure that multiple invocations of this
// function are safe.
onFirst := func(ip string) {
addr = ip
cancel() // cancel to stop browsing.
}
m.logger.Debugf("Browsing for first mDNS address for app id %s", appID)
err := m.browse(ctx, appID, onFirst)
if err != nil {
return "", err
}
// wait for the context to be canceled or time out.
<-ctx.Done()
if errors.Is(ctx.Err(), context.Canceled) {
// expect this when we've found an address and canceled the browse.
m.logger.Debugf("Browsing for first mDNS address for app id %s canceled.", appID)
} else if errors.Is(ctx.Err(), context.DeadlineExceeded) {
// expect this when we've been unable to find the first address before the timeout.
m.logger.Debugf("Browsing for first mDNS address for app id %s timed out.", appID)
}
if addr == "" {
return "", fmt.Errorf("couldn't find service: %s", appID)
}
return addr, nil
}
// refreshApp will perform a mDNS network browse for a provided
// app id. This function is blocking.
func (m *resolver) refreshApp(ctx context.Context, appID string) error {
if appID == "" {
return nil
}
m.logger.Debugf("Refreshing mDNS addresses for app id %s.", appID)
ctx, cancel := context.WithTimeout(ctx, refreshTimeout)
defer cancel()
if err := m.browse(ctx, appID, nil); err != nil {
return err
}
// wait for the context to be canceled or time out.
<-ctx.Done()
if errors.Is(ctx.Err(), context.Canceled) {
// this is not expected, investigate why context was canceled.
m.logger.Warnf("Refreshing mDNS addresses for app id %s canceled.", appID)
} else if errors.Is(ctx.Err(), context.DeadlineExceeded) {
// expect this when our browse has timedout.
m.logger.Debugf("Refreshing mDNS addresses for app id %s timed out.", appID)
}
return nil
}
// refreshAllApps will perform a mDNS network browse for each address
// currently in the cache. This function is blocking.
func (m *resolver) refreshAllApps(ctx context.Context) error {
m.logger.Debug("Refreshing all mDNS addresses.")
// check if we have any IPv4 or IPv6 addresses
// in the address cache that need refreshing.
m.ipv4Mu.RLock()
numAppIPv4Addr := len(m.appAddressesIPv4)
m.ipv4Mu.RUnlock()
m.ipv6Mu.RLock()
numAppIPv6Addr := len(m.appAddressesIPv4)
m.ipv6Mu.RUnlock()
numApps := numAppIPv4Addr + numAppIPv6Addr
if numApps == 0 {
m.logger.Debug("no mDNS apps to refresh.")
return nil
}
var wg sync.WaitGroup
// expired addresses will be evicted by getAppIDs()
for _, appID := range m.getAppIDs() {
_appID := appID
wg.Add(1)
go func() {
defer wg.Done()
m.refreshApp(ctx, _appID)
}()
}
// wait for all the app refreshes to complete.
wg.Wait()
return nil
}
// browse will perform a non-blocking mdns network browse for the provided app id.
func (m *resolver) browse(ctx context.Context, appID string, onEach func(ip string)) error {
resolver, err := zeroconf.NewResolver(nil)
if err != nil {
return fmt.Errorf("failed to initialize resolver: %e", err)
}
entries := make(chan *zeroconf.ServiceEntry)
ctx, cancel := context.WithTimeout(context.Background(), browseTimeout)
// handle each service entry returned from the mDNS browse.
go func(results <-chan *zeroconf.ServiceEntry) {
for entry := range results {
for _, text := range entry.Text {
if text == req.ID {
port = entry.Port
if len(entry.AddrIPv4) > 0 {
addr = entry.AddrIPv4[0].String() // entry has IPv4
} else if len(entry.AddrIPv6) > 0 {
addr = entry.AddrIPv6[0].String() // entry has IPv6
} else {
addr = "localhost" // default
for {
select {
case <-ctx.Done():
if errors.Is(ctx.Err(), context.Canceled) {
m.logger.Debugf("mDNS browse for app id %s canceled.", appID)
} else if errors.Is(ctx.Err(), context.DeadlineExceeded) {
m.logger.Debugf("mDNS browse for app id %s timed out.", appID)
}
return
case entry := <-results:
if entry == nil {
break
}
for _, text := range entry.Text {
if text != appID {
m.logger.Debugf("mDNS response doesn't match app id %s, skipping.", appID)
break
}
// cancel timeout because it found the service
cancel()
m.logger.Debugf("mDNS response for app id %s received.", appID)
return
hasIPv4Address := len(entry.AddrIPv4) > 0
hasIPv6Address := len(entry.AddrIPv6) > 0
if !hasIPv4Address && !hasIPv6Address {
m.logger.Debugf("mDNS response for app id %s doesn't contain any IPv4 or IPv6 addresses, skipping.", appID)
break
}
var addr string
port := entry.Port
// TODO: We currently only use the first IPv4 and IPv6 address.
// We should understand the cases in which additional addresses
// are returned and whether we need to support them.
if hasIPv4Address {
addr = fmt.Sprintf("%s:%d", entry.AddrIPv4[0].String(), port)
m.addAppAddressIPv4(appID, addr)
}
if hasIPv6Address {
addr = fmt.Sprintf("%s:%d", entry.AddrIPv6[0].String(), port)
m.addAppAddressIPv6(appID, addr)
}
if onEach != nil {
onEach(addr) // invoke callback.
}
}
}
}
}(entries)
if err = resolver.Browse(ctx, req.ID, "local.", entries); err != nil {
// cancel context
cancel()
return "", fmt.Errorf("failed to browse: %s", err.Error())
if err = resolver.Browse(ctx, appID, "local.", entries); err != nil {
return fmt.Errorf("failed to browse: %s", err.Error())
}
// wait until context is cancelled or timeed out.
<-ctx.Done()
if port == -1 || addr == "" {
return "", fmt.Errorf("couldn't find service: %s", req.ID)
}
return fmt.Sprintf("%s:%d", addr, port), nil
return nil
}
// addAppAddressIPv4 adds an IPv4 address to the
// cache for the provided app id.
func (m *resolver) addAppAddressIPv4(appID string, addr string) {
m.ipv4Mu.Lock()
defer m.ipv4Mu.Unlock()
m.logger.Debugf("Adding IPv4 address %s for app id %s cache entry.", addr, appID)
if _, ok := m.appAddressesIPv4[appID]; !ok {
m.appAddressesIPv4[appID] = &addressList{}
}
m.appAddressesIPv4[appID].add(addr)
}
// addAppIPv4Address adds an IPv6 address to the
// cache for the provided app id.
func (m *resolver) addAppAddressIPv6(appID string, addr string) {
m.ipv6Mu.Lock()
defer m.ipv6Mu.Unlock()
m.logger.Debugf("Adding IPv6 address %s for app id %s cache entry.", addr, appID)
if _, ok := m.appAddressesIPv6[appID]; !ok {
m.appAddressesIPv6[appID] = &addressList{}
}
m.appAddressesIPv6[appID].add(addr)
}
// getAppIDsIPv4 returns a list of the current IPv4 app IDs.
// This method uses expire on read to evict expired addreses.
func (m *resolver) getAppIDsIPv4() []string {
m.ipv4Mu.RLock()
defer m.ipv4Mu.RUnlock()
appIDs := make([]string, len(m.appAddressesIPv4))
for appID, addr := range m.appAddressesIPv4 {
old := len(addr.addresses)
addr.expire()
m.logger.Debugf("%d IPv4 address(es) expired for app id %s.", old-len(addr.addresses), appID)
appIDs = append(appIDs, appID)
}
return appIDs
}
// getAppIDsIPv6 returns a list of the known IPv6 app IDs.
// This method uses expire on read to evict expired addreses.
func (m *resolver) getAppIDsIPv6() []string {
m.ipv6Mu.RLock()
defer m.ipv6Mu.RUnlock()
appIDs := make([]string, len(m.appAddressesIPv6))
for appID, addr := range m.appAddressesIPv6 {
old := len(addr.addresses)
addr.expire()
m.logger.Debugf("%d IPv6 address(es) expired for app id %s.", old-len(addr.addresses), appID)
appIDs = append(appIDs, appID)
}
return appIDs
}
// getAppIDs returns a list of app ids currently in
// the cache, ensuring expired addresses are evicted.
func (m *resolver) getAppIDs() []string {
return union(m.getAppIDsIPv4(), m.getAppIDsIPv6())
}
// nextIPv4Address returns the next IPv4 address for
// the provided app id from the cache.
func (m *resolver) nextIPv4Address(appID string) *string {
m.ipv4Mu.RLock()
defer m.ipv4Mu.RUnlock()
addrList, exists := m.appAddressesIPv4[appID]
if exists {
addr := addrList.next()
if addr != nil {
m.logger.Debugf("found mDNS IPv4 address in cache: %s", *addr)
return addr
}
}
return nil
}
// nextIPv6Address returns the next IPv6 address for
// the provided app id from the cache.
func (m *resolver) nextIPv6Address(appID string) *string {
m.ipv6Mu.RLock()
defer m.ipv6Mu.RUnlock()
addrList, exists := m.appAddressesIPv6[appID]
if exists {
addr := addrList.next()
if addr != nil {
m.logger.Debugf("found mDNS IPv6 address in cache: %s", *addr)
return addr
}
}
return nil
}
// union merges the elements from two lists into a set.
func union(first []string, second []string) []string {
keys := make(map[string]struct{})
for _, id := range first {
keys[id] = struct{}{}
}
for _, id := range second {
keys[id] = struct{}{}
}
result := make([]string, 0, len(keys))
for id := range keys {
result = append(result, id)
}
return result
}

View File

@ -6,7 +6,10 @@
package mdns
import (
"fmt"
"math"
"testing"
"time"
nr "github.com/dapr/components-contrib/nameresolution"
"github.com/dapr/dapr/pkg/logger"
@ -77,10 +80,385 @@ func TestResolver(t *testing.T) {
err := resolver.Init(md)
require.NoError(t, err)
request := nr.ResolveRequest{ID: "testAppID", Port: 1234}
request := nr.ResolveRequest{ID: "testAppID"}
pt, err := resolver.ResolveID(request)
// assert
require.NoError(t, err)
assert.Equal(t, "127.0.0.1:1234", pt)
}
func TestResolverMultipleInstances(t *testing.T) {
// arrange...
resolver := NewResolver(logger.NewLogger("test"))
// register instance A
instanceAID := "A"
instanceAName := "testAppID"
instanceAAddress := "127.0.0.1"
instanceAPort := "1234"
instanceAPQDN := fmt.Sprintf("%s:%s", instanceAAddress, instanceAPort)
instanceA := nr.Metadata{Properties: map[string]string{
nr.MDNSInstanceName: instanceAName,
nr.MDNSInstanceAddress: instanceAAddress,
nr.MDNSInstancePort: instanceAPort,
nr.MDNSInstanceID: instanceAID,
}}
err1 := resolver.Init(instanceA)
require.NoError(t, err1)
// register instance B
instanceBID := "B"
instanceBName := "testAppID"
instanceBAddress := "127.0.0.1"
instanceBPort := "5678"
instanceBPQDN := fmt.Sprintf("%s:%s", instanceBAddress, instanceBPort)
instanceB := nr.Metadata{Properties: map[string]string{
nr.MDNSInstanceName: instanceBName,
nr.MDNSInstanceAddress: instanceBAddress,
nr.MDNSInstancePort: instanceBPort,
nr.MDNSInstanceID: instanceBID,
}}
err2 := resolver.Init(instanceB)
require.NoError(t, err2)
// act...
request := nr.ResolveRequest{ID: "testAppID"}
// first resolution will return the first responder's address and trigger a cache refresh.
addr1, err := resolver.ResolveID(request)
require.NoError(t, err)
require.Contains(t, []string{instanceAPQDN, instanceBPQDN}, addr1)
// delay long enough for the background address cache to populate.
time.Sleep(1 * time.Second)
// assert that when we resolve the test app id n times, we see only
// instance A and instance B and we see them each atleast m times.
instanceACount := 0
instanceBCount := 0
for i := 0; i < 100; i++ {
addr, err := resolver.ResolveID(request)
require.NoError(t, err)
require.Contains(t, []string{instanceAPQDN, instanceBPQDN}, addr)
if addr == instanceAPQDN {
instanceACount++
} else if addr == instanceBPQDN {
instanceBCount++
}
}
// 45 allows some variation in distribution.
require.Greater(t, instanceACount, 45)
require.Greater(t, instanceBCount, 45)
}
func TestAddressListExpire(t *testing.T) {
// arrange
base := time.Now()
expired := base.Add(-60 * time.Second)
notExpired := base.Add(60 * time.Second)
addressList := &addressList{
addresses: []*address{
{
ip: "expired0",
expiresAt: expired,
},
{
ip: "expired1",
expiresAt: expired,
},
{
ip: "notExpired0",
expiresAt: notExpired,
},
},
}
// act
addressList.expire()
// assert
require.Len(t, addressList.addresses, 1)
}
func TestAddressListAddNewAddress(t *testing.T) {
// arrange
expiry := time.Now().Add(60 * time.Second)
addressList := &addressList{
addresses: []*address{
{
ip: "addr0",
expiresAt: expiry,
},
{
ip: "addr1",
expiresAt: expiry,
},
},
}
// act
addressList.add("addr2")
// assert
require.Len(t, addressList.addresses, 3)
require.Equal(t, "addr2", addressList.addresses[2].ip)
}
func TestAddressListAddExisitingAddress(t *testing.T) {
// arrange
expiry := time.Now().Add(10 * time.Second)
addressList := &addressList{
addresses: []*address{
{
ip: "addr0",
expiresAt: expiry,
},
{
ip: "addr1",
expiresAt: expiry,
},
},
}
// act
addressList.add("addr1")
deltaSec := int(addressList.addresses[1].expiresAt.Sub(expiry).Seconds())
// assert
require.Len(t, addressList.addresses, 2)
require.Greater(t, deltaSec, 0)
}
func TestAddressListNext(t *testing.T) {
// arrange
expiry := time.Now().Add(10 * time.Second)
addressList := &addressList{
addresses: []*address{
{
ip: "addr0",
expiresAt: expiry,
},
{
ip: "addr1",
expiresAt: expiry,
},
{
ip: "addr2",
expiresAt: expiry,
},
{
ip: "addr3",
expiresAt: expiry,
},
{
ip: "addr4",
expiresAt: expiry,
},
{
ip: "addr5",
expiresAt: expiry,
},
},
}
// act & assert
require.Equal(t, "addr0", *addressList.next())
require.Equal(t, "addr1", *addressList.next())
require.Equal(t, "addr2", *addressList.next())
require.Equal(t, "addr3", *addressList.next())
require.Equal(t, "addr4", *addressList.next())
require.Equal(t, "addr5", *addressList.next())
require.Equal(t, "addr0", *addressList.next())
require.Equal(t, "addr1", *addressList.next())
}
func TestAddressListNextMaxCounter(t *testing.T) {
// arrange
expiry := time.Now().Add(10 * time.Second)
addressList := &addressList{
addresses: []*address{
{
ip: "addr0",
expiresAt: expiry,
},
{
ip: "addr1",
expiresAt: expiry,
},
{
ip: "addr2",
expiresAt: expiry,
},
{
ip: "addr3",
expiresAt: expiry,
},
{
ip: "addr4",
expiresAt: expiry,
},
{
ip: "addr5",
expiresAt: expiry,
},
},
}
// act & assert
require.Equal(t, "addr0", *addressList.next())
require.Equal(t, "addr1", *addressList.next())
require.Equal(t, "addr2", *addressList.next())
require.Equal(t, "addr3", *addressList.next())
addressList.counter = math.MaxUint32
require.Equal(t, "addr0", *addressList.next())
require.Equal(t, "addr1", *addressList.next())
require.Equal(t, "addr2", *addressList.next())
}
func TestAddressListNextNoAddress(t *testing.T) {
// arrange
addressList := &addressList{
addresses: []*address{},
}
// act & assert
require.Nil(t, addressList.next())
}
func TestAddressListNextWithAdd(t *testing.T) {
// arrange
expiry := time.Now().Add(10 * time.Second)
addressList := &addressList{
addresses: []*address{
{
ip: "addr0",
expiresAt: expiry,
},
{
ip: "addr1",
expiresAt: expiry,
},
{
ip: "addr2",
expiresAt: expiry,
},
{
ip: "addr3",
expiresAt: expiry,
},
{
ip: "addr4",
expiresAt: expiry,
},
{
ip: "addr5",
expiresAt: expiry,
},
},
}
// act & assert
require.Equal(t, "addr0", *addressList.next())
require.Equal(t, "addr1", *addressList.next())
require.Equal(t, "addr2", *addressList.next())
require.Equal(t, "addr3", *addressList.next())
addressList.add("addr6")
require.Equal(t, "addr4", *addressList.next())
require.Equal(t, "addr5", *addressList.next())
require.Equal(t, "addr6", *addressList.next())
require.Equal(t, "addr0", *addressList.next())
require.Equal(t, "addr1", *addressList.next())
}
func TestAddressListNextWithExpiration(t *testing.T) {
// arrange
expiry := time.Now().Add(10 * time.Second)
expired := time.Now().Add(-60 * time.Second)
addressList := &addressList{
addresses: []*address{
{
ip: "addr0",
expiresAt: expired,
},
{
ip: "addr1",
expiresAt: expiry,
},
{
ip: "addr2",
expiresAt: expired,
},
{
ip: "addr3",
expiresAt: expiry,
},
{
ip: "addr4",
expiresAt: expired,
},
{
ip: "addr5",
expiresAt: expiry,
},
},
}
// act & assert
require.Equal(t, "addr0", *addressList.next())
require.Equal(t, "addr1", *addressList.next())
require.Equal(t, "addr2", *addressList.next())
require.Equal(t, "addr3", *addressList.next())
addressList.expire()
require.Equal(t, "addr3", *addressList.next())
require.Equal(t, "addr5", *addressList.next())
require.Equal(t, "addr1", *addressList.next())
require.Equal(t, "addr3", *addressList.next())
}
func TestUnion(t *testing.T) {
// arrange
testCases := []struct {
first []string
second []string
expected []string
}{
{
first: []string{"a", "b", "c"},
second: []string{"a", "c", "d", "e"},
expected: []string{"a", "b", "c", "d", "e"},
},
{
first: []string{"a", "b", "c"},
second: []string{"d", "e", "f", "g"},
expected: []string{"a", "b", "c", "d", "e", "f", "g"},
},
{
first: []string{"a", "b"},
second: []string{"a", "b"},
expected: []string{"a", "b"},
},
{
first: []string{"a", "b"},
second: []string{"b", "a"},
expected: []string{"a", "b"},
},
}
for _, tt := range testCases {
// act
union := union(tt.first, tt.second)
// assert
var matches int
for _, i1 := range tt.expected {
for _, i2 := range union {
if i1 == i2 {
matches++
}
}
}
require.Equal(t, len(tt.expected), matches)
}
}

View File

@ -12,6 +12,8 @@ const (
MDNSInstanceAddress string = "address"
// MDNSInstancePort is the port of instance.
MDNSInstancePort string = "port"
// MDNSInstanceID is an optional unique instance ID.
MDNSInstanceID string = "instance"
)
// Metadata contains a name resolution specific set of metadata properties