mirror of https://github.com/dapr/docs.git
[1.16] Go Workflow durabletask (#4854)
The Go SDK workflow wrapper over durabletask-go is to be deprecated, in favour of using the durabletask-go client directly. The Go SDK wrapper is unnecessary indirection, and prevents some features from being used by the user. Signed-off-by: joshvanl <me@joshvanl.dev>
This commit is contained in:
parent
9fa3b5a161
commit
595fa5cd8e
|
@ -868,6 +868,7 @@ public class DemoWorkflow extends Workflow {
|
||||||
- Creating the workflow with input and output.
|
- Creating the workflow with input and output.
|
||||||
- API calls. In the example below, these calls start and call the workflow activities.
|
- API calls. In the example below, these calls start and call the workflow activities.
|
||||||
|
|
||||||
|
|
||||||
```go
|
```go
|
||||||
package main
|
package main
|
||||||
|
|
||||||
|
@ -877,8 +878,11 @@ import (
|
||||||
"log"
|
"log"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/dapr/go-sdk/client"
|
"github.com/dapr/durabletask-go/api"
|
||||||
"github.com/dapr/go-sdk/workflow"
|
"github.com/dapr/durabletask-go/backend"
|
||||||
|
"github.com/dapr/durabletask-go/client"
|
||||||
|
"github.com/dapr/durabletask-go/task"
|
||||||
|
dapr "github.com/dapr/go-sdk/client"
|
||||||
)
|
)
|
||||||
|
|
||||||
var stage = 0
|
var stage = 0
|
||||||
|
@ -888,110 +892,68 @@ const (
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
w, err := workflow.NewWorker()
|
registry := task.NewTaskRegistry()
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Println("Worker initialized")
|
if err := registry.AddOrchestrator(TestWorkflow); err != nil {
|
||||||
|
|
||||||
if err := w.RegisterWorkflow(TestWorkflow); err != nil {
|
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
fmt.Println("TestWorkflow registered")
|
fmt.Println("TestWorkflow registered")
|
||||||
|
|
||||||
if err := w.RegisterActivity(TestActivity); err != nil {
|
if err := registry.AddActivity(TestActivity); err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
fmt.Println("TestActivity registered")
|
fmt.Println("TestActivity registered")
|
||||||
|
|
||||||
// Start workflow runner
|
daprClient, err := dapr.NewClient()
|
||||||
if err := w.Start(); err != nil {
|
if err != nil {
|
||||||
log.Fatal(err)
|
log.Fatalf("failed to create Dapr client: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
client := client.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger())
|
||||||
|
if err := client.StartWorkItemListener(context.TODO(), registry); err != nil {
|
||||||
|
log.Fatalf("failed to start work item listener: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
fmt.Println("runner started")
|
fmt.Println("runner started")
|
||||||
|
|
||||||
daprClient, err := client.NewClient()
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("failed to intialise client: %v", err)
|
|
||||||
}
|
|
||||||
defer daprClient.Close()
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
// Start workflow test
|
// Start workflow test
|
||||||
respStart, err := daprClient.StartWorkflow(ctx, &client.StartWorkflowRequest{
|
id, err := client.ScheduleNewOrchestration(ctx, "TestWorkflow", api.WithInput(1))
|
||||||
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
|
|
||||||
WorkflowComponent: workflowComponent,
|
|
||||||
WorkflowName: "TestWorkflow",
|
|
||||||
Options: nil,
|
|
||||||
Input: 1,
|
|
||||||
SendRawInput: false,
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("failed to start workflow: %v", err)
|
log.Fatalf("failed to start workflow: %v", err)
|
||||||
}
|
}
|
||||||
fmt.Printf("workflow started with id: %v\n", respStart.InstanceID)
|
fmt.Printf("workflow started with id: %v\n", id)
|
||||||
|
|
||||||
// Pause workflow test
|
// Pause workflow test
|
||||||
err = daprClient.PauseWorkflow(ctx, &client.PauseWorkflowRequest{
|
err = client.PurgeOrchestrationState(ctx, id)
|
||||||
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
|
|
||||||
WorkflowComponent: workflowComponent,
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("failed to pause workflow: %v", err)
|
log.Fatalf("failed to pause workflow: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
respGet, err := daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
|
respGet, err := client.FetchOrchestrationMetadata(ctx, id)
|
||||||
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
|
|
||||||
WorkflowComponent: workflowComponent,
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("failed to get workflow: %v", err)
|
log.Fatalf("failed to get workflow: %v", err)
|
||||||
}
|
}
|
||||||
|
fmt.Printf("workflow paused: %s\n", respGet.RuntimeStatus)
|
||||||
if respGet.RuntimeStatus != workflow.StatusSuspended.String() {
|
|
||||||
log.Fatalf("workflow not paused: %v", respGet.RuntimeStatus)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("workflow paused\n")
|
|
||||||
|
|
||||||
// Resume workflow test
|
// Resume workflow test
|
||||||
err = daprClient.ResumeWorkflow(ctx, &client.ResumeWorkflowRequest{
|
err = client.ResumeOrchestration(ctx, id, "")
|
||||||
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
|
|
||||||
WorkflowComponent: workflowComponent,
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("failed to resume workflow: %v", err)
|
log.Fatalf("failed to resume workflow: %v", err)
|
||||||
}
|
}
|
||||||
|
fmt.Printf("workflow running: %s\n", respGet.RuntimeStatus)
|
||||||
|
|
||||||
respGet, err = daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
|
respGet, err = client.FetchOrchestrationMetadata(ctx, id)
|
||||||
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
|
|
||||||
WorkflowComponent: workflowComponent,
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("failed to get workflow: %v", err)
|
log.Fatalf("failed to get workflow: %v", err)
|
||||||
}
|
}
|
||||||
|
fmt.Printf("workflow resumed: %s\n", respGet.RuntimeStatus)
|
||||||
if respGet.RuntimeStatus != workflow.StatusRunning.String() {
|
|
||||||
log.Fatalf("workflow not running")
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Println("workflow resumed")
|
|
||||||
|
|
||||||
fmt.Printf("stage: %d\n", stage)
|
fmt.Printf("stage: %d\n", stage)
|
||||||
|
|
||||||
// Raise Event Test
|
// Raise Event Test
|
||||||
|
err = client.RaiseEvent(ctx, id, "testEvent", api.WithEventPayload("testData"))
|
||||||
err = daprClient.RaiseEventWorkflow(ctx, &client.RaiseEventWorkflowRequest{
|
|
||||||
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
|
|
||||||
WorkflowComponent: workflowComponent,
|
|
||||||
EventName: "testEvent",
|
|
||||||
EventData: "testData",
|
|
||||||
SendRawData: false,
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("failed to raise event: %v", err)
|
fmt.Printf("failed to raise event: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -1002,10 +964,7 @@ func main() {
|
||||||
|
|
||||||
fmt.Printf("stage: %d\n", stage)
|
fmt.Printf("stage: %d\n", stage)
|
||||||
|
|
||||||
respGet, err = daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
|
respGet, err = client.FetchOrchestrationMetadata(ctx, id)
|
||||||
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
|
|
||||||
WorkflowComponent: workflowComponent,
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("failed to get workflow: %v", err)
|
log.Fatalf("failed to get workflow: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -1013,166 +972,36 @@ func main() {
|
||||||
fmt.Printf("workflow status: %v\n", respGet.RuntimeStatus)
|
fmt.Printf("workflow status: %v\n", respGet.RuntimeStatus)
|
||||||
|
|
||||||
// Purge workflow test
|
// Purge workflow test
|
||||||
err = daprClient.PurgeWorkflow(ctx, &client.PurgeWorkflowRequest{
|
err = client.PurgeOrchestrationState(ctx, id)
|
||||||
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
|
|
||||||
WorkflowComponent: workflowComponent,
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("failed to purge workflow: %v", err)
|
log.Fatalf("failed to purge workflow: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
respGet, err = daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
|
|
||||||
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
|
|
||||||
WorkflowComponent: workflowComponent,
|
|
||||||
})
|
|
||||||
if err != nil && respGet != nil {
|
|
||||||
log.Fatal("failed to purge workflow")
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Println("workflow purged")
|
fmt.Println("workflow purged")
|
||||||
|
|
||||||
fmt.Printf("stage: %d\n", stage)
|
|
||||||
|
|
||||||
// Terminate workflow test
|
|
||||||
respStart, err = daprClient.StartWorkflow(ctx, &client.StartWorkflowRequest{
|
|
||||||
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
|
|
||||||
WorkflowComponent: workflowComponent,
|
|
||||||
WorkflowName: "TestWorkflow",
|
|
||||||
Options: nil,
|
|
||||||
Input: 1,
|
|
||||||
SendRawInput: false,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("failed to start workflow: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("workflow started with id: %s\n", respStart.InstanceID)
|
|
||||||
|
|
||||||
err = daprClient.TerminateWorkflow(ctx, &client.TerminateWorkflowRequest{
|
|
||||||
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
|
|
||||||
WorkflowComponent: workflowComponent,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("failed to terminate workflow: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
respGet, err = daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
|
|
||||||
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
|
|
||||||
WorkflowComponent: workflowComponent,
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("failed to get workflow: %v", err)
|
|
||||||
}
|
|
||||||
if respGet.RuntimeStatus != workflow.StatusTerminated.String() {
|
|
||||||
log.Fatal("failed to terminate workflow")
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Println("workflow terminated")
|
|
||||||
|
|
||||||
err = daprClient.PurgeWorkflow(ctx, &client.PurgeWorkflowRequest{
|
|
||||||
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
|
|
||||||
WorkflowComponent: workflowComponent,
|
|
||||||
})
|
|
||||||
|
|
||||||
respGet, err = daprClient.GetWorkflow(ctx, &client.GetWorkflowRequest{
|
|
||||||
InstanceID: "a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9",
|
|
||||||
WorkflowComponent: workflowComponent,
|
|
||||||
})
|
|
||||||
if err == nil || respGet != nil {
|
|
||||||
log.Fatalf("failed to purge workflow: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Println("workflow purged")
|
|
||||||
|
|
||||||
stage = 0
|
|
||||||
fmt.Println("workflow client test")
|
|
||||||
|
|
||||||
wfClient, err := workflow.NewClient()
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("[wfclient] faield to initialize: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
id, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("[wfclient] failed to start workflow: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("[wfclient] started workflow with id: %s\n", id)
|
|
||||||
|
|
||||||
metadata, err := wfClient.FetchWorkflowMetadata(ctx, id)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("[wfclient] failed to get worfklow: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("[wfclient] workflow status: %v\n", metadata.RuntimeStatus.String())
|
|
||||||
|
|
||||||
if stage != 1 {
|
|
||||||
log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 1 expected, current: %d", stage)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("[wfclient] stage: %d\n", stage)
|
|
||||||
|
|
||||||
// raise event
|
|
||||||
|
|
||||||
if err := wfClient.RaiseEvent(ctx, id, "testEvent", workflow.WithEventPayload("testData")); err != nil {
|
|
||||||
log.Fatalf("[wfclient] failed to raise event: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Println("[wfclient] event raised")
|
|
||||||
|
|
||||||
// Sleep to allow the workflow to advance
|
|
||||||
time.Sleep(time.Second)
|
|
||||||
|
|
||||||
if stage != 2 {
|
|
||||||
log.Fatalf("Workflow assertion failed while validating the wfclient. Stage 2 expected, current: %d", stage)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("[wfclient] stage: %d\n", stage)
|
|
||||||
|
|
||||||
// stop workflow
|
|
||||||
if err := wfClient.TerminateWorkflow(ctx, id); err != nil {
|
|
||||||
log.Fatalf("[wfclient] failed to terminate workflow: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Println("[wfclient] workflow terminated")
|
|
||||||
|
|
||||||
if err := wfClient.PurgeWorkflow(ctx, id); err != nil {
|
|
||||||
log.Fatalf("[wfclient] failed to purge workflow: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Println("[wfclient] workflow purged")
|
|
||||||
|
|
||||||
// stop workflow runtime
|
|
||||||
if err := w.Shutdown(); err != nil {
|
|
||||||
log.Fatalf("failed to shutdown runtime: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Println("workflow worker successfully shutdown")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
|
func TestWorkflow(ctx *task.OrchestrationContext) (any, error) {
|
||||||
var input int
|
var input int
|
||||||
if err := ctx.GetInput(&input); err != nil {
|
if err := ctx.GetInput(&input); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
var output string
|
var output string
|
||||||
if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
|
if err := ctx.CallActivity(TestActivity, task.WithActivityInput(input)).Await(&output); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err := ctx.WaitForExternalEvent("testEvent", time.Second*60).Await(&output)
|
err := ctx.WaitForSingleEvent("testEvent", time.Second*60).Await(&output)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
|
if err := ctx.CallActivity(TestActivity, task.WithActivityInput(input)).Await(&output); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return output, nil
|
return output, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestActivity(ctx workflow.ActivityContext) (any, error) {
|
func TestActivity(ctx task.ActivityContext) (any, error) {
|
||||||
var input int
|
var input int
|
||||||
if err := ctx.GetInput(&input); err != nil {
|
if err := ctx.GetInput(&input); err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
|
|
|
@ -1756,8 +1756,11 @@ import (
|
||||||
"log"
|
"log"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/dapr/go-sdk/client"
|
"github.com/dapr/durabletask-go/api"
|
||||||
"github.com/dapr/go-sdk/workflow"
|
"github.com/dapr/durabletask-go/backend"
|
||||||
|
"github.com/dapr/durabletask-go/client"
|
||||||
|
"github.com/dapr/durabletask-go/task"
|
||||||
|
dapr "github.com/dapr/go-sdk/client"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
@ -1771,41 +1774,35 @@ func main() {
|
||||||
fmt.Println("*** Welcome to the Dapr Workflow console app sample!")
|
fmt.Println("*** Welcome to the Dapr Workflow console app sample!")
|
||||||
fmt.Println("*** Using this app, you can place orders that start workflows.")
|
fmt.Println("*** Using this app, you can place orders that start workflows.")
|
||||||
|
|
||||||
w, err := workflow.NewWorker()
|
registry := task.NewTaskRegistry()
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("failed to start worker: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := w.RegisterWorkflow(OrderProcessingWorkflow); err != nil {
|
if err := registry.AddOrchestrator(OrderProcessingWorkflow); err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
if err := w.RegisterActivity(NotifyActivity); err != nil {
|
if err := registry.AddActivity(NotifyActivity); err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
if err := w.RegisterActivity(RequestApprovalActivity); err != nil {
|
if err := registry.AddActivity(RequestApprovalActivity); err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
if err := w.RegisterActivity(VerifyInventoryActivity); err != nil {
|
if err := registry.AddActivity(VerifyInventoryActivity); err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
if err := w.RegisterActivity(ProcessPaymentActivity); err != nil {
|
if err := registry.AddActivity(ProcessPaymentActivity); err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
if err := w.RegisterActivity(UpdateInventoryActivity); err != nil {
|
if err := registry.AddActivity(UpdateInventoryActivity); err != nil {
|
||||||
log.Fatal(err)
|
log.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := w.Start(); err != nil {
|
daprClient, err := dapr.NewClient()
|
||||||
log.Fatal(err)
|
if err != nil {
|
||||||
|
log.Fatalf("failed to create Dapr client: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
daprClient, err := client.NewClient()
|
client := client.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger())
|
||||||
if err != nil {
|
if err := client.StartWorkItemListener(context.TODO(), registry); err != nil {
|
||||||
log.Fatalf("failed to initialise dapr client: %v", err)
|
log.Fatalf("failed to start work item listener: %v", err)
|
||||||
}
|
|
||||||
wfClient, err := workflow.NewClient(workflow.WithDaprClient(daprClient))
|
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("failed to initialise workflow client: %v", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
inventory := []InventoryItem{
|
inventory := []InventoryItem{
|
||||||
|
@ -1830,19 +1827,21 @@ func main() {
|
||||||
TotalCost: totalCost,
|
TotalCost: totalCost,
|
||||||
}
|
}
|
||||||
|
|
||||||
id, err := wfClient.ScheduleNewWorkflow(context.Background(), workflowName, workflow.WithInput(orderPayload))
|
id, err := client.ScheduleNewOrchestration(context.TODO(), workflowName,
|
||||||
|
api.WithInput(orderPayload),
|
||||||
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("failed to start workflow: %v", err)
|
log.Fatalf("failed to start workflow: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
waitCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
waitCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
_, err = wfClient.WaitForWorkflowCompletion(waitCtx, id)
|
defer cancel()
|
||||||
cancel()
|
_, err = client.WaitForOrchestrationCompletion(waitCtx, id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("failed to wait for workflow: %v", err)
|
log.Fatalf("failed to wait for workflow: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
respFetch, err := wfClient.FetchWorkflowMetadata(context.Background(), id, workflow.WithFetchPayloads(true))
|
respFetch, err := client.FetchOrchestrationMetadata(context.Background(), id, api.WithFetchPayloads(true))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("failed to get workflow: %v", err)
|
log.Fatalf("failed to get workflow: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -1852,7 +1851,7 @@ func main() {
|
||||||
fmt.Println("Purchase of item is complete")
|
fmt.Println("Purchase of item is complete")
|
||||||
}
|
}
|
||||||
|
|
||||||
func restockInventory(daprClient client.Client, inventory []InventoryItem) error {
|
func restockInventory(daprClient dapr.Client, inventory []InventoryItem) error {
|
||||||
for _, item := range inventory {
|
for _, item := range inventory {
|
||||||
itemSerialized, err := json.Marshal(item)
|
itemSerialized, err := json.Marshal(item)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1865,7 +1864,6 @@ func restockInventory(daprClient client.Client, inventory []InventoryItem) error
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|
||||||
#### `order-processor/workflow.go`
|
#### `order-processor/workflow.go`
|
||||||
|
@ -1881,18 +1879,18 @@ import (
|
||||||
"log"
|
"log"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/dapr/durabletask-go/task"
|
||||||
"github.com/dapr/go-sdk/client"
|
"github.com/dapr/go-sdk/client"
|
||||||
"github.com/dapr/go-sdk/workflow"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// OrderProcessingWorkflow is the main workflow for orchestrating activities in the order process.
|
// OrderProcessingWorkflow is the main workflow for orchestrating activities in the order process.
|
||||||
func OrderProcessingWorkflow(ctx *workflow.WorkflowContext) (any, error) {
|
func OrderProcessingWorkflow(ctx *task.OrchestrationContext) (any, error) {
|
||||||
orderID := ctx.InstanceID()
|
orderID := ctx.ID
|
||||||
var orderPayload OrderPayload
|
var orderPayload OrderPayload
|
||||||
if err := ctx.GetInput(&orderPayload); err != nil {
|
if err := ctx.GetInput(&orderPayload); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{
|
err := ctx.CallActivity(NotifyActivity, task.WithActivityInput(Notification{
|
||||||
Message: fmt.Sprintf("Received order %s for %d %s - $%d", orderID, orderPayload.Quantity, orderPayload.ItemName, orderPayload.TotalCost),
|
Message: fmt.Sprintf("Received order %s for %d %s - $%d", orderID, orderPayload.Quantity, orderPayload.ItemName, orderPayload.TotalCost),
|
||||||
})).Await(nil)
|
})).Await(nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -1900,8 +1898,8 @@ func OrderProcessingWorkflow(ctx *workflow.WorkflowContext) (any, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
var verifyInventoryResult InventoryResult
|
var verifyInventoryResult InventoryResult
|
||||||
if err := ctx.CallActivity(VerifyInventoryActivity, workflow.ActivityInput(InventoryRequest{
|
if err := ctx.CallActivity(VerifyInventoryActivity, task.WithActivityInput(InventoryRequest{
|
||||||
RequestID: orderID,
|
RequestID: string(orderID),
|
||||||
ItemName: orderPayload.ItemName,
|
ItemName: orderPayload.ItemName,
|
||||||
Quantity: orderPayload.Quantity,
|
Quantity: orderPayload.Quantity,
|
||||||
})).Await(&verifyInventoryResult); err != nil {
|
})).Await(&verifyInventoryResult); err != nil {
|
||||||
|
@ -1910,64 +1908,64 @@ func OrderProcessingWorkflow(ctx *workflow.WorkflowContext) (any, error) {
|
||||||
|
|
||||||
if !verifyInventoryResult.Success {
|
if !verifyInventoryResult.Success {
|
||||||
notification := Notification{Message: fmt.Sprintf("Insufficient inventory for %s", orderPayload.ItemName)}
|
notification := Notification{Message: fmt.Sprintf("Insufficient inventory for %s", orderPayload.ItemName)}
|
||||||
err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(notification)).Await(nil)
|
err := ctx.CallActivity(NotifyActivity, task.WithActivityInput(notification)).Await(nil)
|
||||||
return OrderResult{Processed: false}, err
|
return OrderResult{Processed: false}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if orderPayload.TotalCost > 5000 {
|
if orderPayload.TotalCost > 5000 {
|
||||||
var approvalRequired ApprovalRequired
|
var approvalRequired ApprovalRequired
|
||||||
if err := ctx.CallActivity(RequestApprovalActivity, workflow.ActivityInput(orderPayload)).Await(&approvalRequired); err != nil {
|
if err := ctx.CallActivity(RequestApprovalActivity, task.WithActivityInput(orderPayload)).Await(&approvalRequired); err != nil {
|
||||||
return OrderResult{Processed: false}, err
|
return OrderResult{Processed: false}, err
|
||||||
}
|
}
|
||||||
if err := ctx.WaitForExternalEvent("manager_approval", time.Second*200).Await(nil); err != nil {
|
if err := ctx.WaitForSingleEvent("manager_approval", time.Second*200).Await(nil); err != nil {
|
||||||
return OrderResult{Processed: false}, err
|
return OrderResult{Processed: false}, err
|
||||||
}
|
}
|
||||||
// TODO: Confirm timeout flow - this will be in the form of an error.
|
// TODO: Confirm timeout flow - this will be in the form of an error.
|
||||||
if approvalRequired.Approval {
|
if approvalRequired.Approval {
|
||||||
if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Payment for order %s has been approved!", orderID)})).Await(nil); err != nil {
|
if err := ctx.CallActivity(NotifyActivity, task.WithActivityInput(Notification{Message: fmt.Sprintf("Payment for order %s has been approved!", orderID)})).Await(nil); err != nil {
|
||||||
log.Printf("failed to notify of a successful order: %v\n", err)
|
log.Printf("failed to notify of a successful order: %v\n", err)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Payment for order %s has been rejected!", orderID)})).Await(nil); err != nil {
|
if err := ctx.CallActivity(NotifyActivity, task.WithActivityInput(Notification{Message: fmt.Sprintf("Payment for order %s has been rejected!", orderID)})).Await(nil); err != nil {
|
||||||
log.Printf("failed to notify of an unsuccessful order :%v\n", err)
|
log.Printf("failed to notify of an unsuccessful order :%v\n", err)
|
||||||
}
|
}
|
||||||
return OrderResult{Processed: false}, err
|
return OrderResult{Processed: false}, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
err = ctx.CallActivity(ProcessPaymentActivity, workflow.ActivityInput(PaymentRequest{
|
err = ctx.CallActivity(ProcessPaymentActivity, task.WithActivityInput(PaymentRequest{
|
||||||
RequestID: orderID,
|
RequestID: string(orderID),
|
||||||
ItemBeingPurchased: orderPayload.ItemName,
|
ItemBeingPurchased: orderPayload.ItemName,
|
||||||
Amount: orderPayload.TotalCost,
|
Amount: orderPayload.TotalCost,
|
||||||
Quantity: orderPayload.Quantity,
|
Quantity: orderPayload.Quantity,
|
||||||
})).Await(nil)
|
})).Await(nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Order %s failed!", orderID)})).Await(nil); err != nil {
|
if err := ctx.CallActivity(NotifyActivity, task.WithActivityInput(Notification{Message: fmt.Sprintf("Order %s failed!", orderID)})).Await(nil); err != nil {
|
||||||
log.Printf("failed to notify of a failed order: %v", err)
|
log.Printf("failed to notify of a failed order: %v", err)
|
||||||
}
|
}
|
||||||
return OrderResult{Processed: false}, err
|
return OrderResult{Processed: false}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = ctx.CallActivity(UpdateInventoryActivity, workflow.ActivityInput(PaymentRequest{
|
err = ctx.CallActivity(UpdateInventoryActivity, task.WithActivityInput(PaymentRequest{
|
||||||
RequestID: orderID,
|
RequestID: string(orderID),
|
||||||
ItemBeingPurchased: orderPayload.ItemName,
|
ItemBeingPurchased: orderPayload.ItemName,
|
||||||
Amount: orderPayload.TotalCost,
|
Amount: orderPayload.TotalCost,
|
||||||
Quantity: orderPayload.Quantity,
|
Quantity: orderPayload.Quantity,
|
||||||
})).Await(nil)
|
})).Await(nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Order %s failed!", orderID)})).Await(nil); err != nil {
|
if err := ctx.CallActivity(NotifyActivity, task.WithActivityInput(Notification{Message: fmt.Sprintf("Order %s failed!", orderID)})).Await(nil); err != nil {
|
||||||
log.Printf("failed to notify of a failed order: %v", err)
|
log.Printf("failed to notify of a failed order: %v", err)
|
||||||
}
|
}
|
||||||
return OrderResult{Processed: false}, err
|
return OrderResult{Processed: false}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := ctx.CallActivity(NotifyActivity, workflow.ActivityInput(Notification{Message: fmt.Sprintf("Order %s has completed!", orderID)})).Await(nil); err != nil {
|
if err := ctx.CallActivity(NotifyActivity, task.WithActivityInput(Notification{Message: fmt.Sprintf("Order %s has completed!", orderID)})).Await(nil); err != nil {
|
||||||
log.Printf("failed to notify of a successful order: %v", err)
|
log.Printf("failed to notify of a successful order: %v", err)
|
||||||
}
|
}
|
||||||
return OrderResult{Processed: true}, err
|
return OrderResult{Processed: true}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// NotifyActivity outputs a notification message
|
// NotifyActivity outputs a notification message
|
||||||
func NotifyActivity(ctx workflow.ActivityContext) (any, error) {
|
func NotifyActivity(ctx task.ActivityContext) (any, error) {
|
||||||
var input Notification
|
var input Notification
|
||||||
if err := ctx.GetInput(&input); err != nil {
|
if err := ctx.GetInput(&input); err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
|
@ -1977,7 +1975,7 @@ func NotifyActivity(ctx workflow.ActivityContext) (any, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ProcessPaymentActivity is used to process a payment
|
// ProcessPaymentActivity is used to process a payment
|
||||||
func ProcessPaymentActivity(ctx workflow.ActivityContext) (any, error) {
|
func ProcessPaymentActivity(ctx task.ActivityContext) (any, error) {
|
||||||
var input PaymentRequest
|
var input PaymentRequest
|
||||||
if err := ctx.GetInput(&input); err != nil {
|
if err := ctx.GetInput(&input); err != nil {
|
||||||
return "", err
|
return "", err
|
||||||
|
@ -1987,7 +1985,7 @@ func ProcessPaymentActivity(ctx workflow.ActivityContext) (any, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// VerifyInventoryActivity is used to verify if an item is available in the inventory
|
// VerifyInventoryActivity is used to verify if an item is available in the inventory
|
||||||
func VerifyInventoryActivity(ctx workflow.ActivityContext) (any, error) {
|
func VerifyInventoryActivity(ctx task.ActivityContext) (any, error) {
|
||||||
var input InventoryRequest
|
var input InventoryRequest
|
||||||
if err := ctx.GetInput(&input); err != nil {
|
if err := ctx.GetInput(&input); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -2019,7 +2017,7 @@ func VerifyInventoryActivity(ctx workflow.ActivityContext) (any, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateInventoryActivity modifies the inventory.
|
// UpdateInventoryActivity modifies the inventory.
|
||||||
func UpdateInventoryActivity(ctx workflow.ActivityContext) (any, error) {
|
func UpdateInventoryActivity(ctx task.ActivityContext) (any, error) {
|
||||||
var input PaymentRequest
|
var input PaymentRequest
|
||||||
if err := ctx.GetInput(&input); err != nil {
|
if err := ctx.GetInput(&input); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -2053,7 +2051,7 @@ func UpdateInventoryActivity(ctx workflow.ActivityContext) (any, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// RequestApprovalActivity requests approval for the order
|
// RequestApprovalActivity requests approval for the order
|
||||||
func RequestApprovalActivity(ctx workflow.ActivityContext) (any, error) {
|
func RequestApprovalActivity(ctx task.ActivityContext) (any, error) {
|
||||||
var input OrderPayload
|
var input OrderPayload
|
||||||
if err := ctx.GetInput(&input); err != nil {
|
if err := ctx.GetInput(&input); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -2061,7 +2059,6 @@ func RequestApprovalActivity(ctx workflow.ActivityContext) (any, error) {
|
||||||
fmt.Printf("RequestApprovalActivity: Requesting approval for payment of %dUSD for %d %s\n", input.TotalCost, input.Quantity, input.ItemName)
|
fmt.Printf("RequestApprovalActivity: Requesting approval for payment of %dUSD for %d %s\n", input.TotalCost, input.Quantity, input.ItemName)
|
||||||
return ApprovalRequired{Approval: true}, nil
|
return ApprovalRequired{Approval: true}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|
||||||
{{% /tab %}}
|
{{% /tab %}}
|
||||||
|
|
Loading…
Reference in New Issue