mirror of https://github.com/dapr/dotnet-sdk.git
Add dead letter topic support (#929)
Closes https://github.com/dapr/dotnet-sdk/issues/897 Signed-off-by: Yash Nisar <yashnisar@microsoft.com> Signed-off-by: Yash Nisar <yashnisar@microsoft.com>
This commit is contained in:
parent
62c1d72c41
commit
e6ded69ca4
|
@ -64,18 +64,39 @@ namespace ControllerSample.Controllers
|
|||
/// <param name="daprClient">State client to interact with Dapr runtime.</param>
|
||||
/// <returns>A <see cref="Task{TResult}"/> representing the result of the asynchronous operation.</returns>
|
||||
/// "pubsub", the first parameter into the Topic attribute, is name of the default pub/sub configured by the Dapr CLI.
|
||||
[Topic("pubsub", "deposit")]
|
||||
[Topic("pubsub", "deposit", "amountDeadLetterTopic", false)]
|
||||
[HttpPost("deposit")]
|
||||
public async Task<ActionResult<Account>> Deposit(Transaction transaction, [FromServices] DaprClient daprClient)
|
||||
{
|
||||
logger.LogDebug("Enter deposit");
|
||||
var state = await daprClient.GetStateEntryAsync<Account>(StoreName, transaction.Id);
|
||||
state.Value ??= new Account() { Id = transaction.Id, };
|
||||
logger.LogDebug("Id is {0}, the amount to be deposited is {1}", transaction.Id, transaction.Amount);
|
||||
|
||||
if (transaction.Amount < 0m)
|
||||
{
|
||||
return BadRequest(new { statusCode = 400, message = "bad request" });
|
||||
}
|
||||
|
||||
state.Value.Balance += transaction.Amount;
|
||||
logger.LogDebug("Balance is {0}", state.Value.Balance);
|
||||
await state.SaveAsync();
|
||||
return state.Value;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Method for viewing the error message when the deposit/withdrawal amounts
|
||||
/// are negative.
|
||||
/// </summary>
|
||||
/// <param name="transaction">Transaction info.</param>
|
||||
[Topic("pubsub", "amountDeadLetterTopic")]
|
||||
[HttpPost("deadLetterTopicRoute")]
|
||||
public ActionResult<Account> ViewErrorMessage(Transaction transaction)
|
||||
{
|
||||
logger.LogDebug("The amount cannot be negative: {0}", transaction.Amount);
|
||||
return Ok();
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Method for withdrawing from account as specified in transaction.
|
||||
/// </summary>
|
||||
|
@ -83,19 +104,25 @@ namespace ControllerSample.Controllers
|
|||
/// <param name="daprClient">State client to interact with Dapr runtime.</param>
|
||||
/// <returns>A <see cref="Task{TResult}"/> representing the result of the asynchronous operation.</returns>
|
||||
/// "pubsub", the first parameter into the Topic attribute, is name of the default pub/sub configured by the Dapr CLI.
|
||||
[Topic("pubsub", "withdraw")]
|
||||
[Topic("pubsub", "withdraw", "amountDeadLetterTopic", false)]
|
||||
[HttpPost("withdraw")]
|
||||
public async Task<ActionResult<Account>> Withdraw(Transaction transaction, [FromServices] DaprClient daprClient)
|
||||
{
|
||||
logger.LogDebug("Enter withdraw");
|
||||
logger.LogDebug("Enter withdraw method...");
|
||||
var state = await daprClient.GetStateEntryAsync<Account>(StoreName, transaction.Id);
|
||||
logger.LogDebug("Id is {0}, the amount to be withdrawn is {1}", transaction.Id, transaction.Amount);
|
||||
|
||||
if (state.Value == null)
|
||||
{
|
||||
return this.NotFound();
|
||||
}
|
||||
if (transaction.Amount < 0m)
|
||||
{
|
||||
return BadRequest(new { statusCode = 400, message = "bad request" });
|
||||
}
|
||||
|
||||
state.Value.Balance -= transaction.Amount;
|
||||
logger.LogDebug("Balance is {0}", state.Value.Balance);
|
||||
await state.SaveAsync();
|
||||
return state.Value;
|
||||
}
|
||||
|
@ -134,7 +161,7 @@ namespace ControllerSample.Controllers
|
|||
[HttpPost("throwException")]
|
||||
public async Task<ActionResult<Account>> ThrowException(Transaction transaction, [FromServices] DaprClient daprClient)
|
||||
{
|
||||
Console.WriteLine("Enter ThrowException");
|
||||
logger.LogDebug("Enter ThrowException");
|
||||
var task = Task.Delay(10);
|
||||
await task;
|
||||
return BadRequest(new { statusCode = 400, message = "bad request" });
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
// ------------------------------------------------------------------------
|
||||
// ------------------------------------------------------------------------
|
||||
// Copyright 2021 The Dapr Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
|
|
|
@ -133,7 +133,32 @@ On Windows:
|
|||
dapr publish --pubsub pubsub --publish-app-id controller -t deposit -d "{\"id\": \"17\", \"amount\": 15 }"
|
||||
```
|
||||
---
|
||||
**Dead Letter Topic example (pubsub)**
|
||||
Publish an event using the Dapr cli with an incorrect input, i.e. negative amount:
|
||||
|
||||
Deposit:
|
||||
On Linux, MacOS:
|
||||
```sh
|
||||
dapr publish --pubsub pubsub --publish-app-id controller -t deposit -d '{"id": "17", "amount": -15 }'
|
||||
```
|
||||
On Windows:
|
||||
```sh
|
||||
dapr publish --pubsub pubsub --publish-app-id controller -t deposit -d "{\"id\": \"17\", \"amount\": -15 }"
|
||||
```
|
||||
|
||||
Withdraw:
|
||||
On Linux, MacOS:
|
||||
```sh
|
||||
dapr publish --pubsub pubsub --publish-app-id controller -t withdraw -d '{"id": "17", "amount": -15 }'
|
||||
```
|
||||
On Windows:
|
||||
```sh
|
||||
dapr publish --pubsub pubsub --publish-app-id controller -t withdraw -d "{\"id\": \"17\", \"amount\": -15 }"
|
||||
```
|
||||
|
||||
First a message is sent from a publisher on a `deposit` or `withdraw` topic. Dapr receives the message on behalf of a subscriber application, however the `deposit` or `withdraw` topic message fails to be delivered to the `/deposit` or `/withdraw` endpoint on the application, even after retries. As a result of the failure to deliver, the message is forwarded to the `amountDeadLetterTopic` topic which delivers this to the `/deadLetterTopicRoute` endpoint.
|
||||
|
||||
---
|
||||
## Code Samples
|
||||
|
||||
*All of the interesting code in this sample is in Startup.cs and Controllers/SampleController.cs*
|
||||
|
|
|
@ -28,8 +28,7 @@ namespace ControllerSample
|
|||
|
||||
/// <summary>
|
||||
/// Gets or sets amount for the transaction.
|
||||
/// </summary>
|
||||
[Range(0, double.MaxValue)]
|
||||
/// </summary
|
||||
public decimal Amount { get; set; }
|
||||
}
|
||||
}
|
|
@ -131,7 +131,31 @@ On Windows:
|
|||
dapr publish --pubsub pubsub --publish-app-id routing -t deposit -d "{\"id\": \"17\", \"amount\": 15 }"
|
||||
```
|
||||
---
|
||||
**Dead Letter Topic example (pubsub)**
|
||||
Publish an event using the Dapr cli with an incorrect input, i.e. negative amount:
|
||||
|
||||
Deposit:
|
||||
On Linux, MacOS:
|
||||
```sh
|
||||
dapr publish --pubsub pubsub --publish-app-id routing -t deposit -d '{"id": "17", "amount": -15 }'
|
||||
```
|
||||
On Windows:
|
||||
```sh
|
||||
dapr publish --pubsub pubsub --publish-app-id routing -t deposit -d "{\"id\": \"17\", \"amount\": -15 }"
|
||||
```
|
||||
|
||||
Withdraw:
|
||||
On Linux, MacOS:
|
||||
```sh
|
||||
dapr publish --pubsub pubsub --publish-app-id routing -t withdraw -d '{"id": "17", "amount": -15 }'
|
||||
```
|
||||
On Windows:
|
||||
```sh
|
||||
dapr publish --pubsub pubsub --publish-app-id routing -t withdraw -d "{\"id\": \"17\", \"amount\": -15 }"
|
||||
```
|
||||
First a message is sent from a publisher on a `deposit` or `withdraw` topic. Dapr receives the message on behalf of a subscriber application, however the `deposit` or `withdraw` topic message fails to be delivered to the `/deposit` or `/withdraw` endpoint on the application, even after retries. As a result of the failure to deliver, the message is forwarded to the `amountDeadLetterTopic` topic which delivers this to the `/deadLetterTopicRoute` endpoint.
|
||||
|
||||
---
|
||||
## Code Samples
|
||||
|
||||
*All of the interesting code in this sample is in Startup.cs*
|
||||
|
@ -179,6 +203,18 @@ app.UseEndpoints(endpoints =>
|
|||
`MapGet(...)` and `MapPost(...)` are provided by ASP.NET Core routing - these are used to setup endpoints to handle HTTP requests.
|
||||
|
||||
`WithTopic(...)` associates an endpoint with a pub/sub topic.
|
||||
```C#
|
||||
var depositTopicOptions = new TopicOptions();
|
||||
depositTopicOptions.PubsubName = PubsubName;
|
||||
depositTopicOptions.Name = "deposit";
|
||||
depositTopicOptions.DeadLetterTopic = "amountDeadLetterTopic";
|
||||
|
||||
var withdrawTopicOptions = new TopicOptions();
|
||||
withdrawTopicOptions.PubsubName = PubsubName;
|
||||
withdrawTopicOptions.Name = "withdraw";
|
||||
withdrawTopicOptions.DeadLetterTopic = "amountDeadLetterTopic";
|
||||
```
|
||||
`WithTopic(...)` now takes the `TopicOptions(..)` instance that defines configurations for the subscribe endpoint.
|
||||
|
||||
---
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
// ------------------------------------------------------------------------
|
||||
// ------------------------------------------------------------------------
|
||||
// Copyright 2021 The Dapr Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
|
@ -16,6 +16,7 @@ namespace RoutingSample
|
|||
using System;
|
||||
using System.Text.Json;
|
||||
using System.Threading.Tasks;
|
||||
using Dapr;
|
||||
using Dapr.Client;
|
||||
using Microsoft.AspNetCore.Builder;
|
||||
using Microsoft.AspNetCore.Hosting;
|
||||
|
@ -23,6 +24,7 @@ namespace RoutingSample
|
|||
using Microsoft.Extensions.Configuration;
|
||||
using Microsoft.Extensions.DependencyInjection;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
|
||||
/// <summary>
|
||||
/// Startup class.
|
||||
|
@ -74,7 +76,8 @@ namespace RoutingSample
|
|||
/// <param name="app">Application builder.</param>
|
||||
/// <param name="env">Webhost environment.</param>
|
||||
/// <param name="serializerOptions">Options for JSON serialization.</param>
|
||||
public void Configure(IApplicationBuilder app, IWebHostEnvironment env, JsonSerializerOptions serializerOptions)
|
||||
public void Configure(IApplicationBuilder app, IWebHostEnvironment env, JsonSerializerOptions serializerOptions,
|
||||
ILogger<Startup> logger)
|
||||
{
|
||||
if (env.IsDevelopment())
|
||||
{
|
||||
|
@ -89,27 +92,38 @@ namespace RoutingSample
|
|||
{
|
||||
endpoints.MapSubscribeHandler();
|
||||
|
||||
var depositTopicOptions = new TopicOptions();
|
||||
depositTopicOptions.PubsubName = PubsubName;
|
||||
depositTopicOptions.Name = "deposit";
|
||||
depositTopicOptions.DeadLetterTopic = "amountDeadLetterTopic";
|
||||
|
||||
var withdrawTopicOptions = new TopicOptions();
|
||||
withdrawTopicOptions.PubsubName = PubsubName;
|
||||
withdrawTopicOptions.Name = "withdraw";
|
||||
withdrawTopicOptions.DeadLetterTopic = "amountDeadLetterTopic";
|
||||
|
||||
endpoints.MapGet("{id}", Balance);
|
||||
endpoints.MapPost("deposit", Deposit).WithTopic(PubsubName, "deposit");
|
||||
endpoints.MapPost("withdraw", Withdraw).WithTopic(PubsubName, "withdraw");
|
||||
endpoints.MapPost("deposit", Deposit).WithTopic(depositTopicOptions);
|
||||
endpoints.MapPost("deadLetterTopicRoute", ViewErrorMessage).WithTopic(PubsubName, "amountDeadLetterTopic");
|
||||
endpoints.MapPost("withdraw", Withdraw).WithTopic(withdrawTopicOptions);
|
||||
});
|
||||
|
||||
async Task Balance(HttpContext context)
|
||||
{
|
||||
Console.WriteLine("Enter Balance");
|
||||
logger.LogDebug("Enter Balance");
|
||||
var client = context.RequestServices.GetRequiredService<DaprClient>();
|
||||
|
||||
var id = (string)context.Request.RouteValues["id"];
|
||||
Console.WriteLine("id is {0}", id);
|
||||
logger.LogDebug("id is {0}", id);
|
||||
var account = await client.GetStateAsync<Account>(StoreName, id);
|
||||
if (account == null)
|
||||
{
|
||||
Console.WriteLine("Account not found");
|
||||
logger.LogDebug("Account not found");
|
||||
context.Response.StatusCode = 404;
|
||||
return;
|
||||
}
|
||||
|
||||
Console.WriteLine("Account balance is {0}", account.Balance);
|
||||
logger.LogDebug("Account balance is {0}", account.Balance);
|
||||
|
||||
context.Response.ContentType = "application/json";
|
||||
await JsonSerializer.SerializeAsync(context.Response.Body, account, serializerOptions);
|
||||
|
@ -117,12 +131,13 @@ namespace RoutingSample
|
|||
|
||||
async Task Deposit(HttpContext context)
|
||||
{
|
||||
Console.WriteLine("Enter Deposit");
|
||||
|
||||
var client = context.RequestServices.GetRequiredService<DaprClient>();
|
||||
logger.LogDebug("Enter Deposit");
|
||||
|
||||
var client = context.RequestServices.GetRequiredService<DaprClient>();
|
||||
var transaction = await JsonSerializer.DeserializeAsync<Transaction>(context.Request.Body, serializerOptions);
|
||||
Console.WriteLine("Id is {0}, Amount is {1}", transaction.Id, transaction.Amount);
|
||||
|
||||
logger.LogDebug("Id is {0}, Amount is {1}", transaction.Id, transaction.Amount);
|
||||
|
||||
var account = await client.GetStateAsync<Account>(StoreName, transaction.Id);
|
||||
if (account == null)
|
||||
{
|
||||
|
@ -131,43 +146,56 @@ namespace RoutingSample
|
|||
|
||||
if (transaction.Amount < 0m)
|
||||
{
|
||||
Console.WriteLine("Invalid amount");
|
||||
logger.LogDebug("Invalid amount");
|
||||
context.Response.StatusCode = 400;
|
||||
return;
|
||||
}
|
||||
|
||||
account.Balance += transaction.Amount;
|
||||
await client.SaveStateAsync(StoreName, transaction.Id, account);
|
||||
Console.WriteLine("Balance is {0}", account.Balance);
|
||||
logger.LogDebug("Balance is {0}", account.Balance);
|
||||
|
||||
context.Response.ContentType = "application/json";
|
||||
await JsonSerializer.SerializeAsync(context.Response.Body, account, serializerOptions);
|
||||
}
|
||||
|
||||
async Task Withdraw(HttpContext context)
|
||||
async Task ViewErrorMessage(HttpContext context)
|
||||
{
|
||||
Console.WriteLine("Enter Withdraw");
|
||||
var client = context.RequestServices.GetRequiredService<DaprClient>();
|
||||
var transaction = await JsonSerializer.DeserializeAsync<Transaction>(context.Request.Body, serializerOptions);
|
||||
Console.WriteLine("Id is {0}", transaction.Id);
|
||||
|
||||
logger.LogDebug("The amount cannot be negative: {0}", transaction.Amount);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
async Task Withdraw(HttpContext context)
|
||||
{
|
||||
logger.LogDebug("Enter Withdraw");
|
||||
|
||||
var client = context.RequestServices.GetRequiredService<DaprClient>();
|
||||
var transaction = await JsonSerializer.DeserializeAsync<Transaction>(context.Request.Body, serializerOptions);
|
||||
|
||||
logger.LogDebug("Id is {0}, Amount is {1}", transaction.Id, transaction.Amount);
|
||||
|
||||
var account = await client.GetStateAsync<Account>(StoreName, transaction.Id);
|
||||
if (account == null)
|
||||
{
|
||||
Console.WriteLine("Account not found");
|
||||
logger.LogDebug("Account not found");
|
||||
context.Response.StatusCode = 404;
|
||||
return;
|
||||
}
|
||||
|
||||
if (transaction.Amount < 0m)
|
||||
{
|
||||
Console.WriteLine("Invalid amount");
|
||||
logger.LogDebug("Invalid amount");
|
||||
context.Response.StatusCode = 400;
|
||||
return;
|
||||
}
|
||||
|
||||
account.Balance -= transaction.Amount;
|
||||
await client.SaveStateAsync(StoreName, transaction.Id, account);
|
||||
Console.WriteLine("Balance is {0}", account.Balance);
|
||||
logger.LogDebug("Balance is {0}", account.Balance);
|
||||
|
||||
context.Response.ContentType = "application/json";
|
||||
await JsonSerializer.SerializeAsync(context.Response.Body, account, serializerOptions);
|
||||
|
|
|
@ -15,7 +15,10 @@ namespace Microsoft.AspNetCore.Builder
|
|||
{
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Reflection;
|
||||
using System.Xml.Linq;
|
||||
using Dapr;
|
||||
using Grpc.Core;
|
||||
|
||||
/// <summary>
|
||||
/// Contains extension methods for <see cref="IEndpointConventionBuilder" />.
|
||||
|
@ -103,5 +106,43 @@ namespace Microsoft.AspNetCore.Builder
|
|||
}
|
||||
return builder;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Adds <see cref="ITopicMetadata" /> metadata to the provided <see cref="IEndpointConventionBuilder" />.
|
||||
/// </summary>
|
||||
/// <param name="builder">The <see cref="IEndpointConventionBuilder" />.</param>\
|
||||
/// <param name="topicOptions">The object of TopicOptions class that provides all topic attributes.</param>
|
||||
/// <typeparam name="T">The <see cref="IEndpointConventionBuilder" /> type.</typeparam>
|
||||
/// <returns>The <see cref="IEndpointConventionBuilder" /> builder object.</returns>
|
||||
public static T WithTopic<T>(this T builder, TopicOptions topicOptions)
|
||||
where T : IEndpointConventionBuilder
|
||||
{
|
||||
if (builder is null)
|
||||
{
|
||||
throw new ArgumentNullException(nameof(builder));
|
||||
}
|
||||
|
||||
ArgumentVerifier.ThrowIfNullOrEmpty(topicOptions.PubsubName, nameof(topicOptions.PubsubName));
|
||||
ArgumentVerifier.ThrowIfNullOrEmpty(topicOptions.Name, nameof(topicOptions.Name));
|
||||
|
||||
var topicObject = new TopicAttribute(topicOptions.PubsubName, topicOptions.Name, topicOptions.DeadLetterTopic, topicOptions.EnableRawPayload);
|
||||
|
||||
topicObject.Match = topicOptions.Match;
|
||||
topicObject.Priority = topicOptions.Priority;
|
||||
topicObject.OwnedMetadatas = topicOptions.OwnedMetadatas;
|
||||
topicObject.MetadataSeparator = topicObject.MetadataSeparator;
|
||||
|
||||
if (topicOptions.Metadata is not null)
|
||||
{
|
||||
foreach (var md in topicOptions.Metadata)
|
||||
{
|
||||
builder.WithMetadata(new TopicMetadataAttribute(md.Key, md.Value));
|
||||
}
|
||||
}
|
||||
|
||||
builder.WithMetadata(topicObject);
|
||||
|
||||
return builder;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -72,12 +72,13 @@ namespace Microsoft.AspNetCore.Builder
|
|||
var topicMetadata = e.Metadata.GetOrderedMetadata<ITopicMetadata>();
|
||||
var originalTopicMetadata = e.Metadata.GetOrderedMetadata<IOriginalTopicMetadata>();
|
||||
|
||||
var subs = new List<(string PubsubName, string Name, bool? EnableRawPayload, string Match, int Priority, Dictionary<string, string[]> OriginalTopicMetadata, string MetadataSeparator, RoutePattern RoutePattern)>();
|
||||
var subs = new List<(string PubsubName, string Name, string DeadLetterTopic, bool? EnableRawPayload, string Match, int Priority, Dictionary<string, string[]> OriginalTopicMetadata, string MetadataSeparator, RoutePattern RoutePattern)>();
|
||||
|
||||
for (int i = 0; i < topicMetadata.Count(); i++)
|
||||
{
|
||||
subs.Add((topicMetadata[i].PubsubName,
|
||||
topicMetadata[i].Name,
|
||||
(topicMetadata[i] as IDeadLetterTopicMetadata)?.DeadLetterTopic,
|
||||
(topicMetadata[i] as IRawTopicMetadata)?.EnableRawPayload,
|
||||
topicMetadata[i].Match,
|
||||
topicMetadata[i].Priority,
|
||||
|
@ -133,6 +134,11 @@ namespace Microsoft.AspNetCore.Builder
|
|||
Metadata = metadata.Count > 0 ? metadata : null,
|
||||
};
|
||||
|
||||
if (first.DeadLetterTopic != null)
|
||||
{
|
||||
subscription.DeadLetterTopic = first.DeadLetterTopic;
|
||||
}
|
||||
|
||||
// Use the V2 routing rules structure
|
||||
if (rules.Count > 0)
|
||||
{
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
// ------------------------------------------------------------------------
|
||||
// Copyright 2021 The Dapr Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
// ------------------------------------------------------------------------
|
||||
|
||||
namespace Dapr
|
||||
{
|
||||
/// <summary>
|
||||
/// IDeadLetterTopicMetadata that describes the metadata of a dead letter topic.
|
||||
/// </summary>
|
||||
public interface IDeadLetterTopicMetadata
|
||||
{
|
||||
/// <summary>
|
||||
/// Gets the dead letter topic name
|
||||
/// </summary>
|
||||
public string DeadLetterTopic { get; }
|
||||
}
|
||||
}
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
// ------------------------------------------------------------------------
|
||||
// ------------------------------------------------------------------------
|
||||
// Copyright 2021 The Dapr Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
|
|
|
@ -44,6 +44,11 @@ namespace Dapr
|
|||
/// Gets or sets the metadata.
|
||||
/// </summary>
|
||||
public Metadata Metadata { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Gets or sets the deadletter topic.
|
||||
/// </summary>
|
||||
public string DeadLetterTopic { get; set; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
|
|
|
@ -19,7 +19,7 @@ namespace Dapr
|
|||
/// TopicAttribute describes an endpoint as a subscriber to a topic.
|
||||
/// </summary>
|
||||
[AttributeUsage(AttributeTargets.All, AllowMultiple = true)]
|
||||
public class TopicAttribute : Attribute, ITopicMetadata, IRawTopicMetadata, IOwnedOriginalTopicMetadata
|
||||
public class TopicAttribute : Attribute, ITopicMetadata, IRawTopicMetadata, IOwnedOriginalTopicMetadata, IDeadLetterTopicMetadata
|
||||
{
|
||||
/// <summary>
|
||||
/// Initializes a new instance of the <see cref="TopicAttribute" /> class.
|
||||
|
@ -105,25 +105,50 @@ namespace Dapr
|
|||
this.MetadataSeparator = metadataSeparator;
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
public string Name { get; }
|
||||
/// <summary>
|
||||
/// Initializes a new instance of the <see cref="TopicAttribute" /> class.
|
||||
/// </summary>
|
||||
/// <param name="pubsubName">The name of the pubsub component to use.</param>
|
||||
/// <param name="name">The topic name.</param>
|
||||
/// <param name="deadLetterTopic">The dead letter topic name.</param>
|
||||
/// <param name="enableRawPayload">The enable/disable raw pay load flag.</param>
|
||||
/// <param name="ownedMetadatas">The topic owned metadata ids.</param>
|
||||
/// <param name="metadataSeparator">Separator to use for metadata.</param>
|
||||
public TopicAttribute(string pubsubName, string name, string deadLetterTopic, bool enableRawPayload, string[] ownedMetadatas = null, string metadataSeparator = null)
|
||||
{
|
||||
ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
|
||||
ArgumentVerifier.ThrowIfNullOrEmpty(name, nameof(name));
|
||||
|
||||
this.Name = name;
|
||||
this.PubsubName = pubsubName;
|
||||
this.DeadLetterTopic = deadLetterTopic;
|
||||
this.EnableRawPayload = enableRawPayload;
|
||||
this.OwnedMetadatas = ownedMetadatas;
|
||||
this.MetadataSeparator = metadataSeparator;
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
public string PubsubName { get; }
|
||||
public string Name { get; set; }
|
||||
|
||||
/// <inheritdoc/>
|
||||
public bool? EnableRawPayload { get; }
|
||||
public string PubsubName { get; set; }
|
||||
|
||||
/// <inheritdoc/>
|
||||
public new string Match { get; }
|
||||
public bool? EnableRawPayload { get; set; }
|
||||
|
||||
/// <inheritdoc/>
|
||||
public int Priority { get; }
|
||||
public new string Match { get; set; }
|
||||
|
||||
/// <inheritdoc/>
|
||||
public string[] OwnedMetadatas { get; }
|
||||
public int Priority { get; set; }
|
||||
|
||||
/// <inheritdoc/>
|
||||
public string MetadataSeparator { get; }
|
||||
public string[] OwnedMetadatas { get; set; }
|
||||
|
||||
/// <inheritdoc/>
|
||||
public string MetadataSeparator { get; set; }
|
||||
|
||||
/// <inheritdoc/>
|
||||
public string DeadLetterTopic { get; set; }
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,67 @@
|
|||
// ------------------------------------------------------------------------
|
||||
// Copyright 2022 The Dapr Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
// ------------------------------------------------------------------------
|
||||
|
||||
namespace Dapr
|
||||
{
|
||||
using System.Collections.Generic;
|
||||
/// <summary>
|
||||
/// This class defines configurations for the subscribe endpoint.
|
||||
/// </summary>
|
||||
public class TopicOptions
|
||||
{
|
||||
/// <summary>
|
||||
/// Gets or Sets the topic name.
|
||||
/// </summary>
|
||||
public string Name { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Gets or Sets the name of the pubsub component to use.
|
||||
/// </summary>
|
||||
public string PubsubName { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Gets or Sets a value which indicates whether to enable or disable processing raw messages.
|
||||
/// </summary>
|
||||
public bool EnableRawPayload { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Gets or Sets the CEL expression to use to match events for this handler.
|
||||
/// </summary>
|
||||
public string Match { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Gets or Sets the priority in which this rule should be evaluated (lower to higher).
|
||||
/// </summary>
|
||||
public int Priority { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Gets or Sets the <see cref="IOriginalTopicMetadata.Id"/> owned by topic.
|
||||
/// </summary>
|
||||
public string[] OwnedMetadatas { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Get or Sets the separator to use for metadata.
|
||||
/// </summary>
|
||||
public string MetadataSeparator { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Gets or Sets the dead letter topic.
|
||||
/// </summary>
|
||||
public string DeadLetterTopic { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Gets or Sets the original topic metadata.
|
||||
/// </summary>
|
||||
public IDictionary<string, string> Metadata;
|
||||
}
|
||||
}
|
|
@ -160,6 +160,9 @@ message TopicSubscription {
|
|||
// The optional routing rules to match against. In the gRPC interface, OnTopicEvent
|
||||
// is still invoked but the matching path is sent in the TopicEventRequest.
|
||||
TopicRoutes routes = 5;
|
||||
|
||||
// The optional dead letter queue for this topic to send events to.
|
||||
string dead_letter_topic = 6;
|
||||
}
|
||||
|
||||
message TopicRoutes {
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
// ------------------------------------------------------------------------
|
||||
// ------------------------------------------------------------------------
|
||||
// Copyright 2021 The Dapr Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
|
|
|
@ -67,6 +67,12 @@ namespace Dapr.AspNetCore.IntegrationTest.App
|
|||
public void MultipleTopics()
|
||||
{
|
||||
}
|
||||
|
||||
[Topic("pubsub", "G", "deadLetterTopicName", false)]
|
||||
[HttpPost("/G")]
|
||||
public void TopicG()
|
||||
{
|
||||
}
|
||||
|
||||
[Topic("pubsub", "metadata", new string[1] { "id1" })]
|
||||
[Topic("pubsub", "metadata.1", true)]
|
||||
|
|
|
@ -39,14 +39,22 @@ namespace Dapr.AspNetCore.IntegrationTest
|
|||
var json = await JsonSerializer.DeserializeAsync<JsonElement>(stream);
|
||||
|
||||
json.ValueKind.Should().Be(JsonValueKind.Array);
|
||||
json.GetArrayLength().Should().Be(16);
|
||||
var subscriptions = new List<(string PubsubName, string Topic, string Route, string rawPayload, string match, string metadata)>();
|
||||
json.GetArrayLength().Should().Be(17);
|
||||
|
||||
var subscriptions = new List<(string PubsubName, string Topic, string Route, string rawPayload, string match, string metadata, string DeadLetterTopic)>();
|
||||
|
||||
foreach (var element in json.EnumerateArray())
|
||||
{
|
||||
var pubsubName = element.GetProperty("pubsubName").GetString();
|
||||
var topic = element.GetProperty("topic").GetString();
|
||||
var rawPayload = string.Empty;
|
||||
var deadLetterTopic = string.Empty;
|
||||
Dictionary<string, string> originalMetadata = new Dictionary<string, string>();
|
||||
|
||||
if (element.TryGetProperty("deadLetterTopic", out JsonElement DeadLetterTopic))
|
||||
{
|
||||
deadLetterTopic = DeadLetterTopic.GetString();
|
||||
}
|
||||
if (element.TryGetProperty("metadata", out JsonElement metadata))
|
||||
{
|
||||
if (metadata.TryGetProperty("rawPayload", out JsonElement rawPayloadJson))
|
||||
|
@ -70,7 +78,7 @@ namespace Dapr.AspNetCore.IntegrationTest
|
|||
|
||||
if (element.TryGetProperty("route", out JsonElement route))
|
||||
{
|
||||
subscriptions.Add((pubsubName, topic, route.GetString(), rawPayload, string.Empty, originalMetadataString));
|
||||
subscriptions.Add((pubsubName, topic, route.GetString(), rawPayload, string.Empty, originalMetadataString, deadLetterTopic));
|
||||
}
|
||||
else if (element.TryGetProperty("routes", out JsonElement routes))
|
||||
{
|
||||
|
@ -80,34 +88,35 @@ namespace Dapr.AspNetCore.IntegrationTest
|
|||
{
|
||||
var match = rule.GetProperty("match").GetString();
|
||||
var path = rule.GetProperty("path").GetString();
|
||||
subscriptions.Add((pubsubName, topic, path, rawPayload, match, originalMetadataString));
|
||||
subscriptions.Add((pubsubName, topic, path, rawPayload, match, originalMetadataString, deadLetterTopic));
|
||||
}
|
||||
}
|
||||
if (routes.TryGetProperty("default", out JsonElement defaultProperty))
|
||||
{
|
||||
subscriptions.Add((pubsubName, topic, defaultProperty.GetString(), rawPayload, string.Empty, originalMetadataString));
|
||||
subscriptions.Add((pubsubName, topic, defaultProperty.GetString(), rawPayload, string.Empty, originalMetadataString, deadLetterTopic));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
subscriptions.Should().Contain(("testpubsub", "A", "topic-a", string.Empty, string.Empty, string.Empty));
|
||||
subscriptions.Should().Contain(("testpubsub", "A.1", "topic-a", string.Empty, string.Empty, string.Empty));
|
||||
subscriptions.Should().Contain(("pubsub", "B", "B", string.Empty, string.Empty, string.Empty));
|
||||
subscriptions.Should().Contain(("custom-pubsub", "custom-C", "C", string.Empty, string.Empty, string.Empty));
|
||||
subscriptions.Should().Contain(("pubsub", "register-user", "register-user", string.Empty, string.Empty, string.Empty));
|
||||
subscriptions.Should().Contain(("pubsub", "register-user-plaintext", "register-user-plaintext", string.Empty, string.Empty, string.Empty));
|
||||
subscriptions.Should().Contain(("pubsub", "D", "D", "true", string.Empty, string.Empty));
|
||||
subscriptions.Should().Contain(("pubsub", "E", "E", string.Empty, string.Empty, string.Empty));
|
||||
subscriptions.Should().Contain(("pubsub", "E", "E-Critical", string.Empty, "event.type == \"critical\"", string.Empty));
|
||||
subscriptions.Should().Contain(("pubsub", "E", "E-Important", string.Empty, "event.type == \"important\"", string.Empty));
|
||||
subscriptions.Should().Contain(("pubsub", "F", "multiTopicAttr", string.Empty, string.Empty, string.Empty));
|
||||
subscriptions.Should().Contain(("pubsub", "F.1", "multiTopicAttr", "true", string.Empty, string.Empty));
|
||||
subscriptions.Should().Contain(("pubsub", "splitTopicBuilder", "splitTopics", string.Empty, string.Empty, string.Empty));
|
||||
subscriptions.Should().Contain(("pubsub", "splitTopicAttr", "splitTopics", "true", string.Empty, string.Empty));
|
||||
subscriptions.Should().Contain(("pubsub", "metadata", "multiMetadataTopicAttr", string.Empty, string.Empty, "n1=v1;n2=v2,v3"));
|
||||
subscriptions.Should().Contain(("pubsub", "metadata.1", "multiMetadataTopicAttr", "true", string.Empty, "n1=v1"));
|
||||
subscriptions.Should().Contain(("pubsub", "splitMetadataTopicBuilder", "splitMetadataTopics", string.Empty, string.Empty, "n1=v1;n2=v1"));
|
||||
subscriptions.Should().Contain(("pubsub", "metadataseparator", "topicmetadataseparatorattr", string.Empty, string.Empty, "n1=v1|v2"));
|
||||
subscriptions.Should().Contain(("testpubsub", "A", "topic-a", string.Empty, string.Empty, string.Empty, string.Empty));
|
||||
subscriptions.Should().Contain(("testpubsub", "A.1", "topic-a", string.Empty, string.Empty, string.Empty, string.Empty));
|
||||
subscriptions.Should().Contain(("pubsub", "B", "B", string.Empty, string.Empty, string.Empty, string.Empty));
|
||||
subscriptions.Should().Contain(("custom-pubsub", "custom-C", "C", string.Empty, string.Empty, string.Empty, string.Empty));
|
||||
subscriptions.Should().Contain(("pubsub", "register-user", "register-user", string.Empty, string.Empty, string.Empty, string.Empty));
|
||||
subscriptions.Should().Contain(("pubsub", "register-user-plaintext", "register-user-plaintext", string.Empty, string.Empty, string.Empty, string.Empty));
|
||||
subscriptions.Should().Contain(("pubsub", "D", "D", "true", string.Empty, string.Empty, string.Empty));
|
||||
subscriptions.Should().Contain(("pubsub", "E", "E", string.Empty, string.Empty, string.Empty, string.Empty));
|
||||
subscriptions.Should().Contain(("pubsub", "E", "E-Critical", string.Empty, "event.type == \"critical\"", string.Empty, string.Empty));
|
||||
subscriptions.Should().Contain(("pubsub", "E", "E-Important", string.Empty, "event.type == \"important\"", string.Empty, string.Empty));
|
||||
subscriptions.Should().Contain(("pubsub", "F", "multiTopicAttr", string.Empty, string.Empty, string.Empty, string.Empty));
|
||||
subscriptions.Should().Contain(("pubsub", "F.1", "multiTopicAttr", "true", string.Empty, string.Empty, string.Empty));
|
||||
subscriptions.Should().Contain(("pubsub", "G", "G", string.Empty, string.Empty, string.Empty, "deadLetterTopicName"));
|
||||
subscriptions.Should().Contain(("pubsub", "splitTopicBuilder", "splitTopics", string.Empty, string.Empty, string.Empty, string.Empty));
|
||||
subscriptions.Should().Contain(("pubsub", "splitTopicAttr", "splitTopics", "true", string.Empty, string.Empty, string.Empty));
|
||||
subscriptions.Should().Contain(("pubsub", "metadata", "multiMetadataTopicAttr", string.Empty, string.Empty, "n1=v1;n2=v2,v3", string.Empty));
|
||||
subscriptions.Should().Contain(("pubsub", "metadata.1", "multiMetadataTopicAttr", "true", string.Empty, "n1=v1", string.Empty));
|
||||
subscriptions.Should().Contain(("pubsub", "splitMetadataTopicBuilder", "splitMetadataTopics", string.Empty, string.Empty, "n1=v1;n2=v1", string.Empty));
|
||||
|
||||
// Test priority route sorting
|
||||
var eTopic = subscriptions.FindAll(e => e.Topic == "E");
|
||||
eTopic.Count.Should().Be(3);
|
||||
|
|
Loading…
Reference in New Issue