Multi pubsub (#374)

* Prepping for multi pubsub

* Add pubsub name, some cleanup

* multi pubsub changes

Co-authored-by: Aman Bhardwaj <amanbha@microsoft.com>
Co-authored-by: LM <lemai>
This commit is contained in:
Leon Mai 2020-08-17 13:58:50 -07:00 committed by GitHub
parent 6671ab020c
commit 11f6ea45e1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 152 additions and 69 deletions

View File

@ -5,6 +5,7 @@
namespace ControllerSample.Controllers
{
using System;
using System.Threading.Tasks;
using Dapr;
using Dapr.Client;
@ -43,10 +44,12 @@ namespace ControllerSample.Controllers
/// <param name="transaction">Transaction info.</param>
/// <param name="daprClient">State client to interact with Dapr runtime.</param>
/// <returns>A <see cref="Task{TResult}"/> representing the result of the asynchronous operation.</returns>
[Topic("deposit")]
/// "pubsub", the first parameter into the Topic attribute, is name of the default pub/sub configured by the Dapr CLI.
[Topic("pubsub", "deposit")]
[HttpPost("deposit")]
public async Task<ActionResult<Account>> Deposit(Transaction transaction, [FromServices] DaprClient daprClient)
{
Console.WriteLine("Enter deposit");
var state = await daprClient.GetStateEntryAsync<Account>(StoreName, transaction.Id);
state.Value ??= new Account() { Id = transaction.Id, };
state.Value.Balance += transaction.Amount;
@ -60,10 +63,12 @@ namespace ControllerSample.Controllers
/// <param name="transaction">Transaction info.</param>
/// <param name="daprClient">State client to interact with Dapr runtime.</param>
/// <returns>A <see cref="Task{TResult}"/> representing the result of the asynchronous operation.</returns>
[Topic("withdraw")]
/// "pubsub", the first parameter into the Topic attribute, is name of the default pub/sub configured by the Dapr CLI.
[Topic("pubsub", "withdraw")]
[HttpPost("withdraw")]
public async Task<ActionResult<Account>> Withdraw(Transaction transaction, [FromServices] DaprClient daprClient)
{
Console.WriteLine("Enter withdraw");
var state = await daprClient.GetStateEntryAsync<Account>(StoreName, transaction.Id);
if (state.Value == null)

View File

@ -7,7 +7,7 @@ It exposes the following endpoints over HTTP:
- POST `/deposit`: Accepts a JSON payload to deposit money to an account
- POST `/withdraw`: Accepts a JSON payload to withdraw money from an account
The application also registers for pub-sub with the `deposit` and `withdraw` topics.
The application also registers for pub/sub with the `deposit` and `withdraw` topics.
## Running the Sample
@ -109,11 +109,11 @@ Output:
On Linux, MacOS:
```sh
dapr publish -t withdraw -d '{"id": "17", "amount": 15 }'
dapr publish --pubsub pubsub -t withdraw -d '{"id": "17", "amount": 15 }'
```
On Windows:
```sh
dapr publish -t withdraw -d "{\"id\": \"17\", \"amount\": 15 }"
dapr publish --pubsub pubsub -t withdraw -d "{\"id\": \"17\", \"amount\": 15 }"
```
---
@ -121,11 +121,11 @@ On Windows:
Publish events using Dapr cli:
On Linux, MacOS:
```sh
dapr publish -t deposit -d '{"id": "17", "amount": 15 }'
dapr publish --pubsub pubsub -t deposit -d '{"id": "17", "amount": 15 }'
```
On Windows:
```sh
dapr publish -t deposit -d "{\"id\": \"17\", \"amount\": 15 }"
dapr publish --pubsub pubsub -t deposit -d "{\"id\": \"17\", \"amount\": 15 }"
```
---
@ -162,12 +162,12 @@ app.UseEndpoints(endpoints =>
});
```
`MapSubscribeHandler()` registers an endpoint that will be called by the Dapr runtime to register for pub-sub topics. This is is not needed unless using pub-sub.
`MapSubscribeHandler()` registers an endpoint that will be called by the Dapr runtime to register for pub/sub topics. This is is not needed unless using pub/sub.
---
```C#
[Topic("deposit")]
[Topic("pubsub", "deposit")]
[HttpPost("deposit")]
public async Task<ActionResult<Account>> Deposit(...)
{
@ -175,7 +175,7 @@ public async Task<ActionResult<Account>> Deposit(...)
}
```
`[Topic(...)]` associates a pub-sub topic with this endpoint.
`[Topic(...)]` associates a pub/sub named `pubsub` (this is the default configured by the Dapr CLI) pub/sub topic `deposit` with this endpoint.
---
@ -215,7 +215,7 @@ Using `[FromState]` allows binding a data type directly without using `StateEntr
---
```C#
[Topic("deposit")]
[Topic("pubsub", "deposit")]
[HttpPost("deposit")]
public async Task<ActionResult<Account>> Deposit(Transaction transaction, [FromServices] StateClient stateClient)
{

View File

@ -7,7 +7,7 @@ It exposes the following endpoints over HTTP:
- POST `/deposit`: Accepts a JSON payload to deposit money to an account
- POST `/withdraw`: Accepts a JSON payload to withdraw money from an account
The application also registers for pub-sub with the `deposit` and `withdraw` topics.
The application also registers for pub/sub with the `deposit` and `withdraw` topics.
## Running the Sample
@ -105,12 +105,12 @@ Output:
Publish events using Dapr cli:
On Linux, MacOS:
```sh
dapr publish -t withdraw -d '{"id": "17", "amount": 15 }'
dapr publish --pubsub pubsub -t withdraw -d '{"id": "17", "amount": 15 }'
```
On Windows:
```sh
dapr publish -t withdraw -d "{\"id\": \"17\", \"amount\": 15 }"
dapr publish --pubsub pubsub -t withdraw -d "{\"id\": \"17\", \"amount\": 15 }"
```
---
@ -119,11 +119,11 @@ On Windows:
Publish events using Dapr cli:
On Linux, MacOS:
```sh
dapr publish -t deposit -d '{"id": "17", "amount": 15 }'
dapr publish --pubsub pubsub -t deposit -d '{"id": "17", "amount": 15 }'
```
On Windows:
```sh
dapr publish -t deposit -d "{\"id\": \"17\", \"amount\": 15 }"
dapr publish --pubsub pubsub -t deposit -d "{\"id\": \"17\", \"amount\": 15 }"
```
---
@ -158,16 +158,16 @@ app.UseEndpoints(endpoints =>
endpoints.MapSubscribeHandler();
endpoints.MapGet("{id}", Balance);
endpoints.MapPost("deposit", Deposit).WithTopic("deposit");
endpoints.MapPost("withdraw", Withdraw).WithTopic("withdraw");
endpoints.MapPost("deposit", Deposit).WithTopic(PubsubName, "deposit");
endpoints.MapPost("withdraw", Withdraw).WithTopic(PubsubName, "withdraw");
});
```
`MapSubscribeHandler()` registers an endpoint that will be called by the Dapr runtime to register for pub-sub topics. This is is not needed unless using pub-sub.
`MapSubscribeHandler()` registers an endpoint that will be called by the Dapr runtime to register for pub/sub topics. This is is not needed unless using pub/sub.
`MapGet(...)` and `MapPost(...)` are provided by ASP.NET Core routing - these are used to setup endpoints to handle HTTP requests.
`WithTopic(...)` associates an endpoint with a pub-sub topic.
`WithTopic(...)` associates an endpoint with a pub/sub topic.
---

View File

@ -26,6 +26,11 @@ namespace RoutingSample
/// </summary>
public const string StoreName = "statestore";
/// <summary>
/// Pubsub component name. "pubsub" is name of the default pub/sub configured by the Dapr CLI.
/// </summary>
public const string PubsubName = "pubsub";
/// <summary>
/// Initializes a new instance of the <see cref="Startup"/> class.
/// </summary>
@ -77,8 +82,8 @@ namespace RoutingSample
endpoints.MapSubscribeHandler();
endpoints.MapGet("{id}", Balance);
endpoints.MapPost("deposit", Deposit).WithTopic("deposit");
endpoints.MapPost("withdraw", Withdraw).WithTopic("withdraw");
endpoints.MapPost("deposit", Deposit).WithTopic(PubsubName, "deposit");
endpoints.MapPost("withdraw", Withdraw).WithTopic(PubsubName, "withdraw");
});
async Task Balance(HttpContext context)

View File

@ -18,6 +18,7 @@ namespace DaprClient
{
private static readonly string stateKeyName = "mykey";
private static readonly string storeName = "statestore";
private static readonly string pubsubName = "pubsub";
/// <summary>
/// Main entry point.
@ -48,20 +49,20 @@ namespace DaprClient
await DeleteStateAsync(client);
#region Service Invoke - Required RoutingService
// This provides an example of how to invoke a method on another REST service that is listening on http.
// To use it run RoutingService in this solution.
// Invoke deposit operation on RoutingSample service by publishing event.
//// This provides an example of how to invoke a method on another REST service that is listening on http.
//// To use it run RoutingService in this solution.
//// Invoke deposit operation on RoutingSample service by publishing event.
//await PublishDepositeEventToRoutingSampleAsync(client);
//await Task.Delay(TimeSpan.FromSeconds(1));
//await DepositUsingServiceInvocation(client);
//Invoke deposit operation on RoutingSample service by POST.
////Invoke deposit operation on RoutingSample service by POST.
//await InvokeWithdrawServiceOperationAsync(client);
//Invoke deposit operation on RoutingSample service by GET.
////Invoke deposit operation on RoutingSample service by GET.
//await InvokeBalanceServiceOperationAsync(client);
#endregion
@ -71,14 +72,14 @@ namespace DaprClient
internal static async Task PublishDepositeEventToRoutingSampleAsync(DaprClient client)
{
var eventData = new { Id = "17", Amount = (decimal)10, };
await client.PublishEventAsync("deposit", eventData);
await client.PublishEventAsync(pubsubName, "deposit", eventData);
Console.WriteLine("Published deposit event!");
}
internal static async Task PublishEventAsync(DaprClient client)
{
var eventData = new Widget() { Size = "small", Color = "yellow", };
await client.PublishEventAsync("TopicA", eventData);
var eventData = new Widget() { Size = "small", Color = "yellow", };
await client.PublishEventAsync(pubsubName, "TopicA", eventData);
Console.WriteLine("Published Event!");
}

View File

@ -0,0 +1,49 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
namespace Dapr
{
using System;
/// <summary>
/// A utility class to perform argument validations.
/// </summary>
internal static class ArgumentVerifier
{
/// <summary>
/// Throws ArgumentNullException if argument is null.
/// </summary>
/// <param name="value">Argument value to check.</param>
/// <param name="name">Name of Argument.</param>
public static void ThrowIfNull(object value, string name)
{
if (value == null)
{
throw new ArgumentNullException(name);
}
}
/// <summary>
/// Validates string and throws:
/// ArgumentNullException if argument is null.
/// ArgumentException if argument is empty.
/// </summary>
/// <param name="value">Argument value to check.</param>
/// <param name="name">Name of Argument.</param>
public static void ThrowIfNullOrEmpty(string value, string name)
{
if (value == null)
{
throw new ArgumentNullException(name);
}
if (string.IsNullOrEmpty(value))
{
throw new ArgumentException("The value cannot be null or empty", nameof(name));
}
}
}
}

View File

@ -1,4 +1,4 @@
// ------------------------------------------------------------
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
@ -16,11 +16,12 @@ namespace Microsoft.AspNetCore.Builder
/// <summary>
/// Adds <see cref="TopicAttribute" /> metadata to the provided <see cref="IEndpointConventionBuilder" />.
/// </summary>
/// <param name="builder">The <see cref="IEndpointConventionBuilder" />.</param>
/// <param name="builder">The <see cref="IEndpointConventionBuilder" />.</param>\
/// <param name="pubsubName">The name of the pubsub component to use.</param>
/// <param name="name">The topic name.</param>
/// <typeparam name="T">The <see cref="IEndpointConventionBuilder" /> type.</typeparam>
/// <returns>The <see cref="IEndpointConventionBuilder" /> builder object.</returns>
public static T WithTopic<T>(this T builder, string name)
public static T WithTopic<T>(this T builder, string pubsubName, string name)
where T : IEndpointConventionBuilder
{
if (builder is null)
@ -28,13 +29,11 @@ namespace Microsoft.AspNetCore.Builder
throw new ArgumentNullException(nameof(builder));
}
if (string.IsNullOrEmpty(name))
{
throw new ArgumentException("The value cannot be null or empty.", nameof(name));
}
ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
ArgumentVerifier.ThrowIfNullOrEmpty(name, nameof(name));
builder.WithMetadata(new TopicAttribute(name));
builder.WithMetadata(new TopicAttribute(pubsubName, name));
return builder;
}
}
}
}

