go-sdk/examples/workflow/main.go

232 lines
6.0 KiB
Go

/*
Copyright 2024 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"context"
"errors"
"fmt"
"log"
"time"
"github.com/dapr/go-sdk/workflow"
)
// Package-level counters shared between the activities and main, which prints
// them to show workflow progress.
//
// NOTE(review): these are written by the activity functions and read by main;
// if the worker invokes activities on other goroutines the accesses are
// unsynchronized — confirm, and consider sync/atomic if so.
var (
	// stage accumulates the inputs passed to TestActivity.
	stage int
	// failActivityTries counts how many times FailActivity has been invoked.
	failActivityTries int
)
// main runs the workflow example end to end: it registers the workflow and
// activities on a worker, starts the worker, and then uses a workflow client
// to schedule, suspend, resume, signal, await, purge and terminate workflow
// instances, printing progress along the way.
//
// Any failure aborts the program through log.Fatal/log.Fatalf. Those call
// os.Exit, so the deferred wfClient.Close does not run on a failure path.
func main() {
	w, err := workflow.NewWorker()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("Worker initialized")

	if err := w.RegisterWorkflow(TestWorkflow); err != nil {
		log.Fatal(err)
	}
	fmt.Println("TestWorkflow registered")

	if err := w.RegisterActivity(TestActivity); err != nil {
		log.Fatal(err)
	}
	fmt.Println("TestActivity registered")

	if err := w.RegisterActivity(FailActivity); err != nil {
		log.Fatal(err)
	}
	fmt.Println("FailActivity registered")

	// Start workflow runner
	if err := w.Start(); err != nil {
		log.Fatal(err)
	}
	fmt.Println("runner started")

	wfClient, err := workflow.NewClient()
	if err != nil {
		log.Fatalf("failed to intialise client: %v", err)
	}
	defer wfClient.Close()

	ctx := context.Background()

	// Start workflow test
	instanceID, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
	if err != nil {
		log.Fatalf("failed to start workflow: %v", err)
	}
	fmt.Printf("workflow started with id: %v\n", instanceID)

	// Pause workflow test
	err = wfClient.SuspendWorkflow(ctx, instanceID, "")
	if err != nil {
		log.Fatalf("failed to pause workflow: %v", err)
	}

	respFetch, err := wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
	if err != nil {
		log.Fatalf("failed to fetch workflow: %v", err)
	}
	if respFetch.RuntimeStatus != workflow.StatusSuspended {
		log.Fatalf("workflow not paused: %v", respFetch.RuntimeStatus)
	}
	fmt.Printf("workflow paused\n")

	// Resume workflow test
	err = wfClient.ResumeWorkflow(ctx, instanceID, "")
	if err != nil {
		log.Fatalf("failed to resume workflow: %v", err)
	}

	respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
	if err != nil {
		log.Fatalf("failed to get workflow: %v", err)
	}
	if respFetch.RuntimeStatus != workflow.StatusRunning {
		log.Fatalf("workflow not running")
	}
	fmt.Println("workflow resumed")

	fmt.Printf("stage: %d\n", stage)

	// Raise Event Test
	err = wfClient.RaiseEvent(ctx, instanceID, "testEvent", workflow.WithEventPayload("testData"))
	if err != nil {
		// Abort like every other step: without the event the workflow stays
		// blocked in WaitForExternalEvent and the wait below would only time
		// out with a less useful error.
		log.Fatalf("failed to raise event: %v", err)
	}
	fmt.Println("workflow event raised")

	time.Sleep(time.Second) // allow workflow to advance

	fmt.Printf("stage: %d\n", stage)

	// Bound the wait so a stuck workflow fails the example instead of hanging.
	waitCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	_, err = wfClient.WaitForWorkflowCompletion(waitCtx, instanceID)
	cancel()
	if err != nil {
		log.Fatalf("failed to wait for workflow: %v", err)
	}

	fmt.Printf("fail activity executions: %d\n", failActivityTries)

	respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
	if err != nil {
		log.Fatalf("failed to get workflow: %v", err)
	}
	fmt.Printf("workflow status: %v\n", respFetch.RuntimeStatus)

	// Purge workflow test
	err = wfClient.PurgeWorkflow(ctx, instanceID)
	if err != nil {
		log.Fatalf("failed to purge workflow: %v", err)
	}

	// After a purge the fetch is expected to fail and return no metadata;
	// anything else means the instance is still there.
	respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
	if err == nil || respFetch != nil {
		log.Fatalf("failed to purge workflow: %v", err)
	}
	fmt.Println("workflow purged")

	fmt.Printf("stage: %d\n", stage)

	// Terminate workflow test
	id, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
	if err != nil {
		log.Fatalf("failed to start workflow: %v", err)
	}
	// Report the ID returned by this scheduling call, not the one from the
	// first run.
	fmt.Printf("workflow started with id: %v\n", id)

	metadata, err := wfClient.WaitForWorkflowStart(ctx, id)
	if err != nil {
		log.Fatalf("failed to get workflow: %v", err)
	}
	fmt.Printf("workflow status: %s\n", metadata.RuntimeStatus.String())

	err = wfClient.TerminateWorkflow(ctx, id)
	if err != nil {
		log.Fatalf("failed to terminate workflow: %v", err)
	}
	fmt.Println("workflow terminated")

	err = wfClient.PurgeWorkflow(ctx, id)
	if err != nil {
		log.Fatalf("failed to purge workflow: %v", err)
	}
	fmt.Println("workflow purged")

	// stop workflow runtime
	if err := w.Shutdown(); err != nil {
		log.Fatalf("failed to shutdown runtime: %v", err)
	}
	fmt.Println("workflow worker successfully shutdown")
}
// TestWorkflow is the example workflow. It reads an int input, then in order:
//  1. calls TestActivity with that input,
//  2. waits up to 60 seconds for the external event "testEvent",
//  3. calls TestActivity with the same input again,
//  4. calls FailActivity with a retry policy and requires it to fail.
//
// It returns the string left in the result variable by the last successful
// Await, or the first error encountered. If FailActivity unexpectedly
// succeeds, an error is returned instead.
func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
	var (
		increment int
		result    string
		err       error
	)

	if err = ctx.GetInput(&increment); err != nil {
		return nil, err
	}

	err = ctx.CallActivity(TestActivity, workflow.ActivityInput(increment)).Await(&result)
	if err != nil {
		return nil, err
	}

	// The event payload is decoded into the same result variable.
	err = ctx.WaitForExternalEvent("testEvent", 60*time.Second).Await(&result)
	if err != nil {
		return nil, err
	}

	err = ctx.CallActivity(TestActivity, workflow.ActivityInput(increment)).Await(&result)
	if err != nil {
		return nil, err
	}

	retry := workflow.RetryPolicy{
		MaxAttempts:          3,
		InitialRetryInterval: 100 * time.Millisecond,
		BackoffCoefficient:   2,
		MaxRetryInterval:     1 * time.Second,
	}
	// FailActivity always errors, so a nil error here is the failure case.
	err = ctx.CallActivity(FailActivity, workflow.ActivityRetryPolicy(retry)).Await(nil)
	if err == nil {
		return nil, fmt.Errorf("unexpected no error executing fail activity")
	}

	return result, nil
}
// TestActivity reads an int from the activity input, adds it to the
// package-level stage counter, and returns the string "Stage: <stage>".
// If the input cannot be read it returns an empty string and the error.
func TestActivity(ctx workflow.ActivityContext) (any, error) {
	var increment int
	err := ctx.GetInput(&increment)
	if err != nil {
		return "", err
	}

	stage = stage + increment
	return fmt.Sprintf("Stage: %d", stage), nil
}
// FailActivity always fails: it bumps the package-level failActivityTries
// counter and returns a fixed error, so callers can observe how many times
// it was attempted.
func FailActivity(ctx workflow.ActivityContext) (any, error) {
	failActivityTries++
	return nil, errors.New("dummy activity error")
}