Re-add built-in retries to pubsub.hazelcast
See: https://github.com/dapr/components-contrib/issues/1808 Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
parent
35034aa877
commit
6cf1afc264
|
|
@ -17,17 +17,22 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/cenkalti/backoff/v4"
|
||||||
"github.com/hazelcast/hazelcast-go-client"
|
"github.com/hazelcast/hazelcast-go-client"
|
||||||
hazelcastCore "github.com/hazelcast/hazelcast-go-client/core"
|
hazelcastCore "github.com/hazelcast/hazelcast-go-client/core"
|
||||||
|
|
||||||
"github.com/dapr/components-contrib/pubsub"
|
"github.com/dapr/components-contrib/pubsub"
|
||||||
"github.com/dapr/kit/logger"
|
"github.com/dapr/kit/logger"
|
||||||
|
"github.com/dapr/kit/retry"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
hazelcastServers = "hazelcastServers"
|
hazelcastServers = "hazelcastServers"
|
||||||
|
hazelcastBackOffMaxRetries = "backOffMaxRetries"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Hazelcast struct {
|
type Hazelcast struct {
|
||||||
|
|
@ -41,7 +46,7 @@ func NewHazelcastPubSub(logger logger.Logger) pubsub.PubSub {
|
||||||
return &Hazelcast{logger: logger}
|
return &Hazelcast{logger: logger}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Hazelcast) parseHazelcastMetadata(meta pubsub.Metadata) (metadata, error) {
|
func parseHazelcastMetadata(meta pubsub.Metadata) (metadata, error) {
|
||||||
m := metadata{}
|
m := metadata{}
|
||||||
if val, ok := meta.Properties[hazelcastServers]; ok && val != "" {
|
if val, ok := meta.Properties[hazelcastServers]; ok && val != "" {
|
||||||
m.hazelcastServers = val
|
m.hazelcastServers = val
|
||||||
|
|
@ -49,17 +54,19 @@ func (p *Hazelcast) parseHazelcastMetadata(meta pubsub.Metadata) (metadata, erro
|
||||||
return m, errors.New("hazelcast error: missing hazelcast servers")
|
return m, errors.New("hazelcast error: missing hazelcast servers")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Deprecated config option
|
if val, ok := meta.Properties[hazelcastBackOffMaxRetries]; ok && val != "" {
|
||||||
// TODO: Remove in the future
|
backOffMaxRetriesInt, err := strconv.Atoi(val)
|
||||||
if _, ok := meta.Properties["backOffMaxRetries"]; ok {
|
if err != nil {
|
||||||
p.logger.Warnf("Metadata property 'backOffMaxRetries' for component pubsub.hazelcast has been deprecated and will be ignored. See: https://docs.dapr.io/reference/components-reference/supported-pubsub/setup-hazelcast/")
|
return m, fmt.Errorf("hazelcast error: invalid backOffMaxRetries %s, %v", val, err)
|
||||||
|
}
|
||||||
|
m.backOffMaxRetries = backOffMaxRetriesInt
|
||||||
}
|
}
|
||||||
|
|
||||||
return m, nil
|
return m, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Hazelcast) Init(metadata pubsub.Metadata) error {
|
func (p *Hazelcast) Init(metadata pubsub.Metadata) error {
|
||||||
m, err := p.parseHazelcastMetadata(metadata)
|
m, err := parseHazelcastMetadata(metadata)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -154,6 +161,21 @@ func (l *hazelcastMessageListener) handleMessageObject(message []byte) error {
|
||||||
Topic: l.topicName,
|
Topic: l.topicName,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: See https://github.com/dapr/components-contrib/issues/1808
|
||||||
|
// This component has built-in retries because Hazelcast doesn't support N/ACK for pubsub (it delivers messages "once" and not "at least once")
|
||||||
|
var b backoff.BackOff = backoff.NewConstantBackOff(5 * time.Second)
|
||||||
|
b = backoff.WithContext(b, l.ctx)
|
||||||
|
if l.p.metadata.backOffMaxRetries >= 0 {
|
||||||
|
b = backoff.WithMaxRetries(b, uint64(l.p.metadata.backOffMaxRetries))
|
||||||
|
}
|
||||||
|
|
||||||
|
return retry.NotifyRecover(func() error {
|
||||||
l.p.logger.Debug("Processing Hazelcast message")
|
l.p.logger.Debug("Processing Hazelcast message")
|
||||||
|
|
||||||
return l.pubsubHandler(l.ctx, &pubsubMsg)
|
return l.pubsubHandler(l.ctx, &pubsubMsg)
|
||||||
|
}, b, func(err error, d time.Duration) {
|
||||||
|
l.p.logger.Error("Error processing Hazelcast message. Retrying...")
|
||||||
|
}, func() {
|
||||||
|
l.p.logger.Info("Successfully processed Hazelcast message after it previously failed")
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -15,4 +15,5 @@ package hazelcast
|
||||||
|
|
||||||
type metadata struct {
|
type metadata struct {
|
||||||
hazelcastServers string
|
hazelcastServers string
|
||||||
|
backOffMaxRetries int
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,6 @@ import (
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
"github.com/dapr/components-contrib/pubsub"
|
"github.com/dapr/components-contrib/pubsub"
|
||||||
"github.com/dapr/kit/logger"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestValidateMetadata(t *testing.T) {
|
func TestValidateMetadata(t *testing.T) {
|
||||||
|
|
@ -30,10 +29,7 @@ func TestValidateMetadata(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
obj := Hazelcast{
|
m, err := parseHazelcastMetadata(fakeMetaData)
|
||||||
logger: logger.NewLogger("test"),
|
|
||||||
}
|
|
||||||
m, err := obj.parseHazelcastMetadata(fakeMetaData)
|
|
||||||
|
|
||||||
// assert
|
// assert
|
||||||
assert.Error(t, err)
|
assert.Error(t, err)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue