From b4aecdfef7fc267a7b905ab44b5d4815a2babe40 Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Fri, 24 Feb 2023 20:26:28 +0000 Subject: [PATCH] Resolve long-standing TODO in mDNS component Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- nameresolution/mdns/mdns.go | 34 ++++----- nameresolution/mdns/mdns_test.go | 120 ++++++++++++------------------- nameresolution/metadata.go | 2 + 3 files changed, 62 insertions(+), 94 deletions(-) diff --git a/nameresolution/mdns/mdns.go b/nameresolution/mdns/mdns.go index 9990652eb..198530ffc 100644 --- a/nameresolution/mdns/mdns.go +++ b/nameresolution/mdns/mdns.go @@ -238,9 +238,10 @@ func (m *Resolver) startRefreshers() { // refresh app addresses periodically and on demand go func() { - defer func() { - m.refreshRunning.Store(false) - }() + defer m.refreshRunning.Store(false) + + t := time.NewTicker(refreshInterval) + defer t.Stop() for { select { @@ -252,7 +253,7 @@ func (m *Resolver) startRefreshers() { } }() // Refresh periodically - case <-time.After(refreshInterval): + case <-t.C: go func() { if err := m.refreshAllApps(m.runCtx); err != nil { m.logger.Warnf(err.Error()) @@ -269,37 +270,28 @@ func (m *Resolver) startRefreshers() { // Init registers service for mDNS. func (m *Resolver) Init(metadata nameresolution.Metadata) error { - var ( - appID string - hostAddress string - ok bool - instanceID string - ) - props := metadata.Properties - if appID, ok = props[nameresolution.MDNSInstanceName]; !ok { + appID := props[nameresolution.AppID] + if appID == "" { return errors.New("name is missing") } - if hostAddress, ok = props[nameresolution.MDNSInstanceAddress]; !ok { + + hostAddress := props[nameresolution.HostAddress] + if hostAddress == "" { return errors.New("address is missing") } - p, ok := props[nameresolution.MDNSInstancePort] - if !ok { + if props[nameresolution.DaprPort] == "" { return errors.New("port is missing") } - port, err := strconv.ParseInt(p, 10, 32) + port, err := strconv.Atoi(props[nameresolution.DaprPort]) if err != nil { return errors.New("port is invalid") } - if instanceID, ok = props[nameresolution.MDNSInstanceID]; !ok { - instanceID = "" - } - - err = m.registerMDNS(instanceID, appID, []string{hostAddress}, int(port)) + err = m.registerMDNS("", appID, []string{hostAddress}, port) if err != nil { return err } diff --git a/nameresolution/mdns/mdns_test.go b/nameresolution/mdns/mdns_test.go index ff5bf4d93..d060e9eae 100644 --- a/nameresolution/mdns/mdns_test.go +++ b/nameresolution/mdns/mdns_test.go @@ -42,30 +42,30 @@ func TestInitMetadata(t *testing.T) { { "name", map[string]string{ - nr.MDNSInstanceAddress: localhost, - nr.MDNSInstancePort: "30003", + nr.HostAddress: localhost, + nr.DaprPort: "30003", }, }, { "address", map[string]string{ - nr.MDNSInstanceName: "testAppID", - nr.MDNSInstancePort: "30003", + nr.AppID: "testAppID", + nr.DaprPort: "30003", }, }, { "port", map[string]string{ - nr.MDNSInstanceName: "testAppID", - nr.MDNSInstanceAddress: localhost, + nr.AppID: "testAppID", + nr.HostAddress: localhost, }, }, { "port", map[string]string{ - nr.MDNSInstanceName: "testAppID", - nr.MDNSInstanceAddress: localhost, - nr.MDNSInstancePort: "abcd", + nr.AppID: "testAppID", + nr.HostAddress: localhost, + nr.DaprPort: "abcd", }, }, } @@ -90,9 +90,9 @@ func TestInitRegister(t *testing.T) { resolver := NewResolver(logger.NewLogger("test")).(*Resolver) defer resolver.Close() md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ - nr.MDNSInstanceName: "testAppID", - nr.MDNSInstanceAddress: localhost, - nr.MDNSInstancePort: "1234", + nr.AppID: "testAppID", + nr.HostAddress: localhost, + nr.DaprPort: "1234", }}} // act @@ -105,14 +105,14 @@ func TestInitRegisterDuplicate(t *testing.T) { resolver := NewResolver(logger.NewLogger("test")).(*Resolver) defer resolver.Close() md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ - nr.MDNSInstanceName: "testAppID", - nr.MDNSInstanceAddress: localhost, - nr.MDNSInstancePort: "1234", + nr.AppID: "testAppID", + nr.HostAddress: localhost, + nr.DaprPort: "1234", }}} md2 := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ - nr.MDNSInstanceName: "testAppID", - nr.MDNSInstanceAddress: localhost, - nr.MDNSInstancePort: "1234", + nr.AppID: "testAppID", + nr.HostAddress: localhost, + nr.DaprPort: "1234", }}} // act @@ -128,9 +128,9 @@ func TestResolver(t *testing.T) { resolver := NewResolver(logger.NewLogger("test")).(*Resolver) defer resolver.Close() md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ - nr.MDNSInstanceName: "testAppID", - nr.MDNSInstanceAddress: localhost, - nr.MDNSInstancePort: "1234", + nr.AppID: "testAppID", + nr.HostAddress: localhost, + nr.DaprPort: "1234", }}} // act @@ -149,9 +149,9 @@ func TestResolverClose(t *testing.T) { // arrange resolver := NewResolver(logger.NewLogger("test")).(*Resolver) md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ - nr.MDNSInstanceName: "testAppID", - nr.MDNSInstanceAddress: localhost, - nr.MDNSInstancePort: "1234", + nr.AppID: "testAppID", + nr.HostAddress: localhost, + nr.DaprPort: "1234", }}} // act @@ -181,34 +181,24 @@ func TestResolverMultipleInstances(t *testing.T) { instanceAID := "A" instanceAName := "testAppID" instanceAAddress := localhost - instanceAPort := "1234" - instanceAPQDN := fmt.Sprintf("%s:%s", instanceAAddress, instanceAPort) + instanceAPort := 1234 + instanceAPQDN := fmt.Sprintf("%s:%d", instanceAAddress, instanceAPort) - instanceA := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ - nr.MDNSInstanceName: instanceAName, - nr.MDNSInstanceAddress: instanceAAddress, - nr.MDNSInstancePort: instanceAPort, - nr.MDNSInstanceID: instanceAID, - }}} - err1 := resolver.Init(instanceA) + err1 := resolver.registerMDNS(instanceAID, instanceAName, []string{instanceAAddress}, instanceAPort) require.NoError(t, err1) // register instance B instanceBID := "B" instanceBName := "testAppID" instanceBAddress := localhost - instanceBPort := "5678" - instanceBPQDN := fmt.Sprintf("%s:%s", instanceBAddress, instanceBPort) + instanceBPort := 5678 + instanceBPQDN := fmt.Sprintf("%s:%d", instanceBAddress, instanceBPort) - instanceB := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ - nr.MDNSInstanceName: instanceBName, - nr.MDNSInstanceAddress: instanceBAddress, - nr.MDNSInstancePort: instanceBPort, - nr.MDNSInstanceID: instanceBID, - }}} - err2 := resolver.Init(instanceB) + err2 := resolver.registerMDNS(instanceBID, instanceBName, []string{instanceBAddress}, instanceBPort) require.NoError(t, err2) + go resolver.startRefreshers() + // act... request := nr.ResolveRequest{ID: "testAppID"} @@ -292,9 +282,9 @@ func ResolverConcurrencySubsriberClear(t *testing.T) { resolver := NewResolver(logger.NewLogger("test")).(*Resolver) defer resolver.Close() md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ - nr.MDNSInstanceName: "testAppID", - nr.MDNSInstanceAddress: localhost, - nr.MDNSInstancePort: "1234", + nr.AppID: "testAppID", + nr.HostAddress: localhost, + nr.DaprPort: "1234", }}} // act @@ -334,50 +324,34 @@ func ResolverConcurrencyFound(t *testing.T) { appAID := "A" appAName := "testAppA" appAAddress := localhost - appAPort := "1234" - appAPQDN := fmt.Sprintf("%s:%s", appAAddress, appAPort) + appAPort := 1234 + appABPQDN := fmt.Sprintf("%s:%d", appAAddress, appAPort) - appA := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ - nr.MDNSInstanceName: appAName, - nr.MDNSInstanceAddress: appAAddress, - nr.MDNSInstancePort: appAPort, - nr.MDNSInstanceID: appAID, - }}} - err1 := resolver.Init(appA) + err1 := resolver.registerMDNS(appAID, appAName, []string{appAAddress}, appAPort) require.NoError(t, err1) // register instance B appBID := "B" appBName := "testAppB" appBAddress := localhost - appBPort := "5678" - appBBPQDN := fmt.Sprintf("%s:%s", appBAddress, appBPort) + appBPort := 5678 + appBBPQDN := fmt.Sprintf("%s:%d", appBAddress, appBPort) - appB := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ - nr.MDNSInstanceName: appBName, - nr.MDNSInstanceAddress: appBAddress, - nr.MDNSInstancePort: appBPort, - nr.MDNSInstanceID: appBID, - }}} - err2 := resolver.Init(appB) + err2 := resolver.registerMDNS(appBID, appBName, []string{appBAddress}, appBPort) require.NoError(t, err2) // register instance C appCID := "C" appCName := "testAppC" appCAddress := localhost - appCPort := "3456" - appCBPQDN := fmt.Sprintf("%s:%s", appCAddress, appCPort) + appCPort := 3456 + appCBPQDN := fmt.Sprintf("%s:%d", appCAddress, appCPort) - appC := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ - nr.MDNSInstanceName: appCName, - nr.MDNSInstanceAddress: appCAddress, - nr.MDNSInstancePort: appCPort, - nr.MDNSInstanceID: appCID, - }}} - err3 := resolver.Init(appC) + err3 := resolver.registerMDNS(appCID, appCName, []string{appCAddress}, appCPort) require.NoError(t, err3) + go resolver.startRefreshers() + // act... wg := sync.WaitGroup{} for i := 0; i < numConcurrency; i++ { @@ -402,7 +376,7 @@ func ResolverConcurrencyFound(t *testing.T) { // assert require.NoError(t, err) if r == 0 { - assert.Equal(t, appAPQDN, pt) + assert.Equal(t, appABPQDN, pt) } else if r == 1 { assert.Equal(t, appBBPQDN, pt) } else if r == 2 { diff --git a/nameresolution/metadata.go b/nameresolution/metadata.go index b2c69c588..ac0ac293a 100644 --- a/nameresolution/metadata.go +++ b/nameresolution/metadata.go @@ -16,6 +16,8 @@ package nameresolution import "github.com/dapr/components-contrib/metadata" const ( + // TODO: REMOVE THESE AFTER RUNTIME IS CHANGED + // MDNSInstanceName is the instance name which is broadcasted. MDNSInstanceName string = "name" // MDNSInstanceAddress is the address of the instance.