Implement Bulk Publish functionality (#1001)

Closes https://github.com/dapr/dotnet-sdk/issues/958

Signed-off-by: Yash Nisar <yashnisar@microsoft.com>

Signed-off-by: Yash Nisar <yashnisar@microsoft.com>
Co-authored-by: halspang <70976921+halspang@users.noreply.github.com>
This commit is contained in:
Yash Nisar 2023-01-26 13:34:09 -06:00 committed by GitHub
parent 76d4b682ec
commit 1605ecd90e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 897 additions and 13 deletions

3
.gitignore vendored
View File

@ -101,3 +101,6 @@ coverage.json
# Examples bloat
examples/**/tmp
# MacOS
**/.DS_Store

22
all.sln
View File

@ -93,6 +93,12 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Workflow", "Workflow", "{BF
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowWebApp", "examples\Workflow\WorkflowWebApp\WorkflowWebApp.csproj", "{5C61ABED-7623-4C28-A5C9-C5972A0F669C}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "PublishSubscribe", "PublishSubscribe", "{0EF6EA64-D7C3-420D-9890-EAE8D54A57E6}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PublishEventExample", "examples\Client\PublishSubscribe\PublishEventExample\PublishEventExample.csproj", "{4A175C27-EAFE-47E7-90F6-873B37863656}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BulkPublishEventExample", "examples\Client\PublishSubscribe\BulkPublishEventExample\BulkPublishEventExample.csproj", "{DDC41278-FB60-403A-B969-2AEBD7C2D83C}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@ -191,10 +197,6 @@ Global
{8B570E70-0E73-4042-A4B6-1CC3CC782A65}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8B570E70-0E73-4042-A4B6-1CC3CC782A65}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8B570E70-0E73-4042-A4B6-1CC3CC782A65}.Release|Any CPU.Build.0 = Release|Any CPU
{DE6913E3-E5D9-4D1D-95F9-9FED87BD09BC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DE6913E3-E5D9-4D1D-95F9-9FED87BD09BC}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DE6913E3-E5D9-4D1D-95F9-9FED87BD09BC}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DE6913E3-E5D9-4D1D-95F9-9FED87BD09BC}.Release|Any CPU.Build.0 = Release|Any CPU
{4AA9E7B7-36BF-4AAE-BFA3-C9CE8740F4A0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4AA9E7B7-36BF-4AAE-BFA3-C9CE8740F4A0}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4AA9E7B7-36BF-4AAE-BFA3-C9CE8740F4A0}.Release|Any CPU.ActiveCfg = Release|Any CPU
@ -231,6 +233,14 @@ Global
{5C61ABED-7623-4C28-A5C9-C5972A0F669C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5C61ABED-7623-4C28-A5C9-C5972A0F669C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5C61ABED-7623-4C28-A5C9-C5972A0F669C}.Release|Any CPU.Build.0 = Release|Any CPU
{4A175C27-EAFE-47E7-90F6-873B37863656}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4A175C27-EAFE-47E7-90F6-873B37863656}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4A175C27-EAFE-47E7-90F6-873B37863656}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4A175C27-EAFE-47E7-90F6-873B37863656}.Release|Any CPU.Build.0 = Release|Any CPU
{DDC41278-FB60-403A-B969-2AEBD7C2D83C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DDC41278-FB60-403A-B969-2AEBD7C2D83C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DDC41278-FB60-403A-B969-2AEBD7C2D83C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DDC41278-FB60-403A-B969-2AEBD7C2D83C}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@ -262,7 +272,6 @@ Global
{A7F41094-8648-446B-AECD-DCC2CC871F73} = {D687DDC4-66C5-4667-9E3A-FD8B78ECAA78}
{F70AC78E-8925-4770-832A-2FC67A620EB2} = {A7F41094-8648-446B-AECD-DCC2CC871F73}
{8B570E70-0E73-4042-A4B6-1CC3CC782A65} = {A7F41094-8648-446B-AECD-DCC2CC871F73}
{DE6913E3-E5D9-4D1D-95F9-9FED87BD09BC} = {A7F41094-8648-446B-AECD-DCC2CC871F73}
{4AA9E7B7-36BF-4AAE-BFA3-C9CE8740F4A0} = {DD020B34-460F-455F-8D17-CF4A949F100B}
{345FC3FB-D1E9-4AE8-9052-17D20AB01FA2} = {DD020B34-460F-455F-8D17-CF4A949F100B}
{2AED1542-A8ED-488D-B6D0-E16AB5D6EF6C} = {DD020B34-460F-455F-8D17-CF4A949F100B}
@ -273,6 +282,9 @@ Global
{07578B6C-9B96-4B3D-BA2E-7800EFCA7F99} = {27C5D71D-0721-4221-9286-B94AB07B58CF}
{BF3ED6BF-ADF3-4D25-8E89-02FB8D945CA9} = {D687DDC4-66C5-4667-9E3A-FD8B78ECAA78}
{5C61ABED-7623-4C28-A5C9-C5972A0F669C} = {BF3ED6BF-ADF3-4D25-8E89-02FB8D945CA9}
{0EF6EA64-D7C3-420D-9890-EAE8D54A57E6} = {A7F41094-8648-446B-AECD-DCC2CC871F73}
{4A175C27-EAFE-47E7-90F6-873B37863656} = {0EF6EA64-D7C3-420D-9890-EAE8D54A57E6}
{DDC41278-FB60-403A-B969-2AEBD7C2D83C} = {0EF6EA64-D7C3-420D-9890-EAE8D54A57E6}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {65220BF2-EAE1-4CB2-AA58-EBE80768CB40}

