Improved workflows example program and added in statestore functionality. (#1020)

* Workflow Management - Initial Methods (#1003)

Initial work for workflows DotNET SDK

Signed-off-by: Ryan Lettieri <ryanLettieri@microsoft.com>

* Beefed up the workflows example program and added in statestore functionality

Signed-off-by: Ryan Lettieri <ryanLettieri@microsoft.com>

* Addressing a bunch of review comments

Signed-off-by: Ryan Lettieri <ryanLettieri@microsoft.com>

* Updates to readme and demo for workflows

Signed-off-by: Ryan Lettieri <ryanLettieri@microsoft.com>

* Changed webapp to console app

Signed-off-by: Ryan Lettieri <ryanLettieri@microsoft.com>

* Update DurableTask SDK dependency to get ARM64 compatibility (#1024)

* Update DurableTask SDK dependency to get ARM64 compatibility

Signed-off-by: Chris Gillum <cgillum@microsoft.com>

* Fix issue with gRPC address override behavior

Signed-off-by: Chris Gillum <cgillum@microsoft.com>
Signed-off-by: Ryan Lettieri <ryanLettieri@microsoft.com>

* Remove Web APIs and web dependencies

Signed-off-by: Ryan Lettieri <ryanLettieri@microsoft.com>

* Renaming WorkflowWebApp to WorkflowConsoleApp

Signed-off-by: Ryan Lettieri <ryanLettieri@microsoft.com>

* Various updates to the sample app

- Replaced DaprClient with WorkflowEngineClient
- Removed unused etag logic
- Fixed incorrect usage of certain model types
- Cleaned up logs and console output
- Simplified program loop
- Cleaned up console output and added some coloring
- Added error handling in the console interactions
- Various other tweaks/simplifications/enhancements

Signed-off-by: Chris Gillum <cgillum@microsoft.com>
Signed-off-by: Ryan Lettieri <ryanLettieri@microsoft.com>

* Updates to README and demo http commands

Signed-off-by: Ryan Lettieri <ryanLettieri@microsoft.com>

* Make README copy/paste-able and some other minor tweaks

Signed-off-by: Chris Gillum <cgillum@microsoft.com>
Signed-off-by: Ryan Lettieri <ryanLettieri@microsoft.com>

* Adding in Paul's devcontainer work

Signed-off-by: Ryan Lettieri <ryanLettieri@microsoft.com>

* More README touch-ups

Signed-off-by: Chris Gillum <cgillum@microsoft.com>
Signed-off-by: Ryan Lettieri <ryanLettieri@microsoft.com>

* [docs] Add workflows to .NET client doc (#1019)

* add workflows to client page

Signed-off-by: Hannah Hunter <hannahhunter@microsoft.com>
Signed-off-by: Ryan Lettieri <ryanLettieri@microsoft.com>

* Updating workflows readme and example

Signed-off-by: Ryan Lettieri <ryanLettieri@microsoft.com>

* Fixing README for letting users know which .NET is needed

Signed-off-by: Ryan Lettieri <ryanLettieri@microsoft.com>

* moving using statements above the namespace

Signed-off-by: Ryan Lettieri <ryanLettieri@microsoft.com>

---------

Signed-off-by: Ryan Lettieri <ryanLettieri@microsoft.com>
Signed-off-by: Chris Gillum <cgillum@microsoft.com>
Signed-off-by: Hannah Hunter <hannahhunter@microsoft.com>
Co-authored-by: Ryan Lettieri <ryanLettieri@microsoft.com>
Co-authored-by: Chris Gillum <cgillum@microsoft.com>
Co-authored-by: Hannah Hunter <94493363+hhunter-ms@users.noreply.github.com>
This commit is contained in:
Ryan Lettieri 2023-02-10 12:55:37 -07:00 committed by GitHub
parent bc3ec80e4a
commit 9dcae7b0e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 594 additions and 245 deletions

29
.devcontainer/Dockerfile Normal file
View File

@ -0,0 +1,29 @@
#
# Copyright 2021 The Dapr Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
ARG VARIANT=bullseye
FROM mcr.microsoft.com/vscode/devcontainers/dotnet:dev-7.0-bullseye
# Install minikube
RUN MINIKUBE_URL="https://storage.googleapis.com/minikube/releases/latest/minikube-linux-amd64" \
&& sudo curl -sSL -o /usr/local/bin/minikube "${MINIKUBE_URL}" \
&& sudo chmod 0755 /usr/local/bin/minikube \
&& MINIKUBE_SHA256=$(curl -sSL "${MINIKUBE_URL}.sha256") \
&& echo "${MINIKUBE_SHA256} */usr/local/bin/minikube" | sha256sum -c -
# Install Dapr CLI
RUN wget -q https://raw.githubusercontent.com/dapr/cli/master/install/install.sh -O - | /bin/bash
# Install Azure Dev CLI
RUN curl -fsSL https://aka.ms/install-azd.sh | bash

View File

@ -0,0 +1,51 @@
{
"name": "Azure Developer CLI",
"build": {
"dockerfile": "Dockerfile",
"args": {
"VARIANT": "bullseye"
}
},
"features": {
"ghcr.io/devcontainers/features/azure-cli:1": {
"version": "2.38"
},
"ghcr.io/devcontainers/features/docker-from-docker:1": {
"version": "20.10"
},
"ghcr.io/devcontainers/features/dotnet:1": {
"version": "6.0"
},
"ghcr.io/devcontainers/features/github-cli:1": {
"version": "2"
},
"ghcr.io/devcontainers/features/node:1": {
"version": "16",
"nodeGypDependencies": false
}
},
"extensions": [
"ms-azuretools.azure-dev",
"ms-azuretools.vscode-bicep",
"ms-azuretools.vscode-docker",
"ms-vscode.vscode-node-azure-pack",
"ms-dotnettools.csharp",
"ms-dotnettools.vscode-dotnet-runtime",
"ms-azuretools.vscode-dapr",
"GitHub.copilot"
],
"forwardPorts": [
3000,
3100,
3500,
3501,
5000,
5007
],
"postCreateCommand": ".devcontainer/localinit.sh",
"remoteUser": "vscode",
"hostRequirements": {
"memory": "8gb"
}
}

View File

@ -0,0 +1,9 @@
# install Azure CLI extension for Container Apps
az config set extension.use_dynamic_install=yes_without_prompt
az extension add --name containerapp --yes
# install Node.js and NPM LTS
nvm install v18.12.1
# initialize Dapr
dapr init --runtime-version=1.10.0-rc.2

View File

@ -48,10 +48,11 @@ This repo builds the following packages:
- Dapr.Actors
- Dapr.Actors.AspNetCore
- Dapr.Extensions.Configuration
- Dapr.Workflow
### Prerequisites
Each project is a normal C# project. At minimum, you need [.NET 5.0 SDK](https://dotnet.microsoft.com/download/dotnet/5.0) to build, test, and generate NuGet packages.
Each project is a normal C# project. At minimum, you need [.NET 6.0 SDK](https://dotnet.microsoft.com/download/dotnet/6.0) to build, test, and generate NuGet packages.
Also make sure to reference the [.NET SDK contribution guide](https://docs.dapr.io/contributing/sdk-contrib/dotnet-contributing/)

View File

@ -90,14 +90,17 @@ EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Dapr.Workflow", "src\Dapr.Workflow\Dapr.Workflow.csproj", "{07578B6C-9B96-4B3D-BA2E-7800EFCA7F99}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Workflow", "Workflow", "{BF3ED6BF-ADF3-4D25-8E89-02FB8D945CA9}"
ProjectSection(SolutionItems) = preProject
examples\Workflow\README.md = examples\Workflow\README.md
EndProjectSection
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowWebApp", "examples\Workflow\WorkflowWebApp\WorkflowWebApp.csproj", "{5C61ABED-7623-4C28-A5C9-C5972A0F669C}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "WorkflowConsoleApp", "examples\Workflow\WorkflowConsoleApp\WorkflowConsoleApp.csproj", "{5C61ABED-7623-4C28-A5C9-C5972A0F669C}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "PublishSubscribe", "PublishSubscribe", "{0EF6EA64-D7C3-420D-9890-EAE8D54A57E6}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "PublishEventExample", "examples\Client\PublishSubscribe\PublishEventExample\PublishEventExample.csproj", "{4A175C27-EAFE-47E7-90F6-873B37863656}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "PublishEventExample", "examples\Client\PublishSubscribe\PublishEventExample\PublishEventExample.csproj", "{4A175C27-EAFE-47E7-90F6-873B37863656}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "BulkPublishEventExample", "examples\Client\PublishSubscribe\BulkPublishEventExample\BulkPublishEventExample.csproj", "{DDC41278-FB60-403A-B969-2AEBD7C2D83C}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "BulkPublishEventExample", "examples\Client\PublishSubscribe\BulkPublishEventExample\BulkPublishEventExample.csproj", "{DDC41278-FB60-403A-B969-2AEBD7C2D83C}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution

View File

@ -1,6 +1,6 @@
# Dapr Workflow with ASP.NET Core sample
This Dapr workflow example shows how to create a Dapr workflow (`Workflow`) and invoke it using ASP.NET Core web APIs.
This Dapr workflow example shows how to create a Dapr workflow (`Workflow`) and invoke it using the console.
## Prerequisites
@ -11,105 +11,113 @@ This Dapr workflow example shows how to create a Dapr workflow (`Workflow`) and
## Projects in sample
This sample contains a single [WorkflowWebApp](./WorkflowWebApp) ASP.NET Core project. It combines both the workflow implementations and the web APIs for starting and querying workflows instances.
This sample contains a single [WorkflowConsoleApp](./WorkflowConsoleApp) .NET project. It utilizes the workflow SDK as well as the workflow management API for starting and querying workflows instances.
The main `Program.cs` file contains the main setup of the app, including the registration of the web APIs and the registration of the workflow and workflow activities. The workflow definition is found in `Workflows` directory and the workflow activity definitions are found in the `Activities` directory.
The main `Program.cs` file contains the main setup of the app, including the registration of the workflow and workflow activities. The workflow definition is found in the `Workflows` directory and the workflow activity definitions are found in the `Activities` directory.
## Running the example
To run the workflow web app locally, run this command in the `WorkflowWebApp` directory:
To run the workflow web app locally, two separate terminal windows are required.
In the first terminal window, from the `WorkflowConsoleApp` directory, run the following command to start the program itself:
```sh
dapr run --app-id wfwebapp dotnet run
dotnet run
```
The application will listen for HTTP requests at `http://localhost:10080`.
Next, in a separate terminal window, start the dapr sidecar:
To start a workflow, use the following command to send an HTTP POST request, which triggers an HTTP API that starts the workflow using the Dapr Workflow client. Two identical `curl` commands are shown, one for Linux/macOS (bash) and the other for Windows (PowerShell). The body of the request is used as the input of the workflow.
```sh
dapr run --app-id wfapp --dapr-grpc-port 4001 --dapr-http-port 3500
```
On Linux/macOS (bash):
Dapr listens for HTTP requests at `http://localhost:3500`.
This example illustrates a purchase order processing workflow. The console prompts provide directions on how to both purchase and restock items.
To start a workflow, you have two options:
1. Follow the directions from the console prompts.
2. Use the workflows API and send a request to Dapr directly. Examples are included below as well as in the "demo.http" file down the "WorkflowConsoleApp" directory.
For the workflow API option, two identical `curl` commands are shown, one for Linux/macOS (bash) and the other for Windows (PowerShell). The body of the request is the purchase order information used as the input of the workflow.
Make note of the "1234" in the commands below. This represents the unique identifier for the workflow run and can be replaced with any identifier of your choosing.
```bash
curl -i -X POST http://localhost:10080/orders \
curl -i -X POST http://localhost:3500/v1.0-alpha1/workflows/dapr/OrderProcessingWorkflow/1234/start \
-H "Content-Type: application/json" \
-d '{"name": "Paperclips", "totalCost": 99.95, "quantity": 1}'
-d '{ "input" : {"Name": "Paperclips", "TotalCost": 99.95, "Quantity": 1}}'
```
On Windows (PowerShell):
```powershell
curl -i -X POST http://localhost:10080/orders `
curl -i -X POST http://localhost:3500/v1.0-alpha1/workflows/dapr/OrderProcessingWorkflow/1234/start `
-H "Content-Type: application/json" `
-d '{"name": "Paperclips", "totalCost": 99.95, "quantity": 1}'
-d '{ "input" : {"Name": "Paperclips", "TotalCost": 99.95, "Quantity": 1}}'
```
If successful, you should see a response like the following, which contains a `Location` header pointing to a status endpoint for the workflow that was created with a randomly generated 8-digit ID:
If successful, you should see a response like the following:
```http
HTTP/1.1 202 Accepted
Content-Length: 0
Date: Tue, 24 Jan 2023 00:02:02 GMT
Server: Kestrel
Location: http://localhost:10080/orders/cdcce425
```json
{"instance_id":"1234"}
```
Next, send an HTTP request to the URL in the `Location` header in the previous HTTP response, like in the following example:
Next, send an HTTP request to get the status of the workflow that was started:
```bash
curl -i http://localhost:10080/orders/cdcce425
curl -i -X GET http://localhost:3500/v1.0-alpha1/workflows/dapr/OrderProcessingWorkflow/1234
```
If the workflow has completed running, you should see the following output (formatted for readability):
```http
HTTP/1.1 200 OK
Content-Type: application/json; charset=utf-8
Date: Tue, 24 Jan 2023 00:10:53 GMT
Server: Kestrel
Transfer-Encoding: chunked
The workflow is designed to take several seconds to complete. If the workflow hasn't completed yet when you issue the previous command, you should see the following JSON response (formatted for readability):
```json
{
"details": {
"name": "Paperclips",
"quantity": 1,
"totalCost": 99.95
},
"result": {
"processed": true
},
"status": "Completed"
"WFInfo": {
"instance_id": "1234"
},
"start_time": "2023-02-02T23:34:53Z",
"metadata": {
"dapr.workflow.custom_status": "",
"dapr.workflow.input": "{\"Name\":\"Paperclips\",\"Quantity\":1,\"TotalCost\":99.95}",
"dapr.workflow.last_updated": "2023-02-02T23:35:07Z",
"dapr.workflow.name": "OrderProcessingWorkflow",
"dapr.workflow.output": "{\"Processed\":true}",
"dapr.workflow.runtime_status": "RUNNING"
}
}
```
If the workflow hasn't completed yet, you might instead see the following:
```http
HTTP/1.1 202 Accepted
Content-Type: application/json; charset=utf-8
Date: Tue, 24 Jan 2023 00:17:49 GMT
Location: http://localhost:10080/orders/cdcce425
Server: Kestrel
Transfer-Encoding: chunked
Once the workflow has completed running, you should see the following output, indicating that it has reached the "COMPLETED" status:
```json
{
"details": {
"name": "Paperclips",
"quantity": 1,
"totalCost": 99.95
},
"status": "Running"
"WFInfo": {
"instance_id": "1234"
},
"start_time": "2023-02-02T23:34:53Z",
"metadata": {
"dapr.workflow.custom_status": "",
"dapr.workflow.input": "{\"Name\":\"Paperclips\",\"Quantity\":1,\"TotalCost\":99.95}",
"dapr.workflow.last_updated": "2023-02-02T23:35:07Z",
"dapr.workflow.name": "OrderProcessingWorkflow",
"dapr.workflow.output": "{\"Processed\":true}",
"dapr.workflow.runtime_status": "COMPLETED"
}
}
```
When the workflow has completed, the stdout of the web app should look like the following:
When the workflow has completed, the stdout of the workflow app should look like the following:
```log
info: WorkflowWebApp.Activities.NotifyActivity[0]
Received order cdcce425 for Paperclips at $99.95
info: WorkflowWebApp.Activities.ReserveInventoryActivity[0]
Reserving inventory: cdcce425, Paperclips, 1
info: WorkflowWebApp.Activities.ProcessPaymentActivity[0]
Processing payment: cdcce425, 99.95, USD
info: WorkflowWebApp.Activities.NotifyActivity[0]
Order cdcce425 processed successfully!
info: WorkflowConsoleApp.Activities.NotifyActivity[0]
Received order 1234 for Paperclips at $99.95
info: WorkflowConsoleApp.Activities.ReserveInventoryActivity[0]
Reserving inventory: 1234, Paperclips, 1
info: WorkflowConsoleApp.Activities.ProcessPaymentActivity[0]
Processing payment: 1234, 99.95, USD
info: WorkflowConsoleApp.Activities.NotifyActivity[0]
Order 1234 processed successfully!
```
If you have Zipkin configured for Dapr locally on your machine, then you can view the workflow trace spans in the Zipkin web UI (typically at http://localhost:9411/zipkin/).

View File

@ -1,8 +1,9 @@
namespace WorkflowWebApp.Activities
{
using System.Threading.Tasks;
using Dapr.Workflow;
using System.Threading.Tasks;
using Dapr.Workflow;
using Microsoft.Extensions.Logging;
namespace WorkflowConsoleApp.Activities
{
record Notification(string Message);
class NotifyActivity : WorkflowActivity<Notification, object>

View File

@ -1,25 +1,29 @@
namespace WorkflowWebApp.Activities
using System.Threading.Tasks;
using Dapr.Client;
using Dapr.Workflow;
using Microsoft.Extensions.Logging;
using WorkflowConsoleApp.Models;
namespace WorkflowConsoleApp.Activities
{
using System.Threading.Tasks;
using Dapr.Workflow;
record PaymentRequest(string RequestId, double Amount, string Currency);
class ProcessPaymentActivity : WorkflowActivity<PaymentRequest, object>
{
readonly ILogger logger;
readonly DaprClient client;
public ProcessPaymentActivity(ILoggerFactory loggerFactory)
public ProcessPaymentActivity(ILoggerFactory loggerFactory, DaprClient client)
{
this.logger = loggerFactory.CreateLogger<ProcessPaymentActivity>();
this.client = client;
}
public override async Task<object> RunAsync(WorkflowActivityContext context, PaymentRequest req)
{
this.logger.LogInformation(
"Processing payment: {requestId}, {amount}, {currency}",
"Processing payment: {requestId} for {amount} {item} at ${currency}",
req.RequestId,
req.Amount,
req.ItemName,
req.Currency);
// Simulate slow processing

View File

@ -0,0 +1,60 @@
using System.Threading.Tasks;
using Dapr.Client;
using Dapr.Workflow;
using Microsoft.Extensions.Logging;
using WorkflowConsoleApp.Models;
namespace WorkflowConsoleApp.Activities
{
class ReserveInventoryActivity : WorkflowActivity<InventoryRequest, InventoryResult>
{
readonly ILogger logger;
readonly DaprClient client;
static readonly string storeName = "statestore";
public ReserveInventoryActivity(ILoggerFactory loggerFactory, DaprClient client)
{
this.logger = loggerFactory.CreateLogger<ReserveInventoryActivity>();
this.client = client;
}
public override async Task<InventoryResult> RunAsync(WorkflowActivityContext context, InventoryRequest req)
{
this.logger.LogInformation(
"Reserving inventory for order '{requestId}' of {quantity} {name}",
req.RequestId,
req.Quantity,
req.ItemName);
// Ensure that the store has items
InventoryItem item = await client.GetStateAsync<InventoryItem>(
storeName,
req.ItemName.ToLowerInvariant());
// Catch for the case where the statestore isn't setup
if (item == null)
{
// Not enough items.
return new InventoryResult(false, item);
}
this.logger.LogInformation(
"There are {quantity} {name} available for purchase",
item.Quantity,
item.Name);
// See if there're enough items to purchase
if (item.Quantity >= req.Quantity)
{
// Simulate slow processing
await Task.Delay(TimeSpan.FromSeconds(2));
return new InventoryResult(true, item);
}
// Not enough items.
return new InventoryResult(false, item);
}
}
}

View File

@ -0,0 +1,59 @@
using System.Threading.Tasks;
using Dapr.Client;
using Dapr.Workflow;
using WorkflowConsoleApp.Models;
using Microsoft.Extensions.Logging;
namespace WorkflowConsoleApp.Activities
{
class UpdateInventoryActivity : WorkflowActivity<PaymentRequest, object>
{
static readonly string storeName = "statestore";
readonly ILogger logger;
readonly DaprClient client;
public UpdateInventoryActivity(ILoggerFactory loggerFactory, DaprClient client)
{
this.logger = loggerFactory.CreateLogger<UpdateInventoryActivity>();
this.client = client;
}
public override async Task<object> RunAsync(WorkflowActivityContext context, PaymentRequest req)
{
this.logger.LogInformation(
"Checking inventory for order '{requestId}' for {amount} {name}",
req.RequestId,
req.Amount,
req.ItemName);
// Simulate slow processing
await Task.Delay(TimeSpan.FromSeconds(5));
// Determine if there are enough Items for purchase
InventoryItem item = await client.GetStateAsync<InventoryItem>(
storeName,
req.ItemName.ToLowerInvariant());
int newQuantity = item.Quantity - req.Amount;
if (newQuantity < 0)
{
this.logger.LogInformation(
"Payment for request ID '{requestId}' could not be processed. Insufficient inventory.",
req.RequestId);
throw new InvalidOperationException();
}
// Update the statestore with the new amount of the item
await client.SaveStateAsync(
storeName,
req.ItemName.ToLowerInvariant(),
new InventoryItem(Name: req.ItemName, PerItemCost: item.PerItemCost, Quantity: newQuantity));
this.logger.LogInformation(
"There are now {quantity} {name} left in stock",
newQuantity,
item.Name);
return null;
}
}
}

View File

@ -0,0 +1,9 @@
namespace WorkflowConsoleApp.Models
{
record OrderPayload(string Name, double TotalCost, int Quantity = 1);
record InventoryRequest(string RequestId, string ItemName, int Quantity);
record InventoryResult(bool Success, InventoryItem orderPayload);
record PaymentRequest(string RequestId, string ItemName, int Amount, double Currency);
record OrderResult(bool Processed);
record InventoryItem(string Name, double PerItemCost, int Quantity);
}

View File

@ -0,0 +1,198 @@
using Dapr.Client;
using Dapr.Workflow;
using WorkflowConsoleApp.Activities;
using WorkflowConsoleApp.Models;
using WorkflowConsoleApp.Workflows;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.DependencyInjection;
const string storeName = "statestore";
// The workflow host is a background service that connects to the sidecar over gRPC
var builder = Host.CreateDefaultBuilder(args).ConfigureServices(services =>
{
services.AddDaprWorkflow(options =>
{
// Note that it's also possible to register a lambda function as the workflow
// or activity implementation instead of a class.
options.RegisterWorkflow<OrderProcessingWorkflow>();
// These are the activities that get invoked by the workflow(s).
options.RegisterActivity<NotifyActivity>();
options.RegisterActivity<ReserveInventoryActivity>();
options.RegisterActivity<ProcessPaymentActivity>();
options.RegisterActivity<UpdateInventoryActivity>();
});
});
// Dapr uses a random port for gRPC by default. If we don't know what that port
// is (because this app was started separate from dapr), then assume 4001.
if (string.IsNullOrEmpty(Environment.GetEnvironmentVariable("DAPR_GRPC_PORT")))
{
Environment.SetEnvironmentVariable("DAPR_GRPC_PORT", "4001");
}
Console.ForegroundColor = ConsoleColor.White;
Console.WriteLine("*** Welcome to the Dapr Workflow console app sample!");
Console.WriteLine("*** Using this app, you can place orders that start workflows.");
Console.WriteLine("*** Ensure that Dapr is running in a separate terminal window using the following command:");
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine(" dapr run --dapr-grpc-port 4001 --app-id wfapp");
Console.WriteLine();
Console.ResetColor();
// Start the app - this is the point where we connect to the Dapr sidecar to
// listen for workflow work-items to execute.
using var host = builder.Build();
host.Start();
using var daprClient = new DaprClientBuilder().Build();
// Wait for the sidecar to become available
while (!await daprClient.CheckHealthAsync())
{
Thread.Sleep(TimeSpan.FromSeconds(5));
}
// Wait one more second for the workflow engine to finish initializing.
// This is just to make the log output look a little nicer.
Thread.Sleep(TimeSpan.FromSeconds(1));
// NOTE: WorkflowEngineClient will be replaced with a richer version of DaprClient
// in a subsequent SDK release. This is a temporary workaround.
WorkflowEngineClient workflowClient = host.Services.GetRequiredService<WorkflowEngineClient>();
var baseInventory = new List<InventoryItem>
{
new InventoryItem(Name: "Paperclips", PerItemCost: 5, Quantity: 100),
new InventoryItem(Name: "Cars", PerItemCost: 15000, Quantity: 100),
new InventoryItem(Name: "Computers", PerItemCost: 500, Quantity: 100),
};
// Populate the store with items
await RestockInventory(daprClient, baseInventory);
// Start the input loop
while (true)
{
// Get the name of the item to order and make sure we have inventory
string items = string.Join(", ", baseInventory.Select(i => i.Name));
Console.WriteLine($"Enter the name of one of the following items to order [{items}].");
Console.WriteLine("To restock items, type 'restock'.");
string itemName = Console.ReadLine()?.Trim();
if (string.IsNullOrEmpty(itemName))
{
continue;
}
else if (string.Equals("restock", itemName, StringComparison.OrdinalIgnoreCase))
{
await RestockInventory(daprClient, baseInventory);
continue;
}
InventoryItem item = baseInventory.FirstOrDefault(item => string.Equals(item.Name, itemName, StringComparison.OrdinalIgnoreCase));
if (item == null)
{
Console.ForegroundColor = ConsoleColor.Yellow;
Console.WriteLine($"We don't have {itemName}!");
Console.ResetColor();
continue;
}
Console.WriteLine($"How many {itemName} would you like to purchase?");
string amountStr = Console.ReadLine().Trim();
if (!int.TryParse(amountStr, out int amount) || amount <= 0)
{
Console.ForegroundColor = ConsoleColor.Yellow;
Console.WriteLine($"Invalid input. Assuming you meant to type '1'.");
Console.ResetColor();
amount = 1;
}
// Construct the order with a unique order ID
string orderId = $"{itemName.ToLowerInvariant()}-{Guid.NewGuid().ToString()[..8]}";
double totalCost = amount * item.PerItemCost;
var orderInfo = new OrderPayload(itemName.ToLowerInvariant(), totalCost, amount);
// Start the workflow using the order ID as the workflow ID
Console.WriteLine($"Starting order workflow '{orderId}' purchasing {amount} {itemName}");
await workflowClient.ScheduleNewWorkflowAsync(
name: nameof(OrderProcessingWorkflow),
instanceId: orderId,
input: orderInfo);
// Wait for the workflow to complete
WorkflowState state = await workflowClient.GetWorkflowStateAsync(
instanceId: orderId,
getInputsAndOutputs: true);
while (!state.IsWorkflowCompleted)
{
Thread.Sleep(TimeSpan.FromSeconds(1));
state = await workflowClient.GetWorkflowStateAsync(
instanceId: orderId,
getInputsAndOutputs: true);
}
if (state.RuntimeStatus == WorkflowRuntimeStatus.Completed)
{
OrderResult result = state.ReadOutputAs<OrderResult>();
if (result.Processed)
{
Console.ForegroundColor = ConsoleColor.Green;
Console.WriteLine($"Order workflow is {state.RuntimeStatus} and the order was processed successfully.");
Console.ResetColor();
}
else
{
Console.WriteLine($"Order workflow is {state.RuntimeStatus} but the order was not processed.");
}
}
else if (state.RuntimeStatus == WorkflowRuntimeStatus.Failed)
{
// WorkflowEngineClient doesn't expose a way to get error information.
// For that, we resort to DaprClient. The experience will be improved in the next release.
GetWorkflowResponse response = await daprClient.GetWorkflowAsync(
instanceId: orderId,
workflowComponent: "dapr",
workflowName: nameof(OrderProcessingWorkflow),
CancellationToken.None);
string failureDetails = await GetWorkflowFailureDetails(daprClient, orderId);
Console.ForegroundColor = ConsoleColor.Red;
Console.WriteLine($"The workflow failed - {failureDetails}");
Console.ResetColor();
}
Console.WriteLine();
}
static async Task RestockInventory(DaprClient daprClient, List<InventoryItem> inventory)
{
Console.WriteLine("*** Restocking inventory...");
foreach (var item in inventory)
{
Console.WriteLine($"*** \t{item.Name}: {item.Quantity}");
await daprClient.SaveStateAsync(storeName, item.Name.ToLowerInvariant(), item);
}
}
static async Task<string> GetWorkflowFailureDetails(DaprClient daprClient, string orderId)
{
// Use DaprClient to get the error details
GetWorkflowResponse response = await daprClient.GetWorkflowAsync(
instanceId: orderId,
workflowComponent: "dapr",
workflowName: nameof(OrderProcessingWorkflow),
CancellationToken.None);
// Available metadata fields: https://github.com/dapr/dapr/blob/ad4c38756fa08dda5def8f0a7a6d672feb37be91/pkg/runtime/wfengine/component.go#L146-L161
if (response.metadata.TryGetValue("dapr.workflow.failure.error_type", out string errorType) &&
response.metadata.TryGetValue("dapr.workflow.failure.error_message", out string errorMessage))
{
return $"{errorType}: {errorMessage}";
}
else
{
return ":(";
}
}

View File

@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk.Web">
<Project Sdk="Microsoft.NET.Sdk">
<ItemGroup>
<ProjectReference Include="..\..\..\src\Dapr.Workflow\Dapr.Workflow.csproj" />
@ -9,6 +9,7 @@
<TargetFramework>net6</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<LangVersion>latest</LangVersion>
<NoWarn>612,618</NoWarn>
</PropertyGroup>

View File

@ -0,0 +1,65 @@
using System.Threading.Tasks;
using Dapr.Workflow;
using DurableTask.Core.Exceptions;
using WorkflowConsoleApp.Activities;
using WorkflowConsoleApp.Models;
namespace WorkflowConsoleApp.Workflows
{
class OrderProcessingWorkflow : Workflow<OrderPayload, OrderResult>
{
public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
{
string orderId = context.InstanceId;
// Notify the user that an order has come through
await context.CallActivityAsync(
nameof(NotifyActivity),
new Notification($"Received order {orderId} for {order.Quantity} {order.Name} at ${order.TotalCost}"));
// Determine if there is enough of the item available for purchase by checking the inventory
InventoryResult result = await context.CallActivityAsync<InventoryResult>(
nameof(ReserveInventoryActivity),
new InventoryRequest(RequestId: orderId, order.Name, order.Quantity));
// If there is insufficient inventory, fail and let the user know
if (!result.Success)
{
// End the workflow here since we don't have sufficient inventory
await context.CallActivityAsync(
nameof(NotifyActivity),
new Notification($"Insufficient inventory for {order.Name}"));
return new OrderResult(Processed: false);
}
// There is enough inventory available so the user can purchase the item(s). Process their payment
await context.CallActivityAsync(
nameof(ProcessPaymentActivity),
new PaymentRequest(RequestId: orderId, order.Name, order.Quantity, order.TotalCost));
try
{
// There is enough inventory available so the user can purchase the item(s). Process their payment
await context.CallActivityAsync(
nameof(UpdateInventoryActivity),
new PaymentRequest(RequestId: orderId, order.Name, order.Quantity, order.TotalCost));
}
catch (TaskFailedException)
{
// Let them know their payment was processed
await context.CallActivityAsync(
nameof(NotifyActivity),
new Notification($"Order {orderId} Failed! You are now getting a refund"));
return new OrderResult(Processed: false);
}
// Let them know their payment was processed
await context.CallActivityAsync(
nameof(NotifyActivity),
new Notification($"Order {orderId} has completed!"));
// End the workflow with a success result
return new OrderResult(Processed: true);
}
}
}

View File

@ -0,0 +1,17 @@
### Start order processing workflow - replace xxx with any id you like
POST http://localhost:3500/v1.0-alpha1/workflows/dapr/OrderProcessingWorkflow/xxx/start
Content-Type: application/json
{ "input" : {"Name": "Paperclips", "TotalCost": 99.95, "Quantity": 1}}
### Start order processing workflow - replace xxx with any id you like
POST http://localhost:3500/v1.0-alpha1/workflows/dapr/OrderProcessingWorkflow/xxx/start
Content-Type: application/json
{ "input" : {"Name": "Cars", "TotalCost": 10000, "Quantity": 30}}
### Query dapr sidecar - replace xxx with id from the workflow you've created above
GET http://localhost:3500/v1.0-alpha1/workflows/dapr/OrderProcessingWorkflow/xxx
### Terminate the workflow - replace xxx with id from the workflow you've created above
POST http://localhost:3500/v1.0-alpha1/workflows/dapr/xxx/terminate

View File

@ -1,32 +0,0 @@
namespace WorkflowWebApp.Activities
{
using System.Threading.Tasks;
using Dapr.Workflow;
record InventoryRequest(string RequestId, string Name, int Quantity);
record InventoryResult(bool Success);
class ReserveInventoryActivity : WorkflowActivity<InventoryRequest, InventoryResult>
{
readonly ILogger logger;
public ReserveInventoryActivity(ILoggerFactory loggerFactory)
{
this.logger = loggerFactory.CreateLogger<ReserveInventoryActivity>();
}
public override async Task<InventoryResult> RunAsync(WorkflowActivityContext context, InventoryRequest req)
{
this.logger.LogInformation(
"Reserving inventory: {requestId}, {name}, {quantity}",
req.RequestId,
req.Name,
req.Quantity);
// Simulate slow processing
await Task.Delay(TimeSpan.FromSeconds(2));
return new InventoryResult(true);
}
}
}

View File

@ -1,82 +0,0 @@
using System.Text.Json.Serialization;
using Dapr.Workflow;
using Microsoft.AspNetCore.Mvc;
using WorkflowWebApp.Activities;
using WorkflowWebApp.Workflows;
using JsonOptions = Microsoft.AspNetCore.Http.Json.JsonOptions;
// The workflow host is a background service that connects to the sidecar over gRPC
WebApplicationBuilder builder = WebApplication.CreateBuilder(args);
// Configure HTTP JSON options.
builder.Services.Configure<JsonOptions>(options =>
{
options.SerializerOptions.Converters.Add(new JsonStringEnumConverter());
options.SerializerOptions.DefaultIgnoreCondition = JsonIgnoreCondition.WhenWritingNull;
});
// Dapr workflows are registered as part of the service configuration
builder.Services.AddDaprWorkflow(options =>
{
// Note that it's also possible to register a lambda function as the workflow
// or activity implementation instead of a class.
options.RegisterWorkflow<OrderProcessingWorkflow>();
// These are the activities that get invoked by the workflow(s).
options.RegisterActivity<NotifyActivity>();
options.RegisterActivity<ReserveInventoryActivity>();
options.RegisterActivity<ProcessPaymentActivity>();
});
WebApplication app = builder.Build();
// POST starts new order workflow instance
app.MapPost("/orders", async (WorkflowEngineClient client, [FromBody] OrderPayload orderInfo) =>
{
if (orderInfo?.Name == null)
{
return Results.BadRequest(new
{
message = "Order data was missing from the request",
example = new OrderPayload("Paperclips", 99.95),
});
}
// Randomly generated order ID that is 8 characters long.
string orderId = Guid.NewGuid().ToString()[..8];
await client.ScheduleNewWorkflowAsync(nameof(OrderProcessingWorkflow), orderId, orderInfo);
// return an HTTP 202 and a Location header to be used for status query
return Results.AcceptedAtRoute("GetOrderInfoEndpoint", new { orderId });
});
// GET fetches state for order workflow to report status
app.MapGet("/orders/{orderId}", async (string orderId, WorkflowEngineClient client) =>
{
WorkflowState state = await client.GetWorkflowStateAsync(orderId, true);
if (!state.Exists)
{
return Results.NotFound($"No order with ID = '{orderId}' was found.");
}
var httpResponsePayload = new
{
details = state.ReadInputAs<OrderPayload>(),
status = state.RuntimeStatus.ToString(),
result = state.ReadOutputAs<OrderResult>(),
};
if (state.IsWorkflowRunning)
{
// HTTP 202 Accepted - async polling clients should keep polling for status
return Results.AcceptedAtRoute("GetOrderInfoEndpoint", new { orderId }, httpResponsePayload);
}
else
{
// HTTP 200 OK
return Results.Ok(httpResponsePayload);
}
}).WithName("GetOrderInfoEndpoint");
app.Run();

View File

@ -1,44 +0,0 @@
namespace WorkflowWebApp.Workflows
{
using System.Threading.Tasks;
using Dapr.Workflow;
using WorkflowWebApp.Activities;
record OrderPayload(string Name, double TotalCost, int Quantity = 1);
record OrderResult(bool Processed);
class OrderProcessingWorkflow : Workflow<OrderPayload, OrderResult>
{
public override async Task<OrderResult> RunAsync(WorkflowContext context, OrderPayload order)
{
string orderId = context.InstanceId;
await context.CallActivityAsync(
nameof(NotifyActivity),
new Notification($"Received order {orderId} for {order.Name} at {order.TotalCost:c}"));
string requestId = context.InstanceId;
InventoryResult result = await context.CallActivityAsync<InventoryResult>(
nameof(ReserveInventoryActivity),
new InventoryRequest(RequestId: orderId, order.Name, order.Quantity));
if (!result.Success)
{
// End the workflow here since we don't have sufficient inventory
context.SetCustomStatus($"Insufficient inventory for {order.Name}");
return new OrderResult(Processed: false);
}
await context.CallActivityAsync(
nameof(ProcessPaymentActivity),
new PaymentRequest(RequestId: orderId, order.TotalCost, "USD"));
await context.CallActivityAsync(
nameof(NotifyActivity),
new Notification($"Order {orderId} processed successfully!"));
// End the workflow with a success result
return new OrderResult(Processed: true);
}
}
}

View File

@ -1,8 +0,0 @@
### Create new order
POST http://localhost:10080/orders
Content-Type: application/json
{"name": "Paperclips", "totalCost": 99.95, "quantity": 1}
### Query placeholder
GET http://localhost:10080/orders/XXX