From c3ba1505df884d35779624ecb19b27cbb5568989 Mon Sep 17 00:00:00 2001 From: Phillip Hoff Date: Thu, 11 Jul 2024 00:33:28 -0700 Subject: [PATCH] Renames and add data property. --- .../StreamingSubscribeExample/Program.cs | 16 ++++++++++++++-- .../DaprPublishSubscribeClient.cs | 6 +++--- .../DaprPublishSubscribeGrpcClient.cs | 3 ++- .../PublishSubscribe/TopicRequest.cs | 7 +++++++ 4 files changed, 26 insertions(+), 6 deletions(-) diff --git a/examples/Client/PublishSubscribe/StreamingSubscribeExample/Program.cs b/examples/Client/PublishSubscribe/StreamingSubscribeExample/Program.cs index 3e06010d..6c9a8546 100644 --- a/examples/Client/PublishSubscribe/StreamingSubscribeExample/Program.cs +++ b/examples/Client/PublishSubscribe/StreamingSubscribeExample/Program.cs @@ -1,5 +1,7 @@ // See https://aka.ms/new-console-template for more information +using System.Text.Json; +using System.Text.Json.Serialization; using Dapr.Messaging.PublishSubscribe; using Microsoft.Extensions.Logging; @@ -14,25 +16,29 @@ var client = DaprPublishSubscribeClient.Create(new() { LoggerFactory = loggerFac Console.WriteLine("Subscribing to topic A..."); -using var subscriptionA = client.SubscribeAsync( +using var subscriptionA = client.Subscribe( "pubsub", "topicA", (request, cancellationToken) => { Console.WriteLine($"Received message on topic A: {request}"); + Console.WriteLine($"Data is: {JsonSerializer.Deserialize(request.Data.Span)}"); + return Task.FromResult(TopicResponse.Drop); }); Console.WriteLine("Subscribing to topic B..."); -using var subscriptionB = client.SubscribeAsync( +using var subscriptionB = client.Subscribe( "pubsub", "topicB", (request, cancellationToken) => { Console.WriteLine($"Received message on topic B: {request}"); + Console.WriteLine($"Data is: {JsonSerializer.Deserialize(request.Data.Span)}"); + return Task.FromResult(TopicResponse.Success); }); @@ -41,3 +47,9 @@ Console.WriteLine("Waiting 30s to exit..."); await Task.Delay(TimeSpan.FromSeconds(30)); Console.WriteLine("Exiting..."); + +sealed record TopicData +{ + [JsonPropertyName("test")] + public string? Test { get; init; } +} diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs index 129eb780..18fbba8d 100644 --- a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs +++ b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeClient.cs @@ -62,9 +62,9 @@ public abstract class DaprPublishSubscribeClient /// /// /// - public IDisposable SubscribeAsync(string pubSubName, string topicName, TopicRequestHandler handler) + public IDisposable Subscribe(string pubSubName, string topicName, TopicRequestHandler handler) { - return SubscribeAsync(pubSubName, topicName, handler, null); + return Subscribe(pubSubName, topicName, handler, null); } /// @@ -75,5 +75,5 @@ public abstract class DaprPublishSubscribeClient /// /// /// - public abstract IDisposable SubscribeAsync(string pubSubName, string topicName, TopicRequestHandler handler, DaprSubscriptionOptions? options); + public abstract IDisposable Subscribe(string pubSubName, string topicName, TopicRequestHandler handler, DaprSubscriptionOptions? options); } diff --git a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs index 34e9d755..4c61dab5 100644 --- a/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs +++ b/src/Dapr.Messaging/PublishSubscribe/DaprPublishSubscribeGrpcClient.cs @@ -34,7 +34,7 @@ sealed class DaprPublishSubscribeGrpcClient : DaprPublishSubscribeClient this.logger = options?.LoggerFactory?.CreateLogger(); } - public override IDisposable SubscribeAsync(string pubSubName, string topicName, TopicRequestHandler handler, DaprSubscriptionOptions? options) + public override IDisposable Subscribe(string pubSubName, string topicName, TopicRequestHandler handler, DaprSubscriptionOptions? options) { var cts = new CancellationTokenSource(); @@ -94,6 +94,7 @@ sealed class DaprPublishSubscribeGrpcClient : DaprPublishSubscribeClient Type = response.Type, SpecVersion = response.SpecVersion, DataContentType = response.DataContentType, + Data = response.Data.Memory, Topic = response.Topic, PubSubName = response.PubsubName, Path = response.Path, diff --git a/src/Dapr.Messaging/PublishSubscribe/TopicRequest.cs b/src/Dapr.Messaging/PublishSubscribe/TopicRequest.cs index 90c47df2..b03762b9 100644 --- a/src/Dapr.Messaging/PublishSubscribe/TopicRequest.cs +++ b/src/Dapr.Messaging/PublishSubscribe/TopicRequest.cs @@ -11,6 +11,8 @@ // limitations under the License. // ------------------------------------------------------------------------ +using System; + namespace Dapr.Messaging.PublishSubscribe; /// @@ -43,6 +45,11 @@ public sealed record TopicRequest /// public string DataContentType { get; init; } = default!; + /// + /// + /// + public ReadOnlyMemory Data { get; init; } + /// /// ///