Allow metadata to flow through service bus queue (#1911)

* Allow metadata to flow through service bus queue

This commit lets metadata bind to a message and be read when
the message is received. This allows for the distributed tracing
of events.

Signed-off-by: Hal Spang <halspang@microsoft.com>

* Remove saved fields from ApplicationProperties

Signed-off-by: Hal Spang <halspang@microsoft.com>

* Add a certification test for metadata

Signed-off-by: Hal Spang <halspang@microsoft.com>

Signed-off-by: Hal Spang <halspang@microsoft.com>
Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
Co-authored-by: Bernd Verst <4535280+berndverst@users.noreply.github.com>
This commit is contained in:
halspang 2022-08-15 17:31:14 -07:00 committed by GitHub
parent 38fc439410
commit 4bc800f612
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 93 additions and 1 deletions

View File

@ -155,7 +155,8 @@ func (a *AzureServiceBusQueues) Invoke(ctx context.Context, req *bindings.Invoke
}
msg := &servicebus.Message{
Body: req.Data,
Body: req.Data,
ApplicationProperties: make(map[string]interface{}),
}
if val, ok := req.Metadata[id]; ok && val != "" {
msg.MessageID = &val
@ -163,6 +164,16 @@ func (a *AzureServiceBusQueues) Invoke(ctx context.Context, req *bindings.Invoke
if val, ok := req.Metadata[correlationID]; ok && val != "" {
msg.CorrelationID = &val
}
// Include incoming metadata in the message to be used when it is read.
for k, v := range req.Metadata {
// Don't include the values that are saved in MessageID or CorrelationID.
if k == id || k == correlationID {
continue
}
msg.ApplicationProperties[k] = v
}
ttl, ok, err := contrib_metadata.TryGetTTL(req.Metadata)
if err != nil {
return nil, err
@ -262,6 +273,13 @@ func (a *AzureServiceBusQueues) getHandlerFunc(handler bindings.Handler) impl.Ha
metadata[label] = *msg.Subject
}
// Passthrough any custom metadata to the handler.
for key, val := range msg.ApplicationProperties {
if stringVal, ok := val.(string); ok {
metadata[key] = stringVal
}
}
_, err := handler(a.ctx, &bindings.ReadResponse{
Data: msg.Body,
Metadata: metadata,

View File

@ -31,6 +31,11 @@ The purpose of this module is to provide tests that certify the Azure Service Bu
- Start an application that is guaranteed to fail
- Ensure the binding continues to read incoming messages
- Ensure the messages that are failed are retried
- Verify Metadata flows through event
- Create an output/input binding
- Run dapr application with components
- Invoke the output binding providing metadata
- Receive the message and validate the metadata
### Future Tests
1. Provide iterations around the different auth mechanisms supported by Azure Service Bus.

View File

@ -353,3 +353,72 @@ func TestAzureServiceBusQueueRetriesOnError(t *testing.T) {
Step("send and wait", test).
Run()
}
func TestServiceBusQueueMetadata(t *testing.T) {
log := logger.NewLogger("dapr.components")
messages := watcher.NewUnordered()
ports, _ := dapr_testing.GetFreePorts(3)
grpcPort := ports[0]
httpPort := ports[1]
appPort := ports[2]
test := func(ctx flow.Context) error {
client, err := daprClient.NewClientWithPort(fmt.Sprintf("%d", grpcPort))
require.NoError(t, err, "Could not initialize dapr client.")
// Send events that the application above will observe.
ctx.Log("Invoking binding!")
req := &daprClient.InvokeBindingRequest{Name: "sb-binding-1", Operation: "create", Data: []byte("test msg"), Metadata: map[string]string{"TestMetadata": "Some Metadata"}}
err = client.InvokeOutputBinding(ctx, req)
require.NoError(ctx, err, "error publishing message")
// Do the messages we observed match what we expect?
messages.Assert(ctx, time.Minute)
return nil
}
// Application logic that tracks messages from a topic.
application := func(ctx flow.Context, s common.Service) (err error) {
// Setup the input binding endpoints
err = multierr.Combine(err,
s.AddBindingInvocationHandler("sb-binding-1", func(_ context.Context, in *common.BindingEvent) ([]byte, error) {
messages.Observe(string(in.Data))
ctx.Logf("Got message: %s - %+v", string(in.Data), in.Metadata)
require.NotEmpty(t, in.Metadata)
require.Contains(t, in.Metadata, "TestMetadata")
require.Equal(t, "Some Metadata", in.Metadata["TestMetadata"])
return []byte("{}"), nil
}))
return err
}
flow.New(t, "servicebusqueue certification").
// Run the application logic above.
Step(app.Run("metadataApp", fmt.Sprintf(":%d", appPort), application)).
Step(sidecar.Run("metadataSidecar",
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
embedded.WithDaprGRPCPort(grpcPort),
embedded.WithDaprHTTPPort(httpPort),
embedded.WithComponentsPath("./components/standard"),
runtime.WithOutputBindings(
binding_loader.NewOutput("azure.servicebusqueues", func() bindings.OutputBinding {
return binding_asb.NewAzureServiceBusQueues(log)
}),
),
runtime.WithInputBindings(
binding_loader.NewInput("azure.servicebusqueues", func() bindings.InputBinding {
return binding_asb.NewAzureServiceBusQueues(log)
}),
),
runtime.WithSecretStores(
secretstores_loader.New("local.env", func() secretstores.SecretStore {
return secretstore_env.NewEnvSecretStore(log)
}),
))).
Step("send and wait", test).
Run()
}