Merging changes from 1.10 to master (#1047)

* Update DurableTask SDK dependency to get ARM64 compatibility (#1024) (#1025)

* Update DurableTask SDK dependency to get ARM64 compatibility

* Fix issue with gRPC address override behavior

Signed-off-by: Chris Gillum <cgillum@microsoft.com>

* Initial Bulk Subscribe functionality (#1009)

Signed-off-by: Yash Nisar <yashnisar@microsoft.com>

* Workflow unit testing changes for 1.10 release (#1038)

Signed-off-by: Chris Gillum <cgillum@microsoft.com>

* Fix issue with gRPC address override behavior

Signed-off-by: Chris Gillum <cgillum@microsoft.com>

* Workflow SDK changes to enable unit testing

Signed-off-by: Chris Gillum <cgillum@microsoft.com>
This commit is contained in:
Yash Nisar 2023-02-27 16:42:32 -06:00 committed by GitHub
parent 45e6e43388
commit f42b690f4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 852 additions and 63 deletions

View File

@ -14,10 +14,12 @@
namespace ControllerSample.Controllers
{
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using Dapr;
using Dapr.AspNetCore;
using Dapr.Client;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Extensions.Logging;
@ -86,6 +88,51 @@ namespace ControllerSample.Controllers
return state.Value;
}
/// <summary>
/// Method for depositing multiple times to the account as specified in transaction.
/// </summary>
/// <param name="bulkMessage">List of entries of type BulkMessageModel received from dapr.</param>
/// <param name="daprClient">State client to interact with Dapr runtime.</param>
/// <returns>A <see cref="Task{TResult}"/> representing the result of the asynchronous operation.</returns>
/// "pubsub", the first parameter into the Topic attribute, is name of the default pub/sub configured by the Dapr CLI.
[Topic("pubsub", "multideposit", "amountDeadLetterTopic", false)]
[BulkSubscribe("multideposit", 500, 2000)]
[HttpPost("multideposit")]
public async Task<ActionResult<BulkSubscribeAppResponse>> MultiDeposit([FromBody] BulkSubscribeMessage<BulkMessageModel<Transaction>>
bulkMessage, [FromServices] DaprClient daprClient)
{
logger.LogInformation("Enter bulk deposit");
List<BulkSubscribeAppResponseEntry> entries = new List<BulkSubscribeAppResponseEntry>();
foreach (var entry in bulkMessage.Entries)
{
try
{
var transaction = entry.Event.Data;
var state = await daprClient.GetStateEntryAsync<Account>(StoreName, transaction.Id);
state.Value ??= new Account() { Id = transaction.Id, };
logger.LogInformation("Id is {0}, the amount to be deposited is {1}",
transaction.Id, transaction.Amount);
if (transaction.Amount < 0m)
{
return BadRequest(new { statusCode = 400, message = "bad request" });
}
state.Value.Balance += transaction.Amount;
logger.LogInformation("Balance is {0}", state.Value.Balance);
await state.SaveAsync();
entries.Add(new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.SUCCESS));
} catch (Exception e) {
logger.LogError(e.Message);
entries.Add(new BulkSubscribeAppResponseEntry(entry.EntryId, BulkSubscribeAppResponseStatus.RETRY));
}
}
return new BulkSubscribeAppResponse(entries);
}
/// <summary>
/// Method for viewing the error message when the deposit/withdrawal amounts
/// are negative.
@ -190,6 +237,7 @@ namespace ControllerSample.Controllers
/// <summary>
/// Method for returning a BadRequest result which will cause Dapr sidecar to throw an RpcException
/// </summary>
[HttpPost("throwException")]
public async Task<ActionResult<Account>> ThrowException(Transaction transaction, [FromServices] DaprClient daprClient)
{

View File

@ -5,9 +5,10 @@ This sample shows using Dapr with ASP.NET Core controllers. This application is
It exposes the following endpoints over HTTP:
- GET `/{account}`: Get the balance for the account specified by `id`
- POST `/deposit`: Accepts a JSON payload to deposit money to an account
- POST `/multideposit`: Accepts a JSON payload to deposit money multiple times to a bulk subscribed topic
- POST `/withdraw`: Accepts a JSON payload to withdraw money from an account
The application also registers for pub/sub with the `deposit` and `withdraw` topics.
The application also registers for pub/sub with the `deposit`, `multideposit` and `withdraw` topics.
## Prerequisitess
@ -57,7 +58,76 @@ Output:
```
---
**Deposit Money multiple times to a bulk subscribed topic**
On Linux, MacOS:
```
curl -X POST http://127.0.0.1:5000/multideposit \
-H 'Content-Type: application/json' \
-d '{
"entries":[
{
"entryId":"653dd9f5-f375-499b-8b2a-c4599bbd36b0",
"event":{
"data":{
"amount":10,
"id":"17"
},
"datacontenttype":"application/json",
"id":"DaprClient",
"pubsubname":"pubsub",
"source":"Dapr",
"specversion":"1.0",
"topic":"multideposit",
"type":"com.dapr.event.sent"
},
"metadata":null,
"contentType":"application/cloudevents+json"
},
{
"entryId":"7ea8191e-1e62-46d0-9ba8-ff6e571351cc",
"event":{
"data":{
"amount":20,
"id":"17"
},
"datacontenttype":"application/json",
"id":"DaprClient",
"pubsubname":"pubsub",
"source":"Dapr",
"specversion":"1.0",
"topic":"multideposit",
"type":"com.dapr.event.sent"
},
"metadata":null,
"contentType":"application/cloudevents+json"
}
],
"id":"fa68c580-1b96-40d3-aa2c-04bab05e954e",
"metadata":{
"pubsubName":"pubsub"
},
"pubsubname":"pubsub",
"topic":"multideposit",
"type":"com.dapr.event.sent.bulk"
}'
```
Output:
```
{
"statuses":[
{
"entryId":"653dd9f5-f375-499b-8b2a-c4599bbd36b0",
"status":"SUCCESS"
},
{
"entryId":"7ea8191e-1e62-46d0-9ba8-ff6e571351cc",
"status":"SUCCESS"
}
]
}
```
---
**Withdraw Money**
On Linux, MacOS:
```sh
@ -213,6 +283,20 @@ public async Task<ActionResult<Account>> Deposit(...)
`[Topic(...)]` associates a pub/sub named `pubsub` (this is the default configured by the Dapr CLI) pub/sub topic `deposit` with this endpoint.
---
```C#
[Topic("pubsub", "multideposit", "amountDeadLetterTopic", false)]
[BulkSubscribe("multideposit")]
[HttpPost("multideposit")]
public async Task<ActionResult<BulkSubscribeAppResponse>> MultiDeposit([FromBody] BulkSubscribeMessage<BulkMessageModel<Transaction>>
bulkMessage, [FromServices] DaprClient daprClient)
```
`[BulkSubscribe(...)]` associates a topic with the name mentioned in the attribute with the ability to be bulk subscribed to. It can take additional parameters like `MaxMessagesCount` and `MaxAwaitDurationMs`.
If those parameters are not supplied, the defaults of 100 and 1000ms are set.
However, you need to use `BulkSubscribeMessage<BulkMessageModel<T>>` in the input and that you need to return the `BulkSubscribeAppResponse` as well.
---
```C#

View File

@ -3,11 +3,12 @@
This sample shows using Dapr with ASP.NET Core routing. This application is a simple and not-so-secure banking application. The application uses the Dapr state-store for its data storage.
It exposes the following endpoints over HTTP:
- GET `/{id}`: Get the balance for the account specified by `id`
- POST `/deposit`: Accepts a JSON payload to deposit money to an account
- POST `/withdraw`: Accepts a JSON payload to withdraw money from an account
- GET `/{id}`: Get the balance for the account specified by `id`
- POST `/deposit`: Accepts a JSON payload to deposit money to an account
- POST `/multideposit`: Accepts a JSON payload to deposit money multiple times to a bulk subscribed topic
- POST `/withdraw`: Accepts a JSON payload to withdraw money from an account
The application also registers for pub/sub with the `deposit` and `withdraw` topics.
The application also registers for pub/sub with the `deposit`, `multideposit`, and `withdraw` topics.
## Prerequisites
@ -56,6 +57,76 @@ Output:
```
---
**Deposit Money multiple times to a bulk subscribed topic**
On Linux, MacOS:
```
curl -X POST http://127.0.0.1:5000/multideposit \
-H 'Content-Type: application/json' \
-d '{
"entries":[
{
"entryId":"653dd9f5-f375-499b-8b2a-c4599bbd36b0",
"event":{
"data":{
"amount":10,
"id":"17"
},
"datacontenttype":"application/json",
"id":"DaprClient",
"pubsubname":"pubsub",
"source":"Dapr",
"specversion":"1.0",
"topic":"multideposit",
"type":"com.dapr.event.sent"
},
"metadata":null,
"contentType":"application/cloudevents+json"
},
{
"entryId":"7ea8191e-1e62-46d0-9ba8-ff6e571351cc",
"event":{
"data":{
"amount":20,
"id":"17"
},
"datacontenttype":"application/json",
"id":"DaprClient",
"pubsubname":"pubsub",
"source":"Dapr",
"specversion":"1.0",
"topic":"multideposit",
"type":"com.dapr.event.sent"
},
"metadata":null,
"contentType":"application/cloudevents+json"
}
],
"id":"fa68c580-1b96-40d3-aa2c-04bab05e954e",
"metadata":{
"pubsubName":"pubsub"
},
"pubsubname":"pubsub",
"topic":"multideposit",
"type":"com.dapr.event.sent.bulk"
}'
```
Output:
```
{
"statuses":[
{
"entryId":"653dd9f5-f375-499b-8b2a-c4599bbd36b0",
"status":"SUCCESS"
},
{
"entryId":"7ea8191e-1e62-46d0-9ba8-ff6e571351cc",
"status":"SUCCESS"
}
]
}
```
---
**Withdraw Money**
On Linux, MacOS:
@ -194,6 +265,7 @@ app.UseEndpoints(endpoints =>
endpoints.MapGet("{id}", Balance);
endpoints.MapPost("deposit", Deposit).WithTopic(PubsubName, "deposit");
endpoints.MapPost("multideposit", MultiDeposit).WithTopic(multiDepositTopicOptions).WithBulkSubscribe(bulkSubscribeTopicOptions);
endpoints.MapPost("withdraw", Withdraw).WithTopic(PubsubName, "withdraw");
});
```
@ -213,9 +285,19 @@ var withdrawTopicOptions = new TopicOptions();
withdrawTopicOptions.PubsubName = PubsubName;
withdrawTopicOptions.Name = "withdraw";
withdrawTopicOptions.DeadLetterTopic = "amountDeadLetterTopic";
var multiDepositTopicOptions = new TopicOptions
{ PubsubName = PubsubName, Name = "multideposit" };
var bulkSubscribeTopicOptions = new BulkSubscribeTopicOptions
{
TopicName = "multideposit", MaxMessagesCount = 250, MaxAwaitDurationMs = 1000
};
```
`WithTopic(...)` now takes the `TopicOptions(..)` instance that defines configurations for the subscribe endpoint.
`WithBulkSubscribe(...)` now takes the `BulkSubscribeTopicOptions(..)` instance that defines configurations for the bulk subscribe endpoint.
---
```C#

View File

@ -1,7 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk.Web">
<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>
<TargetFramework>net6</TargetFramework>
</PropertyGroup>
<ItemGroup>

View File

@ -14,9 +14,11 @@
namespace RoutingSample
{
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;
using Dapr;
using Dapr.AspNetCore;
using Dapr.Client;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
@ -101,9 +103,17 @@ namespace RoutingSample
withdrawTopicOptions.PubsubName = PubsubName;
withdrawTopicOptions.Name = "withdraw";
withdrawTopicOptions.DeadLetterTopic = "amountDeadLetterTopic";
var multiDepositTopicOptions = new TopicOptions { PubsubName = PubsubName, Name = "multideposit" };
var bulkSubscribeTopicOptions = new BulkSubscribeTopicOptions
{
TopicName = "multideposit", MaxMessagesCount = 250, MaxAwaitDurationMs = 1000
};
endpoints.MapGet("{id}", Balance);
endpoints.MapPost("deposit", Deposit).WithTopic(depositTopicOptions);
endpoints.MapPost("multideposit", MultiDeposit).WithTopic(multiDepositTopicOptions).WithBulkSubscribe(bulkSubscribeTopicOptions);
endpoints.MapPost("deadLetterTopicRoute", ViewErrorMessage).WithTopic(PubsubName, "amountDeadLetterTopic");
endpoints.MapPost("withdraw", Withdraw).WithTopic(withdrawTopicOptions);
});
@ -158,6 +168,56 @@ namespace RoutingSample
context.Response.ContentType = "application/json";
await JsonSerializer.SerializeAsync(context.Response.Body, account, serializerOptions);
}
async Task MultiDeposit(HttpContext context)
{
logger.LogInformation("Enter bulk deposit");
var client = context.RequestServices.GetRequiredService<DaprClient>();
var bulkMessage = await JsonSerializer.DeserializeAsync<BulkSubscribeMessage<BulkMessageModel<Transaction>>>(
context.Request.Body, serializerOptions);
List<BulkSubscribeAppResponseEntry> entries = new List<BulkSubscribeAppResponseEntry>();
if (bulkMessage != null)
{
foreach (var entry in bulkMessage.Entries)
{
try
{
var transaction = entry.Event.Data;
var state = await client.GetStateEntryAsync<Account>(StoreName, transaction.Id);
state.Value ??= new Account() { Id = transaction.Id, };
logger.LogInformation("Id is {0}, the amount to be deposited is {1}",
transaction.Id, transaction.Amount);
if (transaction.Amount < 0m)
{
logger.LogInformation("Invalid amount");
context.Response.StatusCode = 400;
return;
}
state.Value.Balance += transaction.Amount;
logger.LogInformation("Balance is {0}", state.Value.Balance);
await state.SaveAsync();
entries.Add(new BulkSubscribeAppResponseEntry(entry.EntryId,
BulkSubscribeAppResponseStatus.SUCCESS));
}
catch (Exception e)
{
logger.LogError(e.Message);
entries.Add(new BulkSubscribeAppResponseEntry(entry.EntryId,
BulkSubscribeAppResponseStatus.RETRY));
}
}
}
await JsonSerializer.SerializeAsync(context.Response.Body,
new BulkSubscribeAppResponse(entries), serializerOptions);
}
async Task ViewErrorMessage(HttpContext context)
{

View File

@ -1,26 +0,0 @@
using System;
using System.Collections.Generic;
using System.Net.Mime;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Dapr.Client;
namespace Samples.Client
{
public class PublishBytesExample : Example
{
public override string DisplayName => "Publish Bytes";
public async override Task RunAsync(CancellationToken cancellationToken)
{
using var client = new DaprClientBuilder().Build();
var transaction = new { Id = "17", Amount = 30m };
var content = JsonSerializer.SerializeToUtf8Bytes(transaction);
await client.PublishByteEventAsync(pubsubName, "deposit", content.AsMemory(), MediaTypeNames.Application.Json, new Dictionary<string, string> { }, cancellationToken);
Console.WriteLine("Published deposit event!");
}
}
}

View File

@ -0,0 +1,77 @@
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
namespace Dapr.AspNetCore
{
/// <summary>
/// Class representing an entry in the DaprBulkMessage.
/// </summary>
/// <typeparam name="TValue">The type of value contained in the data.</typeparam>
public class BulkMessageModel<TValue>
{
/// <summary>
/// Initializes a new instance of the <see cref="BulkMessageModel{TValue}"/> class.
/// </summary>
public BulkMessageModel() {
}
/// <summary>
/// Initializes a new instance of the <see cref="BulkMessageModel{TValue}"/> class.
/// </summary>
/// <param name="id">Identifier of the message being processed.</param>
/// <param name="source">Source for this event.</param>
/// <param name="type">Type of event.</param>
/// <param name="specversion">Version of the event spec.</param>
/// <param name="datacontenttype">Type of the payload.</param>
/// <param name="data">Payload.</param>
public BulkMessageModel(string id, string source, string type, string specversion, string datacontenttype,
TValue data) {
this.Id = id;
this.Source = source;
this.Type = type;
this.Specversion = specversion;
this.Datacontenttype = datacontenttype;
this.Data = data;
}
/// <summary>
/// Identifier of the message being processed.
/// </summary>
public string Id { get; set; }
/// <summary>
/// Source for this event.
/// </summary>
public string Source { get; set; }
/// <summary>
/// Type of event.
/// </summary>
public string Type { get; set; }
/// <summary>
/// Version of the event spec.
/// </summary>
public string Specversion { get; set; }
/// <summary>
/// Type of the payload.
/// </summary>
public string Datacontenttype { get; set; }
/// <summary>
/// Payload.
/// </summary>
public TValue Data { get; set; }
}
}

View File

@ -0,0 +1,38 @@
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
using System.Collections.Generic;
namespace Dapr.AspNetCore
{
/// <summary>
/// Response from the application containing status for each entry in the bulk message.
/// It is posted to the bulk subscribe handler.
/// </summary>
public class BulkSubscribeAppResponse
{
/// <summary>
/// Initializes a new instance of the <see cref="BulkSubscribeAppResponse"/> class.
/// </summary>
/// <param name="statuses">List of statuses.</param>
public BulkSubscribeAppResponse(List<BulkSubscribeAppResponseEntry> statuses)
{
this.Statuses = statuses;
}
/// <summary>
/// List of statuses.
/// </summary>
public List<BulkSubscribeAppResponseEntry> Statuses { get; }
}
}

View File

@ -0,0 +1,42 @@
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
namespace Dapr.AspNetCore
{
/// <summary>
/// Maps an entry from bulk subscribe messages to a response status.
/// </summary>
public class BulkSubscribeAppResponseEntry
{
/// <summary>
/// Initializes a new instance of the <see cref="BulkSubscribeAppResponseEntry"/> class.
/// </summary>
/// <param name="entryId">Entry ID of the event.</param>
/// <param name="status">Status of the event processing in application.</param>
public BulkSubscribeAppResponseEntry(string entryId, BulkSubscribeAppResponseStatus status) {
this.EntryId = entryId;
this.Status = status.ToString();
}
/// <summary>
/// Entry ID of the event.
/// </summary>
public string EntryId { get; }
/// <summary>
/// Status of the event processing in application.
/// </summary>
public string Status { get; }
}
}

View File

@ -0,0 +1,34 @@
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
namespace Dapr.AspNetCore
{
/// <summary>
/// Status of the message handled in bulk subscribe handler.
/// </summary>
public enum BulkSubscribeAppResponseStatus {
/// <summary>
/// Success
/// </summary>
SUCCESS,
/// <summary>
/// Failure
/// </summary>
RETRY,
/// <summary>
/// Drop
/// </summary>
DROP
}
}

View File

@ -0,0 +1,74 @@
// ------------------------------------------------------------------------
// Copyright 2023 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
using System;
namespace Dapr.AspNetCore
{
/// <summary>
/// BulkSubscribeAttribute describes options for a bulk subscriber with respect to a topic.
/// It needs to be paired with at least one [Topic] depending on the use case.
/// </summary>
[AttributeUsage(AttributeTargets.All, AllowMultiple = true)]
public class BulkSubscribeAttribute : Attribute, IBulkSubscribeMetadata
{
/// <summary>
/// Initializes a new instance of the <see cref="BulkSubscribeAttribute" /> class.
/// </summary>
/// <param name="topicName">The name of topic.</param>
/// <param name="maxMessagesCount">The name of the pubsub component to use.</param>
/// <param name="maxAwaitDurationMs">The topic name.</param>
public BulkSubscribeAttribute(string topicName, int maxMessagesCount, int maxAwaitDurationMs)
{
this.TopicName = topicName;
this.MaxMessagesCount = maxMessagesCount;
this.MaxAwaitDurationMs = maxAwaitDurationMs;
}
/// <summary>
/// Initializes a new instance of the <see cref="BulkSubscribeAttribute" /> class.
/// </summary>
/// <param name="topicName">The name of topic.</param>
/// <param name="maxMessagesCount">The name of the pubsub component to use.</param>
public BulkSubscribeAttribute(string topicName, int maxMessagesCount)
{
this.TopicName = topicName;
this.MaxMessagesCount = maxMessagesCount;
}
/// <summary>
/// Initializes a new instance of the <see cref="BulkSubscribeAttribute" /> class.
/// </summary>
/// <param name="topicName">The name of topic.</param>
public BulkSubscribeAttribute(string topicName)
{
this.TopicName = topicName;
}
/// <summary>
/// Maximum number of messages in a bulk message from the message bus.
/// </summary>
public int MaxMessagesCount { get; } = 100;
/// <summary>
/// Maximum duration to wait for maxBulkSubCount messages by the message bus
/// before sending the messages to Dapr.
/// </summary>
public int MaxAwaitDurationMs { get; } = 1000;
/// <summary>
/// The name of the topic to be bulk subscribed.
/// </summary>
public string TopicName { get; }
}
}

View File

@ -0,0 +1,59 @@
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
using System.Collections.Generic;
namespace Dapr.AspNetCore
{
/// <summary>
/// Represents a bulk of messages received from the message bus.
/// </summary>
/// <typeparam name="TValue">The type of value contained in the data.</typeparam>
public class BulkSubscribeMessage<TValue>
{
/// <summary>
/// Initializes a new instance of the <see cref="BulkSubscribeMessage{TValue}"/> class.
/// </summary>
public BulkSubscribeMessage()
{
}
/// <summary>
/// Initializes a new instance of the <see cref="BulkSubscribeMessage{TValue}"/> class.
/// </summary>
/// <param name="entries">A list of entries representing the event and other metadata.</param>
/// <param name="topic">The name of the pubsub topic.</param>
/// <param name="metadata">Metadata for the bulk message.</param>
public BulkSubscribeMessage(List<BulkSubscribeMessageEntry<TValue>> entries, string topic, Dictionary<string, string> metadata)
{
this.Entries = entries;
this.Topic = topic;
this.Metadata = metadata;
}
/// <summary>
/// A list of entries representing the event and other metadata.
/// </summary>
public List<BulkSubscribeMessageEntry<TValue>> Entries { get; set; }
/// <summary>
/// The name of the pubsub topic.
/// </summary>
public string Topic { get; set; }
/// <summary>
/// Metadata for the bulk message.
/// </summary>
public Dictionary<string, string> Metadata { get; set; }
}
}

View File

@ -0,0 +1,67 @@
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
using System.Collections.Generic;
namespace Dapr.AspNetCore
{
/// <summary>
/// Represents a single event from a bulk of messages sent by the message bus.
/// </summary>
/// <typeparam name="TValue">The type of value contained in the data.</typeparam>
public class BulkSubscribeMessageEntry<TValue>
{
/// <summary>
/// Initializes a new instance of the <see cref="BulkSubscribeMessageEntry{TValue}"/> class.
/// </summary>
public BulkSubscribeMessageEntry() {
}
/// <summary>
/// Initializes a new instance of the <see cref="BulkSubscribeMessageEntry{TValue}"/> class.
/// </summary>
/// <param name="entryId">A unique identifier for the event.</param>
/// <param name="contentType">Content type of the event.</param>
/// <param name="metadata">Metadata for the event.</param>
/// <param name="eventData">The pubsub event.</param>
public BulkSubscribeMessageEntry(string entryId, string contentType, Dictionary<string, string> metadata,
TValue eventData)
{
this.EntryId = entryId;
this.ContentType = contentType;
this.Metadata = metadata;
this.Event = eventData;
}
/// <summary>
/// A unique identifier for the event.
/// </summary>
public string EntryId { get; set; }
/// <summary>
/// Content type of the event.
/// </summary>
public string ContentType { get; set; }
/// <summary>
/// Metadata for the event.
/// </summary>
public Dictionary<string, string> Metadata { get; set; }
/// <summary>
/// The pubsub event.
/// </summary>
public TValue Event { get; set; }
}
}

View File

@ -0,0 +1,24 @@
namespace Dapr.AspNetCore
{
/// <summary>
/// This class defines configurations for the bulk subscribe endpoint.
/// </summary>
public class BulkSubscribeTopicOptions
{
/// <summary>
/// Maximum number of messages in a bulk message from the message bus.
/// </summary>
public int MaxMessagesCount { get; set; } = 100;
/// <summary>
/// Maximum duration to wait for maxBulkSubCount messages by the message bus
/// before sending the messages to Dapr.
/// </summary>
public int MaxAwaitDurationMs { get; set; } = 1000;
/// <summary>
/// The name of the topic to be bulk subscribed.
/// </summary>
public string TopicName { get; set; }
}
}

View File

@ -13,12 +13,10 @@
namespace Microsoft.AspNetCore.Builder
{
using Dapr;
using Dapr.AspNetCore;
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Xml.Linq;
using Dapr;
using Grpc.Core;
/// <summary>
/// Contains extension methods for <see cref="IEndpointConventionBuilder" />.
@ -144,5 +142,30 @@ namespace Microsoft.AspNetCore.Builder
return builder;
}
/// <summary>
/// Adds <see cref="IBulkSubscribeMetadata" /> metadata to the provided <see cref="IEndpointConventionBuilder" />.
/// </summary>
/// <param name="builder">The <see cref="IEndpointConventionBuilder" />.</param>\
/// <param name="bulkSubscribeTopicOptions">The object of BulkSubscribeTopicOptions class that provides
/// all bulk subscribe topic attributes.</param>
/// <typeparam name="T">The <see cref="IEndpointConventionBuilder" /> type.</typeparam>
/// <returns>The <see cref="IEndpointConventionBuilder" /> builder object.</returns>
public static T WithBulkSubscribe<T>(this T builder, BulkSubscribeTopicOptions bulkSubscribeTopicOptions)
where T : IEndpointConventionBuilder
{
if (builder is null)
{
throw new ArgumentNullException(nameof(builder));
}
ArgumentVerifier.ThrowIfNullOrEmpty(bulkSubscribeTopicOptions.TopicName,
nameof(bulkSubscribeTopicOptions.TopicName));
builder.WithMetadata(new BulkSubscribeAttribute(bulkSubscribeTopicOptions.TopicName,
bulkSubscribeTopicOptions.MaxMessagesCount, bulkSubscribeTopicOptions.MaxAwaitDurationMs));
return builder;
}
}
}

View File

@ -18,6 +18,7 @@ namespace Microsoft.AspNetCore.Builder
using System.Text.Json;
using System.Text.Json.Serialization;
using Dapr;
using Dapr.AspNetCore;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Routing;
using Microsoft.AspNetCore.Routing.Patterns;
@ -71,11 +72,32 @@ namespace Microsoft.AspNetCore.Builder
{
var topicMetadata = e.Metadata.GetOrderedMetadata<ITopicMetadata>();
var originalTopicMetadata = e.Metadata.GetOrderedMetadata<IOriginalTopicMetadata>();
var bulkSubscribeMetadata = e.Metadata.GetOrderedMetadata<IBulkSubscribeMetadata>();
var subs = new List<(string PubsubName, string Name, string DeadLetterTopic, bool? EnableRawPayload, string Match, int Priority, Dictionary<string, string[]> OriginalTopicMetadata, string MetadataSeparator, RoutePattern RoutePattern)>();
var subs = new List<(string PubsubName, string Name, string DeadLetterTopic, bool? EnableRawPayload,
string Match, int Priority, Dictionary<string, string[]> OriginalTopicMetadata,
string MetadataSeparator, RoutePattern RoutePattern, DaprTopicBulkSubscribe bulkSubscribe)>();
for (int i = 0; i < topicMetadata.Count(); i++)
{
DaprTopicBulkSubscribe bulkSubscribe = null;
foreach (var bulkSubscribeAttr in bulkSubscribeMetadata)
{
if (bulkSubscribeAttr.TopicName != topicMetadata[i].Name)
{
continue;
}
bulkSubscribe = new DaprTopicBulkSubscribe
{
Enabled = true,
MaxMessagesCount = bulkSubscribeAttr.MaxMessagesCount,
MaxAwaitDurationMs = bulkSubscribeAttr.MaxAwaitDurationMs
};
break;
}
subs.Add((topicMetadata[i].PubsubName,
topicMetadata[i].Name,
(topicMetadata[i] as IDeadLetterTopicMetadata)?.DeadLetterTopic,
@ -86,7 +108,8 @@ namespace Microsoft.AspNetCore.Builder
.GroupBy(c => c.Name)
.ToDictionary(m => m.Key, m => m.Select(c => c.Value).Distinct().ToArray()),
(topicMetadata[i] as IOwnedOriginalTopicMetadata)?.MetadataSeparator,
e.RoutePattern));
e.RoutePattern,
bulkSubscribe));
}
return subs;
@ -132,6 +155,7 @@ namespace Microsoft.AspNetCore.Builder
Topic = first.Name,
PubsubName = first.PubsubName,
Metadata = metadata.Count > 0 ? metadata : null,
BulkSubscribe = first.bulkSubscribe
};
if (first.DeadLetterTopic != null)

View File

@ -0,0 +1,24 @@
namespace Dapr.AspNetCore
{
/// <summary>
/// Bulk Subscribe Metadata that describes bulk subscribe configuration options.
/// </summary>
public interface IBulkSubscribeMetadata
{
/// <summary>
/// Gets the maximum number of messages in a bulk message from the message bus.
/// </summary>
int MaxMessagesCount { get; }
/// <summary>
/// Gets the Maximum duration to wait for maxBulkSubCount messages by the message bus
/// before sending the messages to Dapr.
/// </summary>
int MaxAwaitDurationMs { get; }
/// <summary>
/// The name of the topic to be bulk subscribed.
/// </summary>
public string TopicName { get; }
}
}

View File

@ -49,6 +49,11 @@ namespace Dapr
/// Gets or sets the deadletter topic.
/// </summary>
public string DeadLetterTopic { get; set; }
/// <summary>
/// Gets or sets the bulk subscribe options.
/// </summary>
public DaprTopicBulkSubscribe BulkSubscribe { get; set; }
}
/// <summary>
@ -91,4 +96,23 @@ namespace Dapr
/// </summary>
public string Path { get; set; }
}
internal class DaprTopicBulkSubscribe
{
/// <summary>
/// Gets or sets whether bulk subscribe option is enabled for a topic.
/// </summary>
public bool Enabled { get; set; }
/// <summary>
/// Gets or sets the maximum number of messages in a bulk message from the message bus.
/// </summary>
public int MaxMessagesCount { get; set; }
/// <summary>
/// Gets or sets the Maximum duration to wait for maxBulkSubCount messages by the message bus
/// before sending the messages to Dapr.
/// </summary>
public int MaxAwaitDurationMs { get; set; }
}
}

View File

@ -151,6 +151,9 @@ message TopicSubscription {
// The optional dead letter queue for this topic to send events to.
string dead_letter_topic = 6;
// The optional bulk subscribe settings for this topic.
BulkSubscribeConfig bulk_subscribe = 7;
}
message TopicRoutes {
@ -161,6 +164,18 @@ message TopicRoutes {
string default = 2;
}
// BulkSubscribeConfig is the message to pass settings for bulk subscribe
message BulkSubscribeConfig {
// Required. Flag to enable/disable bulk subscribe
bool enabled = 1;
// Optional. Max number of messages to be sent in a single bulk request
int32 max_messages_count = 2;
// Optional. Max duration to wait for messages to be sent in a single bulk request
int32 max_await_duration_ms = 3;
}
message TopicRule {
// The optional CEL expression used to match the event.
// If the match is not specified, then the route is considered

View File

@ -13,7 +13,6 @@
namespace Dapr.AspNetCore.IntegrationTest.App
{
using System;
using System.Text;
using System.Threading.Tasks;
using Dapr;
@ -61,19 +60,22 @@ namespace Dapr.AspNetCore.IntegrationTest.App
{
}
[BulkSubscribe("F")]
[Topic("pubsub", "F")]
[Topic("pubsub", "F.1", true)]
[HttpPost("/multiTopicAttr")]
public void MultipleTopics()
{
}
[BulkSubscribe("G", 300)]
[Topic("pubsub", "G", "deadLetterTopicName", false)]
[HttpPost("/G")]
public void TopicG()
{
}
[BulkSubscribe("metadata.1", 500, 2000)]
[Topic("pubsub", "metadata", new string[1] { "id1" })]
[Topic("pubsub", "metadata.1", true)]
[HttpPost("/multiMetadataTopicAttr")]

View File

@ -13,6 +13,7 @@
namespace Dapr.AspNetCore.IntegrationTest
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
@ -41,7 +42,8 @@ namespace Dapr.AspNetCore.IntegrationTest
json.ValueKind.Should().Be(JsonValueKind.Array);
json.GetArrayLength().Should().Be(18);
var subscriptions = new List<(string PubsubName, string Topic, string Route, string rawPayload, string match, string metadata, string DeadLetterTopic)>();
var subscriptions = new List<(string PubsubName, string Topic, string Route, string rawPayload,
string match, string metadata, string DeadLetterTopic, string bulkSubscribeMetadata)>();
foreach (var element in json.EnumerateArray())
{
@ -49,8 +51,14 @@ namespace Dapr.AspNetCore.IntegrationTest
var topic = element.GetProperty("topic").GetString();
var rawPayload = string.Empty;
var deadLetterTopic = string.Empty;
var bulkSubscribeMetadata = string.Empty;
//JsonElement bulkSubscribeMetadata;
Dictionary<string, string> originalMetadata = new Dictionary<string, string>();
if (element.TryGetProperty("bulkSubscribe", out var BulkSubscribeMetadata))
{
bulkSubscribeMetadata = BulkSubscribeMetadata.ToString();
}
if (element.TryGetProperty("deadLetterTopic", out JsonElement DeadLetterTopic))
{
deadLetterTopic = DeadLetterTopic.GetString();
@ -78,7 +86,8 @@ namespace Dapr.AspNetCore.IntegrationTest
if (element.TryGetProperty("route", out JsonElement route))
{
subscriptions.Add((pubsubName, topic, route.GetString(), rawPayload, string.Empty, originalMetadataString, deadLetterTopic));
subscriptions.Add((pubsubName, topic, route.GetString(), rawPayload, string.Empty,
originalMetadataString, deadLetterTopic, bulkSubscribeMetadata));
}
else if (element.TryGetProperty("routes", out JsonElement routes))
{
@ -88,35 +97,40 @@ namespace Dapr.AspNetCore.IntegrationTest
{
var match = rule.GetProperty("match").GetString();
var path = rule.GetProperty("path").GetString();
subscriptions.Add((pubsubName, topic, path, rawPayload, match, originalMetadataString, deadLetterTopic));
subscriptions.Add((pubsubName, topic, path, rawPayload, match,
originalMetadataString, deadLetterTopic, bulkSubscribeMetadata));
}
}
if (routes.TryGetProperty("default", out JsonElement defaultProperty))
{
subscriptions.Add((pubsubName, topic, defaultProperty.GetString(), rawPayload, string.Empty, originalMetadataString, deadLetterTopic));
subscriptions.Add((pubsubName, topic, defaultProperty.GetString(), rawPayload,
string.Empty, originalMetadataString, deadLetterTopic, bulkSubscribeMetadata));
}
}
}
subscriptions.Should().Contain(("testpubsub", "A", "topic-a", string.Empty, string.Empty, string.Empty, string.Empty));
subscriptions.Should().Contain(("testpubsub", "A.1", "topic-a", string.Empty, string.Empty, string.Empty, string.Empty));
subscriptions.Should().Contain(("pubsub", "B", "B", string.Empty, string.Empty, string.Empty, string.Empty));
subscriptions.Should().Contain(("custom-pubsub", "custom-C", "C", string.Empty, string.Empty, string.Empty, string.Empty));
subscriptions.Should().Contain(("pubsub", "register-user", "register-user", string.Empty, string.Empty, string.Empty, string.Empty));
subscriptions.Should().Contain(("pubsub", "register-user-plaintext", "register-user-plaintext", string.Empty, string.Empty, string.Empty, string.Empty));
subscriptions.Should().Contain(("pubsub", "D", "D", "true", string.Empty, string.Empty, string.Empty));
subscriptions.Should().Contain(("pubsub", "E", "E", string.Empty, string.Empty, string.Empty, string.Empty));
subscriptions.Should().Contain(("pubsub", "E", "E-Critical", string.Empty, "event.type == \"critical\"", string.Empty, string.Empty));
subscriptions.Should().Contain(("pubsub", "E", "E-Important", string.Empty, "event.type == \"important\"", string.Empty, string.Empty));
subscriptions.Should().Contain(("pubsub", "F", "multiTopicAttr", string.Empty, string.Empty, string.Empty, string.Empty));
subscriptions.Should().Contain(("pubsub", "F.1", "multiTopicAttr", "true", string.Empty, string.Empty, string.Empty));
subscriptions.Should().Contain(("pubsub", "G", "G", string.Empty, string.Empty, string.Empty, "deadLetterTopicName"));
subscriptions.Should().Contain(("pubsub", "splitTopicBuilder", "splitTopics", string.Empty, string.Empty, string.Empty, string.Empty));
subscriptions.Should().Contain(("pubsub", "splitTopicAttr", "splitTopics", "true", string.Empty, string.Empty, string.Empty));
subscriptions.Should().Contain(("pubsub", "metadata", "multiMetadataTopicAttr", string.Empty, string.Empty, "n1=v1;n2=v2,v3", string.Empty));
subscriptions.Should().Contain(("pubsub", "metadata.1", "multiMetadataTopicAttr", "true", string.Empty, "n1=v1", string.Empty));
subscriptions.Should().Contain(("pubsub", "splitMetadataTopicBuilder", "splitMetadataTopics", string.Empty, string.Empty, "n1=v1;n2=v1", string.Empty));
subscriptions.Should().Contain(("pubsub", "metadataseparatorbyemptytring", "topicmetadataseparatorattrbyemptytring", string.Empty, string.Empty, "n1=v1,", string.Empty));
subscriptions.Should().Contain(("testpubsub", "A", "topic-a", string.Empty, string.Empty, string.Empty, string.Empty, String.Empty));
subscriptions.Should().Contain(("testpubsub", "A.1", "topic-a", string.Empty, string.Empty, string.Empty, string.Empty, String.Empty));
subscriptions.Should().Contain(("pubsub", "B", "B", string.Empty, string.Empty, string.Empty, string.Empty, String.Empty));
subscriptions.Should().Contain(("custom-pubsub", "custom-C", "C", string.Empty, string.Empty, string.Empty, string.Empty, String.Empty));
subscriptions.Should().Contain(("pubsub", "register-user", "register-user", string.Empty, string.Empty, string.Empty, string.Empty, String.Empty));
subscriptions.Should().Contain(("pubsub", "register-user-plaintext", "register-user-plaintext", string.Empty, string.Empty, string.Empty, string.Empty, String.Empty));
subscriptions.Should().Contain(("pubsub", "D", "D", "true", string.Empty, string.Empty, string.Empty, String.Empty));
subscriptions.Should().Contain(("pubsub", "E", "E", string.Empty, string.Empty, string.Empty, string.Empty, String.Empty));
subscriptions.Should().Contain(("pubsub", "E", "E-Critical", string.Empty, "event.type == \"critical\"", string.Empty, string.Empty, String.Empty));
subscriptions.Should().Contain(("pubsub", "E", "E-Important", string.Empty, "event.type == \"important\"", string.Empty, string.Empty, String.Empty));
subscriptions.Should().Contain(("pubsub", "F", "multiTopicAttr", string.Empty, string.Empty, string.Empty, string.Empty,
"{\"enabled\":true,\"maxMessagesCount\":100,\"maxAwaitDurationMs\":1000}"));
subscriptions.Should().Contain(("pubsub", "F.1", "multiTopicAttr", "true", string.Empty, string.Empty, string.Empty, String.Empty));
subscriptions.Should().Contain(("pubsub", "G", "G", string.Empty, string.Empty, string.Empty, "deadLetterTopicName",
"{\"enabled\":true,\"maxMessagesCount\":300,\"maxAwaitDurationMs\":1000}"));
subscriptions.Should().Contain(("pubsub", "splitTopicBuilder", "splitTopics", string.Empty, string.Empty, string.Empty, string.Empty, String.Empty));
subscriptions.Should().Contain(("pubsub", "splitTopicAttr", "splitTopics", "true", string.Empty, string.Empty, string.Empty, String.Empty));
subscriptions.Should().Contain(("pubsub", "metadata", "multiMetadataTopicAttr", string.Empty, string.Empty, "n1=v1;n2=v2,v3", string.Empty, String.Empty));
subscriptions.Should().Contain(("pubsub", "metadata.1", "multiMetadataTopicAttr", "true", string.Empty, "n1=v1", string.Empty,
"{\"enabled\":true,\"maxMessagesCount\":500,\"maxAwaitDurationMs\":2000}"));
subscriptions.Should().Contain(("pubsub", "splitMetadataTopicBuilder", "splitMetadataTopics", string.Empty, string.Empty, "n1=v1;n2=v1", string.Empty, String.Empty));
subscriptions.Should().Contain(("pubsub", "metadataseparatorbyemptytring", "topicmetadataseparatorattrbyemptytring", string.Empty, string.Empty, "n1=v1,", string.Empty, String.Empty));
// Test priority route sorting
var eTopic = subscriptions.FindAll(e => e.Topic == "E");
eTopic.Count.Should().Be(3);