View File

@ -81,7 +81,7 @@ namespace ControllerSample.Controllers
}
state.Value.Balance += transaction.Amount;
logger.LogInformation("Balance is {0}", state.Value.Balance);
logger.LogInformation("Balance for Id {0} is {1}",state.Value.Id, state.Value.Balance);
await state.SaveAsync();
return state.Value;
}

View File

@ -0,0 +1,62 @@
// ------------------------------------------------------------------------
// Copyright 2023 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Dapr.Client;
namespace Samples.Client
{
public class BulkPublishEventExample : Example
{
private const string PubsubName = "pubsub";
private const string TopicName = "deposit";
IReadOnlyList<object> BulkPublishData = new List<object>() {
new { Id = "17", Amount = 10m },
new { Id = "18", Amount = 20m },
new { Id = "19", Amount = 30m }
};
public override string DisplayName => "Bulk Publishing Events";
public override async Task RunAsync(CancellationToken cancellationToken)
{
using var client = new DaprClientBuilder().Build();
var res = await client.BulkPublishEventAsync(PubsubName, TopicName,
BulkPublishData);
if (res != null) {
if (res.FailedEntries.Count > 0)
{
Console.WriteLine("Some events failed to be published!");
foreach (var failedEntry in res.FailedEntries)
{
Console.WriteLine("EntryId : " + failedEntry.Entry.EntryId + " Error message : " +
failedEntry.ErrorMessage);
}
}
else
{
Console.WriteLine("Published multiple deposit events!");
}
} else {
throw new Exception("null response from dapr");
}
}
}
}

View File

@ -0,0 +1,24 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<RootNamespace>Samples.Client</RootNamespace>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<Protobuf Include="..\..\..\AspNetCore\GrpcServiceSample\Protos\*.proto" ProtoRoot="..\..\..\AspNetCore\GrpcServiceSample\Protos\" GrpcServices="None" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\..\src\Dapr.Client\Dapr.Client.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="3.15.0" />
<PackageReference Include="Google.Api.CommonProtos" Version="2.2.0" />
<PackageReference Include="Grpc.Tools" Version="2.47.0" PrivateAssets="All" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,25 @@
// ------------------------------------------------------------------------
// Copyright 2023 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
using System.Threading;
using System.Threading.Tasks;
namespace Samples.Client
{
public abstract class Example
{
public abstract string DisplayName { get; }
public abstract Task RunAsync(CancellationToken cancellationToken);
}
}

View File

