Merge remote-tracking branch 'upstream/master' into azqueue

This commit is contained in:
Bernd Verst 2023-02-24 15:52:10 -08:00
commit 358ebbf0bc
17 changed files with 174 additions and 811 deletions

View File

@ -1,6 +0,0 @@
version: '2'
services:
hazelcast:
image: hazelcast/hazelcast:3.12.12-1
ports:
- 5701:5701

View File

@ -208,10 +208,6 @@ const components = {
'AzureCertificationServicePrincipalClientSecret',
],
},
'pubsub.hazelcast': {
conformance: true,
conformanceSetup: 'docker-compose.sh hazelcast',
},
'pubsub.in-memory': {
conformance: true,
},

View File

@ -1,229 +0,0 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package twitter
//nolint:staticcheck
import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/dghubble/go-twitter/twitter"
"github.com/dghubble/oauth1"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/kit/logger"
)
// Binding represents Twitter input/output binding.
type Binding struct {
client *twitter.Client
query string
logger logger.Logger
closed atomic.Bool
closeCh chan struct{}
wg sync.WaitGroup
}
// NewTwitter returns a new Twitter event input binding.
func NewTwitter(logger logger.Logger) bindings.InputOutputBinding {
return &Binding{logger: logger, closeCh: make(chan struct{})}
}
// Init initializes the Twitter binding.
func (t *Binding) Init(ctx context.Context, metadata bindings.Metadata) error {
t.logger.Warnf("DEPRECATION NOTICE: Component bindings.twitter has been deprecated and will be removed in a future Dapr release.")
ck, f := metadata.Properties["consumerKey"]
if !f || ck == "" {
return fmt.Errorf("consumerKey not set")
}
cs, f := metadata.Properties["consumerSecret"]
if !f || cs == "" {
return fmt.Errorf("consumerSecret not set")
}
at, f := metadata.Properties["accessToken"]
if !f || at == "" {
return fmt.Errorf("accessToken not set")
}
as, f := metadata.Properties["accessSecret"]
if !f || as == "" {
return fmt.Errorf("accessSecret not set")
}
// set query only in an input binding case
q, f := metadata.Properties["query"]
if f {
t.query = q
}
config := oauth1.NewConfig(ck, cs)
token := oauth1.NewToken(at, as)
httpClient := config.Client(oauth1.NoContext, token)
t.client = twitter.NewClient(httpClient)
return nil
}
// Operations returns list of operations supported by twitter binding.
func (t *Binding) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.GetOperation}
}
// Read triggers the Twitter search and events on each result tweet.
func (t *Binding) Read(ctx context.Context, handler bindings.Handler) error {
if t.query == "" {
return errors.New("metadata property 'query' is empty")
}
demux := twitter.NewSwitchDemux()
demux.Tweet = func(tweet *twitter.Tweet) {
t.logger.Debugf("raw tweet: %+v", tweet)
data, marshalErr := json.Marshal(tweet)
if marshalErr != nil {
t.logger.Errorf("error marshaling tweet: %+v", tweet)
return
}
handler(ctx, &bindings.ReadResponse{
Data: data,
Metadata: map[string]string{
"query": t.query,
},
})
}
demux.StreamLimit = func(limit *twitter.StreamLimit) {
t.logger.Warnf("disconnect: %+v", limit)
}
demux.StreamDisconnect = func(disconnect *twitter.StreamDisconnect) {
t.logger.Errorf("stream disconnect: %+v", disconnect)
}
filterParams := &twitter.StreamFilterParams{
Track: []string{t.query},
StallWarnings: twitter.Bool(true),
}
t.logger.Debug("starting stream for query: %s", t.query)
stream, err := t.client.Streams.Filter(filterParams)
if err != nil {
return fmt.Errorf("error executing stream filter '%+v': %w", filterParams, err)
}
t.logger.Debug("starting handler...")
t.wg.Add(2)
go func() {
defer t.wg.Done()
demux.HandleChan(stream.Messages)
}()
go func() {
defer t.wg.Done()
select {
case <-t.closeCh:
case <-ctx.Done():
}
t.logger.Debug("stopping handler...")
stream.Stop()
}()
return nil
}
func (t *Binding) Close() error {
if t.closed.CompareAndSwap(false, true) {
close(t.closeCh)
}
t.wg.Wait()
return nil
}
// Invoke handles all operations.
func (t *Binding) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
t.logger.Debugf("operation: %v", req.Operation)
if req.Metadata == nil {
return nil, fmt.Errorf("metadata not set")
}
// required
q, f := req.Metadata["query"]
if !f || q == "" {
return nil, fmt.Errorf("query not set")
}
// optionals
l, f := req.Metadata["lang"]
if !f || l == "" {
l = "en"
}
r, f := req.Metadata["result"]
if !f || r == "" {
// mixed : Include both popular and real time results in the response
// recent : return only the most recent results in the response
// popular : return only the most popular results in the response
r = "recent"
}
var sinceID int64
s, f := req.Metadata["since_id"]
if f && s != "" {
i, err := strconv.ParseInt(s, 10, 64)
if err == nil {
sinceID = i
}
}
sq := &twitter.SearchTweetParams{
Count: 100, // max
Lang: l,
SinceID: sinceID,
Query: q,
ResultType: r,
IncludeEntities: twitter.Bool(true),
}
t.logger.Debug("starting stream for: %+v", sq)
search, _, err := t.client.Search.Tweets(sq)
if err != nil {
return nil, fmt.Errorf("error executing search filter '%+v': %w", sq, err)
}
if search == nil || search.Statuses == nil {
return nil, fmt.Errorf("nil search result from '%+v'", sq)
}
t.logger.Debugf("raw response: %+v", search.Statuses)
data, marshalErr := json.Marshal(search.Statuses)
if marshalErr != nil {
t.logger.Errorf("error marshaling tweet: %v", marshalErr)
return nil, fmt.Errorf("error parsing response from '%+v': %w", sq, marshalErr)
}
req.Metadata["max_tweet_id"] = search.Metadata.MaxIDStr
req.Metadata["tweet_count"] = strconv.Itoa(search.Metadata.Count)
req.Metadata["search_ts"] = time.Now().UTC().String()
ir := &bindings.InvokeResponse{
Data: data,
Metadata: req.Metadata,
}
return ir, nil
}

