Add Apache Pulsar e2e encryption (#2812)

Signed-off-by: yaron2 <schneider.yaron@live.com>
Co-authored-by: Deepanshu Agarwal <deepanshu.agarwal1984@gmail.com>
This commit is contained in:
Yaron Schneider 2023-04-28 11:19:15 -07:00 committed by GitHub
parent 181f3ad039
commit a1bf2ea00a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 448 additions and 0 deletions

View File

@ -0,0 +1,46 @@
/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pulsar
import (
"github.com/apache/pulsar-client-go/pulsar/crypto"
)
// DataKeyReader is a custom implementation of KeyReader
type DataKeyReader struct {
publicKey string
privateKey string
}
// NewDataKeyReader returns a new instance of DataKeyReader
func NewDataKeyReader(publicKey, privateKey string) *DataKeyReader {
return &DataKeyReader{
publicKey: publicKey,
privateKey: privateKey,
}
}
// PublicKey read public key from string
func (d *DataKeyReader) PublicKey(keyName string, keyMeta map[string]string) (*crypto.EncryptionKeyInfo, error) {
return readKey(keyName, d.publicKey, keyMeta)
}
// PrivateKey read private key from string
func (d *DataKeyReader) PrivateKey(keyName string, keyMeta map[string]string) (*crypto.EncryptionKeyInfo, error) {
return readKey(keyName, d.privateKey, keyMeta)
}
func readKey(keyName, key string, keyMeta map[string]string) (*crypto.EncryptionKeyInfo, error) {
return crypto.NewEncryptionKeyInfo(keyName, []byte(key), keyMeta), nil
}

View File

@ -0,0 +1,32 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pulsar
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestDataKeyReader(t *testing.T) {
kr := NewDataKeyReader("111", "222")
pk, err := kr.PublicKey("1", map[string]string{})
assert.Nil(t, err)
pr, err := kr.PrivateKey("2", map[string]string{})
assert.Nil(t, err)
assert.NotNil(t, pk)
assert.NotNil(t, pr)
}

View File

@ -29,6 +29,9 @@ type pulsarMetadata struct {
Token string `mapstructure:"token"`
RedeliveryDelay time.Duration `mapstructure:"redeliveryDelay"`
internalTopicSchemas map[string]schemaMetadata `mapstructure:"-"`
PublicKey string `mapstructure:"publicKey"`
PrivateKey string `mapstructure:"privateKey"`
Keys string `mapstructure:"keys"`
}
type schemaMetadata struct {

View File

@ -16,6 +16,7 @@ package pulsar
import (
"context"
"encoding/json"
"encoding/pem"
"errors"
"fmt"
"reflect"
@ -27,6 +28,7 @@ import (
"github.com/hamba/avro/v2"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/apache/pulsar-client-go/pulsar/crypto"
lru "github.com/hashicorp/golang-lru/v2"
"github.com/dapr/components-contrib/metadata"
@ -189,6 +191,14 @@ func (p *Pulsar) Init(_ context.Context, metadata pubsub.Metadata) error {
return nil
}
func (p *Pulsar) useProducerEncryption() bool {
return p.metadata.PublicKey != "" && p.metadata.Keys != ""
}
func (p *Pulsar) useConsumerEncryption() bool {
return p.metadata.PublicKey != "" && p.metadata.PrivateKey != ""
}
func (p *Pulsar) Publish(ctx context.Context, req *pubsub.PublishRequest) error {
if p.closed.Load() {
return errors.New("component is closed")
@ -217,6 +227,20 @@ func (p *Pulsar) Publish(ctx context.Context, req *pubsub.PublishRequest) error
opts.Schema = getPulsarSchema(sm)
}
if p.useProducerEncryption() {
var reader crypto.KeyReader
if isValidPEM(p.metadata.PublicKey) {
reader = NewDataKeyReader(p.metadata.PublicKey, "")
} else {
reader = crypto.NewFileKeyReader(p.metadata.PublicKey, "")
}
opts.Encryption = &pulsar.ProducerEncryptionInfo{
KeyReader: reader,
Keys: strings.Split(p.metadata.Keys, ","),
}
}
producer, err = p.client.CreateProducer(opts)
if err != nil {
return err
@ -349,6 +373,19 @@ func (p *Pulsar) Subscribe(ctx context.Context, req pubsub.SubscribeRequest, han
NackRedeliveryDelay: p.metadata.RedeliveryDelay,
}
if p.useConsumerEncryption() {
var reader crypto.KeyReader
if isValidPEM(p.metadata.PublicKey) {
reader = NewDataKeyReader(p.metadata.PublicKey, p.metadata.PrivateKey)
} else {
reader = crypto.NewFileKeyReader(p.metadata.PublicKey, p.metadata.PrivateKey)
}
options.Decryption = &pulsar.MessageDecryptionInfo{
KeyReader: reader,
}
}
if sm, ok := p.metadata.internalTopicSchemas[req.Topic]; ok {
options.Schema = getPulsarSchema(sm)
}
@ -465,3 +502,9 @@ func (p *Pulsar) GetComponentMetadata() map[string]string {
metadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, metadata.PubSubType)
return metadataInfo
}
// isValidPEM validates the provided input has PEM formatted block.
func isValidPEM(val string) bool {
block, _ := pem.Decode([]byte(val))
return block != nil
}

View File

@ -208,3 +208,83 @@ func TestValidTenantAndNS(t *testing.T) {
assert.Equal(t, expectNonPersistentResult, res)
})
}
func TestEncryptionKeys(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{"host": "a", "privateKey": "111", "publicKey": "222", "keys": "a,b"}
t.Run("test encryption metadata", func(t *testing.T) {
meta, err := parsePulsarMetadata(m)
assert.Nil(t, err)
assert.Equal(t, "111", meta.PrivateKey)
assert.Equal(t, "222", meta.PublicKey)
assert.Equal(t, "a,b", meta.Keys)
})
t.Run("test valid producer encryption", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{"host": "a", "publicKey": "222", "keys": "a,b"}
meta, _ := parsePulsarMetadata(m)
p := &Pulsar{metadata: *meta}
r := p.useProducerEncryption()
assert.True(t, r)
})
t.Run("test invalid producer encryption missing public key", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{"host": "a", "keys": "a,b"}
meta, _ := parsePulsarMetadata(m)
p := &Pulsar{metadata: *meta}
r := p.useProducerEncryption()
assert.False(t, r)
})
t.Run("test invalid producer encryption missing keys", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{"host": "a", "publicKey": "222"}
meta, _ := parsePulsarMetadata(m)
p := &Pulsar{metadata: *meta}
r := p.useProducerEncryption()
assert.False(t, r)
})
t.Run("test valid consumer encryption", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{"host": "a", "privateKey": "222", "publicKey": "333"}
meta, _ := parsePulsarMetadata(m)
p := &Pulsar{metadata: *meta}
r := p.useConsumerEncryption()
assert.True(t, r)
})
t.Run("test invalid consumer encryption missing public key", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{"host": "a", "privateKey": "222"}
meta, _ := parsePulsarMetadata(m)
p := &Pulsar{metadata: *meta}
r := p.useConsumerEncryption()
assert.False(t, r)
})
t.Run("test invalid producer encryption missing private key", func(t *testing.T) {
m := pubsub.Metadata{}
m.Properties = map[string]string{"host": "a", "privateKey": "222"}
meta, _ := parsePulsarMetadata(m)
p := &Pulsar{metadata: *meta}
r := p.useConsumerEncryption()
assert.False(t, r)
})
}