@ -0,0 +1,47 @@
// ------------------------------------------------------------------------
// Copyright 2023 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
using System;
using System.Threading;
using System.Threading.Tasks;
namespace Samples.Client
{
class Program
{
private static readonly Example[] Examples = new Example[]
{
new BulkPublishEventExample(),
};
static async Task<int> Main(string[] args)
{
if (args.Length > 0 && int.TryParse(args[0], out var index) && index >= 0 && index < Examples.Length)
{
var cts = new CancellationTokenSource();
Console.CancelKeyPress += (object? sender, ConsoleCancelEventArgs e) => cts.Cancel();
await Examples[index].RunAsync(cts.Token);
return 0;
}
Console.WriteLine("Hello, please choose a sample to run:");
for (var i = 0; i < Examples.Length; i++)
{
Console.WriteLine($"{i}: {Examples[i].DisplayName}");
}
Console.WriteLine();
return 0;
}
}
}

View File

@ -0,0 +1,43 @@
# Dapr .NET SDK Bulk publish example
## Prerequisites
- [.NET Core 3.1 or .NET 5+](https://dotnet.microsoft.com/download) installed
- [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/)
- [Initialized Dapr environment](https://docs.dapr.io/getting-started/install-dapr-selfhost/)
- [Dapr .NET SDK](https://docs.dapr.io/developing-applications/sdks/dotnet/)
## Running the example
### Run the subscriber
Navigate to the [ControllerSample](https://github.com/dapr/dotnet-sdk/tree/master/examples/AspNetCore/ControllerSample) directory and run the subscriber. It will subscribe to the `deposit` topic that the publisher will publish messages to.
```sh
dapr run --app-id controller --app-port 5000 -- dotnet run
```
### Run the bulk publisher
After running the subscriber, run the bulk publisher. To run the sample locally, run this command in this project root directory:
```sh
dapr run --app-id DaprClient -- dotnet run <sample number>
```
Running the following command will output a list of the samples included:
```sh
dapr run --app-id DaprClient -- dotnet run
```
Press Ctrl+C to exit, and then run the command again and provide a sample number to run the samples.
For example run this command to run the 0th sample from the list produced earlier.
```sh
dapr run --app-id DaprClient -- dotnet run 0
```
## Publishing Bulk Pub/Sub Events
See [BulkPublishEventExample.cs](./BulkPublishEventExample.cs) for an example using the `DaprClient` to publish a pub/sub event.

View File

@ -1,5 +1,5 @@
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Copyright 2023 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

View File

@ -1,5 +1,5 @@
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Copyright 2023 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@ -42,7 +42,7 @@ namespace Samples.Client
Console.WriteLine($"{i}: {Examples[i].DisplayName}");
}
Console.WriteLine();
return 1;
return 0;
}
}
}

View File

@ -0,0 +1,39 @@
// ------------------------------------------------------------------------
// Copyright 2023 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Net.Mime;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Dapr.Client;
namespace Samples.Client
{
public class PublishBytesExample : Example
{
public override string DisplayName => "Publish Bytes";
public async override Task RunAsync(CancellationToken cancellationToken)
{
using var client = new DaprClientBuilder().Build();
var transaction = new { Id = "17", Amount = 30m };
var content = JsonSerializer.SerializeToUtf8Bytes(transaction);
await client.PublishByteEventAsync(pubsubName, "deposit", content.AsMemory(), MediaTypeNames.Application.Json, new Dictionary<string, string> { }, cancellationToken);
Console.WriteLine("Published deposit event!");
}
}
}

View File

@ -1,5 +1,5 @@
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Copyright 2023 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

View File

@ -8,11 +8,11 @@
</PropertyGroup>
<ItemGroup>
<Protobuf Include="..\..\AspNetCore\GrpcServiceSample\Protos\*.proto" ProtoRoot="..\..\AspNetCore\GrpcServiceSample\Protos\" GrpcServices="None" />
<Protobuf Include="..\..\..\AspNetCore\GrpcServiceSample\Protos\*.proto" ProtoRoot="..\..\..\AspNetCore\GrpcServiceSample\Protos\" GrpcServices="None" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\src\Dapr.Client\Dapr.Client.csproj" />
<ProjectReference Include="..\..\..\..\src\Dapr.Client\Dapr.Client.csproj" />
</ItemGroup>
<ItemGroup>

View File

