Initial push for workflows quickstart

Signed-off-by: Ryan Lettieri <ryanLettieri@microsoft.com>
This commit is contained in:
Ryan Lettieri 2023-02-05 10:16:05 -07:00
parent 5ac2a30743
commit f2c6352dd0
10 changed files with 410 additions and 0 deletions

View File

@ -0,0 +1,44 @@
# Dapr workflows
In this quickstart, you'll create a microservice to demonstrate Dapr's workflow API. The service starts and manages a workflow to store and retrieve data in a state store.
This quickstart includes one service:
- Dotnet client service `order-processor`
### Run Dotnet service with Dapr
1. Open a new terminal window and navigate to `order-processor` directory:
<!-- STEP
name: Install Dotnet dependencies
-->
```bash
cd ./order-processor
dotnet restore
dotnet build
```
<!-- END_STEP -->
2. Run the Dotnet service app with Dapr:
<!-- STEP
name: Run order-processor service
expected_stdout_lines:
- '== APP == Welcome to the workflows example!'
- '== APP == There are now: 90 Cars left in stock'
- '== APP == Workflow Status: COMPLETED'
- "Exited App successfully"
expected_stderr_lines:
output_match_mode: substring
background: true
sleep: 15
-->
```bash
cd ./order-processor
dapr run --dapr-grpc-port 4001 dotnet run
```
<!-- END_STEP -->

View File

@ -0,0 +1,2 @@
include ../../../docker.mk
include ../../../validate.mk

View File

@ -0,0 +1,25 @@
namespace WorkflowConsoleApp.Activities
{
using System.Threading.Tasks;
using Dapr.Workflow;
using Microsoft.Extensions.Logging;
record Notification(string Message);
class NotifyActivity : WorkflowActivity<Notification, object>
{
readonly ILogger logger;
public NotifyActivity(ILoggerFactory loggerFactory)
{
this.logger = loggerFactory.CreateLogger<NotifyActivity>();
}
public override Task<object> RunAsync(WorkflowActivityContext context, Notification notification)
{
this.logger.LogInformation(notification.Message);
return Task.FromResult<object>(null);
}
}
}

View File

@ -0,0 +1,40 @@
namespace WorkflowConsoleApp.Activities
{
using System.Threading.Tasks;
using Dapr.Client;
using Dapr.Workflow;
using Microsoft.Extensions.Logging;
using WorkflowConsoleApp.Models;
using System;
class ProcessPaymentActivity : WorkflowActivity<PaymentRequest, object>
{
readonly ILogger logger;
readonly DaprClient client;
public ProcessPaymentActivity(ILoggerFactory loggerFactory, DaprClient client)
{
this.logger = loggerFactory.CreateLogger<ProcessPaymentActivity>();
this.client = client;
}
public override async Task<object> RunAsync(WorkflowActivityContext context, PaymentRequest req)
{
this.logger.LogInformation(
"Processing payment: {requestId} for {amount} {item} at ${currency}",
req.RequestId,
req.Amount,
req.ItemBeingPruchased,
req.Currency);
// Simulate slow processing
await Task.Delay(TimeSpan.FromSeconds(7));
this.logger.LogInformation(
"Payment for request ID '{requestId}' processed successfully",
req.RequestId);
return null;
}
}
}

View File

@ -0,0 +1,62 @@
namespace WorkflowConsoleApp.Activities
{
using System.Threading.Tasks;
using Dapr.Client;
using Dapr.Workflow;
using Microsoft.Extensions.Logging;
using WorkflowConsoleApp.Models;
using System;
class ReserveInventoryActivity : WorkflowActivity<InventoryRequest, InventoryResult>
{
readonly ILogger logger;
readonly DaprClient client;
static readonly string storeName = "statestore";
public ReserveInventoryActivity(ILoggerFactory loggerFactory, DaprClient client)
{
this.logger = loggerFactory.CreateLogger<ReserveInventoryActivity>();
this.client = client;
}
public override async Task<InventoryResult> RunAsync(WorkflowActivityContext context, InventoryRequest req)
{
this.logger.LogInformation(
"Reserving inventory for order {requestId} of {quantity} {name}",
req.RequestId,
req.Quantity,
req.ItemName);
OrderPayload orderResponse;
string key;
// Ensure that the store has items
(orderResponse, key) = await client.GetStateAndETagAsync<OrderPayload>(storeName, req.ItemName);
// Catch for the case where the statestore isn't setup
if (orderResponse == null)
{
// Not enough items.
return new InventoryResult(false, orderResponse);
}
this.logger.LogInformation(
"There are: {requestId}, {name} available for purchase",
orderResponse.Quantity,
orderResponse.Name);
// See if there're enough items to purchase
if (orderResponse.Quantity >= req.Quantity)
{
// Simulate slow processing
await Task.Delay(TimeSpan.FromSeconds(2));
return new InventoryResult(true, orderResponse);
}
// Not enough items.
return new InventoryResult(false, orderResponse);
}
}
}

View File

@ -0,0 +1,52 @@
namespace WorkflowConsoleApp.Activities
{
using System.Threading.Tasks;
using Dapr.Client;
using Dapr.Workflow;
using WorkflowConsoleApp.Models;
using Microsoft.Extensions.Logging;
using System;
class UpdateInventoryActivity : WorkflowActivity<PaymentRequest, Object>
{
static readonly string storeName = "statestore";
readonly ILogger logger;
readonly DaprClient client;
public UpdateInventoryActivity(ILoggerFactory loggerFactory, DaprClient client)
{
this.logger = loggerFactory.CreateLogger<UpdateInventoryActivity>();
this.client = client;
}
public override async Task<Object> RunAsync(WorkflowActivityContext context, PaymentRequest req)
{
this.logger.LogInformation(
"Checking Inventory for: Order# {requestId} for {amount} {item}",
req.RequestId,
req.Amount,
req.ItemBeingPruchased);
// Simulate slow processing
await Task.Delay(TimeSpan.FromSeconds(5));
// Determine if there are enough Items for purchase
var (original, originalETag) = await client.GetStateAndETagAsync<OrderPayload>(storeName, req.ItemBeingPruchased);
int newQuantity = original.Quantity - req.Amount;
if (newQuantity < 0)
{
this.logger.LogInformation(
"Payment for request ID '{requestId}' could not be processed. Insufficient inventory.",
req.RequestId);
throw new InvalidOperationException();
}
// Update the statestore with the new amount of paper clips
await client.SaveStateAsync<OrderPayload>(storeName, req.ItemBeingPruchased, new OrderPayload(Name: req.ItemBeingPruchased, TotalCost: req.Currency, Quantity: newQuantity));
this.logger.LogInformation($"There are now: {newQuantity} {original.Name} left in stock");
return null;
}
}
}

View File

@ -0,0 +1,9 @@
namespace WorkflowConsoleApp.Models
{
record OrderPayload(string Name, double TotalCost, int Quantity = 1);
record InventoryRequest(string RequestId, string ItemName, int Quantity);
record InventoryResult(bool Success, OrderPayload orderPayload);
record PaymentRequest(string RequestId, string ItemBeingPruchased, int Amount, double Currency);
record OrderResult(bool Processed);
record InventoryItem(string Name, double TotalCost, int Quantity);
}

View File

