mike initial review

Signed-off-by: Hannah Hunter <hannahhunter@microsoft.com>
This commit is contained in:
Hannah Hunter 2024-02-05 12:43:29 -05:00
parent a45e25f25c
commit 0f50934b63
5 changed files with 354 additions and 96 deletions

View File

@ -169,49 +169,21 @@ public class DemoWorkflowActivity implements WorkflowActivity {
<!--go-->
Define the workflow activities you'd like your workflow to perform. Activities are wrapped in the public `callActivityOptions` method, which implements the workflow activities.
Define each workflow activity you'd like your workflow to perform. The Activity input can be unmarshalled from the context with `ctx.GetInput`. Activities should be defined as taking a `ctx workflow.ActivityContext` parameter and returning an interface and error.
```go
type ActivityContext struct {
ctx task.ActivityContext
}
func (wfac *ActivityContext) GetInput(v interface{}) error {
return wfac.ctx.GetInput(&v)
}
func (wfac *ActivityContext) Context() context.Context {
return wfac.ctx.Context()
}
type callActivityOption func(*callActivityOptions) error
type callActivityOptions struct {
rawInput *wrapperspb.StringValue
}
// ActivityInput is an option to pass a JSON-serializable input
func ActivityInput(input any) callActivityOption {
return func(opts *callActivityOptions) error {
data, err := marshalData(input)
if err != nil {
return err
}
opts.rawInput = wrapperspb.String(string(data))
return nil
}
}
// ActivityRawInput is an option to pass a byte slice as an input
func ActivityRawInput(input string) callActivityOption {
return func(opts *callActivityOptions) error {
opts.rawInput = wrapperspb.String(input)
return nil
func TestActivity(ctx workflow.ActivityContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return "", err
}
// Do something here
return "result", nil
}
```
[See the Go SDK workflow activity example in context.](todo)
[See the Go SDK workflow activity example in context.](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md)
{{% /codetab %}}
@ -322,62 +294,30 @@ public class DemoWorkflowWorker {
<!--go-->
Next, register the workflow and workflow activities and start the workflow runtime.
Define your workflow function with the parameter `ctx *workflow.WorkflowContext` and return any and error. Invoke your defined activities from within your workflow.
```go
package workflow
// RegisterWorkflow adds a workflow function to the registry
func (ww *WorkflowWorker) RegisterWorkflow(w Workflow) error {
wrappedOrchestration := wrapWorkflow(w)
// get the function name for the passed workflow
name, err := getFunctionName(w)
if err != nil {
return fmt.Errorf("failed to get workflow decorator: %v", err)
func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return nil, err
}
err = ww.tasks.AddOrchestratorN(name, wrappedOrchestration)
return err
}
// Activity wrapper
func wrapActivity(a Activity) task.Activity {
return func(ctx task.ActivityContext) (any, error) {
aCtx := ActivityContext{ctx: ctx}
return a(aCtx)
var output string
if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
return nil, err
}
}
// RegisterActivity adds an activity function to the registry
func (ww *WorkflowWorker) RegisterActivity(a Activity) error {
wrappedActivity := wrapActivity(a)
// get the function name for the passed activity
name, err := getFunctionName(a)
if err != nil {
return fmt.Errorf("failed to get activity decorator: %v", err)
if err := ctx.WaitForExternalEvent("testEvent", time.Second*60).Await(&output); err != nil {
return nil, err
}
err = ww.tasks.AddActivityN(name, wrappedActivity)
return err
}
// Start initialises a non-blocking worker to handle workflows and activities registered
// prior to this being called.
func (ww *WorkflowWorker) Start() error {
ctx, cancel := context.WithCancel(context.Background())
ww.cancel = cancel
if err := ww.client.StartWorkItemListener(ctx, ww.tasks); err != nil {
return fmt.Errorf("failed to start work stream: %v", err)
if err := ctx.CreateTimer(time.Second).Await(nil); err != nil {
return nil, nil
}
log.Println("work item listener started")
return nil
return output, nil
}
```
[See the Go SDK workflow in context.](todo)
[See the Go SDK workflow in context.](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md)
{{% /codetab %}}
@ -600,19 +540,337 @@ public class DemoWorkflow extends Workflow {
<!--go-->
[As in the following example](todo), a hello-world application using the Go SDK and Dapr Workflow would include:
[As in the following example](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md), a hello-world application using the Go SDK and Dapr Workflow would include:
- A Go package called `todo` to receive the Go SDK client capabilities.
- An import of `todo`
- A Go package called `client` to receive the Go SDK client capabilities.
- An import of `workflow`
- The `DemoWorkflow` class which extends `Workflow`
- Creating the workflow with input and output.
- API calls. In the example below, these calls start and call the workflow activities.
```go
package main
import (
"context"
"fmt"
"log"
"time"
"github.com/dapr/go-sdk/client"
"github.com/dapr/go-sdk/workflow"
)
var stage = 0
const (
workflowComponent = "dapr"
)
func main() {
w, err := workflow.NewWorker()
if err != nil {
log.Fatal(err)
}
fmt.Println("Worker initialized")
if err := w.RegisterWorkflow(TestWorkflow); err != nil {
log.Fatal(err)
}
fmt.Println("TestWorkflow registered")
if err := w.RegisterActivity(TestActivity); err != nil {
log.Fatal(err)
}
fmt.Println("TestActivity registered")
// Start workflow runner
if err := w.Start(); err != nil {
log.Fatal(err)
}
fmt.Println("runner started")
daprClient, err := client.NewClient()
if err != nil {
log.Fatalf("failed to intialise client: %v", err)
}
defer daprClient.Close()
ctx := context.Background()
// Start workflow test
respStart, err := daprClient.StartWorkflowBeta1(ctx, &client.StartWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
WorkflowName: "TestWorkflow",
Options: nil,
Input: 1,
SendRawInput: false,
})
if err != nil {
log.Fatalf("failed to start workflow: %v", err)
}
fmt.Printf("workflow started with id: %v\n", respStart.InstanceID)
// Pause workflow test
err = daprClient.PauseWorkflowBeta1(ctx, &client.PauseWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err != nil {
log.Fatalf("failed to pause workflow: %v", err)
}
respGet, err := daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}
if respGet.RuntimeStatus != workflow.StatusSuspended.String() {
log.Fatalf("workflow not paused: %v", respGet.RuntimeStatus)
}
fmt.Printf("workflow paused\n")
// Resume workflow test
err = daprClient.ResumeWorkflowBeta1(ctx, &client.ResumeWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err != nil {
log.Fatalf("failed to resume workflow: %v", err)
}
respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}
if respGet.RuntimeStatus != workflow.StatusRunning.String() {
log.Fatalf("workflow not running")
}
fmt.Println("workflow resumed")
fmt.Printf("stage: %d\n", stage)
// Raise Event Test
err = daprClient.RaiseEventWorkflowBeta1(ctx, &client.RaiseEventWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
EventName: "testEvent",
EventData: "testData",
SendRawData: false,
})
if err != nil {
fmt.Printf("failed to raise event: %v", err)
}
fmt.Println("workflow event raised")
time.Sleep(time.Second) // allow workflow to advance
fmt.Printf("stage: %d\n", stage)
respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}
fmt.Printf("workflow status: %v\n", respGet.RuntimeStatus)
// Purge workflow test
err = daprClient.PurgeWorkflowBeta1(ctx, &client.PurgeWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err != nil {
log.Fatalf("failed to purge workflow: %v", err)
}
respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err != nil && respGet != nil {
log.Fatal("failed to purge workflow")
}
fmt.Println("workflow purged")
fmt.Printf("stage: %d\n", stage)
// Terminate workflow test
respStart, err = daprClient.StartWorkflowBeta1(ctx, &client.StartWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
WorkflowName: "TestWorkflow",
Options: nil,
Input: 1,
SendRawInput: false,
})
if err != nil {
log.Fatalf("failed to start workflow: %v", err)
}
fmt.Printf("workflow started with id: %s\n", respStart.InstanceID)
err = daprClient.TerminateWorkflowBeta1(ctx, &client.TerminateWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err != nil {
log.Fatalf("failed to terminate workflow: %v", err)
}
respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}
if respGet.RuntimeStatus != workflow.StatusTerminated.String() {
log.Fatal("failed to terminate workflow")
}
fmt.Println("workflow terminated")
err = daprClient.PurgeWorkflowBeta1(ctx, &client.PurgeWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
respGet, err = daprClient.GetWorkflowBeta1(ctx, &client.GetWorkflowRequest{
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
WorkflowComponent: workflowComponent,
})
if err == nil || respGet != nil {
log.Fatalf("failed to purge workflow: %v", err)
}
fmt.Println("workflow purged")
// WFClient
// TODO: Expand client validation
stage = 0
fmt.Println("workflow client test")
wfClient, err := workflow.NewClient()
if err != nil {
log.Fatalf("[wfclient] faield to initialize: %v", err)
}
id, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
if err != nil {
log.Fatalf("[wfclient] failed to start workflow: %v", err)
}
fmt.Printf("[wfclient] started workflow with id: %s\n", id)
metadata, err := wfClient.FetchWorkflowMetadata(ctx, id)
if err != nil {
log.Fatalf("[wfclient] failed to get worfklow: %v", err)
}
fmt.Printf("[wfclient] workflow status: %v\n", metadata.RuntimeStatus.String())
if stage != 1 {
log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 1 expected, current: %d", stage)
}
fmt.Printf("[wfclient] stage: %d\n", stage)
// TODO: WaitForWorkflowStart
// TODO: WaitForWorkflowCompletion
// raise event
if err := wfClient.RaiseEvent(ctx, id, "testEvent", workflow.WithEventPayload("testData")); err != nil {
log.Fatalf("[wfclient] failed to raise event: %v", err)
}
fmt.Println("[wfclient] event raised")
// Sleep to allow the workflow to advance
time.Sleep(time.Second)
if stage != 2 {
log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 2 expected, current: %d", stage)
}
fmt.Printf("[wfclient] stage: %d\n", stage)
// stop workflow
if err := wfClient.TerminateWorkflow(ctx, id); err != nil {
log.Fatalf("[wfclient] failed to terminate workflow: %v", err)
}
fmt.Println("[wfclient] workflow terminated")
if err := wfClient.PurgeWorkflow(ctx, id); err != nil {
log.Fatalf("[wfclient] failed to purge workflow: %v", err)
}
fmt.Println("[wfclient] workflow purged")
// stop workflow runtime
if err := w.Shutdown(); err != nil {
log.Fatalf("failed to shutdown runtime: %v", err)
}
fmt.Println("workflow worker successfully shutdown")
}
func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return nil, err
}
var output string
if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
return nil, err
}
err := ctx.WaitForExternalEvent("testEvent", time.Second*60).Await(&output)
if err != nil {
return nil, err
}
if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
return nil, err
}
return output, nil
}
func TestActivity(ctx workflow.ActivityContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return "", err
}
stage += input
return fmt.Sprintf("Stage: %d", stage), nil
}
```
[See the full Go SDK workflow example in context.](todo)
[See the full Go SDK workflow example in context.](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md)
{{% /codetab %}}
@ -637,4 +895,4 @@ Now that you've authored a workflow, learn how to manage it.
- [Python example](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow)
- [.NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow)
- [Java example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows)
- [Go example](todo)
- [Go example](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md)

View File

@ -197,4 +197,4 @@ See the [Reminder usage and execution guarantees section]({{< ref "workflow-arch
- [Python](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow)
- [.NET](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow)
- [Java](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows)
- [Go example](todo)
- [Go example](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md)

View File

@ -467,4 +467,4 @@ To work around these constraints:
- [Python](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow)
- [.NET](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow)
- [Java](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows)
- [Go example](todo)
- [Go example](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md)

View File

@ -82,7 +82,7 @@ You can use the following SDKs to author a workflow.
| Python | [dapr-ext-workflow](https://github.com/dapr/python-sdk/tree/master/ext/dapr-ext-workflow) |
| .NET | [Dapr.Workflow](https://www.nuget.org/profiles/dapr.io) |
| Java | [io.dapr.workflows](https://dapr.github.io/java-sdk/io/dapr/workflows/package-summary.html) |
| Go | [workflow](todo) |
| Go | [workflow](https://github.com/dapr/go-sdk/tree/main/client/workflow.go) |
## Try out workflows
@ -96,7 +96,7 @@ Want to put workflows to the test? Walk through the following quickstart and tut
| [Workflow Python SDK example](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow) | Learn how to create a Dapr Workflow and invoke it using the Python `DaprClient` package. |
| [Workflow .NET SDK example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow) | Learn how to create a Dapr Workflow and invoke it using ASP.NET Core web APIs. |
| [Workflow Java SDK example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows) | Learn how to create a Dapr Workflow and invoke it using the Java `io.dapr.workflows` package. |
| [Workflow Go SDK example](todo) | Learn how to create a Dapr Workflow and invoke it using the Go `workflow` package. |
| [Workflow Go SDK example](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md) | Learn how to create a Dapr Workflow and invoke it using the Go `workflow` package. |
### Start using workflows directly in your app
@ -126,4 +126,4 @@ Watch [this video for an overview on Dapr Workflow](https://youtu.be/s1p9MNl4VGo
- [.NET example](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow)
- [Python example](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow)
- [Java example](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows)
- [Go example](todo)
- [Go example](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md)

View File

@ -790,4 +790,4 @@ External events don't have to be directly triggered by humans. They can also be
- [Python](https://github.com/dapr/python-sdk/tree/master/examples/demo_workflow)
- [.NET](https://github.com/dapr/dotnet-sdk/tree/master/examples/Workflow)
- [Java](https://github.com/dapr/java-sdk/tree/master/examples/src/main/java/io/dapr/examples/workflows)
- [Go example](todo)
- [Go example](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md)