Merge branch 'master' into consul-improv

This commit is contained in:
Bernd Verst 2023-02-24 14:19:55 -08:00 committed by GitHub
commit 4c6a87c7df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 62 additions and 749 deletions

View File

@ -1,6 +0,0 @@
version: '2'
services:
hazelcast:
image: hazelcast/hazelcast:3.12.12-1
ports:
- 5701:5701

View File

@ -208,10 +208,6 @@ const components = {
'AzureCertificationServicePrincipalClientSecret',
],
},
'pubsub.hazelcast': {
conformance: true,
conformanceSetup: 'docker-compose.sh hazelcast',
},
'pubsub.in-memory': {
conformance: true,
},

View File

@ -1,229 +0,0 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package twitter
//nolint:staticcheck
import (
"context"
"encoding/json"
"errors"
"fmt"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/dghubble/go-twitter/twitter"
"github.com/dghubble/oauth1"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/kit/logger"
)
// Binding represents Twitter input/output binding.
type Binding struct {
client *twitter.Client
query string
logger logger.Logger
closed atomic.Bool
closeCh chan struct{}
wg sync.WaitGroup
}
// NewTwitter returns a new Twitter event input binding.
func NewTwitter(logger logger.Logger) bindings.InputOutputBinding {
return &Binding{logger: logger, closeCh: make(chan struct{})}
}
// Init initializes the Twitter binding.
func (t *Binding) Init(ctx context.Context, metadata bindings.Metadata) error {
t.logger.Warnf("DEPRECATION NOTICE: Component bindings.twitter has been deprecated and will be removed in a future Dapr release.")
ck, f := metadata.Properties["consumerKey"]
if !f || ck == "" {
return fmt.Errorf("consumerKey not set")
}
cs, f := metadata.Properties["consumerSecret"]
if !f || cs == "" {
return fmt.Errorf("consumerSecret not set")
}
at, f := metadata.Properties["accessToken"]
if !f || at == "" {
return fmt.Errorf("accessToken not set")
}
as, f := metadata.Properties["accessSecret"]
if !f || as == "" {
return fmt.Errorf("accessSecret not set")
}
// set query only in an input binding case
q, f := metadata.Properties["query"]
if f {
t.query = q
}
config := oauth1.NewConfig(ck, cs)
token := oauth1.NewToken(at, as)
httpClient := config.Client(oauth1.NoContext, token)
t.client = twitter.NewClient(httpClient)
return nil
}
// Operations returns list of operations supported by twitter binding.
func (t *Binding) Operations() []bindings.OperationKind {
return []bindings.OperationKind{bindings.GetOperation}
}
// Read triggers the Twitter search and events on each result tweet.
func (t *Binding) Read(ctx context.Context, handler bindings.Handler) error {
if t.query == "" {
return errors.New("metadata property 'query' is empty")
}
demux := twitter.NewSwitchDemux()
demux.Tweet = func(tweet *twitter.Tweet) {
t.logger.Debugf("raw tweet: %+v", tweet)
data, marshalErr := json.Marshal(tweet)
if marshalErr != nil {
t.logger.Errorf("error marshaling tweet: %+v", tweet)
return
}
handler(ctx, &bindings.ReadResponse{
Data: data,
Metadata: map[string]string{
"query": t.query,
},
})
}
demux.StreamLimit = func(limit *twitter.StreamLimit) {
t.logger.Warnf("disconnect: %+v", limit)
}
demux.StreamDisconnect = func(disconnect *twitter.StreamDisconnect) {
t.logger.Errorf("stream disconnect: %+v", disconnect)
}
filterParams := &twitter.StreamFilterParams{
Track: []string{t.query},
StallWarnings: twitter.Bool(true),
}
t.logger.Debug("starting stream for query: %s", t.query)
stream, err := t.client.Streams.Filter(filterParams)
if err != nil {
return fmt.Errorf("error executing stream filter '%+v': %w", filterParams, err)
}
t.logger.Debug("starting handler...")
t.wg.Add(2)
go func() {
defer t.wg.Done()
demux.HandleChan(stream.Messages)
}()
go func() {
defer t.wg.Done()
select {
case <-t.closeCh:
case <-ctx.Done():
}
t.logger.Debug("stopping handler...")
stream.Stop()
}()
return nil
}
func (t *Binding) Close() error {
if t.closed.CompareAndSwap(false, true) {
close(t.closeCh)
}
t.wg.Wait()
return nil
}
// Invoke handles all operations.
func (t *Binding) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
t.logger.Debugf("operation: %v", req.Operation)
if req.Metadata == nil {
return nil, fmt.Errorf("metadata not set")
}
// required
q, f := req.Metadata["query"]
if !f || q == "" {
return nil, fmt.Errorf("query not set")
}
// optionals
l, f := req.Metadata["lang"]
if !f || l == "" {
l = "en"
}
r, f := req.Metadata["result"]
if !f || r == "" {
// mixed : Include both popular and real time results in the response
// recent : return only the most recent results in the response
// popular : return only the most popular results in the response
r = "recent"
}
var sinceID int64
s, f := req.Metadata["since_id"]
if f && s != "" {
i, err := strconv.ParseInt(s, 10, 64)
if err == nil {
sinceID = i
}
}
sq := &twitter.SearchTweetParams{
Count: 100, // max
Lang: l,
SinceID: sinceID,
Query: q,
ResultType: r,
IncludeEntities: twitter.Bool(true),
}
t.logger.Debug("starting stream for: %+v", sq)
search, _, err := t.client.Search.Tweets(sq)
if err != nil {
return nil, fmt.Errorf("error executing search filter '%+v': %w", sq, err)
}
if search == nil || search.Statuses == nil {
return nil, fmt.Errorf("nil search result from '%+v'", sq)
}
t.logger.Debugf("raw response: %+v", search.Statuses)
data, marshalErr := json.Marshal(search.Statuses)
if marshalErr != nil {
t.logger.Errorf("error marshaling tweet: %v", marshalErr)
return nil, fmt.Errorf("error parsing response from '%+v': %w", sq, marshalErr)
}
req.Metadata["max_tweet_id"] = search.Metadata.MaxIDStr
req.Metadata["tweet_count"] = strconv.Itoa(search.Metadata.Count)
req.Metadata["search_ts"] = time.Now().UTC().String()
ir := &bindings.InvokeResponse{
Data: data,
Metadata: req.Metadata,
}
return ir, nil
}

