Parallel workflow processing (#1580)

* Added a ProcessInParallelAsync extension on WorkflowContext and an example demonstrating parallel workflow concurrency with bounded fan-out
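
For orientation, a condensed sketch of how the new example uses the extension: one activity call is fanned out per order with a concurrency cap, and results come back in input order. The workflow, activity, and record types referenced here are the ones defined in the files below; this is a simplified illustration of the example's RunAsync, not additional code in the diff.

    using System.Threading.Tasks;
    using Dapr.Workflow;

    namespace WorkflowParallelFanOut;

    public sealed class OrderProcessingWorkflow : Workflow<OrderRequest[], OrderResult[]>
    {
        public override async Task<OrderResult[]> RunAsync(WorkflowContext context, OrderRequest[] orders)
        {
            // Fan out one ProcessOrderActivity call per order, keeping at most five calls in flight at a time.
            // The returned array lines up index-for-index with the input orders.
            return await context.ProcessInParallelAsync(
                orders,
                order => context.CallActivityAsync<OrderResult>(nameof(ProcessOrderActivity), order),
                maxConcurrency: 5);
        }
    }
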
Whit Waldo 2025-07-08 06:13:08 -05:00 committed by GitHub
parent e1b216cc2e
commit 08c0a3fef1
10 changed files with 832 additions and 2 deletions

View File

@@ -17,8 +17,8 @@
<PackageVersion Include="Grpc.Tools" Version="2.72.0" />
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="8.0.17" Condition="'$(TargetFramework)' == 'net8.0' Or '$(TargetFramework)' == 'net8'" />
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="9.0.6" Condition="'$(TargetFramework)' == 'net9.0' Or '$(TargetFramework)' == 'net9'" />
<PackageVersion Include="Microsoft.AspNetCore.TestHost" Version="8.0.17" Condition="'$(TargetFramework)' == 'net8.0' Or '$(TargetFramework)' == 'net8'"/>
<PackageVersion Include="Microsoft.AspNetCore.TestHost" Version="9.0.6" Condition="'$(TargetFramework)' == 'net9.0' Or '$(TargetFramework)' == 'net9'"/>
<PackageVersion Include="Microsoft.AspNetCore.TestHost" Version="8.0.17" Condition="'$(TargetFramework)' == 'net8.0' Or '$(TargetFramework)' == 'net8'" />
<PackageVersion Include="Microsoft.AspNetCore.TestHost" Version="9.0.6" Condition="'$(TargetFramework)' == 'net9.0' Or '$(TargetFramework)' == 'net9'" />
<PackageVersion Include="Microsoft.CodeAnalysis.Analyzers" Version="4.14.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.Common" Version="4.14.0" />
<PackageVersion Include="Microsoft.CodeAnalysis.CSharp" Version="4.14.0" />

View File

@@ -177,6 +177,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Cryptography", "examples\Cr
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Messaging", "Messaging", "{442E80E5-8040-4123-B88A-26FD36BA95D9}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "WorkflowParallelFanOut", "examples\Workflow\WorkflowParallelFanOut\WorkflowParallelFanOut.csproj", "{5764B1AA-66B8-43AE-9E0D-0B3B71714B92}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -469,6 +471,10 @@ Global
{097D5F6F-D26F-4BFB-9074-FA52577EB442}.Debug|Any CPU.Build.0 = Debug|Any CPU
{097D5F6F-D26F-4BFB-9074-FA52577EB442}.Release|Any CPU.ActiveCfg = Release|Any CPU
{097D5F6F-D26F-4BFB-9074-FA52577EB442}.Release|Any CPU.Build.0 = Release|Any CPU
{5764B1AA-66B8-43AE-9E0D-0B3B71714B92}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{5764B1AA-66B8-43AE-9E0D-0B3B71714B92}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5764B1AA-66B8-43AE-9E0D-0B3B71714B92}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5764B1AA-66B8-43AE-9E0D-0B3B71714B92}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -554,6 +560,7 @@ Global
{097D5F6F-D26F-4BFB-9074-FA52577EB442} = {6843B5B3-9E95-4022-B792-8A1DE6BFEFEC}
{442E80E5-8040-4123-B88A-26FD36BA95D9} = {D687DDC4-66C5-4667-9E3A-FD8B78ECAA78}
{E070F694-335D-4D96-8951-F41D0A5F2A8B} = {442E80E5-8040-4123-B88A-26FD36BA95D9}
{5764B1AA-66B8-43AE-9E0D-0B3B71714B92} = {BF3ED6BF-ADF3-4D25-8E89-02FB8D945CA9}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {65220BF2-EAE1-4CB2-AA58-EBE80768CB40}

View File

@@ -0,0 +1,59 @@
// ------------------------------------------------------------------------
// Copyright 2025 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
using Dapr.Workflow;
using Microsoft.Extensions.Logging;
namespace WorkflowParallelFanOut;
public sealed partial class OrderProcessingWorkflow : Workflow<OrderRequest[], OrderResult[]>
{
/// <summary>
/// Override to implement workflow logic.
/// </summary>
/// <param name="context">The workflow context.</param>
/// <param name="orders">The deserialized workflow input.</param>
/// <returns>The output of the workflow as a task.</returns>
public override async Task<OrderResult[]> RunAsync(WorkflowContext context, OrderRequest[] orders)
{
var logger = context.CreateReplaySafeLogger<OrderProcessingWorkflow>();
if (!context.IsReplaying)
{
LogStartingOrderProcessorWorkflow(logger, orders.Length);
}
//Process all orders in parallel with controlled concurrency
var orderResults = await context.ProcessInParallelAsync(
orders,
order => context.CallActivityAsync<OrderResult>(nameof(ProcessOrderActivity), order), maxConcurrency: 5);
//Calculate summary statistics
var totalProcessed = orderResults.Count(r => r.IsProcessed);
var totalFailed = orderResults.Length - totalProcessed;
var totalAmount = orderResults.Where(r => r.IsProcessed).Sum(r => r.TotalAmount);
if (!context.IsReplaying)
{
LogCompletedProcessingWorkflow(logger, orders.Length, totalProcessed, totalFailed, totalAmount);
}
return orderResults;
}
[LoggerMessage(LogLevel.Information, "Starting order processing workflow with {OrderCount} orders")]
static partial void LogStartingOrderProcessorWorkflow(ILogger logger, int orderCount);
[LoggerMessage(LogLevel.Information, "Completed processing {TotalOrders} orders. Processed: {ProcessedCount}, Failed: {FailedCount}, Total Amount: {TotalAmount:c}")]
static partial void LogCompletedProcessingWorkflow(ILogger logger, int totalOrders, int processedCount, int failedCount, decimal totalAmount);
}

View File

@@ -0,0 +1,16 @@
// ------------------------------------------------------------------------
// Copyright 2025 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
namespace WorkflowParallelFanOut;
public sealed record OrderRequest(string OrderId, string ProductName, int Quantity, decimal Price);

View File

@@ -0,0 +1,21 @@
// ------------------------------------------------------------------------
// Copyright 2025 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
namespace WorkflowParallelFanOut;
public sealed record OrderResult(
string OrderId,
bool IsProcessed,
decimal TotalAmount,
string Status,
DateTime ProcessedAt);

View File

@@ -0,0 +1,98 @@
// ------------------------------------------------------------------------
// Copyright 2025 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
using Dapr.Workflow;
using Microsoft.Extensions.Logging;
using ILogger = Microsoft.Extensions.Logging.ILogger;
namespace WorkflowParallelFanOut;
public sealed partial class ProcessOrderActivity(ILogger<ProcessOrderActivity> logger) : WorkflowActivity<OrderRequest, OrderResult>
{
private static readonly Random Random = System.Random.Shared; // System.Random is not thread-safe; use the Shared instance since activities run concurrently
/// <summary>
/// Override to implement async (non-blocking) workflow activity logic.
/// </summary>
/// <param name="context">Provides access to additional context for the current activity execution.</param>
/// <param name="order">The deserialized activity input.</param>
/// <returns>The output of the activity as a task.</returns>
public override async Task<OrderResult> RunAsync(WorkflowActivityContext context, OrderRequest order)
{
LogProcessingOrder(logger, order.OrderId, order.ProductName);
//Simulate processing time (between 100 and 2000ms)
var processingTime = Random.Next(100, 2000);
await Task.Delay(processingTime);
//Simulate occasional failures (10% chance)
var shouldFail = Random.Next(1, 101) <= 10;
if (shouldFail)
{
LogOrderFailed(logger, order.OrderId);
return new OrderResult(
order.OrderId,
IsProcessed: false,
TotalAmount: 0,
Status: "Failed - System Error",
ProcessedAt: DateTime.UtcNow);
}
// Simulate inventory check
var hasInventory = Random.Next(1, 101) <= 90; // 90% chance of having inventory
if (!hasInventory)
{
LogInsufficientInventory(logger, order.OrderId);
return new OrderResult(
order.OrderId,
IsProcessed: false,
TotalAmount: 0,
Status: "Failed - Insufficient Inventory",
ProcessedAt: DateTime.UtcNow);
}
//Calculate total amount (with potential discount)
var totalAmount = order.Quantity * order.Price;
// Apply bulk discount for large orders
if (order.Quantity >= 10)
{
totalAmount *= 0.9m; // 10% discount
LogDiscountApplied(logger, order.OrderId);
}
LogSuccessfullyProcessedOrder(logger, order.OrderId, totalAmount);
return new OrderResult(
order.OrderId,
IsProcessed: true,
TotalAmount: totalAmount,
Status: "Processed Successfully",
ProcessedAt: DateTime.UtcNow);
}
[LoggerMessage(LogLevel.Information, "Processing order {OrderId} for product {ProductName}")]
static partial void LogProcessingOrder(ILogger logger, string orderId, string productName);
[LoggerMessage(LogLevel.Warning, "Order {OrderId} failed during processing")]
static partial void LogOrderFailed(ILogger logger, string orderId);
[LoggerMessage(LogLevel.Warning, "Order {OrderId} failed - insufficient inventory")]
static partial void LogInsufficientInventory(ILogger logger, string orderId);
[LoggerMessage(LogLevel.Information, "Applied bulk discount to order {OrderId}")]
static partial void LogDiscountApplied(ILogger logger, string orderId);
[LoggerMessage(LogLevel.Information, "Successfully processed order {OrderId} with total amount {TotalAmount:c}")]
static partial void LogSuccessfullyProcessedOrder(ILogger logger, string orderId, decimal totalAmount);
}

View File

@@ -0,0 +1,82 @@
using Dapr.Workflow;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using WorkflowParallelFanOut;
var builder = Host.CreateDefaultBuilder(args).ConfigureServices(services =>
{
services.AddDaprWorkflow(options =>
{
options.RegisterWorkflow<OrderProcessingWorkflow>();
options.RegisterActivity<ProcessOrderActivity>();
});
});
var host = builder.Build();
await host.StartAsync();
await using var scope = host.Services.CreateAsyncScope();
var daprWorkflowClient = scope.ServiceProvider.GetRequiredService<DaprWorkflowClient>();
var logger = scope.ServiceProvider.GetRequiredService<ILogger<Program>>();
// Create sample orders to demonstrate parallel processing
var orders = new[]
{
new OrderRequest("ORD-001", "Laptop", 2, 999.99m),
new OrderRequest("ORD-002", "Mouse", 5, 29.99m),
new OrderRequest("ORD-003", "Keyboard", 3, 79.99m),
new OrderRequest("ORD-004", "Monitor", 1, 299.99m),
new OrderRequest("ORD-005", "Headphones", 4, 149.99m),
new OrderRequest("ORD-006", "Webcam", 2, 89.99m),
new OrderRequest("ORD-007", "Printer", 1, 199.99m),
new OrderRequest("ORD-008", "Tablet", 6, 249.99m),
new OrderRequest("ORD-009", "Phone", 1, 799.99m),
new OrderRequest("ORD-010", "Charger", 10, 24.99m), // Bulk order for discount
new OrderRequest("ORD-011", "Cable", 20, 9.99m), // Bulk order for discount
new OrderRequest("ORD-012", "Speakers", 3, 119.99m),
new OrderRequest("ORD-013", "Router", 2, 149.99m),
new OrderRequest("ORD-014", "Hard Drive", 4, 129.99m),
new OrderRequest("ORD-015", "Graphics Card", 1, 599.99m)
};
logger.LogInformation("Starting workflow with {OrderCount} orders", orders.Length);
var instanceId = $"orderprocessing-workflow-{Guid.NewGuid().ToString()[..8]}";
await daprWorkflowClient.ScheduleNewWorkflowAsync(nameof(OrderProcessingWorkflow), instanceId, orders);
logger.LogInformation("Workflow {InstanceId} started, waiting for completion...", instanceId);
// Wait for workflow completion
await daprWorkflowClient.WaitForWorkflowCompletionAsync(instanceId);
var state = await daprWorkflowClient.GetWorkflowStateAsync(instanceId);
logger.LogInformation("Workflow {InstanceId} completed with status: {Status}", instanceId, state.RuntimeStatus);
if (state.ReadOutputAs<OrderResult[]>() is { } results)
{
logger.LogInformation("Processing Results:");
logger.LogInformation("==================");
var processedOrders = results.Where(r => r.IsProcessed).ToList();
var failedOrders = results.Where(r => !r.IsProcessed).ToList();
logger.LogInformation("Successfully processed {ProcessedCount} orders:", processedOrders.Count);
foreach (var order in processedOrders)
{
logger.LogInformation(" - {OrderId}: {TotalAmount:C} ({Status})",
order.OrderId, order.TotalAmount, order.Status);
}
if (failedOrders.Any())
{
logger.LogWarning("Failed orders ({FailedCount}):", failedOrders.Count);
foreach (var order in failedOrders)
{
logger.LogWarning(" - {OrderId}: {Status}", order.OrderId, order.Status);
}
}
var totalAmount = processedOrders.Sum(r => r.TotalAmount);
logger.LogInformation("Total processed amount: {TotalAmount:C}", totalAmount);
}

View File

@@ -0,0 +1,17 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\src\Dapr.Workflow\Dapr.Workflow.csproj" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" />
</ItemGroup>
</Project>

View File

@@ -0,0 +1,148 @@
// ------------------------------------------------------------------------
// Copyright 2025 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
namespace Dapr.Workflow;
/// <summary>
/// Extension methods for <see cref="WorkflowContext"/> that provide high-level parallel processing primitives
/// with controlled concurrency.
/// </summary>
public static class ParallelExtensions
{
/// <summary>
/// Processes a collection of inputs in parallel with controlled concurrency using a streaming execution model.
/// </summary>
/// <typeparam name="TInput">The type of input items to process.</typeparam>
/// <typeparam name="TResult">The type of result items returned by the task factory.</typeparam>
/// <param name="context">The orchestration context.</param>
/// <param name="inputs">The collection of inputs to process in parallel.</param>
/// <param name="taskFactory">
/// A function that creates a task for each input item. This function is called in the orchestration context
/// to ensure all tasks are properly tracked by the durable task framework.
/// </param>
/// <param name="maxConcurrency">
/// The maximum number of tasks to execute concurrently. Defaults to 5 if not specified.
/// Must be greater than 0.
/// </param>
/// <returns>
/// A task that completes when all input items have been processed. The result is an array containing
/// the results in the same order as the input collection.
/// </returns>
/// <exception cref="ArgumentNullException">
/// Thrown when <paramref name="context"/>, <paramref name="inputs"/>, or <paramref name="taskFactory"/> is null.
/// </exception>
/// <exception cref="ArgumentOutOfRangeException">
/// Thrown when <paramref name="maxConcurrency"/> is less than or equal to 0.
/// </exception>
/// <exception cref="AggregateException">
/// Thrown when one or more tasks fail during execution. All task exceptions are collected and wrapped.
/// </exception>
/// <remarks>
/// <para>
/// This method uses a streaming execution model: at most <paramref name="maxConcurrency"/> tasks are in flight at
/// any given time, and a new task is started as each existing one completes. This bounds the number of concurrent
/// activity executions even for large input collections (the inputs and the result array are still held in memory).
/// </para>
/// <para>
/// The method is deterministic for durable task orchestrations. Tasks are created in the orchestration context as
/// capacity frees up: an initial batch of up to <paramref name="maxConcurrency"/> tasks, then one new task each time
/// an in-flight task completes. The framework records history events for each scheduled task, and during replay each
/// awaited task completes with its recorded result in the same order, so the same scheduling decisions are reproduced.
/// </para>
/// <para>
/// If any task fails, the method continues processing the remaining items and then throws an
/// <see cref="AggregateException"/> containing all collected failures.
/// </para>
/// <para>
/// Example usage:
/// <code>
/// var orderIds = new[] { "order1", "order2", "order3", "order4", "order5" };
/// var results = await context.ProcessInParallelAsync(
/// orderIds,
/// orderId => context.CallActivityAsync&lt;OrderResult&gt;("ProcessOrder", orderId),
/// maxConcurrency: 3);
/// </code>
/// </para>
/// </remarks>
public static async Task<TResult[]> ProcessInParallelAsync<TInput, TResult>(
this WorkflowContext context,
IEnumerable<TInput> inputs,
Func<TInput, Task<TResult>> taskFactory,
int maxConcurrency = 5)
{
ArgumentNullException.ThrowIfNull(context);
ArgumentNullException.ThrowIfNull(inputs);
ArgumentNullException.ThrowIfNull(taskFactory);
if (maxConcurrency <= 0)
throw new ArgumentOutOfRangeException(nameof(maxConcurrency), "Max concurrency must be greater than 0.");
var inputList = inputs.ToList();
if (inputList.Count == 0)
return [];
var results = new TResult[inputList.Count];
var inFlightTasks = new Dictionary<Task<TResult>, int>(); // Task -> result index
var inputIndex = 0;
var completedCount = 0;
var exceptions = new List<Exception>();
// Start initial batch up to maxConcurrency
while (inputIndex < inputList.Count && inFlightTasks.Count < maxConcurrency)
{
var task = taskFactory(inputList[inputIndex]);
inFlightTasks[task] = inputIndex;
inputIndex++;
}
// Process remaining items with streaming execution
while (completedCount < inputList.Count)
{
var completedTask = await Task.WhenAny(inFlightTasks.Keys);
var resultIndex = inFlightTasks[completedTask];
try
{
results[resultIndex] = await completedTask;
}
catch (Exception ex)
{
exceptions.Add(ex);
}
inFlightTasks.Remove(completedTask);
completedCount++;
// Start next task if more work remains
if (inputIndex < inputList.Count)
{
var nextTask = taskFactory(inputList[inputIndex]);
inFlightTasks[nextTask] = inputIndex;
inputIndex++;
}
}
// If any exceptions occurred, throw them as an aggregate
if (exceptions.Count > 0)
{
throw new AggregateException(
$"One or more tasks failed during parallel processing. {exceptions.Count} out of {inputList.Count} tasks failed.",
exceptions);
}
return results;
}
}

View File

@@ -0,0 +1,382 @@
// ------------------------------------------------------------------------
// Copyright 2025 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
using Moq;
namespace Dapr.Workflow.Test;
/// <summary>
/// Contains tests for ParallelExtensions.ProcessInParallelAsync method.
/// </summary>
public class ParallelExtensionsTest
{
private readonly Mock<WorkflowContext> _workflowContextMock = new();
[Fact]
public async Task ProcessInParallelAsync_WithValidInputs_ShouldProcessAllItemsSuccessfully()
{
// Arrange
var inputs = new[] { 1, 2, 3, 4, 5 };
var expectedResults = new[] { 2, 4, 6, 8, 10 };
// Act
var results = await _workflowContextMock.Object.ProcessInParallelAsync(
inputs,
async input => await Task.FromResult(input * 2),
maxConcurrency: 2);
// Assert
Assert.Equal(expectedResults, results);
}
[Fact]
public async Task ProcessInParallelAsync_WithEmptyInputs_ShouldReturnEmptyArray()
{
// Arrange
var inputs = Array.Empty<int>();
// Act
var results = await _workflowContextMock.Object.ProcessInParallelAsync(
inputs,
async input => await Task.FromResult(input * 2));
// Assert
Assert.Empty(results);
}
[Fact]
public async Task ProcessInParallelAsync_WithSingleInput_ShouldProcessCorrectly()
{
// Arrange
var inputs = new[] { 42 };
// Act
var results = await _workflowContextMock.Object.ProcessInParallelAsync(
inputs,
async input => await Task.FromResult(input.ToString()));
// Assert
Assert.Single(results);
Assert.Equal("42", results[0]);
}
[Fact]
public async Task ProcessInParallelAsync_WithMaxConcurrency1_ShouldProcessSequentially()
{
// Arrange
var inputs = new[] { 1, 2, 3 };
var processedOrder = new List<int>();
// Act
var results = await _workflowContextMock.Object.ProcessInParallelAsync(
inputs,
async input =>
{
processedOrder.Add(input);
await Task.Delay(10); // Small delay to ensure order
return input * 2;
},
maxConcurrency: 1);
// Assert
Assert.Equal(new[] { 2, 4, 6 }, results);
Assert.Equal(new[] { 1, 2, 3 }, processedOrder);
}
[Fact]
public async Task ProcessInParallelAsync_WithHighConcurrency_ShouldRespectConcurrencyLimit()
{
// Arrange
var inputs = Enumerable.Range(1, 100).ToArray();
var concurrentTasks = 0;
var maxConcurrentTasks = 0;
var lockObj = new object();
// Act
var results = await _workflowContextMock.Object.ProcessInParallelAsync(
inputs,
async input =>
{
lock (lockObj)
{
concurrentTasks++;
maxConcurrentTasks = Math.Max(maxConcurrentTasks, concurrentTasks);
}
await Task.Delay(10);
lock (lockObj)
{
concurrentTasks--;
}
return input * 2;
},
maxConcurrency: 10);
// Assert
Assert.Equal(inputs.Length, results.Length);
Assert.True(maxConcurrentTasks <= 10, $"Expected max concurrent tasks <= 10, but was {maxConcurrentTasks}");
Assert.True(maxConcurrentTasks >= 1, "At least one task should have been executed");
}
[Fact]
public async Task ProcessInParallelAsync_WithNullContext_ShouldThrowArgumentNullException()
{
// Arrange
WorkflowContext nullContext = null!;
var inputs = new[] { 1, 2, 3 };
// Act & Assert
await Assert.ThrowsAsync<ArgumentNullException>(
async () => await nullContext.ProcessInParallelAsync(
inputs,
async input => await Task.FromResult(input * 2)));
}
[Fact]
public async Task ProcessInParallelAsync_WithNullInputs_ShouldThrowArgumentNullException()
{
// Arrange
IEnumerable<int> nullInputs = null!;
// Act & Assert
await Assert.ThrowsAsync<ArgumentNullException>(
async () => await _workflowContextMock.Object.ProcessInParallelAsync(
nullInputs,
async input => await Task.FromResult(input * 2)));
}
[Fact]
public async Task ProcessInParallelAsync_WithNullTaskFactory_ShouldThrowArgumentNullException()
{
// Arrange
var inputs = new[] { 1, 2, 3 };
Func<int, Task<int>> nullTaskFactory = null!;
// Act & Assert
await Assert.ThrowsAsync<ArgumentNullException>(
async () => await _workflowContextMock.Object.ProcessInParallelAsync(
inputs,
nullTaskFactory));
}
[Theory]
[InlineData(0)]
[InlineData(-1)]
[InlineData(-10)]
public async Task ProcessInParallelAsync_WithInvalidMaxConcurrency_ShouldThrowArgumentOutOfRangeException(int maxConcurrency)
{
// Arrange
var inputs = new[] { 1, 2, 3 };
// Act & Assert
await Assert.ThrowsAsync<ArgumentOutOfRangeException>(
async () => await _workflowContextMock.Object.ProcessInParallelAsync(
inputs,
async input => await Task.FromResult(input * 2),
maxConcurrency));
}
[Fact]
public async Task ProcessInParallelAsync_WithTaskFailure_ShouldThrowAggregateException()
{
// Arrange
var inputs = new[] { 1, 2, 3, 4, 5 };
const int expectedFailures = 2; // Items 2 and 4 fail; items 1, 3, and 5 succeed
// Act & Assert
var aggregateException = await Assert.ThrowsAsync<AggregateException>(
async () => await _workflowContextMock.Object.ProcessInParallelAsync(
inputs,
async input =>
{
await Task.Delay(10);
if (input % 2 == 0) // Even numbers fail
throw new InvalidOperationException($"Failed processing item {input}");
return input * 2;
},
maxConcurrency: 2));
// Assert
Assert.Equal(expectedFailures, aggregateException.InnerExceptions.Count);
Assert.All(aggregateException.InnerExceptions, ex =>
Assert.IsType<InvalidOperationException>(ex));
Assert.Contains("2 out of 5 tasks failed", aggregateException.Message);
}
[Fact]
public async Task ProcessInParallelAsync_WithAllTasksFailure_ShouldThrowAggregateExceptionWithAllFailures()
{
// Arrange
var inputs = new[] { 1, 2, 3 };
var expectedMessage = "Test failure";
// Act & Assert
var aggregateException = await Assert.ThrowsAsync<AggregateException>(
async () => await _workflowContextMock.Object.ProcessInParallelAsync<int, object>(
inputs,
async input =>
{
await Task.Delay(10);
throw new InvalidOperationException($"{expectedMessage} {input}");
}));
// Assert
Assert.Equal(3, aggregateException.InnerExceptions.Count);
Assert.All(aggregateException.InnerExceptions, ex =>
{
Assert.IsType<InvalidOperationException>(ex);
Assert.Contains(expectedMessage, ex.Message);
});
}
[Fact]
public async Task ProcessInParallelAsync_WithMixedSuccessAndFailure_ShouldPreserveOrderInResults()
{
// Arrange
var inputs = new[] { 1, 2, 3, 4, 5 };
// Act & Assert
var aggregateException = await Assert.ThrowsAsync<AggregateException>(
async () => await _workflowContextMock.Object.ProcessInParallelAsync(
inputs,
async input =>
{
await Task.Delay(input * 10); // Different delays to test ordering
if (input == 3)
throw new InvalidOperationException($"Failed on item {input}");
return input * 2;
},
maxConcurrency: 2));
// Assert that failure occurred
Assert.Single(aggregateException.InnerExceptions);
Assert.Contains("Failed on item 3", aggregateException.InnerExceptions[0].Message);
}
[Fact]
public async Task ProcessInParallelAsync_WithDefaultMaxConcurrency_ShouldUseDefaultValue()
{
// Arrange
var inputs = Enumerable.Range(1, 20).ToArray();
var concurrentTasks = 0;
var maxConcurrentTasks = 0;
var lockObj = new object();
// Act
var results = await _workflowContextMock.Object.ProcessInParallelAsync(
inputs,
async input =>
{
lock (lockObj)
{
concurrentTasks++;
maxConcurrentTasks = Math.Max(maxConcurrentTasks, concurrentTasks);
}
await Task.Delay(50); // Longer delay to ensure concurrency
lock (lockObj)
{
concurrentTasks--;
}
return input * 2;
}); // Using default maxConcurrency (should be 5)
// Assert
Assert.Equal(inputs.Length, results.Length);
Assert.True(maxConcurrentTasks <= 5, $"Expected max concurrent tasks <= 5, but was {maxConcurrentTasks}");
Assert.True(maxConcurrentTasks >= 1, "At least one task should have been executed");
}
[Fact]
public async Task ProcessInParallelAsync_WithDifferentInputAndOutputTypes_ShouldHandleTypeConversion()
{
// Arrange
var inputs = new[] { "1", "2", "3", "4", "5" };
var expectedResults = new[] { 1, 2, 3, 4, 5 };
// Act
var results = await _workflowContextMock.Object.ProcessInParallelAsync(
inputs,
async input => await Task.FromResult(int.Parse(input)),
maxConcurrency: 3);
// Assert
Assert.Equal(expectedResults, results);
}
[Fact]
public async Task ProcessInParallelAsync_WithComplexObjects_ShouldProcessCorrectly()
{
// Arrange
var inputs = new[]
{
new TestInput { Id = 1, Value = "Test1" },
new TestInput { Id = 2, Value = "Test2" },
new TestInput { Id = 3, Value = "Test3" }
};
// Act
var results = await _workflowContextMock.Object.ProcessInParallelAsync(
inputs,
async input => await Task.FromResult(new TestOutput
{
ProcessedId = input.Id * 10,
ProcessedValue = input.Value.ToUpper()
}),
maxConcurrency: 2);
// Assert
Assert.Equal(3, results.Length);
Assert.Equal(10, results[0].ProcessedId);
Assert.Equal("TEST1", results[0].ProcessedValue);
Assert.Equal(20, results[1].ProcessedId);
Assert.Equal("TEST2", results[1].ProcessedValue);
Assert.Equal(30, results[2].ProcessedId);
Assert.Equal("TEST3", results[2].ProcessedValue);
}
[Fact]
public async Task ProcessInParallelAsync_WithLargeDataset_ShouldHandleEfficiently()
{
// Arrange
var inputs = Enumerable.Range(1, 1000).ToArray();
var expectedResults = inputs.Select(x => x * 2).ToArray();
// Act
var results = await _workflowContextMock.Object.ProcessInParallelAsync(
inputs,
async input => await Task.FromResult(input * 2),
maxConcurrency: 10);
// Assert
Assert.Equal(expectedResults, results);
}
private class TestInput
{
public int Id { get; set; }
public string Value { get; set; } = string.Empty;
}
private class TestOutput
{
public int ProcessedId { get; set; }
public string ProcessedValue { get; set; } = string.Empty;
}
}