diff --git a/workflows/csharp/sdk/README.md b/workflows/csharp/sdk/README.md new file mode 100644 index 00000000..dfca2bad --- /dev/null +++ b/workflows/csharp/sdk/README.md @@ -0,0 +1,44 @@ +# Dapr workflows + +In this quickstart, you'll create a microservice to demonstrate Dapr's workflow API. The service starts and manages a workflow to store and retrieve data in a state store. + +This quickstart includes one service: + +- Dotnet client service `order-processor` + +### Run Dotnet service with Dapr + +1. Open a new terminal window and navigate to `order-processor` directory: + + + +```bash +cd ./order-processor +dotnet restore +dotnet build +``` + + +2. Run the Dotnet service app with Dapr: + + + +```bash +cd ./order-processor +dapr run --dapr-grpc-port 4001 dotnet run +``` + + diff --git a/workflows/csharp/sdk/makefile b/workflows/csharp/sdk/makefile new file mode 100644 index 00000000..e7a8826b --- /dev/null +++ b/workflows/csharp/sdk/makefile @@ -0,0 +1,2 @@ +include ../../../docker.mk +include ../../../validate.mk \ No newline at end of file diff --git a/workflows/csharp/sdk/order-processor/Activities/NotifyActivity.cs b/workflows/csharp/sdk/order-processor/Activities/NotifyActivity.cs new file mode 100644 index 00000000..94745032 --- /dev/null +++ b/workflows/csharp/sdk/order-processor/Activities/NotifyActivity.cs @@ -0,0 +1,25 @@ +namespace WorkflowConsoleApp.Activities +{ + using System.Threading.Tasks; + using Dapr.Workflow; + using Microsoft.Extensions.Logging; + + record Notification(string Message); + + class NotifyActivity : WorkflowActivity + { + readonly ILogger logger; + + public NotifyActivity(ILoggerFactory loggerFactory) + { + this.logger = loggerFactory.CreateLogger(); + } + + public override Task RunAsync(WorkflowActivityContext context, Notification notification) + { + this.logger.LogInformation(notification.Message); + + return Task.FromResult(null); + } + } +} diff --git a/workflows/csharp/sdk/order-processor/Activities/ProcessPaymentActivity.cs b/workflows/csharp/sdk/order-processor/Activities/ProcessPaymentActivity.cs new file mode 100644 index 00000000..0f14d37e --- /dev/null +++ b/workflows/csharp/sdk/order-processor/Activities/ProcessPaymentActivity.cs @@ -0,0 +1,40 @@ +namespace WorkflowConsoleApp.Activities +{ + using System.Threading.Tasks; + using Dapr.Client; + using Dapr.Workflow; + using Microsoft.Extensions.Logging; + using WorkflowConsoleApp.Models; + using System; + + class ProcessPaymentActivity : WorkflowActivity + { + readonly ILogger logger; + readonly DaprClient client; + + public ProcessPaymentActivity(ILoggerFactory loggerFactory, DaprClient client) + { + this.logger = loggerFactory.CreateLogger(); + this.client = client; + } + + public override async Task RunAsync(WorkflowActivityContext context, PaymentRequest req) + { + this.logger.LogInformation( + "Processing payment: {requestId} for {amount} {item} at ${currency}", + req.RequestId, + req.Amount, + req.ItemBeingPruchased, + req.Currency); + + // Simulate slow processing + await Task.Delay(TimeSpan.FromSeconds(7)); + + this.logger.LogInformation( + "Payment for request ID '{requestId}' processed successfully", + req.RequestId); + + return null; + } + } +} diff --git a/workflows/csharp/sdk/order-processor/Activities/ReserveInventoryActivity.cs b/workflows/csharp/sdk/order-processor/Activities/ReserveInventoryActivity.cs new file mode 100644 index 00000000..d620ef56 --- /dev/null +++ b/workflows/csharp/sdk/order-processor/Activities/ReserveInventoryActivity.cs @@ -0,0 +1,62 @@ +namespace WorkflowConsoleApp.Activities +{ + using System.Threading.Tasks; + using Dapr.Client; + using Dapr.Workflow; + using Microsoft.Extensions.Logging; + using WorkflowConsoleApp.Models; + using System; + + class ReserveInventoryActivity : WorkflowActivity + { + readonly ILogger logger; + readonly DaprClient client; + static readonly string storeName = "statestore"; + + public ReserveInventoryActivity(ILoggerFactory loggerFactory, DaprClient client) + { + this.logger = loggerFactory.CreateLogger(); + this.client = client; + } + + public override async Task RunAsync(WorkflowActivityContext context, InventoryRequest req) + { + this.logger.LogInformation( + "Reserving inventory for order {requestId} of {quantity} {name}", + req.RequestId, + req.Quantity, + req.ItemName); + + OrderPayload orderResponse; + string key; + + // Ensure that the store has items + (orderResponse, key) = await client.GetStateAndETagAsync(storeName, req.ItemName); + + // Catch for the case where the statestore isn't setup + if (orderResponse == null) + { + // Not enough items. + return new InventoryResult(false, orderResponse); + } + + this.logger.LogInformation( + "There are: {requestId}, {name} available for purchase", + orderResponse.Quantity, + orderResponse.Name); + + // See if there're enough items to purchase + if (orderResponse.Quantity >= req.Quantity) + { + // Simulate slow processing + await Task.Delay(TimeSpan.FromSeconds(2)); + + return new InventoryResult(true, orderResponse); + } + + // Not enough items. + return new InventoryResult(false, orderResponse); + + } + } +} diff --git a/workflows/csharp/sdk/order-processor/Activities/UpdateInventoryActivity.cs b/workflows/csharp/sdk/order-processor/Activities/UpdateInventoryActivity.cs new file mode 100644 index 00000000..bbd01606 --- /dev/null +++ b/workflows/csharp/sdk/order-processor/Activities/UpdateInventoryActivity.cs @@ -0,0 +1,52 @@ +namespace WorkflowConsoleApp.Activities +{ + using System.Threading.Tasks; + using Dapr.Client; + using Dapr.Workflow; + using WorkflowConsoleApp.Models; + using Microsoft.Extensions.Logging; + using System; + + class UpdateInventoryActivity : WorkflowActivity + { + static readonly string storeName = "statestore"; + readonly ILogger logger; + readonly DaprClient client; + + public UpdateInventoryActivity(ILoggerFactory loggerFactory, DaprClient client) + { + this.logger = loggerFactory.CreateLogger(); + this.client = client; + } + + public override async Task RunAsync(WorkflowActivityContext context, PaymentRequest req) + { + this.logger.LogInformation( + "Checking Inventory for: Order# {requestId} for {amount} {item}", + req.RequestId, + req.Amount, + req.ItemBeingPruchased); + + // Simulate slow processing + await Task.Delay(TimeSpan.FromSeconds(5)); + + // Determine if there are enough Items for purchase + var (original, originalETag) = await client.GetStateAndETagAsync(storeName, req.ItemBeingPruchased); + int newQuantity = original.Quantity - req.Amount; + + if (newQuantity < 0) + { + this.logger.LogInformation( + "Payment for request ID '{requestId}' could not be processed. Insufficient inventory.", + req.RequestId); + throw new InvalidOperationException(); + } + + // Update the statestore with the new amount of paper clips + await client.SaveStateAsync(storeName, req.ItemBeingPruchased, new OrderPayload(Name: req.ItemBeingPruchased, TotalCost: req.Currency, Quantity: newQuantity)); + this.logger.LogInformation($"There are now: {newQuantity} {original.Name} left in stock"); + + return null; + } + } +} diff --git a/workflows/csharp/sdk/order-processor/Models.cs b/workflows/csharp/sdk/order-processor/Models.cs new file mode 100644 index 00000000..82bf7193 --- /dev/null +++ b/workflows/csharp/sdk/order-processor/Models.cs @@ -0,0 +1,9 @@ +namespace WorkflowConsoleApp.Models +{ + record OrderPayload(string Name, double TotalCost, int Quantity = 1); + record InventoryRequest(string RequestId, string ItemName, int Quantity); + record InventoryResult(bool Success, OrderPayload orderPayload); + record PaymentRequest(string RequestId, string ItemBeingPruchased, int Amount, double Currency); + record OrderResult(bool Processed); + record InventoryItem(string Name, double TotalCost, int Quantity); +} \ No newline at end of file diff --git a/workflows/csharp/sdk/order-processor/Program.cs b/workflows/csharp/sdk/order-processor/Program.cs new file mode 100644 index 00000000..dfea1dfd --- /dev/null +++ b/workflows/csharp/sdk/order-processor/Program.cs @@ -0,0 +1,96 @@ +using Dapr.Client; +using Dapr.Workflow; +using WorkflowConsoleApp.Activities; +using WorkflowConsoleApp.Models; +using WorkflowConsoleApp.Workflows; +using System.Text.Json; +using Microsoft.Extensions.Hosting; +using System; +using System.Threading; +using System.Threading.Tasks; + +const string workflowComponent = "dapr"; +const string storeName = "statestore"; +const string workflowName = nameof(OrderProcessingWorkflow); + +// The workflow host is a background service that connects to the sidecar over gRPC +var builder = Host.CreateDefaultBuilder(args).ConfigureServices(services => +{ + services.AddDaprWorkflow(options => + { + // Note that it's also possible to register a lambda function as the workflow + // or activity implementation instead of a class. + options.RegisterWorkflow(); + + // These are the activities that get invoked by the workflow(s). + options.RegisterActivity(); + options.RegisterActivity(); + options.RegisterActivity(); + options.RegisterActivity(); + }); +}); + +// Start the app - this is the point where we connect to the Dapr sidecar +using var host = builder.Build(); +host.Start(); + +// Start the client +string daprPortStr = Environment.GetEnvironmentVariable("DAPR_GRPC_PORT"); +if (string.IsNullOrEmpty(daprPortStr)) +{ + Environment.SetEnvironmentVariable("DAPR_GRPC_PORT", "4001"); +} +using var daprClient = new DaprClientBuilder().Build(); + +// Start the console app +while (true) +{ + var health = await daprClient.CheckHealthAsync(); + Console.WriteLine("Welcome to the workflows example!"); + Console.WriteLine("In this example, you will be starting a workflow and obtaining the status."); + + // Populate the store with items + RestockInventory(); + + // Can we remove the logging info? + + // Main Loop + // Generate a unique ID for the workflow + string orderId = Guid.NewGuid().ToString()[..8]; + string itemToPurchase = "Cars"; + int ammountToPurchase = 10; + Console.WriteLine("In this quickstart, you will be purhasing {0} {1}.", ammountToPurchase, itemToPurchase); + + // Construct the order + OrderPayload orderInfo = new OrderPayload(itemToPurchase, 15000, ammountToPurchase); + + OrderPayload orderResponse; + string key; + // Ensure that the store has items + (orderResponse, key) = await daprClient.GetStateAndETagAsync(storeName, itemToPurchase); + + // Start the workflow + Console.WriteLine("Starting workflow {0} purchasing {1} {2}", orderId, ammountToPurchase, itemToPurchase); + var response = await daprClient.StartWorkflowAsync(orderId, workflowComponent, workflowName, orderInfo, null, CancellationToken.None); + + // Wait a second to allow workflow to start + await Task.Delay(TimeSpan.FromSeconds(1)); + + var state = await daprClient.GetWorkflowAsync(orderId, workflowComponent, workflowName); + Console.WriteLine("Your workflow has started. Here is the status of the workflow: {0}", state); + while (state.metadata["dapr.workflow.runtime_status"].ToString() == "RUNNING") + { + await Task.Delay(TimeSpan.FromSeconds(5)); + state = await daprClient.GetWorkflowAsync(orderId, workflowComponent, workflowName); + } + Console.WriteLine("Your workflow has completed: {0}", JsonSerializer.Serialize(state)); + Console.WriteLine("Workflow Status: {0}", state.metadata["dapr.workflow.runtime_status"]); + + break; +} + +void RestockInventory() +{ + daprClient.SaveStateAsync(storeName, "Cars", new OrderPayload(Name: "Cars", TotalCost: 15000, Quantity: 100)); + return; +} diff --git a/workflows/csharp/sdk/order-processor/WorkflowConsoleApp.csproj b/workflows/csharp/sdk/order-processor/WorkflowConsoleApp.csproj new file mode 100644 index 00000000..5f91d0b7 --- /dev/null +++ b/workflows/csharp/sdk/order-processor/WorkflowConsoleApp.csproj @@ -0,0 +1,13 @@ + + + + Exe + net6.0 + + + + + + + + diff --git a/workflows/csharp/sdk/order-processor/Workflows/OrderProcessingWorkflow.cs b/workflows/csharp/sdk/order-processor/Workflows/OrderProcessingWorkflow.cs new file mode 100644 index 00000000..68de9b4f --- /dev/null +++ b/workflows/csharp/sdk/order-processor/Workflows/OrderProcessingWorkflow.cs @@ -0,0 +1,67 @@ +namespace WorkflowConsoleApp.Workflows +{ + using System.Threading.Tasks; + using Dapr.Workflow; + using DurableTask.Core.Exceptions; + using WorkflowConsoleApp.Activities; + using WorkflowConsoleApp.Models; + + class OrderProcessingWorkflow : Workflow + { + public override async Task RunAsync(WorkflowContext context, OrderPayload order) + { + string orderId = context.InstanceId; + + // Notify the user that an order has come through + await context.CallActivityAsync( + nameof(NotifyActivity), + new Notification($"Received order {orderId} for {order.Quantity} {order.Name} at ${order.TotalCost}")); + + string requestId = context.InstanceId; + + // Determine if there is enough of the item available for purchase by checking the inventory + InventoryResult result = await context.CallActivityAsync( + nameof(ReserveInventoryActivity), + new InventoryRequest(RequestId: orderId, order.Name, order.Quantity)); + + // If there is insufficient inventory, fail and let the user know + if (!result.Success) + { + // End the workflow here since we don't have sufficient inventory + await context.CallActivityAsync( + nameof(NotifyActivity), + new Notification($"Insufficient inventory for {order.Name}")); + return new OrderResult(Processed: false); + } + + // There is enough inventory available so the user can purchase the item(s). Process their payment + await context.CallActivityAsync( + nameof(ProcessPaymentActivity), + new PaymentRequest(RequestId: orderId, order.Name, order.Quantity, order.TotalCost)); + + try + { + // There is enough inventory available so the user can purchase the item(s). Process their payment + await context.CallActivityAsync( + nameof(UpdateInventoryActivity), + new PaymentRequest(RequestId: orderId, order.Name, order.Quantity, order.TotalCost)); + } + catch (TaskFailedException) + { + // Let them know their payment was processed + await context.CallActivityAsync( + nameof(NotifyActivity), + new Notification($"Order {orderId} Failed! You are now getting a refund")); + return new OrderResult(Processed: false); + } + + // Let them know their payment was processed + await context.CallActivityAsync( + nameof(NotifyActivity), + new Notification($"Order {orderId} has completed!")); + + // End the workflow with a success result + return new OrderResult(Processed: true); + } + } +}