View File

@ -0,0 +1,20 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: messagebus
spec:
type: pubsub.pulsar
version: v1
metadata:
- name: host
value: "localhost:6650"
- name: consumerID
value: certification5
- name: redeliveryDelay
value: 200ms
- name: publicKey
value: public.key
- name: privateKey
value: private.key
- name: keys
value: myapp.key

View File

@ -0,0 +1,20 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: messagebus
spec:
type: pubsub.pulsar
version: v1
metadata:
- name: host
value: "localhost:6650"
- name: consumerID
value: certification5
- name: redeliveryDelay
value: 200ms
- name: publicKey
value: "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA1KDAM4L8RtJ+nLaXBrBh\nzVpvTemsKVZoAct8A+ShepOHT9lgHOCGLFGWNla6K6j+b3AV/P/fAAhwj82vwTDd\nruXSflvSdmYeFAw3Ypphc1A5oM53wSRWhg63potBNWqdDzj8ApYgqjpmjYSQdL5/\na3golb36GYFrY0MLFTv7wZ87pmMIPsOgGIcPbCHker2fRZ34WXYLb1hkeUpwx4eK\njpwcg35gccvR6o/UhbKAuc60V1J9Wof2sNgtlRaQej45wnpjWYzZrIyk5qUbn0Qi\nCdpIrXvYtANq0Id6gP8zJvUEdPIgNuYxEmVCl9jI+8eGI6peD0qIt8U80hf9axhJ\n3QIDAQAB\n-----END PUBLIC KEY-----\n"
- name: privateKey
value: "-----BEGIN RSA PRIVATE KEY-----\nMIIEpAIBAAKCAQEA1KDAM4L8RtJ+nLaXBrBhzVpvTemsKVZoAct8A+ShepOHT9lg\nHOCGLFGWNla6K6j+b3AV/P/fAAhwj82vwTDdruXSflvSdmYeFAw3Ypphc1A5oM53\nwSRWhg63potBNWqdDzj8ApYgqjpmjYSQdL5/a3golb36GYFrY0MLFTv7wZ87pmMI\nPsOgGIcPbCHker2fRZ34WXYLb1hkeUpwx4eKjpwcg35gccvR6o/UhbKAuc60V1J9\nWof2sNgtlRaQej45wnpjWYzZrIyk5qUbn0QiCdpIrXvYtANq0Id6gP8zJvUEdPIg\nNuYxEmVCl9jI+8eGI6peD0qIt8U80hf9axhJ3QIDAQABAoIBAQCKuHnM4ac/eXM7\nQPDVX1vfgyHc3hgBPCtNCHnXfGFRvFBqavKGxIElBvGOcBS0CWQ+Rg1Ca5kMx3TQ\njSweSYhH5A7pe3Sa5FK5V6MGxJvRhMSkQi/lJZUBjzaIBJA9jln7pXzdHx8ekE16\nBMPONr6g2dr4nuI9o67xKrtfViwRDGaG6eh7jIMlEqMMc6WqyhvI67rlVDSTHFKX\njlMcozJ3IT8BtTzKg2Tpy7ReVuJEpehum8yn1ZVdAnotBDJxI07DC1cbOP4M2fHM\ngfgPYWmchauZuTeTFu4hrlY5jg0/WLs6by8r/81+vX3QTNvejX9UdTHMSIfQdX82\nAfkCKUVhAoGBAOvGv+YXeTlPRcYC642x5iOyLQm+BiSX4jKtnyJiTU2s/qvvKkIu\nxAOk3OtniT9NaUAHEZE9tI71dDN6IgTLQlAcPCzkVh6Sc5eG0MObqOO7WOMCWBkI\nlaAKKBbd6cGDJkwGCJKnx0pxC9f8R4dw3fmXWgWAr8ENiekMuvjSfjZ5AoGBAObd\ns2L5uiUPTtpyh8WZ7rEvrun3djBhzi+d7rgxEGdditeiLQGKyZbDPMSMBuus/5wH\nwfi0xUq50RtYDbzQQdC3T/C20oHmZbjWK5mDaLRVzWS89YG/NT2Q8eZLBstKqxkx\ngoT77zoUDfRy+CWs1xvXzgxagD5Yg8/OrCuXOqWFAoGAPIw3r6ELknoXEvihASxU\nS4pwInZYIYGXpygLG8teyrnIVOMAWSqlT8JAsXtPNaBtjPHDwyazfZrvEmEk51JD\nX0tA8M5ah1NYt+r5JaKNxp3P/8wUT6lyszyoeubWJsnFRfSusuq/NRC+1+KDg/aq\nKnSBu7QGbm9JoT2RrmBv5RECgYBRn8Lj1I1muvHTNDkiuRj2VniOSirkUkA2/6y+\nPMKi+SS0tqcY63v4rNCYYTW1L7Yz8V44U5mJoQb4lvpMbolGhPljjxAAU3hVkItb\nvGVRlSCIZHKczADD4rJUDOS7DYxO3P1bjUN4kkyYx+lKUMDBHFzCa2D6Kgt4dobS\n5qYajQKBgQC7u7MFPkkEMqNqNGu5erytQkBq1v1Ipmf9rCi3iIj4XJLopxMgw0fx\n6jwcwNInl72KzoUBLnGQ9PKGVeBcgEgdI+a+tq+1TJo6Ta+hZSx+4AYiKY18eRKG\neNuER9NOcSVJ7Eqkcw4viCGyYDm2vgNV9HJ0VlAo3RDh8x5spEN+mg==\n-----END RSA PRIVATE KEY-----\n"
- name: keys
value: myapp.key