View File

@ -1,149 +0,0 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package twitter
//nolint:staticcheck
import (
"context"
"encoding/json"
"os"
"testing"
"time"
"github.com/dghubble/go-twitter/twitter"
"github.com/stretchr/testify/assert"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/kit/logger"
)
const (
testTwitterConsumerKey = "test-consumerKey"
testTwitterConsumerSecret = "test-consumerSecret"
testTwitterAccessToken = "test-accessToken"
testTwitterAccessSecret = "test-accessSecret"
)
func getTestMetadata() bindings.Metadata {
m := bindings.Metadata{}
m.Properties = map[string]string{
"consumerKey": testTwitterConsumerKey,
"consumerSecret": testTwitterConsumerSecret,
"accessToken": testTwitterAccessToken,
"accessSecret": testTwitterAccessSecret,
}
return m
}
func getRuntimeMetadata() map[string]string {
return map[string]string{
"consumerKey": os.Getenv("CONSUMER_KEY"),
"consumerSecret": os.Getenv("CONSUMER_SECRET"),
"accessToken": os.Getenv("ACCESS_TOKEN"),
"accessSecret": os.Getenv("ACCESS_SECRET"),
}
}
// go test -v -count=1 ./bindings/twitter/.
func TestInit(t *testing.T) {
m := getTestMetadata()
tw := NewTwitter(logger.NewLogger("test")).(*Binding)
err := tw.Init(context.Background(), m)
assert.Nilf(t, err, "error initializing valid metadata properties")
}
// TestReadError excutes the Read method and fails before the Twitter API call
// go test -v -count=1 -run TestReadError ./bindings/twitter/.
func TestReadError(t *testing.T) {
tw := NewTwitter(logger.NewLogger("test")).(*Binding)
m := getTestMetadata()
err := tw.Init(context.Background(), m)
assert.Nilf(t, err, "error initializing valid metadata properties")
err = tw.Read(context.Background(), func(ctx context.Context, res *bindings.ReadResponse) ([]byte, error) {
t.Logf("result: %+v", res)
assert.NotNilf(t, err, "no error on read with invalid credentials")
return nil, nil
})
assert.Error(t, err)
assert.NoError(t, tw.Close())
}
// TestRead executes the Read method which calls Twiter API
// env RUN_LIVE_TW_TEST=true go test -v -count=1 -run TestRead ./bindings/twitter/.
func TestRead(t *testing.T) {
if os.Getenv("RUN_LIVE_TW_TEST") != "true" {
t.SkipNow() // skip this test until able to read credentials in test infra
}
m := bindings.Metadata{}
m.Properties = getRuntimeMetadata()
// add query
m.Properties["query"] = "microsoft"
tw := NewTwitter(logger.NewLogger("test")).(*Binding)
tw.logger.SetOutputLevel(logger.DebugLevel)
err := tw.Init(context.Background(), m)
assert.Nilf(t, err, "error initializing read")
ctx, cancel := context.WithCancel(context.Background())
counter := 0
err = tw.Read(ctx, func(ctx context.Context, res *bindings.ReadResponse) ([]byte, error) {
counter++
t.Logf("tweet[%d]", counter)
var tweet twitter.Tweet
json.Unmarshal(res.Data, &tweet)
assert.NotEmpty(t, tweet.IDStr, "tweet should have an ID")
cancel()
return nil, nil
})
assert.Nilf(t, err, "error on read")
select {
case <-ctx.Done():
// do nothing
case <-time.After(30 * time.Second):
cancel()
t.Fatal("Timeout waiting for messages")
}
assert.NoError(t, tw.Close())
}
// TestInvoke executes the Invoke method which calls Twiter API
// test tokens must be set
// env RUN_LIVE_TW_TEST=true go test -v -count=1 -run TestInvoke ./bindings/twitter/.
func TestInvoke(t *testing.T) {
if os.Getenv("RUN_LIVE_TW_TEST") != "true" {
t.SkipNow() // skip this test until able to read credentials in test infra
}
m := bindings.Metadata{}
m.Properties = getRuntimeMetadata()
tw := NewTwitter(logger.NewLogger("test")).(*Binding)
tw.logger.SetOutputLevel(logger.DebugLevel)
err := tw.Init(context.Background(), m)
assert.Nilf(t, err, "error initializing Invoke")
req := &bindings.InvokeRequest{
Metadata: map[string]string{
"query": "microsoft",
},
}
resp, err := tw.Invoke(context.Background(), req)
assert.Nilf(t, err, "error on invoke")
assert.NotNil(t, resp)
assert.NoError(t, tw.Close())
}

