diff --git a/.gitignore b/.gitignore index 9e233096..6d39d845 100644 --- a/.gitignore +++ b/.gitignore @@ -101,3 +101,6 @@ coverage.json # Examples bloat examples/**/tmp + +# MacOS +**/.DS_Store diff --git a/all.sln b/all.sln index 6339c47b..b252ced3 100644 --- a/all.sln +++ b/all.sln @@ -93,6 +93,12 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Workflow", "Workflow", "{BF EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowWebApp", "examples\Workflow\WorkflowWebApp\WorkflowWebApp.csproj", "{5C61ABED-7623-4C28-A5C9-C5972A0F669C}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "PublishSubscribe", "PublishSubscribe", "{0EF6EA64-D7C3-420D-9890-EAE8D54A57E6}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PublishEventExample", "examples\Client\PublishSubscribe\PublishEventExample\PublishEventExample.csproj", "{4A175C27-EAFE-47E7-90F6-873B37863656}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BulkPublishEventExample", "examples\Client\PublishSubscribe\BulkPublishEventExample\BulkPublishEventExample.csproj", "{DDC41278-FB60-403A-B969-2AEBD7C2D83C}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -191,10 +197,6 @@ Global {8B570E70-0E73-4042-A4B6-1CC3CC782A65}.Debug|Any CPU.Build.0 = Debug|Any CPU {8B570E70-0E73-4042-A4B6-1CC3CC782A65}.Release|Any CPU.ActiveCfg = Release|Any CPU {8B570E70-0E73-4042-A4B6-1CC3CC782A65}.Release|Any CPU.Build.0 = Release|Any CPU - {DE6913E3-E5D9-4D1D-95F9-9FED87BD09BC}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {DE6913E3-E5D9-4D1D-95F9-9FED87BD09BC}.Debug|Any CPU.Build.0 = Debug|Any CPU - {DE6913E3-E5D9-4D1D-95F9-9FED87BD09BC}.Release|Any CPU.ActiveCfg = Release|Any CPU - {DE6913E3-E5D9-4D1D-95F9-9FED87BD09BC}.Release|Any CPU.Build.0 = Release|Any CPU {4AA9E7B7-36BF-4AAE-BFA3-C9CE8740F4A0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {4AA9E7B7-36BF-4AAE-BFA3-C9CE8740F4A0}.Debug|Any CPU.Build.0 = Debug|Any CPU {4AA9E7B7-36BF-4AAE-BFA3-C9CE8740F4A0}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -231,6 +233,14 @@ Global {5C61ABED-7623-4C28-A5C9-C5972A0F669C}.Debug|Any CPU.Build.0 = Debug|Any CPU {5C61ABED-7623-4C28-A5C9-C5972A0F669C}.Release|Any CPU.ActiveCfg = Release|Any CPU {5C61ABED-7623-4C28-A5C9-C5972A0F669C}.Release|Any CPU.Build.0 = Release|Any CPU + {4A175C27-EAFE-47E7-90F6-873B37863656}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4A175C27-EAFE-47E7-90F6-873B37863656}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4A175C27-EAFE-47E7-90F6-873B37863656}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4A175C27-EAFE-47E7-90F6-873B37863656}.Release|Any CPU.Build.0 = Release|Any CPU + {DDC41278-FB60-403A-B969-2AEBD7C2D83C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {DDC41278-FB60-403A-B969-2AEBD7C2D83C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {DDC41278-FB60-403A-B969-2AEBD7C2D83C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {DDC41278-FB60-403A-B969-2AEBD7C2D83C}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -262,7 +272,6 @@ Global {A7F41094-8648-446B-AECD-DCC2CC871F73} = {D687DDC4-66C5-4667-9E3A-FD8B78ECAA78} {F70AC78E-8925-4770-832A-2FC67A620EB2} = {A7F41094-8648-446B-AECD-DCC2CC871F73} {8B570E70-0E73-4042-A4B6-1CC3CC782A65} = {A7F41094-8648-446B-AECD-DCC2CC871F73} - {DE6913E3-E5D9-4D1D-95F9-9FED87BD09BC} = {A7F41094-8648-446B-AECD-DCC2CC871F73} {4AA9E7B7-36BF-4AAE-BFA3-C9CE8740F4A0} = {DD020B34-460F-455F-8D17-CF4A949F100B} {345FC3FB-D1E9-4AE8-9052-17D20AB01FA2} = {DD020B34-460F-455F-8D17-CF4A949F100B} {2AED1542-A8ED-488D-B6D0-E16AB5D6EF6C} = {DD020B34-460F-455F-8D17-CF4A949F100B} @@ -273,6 +282,9 @@ Global {07578B6C-9B96-4B3D-BA2E-7800EFCA7F99} = {27C5D71D-0721-4221-9286-B94AB07B58CF} {BF3ED6BF-ADF3-4D25-8E89-02FB8D945CA9} = {D687DDC4-66C5-4667-9E3A-FD8B78ECAA78} {5C61ABED-7623-4C28-A5C9-C5972A0F669C} = {BF3ED6BF-ADF3-4D25-8E89-02FB8D945CA9} + {0EF6EA64-D7C3-420D-9890-EAE8D54A57E6} = {A7F41094-8648-446B-AECD-DCC2CC871F73} + {4A175C27-EAFE-47E7-90F6-873B37863656} = {0EF6EA64-D7C3-420D-9890-EAE8D54A57E6} + {DDC41278-FB60-403A-B969-2AEBD7C2D83C} = {0EF6EA64-D7C3-420D-9890-EAE8D54A57E6} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {65220BF2-EAE1-4CB2-AA58-EBE80768CB40} diff --git a/examples/AspNetCore/ControllerSample/Controllers/SampleController.cs b/examples/AspNetCore/ControllerSample/Controllers/SampleController.cs index f5d40d38..e683ecc0 100644 --- a/examples/AspNetCore/ControllerSample/Controllers/SampleController.cs +++ b/examples/AspNetCore/ControllerSample/Controllers/SampleController.cs @@ -81,7 +81,7 @@ namespace ControllerSample.Controllers } state.Value.Balance += transaction.Amount; - logger.LogInformation("Balance is {0}", state.Value.Balance); + logger.LogInformation("Balance for Id {0} is {1}",state.Value.Id, state.Value.Balance); await state.SaveAsync(); return state.Value; } diff --git a/examples/Client/PublishSubscribe/BulkPublishEventExample/BulkPublishEventExample.cs b/examples/Client/PublishSubscribe/BulkPublishEventExample/BulkPublishEventExample.cs new file mode 100644 index 00000000..34361845 --- /dev/null +++ b/examples/Client/PublishSubscribe/BulkPublishEventExample/BulkPublishEventExample.cs @@ -0,0 +1,62 @@ +// ------------------------------------------------------------------------ +// Copyright 2023 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; +using Dapr.Client; + +namespace Samples.Client +{ + public class BulkPublishEventExample : Example + { + private const string PubsubName = "pubsub"; + private const string TopicName = "deposit"; + + IReadOnlyList BulkPublishData = new List() { + new { Id = "17", Amount = 10m }, + new { Id = "18", Amount = 20m }, + new { Id = "19", Amount = 30m } + }; + + public override string DisplayName => "Bulk Publishing Events"; + + public override async Task RunAsync(CancellationToken cancellationToken) + { + using var client = new DaprClientBuilder().Build(); + + var res = await client.BulkPublishEventAsync(PubsubName, TopicName, + BulkPublishData); + + if (res != null) { + if (res.FailedEntries.Count > 0) + { + Console.WriteLine("Some events failed to be published!"); + + foreach (var failedEntry in res.FailedEntries) + { + Console.WriteLine("EntryId : " + failedEntry.Entry.EntryId + " Error message : " + + failedEntry.ErrorMessage); + } + } + else + { + Console.WriteLine("Published multiple deposit events!"); + } + } else { + throw new Exception("null response from dapr"); + } + } + } +} diff --git a/examples/Client/PublishSubscribe/BulkPublishEventExample/BulkPublishEventExample.csproj b/examples/Client/PublishSubscribe/BulkPublishEventExample/BulkPublishEventExample.csproj new file mode 100644 index 00000000..5c05faf9 --- /dev/null +++ b/examples/Client/PublishSubscribe/BulkPublishEventExample/BulkPublishEventExample.csproj @@ -0,0 +1,24 @@ + + + + Exe + netcoreapp3.1 + Samples.Client + enable + + + + + + + + + + + + + + + + + diff --git a/examples/Client/PublishSubscribe/BulkPublishEventExample/Example.cs b/examples/Client/PublishSubscribe/BulkPublishEventExample/Example.cs new file mode 100644 index 00000000..e12e38d1 --- /dev/null +++ b/examples/Client/PublishSubscribe/BulkPublishEventExample/Example.cs @@ -0,0 +1,25 @@ +// ------------------------------------------------------------------------ +// Copyright 2023 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Threading; +using System.Threading.Tasks; + +namespace Samples.Client +{ + public abstract class Example + { + public abstract string DisplayName { get; } + + public abstract Task RunAsync(CancellationToken cancellationToken); + } +} diff --git a/examples/Client/PublishSubscribe/BulkPublishEventExample/Program.cs b/examples/Client/PublishSubscribe/BulkPublishEventExample/Program.cs new file mode 100644 index 00000000..47698235 --- /dev/null +++ b/examples/Client/PublishSubscribe/BulkPublishEventExample/Program.cs @@ -0,0 +1,47 @@ +// ------------------------------------------------------------------------ +// Copyright 2023 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Samples.Client +{ + class Program + { + private static readonly Example[] Examples = new Example[] + { + new BulkPublishEventExample(), + }; + + static async Task Main(string[] args) + { + if (args.Length > 0 && int.TryParse(args[0], out var index) && index >= 0 && index < Examples.Length) + { + var cts = new CancellationTokenSource(); + Console.CancelKeyPress += (object? sender, ConsoleCancelEventArgs e) => cts.Cancel(); + + await Examples[index].RunAsync(cts.Token); + return 0; + } + + Console.WriteLine("Hello, please choose a sample to run:"); + for (var i = 0; i < Examples.Length; i++) + { + Console.WriteLine($"{i}: {Examples[i].DisplayName}"); + } + Console.WriteLine(); + return 0; + } + } +} diff --git a/examples/Client/PublishSubscribe/BulkPublishEventExample/README.md b/examples/Client/PublishSubscribe/BulkPublishEventExample/README.md new file mode 100644 index 00000000..dfcc99ca --- /dev/null +++ b/examples/Client/PublishSubscribe/BulkPublishEventExample/README.md @@ -0,0 +1,43 @@ +# Dapr .NET SDK Bulk publish example + +## Prerequisites + +- [.NET Core 3.1 or .NET 5+](https://dotnet.microsoft.com/download) installed +- [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/) +- [Initialized Dapr environment](https://docs.dapr.io/getting-started/install-dapr-selfhost/) +- [Dapr .NET SDK](https://docs.dapr.io/developing-applications/sdks/dotnet/) + +## Running the example + +### Run the subscriber + +Navigate to the [ControllerSample](https://github.com/dapr/dotnet-sdk/tree/master/examples/AspNetCore/ControllerSample) directory and run the subscriber. It will subscribe to the `deposit` topic that the publisher will publish messages to. + +```sh +dapr run --app-id controller --app-port 5000 -- dotnet run +``` + +### Run the bulk publisher +After running the subscriber, run the bulk publisher. To run the sample locally, run this command in this project root directory: + +```sh +dapr run --app-id DaprClient -- dotnet run +``` + +Running the following command will output a list of the samples included: + +```sh +dapr run --app-id DaprClient -- dotnet run +``` + +Press Ctrl+C to exit, and then run the command again and provide a sample number to run the samples. + +For example run this command to run the 0th sample from the list produced earlier. + +```sh +dapr run --app-id DaprClient -- dotnet run 0 +``` + +## Publishing Bulk Pub/Sub Events + +See [BulkPublishEventExample.cs](./BulkPublishEventExample.cs) for an example using the `DaprClient` to publish a pub/sub event. diff --git a/examples/Client/PublishSubscribe/Example.cs b/examples/Client/PublishSubscribe/PublishEventExample/Example.cs similarity index 96% rename from examples/Client/PublishSubscribe/Example.cs rename to examples/Client/PublishSubscribe/PublishEventExample/Example.cs index ffc168c0..fbe78124 100644 --- a/examples/Client/PublishSubscribe/Example.cs +++ b/examples/Client/PublishSubscribe/PublishEventExample/Example.cs @@ -1,5 +1,5 @@ // ------------------------------------------------------------------------ -// Copyright 2021 The Dapr Authors +// Copyright 2023 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/examples/Client/PublishSubscribe/Program.cs b/examples/Client/PublishSubscribe/PublishEventExample/Program.cs similarity index 96% rename from examples/Client/PublishSubscribe/Program.cs rename to examples/Client/PublishSubscribe/PublishEventExample/Program.cs index 3700535a..af74ad90 100644 --- a/examples/Client/PublishSubscribe/Program.cs +++ b/examples/Client/PublishSubscribe/PublishEventExample/Program.cs @@ -1,5 +1,5 @@ // ------------------------------------------------------------------------ -// Copyright 2021 The Dapr Authors +// Copyright 2023 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at @@ -42,7 +42,7 @@ namespace Samples.Client Console.WriteLine($"{i}: {Examples[i].DisplayName}"); } Console.WriteLine(); - return 1; + return 0; } } } diff --git a/examples/Client/PublishSubscribe/PublishEventExample/PublishBytesExample.cs b/examples/Client/PublishSubscribe/PublishEventExample/PublishBytesExample.cs new file mode 100644 index 00000000..3334421c --- /dev/null +++ b/examples/Client/PublishSubscribe/PublishEventExample/PublishBytesExample.cs @@ -0,0 +1,39 @@ +// ------------------------------------------------------------------------ +// Copyright 2023 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System; +using System.Collections.Generic; +using System.Net.Mime; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Dapr.Client; + +namespace Samples.Client +{ + public class PublishBytesExample : Example + { + public override string DisplayName => "Publish Bytes"; + + public async override Task RunAsync(CancellationToken cancellationToken) + { + using var client = new DaprClientBuilder().Build(); + + var transaction = new { Id = "17", Amount = 30m }; + var content = JsonSerializer.SerializeToUtf8Bytes(transaction); + + await client.PublishByteEventAsync(pubsubName, "deposit", content.AsMemory(), MediaTypeNames.Application.Json, new Dictionary { }, cancellationToken); + Console.WriteLine("Published deposit event!"); + } + } +} diff --git a/examples/Client/PublishSubscribe/PublishEventExample.cs b/examples/Client/PublishSubscribe/PublishEventExample/PublishEventExample.cs similarity index 97% rename from examples/Client/PublishSubscribe/PublishEventExample.cs rename to examples/Client/PublishSubscribe/PublishEventExample/PublishEventExample.cs index 2153639e..9d34ae50 100644 --- a/examples/Client/PublishSubscribe/PublishEventExample.cs +++ b/examples/Client/PublishSubscribe/PublishEventExample/PublishEventExample.cs @@ -1,5 +1,5 @@ // ------------------------------------------------------------------------ -// Copyright 2021 The Dapr Authors +// Copyright 2023 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/examples/Client/PublishSubscribe/PublishSubscribe.csproj b/examples/Client/PublishSubscribe/PublishEventExample/PublishEventExample.csproj similarity index 70% rename from examples/Client/PublishSubscribe/PublishSubscribe.csproj rename to examples/Client/PublishSubscribe/PublishEventExample/PublishEventExample.csproj index 790bfc53..f7d33587 100644 --- a/examples/Client/PublishSubscribe/PublishSubscribe.csproj +++ b/examples/Client/PublishSubscribe/PublishEventExample/PublishEventExample.csproj @@ -8,11 +8,11 @@ - + - + diff --git a/examples/Client/PublishSubscribe/README.md b/examples/Client/PublishSubscribe/PublishEventExample/README.md similarity index 100% rename from examples/Client/PublishSubscribe/README.md rename to examples/Client/PublishSubscribe/PublishEventExample/README.md diff --git a/src/Dapr.Client/BulkPublishEntry.cs b/src/Dapr.Client/BulkPublishEntry.cs new file mode 100644 index 00000000..d6b9a549 --- /dev/null +++ b/src/Dapr.Client/BulkPublishEntry.cs @@ -0,0 +1,61 @@ +// ------------------------------------------------------------------------ +// Copyright 2023 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Collections.Generic; + +namespace Dapr.Client +{ + /// + /// Class representing an entry in the BulkPublishRequest. + /// + /// The data type of the value. + public class BulkPublishEntry + { + /// + /// Initializes a new instance of the class. + /// + /// A request scoped ID uniquely identifying this entry in the BulkPublishRequest. + /// Event to be published. + /// Content Type of the event to be published. + /// Metadata for the event. + public BulkPublishEntry(string entryId, TValue eventData, string contentType, IReadOnlyDictionary metadata = default) + { + this.EntryId = entryId; + this.EventData = eventData; + this.ContentType = contentType; + this.Metadata = metadata; + } + + /// + /// The ID uniquely identifying this particular request entry across the request and scoped for this request only. + /// + public string EntryId { get; } + + /// + /// The event to be published. + /// + public TValue EventData { get; } + + /// + /// The content type of the event to be published. + /// + public string ContentType { get; } + + /// + /// The metadata set for this particular event. + /// Any particular values in this metadata overrides the request metadata present in BulkPublishRequest. + /// + public IReadOnlyDictionary Metadata { get; } + + } +} diff --git a/src/Dapr.Client/BulkPublishResponse.cs b/src/Dapr.Client/BulkPublishResponse.cs new file mode 100644 index 00000000..f37c35aa --- /dev/null +++ b/src/Dapr.Client/BulkPublishResponse.cs @@ -0,0 +1,37 @@ +// ------------------------------------------------------------------------ +// Copyright 2023 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +using System.Collections.Generic; + +namespace Dapr.Client +{ + /// + /// Class representing the response returned on bulk publishing events. + /// + public class BulkPublishResponse + { + /// + /// Initializes a new instance of the class. + /// + /// The List of BulkPublishResponseEntries representing the list of events that failed to be published. + public BulkPublishResponse(List> failedEntries) + { + this.FailedEntries = failedEntries; + } + + /// + /// The List of BulkPublishResponseFailedEntry objects that have failed to publish. + /// + public List> FailedEntries { get; } + } +} diff --git a/src/Dapr.Client/BulkPublishResponseFailedEntry.cs b/src/Dapr.Client/BulkPublishResponseFailedEntry.cs new file mode 100644 index 00000000..e8a46c9b --- /dev/null +++ b/src/Dapr.Client/BulkPublishResponseFailedEntry.cs @@ -0,0 +1,42 @@ +// ------------------------------------------------------------------------ +// Copyright 2023 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Client +{ + /// + /// Class representing the status of each event that was published using BulkPublishRequest. + /// + public class BulkPublishResponseFailedEntry + { + /// + /// Initializes a new instance of the class. + /// + /// The entry that failed to be published. + /// Error message as to why the entry failed to publish. + public BulkPublishResponseFailedEntry(BulkPublishEntry entry, string errorMessage) + { + this.Entry = entry; + this.ErrorMessage = errorMessage; + } + + /// + /// The entry that has failed. + /// + public BulkPublishEntry Entry { get; } + + /// + /// Error message as to why the entry failed to publish. + /// + public string ErrorMessage { get; } + } +} diff --git a/src/Dapr.Client/ConfigurationSource.cs b/src/Dapr.Client/ConfigurationSource.cs index 3beb9e02..ca3f8d6d 100644 --- a/src/Dapr.Client/ConfigurationSource.cs +++ b/src/Dapr.Client/ConfigurationSource.cs @@ -1,5 +1,5 @@ // ------------------------------------------------------------------------ -// Copyright 2021 The Dapr Authors +// Copyright 2023 The Dapr Authors // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at diff --git a/src/Dapr.Client/DaprClient.cs b/src/Dapr.Client/DaprClient.cs index 60df4255..ef90d5f5 100644 --- a/src/Dapr.Client/DaprClient.cs +++ b/src/Dapr.Client/DaprClient.cs @@ -209,6 +209,22 @@ namespace Dapr.Client string topicName, Dictionary metadata, CancellationToken cancellationToken = default); + + /// + /// // Bulk Publishes multiple events to the specified topic. + /// + /// The name of the pubsub component to use. + /// The name of the topic the request should be published to. + /// The list of events to be published. + /// The metadata to be set at the request level for the request. + /// A that can be used to cancel the operation. + /// A that will complete when the operation has completed. + public abstract Task> BulkPublishEventAsync( + string pubsubName, + string topicName, + IReadOnlyList events, + Dictionary metadata = default, + CancellationToken cancellationToken = default); /// /// Publishes an event to the specified topic. diff --git a/src/Dapr.Client/DaprClientGrpc.cs b/src/Dapr.Client/DaprClientGrpc.cs index 50f4f799..28df698d 100644 --- a/src/Dapr.Client/DaprClientGrpc.cs +++ b/src/Dapr.Client/DaprClientGrpc.cs @@ -175,7 +175,82 @@ namespace Dapr.Client } } + /// + public override Task> BulkPublishEventAsync( + string pubsubName, + string topicName, + IReadOnlyList events, + Dictionary metadata = default, + CancellationToken cancellationToken = default) + { + ArgumentVerifier.ThrowIfNullOrEmpty(pubsubName, nameof(pubsubName)); + ArgumentVerifier.ThrowIfNullOrEmpty(topicName, nameof(topicName)); + ArgumentVerifier.ThrowIfNull(events, nameof(events)); + return MakeBulkPublishRequest(pubsubName, topicName, events, metadata, cancellationToken); + } + + private async Task> MakeBulkPublishRequest( + string pubsubName, + string topicName, + IReadOnlyList events, + Dictionary metadata, + CancellationToken cancellationToken) + { + var envelope = new Autogenerated.BulkPublishRequest() + { + PubsubName = pubsubName, + Topic = topicName, + }; + + Dictionary> entryMap = new Dictionary>(); + for (int counter = 0; counter < events.Count; counter++) + { + var entry = new Autogenerated.BulkPublishRequestEntry() + { + EntryId = counter.ToString(), + Event = TypeConverters.ToJsonByteString(events[counter], this.jsonSerializerOptions), + ContentType = events[counter] is CloudEvent ? Constants.ContentTypeCloudEvent : Constants.ContentTypeApplicationJson, + Metadata = {}, + }; + envelope.Entries.Add(entry); + entryMap.Add(counter.ToString(), new BulkPublishEntry( + entry.EntryId, events[counter], entry.ContentType, entry.Metadata)); + } + + if (metadata != null) + { + foreach (var kvp in metadata) + { + envelope.Metadata.Add(kvp.Key, kvp.Value); + } + } + + var options = CreateCallOptions(headers: null, cancellationToken); + + try + { + var response = await client.BulkPublishEventAlpha1Async(envelope, options); + + List> failedEntries = new List>(); + + foreach (var entry in response.FailedEntries) + { + BulkPublishResponseFailedEntry domainEntry = new BulkPublishResponseFailedEntry( + entryMap[entry.EntryId], entry.Error); + failedEntries.Add(domainEntry); + } + + var bulkPublishResponse = new BulkPublishResponse(failedEntries); + + return bulkPublishResponse; + } + catch (RpcException ex) + { + throw new DaprException("Bulk Publish operation failed: the Dapr endpoint indicated a " + + "failure. See InnerException for details.", ex); + } + } #endregion #region InvokeBinding Apis diff --git a/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto b/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto index b8325628..b5bd00db 100644 --- a/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto +++ b/src/Dapr.Client/Protos/dapr/proto/dapr/v1/dapr.proto @@ -53,6 +53,9 @@ service Dapr { // Publishes events to the specific topic. rpc PublishEvent(PublishEventRequest) returns (google.protobuf.Empty) {} + // Bulk Publishes multiple events to the specified topic. + rpc BulkPublishEventAlpha1(BulkPublishRequest) returns (BulkPublishResponse) {} + // Invokes binding data to specific output bindings rpc InvokeBinding(InvokeBindingRequest) returns (InvokeBindingResponse) {} @@ -287,6 +290,52 @@ message PublishEventRequest { map metadata = 5; } +// BulkPublishRequest is the message to bulk publish events to pubsub topic +message BulkPublishRequest { + // The name of the pubsub component + string pubsub_name = 1; + + // The pubsub topic + string topic = 2; + + // The entries which contain the individual events and associated details to be published + repeated BulkPublishRequestEntry entries = 3; + + // The request level metadata passing to to the pubsub components + map metadata = 4; +} + +// BulkPublishRequestEntry is the message containing the event to be bulk published +message BulkPublishRequestEntry { + // The request scoped unique ID referring to this message. Used to map status in response + string entry_id = 1; + + // The event which will be published to the topic + bytes event = 2; + + // The content type for the event + string content_type = 3; + + // The event level metadata passing to the pubsub component + map metadata = 4; +} + +// BulkPublishResponse is the message returned from a BulkPublishEvent call +message BulkPublishResponse { + // The entries for different events that failed publish in the BulkPublishEvent call + repeated BulkPublishResponseFailedEntry failedEntries = 1; +} + +// BulkPublishResponseEntry is the message mapping response status for each event in the BulkPublishEvent call +message BulkPublishResponseFailedEntry { + + // The response scoped unique ID referring to this message + string entry_id = 1; + + // The error message if any on failure + string error = 2; +} + // InvokeBindingRequest is the message to send data to output bindings message InvokeBindingRequest { // The name of the output binding to invoke. diff --git a/test/Dapr.Client.Test/BulkPublishEventApiTest.cs b/test/Dapr.Client.Test/BulkPublishEventApiTest.cs new file mode 100644 index 00000000..74d617a2 --- /dev/null +++ b/test/Dapr.Client.Test/BulkPublishEventApiTest.cs @@ -0,0 +1,349 @@ +// ------------------------------------------------------------------------ +// Copyright 2023 The Dapr Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// http://www.apache.org/licenses/LICENSE-2.0 +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ------------------------------------------------------------------------ + +namespace Dapr.Client.Test +{ + using System; + using System.Collections.Generic; + using System.Text.Json; + using System.Threading; + using System.Threading.Tasks; + using Autogenerated = Dapr.Client.Autogen.Grpc.v1; + using FluentAssertions; + using Grpc.Core; + using Moq; + using Xunit; + + public class BulkPublishEventApiTest + { + const string TestPubsubName = "testpubsubname"; + const string TestTopicName = "test"; + const string TestContentType = "application/json"; + static readonly List bulkPublishData = new List() { "hello", "world" }; + + [Fact] + public async Task BulkPublishEventAsync_CanPublishTopicWithEvents() + { + await using var client = TestClient.CreateForDaprClient(); + + var request = await client.CaptureGrpcRequestAsync(async daprClient => + await daprClient.BulkPublishEventAsync(TestPubsubName, TestTopicName, bulkPublishData)); + + request.Dismiss(); + + var envelope = await request.GetRequestEnvelopeAsync(); + + envelope.Entries.Count.Should().Be(2); + envelope.PubsubName.Should().Be(TestPubsubName); + envelope.Topic.Should().Be(TestTopicName); + envelope.Metadata.Count.Should().Be(0); + + var firstEntry = envelope.Entries[0]; + + firstEntry.EntryId.Should().Be("0"); + firstEntry.ContentType.Should().Be(TestContentType); + firstEntry.Event.ToStringUtf8().Should() + .Be(JsonSerializer.Serialize(bulkPublishData[0], client.InnerClient.JsonSerializerOptions)); + firstEntry.Metadata.Should().BeEmpty(); + + var secondEntry = envelope.Entries[1]; + + secondEntry.EntryId.Should().Be("1"); + secondEntry.ContentType.Should().Be(TestContentType); + secondEntry.Event.ToStringUtf8().Should() + .Be(JsonSerializer.Serialize(bulkPublishData[1], client.InnerClient.JsonSerializerOptions)); + secondEntry.Metadata.Should().BeEmpty(); + + // Create Response & Respond + var response = new Autogenerated.BulkPublishResponse + { + FailedEntries = { } + }; + var bulkPublishResponse = await request.CompleteWithMessageAsync(response); + + // Get response and validate + bulkPublishResponse.FailedEntries.Count.Should().Be(0); + } + + [Fact] + public async Task BulkPublishEventAsync_CanPublishTopicWithEvents_WithMetadata() + { + await using var client = TestClient.CreateForDaprClient(); + + var metadata = new Dictionary { { "key1", "value1" }, { "key2", "value2" } }; + + var request = await client.CaptureGrpcRequestAsync(async daprClient => + await daprClient.BulkPublishEventAsync(TestPubsubName, TestTopicName, bulkPublishData, + metadata)); + + request.Dismiss(); + + var envelope = await request.GetRequestEnvelopeAsync(); + + envelope.Entries.Count.Should().Be(2); + envelope.PubsubName.Should().Be(TestPubsubName); + envelope.Topic.Should().Be(TestTopicName); + + envelope.Metadata.Count.Should().Be(2); + envelope.Metadata.Keys.Contains("key1").Should().BeTrue(); + envelope.Metadata.Keys.Contains("key2").Should().BeTrue(); + envelope.Metadata["key1"].Should().Be("value1"); + envelope.Metadata["key2"].Should().Be("value2"); + + var firstEntry = envelope.Entries[0]; + + firstEntry.EntryId.Should().Be("0"); + firstEntry.ContentType.Should().Be(TestContentType); + firstEntry.Event.ToStringUtf8().Should() + .Be(JsonSerializer.Serialize(bulkPublishData[0], client.InnerClient.JsonSerializerOptions)); + firstEntry.Metadata.Should().BeEmpty(); + + var secondEntry = envelope.Entries[1]; + + secondEntry.EntryId.Should().Be("1"); + secondEntry.ContentType.Should().Be(TestContentType); + secondEntry.Event.ToStringUtf8().Should() + .Be(JsonSerializer.Serialize(bulkPublishData[1], client.InnerClient.JsonSerializerOptions)); + secondEntry.Metadata.Should().BeEmpty(); + + // Create Response & Respond + var response = new Autogenerated.BulkPublishResponse + { + FailedEntries = { } + }; + var bulkPublishResponse = await request.CompleteWithMessageAsync(response); + + // Get response and validate + bulkPublishResponse.FailedEntries.Count.Should().Be(0); + } + + [Fact] + public async Task BulkPublishEventAsync_CanPublishTopicWithNoContent() + { + await using var client = TestClient.CreateForDaprClient(); + + var request = await client.CaptureGrpcRequestAsync(async daprClient => + await daprClient.BulkPublishEventAsync(TestPubsubName, TestTopicName, bulkPublishData, + null)); + + request.Dismiss(); + + var envelope = await request.GetRequestEnvelopeAsync(); + envelope.Entries.Count.Should().Be(2); + envelope.PubsubName.Should().Be(TestPubsubName); + envelope.Topic.Should().Be(TestTopicName); + envelope.Metadata.Count.Should().Be(0); + + var firstEntry = envelope.Entries[0]; + + firstEntry.EntryId.Should().Be("0"); + firstEntry.ContentType.Should().Be(TestContentType); + firstEntry.Event.ToStringUtf8().Should() + .Be(JsonSerializer.Serialize(bulkPublishData[0], client.InnerClient.JsonSerializerOptions)); + firstEntry.Metadata.Should().BeEmpty(); + + var secondEntry = envelope.Entries[1]; + + secondEntry.EntryId.Should().Be("1"); + secondEntry.ContentType.Should().Be(TestContentType); + secondEntry.Event.ToStringUtf8().Should() + .Be(JsonSerializer.Serialize(bulkPublishData[1], client.InnerClient.JsonSerializerOptions)); + secondEntry.Metadata.Should().BeEmpty(); + + // Create Response & Respond + var response = new Autogenerated.BulkPublishResponse + { + FailedEntries = { } + }; + var bulkPublishResponse = await request.CompleteWithMessageAsync(response); + + // Get response and validate + bulkPublishResponse.FailedEntries.Count.Should().Be(0); + } + + [Fact] + public async Task BulkPublishEventAsync_CanPublishTopicWithNoContent_WithMetadata() + { + await using var client = TestClient.CreateForDaprClient(); + + var metadata = new Dictionary { { "key1", "value1" }, { "key2", "value2" } }; + + var request = await client.CaptureGrpcRequestAsync(async daprClient => + await daprClient.BulkPublishEventAsync(TestPubsubName, TestTopicName, bulkPublishData, + metadata)); + + request.Dismiss(); + + var envelope = await request.GetRequestEnvelopeAsync(); + envelope.Entries.Count.Should().Be(2); + envelope.PubsubName.Should().Be(TestPubsubName); + envelope.Topic.Should().Be(TestTopicName); + + envelope.Metadata.Count.Should().Be(2); + envelope.Metadata.Keys.Contains("key1").Should().BeTrue(); + envelope.Metadata.Keys.Contains("key2").Should().BeTrue(); + envelope.Metadata["key1"].Should().Be("value1"); + envelope.Metadata["key2"].Should().Be("value2"); + + var firstEntry = envelope.Entries[0]; + + firstEntry.EntryId.Should().Be("0"); + firstEntry.ContentType.Should().Be(TestContentType); + firstEntry.Event.ToStringUtf8().Should() + .Be(JsonSerializer.Serialize(bulkPublishData[0], client.InnerClient.JsonSerializerOptions)); + firstEntry.Metadata.Should().BeEmpty(); + + var secondEntry = envelope.Entries[1]; + + secondEntry.EntryId.Should().Be("1"); + secondEntry.ContentType.Should().Be(TestContentType); + secondEntry.Event.ToStringUtf8().Should() + .Be(JsonSerializer.Serialize(bulkPublishData[1], client.InnerClient.JsonSerializerOptions)); + secondEntry.Metadata.Should().BeEmpty(); + + // Create Response & Respond + var response = new Autogenerated.BulkPublishResponse + { + FailedEntries = { } + }; + var bulkPublishResponse = await request.CompleteWithMessageAsync(response); + + // Get response and validate + bulkPublishResponse.FailedEntries.Count.Should().Be(0); + } + + [Fact] + public async Task BulkPublishEventAsync_CanPublishTopicWithEventsObject() + { + await using var client = TestClient.CreateForDaprClient(); + + var bulkPublishDataObject = new Widget() { Size = "Big", Color = "Green" }; + + var request = await client.CaptureGrpcRequestAsync(async daprClient => + await daprClient.BulkPublishEventAsync(TestPubsubName, TestTopicName, + new List { bulkPublishDataObject }, null)); + + request.Dismiss(); + + var envelope = await request.GetRequestEnvelopeAsync(); + envelope.Entries.Count.Should().Be(1); + envelope.PubsubName.Should().Be(TestPubsubName); + envelope.Topic.Should().Be(TestTopicName); + envelope.Metadata.Count.Should().Be(0); + + var firstEntry = envelope.Entries[0]; + + firstEntry.EntryId.Should().Be("0"); + firstEntry.ContentType.Should().Be(TestContentType); + firstEntry.Event.ToStringUtf8().Should() + .Be(JsonSerializer.Serialize(bulkPublishDataObject, client.InnerClient.JsonSerializerOptions)); + firstEntry.Metadata.Should().BeEmpty(); + + // Create Response & Respond + var response = new Autogenerated.BulkPublishResponse + { + FailedEntries = { } + }; + var bulkPublishResponse = await request.CompleteWithMessageAsync(response); + + // Get response and validate + bulkPublishResponse.FailedEntries.Count.Should().Be(0); + } + + [Fact] + public async Task BulkPublishEventAsync_WithCancelledToken() + { + await using var client = TestClient.CreateForDaprClient(); + + var cts = new CancellationTokenSource(); + cts.Cancel(); + + await Assert.ThrowsAsync(async () => + { + await client.InnerClient.BulkPublishEventAsync(TestPubsubName, TestTopicName, bulkPublishData, + null, cancellationToken: cts.Token); + }); + } + + [Fact] + public async Task BulkPublishEventAsync_WrapsRpcException() + { + var client = new MockClient(); + + var rpcStatus = new Status(StatusCode.Internal, "not gonna work"); + var rpcException = new RpcException(rpcStatus, new Metadata(), "not gonna work"); + + // Setup the mock client to throw an Rpc Exception with the expected details info + client.Mock + .Setup(m => m.BulkPublishEventAlpha1Async( + It.IsAny(), + It.IsAny())) + .Throws(rpcException); + + var ex = await Assert.ThrowsAsync(async () => + { + await client.DaprClient.BulkPublishEventAsync(TestPubsubName, TestTopicName, + bulkPublishData); + }); + Assert.Same(rpcException, ex.InnerException); + } + + [Fact] + public async Task BulkPublishEventAsync_FailureResponse() + { + await using var client = TestClient.CreateForDaprClient(); + + var metadata = new Dictionary { { "key1", "value1" }, { "key2", "value2" } }; + + var request = await client.CaptureGrpcRequestAsync(async daprClient => + { + return await daprClient.BulkPublishEventAsync(TestPubsubName, TestTopicName, + bulkPublishData, metadata); + }); + + request.Dismiss(); + + // Create Response & Respond + var response = new Autogenerated.BulkPublishResponse + { + FailedEntries = + { + new Autogenerated.BulkPublishResponseFailedEntry + { + EntryId = "0", + Error = "Failed to publish", + }, + new Autogenerated.BulkPublishResponseFailedEntry + { + EntryId = "1", + Error = "Failed to publish", + }, + } + }; + var bulkPublishResponse = await request.CompleteWithMessageAsync(response); + + // Get response and validate + bulkPublishResponse.FailedEntries[0].Entry.EntryId.Should().Be("0"); + bulkPublishResponse.FailedEntries[0].ErrorMessage.Should().Be("Failed to publish"); + + bulkPublishResponse.FailedEntries[1].Entry.EntryId.Should().Be("1"); + bulkPublishResponse.FailedEntries[1].ErrorMessage.Should().Be("Failed to publish"); + } + + private class Widget + { + public string Size { get; set; } + public string Color { get; set; } + } + } +}