add parllel processing, fix retry bug

Signed-off-by: yaron2 <schneider.yaron@live.com>
This commit is contained in:
yaron2 2022-05-26 11:44:56 -07:00
parent 5c410fb726
commit ca36ecc556
3 changed files with 43 additions and 15 deletions

View File

@ -13,7 +13,11 @@ limitations under the License.
package natsstreaming
import "time"
import (
"time"
"github.com/dapr/components-contrib/pubsub"
)
type metadata struct {
natsURL string
@ -30,4 +34,5 @@ type metadata struct {
startAtTimeFormat string
ackWaitTime time.Duration
maxInFlight uint64
concurrencyMode pubsub.ConcurrencyMode
}

View File

@ -180,6 +180,12 @@ func parseNATSStreamingMetadata(meta pubsub.Metadata) (metadata, error) {
}
}
c, err := pubsub.Concurrency(meta.Properties)
if err != nil {
return m, fmt.Errorf("nats-streaming error: can't parse %s: %s", pubsub.ConcurrencyKey, err)
}
m.concurrencyMode = c
return m, nil
}
@ -237,24 +243,21 @@ func (n *natsStreamingPubSub) Subscribe(req pubsub.SubscribeRequest, handler pub
Topic: req.Topic,
Data: natsMsg.Data,
}
b := n.backOffConfig.NewBackOffWithContext(n.ctx)
rerr := retry.NotifyRecover(func() error {
n.logger.Debugf("Processing NATS Streaming message %s/%d", natsMsg.Subject, natsMsg.Sequence)
herr := handler(n.ctx, &msg)
if herr == nil {
// we only send a successful ACK if there is no error from Dapr runtime
n.logger.Debugf("Processing NATS Streaming message %s/%d", natsMsg.Subject, natsMsg.Sequence)
f := func() {
err := handler(n.ctx, &msg)
if err == nil {
natsMsg.Ack()
}
}
return herr
}, b, func(err error, d time.Duration) {
n.logger.Errorf("Error processing NATS Streaming message: %s/%d. Retrying...", natsMsg.Subject, natsMsg.Sequence)
}, func() {
n.logger.Infof("Successfully processed NATS Streaming message after it previously failed: %s/%d", natsMsg.Subject, natsMsg.Sequence)
})
if rerr != nil && !errors.Is(rerr, context.Canceled) {
n.logger.Errorf("Error processing message and retries are exhausted: %s/%d.", natsMsg.Subject, natsMsg.Sequence)
switch n.metadata.concurrencyMode {
case pubsub.Single:
f()
case pubsub.Parallel:
go f()
}
}

View File

@ -141,6 +141,7 @@ func TestParseNATSStreamingMetadataForValidSubscriptionOptions(t *testing.T) {
natsStreamingClusterID: "testcluster",
consumerID: "consumer1",
subscriptionType: "topic",
pubsub.ConcurrencyKey: "single",
startWithLastReceived: "true",
},
"startWithLastReceived", "true",
@ -153,6 +154,7 @@ func TestParseNATSStreamingMetadataForValidSubscriptionOptions(t *testing.T) {
natsStreamingClusterID: "testcluster",
consumerID: "consumer1",
subscriptionType: "topic",
pubsub.ConcurrencyKey: "single",
deliverAll: "true",
},
"deliverAll", "true",
@ -165,6 +167,7 @@ func TestParseNATSStreamingMetadataForValidSubscriptionOptions(t *testing.T) {
natsStreamingClusterID: "testcluster",
consumerID: "consumer1",
subscriptionType: "topic",
pubsub.ConcurrencyKey: "single",
deliverNew: "true",
},
"deliverNew", "true",
@ -177,6 +180,7 @@ func TestParseNATSStreamingMetadataForValidSubscriptionOptions(t *testing.T) {
natsStreamingClusterID: "testcluster",
consumerID: "consumer1",
subscriptionType: "topic",
pubsub.ConcurrencyKey: "single",
startAtSequence: "42",
},
"startAtSequence", "42",
@ -189,10 +193,24 @@ func TestParseNATSStreamingMetadataForValidSubscriptionOptions(t *testing.T) {
natsStreamingClusterID: "testcluster",
consumerID: "consumer1",
subscriptionType: "topic",
pubsub.ConcurrencyKey: "single",
startAtTimeDelta: "1h",
},
"startAtTimeDelta", "1h",
},
{
"using concurrencyMode",
map[string]string{
natsURL: "nats://foo.bar:4222",
natsStreamingClusterID: "testcluster",
consumerID: "consumer1",
subscriptionType: "topic",
startAtTimeDelta: "1h",
pubsub.ConcurrencyKey: "single",
},
"concurrencyMode", "single",
},
}
for _, _test := range tests {
@ -208,12 +226,14 @@ func TestParseNATSStreamingMetadataForValidSubscriptionOptions(t *testing.T) {
assert.NotEmpty(t, m.natsStreamingClusterID)
assert.NotEmpty(t, m.subscriptionType)
assert.NotEmpty(t, m.natsQueueGroupName)
assert.NotEmpty(t, m.concurrencyMode)
assert.NotEmpty(t, _test.expectedMetadataValue)
assert.Equal(t, _test.properties[natsURL], m.natsURL)
assert.Equal(t, _test.properties[natsStreamingClusterID], m.natsStreamingClusterID)
assert.Equal(t, _test.properties[subscriptionType], m.subscriptionType)
assert.Equal(t, _test.properties[consumerID], m.natsQueueGroupName)
assert.Equal(t, _test.properties[pubsub.ConcurrencyKey], string(m.concurrencyMode))
assert.Equal(t, _test.properties[_test.expectedMetadataName], _test.expectedMetadataValue)
})
}