Merge pull request #2596 from ItalyPaleAle/mdns-todos

Resolve long-standing TODO in mDNS component
This commit is contained in:
Bernd Verst 2023-02-24 14:18:37 -08:00 committed by GitHub
commit 21f904f4ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 62 additions and 94 deletions

View File

@ -238,9 +238,10 @@ func (m *Resolver) startRefreshers() {
// refresh app addresses periodically and on demand // refresh app addresses periodically and on demand
go func() { go func() {
defer func() { defer m.refreshRunning.Store(false)
m.refreshRunning.Store(false)
}() t := time.NewTicker(refreshInterval)
defer t.Stop()
for { for {
select { select {
@ -252,7 +253,7 @@ func (m *Resolver) startRefreshers() {
} }
}() }()
// Refresh periodically // Refresh periodically
case <-time.After(refreshInterval): case <-t.C:
go func() { go func() {
if err := m.refreshAllApps(m.runCtx); err != nil { if err := m.refreshAllApps(m.runCtx); err != nil {
m.logger.Warnf(err.Error()) m.logger.Warnf(err.Error())
@ -269,37 +270,28 @@ func (m *Resolver) startRefreshers() {
// Init registers service for mDNS. // Init registers service for mDNS.
func (m *Resolver) Init(metadata nameresolution.Metadata) error { func (m *Resolver) Init(metadata nameresolution.Metadata) error {
var (
appID string
hostAddress string
ok bool
instanceID string
)
props := metadata.Properties props := metadata.Properties
if appID, ok = props[nameresolution.MDNSInstanceName]; !ok { appID := props[nameresolution.AppID]
if appID == "" {
return errors.New("name is missing") return errors.New("name is missing")
} }
if hostAddress, ok = props[nameresolution.MDNSInstanceAddress]; !ok {
hostAddress := props[nameresolution.HostAddress]
if hostAddress == "" {
return errors.New("address is missing") return errors.New("address is missing")
} }
p, ok := props[nameresolution.MDNSInstancePort] if props[nameresolution.DaprPort] == "" {
if !ok {
return errors.New("port is missing") return errors.New("port is missing")
} }
port, err := strconv.ParseInt(p, 10, 32) port, err := strconv.Atoi(props[nameresolution.DaprPort])
if err != nil { if err != nil {
return errors.New("port is invalid") return errors.New("port is invalid")
} }
if instanceID, ok = props[nameresolution.MDNSInstanceID]; !ok { err = m.registerMDNS("", appID, []string{hostAddress}, port)
instanceID = ""
}
err = m.registerMDNS(instanceID, appID, []string{hostAddress}, int(port))
if err != nil { if err != nil {
return err return err
} }

View File

@ -42,30 +42,30 @@ func TestInitMetadata(t *testing.T) {
{ {
"name", "name",
map[string]string{ map[string]string{
nr.MDNSInstanceAddress: localhost, nr.HostAddress: localhost,
nr.MDNSInstancePort: "30003", nr.DaprPort: "30003",
}, },
}, },
{ {
"address", "address",
map[string]string{ map[string]string{
nr.MDNSInstanceName: "testAppID", nr.AppID: "testAppID",
nr.MDNSInstancePort: "30003", nr.DaprPort: "30003",
}, },
}, },
{ {
"port", "port",
map[string]string{ map[string]string{
nr.MDNSInstanceName: "testAppID", nr.AppID: "testAppID",
nr.MDNSInstanceAddress: localhost, nr.HostAddress: localhost,
}, },
}, },
{ {
"port", "port",
map[string]string{ map[string]string{
nr.MDNSInstanceName: "testAppID", nr.AppID: "testAppID",
nr.MDNSInstanceAddress: localhost, nr.HostAddress: localhost,
nr.MDNSInstancePort: "abcd", nr.DaprPort: "abcd",
}, },
}, },
} }
@ -90,9 +90,9 @@ func TestInitRegister(t *testing.T) {
resolver := NewResolver(logger.NewLogger("test")).(*Resolver) resolver := NewResolver(logger.NewLogger("test")).(*Resolver)
defer resolver.Close() defer resolver.Close()
md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: "testAppID", nr.AppID: "testAppID",
nr.MDNSInstanceAddress: localhost, nr.HostAddress: localhost,
nr.MDNSInstancePort: "1234", nr.DaprPort: "1234",
}}} }}}
// act // act
@ -105,14 +105,14 @@ func TestInitRegisterDuplicate(t *testing.T) {
resolver := NewResolver(logger.NewLogger("test")).(*Resolver) resolver := NewResolver(logger.NewLogger("test")).(*Resolver)
defer resolver.Close() defer resolver.Close()
md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: "testAppID", nr.AppID: "testAppID",
nr.MDNSInstanceAddress: localhost, nr.HostAddress: localhost,
nr.MDNSInstancePort: "1234", nr.DaprPort: "1234",
}}} }}}
md2 := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ md2 := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: "testAppID", nr.AppID: "testAppID",
nr.MDNSInstanceAddress: localhost, nr.HostAddress: localhost,
nr.MDNSInstancePort: "1234", nr.DaprPort: "1234",
}}} }}}
// act // act
@ -128,9 +128,9 @@ func TestResolver(t *testing.T) {
resolver := NewResolver(logger.NewLogger("test")).(*Resolver) resolver := NewResolver(logger.NewLogger("test")).(*Resolver)
defer resolver.Close() defer resolver.Close()
md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: "testAppID", nr.AppID: "testAppID",
nr.MDNSInstanceAddress: localhost, nr.HostAddress: localhost,
nr.MDNSInstancePort: "1234", nr.DaprPort: "1234",
}}} }}}
// act // act
@ -149,9 +149,9 @@ func TestResolverClose(t *testing.T) {
// arrange // arrange
resolver := NewResolver(logger.NewLogger("test")).(*Resolver) resolver := NewResolver(logger.NewLogger("test")).(*Resolver)
md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: "testAppID", nr.AppID: "testAppID",
nr.MDNSInstanceAddress: localhost, nr.HostAddress: localhost,
nr.MDNSInstancePort: "1234", nr.DaprPort: "1234",
}}} }}}
// act // act
@ -181,34 +181,24 @@ func TestResolverMultipleInstances(t *testing.T) {
instanceAID := "A" instanceAID := "A"
instanceAName := "testAppID" instanceAName := "testAppID"
instanceAAddress := localhost instanceAAddress := localhost
instanceAPort := "1234" instanceAPort := 1234
instanceAPQDN := fmt.Sprintf("%s:%s", instanceAAddress, instanceAPort) instanceAPQDN := fmt.Sprintf("%s:%d", instanceAAddress, instanceAPort)
instanceA := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ err1 := resolver.registerMDNS(instanceAID, instanceAName, []string{instanceAAddress}, instanceAPort)
nr.MDNSInstanceName: instanceAName,
nr.MDNSInstanceAddress: instanceAAddress,
nr.MDNSInstancePort: instanceAPort,
nr.MDNSInstanceID: instanceAID,
}}}
err1 := resolver.Init(instanceA)
require.NoError(t, err1) require.NoError(t, err1)
// register instance B // register instance B
instanceBID := "B" instanceBID := "B"
instanceBName := "testAppID" instanceBName := "testAppID"
instanceBAddress := localhost instanceBAddress := localhost
instanceBPort := "5678" instanceBPort := 5678
instanceBPQDN := fmt.Sprintf("%s:%s", instanceBAddress, instanceBPort) instanceBPQDN := fmt.Sprintf("%s:%d", instanceBAddress, instanceBPort)
instanceB := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ err2 := resolver.registerMDNS(instanceBID, instanceBName, []string{instanceBAddress}, instanceBPort)
nr.MDNSInstanceName: instanceBName,
nr.MDNSInstanceAddress: instanceBAddress,
nr.MDNSInstancePort: instanceBPort,
nr.MDNSInstanceID: instanceBID,
}}}
err2 := resolver.Init(instanceB)
require.NoError(t, err2) require.NoError(t, err2)
go resolver.startRefreshers()
// act... // act...
request := nr.ResolveRequest{ID: "testAppID"} request := nr.ResolveRequest{ID: "testAppID"}
@ -292,9 +282,9 @@ func ResolverConcurrencySubsriberClear(t *testing.T) {
resolver := NewResolver(logger.NewLogger("test")).(*Resolver) resolver := NewResolver(logger.NewLogger("test")).(*Resolver)
defer resolver.Close() defer resolver.Close()
md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: "testAppID", nr.AppID: "testAppID",
nr.MDNSInstanceAddress: localhost, nr.HostAddress: localhost,
nr.MDNSInstancePort: "1234", nr.DaprPort: "1234",
}}} }}}
// act // act
@ -334,50 +324,34 @@ func ResolverConcurrencyFound(t *testing.T) {
appAID := "A" appAID := "A"
appAName := "testAppA" appAName := "testAppA"
appAAddress := localhost appAAddress := localhost
appAPort := "1234" appAPort := 1234
appAPQDN := fmt.Sprintf("%s:%s", appAAddress, appAPort) appABPQDN := fmt.Sprintf("%s:%d", appAAddress, appAPort)
appA := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ err1 := resolver.registerMDNS(appAID, appAName, []string{appAAddress}, appAPort)
nr.MDNSInstanceName: appAName,
nr.MDNSInstanceAddress: appAAddress,
nr.MDNSInstancePort: appAPort,
nr.MDNSInstanceID: appAID,
}}}
err1 := resolver.Init(appA)
require.NoError(t, err1) require.NoError(t, err1)
// register instance B // register instance B
appBID := "B" appBID := "B"
appBName := "testAppB" appBName := "testAppB"
appBAddress := localhost appBAddress := localhost
appBPort := "5678" appBPort := 5678
appBBPQDN := fmt.Sprintf("%s:%s", appBAddress, appBPort) appBBPQDN := fmt.Sprintf("%s:%d", appBAddress, appBPort)
appB := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ err2 := resolver.registerMDNS(appBID, appBName, []string{appBAddress}, appBPort)
nr.MDNSInstanceName: appBName,
nr.MDNSInstanceAddress: appBAddress,
nr.MDNSInstancePort: appBPort,
nr.MDNSInstanceID: appBID,
}}}
err2 := resolver.Init(appB)
require.NoError(t, err2) require.NoError(t, err2)
// register instance C // register instance C
appCID := "C" appCID := "C"
appCName := "testAppC" appCName := "testAppC"
appCAddress := localhost appCAddress := localhost
appCPort := "3456" appCPort := 3456
appCBPQDN := fmt.Sprintf("%s:%s", appCAddress, appCPort) appCBPQDN := fmt.Sprintf("%s:%d", appCAddress, appCPort)
appC := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ err3 := resolver.registerMDNS(appCID, appCName, []string{appCAddress}, appCPort)
nr.MDNSInstanceName: appCName,
nr.MDNSInstanceAddress: appCAddress,
nr.MDNSInstancePort: appCPort,
nr.MDNSInstanceID: appCID,
}}}
err3 := resolver.Init(appC)
require.NoError(t, err3) require.NoError(t, err3)
go resolver.startRefreshers()
// act... // act...
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
for i := 0; i < numConcurrency; i++ { for i := 0; i < numConcurrency; i++ {
@ -402,7 +376,7 @@ func ResolverConcurrencyFound(t *testing.T) {
// assert // assert
require.NoError(t, err) require.NoError(t, err)
if r == 0 { if r == 0 {
assert.Equal(t, appAPQDN, pt) assert.Equal(t, appABPQDN, pt)
} else if r == 1 { } else if r == 1 {
assert.Equal(t, appBBPQDN, pt) assert.Equal(t, appBBPQDN, pt)
} else if r == 2 { } else if r == 2 {

View File

@ -16,6 +16,8 @@ package nameresolution
import "github.com/dapr/components-contrib/metadata" import "github.com/dapr/components-contrib/metadata"
const ( const (
// TODO: REMOVE THESE AFTER RUNTIME IS CHANGED
// MDNSInstanceName is the instance name which is broadcasted. // MDNSInstanceName is the instance name which is broadcasted.
MDNSInstanceName string = "name" MDNSInstanceName string = "name"
// MDNSInstanceAddress is the address of the instance. // MDNSInstanceAddress is the address of the instance.