diff --git a/examples/AspNetCore/ControllerSample/Controllers/SampleController.cs b/examples/AspNetCore/ControllerSample/Controllers/SampleController.cs
index a8071778..7a5f8a75 100644
--- a/examples/AspNetCore/ControllerSample/Controllers/SampleController.cs
+++ b/examples/AspNetCore/ControllerSample/Controllers/SampleController.cs
@@ -64,18 +64,39 @@ namespace ControllerSample.Controllers
/// State client to interact with Dapr runtime.
/// A representing the result of the asynchronous operation.
/// "pubsub", the first parameter into the Topic attribute, is name of the default pub/sub configured by the Dapr CLI.
- [Topic("pubsub", "deposit")]
+ [Topic("pubsub", "deposit", "amountDeadLetterTopic", false)]
[HttpPost("deposit")]
public async Task> Deposit(Transaction transaction, [FromServices] DaprClient daprClient)
{
logger.LogDebug("Enter deposit");
var state = await daprClient.GetStateEntryAsync(StoreName, transaction.Id);
state.Value ??= new Account() { Id = transaction.Id, };
+ logger.LogDebug("Id is {0}, the amount to be deposited is {1}", transaction.Id, transaction.Amount);
+
+ if (transaction.Amount < 0m)
+ {
+ return BadRequest(new { statusCode = 400, message = "bad request" });
+ }
+
state.Value.Balance += transaction.Amount;
+ logger.LogDebug("Balance is {0}", state.Value.Balance);
await state.SaveAsync();
return state.Value;
}
+ ///
+ /// Method for viewing the error message when the deposit/withdrawal amounts
+ /// are negative.
+ ///
+ /// Transaction info.
+ [Topic("pubsub", "amountDeadLetterTopic")]
+ [HttpPost("deadLetterTopicRoute")]
+ public ActionResult ViewErrorMessage(Transaction transaction)
+ {
+ logger.LogDebug("The amount cannot be negative: {0}", transaction.Amount);
+ return Ok();
+ }
+
///
/// Method for withdrawing from account as specified in transaction.
///
@@ -83,19 +104,25 @@ namespace ControllerSample.Controllers
/// State client to interact with Dapr runtime.
/// A representing the result of the asynchronous operation.
/// "pubsub", the first parameter into the Topic attribute, is name of the default pub/sub configured by the Dapr CLI.
- [Topic("pubsub", "withdraw")]
+ [Topic("pubsub", "withdraw", "amountDeadLetterTopic", false)]
[HttpPost("withdraw")]
public async Task> Withdraw(Transaction transaction, [FromServices] DaprClient daprClient)
{
- logger.LogDebug("Enter withdraw");
+ logger.LogDebug("Enter withdraw method...");
var state = await daprClient.GetStateEntryAsync(StoreName, transaction.Id);
+ logger.LogDebug("Id is {0}, the amount to be withdrawn is {1}", transaction.Id, transaction.Amount);
if (state.Value == null)
{
return this.NotFound();
}
+ if (transaction.Amount < 0m)
+ {
+ return BadRequest(new { statusCode = 400, message = "bad request" });
+ }
state.Value.Balance -= transaction.Amount;
+ logger.LogDebug("Balance is {0}", state.Value.Balance);
await state.SaveAsync();
return state.Value;
}
@@ -134,7 +161,7 @@ namespace ControllerSample.Controllers
[HttpPost("throwException")]
public async Task> ThrowException(Transaction transaction, [FromServices] DaprClient daprClient)
{
- Console.WriteLine("Enter ThrowException");
+ logger.LogDebug("Enter ThrowException");
var task = Task.Delay(10);
await task;
return BadRequest(new { statusCode = 400, message = "bad request" });
diff --git a/examples/AspNetCore/ControllerSample/CustomTopicAttribute.cs b/examples/AspNetCore/ControllerSample/CustomTopicAttribute.cs
index 2017315e..96eb918f 100644
--- a/examples/AspNetCore/ControllerSample/CustomTopicAttribute.cs
+++ b/examples/AspNetCore/ControllerSample/CustomTopicAttribute.cs
@@ -1,4 +1,4 @@
-// ------------------------------------------------------------------------
+// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
diff --git a/examples/AspNetCore/ControllerSample/README.md b/examples/AspNetCore/ControllerSample/README.md
index 0a727b70..bb6c448a 100644
--- a/examples/AspNetCore/ControllerSample/README.md
+++ b/examples/AspNetCore/ControllerSample/README.md
@@ -133,7 +133,32 @@ On Windows:
dapr publish --pubsub pubsub --publish-app-id controller -t deposit -d "{\"id\": \"17\", \"amount\": 15 }"
```
---
+**Dead Letter Topic example (pubsub)**
+Publish an event using the Dapr cli with an incorrect input, i.e. negative amount:
+Deposit:
+On Linux, MacOS:
+```sh
+dapr publish --pubsub pubsub --publish-app-id controller -t deposit -d '{"id": "17", "amount": -15 }'
+```
+On Windows:
+ ```sh
+ dapr publish --pubsub pubsub --publish-app-id controller -t deposit -d "{\"id\": \"17\", \"amount\": -15 }"
+```
+
+Withdraw:
+ On Linux, MacOS:
+```sh
+dapr publish --pubsub pubsub --publish-app-id controller -t withdraw -d '{"id": "17", "amount": -15 }'
+```
+On Windows:
+ ```sh
+ dapr publish --pubsub pubsub --publish-app-id controller -t withdraw -d "{\"id\": \"17\", \"amount\": -15 }"
+ ```
+
+First a message is sent from a publisher on a `deposit` or `withdraw` topic. Dapr receives the message on behalf of a subscriber application, however the `deposit` or `withdraw` topic message fails to be delivered to the `/deposit` or `/withdraw` endpoint on the application, even after retries. As a result of the failure to deliver, the message is forwarded to the `amountDeadLetterTopic` topic which delivers this to the `/deadLetterTopicRoute` endpoint.
+
+ ---
## Code Samples
*All of the interesting code in this sample is in Startup.cs and Controllers/SampleController.cs*
diff --git a/examples/AspNetCore/ControllerSample/Transaction.cs b/examples/AspNetCore/ControllerSample/Transaction.cs
index cf553a9c..2d6f5a5a 100644
--- a/examples/AspNetCore/ControllerSample/Transaction.cs
+++ b/examples/AspNetCore/ControllerSample/Transaction.cs
@@ -28,8 +28,7 @@ namespace ControllerSample
///
/// Gets or sets amount for the transaction.
- ///
- [Range(0, double.MaxValue)]
+ ///
`MapGet(...)` and `MapPost(...)` are provided by ASP.NET Core routing - these are used to setup endpoints to handle HTTP requests.
`WithTopic(...)` associates an endpoint with a pub/sub topic.
+```C#
+var depositTopicOptions = new TopicOptions();
+depositTopicOptions.PubsubName = PubsubName;
+depositTopicOptions.Name = "deposit";
+depositTopicOptions.DeadLetterTopic = "amountDeadLetterTopic";
+
+var withdrawTopicOptions = new TopicOptions();
+withdrawTopicOptions.PubsubName = PubsubName;
+withdrawTopicOptions.Name = "withdraw";
+withdrawTopicOptions.DeadLetterTopic = "amountDeadLetterTopic";
+```
+`WithTopic(...)` now takes the `TopicOptions(..)` instance that defines configurations for the subscribe endpoint.
---
diff --git a/examples/AspNetCore/RoutingSample/Startup.cs b/examples/AspNetCore/RoutingSample/Startup.cs
index 5ac1541a..bb8aad83 100644
--- a/examples/AspNetCore/RoutingSample/Startup.cs
+++ b/examples/AspNetCore/RoutingSample/Startup.cs
@@ -1,4 +1,4 @@
-// ------------------------------------------------------------------------
+// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -16,6 +16,7 @@ namespace RoutingSample
using System;
using System.Text.Json;
using System.Threading.Tasks;
+ using Dapr;
using Dapr.Client;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
@@ -23,6 +24,7 @@ namespace RoutingSample
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
+ using Microsoft.Extensions.Logging;
///
/// Startup class.
@@ -74,7 +76,8 @@ namespace RoutingSample
/// Application builder.
/// Webhost environment.
/// Options for JSON serialization.
- public void Configure(IApplicationBuilder app, IWebHostEnvironment env, JsonSerializerOptions serializerOptions)
+ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, JsonSerializerOptions serializerOptions,
+ ILogger logger)
{
if (env.IsDevelopment())
{
@@ -89,27 +92,38 @@ namespace RoutingSample
{
endpoints.MapSubscribeHandler();
+ var depositTopicOptions = new TopicOptions();
+ depositTopicOptions.PubsubName = PubsubName;
+ depositTopicOptions.Name = "deposit";
+ depositTopicOptions.DeadLetterTopic = "amountDeadLetterTopic";
+
+ var withdrawTopicOptions = new TopicOptions();
+ withdrawTopicOptions.PubsubName = PubsubName;
+ withdrawTopicOptions.Name = "withdraw";
+ withdrawTopicOptions.DeadLetterTopic = "amountDeadLetterTopic";
+
endpoints.MapGet("{id}", Balance);
- endpoints.MapPost("deposit", Deposit).WithTopic(PubsubName, "deposit");
- endpoints.MapPost("withdraw", Withdraw).WithTopic(PubsubName, "withdraw");
+ endpoints.MapPost("deposit", Deposit).WithTopic(depositTopicOptions);
+ endpoints.MapPost("deadLetterTopicRoute", ViewErrorMessage).WithTopic(PubsubName, "amountDeadLetterTopic");
+ endpoints.MapPost("withdraw", Withdraw).WithTopic(withdrawTopicOptions);
});
async Task Balance(HttpContext context)
{
- Console.WriteLine("Enter Balance");
+ logger.LogDebug("Enter Balance");
var client = context.RequestServices.GetRequiredService();
var id = (string)context.Request.RouteValues["id"];
- Console.WriteLine("id is {0}", id);
+ logger.LogDebug("id is {0}", id);
var account = await client.GetStateAsync(StoreName, id);
if (account == null)
{
- Console.WriteLine("Account not found");
+ logger.LogDebug("Account not found");
context.Response.StatusCode = 404;
return;
}
- Console.WriteLine("Account balance is {0}", account.Balance);
+ logger.LogDebug("Account balance is {0}", account.Balance);
context.Response.ContentType = "application/json";
await JsonSerializer.SerializeAsync(context.Response.Body, account, serializerOptions);
@@ -117,12 +131,13 @@ namespace RoutingSample
async Task Deposit(HttpContext context)
{
- Console.WriteLine("Enter Deposit");
-
- var client = context.RequestServices.GetRequiredService();
+ logger.LogDebug("Enter Deposit");
+ var client = context.RequestServices.GetRequiredService();
var transaction = await JsonSerializer.DeserializeAsync(context.Request.Body, serializerOptions);
- Console.WriteLine("Id is {0}, Amount is {1}", transaction.Id, transaction.Amount);
+
+ logger.LogDebug("Id is {0}, Amount is {1}", transaction.Id, transaction.Amount);
+
var account = await client.GetStateAsync(StoreName, transaction.Id);
if (account == null)
{
@@ -131,43 +146,56 @@ namespace RoutingSample
if (transaction.Amount < 0m)
{
- Console.WriteLine("Invalid amount");
+ logger.LogDebug("Invalid amount");
context.Response.StatusCode = 400;
return;
}
account.Balance += transaction.Amount;
await client.SaveStateAsync(StoreName, transaction.Id, account);
- Console.WriteLine("Balance is {0}", account.Balance);
+ logger.LogDebug("Balance is {0}", account.Balance);
context.Response.ContentType = "application/json";
await JsonSerializer.SerializeAsync(context.Response.Body, account, serializerOptions);
}
- async Task Withdraw(HttpContext context)
+ async Task ViewErrorMessage(HttpContext context)
{
- Console.WriteLine("Enter Withdraw");
var client = context.RequestServices.GetRequiredService();
var transaction = await JsonSerializer.DeserializeAsync(context.Request.Body, serializerOptions);
- Console.WriteLine("Id is {0}", transaction.Id);
+
+ logger.LogDebug("The amount cannot be negative: {0}", transaction.Amount);
+
+ return;
+ }
+
+ async Task Withdraw(HttpContext context)
+ {
+ logger.LogDebug("Enter Withdraw");
+
+ var client = context.RequestServices.GetRequiredService();
+ var transaction = await JsonSerializer.DeserializeAsync(context.Request.Body, serializerOptions);
+
+ logger.LogDebug("Id is {0}, Amount is {1}", transaction.Id, transaction.Amount);
+
var account = await client.GetStateAsync(StoreName, transaction.Id);
if (account == null)
{
- Console.WriteLine("Account not found");
+ logger.LogDebug("Account not found");
context.Response.StatusCode = 404;
return;
}
if (transaction.Amount < 0m)
{
- Console.WriteLine("Invalid amount");
+ logger.LogDebug("Invalid amount");
context.Response.StatusCode = 400;
return;
}
account.Balance -= transaction.Amount;
await client.SaveStateAsync(StoreName, transaction.Id, account);
- Console.WriteLine("Balance is {0}", account.Balance);
+ logger.LogDebug("Balance is {0}", account.Balance);
context.Response.ContentType = "application/json";
await JsonSerializer.SerializeAsync(context.Response.Body, account, serializerOptions);
diff --git a/src/Dapr.AspNetCore/DaprEndpointConventionBuilderExtensions.cs b/src/Dapr.AspNetCore/DaprEndpointConventionBuilderExtensions.cs
index b29b8150..af8440e3 100644
--- a/src/Dapr.AspNetCore/DaprEndpointConventionBuilderExtensions.cs
+++ b/src/Dapr.AspNetCore/DaprEndpointConventionBuilderExtensions.cs
@@ -15,7 +15,10 @@ namespace Microsoft.AspNetCore.Builder
{
using System;
using System.Collections.Generic;
+ using System.Reflection;
+ using System.Xml.Linq;
using Dapr;
+ using Grpc.Core;
///
/// Contains extension methods for .
@@ -103,5 +106,43 @@ namespace Microsoft.AspNetCore.Builder
}
return builder;
}
+
+ ///
+ /// Adds metadata to the provided .
+ ///
+ /// The .\
+ /// The object of TopicOptions class that provides all topic attributes.
+ /// The type.
+ /// The builder object.
+ public static T WithTopic(this T builder, TopicOptions topicOptions)
+ where T : IEndpointConventionBuilder
+ {
+ if (builder is null)
+ {
+ throw new ArgumentNullException(nameof(builder));
+ }
+
+ ArgumentVerifier.ThrowIfNullOrEmpty(topicOptions.PubsubName, nameof(topicOptions.PubsubName));
+ ArgumentVerifier.ThrowIfNullOrEmpty(topicOptions.Name, nameof(topicOptions.Name));
+
+ var topicObject = new TopicAttribute(topicOptions.PubsubName, topicOptions.Name, topicOptions.DeadLetterTopic, topicOptions.EnableRawPayload);
+
+ topicObject.Match = topicOptions.Match;
+ topicObject.Priority = topicOptions.Priority;
+ topicObject.OwnedMetadatas = topicOptions.OwnedMetadatas;
+ topicObject.MetadataSeparator = topicObject.MetadataSeparator;
+
+ if (topicOptions.Metadata is not null)
+ {
+ foreach (var md in topicOptions.Metadata)
+ {
+ builder.WithMetadata(new TopicMetadataAttribute(md.Key, md.Value));
+ }
+ }
+
+ builder.WithMetadata(topicObject);
+
+ return builder;
+ }
}
}
diff --git a/src/Dapr.AspNetCore/DaprEndpointRouteBuilderExtensions.cs b/src/Dapr.AspNetCore/DaprEndpointRouteBuilderExtensions.cs
index 558e415c..2ad7ac74 100644
--- a/src/Dapr.AspNetCore/DaprEndpointRouteBuilderExtensions.cs
+++ b/src/Dapr.AspNetCore/DaprEndpointRouteBuilderExtensions.cs
@@ -72,12 +72,13 @@ namespace Microsoft.AspNetCore.Builder
var topicMetadata = e.Metadata.GetOrderedMetadata();
var originalTopicMetadata = e.Metadata.GetOrderedMetadata();
- var subs = new List<(string PubsubName, string Name, bool? EnableRawPayload, string Match, int Priority, Dictionary OriginalTopicMetadata, string MetadataSeparator, RoutePattern RoutePattern)>();
+ var subs = new List<(string PubsubName, string Name, string DeadLetterTopic, bool? EnableRawPayload, string Match, int Priority, Dictionary OriginalTopicMetadata, string MetadataSeparator, RoutePattern RoutePattern)>();
for (int i = 0; i < topicMetadata.Count(); i++)
{
subs.Add((topicMetadata[i].PubsubName,
topicMetadata[i].Name,
+ (topicMetadata[i] as IDeadLetterTopicMetadata)?.DeadLetterTopic,
(topicMetadata[i] as IRawTopicMetadata)?.EnableRawPayload,
topicMetadata[i].Match,
topicMetadata[i].Priority,
@@ -133,6 +134,11 @@ namespace Microsoft.AspNetCore.Builder
Metadata = metadata.Count > 0 ? metadata : null,
};
+ if (first.DeadLetterTopic != null)
+ {
+ subscription.DeadLetterTopic = first.DeadLetterTopic;
+ }
+
// Use the V2 routing rules structure
if (rules.Count > 0)
{
diff --git a/src/Dapr.AspNetCore/IDeadLetterTopicMetadata.cs b/src/Dapr.AspNetCore/IDeadLetterTopicMetadata.cs
new file mode 100644
index 00000000..97707b98
--- /dev/null
+++ b/src/Dapr.AspNetCore/IDeadLetterTopicMetadata.cs
@@ -0,0 +1,27 @@
+// ------------------------------------------------------------------------
+// Copyright 2021 The Dapr Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ------------------------------------------------------------------------
+
+namespace Dapr
+{
+ ///
+ /// IDeadLetterTopicMetadata that describes the metadata of a dead letter topic.
+ ///
+ public interface IDeadLetterTopicMetadata
+ {
+ ///
+ /// Gets the dead letter topic name
+ ///
+ public string DeadLetterTopic { get; }
+ }
+}
+
diff --git a/src/Dapr.AspNetCore/ITopicMetadata.cs b/src/Dapr.AspNetCore/ITopicMetadata.cs
index 4f297470..eb373213 100644
--- a/src/Dapr.AspNetCore/ITopicMetadata.cs
+++ b/src/Dapr.AspNetCore/ITopicMetadata.cs
@@ -1,4 +1,4 @@
-// ------------------------------------------------------------------------
+// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
diff --git a/src/Dapr.AspNetCore/Subscription.cs b/src/Dapr.AspNetCore/Subscription.cs
index 54e41956..1c3177b9 100644
--- a/src/Dapr.AspNetCore/Subscription.cs
+++ b/src/Dapr.AspNetCore/Subscription.cs
@@ -44,6 +44,11 @@ namespace Dapr
/// Gets or sets the metadata.
///
public Metadata Metadata { get; set; }
+
+ ///
+ /// Gets or sets the deadletter topic.
+ ///
+ public string DeadLetterTopic { get; set; }
}
///
diff --git a/src/Dapr.AspNetCore/TopicAttribute.cs b/src/Dapr.AspNetCore/TopicAttribute.cs
index 770bfe18..daa5d850 100644
--- a/src/Dapr.AspNetCore/TopicAttribute.cs
+++ b/src/Dapr.AspNetCore/TopicAttribute.cs
@@ -19,7 +19,7 @@ namespace Dapr
/// TopicAttribute describes an endpoint as a subscriber to a topic.
///
[AttributeUsage(AttributeTargets.All, AllowMultiple = true)]
- public class TopicAttribute : Attribute, ITopicMetadata, IRawTopicMetadata, IOwnedOriginalTopicMetadata
+ public class TopicAttribute : Attribute, ITopicMetadata, IRawTopicMetadata, IOwnedOriginalTopicMetadata, IDeadLetterTopicMetadata
{
///
/// Initializes a new instance of the class.
@@ -105,25 +105,50 @@ namespace Dapr
this.MetadataSeparator = metadataSeparator;
}
- ///
- public string Name { get; }
+ ///
+ /// Initializes a new instance of the class.
+ ///
+ /// The name of the pubsub component to use.
+ /// The topic name.
+ /// The dead letter topic name.
+ /// The enable/disable raw pay load flag.
+ /// The topic owned metadata ids.
+ /// Separator to use for metadata.
+ public TopicAttribute(string pubsubName, string name, string deadLetterTopic, bool enableRawPayload, string[] ownedMetadatas = null, string metadataSeparator = null)
+ {
+ ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
+ ArgumentVerifier.ThrowIfNullOrEmpty(name, nameof(name));
+
+ this.Name = name;
+ this.PubsubName = pubsubName;
+ this.DeadLetterTopic = deadLetterTopic;
+ this.EnableRawPayload = enableRawPayload;
+ this.OwnedMetadatas = ownedMetadatas;
+ this.MetadataSeparator = metadataSeparator;
+ }
///
- public string PubsubName { get; }
+ public string Name { get; set; }
///
- public bool? EnableRawPayload { get; }
+ public string PubsubName { get; set; }
///
- public new string Match { get; }
+ public bool? EnableRawPayload { get; set; }
///
- public int Priority { get; }
+ public new string Match { get; set; }
///
- public string[] OwnedMetadatas { get; }
+ public int Priority { get; set; }
///
- public string MetadataSeparator { get; }
+ public string[] OwnedMetadatas { get; set; }
+
+ ///
+ public string MetadataSeparator { get; set; }
+
+ ///
+ public string DeadLetterTopic { get; set; }
}
}
diff --git a/src/Dapr.AspNetCore/TopicOptions.cs b/src/Dapr.AspNetCore/TopicOptions.cs
new file mode 100644
index 00000000..2ca2eea4
--- /dev/null
+++ b/src/Dapr.AspNetCore/TopicOptions.cs
@@ -0,0 +1,67 @@
+// ------------------------------------------------------------------------
+// Copyright 2022 The Dapr Authors
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+// http://www.apache.org/licenses/LICENSE-2.0
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ------------------------------------------------------------------------
+
+namespace Dapr
+{
+ using System.Collections.Generic;
+ ///
+ /// This class defines configurations for the subscribe endpoint.
+ ///
+ public class TopicOptions
+ {
+ ///
+ /// Gets or Sets the topic name.
+ ///
+ public string Name { get; set; }
+
+ ///
+ /// Gets or Sets the name of the pubsub component to use.
+ ///
+ public string PubsubName { get; set; }
+
+ ///
+ /// Gets or Sets a value which indicates whether to enable or disable processing raw messages.
+ ///
+ public bool EnableRawPayload { get; set; }
+
+ ///
+ /// Gets or Sets the CEL expression to use to match events for this handler.
+ ///
+ public string Match { get; set; }
+
+ ///
+ /// Gets or Sets the priority in which this rule should be evaluated (lower to higher).
+ ///
+ public int Priority { get; set; }
+
+ ///
+ /// Gets or Sets the owned by topic.
+ ///
+ public string[] OwnedMetadatas { get; set; }
+
+ ///
+ /// Get or Sets the separator to use for metadata.
+ ///
+ public string MetadataSeparator { get; set; }
+
+ ///
+ /// Gets or Sets the dead letter topic.
+ ///
+ public string DeadLetterTopic { get; set; }
+
+ ///
+ /// Gets or Sets the original topic metadata.
+ ///
+ public IDictionary Metadata;
+ }
+}
diff --git a/src/Dapr.Client/Protos/dapr/proto/dapr/v1/appcallback.proto b/src/Dapr.Client/Protos/dapr/proto/dapr/v1/appcallback.proto
index 02436dae..bfbb4d79 100644
--- a/src/Dapr.Client/Protos/dapr/proto/dapr/v1/appcallback.proto
+++ b/src/Dapr.Client/Protos/dapr/proto/dapr/v1/appcallback.proto
@@ -160,6 +160,9 @@ message TopicSubscription {
// The optional routing rules to match against. In the gRPC interface, OnTopicEvent
// is still invoked but the matching path is sent in the TopicEventRequest.
TopicRoutes routes = 5;
+
+ // The optional dead letter queue for this topic to send events to.
+ string dead_letter_topic = 6;
}
message TopicRoutes {
diff --git a/test/Dapr.AspNetCore.IntegrationTest.App/CustomTopicAttribute.cs b/test/Dapr.AspNetCore.IntegrationTest.App/CustomTopicAttribute.cs
index e0d5df8e..6f65f060 100644
--- a/test/Dapr.AspNetCore.IntegrationTest.App/CustomTopicAttribute.cs
+++ b/test/Dapr.AspNetCore.IntegrationTest.App/CustomTopicAttribute.cs
@@ -1,4 +1,4 @@
-// ------------------------------------------------------------------------
+// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
diff --git a/test/Dapr.AspNetCore.IntegrationTest.App/DaprController.cs b/test/Dapr.AspNetCore.IntegrationTest.App/DaprController.cs
index 52e78687..733ee0b7 100644
--- a/test/Dapr.AspNetCore.IntegrationTest.App/DaprController.cs
+++ b/test/Dapr.AspNetCore.IntegrationTest.App/DaprController.cs
@@ -67,6 +67,12 @@ namespace Dapr.AspNetCore.IntegrationTest.App
public void MultipleTopics()
{
}
+
+ [Topic("pubsub", "G", "deadLetterTopicName", false)]
+ [HttpPost("/G")]
+ public void TopicG()
+ {
+ }
[Topic("pubsub", "metadata", new string[1] { "id1" })]
[Topic("pubsub", "metadata.1", true)]
diff --git a/test/Dapr.AspNetCore.IntegrationTest/SubscribeEndpointTest.cs b/test/Dapr.AspNetCore.IntegrationTest/SubscribeEndpointTest.cs
index 048018a6..62a66898 100644
--- a/test/Dapr.AspNetCore.IntegrationTest/SubscribeEndpointTest.cs
+++ b/test/Dapr.AspNetCore.IntegrationTest/SubscribeEndpointTest.cs
@@ -39,14 +39,22 @@ namespace Dapr.AspNetCore.IntegrationTest
var json = await JsonSerializer.DeserializeAsync(stream);
json.ValueKind.Should().Be(JsonValueKind.Array);
- json.GetArrayLength().Should().Be(16);
- var subscriptions = new List<(string PubsubName, string Topic, string Route, string rawPayload, string match, string metadata)>();
+ json.GetArrayLength().Should().Be(17);
+
+ var subscriptions = new List<(string PubsubName, string Topic, string Route, string rawPayload, string match, string metadata, string DeadLetterTopic)>();
+
foreach (var element in json.EnumerateArray())
{
var pubsubName = element.GetProperty("pubsubName").GetString();
var topic = element.GetProperty("topic").GetString();
var rawPayload = string.Empty;
+ var deadLetterTopic = string.Empty;
Dictionary originalMetadata = new Dictionary();
+
+ if (element.TryGetProperty("deadLetterTopic", out JsonElement DeadLetterTopic))
+ {
+ deadLetterTopic = DeadLetterTopic.GetString();
+ }
if (element.TryGetProperty("metadata", out JsonElement metadata))
{
if (metadata.TryGetProperty("rawPayload", out JsonElement rawPayloadJson))
@@ -70,7 +78,7 @@ namespace Dapr.AspNetCore.IntegrationTest
if (element.TryGetProperty("route", out JsonElement route))
{
- subscriptions.Add((pubsubName, topic, route.GetString(), rawPayload, string.Empty, originalMetadataString));
+ subscriptions.Add((pubsubName, topic, route.GetString(), rawPayload, string.Empty, originalMetadataString, deadLetterTopic));
}
else if (element.TryGetProperty("routes", out JsonElement routes))
{
@@ -80,34 +88,35 @@ namespace Dapr.AspNetCore.IntegrationTest
{
var match = rule.GetProperty("match").GetString();
var path = rule.GetProperty("path").GetString();
- subscriptions.Add((pubsubName, topic, path, rawPayload, match, originalMetadataString));
+ subscriptions.Add((pubsubName, topic, path, rawPayload, match, originalMetadataString, deadLetterTopic));
}
}
if (routes.TryGetProperty("default", out JsonElement defaultProperty))
{
- subscriptions.Add((pubsubName, topic, defaultProperty.GetString(), rawPayload, string.Empty, originalMetadataString));
+ subscriptions.Add((pubsubName, topic, defaultProperty.GetString(), rawPayload, string.Empty, originalMetadataString, deadLetterTopic));
}
}
}
- subscriptions.Should().Contain(("testpubsub", "A", "topic-a", string.Empty, string.Empty, string.Empty));
- subscriptions.Should().Contain(("testpubsub", "A.1", "topic-a", string.Empty, string.Empty, string.Empty));
- subscriptions.Should().Contain(("pubsub", "B", "B", string.Empty, string.Empty, string.Empty));
- subscriptions.Should().Contain(("custom-pubsub", "custom-C", "C", string.Empty, string.Empty, string.Empty));
- subscriptions.Should().Contain(("pubsub", "register-user", "register-user", string.Empty, string.Empty, string.Empty));
- subscriptions.Should().Contain(("pubsub", "register-user-plaintext", "register-user-plaintext", string.Empty, string.Empty, string.Empty));
- subscriptions.Should().Contain(("pubsub", "D", "D", "true", string.Empty, string.Empty));
- subscriptions.Should().Contain(("pubsub", "E", "E", string.Empty, string.Empty, string.Empty));
- subscriptions.Should().Contain(("pubsub", "E", "E-Critical", string.Empty, "event.type == \"critical\"", string.Empty));
- subscriptions.Should().Contain(("pubsub", "E", "E-Important", string.Empty, "event.type == \"important\"", string.Empty));
- subscriptions.Should().Contain(("pubsub", "F", "multiTopicAttr", string.Empty, string.Empty, string.Empty));
- subscriptions.Should().Contain(("pubsub", "F.1", "multiTopicAttr", "true", string.Empty, string.Empty));
- subscriptions.Should().Contain(("pubsub", "splitTopicBuilder", "splitTopics", string.Empty, string.Empty, string.Empty));
- subscriptions.Should().Contain(("pubsub", "splitTopicAttr", "splitTopics", "true", string.Empty, string.Empty));
- subscriptions.Should().Contain(("pubsub", "metadata", "multiMetadataTopicAttr", string.Empty, string.Empty, "n1=v1;n2=v2,v3"));
- subscriptions.Should().Contain(("pubsub", "metadata.1", "multiMetadataTopicAttr", "true", string.Empty, "n1=v1"));
- subscriptions.Should().Contain(("pubsub", "splitMetadataTopicBuilder", "splitMetadataTopics", string.Empty, string.Empty, "n1=v1;n2=v1"));
- subscriptions.Should().Contain(("pubsub", "metadataseparator", "topicmetadataseparatorattr", string.Empty, string.Empty, "n1=v1|v2"));
+ subscriptions.Should().Contain(("testpubsub", "A", "topic-a", string.Empty, string.Empty, string.Empty, string.Empty));
+ subscriptions.Should().Contain(("testpubsub", "A.1", "topic-a", string.Empty, string.Empty, string.Empty, string.Empty));
+ subscriptions.Should().Contain(("pubsub", "B", "B", string.Empty, string.Empty, string.Empty, string.Empty));
+ subscriptions.Should().Contain(("custom-pubsub", "custom-C", "C", string.Empty, string.Empty, string.Empty, string.Empty));
+ subscriptions.Should().Contain(("pubsub", "register-user", "register-user", string.Empty, string.Empty, string.Empty, string.Empty));
+ subscriptions.Should().Contain(("pubsub", "register-user-plaintext", "register-user-plaintext", string.Empty, string.Empty, string.Empty, string.Empty));
+ subscriptions.Should().Contain(("pubsub", "D", "D", "true", string.Empty, string.Empty, string.Empty));
+ subscriptions.Should().Contain(("pubsub", "E", "E", string.Empty, string.Empty, string.Empty, string.Empty));
+ subscriptions.Should().Contain(("pubsub", "E", "E-Critical", string.Empty, "event.type == \"critical\"", string.Empty, string.Empty));
+ subscriptions.Should().Contain(("pubsub", "E", "E-Important", string.Empty, "event.type == \"important\"", string.Empty, string.Empty));
+ subscriptions.Should().Contain(("pubsub", "F", "multiTopicAttr", string.Empty, string.Empty, string.Empty, string.Empty));
+ subscriptions.Should().Contain(("pubsub", "F.1", "multiTopicAttr", "true", string.Empty, string.Empty, string.Empty));
+ subscriptions.Should().Contain(("pubsub", "G", "G", string.Empty, string.Empty, string.Empty, "deadLetterTopicName"));
+ subscriptions.Should().Contain(("pubsub", "splitTopicBuilder", "splitTopics", string.Empty, string.Empty, string.Empty, string.Empty));
+ subscriptions.Should().Contain(("pubsub", "splitTopicAttr", "splitTopics", "true", string.Empty, string.Empty, string.Empty));
+ subscriptions.Should().Contain(("pubsub", "metadata", "multiMetadataTopicAttr", string.Empty, string.Empty, "n1=v1;n2=v2,v3", string.Empty));
+ subscriptions.Should().Contain(("pubsub", "metadata.1", "multiMetadataTopicAttr", "true", string.Empty, "n1=v1", string.Empty));
+ subscriptions.Should().Contain(("pubsub", "splitMetadataTopicBuilder", "splitMetadataTopics", string.Empty, string.Empty, "n1=v1;n2=v1", string.Empty));
+
// Test priority route sorting
var eTopic = subscriptions.FindAll(e => e.Topic == "E");
eTopic.Count.Should().Be(3);