Renames and add data property.

This commit is contained in:
Phillip Hoff 2024-07-11 00:33:28 -07:00
parent 46b86e21d3
commit c3ba1505df
4 changed files with 26 additions and 6 deletions

View File

@ -1,5 +1,7 @@
// See https://aka.ms/new-console-template for more information
using System.Text.Json;
using System.Text.Json.Serialization;
using Dapr.Messaging.PublishSubscribe;
using Microsoft.Extensions.Logging;
@ -14,25 +16,29 @@ var client = DaprPublishSubscribeClient.Create(new() { LoggerFactory = loggerFac
Console.WriteLine("Subscribing to topic A...");
using var subscriptionA = client.SubscribeAsync(
using var subscriptionA = client.Subscribe(
"pubsub",
"topicA",
(request, cancellationToken) =>
{
Console.WriteLine($"Received message on topic A: {request}");
Console.WriteLine($"Data is: {JsonSerializer.Deserialize<TopicData>(request.Data.Span)}");
return Task.FromResult(TopicResponse.Drop);
});
Console.WriteLine("Subscribing to topic B...");
using var subscriptionB = client.SubscribeAsync(
using var subscriptionB = client.Subscribe(
"pubsub",
"topicB",
(request, cancellationToken) =>
{
Console.WriteLine($"Received message on topic B: {request}");
Console.WriteLine($"Data is: {JsonSerializer.Deserialize<TopicData>(request.Data.Span)}");
return Task.FromResult(TopicResponse.Success);
});
@ -41,3 +47,9 @@ Console.WriteLine("Waiting 30s to exit...");
await Task.Delay(TimeSpan.FromSeconds(30));
Console.WriteLine("Exiting...");
sealed record TopicData
{
[JsonPropertyName("test")]
public string? Test { get; init; }
}

View File

@ -62,9 +62,9 @@ public abstract class DaprPublishSubscribeClient
/// <param name="topicName"></param>
/// <param name="handler"></param>
/// <returns></returns>
public IDisposable SubscribeAsync(string pubSubName, string topicName, TopicRequestHandler handler)
public IDisposable Subscribe(string pubSubName, string topicName, TopicRequestHandler handler)
{
return SubscribeAsync(pubSubName, topicName, handler, null);
return Subscribe(pubSubName, topicName, handler, null);
}
/// <summary>
@ -75,5 +75,5 @@ public abstract class DaprPublishSubscribeClient
/// <param name="handler"></param>
/// <param name="options"></param>
/// <returns></returns>
public abstract IDisposable SubscribeAsync(string pubSubName, string topicName, TopicRequestHandler handler, DaprSubscriptionOptions? options);
public abstract IDisposable Subscribe(string pubSubName, string topicName, TopicRequestHandler handler, DaprSubscriptionOptions? options);
}

View File

@ -34,7 +34,7 @@ sealed class DaprPublishSubscribeGrpcClient : DaprPublishSubscribeClient
this.logger = options?.LoggerFactory?.CreateLogger<DaprPublishSubscribeGrpcClient>();
}
public override IDisposable SubscribeAsync(string pubSubName, string topicName, TopicRequestHandler handler, DaprSubscriptionOptions? options)
public override IDisposable Subscribe(string pubSubName, string topicName, TopicRequestHandler handler, DaprSubscriptionOptions? options)
{
var cts = new CancellationTokenSource();
@ -94,6 +94,7 @@ sealed class DaprPublishSubscribeGrpcClient : DaprPublishSubscribeClient
Type = response.Type,
SpecVersion = response.SpecVersion,
DataContentType = response.DataContentType,
Data = response.Data.Memory,
Topic = response.Topic,
PubSubName = response.PubsubName,
Path = response.Path,

View File

@ -11,6 +11,8 @@
// limitations under the License.
// ------------------------------------------------------------------------
using System;
namespace Dapr.Messaging.PublishSubscribe;
/// <summary>
@ -43,6 +45,11 @@ public sealed record TopicRequest
/// </summary>
public string DataContentType { get; init; } = default!;
/// <summary>
///
/// </summary>
public ReadOnlyMemory<byte> Data { get; init; }
/// <summary>
///
/// </summary>