View File

@ -1,4 +1,4 @@
// ------------------------------------------------------------
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
@ -41,10 +41,10 @@ namespace Microsoft.AspNetCore.Builder
.OfType<RouteEndpoint>()
.Where(e => e.Metadata.GetMetadata<TopicAttribute>()?.Name != null) // only endpoints which have TopicAttribute with not null Name.
.Distinct()
.Select(e => (e.Metadata.GetMetadata<TopicAttribute>().Name, e.RoutePattern));
.Select(e => (e.Metadata.GetMetadata<TopicAttribute>().PubsubName, e.Metadata.GetMetadata<TopicAttribute>().Name, e.RoutePattern));
context.Response.ContentType = "application/json";
using Utf8JsonWriter writer = new Utf8JsonWriter(context.Response.BodyWriter);
using var writer = new Utf8JsonWriter(context.Response.BodyWriter);
writer.WriteStartArray();
var logger = context.RequestServices.GetService<ILoggerFactory>().CreateLogger("DaprTopicSubscription");
@ -70,6 +70,7 @@ namespace Microsoft.AspNetCore.Builder
.Select(part => part.Content))));
writer.WriteString("route", route);
writer.WriteString("pubsubName", entry.PubsubName);
writer.WriteEndObject();
}

View File

@ -1,4 +1,4 @@
// ------------------------------------------------------------
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
@ -15,20 +15,25 @@ namespace Dapr
/// <summary>
/// Initializes a new instance of the <see cref="TopicAttribute" /> class.
/// </summary>
/// <param name="pubsubName">The name of the pubsub component to use.</param>
/// <param name="name">The topic name.</param>
public TopicAttribute(string name)
public TopicAttribute(string pubsubName, string name)
{
if (string.IsNullOrEmpty(name))
{
throw new ArgumentException("The value cannot be null or empty.", nameof(name));
}
ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
ArgumentVerifier.ThrowIfNullOrEmpty(name, nameof(name));
this.Name = name;
this.PubsubName = pubsubName;
}
/// <summary>
/// Gets the topic name.
/// </summary>
public string Name { get; }
/// <summary>
/// Gets the pubsub component name name.
/// </summary>
public string PubsubName { get; }
}
}
}

