Merge branch 'v1.13' into issue_3962

This commit is contained in:
Hannah Hunter 2024-02-21 12:33:18 -05:00 committed by GitHub
commit f4f08ad1fc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 1074 additions and 33 deletions

View File

@ -566,6 +566,33 @@ To launch a Dapr sidecar for the above example application, run a command simila
dapr run --app-id orderprocessing --app-port 6001 --dapr-http-port 3601 --dapr-grpc-port 60001 dotnet run
```
The above example returns a `BulkStateItem` with the serialized format of the value you saved to state. If you prefer that the value be deserialized by the SDK across each of your bulk response items, you can instead use the following:
```csharp
//dependencies
using Dapr.Client;
//code
namespace EventService
{
class Program
{
static async Task Main(string[] args)
{
string DAPR_STORE_NAME = "statestore";
//Using Dapr SDK to retrieve multiple states
using var client = new DaprClientBuilder().Build();
IReadOnlyList<BulkStateItem<Widget>> mulitpleStateResult = await client.GetBulkStateAsync<Widget>(DAPR_STORE_NAME, new List<string> { "widget_1", "widget_2" }, parallelism: 1);
}
}
class Widget
{
string Size { get; set; }
string Color { get; set; }
}
}
```
{{% /codetab %}}
{{% codetab %}}

View File

@ -34,7 +34,7 @@ The Dapr sidecar doesnt load any workflow definitions. Rather, the sidecar si
[Workflow activities]({{< ref "workflow-features-concepts.md#workflow-activites" >}}) are the basic unit of work in a workflow and are the tasks that get orchestrated in the business process.
{{< tabs Python JavaScript ".NET" Java >}}
{{< tabs Python JavaScript ".NET" Java Go >}}
{{% codetab %}}
@ -196,6 +196,27 @@ public class DemoWorkflowActivity implements WorkflowActivity {
{{% /codetab %}}
{{% codetab %}}
<!--go-->
Define each workflow activity you'd like your workflow to perform. The Activity input can be unmarshalled from the context with `ctx.GetInput`. Activities should be defined as taking a `ctx workflow.ActivityContext` parameter and returning an interface and error.
```go
func TestActivity(ctx workflow.ActivityContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return "", err
}
// Do something here
return "result", nil
}
```
[See the Go SDK workflow activity example in context.](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md)
{{% /codetab %}}
{{< /tabs >}}
@ -203,7 +224,7 @@ public class DemoWorkflowActivity implements WorkflowActivity {
Next, register and call the activites in a workflow.
{{< tabs Python JavaScript ".NET" Java >}}
{{< tabs Python JavaScript ".NET" Java Go >}}
{{% codetab %}}
@ -343,6 +364,37 @@ public class DemoWorkflowWorker {
[See the Java SDK workflow in context.](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowWorker.java)
{{% /codetab %}}
{{% codetab %}}
<!--go-->
Define your workflow function with the parameter `ctx *workflow.WorkflowContext` and return any and error. Invoke your defined activities from within your workflow.
```go
func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return nil, err
}
var output string
if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
return nil, err
}
if err := ctx.WaitForExternalEvent("testEvent", time.Second*60).Await(&output); err != nil {
return nil, err
}
if err := ctx.CreateTimer(time.Second).Await(nil); err != nil {
return nil, nil
}
return output, nil
}
```
[See the Go SDK workflow in context.](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md)
{{% /codetab %}}
{{< /tabs >}}
@ -351,7 +403,7 @@ public class DemoWorkflowWorker {
Finally, compose the application using the workflow.
{{< tabs Python JavaScript ".NET" Java >}}
{{< tabs Python JavaScript ".NET" Java Go >}}
{{% codetab %}}
@ -707,6 +759,336 @@ public class DemoWorkflow extends Workflow {
{{% /codetab %}}
{{% codetab %}}
<!--go-->
[As in the following example](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md), a hello-world application using the Go SDK and Dapr Workflow would include:
- A Go package called `client` to receive the Go SDK client capabilities.
- The `TestWorkflow` method
- Creating the workflow with input and output.
- API calls. In the example below, these calls start and call the workflow activities.
```go
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/dapr/go-sdk/client"
"github.com/dapr/go-sdk/workflow"
)
var stage = 0
const (
workflowComponent = "dapr"
)
func main() {
w, err := workflow.NewWorker()
if err != nil {
log.Fatal(err)
}
fmt.Println("Worker initialized")
if err := w.RegisterWorkflow(TestWorkflow); err != nil {
log.Fatal(err)
}
fmt.Println("TestWorkflow registered")
if err := w.RegisterActivity(TestActivity); err != nil {
log.Fatal(err)
}
fmt.Println("TestActivity registered")
// Start workflow runner
if err := w.Start(); err != nil {
log.Fatal(err)
}
fmt.Println("runner started")
daprClient, err := client.NewClient()
if err != nil {
log.Fatalf("failed to intialise client: %v", err)
}
defer daprClient.Close()
ctx := context.Background()
// Start workflow test
respStart, err := daprClient.StartWorkflowBeta1(ctx, &client.StartWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
WorkflowName: "TestWorkflow",
Options: nil,
Input: 1,
SendRawInput: false,
})
if err != nil {
log.Fatalf("failed to start workflow: %v", err)
}
fmt.Printf("workflow started with id: %v\n", respStart.InstanceID)
// Pause workflow test
err = daprClient.PauseWorkflowBeta1(ctx, &client.PauseWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err != nil {
log.Fatalf("failed to pause workflow: %v", err)
}
respGet, err := daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}
if respGet.RuntimeStatus != workflow.StatusSuspended.String() {
log.Fatalf("workflow not paused: %v", respGet.RuntimeStatus)
}
fmt.Printf("workflow paused\n")
// Resume workflow test
err = daprClient.ResumeWorkflowBeta1(ctx, &client.ResumeWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err != nil {
log.Fatalf("failed to resume workflow: %v", err)
}
respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}
if respGet.RuntimeStatus != workflow.StatusRunning.String() {
log.Fatalf("workflow not running")
}
fmt.Println("workflow resumed")
fmt.Printf("stage: %d\n", stage)
// Raise Event Test
err = daprClient.RaiseEventWorkflowBeta1(ctx, &client.RaiseEventWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
EventName: "testEvent",
EventData: "testData",
SendRawData: false,
})
if err != nil {
fmt.Printf("failed to raise event: %v", err)
}
fmt.Println("workflow event raised")
time.Sleep(time.Second) // allow workflow to advance
fmt.Printf("stage: %d\n", stage)
respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}
fmt.Printf("workflow status: %v\n", respGet.RuntimeStatus)
// Purge workflow test
err = daprClient.PurgeWorkflowBeta1(ctx, &client.PurgeWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err != nil {
log.Fatalf("failed to purge workflow: %v", err)
}
respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err != nil && respGet != nil {
log.Fatal("failed to purge workflow")
}
fmt.Println("workflow purged")
fmt.Printf("stage: %d\n", stage)
// Terminate workflow test
respStart, err = daprClient.StartWorkflowBeta1(ctx, &client.StartWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
WorkflowName: "TestWorkflow",
Options: nil,
Input: 1,
SendRawInput: false,
})
if err != nil {
log.Fatalf("failed to start workflow: %v", err)
}
fmt.Printf("workflow started with id: %s\n", respStart.InstanceID)
err = daprClient.TerminateWorkflowBeta1(ctx, &client.TerminateWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err != nil {
log.Fatalf("failed to terminate workflow: %v", err)
}
respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}
if respGet.RuntimeStatus != workflow.StatusTerminated.String() {
log.Fatal("failed to terminate workflow")
}
fmt.Println("workflow terminated")
err = daprClient.PurgeWorkflowBeta1(ctx, &client.PurgeWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err == nil || respGet != nil {
log.Fatalf("failed to purge workflow: %v", err)
}
fmt.Println("workflow purged")
stage = 0
fmt.Println("workflow client test")
wfClient, err := workflow.NewClient()
if err != nil {
log.Fatalf("[wfclient] faield to initialize: %v", err)
}
id, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
if err != nil {
log.Fatalf("[wfclient] failed to start workflow: %v", err)
}
fmt.Printf("[wfclient] started workflow with id: %s\n", id)
metadata, err := wfClient.FetchWorkflowMetadata(ctx, id)
if err != nil {
log.Fatalf("[wfclient] failed to get worfklow: %v", err)
}
fmt.Printf("[wfclient] workflow status: %v\n", metadata.RuntimeStatus.String())
if stage != 1 {
log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 1 expected, current: %d", stage)
}
fmt.Printf("[wfclient] stage: %d\n", stage)
// raise event
if err := wfClient.RaiseEvent(ctx, id, "testEvent", workflow.WithEventPayload("testData")); err != nil {
log.Fatalf("[wfclient] failed to raise event: %v", err)
}
fmt.Println("[wfclient] event raised")
// Sleep to allow the workflow to advance
time.Sleep(time.Second)
if stage != 2 {
log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 2 expected, current: %d", stage)
}
fmt.Printf("[wfclient] stage: %d\n", stage)
// stop workflow
if err := wfClient.TerminateWorkflow(ctx, id); err != nil {
log.Fatalf("[wfclient] failed to terminate workflow: %v", err)
}
fmt.Println("[wfclient] workflow terminated")
if err := wfClient.PurgeWorkflow(ctx, id); err != nil {
log.Fatalf("[wfclient] failed to purge workflow: %v", err)
}
fmt.Println("[wfclient] workflow purged")
// stop workflow runtime
if err := w.Shutdown(); err != nil {
log.Fatalf("failed to shutdown runtime: %v", err)
}
fmt.Println("workflow worker successfully shutdown")
}
func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return nil, err
}
var output string
if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
return nil, err
}
err := ctx.WaitForExternalEvent("testEvent", time.Second*60).Await(&output)
if err != nil {
return nil, err
}
if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
return nil, err
}
return output, nil
}
func TestActivity(ctx workflow.ActivityContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return "", err
}
stage += input
return fmt.Sprintf("Stage: %d", stage), nil
}
```
[See the full Go SDK workflow example in context.](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md)
{{% /codetab %}}
{{< /tabs >}}
@ -730,3 +1112,4 @@ Now that you've authored a workflow, learn how to manage it.
- [JavaScript example](https://github.com/dapr/js-sdk/tree/main/examples/workflow)
- [.NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow)
- [Java example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows)
- [Go example](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md)

View File

@ -12,7 +12,7 @@ Dapr Workflow is currently in beta. [See known limitations for {{% dapr-latest-v
Now that you've [authored the workflow and its activities in your application]({{< ref howto-author-workflow.md >}}), you can start, terminate, and get information about the workflow using HTTP API calls. For more information, read the [workflow API reference]({{< ref workflow_api.md >}}).
{{< tabs Python JavaScript ".NET" Java HTTP >}}
{{< tabs Python JavaScript ".NET" Java Go HTTP >}}
<!--Python-->
{{% codetab %}}
@ -170,10 +170,10 @@ await daprClient.PurgeWorkflowAsync(orderId, workflowComponent);
{{% /codetab %}}
<!--Python-->
<!--Java-->
{{% codetab %}}
Manage your workflow within your code. [In the workflow example from the Java SDK](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/DemoWorkflowClient.java), the workflow is registered in the code using the following APIs:
Manage your workflow within your code. [In the workflow example from the Java SDK](https://github.com/dapr/java-sdk/blob/master/examples/src/main/java/io/dapr/examples/workflows/), the workflow is registered in the code using the following APIs:
- **scheduleNewWorkflow**: Starts a new workflow instance
- **getInstanceState**: Get information on the status of the workflow
@ -235,6 +235,84 @@ public class DemoWorkflowClient {
{{% /codetab %}}
<!--Go-->
{{% codetab %}}
Manage your workflow within your code. [In the workflow example from the Go SDK](https://github.com/dapr/go-sdk/tree/main/examples/workflow), the workflow is registered in the code using the following APIs:
- **StartWorkflow**: Starts a new workflow instance
- **GetWorkflow**: Get information on the status of the workflow
- **PauseWorkflow**: Pauses or suspends a workflow instance that can later be resumed
- **RaiseEventWorkflow**: Raises events/tasks for the running workflow instance
- **ResumeWorkflow**: Waits for the workflow to complete its tasks
- **PurgeWorkflow**: Removes all metadata related to a specific workflow instance
- **TerminateWorkflow**: Terminates the workflow
```go
// Start workflow
type StartWorkflowRequest struct {
InstanceID string // Optional instance identifier
WorkflowComponent string
WorkflowName string
Options map[string]string // Optional metadata
Input any // Optional input
SendRawInput bool // Set to True in order to disable serialization on the input
}
type StartWorkflowResponse struct {
InstanceID string
}
// Get the workflow status
type GetWorkflowRequest struct {
InstanceID string
WorkflowComponent string
}
type GetWorkflowResponse struct {
InstanceID string
WorkflowName string
CreatedAt time.Time
LastUpdatedAt time.Time
RuntimeStatus string
Properties map[string]string
}
// Purge workflow
type PurgeWorkflowRequest struct {
InstanceID string
WorkflowComponent string
}
// Terminate workflow
type TerminateWorkflowRequest struct {
InstanceID string
WorkflowComponent string
}
// Pause workflow
type PauseWorkflowRequest struct {
InstanceID string
WorkflowComponent string
}
// Resume workflow
type ResumeWorkflowRequest struct {
InstanceID string
WorkflowComponent string
}
// Raise an event for the running workflow
type RaiseEventWorkflowRequest struct {
InstanceID string
WorkflowComponent string
EventName string
EventData any
SendRawData bool // Set to True in order to disable serialization on the data
}
```
{{% /codetab %}}
<!--HTTP-->
{{% codetab %}}
@ -316,5 +394,6 @@ Learn more about these HTTP calls in the [workflow API reference guide]({{< ref
- [JavaScript example](https://github.com/dapr/js-sdk/tree/main/examples/workflow)
- [.NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow)
- [Java example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows)
- [Go example](https://github.com/dapr/go-sdk/tree/main/examples/workflow)
- [Workflow API reference]({{< ref workflow_api.md >}})

View File

@ -216,4 +216,5 @@ See the [Reminder usage and execution guarantees section]({{< ref "workflow-arch
- [Python](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow)
- [JavaScript example](https://github.com/dapr/js-sdk/tree/main/examples/workflow)
- [.NET](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow)
- [Java](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows)
- [Java](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows)
- [Go example](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md)

View File

@ -172,7 +172,7 @@ APIs that generate random numbers, random UUIDs, or the current date are _non-de
For example, instead of this:
{{< tabs ".NET" Java JavaScript >}}
{{< tabs ".NET" Java JavaScript Go >}}
{{% codetab %}}
@ -207,11 +207,20 @@ const randomString = getRandomString();
{{% /codetab %}}
{{% codetab %}}
```go
// DON'T DO THIS!
```
{{% /codetab %}}
{{< /tabs >}}
Do this:
{{< tabs ".NET" Java JavaScript >}}
{{< tabs ".NET" Java JavaScript Go >}}
{{% codetab %}}
@ -245,6 +254,15 @@ const randomString = yield context.callActivity(getRandomString);
{{% /codetab %}}
{{% codetab %}}
```go
// Do this!!
```
{{% /codetab %}}
{{< /tabs >}}
@ -255,7 +273,7 @@ Instead, workflows should interact with external state _indirectly_ using workfl
For example, instead of this:
{{< tabs ".NET" Java JavaScript >}}
{{< tabs ".NET" Java JavaScript Go >}}
{{% codetab %}}
@ -293,7 +311,14 @@ fetch('https://postman-echo.com/get')
.catch(error => {
console.error('Error:', error);
});
```
{{% /codetab %}}
{{% codetab %}}
```go
// DON'T DO THIS!
```
{{% /codetab %}}
@ -302,7 +327,7 @@ fetch('https://postman-echo.com/get')
Do this:
{{< tabs ".NET" Java JavaScript >}}
{{< tabs ".NET" Java JavaScript Go >}}
{{% codetab %}}
@ -334,6 +359,14 @@ const data = yield ctx.callActivity(makeHttpCall, "https://example.com/api/data"
{{% /codetab %}}
{{% codetab %}}
```go
// Do this!!
```
{{% /codetab %}}
{{< /tabs >}}
@ -346,7 +379,7 @@ Failure to follow this rule could result in undefined behavior. Any background p
For example, instead of this:
{{< tabs ".NET" Java JavaScript >}}
{{< tabs ".NET" Java JavaScript Go >}}
{{% codetab %}}
@ -375,11 +408,18 @@ Don't declare JavaScript workflow as `async`. The Node.js runtime doesn't guaran
{{% /codetab %}}
{{% codetab %}}
```go
// DON'T DO THIS!
```
{{% /codetab %}}
{{< /tabs >}}
Do this:
{{< tabs ".NET" Java JavaScript >}}
{{< tabs ".NET" Java JavaScript Go >}}
{{% codetab %}}
@ -407,6 +447,14 @@ Since the Node.js runtime doesn't guarantee that asynchronous functions are dete
{{% /codetab %}}
{{% codetab %}}
```go
// Do this!!
```
{{% /codetab %}}
{{< /tabs >}}
@ -438,6 +486,7 @@ To work around these constraints:
- [Workflow API reference]({{< ref workflow_api.md >}})
- Try out the following examples:
- [Python](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow)
- [JavaScript example](https://github.com/dapr/js-sdk/tree/main/examples/workflow)
- [JavaScript](https://github.com/dapr/js-sdk/tree/main/examples/workflow)
- [.NET](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow)
- [Java](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows)
- [Java](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows)
- [Go](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md)

View File

@ -83,6 +83,7 @@ You can use the following SDKs to author a workflow.
| JavaScript | [DaprWorkflowClient](https://github.com/dapr/js-sdk/blob/main/src/workflow/client/DaprWorkflowClient.ts) |
| .NET | [Dapr.Workflow](https://www.nuget.org/profiles/dapr.io) |
| Java | [io.dapr.workflows](https://dapr.github.io/java-sdk/io/dapr/workflows/package-summary.html) |
| Go | [workflow](https://github.com/dapr/go-sdk/tree/main/client/workflow.go) |
## Try out workflows
@ -97,6 +98,7 @@ Want to put workflows to the test? Walk through the following quickstart and tut
| [Workflow JavaScript SDK example](https://github.com/dapr/js-sdk/tree/main/examples/workflow) | Learn how to create a Dapr Workflow and invoke it using the JavaScript SDK. |
| [Workflow .NET SDK example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) | Learn how to create a Dapr Workflow and invoke it using ASP.NET Core web APIs. |
| [Workflow Java SDK example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows) | Learn how to create a Dapr Workflow and invoke it using the Java `io.dapr.workflows` package. |
| [Workflow Go SDK example](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md) | Learn how to create a Dapr Workflow and invoke it using the Go `workflow` package. |
### Start using workflows directly in your app
@ -105,7 +107,6 @@ Want to skip the quickstarts? Not a problem. You can try out the workflow buildi
## Limitations
- **State stores:** As of the 1.12.0 beta release of Dapr Workflow, using the NoSQL databases as a state store results in limitations around storing internal states. For example, CosmosDB has a maximum single operation item limit of only 100 states in a single request.
- **Horizontal scaling:** As of the 1.12.0 beta release of Dapr Workflow, if you scale out Dapr sidecars or your application pods to more than 2, then the concurrency of the workflow execution drops. It is recommended to test with 1 or 2 instances, and no more than 2.
## Watch the demo
@ -126,3 +127,4 @@ Watch [this video for an overview on Dapr Workflow](https://youtu.be/s1p9MNl4VGo
- [JavaScript example](https://github.com/dapr/js-sdk/tree/main/examples/workflow)
- [.NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow)
- [Java example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows)
- [Go example](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md)

View File

@ -25,7 +25,7 @@ While the pattern is simple, there are many complexities hidden in the implement
Dapr Workflow solves these complexities by allowing you to implement the task chaining pattern concisely as a simple function in the programming language of your choice, as shown in the following example.
{{< tabs Python JavaScript ".NET" Java >}}
{{< tabs Python JavaScript ".NET" Java Go >}}
{{% codetab %}}
<!--python-->
@ -234,6 +234,57 @@ public class ChainWorkflow extends Workflow {
{{% /codetab %}}
{{% codetab %}}
<!--go-->
```go
func TaskChainWorkflow(ctx *workflow.WorkflowContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return "", err
}
var result1 int
if err := ctx.CallActivity(Step1, workflow.ActivityInput(input)).Await(&result1); err != nil {
return nil, err
}
var result2 int
if err := ctx.CallActivity(Step1, workflow.ActivityInput(input)).Await(&result2); err != nil {
return nil, err
}
var result3 int
if err := ctx.CallActivity(Step1, workflow.ActivityInput(input)).Await(&result3); err != nil {
return nil, err
}
return []int{result1, result2, result3}, nil
}
func Step1(ctx workflow.ActivityContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return "", err
}
fmt.Printf("Step 1: Received input: %s", input)
return input + 1, nil
}
func Step2(ctx workflow.ActivityContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return "", err
}
fmt.Printf("Step 2: Received input: %s", input)
return input * 2, nil
}
func Step3(ctx workflow.ActivityContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return "", err
}
fmt.Printf("Step 3: Received input: %s", input)
return int(math.Pow(float64(input), 2)), nil
}
```
{{% /codetab %}}
{{< /tabs >}}
As you can see, the workflow is expressed as a simple series of statements in the programming language of your choice. This allows any engineer in the organization to quickly understand the end-to-end flow without necessarily needing to understand the end-to-end system architecture.
@ -260,7 +311,7 @@ In addition to the challenges mentioned in [the previous pattern]({{< ref "workf
Dapr Workflows provides a way to express the fan-out/fan-in pattern as a simple function, as shown in the following example:
{{< tabs Python JavaScript ".NET" Java >}}
{{< tabs Python JavaScript ".NET" Java Go >}}
{{% codetab %}}
<!--python-->
@ -461,6 +512,72 @@ public class FaninoutWorkflow extends Workflow {
{{% /codetab %}}
{{% codetab %}}
<!--go-->
```go
func BatchProcessingWorkflow(ctx *workflow.WorkflowContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return 0, err
}
var workBatch []int
if err := ctx.CallActivity(GetWorkBatch, workflow.ActivityInput(input)).Await(&workBatch); err != nil {
return 0, err
}
parallelTasks := workflow.NewTaskSlice(len(workBatch))
for i, workItem := range workBatch {
parallelTasks[i] = ctx.CallActivity(ProcessWorkItem, workflow.ActivityInput(workItem))
}
var outputs int
for _, task := range parallelTasks {
var output int
err := task.Await(&output)
if err == nil {
outputs += output
} else {
return 0, err
}
}
if err := ctx.CallActivity(ProcessResults, workflow.ActivityInput(outputs)).Await(nil); err != nil {
return 0, err
}
return 0, nil
}
func GetWorkBatch(ctx workflow.ActivityContext) (any, error) {
var batchSize int
if err := ctx.GetInput(&batchSize); err != nil {
return 0, err
}
batch := make([]int, batchSize)
for i := 0; i < batchSize; i++ {
batch[i] = i
}
return batch, nil
}
func ProcessWorkItem(ctx workflow.ActivityContext) (any, error) {
var workItem int
if err := ctx.GetInput(&workItem); err != nil {
return 0, err
}
fmt.Printf("Processing work item: %d\n", workItem)
time.Sleep(time.Second * 5)
result := workItem * 2
fmt.Printf("Work item %d processed. Result: %d\n", workItem, result)
return result, nil
}
func ProcessResults(ctx workflow.ActivityContext) (any, error) {
var finalResult int
if err := ctx.GetInput(&finalResult); err != nil {
return 0, err
}
fmt.Printf("Final result: %d\n", finalResult)
return finalResult, nil
}
```
{{% /codetab %}}
{{< /tabs >}}
The key takeaways from this example are:
@ -561,7 +678,7 @@ Depending on the business needs, there may be a single monitor or there may be m
Dapr Workflow supports this pattern natively by allowing you to implement _eternal workflows_. Rather than writing infinite while-loops ([which is an anti-pattern]({{< ref "workflow-features-concepts.md#infinite-loops-and-eternal-workflows" >}})), Dapr Workflow exposes a _continue-as-new_ API that workflow authors can use to restart a workflow function from the beginning with a new input.
{{< tabs Python JavaScript ".NET" Java >}}
{{< tabs Python JavaScript ".NET" Java Go >}}
{{% codetab %}}
<!--python-->
@ -722,6 +839,59 @@ public class MonitorWorkflow extends Workflow {
{{% /codetab %}}
{{% codetab %}}
<!--go-->
```go
type JobStatus struct {
JobID string `json:"job_id"`
IsHealthy bool `json:"is_healthy"`
}
func StatusMonitorWorkflow(ctx *workflow.WorkflowContext) (any, error) {
var sleepInterval time.Duration
var job JobStatus
if err := ctx.GetInput(&job); err != nil {
return "", err
}
var status string
if err := ctx.CallActivity(CheckStatus, workflow.ActivityInput(job)).Await(&status); err != nil {
return "", err
}
if status == "healthy" {
job.IsHealthy = true
sleepInterval = time.Second * 60
} else {
if job.IsHealthy {
job.IsHealthy = false
err := ctx.CallActivity(SendAlert, workflow.ActivityInput(fmt.Sprintf("Job '%s' is unhealthy!", job.JobID))).Await(nil)
if err != nil {
return "", err
}
}
sleepInterval = time.Second * 5
}
if err := ctx.CreateTimer(sleepInterval).Await(nil); err != nil {
return "", err
}
ctx.ContinueAsNew(job, false)
return "", nil
}
func CheckStatus(ctx workflow.ActivityContext) (any, error) {
statuses := []string{"healthy", "unhealthy"}
return statuses[rand.Intn(1)], nil
}
func SendAlert(ctx workflow.ActivityContext) (any, error) {
var message string
if err := ctx.GetInput(&message); err != nil {
return "", err
}
fmt.Printf("*** Alert: %s", message)
return "", nil
}
```
{{% /codetab %}}
{{< /tabs >}}
A workflow implementing the monitor pattern can loop forever or it can terminate itself gracefully by not calling _continue-as-new_.
@ -750,7 +920,7 @@ The following diagram illustrates this flow.
The following example code shows how this pattern can be implemented using Dapr Workflow.
{{< tabs Python JavaScript ".NET" Java >}}
{{< tabs Python JavaScript ".NET" Java Go >}}
{{% codetab %}}
<!--python-->
@ -1032,11 +1202,68 @@ public class ExternalSystemInteractionWorkflow extends Workflow {
{{% /codetab %}}
{{% codetab %}}
<!--go-->
```go
type Order struct {
Cost float64 `json:"cost"`
Product string `json:"product"`
Quantity int `json:"quantity"`
}
type Approval struct {
Approver string `json:"approver"`
}
func PurchaseOrderWorkflow(ctx *workflow.WorkflowContext) (any, error) {
var order Order
if err := ctx.GetInput(&order); err != nil {
return "", err
}
// Orders under $1000 are auto-approved
if order.Cost < 1000 {
return "Auto-approved", nil
}
// Orders of $1000 or more require manager approval
if err := ctx.CallActivity(SendApprovalRequest, workflow.ActivityInput(order)).Await(nil); err != nil {
return "", err
}
// Approvals must be received within 24 hours or they will be cancelled
var approval Approval
if err := ctx.WaitForExternalEvent("approval_received", time.Hour*24).Await(&approval); err != nil {
// Assuming that a timeout has taken place - in any case; an error.
return "error/cancelled", err
}
// The order was approved
if err := ctx.CallActivity(PlaceOrder, workflow.ActivityInput(order)).Await(nil); err != nil {
return "", err
}
return fmt.Sprintf("Approved by %s", approval.Approver), nil
}
func SendApprovalRequest(ctx workflow.ActivityContext) (any, error) {
var order Order
if err := ctx.GetInput(&order); err != nil {
return "", err
}
fmt.Printf("*** Sending approval request for order: %v\n", order)
return "", nil
}
func PlaceOrder(ctx workflow.ActivityContext) (any, error) {
var order Order
if err := ctx.GetInput(&order); err != nil {
return "", err
}
fmt.Printf("*** Placing order: %v", order)
return "", nil
}
```
{{% /codetab %}}
{{< /tabs >}}
The code that delivers the event to resume the workflow execution is external to the workflow. Workflow events can be delivered to a waiting workflow instance using the [raise event]({{< ref "howto-manage-workflow.md#raise-an-event" >}}) workflow management API, as shown in the following example:
{{< tabs Python JavaScript ".NET" Java >}}
{{< tabs Python JavaScript ".NET" Java Go >}}
{{% codetab %}}
<!--python-->
@ -1059,7 +1286,11 @@ with DaprClient() as d:
<!--javascript-->
```javascript
// Raise the workflow event to the waiting workflow
import { DaprClient } from "@dapr/dapr";
public async raiseEvent(workflowInstanceId: string, eventName: string, eventPayload?: any) {
this._innerClient.raiseOrchestrationEvent(workflowInstanceId, eventName, eventPayload);
}
```
{{% /codetab %}}
@ -1088,6 +1319,32 @@ client.raiseEvent(restartingInstanceId, "RestartEvent", "RestartEventPayload");
{{% /codetab %}}
{{% codetab %}}
<!--go-->
```go
func raiseEvent() {
daprClient, err := client.NewClient()
if err != nil {
log.Fatalf("failed to initialize the client")
}
err = daprClient.RaiseEventWorkflowBeta1(context.Background(), &client.RaiseEventWorkflowRequest{
InstanceID: "instance_id",
WorkflowComponent: "dapr",
EventName: "approval_received",
EventData: Approval{
Approver: "Jane Doe",
},
})
if err != nil {
log.Fatalf("failed to raise event on workflow")
}
log.Println("raised an event on specified workflow")
}
```
{{% /codetab %}}
{{< /tabs >}}
External events don't have to be directly triggered by humans. They can also be triggered by other systems. For example, a workflow may need to pause and wait for a payment to be received. In this case, a payment system might publish an event to a pub/sub topic on receipt of a payment, and a listener on that topic can raise an event to the workflow using the raise event workflow API.
@ -1105,4 +1362,5 @@ External events don't have to be directly triggered by humans. They can also be
- [Python](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow)
- [JavaScript](https://github.com/dapr/js-sdk/tree/main/examples/workflow)
- [.NET](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow)
- [Java](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows)
- [Java](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows)
- [Go](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md)

View File

@ -20,8 +20,8 @@ In this guide, you'll:
<img src="/images/workflow-quickstart-overview.png" width=800 style="padding-bottom:15px;">
{{< tabs "Python" "JavaScript" ".NET" "Java" >}}
Select your preferred language-specific Dapr SDK before proceeding with the Quickstart.
{{< tabs "Python" "JavaScript" ".NET" "Java" Go >}}
<!-- Python -->
{{% codetab %}}
@ -68,14 +68,12 @@ pip3 install -r requirements.txt
### Step 3: Run the order processor app
In the terminal, start the order processor app alongside a Dapr sidecar:
In the terminal, start the order processor app alongside a Dapr sidecar using [Multi-App Run]({{< ref multi-app-dapr-run >}}):
```bash
dapr run --app-id order-processor --resources-path ../../../components/ -- python3 app.py
dapr run -f .
```
> **Note:** Since Python3.exe is not defined in Windows, you may need to use `python app.py` instead of `python3 app.py`.
This starts the `order-processor` app with unique workflow ID and runs the workflow activities.
Expected output:
@ -404,10 +402,10 @@ cd workflows/csharp/sdk/order-processor
### Step 3: Run the order processor app
In the terminal, start the order processor app alongside a Dapr sidecar:
In the terminal, start the order processor app alongside a Dapr sidecar using [Multi-App Run]({{< ref multi-app-dapr-run >}}):
```bash
dapr run --app-id order-processor dotnet run
dapr run -f .
```
This starts the `order-processor` app with unique workflow ID and runs the workflow activities.
@ -660,10 +658,10 @@ mvn clean install
### Step 3: Run the order processor app
In the terminal, start the order processor app alongside a Dapr sidecar:
In the terminal, start the order processor app alongside a Dapr sidecar using [Multi-App Run]({{< ref multi-app-dapr-run >}}):
```bash
dapr run --app-id WorkflowConsoleApp --resources-path ../../../components/ --dapr-grpc-port 50001 -- java -jar target/OrderProcessingService-0.0.1-SNAPSHOT.jar io.dapr.quickstarts.workflows.WorkflowConsoleApp
dapr run -f .
```
This starts the `order-processor` app with unique workflow ID and runs the workflow activities.
@ -953,6 +951,250 @@ The `Activities` directory holds the four workflow activities used by the workfl
{{% /codetab %}}
<!-- Go -->
{{% codetab %}}
The `order-processor` console app starts and manages the `OrderProcessingWorkflow` workflow, which simulates purchasing items from a store. The workflow consists of five unique workflow activities, or tasks:
- `NotifyActivity`: Utilizes a logger to print out messages throughout the workflow. These messages notify you when:
- You have insufficient inventory
- Your payment couldn't be processed, etc.
- `ProcessPaymentActivity`: Processes and authorizes the payment.
- `VerifyInventoryActivity`: Checks the state store to ensure there is enough inventory present for purchase.
- `UpdateInventoryActivity`: Removes the requested items from the state store and updates the store with the new remaining inventory value.
- `RequestApprovalActivity`: Seeks approval from the manager if payment is greater than 50,000 USD.
### Step 1: Pre-requisites
For this example, you will need:
- [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started).
- [Latest version of Go](https://go.dev/dl/).
<!-- IGNORE_LINKS -->
- [Docker Desktop](https://www.docker.com/products/docker-desktop)
<!-- END_IGNORE -->
### Step 2: Set up the environment
Clone the [sample provided in the Quickstarts repo](https://github.com/dapr/quickstarts/tree/master/workflows).
```bash
git clone https://github.com/dapr/quickstarts.git
```
In a new terminal window, navigate to the `order-processor` directory:
```bash
cd workflows/go/sdk/order-processor
```
### Step 3: Run the order processor app
In the terminal, start the order processor app alongside a Dapr sidecar using [Multi-App Run]({{< ref multi-app-dapr-run >}}):
```bash
dapr run -f .
```
This starts the `order-processor` app with unique workflow ID and runs the workflow activities.
Expected output:
```bash
== APP - order-processor == *** Welcome to the Dapr Workflow console app sample!
== APP - order-processor == *** Using this app, you can place orders that start workflows.
== APP - order-processor == dapr client initializing for: 127.0.0.1:50056
== APP - order-processor == adding base stock item: paperclip
== APP - order-processor == 2024/02/01 12:59:52 work item listener started
== APP - order-processor == INFO: 2024/02/01 12:59:52 starting background processor
== APP - order-processor == adding base stock item: cars
== APP - order-processor == adding base stock item: computers
== APP - order-processor == ==========Begin the purchase of item:==========
== APP - order-processor == NotifyActivity: Received order 48ee83b7-5d80-48d5-97f9-6b372f5480a5 for 10 cars - $150000
== APP - order-processor == VerifyInventoryActivity: Verifying inventory for order 48ee83b7-5d80-48d5-97f9-6b372f5480a5 of 10 cars
== APP - order-processor == VerifyInventoryActivity: There are 100 cars available for purchase
== APP - order-processor == RequestApprovalActivity: Requesting approval for payment of 150000USD for 10 cars
== APP - order-processor == NotifyActivity: Payment for order 48ee83b7-5d80-48d5-97f9-6b372f5480a5 has been approved!
== APP - order-processor == ProcessPaymentActivity: 48ee83b7-5d80-48d5-97f9-6b372f5480a5 for 10 - cars (150000USD)
== APP - order-processor == UpdateInventoryActivity: Checking Inventory for order 48ee83b7-5d80-48d5-97f9-6b372f5480a5 for 10 * cars
== APP - order-processor == UpdateInventoryActivity: There are now 90 cars left in stock
== APP - order-processor == NotifyActivity: Order 48ee83b7-5d80-48d5-97f9-6b372f5480a5 has completed!
== APP - order-processor == Workflow completed - result: COMPLETED
== APP - order-processor == Purchase of item is complete
```
Stop the Dapr workflow with `CTRL+C` or:
```bash
dapr stop -f .
```
### (Optional) Step 4: View in Zipkin
Running `dapr init` launches the [openzipkin/zipkin](https://hub.docker.com/r/openzipkin/zipkin/) Docker container. If the container has stopped running, launch the Zipkin Docker container with the following command:
```
docker run -d -p 9411:9411 openzipkin/zipkin
```
View the workflow trace spans in the Zipkin web UI (typically at `http://localhost:9411/zipkin/`).
<img src="/images/workflow-trace-spans-zipkin.png" width=800 style="padding-bottom:15px;">
### What happened?
When you ran `dapr run`:
1. A unique order ID for the workflow is generated (in the above example, `48ee83b7-5d80-48d5-97f9-6b372f5480a5`) and the workflow is scheduled.
1. The `NotifyActivity` workflow activity sends a notification saying an order for 10 cars has been received.
1. The `ReserveInventoryActivity` workflow activity checks the inventory data, determines if you can supply the ordered item, and responds with the number of cars in stock.
1. Your workflow starts and notifies you of its status.
1. The `ProcessPaymentActivity` workflow activity begins processing payment for order `48ee83b7-5d80-48d5-97f9-6b372f5480a5` and confirms if successful.
1. The `UpdateInventoryActivity` workflow activity updates the inventory with the current available cars after the order has been processed.
1. The `NotifyActivity` workflow activity sends a notification saying that order `48ee83b7-5d80-48d5-97f9-6b372f5480a5` has completed.
1. The workflow terminates as completed.
#### `order-processor/main.go`
In the application's program file:
- The unique workflow order ID is generated
- The workflow is scheduled
- The workflow status is retrieved
- The workflow and the workflow activities it invokes are registered
```go
func main() {
fmt.Println("*** Welcome to the Dapr Workflow console app sample!")
fmt.Println("*** Using this app, you can place orders that start workflows.")
// ...
// Register workflow and activities
if err := w.RegisterWorkflow(OrderProcessingWorkflow); err != nil {
log.Fatal(err)
}
if err := w.RegisterActivity(NotifyActivity); err != nil {
log.Fatal(err)
}
if err := w.RegisterActivity(RequestApprovalActivity); err != nil {
log.Fatal(err)
}
if err := w.RegisterActivity(VerifyInventoryActivity); err != nil {
log.Fatal(err)
}
if err := w.RegisterActivity(ProcessPaymentActivity); err != nil {
log.Fatal(err)
}
if err := w.RegisterActivity(UpdateInventoryActivity); err != nil {
log.Fatal(err)
}
// Build and start workflow runtime, pulling and executing tasks
if err := w.Start(); err != nil {
log.Fatal(err)
}
daprClient, err := client.NewClient()
if err != nil {
log.Fatalf("failed to initialise dapr client: %v", err)
}
wfClient, err := workflow.NewClient(workflow.WithDaprClient(daprClient))
if err != nil {
log.Fatalf("failed to initialise workflow client: %v", err)
}
// Check inventory
inventory := []InventoryItem{
{ItemName: "paperclip", PerItemCost: 5, Quantity: 100},
{ItemName: "cars", PerItemCost: 15000, Quantity: 100},
{ItemName: "computers", PerItemCost: 500, Quantity: 100},
}
if err := restockInventory(daprClient, inventory); err != nil {
log.Fatalf("failed to restock: %v", err)
}
fmt.Println("==========Begin the purchase of item:==========")
itemName := defaultItemName
orderQuantity := 10
totalCost := inventory[1].PerItemCost * orderQuantity
orderPayload := OrderPayload{
ItemName: itemName,
Quantity: orderQuantity,
TotalCost: totalCost,
}
// Start workflow events, like receiving order, verifying inventory, and processing payment
id, err := wfClient.ScheduleNewWorkflow(context.Background(), workflowName, workflow.WithInput(orderPayload))
if err != nil {
log.Fatalf("failed to start workflow: %v", err)
}
// ...
// Notification that workflow has completed or failed
for {
timeDelta := time.Since(startTime)
metadata, err := wfClient.FetchWorkflowMetadata(context.Background(), id)
if err != nil {
log.Fatalf("failed to fetch workflow: %v", err)
}
if (metadata.RuntimeStatus == workflow.StatusCompleted) || (metadata.RuntimeStatus == workflow.StatusFailed) || (metadata.RuntimeStatus == workflow.StatusTerminated) {
fmt.Printf("Workflow completed - result: %v\n", metadata.RuntimeStatus.String())
break
}
if timeDelta.Seconds() >= 10 {
metadata, err := wfClient.FetchWorkflowMetadata(context.Background(), id)
if err != nil {
log.Fatalf("failed to fetch workflow: %v", err)
}
if totalCost > 50000 && !approvalSought && ((metadata.RuntimeStatus != workflow.StatusCompleted) || (metadata.RuntimeStatus != workflow.StatusFailed) || (metadata.RuntimeStatus != workflow.StatusTerminated)) {
approvalSought = true
promptForApproval(id)
}
}
// Sleep to not DoS the dapr dev instance
time.Sleep(time.Second)
}
fmt.Println("Purchase of item is complete")
}
// Request approval (RequestApprovalActivity)
func promptForApproval(id string) {
wfClient, err := workflow.NewClient()
if err != nil {
log.Fatalf("failed to initialise wfClient: %v", err)
}
if err := wfClient.RaiseEvent(context.Background(), id, "manager_approval"); err != nil {
log.Fatal(err)
}
}
// Update inventory for remaining stock (UpdateInventoryActivity)
func restockInventory(daprClient client.Client, inventory []InventoryItem) error {
for _, item := range inventory {
itemSerialized, err := json.Marshal(item)
if err != nil {
return err
}
fmt.Printf("adding base stock item: %s\n", item.ItemName)
if err := daprClient.SaveState(context.Background(), stateStoreName, item.ItemName, itemSerialized, nil); err != nil {
return err
}
}
return nil
}
```
Meanwhile, the `OrderProcessingWorkflow` and its activities are defined as methods in [`workflow.go`](https://github.com/dapr/quickstarts/workflows/go/sdk/order-processor/workflow.go)
{{% /codetab %}}
{{< /tabs >}}
## Tell us what you think!