@ -0,0 +1,96 @@
using Dapr.Client;
using Dapr.Workflow;
using WorkflowConsoleApp.Activities;
using WorkflowConsoleApp.Models;
using WorkflowConsoleApp.Workflows;
using System.Text.Json;
using Microsoft.Extensions.Hosting;
using System;
using System.Threading;
using System.Threading.Tasks;
const string workflowComponent = "dapr";
const string storeName = "statestore";
const string workflowName = nameof(OrderProcessingWorkflow);
// The workflow host is a background service that connects to the sidecar over gRPC
var builder = Host.CreateDefaultBuilder(args).ConfigureServices(services =>
{
services.AddDaprWorkflow(options =>
{
// Note that it's also possible to register a lambda function as the workflow
// or activity implementation instead of a class.
options.RegisterWorkflow<OrderProcessingWorkflow>();
// These are the activities that get invoked by the workflow(s).
options.RegisterActivity<NotifyActivity>();
options.RegisterActivity<ReserveInventoryActivity>();
options.RegisterActivity<ProcessPaymentActivity>();
options.RegisterActivity<UpdateInventoryActivity>();
});
});
// Start the app - this is the point where we connect to the Dapr sidecar
using var host = builder.Build();
host.Start();
// Start the client
string daprPortStr = Environment.GetEnvironmentVariable("DAPR_GRPC_PORT");
if (string.IsNullOrEmpty(daprPortStr))
{
Environment.SetEnvironmentVariable("DAPR_GRPC_PORT", "4001");
}
using var daprClient = new DaprClientBuilder().Build();
// Start the console app
while (true)
{
var health = await daprClient.CheckHealthAsync();
Console.WriteLine("Welcome to the workflows example!");
Console.WriteLine("In this example, you will be starting a workflow and obtaining the status.");
// Populate the store with items
RestockInventory();
// Can we remove the logging info?
// Main Loop
// Generate a unique ID for the workflow
string orderId = Guid.NewGuid().ToString()[..8];
string itemToPurchase = "Cars";
int ammountToPurchase = 10;
Console.WriteLine("In this quickstart, you will be purhasing {0} {1}.", ammountToPurchase, itemToPurchase);
// Construct the order
OrderPayload orderInfo = new OrderPayload(itemToPurchase, 15000, ammountToPurchase);
OrderPayload orderResponse;
string key;
// Ensure that the store has items
(orderResponse, key) = await daprClient.GetStateAndETagAsync<OrderPayload>(storeName, itemToPurchase);
// Start the workflow
Console.WriteLine("Starting workflow {0} purchasing {1} {2}", orderId, ammountToPurchase, itemToPurchase);
var response = await daprClient.StartWorkflowAsync(orderId, workflowComponent, workflowName, orderInfo, null, CancellationToken.None);
// Wait a second to allow workflow to start
await Task.Delay(TimeSpan.FromSeconds(1));
var state = await daprClient.GetWorkflowAsync(orderId, workflowComponent, workflowName);
Console.WriteLine("Your workflow has started. Here is the status of the workflow: {0}", state);
while (state.metadata["dapr.workflow.runtime_status"].ToString() == "RUNNING")
{
await Task.Delay(TimeSpan.FromSeconds(5));
state = await daprClient.GetWorkflowAsync(orderId, workflowComponent, workflowName);
}
Console.WriteLine("Your workflow has completed: {0}", JsonSerializer.Serialize(state));
Console.WriteLine("Workflow Status: {0}", state.metadata["dapr.workflow.runtime_status"]);
break;
}
void RestockInventory()
{
daprClient.SaveStateAsync<OrderPayload>(storeName, "Cars", new OrderPayload(Name: "Cars", TotalCost: 15000, Quantity: 100));
return;
}

View File

@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net6.0</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Dapr.AspNetCore" Version="1.10.0-rc01" />
<PackageReference Include="Dapr.Workflow" Version="1.10.0-rc01" />
</ItemGroup>
</Project>

View File

@ -0,0 +1,67 @@
namespace WorkflowConsoleApp.Workflows
{
using System.Threading.Tasks;
using Dapr.Workflow;
using DurableTask.Core.Exceptions;
using WorkflowConsoleApp.Activities;
using WorkflowConsoleApp.Models;
class OrderProcessingWorkflow : Workflow<OrderPayload, OrderResult>
{
public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
{
string orderId = context.InstanceId;
// Notify the user that an order has come through
await context.CallActivityAsync(
nameof(NotifyActivity),
new Notification($"Received order {orderId} for {order.Quantity} {order.Name} at ${order.TotalCost}"));
string requestId = context.InstanceId;
// Determine if there is enough of the item available for purchase by checking the inventory
InventoryResult result = await context.CallActivityAsync<InventoryResult>(
nameof(ReserveInventoryActivity),
new InventoryRequest(RequestId: orderId, order.Name, order.Quantity));
// If there is insufficient inventory, fail and let the user know
if (!result.Success)
{
// End the workflow here since we don't have sufficient inventory
await context.CallActivityAsync(
nameof(NotifyActivity),
new Notification($"Insufficient inventory for {order.Name}"));
return new OrderResult(Processed: false);
}
// There is enough inventory available so the user can purchase the item(s). Process their payment
await context.CallActivityAsync(
nameof(ProcessPaymentActivity),
new PaymentRequest(RequestId: orderId, order.Name, order.Quantity, order.TotalCost));
try
{
// There is enough inventory available so the user can purchase the item(s). Process their payment
await context.CallActivityAsync(
nameof(UpdateInventoryActivity),
new PaymentRequest(RequestId: orderId, order.Name, order.Quantity, order.TotalCost));
}
catch (TaskFailedException)
{
// Let them know their payment was processed
await context.CallActivityAsync(
nameof(NotifyActivity),
new Notification($"Order {orderId} Failed! You are now getting a refund"));
return new OrderResult(Processed: false);
}
// Let them know their payment was processed
await context.CallActivityAsync(
nameof(NotifyActivity),
new Notification($"Order {orderId} has completed!"));
// End the workflow with a success result
return new OrderResult(Processed: true);
}
}
}