mirror of https://github.com/dapr/go-sdk.git
232 lines
6.0 KiB
Go
232 lines
6.0 KiB
Go
/*
|
|
Copyright 2024 The Dapr Authors
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"log"
|
|
"time"
|
|
|
|
"github.com/dapr/go-sdk/workflow"
|
|
)
|
|
|
|
var stage = 0
|
|
var failActivityTries = 0
|
|
|
|
func main() {
|
|
w, err := workflow.NewWorker()
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
|
|
fmt.Println("Worker initialized")
|
|
|
|
if err := w.RegisterWorkflow(TestWorkflow); err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
fmt.Println("TestWorkflow registered")
|
|
|
|
if err := w.RegisterActivity(TestActivity); err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
fmt.Println("TestActivity registered")
|
|
|
|
if err := w.RegisterActivity(FailActivity); err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
fmt.Println("FailActivity registered")
|
|
|
|
// Start workflow runner
|
|
if err := w.Start(); err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
fmt.Println("runner started")
|
|
|
|
wfClient, err := workflow.NewClient()
|
|
if err != nil {
|
|
log.Fatalf("failed to intialise client: %v", err)
|
|
}
|
|
defer wfClient.Close()
|
|
ctx := context.Background()
|
|
|
|
// Start workflow test
|
|
instanceID, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
|
|
if err != nil {
|
|
log.Fatalf("failed to start workflow: %v", err)
|
|
}
|
|
fmt.Printf("workflow started with id: %v\n", instanceID)
|
|
|
|
// Pause workflow test
|
|
err = wfClient.SuspendWorkflow(ctx, instanceID, "")
|
|
if err != nil {
|
|
log.Fatalf("failed to pause workflow: %v", err)
|
|
}
|
|
|
|
respFetch, err := wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
|
|
if err != nil {
|
|
log.Fatalf("failed to fetch workflow: %v", err)
|
|
}
|
|
|
|
if respFetch.RuntimeStatus != workflow.StatusSuspended {
|
|
log.Fatalf("workflow not paused: %v", respFetch.RuntimeStatus)
|
|
}
|
|
|
|
fmt.Printf("workflow paused\n")
|
|
|
|
// Resume workflow test
|
|
err = wfClient.ResumeWorkflow(ctx, instanceID, "")
|
|
if err != nil {
|
|
log.Fatalf("failed to resume workflow: %v", err)
|
|
}
|
|
|
|
respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
|
|
if err != nil {
|
|
log.Fatalf("failed to get workflow: %v", err)
|
|
}
|
|
|
|
if respFetch.RuntimeStatus != workflow.StatusRunning {
|
|
log.Fatalf("workflow not running")
|
|
}
|
|
|
|
fmt.Println("workflow resumed")
|
|
|
|
fmt.Printf("stage: %d\n", stage)
|
|
|
|
// Raise Event Test
|
|
|
|
err = wfClient.RaiseEvent(ctx, instanceID, "testEvent", workflow.WithEventPayload("testData"))
|
|
if err != nil {
|
|
fmt.Printf("failed to raise event: %v", err)
|
|
}
|
|
|
|
fmt.Println("workflow event raised")
|
|
|
|
time.Sleep(time.Second) // allow workflow to advance
|
|
|
|
fmt.Printf("stage: %d\n", stage)
|
|
|
|
waitCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
_, err = wfClient.WaitForWorkflowCompletion(waitCtx, instanceID)
|
|
cancel()
|
|
if err != nil {
|
|
log.Fatalf("failed to wait for workflow: %v", err)
|
|
}
|
|
|
|
fmt.Printf("fail activity executions: %d\n", failActivityTries)
|
|
|
|
respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
|
|
if err != nil {
|
|
log.Fatalf("failed to get workflow: %v", err)
|
|
}
|
|
|
|
fmt.Printf("workflow status: %v\n", respFetch.RuntimeStatus)
|
|
|
|
// Purge workflow test
|
|
err = wfClient.PurgeWorkflow(ctx, instanceID)
|
|
if err != nil {
|
|
log.Fatalf("failed to purge workflow: %v", err)
|
|
}
|
|
|
|
respFetch, err = wfClient.FetchWorkflowMetadata(ctx, instanceID, workflow.WithFetchPayloads(true))
|
|
if err == nil || respFetch != nil {
|
|
log.Fatalf("failed to purge workflow: %v", err)
|
|
}
|
|
|
|
fmt.Println("workflow purged")
|
|
|
|
fmt.Printf("stage: %d\n", stage)
|
|
|
|
// Terminate workflow test
|
|
id, err := wfClient.ScheduleNewWorkflow(ctx, "TestWorkflow", workflow.WithInstanceID("a7a4168d-3a1c-41da-8a4f-e7f6d9c718d9"), workflow.WithInput(1))
|
|
if err != nil {
|
|
log.Fatalf("failed to start workflow: %v", err)
|
|
}
|
|
fmt.Printf("workflow started with id: %v\n", instanceID)
|
|
|
|
metadata, err := wfClient.WaitForWorkflowStart(ctx, id)
|
|
if err != nil {
|
|
log.Fatalf("failed to get workflow: %v", err)
|
|
}
|
|
fmt.Printf("workflow status: %s\n", metadata.RuntimeStatus.String())
|
|
|
|
err = wfClient.TerminateWorkflow(ctx, id)
|
|
if err != nil {
|
|
log.Fatalf("failed to terminate workflow: %v", err)
|
|
}
|
|
fmt.Println("workflow terminated")
|
|
|
|
err = wfClient.PurgeWorkflow(ctx, id)
|
|
if err != nil {
|
|
log.Fatalf("failed to purge workflow: %v", err)
|
|
}
|
|
fmt.Println("workflow purged")
|
|
|
|
// stop workflow runtime
|
|
if err := w.Shutdown(); err != nil {
|
|
log.Fatalf("failed to shutdown runtime: %v", err)
|
|
}
|
|
|
|
fmt.Println("workflow worker successfully shutdown")
|
|
}
|
|
|
|
func TestWorkflow(ctx *workflow.WorkflowContext) (any, error) {
|
|
var input int
|
|
if err := ctx.GetInput(&input); err != nil {
|
|
return nil, err
|
|
}
|
|
var output string
|
|
if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err := ctx.WaitForExternalEvent("testEvent", time.Second*60).Await(&output)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := ctx.CallActivity(TestActivity, workflow.ActivityInput(input)).Await(&output); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if err := ctx.CallActivity(FailActivity, workflow.ActivityRetryPolicy(workflow.RetryPolicy{
|
|
MaxAttempts: 3,
|
|
InitialRetryInterval: 100 * time.Millisecond,
|
|
BackoffCoefficient: 2,
|
|
MaxRetryInterval: 1 * time.Second,
|
|
})).Await(nil); err == nil {
|
|
return nil, fmt.Errorf("unexpected no error executing fail activity")
|
|
}
|
|
|
|
return output, nil
|
|
}
|
|
|
|
func TestActivity(ctx workflow.ActivityContext) (any, error) {
|
|
var input int
|
|
if err := ctx.GetInput(&input); err != nil {
|
|
return "", err
|
|
}
|
|
|
|
stage += input
|
|
|
|
return fmt.Sprintf("Stage: %d", stage), nil
|
|
}
|
|
|
|
func FailActivity(ctx workflow.ActivityContext) (any, error) {
|
|
failActivityTries += 1
|
|
return nil, errors.New("dummy activity error")
|
|
}
|