View File

@ -0,0 +1,39 @@
-----BEGIN RSA PRIVATE KEY-----
MIIG4wIBAAKCAYEA5YeHRDCRtEn8kVi7xVT2YicRByZZUPjzB9qSlQboHvdIpZhW
hoRg+qPYP2RoH7JlmPX/q/RnNjFZ52pG9Tzl3J67Pz3H5j4FoKFZ7OTf55Wk2f83
pe9hgDTfAfIuR1ch2MtfOx1yzoVUENFfLwiIhSg/+6wjY98SbZujM1a/iWpM9IFk
pOgwZZrXtSPoUlNidwAZrYS+eGVL9hyyH9RT86gYh1BgsVH4zjQqCP76lxXrsSyn
lsXfasenpTppVfvyR1NhK1JhQf3dsvCgie0iWcK+OICt0z9WesakKnyAU01BFOmX
APuDS2wErTb2Zw654lo7iEFdRGK1Mmx0gViJ7bXBxKVIw24za/EsCeb+jLFXVilC
/P0T7mI/y6FF0wPkTf8aICJphspdnimEJSXeQinjmx+iFWvG/uHimrswXaAQbP1Y
OoJZx+HFy4b/4/hVLu3aiFM2sgfGkT8gsg3di2zxEIn7cnLaIC4HgcVxQLUVMsBX
FXgU2SWVWPb2+fv9AgMBAAECggGAIohVbYjxIvLOsP9soK+7seC2yyCV53zM862K
yCkV2zLRFzjoK6zW+l6UNlVg5QPuDSlVogVPUfPy1sJCkrrqylqHSWh+HsHODvC3
mtCEb08wUiv3r1toi+Vod9573/fX+8n9NeOkVjXxA+a8L1NVVtNLbQ3k9S2mlB8K
Wrr+bio0EcU14ymbvm2hbntMhLZO5iB5vIVwMqTq3MhMdEV5q/rqVdFd+s+eY0et
21ShVwHRSAKz5Bc5GdOYAV/cYTdaXyR3RJUxy+JT0QW5qJ7FKGa7WIvh2epIdQtf
N79aXQ6ZpQK8aghd6hintV8pJgxD2EcRaT2AAHmuLj1P74YO7WJJnQy/eXhdq2jl
xmTqk9TgFQksvin8QtJyFQzFvxWUKN4z7NAnFlDCM7Yj/k8QnA85kWtyBpBuCFhr
/fapz59Kg5apaAlWKFe9piTezQim8CNgPjdhDjyo7Yyy/rr6XI1ncmC+xKnWpyeH
gXPxDT+7pC1TAU6DC8DRSizh5DGBAoHBAP3JFDSSwwNP5UP9P2EmslhcKR3Ks6rY
XuUotK7yy5sXtkxW2+xGOaKyrCjw4WSXqx0FgGndpejG93bBLM+SwhT/OYZYAqju
f2B9iOFIDshSumpCor19FBn4fnb933RkNS/O9k4iQye6He1L8hleBHMoqDJmFbk2
vESwNLYrpxuQVfjMbWsLoamwRs2gqwYtd4LvSfwtwxLtTozZhbMufRaTb610VDlZ
iALDTvWRpGjDLaSoz+agaSDalGHVmnfmhQKBwQDniEPMKAiQM6Nk1jsWfFfu0E/0
0v55F99naAdJuaG0IkJNtD0WCbjtM25gk82hbcP7XP7Sz+ZaNu8Q16xOpOYvtgn5
Yqd1/5Hh8oQf7lwRPFfpr/PeHjW1f4qCQokPgSut3E8nKKfQKhhRajJH78JBqfPW
Q7A90W4zBC62sTDRqbAzkDq1OKlTu5HyuNGmPqnum8olIgsaOsoBIpsCqfeuFElq
WXCpw8NKOaH8YQP1hd+PhVKrHcNW/EFhhjOqZRkCgcAPcXD9UgDz7qSw4nQ84THx
FoqZ+X+9YbVElJmKG9Qv991r/80aL5vKPr0jMKVGjcQn2/HYf1hdNd5RJ6gmaXPN
+0nw1uIyjXDK2li9/LiJkB8v4CYvCbFzcx+e9gvm9UIXSqzKTGNxw22WxwxQZtw2
db7mcjfYMXB7bY2HmFhu4PWaUjZGUUrhHIzyblh548JmAVGrOs2oFTC2eXYdVTLf
cNFW6MFHTB9uq5vebaJnjZj1cCBWlGRRT3vACFOCAFECgcAlJdzS3b15/X8Cx8iV
NAAbxfp+Kng/z4+9lJhOwOTr9O80bm26ona0QCM+hZhhhS4Dn4kXI9ousU+sIR55
Q8XW89sn0ydRLF8opHOEeAb3kPn9+YgkJC6z3zHG8ovxG+V5MLbWbpR2NrrOHT7S
AermBDGmOBgH4xlOQCaKk2VkzlgB/esddmjckWS6T+L7TGSRbxeA27RyUeplQjsi
s0iU+pZI5O7JniowN40A5EPxWbhj251G7TCRPUn0LscNWMkCgcEA6BxNQ907FfQu
+srCpZcmthxKdm8cLrONT/U9cc5tJBftL/zlSvGnhU+rFEt/nLOYoroa9dhZlM8/
i3BXZofMQ36Q76QIaN41fbzzwpDz5aIFlN/cvASk5Yspdv3Lq1dis0FzAcHFweWU
4jw9m4f9nn81b3QiAjT5aB1Ftqeu9L9r4EZHqAjU+iBN/HpF6W7YPhKKX7VNkX7g
uUZjERpZq5kg7DWWqhcwLc2ztiRrBmT964Q6CnRTzvYmdyaqyQni
-----END RSA PRIVATE KEY-----