View File

@ -1,149 +0,0 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package twitter
//nolint:staticcheck
import (
"context"
"encoding/json"
"os"
"testing"
"time"
"github.com/dghubble/go-twitter/twitter"
"github.com/stretchr/testify/assert"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/kit/logger"
)
const (
testTwitterConsumerKey = "test-consumerKey"
testTwitterConsumerSecret = "test-consumerSecret"
testTwitterAccessToken = "test-accessToken"
testTwitterAccessSecret = "test-accessSecret"
)
func getTestMetadata() bindings.Metadata {
m := bindings.Metadata{}
m.Properties = map[string]string{
"consumerKey": testTwitterConsumerKey,
"consumerSecret": testTwitterConsumerSecret,
"accessToken": testTwitterAccessToken,
"accessSecret": testTwitterAccessSecret,
}
return m
}
func getRuntimeMetadata() map[string]string {
return map[string]string{
"consumerKey": os.Getenv("CONSUMER_KEY"),
"consumerSecret": os.Getenv("CONSUMER_SECRET"),
"accessToken": os.Getenv("ACCESS_TOKEN"),
"accessSecret": os.Getenv("ACCESS_SECRET"),
}
}
// go test -v -count=1 ./bindings/twitter/.
func TestInit(t *testing.T) {
m := getTestMetadata()
tw := NewTwitter(logger.NewLogger("test")).(*Binding)
err := tw.Init(context.Background(), m)
assert.Nilf(t, err, "error initializing valid metadata properties")
}
// TestReadError excutes the Read method and fails before the Twitter API call
// go test -v -count=1 -run TestReadError ./bindings/twitter/.
func TestReadError(t *testing.T) {
tw := NewTwitter(logger.NewLogger("test")).(*Binding)
m := getTestMetadata()
err := tw.Init(context.Background(), m)
assert.Nilf(t, err, "error initializing valid metadata properties")
err = tw.Read(context.Background(), func(ctx context.Context, res *bindings.ReadResponse) ([]byte, error) {
t.Logf("result: %+v", res)
assert.NotNilf(t, err, "no error on read with invalid credentials")
return nil, nil
})
assert.Error(t, err)
assert.NoError(t, tw.Close())
}
// TestRead executes the Read method which calls Twiter API
// env RUN_LIVE_TW_TEST=true go test -v -count=1 -run TestRead ./bindings/twitter/.
func TestRead(t *testing.T) {
if os.Getenv("RUN_LIVE_TW_TEST") != "true" {
t.SkipNow() // skip this test until able to read credentials in test infra
}
m := bindings.Metadata{}
m.Properties = getRuntimeMetadata()
// add query
m.Properties["query"] = "microsoft"
tw := NewTwitter(logger.NewLogger("test")).(*Binding)
tw.logger.SetOutputLevel(logger.DebugLevel)
err := tw.Init(context.Background(), m)
assert.Nilf(t, err, "error initializing read")
ctx, cancel := context.WithCancel(context.Background())
counter := 0
err = tw.Read(ctx, func(ctx context.Context, res *bindings.ReadResponse) ([]byte, error) {
counter++
t.Logf("tweet[%d]", counter)
var tweet twitter.Tweet
json.Unmarshal(res.Data, &tweet)
assert.NotEmpty(t, tweet.IDStr, "tweet should have an ID")
cancel()
return nil, nil
})
assert.Nilf(t, err, "error on read")
select {
case <-ctx.Done():
// do nothing
case <-time.After(30 * time.Second):
cancel()
t.Fatal("Timeout waiting for messages")
}
assert.NoError(t, tw.Close())
}
// TestInvoke executes the Invoke method which calls Twiter API
// test tokens must be set
// env RUN_LIVE_TW_TEST=true go test -v -count=1 -run TestInvoke ./bindings/twitter/.
func TestInvoke(t *testing.T) {
if os.Getenv("RUN_LIVE_TW_TEST") != "true" {
t.SkipNow() // skip this test until able to read credentials in test infra
}
m := bindings.Metadata{}
m.Properties = getRuntimeMetadata()
tw := NewTwitter(logger.NewLogger("test")).(*Binding)
tw.logger.SetOutputLevel(logger.DebugLevel)
err := tw.Init(context.Background(), m)
assert.Nilf(t, err, "error initializing Invoke")
req := &bindings.InvokeRequest{
Metadata: map[string]string{
"query": "microsoft",
},
}
resp, err := tw.Invoke(context.Background(), req)
assert.Nilf(t, err, "error on invoke")
assert.NotNil(t, resp)
assert.NoError(t, tw.Close())
}

