Update CloudNative.CloudEvents.Mqtt to MQTTnet version 4.3.6.1152

This involves a new major version of CloudNative.CloudEvents.Mqtt.
This commit is contained in:
Jon Skeet 2024-07-30 11:35:39 +01:00
parent b04e9296d9
commit 81fb9bafb6
4 changed files with 31 additions and 17 deletions

View File

@ -5,11 +5,14 @@
<Description>MQTT extensions for CloudNative.CloudEvents</Description>
<PackageTags>cncf;cloudnative;cloudevents;events;mqtt</PackageTags>
<LangVersion>8.0</LangVersion>
<Version>3.$(MinorVersion).$(PatchVersion)</Version>
<!-- After the first release of v3, we'll change the major here to 3. -->
<PackageValidationBaselineVersion>2.$(PackageValidationMinor).0</PackageValidationBaselineVersion>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="MQTTnet" Version="3.0.15" />
<PackageReference Include="MQTTnet" Version="4.3.6.1152" />
<ProjectReference Include="..\CloudNative.CloudEvents\CloudNative.CloudEvents.csproj" />
</ItemGroup>

View File

@ -1,4 +1,4 @@
// Copyright (c) Cloud Native Foundation.
// Copyright (c) Cloud Native Foundation.
// Licensed under the Apache 2.0 license.
// See LICENSE file in the project root for full license information.
@ -39,10 +39,10 @@ namespace CloudNative.CloudEvents.Mqtt
Validation.CheckNotNull(message, nameof(message));
// TODO: Determine if there's a sensible content type we should apply.
return formatter.DecodeStructuredModeMessage(message.Payload, contentType: null, extensionAttributes);
return formatter.DecodeStructuredModeMessage(message.PayloadSegment, contentType: null, extensionAttributes);
}
// TODO: Update to a newer version of MQTTNet and support both binary and structured mode?
// TODO: Support both binary and structured mode.
/// <summary>
/// Converts a CloudEvent to <see cref="MqttApplicationMessage"/>.
/// </summary>
@ -61,11 +61,11 @@ namespace CloudNative.CloudEvents.Mqtt
return new MqttApplicationMessage
{
Topic = topic,
Payload = BinaryDataUtilities.AsArray(formatter.EncodeStructuredModeMessage(cloudEvent, out _))
PayloadSegment = BinaryDataUtilities.GetArraySegment(formatter.EncodeStructuredModeMessage(cloudEvent, out _))
};
default:
throw new ArgumentOutOfRangeException(nameof(contentMode), $"Unsupported content mode: {contentMode}");
}
}
}
}
}

View File

@ -114,7 +114,16 @@ namespace CloudNative.CloudEvents.Core
}
// Note: when this returns, the Array property of the returned segment is guaranteed not to be null.
private static ArraySegment<byte> GetArraySegment(ReadOnlyMemory<byte> memory) =>
/// <summary>
/// Returns the data from <paramref name="memory"/> as a byte array, return the underlying array
/// if there is one, or creating a copy otherwise. This method should be used with care, due to the
/// "sometimes shared, sometimes not" nature of the result. (It is generally safe to use this with the result
/// of encoding a CloudEvent, assuming the same memory is not used elsewhere.)
/// </summary>
/// <param name="memory">The memory to obtain the data from.</param>
/// <returns>The data in <paramref name="memory"/> as an array segment.</returns>
public static ArraySegment<byte> GetArraySegment(ReadOnlyMemory<byte> memory) =>
MemoryMarshal.TryGetArray(memory, out var segment) && segment.Array is not null
? segment
: new ArraySegment<byte>(memory.ToArray());

View File

@ -1,12 +1,10 @@
// Copyright (c) Cloud Native Foundation.
// Copyright (c) Cloud Native Foundation.
// Licensed under the Apache 2.0 license.
// See LICENSE file in the project root for full license information.
using CloudNative.CloudEvents.NewtonsoftJson;
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using MQTTnet.Client.Receiving;
using MQTTnet.Server;
using System;
using System.Net.Mime;
@ -18,16 +16,17 @@ namespace CloudNative.CloudEvents.Mqtt.UnitTests
{
public class MqttTest : IDisposable
{
private readonly IMqttServer mqttServer;
private readonly MqttServer mqttServer;
public MqttTest()
{
var optionsBuilder = new MqttServerOptionsBuilder()
.WithConnectionBacklog(100)
.WithDefaultEndpoint()
.WithDefaultEndpointPort(52355);
this.mqttServer = new MqttFactory().CreateMqttServer();
mqttServer.StartAsync(optionsBuilder.Build()).GetAwaiter().GetResult();
this.mqttServer = new MqttFactory().CreateMqttServer(optionsBuilder.Build());
mqttServer.StartAsync().GetAwaiter().GetResult();
}
public void Dispose()
@ -55,14 +54,17 @@ namespace CloudNative.CloudEvents.Mqtt.UnitTests
var options = new MqttClientOptionsBuilder()
.WithClientId("Client1")
.WithTcpServer("localhost", 52355)
.WithTcpServer("127.0.0.1", 52355)
.WithCleanSession()
.Build();
TaskCompletionSource<CloudEvent> tcs = new TaskCompletionSource<CloudEvent>();
await client.ConnectAsync(options);
client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(
args => tcs.SetResult(args.ApplicationMessage.ToCloudEvent(jsonEventFormatter)));
client.ApplicationMessageReceivedAsync += args =>
{
tcs.SetResult(args.ApplicationMessage.ToCloudEvent(jsonEventFormatter));
return Task.CompletedTask;
};
var result = await client.SubscribeAsync("abc");
await client.PublishAsync(cloudEvent.ToMqttApplicationMessage(ContentMode.Structured, new JsonEventFormatter(), topic: "abc"));
@ -79,4 +81,4 @@ namespace CloudNative.CloudEvents.Mqtt.UnitTests
Assert.Equal("value", (string?) receivedCloudEvent["comexampleextension1"]);
}
}
}
}