View File

@ -18,20 +18,22 @@ namespace Dapr.Client
/// <summary>
/// Publishes an event to the specified topic.
/// </summary>
/// <param name="pubsubName">The name of the pubsub component to use.</param>
/// <param name="topicName">The name of the topic the request should be published to.</param>
/// <param name="data">The event data.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <typeparam name="TData">The data type of the object that will be serialized.</typeparam>
/// <returns>A <see cref="Task" /> that will complete when the operation has completed.</returns>
public abstract Task PublishEventAsync<TData>(string topicName, TData data, CancellationToken cancellationToken = default);
public abstract Task PublishEventAsync<TData>(string pubsubName, string topicName, TData data, CancellationToken cancellationToken = default);
/// <summary>
/// Publishes an event to the specified topic.
/// </summary>
/// <param name="pubsubName">The name of the pubsub component to use.</param>
/// <param name="topicName">The name of the topic the request should be published to.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task" /> that will complete when the operation has completed.</returns>
public abstract Task PublishEventAsync(string topicName, CancellationToken cancellationToken = default);
public abstract Task PublishEventAsync(string pubsubName,string topicName, CancellationToken cancellationToken = default);
/// <summary>
/// Invokes an output binding.

View File

@ -38,25 +38,28 @@ namespace Dapr.Client
#region Publish Apis
/// <inheritdoc/>
public override Task PublishEventAsync<TData>(string topicName, TData data, CancellationToken cancellationToken = default)
public override Task PublishEventAsync<TData>(string pubsubName, string topicName, TData data, CancellationToken cancellationToken = default)
{
ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
ArgumentVerifier.ThrowIfNullOrEmpty(topicName, nameof(topicName));
ArgumentVerifier.ThrowIfNull(data, nameof(data));
return MakePublishRequest(topicName, data, cancellationToken);
return MakePublishRequest(pubsubName, topicName, data, cancellationToken);
}
/// <inheritdoc/>
public override Task PublishEventAsync(string topicName, CancellationToken cancellationToken = default)
public override Task PublishEventAsync(string pubsubName, string topicName, CancellationToken cancellationToken = default)
{
ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
ArgumentVerifier.ThrowIfNullOrEmpty(topicName, nameof(topicName));
return MakePublishRequest(topicName, string.Empty, cancellationToken);
return MakePublishRequest(pubsubName,topicName, string.Empty, cancellationToken);
}
private async Task MakePublishRequest<TContent>(string topicName, TContent content, CancellationToken cancellationToken)
private async Task MakePublishRequest<TContent>(string pubsubName, string topicName, TContent content, CancellationToken cancellationToken)
{
// Create PublishEventEnvelope
var envelope = new Autogenerated.PublishEventRequest()
{
{
PubsubName = pubsubName,
Topic = topicName,
};

View File

@ -67,6 +67,9 @@ message TopicEventRequest {
// The pubsub topic which publisher sent to.
string topic = 6;
// The name of the pubsub the publisher sent to.
string pubsub_name = 8;
}
// TopicEventResponse is response from app on published message
@ -121,15 +124,18 @@ message ListTopicSubscriptionsResponse {
// TopicSubscription represents topic and metadata.
message TopicSubscription {
// Required. The name of the pubsub containing the topic below to subscribe to.
string pubsub_name = 1;
// Required. The name of topic which will be subscribed
string topic = 1;
string topic = 2;
// The optional properties used for this topic's subscribtion e.g. session id
map<string,string> metadata = 2;
map<string,string> metadata = 3;
}
// ListInputBindingsResponse is the message including the list of input bindings.
message ListInputBindingsResponse {
// The list of input bindings.
repeated string bindings = 1;
}
}

View File

@ -136,11 +136,14 @@ message SaveStateRequest {
// PublishEventRequest is the message to publish event data to pubsub topic
message PublishEventRequest {
// The name of the pubsub component
string pubsub_name = 1;
// The pubsub topic
string topic = 1;
string topic = 2;
// The data which will be published to topic.
bytes data = 2;
bytes data = 3;
}
// InvokeBindingRequest is the message to send data to output bindings

View File

@ -13,13 +13,13 @@ namespace Dapr.AspNetCore.IntegrationTest.App
[ApiController]
public class DaprController : ControllerBase
{
[Topic("B")]
[Topic("pubsub", "B")]
[HttpPost("/B")]
public void TopicB()
{
}
[Topic("register-user")]
[Topic("pubsub", "register-user")]
[HttpPost("/register-user")]
public ActionResult<UserInfo> RegisterUser(UserInfo user)
{

View File

@ -45,7 +45,7 @@ namespace Dapr.AspNetCore.IntegrationTest.App
endpoints.MapSubscribeHandler();
endpoints.MapControllers();
endpoints.MapPost("/topic-a", context => Task.CompletedTask).WithTopic("A");
endpoints.MapPost("/topic-a", context => Task.CompletedTask).WithTopic("testpubsub", "A");
endpoints.MapPost("/routingwithstateentry/{widget}", async context =>
{

View File

@ -14,6 +14,8 @@ namespace Dapr.Client.Test
public class PublishEventApiTest
{
const string TestPubsubName = "testpubsubname";
[Fact]
public async Task PublishEventAsync_CanPublishTopicWithData()
{
@ -23,12 +25,13 @@ namespace Dapr.Client.Test
.Build();
var publishData = new PublishData() { PublishObjectParameter = "testparam" };
var task = daprClient.PublishEventAsync<PublishData>("test", publishData);
var task = daprClient.PublishEventAsync<PublishData>(TestPubsubName, "test", publishData);
httpClient.Requests.TryDequeue(out var entry).Should().BeTrue();
var request = await GrpcUtils.GetRequestFromRequestMessageAsync<PublishEventRequest>(entry.Request);
var jsonFromRequest = request.Data.ToStringUtf8();
request.PubsubName.Should().Be(TestPubsubName);
request.Topic.Should().Be("test");
jsonFromRequest.Should().Be(JsonSerializer.Serialize(publishData));
}
@ -42,11 +45,12 @@ namespace Dapr.Client.Test
.Build();
var task = daprClient.PublishEventAsync("test");
var task = daprClient.PublishEventAsync(TestPubsubName, "test");
httpClient.Requests.TryDequeue(out var entry).Should().BeTrue();
var request = await GrpcUtils.GetRequestFromRequestMessageAsync<PublishEventRequest>(entry.Request);
var jsonFromRequest = request.Data.ToStringUtf8();
request.PubsubName.Should().Be(TestPubsubName);
request.Topic.Should().Be("test");
jsonFromRequest.Should().Be("\"\"");
}