Merge branch 'master' into snssqs-cert-test-1971-fifo

Signed-off-by: Roberto J Rojas <robertojrojas@gmail.com>
This commit is contained in:
Roberto J Rojas 2023-01-20 22:25:19 -05:00
commit ff3250e3ce
11 changed files with 2102 additions and 0 deletions

View File

@ -50,6 +50,7 @@ jobs:
PR_COMPONENTS=$(yq -I0 --tojson eval - << EOF
- pubsub.kafka
- pubsub.rabbitmq
- pubsub.pulsar
- pubsub.mqtt3
- state.mongodb
- state.redis

View File

@ -425,6 +425,22 @@ func (w *Watcher) Assert(t TestingT, timeout time.Duration) bool {
}
}
// AssertNotDelivered waits for up to `timeout` and returns true only if no
// data was observed by the watcher in that window. If the watcher finishes
// before the timeout, it returns true only when the observed set is empty.
// NOTE(review): the `t TestingT` parameter is unused in this body — presumably
// kept for signature symmetry with Assert; confirm.
func (w *Watcher) AssertNotDelivered(t TestingT, timeout time.Duration) bool {
	w.checkClosable()
	select {
	case <-time.After(timeout):
		// Timed out with the watcher still open: nothing forced completion,
		// so treat it as "nothing was delivered".
		w.mu.Lock()
		defer w.mu.Unlock()
		return true
	case <-w.finished:
		// Watcher finished early; succeed only if nothing was observed.
		w.mu.Lock()
		defer w.mu.Unlock()
		return len(w.observed) == 0
	}
}
// Assert waits for up to `timeout` for all
// expected data to be observed and requires
// the expected and observed data are either

View File

@ -0,0 +1,54 @@
### Pulsar Pubsub Certification
The purpose of this module is to provide tests that certify the Pulsar Pubsub as a stable component.
**Certification Tests**
- Verify with single publisher / single subscriber
- Run dapr application with 1 publisher and 1 subscriber
- Publisher publishes to 2 topics
- Subscriber is subscribed to 1 topic
- Simulate periodic errors and verify that the component retries on error
- Verify that all expected messages were received
- Verify that subscriber does not receive messages from the non-subscribed topic
- Verify with single publisher / multiple subscribers with same consumerID
- Run dapr application with 1 publisher and 2 subscribers
- Publisher publishes to 1 topic
- Subscriber is subscribed to 1 topic
- Simulate periodic errors and verify that the component retries on error
- Verify that all expected messages were received
- Verify with single publisher / multiple subscribers with different consumerIDs
- Run dapr application with 1 publisher and 2 subscribers
- Publisher publishes to 1 topic
- Subscriber is subscribed to 1 topic
- Simulate periodic errors and verify that the component retries on error
- Verify that all expected messages were received
- Verify with multiple publishers / multiple subscribers with different consumerIDs
- Run dapr application with 2 publishers and 2 subscribers
- Publisher publishes to 1 topic
- Subscriber is subscribed to 1 topic
- Simulate periodic errors and verify that the component retries on error
- Verify that all expected messages were received
- Verify data with a topic that does not exist
- Run dapr application with 1 publisher and 1 subscriber
- Verify the creation of topic
- Send messages to the topic created
- Verify that subscriber received all the messages
- Verify reconnection after the network interruptions
- Run dapr application with 1 publisher and 1 subscriber
- Publisher publishes to 1 topic
- Subscriber is subscribed to 1 topic
- Simulate network interruptions and verify that the component retries on error
- Verify that all expected messages were received
- Verify data with an optional metadata query parameter deliverAfter/deliverAt set
- Run dapr application with 1 publisher and 1 subscriber
- Publisher publishes to 1 topic
- Subscriber is subscribed to 1 topic
- Verify that subscriber has not immediately received messages
- Wait for message delay to pass
- Verify that all expected messages were received
- Verify data with persistent topics after pulsar restart
- Run dapr application with 1 publisher and 1 subscriber
- Publisher publishes to 1 topic
- Restart pulsar service
- Subscriber is subscribed to 1 topic
- Verify that all expected messages were received

View File

@ -0,0 +1,14 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: messagebus
spec:
type: pubsub.pulsar
version: v1
metadata:
- name: host
value: "localhost:6650"
- name: consumerID
value: certification1
- name: redeliveryDelay
value: 200ms

View File

@ -0,0 +1,14 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: messagebus
spec:
type: pubsub.pulsar
version: v1
metadata:
- name: host
value: "localhost:6650"
- name: consumerID
value: certification3
- name: redeliveryDelay
value: 200ms

View File

