Remove deprecated NATS Streaming PubSub Component (#3150)
Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
		
							parent
							
								
									e7db4cf3ad
								
							
						
					
					
						commit
						75078a3e17
					
				| 
						 | 
				
			
			@ -1,7 +0,0 @@
 | 
			
		|||
version: '2'
 | 
			
		||||
services:
 | 
			
		||||
  natsstreaming:
 | 
			
		||||
    image: nats-streaming:latest
 | 
			
		||||
    ports:
 | 
			
		||||
      - "4222:4222"
 | 
			
		||||
      - "8222:8222"
 | 
			
		||||
| 
						 | 
				
			
			@ -414,10 +414,6 @@ const components = {
 | 
			
		|||
        conformanceSetup: 'docker-compose.sh vernemq',
 | 
			
		||||
        sourcePkg: ['pubsub/mqtt3'],
 | 
			
		||||
    },
 | 
			
		||||
    'pubsub.natsstreaming': {
 | 
			
		||||
        conformance: true,
 | 
			
		||||
        conformanceSetup: 'docker-compose.sh natsstreaming',
 | 
			
		||||
    },
 | 
			
		||||
    'pubsub.pulsar': {
 | 
			
		||||
        conformance: true,
 | 
			
		||||
        certification: true,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										2
									
								
								go.mod
								
								
								
								
							
							
						
						
									
										2
									
								
								go.mod
								
								
								
								
							| 
						 | 
				
			
			@ -86,7 +86,6 @@ require (
 | 
			
		|||
	github.com/nats-io/nats-server/v2 v2.9.21
 | 
			
		||||
	github.com/nats-io/nats.go v1.28.0
 | 
			
		||||
	github.com/nats-io/nkeys v0.4.4
 | 
			
		||||
	github.com/nats-io/stan.go v0.10.4
 | 
			
		||||
	github.com/open-policy-agent/opa v0.55.0
 | 
			
		||||
	github.com/oracle/oci-go-sdk/v54 v54.0.0
 | 
			
		||||
	github.com/pashagolub/pgxmock/v2 v2.11.0
 | 
			
		||||
| 
						 | 
				
			
			@ -307,7 +306,6 @@ require (
 | 
			
		|||
	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
 | 
			
		||||
	github.com/natefinch/lumberjack v2.0.0+incompatible // indirect
 | 
			
		||||
	github.com/nats-io/jwt/v2 v2.4.1 // indirect
 | 
			
		||||
	github.com/nats-io/nats-streaming-server v0.25.5 // indirect
 | 
			
		||||
	github.com/nats-io/nuid v1.0.1 // indirect
 | 
			
		||||
	github.com/oleiade/lane v1.0.1 // indirect
 | 
			
		||||
	github.com/opentracing/opentracing-go v1.2.0 // indirect
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
							
								
								
									
										11
									
								
								go.sum
								
								
								
								
							
							
						
						
									
										11
									
								
								go.sum
								
								
								
								
							| 
						 | 
				
			
			@ -1229,7 +1229,6 @@ github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjh
 | 
			
		|||
github.com/hashicorp/go-kms-wrapping/entropy v0.1.0/go.mod h1:d1g9WGtAunDNpek8jUIEJnBlbgKS1N2Q61QkHiZyR1g=
 | 
			
		||||
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
 | 
			
		||||
github.com/hashicorp/go-msgpack v0.5.5 h1:i9R9JSrqIz0QVLz3sz+i3YJdT7TTSLcfLLzJi9aZTuI=
 | 
			
		||||
github.com/hashicorp/go-msgpack/v2 v2.1.0 h1:J2g2hMyjSefUPTnkLRU2MnsLLsPRB1n4Z/wJRN07GuA=
 | 
			
		||||
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
 | 
			
		||||
github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA=
 | 
			
		||||
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
 | 
			
		||||
| 
						 | 
				
			
			@ -1275,7 +1274,6 @@ github.com/hashicorp/mdns v1.0.4/go.mod h1:mtBihi+LeNXGtG8L9dX59gAEa12BDtBQSp4v/
 | 
			
		|||
github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
 | 
			
		||||
github.com/hashicorp/memberlist v0.5.0 h1:EtYPN8DpAURiapus508I4n9CzHs2W+8NZGbmmR/prTM=
 | 
			
		||||
github.com/hashicorp/memberlist v0.5.0/go.mod h1:yvyXLpo0QaGE59Y7hDTsTzDD25JYBZ4mHgHUZ8lrOI0=
 | 
			
		||||
github.com/hashicorp/raft v1.5.0 h1:uNs9EfJ4FwiArZRxxfd/dQ5d33nV31/CdCHArH89hT8=
 | 
			
		||||
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
 | 
			
		||||
github.com/hashicorp/serf v0.10.1 h1:Z1H2J60yRKvfDYAOZLd2MU0ND4AH/WDz7xYHDWQsIPY=
 | 
			
		||||
github.com/hashicorp/serf v0.10.1/go.mod h1:yL2t6BqATOLGc5HF7qbFkTfXoPIY0WZdWHfEvMqbG+4=
 | 
			
		||||
| 
						 | 
				
			
			@ -1565,21 +1563,15 @@ github.com/nats-io/jwt/v2 v2.4.1/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+v
 | 
			
		|||
github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k=
 | 
			
		||||
github.com/nats-io/nats-server/v2 v2.9.21 h1:2TBTh0UDE74eNXQmV4HofsmRSCiVN0TH2Wgrp6BD6fk=
 | 
			
		||||
github.com/nats-io/nats-server/v2 v2.9.21/go.mod h1:ozqMZc2vTHcNcblOiXMWIXkf8+0lDGAi5wQcG+O1mHU=
 | 
			
		||||
github.com/nats-io/nats-streaming-server v0.25.5 h1:DX6xaPhKvVLhdpNsuEmmD+O9LfWSnw8cvxQU/H9LRy8=
 | 
			
		||||
github.com/nats-io/nats-streaming-server v0.25.5/go.mod h1:dSBVdHGsT/tV91lT4MWFfE6+yjRCNhRIYJpBaTHFdAo=
 | 
			
		||||
github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w=
 | 
			
		||||
github.com/nats-io/nats.go v1.22.1/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA=
 | 
			
		||||
github.com/nats-io/nats.go v1.28.0 h1:Th4G6zdsz2d0OqXdfzKLClo6bOfoI/b1kInhRtFIy5c=
 | 
			
		||||
github.com/nats-io/nats.go v1.28.0/go.mod h1:XpbWUlOElGwTYbMR7imivs7jJj9GtK7ypv321Wp6pjc=
 | 
			
		||||
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
 | 
			
		||||
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
 | 
			
		||||
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
 | 
			
		||||
github.com/nats-io/nkeys v0.4.4 h1:xvBJ8d69TznjcQl9t6//Q5xXuVhyYiSos6RPtvQNTwA=
 | 
			
		||||
github.com/nats-io/nkeys v0.4.4/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64=
 | 
			
		||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
 | 
			
		||||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
 | 
			
		||||
github.com/nats-io/stan.go v0.10.4 h1:19GS/eD1SeQJaVkeM9EkvEYattnvnWrZ3wkSWSw4uXw=
 | 
			
		||||
github.com/nats-io/stan.go v0.10.4/go.mod h1:3XJXH8GagrGqajoO/9+HgPyKV5MWsv7S5ccdda+pc6k=
 | 
			
		||||
github.com/niean/gotools v0.0.0-20151221085310-ff3f51fc5c60/go.mod h1:gH2bvE9/eX49hWK7CwwL/+/y+dodduyxs5cTpBzF5v0=
 | 
			
		||||
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
 | 
			
		||||
github.com/nishanths/predeclared v0.0.0-20200524104333-86fad755b4d3/go.mod h1:nt3d53pc1VYcphSCIaYAJtnPYnr3Zyn8fMq2wvPGPso=
 | 
			
		||||
| 
						 | 
				
			
			@ -1973,7 +1965,6 @@ github.com/zouyx/agollo/v3 v3.4.5/go.mod h1:LJr3kDmm23QSW+F1Ol4TMHDa7HvJvscMdVxJ
 | 
			
		|||
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
 | 
			
		||||
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
 | 
			
		||||
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
 | 
			
		||||
go.etcd.io/bbolt v1.3.7 h1:j+zJOnnEjF/kyHlDDgGnVL/AIqIJPq8UoB2GSNfkUfQ=
 | 
			
		||||
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=
 | 
			
		||||
go.etcd.io/etcd/api/v3 v3.5.0-alpha.0/go.mod h1:mPcW6aZJukV6Aa81LSKpBjQXTWlXB5r74ymPoSWa3Sw=
 | 
			
		||||
go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A=
 | 
			
		||||
| 
						 | 
				
			
			@ -2103,7 +2094,6 @@ golang.org/x/crypto v0.0.0-20191219195013-becbf705a915/go.mod h1:LzIPMQfyMNhhGPh
 | 
			
		|||
golang.org/x/crypto v0.0.0-20200510223506-06a226fb4e37/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
 | 
			
		||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
 | 
			
		||||
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
 | 
			
		||||
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
 | 
			
		||||
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
 | 
			
		||||
golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
 | 
			
		||||
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
 | 
			
		||||
| 
						 | 
				
			
			@ -2116,7 +2106,6 @@ golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f/go.mod h1:IxCIyHEi3zRg3s0
 | 
			
		|||
golang.org/x/crypto v0.0.0-20220513210258-46612604a0f9/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
 | 
			
		||||
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
 | 
			
		||||
golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw=
 | 
			
		||||
golang.org/x/crypto v0.5.0/go.mod h1:NK/OQwhpMQP3MwtdjgLlYHnH9ebylxKWv3e0fK+mkQU=
 | 
			
		||||
golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58=
 | 
			
		||||
golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU=
 | 
			
		||||
golang.org/x/crypto v0.9.0/go.mod h1:yrmDGqONDYtNj3tH8X9dzUun2m2lzPa9ngI6/RUPGR0=
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,38 +0,0 @@
 | 
			
		|||
/*
 | 
			
		||||
Copyright 2021 The Dapr Authors
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package natsstreaming
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/dapr/components-contrib/pubsub"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type natsMetadata struct {
 | 
			
		||||
	NatsURL                 string
 | 
			
		||||
	NatsStreamingClusterID  string
 | 
			
		||||
	SubscriptionType        string
 | 
			
		||||
	NatsQueueGroupName      string `mapstructure:"consumerId"`
 | 
			
		||||
	DurableSubscriptionName string
 | 
			
		||||
	StartAtSequence         *uint64
 | 
			
		||||
	StartWithLastReceived   string
 | 
			
		||||
	DeliverNew              string
 | 
			
		||||
	DeliverAll              string
 | 
			
		||||
	StartAtTimeDelta        time.Duration
 | 
			
		||||
	StartAtTime             string
 | 
			
		||||
	StartAtTimeFormat       string
 | 
			
		||||
	AckWaitTime             time.Duration
 | 
			
		||||
	MaxInFlight             *uint64
 | 
			
		||||
	ConcurrencyMode         pubsub.ConcurrencyMode
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -1,372 +0,0 @@
 | 
			
		|||
/*
 | 
			
		||||
Copyright 2021 The Dapr Authors
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
/*
 | 
			
		||||
Package natsstreaming implements NATS Streaming pubsub component
 | 
			
		||||
*/
 | 
			
		||||
package natsstreaming
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"math/rand"
 | 
			
		||||
	"reflect"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"sync/atomic"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	nats "github.com/nats-io/nats.go"
 | 
			
		||||
	stan "github.com/nats-io/stan.go"
 | 
			
		||||
	"github.com/nats-io/stan.go/pb"
 | 
			
		||||
 | 
			
		||||
	"github.com/dapr/components-contrib/metadata"
 | 
			
		||||
	"github.com/dapr/components-contrib/pubsub"
 | 
			
		||||
	"github.com/dapr/kit/logger"
 | 
			
		||||
	"github.com/dapr/kit/retry"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// compulsory options.
 | 
			
		||||
const (
 | 
			
		||||
	natsURL                = "natsURL"
 | 
			
		||||
	natsStreamingClusterID = "natsStreamingClusterID"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// subscription options (optional).
 | 
			
		||||
const (
 | 
			
		||||
	durableSubscriptionName = "durableSubscriptionName"
 | 
			
		||||
	startAtSequence         = "startAtSequence"
 | 
			
		||||
	startWithLastReceived   = "startWithLastReceived"
 | 
			
		||||
	deliverAll              = "deliverAll"
 | 
			
		||||
	deliverNew              = "deliverNew"
 | 
			
		||||
	startAtTimeDelta        = "startAtTimeDelta"
 | 
			
		||||
	startAtTime             = "startAtTime"
 | 
			
		||||
	startAtTimeFormat       = "startAtTimeFormat"
 | 
			
		||||
	ackWaitTime             = "ackWaitTime"
 | 
			
		||||
	maxInFlight             = "maxInFlight"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// valid values for subscription options.
 | 
			
		||||
const (
 | 
			
		||||
	subscriptionTypeQueueGroup = "queue"
 | 
			
		||||
	subscriptionTypeTopic      = "topic"
 | 
			
		||||
	startWithLastReceivedTrue  = "true"
 | 
			
		||||
	deliverAllTrue             = "true"
 | 
			
		||||
	deliverNewTrue             = "true"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const (
 | 
			
		||||
	consumerID       = "consumerID" // passed in by Dapr runtime
 | 
			
		||||
	subscriptionType = "subscriptionType"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type natsStreamingPubSub struct {
 | 
			
		||||
	metadata         natsMetadata
 | 
			
		||||
	natStreamingConn stan.Conn
 | 
			
		||||
 | 
			
		||||
	logger logger.Logger
 | 
			
		||||
 | 
			
		||||
	backOffConfig retry.Config
 | 
			
		||||
 | 
			
		||||
	closed  atomic.Bool
 | 
			
		||||
	closeCh chan struct{}
 | 
			
		||||
	wg      sync.WaitGroup
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewNATSStreamingPubSub returns a new NATS Streaming pub-sub implementation.
 | 
			
		||||
func NewNATSStreamingPubSub(logger logger.Logger) pubsub.PubSub {
 | 
			
		||||
	return &natsStreamingPubSub{logger: logger, closeCh: make(chan struct{})}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func parseNATSStreamingMetadata(meta pubsub.Metadata) (natsMetadata, error) {
 | 
			
		||||
	m := natsMetadata{}
 | 
			
		||||
 | 
			
		||||
	var err error
 | 
			
		||||
	if err = metadata.DecodeMetadata(meta.Properties, &m); err != nil {
 | 
			
		||||
		return m, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if m.NatsURL == "" {
 | 
			
		||||
		return m, errors.New("nats-streaming error: missing nats URL")
 | 
			
		||||
	}
 | 
			
		||||
	if m.NatsStreamingClusterID == "" {
 | 
			
		||||
		return m, errors.New("nats-streaming error: missing nats streaming cluster ID")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	switch m.SubscriptionType {
 | 
			
		||||
	case subscriptionTypeTopic, subscriptionTypeQueueGroup, "":
 | 
			
		||||
		// valid values
 | 
			
		||||
	default:
 | 
			
		||||
		return m, errors.New("nats-streaming error: valid value for subscriptionType is topic or queue")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if m.NatsQueueGroupName == "" {
 | 
			
		||||
		return m, errors.New("nats-streaming error: missing queue group name")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if m.MaxInFlight != nil && *m.MaxInFlight < 1 {
 | 
			
		||||
		return m, errors.New("nats-streaming error: maxInFlight should be equal to or more than 1")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	//nolint:nestif
 | 
			
		||||
	// subscription options - only one can be used
 | 
			
		||||
 | 
			
		||||
	// helper function to reset mutually exclusive options
 | 
			
		||||
	clearValues := func(m *natsMetadata, indexToKeep int) {
 | 
			
		||||
		if indexToKeep != 0 {
 | 
			
		||||
			m.StartAtSequence = nil
 | 
			
		||||
		}
 | 
			
		||||
		if indexToKeep != 1 {
 | 
			
		||||
			m.StartWithLastReceived = ""
 | 
			
		||||
		}
 | 
			
		||||
		if indexToKeep != 2 {
 | 
			
		||||
			m.DeliverAll = ""
 | 
			
		||||
		}
 | 
			
		||||
		if indexToKeep != 3 {
 | 
			
		||||
			m.DeliverNew = ""
 | 
			
		||||
		}
 | 
			
		||||
		if indexToKeep != 4 {
 | 
			
		||||
			m.StartAtTime = ""
 | 
			
		||||
		}
 | 
			
		||||
		if indexToKeep != 4 {
 | 
			
		||||
			m.StartAtTimeFormat = ""
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	switch {
 | 
			
		||||
	case m.StartAtSequence != nil:
 | 
			
		||||
		if *m.StartAtSequence < 1 {
 | 
			
		||||
			return m, errors.New("nats-streaming error: startAtSequence should be equal to or more than 1")
 | 
			
		||||
		}
 | 
			
		||||
		clearValues(&m, 0)
 | 
			
		||||
	case m.StartWithLastReceived != "":
 | 
			
		||||
		if m.StartWithLastReceived != startWithLastReceivedTrue {
 | 
			
		||||
			return m, errors.New("nats-streaming error: valid value for startWithLastReceived is true")
 | 
			
		||||
		}
 | 
			
		||||
		clearValues(&m, 1)
 | 
			
		||||
	case m.DeliverAll != "":
 | 
			
		||||
		if m.DeliverAll != deliverAllTrue {
 | 
			
		||||
			return m, errors.New("nats-streaming error: valid value for deliverAll is true")
 | 
			
		||||
		}
 | 
			
		||||
		clearValues(&m, 2)
 | 
			
		||||
	case m.DeliverNew != "":
 | 
			
		||||
		if m.DeliverNew != deliverNewTrue {
 | 
			
		||||
			return m, errors.New("nats-streaming error: valid value for deliverNew is true")
 | 
			
		||||
		}
 | 
			
		||||
		clearValues(&m, 3)
 | 
			
		||||
	case m.StartAtTime != "":
 | 
			
		||||
		if m.StartAtTimeFormat == "" {
 | 
			
		||||
			return m, errors.New("nats-streaming error: missing value for startAtTimeFormat")
 | 
			
		||||
		}
 | 
			
		||||
		clearValues(&m, 4)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	m.ConcurrencyMode, err = pubsub.Concurrency(meta.Properties)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return m, fmt.Errorf("nats-streaming error: can't parse %s: %s", pubsub.ConcurrencyKey, err)
 | 
			
		||||
	}
 | 
			
		||||
	return m, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (n *natsStreamingPubSub) Init(_ context.Context, metadata pubsub.Metadata) error {
 | 
			
		||||
	n.logger.Warn("⚠️ The NATS Streaming PubSub component is deprecated due to the deprecation of NATS Server, and will be removed from Dapr 1.13")
 | 
			
		||||
 | 
			
		||||
	m, err := parseNATSStreamingMetadata(metadata)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
	n.metadata = m
 | 
			
		||||
	clientID := genRandomString(20)
 | 
			
		||||
	opts := []nats.Option{nats.Name(clientID)}
 | 
			
		||||
	natsConn, err := nats.Connect(m.NatsURL, opts...)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("nats-streaming: error connecting to nats server at %s: %s", m.NatsURL, err)
 | 
			
		||||
	}
 | 
			
		||||
	natStreamingConn, err := stan.Connect(m.NatsStreamingClusterID, clientID, stan.NatsConn(natsConn))
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("nats-streaming: error connecting to nats streaming server %s: %s", m.NatsStreamingClusterID, err)
 | 
			
		||||
	}
 | 
			
		||||
	n.logger.Debugf("connected to natsstreaming at %s", m.NatsURL)
 | 
			
		||||
 | 
			
		||||
	// Default retry configuration is used if no
 | 
			
		||||
	// backOff properties are set.
 | 
			
		||||
	if err := retry.DecodeConfigWithPrefix(
 | 
			
		||||
		&n.backOffConfig,
 | 
			
		||||
		metadata.Properties,
 | 
			
		||||
		"backOff"); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	n.natStreamingConn = natStreamingConn
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (n *natsStreamingPubSub) Publish(_ context.Context, req *pubsub.PublishRequest) error {
 | 
			
		||||
	if n.closed.Load() {
 | 
			
		||||
		return errors.New("component is closed")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	err := n.natStreamingConn.Publish(req.Topic, req.Data)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("nats-streaming: error from publish: %s", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (n *natsStreamingPubSub) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, handler pubsub.Handler) error {
 | 
			
		||||
	if n.closed.Load() {
 | 
			
		||||
		return errors.New("component is closed")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	natStreamingsubscriptionOptions, err := n.subscriptionOptions()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("nats-streaming: error getting subscription options %s", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	natsMsgHandler := func(natsMsg *stan.Msg) {
 | 
			
		||||
		msg := pubsub.NewMessage{
 | 
			
		||||
			Topic: req.Topic,
 | 
			
		||||
			Data:  natsMsg.Data,
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		n.logger.Debugf("Processing NATS Streaming message %s/%d", natsMsg.Subject, natsMsg.Sequence)
 | 
			
		||||
 | 
			
		||||
		f := func() {
 | 
			
		||||
			herr := handler(ctx, &msg)
 | 
			
		||||
			if herr == nil {
 | 
			
		||||
				natsMsg.Ack()
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		switch n.metadata.ConcurrencyMode {
 | 
			
		||||
		case pubsub.Single:
 | 
			
		||||
			f()
 | 
			
		||||
		case pubsub.Parallel:
 | 
			
		||||
			n.wg.Add(1)
 | 
			
		||||
			go func() {
 | 
			
		||||
				defer n.wg.Done()
 | 
			
		||||
				f()
 | 
			
		||||
			}()
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var subscription stan.Subscription
 | 
			
		||||
	if n.metadata.SubscriptionType == subscriptionTypeTopic {
 | 
			
		||||
		subscription, err = n.natStreamingConn.Subscribe(req.Topic, natsMsgHandler, natStreamingsubscriptionOptions...)
 | 
			
		||||
	} else if n.metadata.SubscriptionType == subscriptionTypeQueueGroup {
 | 
			
		||||
		subscription, err = n.natStreamingConn.QueueSubscribe(req.Topic, n.metadata.NatsQueueGroupName, natsMsgHandler, natStreamingsubscriptionOptions...)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return fmt.Errorf("nats-streaming: subscribe error %s", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	n.wg.Add(1)
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer n.wg.Done()
 | 
			
		||||
		select {
 | 
			
		||||
		case <-ctx.Done():
 | 
			
		||||
		case <-n.closeCh:
 | 
			
		||||
		}
 | 
			
		||||
		err := subscription.Unsubscribe()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			n.logger.Warnf("nats-streaming: error while unsubscribing from topic %s: %v", req.Topic, err)
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	if n.metadata.SubscriptionType == subscriptionTypeTopic {
 | 
			
		||||
		n.logger.Debugf("nats-streaming: subscribed to subject %s", req.Topic)
 | 
			
		||||
	} else if n.metadata.SubscriptionType == subscriptionTypeQueueGroup {
 | 
			
		||||
		n.logger.Debugf("nats-streaming: subscribed to subject %s with queue group %s", req.Topic, n.metadata.NatsQueueGroupName)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (n *natsStreamingPubSub) subscriptionOptions() ([]stan.SubscriptionOption, error) {
 | 
			
		||||
	var options []stan.SubscriptionOption
 | 
			
		||||
 | 
			
		||||
	if n.metadata.DurableSubscriptionName != "" {
 | 
			
		||||
		options = append(options, stan.DurableName(n.metadata.DurableSubscriptionName))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	switch {
 | 
			
		||||
	case n.metadata.DeliverNew == deliverNewTrue:
 | 
			
		||||
		options = append(options, stan.StartAt(pb.StartPosition_NewOnly)) //nolint:nosnakecase
 | 
			
		||||
	case n.metadata.StartAtSequence != nil && *n.metadata.StartAtSequence >= 1: // messages index start from 1, this is a valid check
 | 
			
		||||
		options = append(options, stan.StartAtSequence(*n.metadata.StartAtSequence))
 | 
			
		||||
	case n.metadata.StartWithLastReceived == startWithLastReceivedTrue:
 | 
			
		||||
		options = append(options, stan.StartWithLastReceived())
 | 
			
		||||
	case n.metadata.DeliverAll == deliverAllTrue:
 | 
			
		||||
		options = append(options, stan.DeliverAllAvailable())
 | 
			
		||||
	case n.metadata.StartAtTimeDelta > (1 * time.Nanosecond): // as long as its a valid time.Duration
 | 
			
		||||
		options = append(options, stan.StartAtTimeDelta(n.metadata.StartAtTimeDelta))
 | 
			
		||||
	case n.metadata.StartAtTime != "":
 | 
			
		||||
		if n.metadata.StartAtTimeFormat != "" {
 | 
			
		||||
			startTime, err := time.Parse(n.metadata.StartAtTimeFormat, n.metadata.StartAtTime)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return nil, err
 | 
			
		||||
			}
 | 
			
		||||
			options = append(options, stan.StartAtTime(startTime))
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// default is auto ACK. switching to manual ACK since processing errors need to be handled
 | 
			
		||||
	options = append(options, stan.SetManualAckMode())
 | 
			
		||||
 | 
			
		||||
	// check if set the ack options.
 | 
			
		||||
	if n.metadata.AckWaitTime > (1 * time.Nanosecond) {
 | 
			
		||||
		options = append(options, stan.AckWait(n.metadata.AckWaitTime))
 | 
			
		||||
	}
 | 
			
		||||
	if n.metadata.MaxInFlight != nil && *n.metadata.MaxInFlight >= 1 {
 | 
			
		||||
		options = append(options, stan.MaxInflight(int(*n.metadata.MaxInFlight)))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return options, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
const inputs = "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890"
 | 
			
		||||
 | 
			
		||||
// generates a random string of length 20.
 | 
			
		||||
func genRandomString(n int) string {
 | 
			
		||||
	b := make([]byte, n)
 | 
			
		||||
	s := rand.NewSource(int64(time.Now().Nanosecond()))
 | 
			
		||||
	for i := range b {
 | 
			
		||||
		b[i] = inputs[s.Int63()%int64(len(inputs))]
 | 
			
		||||
	}
 | 
			
		||||
	clientID := string(b)
 | 
			
		||||
 | 
			
		||||
	return clientID
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (n *natsStreamingPubSub) Close() error {
 | 
			
		||||
	defer n.wg.Wait()
 | 
			
		||||
	if n.closed.CompareAndSwap(false, true) {
 | 
			
		||||
		close(n.closeCh)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return n.natStreamingConn.Close()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (n *natsStreamingPubSub) Features() []pubsub.Feature {
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetComponentMetadata returns the metadata of the component.
 | 
			
		||||
func (n *natsStreamingPubSub) GetComponentMetadata() (metadataInfo metadata.MetadataMap) {
 | 
			
		||||
	metadataStruct := natsMetadata{}
 | 
			
		||||
	metadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, metadata.PubSubType)
 | 
			
		||||
	return
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -1,448 +0,0 @@
 | 
			
		|||
/*
 | 
			
		||||
Copyright 2021 The Dapr Authors
 | 
			
		||||
Licensed under the Apache License, Version 2.0 (the "License");
 | 
			
		||||
you may not use this file except in compliance with the License.
 | 
			
		||||
You may obtain a copy of the License at
 | 
			
		||||
    http://www.apache.org/licenses/LICENSE-2.0
 | 
			
		||||
Unless required by applicable law or agreed to in writing, software
 | 
			
		||||
distributed under the License is distributed on an "AS IS" BASIS,
 | 
			
		||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
			
		||||
See the License for the specific language governing permissions and
 | 
			
		||||
limitations under the License.
 | 
			
		||||
*/
 | 
			
		||||
 | 
			
		||||
package natsstreaming
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"testing"
 | 
			
		||||
	"time"
 | 
			
		||||
 | 
			
		||||
	"github.com/stretchr/testify/assert"
 | 
			
		||||
 | 
			
		||||
	mdata "github.com/dapr/components-contrib/metadata"
 | 
			
		||||
	"github.com/dapr/components-contrib/pubsub"
 | 
			
		||||
	"github.com/dapr/kit/ptr"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func TestParseNATSStreamingForMetadataMandatoryOptionsMissing(t *testing.T) {
 | 
			
		||||
	type test struct {
 | 
			
		||||
		name       string
 | 
			
		||||
		properties map[string]string
 | 
			
		||||
	}
 | 
			
		||||
	tests := []test{
 | 
			
		||||
		{"nats URL missing", map[string]string{
 | 
			
		||||
			natsStreamingClusterID: "testcluster",
 | 
			
		||||
			consumerID:             "consumer1",
 | 
			
		||||
			subscriptionType:       "topic",
 | 
			
		||||
		}},
 | 
			
		||||
		{"consumer ID missing", map[string]string{
 | 
			
		||||
			natsURL:                "nats://foo.bar:4222",
 | 
			
		||||
			natsStreamingClusterID: "testcluster",
 | 
			
		||||
			subscriptionType:       "topic",
 | 
			
		||||
		}},
 | 
			
		||||
		{"cluster ID missing", map[string]string{
 | 
			
		||||
			natsURL:          "nats://foo.bar:4222",
 | 
			
		||||
			consumerID:       "consumer1",
 | 
			
		||||
			subscriptionType: "topic",
 | 
			
		||||
		}},
 | 
			
		||||
	}
 | 
			
		||||
	for _, _test := range tests {
 | 
			
		||||
		t.Run(_test.name, func(t *testing.T) {
 | 
			
		||||
			fakeMetaData := pubsub.Metadata{Base: mdata.Base{
 | 
			
		||||
				Properties: _test.properties,
 | 
			
		||||
			}}
 | 
			
		||||
			_, err := parseNATSStreamingMetadata(fakeMetaData)
 | 
			
		||||
			assert.NotEmpty(t, err)
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestParseNATSStreamingMetadataForInvalidSubscriptionOptions(t *testing.T) {
 | 
			
		||||
	type test struct {
 | 
			
		||||
		name       string
 | 
			
		||||
		properties map[string]string
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	tests := []test{
 | 
			
		||||
		{"invalid value (less than 1) for startAtSequence", map[string]string{
 | 
			
		||||
			natsURL:                "nats://foo.bar:4222",
 | 
			
		||||
			natsStreamingClusterID: "testcluster",
 | 
			
		||||
			consumerID:             "consumer1",
 | 
			
		||||
			subscriptionType:       "topic",
 | 
			
		||||
			startAtSequence:        "0",
 | 
			
		||||
		}},
 | 
			
		||||
		{"non integer value for startAtSequence", map[string]string{
 | 
			
		||||
			natsURL:                "nats://foo.bar:4222",
 | 
			
		||||
			natsStreamingClusterID: "testcluster",
 | 
			
		||||
			consumerID:             "consumer1",
 | 
			
		||||
			subscriptionType:       "topic",
 | 
			
		||||
			startAtSequence:        "foo",
 | 
			
		||||
		}},
 | 
			
		||||
		{"startWithLastReceived is other than true", map[string]string{
 | 
			
		||||
			natsURL:                "nats://foo.bar:4222",
 | 
			
		||||
			natsStreamingClusterID: "testcluster",
 | 
			
		||||
			consumerID:             "consumer1",
 | 
			
		||||
			subscriptionType:       "topic",
 | 
			
		||||
			startWithLastReceived:  "foo",
 | 
			
		||||
		}},
 | 
			
		||||
		{"deliverAll is other than true", map[string]string{
 | 
			
		||||
			natsURL:                "nats://foo.bar:4222",
 | 
			
		||||
			natsStreamingClusterID: "testcluster",
 | 
			
		||||
			consumerID:             "consumer1",
 | 
			
		||||
			subscriptionType:       "topic",
 | 
			
		||||
			deliverAll:             "foo",
 | 
			
		||||
		}},
 | 
			
		||||
		{"deliverNew is other than true", map[string]string{
 | 
			
		||||
			natsURL:                "nats://foo.bar:4222",
 | 
			
		||||
			natsStreamingClusterID: "testcluster",
 | 
			
		||||
			consumerID:             "consumer1",
 | 
			
		||||
			subscriptionType:       "topic",
 | 
			
		||||
			deliverNew:             "foo",
 | 
			
		||||
		}},
 | 
			
		||||
		{"invalid value for startAtTimeDelta", map[string]string{
 | 
			
		||||
			natsURL:                "nats://foo.bar:4222",
 | 
			
		||||
			natsStreamingClusterID: "testcluster",
 | 
			
		||||
			consumerID:             "consumer1",
 | 
			
		||||
			subscriptionType:       "topic",
 | 
			
		||||
			startAtTimeDelta:       "foo",
 | 
			
		||||
		}},
 | 
			
		||||
		{"startAtTime provided without startAtTimeFormat", map[string]string{
 | 
			
		||||
			natsURL:                "nats://foo.bar:4222",
 | 
			
		||||
			natsStreamingClusterID: "testcluster",
 | 
			
		||||
			consumerID:             "consumer1",
 | 
			
		||||
			subscriptionType:       "topic",
 | 
			
		||||
			startAtTime:            "foo",
 | 
			
		||||
		}},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, _test := range tests {
 | 
			
		||||
		t.Run(_test.name, func(t *testing.T) {
 | 
			
		||||
			fakeMetaData := pubsub.Metadata{
 | 
			
		||||
				Base: mdata.Base{Properties: _test.properties},
 | 
			
		||||
			}
 | 
			
		||||
			_, err := parseNATSStreamingMetadata(fakeMetaData)
 | 
			
		||||
			assert.NotEmpty(t, err)
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestParseNATSStreamingMetadataForValidSubscriptionOptions(t *testing.T) {
 | 
			
		||||
	type test struct {
 | 
			
		||||
		name                  string
 | 
			
		||||
		properties            map[string]string
 | 
			
		||||
		expectedMetadataName  string
 | 
			
		||||
		expectedMetadataValue string
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	tests := []test{
 | 
			
		||||
		{
 | 
			
		||||
			"using startWithLastReceived",
 | 
			
		||||
			map[string]string{
 | 
			
		||||
				natsURL:                "nats://foo.bar:4222",
 | 
			
		||||
				natsStreamingClusterID: "testcluster",
 | 
			
		||||
				consumerID:             "consumer1",
 | 
			
		||||
				subscriptionType:       "topic",
 | 
			
		||||
				pubsub.ConcurrencyKey:  "single",
 | 
			
		||||
				startWithLastReceived:  "true",
 | 
			
		||||
			},
 | 
			
		||||
			"startWithLastReceived", "true",
 | 
			
		||||
		},
 | 
			
		||||
 | 
			
		||||
		{
 | 
			
		||||
			"using deliverAll",
 | 
			
		||||
			map[string]string{
 | 
			
		||||
				natsURL:                "nats://foo.bar:4222",
 | 
			
		||||
				natsStreamingClusterID: "testcluster",
 | 
			
		||||
				consumerID:             "consumer1",
 | 
			
		||||
				subscriptionType:       "topic",
 | 
			
		||||
				pubsub.ConcurrencyKey:  "single",
 | 
			
		||||
				deliverAll:             "true",
 | 
			
		||||
			},
 | 
			
		||||
			"deliverAll", "true",
 | 
			
		||||
		},
 | 
			
		||||
 | 
			
		||||
		{
 | 
			
		||||
			"using deliverNew",
 | 
			
		||||
			map[string]string{
 | 
			
		||||
				natsURL:                "nats://foo.bar:4222",
 | 
			
		||||
				natsStreamingClusterID: "testcluster",
 | 
			
		||||
				consumerID:             "consumer1",
 | 
			
		||||
				subscriptionType:       "topic",
 | 
			
		||||
				pubsub.ConcurrencyKey:  "single",
 | 
			
		||||
				deliverNew:             "true",
 | 
			
		||||
			},
 | 
			
		||||
			"deliverNew", "true",
 | 
			
		||||
		},
 | 
			
		||||
 | 
			
		||||
		{
 | 
			
		||||
			"using startAtSequence",
 | 
			
		||||
			map[string]string{
 | 
			
		||||
				natsURL:                "nats://foo.bar:4222",
 | 
			
		||||
				natsStreamingClusterID: "testcluster",
 | 
			
		||||
				consumerID:             "consumer1",
 | 
			
		||||
				subscriptionType:       "topic",
 | 
			
		||||
				pubsub.ConcurrencyKey:  "single",
 | 
			
		||||
				startAtSequence:        "42",
 | 
			
		||||
			},
 | 
			
		||||
			"startAtSequence", "42",
 | 
			
		||||
		},
 | 
			
		||||
 | 
			
		||||
		{
 | 
			
		||||
			"using startAtTimeDelta",
 | 
			
		||||
			map[string]string{
 | 
			
		||||
				natsURL:                "nats://foo.bar:4222",
 | 
			
		||||
				natsStreamingClusterID: "testcluster",
 | 
			
		||||
				consumerID:             "consumer1",
 | 
			
		||||
				subscriptionType:       "topic",
 | 
			
		||||
				pubsub.ConcurrencyKey:  "single",
 | 
			
		||||
				startAtTimeDelta:       "1h",
 | 
			
		||||
			},
 | 
			
		||||
			"startAtTimeDelta", "1h",
 | 
			
		||||
		},
 | 
			
		||||
 | 
			
		||||
		{
 | 
			
		||||
			"using concurrencyMode single",
 | 
			
		||||
			map[string]string{
 | 
			
		||||
				natsURL:                "nats://foo.bar:4222",
 | 
			
		||||
				natsStreamingClusterID: "testcluster",
 | 
			
		||||
				consumerID:             "consumer1",
 | 
			
		||||
				subscriptionType:       "topic",
 | 
			
		||||
				startAtTimeDelta:       "1h",
 | 
			
		||||
				pubsub.ConcurrencyKey:  "single",
 | 
			
		||||
			},
 | 
			
		||||
			"concurrencyMode", "single",
 | 
			
		||||
		},
 | 
			
		||||
 | 
			
		||||
		{
 | 
			
		||||
			"using concurrencyMode parallel",
 | 
			
		||||
			map[string]string{
 | 
			
		||||
				natsURL:                "nats://foo.bar:4222",
 | 
			
		||||
				natsStreamingClusterID: "testcluster",
 | 
			
		||||
				consumerID:             "consumer1",
 | 
			
		||||
				subscriptionType:       "topic",
 | 
			
		||||
				startAtTimeDelta:       "1h",
 | 
			
		||||
				pubsub.ConcurrencyKey:  "parallel",
 | 
			
		||||
			},
 | 
			
		||||
			"concurrencyMode", "parallel",
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, _test := range tests {
 | 
			
		||||
		t.Run(_test.name, func(t *testing.T) {
 | 
			
		||||
			fakeMetaData := pubsub.Metadata{
 | 
			
		||||
				Base: mdata.Base{Properties: _test.properties},
 | 
			
		||||
			}
 | 
			
		||||
			m, err := parseNATSStreamingMetadata(fakeMetaData)
 | 
			
		||||
 | 
			
		||||
			assert.NoError(t, err)
 | 
			
		||||
 | 
			
		||||
			assert.NotEmpty(t, m.NatsURL)
 | 
			
		||||
			assert.NotEmpty(t, m.NatsStreamingClusterID)
 | 
			
		||||
			assert.NotEmpty(t, m.SubscriptionType)
 | 
			
		||||
			assert.NotEmpty(t, m.NatsQueueGroupName)
 | 
			
		||||
			assert.NotEmpty(t, m.ConcurrencyMode)
 | 
			
		||||
			assert.NotEmpty(t, _test.expectedMetadataValue)
 | 
			
		||||
 | 
			
		||||
			assert.Equal(t, _test.properties[natsURL], m.NatsURL)
 | 
			
		||||
			assert.Equal(t, _test.properties[natsStreamingClusterID], m.NatsStreamingClusterID)
 | 
			
		||||
			assert.Equal(t, _test.properties[subscriptionType], m.SubscriptionType)
 | 
			
		||||
			assert.Equal(t, _test.properties[consumerID], m.NatsQueueGroupName)
 | 
			
		||||
			assert.Equal(t, _test.properties[pubsub.ConcurrencyKey], string(m.ConcurrencyMode))
 | 
			
		||||
			assert.Equal(t, _test.properties[_test.expectedMetadataName], _test.expectedMetadataValue)
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestParseNATSStreamingMetadata(t *testing.T) {
 | 
			
		||||
	t.Run("mandatory metadata provided", func(t *testing.T) {
 | 
			
		||||
		fakeProperties := map[string]string{
 | 
			
		||||
			natsURL:                "nats://foo.bar:4222",
 | 
			
		||||
			natsStreamingClusterID: "testcluster",
 | 
			
		||||
			consumerID:             "consumer1",
 | 
			
		||||
		}
 | 
			
		||||
		fakeMetaData := pubsub.Metadata{
 | 
			
		||||
			Base: mdata.Base{Properties: fakeProperties},
 | 
			
		||||
		}
 | 
			
		||||
		m, err := parseNATSStreamingMetadata(fakeMetaData)
 | 
			
		||||
 | 
			
		||||
		assert.NoError(t, err)
 | 
			
		||||
		assert.NotEmpty(t, m.NatsURL)
 | 
			
		||||
		assert.NotEmpty(t, m.NatsStreamingClusterID)
 | 
			
		||||
		assert.NotEmpty(t, m.NatsQueueGroupName)
 | 
			
		||||
		assert.Equal(t, fakeProperties[natsURL], m.NatsURL)
 | 
			
		||||
		assert.Equal(t, fakeProperties[natsStreamingClusterID], m.NatsStreamingClusterID)
 | 
			
		||||
		assert.Equal(t, fakeProperties[consumerID], m.NatsQueueGroupName)
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	t.Run("subscription type missing", func(t *testing.T) {
 | 
			
		||||
		fakeProperties := map[string]string{
 | 
			
		||||
			natsURL:                "nats://foo.bar:4222",
 | 
			
		||||
			natsStreamingClusterID: "testcluster",
 | 
			
		||||
			consumerID:             "consumer1",
 | 
			
		||||
		}
 | 
			
		||||
		fakeMetaData := pubsub.Metadata{
 | 
			
		||||
			Base: mdata.Base{Properties: fakeProperties},
 | 
			
		||||
		}
 | 
			
		||||
		_, err := parseNATSStreamingMetadata(fakeMetaData)
 | 
			
		||||
		assert.Empty(t, err)
 | 
			
		||||
	})
 | 
			
		||||
	t.Run("invalid value for subscription type", func(t *testing.T) {
 | 
			
		||||
		fakeProperties := map[string]string{
 | 
			
		||||
			natsURL:                "nats://foo.bar:4222",
 | 
			
		||||
			natsStreamingClusterID: "testcluster",
 | 
			
		||||
			consumerID:             "consumer1",
 | 
			
		||||
			subscriptionType:       "baz",
 | 
			
		||||
		}
 | 
			
		||||
		fakeMetaData := pubsub.Metadata{
 | 
			
		||||
			Base: mdata.Base{Properties: fakeProperties},
 | 
			
		||||
		}
 | 
			
		||||
		_, err := parseNATSStreamingMetadata(fakeMetaData)
 | 
			
		||||
		assert.NotEmpty(t, err)
 | 
			
		||||
	})
 | 
			
		||||
	t.Run("more than one subscription option provided", func(t *testing.T) {
 | 
			
		||||
		fakeProperties := map[string]string{
 | 
			
		||||
			natsURL:                "nats://foo.bar:4222",
 | 
			
		||||
			natsStreamingClusterID: "testcluster",
 | 
			
		||||
			consumerID:             "consumer1",
 | 
			
		||||
			subscriptionType:       "topic",
 | 
			
		||||
			startAtSequence:        "42",
 | 
			
		||||
			startWithLastReceived:  "true",
 | 
			
		||||
			deliverAll:             "true",
 | 
			
		||||
		}
 | 
			
		||||
		fakeMetaData := pubsub.Metadata{
 | 
			
		||||
			Base: mdata.Base{Properties: fakeProperties},
 | 
			
		||||
		}
 | 
			
		||||
		m, err := parseNATSStreamingMetadata(fakeMetaData)
 | 
			
		||||
		assert.NoError(t, err)
 | 
			
		||||
		assert.NotEmpty(t, m.NatsURL)
 | 
			
		||||
		assert.NotEmpty(t, m.NatsStreamingClusterID)
 | 
			
		||||
		assert.NotEmpty(t, m.SubscriptionType)
 | 
			
		||||
		assert.NotEmpty(t, m.NatsQueueGroupName)
 | 
			
		||||
		assert.NotEmpty(t, m.StartAtSequence)
 | 
			
		||||
		// startWithLastReceived ignored
 | 
			
		||||
		assert.Empty(t, m.StartWithLastReceived)
 | 
			
		||||
		// deliverAll will be ignored
 | 
			
		||||
		assert.Empty(t, m.DeliverAll)
 | 
			
		||||
 | 
			
		||||
		assert.Equal(t, fakeProperties[natsURL], m.NatsURL)
 | 
			
		||||
		assert.Equal(t, fakeProperties[natsStreamingClusterID], m.NatsStreamingClusterID)
 | 
			
		||||
		assert.Equal(t, fakeProperties[subscriptionType], m.SubscriptionType)
 | 
			
		||||
		assert.Equal(t, fakeProperties[consumerID], m.NatsQueueGroupName)
 | 
			
		||||
		assert.Equal(t, fakeProperties[startAtSequence], strconv.FormatUint(*m.StartAtSequence, 10))
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestSubscriptionOptionsForValidOptions(t *testing.T) {
 | 
			
		||||
	type test struct {
 | 
			
		||||
		name                    string
 | 
			
		||||
		m                       natsMetadata
 | 
			
		||||
		expectedNumberOfOptions int
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	tests := []test{
 | 
			
		||||
		{"using durableSubscriptionName", natsMetadata{DurableSubscriptionName: "foobar"}, 2},
 | 
			
		||||
		{"durableSubscriptionName is empty", natsMetadata{DurableSubscriptionName: ""}, 1},
 | 
			
		||||
		{"using startAtSequence", natsMetadata{StartAtSequence: ptr.Of(uint64(42))}, 2},
 | 
			
		||||
		{"using startWithLastReceived", natsMetadata{StartWithLastReceived: startWithLastReceivedTrue}, 2},
 | 
			
		||||
		{"using deliverAll", natsMetadata{DeliverAll: deliverAllTrue}, 2},
 | 
			
		||||
		{"using startAtTimeDelta", natsMetadata{StartAtTimeDelta: 1 * time.Hour}, 2},
 | 
			
		||||
		{"using startAtTime and startAtTimeFormat", natsMetadata{StartAtTime: "Feb 3, 2013 at 7:54pm (PST)", StartAtTimeFormat: "Jan 2, 2006 at 3:04pm (MST)"}, 2},
 | 
			
		||||
		{"using manual ack with ackWaitTime", natsMetadata{AckWaitTime: 30 * time.Second}, 2},
 | 
			
		||||
		{"using manual ack with maxInFlight", natsMetadata{MaxInFlight: ptr.Of(uint64(42))}, 2},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, _test := range tests {
 | 
			
		||||
		t.Run(_test.name, func(t *testing.T) {
 | 
			
		||||
			natsStreaming := natsStreamingPubSub{metadata: _test.m}
 | 
			
		||||
			opts, err := natsStreaming.subscriptionOptions()
 | 
			
		||||
			assert.Empty(t, err)
 | 
			
		||||
			assert.NotEmpty(t, opts)
 | 
			
		||||
			assert.Equal(t, _test.expectedNumberOfOptions, len(opts))
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestSubscriptionOptionsForInvalidOptions(t *testing.T) {
 | 
			
		||||
	type test struct {
 | 
			
		||||
		name string
 | 
			
		||||
		m    natsMetadata
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	tests := []test{
 | 
			
		||||
		{"startAtSequence is less than 1", natsMetadata{StartAtSequence: ptr.Of(uint64(0))}},
 | 
			
		||||
		{"startWithLastReceived is other than true", natsMetadata{StartWithLastReceived: "foo"}},
 | 
			
		||||
		{"deliverAll is other than true", natsMetadata{DeliverAll: "foo"}},
 | 
			
		||||
		{"deliverNew is other than true", natsMetadata{DeliverNew: "foo"}},
 | 
			
		||||
		{"startAtTime is empty", natsMetadata{StartAtTime: "", StartAtTimeFormat: "Jan 2, 2006 at 3:04pm (MST)"}},
 | 
			
		||||
		{"startAtTimeFormat is empty", natsMetadata{StartAtTime: "Feb 3, 2013 at 7:54pm (PST)", StartAtTimeFormat: ""}},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, _test := range tests {
 | 
			
		||||
		t.Run(_test.name, func(t *testing.T) {
 | 
			
		||||
			natsStreaming := natsStreamingPubSub{metadata: _test.m}
 | 
			
		||||
			opts, err := natsStreaming.subscriptionOptions()
 | 
			
		||||
			assert.Empty(t, err)
 | 
			
		||||
			assert.NotEmpty(t, opts)
 | 
			
		||||
			assert.Equal(t, 1, len(opts))
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestSubscriptionOptions(t *testing.T) {
 | 
			
		||||
	// general
 | 
			
		||||
	t.Run("manual ACK option is present by default", func(t *testing.T) {
 | 
			
		||||
		natsStreaming := natsStreamingPubSub{metadata: natsMetadata{}}
 | 
			
		||||
		opts, err := natsStreaming.subscriptionOptions()
 | 
			
		||||
		assert.Empty(t, err)
 | 
			
		||||
		assert.NotEmpty(t, opts)
 | 
			
		||||
		assert.Equal(t, 1, len(opts))
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	t.Run("only one subscription option will be honored", func(t *testing.T) {
 | 
			
		||||
		m := natsMetadata{DeliverNew: deliverNewTrue, DeliverAll: deliverAllTrue, StartAtTimeDelta: 1 * time.Hour}
 | 
			
		||||
		natsStreaming := natsStreamingPubSub{metadata: m}
 | 
			
		||||
		opts, err := natsStreaming.subscriptionOptions()
 | 
			
		||||
		assert.Empty(t, err)
 | 
			
		||||
		assert.NotEmpty(t, opts)
 | 
			
		||||
		assert.Equal(t, 2, len(opts))
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	// invalid subscription options
 | 
			
		||||
 | 
			
		||||
	t.Run("startAtTime is invalid", func(t *testing.T) {
 | 
			
		||||
		m := natsMetadata{StartAtTime: "foobar", StartAtTimeFormat: "Jan 2, 2006 at 3:04pm (MST)"}
 | 
			
		||||
		natsStreaming := natsStreamingPubSub{metadata: m}
 | 
			
		||||
		opts, err := natsStreaming.subscriptionOptions()
 | 
			
		||||
		assert.NotEmpty(t, err)
 | 
			
		||||
		assert.Nil(t, opts)
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	t.Run("startAtTimeFormat is invalid", func(t *testing.T) {
 | 
			
		||||
		m := natsMetadata{StartAtTime: "Feb 3, 2013 at 7:54pm (PST)", StartAtTimeFormat: "foo"}
 | 
			
		||||
 | 
			
		||||
		natsStreaming := natsStreamingPubSub{metadata: m}
 | 
			
		||||
		opts, err := natsStreaming.subscriptionOptions()
 | 
			
		||||
		assert.NotEmpty(t, err)
 | 
			
		||||
		assert.Nil(t, opts)
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestGenRandomString(t *testing.T) {
 | 
			
		||||
	t.Run("random client ID is not empty", func(t *testing.T) {
 | 
			
		||||
		clientID := genRandomString(20)
 | 
			
		||||
		assert.NotEmpty(t, clientID)
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	t.Run("random client ID is not nil", func(t *testing.T) {
 | 
			
		||||
		clientID := genRandomString(20)
 | 
			
		||||
		assert.NotNil(t, clientID)
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	t.Run("random client ID length is 20", func(t *testing.T) {
 | 
			
		||||
		clientID := genRandomString(20)
 | 
			
		||||
		assert.NotEmpty(t, clientID)
 | 
			
		||||
		assert.NotNil(t, clientID)
 | 
			
		||||
		assert.Equal(t, 20, len(clientID))
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -1,20 +0,0 @@
 | 
			
		|||
apiVersion: dapr.io/v1alpha1
 | 
			
		||||
kind: Component
 | 
			
		||||
metadata:
 | 
			
		||||
  name: pubsub
 | 
			
		||||
spec:
 | 
			
		||||
  type: pubsub.natsstreaming
 | 
			
		||||
  version: v1
 | 
			
		||||
  metadata:
 | 
			
		||||
  - name: natsURL
 | 
			
		||||
    value: "nats://localhost:4222"
 | 
			
		||||
  - name: natsStreamingClusterID
 | 
			
		||||
    value: "test-cluster"
 | 
			
		||||
  - name: subscriptionType
 | 
			
		||||
    value: topic
 | 
			
		||||
  - name: consumerID
 | 
			
		||||
    value: myConsumerID
 | 
			
		||||
  - name: ackWaitTime
 | 
			
		||||
    value: 10s
 | 
			
		||||
  - name: maxInFlight
 | 
			
		||||
    value: 1
 | 
			
		||||
| 
						 | 
				
			
			@ -50,8 +50,6 @@ components:
 | 
			
		|||
    operations: []
 | 
			
		||||
    config:
 | 
			
		||||
      checkInOrderProcessing: false
 | 
			
		||||
  - component: natsstreaming
 | 
			
		||||
    operations: []
 | 
			
		||||
  - component: jetstream
 | 
			
		||||
    operations: []
 | 
			
		||||
  - component: kafka
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -75,7 +75,6 @@ import (
 | 
			
		|||
	p_kafka "github.com/dapr/components-contrib/pubsub/kafka"
 | 
			
		||||
	p_kubemq "github.com/dapr/components-contrib/pubsub/kubemq"
 | 
			
		||||
	p_mqtt3 "github.com/dapr/components-contrib/pubsub/mqtt3"
 | 
			
		||||
	p_natsstreaming "github.com/dapr/components-contrib/pubsub/natsstreaming"
 | 
			
		||||
	p_pulsar "github.com/dapr/components-contrib/pubsub/pulsar"
 | 
			
		||||
	p_rabbitmq "github.com/dapr/components-contrib/pubsub/rabbitmq"
 | 
			
		||||
	p_redis "github.com/dapr/components-contrib/pubsub/redis"
 | 
			
		||||
| 
						 | 
				
			
			@ -468,8 +467,6 @@ func loadPubSub(tc TestComponent) pubsub.PubSub {
 | 
			
		|||
		pubsub = p_servicebustopics.NewAzureServiceBusTopics(testLogger)
 | 
			
		||||
	case "azure.servicebus.queues":
 | 
			
		||||
		pubsub = p_servicebusqueues.NewAzureServiceBusQueues(testLogger)
 | 
			
		||||
	case "natsstreaming":
 | 
			
		||||
		pubsub = p_natsstreaming.NewNATSStreamingPubSub(testLogger)
 | 
			
		||||
	case "jetstream":
 | 
			
		||||
		pubsub = p_jetstream.NewJetStream(testLogger)
 | 
			
		||||
	case kafka:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue