components-contrib/tests/certification/bindings/rabbitmq/rabbitmq_test.go

633 lines
22 KiB
Go

/*
Copyright 2022 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package rabbitmq_test
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"log"
"os"
"strconv"
"testing"
"time"
"github.com/dapr/components-contrib/tests/certification/flow/network"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/stretchr/testify/require"
"go.uber.org/multierr"
"github.com/dapr/components-contrib/bindings"
binding_rabbitmq "github.com/dapr/components-contrib/bindings/rabbitmq"
bindings_loader "github.com/dapr/dapr/pkg/components/bindings"
"github.com/dapr/dapr/pkg/config/protocol"
dapr_testing "github.com/dapr/dapr/pkg/testing"
daprClient "github.com/dapr/go-sdk/client"
"github.com/dapr/go-sdk/service/common"
"github.com/dapr/kit/logger"
"github.com/dapr/components-contrib/tests/certification/embedded"
"github.com/dapr/components-contrib/tests/certification/flow"
"github.com/dapr/components-contrib/tests/certification/flow/app"
"github.com/dapr/components-contrib/tests/certification/flow/dockercompose"
"github.com/dapr/components-contrib/tests/certification/flow/retry"
"github.com/dapr/components-contrib/tests/certification/flow/sidecar"
"github.com/dapr/components-contrib/tests/certification/flow/simulate"
"github.com/dapr/components-contrib/tests/certification/flow/watcher"
)
const (
rabbitMQURL = "amqp://test:test@localhost:5672"
rabbitMQURLExtAuth = "amqps://localhost:5671"
clusterName = "rabbitmqcertification"
dockerComposeYAML = "docker-compose.yml"
extSaslDockerComposeYAML = "mtls_sasl_external/docker-compose.yml"
numOfMessages = 10
sidecarName1 = "dapr-1"
sidecarName2 = "dapr-2"
)
func amqpReady(url string) flow.Runnable {
return func(ctx flow.Context) error {
conn, err := amqp.Dial(url)
if err != nil {
return err
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
return err
}
defer ch.Close()
return nil
}
}
func TestRabbitMQ(t *testing.T) {
messages := watcher.NewUnordered()
ports, _ := dapr_testing.GetFreePorts(3)
grpcPort := ports[0]
httpPort := ports[1]
appPort := ports[2]
test := func(ctx flow.Context) error {
client, err := daprClient.NewClientWithPort(fmt.Sprintf("%d", grpcPort))
require.NoError(t, err, "Could not initialize dapr client.")
// Declare the expected data.
msgs := make([]string, numOfMessages)
for i := 0; i < numOfMessages; i++ {
msgs[i] = fmt.Sprintf("standard-binding: Message %03d", i)
}
messages.ExpectStrings(msgs...)
metadata := make(map[string]string)
ctx.Log("Invoking binding!")
for _, msg := range msgs {
ctx.Logf("Sending: %q", msg)
req := &daprClient.InvokeBindingRequest{Name: "standard-binding", Operation: "create", Data: []byte(msg), Metadata: metadata}
err := client.InvokeOutputBinding(ctx, req)
require.NoError(ctx, err, "error publishing message")
}
// Assertion on the data.
messages.Assert(ctx, time.Minute)
return nil
}
application := func(ctx flow.Context, s common.Service) (err error) {
// Setup the input binding endpoints.
err = multierr.Combine(err,
s.AddBindingInvocationHandler("standard-binding", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
messages.Observe(string(in.Data))
ctx.Logf("Got message: %s", string(in.Data))
return []byte("{}"), nil
}))
return err
}
flow.New(t, "rabbitmq certification").
// Run the application logic above.
Step(dockercompose.Run(clusterName, dockerComposeYAML)).
Step("wait for rabbitmq readiness",
retry.Do(time.Second, 30, amqpReady(rabbitMQURL))).
Step(app.Run("standardApp", fmt.Sprintf(":%d", appPort), application)).
Step(sidecar.Run("standardSidecar",
append(componentRuntimeOptions(),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)),
embedded.WithDaprGRPCPort(strconv.Itoa(grpcPort)),
embedded.WithDaprHTTPPort(strconv.Itoa(httpPort)),
embedded.WithComponentsPath("./components/standard"),
)...,
)).
Step("send and wait", test).
Run()
}
func TestRabbitMQForOptions(t *testing.T) {
messages := watcher.NewUnordered()
ports, _ := dapr_testing.GetFreePorts(3)
grpcPort := ports[0]
httpPort := ports[1]
appPort := ports[2]
test := func(ctx flow.Context) error {
client, err := daprClient.NewClientWithPort(fmt.Sprintf("%d", grpcPort))
require.NoError(t, err, "Could not initialize dapr client.")
// Declare the expected data.
msgs := make([]string, numOfMessages)
for i := 0; i < numOfMessages; i++ {
msgs[i] = fmt.Sprintf("options-binding: Message %03d", i)
}
messages.ExpectStrings(msgs...)
metadata := make(map[string]string)
ctx.Log("Invoking binding!")
for _, msg := range msgs {
ctx.Logf("Sending: %q", msg)
req := &daprClient.InvokeBindingRequest{Name: "options-binding", Operation: "create", Data: []byte(msg), Metadata: metadata}
err := client.InvokeOutputBinding(ctx, req)
require.NoError(ctx, err, "error publishing message")
}
// Assertion on the data.
messages.Assert(ctx, time.Minute)
return nil
}
application := func(ctx flow.Context, s common.Service) (err error) {
// Setup the input binding endpoints.
err = multierr.Combine(err,
s.AddBindingInvocationHandler("options-binding", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
messages.Observe(string(in.Data))
ctx.Logf("Got message: %s", string(in.Data))
return []byte("{}"), nil
}))
return err
}
flow.New(t, "rabbitmq options certification").
// Run the application logic above.
Step(dockercompose.Run(clusterName, dockerComposeYAML)).
Step("wait for rabbitmq readiness",
retry.Do(time.Second, 30, amqpReady(rabbitMQURL))).
Step(app.Run("optionsApp", fmt.Sprintf(":%d", appPort), application)).
Step(sidecar.Run("optionsSidecar",
append(componentRuntimeOptions(),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)),
embedded.WithDaprGRPCPort(strconv.Itoa(grpcPort)),
embedded.WithDaprHTTPPort(strconv.Itoa(httpPort)),
embedded.WithComponentsPath("./components/options"),
)...,
)).
Step("send and wait", test).
Run()
}
func TestRabbitMQTTLs(t *testing.T) {
ttlMessages := watcher.NewUnordered()
ports, _ := dapr_testing.GetFreePorts(3)
grpcPort := ports[0]
httpPort := ports[1]
appPort := ports[2]
ttlTest := func(ctx flow.Context) error {
client, err := daprClient.NewClientWithPort(fmt.Sprintf("%d", grpcPort))
require.NoError(t, err, "Could not initialize dapr client.")
ctx.Logf("Sending messages for expiration.")
for i := 0; i < numOfMessages; i++ {
msg := fmt.Sprintf("Expiring message %d", i)
metadata := make(map[string]string)
// Send to the queue with TTL.
queueTTLReq := &daprClient.InvokeBindingRequest{Name: "queue-ttl-binding", Operation: "create", Data: []byte(msg), Metadata: metadata}
err := client.InvokeOutputBinding(ctx, queueTTLReq)
require.NoError(ctx, err, "error publishing message")
// Send message with TTL set in yaml file
messageTTLReq := &daprClient.InvokeBindingRequest{Name: "msg-ttl-binding", Operation: "create", Data: []byte(msg), Metadata: metadata}
messageTTLReq.Metadata["ttlInSeconds"] = "20"
err = client.InvokeOutputBinding(ctx, messageTTLReq)
require.NoError(ctx, err, "error publishing message")
// Send message with TTL to ensure it overwrites Queue TTL.
mixedTTLReq := &daprClient.InvokeBindingRequest{Name: "overwrite-ttl-binding", Operation: "create", Data: []byte(msg), Metadata: metadata}
mixedTTLReq.Metadata["ttlInSeconds"] = "10"
err = client.InvokeOutputBinding(ctx, mixedTTLReq)
require.NoError(ctx, err, "error publishing message")
}
// Wait for double the TTL after sending the last message.
time.Sleep(time.Second * 20)
return nil
}
ttlApplication := func(ctx flow.Context, s common.Service) (err error) {
// Setup the input binding endpoints.
err = multierr.Combine(err,
s.AddBindingInvocationHandler("queue-ttl-binding", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
ctx.Logf("Got message: %s", string(in.Data))
ttlMessages.FailIfNotExpected(t, string(in.Data))
return []byte("{}"), nil
}),
s.AddBindingInvocationHandler("msg-ttl-binding", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
ctx.Logf("Got message: %s", string(in.Data))
ttlMessages.FailIfNotExpected(t, string(in.Data))
return []byte("{}"), nil
}),
s.AddBindingInvocationHandler("overwrite-ttl-binding", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
ctx.Logf("Got message: %s", string(in.Data))
ttlMessages.FailIfNotExpected(t, string(in.Data))
return []byte("{}"), nil
}))
return err
}
freshPorts, _ := dapr_testing.GetFreePorts(2)
flow.New(t, "rabbitmq ttl certification").
// Run the application logic above.
Step(dockercompose.Run(clusterName, dockerComposeYAML)).
Step("wait for rabbitmq readiness",
retry.Do(time.Second, 30, amqpReady(rabbitMQURL))).
Step(app.Run("ttlApp", fmt.Sprintf(":%d", appPort), ttlApplication)).
Step(sidecar.Run("ttlSidecar",
append(componentRuntimeOptions(),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)),
embedded.WithDaprGRPCPort(strconv.Itoa(grpcPort)),
embedded.WithDaprHTTPPort(strconv.Itoa(httpPort)),
embedded.WithComponentsPath("./components/ttl"),
)...,
)).
Step("send ttl messages", ttlTest).
Step("stop initial sidecar", sidecar.Stop("ttlSidecar")).
Step(app.Run("ttlApp", fmt.Sprintf(":%d", appPort), ttlApplication)).
Step(sidecar.Run("appSidecar",
append(componentRuntimeOptions(),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)),
embedded.WithDaprGRPCPort(strconv.Itoa(freshPorts[0])),
embedded.WithDaprHTTPPort(strconv.Itoa(freshPorts[1])),
)...,
)).
Step("verify no messages", func(ctx flow.Context) error {
// Assertion on the data.
ttlMessages.Assert(t, time.Minute)
return nil
}) //.
//Run()
}
func TestRabbitMQRetriesOnError(t *testing.T) {
messages := watcher.NewUnordered()
ports, _ := dapr_testing.GetFreePorts(3)
grpcPort := ports[0]
httpPort := ports[1]
appPort := ports[2]
testRetry := func(ctx flow.Context) error {
client, err := daprClient.NewClientWithPort(fmt.Sprintf("%d", grpcPort))
require.NoError(t, err, "Could not initialize dapr client.")
// Declare the expected data.
msgs := make([]string, numOfMessages)
for i := 0; i < numOfMessages; i++ {
msgs[i] = fmt.Sprintf("Message %03d", i)
}
messages.ExpectStrings(msgs...)
metadata := make(map[string]string)
// Send events that the application above will observe.
ctx.Log("Invoking binding!")
for _, msg := range msgs {
ctx.Logf("Sending: %q", msg)
req := &daprClient.InvokeBindingRequest{Name: "retry-binding", Operation: "create", Data: []byte(msg), Metadata: metadata}
err := client.InvokeOutputBinding(ctx, req)
require.NoError(ctx, err, "error publishing message")
}
// Assertion on the data.
messages.Assert(ctx, time.Minute)
return nil
}
// Application logic that tracks messages from a topic.
retryApplication := func(ctx flow.Context, s common.Service) (err error) {
// Simulate periodic errors.
sim := simulate.PeriodicError(ctx, 10)
// Setup the input binding endpoint.
err = multierr.Combine(err,
s.AddBindingInvocationHandler("retry-binding", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
if err := sim(); err != nil {
ctx.Logf("Failing message: %s", string(in.Data))
return nil, err
}
messages.Observe(string(in.Data))
ctx.Logf("Got message: %s", string(in.Data))
return []byte("{}"), nil
}))
return err
}
flow.New(t, "rabbitmq retry certification").
// Run the application logic above.
Step(dockercompose.Run(clusterName, dockerComposeYAML)).
Step("wait for rabbitmq readiness",
retry.Do(time.Second, 30, amqpReady(rabbitMQURL))).
Step(app.Run("retryApp", fmt.Sprintf(":%d", appPort), retryApplication)).
Step(sidecar.Run("retrySidecar",
append(componentRuntimeOptions(),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)),
embedded.WithDaprGRPCPort(strconv.Itoa(grpcPort)),
embedded.WithDaprHTTPPort(strconv.Itoa(httpPort)),
embedded.WithComponentsPath("./components/retry"),
)...,
)).
Step("send and wait", testRetry).
Run()
}
func TestRabbitMQNetworkError(t *testing.T) {
messages := watcher.NewUnordered()
ports, _ := dapr_testing.GetFreePorts(3)
grpcPort := ports[0]
httpPort := ports[1]
appPort := ports[2]
test := func(ctx flow.Context) error {
client, err := daprClient.NewClientWithPort(fmt.Sprintf("%d", grpcPort))
require.NoError(t, err, "Could not initialize dapr client.")
// Declare the expected data.
msgs := make([]string, numOfMessages)
for i := 0; i < numOfMessages; i++ {
msgs[i] = fmt.Sprintf("standard-binding: Message %03d", i)
}
messages.ExpectStrings(msgs...)
metadata := make(map[string]string)
ctx.Log("Invoking binding!")
for _, msg := range msgs {
ctx.Logf("Sending: %q", msg)
req := &daprClient.InvokeBindingRequest{Name: "standard-binding", Operation: "create", Data: []byte(msg), Metadata: metadata}
err := client.InvokeOutputBinding(ctx, req)
require.NoError(ctx, err, "error publishing message")
}
// Assertion on the data.
messages.Assert(ctx, time.Minute)
return nil
}
application := func(ctx flow.Context, s common.Service) (err error) {
// Setup the input binding endpoints.
err = multierr.Combine(err,
s.AddBindingInvocationHandler("standard-binding", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
messages.Observe(string(in.Data))
ctx.Logf("Got message: %s", string(in.Data))
return []byte("{}"), nil
}))
return err
}
flow.New(t, "rabbitmq certification").
// Run the application logic above.
Step(dockercompose.Run(clusterName, dockerComposeYAML)).
Step("wait for rabbitmq readiness",
retry.Do(time.Second, 30, amqpReady(rabbitMQURL))).
Step(app.Run("standardApp", fmt.Sprintf(":%d", appPort), application)).
Step(sidecar.Run("standardSidecar",
append(componentRuntimeOptions(),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)),
embedded.WithDaprGRPCPort(strconv.Itoa(grpcPort)),
embedded.WithDaprHTTPPort(strconv.Itoa(httpPort)),
embedded.WithComponentsPath("./components/standard"),
)...,
)).
Step("send and wait", test).
Step("interrupt network", network.InterruptNetwork(30*time.Second, nil, nil, "5672")).
Run()
}
func TestRabbitMQExclusive(t *testing.T) {
messages := watcher.NewUnordered()
ports, _ := dapr_testing.GetFreePorts(3)
grpcPort := ports[0]
httpPort := ports[1]
appPort := ports[2]
test := func(ctx flow.Context) error {
client, err := daprClient.NewClientWithPort(fmt.Sprintf("%d", grpcPort))
require.NoError(t, err, "Could not initialize dapr client.")
// Declare the expected data.
msgs := make([]string, numOfMessages)
for i := 0; i < numOfMessages; i++ {
msgs[i] = fmt.Sprintf("exclusive-binding: Message %03d", i)
}
messages.ExpectStrings(msgs...)
metadata := make(map[string]string)
ctx.Log("Invoking binding!")
for _, msg := range msgs {
ctx.Logf("Sending: %q", msg)
req := &daprClient.InvokeBindingRequest{Name: "exclusive-binding", Operation: "create", Data: []byte(msg), Metadata: metadata}
err := client.InvokeOutputBinding(ctx, req)
require.Error(ctx, err, "error publishing message")
}
return nil
}
application := func(ctx flow.Context, s common.Service) (err error) {
// Setup the input binding endpoints.
err = multierr.Combine(err,
s.AddBindingInvocationHandler("exclusive-binding", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
messages.Observe(string(in.Data))
ctx.Logf("Got message: %s", string(in.Data))
return []byte("{}"), nil
}))
return err
}
flow.New(t, "rabbitmq certification").
// Run the application logic above.
Step(dockercompose.Run(clusterName, dockerComposeYAML)).
Step("wait for rabbitmq readiness",
retry.Do(time.Second, 30, amqpReady(rabbitMQURL))).
Step(app.Run("standardApp", fmt.Sprintf(":%d", appPort), application)).
Step(sidecar.Run("standardSidecar",
append(componentRuntimeOptions(),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)),
embedded.WithDaprGRPCPort(strconv.Itoa(grpcPort)),
embedded.WithDaprHTTPPort(strconv.Itoa(httpPort)),
embedded.WithComponentsPath("./components/exclusive"),
)...,
)).
// TODO: The following test function will always fail as expected because the sidecar didn't initialize the component (expected). This should be updated to look for a much more specific error signature however by reading the sidecar's stderr.
Step("send and wait", test).
Run()
}
func TestRabbitMQExtAuth(t *testing.T) {
messages := watcher.NewUnordered()
ports, _ := dapr_testing.GetFreePorts(3)
grpcPort := ports[0]
httpPort := ports[1]
appPort := ports[2]
test := func(ctx flow.Context) error {
client, err := daprClient.NewClientWithPort(fmt.Sprintf("%d", grpcPort))
require.NoError(t, err, "Could not initialize dapr client.")
// Declare the expected data.
msgs := make([]string, numOfMessages)
for i := 0; i < numOfMessages; i++ {
msgs[i] = fmt.Sprintf("standard-binding: Message %03d", i)
}
messages.ExpectStrings(msgs...)
metadata := make(map[string]string)
ctx.Log("Invoking binding!")
for _, msg := range msgs {
ctx.Logf("Sending: %q", msg)
req := &daprClient.InvokeBindingRequest{Name: "mq-mtls", Operation: "create", Data: []byte(msg), Metadata: metadata}
err := client.InvokeOutputBinding(ctx, req)
require.NoError(ctx, err, "error publishing message")
}
// Assertion on the data.
messages.Assert(ctx, time.Minute)
return nil
}
application := func(ctx flow.Context, s common.Service) (err error) {
// Setup the input binding endpoints.
err = multierr.Combine(err,
s.AddBindingInvocationHandler("mq-mtls", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
messages.Observe(string(in.Data))
ctx.Logf("Got message: %s", string(in.Data))
return []byte("{}"), nil
}))
return err
}
flow.New(t, "rabbitmq mtls certification").
// Run the application logic above.
Step(dockercompose.Run(clusterName, extSaslDockerComposeYAML)).
Step("wait for rabbitmq readiness",
retry.Do(time.Second, 30, amqpMtlsExternalAuthReady(rabbitMQURLExtAuth))).
Step(app.Run("standardApp", fmt.Sprintf(":%d", appPort), application)).
Step(sidecar.Run("standardSidecar",
append(componentRuntimeOptions(),
embedded.WithAppProtocol(protocol.HTTPProtocol, strconv.Itoa(appPort)),
embedded.WithDaprGRPCPort(strconv.Itoa(grpcPort)),
embedded.WithDaprHTTPPort(strconv.Itoa(httpPort)),
embedded.WithComponentsPath("./mtls_sasl_external/components/mtls_external"),
)...,
)).
Step("send and wait", test).
Run()
}
func amqpMtlsExternalAuthReady(url string) flow.Runnable {
return func(ctx flow.Context) error {
cer, err := tls.LoadX509KeyPair("./mtls_sasl_external/docker_sasl_external/certs/client/cert.pem", "./mtls_sasl_external/docker_sasl_external/certs/client/key.pem")
if err != nil {
log.Println(err)
}
tlsConfig := &tls.Config{Certificates: []tls.Certificate{cer}}
tlsConfig.InsecureSkipVerify = false
tlsConfig.RootCAs = x509.NewCertPool()
if ok := tlsConfig.RootCAs.AppendCertsFromPEM([]byte("-----BEGIN CERTIFICATE-----\nMIIC8DCCAdigAwIBAgIUHyqaUOmitCL9oR5ut9c9A7kfapEwDQYJKoZIhvcNAQEL\nBQAwEzERMA8GA1UEAwwITXlUZXN0Q0EwHhcNMjMwMjA4MjMxNTI2WhcNMjQwMjA4\nMjMxNTI2WjATMREwDwYDVQQDDAhNeVRlc3RDQTCCASIwDQYJKoZIhvcNAQEBBQAD\nggEPADCCAQoCggEBAOb8I5ng1cnKw37YbMBrgJQnsFOuqamSWT2AQAnzet/ZIHnE\n9cl/wjNNxluku7bR/YW1AB5syoNjyoFmLb9R8rx5awP/DrYjhyEp7DWE4attTTWB\nZQp4nFp9PDlGee5pQjZl/hq3ceqMVuCDP9OQnCv9fMYmZtpzEJuoAxOTuvc4NaNS\nFzKhvUWkpq/6lelk4r8a7nmxT7KgPbLohhXJmrfy81bQRrMz0m4eDlNDeDHm5IUg\n4dbUCsTPs8hibeogbz1DtSQh8wPe2IgsSKrJc94KSzrdhY7UohlkSxsQBXZlm/g0\nGyGdLmf39/iMn2x9bbqQodO+CiSoNm0rXdi+5zsCAwEAAaM8MDowDAYDVR0TBAUw\nAwEB/zALBgNVHQ8EBAMCAQYwHQYDVR0OBBYEFG8vXs0iB+ovHV1aISx/aJSYAOnF\nMA0GCSqGSIb3DQEBCwUAA4IBAQCOyfgf4TszN9jq+/CKJaTCC/Lw7Wkrzjx88/Sj\nCs8efyuM2ps/7+ce71jM5oUnSysg4cZcdEdKTVgd/ZQxcOyksQRskjhG/Y5MUHRl\nO2JH3zRSRKP3vKyHQ6K9DWIQw6RgC1PB+qG+MjU5MJONpn/H/7sjCeSCZqSWoled\nUhAKF0YAipYtMgpuE+lrwIu0LVQFvbK3QFPo59LYazjI4JG6mLC0mPE1rKOY4+cZ\nuDA6D/qYtM1344ZIYHrV1jhWRI8cwS0AUoYPTGb+muSXKpW0qeOJZmJli6wkAqZx\n0BULAkIRi0nBXhTP5w53TjAWwvNQ7IK+5MXBPr/f+ZjjtHIG\n-----END CERTIFICATE-----")); !ok {
os.Exit(1)
}
log.Println("Trying to connect...")
conn, err := amqp.DialTLS_ExternalAuth(url, tlsConfig)
if err != nil {
return err
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
return err
}
defer ch.Close()
return nil
}
}
func componentRuntimeOptions() []embedded.Option {
log := logger.NewLogger("dapr.components")
bindingsRegistry := bindings_loader.NewRegistry()
bindingsRegistry.Logger = log
bindingsRegistry.RegisterInputBinding(func(l logger.Logger) bindings.InputBinding {
return binding_rabbitmq.NewRabbitMQ(l)
}, "rabbitmq")
bindingsRegistry.RegisterOutputBinding(func(l logger.Logger) bindings.OutputBinding {
return binding_rabbitmq.NewRabbitMQ(l)
}, "rabbitmq")
return []embedded.Option{
embedded.WithBindings(bindingsRegistry),
}
}