3
go.mod
View File

@ -50,8 +50,6 @@ require (
github.com/dancannon/gorethink v4.0.0+incompatible
github.com/dapr/kit v0.0.4
github.com/denisenkom/go-mssqldb v0.12.3
github.com/dghubble/go-twitter v0.0.0-20221104224141-912508c3888b
github.com/dghubble/oauth1 v0.7.2
github.com/didip/tollbooth/v7 v7.0.1
github.com/eclipse/paho.mqtt.golang v1.4.2
github.com/fasthttp-contrib/sessions v0.0.0-20160905201309-74f6ac73d5d5
@ -181,7 +179,6 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/deepmap/oapi-codegen v1.3.6 // indirect
github.com/dghubble/sling v1.4.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dimchansky/utfbom v1.1.1 // indirect
github.com/dubbogo/gost v1.13.1 // indirect

6
go.sum
View File

@ -720,12 +720,6 @@ github.com/deepmap/oapi-codegen v1.3.6 h1:Wj44p9A0V0PJ+AUg0BWdyGcsS1LY18U+0rCuPQ
github.com/deepmap/oapi-codegen v1.3.6/go.mod h1:aBozjEveG+33xPiP55Iw/XbVkhtZHEGLq3nxlX0+hfU=
github.com/denisenkom/go-mssqldb v0.12.3 h1:pBSGx9Tq67pBOTLmxNuirNTeB8Vjmf886Kx+8Y+8shw=
github.com/denisenkom/go-mssqldb v0.12.3/go.mod h1:k0mtMFOnU+AihqFxPMiF05rtiDrorD1Vrm1KEz5hxDo=
github.com/dghubble/go-twitter v0.0.0-20221104224141-912508c3888b h1:XQu6o3AwJx/jsg9LZ41uIeUdXK5be099XFfFn6H9ikk=
github.com/dghubble/go-twitter v0.0.0-20221104224141-912508c3888b/go.mod h1:B0/qdW5XUupJvcsx40hnVbfjzz9He5YpYXx6eVVdiSY=
github.com/dghubble/oauth1 v0.7.2 h1:pwcinOZy8z6XkNxvPmUDY52M7RDPxt0Xw1zgZ6Cl5JA=
github.com/dghubble/oauth1 v0.7.2/go.mod h1:9erQdIhqhOHG/7K9s/tgh9Ks/AfoyrO5mW/43Lu2+kE=
github.com/dghubble/sling v1.4.0 h1:/n8MRosVTthvMbwlNZgLx579OGVjUOy3GNEv5BIqAWY=
github.com/dghubble/sling v1.4.0/go.mod h1:0r40aNsU9EdDUVBNhfCstAtFgutjgJGYbO1oNzkMoM8=
github.com/dgraph-io/badger/v3 v3.2103.5 h1:ylPa6qzbjYRQMU6jokoj4wzcaweHylt//CH0AKt0akg=
github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=

View File

@ -14,9 +14,8 @@ limitations under the License.
package consul
import (
"crypto/rand"
"fmt"
"math/big"
"math/rand"
"net"
"strconv"
@ -81,46 +80,50 @@ type resolverConfig struct {
// NewResolver creates Consul name resolver.
func NewResolver(logger logger.Logger) nr.Resolver {
return newResolver(logger, resolverConfig{}, &client{})
return newResolver(logger, &client{})
}
func newResolver(logger logger.Logger, resolverConfig resolverConfig, client clientInterface) nr.Resolver {
func newResolver(logger logger.Logger, client clientInterface) *resolver {
return &resolver{
logger: logger,
config: resolverConfig,
client: client,
}
}
// Init will configure component. It will also register service or validate client connection based on config.
func (r *resolver) Init(metadata nr.Metadata) error {
var err error
func (r *resolver) Init(metadata nr.Metadata) (err error) {
r.config, err = getConfig(metadata)
if err != nil {
return err
}
if err = r.client.InitClient(r.config.Client); err != nil {
err = r.client.InitClient(r.config.Client)
if err != nil {
return fmt.Errorf("failed to init consul client: %w", err)
}
// register service to consul
// Register service to consul
if r.config.Registration != nil {
if err := r.client.Agent().ServiceRegister(r.config.Registration); err != nil {
agent := r.client.Agent()
err = agent.ServiceRegister(r.config.Registration)
if err != nil {
return fmt.Errorf("failed to register consul service: %w", err)
}
r.logger.Infof("service:%s registered on consul agent", r.config.Registration.Name)
} else if _, err := r.client.Agent().Self(); err != nil {
return fmt.Errorf("failed check on consul agent: %w", err)
} else {
_, err = r.client.Agent().Self()
if err != nil {
return fmt.Errorf("failed check on consul agent: %w", err)
}
}
return nil
}
// ResolveID resolves name to address via consul.
func (r *resolver) ResolveID(req nr.ResolveRequest) (string, error) {
func (r *resolver) ResolveID(req nr.ResolveRequest) (addr string, err error) {
cfg := r.config
services, _, err := r.client.Health().Service(req.ID, "", true, cfg.QueryOptions)
if err != nil {
@ -128,49 +131,33 @@ func (r *resolver) ResolveID(req nr.ResolveRequest) (string, error) {
}
if len(services) == 0 {
return "", fmt.Errorf("no healthy services found with AppID:%s", req.ID)
return "", fmt.Errorf("no healthy services found with AppID '%s'", req.ID)
}
shuffle := func(services []*consul.ServiceEntry) []*consul.ServiceEntry {
for i := len(services) - 1; i > 0; i-- {
rndbig, _ := rand.Int(rand.Reader, big.NewInt(int64(i+1)))
j := rndbig.Int64()
// Pick a random service from the result
// Note: we're using math/random here as PRNG and that's ok since we're just using this for selecting a random address from a list for load-balancing, so we don't need a CSPRNG
//nolint:gosec
svc := services[rand.Int()%len(services)]
services[i], services[j] = services[j], services[i]
}
return services
port := svc.Service.Meta[cfg.DaprPortMetaKey]
if port == "" {
return "", fmt.Errorf("target service AppID '%s' found but DAPR_PORT missing from meta", req.ID)
}
svc := shuffle(services)[0]
addr := ""
if port, ok := svc.Service.Meta[cfg.DaprPortMetaKey]; ok {
if svc.Service.Address != "" {
addr = fmt.Sprintf("%s:%s", svc.Service.Address, port)
} else if svc.Node.Address != "" {
addr = fmt.Sprintf("%s:%s", svc.Node.Address, port)
} else {
return "", fmt.Errorf("no healthy services found with AppID:%s", req.ID)
}
if svc.Service.Address != "" {
addr = svc.Service.Address + ":" + port
} else if svc.Node.Address != "" {
addr = svc.Node.Address + ":" + port
} else {
return "", fmt.Errorf("target service AppID:%s found but DAPR_PORT missing from meta", req.ID)
return "", fmt.Errorf("no healthy services found with AppID '%s'", req.ID)
}
return addr, nil
}
// getConfig configuration from metadata, defaults are best suited for self-hosted mode.
func getConfig(metadata nr.Metadata) (resolverConfig, error) {
var daprPort string
var ok bool
var err error
resolverCfg := resolverConfig{}
props := metadata.Properties
if daprPort, ok = props[nr.DaprPort]; !ok {
func getConfig(metadata nr.Metadata) (resolverCfg resolverConfig, err error) {
if metadata.Properties[nr.DaprPort] == "" {
return resolverCfg, fmt.Errorf("metadata property missing: %s", nr.DaprPort)
}
@ -187,7 +174,8 @@ func getConfig(metadata nr.Metadata) (resolverConfig, error) {
}
resolverCfg.Client = getClientConfig(cfg)
if resolverCfg.Registration, err = getRegistrationConfig(cfg, props); err != nil {
resolverCfg.Registration, err = getRegistrationConfig(cfg, metadata.Properties)
if err != nil {
return resolverCfg, err
}
resolverCfg.QueryOptions = getQueryOptionsConfig(cfg)
@ -198,7 +186,7 @@ func getConfig(metadata nr.Metadata) (resolverConfig, error) {
resolverCfg.Registration.Meta = map[string]string{}
}
resolverCfg.Registration.Meta[resolverCfg.DaprPortMetaKey] = daprPort
resolverCfg.Registration.Meta[resolverCfg.DaprPortMetaKey] = metadata.Properties[nr.DaprPort]
}
return resolverCfg, nil
@ -217,15 +205,18 @@ func getRegistrationConfig(cfg configSpec, props map[string]string) (*consul.Age
// if advanced registration configured ignore other registration related configs
if cfg.AdvancedRegistration != nil {
return cfg.AdvancedRegistration, nil
} else if !cfg.SelfRegister {
}
if !cfg.SelfRegister {
return nil, nil
}
var appID string
var appPort string
var host string
var httpPort string
var ok bool
var (
appID string
appPort string
host string
httpPort string
ok bool
)
if appID, ok = props[nr.AppID]; !ok {
return nil, fmt.Errorf("metadata property missing: %s", nr.AppID)

View File

@ -99,7 +99,7 @@ func TestInit(t *testing.T) {
t.Helper()
var mock mockClient
resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock)
resolver := newResolver(logger.NewLogger("test"), &mock)
_ = resolver.Init(metadata)
@ -122,7 +122,7 @@ func TestInit(t *testing.T) {
t.Helper()
var mock mockClient
resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock)
resolver := newResolver(logger.NewLogger("test"), &mock)
_ = resolver.Init(metadata)
@ -144,7 +144,7 @@ func TestInit(t *testing.T) {
t.Helper()
var mock mockClient
resolver := newResolver(logger.NewLogger("test"), resolverConfig{}, &mock)
resolver := newResolver(logger.NewLogger("test"), &mock)
_ = resolver.Init(metadata)
@ -166,7 +166,7 @@ func TestInit(t *testing.T) {
func TestResolveID(t *testing.T) {
t.Parallel()
testConfig := &resolverConfig{
testConfig := resolverConfig{
DaprPortMetaKey: "DAPR_PORT",
}
@ -187,7 +187,8 @@ func TestResolveID(t *testing.T) {
serviceResult: []*consul.ServiceEntry{},
},
}
resolver := newResolver(logger.NewLogger("test"), *testConfig, &mock)
resolver := newResolver(logger.NewLogger("test"), &mock)
resolver.config = testConfig
_, err := resolver.ResolveID(req)
assert.Equal(t, 1, mock.mockHealth.serviceCalled)
@ -216,13 +217,68 @@ func TestResolveID(t *testing.T) {
},
},
}
resolver := newResolver(logger.NewLogger("test"), *testConfig, &mock)
resolver := newResolver(logger.NewLogger("test"), &mock)
resolver.config = testConfig
addr, _ := resolver.ResolveID(req)
assert.Equal(t, "123.234.345.456:50005", addr)
},
},
{
"should get random address from service",
nr.ResolveRequest{
ID: "test-app",
},
func(t *testing.T, req nr.ResolveRequest) {
t.Helper()
mock := mockClient{
mockHealth: mockHealth{
serviceResult: []*consul.ServiceEntry{
{
Service: &consul.AgentService{
Address: "123.234.345.456",
Port: 8600,
Meta: map[string]string{
"DAPR_PORT": "50005",
},
},
},
{
Service: &consul.AgentService{
Address: "234.345.456.678",
Port: 8600,
Meta: map[string]string{
"DAPR_PORT": "50005",
},
},
},
},
},
}
resolver := newResolver(logger.NewLogger("test"), &mock)
resolver.config = testConfig
total1 := 0
total2 := 0
for i := 0; i < 100; i++ {
addr, _ := resolver.ResolveID(req)
if addr == "123.234.345.456:50005" {
total1++
} else if addr == "234.345.456.678:50005" {
total2++
} else {
t.Fatalf("Received unexpected address: %s", addr)
}
}
// Because of the random nature of the address being returned, we just check to make sure we get at least 20 of each (and a total of 100)
assert.Equal(t, 100, total1+total2)
assert.Greater(t, total1, 20)
assert.Greater(t, total2, 20)
},
},
{
"should get address from node if not on service",
nr.ResolveRequest{
@ -260,7 +316,8 @@ func TestResolveID(t *testing.T) {
},
},
}
resolver := newResolver(logger.NewLogger("test"), *testConfig, &mock)
resolver := newResolver(logger.NewLogger("test"), &mock)
resolver.config = testConfig
addr, _ := resolver.ResolveID(req)
@ -289,7 +346,8 @@ func TestResolveID(t *testing.T) {
},
},
}
resolver := newResolver(logger.NewLogger("test"), *testConfig, &mock)
resolver := newResolver(logger.NewLogger("test"), &mock)
resolver.config = testConfig
_, err := resolver.ResolveID(req)
@ -315,7 +373,8 @@ func TestResolveID(t *testing.T) {
},
},
}
resolver := newResolver(logger.NewLogger("test"), *testConfig, &mock)
resolver := newResolver(logger.NewLogger("test"), &mock)
resolver.config = testConfig
_, err := resolver.ResolveID(req)

View File

@ -238,9 +238,10 @@ func (m *Resolver) startRefreshers() {
// refresh app addresses periodically and on demand
go func() {
defer func() {
m.refreshRunning.Store(false)
}()
defer m.refreshRunning.Store(false)
t := time.NewTicker(refreshInterval)
defer t.Stop()
for {
select {
@ -252,7 +253,7 @@ func (m *Resolver) startRefreshers() {
}
}()
// Refresh periodically
case <-time.After(refreshInterval):
case <-t.C:
go func() {
if err := m.refreshAllApps(m.runCtx); err != nil {
m.logger.Warnf(err.Error())
@ -269,37 +270,28 @@ func (m *Resolver) startRefreshers() {
// Init registers service for mDNS.
func (m *Resolver) Init(metadata nameresolution.Metadata) error {
var (
appID string
hostAddress string
ok bool
instanceID string
)
props := metadata.Properties
if appID, ok = props[nameresolution.MDNSInstanceName]; !ok {
appID := props[nameresolution.AppID]
if appID == "" {
return errors.New("name is missing")
}
if hostAddress, ok = props[nameresolution.MDNSInstanceAddress]; !ok {
hostAddress := props[nameresolution.HostAddress]
if hostAddress == "" {
return errors.New("address is missing")
}
p, ok := props[nameresolution.MDNSInstancePort]
if !ok {
if props[nameresolution.DaprPort] == "" {
return errors.New("port is missing")
}
port, err := strconv.ParseInt(p, 10, 32)
port, err := strconv.Atoi(props[nameresolution.DaprPort])
if err != nil {
return errors.New("port is invalid")
}
if instanceID, ok = props[nameresolution.MDNSInstanceID]; !ok {
instanceID = ""
}
err = m.registerMDNS(instanceID, appID, []string{hostAddress}, int(port))
err = m.registerMDNS("", appID, []string{hostAddress}, port)
if err != nil {
return err
}

View File

@ -42,30 +42,30 @@ func TestInitMetadata(t *testing.T) {
{
"name",
map[string]string{
nr.MDNSInstanceAddress: localhost,
nr.MDNSInstancePort: "30003",
nr.HostAddress: localhost,
nr.DaprPort: "30003",
},
},
{
"address",
map[string]string{
nr.MDNSInstanceName: "testAppID",
nr.MDNSInstancePort: "30003",
nr.AppID: "testAppID",
nr.DaprPort: "30003",
},
},
{
"port",
map[string]string{
nr.MDNSInstanceName: "testAppID",
nr.MDNSInstanceAddress: localhost,
nr.AppID: "testAppID",
nr.HostAddress: localhost,
},
},
{
"port",
map[string]string{
nr.MDNSInstanceName: "testAppID",
nr.MDNSInstanceAddress: localhost,
nr.MDNSInstancePort: "abcd",
nr.AppID: "testAppID",
nr.HostAddress: localhost,
nr.DaprPort: "abcd",
},
},
}
@ -90,9 +90,9 @@ func TestInitRegister(t *testing.T) {
resolver := NewResolver(logger.NewLogger("test")).(*Resolver)
defer resolver.Close()
md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: "testAppID",
nr.MDNSInstanceAddress: localhost,
nr.MDNSInstancePort: "1234",
nr.AppID: "testAppID",
nr.HostAddress: localhost,
nr.DaprPort: "1234",
}}}
// act
@ -105,14 +105,14 @@ func TestInitRegisterDuplicate(t *testing.T) {
resolver := NewResolver(logger.NewLogger("test")).(*Resolver)
defer resolver.Close()
md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: "testAppID",
nr.MDNSInstanceAddress: localhost,
nr.MDNSInstancePort: "1234",
nr.AppID: "testAppID",
nr.HostAddress: localhost,
nr.DaprPort: "1234",
}}}
md2 := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: "testAppID",
nr.MDNSInstanceAddress: localhost,
nr.MDNSInstancePort: "1234",
nr.AppID: "testAppID",
nr.HostAddress: localhost,
nr.DaprPort: "1234",
}}}
// act
@ -128,9 +128,9 @@ func TestResolver(t *testing.T) {
resolver := NewResolver(logger.NewLogger("test")).(*Resolver)
defer resolver.Close()
md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: "testAppID",
nr.MDNSInstanceAddress: localhost,
nr.MDNSInstancePort: "1234",
nr.AppID: "testAppID",
nr.HostAddress: localhost,
nr.DaprPort: "1234",
}}}
// act
@ -149,9 +149,9 @@ func TestResolverClose(t *testing.T) {
// arrange
resolver := NewResolver(logger.NewLogger("test")).(*Resolver)
md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: "testAppID",
nr.MDNSInstanceAddress: localhost,
nr.MDNSInstancePort: "1234",
nr.AppID: "testAppID",
nr.HostAddress: localhost,
nr.DaprPort: "1234",
}}}
// act
@ -181,34 +181,24 @@ func TestResolverMultipleInstances(t *testing.T) {
instanceAID := "A"
instanceAName := "testAppID"
instanceAAddress := localhost
instanceAPort := "1234"
instanceAPQDN := fmt.Sprintf("%s:%s", instanceAAddress, instanceAPort)
instanceAPort := 1234
instanceAPQDN := fmt.Sprintf("%s:%d", instanceAAddress, instanceAPort)
instanceA := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: instanceAName,
nr.MDNSInstanceAddress: instanceAAddress,
nr.MDNSInstancePort: instanceAPort,
nr.MDNSInstanceID: instanceAID,
}}}
err1 := resolver.Init(instanceA)
err1 := resolver.registerMDNS(instanceAID, instanceAName, []string{instanceAAddress}, instanceAPort)
require.NoError(t, err1)
// register instance B
instanceBID := "B"
instanceBName := "testAppID"
instanceBAddress := localhost
instanceBPort := "5678"
instanceBPQDN := fmt.Sprintf("%s:%s", instanceBAddress, instanceBPort)
instanceBPort := 5678
instanceBPQDN := fmt.Sprintf("%s:%d", instanceBAddress, instanceBPort)
instanceB := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: instanceBName,
nr.MDNSInstanceAddress: instanceBAddress,
nr.MDNSInstancePort: instanceBPort,
nr.MDNSInstanceID: instanceBID,
}}}
err2 := resolver.Init(instanceB)
err2 := resolver.registerMDNS(instanceBID, instanceBName, []string{instanceBAddress}, instanceBPort)
require.NoError(t, err2)
go resolver.startRefreshers()
// act...
request := nr.ResolveRequest{ID: "testAppID"}
@ -292,9 +282,9 @@ func ResolverConcurrencySubsriberClear(t *testing.T) {
resolver := NewResolver(logger.NewLogger("test")).(*Resolver)
defer resolver.Close()
md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: "testAppID",
nr.MDNSInstanceAddress: localhost,
nr.MDNSInstancePort: "1234",
nr.AppID: "testAppID",
nr.HostAddress: localhost,
nr.DaprPort: "1234",
}}}
// act
@ -334,50 +324,34 @@ func ResolverConcurrencyFound(t *testing.T) {
appAID := "A"
appAName := "testAppA"
appAAddress := localhost
appAPort := "1234"
appAPQDN := fmt.Sprintf("%s:%s", appAAddress, appAPort)
appAPort := 1234
appABPQDN := fmt.Sprintf("%s:%d", appAAddress, appAPort)
appA := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: appAName,
nr.MDNSInstanceAddress: appAAddress,
nr.MDNSInstancePort: appAPort,
nr.MDNSInstanceID: appAID,
}}}
err1 := resolver.Init(appA)
err1 := resolver.registerMDNS(appAID, appAName, []string{appAAddress}, appAPort)
require.NoError(t, err1)
// register instance B
appBID := "B"
appBName := "testAppB"
appBAddress := localhost
appBPort := "5678"
appBBPQDN := fmt.Sprintf("%s:%s", appBAddress, appBPort)
appBPort := 5678
appBBPQDN := fmt.Sprintf("%s:%d", appBAddress, appBPort)
appB := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: appBName,
nr.MDNSInstanceAddress: appBAddress,
nr.MDNSInstancePort: appBPort,
nr.MDNSInstanceID: appBID,
}}}
err2 := resolver.Init(appB)
err2 := resolver.registerMDNS(appBID, appBName, []string{appBAddress}, appBPort)
require.NoError(t, err2)
// register instance C
appCID := "C"
appCName := "testAppC"
appCAddress := localhost
appCPort := "3456"
appCBPQDN := fmt.Sprintf("%s:%s", appCAddress, appCPort)
appCPort := 3456
appCBPQDN := fmt.Sprintf("%s:%d", appCAddress, appCPort)
appC := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: appCName,
nr.MDNSInstanceAddress: appCAddress,
nr.MDNSInstancePort: appCPort,
nr.MDNSInstanceID: appCID,
}}}
err3 := resolver.Init(appC)
err3 := resolver.registerMDNS(appCID, appCName, []string{appCAddress}, appCPort)
require.NoError(t, err3)
go resolver.startRefreshers()
// act...
wg := sync.WaitGroup{}
for i := 0; i < numConcurrency; i++ {
@ -402,7 +376,7 @@ func ResolverConcurrencyFound(t *testing.T) {
// assert
require.NoError(t, err)
if r == 0 {
assert.Equal(t, appAPQDN, pt)
assert.Equal(t, appABPQDN, pt)
} else if r == 1 {
assert.Equal(t, appBBPQDN, pt)
} else if r == 2 {

View File

@ -16,6 +16,8 @@ package nameresolution
import "github.com/dapr/components-contrib/metadata"
const (
// TODO: REMOVE THESE AFTER RUNTIME IS CHANGED
// MDNSInstanceName is the instance name which is broadcasted.
MDNSInstanceName string = "name"
// MDNSInstanceAddress is the address of the instance.

View File

@ -1,183 +0,0 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hazelcast
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/hazelcast/hazelcast-go-client"
hazelcastCore "github.com/hazelcast/hazelcast-go-client/core"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/retry"
)
const (
hazelcastServers = "hazelcastServers"
hazelcastBackOffMaxRetries = "backOffMaxRetries"
)
type Hazelcast struct {
client hazelcast.Client
logger logger.Logger
metadata metadata
}
// NewHazelcastPubSub returns a new hazelcast pub-sub implementation.
func NewHazelcastPubSub(logger logger.Logger) pubsub.PubSub {
return &Hazelcast{logger: logger}
}
func parseHazelcastMetadata(meta pubsub.Metadata) (metadata, error) {
m := metadata{}
if val, ok := meta.Properties[hazelcastServers]; ok && val != "" {
m.hazelcastServers = val
} else {
return m, errors.New("hazelcast error: missing hazelcast servers")
}
if val, ok := meta.Properties[hazelcastBackOffMaxRetries]; ok && val != "" {
backOffMaxRetriesInt, err := strconv.Atoi(val)
if err != nil {
return m, fmt.Errorf("hazelcast error: invalid backOffMaxRetries %s, %v", val, err)
}
m.backOffMaxRetries = backOffMaxRetriesInt
}
return m, nil
}
func (p *Hazelcast) Init(ctx context.Context, metadata pubsub.Metadata) error {
p.logger.Warnf("DEPRECATION NOTICE: Component pubsub.hazelcast has been deprecated and will be removed in a future Dapr release.")
m, err := parseHazelcastMetadata(metadata)
if err != nil {
return err
}
p.metadata = m
hzConfig := hazelcast.NewConfig()
servers := m.hazelcastServers
hzConfig.NetworkConfig().AddAddress(strings.Split(servers, ",")...)
p.client, err = hazelcast.NewClientWithConfig(hzConfig)
if err != nil {
return fmt.Errorf("hazelcast error: failed to create new client, %v", err)
}
return nil
}
func (p *Hazelcast) Publish(ctx context.Context, req *pubsub.PublishRequest) error {
topic, err := p.client.GetTopic(req.Topic)
if err != nil {
return fmt.Errorf("hazelcast error: failed to get topic for %s", req.Topic)
}
if err = topic.Publish(req.Data); err != nil {
return fmt.Errorf("hazelcast error: failed to publish data, %v", err)
}
return nil
}
func (p *Hazelcast) Subscribe(subscribeCtx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
topic, err := p.client.GetTopic(req.Topic)
if err != nil {
return fmt.Errorf("hazelcast error: failed to get topic for %s", req.Topic)
}
listenerID, err := topic.AddMessageListener(&hazelcastMessageListener{
p: p,
ctx: subscribeCtx,
topicName: topic.Name(),
pubsubHandler: handler,
})
if err != nil {
return fmt.Errorf("hazelcast error: failed to add new listener, %v", err)
}
// Wait for context cancelation then remove the listener
go func() {
<-subscribeCtx.Done()
topic.RemoveMessageListener(listenerID)
}()
return nil
}
func (p *Hazelcast) Close() error {
p.client.Shutdown()
return nil
}
func (p *Hazelcast) Features() []pubsub.Feature {
return nil
}
type hazelcastMessageListener struct {
p *Hazelcast
ctx context.Context
topicName string
pubsubHandler pubsub.Handler
}
func (l *hazelcastMessageListener) OnMessage(message hazelcastCore.Message) error {
msg, ok := message.MessageObject().([]byte)
if !ok {
return errors.New("hazelcast error: cannot cast message to byte array")
}
if err := l.handleMessageObject(msg); err != nil {
l.p.logger.Error("Failure processing Hazelcast message")
return err
}
return nil
}
func (l *hazelcastMessageListener) handleMessageObject(message []byte) error {
pubsubMsg := pubsub.NewMessage{
Data: message,
Topic: l.topicName,
}
// TODO: See https://github.com/dapr/components-contrib/issues/1808
// This component has built-in retries because Hazelcast doesn't support N/ACK for pubsub (it delivers messages "once" and not "at least once")
var b backoff.BackOff = backoff.NewConstantBackOff(5 * time.Second)
b = backoff.WithContext(b, l.ctx)
if l.p.metadata.backOffMaxRetries >= 0 {
b = backoff.WithMaxRetries(b, uint64(l.p.metadata.backOffMaxRetries))
}
return retry.NotifyRecover(func() error {
l.p.logger.Debug("Processing Hazelcast message")
return l.pubsubHandler(l.ctx, &pubsubMsg)
}, b, func(err error, d time.Duration) {
l.p.logger.Error("Error processing Hazelcast message. Retrying...")
}, func() {
l.p.logger.Info("Successfully processed Hazelcast message after it previously failed")
})
}

View File

@ -1,19 +0,0 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hazelcast
type metadata struct {
hazelcastServers string
backOffMaxRetries int
}

View File

@ -1,39 +0,0 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hazelcast
import (
"testing"
"github.com/stretchr/testify/assert"
mdata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/pubsub"
)
func TestValidateMetadata(t *testing.T) {
t.Run("return error when required servers is empty", func(t *testing.T) {
fakeMetaData := pubsub.Metadata{Base: mdata.Base{
Properties: map[string]string{
hazelcastServers: "",
},
}}
m, err := parseHazelcastMetadata(fakeMetaData)
// assert
assert.Error(t, err)
assert.Empty(t, m)
})
}

View File

@ -1,12 +0,0 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.hazelcast
version: v1
metadata:
- name: hazelcastServers
value: "localhost:5701"
- name: backOffMaxRetries
value: 3

View File

@ -71,8 +71,6 @@ components:
- component: mqtt3
profile: vernemq
operations: ['publish', 'subscribe', 'multiplehandlers']
- component: hazelcast
operations: ['publish', 'subscribe', 'multiplehandlers']
- component: rabbitmq
operations: ['publish', 'subscribe', 'multiplehandlers']
config:

View File

@ -62,7 +62,6 @@ import (
p_eventhubs "github.com/dapr/components-contrib/pubsub/azure/eventhubs"
p_servicebusqueues "github.com/dapr/components-contrib/pubsub/azure/servicebus/queues"
p_servicebustopics "github.com/dapr/components-contrib/pubsub/azure/servicebus/topics"
p_hazelcast "github.com/dapr/components-contrib/pubsub/hazelcast"
p_inmemory "github.com/dapr/components-contrib/pubsub/in-memory"
p_jetstream "github.com/dapr/components-contrib/pubsub/jetstream"
p_kafka "github.com/dapr/components-contrib/pubsub/kafka"
@ -480,8 +479,6 @@ func loadPubSub(tc TestComponent) pubsub.PubSub {
pubsub = p_pulsar.NewPulsar(testLogger)
case "mqtt3":
pubsub = p_mqtt3.NewMQTTPubSub(testLogger)
case "hazelcast":
pubsub = p_hazelcast.NewHazelcastPubSub(testLogger)
case "rabbitmq":
pubsub = p_rabbitmq.NewRabbitMQ(testLogger)
case "in-memory":