@ -0,0 +1,61 @@
// ------------------------------------------------------------------------
// Copyright 2023 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
using System.Collections.Generic;
namespace Dapr.Client
{
/// <summary>
/// Class representing an entry in the BulkPublishRequest.
/// </summary>
/// <typeparam name="TValue">The data type of the value.</typeparam>
public class BulkPublishEntry<TValue>
{
/// <summary>
/// Initializes a new instance of the <see cref="BulkPublishEntry{TValue}"/> class.
/// </summary>
/// <param name="entryId">A request scoped ID uniquely identifying this entry in the BulkPublishRequest.</param>
/// <param name="eventData">Event to be published.</param>
/// <param name="contentType">Content Type of the event to be published.</param>
/// <param name="metadata">Metadata for the event.</param>
public BulkPublishEntry(string entryId, TValue eventData, string contentType, IReadOnlyDictionary<string, string> metadata = default)
{
this.EntryId = entryId;
this.EventData = eventData;
this.ContentType = contentType;
this.Metadata = metadata;
}
/// <summary>
/// The ID uniquely identifying this particular request entry across the request and scoped for this request only.
/// </summary>
public string EntryId { get; }
/// <summary>
/// The event to be published.
/// </summary>
public TValue EventData { get; }
/// <summary>
/// The content type of the event to be published.
/// </summary>
public string ContentType { get; }
/// <summary>
/// The metadata set for this particular event.
/// Any particular values in this metadata overrides the request metadata present in BulkPublishRequest.
/// </summary>
public IReadOnlyDictionary<string, string> Metadata { get; }
}
}

View File

@ -0,0 +1,37 @@
// ------------------------------------------------------------------------
// Copyright 2023 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
using System.Collections.Generic;
namespace Dapr.Client
{
/// <summary>
/// Class representing the response returned on bulk publishing events.
/// </summary>
public class BulkPublishResponse<TValue>
{
/// <summary>
/// Initializes a new instance of the <see cref="BulkPublishResponse{TValue}"/> class.
/// </summary>
/// <param name="failedEntries">The List of BulkPublishResponseEntries representing the list of events that failed to be published.</param>
public BulkPublishResponse(List<BulkPublishResponseFailedEntry<TValue>> failedEntries)
{
this.FailedEntries = failedEntries;
}
/// <summary>
/// The List of BulkPublishResponseFailedEntry objects that have failed to publish.
/// </summary>
public List<BulkPublishResponseFailedEntry<TValue>> FailedEntries { get; }
}
}

View File

@ -0,0 +1,42 @@
// ------------------------------------------------------------------------
// Copyright 2023 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
namespace Dapr.Client
{
/// <summary>
/// Class representing the status of each event that was published using BulkPublishRequest.
/// </summary>
public class BulkPublishResponseFailedEntry<TValue>
{
/// <summary>
/// Initializes a new instance of the <see cref="BulkPublishResponseFailedEntry{TValue}"/> class.
/// </summary>
/// <param name="entry">The entry that failed to be published.</param>
/// <param name="errorMessage">Error message as to why the entry failed to publish.</param>
public BulkPublishResponseFailedEntry(BulkPublishEntry<TValue> entry, string errorMessage)
{
this.Entry = entry;
this.ErrorMessage = errorMessage;
}
/// <summary>
/// The entry that has failed.
/// </summary>
public BulkPublishEntry<TValue> Entry { get; }
/// <summary>
/// Error message as to why the entry failed to publish.
/// </summary>
public string ErrorMessage { get; }
}
}

View File

@ -1,5 +1,5 @@
// ------------------------------------------------------------------------
// Copyright 2021 The Dapr Authors
// Copyright 2023 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

View File

@ -209,6 +209,22 @@ namespace Dapr.Client
string topicName,
Dictionary<string, string> metadata,
CancellationToken cancellationToken = default);
/// <summary>
/// // Bulk Publishes multiple events to the specified topic.
/// </summary>
/// <param name="pubsubName">The name of the pubsub component to use.</param>
/// <param name="topicName">The name of the topic the request should be published to.</param>
/// <param name="events">The list of events to be published.</param>
/// <param name="metadata">The metadata to be set at the request level for the request.</param>
/// <param name="cancellationToken">A <see cref="CancellationToken" /> that can be used to cancel the operation.</param>
/// <returns>A <see cref="Task" /> that will complete when the operation has completed.</returns>
public abstract Task<BulkPublishResponse<TValue>> BulkPublishEventAsync<TValue>(
string pubsubName,
string topicName,
IReadOnlyList<TValue> events,
Dictionary<string, string> metadata = default,
CancellationToken cancellationToken = default);
/// <summary>
/// Publishes an event to the specified topic.