View File

@ -0,0 +1,11 @@
-----BEGIN PUBLIC KEY-----
MIIBojANBgkqhkiG9w0BAQEFAAOCAY8AMIIBigKCAYEA5YeHRDCRtEn8kVi7xVT2
YicRByZZUPjzB9qSlQboHvdIpZhWhoRg+qPYP2RoH7JlmPX/q/RnNjFZ52pG9Tzl
3J67Pz3H5j4FoKFZ7OTf55Wk2f83pe9hgDTfAfIuR1ch2MtfOx1yzoVUENFfLwiI
hSg/+6wjY98SbZujM1a/iWpM9IFkpOgwZZrXtSPoUlNidwAZrYS+eGVL9hyyH9RT
86gYh1BgsVH4zjQqCP76lxXrsSynlsXfasenpTppVfvyR1NhK1JhQf3dsvCgie0i
WcK+OICt0z9WesakKnyAU01BFOmXAPuDS2wErTb2Zw654lo7iEFdRGK1Mmx0gViJ
7bXBxKVIw24za/EsCeb+jLFXVilC/P0T7mI/y6FF0wPkTf8aICJphspdnimEJSXe
Qinjmx+iFWvG/uHimrswXaAQbP1YOoJZx+HFy4b/4/hVLu3aiFM2sgfGkT8gsg3d
i2zxEIn7cnLaIC4HgcVxQLUVMsBXFXgU2SWVWPb2+fv9AgMBAAE=
-----END PUBLIC KEY-----

