/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package pulsar_test

import (
	"bytes"
	"context"
	"crypto/tls"
	"encoding/json"
	"encoding/pem"
	"fmt"
	"io"
	"io/fs"
	"net/http"
	"os"
	"os/exec"
	"path/filepath"
	"strconv"
	"strings"
	"testing"
	"text/template"
	"time"

	"github.com/google/uuid"
	"github.com/stretchr/testify/require"
	"github.com/stretchr/testify/suite"
	"go.uber.org/multierr"

	"github.com/dapr/components-contrib/common/authentication/oauth2"
	pubsub_pulsar "github.com/dapr/components-contrib/pubsub/pulsar"
	pubsub_loader "github.com/dapr/dapr/pkg/components/pubsub"
	"github.com/dapr/dapr/pkg/config/protocol"

	"github.com/apache/pulsar-client-go/pulsar"
	"github.com/dapr/dapr/pkg/runtime"
	dapr "github.com/dapr/go-sdk/client"
	"github.com/dapr/go-sdk/service/common"
	"github.com/dapr/kit/logger"

	"github.com/dapr/components-contrib/tests/certification/embedded"
	"github.com/dapr/components-contrib/tests/certification/flow"
	"github.com/dapr/components-contrib/tests/certification/flow/app"
	"github.com/dapr/components-contrib/tests/certification/flow/dockercompose"
	"github.com/dapr/components-contrib/tests/certification/flow/network"
	"github.com/dapr/components-contrib/tests/certification/flow/retry"
	"github.com/dapr/components-contrib/tests/certification/flow/sidecar"
	"github.com/dapr/components-contrib/tests/certification/flow/simulate"
	"github.com/dapr/components-contrib/tests/certification/flow/watcher"
)

const (
	sidecarName1 = "dapr-1"
	sidecarName2 = "dapr-2"

	appID1 = "app-1"
	appID2 = "app-2"

	numMessages                 = 10
	appPort                     = 8001
	portOffset                  = 2
	messageKey                  = "partitionKey"
	pubsubName                  = "messagebus"
	topicActiveName             = "certification-pubsub-topic-active"
	topicPassiveName            = "certification-pubsub-topic-passive"
	topicToBeCreated            = "certification-topic-per-test-run"
	topicDefaultName            = "certification-topic-default"
	topicMultiPartitionName     = "certification-topic-multi-partition8"
	partition0                  = "partition-0"
	partition1                  = "partition-1"
	clusterName                 = "pulsarcertification"
	dockerComposeAuthNoneYAML   = "./config/docker-compose_auth-none.yaml"
	dockerComposeAuthOAuth2YAML = "./config/docker-compose_auth-oauth2.yaml.tmpl"
	dockerComposeMockOAuth2YAML = "./config/docker-compose_auth-mock-oauth2-server.yaml"
	pulsarURL                   = "localhost:6650"

	subscribeTypeKey = "subscribeType"

	subscribeTypeExclusive = "exclusive"
	subscribeTypeShared    = "shared"
	subscribeTypeFailover  = "failover"
	subscribeTypeKeyShared = "key_shared"

	processModeKey   = "processMode"
	processModeAsync = "async"
	processModeSync  = "sync"
)
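
// pulsarSuite runs the Pulsar pub/sub certification flows against a broker
// provisioned with a particular authentication mode.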
type pulsarSuite struct {
	suite.Suite

	authType          string
	oauth2CAPEM       []byte
	dockerComposeYAML string
	componentsPath    string
	services          []string
}
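
// TestPulsar runs the certification suite twice: once against an
// unauthenticated standalone broker, and once against a Pulsar cluster
// secured with OAuth2, for which a mock OAuth2 server is started and its
// CA certificate is templated into the compose and component files.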
func TestPulsar(t *testing.T) {
	t.Run("Auth:None", func(t *testing.T) {
		suite.Run(t, &pulsarSuite{
			authType:          "none",
			dockerComposeYAML: dockerComposeAuthNoneYAML,
			componentsPath:    "./components/auth-none",
			services:          []string{"standalone"},
		})
	})

	t.Run("Auth:OAuth2", func(t *testing.T) {
		dir := t.TempDir()
		require.NoError(t, os.Chmod(dir, 0o777))

		t.Log("Starting OAuth2 server...")
		out, err := exec.Command(
			"docker-compose",
			"-p", "oauth2",
			"-f", dockerComposeMockOAuth2YAML,
			"up", "-d").CombinedOutput()
		require.NoError(t, err, string(out))
		t.Log(string(out))

		t.Cleanup(func() {
			t.Log("Stopping OAuth2 server...")
			out, err = exec.Command(
				"docker-compose",
				"-p", "oauth2",
				"-f", dockerComposeMockOAuth2YAML,
				"down", "-v",
				"--remove-orphans").CombinedOutput()
			require.NoError(t, err, string(out))
			t.Log(string(out))
		})

		t.Log("Waiting for OAuth server to be ready...")
		oauth2CA := peerCertificate(t, "localhost:8085")
		t.Log("OAuth server is ready")

		require.NoError(t, os.WriteFile(filepath.Join(dir, "ca.pem"), oauth2CA, 0o644))
		src, err := os.OpenFile("./config/pulsar_auth-oauth2.conf", os.O_RDONLY, 0o644)
		require.NoError(t, err)
		dst, err := os.OpenFile(filepath.Join(dir, "pulsar_auth-oauth2.conf"), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644)
		require.NoError(t, err)
		_, err = io.Copy(dst, src)
		require.NoError(t, err)
		src.Close()
		dst.Close()

		td := struct {
			TmpDir      string
			OAuth2CAPEM string
		}{
			TmpDir:      dir,
			OAuth2CAPEM: strings.ReplaceAll(string(oauth2CA), "\n", "\\n"),
		}

		tmpl, err := template.New("").ParseFiles(dockerComposeAuthOAuth2YAML)
		require.NoError(t, err)
		f, err := os.OpenFile(filepath.Join(dir, "docker-compose.yaml"), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644)
		require.NoError(t, err)
		require.NoError(t, tmpl.ExecuteTemplate(f, "docker-compose_auth-oauth2.yaml.tmpl", td))

		require.NoError(t, filepath.Walk("./components/auth-oauth2", func(path string, info fs.FileInfo, err error) error {
			if err != nil {
				return err
			}
			if info.IsDir() {
				return nil
			}
			tmpl, err := template.New("").ParseFiles(path)
			require.NoError(t, err)
			path = strings.TrimSuffix(path, ".tmpl")
			require.NoError(t, os.MkdirAll(filepath.Dir(filepath.Join(dir, path)), 0o755))
			f, err := os.OpenFile(filepath.Join(dir, path), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o644)
			require.NoError(t, err)
			require.NoError(t, tmpl.ExecuteTemplate(f, filepath.Base(path)+".tmpl", td))
			return nil
		}))

		suite.Run(t, &pulsarSuite{
			oauth2CAPEM:       oauth2CA,
			authType:          "oauth2",
			dockerComposeYAML: filepath.Join(dir, "docker-compose.yaml"),
			componentsPath:    filepath.Join(dir, "components/auth-oauth2"),
			services:          []string{"zookeeper", "pulsar-init", "bookie", "broker"},
		})
	})
}
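
// subscriberApplication returns an app setup function that subscribes to
// topicName and records every received payload in messagesWatcher, while
// simulating a periodic handler error to exercise redelivery.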
func subscriberApplication(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
	return func(ctx flow.Context, s common.Service) error {
		// Simulate periodic errors.
		sim := simulate.PeriodicError(ctx, 100)
		// Setup the /orders event handler.
		return multierr.Combine(
			s.AddTopicEventHandler(&common.Subscription{
				PubsubName: pubsubName,
				Topic:      topicName,
				Route:      "/orders",
			}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
				if err := sim(); err != nil {
					return true, err
				}

				// Track/Observe the data of the event.
				messagesWatcher.Observe(e.Data)
				ctx.Logf("Message received. appID: %s, pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
				return false, nil
			}),
		)
	}
}
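
// subscriberApplicationWithoutError is like subscriberApplication but never
// fails the handler; it subscribes with the key_shared subscription type and
// synchronous processing so message order can be asserted.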
func subscriberApplicationWithoutError(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
	return func(ctx flow.Context, s common.Service) error {
		// Setup the /orders event handler.
		return multierr.Combine(
			s.AddTopicEventHandler(&common.Subscription{
				PubsubName: pubsubName,
				Topic:      topicName,
				Route:      "/orders",
				Metadata: map[string]string{
					subscribeTypeKey: subscribeTypeKeyShared,
					processModeKey:   processModeSync,
				},
			}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
				// Track/Observe the data of the event.
				messagesWatcher.Observe(e.Data)
				ctx.Logf("Message received. appID: %s, pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
				return false, nil
			}),
		)
	}
}
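
// subscriberSchemaApplication records received events as JSON documents so
// schema-encoded payloads can be compared structurally.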
func subscriberSchemaApplication(appID string, topicName string, messagesWatcher *watcher.Watcher) app.SetupFn {
	return func(ctx flow.Context, s common.Service) error {
		// Setup the /orders event handler.
		return multierr.Combine(
			s.AddTopicEventHandler(&common.Subscription{
				PubsubName: pubsubName,
				Topic:      topicName,
				Route:      "/orders",
			}, func(_ context.Context, e *common.TopicEvent) (retry bool, err error) {
				// Track/Observe the data of the event.
				messagesWatcher.ObserveJSON(e.Data)
				ctx.Logf("Message received. appID: %s, pubsub: %s, topic: %s, id: %s, data: %s", appID, e.PubsubName, e.Topic, e.ID, e.Data)
				return false, nil
			}),
		)
	}
}
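
// publishMessages publishes numMessages uniquely identifiable strings to
// topicName through the named sidecar and registers each of them as expected
// with the given watchers.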
func publishMessages(metadata map[string]string, sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
	return func(ctx flow.Context) error {
		// Prepare the messages.
		messages := make([]string, numMessages)
		for i := range messages {
			messages[i] = fmt.Sprintf("partitionKey: %s, message for topic: %s, index: %03d, uniqueId: %s", metadata[messageKey], topicName, i, uuid.New().String())
		}

		for _, messageWatcher := range messageWatchers {
			messageWatcher.ExpectStrings(messages...)
		}

		// Get the sidecar (dapr) client.
		client := sidecar.GetClient(ctx, sidecarName)

		// Publish messages.
		ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)

		var publishOptions dapr.PublishEventOption

		if metadata != nil {
			publishOptions = dapr.PublishEventWithMetadata(metadata)
		}

		for _, message := range messages {
			ctx.Logf("Publishing: %q", message)
			var err error

			if publishOptions != nil {
				err = client.PublishEvent(ctx, pubsubName, topicName, message, publishOptions)
			} else {
				err = client.PublishEvent(ctx, pubsubName, topicName, message)
			}
			require.NoError(ctx, err, "error publishing message")
		}
		return nil
	}
}
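
// assertMessages waits for every watcher to observe all of its expected
// messages, allowing a generous multiple of the given timeout.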
func assertMessages(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
	return func(ctx flow.Context) error {
		// Assert the expected messages arrive.
		for _, m := range messageWatchers {
			m.Assert(ctx, 25*timeout)
		}

		return nil
	}
}
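
// TestPulsar verifies basic delivery: two subscriber apps on the same topic
// each receive every message published to it, while a passive topic receives
// messages that nobody asserts on.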
func (p *pulsarSuite) TestPulsar() {
	t := p.T()
	consumerGroup1 := watcher.NewUnordered()
	consumerGroup2 := watcher.NewUnordered()

	flow.New(t, "pulsar certification basic test").

		// Run subscriberApplication app1
		Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
			subscriberApplication(appID1, topicActiveName, consumerGroup1))).
		Step(dockercompose.Run(clusterName, p.dockerComposeYAML)).
		Step("wait", flow.Sleep(10*time.Second)).
		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
			client, err := p.client(t)
			if err != nil {
				return fmt.Errorf("could not create pulsar client: %v", err)
			}

			defer client.Close()

			consumer, err := client.Subscribe(pulsar.ConsumerOptions{
				Topic:            "topic-1",
				SubscriptionName: "my-sub",
				Type:             pulsar.Shared,
			})
			if err != nil {
				return fmt.Errorf("could not subscribe to pulsar topic: %v", err)
			}
			defer consumer.Close()

			// Ensure the brokers are ready by attempting to consume
			// a topic partition.
			return err
		})).
		Step(sidecar.Run(sidecarName1,
			append(componentRuntimeOptions(),
				embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_one")),
				embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)),
				embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort)),
				embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort)),
			)...,
		)).

		// Run subscriberApplication app2
		Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+portOffset),
			subscriberApplication(appID2, topicActiveName, consumerGroup2))).

		// Run the Dapr sidecar with the component 2.
		Step(sidecar.Run(sidecarName2,
			append(componentRuntimeOptions(),
				embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_two")),
				embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+portOffset)),
				embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset)),
				embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset)),
				embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+portOffset)),
			)...,
		)).
		Step("publish messages to the active topic", publishMessages(nil, sidecarName1, topicActiveName, consumerGroup1, consumerGroup2)).
		Step("publish messages to the passive topic", publishMessages(nil, sidecarName1, topicPassiveName)).
		Step("verify if app1 has received messages published to the active topic", assertMessages(10*time.Second, consumerGroup1)).
		Step("verify if app2 has received messages published to the active topic", assertMessages(10*time.Second, consumerGroup2)).
		Step("reset", flow.Reset(consumerGroup1, consumerGroup2)).
		Run()
}
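
// TestPulsarMultipleSubsSameConsumerIDs verifies that subscribers sharing a
// consumer ID collectively receive the messages published from two sidecars.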
func (p *pulsarSuite) TestPulsarMultipleSubsSameConsumerIDs() {
	t := p.T()
	consumerGroup1 := watcher.NewUnordered()
	consumerGroup2 := watcher.NewUnordered()

	metadata := map[string]string{
		messageKey: partition0,
	}

	metadata1 := map[string]string{
		messageKey: partition1,
	}

	flow.New(t, "pulsar certification - single publisher and multiple subscribers with same consumer IDs").

		// Run subscriberApplication app1
		Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
			subscriberApplication(appID1, topicActiveName, consumerGroup1))).
		Step(dockercompose.Run(clusterName, p.dockerComposeYAML)).
		Step("wait", flow.Sleep(10*time.Second)).
		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
			client, err := p.client(t)
			if err != nil {
				return fmt.Errorf("could not create pulsar client: %v", err)
			}

			defer client.Close()

			consumer, err := client.Subscribe(pulsar.ConsumerOptions{
				Topic:            "topic-1",
				SubscriptionName: "my-sub",
				Type:             pulsar.Shared,
			})
			if err != nil {
				return fmt.Errorf("could not subscribe to pulsar topic: %v", err)
			}
			defer consumer.Close()

			// Ensure the brokers are ready by attempting to consume
			// a topic partition.
			return err
		})).
		Step(sidecar.Run(sidecarName1,
			append(componentRuntimeOptions(),
				embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_one")),
				embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)),
				embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort)),
				embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort)),
			)...,
		)).

		// Run subscriberApplication app2
		Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+portOffset),
			subscriberApplication(appID2, topicActiveName, consumerGroup2))).

		// Run the Dapr sidecar with the component 2.
		Step(sidecar.Run(sidecarName2,
			append(componentRuntimeOptions(),
				embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_two")),
				embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+portOffset)),
				embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset)),
				embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset)),
				embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+portOffset)),
			)...,
		)).
		Step("publish messages to the active topic from sidecar 1", publishMessages(metadata, sidecarName1, topicActiveName, consumerGroup2)).
		Step("publish messages to the active topic from sidecar 2", publishMessages(metadata1, sidecarName2, topicActiveName, consumerGroup2)).
		Step("verify if app1 and app2 together have received all published messages", assertMessages(10*time.Second, consumerGroup2)).
		Step("reset", flow.Reset(consumerGroup1, consumerGroup2)).
		Run()
}
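
// TestPulsarMultipleSubsDifferentConsumerIDs verifies delivery when the two
// subscribers are configured with different consumer IDs.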
func (p *pulsarSuite) TestPulsarMultipleSubsDifferentConsumerIDs() {
	t := p.T()

	consumerGroup1 := watcher.NewUnordered()
	consumerGroup2 := watcher.NewUnordered()

	// Set the partition key on all messages so they are written to the same partition. This allows for checking of ordered messages.
	metadata := map[string]string{
		messageKey: partition0,
	}

	flow.New(t, "pulsar certification - single publisher and multiple subscribers with different consumer IDs").

		// Run subscriberApplication app1
		Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
			subscriberApplication(appID1, topicActiveName, consumerGroup1))).
		Step(dockercompose.Run(clusterName, p.dockerComposeYAML)).
		Step("wait", flow.Sleep(10*time.Second)).
		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
			client, err := p.client(t)
			if err != nil {
				return fmt.Errorf("could not create pulsar client: %v", err)
			}

			defer client.Close()

			consumer, err := client.Subscribe(pulsar.ConsumerOptions{
				Topic:            "topic-1",
				SubscriptionName: "my-sub",
				Type:             pulsar.Shared,
			})
			if err != nil {
				return fmt.Errorf("could not subscribe to pulsar topic: %v", err)
			}
			defer consumer.Close()

			// Ensure the brokers are ready by attempting to consume
			// a topic partition.
			return err
		})).
		Step(sidecar.Run(sidecarName1,
			append(componentRuntimeOptions(),
				embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_one")),
				embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)),
				embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort)),
				embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort)),
			)...,
		)).

		// Run subscriberApplication app2
		Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+portOffset),
			subscriberApplication(appID2, topicActiveName, consumerGroup2))).

		// Run the Dapr sidecar with the component 2.
		Step(sidecar.Run(sidecarName2,
			append(componentRuntimeOptions(),
				embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_two")),
				embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+portOffset)),
				embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset)),
				embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset)),
				embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+portOffset)),
			)...,
		)).
		Step("publish messages to the active topic", publishMessages(metadata, sidecarName1, topicActiveName, consumerGroup1)).
		Step("verify if app1 has received messages published to the active topic", assertMessages(10*time.Second, consumerGroup1)).
		Step("reset", flow.Reset(consumerGroup1, consumerGroup2)).
		Run()
}
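
// TestPulsarMultiplePubSubsDifferentConsumerIDs verifies delivery when two
// publishers and two subscribers with different consumer IDs share a topic.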
func (p *pulsarSuite) TestPulsarMultiplePubSubsDifferentConsumerIDs() {
	t := p.T()
	consumerGroup1 := watcher.NewUnordered()
	consumerGroup2 := watcher.NewUnordered()

	// Set the partition key on all messages so they are written to the same partition. This allows for checking of ordered messages.
	metadata := map[string]string{
		messageKey: partition0,
	}

	metadata1 := map[string]string{
		messageKey: partition1,
	}

	flow.New(t, "pulsar certification - multiple publishers and multiple subscribers with different consumer IDs").

		// Run subscriberApplication app1
		Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
			subscriberApplication(appID1, topicActiveName, consumerGroup1))).
		Step(dockercompose.Run(clusterName, p.dockerComposeYAML)).
		Step("wait", flow.Sleep(10*time.Second)).
		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
			client, err := p.client(t)
			if err != nil {
				return fmt.Errorf("could not create pulsar client: %v", err)
			}

			defer client.Close()

			consumer, err := client.Subscribe(pulsar.ConsumerOptions{
				Topic:            "topic-1",
				SubscriptionName: "my-sub",
				Type:             pulsar.Shared,
			})
			if err != nil {
				return fmt.Errorf("could not subscribe to pulsar topic: %v", err)
			}
			defer consumer.Close()

			// Ensure the brokers are ready by attempting to consume
			// a topic partition.
			return err
		})).
		Step(sidecar.Run(sidecarName1,
			append(componentRuntimeOptions(),
				embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_one")),
				embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)),
				embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort)),
				embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort)),
			)...,
		)).

		// Run subscriberApplication app2
		Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+portOffset),
			subscriberApplication(appID2, topicActiveName, consumerGroup2))).

		// Run the Dapr sidecar with the component 2.
		Step(sidecar.Run(sidecarName2,
			append(componentRuntimeOptions(),
				embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_two")),
				embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+portOffset)),
				embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset)),
				embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset)),
				embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+portOffset)),
			)...,
		)).
		Step("publish messages to the active topic from sidecar 1", publishMessages(metadata, sidecarName1, topicActiveName, consumerGroup1)).
		Step("publish messages to the active topic from sidecar 2", publishMessages(metadata1, sidecarName2, topicActiveName, consumerGroup2)).
		Step("verify if app1 has received messages published by sidecar 1", assertMessages(10*time.Second, consumerGroup1)).
		Step("verify if app2 has received messages published by sidecar 2", assertMessages(10*time.Second, consumerGroup2)).
		Step("reset", flow.Reset(consumerGroup1, consumerGroup2)).
		Run()
}
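
// TestPulsarNonexistingTopic verifies that publishing to a topic that does
// not exist yet creates it, and that the subscriber receives the messages.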
func (p *pulsarSuite) TestPulsarNonexistingTopic() {
	t := p.T()
	consumerGroup1 := watcher.NewUnordered()

	// Set the partition key on all messages so they are written to the same partition. This allows for checking of ordered messages.
	metadata := map[string]string{
		messageKey: partition0,
	}

	flow.New(t, "pulsar certification - non-existing topic").

		// Run subscriberApplication app1
		Step(app.Run(appID1, fmt.Sprintf(":%d", appPort+portOffset*3),
			subscriberApplication(appID1, topicToBeCreated, consumerGroup1))).
		Step(dockercompose.Run(clusterName, p.dockerComposeYAML)).
		Step("wait", flow.Sleep(10*time.Second)).
		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
			client, err := p.client(t)
			if err != nil {
				return fmt.Errorf("could not create pulsar client: %v", err)
			}

			defer client.Close()

			consumer, err := client.Subscribe(pulsar.ConsumerOptions{
				Topic:            "topic-1",
				SubscriptionName: "my-sub",
				Type:             pulsar.Shared,
			})
			if err != nil {
				return fmt.Errorf("could not subscribe to pulsar topic: %v", err)
			}
			defer consumer.Close()

			// Ensure the brokers are ready by attempting to consume
			// a topic partition.
			return err
		})).
		// Run the Dapr sidecar with the consumer_one component.
		Step(sidecar.Run(sidecarName1,
			append(componentRuntimeOptions(),
				embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_one")),
				embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+portOffset*3)),
				embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset*3)),
				embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset*3)),
				embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+portOffset*3)),
			)...,
		)).
		Step(fmt.Sprintf("publish messages to topicToBeCreated: %s", topicToBeCreated), publishMessages(metadata, sidecarName1, topicToBeCreated, consumerGroup1)).
		Step("wait", flow.Sleep(30*time.Second)).
		Step("verify if app1 has received messages published to newly created topic", assertMessages(10*time.Second, consumerGroup1)).
		Run()
}
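
// TestPulsarNetworkInterruption verifies that messages are still delivered
// after a temporary interruption of the broker connection on port 6650.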
func (p *pulsarSuite) TestPulsarNetworkInterruption() {
	t := p.T()
	consumerGroup1 := watcher.NewUnordered()

	// Set the partition key on all messages so they are written to the same partition. This allows for checking of ordered messages.
	metadata := map[string]string{
		messageKey: partition0,
	}

	flow.New(t, "pulsar certification - network interruption").

		// Run subscriberApplication app1
		Step(app.Run(appID1, fmt.Sprintf(":%d", appPort+portOffset),
			subscriberApplication(appID1, topicActiveName, consumerGroup1))).
		Step(dockercompose.Run(clusterName, p.dockerComposeYAML)).
		Step("wait", flow.Sleep(10*time.Second)).
		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
			client, err := p.client(t)
			if err != nil {
				return fmt.Errorf("could not create pulsar client: %v", err)
			}

			defer client.Close()

			consumer, err := client.Subscribe(pulsar.ConsumerOptions{
				Topic:            "topic-1",
				SubscriptionName: "my-sub",
				Type:             pulsar.Shared,
			})
			if err != nil {
				return fmt.Errorf("could not subscribe to pulsar topic: %v", err)
			}
			defer consumer.Close()

			// Ensure the brokers are ready by attempting to consume
			// a topic partition.
			return err
		})).
		// Run the Dapr sidecar with the consumer_one component.
		Step(sidecar.Run(sidecarName1,
			append(componentRuntimeOptions(),
				embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_one")),
				embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+portOffset)),
				embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset)),
				embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset)),
				embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+portOffset)),
			)...,
		)).
		Step(fmt.Sprintf("publish messages to topic: %s", topicActiveName), publishMessages(metadata, sidecarName1, topicActiveName, consumerGroup1)).
		Step("interrupt network", network.InterruptNetwork(30*time.Second, nil, nil, "6650")).
		Step("wait", flow.Sleep(30*time.Second)).
		Step("verify if app1 has received messages published to the topic", assertMessages(10*time.Second, consumerGroup1)).
		Run()
}
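
// TestPulsarPersistent verifies that messages published before a full broker
// restart are delivered once the broker is back up.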
func (p *pulsarSuite) TestPulsarPersistent() {
	t := p.T()
	consumerGroup1 := watcher.NewUnordered()

	flow.New(t, "pulsar certification persistent test").

		// Run subscriberApplication app1
		Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
			subscriberApplication(appID1, topicActiveName, consumerGroup1))).
		Step(dockercompose.Run(clusterName, p.dockerComposeYAML)).
		Step("wait", flow.Sleep(10*time.Second)).
		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
			client, err := p.client(t)
			if err != nil {
				return fmt.Errorf("could not create pulsar client: %v", err)
			}

			defer client.Close()

			consumer, err := client.Subscribe(pulsar.ConsumerOptions{
				Topic:            "topic-1",
				SubscriptionName: "my-sub",
				Type:             pulsar.Shared,
			})
			if err != nil {
				return fmt.Errorf("could not subscribe to pulsar topic: %v", err)
			}
			defer consumer.Close()

			// Ensure the brokers are ready by attempting to consume
			// a topic partition.
			return err
		})).
		Step(sidecar.Run(sidecarName1,
			append(componentRuntimeOptions(),
				embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_one")),
				embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)),
				embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort)),
				embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort)),
				embedded.WithGracefulShutdownDuration(time.Second*20),
			)...,
		)).
		Step("publish messages to the active topic", publishMessages(nil, sidecarName1, topicActiveName, consumerGroup1)).
		Step("stop pulsar server", dockercompose.Stop(clusterName, p.dockerComposeYAML, p.services...)).
		Step("wait", flow.Sleep(5*time.Second)).
		Step("start pulsar server", dockercompose.Start(clusterName, p.dockerComposeYAML, p.services...)).
		Step("wait", flow.Sleep(30*time.Second)).
		Step("verify if app1 has received messages published to the active topic", assertMessages(10*time.Second, consumerGroup1)).
		Step("reset", flow.Reset(consumerGroup1)).
		Run()
}
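
// TestPulsarDelay verifies delayed delivery via the deliverAfter and
// deliverAt publish metadata: no message may arrive before the delay expires.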
func (p *pulsarSuite) TestPulsarDelay() {
	t := p.T()
	consumerGroup1 := watcher.NewUnordered()

	date := time.Now()
	deliverTime := date.Add(time.Second * 60)

	metadataAfter := map[string]string{
		"deliverAfter": "30s",
	}

	metadataAt := map[string]string{
		"deliverAt": deliverTime.Format(time.RFC3339Nano),
	}

	assertMessagesNot := func(timeout time.Duration, messageWatchers ...*watcher.Watcher) flow.Runnable {
		return func(ctx flow.Context) error {
			// Assert that no message has been delivered yet.
			for _, m := range messageWatchers {
				m.AssertNotDelivered(ctx, 5*timeout)
			}

			return nil
		}
	}

	flow.New(t, "pulsar certification delay test").

		// Run subscriberApplication app1
		Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
			subscriberApplication(appID1, topicActiveName, consumerGroup1))).
		Step(dockercompose.Run(clusterName, p.dockerComposeYAML)).
		Step("wait", flow.Sleep(10*time.Second)).
		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
			client, err := p.client(t)
			if err != nil {
				return fmt.Errorf("could not create pulsar client: %v", err)
			}

			defer client.Close()

			consumer, err := client.Subscribe(pulsar.ConsumerOptions{
				Topic:            "topic-1",
				SubscriptionName: "my-sub",
				Type:             pulsar.Shared,
			})
			if err != nil {
				return fmt.Errorf("could not subscribe to pulsar topic: %v", err)
			}
			defer consumer.Close()

			// Ensure the brokers are ready by attempting to consume
			// a topic partition.
			return err
		})).
		Step(sidecar.Run(sidecarName1,
			append(componentRuntimeOptions(),
				embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_three")),
				embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)),
				embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort)),
				embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort)),
			)...,
		)).
		Step("publish messages with deliverAfter set", publishMessages(metadataAfter, sidecarName1, topicActiveName, consumerGroup1)).
		// Receive no messages while the deliverAfter delay is pending.
		Step("verify if app1 has received no messages published to topic", assertMessagesNot(1*time.Second, consumerGroup1)).
		// Delay has passed, messages should be received.
		Step("verify if app1 has received messages published to topic", assertMessages(10*time.Second, consumerGroup1)).
		Step("reset", flow.Reset(consumerGroup1)).
		// Publish messages using the deliverAt property.
		Step("publish messages with deliverAt set", publishMessages(metadataAt, sidecarName1, topicActiveName, consumerGroup1)).
		Step("verify if app1 has received messages published to topic", assertMessages(10*time.Second, consumerGroup1)).
		Run()
}
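
// schemaTest is the JSON payload used by the schema and encryption tests.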
type schemaTest struct {
	ID   int    `json:"id"`
	Name string `json:"name"`
}
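
// TestPulsarSchema publishes JSON-encoded payloads through the consumer_four
// component configuration (which, per the test name, exercises schema
// support) and verifies they round-trip intact.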
func (p *pulsarSuite) TestPulsarSchema() {
	t := p.T()
	consumerGroup1 := watcher.NewUnordered()

	publishMessages := func(sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
		return func(ctx flow.Context) error {
			// Prepare the messages.
			messages := make([]string, numMessages)
			for i := range messages {
				test := &schemaTest{
					ID:   i,
					Name: uuid.New().String(),
				}

				b, _ := json.Marshal(test)
				messages[i] = string(b)
			}

			for _, messageWatcher := range messageWatchers {
				messageWatcher.ExpectStrings(messages...)
			}

			// Get the sidecar (dapr) client.
			client := sidecar.GetClient(ctx, sidecarName)

			// Publish messages.
			ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)

			for _, message := range messages {
				ctx.Logf("Publishing: %q", message)

				err := client.PublishEvent(ctx, pubsubName, topicName, message)
				require.NoError(ctx, err, "error publishing message")
			}
			return nil
		}
	}

	flow.New(t, "pulsar certification schema test").

		// Run subscriberApplication app1
		Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
			subscriberSchemaApplication(appID1, topicActiveName, consumerGroup1))).
		Step(dockercompose.Run(clusterName, p.dockerComposeYAML)).
		Step("wait", flow.Sleep(10*time.Second)).
		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
			client, err := p.client(t)
			if err != nil {
				return fmt.Errorf("could not create pulsar client: %v", err)
			}

			defer client.Close()

			consumer, err := client.Subscribe(pulsar.ConsumerOptions{
				Topic:            "topic-1",
				SubscriptionName: "my-sub",
				Type:             pulsar.Shared,
			})
			if err != nil {
				return fmt.Errorf("could not subscribe to pulsar topic: %v", err)
			}
			defer consumer.Close()

			// Ensure the brokers are ready by attempting to consume
			// a topic partition.
			return err
		})).
		Step(sidecar.Run(sidecarName1,
			append(componentRuntimeOptions(),
				embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_four")),
				embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)),
				embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort)),
				embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort)),
			)...,
		)).
		Step("publish messages to the active topic", publishMessages(sidecarName1, topicActiveName, consumerGroup1)).
		Step("verify if app1 has received messages published to topic", assertMessages(10*time.Second, consumerGroup1)).
		Run()
}
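
// componentRuntimeOptions registers the Pulsar pub/sub component with an
// embedded Dapr runtime.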
func componentRuntimeOptions() []embedded.Option {
	log := logger.NewLogger("dapr.components")

	pubsubRegistry := pubsub_loader.NewRegistry()
	pubsubRegistry.Logger = log
	pubsubRegistry.RegisterComponent(pubsub_pulsar.NewPulsar, "pulsar")

	return []embedded.Option{
		embedded.WithPubSubs(pubsubRegistry),
	}
}
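
// createMultiPartitionTopic creates a partitioned topic through the Pulsar
// admin REST API, attaching an OAuth2 bearer token when the suite runs in
// oauth2 mode.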
func (p *pulsarSuite) createMultiPartitionTopic(tenant, namespace, topic string, partition int) flow.Runnable {
	return func(ctx flow.Context) error {
		reqURL := fmt.Sprintf("http://localhost:8080/admin/v2/persistent/%s/%s/%s/partitions",
			tenant, namespace, topic)

		reqBody, err := json.Marshal(partition)

		if err != nil {
			return fmt.Errorf("createMultiPartitionTopic json.Marshal(%d) err: %s", partition, err.Error())
		}

		req, err := http.NewRequest(http.MethodPut, reqURL, bytes.NewBuffer(reqBody))

		if err != nil {
			return fmt.Errorf("createMultiPartitionTopic NewRequest(url: %s, body: %s) err: %s",
				reqURL, reqBody, err.Error())
		}

		req.Header.Set("Content-Type", "application/json")

		if p.authType == "oauth2" {
			cc, err := p.oauth2ClientCredentials()
			if err != nil {
				return err
			}
			token, err := cc.Token()
			if err != nil {
				return err
			}

			req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token))
		}

		rsp, err := http.DefaultClient.Do(req)

		if err != nil {
			return fmt.Errorf("createMultiPartitionTopic(url: %s, body: %s) err: %s",
				reqURL, reqBody, err.Error())
		}

		defer rsp.Body.Close()

		if rsp.StatusCode >= http.StatusOK && rsp.StatusCode <= http.StatusMultipleChoices {
			return nil
		}

		rspBody, _ := io.ReadAll(rsp.Body)

		return fmt.Errorf("createMultiPartitionTopic(url: %s, body: %s) statusCode: %d, resBody: %s",
			reqURL, reqBody, rsp.StatusCode, string(rspBody))
	}
}
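
// TestPulsarPartitionedOrderingProcess verifies in-order processing of
// messages published with a fixed partition key to an explicitly created
// partitioned topic.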
func (p *pulsarSuite) TestPulsarPartitionedOrderingProcess() {
	t := p.T()
	consumerGroup1 := watcher.NewOrdered()

	// Set the partition key on all messages so they are written to the same partition. This allows for checking of ordered messages.
	metadata := map[string]string{
		messageKey: partition0,
	}

	flow.New(t, "pulsar certification - process message in order with partitioned-topic").
		Step(dockercompose.Run(clusterName, p.dockerComposeYAML)).

		// Run subscriberApplication app1
		Step(app.Run(appID1, fmt.Sprintf(":%d", appPort+portOffset),
			subscriberApplicationWithoutError(appID1, topicMultiPartitionName, consumerGroup1))).
		Step("wait", flow.Sleep(10*time.Second)).
		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
			client, err := p.client(t)
			if err != nil {
				return fmt.Errorf("could not create pulsar client: %v", err)
			}

			defer client.Close()

			consumer, err := client.Subscribe(pulsar.ConsumerOptions{
				Topic:            "topic-1",
				SubscriptionName: "my-sub",
				Type:             pulsar.Shared,
			})
			if err != nil {
				return fmt.Errorf("could not subscribe to pulsar topic: %v", err)
			}

			defer consumer.Close()

			// Ensure the brokers are ready by attempting to consume
			// a topic partition.
			return err
		})).
		Step("create multi-partition topic explicitly", retry.Do(10*time.Second, 30,
			p.createMultiPartitionTopic("public", "default", topicMultiPartitionName, 4))).
		// Run the Dapr sidecar with the consumer_one component.
		Step(sidecar.Run(sidecarName1,
			append(componentRuntimeOptions(),
				embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_one")),
				embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+portOffset)),
				embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset)),
				embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset)),
				embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+portOffset)),
			)...,
		)).
		// Run subscriberApplication app2
		Step(app.Run(appID2, fmt.Sprintf(":%d", appPort+portOffset*3),
			subscriberApplicationWithoutError(appID2, topicActiveName, consumerGroup1))).

		// Run the Dapr sidecar with the component 2.
		Step(sidecar.Run(sidecarName2,
			append(componentRuntimeOptions(),
				embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_two")),
				embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort+portOffset*3)),
				embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort+portOffset*3)),
				embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort+portOffset*3)),
				embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+portOffset*3)),
			)...,
		)).
		Step(fmt.Sprintf("publish messages to topic: %s", topicMultiPartitionName), publishMessages(metadata, sidecarName1, topicMultiPartitionName, consumerGroup1)).
		Step("wait", flow.Sleep(30*time.Second)).
		Step("verify if app1 has received messages published to the partitioned topic", assertMessages(10*time.Second, consumerGroup1)).
		Step("reset", flow.Reset(consumerGroup1)).
		Run()
}
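
// TestPulsarEncryptionFromFile verifies end-to-end delivery with the
// encryption-enabled component configuration in consumer_five (key material
// referenced by file path, per the test name).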
func (p *pulsarSuite) TestPulsarEncryptionFromFile() {
	t := p.T()
	consumerGroup1 := watcher.NewUnordered()

	publishMessages := func(sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
		return func(ctx flow.Context) error {
			// Prepare the messages.
			messages := make([]string, numMessages)
			for i := range messages {
				test := &schemaTest{
					ID:   i,
					Name: uuid.New().String(),
				}

				b, _ := json.Marshal(test)
				messages[i] = string(b)
			}

			for _, messageWatcher := range messageWatchers {
				messageWatcher.ExpectStrings(messages...)
			}

			// Get the sidecar (dapr) client.
			client := sidecar.GetClient(ctx, sidecarName)

			// Publish messages.
			ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)

			for _, message := range messages {
				ctx.Logf("Publishing: %q", message)

				err := client.PublishEvent(ctx, pubsubName, topicName, message)
				require.NoError(ctx, err, "error publishing message")
			}
			return nil
		}
	}

	flow.New(t, "pulsar encryption test with file path").

		// Run subscriberApplication app1
		Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
			subscriberSchemaApplication(appID1, topicActiveName, consumerGroup1))).
		Step(dockercompose.Run(clusterName, p.dockerComposeYAML)).
		Step("wait", flow.Sleep(10*time.Second)).
		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
			client, err := p.client(t)
			if err != nil {
				return fmt.Errorf("could not create pulsar client: %v", err)
			}

			defer client.Close()

			consumer, err := client.Subscribe(pulsar.ConsumerOptions{
				Topic:            "topic-1",
				SubscriptionName: "my-sub",
				Type:             pulsar.Shared,
			})
			if err != nil {
				return fmt.Errorf("could not subscribe to pulsar topic: %v", err)
			}
			defer consumer.Close()

			// Ensure the brokers are ready by attempting to consume
			// a topic partition.
			return err
		})).
		Step(sidecar.Run(sidecarName1,
			append(componentRuntimeOptions(),
				embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_five")),
				embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)),
				embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort)),
				embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort)),
			)...,
		)).
		Step("publish messages to the active topic", publishMessages(sidecarName1, topicActiveName, consumerGroup1)).
		Step("verify if app1 has received messages published to topic", assertMessages(10*time.Second, consumerGroup1)).
		Step("reset", flow.Reset(consumerGroup1)).
		Run()
}
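
// TestPulsarEncryptionFromData verifies end-to-end delivery with the
// encryption-enabled component configuration in consumer_six (key material
// supplied inline, per the test name).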
func (p *pulsarSuite) TestPulsarEncryptionFromData() {
	t := p.T()
	consumerGroup1 := watcher.NewUnordered()

	publishMessages := func(sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
		return func(ctx flow.Context) error {
			// Prepare the messages.
			messages := make([]string, numMessages)
			for i := range messages {
				test := &schemaTest{
					ID:   i,
					Name: uuid.New().String(),
				}

				b, _ := json.Marshal(test)
				messages[i] = string(b)
			}

			for _, messageWatcher := range messageWatchers {
				messageWatcher.ExpectStrings(messages...)
			}

			// Get the sidecar (dapr) client.
			client := sidecar.GetClient(ctx, sidecarName)

			// Publish messages.
			ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)

			for _, message := range messages {
				ctx.Logf("Publishing: %q", message)

				err := client.PublishEvent(ctx, pubsubName, topicName, message)
				require.NoError(ctx, err, "error publishing message")
			}
			return nil
		}
	}

	flow.New(t, "pulsar encryption test with data").

		// Run subscriberApplication app1
		Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
			subscriberSchemaApplication(appID1, topicActiveName, consumerGroup1))).
		Step(dockercompose.Run(clusterName, p.dockerComposeYAML)).
		Step("wait", flow.Sleep(10*time.Second)).
		Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
			client, err := p.client(t)
			if err != nil {
				return fmt.Errorf("could not create pulsar client: %v", err)
			}

			defer client.Close()

			consumer, err := client.Subscribe(pulsar.ConsumerOptions{
				Topic:            "topic-1",
				SubscriptionName: "my-sub",
				Type:             pulsar.Shared,
			})
			if err != nil {
				return fmt.Errorf("could not subscribe to pulsar topic: %v", err)
			}
			defer consumer.Close()

			// Ensure the brokers are ready by attempting to consume
			// a topic partition.
			return err
		})).
		Step(sidecar.Run(sidecarName1,
			append(componentRuntimeOptions(),
				embedded.WithComponentsPath(filepath.Join(p.componentsPath, "consumer_six")),
				embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)),
				embedded.WithDaprGRPCPort(strconv.Itoa(runtime.DefaultDaprAPIGRPCPort)),
				embedded.WithDaprHTTPPort(strconv.Itoa(runtime.DefaultDaprHTTPPort)),
			)...,
		)).
		Step("publish messages to the active topic", publishMessages(sidecarName1, topicActiveName, consumerGroup1)).
		Step("verify if app1 has received messages published to topic", assertMessages(10*time.Second, consumerGroup1)).
		Step("reset", flow.Reset(consumerGroup1)).
		Run()
}
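
// client builds a raw Pulsar client for readiness checks, authenticating
// with OAuth2 client credentials when the suite runs in oauth2 mode.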
func (p *pulsarSuite) client(t *testing.T) (pulsar.Client, error) {
	t.Helper()

	opts := pulsar.ClientOptions{
		URL: "pulsar://" + pulsarURL,
	}
	switch p.authType {
	case "oauth2":
		cc, err := p.oauth2ClientCredentials()
		require.NoError(t, err)
		opts.Authentication = pulsar.NewAuthenticationTokenFromSupplier(cc.Token)
	}

	return pulsar.NewClient(opts)
}
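
// oauth2ClientCredentials builds a client-credentials token source against
// the mock OAuth2 server, trusting its self-signed CA.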
func (p *pulsarSuite) oauth2ClientCredentials() (*oauth2.ClientCredentials, error) {
	cc, err := oauth2.NewClientCredentials(context.Background(), oauth2.ClientCredentialsOptions{
		Logger:       logger.NewLogger("dapr.test.readiness"),
		TokenURL:     "https://localhost:8085/issuer1/token",
		ClientID:     "foo",
		ClientSecret: "bar",
		Scopes:       []string{"openid"},
		Audiences:    []string{"pulsar"},
		CAPEM:        p.oauth2CAPEM,
	})
	if err != nil {
		return nil, err
	}

	return cc, nil
}
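
// peerCertificate dials hostport until a TLS handshake succeeds and returns
// the server's leaf certificate, PEM-encoded. It doubles as a readiness
// probe for the mock OAuth2 server.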
func peerCertificate(t *testing.T, hostport string) []byte {
	conf := &tls.Config{InsecureSkipVerify: true}

	for {
		time.Sleep(1 * time.Second)

		conn, err := tls.Dial("tcp", hostport, conf)
		if err != nil {
			t.Log(err)
			continue
		}

		defer conn.Close()

		certs := conn.ConnectionState().PeerCertificates
		require.Len(t, certs, 1, "expected 1 peer certificate")
		return pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certs[0].Raw})
	}
}