View File

@ -175,7 +175,82 @@ namespace Dapr.Client
}
}
/// <inheritdoc/>
public override Task<BulkPublishResponse<TValue>> BulkPublishEventAsync<TValue>(
string pubsubName,
string topicName,
IReadOnlyList<TValue> events,
Dictionary<string, string> metadata = default,
CancellationToken cancellationToken = default)
{
ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName));
ArgumentVerifier.ThrowIfNullOrEmpty(topicName, nameof(topicName));
ArgumentVerifier.ThrowIfNull(events, nameof(events));
return MakeBulkPublishRequest(pubsubName, topicName, events, metadata, cancellationToken);
}
private async Task<BulkPublishResponse<TValue>> MakeBulkPublishRequest<TValue>(
string pubsubName,
string topicName,
IReadOnlyList<TValue> events,
Dictionary<string, string> metadata,
CancellationToken cancellationToken)
{
var envelope = new Autogenerated.BulkPublishRequest()
{
PubsubName = pubsubName,
Topic = topicName,
};
Dictionary<string, BulkPublishEntry<TValue>> entryMap = new Dictionary<string, BulkPublishEntry<TValue>>();
for (int counter = 0; counter < events.Count; counter++)
{
var entry = new Autogenerated.BulkPublishRequestEntry()
{
EntryId = counter.ToString(),
Event = TypeConverters.ToJsonByteString(events[counter], this.jsonSerializerOptions),
ContentType = events[counter] is CloudEvent ? Constants.ContentTypeCloudEvent : Constants.ContentTypeApplicationJson,
Metadata = {},
};
envelope.Entries.Add(entry);
entryMap.Add(counter.ToString(), new BulkPublishEntry<TValue>(
entry.EntryId, events[counter], entry.ContentType, entry.Metadata));
}
if (metadata != null)
{
foreach (var kvp in metadata)
{
envelope.Metadata.Add(kvp.Key, kvp.Value);
}
}
var options = CreateCallOptions(headers: null, cancellationToken);
try
{
var response = await client.BulkPublishEventAlpha1Async(envelope, options);
List<BulkPublishResponseFailedEntry<TValue>> failedEntries = new List<BulkPublishResponseFailedEntry<TValue>>();
foreach (var entry in response.FailedEntries)
{
BulkPublishResponseFailedEntry<TValue> domainEntry = new BulkPublishResponseFailedEntry<TValue>(
entryMap[entry.EntryId], entry.Error);
failedEntries.Add(domainEntry);
}
var bulkPublishResponse = new BulkPublishResponse<TValue>(failedEntries);
return bulkPublishResponse;
}
catch (RpcException ex)
{
throw new DaprException("Bulk Publish operation failed: the Dapr endpoint indicated a " +
"failure. See InnerException for details.", ex);
}
}
#endregion
#region InvokeBinding Apis

View File

