From 5e44a2a4e0ba14c5a1c46e4a8aa522be444050b7 Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Thu, 23 Feb 2023 19:45:10 +0000 Subject: [PATCH 1/9] Misc fixes to consul nameresolver Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- nameresolution/consul/consul.go | 59 +++++++++++++-------------- nameresolution/consul/consul_test.go | 61 ++++++++++++++++++++++++++++ 2 files changed, 89 insertions(+), 31 deletions(-) diff --git a/nameresolution/consul/consul.go b/nameresolution/consul/consul.go index 3165d934a..a05a1e191 100644 --- a/nameresolution/consul/consul.go +++ b/nameresolution/consul/consul.go @@ -60,6 +60,8 @@ type clientInterface interface { type agentInterface interface { Self() (map[string]map[string]interface{}, error) ServiceRegister(service *consul.AgentServiceRegistration) error + ServiceDeregister(serviceID string) error + ServiceDeregisterOpts(serviceID string, q *consul.QueryOptions) error } type healthInterface interface { @@ -93,9 +95,7 @@ func newResolver(logger logger.Logger, resolverConfig resolverConfig, client cli } // Init will configure component. It will also register service or validate client connection based on config. -func (r *resolver) Init(metadata nr.Metadata) error { - var err error - +func (r *resolver) Init(metadata nr.Metadata) (err error) { r.config, err = getConfig(metadata) if err != nil { return err @@ -120,7 +120,7 @@ func (r *resolver) Init(metadata nr.Metadata) error { } // ResolveID resolves name to address via consul. -func (r *resolver) ResolveID(req nr.ResolveRequest) (string, error) { +func (r *resolver) ResolveID(req nr.ResolveRequest) (addr string, err error) { cfg := r.config services, _, err := r.client.Health().Service(req.ID, "", true, cfg.QueryOptions) if err != nil { @@ -142,35 +142,28 @@ func (r *resolver) ResolveID(req nr.ResolveRequest) (string, error) { return services } + // Pick a random service svc := shuffle(services)[0] - addr := "" + port := svc.Service.Meta[cfg.DaprPortMetaKey] + if port == "" { + return "", fmt.Errorf("target service AppID '%s' found but DAPR_PORT missing from meta", req.ID) + } - if port, ok := svc.Service.Meta[cfg.DaprPortMetaKey]; ok { - if svc.Service.Address != "" { - addr = fmt.Sprintf("%s:%s", svc.Service.Address, port) - } else if svc.Node.Address != "" { - addr = fmt.Sprintf("%s:%s", svc.Node.Address, port) - } else { - return "", fmt.Errorf("no healthy services found with AppID:%s", req.ID) - } + if svc.Service.Address != "" { + addr = svc.Service.Address + ":" + port + } else if svc.Node.Address != "" { + addr = svc.Node.Address + ":" + port } else { - return "", fmt.Errorf("target service AppID:%s found but DAPR_PORT missing from meta", req.ID) + return "", fmt.Errorf("no healthy services found with AppID '%s'", req.ID) } return addr, nil } // getConfig configuration from metadata, defaults are best suited for self-hosted mode. -func getConfig(metadata nr.Metadata) (resolverConfig, error) { - var daprPort string - var ok bool - var err error - resolverCfg := resolverConfig{} - - props := metadata.Properties - - if daprPort, ok = props[nr.DaprPort]; !ok { +func getConfig(metadata nr.Metadata) (resolverCfg resolverConfig, err error) { + if metadata.Properties[nr.DaprPort] == "" { return resolverCfg, fmt.Errorf("metadata property missing: %s", nr.DaprPort) } @@ -187,7 +180,8 @@ func getConfig(metadata nr.Metadata) (resolverConfig, error) { } resolverCfg.Client = getClientConfig(cfg) - if resolverCfg.Registration, err = getRegistrationConfig(cfg, props); err != nil { + resolverCfg.Registration, err = getRegistrationConfig(cfg, metadata.Properties) + if err != nil { return resolverCfg, err } resolverCfg.QueryOptions = getQueryOptionsConfig(cfg) @@ -198,7 +192,7 @@ func getConfig(metadata nr.Metadata) (resolverConfig, error) { resolverCfg.Registration.Meta = map[string]string{} } - resolverCfg.Registration.Meta[resolverCfg.DaprPortMetaKey] = daprPort + resolverCfg.Registration.Meta[resolverCfg.DaprPortMetaKey] = metadata.Properties[nr.DaprPort] } return resolverCfg, nil @@ -217,15 +211,18 @@ func getRegistrationConfig(cfg configSpec, props map[string]string) (*consul.Age // if advanced registration configured ignore other registration related configs if cfg.AdvancedRegistration != nil { return cfg.AdvancedRegistration, nil - } else if !cfg.SelfRegister { + } + if !cfg.SelfRegister { return nil, nil } - var appID string - var appPort string - var host string - var httpPort string - var ok bool + var ( + appID string + appPort string + host string + httpPort string + ok bool + ) if appID, ok = props[nr.AppID]; !ok { return nil, fmt.Errorf("metadata property missing: %s", nr.AppID) diff --git a/nameresolution/consul/consul_test.go b/nameresolution/consul/consul_test.go index 8b6bbc49c..8e25f9b0c 100644 --- a/nameresolution/consul/consul_test.go +++ b/nameresolution/consul/consul_test.go @@ -82,6 +82,14 @@ func (m *mockAgent) ServiceRegister(service *consul.AgentServiceRegistration) er return m.serviceRegisterErr } +func (m *mockAgent) ServiceDeregister(serviceID string) error { + return nil +} + +func (m *mockAgent) ServiceDeregisterOpts(serviceID string, q *consul.QueryOptions) error { + return nil +} + func TestInit(t *testing.T) { t.Parallel() @@ -223,6 +231,59 @@ func TestResolveID(t *testing.T) { assert.Equal(t, "123.234.345.456:50005", addr) }, }, + { + "should get random address from service", + nr.ResolveRequest{ + ID: "test-app", + }, + func(t *testing.T, req nr.ResolveRequest) { + t.Helper() + mock := mockClient{ + mockHealth: mockHealth{ + serviceResult: []*consul.ServiceEntry{ + { + Service: &consul.AgentService{ + Address: "123.234.345.456", + Port: 8600, + Meta: map[string]string{ + "DAPR_PORT": "50005", + }, + }, + }, + { + Service: &consul.AgentService{ + Address: "234.345.456.678", + Port: 8600, + Meta: map[string]string{ + "DAPR_PORT": "50005", + }, + }, + }, + }, + }, + } + resolver := newResolver(logger.NewLogger("test"), *testConfig, &mock) + + total1 := 0 + total2 := 0 + for i := 0; i < 100; i++ { + addr, _ := resolver.ResolveID(req) + + if addr == "123.234.345.456:50005" { + total1++ + } else if addr == "234.345.456.678:50005" { + total2++ + } else { + t.Fatalf("Received unexpected address: %s", addr) + } + } + + // Because of the random nature of the address being returned, we just check to make sure we get at least 20 of each (and a total of 100) + assert.Equal(t, 100, total1+total2) + assert.Greater(t, total1, 20) + assert.Greater(t, total2, 20) + }, + }, { "should get address from node if not on service", nr.ResolveRequest{ From 83717272a26d0b828cf30e24a554af76e9bdb04b Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Thu, 23 Feb 2023 19:52:40 +0000 Subject: [PATCH 2/9] Significant improvements to shuffling of results Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- nameresolution/consul/consul.go | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/nameresolution/consul/consul.go b/nameresolution/consul/consul.go index a05a1e191..c1b274c2b 100644 --- a/nameresolution/consul/consul.go +++ b/nameresolution/consul/consul.go @@ -14,9 +14,8 @@ limitations under the License. package consul import ( - "crypto/rand" "fmt" - "math/big" + "math/rand" "net" "strconv" @@ -131,19 +130,8 @@ func (r *resolver) ResolveID(req nr.ResolveRequest) (addr string, err error) { return "", fmt.Errorf("no healthy services found with AppID:%s", req.ID) } - shuffle := func(services []*consul.ServiceEntry) []*consul.ServiceEntry { - for i := len(services) - 1; i > 0; i-- { - rndbig, _ := rand.Int(rand.Reader, big.NewInt(int64(i+1))) - j := rndbig.Int64() - - services[i], services[j] = services[j], services[i] - } - - return services - } - - // Pick a random service - svc := shuffle(services)[0] + // Pick a random service from the result + svc := services[rand.Int()%len(services)] port := svc.Service.Meta[cfg.DaprPortMetaKey] if port == "" { From d0a1efaca39cbcf515413c85ccfde578d33d7be7 Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Thu, 23 Feb 2023 21:54:16 +0000 Subject: [PATCH 3/9] De-register service before attempting to register again Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- nameresolution/consul/consul.go | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/nameresolution/consul/consul.go b/nameresolution/consul/consul.go index c1b274c2b..6ca84608d 100644 --- a/nameresolution/consul/consul.go +++ b/nameresolution/consul/consul.go @@ -100,19 +100,36 @@ func (r *resolver) Init(metadata nr.Metadata) (err error) { return err } - if err = r.client.InitClient(r.config.Client); err != nil { + err = r.client.InitClient(r.config.Client) + if err != nil { return fmt.Errorf("failed to init consul client: %w", err) } - // register service to consul + // Register service to consul if r.config.Registration != nil { - if err := r.client.Agent().ServiceRegister(r.config.Registration); err != nil { + agent := r.client.Agent() + + // First, try to de-register the service if it already exists, to remove any previous value that may be there + err = agent.ServiceDeregisterOpts(r.config.Registration.ID, &consul.QueryOptions{ + Namespace: r.config.Registration.Namespace, + Partition: r.config.Registration.Partition, + RequireConsistent: true, + }) + if err != nil { + return fmt.Errorf("failed to de-register previous consul service: %w", err) + } + + err = agent.ServiceRegister(r.config.Registration) + if err != nil { return fmt.Errorf("failed to register consul service: %w", err) } r.logger.Infof("service:%s registered on consul agent", r.config.Registration.Name) - } else if _, err := r.client.Agent().Self(); err != nil { - return fmt.Errorf("failed check on consul agent: %w", err) + } else { + _, err = r.client.Agent().Self() + if err != nil { + return fmt.Errorf("failed check on consul agent: %w", err) + } } return nil @@ -131,6 +148,8 @@ func (r *resolver) ResolveID(req nr.ResolveRequest) (addr string, err error) { } // Pick a random service from the result + // Note: we're using math/random here as PRNG and that's ok since we're just using this for selecting a random address from a list for load-balancing, so we don't need a CSPRNG + //nolint:gosec svc := services[rand.Int()%len(services)] port := svc.Service.Meta[cfg.DaprPortMetaKey] From 184aa665438eeebc594eb426b5fc7f29e5a14b2d Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Fri, 24 Feb 2023 00:16:40 +0000 Subject: [PATCH 4/9] Fixes Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- nameresolution/consul/consul.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/nameresolution/consul/consul.go b/nameresolution/consul/consul.go index 6ca84608d..015b8ccc6 100644 --- a/nameresolution/consul/consul.go +++ b/nameresolution/consul/consul.go @@ -14,9 +14,11 @@ limitations under the License. package consul import ( + "errors" "fmt" "math/rand" "net" + "net/http" "strconv" consul "github.com/hashicorp/consul/api" @@ -82,13 +84,12 @@ type resolverConfig struct { // NewResolver creates Consul name resolver. func NewResolver(logger logger.Logger) nr.Resolver { - return newResolver(logger, resolverConfig{}, &client{}) + return newResolver(logger, &client{}) } -func newResolver(logger logger.Logger, resolverConfig resolverConfig, client clientInterface) nr.Resolver { +func newResolver(logger logger.Logger, client clientInterface) nr.Resolver { return &resolver{ logger: logger, - config: resolverConfig, client: client, } } @@ -116,7 +117,11 @@ func (r *resolver) Init(metadata nr.Metadata) (err error) { RequireConsistent: true, }) if err != nil { - return fmt.Errorf("failed to de-register previous consul service: %w", err) + // If the error is 404, that means there was no service registered, so we can ignore that + statusErr := consul.StatusError{} + if !errors.As(err, &statusErr) || statusErr.Code != http.StatusNotFound { + return fmt.Errorf("failed to de-register previous consul service: %w", err) + } } err = agent.ServiceRegister(r.config.Registration) @@ -144,7 +149,7 @@ func (r *resolver) ResolveID(req nr.ResolveRequest) (addr string, err error) { } if len(services) == 0 { - return "", fmt.Errorf("no healthy services found with AppID:%s", req.ID) + return "", fmt.Errorf("no healthy services found with AppID '%s'", req.ID) } // Pick a random service from the result From 2cc740cfafa1d5a1127083b6d5266a4d8bb78ac7 Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Fri, 24 Feb 2023 00:32:26 +0000 Subject: [PATCH 5/9] Set DeregisterCriticalServiceAfter Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- nameresolution/consul/configuration.go | 9 +++++++-- nameresolution/consul/consul.go | 5 +++-- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/nameresolution/consul/configuration.go b/nameresolution/consul/configuration.go index c697fd83a..58e14de74 100644 --- a/nameresolution/consul/configuration.go +++ b/nameresolution/consul/configuration.go @@ -141,6 +141,11 @@ func mapCheck(config *AgentServiceCheck) *consul.AgentServiceCheck { return nil } + // After 1 min of downtime, deregister the service automatically + deregisterCriticalServiceAfter := "1m" + if config.DeregisterCriticalServiceAfter != nil { + deregisterCriticalServiceAfter = *config.DeregisterCriticalServiceAfter + } return &consul.AgentServiceCheck{ CheckID: config.CheckID, Name: config.Name, @@ -161,7 +166,7 @@ func mapCheck(config *AgentServiceCheck) *consul.AgentServiceCheck { GRPCUseTLS: config.GRPCUseTLS, AliasNode: config.AliasNode, AliasService: config.AliasService, - DeregisterCriticalServiceAfter: config.DeregisterCriticalServiceAfter, + DeregisterCriticalServiceAfter: deregisterCriticalServiceAfter, } } @@ -369,7 +374,7 @@ type AgentServiceCheck struct { GRPC string AliasNode string AliasService string - DeregisterCriticalServiceAfter string + DeregisterCriticalServiceAfter *string Header map[string][]string TLSSkipVerify bool GRPCUseTLS bool diff --git a/nameresolution/consul/consul.go b/nameresolution/consul/consul.go index 015b8ccc6..324fed889 100644 --- a/nameresolution/consul/consul.go +++ b/nameresolution/consul/consul.go @@ -17,7 +17,6 @@ import ( "errors" "fmt" "math/rand" - "net" "net/http" "strconv" @@ -262,7 +261,9 @@ func getRegistrationConfig(cfg configSpec, props map[string]string) (*consul.Age Name: "Dapr Health Status", CheckID: fmt.Sprintf("daprHealth:%s", id), Interval: "15s", - HTTP: fmt.Sprintf("http://%s/v1.0/healthz", net.JoinHostPort(host, httpPort)), + HTTP: fmt.Sprintf("http://%s:%s/v1.0/healthz", host, httpPort), + // After 1 min of downtime, deregister the service automatically + DeregisterCriticalServiceAfter: "1m", }, } } From 002d77c8ac68556e9411fdb17ff5838bc0398a01 Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Fri, 24 Feb 2023 01:07:17 +0000 Subject: [PATCH 6/9] Reverted changes and fixed tests Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- nameresolution/consul/configuration.go | 9 ++----- nameresolution/consul/consul.go | 22 +---------------- nameresolution/consul/consul_test.go | 34 ++++++++++++-------------- 3 files changed, 19 insertions(+), 46 deletions(-) diff --git a/nameresolution/consul/configuration.go b/nameresolution/consul/configuration.go index 58e14de74..c697fd83a 100644 --- a/nameresolution/consul/configuration.go +++ b/nameresolution/consul/configuration.go @@ -141,11 +141,6 @@ func mapCheck(config *AgentServiceCheck) *consul.AgentServiceCheck { return nil } - // After 1 min of downtime, deregister the service automatically - deregisterCriticalServiceAfter := "1m" - if config.DeregisterCriticalServiceAfter != nil { - deregisterCriticalServiceAfter = *config.DeregisterCriticalServiceAfter - } return &consul.AgentServiceCheck{ CheckID: config.CheckID, Name: config.Name, @@ -166,7 +161,7 @@ func mapCheck(config *AgentServiceCheck) *consul.AgentServiceCheck { GRPCUseTLS: config.GRPCUseTLS, AliasNode: config.AliasNode, AliasService: config.AliasService, - DeregisterCriticalServiceAfter: deregisterCriticalServiceAfter, + DeregisterCriticalServiceAfter: config.DeregisterCriticalServiceAfter, } } @@ -374,7 +369,7 @@ type AgentServiceCheck struct { GRPC string AliasNode string AliasService string - DeregisterCriticalServiceAfter *string + DeregisterCriticalServiceAfter string Header map[string][]string TLSSkipVerify bool GRPCUseTLS bool diff --git a/nameresolution/consul/consul.go b/nameresolution/consul/consul.go index 324fed889..e4e69ddfb 100644 --- a/nameresolution/consul/consul.go +++ b/nameresolution/consul/consul.go @@ -14,10 +14,8 @@ limitations under the License. package consul import ( - "errors" "fmt" "math/rand" - "net/http" "strconv" consul "github.com/hashicorp/consul/api" @@ -60,8 +58,6 @@ type clientInterface interface { type agentInterface interface { Self() (map[string]map[string]interface{}, error) ServiceRegister(service *consul.AgentServiceRegistration) error - ServiceDeregister(serviceID string) error - ServiceDeregisterOpts(serviceID string, q *consul.QueryOptions) error } type healthInterface interface { @@ -86,7 +82,7 @@ func NewResolver(logger logger.Logger) nr.Resolver { return newResolver(logger, &client{}) } -func newResolver(logger logger.Logger, client clientInterface) nr.Resolver { +func newResolver(logger logger.Logger, client clientInterface) *resolver { return &resolver{ logger: logger, client: client, @@ -109,20 +105,6 @@ func (r *resolver) Init(metadata nr.Metadata) (err error) { if r.config.Registration != nil { agent := r.client.Agent() - // First, try to de-register the service if it already exists, to remove any previous value that may be there - err = agent.ServiceDeregisterOpts(r.config.Registration.ID, &consul.QueryOptions{ - Namespace: r.config.Registration.Namespace, - Partition: r.config.Registration.Partition, - RequireConsistent: true, - }) - if err != nil { - // If the error is 404, that means there was no service registered, so we can ignore that - statusErr := consul.StatusError{} - if !errors.As(err, &statusErr) || statusErr.Code != http.StatusNotFound { - return fmt.Errorf("failed to de-register previous consul service: %w", err) - } - } - err = agent.ServiceRegister(r.config.Registration) if err != nil { return fmt.Errorf("failed to register consul service: %w", err) @@ -262,8 +244,6 @@ func getRegistrationConfig(cfg configSpec, props map[string]string) (*consul.Age CheckID: fmt.Sprintf("daprHealth:%s", id), Interval: "15s", HTTP: fmt.Sprintf("http://%s:%s/v1.0/healthz", host, httpPort), - // After 1 min of downtime, deregister the service automatically - DeregisterCriticalServiceAfter: "1m", }, } } diff --git a/nameresolution/consul/consul_test.go b/nameresolution/consul/consul_test.go index 8e25f9b0c..4cafca700 100644 --- a/nameresolution/consul/consul_test.go +++ b/nameresolution/consul/consul_test.go @@ -82,14 +82,6 @@ func (m *mockAgent) ServiceRegister(service *consul.AgentServiceRegistration) er return m.serviceRegisterErr } -func (m *mockAgent) ServiceDeregister(serviceID string) error { - return nil -} - -func (m *mockAgent) ServiceDeregisterOpts(serviceID string, q *consul.QueryOptions) error { - return nil -} - func TestInit(t *testing.T) { t.Parallel() @@ -107,7 +99,7 @@ func TestInit(t *testing.T) { t.Helper() var mock mockClient - resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock) + resolver := newResolver(logger.NewLogger("test"), &mock) _ = resolver.Init(metadata) @@ -130,7 +122,7 @@ func TestInit(t *testing.T) { t.Helper() var mock mockClient - resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock) + resolver := newResolver(logger.NewLogger("test"), &mock) _ = resolver.Init(metadata) @@ -152,7 +144,7 @@ func TestInit(t *testing.T) { t.Helper() var mock mockClient - resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock) + resolver := newResolver(logger.NewLogger("test"), &mock) _ = resolver.Init(metadata) @@ -174,7 +166,7 @@ func TestInit(t *testing.T) { func TestResolveID(t *testing.T) { t.Parallel() - testConfig := &resolverConfig{ + testConfig := resolverConfig{ DaprPortMetaKey: "DAPR_PORT", } @@ -195,7 +187,8 @@ func TestResolveID(t *testing.T) { serviceResult: []*consul.ServiceEntry{}, }, } - resolver := newResolver(logger.NewLogger("test"), *testConfig, &mock) + resolver := newResolver(logger.NewLogger("test"), &mock) + resolver.config = testConfig _, err := resolver.ResolveID(req) assert.Equal(t, 1, mock.mockHealth.serviceCalled) @@ -224,7 +217,8 @@ func TestResolveID(t *testing.T) { }, }, } - resolver := newResolver(logger.NewLogger("test"), *testConfig, &mock) + resolver := newResolver(logger.NewLogger("test"), &mock) + resolver.config = testConfig addr, _ := resolver.ResolveID(req) @@ -262,7 +256,8 @@ func TestResolveID(t *testing.T) { }, }, } - resolver := newResolver(logger.NewLogger("test"), *testConfig, &mock) + resolver := newResolver(logger.NewLogger("test"), &mock) + resolver.config = testConfig total1 := 0 total2 := 0 @@ -321,7 +316,8 @@ func TestResolveID(t *testing.T) { }, }, } - resolver := newResolver(logger.NewLogger("test"), *testConfig, &mock) + resolver := newResolver(logger.NewLogger("test"), &mock) + resolver.config = testConfig addr, _ := resolver.ResolveID(req) @@ -350,7 +346,8 @@ func TestResolveID(t *testing.T) { }, }, } - resolver := newResolver(logger.NewLogger("test"), *testConfig, &mock) + resolver := newResolver(logger.NewLogger("test"), &mock) + resolver.config = testConfig _, err := resolver.ResolveID(req) @@ -376,7 +373,8 @@ func TestResolveID(t *testing.T) { }, }, } - resolver := newResolver(logger.NewLogger("test"), *testConfig, &mock) + resolver := newResolver(logger.NewLogger("test"), &mock) + resolver.config = testConfig _, err := resolver.ResolveID(req) From 87a27a437e636b970ed4701d41c6a589982b75e0 Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Fri, 24 Feb 2023 19:13:58 +0000 Subject: [PATCH 7/9] =?UTF-8?q?=F0=9F=92=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- nameresolution/consul/consul.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/nameresolution/consul/consul.go b/nameresolution/consul/consul.go index e4e69ddfb..c5f9ffa0d 100644 --- a/nameresolution/consul/consul.go +++ b/nameresolution/consul/consul.go @@ -16,6 +16,7 @@ package consul import ( "fmt" "math/rand" + "net" "strconv" consul "github.com/hashicorp/consul/api" @@ -243,7 +244,7 @@ func getRegistrationConfig(cfg configSpec, props map[string]string) (*consul.Age Name: "Dapr Health Status", CheckID: fmt.Sprintf("daprHealth:%s", id), Interval: "15s", - HTTP: fmt.Sprintf("http://%s:%s/v1.0/healthz", host, httpPort), + HTTP: fmt.Sprintf("http://%s/v1.0/healthz", net.JoinHostPort(host, httpPort)), }, } } From b4aecdfef7fc267a7b905ab44b5d4815a2babe40 Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Fri, 24 Feb 2023 20:26:28 +0000 Subject: [PATCH 8/9] Resolve long-standing TODO in mDNS component Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- nameresolution/mdns/mdns.go | 34 ++++----- nameresolution/mdns/mdns_test.go | 120 ++++++++++++------------------- nameresolution/metadata.go | 2 + 3 files changed, 62 insertions(+), 94 deletions(-) diff --git a/nameresolution/mdns/mdns.go b/nameresolution/mdns/mdns.go index 9990652eb..198530ffc 100644 --- a/nameresolution/mdns/mdns.go +++ b/nameresolution/mdns/mdns.go @@ -238,9 +238,10 @@ func (m *Resolver) startRefreshers() { // refresh app addresses periodically and on demand go func() { - defer func() { - m.refreshRunning.Store(false) - }() + defer m.refreshRunning.Store(false) + + t := time.NewTicker(refreshInterval) + defer t.Stop() for { select { @@ -252,7 +253,7 @@ func (m *Resolver) startRefreshers() { } }() // Refresh periodically - case <-time.After(refreshInterval): + case <-t.C: go func() { if err := m.refreshAllApps(m.runCtx); err != nil { m.logger.Warnf(err.Error()) @@ -269,37 +270,28 @@ func (m *Resolver) startRefreshers() { // Init registers service for mDNS. func (m *Resolver) Init(metadata nameresolution.Metadata) error { - var ( - appID string - hostAddress string - ok bool - instanceID string - ) - props := metadata.Properties - if appID, ok = props[nameresolution.MDNSInstanceName]; !ok { + appID := props[nameresolution.AppID] + if appID == "" { return errors.New("name is missing") } - if hostAddress, ok = props[nameresolution.MDNSInstanceAddress]; !ok { + + hostAddress := props[nameresolution.HostAddress] + if hostAddress == "" { return errors.New("address is missing") } - p, ok := props[nameresolution.MDNSInstancePort] - if !ok { + if props[nameresolution.DaprPort] == "" { return errors.New("port is missing") } - port, err := strconv.ParseInt(p, 10, 32) + port, err := strconv.Atoi(props[nameresolution.DaprPort]) if err != nil { return errors.New("port is invalid") } - if instanceID, ok = props[nameresolution.MDNSInstanceID]; !ok { - instanceID = "" - } - - err = m.registerMDNS(instanceID, appID, []string{hostAddress}, int(port)) + err = m.registerMDNS("", appID, []string{hostAddress}, port) if err != nil { return err } diff --git a/nameresolution/mdns/mdns_test.go b/nameresolution/mdns/mdns_test.go index ff5bf4d93..d060e9eae 100644 --- a/nameresolution/mdns/mdns_test.go +++ b/nameresolution/mdns/mdns_test.go @@ -42,30 +42,30 @@ func TestInitMetadata(t *testing.T) { { "name", map[string]string{ - nr.MDNSInstanceAddress: localhost, - nr.MDNSInstancePort: "30003", + nr.HostAddress: localhost, + nr.DaprPort: "30003", }, }, { "address", map[string]string{ - nr.MDNSInstanceName: "testAppID", - nr.MDNSInstancePort: "30003", + nr.AppID: "testAppID", + nr.DaprPort: "30003", }, }, { "port", map[string]string{ - nr.MDNSInstanceName: "testAppID", - nr.MDNSInstanceAddress: localhost, + nr.AppID: "testAppID", + nr.HostAddress: localhost, }, }, { "port", map[string]string{ - nr.MDNSInstanceName: "testAppID", - nr.MDNSInstanceAddress: localhost, - nr.MDNSInstancePort: "abcd", + nr.AppID: "testAppID", + nr.HostAddress: localhost, + nr.DaprPort: "abcd", }, }, } @@ -90,9 +90,9 @@ func TestInitRegister(t *testing.T) { resolver := NewResolver(logger.NewLogger("test")).(*Resolver) defer resolver.Close() md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ - nr.MDNSInstanceName: "testAppID", - nr.MDNSInstanceAddress: localhost, - nr.MDNSInstancePort: "1234", + nr.AppID: "testAppID", + nr.HostAddress: localhost, + nr.DaprPort: "1234", }}} // act @@ -105,14 +105,14 @@ func TestInitRegisterDuplicate(t *testing.T) { resolver := NewResolver(logger.NewLogger("test")).(*Resolver) defer resolver.Close() md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ - nr.MDNSInstanceName: "testAppID", - nr.MDNSInstanceAddress: localhost, - nr.MDNSInstancePort: "1234", + nr.AppID: "testAppID", + nr.HostAddress: localhost, + nr.DaprPort: "1234", }}} md2 := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ - nr.MDNSInstanceName: "testAppID", - nr.MDNSInstanceAddress: localhost, - nr.MDNSInstancePort: "1234", + nr.AppID: "testAppID", + nr.HostAddress: localhost, + nr.DaprPort: "1234", }}} // act @@ -128,9 +128,9 @@ func TestResolver(t *testing.T) { resolver := NewResolver(logger.NewLogger("test")).(*Resolver) defer resolver.Close() md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ - nr.MDNSInstanceName: "testAppID", - nr.MDNSInstanceAddress: localhost, - nr.MDNSInstancePort: "1234", + nr.AppID: "testAppID", + nr.HostAddress: localhost, + nr.DaprPort: "1234", }}} // act @@ -149,9 +149,9 @@ func TestResolverClose(t *testing.T) { // arrange resolver := NewResolver(logger.NewLogger("test")).(*Resolver) md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ - nr.MDNSInstanceName: "testAppID", - nr.MDNSInstanceAddress: localhost, - nr.MDNSInstancePort: "1234", + nr.AppID: "testAppID", + nr.HostAddress: localhost, + nr.DaprPort: "1234", }}} // act @@ -181,34 +181,24 @@ func TestResolverMultipleInstances(t *testing.T) { instanceAID := "A" instanceAName := "testAppID" instanceAAddress := localhost - instanceAPort := "1234" - instanceAPQDN := fmt.Sprintf("%s:%s", instanceAAddress, instanceAPort) + instanceAPort := 1234 + instanceAPQDN := fmt.Sprintf("%s:%d", instanceAAddress, instanceAPort) - instanceA := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ - nr.MDNSInstanceName: instanceAName, - nr.MDNSInstanceAddress: instanceAAddress, - nr.MDNSInstancePort: instanceAPort, - nr.MDNSInstanceID: instanceAID, - }}} - err1 := resolver.Init(instanceA) + err1 := resolver.registerMDNS(instanceAID, instanceAName, []string{instanceAAddress}, instanceAPort) require.NoError(t, err1) // register instance B instanceBID := "B" instanceBName := "testAppID" instanceBAddress := localhost - instanceBPort := "5678" - instanceBPQDN := fmt.Sprintf("%s:%s", instanceBAddress, instanceBPort) + instanceBPort := 5678 + instanceBPQDN := fmt.Sprintf("%s:%d", instanceBAddress, instanceBPort) - instanceB := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ - nr.MDNSInstanceName: instanceBName, - nr.MDNSInstanceAddress: instanceBAddress, - nr.MDNSInstancePort: instanceBPort, - nr.MDNSInstanceID: instanceBID, - }}} - err2 := resolver.Init(instanceB) + err2 := resolver.registerMDNS(instanceBID, instanceBName, []string{instanceBAddress}, instanceBPort) require.NoError(t, err2) + go resolver.startRefreshers() + // act... request := nr.ResolveRequest{ID: "testAppID"} @@ -292,9 +282,9 @@ func ResolverConcurrencySubsriberClear(t *testing.T) { resolver := NewResolver(logger.NewLogger("test")).(*Resolver) defer resolver.Close() md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ - nr.MDNSInstanceName: "testAppID", - nr.MDNSInstanceAddress: localhost, - nr.MDNSInstancePort: "1234", + nr.AppID: "testAppID", + nr.HostAddress: localhost, + nr.DaprPort: "1234", }}} // act @@ -334,50 +324,34 @@ func ResolverConcurrencyFound(t *testing.T) { appAID := "A" appAName := "testAppA" appAAddress := localhost - appAPort := "1234" - appAPQDN := fmt.Sprintf("%s:%s", appAAddress, appAPort) + appAPort := 1234 + appABPQDN := fmt.Sprintf("%s:%d", appAAddress, appAPort) - appA := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ - nr.MDNSInstanceName: appAName, - nr.MDNSInstanceAddress: appAAddress, - nr.MDNSInstancePort: appAPort, - nr.MDNSInstanceID: appAID, - }}} - err1 := resolver.Init(appA) + err1 := resolver.registerMDNS(appAID, appAName, []string{appAAddress}, appAPort) require.NoError(t, err1) // register instance B appBID := "B" appBName := "testAppB" appBAddress := localhost - appBPort := "5678" - appBBPQDN := fmt.Sprintf("%s:%s", appBAddress, appBPort) + appBPort := 5678 + appBBPQDN := fmt.Sprintf("%s:%d", appBAddress, appBPort) - appB := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ - nr.MDNSInstanceName: appBName, - nr.MDNSInstanceAddress: appBAddress, - nr.MDNSInstancePort: appBPort, - nr.MDNSInstanceID: appBID, - }}} - err2 := resolver.Init(appB) + err2 := resolver.registerMDNS(appBID, appBName, []string{appBAddress}, appBPort) require.NoError(t, err2) // register instance C appCID := "C" appCName := "testAppC" appCAddress := localhost - appCPort := "3456" - appCBPQDN := fmt.Sprintf("%s:%s", appCAddress, appCPort) + appCPort := 3456 + appCBPQDN := fmt.Sprintf("%s:%d", appCAddress, appCPort) - appC := nr.Metadata{Base: metadata.Base{Properties: map[string]string{ - nr.MDNSInstanceName: appCName, - nr.MDNSInstanceAddress: appCAddress, - nr.MDNSInstancePort: appCPort, - nr.MDNSInstanceID: appCID, - }}} - err3 := resolver.Init(appC) + err3 := resolver.registerMDNS(appCID, appCName, []string{appCAddress}, appCPort) require.NoError(t, err3) + go resolver.startRefreshers() + // act... wg := sync.WaitGroup{} for i := 0; i < numConcurrency; i++ { @@ -402,7 +376,7 @@ func ResolverConcurrencyFound(t *testing.T) { // assert require.NoError(t, err) if r == 0 { - assert.Equal(t, appAPQDN, pt) + assert.Equal(t, appABPQDN, pt) } else if r == 1 { assert.Equal(t, appBBPQDN, pt) } else if r == 2 { diff --git a/nameresolution/metadata.go b/nameresolution/metadata.go index b2c69c588..ac0ac293a 100644 --- a/nameresolution/metadata.go +++ b/nameresolution/metadata.go @@ -16,6 +16,8 @@ package nameresolution import "github.com/dapr/components-contrib/metadata" const ( + // TODO: REMOVE THESE AFTER RUNTIME IS CHANGED + // MDNSInstanceName is the instance name which is broadcasted. MDNSInstanceName string = "name" // MDNSInstanceAddress is the address of the instance. From 2a5abb9a4f6bfb53d02cc82fafff92f3cd704d88 Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Fri, 24 Feb 2023 22:11:11 +0000 Subject: [PATCH 9/9] Remove deprecated components: pubsub.hazelcast, bindings.twitter Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- .../docker-compose-hazelcast.yml | 6 - .github/scripts/test-info.mjs | 4 - bindings/twitter/twitter.go | 229 ------------------ bindings/twitter/twitter_test.go | 149 ------------ go.mod | 3 - go.sum | 6 - pubsub/hazelcast/hazelcast.go | 183 -------------- pubsub/hazelcast/metadata.go | 19 -- pubsub/hazelcast/metadata_test.go | 39 --- tests/config/pubsub/hazelcast/pubsub.yml | 12 - tests/config/pubsub/tests.yml | 2 - tests/conformance/common.go | 3 - 12 files changed, 655 deletions(-) delete mode 100644 .github/infrastructure/docker-compose-hazelcast.yml delete mode 100644 bindings/twitter/twitter.go delete mode 100644 bindings/twitter/twitter_test.go delete mode 100644 pubsub/hazelcast/hazelcast.go delete mode 100644 pubsub/hazelcast/metadata.go delete mode 100644 pubsub/hazelcast/metadata_test.go delete mode 100644 tests/config/pubsub/hazelcast/pubsub.yml diff --git a/.github/infrastructure/docker-compose-hazelcast.yml b/.github/infrastructure/docker-compose-hazelcast.yml deleted file mode 100644 index 3d4dceab3..000000000 --- a/.github/infrastructure/docker-compose-hazelcast.yml +++ /dev/null @@ -1,6 +0,0 @@ -version: '2' -services: - hazelcast: - image: hazelcast/hazelcast:3.12.12-1 - ports: - - 5701:5701 \ No newline at end of file diff --git a/.github/scripts/test-info.mjs b/.github/scripts/test-info.mjs index c5607b242..053df402f 100644 --- a/.github/scripts/test-info.mjs +++ b/.github/scripts/test-info.mjs @@ -208,10 +208,6 @@ const components = { 'AzureCertificationServicePrincipalClientSecret', ], }, - 'pubsub.hazelcast': { - conformance: true, - conformanceSetup: 'docker-compose.sh hazelcast', - }, 'pubsub.in-memory': { conformance: true, }, diff --git a/bindings/twitter/twitter.go b/bindings/twitter/twitter.go deleted file mode 100644 index 78d8f2a56..000000000 --- a/bindings/twitter/twitter.go +++ /dev/null @@ -1,229 +0,0 @@ -/* -Copyright 2021 The Dapr Authors -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package twitter - -//nolint:staticcheck -import ( - "context" - "encoding/json" - "errors" - "fmt" - "strconv" - "sync" - "sync/atomic" - "time" - - "github.com/dghubble/go-twitter/twitter" - "github.com/dghubble/oauth1" - - "github.com/dapr/components-contrib/bindings" - "github.com/dapr/kit/logger" -) - -// Binding represents Twitter input/output binding. -type Binding struct { - client *twitter.Client - query string - logger logger.Logger - closed atomic.Bool - closeCh chan struct{} - wg sync.WaitGroup -} - -// NewTwitter returns a new Twitter event input binding. -func NewTwitter(logger logger.Logger) bindings.InputOutputBinding { - return &Binding{logger: logger, closeCh: make(chan struct{})} -} - -// Init initializes the Twitter binding. -func (t *Binding) Init(ctx context.Context, metadata bindings.Metadata) error { - t.logger.Warnf("DEPRECATION NOTICE: Component bindings.twitter has been deprecated and will be removed in a future Dapr release.") - ck, f := metadata.Properties["consumerKey"] - if !f || ck == "" { - return fmt.Errorf("consumerKey not set") - } - cs, f := metadata.Properties["consumerSecret"] - if !f || cs == "" { - return fmt.Errorf("consumerSecret not set") - } - at, f := metadata.Properties["accessToken"] - if !f || at == "" { - return fmt.Errorf("accessToken not set") - } - as, f := metadata.Properties["accessSecret"] - if !f || as == "" { - return fmt.Errorf("accessSecret not set") - } - - // set query only in an input binding case - q, f := metadata.Properties["query"] - if f { - t.query = q - } - - config := oauth1.NewConfig(ck, cs) - token := oauth1.NewToken(at, as) - - httpClient := config.Client(oauth1.NoContext, token) - - t.client = twitter.NewClient(httpClient) - - return nil -} - -// Operations returns list of operations supported by twitter binding. -func (t *Binding) Operations() []bindings.OperationKind { - return []bindings.OperationKind{bindings.GetOperation} -} - -// Read triggers the Twitter search and events on each result tweet. -func (t *Binding) Read(ctx context.Context, handler bindings.Handler) error { - if t.query == "" { - return errors.New("metadata property 'query' is empty") - } - - demux := twitter.NewSwitchDemux() - demux.Tweet = func(tweet *twitter.Tweet) { - t.logger.Debugf("raw tweet: %+v", tweet) - data, marshalErr := json.Marshal(tweet) - if marshalErr != nil { - t.logger.Errorf("error marshaling tweet: %+v", tweet) - - return - } - handler(ctx, &bindings.ReadResponse{ - Data: data, - Metadata: map[string]string{ - "query": t.query, - }, - }) - } - - demux.StreamLimit = func(limit *twitter.StreamLimit) { - t.logger.Warnf("disconnect: %+v", limit) - } - - demux.StreamDisconnect = func(disconnect *twitter.StreamDisconnect) { - t.logger.Errorf("stream disconnect: %+v", disconnect) - } - - filterParams := &twitter.StreamFilterParams{ - Track: []string{t.query}, - StallWarnings: twitter.Bool(true), - } - - t.logger.Debug("starting stream for query: %s", t.query) - stream, err := t.client.Streams.Filter(filterParams) - if err != nil { - return fmt.Errorf("error executing stream filter '%+v': %w", filterParams, err) - } - - t.logger.Debug("starting handler...") - t.wg.Add(2) - go func() { - defer t.wg.Done() - demux.HandleChan(stream.Messages) - }() - go func() { - defer t.wg.Done() - select { - case <-t.closeCh: - case <-ctx.Done(): - } - t.logger.Debug("stopping handler...") - stream.Stop() - }() - - return nil -} - -func (t *Binding) Close() error { - if t.closed.CompareAndSwap(false, true) { - close(t.closeCh) - } - t.wg.Wait() - return nil -} - -// Invoke handles all operations. -func (t *Binding) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { - t.logger.Debugf("operation: %v", req.Operation) - if req.Metadata == nil { - return nil, fmt.Errorf("metadata not set") - } - // required - q, f := req.Metadata["query"] - if !f || q == "" { - return nil, fmt.Errorf("query not set") - } - - // optionals - l, f := req.Metadata["lang"] - if !f || l == "" { - l = "en" - } - - r, f := req.Metadata["result"] - if !f || r == "" { - // mixed : Include both popular and real time results in the response - // recent : return only the most recent results in the response - // popular : return only the most popular results in the response - r = "recent" - } - - var sinceID int64 - s, f := req.Metadata["since_id"] - if f && s != "" { - i, err := strconv.ParseInt(s, 10, 64) - if err == nil { - sinceID = i - } - } - - sq := &twitter.SearchTweetParams{ - Count: 100, // max - Lang: l, - SinceID: sinceID, - Query: q, - ResultType: r, - IncludeEntities: twitter.Bool(true), - } - - t.logger.Debug("starting stream for: %+v", sq) - search, _, err := t.client.Search.Tweets(sq) - if err != nil { - return nil, fmt.Errorf("error executing search filter '%+v': %w", sq, err) - } - if search == nil || search.Statuses == nil { - return nil, fmt.Errorf("nil search result from '%+v'", sq) - } - - t.logger.Debugf("raw response: %+v", search.Statuses) - data, marshalErr := json.Marshal(search.Statuses) - if marshalErr != nil { - t.logger.Errorf("error marshaling tweet: %v", marshalErr) - return nil, fmt.Errorf("error parsing response from '%+v': %w", sq, marshalErr) - } - - req.Metadata["max_tweet_id"] = search.Metadata.MaxIDStr - req.Metadata["tweet_count"] = strconv.Itoa(search.Metadata.Count) - req.Metadata["search_ts"] = time.Now().UTC().String() - - ir := &bindings.InvokeResponse{ - Data: data, - Metadata: req.Metadata, - } - - return ir, nil -} diff --git a/bindings/twitter/twitter_test.go b/bindings/twitter/twitter_test.go deleted file mode 100644 index ba38dcca2..000000000 --- a/bindings/twitter/twitter_test.go +++ /dev/null @@ -1,149 +0,0 @@ -/* -Copyright 2021 The Dapr Authors -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package twitter - -//nolint:staticcheck -import ( - "context" - "encoding/json" - "os" - "testing" - "time" - - "github.com/dghubble/go-twitter/twitter" - "github.com/stretchr/testify/assert" - - "github.com/dapr/components-contrib/bindings" - "github.com/dapr/kit/logger" -) - -const ( - testTwitterConsumerKey = "test-consumerKey" - testTwitterConsumerSecret = "test-consumerSecret" - testTwitterAccessToken = "test-accessToken" - testTwitterAccessSecret = "test-accessSecret" -) - -func getTestMetadata() bindings.Metadata { - m := bindings.Metadata{} - m.Properties = map[string]string{ - "consumerKey": testTwitterConsumerKey, - "consumerSecret": testTwitterConsumerSecret, - "accessToken": testTwitterAccessToken, - "accessSecret": testTwitterAccessSecret, - } - - return m -} - -func getRuntimeMetadata() map[string]string { - return map[string]string{ - "consumerKey": os.Getenv("CONSUMER_KEY"), - "consumerSecret": os.Getenv("CONSUMER_SECRET"), - "accessToken": os.Getenv("ACCESS_TOKEN"), - "accessSecret": os.Getenv("ACCESS_SECRET"), - } -} - -// go test -v -count=1 ./bindings/twitter/. -func TestInit(t *testing.T) { - m := getTestMetadata() - tw := NewTwitter(logger.NewLogger("test")).(*Binding) - err := tw.Init(context.Background(), m) - assert.Nilf(t, err, "error initializing valid metadata properties") -} - -// TestReadError excutes the Read method and fails before the Twitter API call -// go test -v -count=1 -run TestReadError ./bindings/twitter/. -func TestReadError(t *testing.T) { - tw := NewTwitter(logger.NewLogger("test")).(*Binding) - m := getTestMetadata() - err := tw.Init(context.Background(), m) - assert.Nilf(t, err, "error initializing valid metadata properties") - - err = tw.Read(context.Background(), func(ctx context.Context, res *bindings.ReadResponse) ([]byte, error) { - t.Logf("result: %+v", res) - assert.NotNilf(t, err, "no error on read with invalid credentials") - - return nil, nil - }) - assert.Error(t, err) - - assert.NoError(t, tw.Close()) -} - -// TestRead executes the Read method which calls Twiter API -// env RUN_LIVE_TW_TEST=true go test -v -count=1 -run TestRead ./bindings/twitter/. -func TestRead(t *testing.T) { - if os.Getenv("RUN_LIVE_TW_TEST") != "true" { - t.SkipNow() // skip this test until able to read credentials in test infra - } - m := bindings.Metadata{} - m.Properties = getRuntimeMetadata() - // add query - m.Properties["query"] = "microsoft" - tw := NewTwitter(logger.NewLogger("test")).(*Binding) - tw.logger.SetOutputLevel(logger.DebugLevel) - err := tw.Init(context.Background(), m) - assert.Nilf(t, err, "error initializing read") - - ctx, cancel := context.WithCancel(context.Background()) - counter := 0 - err = tw.Read(ctx, func(ctx context.Context, res *bindings.ReadResponse) ([]byte, error) { - counter++ - t.Logf("tweet[%d]", counter) - var tweet twitter.Tweet - json.Unmarshal(res.Data, &tweet) - assert.NotEmpty(t, tweet.IDStr, "tweet should have an ID") - cancel() - - return nil, nil - }) - assert.Nilf(t, err, "error on read") - select { - case <-ctx.Done(): - // do nothing - case <-time.After(30 * time.Second): - cancel() - t.Fatal("Timeout waiting for messages") - } - - assert.NoError(t, tw.Close()) -} - -// TestInvoke executes the Invoke method which calls Twiter API -// test tokens must be set -// env RUN_LIVE_TW_TEST=true go test -v -count=1 -run TestInvoke ./bindings/twitter/. -func TestInvoke(t *testing.T) { - if os.Getenv("RUN_LIVE_TW_TEST") != "true" { - t.SkipNow() // skip this test until able to read credentials in test infra - } - m := bindings.Metadata{} - m.Properties = getRuntimeMetadata() - tw := NewTwitter(logger.NewLogger("test")).(*Binding) - tw.logger.SetOutputLevel(logger.DebugLevel) - err := tw.Init(context.Background(), m) - assert.Nilf(t, err, "error initializing Invoke") - - req := &bindings.InvokeRequest{ - Metadata: map[string]string{ - "query": "microsoft", - }, - } - - resp, err := tw.Invoke(context.Background(), req) - assert.Nilf(t, err, "error on invoke") - assert.NotNil(t, resp) - assert.NoError(t, tw.Close()) -} diff --git a/go.mod b/go.mod index 4f1bb9f3e..00e7c44ab 100644 --- a/go.mod +++ b/go.mod @@ -50,8 +50,6 @@ require ( github.com/dancannon/gorethink v4.0.0+incompatible github.com/dapr/kit v0.0.4 github.com/denisenkom/go-mssqldb v0.12.3 - github.com/dghubble/go-twitter v0.0.0-20221104224141-912508c3888b - github.com/dghubble/oauth1 v0.7.2 github.com/didip/tollbooth/v7 v7.0.1 github.com/eclipse/paho.mqtt.golang v1.4.2 github.com/fasthttp-contrib/sessions v0.0.0-20160905201309-74f6ac73d5d5 @@ -182,7 +180,6 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect github.com/deepmap/oapi-codegen v1.3.6 // indirect - github.com/dghubble/sling v1.4.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dimchansky/utfbom v1.1.1 // indirect github.com/dubbogo/gost v1.13.1 // indirect diff --git a/go.sum b/go.sum index f17fb1150..aeadb7ef8 100644 --- a/go.sum +++ b/go.sum @@ -723,12 +723,6 @@ github.com/deepmap/oapi-codegen v1.3.6 h1:Wj44p9A0V0PJ+AUg0BWdyGcsS1LY18U+0rCuPQ github.com/deepmap/oapi-codegen v1.3.6/go.mod h1:aBozjEveG+33xPiP55Iw/XbVkhtZHEGLq3nxlX0+hfU= github.com/denisenkom/go-mssqldb v0.12.3 h1:pBSGx9Tq67pBOTLmxNuirNTeB8Vjmf886Kx+8Y+8shw= github.com/denisenkom/go-mssqldb v0.12.3/go.mod h1:k0mtMFOnU+AihqFxPMiF05rtiDrorD1Vrm1KEz5hxDo= -github.com/dghubble/go-twitter v0.0.0-20221104224141-912508c3888b h1:XQu6o3AwJx/jsg9LZ41uIeUdXK5be099XFfFn6H9ikk= -github.com/dghubble/go-twitter v0.0.0-20221104224141-912508c3888b/go.mod h1:B0/qdW5XUupJvcsx40hnVbfjzz9He5YpYXx6eVVdiSY= -github.com/dghubble/oauth1 v0.7.2 h1:pwcinOZy8z6XkNxvPmUDY52M7RDPxt0Xw1zgZ6Cl5JA= -github.com/dghubble/oauth1 v0.7.2/go.mod h1:9erQdIhqhOHG/7K9s/tgh9Ks/AfoyrO5mW/43Lu2+kE= -github.com/dghubble/sling v1.4.0 h1:/n8MRosVTthvMbwlNZgLx579OGVjUOy3GNEv5BIqAWY= -github.com/dghubble/sling v1.4.0/go.mod h1:0r40aNsU9EdDUVBNhfCstAtFgutjgJGYbO1oNzkMoM8= github.com/dgraph-io/badger/v3 v3.2103.5 h1:ylPa6qzbjYRQMU6jokoj4wzcaweHylt//CH0AKt0akg= github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8= github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= diff --git a/pubsub/hazelcast/hazelcast.go b/pubsub/hazelcast/hazelcast.go deleted file mode 100644 index c6d1b3fec..000000000 --- a/pubsub/hazelcast/hazelcast.go +++ /dev/null @@ -1,183 +0,0 @@ -/* -Copyright 2021 The Dapr Authors -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package hazelcast - -import ( - "context" - "errors" - "fmt" - "strconv" - "strings" - "time" - - "github.com/cenkalti/backoff/v4" - "github.com/hazelcast/hazelcast-go-client" - hazelcastCore "github.com/hazelcast/hazelcast-go-client/core" - - "github.com/dapr/components-contrib/pubsub" - "github.com/dapr/kit/logger" - "github.com/dapr/kit/retry" -) - -const ( - hazelcastServers = "hazelcastServers" - hazelcastBackOffMaxRetries = "backOffMaxRetries" -) - -type Hazelcast struct { - client hazelcast.Client - logger logger.Logger - metadata metadata -} - -// NewHazelcastPubSub returns a new hazelcast pub-sub implementation. -func NewHazelcastPubSub(logger logger.Logger) pubsub.PubSub { - return &Hazelcast{logger: logger} -} - -func parseHazelcastMetadata(meta pubsub.Metadata) (metadata, error) { - m := metadata{} - if val, ok := meta.Properties[hazelcastServers]; ok && val != "" { - m.hazelcastServers = val - } else { - return m, errors.New("hazelcast error: missing hazelcast servers") - } - - if val, ok := meta.Properties[hazelcastBackOffMaxRetries]; ok && val != "" { - backOffMaxRetriesInt, err := strconv.Atoi(val) - if err != nil { - return m, fmt.Errorf("hazelcast error: invalid backOffMaxRetries %s, %v", val, err) - } - m.backOffMaxRetries = backOffMaxRetriesInt - } - - return m, nil -} - -func (p *Hazelcast) Init(ctx context.Context, metadata pubsub.Metadata) error { - p.logger.Warnf("DEPRECATION NOTICE: Component pubsub.hazelcast has been deprecated and will be removed in a future Dapr release.") - - m, err := parseHazelcastMetadata(metadata) - if err != nil { - return err - } - - p.metadata = m - hzConfig := hazelcast.NewConfig() - - servers := m.hazelcastServers - hzConfig.NetworkConfig().AddAddress(strings.Split(servers, ",")...) - - p.client, err = hazelcast.NewClientWithConfig(hzConfig) - if err != nil { - return fmt.Errorf("hazelcast error: failed to create new client, %v", err) - } - - return nil -} - -func (p *Hazelcast) Publish(ctx context.Context, req *pubsub.PublishRequest) error { - topic, err := p.client.GetTopic(req.Topic) - if err != nil { - return fmt.Errorf("hazelcast error: failed to get topic for %s", req.Topic) - } - - if err = topic.Publish(req.Data); err != nil { - return fmt.Errorf("hazelcast error: failed to publish data, %v", err) - } - - return nil -} - -func (p *Hazelcast) Subscribe(subscribeCtx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error { - topic, err := p.client.GetTopic(req.Topic) - if err != nil { - return fmt.Errorf("hazelcast error: failed to get topic for %s", req.Topic) - } - - listenerID, err := topic.AddMessageListener(&hazelcastMessageListener{ - p: p, - ctx: subscribeCtx, - topicName: topic.Name(), - pubsubHandler: handler, - }) - if err != nil { - return fmt.Errorf("hazelcast error: failed to add new listener, %v", err) - } - - // Wait for context cancelation then remove the listener - go func() { - <-subscribeCtx.Done() - topic.RemoveMessageListener(listenerID) - }() - - return nil -} - -func (p *Hazelcast) Close() error { - p.client.Shutdown() - - return nil -} - -func (p *Hazelcast) Features() []pubsub.Feature { - return nil -} - -type hazelcastMessageListener struct { - p *Hazelcast - ctx context.Context - topicName string - pubsubHandler pubsub.Handler -} - -func (l *hazelcastMessageListener) OnMessage(message hazelcastCore.Message) error { - msg, ok := message.MessageObject().([]byte) - if !ok { - return errors.New("hazelcast error: cannot cast message to byte array") - } - - if err := l.handleMessageObject(msg); err != nil { - l.p.logger.Error("Failure processing Hazelcast message") - - return err - } - - return nil -} - -func (l *hazelcastMessageListener) handleMessageObject(message []byte) error { - pubsubMsg := pubsub.NewMessage{ - Data: message, - Topic: l.topicName, - } - - // TODO: See https://github.com/dapr/components-contrib/issues/1808 - // This component has built-in retries because Hazelcast doesn't support N/ACK for pubsub (it delivers messages "once" and not "at least once") - var b backoff.BackOff = backoff.NewConstantBackOff(5 * time.Second) - b = backoff.WithContext(b, l.ctx) - if l.p.metadata.backOffMaxRetries >= 0 { - b = backoff.WithMaxRetries(b, uint64(l.p.metadata.backOffMaxRetries)) - } - - return retry.NotifyRecover(func() error { - l.p.logger.Debug("Processing Hazelcast message") - - return l.pubsubHandler(l.ctx, &pubsubMsg) - }, b, func(err error, d time.Duration) { - l.p.logger.Error("Error processing Hazelcast message. Retrying...") - }, func() { - l.p.logger.Info("Successfully processed Hazelcast message after it previously failed") - }) -} diff --git a/pubsub/hazelcast/metadata.go b/pubsub/hazelcast/metadata.go deleted file mode 100644 index af367a6d1..000000000 --- a/pubsub/hazelcast/metadata.go +++ /dev/null @@ -1,19 +0,0 @@ -/* -Copyright 2021 The Dapr Authors -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package hazelcast - -type metadata struct { - hazelcastServers string - backOffMaxRetries int -} diff --git a/pubsub/hazelcast/metadata_test.go b/pubsub/hazelcast/metadata_test.go deleted file mode 100644 index db4a96003..000000000 --- a/pubsub/hazelcast/metadata_test.go +++ /dev/null @@ -1,39 +0,0 @@ -/* -Copyright 2021 The Dapr Authors -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package hazelcast - -import ( - "testing" - - "github.com/stretchr/testify/assert" - - mdata "github.com/dapr/components-contrib/metadata" - "github.com/dapr/components-contrib/pubsub" -) - -func TestValidateMetadata(t *testing.T) { - t.Run("return error when required servers is empty", func(t *testing.T) { - fakeMetaData := pubsub.Metadata{Base: mdata.Base{ - Properties: map[string]string{ - hazelcastServers: "", - }, - }} - - m, err := parseHazelcastMetadata(fakeMetaData) - - // assert - assert.Error(t, err) - assert.Empty(t, m) - }) -} diff --git a/tests/config/pubsub/hazelcast/pubsub.yml b/tests/config/pubsub/hazelcast/pubsub.yml deleted file mode 100644 index df2cd2e42..000000000 --- a/tests/config/pubsub/hazelcast/pubsub.yml +++ /dev/null @@ -1,12 +0,0 @@ -apiVersion: dapr.io/v1alpha1 -kind: Component -metadata: - name: pubsub -spec: - type: pubsub.hazelcast - version: v1 - metadata: - - name: hazelcastServers - value: "localhost:5701" - - name: backOffMaxRetries - value: 3 \ No newline at end of file diff --git a/tests/config/pubsub/tests.yml b/tests/config/pubsub/tests.yml index 290238674..892008da4 100644 --- a/tests/config/pubsub/tests.yml +++ b/tests/config/pubsub/tests.yml @@ -71,8 +71,6 @@ components: - component: mqtt3 profile: vernemq operations: ['publish', 'subscribe', 'multiplehandlers'] - - component: hazelcast - operations: ['publish', 'subscribe', 'multiplehandlers'] - component: rabbitmq operations: ['publish', 'subscribe', 'multiplehandlers'] config: diff --git a/tests/conformance/common.go b/tests/conformance/common.go index 5b4ca81e0..475acb38c 100644 --- a/tests/conformance/common.go +++ b/tests/conformance/common.go @@ -62,7 +62,6 @@ import ( p_eventhubs "github.com/dapr/components-contrib/pubsub/azure/eventhubs" p_servicebusqueues "github.com/dapr/components-contrib/pubsub/azure/servicebus/queues" p_servicebustopics "github.com/dapr/components-contrib/pubsub/azure/servicebus/topics" - p_hazelcast "github.com/dapr/components-contrib/pubsub/hazelcast" p_inmemory "github.com/dapr/components-contrib/pubsub/in-memory" p_jetstream "github.com/dapr/components-contrib/pubsub/jetstream" p_kafka "github.com/dapr/components-contrib/pubsub/kafka" @@ -480,8 +479,6 @@ func loadPubSub(tc TestComponent) pubsub.PubSub { pubsub = p_pulsar.NewPulsar(testLogger) case "mqtt3": pubsub = p_mqtt3.NewMQTTPubSub(testLogger) - case "hazelcast": - pubsub = p_hazelcast.NewHazelcastPubSub(testLogger) case "rabbitmq": pubsub = p_rabbitmq.NewRabbitMQ(testLogger) case "in-memory":