@ -0,0 +1,14 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: messagebus
spec:
type: pubsub.pulsar
version: v1
metadata:
- name: host
value: "localhost:6650"
- name: consumerID
value: certification2
- name: redeliveryDelay
value: 200ms

View File

@ -0,0 +1,8 @@
apiVersion: dapr.io/v1alpha1
kind: Configuration
metadata:
name: pubsubroutingconfig
spec:
features:
- name: PubSub.Routing
enabled: true

View File

@ -0,0 +1,19 @@
version: '2'
services:
standalone:
image: apachepulsar/pulsar:2.8.2
ports:
- "6650:6650"
- "8080:8080"
environment:
- BOOKIE_MEM=" -Xms768m -Xmx768m -XX:MaxDirectMemorySize=1g"
command: >
/bin/bash -c
"bin/apply-config-from-env.py conf/standalone.conf
&& bin/pulsar standalone --advertised-address standalone"
volumes:
- pulsardata:/pulsar/data
- pulsarconf:/pulsar/conf
volumes:
pulsardata:
pulsarconf:

View File

@ -0,0 +1,160 @@
module github.com/dapr/components-contrib/tests/certification/pubsub/pulsar
go 1.19
replace github.com/dapr/components-contrib/tests/certification => ../../
replace github.com/dapr/components-contrib => ../../../../
require (
github.com/apache/pulsar-client-go v0.9.0
github.com/cenkalti/backoff/v4 v4.2.0 // indirect
github.com/dapr/components-contrib v1.9.6
github.com/dapr/components-contrib/tests/certification v0.0.0-00010101000000-000000000000
github.com/dapr/dapr v1.9.4-0.20230112074057-9f143d8deeed
github.com/dapr/go-sdk v1.6.0
github.com/dapr/kit v0.0.4-0.20230105202559-fcb09958bfb0
github.com/google/uuid v1.3.0
github.com/stretchr/testify v1.8.1
go.uber.org/multierr v1.8.0
)
require (
contrib.go.opencensus.io/exporter/prometheus v0.4.2 // indirect
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/99designs/keyring v1.2.1 // indirect
github.com/AdhityaRamadhanus/fasthttpcors v0.0.0-20170121111917-d4c07198763a // indirect
github.com/AthenZ/athenz v1.10.39 // indirect
github.com/DataDog/zstd v1.5.0 // indirect
github.com/PuerkitoBio/purell v1.2.0 // indirect
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/antlr/antlr4/runtime/Go/antlr v1.4.10 // indirect
github.com/ardielle/ardielle-go v1.5.2 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/danieljoos/wincred v1.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dvsekhvalnov/jose2go v1.5.0 // indirect
github.com/emicklei/go-restful/v3 v3.9.0 // indirect
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
github.com/fasthttp/router v1.4.14 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
github.com/go-openapi/swag v0.19.14 // indirect
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/cel-go v0.12.5 // indirect
github.com/google/gnostic v0.5.7-v3refs // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/grandcat/zeroconf v1.0.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/hashicorp/consul/api v1.13.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-hclog v1.3.1 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/serf v0.10.1 // indirect
github.com/imdario/mergo v0.3.12 // indirect
github.com/jhump/protoreflect v1.13.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.15.12 // indirect
github.com/linkedin/goavro/v2 v2.9.8 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/miekg/dns v1.1.50 // indirect
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.5.1-0.20220423185008-bf980b35cac4 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mtibben/percent v0.2.1 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/openzipkin/zipkin-go v0.4.1 // indirect
github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5 // indirect
github.com/pierrec/lz4 v2.6.0+incompatible // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.39.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/prometheus/statsd_exporter v0.22.7 // indirect
github.com/savsgio/gotils v0.0.0-20220530130905-52f3993e8d6d // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/sony/gobreaker v0.5.0 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stoewer/go-strcase v1.2.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/tidwall/transform v0.0.0-20201103190739-32f242e2dbde // indirect
github.com/tylertreat/comcast v1.0.1 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasthttp v1.44.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel v1.11.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.1 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.11.1 // indirect
go.opentelemetry.io/otel/exporters/zipkin v1.11.1 // indirect
go.opentelemetry.io/otel/sdk v1.11.1 // indirect
go.opentelemetry.io/otel/trace v1.11.1 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
golang.org/x/exp v0.0.0-20230113152452-c42ee1cf562e // indirect
golang.org/x/mod v0.7.0 // indirect
golang.org/x/net v0.5.0 // indirect
golang.org/x/oauth2 v0.4.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.4.0 // indirect
golang.org/x/term v0.4.0 // indirect
golang.org/x/text v0.6.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.2.0 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef // indirect
google.golang.org/grpc v1.52.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.26.0 // indirect
k8s.io/apiextensions-apiserver v0.26.0 // indirect
k8s.io/apimachinery v0.26.0 // indirect
k8s.io/client-go v0.26.0 // indirect
k8s.io/component-base v0.26.0 // indirect
k8s.io/klog/v2 v2.80.1 // indirect
k8s.io/kube-openapi v0.0.0-20221012153701-172d655c2280 // indirect
k8s.io/utils v0.0.0-20221128185143-99ec85e7a448 // indirect
sigs.k8s.io/controller-runtime v0.14.1 // indirect
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,682 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pulsar_test
import (
"context"
"fmt"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/require"
"go.uber.org/multierr"
pubsub_pulsar "github.com/dapr/components-contrib/pubsub/pulsar"
pubsub_loader "github.com/dapr/dapr/pkg/components/pubsub"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/dapr/dapr/pkg/runtime"
dapr "github.com/dapr/go-sdk/client"
"github.com/dapr/go-sdk/service/common"
"github.com/dapr/kit/logger"
"github.com/dapr/components-contrib/tests/certification/embedded"
"github.com/dapr/components-contrib/tests/certification/flow"
"github.com/dapr/components-contrib/tests/certification/flow/app"
"github.com/dapr/components-contrib/tests/certification/flow/dockercompose"
"github.com/dapr/components-contrib/tests/certification/flow/network"
"github.com/dapr/components-contrib/tests/certification/flow/retry"
"github.com/dapr/components-contrib/tests/certification/flow/sidecar"
"github.com/dapr/components-contrib/tests/certification/flow/simulate"
"github.com/dapr/components-contrib/tests/certification/flow/watcher"
)
const (
	// Sidecar and app identifiers shared by every test in this file.
	sidecarName1 = "dapr-1"
	sidecarName2 = "dapr-2"
	appID1       = "app-1"
	appID2       = "app-2"

	// Number of messages published per publish step.
	numMessages = 10
	// Base app port; portOffset is added per extra sidecar/app pair to avoid
	// port collisions when several sidecars run in the same process.
	appPort    = 8000
	portOffset = 2

	// Metadata key used to pin messages to a partition.
	messageKey = "partitionKey"

	// Pubsub component name (must match the component YAML `metadata.name`).
	pubsubName = "messagebus"

	// Topic names: "active" has subscribers, "passive" has none,
	// "per-test-run" exercises broker-side auto-creation.
	topicActiveName  = "certification-pubsub-topic-active"
	topicPassiveName = "certification-pubsub-topic-passive"
	topicToBeCreated = "certification-topic-per-test-run"
	topicDefaultName = "certification-topic-default"

	// Partition key values used for the messageKey metadata.
	partition0 = "partition-0"
	partition1 = "partition-1"

	// Docker-compose project name and file for the Pulsar standalone broker.
	clusterName       = "pulsarcertification"
	dockerComposeYAML = "docker-compose.yml"
	pulsarURL         = "localhost:6650"
)
// subscriberApplication returns an app.SetupFn that registers a topic event
// handler on the given topic. Every received payload is recorded in
// messagesWatcher so tests can assert on delivery, and a periodic simulated
// error exercises the component's redelivery/retry path.
func subscriberApplication(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
	return func(ctx flow.Context, s common.Service) error {
		// Fail periodically to force the component to retry deliveries.
		failPeriodically := simulate.PeriodicError(ctx, 100)

		subscription := &common.Subscription{
			PubsubName: pubsubName,
			Topic:      topicName,
			Route:      "/orders",
		}

		handler := func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
			if simErr := failPeriodically(); simErr != nil {
				// Ask the runtime to redeliver this event.
				return true, simErr
			}
			// Record the payload for later assertions.
			messagesWatcher.Observe(e.Data)
			ctx.Logf("Message Received appID: %s,pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
			return false, nil
		}

		return multierr.Combine(s.AddTopicEventHandler(subscription, handler))
	}
}
// publishMessages returns a flow.Runnable that publishes numMessages unique
// payloads to topicName through the named sidecar's Dapr client. Each payload
// is registered as an expectation on every supplied watcher before publishing.
// A nil metadata map is allowed: the partition-key lookup then yields "" and
// no publish options are attached.
func publishMessages(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
	return func(ctx flow.Context) error {
		// Build the batch of unique payloads up front.
		msgs := make([]string, numMessages)
		for idx := 0; idx < numMessages; idx++ {
			msgs[idx] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, idx, uuid.New().String())
		}

		// Register every payload with each watcher before any publish so no
		// delivery can race the expectation.
		for _, w := range messageWatchers {
			w.ExpectStrings(msgs...)
		}

		// Resolve the Dapr client for the requested sidecar.
		client := sidecar.GetClient(ctx, sidecarName)

		ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)

		var opts dapr.PublishEventOption
		if metadata != nil {
			opts = dapr.PublishEventWithMetadata(metadata)
		}

		for _, msg := range msgs {
			ctx.Logf("Publishing: %q", msg)
			var pubErr error
			if opts != nil {
				pubErr = client.PublishEvent(ctx, pubsubName, topicName, msg, opts)
			} else {
				pubErr = client.PublishEvent(ctx, pubsubName, topicName, msg)
			}
			require.NoError(ctx, pubErr, "error publishing message")
		}
		return nil
	}
}
// assertMessages returns a flow.Runnable that blocks until every watcher has
// observed all of its expected messages, or the deadline expires.
// NOTE(review): the effective wait is 25*timeout (callers passing 10s get a
// 250s ceiling) — confirm the multiplier is intentional.
func assertMessages(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
	return func(ctx flow.Context) error {
		for _, w := range messageWatchers {
			w.Assert(ctx, 25*timeout)
		}
		return nil
	}
}
// TestPulsar runs the basic certification scenario: two subscriber apps, each
// behind its own sidecar/components directory, subscribe to the active topic.
// Messages published to the active topic must reach both consumer groups;
// messages published to the passive topic carry no watcher expectations.
func TestPulsar(t *testing.T) {
	consumerGroup1 := watcher.NewUnordered()
	consumerGroup2 := watcher.NewUnordered()

	// The original test shadowed the package-level publishMessages with a
	// byte-identical local closure; that dead duplication has been removed
	// and the package-level helper is used directly.

	flow.New(t, "pulsar certification basic test").
		// Run subscriberApplication app1
		Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
			subscriberApplication(appID1, topicActiveName, consumerGroup1))).
		Step(dockercompose.Run(clusterName, dockerComposeYAML)).
		Step("wait", flow.Sleep(10*time.Second)).
		// Retry until the broker accepts a client connection and a
		// subscription, proving Pulsar is ready to serve traffic.
		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
			client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
			if err != nil {
				return fmt.Errorf("could not create pulsar client: %v", err)
			}
			defer client.Close()

			consumer, err := client.Subscribe(pulsar.ConsumerOptions{
				Topic:            "topic-1",
				SubscriptionName: "my-sub",
				Type:             pulsar.Shared,
			})
			if err != nil {
				return fmt.Errorf("could not create pulsar Topic: %v", err)
			}
			defer consumer.Close()

			return err
		})).
		Step(sidecar.Run(sidecarName1,
			embedded.WithComponentsPath("./components/consumer_one"),
			embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort),
			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort),
			componentRuntimeOptions(),
		)).
		// Run subscriberApplication app2
		Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+portOffset),
			subscriberApplication(appID2, topicActiveName, consumerGroup2))).
		// Run the Dapr sidecar with the component 2.
		Step(sidecar.Run(sidecarName2,
			embedded.WithComponentsPath("./components/consumer_two"),
			embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset),
			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset),
			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset),
			embedded.WithProfilePort(runtime.DefaultProfilePort+portOffset),
			componentRuntimeOptions(),
		)).
		Step("publish messages to topic1", publishMessages(nil, sidecarName1, topicActiveName, consumerGroup1, consumerGroup2)).
		Step("publish messages to unUsedTopic", publishMessages(nil, sidecarName1, topicPassiveName)).
		Step("verify if app1 has received messages published to active topic", assertMessages(10*time.Second, consumerGroup1)).
		// Step description fixed: consumerGroup2 tracks the ACTIVE topic too;
		// the passive-topic publish above registers no expectations.
		Step("verify if app2 has received messages published to active topic", assertMessages(10*time.Second, consumerGroup2)).
		Step("reset", flow.Reset(consumerGroup1, consumerGroup2)).
		Run()
}
// TestPulsarMultipleSubsSameConsumerIDs covers one topic with two subscriber
// apps. Messages are published through both sidecars using distinct partition
// keys, and delivery is asserted through consumerGroup2 only.
// NOTE(review): the test name says "same consumer IDs", but each sidecar loads
// a different components directory (consumer_one / consumer_two) — confirm
// those component files declare the same consumerID.
func TestPulsarMultipleSubsSameConsumerIDs(t *testing.T) {
	consumerGroup1 := watcher.NewUnordered()
	consumerGroup2 := watcher.NewUnordered()

	// Distinct partition keys for the two publish batches.
	metadata := map[string]string{
		messageKey: partition0,
	}

	metadata1 := map[string]string{
		messageKey: partition1,
	}

	flow.New(t, "pulsar certification - single publisher and multiple subscribers with same consumer IDs").
		// Run subscriberApplication app1
		Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
			subscriberApplication(appID1, topicActiveName, consumerGroup1))).
		Step(dockercompose.Run(clusterName, dockerComposeYAML)).
		Step("wait", flow.Sleep(10*time.Second)).
		// Retry until the broker accepts a client connection and subscription.
		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
			client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
			if err != nil {
				return fmt.Errorf("could not create pulsar client: %v", err)
			}
			defer client.Close()

			consumer, err := client.Subscribe(pulsar.ConsumerOptions{
				Topic:            "topic-1",
				SubscriptionName: "my-sub",
				Type:             pulsar.Shared,
			})
			if err != nil {
				return fmt.Errorf("could not create pulsar Topic: %v", err)
			}
			defer consumer.Close()

			return err
		})).
		Step(sidecar.Run(sidecarName1,
			embedded.WithComponentsPath("./components/consumer_one"),
			embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort),
			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort),
			componentRuntimeOptions(),
		)).
		// Run subscriberApplication app2
		Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+portOffset),
			subscriberApplication(appID2, topicActiveName, consumerGroup2))).
		// Run the Dapr sidecar with the component 2.
		Step(sidecar.Run(sidecarName2,
			embedded.WithComponentsPath("./components/consumer_two"),
			embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset),
			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset),
			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset),
			embedded.WithProfilePort(runtime.DefaultProfilePort+portOffset),
			componentRuntimeOptions(),
		)).
		// NOTE(review): expectations are registered only on consumerGroup2;
		// consumerGroup1 is created and reset but never asserted — confirm
		// that is intentional.
		Step("publish messages to topic1", publishMessages(metadata, sidecarName1, topicActiveName, consumerGroup2)).
		Step("publish messages to topic1", publishMessages(metadata1, sidecarName2, topicActiveName, consumerGroup2)).
		Step("verify if app1, app2 together have received messages published to topic1", assertMessages(10*time.Second, consumerGroup2)).
		Step("reset", flow.Reset(consumerGroup1, consumerGroup2)).
		Run()
}
// TestPulsarMultipleSubsDifferentConsumerIDs runs two subscriber apps whose
// sidecars load different components directories (and thus different consumer
// IDs). A single batch is published through sidecar 1 and delivery is
// asserted on consumerGroup1 (app1's watcher).
func TestPulsarMultipleSubsDifferentConsumerIDs(t *testing.T) {
	consumerGroup1 := watcher.NewUnordered()
	consumerGroup2 := watcher.NewUnordered()

	// Set the partition key on all messages so they are written to the same
	// partition. This allows for checking of ordered messages.
	metadata := map[string]string{
		messageKey: partition0,
	}

	flow.New(t, "pulsar certification - single publisher and multiple subscribers with different consumer IDs").
		// Run subscriberApplication app1
		Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
			subscriberApplication(appID1, topicActiveName, consumerGroup1))).
		Step(dockercompose.Run(clusterName, dockerComposeYAML)).
		Step("wait", flow.Sleep(10*time.Second)).
		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
			client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
			if err != nil {
				return fmt.Errorf("could not create pulsar client: %v", err)
			}
			defer client.Close()

			consumer, err := client.Subscribe(pulsar.ConsumerOptions{
				Topic:            "topic-1",
				SubscriptionName: "my-sub",
				Type:             pulsar.Shared,
			})
			if err != nil {
				return fmt.Errorf("could not create pulsar Topic: %v", err)
			}
			defer consumer.Close()

			// Ensure the brokers are ready by attempting to consume
			// a topic partition.
			return err
		})).
		Step(sidecar.Run(sidecarName1,
			embedded.WithComponentsPath("./components/consumer_one"),
			embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort),
			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort),
			componentRuntimeOptions(),
		)).
		// Run subscriberApplication app2
		Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+portOffset),
			subscriberApplication(appID2, topicActiveName, consumerGroup2))).
		// Run the Dapr sidecar with the component 2.
		Step(sidecar.Run(sidecarName2,
			embedded.WithComponentsPath("./components/consumer_two"),
			embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset),
			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset),
			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset),
			embedded.WithProfilePort(runtime.DefaultProfilePort+portOffset),
			componentRuntimeOptions(),
		)).
		Step("publish messages to topic1", publishMessages(metadata, sidecarName1, topicActiveName, consumerGroup1)).
		Step("verify if app1, app2 together have received messages published to topic1", assertMessages(10*time.Second, consumerGroup1)).
		Step("reset", flow.Reset(consumerGroup1, consumerGroup2)).
		Run()
}
// TestPulsarMultiplePubSubsDifferentConsumerIDs publishes through BOTH
// sidecars (one batch each, on different partition keys) and asserts that
// each subscriber app received its expected batch.
func TestPulsarMultiplePubSubsDifferentConsumerIDs(t *testing.T) {
	consumerGroup1 := watcher.NewUnordered()
	consumerGroup2 := watcher.NewUnordered()

	// Set the partition key on all messages so they are written to the same
	// partition. This allows for checking of ordered messages.
	metadata := map[string]string{
		messageKey: partition0,
	}

	metadata1 := map[string]string{
		messageKey: partition1,
	}

	flow.New(t, "pulsar certification - multiple publishers and multiple subscribers with different consumer IDs").
		// Run subscriberApplication app1
		Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
			subscriberApplication(appID1, topicActiveName, consumerGroup1))).
		Step(dockercompose.Run(clusterName, dockerComposeYAML)).
		Step("wait", flow.Sleep(10*time.Second)).
		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
			client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
			if err != nil {
				return fmt.Errorf("could not create pulsar client: %v", err)
			}
			defer client.Close()

			consumer, err := client.Subscribe(pulsar.ConsumerOptions{
				Topic:            "topic-1",
				SubscriptionName: "my-sub",
				Type:             pulsar.Shared,
			})
			if err != nil {
				return fmt.Errorf("could not create pulsar Topic: %v", err)
			}
			defer consumer.Close()

			// Ensure the brokers are ready by attempting to consume
			// a topic partition.
			return err
		})).
		Step(sidecar.Run(sidecarName1,
			embedded.WithComponentsPath("./components/consumer_one"),
			embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort),
			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort),
			componentRuntimeOptions(),
		)).
		// Run subscriberApplication app2
		Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+portOffset),
			subscriberApplication(appID2, topicActiveName, consumerGroup2))).
		// Run the Dapr sidecar with the component 2.
		Step(sidecar.Run(sidecarName2,
			embedded.WithComponentsPath("./components/consumer_two"),
			embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset),
			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset),
			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset),
			embedded.WithProfilePort(runtime.DefaultProfilePort+portOffset),
			componentRuntimeOptions(),
		)).
		// Each sidecar publishes its own batch; each app's watcher is asserted
		// separately below.
		Step("publish messages to topic1", publishMessages(metadata, sidecarName1, topicActiveName, consumerGroup1)).
		Step("publish messages to topic1", publishMessages(metadata1, sidecarName2, topicActiveName, consumerGroup2)).
		Step("verify if app1, app2 together have received messages published to topic1", assertMessages(10*time.Second, consumerGroup1)).
		Step("verify if app1, app2 together have received messages published to topic1", assertMessages(10*time.Second, consumerGroup2)).
		Step("reset", flow.Reset(consumerGroup1, consumerGroup2)).
		Run()
}
// TestPulsarNonexistingTopic publishes to a topic name that does not exist
// before the test run (topicToBeCreated), relying on the broker to create it,
// and then asserts the subscriber received every message. Port offsets of
// portOffset*3 keep this test's sidecar clear of the other tests' ports.
func TestPulsarNonexistingTopic(t *testing.T) {
	consumerGroup1 := watcher.NewUnordered()

	// Set the partition key on all messages so they are written to the same
	// partition. This allows for checking of ordered messages.
	metadata := map[string]string{
		messageKey: partition0,
	}

	flow.New(t, "pulsar certification - non-existing topic").
		// Run subscriberApplication app1
		Step(app.Run(appID1, fmt.Sprintf(":%d", appPort+portOffset*3),
			subscriberApplication(appID1, topicToBeCreated, consumerGroup1))).
		Step(dockercompose.Run(clusterName, dockerComposeYAML)).
		Step("wait", flow.Sleep(10*time.Second)).
		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
			client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
			if err != nil {
				return fmt.Errorf("could not create pulsar client: %v", err)
			}
			defer client.Close()

			consumer, err := client.Subscribe(pulsar.ConsumerOptions{
				Topic:            "topic-1",
				SubscriptionName: "my-sub",
				Type:             pulsar.Shared,
			})
			if err != nil {
				return fmt.Errorf("could not create pulsar Topic: %v", err)
			}
			defer consumer.Close()

			// Ensure the brokers are ready by attempting to consume
			// a topic partition.
			return err
		})).
		// Run the Dapr sidecar with the component entitymanagement
		Step(sidecar.Run(sidecarName1,
			embedded.WithComponentsPath("./components/consumer_one"),
			embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset*3),
			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset*3),
			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset*3),
			embedded.WithProfilePort(runtime.DefaultProfilePort+portOffset*3),
			componentRuntimeOptions(),
		)).
		Step(fmt.Sprintf("publish messages to topicToBeCreated: %s", topicToBeCreated), publishMessages(metadata, sidecarName1, topicToBeCreated, consumerGroup1)).
		// Allow time for broker-side topic creation plus delivery.
		Step("wait", flow.Sleep(30*time.Second)).
		Step("verify if app1 has received messages published to newly created topic", assertMessages(10*time.Second, consumerGroup1)).
		Run()
}
// TestPulsarNetworkInterruption publishes a batch, then disrupts traffic on
// the Pulsar port (6650) for 30 seconds, and asserts that the component
// recovers and all expected messages are eventually delivered.
func TestPulsarNetworkInterruption(t *testing.T) {
	consumerGroup1 := watcher.NewUnordered()

	// Set the partition key on all messages so they are written to the same
	// partition. This allows for checking of ordered messages.
	metadata := map[string]string{
		messageKey: partition0,
	}

	flow.New(t, "pulsar certification - network interruption").
		// Run subscriberApplication app1
		Step(app.Run(appID1, fmt.Sprintf(":%d", appPort+portOffset),
			subscriberApplication(appID1, topicActiveName, consumerGroup1))).
		Step(dockercompose.Run(clusterName, dockerComposeYAML)).
		Step("wait", flow.Sleep(10*time.Second)).
		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
			client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
			if err != nil {
				return fmt.Errorf("could not create pulsar client: %v", err)
			}
			defer client.Close()

			consumer, err := client.Subscribe(pulsar.ConsumerOptions{
				Topic:            "topic-1",
				SubscriptionName: "my-sub",
				Type:             pulsar.Shared,
			})
			if err != nil {
				return fmt.Errorf("could not create pulsar Topic: %v", err)
			}
			defer consumer.Close()

			// Ensure the brokers are ready by attempting to consume
			// a topic partition.
			return err
		})).
		// Run the Dapr sidecar with the component entitymanagement
		Step(sidecar.Run(sidecarName1,
			embedded.WithComponentsPath("./components/consumer_one"),
			embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset),
			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset),
			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset),
			embedded.WithProfilePort(runtime.DefaultProfilePort+portOffset),
			componentRuntimeOptions(),
		)).
		Step(fmt.Sprintf("publish messages to topicToBeCreated: %s", topicActiveName), publishMessages(metadata, sidecarName1, topicActiveName, consumerGroup1)).
		// Disrupt the broker port while deliveries may still be in flight.
		Step("interrupt network", network.InterruptNetwork(30*time.Second, nil, nil, "6650")).
		Step("wait", flow.Sleep(30*time.Second)).
		Step("verify if app1 has received messages published to newly created topic", assertMessages(10*time.Second, consumerGroup1)).
		Run()
}
// TestPulsarPersitant verifies message persistence: messages published before
// the Pulsar server is stopped are still delivered to the subscriber after
// the server is restarted.
// NOTE(review): "Persitant" is a misspelling of "Persistent"; left unchanged
// because renaming would alter the test name used by `go test -run` filters.
func TestPulsarPersitant(t *testing.T) {
	consumerGroup1 := watcher.NewUnordered()

	flow.New(t, "pulsar certification persistant test").
		// Run subscriberApplication app1
		Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
			subscriberApplication(appID1, topicActiveName, consumerGroup1))).
		// Bring up the Pulsar cluster via docker compose and give it time to boot.
		Step(dockercompose.Run(clusterName, dockerComposeYAML)).
		Step("wait", flow.Sleep(10*time.Second)).
		// Poll (up to 30 attempts, 10s apart) until the broker accepts a
		// client connection and a subscription, proving it is ready.
		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
			client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
			if err != nil {
				return fmt.Errorf("could not create pulsar client: %v", err)
			}
			defer client.Close()
			consumer, err := client.Subscribe(pulsar.ConsumerOptions{
				Topic:            "topic-1",
				SubscriptionName: "my-sub",
				Type:             pulsar.Shared,
			})
			if err != nil {
				return fmt.Errorf("could not create pulsar Topic: %v", err)
			}
			defer consumer.Close()
			// Ensure the brokers are ready by attempting to consume
			// a topic partition.
			return err
		})).
		// Run the Dapr sidecar with the pulsar component configured.
		Step(sidecar.Run(sidecarName1,
			embedded.WithComponentsPath("./components/consumer_one"),
			embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort),
			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort),
			componentRuntimeOptions(),
		)).
		Step("publish messages to topic1", publishMessages(nil, sidecarName1, topicActiveName, consumerGroup1)).
		// Restart the broker, then confirm delivery of the earlier messages.
		Step("stop pulsar server", dockercompose.Stop(clusterName, dockerComposeYAML, "standalone")).
		Step("wait", flow.Sleep(5*time.Second)).
		Step("start pulsar server", dockercompose.Start(clusterName, dockerComposeYAML, "standalone")).
		Step("wait", flow.Sleep(10*time.Second)).
		Step("verify if app1 has received messages published to active topic", assertMessages(10*time.Second, consumerGroup1)).
		Step("reset", flow.Reset(consumerGroup1)).
		Run()
}
// TestPulsarDelay verifies Pulsar's delayed-delivery support via the
// "deliverAfter" (relative delay) and "deliverAt" (absolute timestamp)
// publish metadata: messages must NOT arrive before the delay elapses and
// MUST arrive afterwards.
func TestPulsarDelay(t *testing.T) {
	consumerGroup1 := watcher.NewUnordered()

	// Absolute delivery time 60s from now, used for the "deliverAt" case.
	deliverTime := time.Now().Add(60 * time.Second)
	metadataAfter := map[string]string{
		"deliverAfter": "30s",
	}
	metadataAt := map[string]string{
		"deliverAt": deliverTime.Format(time.RFC3339Nano),
	}

	// assertMessagesNot fails the flow step when any watcher observes a
	// message within 5*timeout. BUGFIX: the boolean returned by
	// AssertNotDelivered was previously discarded, so this step could never
	// fail even when messages were delivered early.
	assertMessagesNot := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
		return func(ctx flow.Context) error {
			for _, m := range messageWatchers {
				if !m.AssertNotDelivered(ctx, 5*timeout) {
					return fmt.Errorf("expected no messages to be delivered within %s, but some were observed", 5*timeout)
				}
			}
			return nil
		}
	}

	flow.New(t, "pulsar certification delay test").
		// Run subscriberApplication app1
		Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
			subscriberApplication(appID1, topicActiveName, consumerGroup1))).
		// Bring up the Pulsar cluster via docker compose and give it time to boot.
		Step(dockercompose.Run(clusterName, dockerComposeYAML)).
		Step("wait", flow.Sleep(10*time.Second)).
		// Poll (up to 30 attempts, 10s apart) until the broker accepts a
		// client connection and a subscription, proving it is ready.
		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
			client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
			if err != nil {
				return fmt.Errorf("could not create pulsar client: %w", err)
			}
			defer client.Close()
			// Ensure the broker is ready by attempting to subscribe to a
			// topic partition.
			consumer, err := client.Subscribe(pulsar.ConsumerOptions{
				Topic:            "topic-1",
				SubscriptionName: "my-sub",
				Type:             pulsar.Shared,
			})
			if err != nil {
				return fmt.Errorf("could not subscribe to pulsar topic: %w", err)
			}
			defer consumer.Close()
			return nil
		})).
		// Run the Dapr sidecar with the pulsar component configured.
		Step(sidecar.Run(sidecarName1,
			embedded.WithComponentsPath("./components/consumer_three"),
			embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort),
			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort),
			componentRuntimeOptions(),
		)).
		Step("publish messages to topic1", publishMessages(metadataAfter, sidecarName1, topicActiveName, consumerGroup1)).
		// receive no messages due to deliverAfter delay
		Step("verify if app1 has received no messages published to topic", assertMessagesNot(1*time.Second, consumerGroup1)).
		// delay has passed, messages should be received
		Step("verify if app1 has received messages published to topic", assertMessages(10*time.Second, consumerGroup1)).
		Step("reset", flow.Reset(consumerGroup1)).
		// publish messages using deliverAt property
		Step("publish messages to topic1", publishMessages(metadataAt, sidecarName1, topicActiveName, consumerGroup1)).
		Step("verify if app1 has received messages published to topic", assertMessages(10*time.Second, consumerGroup1)).
		Run()
}
// componentRuntimeOptions builds the runtime options that register the
// Pulsar pub/sub component with the embedded Dapr runtime under the
// component name "pulsar".
func componentRuntimeOptions() []runtime.Option {
	componentLogger := logger.NewLogger("dapr.components")

	registry := pubsub_loader.NewRegistry()
	registry.Logger = componentLogger
	registry.RegisterComponent(pubsub_pulsar.NewPulsar, "pulsar")

	return []runtime.Option{runtime.WithPubSubs(registry)}
}