@ -53,6 +53,9 @@ service Dapr {
// Publishes events to the specific topic.
rpc PublishEvent(PublishEventRequest) returns (google.protobuf.Empty) {}
// Bulk Publishes multiple events to the specified topic.
rpc BulkPublishEventAlpha1(BulkPublishRequest) returns (BulkPublishResponse) {}
// Invokes binding data to specific output bindings
rpc InvokeBinding(InvokeBindingRequest) returns (InvokeBindingResponse) {}
@ -287,6 +290,52 @@ message PublishEventRequest {
map<string, string> metadata = 5;
}
// BulkPublishRequest is the message to bulk publish events to pubsub topic
message BulkPublishRequest {
// The name of the pubsub component
string pubsub_name = 1;
// The pubsub topic
string topic = 2;
// The entries which contain the individual events and associated details to be published
repeated BulkPublishRequestEntry entries = 3;
// The request level metadata passing to to the pubsub components
map<string, string> metadata = 4;
}
// BulkPublishRequestEntry is the message containing the event to be bulk published
message BulkPublishRequestEntry {
// The request scoped unique ID referring to this message. Used to map status in response
string entry_id = 1;
// The event which will be published to the topic
bytes event = 2;
// The content type for the event
string content_type = 3;
// The event level metadata passing to the pubsub component
map<string, string> metadata = 4;
}
// BulkPublishResponse is the message returned from a BulkPublishEvent call
message BulkPublishResponse {
// The entries for different events that failed publish in the BulkPublishEvent call
repeated BulkPublishResponseFailedEntry failedEntries = 1;
}
// BulkPublishResponseEntry is the message mapping response status for each event in the BulkPublishEvent call
message BulkPublishResponseFailedEntry {
// The response scoped unique ID referring to this message
string entry_id = 1;
// The error message if any on failure
string error = 2;
}
// InvokeBindingRequest is the message to send data to output bindings
message InvokeBindingRequest {
// The name of the output binding to invoke.

View File

@ -0,0 +1,349 @@
// ------------------------------------------------------------------------
// Copyright 2023 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
namespace Dapr.Client.Test
{
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Autogenerated = Dapr.Client.Autogen.Grpc.v1;
using FluentAssertions;
using Grpc.Core;
using Moq;
using Xunit;
public class BulkPublishEventApiTest
{
const string TestPubsubName = "testpubsubname";
const string TestTopicName = "test";
const string TestContentType = "application/json";
static readonly List<string> bulkPublishData = new List<string>() { "hello", "world" };
[Fact]
public async Task BulkPublishEventAsync_CanPublishTopicWithEvents()
{
await using var client = TestClient.CreateForDaprClient();
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
await daprClient.BulkPublishEventAsync(TestPubsubName, TestTopicName, bulkPublishData));
request.Dismiss();
var envelope = await request.GetRequestEnvelopeAsync<Autogenerated.BulkPublishRequest>();
envelope.Entries.Count.Should().Be(2);
envelope.PubsubName.Should().Be(TestPubsubName);
envelope.Topic.Should().Be(TestTopicName);
envelope.Metadata.Count.Should().Be(0);
var firstEntry = envelope.Entries[0];
firstEntry.EntryId.Should().Be("0");
firstEntry.ContentType.Should().Be(TestContentType);
firstEntry.Event.ToStringUtf8().Should()
.Be(JsonSerializer.Serialize(bulkPublishData[0], client.InnerClient.JsonSerializerOptions));
firstEntry.Metadata.Should().BeEmpty();
var secondEntry = envelope.Entries[1];
secondEntry.EntryId.Should().Be("1");
secondEntry.ContentType.Should().Be(TestContentType);
secondEntry.Event.ToStringUtf8().Should()
.Be(JsonSerializer.Serialize(bulkPublishData[1], client.InnerClient.JsonSerializerOptions));
secondEntry.Metadata.Should().BeEmpty();
// Create Response & Respond
var response = new Autogenerated.BulkPublishResponse
{
FailedEntries = { }
};
var bulkPublishResponse = await request.CompleteWithMessageAsync(response);
// Get response and validate
bulkPublishResponse.FailedEntries.Count.Should().Be(0);
}
[Fact]
public async Task BulkPublishEventAsync_CanPublishTopicWithEvents_WithMetadata()
{
await using var client = TestClient.CreateForDaprClient();
var metadata = new Dictionary<string, string> { { "key1", "value1" }, { "key2", "value2" } };
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
await daprClient.BulkPublishEventAsync(TestPubsubName, TestTopicName, bulkPublishData,
metadata));
request.Dismiss();
var envelope = await request.GetRequestEnvelopeAsync<Autogenerated.BulkPublishRequest>();
envelope.Entries.Count.Should().Be(2);
envelope.PubsubName.Should().Be(TestPubsubName);
envelope.Topic.Should().Be(TestTopicName);
envelope.Metadata.Count.Should().Be(2);
envelope.Metadata.Keys.Contains("key1").Should().BeTrue();
envelope.Metadata.Keys.Contains("key2").Should().BeTrue();
envelope.Metadata["key1"].Should().Be("value1");
envelope.Metadata["key2"].Should().Be("value2");
var firstEntry = envelope.Entries[0];
firstEntry.EntryId.Should().Be("0");
firstEntry.ContentType.Should().Be(TestContentType);
firstEntry.Event.ToStringUtf8().Should()
.Be(JsonSerializer.Serialize(bulkPublishData[0], client.InnerClient.JsonSerializerOptions));
firstEntry.Metadata.Should().BeEmpty();
var secondEntry = envelope.Entries[1];
secondEntry.EntryId.Should().Be("1");
secondEntry.ContentType.Should().Be(TestContentType);
secondEntry.Event.ToStringUtf8().Should()
.Be(JsonSerializer.Serialize(bulkPublishData[1], client.InnerClient.JsonSerializerOptions));
secondEntry.Metadata.Should().BeEmpty();
// Create Response & Respond
var response = new Autogenerated.BulkPublishResponse
{
FailedEntries = { }
};
var bulkPublishResponse = await request.CompleteWithMessageAsync(response);
// Get response and validate
bulkPublishResponse.FailedEntries.Count.Should().Be(0);
}
[Fact]
public async Task BulkPublishEventAsync_CanPublishTopicWithNoContent()
{
await using var client = TestClient.CreateForDaprClient();
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
await daprClient.BulkPublishEventAsync(TestPubsubName, TestTopicName, bulkPublishData,
null));
request.Dismiss();
var envelope = await request.GetRequestEnvelopeAsync<Autogenerated.BulkPublishRequest>();
envelope.Entries.Count.Should().Be(2);
envelope.PubsubName.Should().Be(TestPubsubName);
envelope.Topic.Should().Be(TestTopicName);
envelope.Metadata.Count.Should().Be(0);
var firstEntry = envelope.Entries[0];
firstEntry.EntryId.Should().Be("0");
firstEntry.ContentType.Should().Be(TestContentType);
firstEntry.Event.ToStringUtf8().Should()
.Be(JsonSerializer.Serialize(bulkPublishData[0], client.InnerClient.JsonSerializerOptions));
firstEntry.Metadata.Should().BeEmpty();
var secondEntry = envelope.Entries[1];
secondEntry.EntryId.Should().Be("1");
secondEntry.ContentType.Should().Be(TestContentType);
secondEntry.Event.ToStringUtf8().Should()
.Be(JsonSerializer.Serialize(bulkPublishData[1], client.InnerClient.JsonSerializerOptions));
secondEntry.Metadata.Should().BeEmpty();
// Create Response & Respond
var response = new Autogenerated.BulkPublishResponse
{
FailedEntries = { }
};
var bulkPublishResponse = await request.CompleteWithMessageAsync(response);
// Get response and validate
bulkPublishResponse.FailedEntries.Count.Should().Be(0);
}
[Fact]
public async Task BulkPublishEventAsync_CanPublishTopicWithNoContent_WithMetadata()
{
await using var client = TestClient.CreateForDaprClient();
var metadata = new Dictionary<string, string> { { "key1", "value1" }, { "key2", "value2" } };
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
await daprClient.BulkPublishEventAsync(TestPubsubName, TestTopicName, bulkPublishData,
metadata));
request.Dismiss();
var envelope = await request.GetRequestEnvelopeAsync<Autogenerated.BulkPublishRequest>();
envelope.Entries.Count.Should().Be(2);
envelope.PubsubName.Should().Be(TestPubsubName);
envelope.Topic.Should().Be(TestTopicName);
envelope.Metadata.Count.Should().Be(2);
envelope.Metadata.Keys.Contains("key1").Should().BeTrue();
envelope.Metadata.Keys.Contains("key2").Should().BeTrue();
envelope.Metadata["key1"].Should().Be("value1");
envelope.Metadata["key2"].Should().Be("value2");
var firstEntry = envelope.Entries[0];
firstEntry.EntryId.Should().Be("0");
firstEntry.ContentType.Should().Be(TestContentType);
firstEntry.Event.ToStringUtf8().Should()
.Be(JsonSerializer.Serialize(bulkPublishData[0], client.InnerClient.JsonSerializerOptions));
firstEntry.Metadata.Should().BeEmpty();
var secondEntry = envelope.Entries[1];
secondEntry.EntryId.Should().Be("1");
secondEntry.ContentType.Should().Be(TestContentType);
secondEntry.Event.ToStringUtf8().Should()
.Be(JsonSerializer.Serialize(bulkPublishData[1], client.InnerClient.JsonSerializerOptions));
secondEntry.Metadata.Should().BeEmpty();
// Create Response & Respond
var response = new Autogenerated.BulkPublishResponse
{
FailedEntries = { }
};
var bulkPublishResponse = await request.CompleteWithMessageAsync(response);
// Get response and validate
bulkPublishResponse.FailedEntries.Count.Should().Be(0);
}
[Fact]
public async Task BulkPublishEventAsync_CanPublishTopicWithEventsObject()
{
await using var client = TestClient.CreateForDaprClient();
var bulkPublishDataObject = new Widget() { Size = "Big", Color = "Green" };
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
await daprClient.BulkPublishEventAsync(TestPubsubName, TestTopicName,
new List<Widget> { bulkPublishDataObject }, null));
request.Dismiss();
var envelope = await request.GetRequestEnvelopeAsync<Autogenerated.BulkPublishRequest>();
envelope.Entries.Count.Should().Be(1);
envelope.PubsubName.Should().Be(TestPubsubName);
envelope.Topic.Should().Be(TestTopicName);
envelope.Metadata.Count.Should().Be(0);
var firstEntry = envelope.Entries[0];
firstEntry.EntryId.Should().Be("0");
firstEntry.ContentType.Should().Be(TestContentType);
firstEntry.Event.ToStringUtf8().Should()
.Be(JsonSerializer.Serialize(bulkPublishDataObject, client.InnerClient.JsonSerializerOptions));
firstEntry.Metadata.Should().BeEmpty();
// Create Response & Respond
var response = new Autogenerated.BulkPublishResponse
{
FailedEntries = { }
};
var bulkPublishResponse = await request.CompleteWithMessageAsync(response);
// Get response and validate
bulkPublishResponse.FailedEntries.Count.Should().Be(0);
}
[Fact]
public async Task BulkPublishEventAsync_WithCancelledToken()
{
await using var client = TestClient.CreateForDaprClient();
var cts = new CancellationTokenSource();
cts.Cancel();
await Assert.ThrowsAsync<OperationCanceledException>(async () =>
{
await client.InnerClient.BulkPublishEventAsync(TestPubsubName, TestTopicName, bulkPublishData,
null, cancellationToken: cts.Token);
});
}
[Fact]
public async Task BulkPublishEventAsync_WrapsRpcException()
{
var client = new MockClient();
var rpcStatus = new Status(StatusCode.Internal, "not gonna work");
var rpcException = new RpcException(rpcStatus, new Metadata(), "not gonna work");
// Setup the mock client to throw an Rpc Exception with the expected details info
client.Mock
.Setup(m => m.BulkPublishEventAlpha1Async(
It.IsAny<Autogen.Grpc.v1.BulkPublishRequest>(),
It.IsAny<CallOptions>()))
.Throws(rpcException);
var ex = await Assert.ThrowsAsync<DaprException>(async () =>
{
await client.DaprClient.BulkPublishEventAsync(TestPubsubName, TestTopicName,
bulkPublishData);
});
Assert.Same(rpcException, ex.InnerException);
}
[Fact]
public async Task BulkPublishEventAsync_FailureResponse()
{
await using var client = TestClient.CreateForDaprClient();
var metadata = new Dictionary<string, string> { { "key1", "value1" }, { "key2", "value2" } };
var request = await client.CaptureGrpcRequestAsync(async daprClient =>
{
return await daprClient.BulkPublishEventAsync(TestPubsubName, TestTopicName,
bulkPublishData, metadata);
});
request.Dismiss();
// Create Response & Respond
var response = new Autogenerated.BulkPublishResponse
{
FailedEntries =
{
new Autogenerated.BulkPublishResponseFailedEntry
{
EntryId = "0",
Error = "Failed to publish",
},
new Autogenerated.BulkPublishResponseFailedEntry
{
EntryId = "1",
Error = "Failed to publish",
},
}
};
var bulkPublishResponse = await request.CompleteWithMessageAsync(response);
// Get response and validate
bulkPublishResponse.FailedEntries[0].Entry.EntryId.Should().Be("0");
bulkPublishResponse.FailedEntries[0].ErrorMessage.Should().Be("Failed to publish");
bulkPublishResponse.FailedEntries[1].Entry.EntryId.Should().Be("1");
bulkPublishResponse.FailedEntries[1].ErrorMessage.Should().Be("Failed to publish");
}
private class Widget
{
public string Size { get; set; }
public string Color { get; set; }
}
}
}