[1.16] Update Go workflow examples to use vanity client

Signed-off-by: joshvanl <me@joshvanl.dev>
This commit is contained in:
joshvanl 2025-09-10 14:18:34 -04:00
parent 62f930f01f
commit 1bc5a32537
2 changed files with 158 additions and 88 deletions

View File

@ -874,86 +874,98 @@ package main
import (
"context"
"errors"
"fmt"
"log"
"time"
"github.com/dapr/durabletask-go/api"
"github.com/dapr/durabletask-go/backend"
"github.com/dapr/durabletask-go/client"
"github.com/dapr/durabletask-go/task"
dapr "github.com/dapr/go-sdk/client"
"github.com/dapr/durabletask-go/workflow"
"github.com/dapr/go-sdk/client"
)
var stage = 0
const (
workflowComponent = "dapr"
)
var failActivityTries = 0
func main() {
registry := task.NewTaskRegistry()
r := workflow.NewRegistry()
if err := registry.AddOrchestrator(TestWorkflow); err != nil {
if err := r.AddWorkflow(TestWorkflow); err != nil {
log.Fatal(err)
}
fmt.Println("TestWorkflow registered")
if err := registry.AddActivity(TestActivity); err != nil {
if err := r.AddActivity(TestActivity); err != nil {
log.Fatal(err)
}
fmt.Println("TestActivity registered")
daprClient, err := dapr.NewClient()
if err := r.AddActivity(FailActivity); err != nil {
log.Fatal(err)
}
fmt.Println("FailActivity registered")
wclient, err := client.NewWorkflowClient()
if err != nil {
log.Fatalf("failed to create Dapr client: %v", err)
log.Fatal(err)
}
fmt.Println("Worker initialized")
client := client.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger())
if err := client.StartWorkItemListener(context.TODO(), registry); err != nil {
log.Fatalf("failed to start work item listener: %v", err)
ctx, cancel := context.WithCancel(context.Background())
if err = wclient.StartWorker(ctx, r); err != nil {
log.Fatal(err)
}
fmt.Println("runner started")
ctx := context.Background()
// Start workflow test
id, err := client.ScheduleNewOrchestration(ctx, "TestWorkflow", api.WithInput(1))
// Set the start time to the current time to not wait for the workflow to
// "start". This is useful for increasing the throughput of creating
// workflows.
// workflow.WithStartTime(time.Now())
instanceID, err := wclient.StartWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
if err != nil {
log.Fatalf("failed to start workflow: %v", err)
}
fmt.Printf("workflow started with id: %v\n", id)
fmt.Printf("workflow started with id: %v\n", instanceID)
// Pause workflow test
err = client.PurgeOrchestrationState(ctx, id)
err = wclient.SuspendWorkflow(ctx, instanceID, "")
if err != nil {
log.Fatalf("failed to pause workflow: %v", err)
}
respGet, err := client.FetchOrchestrationMetadata(ctx, id)
respFetch, err := wclient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
log.Fatalf("failed to fetch workflow: %v", err)
}
fmt.Printf("workflow paused: %s\n", respGet.RuntimeStatus)
if respFetch.RuntimeStatus != workflow.StatusSuspended {
log.Fatalf("workflow not paused: %s: %v", respFetch.RuntimeStatus, respFetch)
}
fmt.Printf("workflow paused\n")
// Resume workflow test
err = client.ResumeOrchestration(ctx, id, "")
err = wclient.ResumeWorkflow(ctx, instanceID, "")
if err != nil {
log.Fatalf("failed to resume workflow: %v", err)
}
fmt.Printf("workflow running: %s\n", respGet.RuntimeStatus)
respGet, err = client.FetchOrchestrationMetadata(ctx, id)
respFetch, err = wclient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}
fmt.Printf("workflow resumed: %s\n", respGet.RuntimeStatus)
if respFetch.RuntimeStatus != workflow.StatusRunning {
log.Fatalf("workflow not running")
}
fmt.Println("workflow resumed")
fmt.Printf("stage: %d\n", stage)
// Raise Event Test
err = client.RaiseEvent(ctx, id, "testEvent", api.WithEventPayload("testData"))
err = wclient.RaiseEvent(ctx, instanceID, "testEvent", workflow.WithEventPayload("testData"))
if err != nil {
fmt.Printf("failed to raise event: %v", err)
}
@ -964,44 +976,99 @@ func main() {
fmt.Printf("stage: %d\n", stage)
respGet, err = client.FetchOrchestrationMetadata(ctx, id)
waitCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
_, err = wclient.WaitForWorkflowCompletion(waitCtx, instanceID)
cancel()
if err != nil {
log.Fatalf("failed to wait for workflow: %v", err)
}
fmt.Printf("fail activity executions: %d\n", failActivityTries)
respFetch, err = wclient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}
fmt.Printf("workflow status: %v\n", respGet.RuntimeStatus)
fmt.Printf("workflow status: %v\n", respFetch.String())
// Purge workflow test
err = client.PurgeOrchestrationState(ctx, id)
err = wclient.PurgeWorkflowState(ctx, instanceID)
if err != nil {
log.Fatalf("failed to purge workflow: %v", err)
}
respFetch, err = wclient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
if err == nil || respFetch != nil {
log.Fatalf("failed to purge workflow: %v", err)
}
fmt.Println("workflow purged")
fmt.Printf("stage: %d\n", stage)
// Terminate workflow test
id, err := wclient.StartWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
if err != nil {
log.Fatalf("failed to start workflow: %v", err)
}
fmt.Printf("workflow started with id: %v\n", instanceID)
metadata, err := wclient.WaitForWorkflowStart(ctx, id)
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}
fmt.Printf("workflow status: %s\n", metadata.String())
err = wclient.TerminateWorkflow(ctx, id)
if err != nil {
log.Fatalf("failed to terminate workflow: %v", err)
}
fmt.Println("workflow terminated")
err = wclient.PurgeWorkflowState(ctx, id)
if err != nil {
log.Fatalf("failed to purge workflow: %v", err)
}
fmt.Println("workflow purged")
cancel()
fmt.Println("workflow worker successfully shutdown")
}
func TestWorkflow(ctx *task.OrchestrationContext) (any, error) {
func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return nil, err
}
var output string
if err := ctx.CallActivity(TestActivity, task.WithActivityInput(input)).Await(&output); err != nil {
if err := ctx.CallActivity(TestActivity, workflow.WithActivityInput(input)).Await(&output); err != nil {
return nil, err
}
err := ctx.WaitForSingleEvent("testEvent", time.Second*60).Await(&output)
err := ctx.WaitForExternalEvent("testEvent", time.Second*60).Await(&output)
if err != nil {
return nil, err
}
if err := ctx.CallActivity(TestActivity, task.WithActivityInput(input)).Await(&output); err != nil {
if err := ctx.CallActivity(TestActivity, workflow.WithActivityInput(input)).Await(&output); err != nil {
return nil, err
}
if err := ctx.CallActivity(FailActivity, workflow.WithActivityRetryPolicy(&workflow.RetryPolicy{
MaxAttempts: 3,
InitialRetryInterval: 100 * time.Millisecond,
BackoffCoefficient: 2,
MaxRetryInterval: 1 * time.Second,
})).Await(nil); err == nil {
return nil, fmt.Errorf("unexpected no error executing fail activity")
}
return output, nil
}
func TestActivity(ctx task.ActivityContext) (any, error) {
func TestActivity(ctx workflow.ActivityContext) (any, error) {
var input int
if err := ctx.GetInput(&input); err != nil {
return "", err
@ -1011,6 +1078,11 @@ func TestActivity(ctx task.ActivityContext) (any, error) {
return fmt.Sprintf("Stage: %d", stage), nil
}
func FailActivity(ctx workflow.ActivityContext) (any, error) {
failActivityTries += 1
return nil, errors.New("dummy activity error")
}
```
[See the full Go SDK workflow example in context.](https://github.com/dapr/go-sdk/tree/main/examples/workflow/README.md)

View File

@ -1756,11 +1756,8 @@ import (
"log"
"time"
"github.com/dapr/durabletask-go/api"
"github.com/dapr/durabletask-go/backend"
"github.com/dapr/durabletask-go/client"
"github.com/dapr/durabletask-go/task"
dapr "github.com/dapr/go-sdk/client"
"github.com/dapr/durabletask-go/workflow"
"github.com/dapr/go-sdk/client"
)
var (
@ -1774,43 +1771,46 @@ func main() {
fmt.Println("*** Welcome to the Dapr Workflow console app sample!")
fmt.Println("*** Using this app, you can place orders that start workflows.")
registry := task.NewTaskRegistry()
r := workflow.NewRegistry()
if err := registry.AddOrchestrator(OrderProcessingWorkflow); err != nil {
if err := r.AddWorkflow(OrderProcessingWorkflow); err != nil {
log.Fatal(err)
}
if err := registry.AddActivity(NotifyActivity); err != nil {
if err := r.AddActivity(NotifyActivity); err != nil {
log.Fatal(err)
}
if err := registry.AddActivity(RequestApprovalActivity); err != nil {
if err := r.AddActivity(RequestApprovalActivity); err != nil {
log.Fatal(err)
}
if err := registry.AddActivity(VerifyInventoryActivity); err != nil {
if err := r.AddActivity(VerifyInventoryActivity); err != nil {
log.Fatal(err)
}
if err := registry.AddActivity(ProcessPaymentActivity); err != nil {
if err := r.AddActivity(ProcessPaymentActivity); err != nil {
log.Fatal(err)
}
if err := registry.AddActivity(UpdateInventoryActivity); err != nil {
if err := r.AddActivity(UpdateInventoryActivity); err != nil {
log.Fatal(err)
}
daprClient, err := dapr.NewClient()
wfClient, err := client.NewWorkflowClient()
if err != nil {
log.Fatalf("failed to create Dapr client: %v", err)
log.Fatalf("failed to initialise workflow client: %v", err)
}
client := client.NewTaskHubGrpcClient(daprClient.GrpcClientConn(), backend.DefaultLogger())
if err := client.StartWorkItemListener(context.TODO(), registry); err != nil {
log.Fatalf("failed to start work item listener: %v", err)
if err := wfClient.StartWorker(context.Background(), r); err != nil {
log.Fatal(err)
}
dclient, err := client.NewClient()
if err != nil {
log.Fatal(err)
}
inventory := []InventoryItem{
{ItemName: "paperclip", PerItemCost: 5, Quantity: 100},
{ItemName: "cars", PerItemCost: 5000, Quantity: 10},
{ItemName: "computers", PerItemCost: 500, Quantity: 100},
}
if err := restockInventory(daprClient, inventory); err != nil {
if err := restockInventory(dclient, inventory); err != nil {
log.Fatalf("failed to restock: %v", err)
}
@ -1827,31 +1827,29 @@ func main() {
TotalCost: totalCost,
}
id, err := client.ScheduleNewOrchestration(context.TODO(), workflowName,
api.WithInput(orderPayload),
)
id, err := wfClient.StartWorkflow(context.Background(), workflowName, workflow.WithInput(orderPayload))
if err != nil {
log.Fatalf("failed to start workflow: %v", err)
}
waitCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
_, err = client.WaitForOrchestrationCompletion(waitCtx, id)
_, err = wfClient.WaitForWorkflowCompletion(waitCtx, id)
cancel()
if err != nil {
log.Fatalf("failed to wait for workflow: %v", err)
}
respFetch, err := client.FetchOrchestrationMetadata(context.Background(), id, api.WithFetchPayloads(true))
respFetch, err := wfClient.FetchWorkflowMetadata(context.Background(), id, workflow.WithFetchPayloads(true))
if err != nil {
log.Fatalf("failed to get workflow: %v", err)
}
fmt.Printf("workflow status: %v\n", respFetch.RuntimeStatus)
fmt.Printf("workflow status: %v\n", respFetch.String())
fmt.Println("Purchase of item is complete")
}
func restockInventory(daprClient dapr.Client, inventory []InventoryItem) error {
func restockInventory(daprClient client.Client, inventory []InventoryItem) error {
for _, item := range inventory {
itemSerialized, err := json.Marshal(item)
if err != nil {
@ -1879,18 +1877,18 @@ import (
"log"
"time"
"github.com/dapr/durabletask-go/task"
"github.com/dapr/durabletask-go/workflow"
"github.com/dapr/go-sdk/client"
)
// OrderProcessingWorkflow is the main workflow for orchestrating activities in the order process.
func OrderProcessingWorkflow(ctx *task.OrchestrationContext) (any, error) {
orderID := ctx.ID
func OrderProcessingWorkflow(ctx *workflow.WorkflowContext) (any, error) {
orderID := ctx.ID()
var orderPayload OrderPayload
if err := ctx.GetInput(&orderPayload); err != nil {
return nil, err
}
err := ctx.CallActivity(NotifyActivity, task.WithActivityInput(Notification{
err := ctx.CallActivity(NotifyActivity, workflow.WithActivityInput(Notification{
Message: fmt.Sprintf("Received order %s for %d %s - $%d", orderID, orderPayload.Quantity, orderPayload.ItemName, orderPayload.TotalCost),
})).Await(nil)
if err != nil {
@ -1898,8 +1896,8 @@ func OrderProcessingWorkflow(ctx *task.OrchestrationContext) (any, error) {
}
var verifyInventoryResult InventoryResult
if err := ctx.CallActivity(VerifyInventoryActivity, task.WithActivityInput(InventoryRequest{
RequestID: string(orderID),
if err := ctx.CallActivity(VerifyInventoryActivity, workflow.WithActivityInput(InventoryRequest{
RequestID: orderID,
ItemName: orderPayload.ItemName,
Quantity: orderPayload.Quantity,
})).Await(&verifyInventoryResult); err != nil {
@ -1908,64 +1906,64 @@ func OrderProcessingWorkflow(ctx *task.OrchestrationContext) (any, error) {
if !verifyInventoryResult.Success {
notification := Notification{Message: fmt.Sprintf("Insufficient inventory for %s", orderPayload.ItemName)}
err := ctx.CallActivity(NotifyActivity, task.WithActivityInput(notification)).Await(nil)
err := ctx.CallActivity(NotifyActivity, workflow.WithActivityInput(notification)).Await(nil)
return OrderResult{Processed: false}, err
}
if orderPayload.TotalCost > 5000 {
var approvalRequired ApprovalRequired
if err := ctx.CallActivity(RequestApprovalActivity, task.WithActivityInput(orderPayload)).Await(&approvalRequired); err != nil {
if err := ctx.CallActivity(RequestApprovalActivity, workflow.WithActivityInput(orderPayload)).Await(&approvalRequired); err != nil {
return OrderResult{Processed: false}, err
}
if err := ctx.WaitForSingleEvent("manager_approval", time.Second*200).Await(nil); err != nil {
if err := ctx.WaitForExternalEvent("manager_approval", time.Second*200).Await(nil); err != nil {
return OrderResult{Processed: false}, err
}
// TODO: Confirm timeout flow - this will be in the form of an error.
if approvalRequired.Approval {
if err := ctx.CallActivity(NotifyActivity, task.WithActivityInput(Notification{Message: fmt.Sprintf("Payment for order %s has been approved!", orderID)})).Await(nil); err != nil {
if err := ctx.CallActivity(NotifyActivity, workflow.WithActivityInput(Notification{Message: fmt.Sprintf("Payment for order %s has been approved!", orderID)})).Await(nil); err != nil {
log.Printf("failed to notify of a successful order: %v\n", err)
}
} else {
if err := ctx.CallActivity(NotifyActivity, task.WithActivityInput(Notification{Message: fmt.Sprintf("Payment for order %s has been rejected!", orderID)})).Await(nil); err != nil {
if err := ctx.CallActivity(NotifyActivity, workflow.WithActivityInput(Notification{Message: fmt.Sprintf("Payment for order %s has been rejected!", orderID)})).Await(nil); err != nil {
log.Printf("failed to notify of an unsuccessful order :%v\n", err)
}
return OrderResult{Processed: false}, err
}
}
err = ctx.CallActivity(ProcessPaymentActivity, task.WithActivityInput(PaymentRequest{
RequestID: string(orderID),
err = ctx.CallActivity(ProcessPaymentActivity, workflow.WithActivityInput(PaymentRequest{
RequestID: orderID,
ItemBeingPurchased: orderPayload.ItemName,
Amount: orderPayload.TotalCost,
Quantity: orderPayload.Quantity,
})).Await(nil)
if err != nil {
if err := ctx.CallActivity(NotifyActivity, task.WithActivityInput(Notification{Message: fmt.Sprintf("Order %s failed!", orderID)})).Await(nil); err != nil {
if err := ctx.CallActivity(NotifyActivity, workflow.WithActivityInput(Notification{Message: fmt.Sprintf("Order %s failed!", orderID)})).Await(nil); err != nil {
log.Printf("failed to notify of a failed order: %v", err)
}
return OrderResult{Processed: false}, err
}
err = ctx.CallActivity(UpdateInventoryActivity, task.WithActivityInput(PaymentRequest{
RequestID: string(orderID),
err = ctx.CallActivity(UpdateInventoryActivity, workflow.WithActivityInput(PaymentRequest{
RequestID: orderID,
ItemBeingPurchased: orderPayload.ItemName,
Amount: orderPayload.TotalCost,
Quantity: orderPayload.Quantity,
})).Await(nil)
if err != nil {
if err := ctx.CallActivity(NotifyActivity, task.WithActivityInput(Notification{Message: fmt.Sprintf("Order %s failed!", orderID)})).Await(nil); err != nil {
if err := ctx.CallActivity(NotifyActivity, workflow.WithActivityInput(Notification{Message: fmt.Sprintf("Order %s failed!", orderID)})).Await(nil); err != nil {
log.Printf("failed to notify of a failed order: %v", err)
}
return OrderResult{Processed: false}, err
}
if err := ctx.CallActivity(NotifyActivity, task.WithActivityInput(Notification{Message: fmt.Sprintf("Order %s has completed!", orderID)})).Await(nil); err != nil {
if err := ctx.CallActivity(NotifyActivity, workflow.WithActivityInput(Notification{Message: fmt.Sprintf("Order %s has completed!", orderID)})).Await(nil); err != nil {
log.Printf("failed to notify of a successful order: %v", err)
}
return OrderResult{Processed: true}, err
}
// NotifyActivity outputs a notification message
func NotifyActivity(ctx task.ActivityContext) (any, error) {
func NotifyActivity(ctx workflow.ActivityContext) (any, error) {
var input Notification
if err := ctx.GetInput(&input); err != nil {
return "", err
@ -1975,7 +1973,7 @@ func NotifyActivity(ctx task.ActivityContext) (any, error) {
}
// ProcessPaymentActivity is used to process a payment
func ProcessPaymentActivity(ctx task.ActivityContext) (any, error) {
func ProcessPaymentActivity(ctx workflow.ActivityContext) (any, error) {
var input PaymentRequest
if err := ctx.GetInput(&input); err != nil {
return "", err
@ -1985,7 +1983,7 @@ func ProcessPaymentActivity(ctx task.ActivityContext) (any, error) {
}
// VerifyInventoryActivity is used to verify if an item is available in the inventory
func VerifyInventoryActivity(ctx task.ActivityContext) (any, error) {
func VerifyInventoryActivity(ctx workflow.ActivityContext) (any, error) {
var input InventoryRequest
if err := ctx.GetInput(&input); err != nil {
return nil, err
@ -2017,7 +2015,7 @@ func VerifyInventoryActivity(ctx task.ActivityContext) (any, error) {
}
// UpdateInventoryActivity modifies the inventory.
func UpdateInventoryActivity(ctx task.ActivityContext) (any, error) {
func UpdateInventoryActivity(ctx workflow.ActivityContext) (any, error) {
var input PaymentRequest
if err := ctx.GetInput(&input); err != nil {
return nil, err
@ -2051,7 +2049,7 @@ func UpdateInventoryActivity(ctx task.ActivityContext) (any, error) {
}
// RequestApprovalActivity requests approval for the order
func RequestApprovalActivity(ctx task.ActivityContext) (any, error) {
func RequestApprovalActivity(ctx workflow.ActivityContext) (any, error) {
var input OrderPayload
if err := ctx.GetInput(&input); err != nil {
return nil, err