feat: Make the MQTT protocol bindings follow conventions
Signed-off-by: Jon Skeet <jonskeet@google.com>
This commit is contained in:
parent
17d943302a
commit
f93f1fe6e6
|
@ -3,20 +3,69 @@
|
||||||
// See LICENSE file in the project root for full license information.
|
// See LICENSE file in the project root for full license information.
|
||||||
|
|
||||||
using MQTTnet;
|
using MQTTnet;
|
||||||
|
using System;
|
||||||
|
using System.Collections.Generic;
|
||||||
|
|
||||||
namespace CloudNative.CloudEvents.Mqtt
|
namespace CloudNative.CloudEvents.Mqtt
|
||||||
{
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Extension methods to convert between CloudEvents and MQTT messages.
|
||||||
|
/// </summary>
|
||||||
public static class MqttClientExtensions
|
public static class MqttClientExtensions
|
||||||
{
|
{
|
||||||
|
/// <summary>
|
||||||
|
/// Converts this MQTT message into a CloudEvent object.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="message">The MQTT message to convert. Must not be null.</param>
|
||||||
|
/// <param name="formatter">The event formatter to use to parse the CloudEvent. Must not be null.</param>
|
||||||
|
/// <param name="extensionAttributes">The extension attributes to use when parsing the CloudEvent. May be null.</param>
|
||||||
|
/// <returns>A reference to a validated CloudEvent instance.</returns>
|
||||||
public static CloudEvent ToCloudEvent(this MqttApplicationMessage message,
|
public static CloudEvent ToCloudEvent(this MqttApplicationMessage message,
|
||||||
CloudEventFormatter eventFormatter, params CloudEventAttribute[] extensionAttributes)
|
CloudEventFormatter formatter, params CloudEventAttribute[] extensionAttributes) =>
|
||||||
|
ToCloudEvent(message, formatter, (IEnumerable<CloudEventAttribute>) extensionAttributes);
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Converts this MQTT message into a CloudEvent object.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="message">The MQTT message to convert. Must not be null.</param>
|
||||||
|
/// <param name="formatter">The event formatter to use to parse the CloudEvent. Must not be null.</param>
|
||||||
|
/// <param name="extensionAttributes">The extension attributes to use when parsing the CloudEvent. May be null.</param>
|
||||||
|
/// <returns>A reference to a validated CloudEvent instance.</returns>
|
||||||
|
public static CloudEvent ToCloudEvent(this MqttApplicationMessage message,
|
||||||
|
CloudEventFormatter formatter, IEnumerable<CloudEventAttribute> extensionAttributes)
|
||||||
{
|
{
|
||||||
|
message = message ?? throw new ArgumentNullException(nameof(message));
|
||||||
|
formatter = formatter ?? throw new ArgumentNullException(nameof(formatter));
|
||||||
|
|
||||||
// TODO: Determine if there's a sensible content type we should apply.
|
// TODO: Determine if there's a sensible content type we should apply.
|
||||||
return eventFormatter.DecodeStructuredModeMessage(message.Payload, contentType: null, extensionAttributes);
|
return formatter.DecodeStructuredModeMessage(message.Payload, contentType: null, extensionAttributes);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: Update to a newer version of MQTTNet and support both binary and structured mode?
|
// TODO: Update to a newer version of MQTTNet and support both binary and structured mode?
|
||||||
public static MqttApplicationMessage ToMqttApplicationMessage(this CloudEvent cloudEvent, CloudEventFormatter formatter, string topic) =>
|
/// <summary>
|
||||||
new MqttApplicationMessage { Topic = topic, Payload = formatter.EncodeStructuredModeMessage(cloudEvent, out _)};
|
/// Converts a CloudEvent to <see cref="MqttApplicationMessage"/>.
|
||||||
|
/// </summary>
|
||||||
|
/// <param name="cloudEvent">The CloudEvent to convert. Must not be null, and must be a valid CloudEvent.</param>
|
||||||
|
/// <param name="contentMode">Content mode. Currently only structured mode is supported.</param>
|
||||||
|
/// <param name="formatter">The formatter to use within the conversion. Must not be null.</param>
|
||||||
|
/// <param name="topic">The MQTT topic for the message. May be null.</param>
|
||||||
|
public static MqttApplicationMessage ToMqttApplicationMessage(this CloudEvent cloudEvent, ContentMode contentMode, CloudEventFormatter formatter, string topic)
|
||||||
|
{
|
||||||
|
cloudEvent = cloudEvent ?? throw new ArgumentNullException(nameof(cloudEvent));
|
||||||
|
cloudEvent.ValidateForConversion(nameof(cloudEvent));
|
||||||
|
formatter = formatter ?? throw new ArgumentNullException(nameof(formatter));
|
||||||
|
|
||||||
|
switch (contentMode)
|
||||||
|
{
|
||||||
|
case ContentMode.Structured:
|
||||||
|
return new MqttApplicationMessage
|
||||||
|
{
|
||||||
|
Topic = topic,
|
||||||
|
Payload = formatter.EncodeStructuredModeMessage(cloudEvent, out _)
|
||||||
|
};
|
||||||
|
default:
|
||||||
|
throw new ArgumentOutOfRangeException(nameof(contentMode), $"Unsupported content mode: {contentMode}");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -63,7 +63,7 @@ namespace CloudNative.CloudEvents.Mqtt.UnitTests
|
||||||
tcs.SetResult(args.ApplicationMessage.ToCloudEvent(jsonEventFormatter));
|
tcs.SetResult(args.ApplicationMessage.ToCloudEvent(jsonEventFormatter));
|
||||||
|
|
||||||
var result = await client.SubscribeAsync("abc");
|
var result = await client.SubscribeAsync("abc");
|
||||||
await client.PublishAsync(cloudEvent.ToMqttApplicationMessage(new JsonEventFormatter(), topic: "abc"));
|
await client.PublishAsync(cloudEvent.ToMqttApplicationMessage(ContentMode.Structured, new JsonEventFormatter(), topic: "abc"));
|
||||||
var receivedCloudEvent = await tcs.Task;
|
var receivedCloudEvent = await tcs.Task;
|
||||||
|
|
||||||
Assert.Equal(CloudEventsSpecVersion.Default, receivedCloudEvent.SpecVersion);
|
Assert.Equal(CloudEventsSpecVersion.Default, receivedCloudEvent.SpecVersion);
|
||||||
|
|
Loading…
Reference in New Issue