From 2006bcccb01e397e8987aa03fde249b4d5368b0c Mon Sep 17 00:00:00 2001 From: Simon Leet Date: Mon, 25 Oct 2021 18:10:12 +0000 Subject: [PATCH] Support message-id in Event Hubs components --- bindings/azure/eventhubs/eventhubs.go | 8 ++++++-- bindings/azure/eventhubs/eventhubs_integration_test.go | 2 ++ pubsub/azure/eventhubs/eventhubs.go | 7 ++++++- pubsub/azure/eventhubs/eventhubs_integration_test.go | 2 ++ tests/scripts/send-iot-device-events.sh | 2 +- 5 files changed, 17 insertions(+), 4 deletions(-) diff --git a/bindings/azure/eventhubs/eventhubs.go b/bindings/azure/eventhubs/eventhubs.go index ad10031f9..9667a170a 100644 --- a/bindings/azure/eventhubs/eventhubs.go +++ b/bindings/azure/eventhubs/eventhubs.go @@ -57,6 +57,7 @@ const ( sysPropIotHubConnectionAuthMethod = "iothub-connection-auth-method" sysPropIotHubConnectionModuleID = "iothub-connection-module-id" sysPropIotHubEnqueuedTime = "iothub-enqueuedtime" + sysPropMessageID = "message-id" ) func readHandler(e *eventhub.Event, handler func(*bindings.ReadResponse) ([]byte, error)) error { @@ -74,7 +75,7 @@ func readHandler(e *eventhub.Event, handler func(*bindings.ReadResponse) ([]byte if e.SystemProperties.PartitionID != nil { res.Metadata[sysPropPartitionID] = strconv.Itoa(int(*e.SystemProperties.PartitionID)) } - // The following metadata properties are only present if event was generated by Azure IoT Hub + // The following metadata properties are only present if event was generated by Azure IoT Hub. if e.SystemProperties.PartitionKey != nil { res.Metadata[sysPropPartitionKey] = *e.SystemProperties.PartitionKey } @@ -93,7 +94,10 @@ func readHandler(e *eventhub.Event, handler func(*bindings.ReadResponse) ([]byte if e.SystemProperties.IoTHubEnqueuedTime != nil { res.Metadata[sysPropIotHubEnqueuedTime] = e.SystemProperties.IoTHubEnqueuedTime.Format(time.RFC3339) } - + // azure-event-hubs-go SDK pulls out the AMQP message-id property to the Event.ID property, map it from there. + if e.ID != "" { + res.Metadata[sysPropMessageID] = e.ID + } _, err := handler(&res) return err diff --git a/bindings/azure/eventhubs/eventhubs_integration_test.go b/bindings/azure/eventhubs/eventhubs_integration_test.go index 5a655185d..f2604be86 100644 --- a/bindings/azure/eventhubs/eventhubs_integration_test.go +++ b/bindings/azure/eventhubs/eventhubs_integration_test.go @@ -81,6 +81,7 @@ func testReadIotHubEvents(t *testing.T) { assert.Greater(t, len(readResponses), 0, "Failed to receive any IotHub events") logger.Infof("Received %d messages", len(readResponses)) for _, r := range readResponses { + logger.Infof("Message metadata: %v", r.Metadata) assert.Contains(t, string(r.Data), "Integration test message") // Verify expected IoT Hub device event metadata exists @@ -92,6 +93,7 @@ func testReadIotHubEvents(t *testing.T) { assert.Contains(t, r.Metadata, sysPropIotHubAuthGenerationID, "IoT device event missing: %s", sysPropIotHubAuthGenerationID) assert.Contains(t, r.Metadata, sysPropIotHubConnectionAuthMethod, "IoT device event missing: %s", sysPropIotHubConnectionAuthMethod) assert.Contains(t, r.Metadata, sysPropIotHubEnqueuedTime, "IoT device event missing: %s", sysPropIotHubEnqueuedTime) + assert.Contains(t, r.Metadata, sysPropMessageID, "IoT device event missing: %s", sysPropMessageID) } eh.Close() diff --git a/pubsub/azure/eventhubs/eventhubs.go b/pubsub/azure/eventhubs/eventhubs.go index 7ddd94e16..c45d68d6e 100644 --- a/pubsub/azure/eventhubs/eventhubs.go +++ b/pubsub/azure/eventhubs/eventhubs.go @@ -51,6 +51,7 @@ const ( sysPropIotHubConnectionAuthMethod = "iothub-connection-auth-method" sysPropIotHubConnectionModuleID = "iothub-connection-module-id" sysPropIotHubEnqueuedTime = "iothub-enqueuedtime" + sysPropMessageID = "message-id" ) func subscribeHandler(ctx context.Context, topic string, e *eventhub.Event, handler pubsub.Handler) error { @@ -68,7 +69,7 @@ func subscribeHandler(ctx context.Context, topic string, e *eventhub.Event, hand if e.SystemProperties.PartitionID != nil { res.Metadata[sysPropPartitionID] = strconv.Itoa(int(*e.SystemProperties.PartitionID)) } - // The following metadata properties are only present if event was generated by Azure IoT Hub + // The following metadata properties are only present if event was generated by Azure IoT Hub. if e.SystemProperties.PartitionKey != nil { res.Metadata[sysPropPartitionKey] = *e.SystemProperties.PartitionKey } @@ -87,6 +88,10 @@ func subscribeHandler(ctx context.Context, topic string, e *eventhub.Event, hand if e.SystemProperties.IoTHubEnqueuedTime != nil { res.Metadata[sysPropIotHubEnqueuedTime] = e.SystemProperties.IoTHubEnqueuedTime.Format(time.RFC3339) } + // azure-event-hubs-go SDK pulls out the AMQP message-id property to the Event.ID property, map it from there. + if e.ID != "" { + res.Metadata[sysPropMessageID] = e.ID + } return handler(ctx, &res) } diff --git a/pubsub/azure/eventhubs/eventhubs_integration_test.go b/pubsub/azure/eventhubs/eventhubs_integration_test.go index 6dbae4f78..2dc13aaa5 100644 --- a/pubsub/azure/eventhubs/eventhubs_integration_test.go +++ b/pubsub/azure/eventhubs/eventhubs_integration_test.go @@ -88,6 +88,7 @@ func testReadIotHubEvents(t *testing.T) { assert.Greater(t, len(messages), 0, "Failed to receive any IotHub events") logger.Infof("Received %d messages", len(messages)) for _, r := range messages { + logger.Infof("Message metadata: %v", r.Metadata) assert.Equal(t, r.Topic, testTopic, "Message topic doesn't match subscription") assert.Contains(t, string(r.Data), "Integration test message") @@ -100,6 +101,7 @@ func testReadIotHubEvents(t *testing.T) { assert.Contains(t, r.Metadata, sysPropIotHubAuthGenerationID, "IoT device event missing: %s", sysPropIotHubAuthGenerationID) assert.Contains(t, r.Metadata, sysPropIotHubConnectionAuthMethod, "IoT device event missing: %s", sysPropIotHubConnectionAuthMethod) assert.Contains(t, r.Metadata, sysPropIotHubEnqueuedTime, "IoT device event missing: %s", sysPropIotHubEnqueuedTime) + assert.Contains(t, r.Metadata, sysPropMessageID, "IoT device event missing: %s", sysPropMessageID) } eh.Close() diff --git a/tests/scripts/send-iot-device-events.sh b/tests/scripts/send-iot-device-events.sh index f1cf6d8ba..a8dd03270 100644 --- a/tests/scripts/send-iot-device-events.sh +++ b/tests/scripts/send-iot-device-events.sh @@ -30,4 +30,4 @@ if [[ -z "$(az iot hub device-identity show -n ${IOT_HUB_NAME} -d ${IOT_HUB_TEST fi # Send the test IoT device messages to the IoT Hub -az iot device send-d2c-message -n ${IOT_HUB_NAME} -d ${IOT_HUB_TEST_DEVICE_NAME} --data '{ "data": "Integration test message" }' --msg-count 2 +az iot device simulate -n ${IOT_HUB_NAME} -d ${IOT_HUB_TEST_DEVICE_NAME} --data '{ "data": "Integration test message" }' --msg-count 2 --msg-interval 1 --protocol http --properties "iothub-userid=dapr-user-id;iothub-messageid=dapr-message-id"