3
go.mod
View File

@ -50,8 +50,6 @@ require (
github.com/dancannon/gorethink v4.0.0+incompatible
github.com/dapr/kit v0.0.4
github.com/denisenkom/go-mssqldb v0.12.3
github.com/dghubble/go-twitter v0.0.0-20221104224141-912508c3888b
github.com/dghubble/oauth1 v0.7.2
github.com/didip/tollbooth/v7 v7.0.1
github.com/eclipse/paho.mqtt.golang v1.4.2
github.com/fasthttp-contrib/sessions v0.0.0-20160905201309-74f6ac73d5d5
@ -182,7 +180,6 @@ require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/deepmap/oapi-codegen v1.3.6 // indirect
github.com/dghubble/sling v1.4.0 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dimchansky/utfbom v1.1.1 // indirect
github.com/dubbogo/gost v1.13.1 // indirect

6
go.sum
View File

@ -723,12 +723,6 @@ github.com/deepmap/oapi-codegen v1.3.6 h1:Wj44p9A0V0PJ+AUg0BWdyGcsS1LY18U+0rCuPQ
github.com/deepmap/oapi-codegen v1.3.6/go.mod h1:aBozjEveG+33xPiP55Iw/XbVkhtZHEGLq3nxlX0+hfU=
github.com/denisenkom/go-mssqldb v0.12.3 h1:pBSGx9Tq67pBOTLmxNuirNTeB8Vjmf886Kx+8Y+8shw=
github.com/denisenkom/go-mssqldb v0.12.3/go.mod h1:k0mtMFOnU+AihqFxPMiF05rtiDrorD1Vrm1KEz5hxDo=
github.com/dghubble/go-twitter v0.0.0-20221104224141-912508c3888b h1:XQu6o3AwJx/jsg9LZ41uIeUdXK5be099XFfFn6H9ikk=
github.com/dghubble/go-twitter v0.0.0-20221104224141-912508c3888b/go.mod h1:B0/qdW5XUupJvcsx40hnVbfjzz9He5YpYXx6eVVdiSY=
github.com/dghubble/oauth1 v0.7.2 h1:pwcinOZy8z6XkNxvPmUDY52M7RDPxt0Xw1zgZ6Cl5JA=
github.com/dghubble/oauth1 v0.7.2/go.mod h1:9erQdIhqhOHG/7K9s/tgh9Ks/AfoyrO5mW/43Lu2+kE=
github.com/dghubble/sling v1.4.0 h1:/n8MRosVTthvMbwlNZgLx579OGVjUOy3GNEv5BIqAWY=
github.com/dghubble/sling v1.4.0/go.mod h1:0r40aNsU9EdDUVBNhfCstAtFgutjgJGYbO1oNzkMoM8=
github.com/dgraph-io/badger/v3 v3.2103.5 h1:ylPa6qzbjYRQMU6jokoj4wzcaweHylt//CH0AKt0akg=
github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=

View File

@ -238,9 +238,10 @@ func (m *Resolver) startRefreshers() {
// refresh app addresses periodically and on demand
go func() {
defer func() {
m.refreshRunning.Store(false)
}()
defer m.refreshRunning.Store(false)
t := time.NewTicker(refreshInterval)
defer t.Stop()
for {
select {
@ -252,7 +253,7 @@ func (m *Resolver) startRefreshers() {
}
}()
// Refresh periodically
case <-time.After(refreshInterval):
case <-t.C:
go func() {
if err := m.refreshAllApps(m.runCtx); err != nil {
m.logger.Warnf(err.Error())
@ -269,37 +270,28 @@ func (m *Resolver) startRefreshers() {
// Init registers service for mDNS.
func (m *Resolver) Init(metadata nameresolution.Metadata) error {
var (
appID string
hostAddress string
ok bool
instanceID string
)
props := metadata.Properties
if appID, ok = props[nameresolution.MDNSInstanceName]; !ok {
appID := props[nameresolution.AppID]
if appID == "" {
return errors.New("name is missing")
}
if hostAddress, ok = props[nameresolution.MDNSInstanceAddress]; !ok {
hostAddress := props[nameresolution.HostAddress]
if hostAddress == "" {
return errors.New("address is missing")
}
p, ok := props[nameresolution.MDNSInstancePort]
if !ok {
if props[nameresolution.DaprPort] == "" {
return errors.New("port is missing")
}
port, err := strconv.ParseInt(p, 10, 32)
port, err := strconv.Atoi(props[nameresolution.DaprPort])
if err != nil {
return errors.New("port is invalid")
}
if instanceID, ok = props[nameresolution.MDNSInstanceID]; !ok {
instanceID = ""
}
err = m.registerMDNS(instanceID, appID, []string{hostAddress}, int(port))
err = m.registerMDNS("", appID, []string{hostAddress}, port)
if err != nil {
return err
}

View File

@ -42,30 +42,30 @@ func TestInitMetadata(t *testing.T) {
{
"name",
map[string]string{
nr.MDNSInstanceAddress: localhost,
nr.MDNSInstancePort: "30003",
nr.HostAddress: localhost,
nr.DaprPort: "30003",
},
},
{
"address",
map[string]string{
nr.MDNSInstanceName: "testAppID",
nr.MDNSInstancePort: "30003",
nr.AppID: "testAppID",
nr.DaprPort: "30003",
},
},
{
"port",
map[string]string{
nr.MDNSInstanceName: "testAppID",
nr.MDNSInstanceAddress: localhost,
nr.AppID: "testAppID",
nr.HostAddress: localhost,
},
},
{
"port",
map[string]string{
nr.MDNSInstanceName: "testAppID",
nr.MDNSInstanceAddress: localhost,
nr.MDNSInstancePort: "abcd",
nr.AppID: "testAppID",
nr.HostAddress: localhost,
nr.DaprPort: "abcd",
},
},
}
@ -90,9 +90,9 @@ func TestInitRegister(t *testing.T) {
resolver := NewResolver(logger.NewLogger("test")).(*Resolver)
defer resolver.Close()
md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: "testAppID",
nr.MDNSInstanceAddress: localhost,
nr.MDNSInstancePort: "1234",
nr.AppID: "testAppID",
nr.HostAddress: localhost,
nr.DaprPort: "1234",
}}}
// act
@ -105,14 +105,14 @@ func TestInitRegisterDuplicate(t *testing.T) {
resolver := NewResolver(logger.NewLogger("test")).(*Resolver)
defer resolver.Close()
md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: "testAppID",
nr.MDNSInstanceAddress: localhost,
nr.MDNSInstancePort: "1234",
nr.AppID: "testAppID",
nr.HostAddress: localhost,
nr.DaprPort: "1234",
}}}
md2 := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: "testAppID",
nr.MDNSInstanceAddress: localhost,
nr.MDNSInstancePort: "1234",
nr.AppID: "testAppID",
nr.HostAddress: localhost,
nr.DaprPort: "1234",
}}}
// act
@ -128,9 +128,9 @@ func TestResolver(t *testing.T) {
resolver := NewResolver(logger.NewLogger("test")).(*Resolver)
defer resolver.Close()
md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: "testAppID",
nr.MDNSInstanceAddress: localhost,
nr.MDNSInstancePort: "1234",
nr.AppID: "testAppID",
nr.HostAddress: localhost,
nr.DaprPort: "1234",
}}}
// act
@ -149,9 +149,9 @@ func TestResolverClose(t *testing.T) {
// arrange
resolver := NewResolver(logger.NewLogger("test")).(*Resolver)
md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: "testAppID",
nr.MDNSInstanceAddress: localhost,
nr.MDNSInstancePort: "1234",
nr.AppID: "testAppID",
nr.HostAddress: localhost,
nr.DaprPort: "1234",
}}}
// act
@ -181,34 +181,24 @@ func TestResolverMultipleInstances(t *testing.T) {
instanceAID := "A"
instanceAName := "testAppID"
instanceAAddress := localhost
instanceAPort := "1234"
instanceAPQDN := fmt.Sprintf("%s:%s", instanceAAddress, instanceAPort)
instanceAPort := 1234
instanceAPQDN := fmt.Sprintf("%s:%d", instanceAAddress, instanceAPort)
instanceA := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: instanceAName,
nr.MDNSInstanceAddress: instanceAAddress,
nr.MDNSInstancePort: instanceAPort,
nr.MDNSInstanceID: instanceAID,
}}}
err1 := resolver.Init(instanceA)
err1 := resolver.registerMDNS(instanceAID, instanceAName, []string{instanceAAddress}, instanceAPort)
require.NoError(t, err1)
// register instance B
instanceBID := "B"
instanceBName := "testAppID"
instanceBAddress := localhost
instanceBPort := "5678"
instanceBPQDN := fmt.Sprintf("%s:%s", instanceBAddress, instanceBPort)
instanceBPort := 5678
instanceBPQDN := fmt.Sprintf("%s:%d", instanceBAddress, instanceBPort)
instanceB := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: instanceBName,
nr.MDNSInstanceAddress: instanceBAddress,
nr.MDNSInstancePort: instanceBPort,
nr.MDNSInstanceID: instanceBID,
}}}
err2 := resolver.Init(instanceB)
err2 := resolver.registerMDNS(instanceBID, instanceBName, []string{instanceBAddress}, instanceBPort)
require.NoError(t, err2)
go resolver.startRefreshers()
// act...
request := nr.ResolveRequest{ID: "testAppID"}
@ -292,9 +282,9 @@ func ResolverConcurrencySubsriberClear(t *testing.T) {
resolver := NewResolver(logger.NewLogger("test")).(*Resolver)
defer resolver.Close()
md := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: "testAppID",
nr.MDNSInstanceAddress: localhost,
nr.MDNSInstancePort: "1234",
nr.AppID: "testAppID",
nr.HostAddress: localhost,
nr.DaprPort: "1234",
}}}
// act
@ -334,50 +324,34 @@ func ResolverConcurrencyFound(t *testing.T) {
appAID := "A"
appAName := "testAppA"
appAAddress := localhost
appAPort := "1234"
appAPQDN := fmt.Sprintf("%s:%s", appAAddress, appAPort)
appAPort := 1234
appABPQDN := fmt.Sprintf("%s:%d", appAAddress, appAPort)
appA := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: appAName,
nr.MDNSInstanceAddress: appAAddress,
nr.MDNSInstancePort: appAPort,
nr.MDNSInstanceID: appAID,
}}}
err1 := resolver.Init(appA)
err1 := resolver.registerMDNS(appAID, appAName, []string{appAAddress}, appAPort)
require.NoError(t, err1)
// register instance B
appBID := "B"
appBName := "testAppB"
appBAddress := localhost
appBPort := "5678"
appBBPQDN := fmt.Sprintf("%s:%s", appBAddress, appBPort)
appBPort := 5678
appBBPQDN := fmt.Sprintf("%s:%d", appBAddress, appBPort)
appB := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: appBName,
nr.MDNSInstanceAddress: appBAddress,
nr.MDNSInstancePort: appBPort,
nr.MDNSInstanceID: appBID,
}}}
err2 := resolver.Init(appB)
err2 := resolver.registerMDNS(appBID, appBName, []string{appBAddress}, appBPort)
require.NoError(t, err2)
// register instance C
appCID := "C"
appCName := "testAppC"
appCAddress := localhost
appCPort := "3456"
appCBPQDN := fmt.Sprintf("%s:%s", appCAddress, appCPort)
appCPort := 3456
appCBPQDN := fmt.Sprintf("%s:%d", appCAddress, appCPort)
appC := nr.Metadata{Base: metadata.Base{Properties: map[string]string{
nr.MDNSInstanceName: appCName,
nr.MDNSInstanceAddress: appCAddress,
nr.MDNSInstancePort: appCPort,
nr.MDNSInstanceID: appCID,
}}}
err3 := resolver.Init(appC)
err3 := resolver.registerMDNS(appCID, appCName, []string{appCAddress}, appCPort)
require.NoError(t, err3)
go resolver.startRefreshers()
// act...
wg := sync.WaitGroup{}
for i := 0; i < numConcurrency; i++ {
@ -402,7 +376,7 @@ func ResolverConcurrencyFound(t *testing.T) {
// assert
require.NoError(t, err)
if r == 0 {
assert.Equal(t, appAPQDN, pt)
assert.Equal(t, appABPQDN, pt)
} else if r == 1 {
assert.Equal(t, appBBPQDN, pt)
} else if r == 2 {

View File

@ -16,6 +16,8 @@ package nameresolution
import "github.com/dapr/components-contrib/metadata"
const (
// TODO: REMOVE THESE AFTER RUNTIME IS CHANGED
// MDNSInstanceName is the instance name which is broadcasted.
MDNSInstanceName string = "name"
// MDNSInstanceAddress is the address of the instance.

View File

@ -1,183 +0,0 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hazelcast
import (
"context"
"errors"
"fmt"
"strconv"
"strings"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/hazelcast/hazelcast-go-client"
hazelcastCore "github.com/hazelcast/hazelcast-go-client/core"
"github.com/dapr/components-contrib/pubsub"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/retry"
)
const (
hazelcastServers = "hazelcastServers"
hazelcastBackOffMaxRetries = "backOffMaxRetries"
)
type Hazelcast struct {
client hazelcast.Client
logger logger.Logger
metadata metadata
}
// NewHazelcastPubSub returns a new hazelcast pub-sub implementation.
func NewHazelcastPubSub(logger logger.Logger) pubsub.PubSub {
return &Hazelcast{logger: logger}
}
func parseHazelcastMetadata(meta pubsub.Metadata) (metadata, error) {
m := metadata{}
if val, ok := meta.Properties[hazelcastServers]; ok && val != "" {
m.hazelcastServers = val
} else {
return m, errors.New("hazelcast error: missing hazelcast servers")
}
if val, ok := meta.Properties[hazelcastBackOffMaxRetries]; ok && val != "" {
backOffMaxRetriesInt, err := strconv.Atoi(val)
if err != nil {
return m, fmt.Errorf("hazelcast error: invalid backOffMaxRetries %s, %v", val, err)
}
m.backOffMaxRetries = backOffMaxRetriesInt
}
return m, nil
}
func (p *Hazelcast) Init(ctx context.Context, metadata pubsub.Metadata) error {
p.logger.Warnf("DEPRECATION NOTICE: Component pubsub.hazelcast has been deprecated and will be removed in a future Dapr release.")
m, err := parseHazelcastMetadata(metadata)
if err != nil {
return err
}
p.metadata = m
hzConfig := hazelcast.NewConfig()
servers := m.hazelcastServers
hzConfig.NetworkConfig().AddAddress(strings.Split(servers, ",")...)
p.client, err = hazelcast.NewClientWithConfig(hzConfig)
if err != nil {
return fmt.Errorf("hazelcast error: failed to create new client, %v", err)
}
return nil
}
func (p *Hazelcast) Publish(ctx context.Context, req *pubsub.PublishRequest) error {
topic, err := p.client.GetTopic(req.Topic)
if err != nil {
return fmt.Errorf("hazelcast error: failed to get topic for %s", req.Topic)
}
if err = topic.Publish(req.Data); err != nil {
return fmt.Errorf("hazelcast error: failed to publish data, %v", err)
}
return nil
}
func (p *Hazelcast) Subscribe(subscribeCtx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
topic, err := p.client.GetTopic(req.Topic)
if err != nil {
return fmt.Errorf("hazelcast error: failed to get topic for %s", req.Topic)
}
listenerID, err := topic.AddMessageListener(&hazelcastMessageListener{
p: p,
ctx: subscribeCtx,
topicName: topic.Name(),
pubsubHandler: handler,
})
if err != nil {
return fmt.Errorf("hazelcast error: failed to add new listener, %v", err)
}
// Wait for context cancelation then remove the listener
go func() {
<-subscribeCtx.Done()
topic.RemoveMessageListener(listenerID)
}()
return nil
}
func (p *Hazelcast) Close() error {
p.client.Shutdown()
return nil
}
func (p *Hazelcast) Features() []pubsub.Feature {
return nil
}
type hazelcastMessageListener struct {
p *Hazelcast
ctx context.Context
topicName string
pubsubHandler pubsub.Handler
}
func (l *hazelcastMessageListener) OnMessage(message hazelcastCore.Message) error {
msg, ok := message.MessageObject().([]byte)
if !ok {
return errors.New("hazelcast error: cannot cast message to byte array")
}
if err := l.handleMessageObject(msg); err != nil {
l.p.logger.Error("Failure processing Hazelcast message")
return err
}
return nil
}
func (l *hazelcastMessageListener) handleMessageObject(message []byte) error {
pubsubMsg := pubsub.NewMessage{
Data: message,
Topic: l.topicName,
}
// TODO: See https://github.com/dapr/components-contrib/issues/1808
// This component has built-in retries because Hazelcast doesn't support N/ACK for pubsub (it delivers messages "once" and not "at least once")
var b backoff.BackOff = backoff.NewConstantBackOff(5 * time.Second)
b = backoff.WithContext(b, l.ctx)
if l.p.metadata.backOffMaxRetries >= 0 {
b = backoff.WithMaxRetries(b, uint64(l.p.metadata.backOffMaxRetries))
}
return retry.NotifyRecover(func() error {
l.p.logger.Debug("Processing Hazelcast message")
return l.pubsubHandler(l.ctx, &pubsubMsg)
}, b, func(err error, d time.Duration) {
l.p.logger.Error("Error processing Hazelcast message. Retrying...")
}, func() {
l.p.logger.Info("Successfully processed Hazelcast message after it previously failed")
})
}

View File

@ -1,19 +0,0 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hazelcast
type metadata struct {
hazelcastServers string
backOffMaxRetries int
}

View File

@ -1,39 +0,0 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hazelcast
import (
"testing"
"github.com/stretchr/testify/assert"
mdata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/pubsub"
)
func TestValidateMetadata(t *testing.T) {
t.Run("return error when required servers is empty", func(t *testing.T) {
fakeMetaData := pubsub.Metadata{Base: mdata.Base{
Properties: map[string]string{
hazelcastServers: "",
},
}}
m, err := parseHazelcastMetadata(fakeMetaData)
// assert
assert.Error(t, err)
assert.Empty(t, m)
})
}

View File

@ -1,12 +0,0 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: pubsub
spec:
type: pubsub.hazelcast
version: v1
metadata:
- name: hazelcastServers
value: "localhost:5701"
- name: backOffMaxRetries
value: 3

View File

@ -71,8 +71,6 @@ components:
- component: mqtt3
profile: vernemq
operations: ['publish', 'subscribe', 'multiplehandlers']
- component: hazelcast
operations: ['publish', 'subscribe', 'multiplehandlers']
- component: rabbitmq
operations: ['publish', 'subscribe', 'multiplehandlers']
config:

View File

@ -62,7 +62,6 @@ import (
p_eventhubs "github.com/dapr/components-contrib/pubsub/azure/eventhubs"
p_servicebusqueues "github.com/dapr/components-contrib/pubsub/azure/servicebus/queues"
p_servicebustopics "github.com/dapr/components-contrib/pubsub/azure/servicebus/topics"
p_hazelcast "github.com/dapr/components-contrib/pubsub/hazelcast"
p_inmemory "github.com/dapr/components-contrib/pubsub/in-memory"
p_jetstream "github.com/dapr/components-contrib/pubsub/jetstream"
p_kafka "github.com/dapr/components-contrib/pubsub/kafka"
@ -480,8 +479,6 @@ func loadPubSub(tc TestComponent) pubsub.PubSub {
pubsub = p_pulsar.NewPulsar(testLogger)
case "mqtt3":
pubsub = p_mqtt3.NewMQTTPubSub(testLogger)
case "hazelcast":
pubsub = p_hazelcast.NewHazelcastPubSub(testLogger)
case "rabbitmq":
pubsub = p_rabbitmq.NewRabbitMQ(testLogger)
case "in-memory":