View File

@ -926,3 +926,157 @@ func TestPulsarPartitionedOrderingProcess(t *testing.T) {
Step("reset", flow.Reset(consumerGroup1)).
Run()
}
func TestPulsarEncryptionFromFile(t *testing.T) {
consumerGroup1 := watcher.NewUnordered()
publishMessages := func(sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// prepare the messages
messages := make([]string, numMessages)
for i := range messages {
test := &schemaTest{
ID: i,
Name: uuid.New().String(),
}
b, _ := json.Marshal(test)
messages[i] = string(b)
}
for _, messageWatcher := range messageWatchers {
messageWatcher.ExpectStrings(messages...)
}
// get the sidecar (dapr) client
client := sidecar.GetClient(ctx, sidecarName)
// publish messages
ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
for _, message := range messages {
ctx.Logf("Publishing: %q", message)
err := client.PublishEvent(ctx, pubsubName, topicName, message)
require.NoError(ctx, err, "error publishing message")
}
return nil
}
}
flow.New(t, "pulsar encryption test with file path").
// Run subscriberApplication app1
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
subscriberSchemaApplication(appID1, topicActiveName, consumerGroup1))).
Step(dockercompose.Run(clusterName, dockerComposeYAML)).
Step("wait", flow.Sleep(10*time.Second)).
Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
return fmt.Errorf("could not create pulsar client: %v", err)
}
defer client.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "topic-1",
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
if err != nil {
return fmt.Errorf("could not create pulsar Topic: %v", err)
}
defer consumer.Close()
return err
})).
Step(sidecar.Run(sidecarName1,
embedded.WithComponentsPath("./components/consumer_five"),
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort),
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort),
componentRuntimeOptions(),
)).
Step("publish messages to topic1", publishMessages(sidecarName1, topicActiveName, consumerGroup1)).
Step("verify if app1 has received messages published to topic", assertMessages(10*time.Second, consumerGroup1)).
Step("reset", flow.Reset(consumerGroup1)).
Run()
}
func TestPulsarEncryptionFromData(t *testing.T) {
consumerGroup1 := watcher.NewUnordered()
publishMessages := func(sidecarName string, topicName string, messageWatchers ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
// prepare the messages
messages := make([]string, numMessages)
for i := range messages {
test := &schemaTest{
ID: i,
Name: uuid.New().String(),
}
b, _ := json.Marshal(test)
messages[i] = string(b)
}
for _, messageWatcher := range messageWatchers {
messageWatcher.ExpectStrings(messages...)
}
// get the sidecar (dapr) client
client := sidecar.GetClient(ctx, sidecarName)
// publish messages
ctx.Logf("Publishing messages. sidecarName: %s, topicName: %s", sidecarName, topicName)
for _, message := range messages {
ctx.Logf("Publishing: %q", message)
err := client.PublishEvent(ctx, pubsubName, topicName, message)
require.NoError(ctx, err, "error publishing message")
}
return nil
}
}
flow.New(t, "pulsar encryption test with data").
// Run subscriberApplication app2
Step(app.Run(appID1, fmt.Sprintf(":%d", appPort),
subscriberSchemaApplication(appID1, topicActiveName, consumerGroup1))).
Step(dockercompose.Run(clusterName, dockerComposeYAML)).
Step("wait", flow.Sleep(10*time.Second)).
Step("wait for pulsar readiness", retry.Do(10*time.Second, 30, func(ctx flow.Context) error {
client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
if err != nil {
return fmt.Errorf("could not create pulsar client: %v", err)
}
defer client.Close()
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: "topic-1",
SubscriptionName: "my-sub",
Type: pulsar.Shared,
})
if err != nil {
return fmt.Errorf("could not create pulsar Topic: %v", err)
}
defer consumer.Close()
return err
})).
Step(sidecar.Run(sidecarName1,
embedded.WithComponentsPath("./components/consumer_six"),
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort),
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort),
componentRuntimeOptions(),
)).
Step("publish messages to topic1", publishMessages(sidecarName1, topicActiveName, consumerGroup1)).
Step("verify if app1 has received messages published to topic", assertMessages(10*time.Second, consumerGroup1)).
Step("reset", flow.Reset(consumerGroup1)).
Run()
}