diff --git a/.github/workflows/validate_examples.yaml b/.github/workflows/validate_examples.yaml index d67db1a..7ec624a 100644 --- a/.github/workflows/validate_examples.yaml +++ b/.github/workflows/validate_examples.yaml @@ -38,9 +38,9 @@ jobs: CHECKOUT_REF: ${{ github.ref }} outputs: DAPR_INSTALL_URL: ${{ env.DAPR_INSTALL_URL }} - DAPR_CLI_VER: ${{ steps.outputs.outputs.DAPR_CLI_VER }} + DAPR_CLI_VER: 1.16.0-rc.1 DAPR_CLI_REF: ${{ steps.outputs.outputs.DAPR_CLI_REF }} - DAPR_RUNTIME_VER: ${{ steps.outputs.outputs.DAPR_RUNTIME_VER }} + DAPR_RUNTIME_VER: 1.16.0-rc.3 CHECKOUT_REPO: ${{ steps.outputs.outputs.CHECKOUT_REPO }} CHECKOUT_REF: ${{ steps.outputs.outputs.CHECKOUT_REF }} DAPR_REF: ${{ steps.outputs.outputs.DAPR_REF }} @@ -175,6 +175,7 @@ jobs: "socket", "workflow", "workflow-parallel", + "workflow-taskexecutionid" ] steps: - name: Check out code onto GOPATH diff --git a/client/conversation.go b/client/conversation.go index 0109426..5ddefb1 100644 --- a/client/conversation.go +++ b/client/conversation.go @@ -101,8 +101,10 @@ func WithTemperature(temp float64) conversationRequestOption { // ConverseAlpha1 can invoke an LLM given a request created by the NewConversationRequest function. func (c *GRPCClient) ConverseAlpha1(ctx context.Context, req conversationRequest, options ...conversationRequestOption) (*ConversationResponse, error) { + //nolint:staticcheck cinputs := make([]*runtimev1pb.ConversationInput, len(req.inputs)) for i, in := range req.inputs { + //nolint:staticcheck cinputs[i] = &runtimev1pb.ConversationInput{ Content: in.Content, Role: in.Role, @@ -115,7 +117,7 @@ func (c *GRPCClient) ConverseAlpha1(ctx context.Context, req conversationRequest opt(&req) } } - + //nolint:staticcheck request := runtimev1pb.ConversationRequest{ Name: req.name, ContextID: req.ContextID, diff --git a/examples/go.mod b/examples/go.mod index ca51ced..313cda9 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -18,11 +18,11 @@ require ( github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect - github.com/dapr/dapr v1.15.4-0.20250618123356-78343f18338b // indirect - github.com/dapr/durabletask-go v0.7.2 // indirect + github.com/dapr/dapr v1.16.0-rc.3 // indirect + github.com/dapr/durabletask-go v0.7.3-0.20250711135247-7a35af6fe0e5 // indirect github.com/dapr/kit v0.15.4 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/go-chi/chi/v5 v5.1.0 // indirect + github.com/go-chi/chi/v5 v5.2.2 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/sirupsen/logrus v1.9.3 // indirect diff --git a/examples/go.sum b/examples/go.sum index 541518a..244ea52 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -6,10 +6,10 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK3 github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= -github.com/dapr/dapr v1.15.4-0.20250618123356-78343f18338b h1:fR+ae4QXF8R4GqKrzEls7WaibF1wjiVvifUl+IoP37I= -github.com/dapr/dapr v1.15.4-0.20250618123356-78343f18338b/go.mod h1:kx/7l7wDGkKRVoE6CUtuNl1FjKA0hj7bn/6xJ1Fc6HY= -github.com/dapr/durabletask-go v0.7.2 h1:ssNupibV65/o5HNJRceU6x7D4LSyrGsz6nfMFUcI540= -github.com/dapr/durabletask-go v0.7.2/go.mod h1:JhMyDybRUFmmgieGxCPeg9e2cWwtx4LwNXjD+LBtKYk= +github.com/dapr/dapr v1.16.0-rc.3 h1:D99V20GOhb+bZXH1PngME+wgzIZCcBFOvmaP7DOZxGo= +github.com/dapr/dapr v1.16.0-rc.3/go.mod h1:uyKnxMohSg87LSFzZ/oyuiGSo0+qkzeR0eXncPyIV9c= +github.com/dapr/durabletask-go v0.7.3-0.20250711135247-7a35af6fe0e5 h1:l8oBGwcfCwqvSYDZwla0A2fhENmXFc1Wk4lR0VEq+is= +github.com/dapr/durabletask-go v0.7.3-0.20250711135247-7a35af6fe0e5/go.mod h1:0Ts4rXp74JyG19gDWPcwNo5V6NBZzhARzHF5XynmA7Q= github.com/dapr/kit v0.15.4 h1:29DezCR22OuZhXX4yPEc+lqcOf/PNaeAuIEx9nGv394= github.com/dapr/kit v0.15.4/go.mod h1:HwFsBKEbcyLanWlDZE7u/jnaDCD/tU+n3pkFNUctQNw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -20,8 +20,8 @@ github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/r github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= -github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw= -github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= +github.com/go-chi/chi/v5 v5.2.2 h1:CMwsvRVTbXVytCk1Wd72Zy1LAsAh9GxMmSNWLHCG618= +github.com/go-chi/chi/v5 v5.2.2/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= diff --git a/examples/workflow-taskexecutionid/README.md b/examples/workflow-taskexecutionid/README.md new file mode 100644 index 0000000..a0afa99 --- /dev/null +++ b/examples/workflow-taskexecutionid/README.md @@ -0,0 +1,61 @@ +# Dapr Parallel Workflow Example with go-sdk + +## Step + +### Prepare + +- Dapr installed + +### Run Workflow + + + +```bash +dapr run --app-id workflow-taskexecutionid \ + --dapr-grpc-port 50001 \ + --log-level debug \ + --resources-path ./config \ + -- go run ./main.go +``` + + + +## Result + +``` + - '== APP == Workflow(s) and activities registered.' + - 'work item listener started' + - '== APP == RetryN 1' + - '== APP == RetryN 2' + - '== APP == RetryN 3' + - '== APP == RetryN 4' + - '== APP == RetryN 1' + - '== APP == RetryN 2' + - '== APP == RetryN 3' + - '== APP == RetryN 4' + - '== APP == workflow status: COMPLETED' + - '== APP == workflow terminated' + - '== APP == workflow purged' +``` + diff --git a/examples/workflow-taskexecutionid/config/redis.yaml b/examples/workflow-taskexecutionid/config/redis.yaml new file mode 100644 index 0000000..5bb57b3 --- /dev/null +++ b/examples/workflow-taskexecutionid/config/redis.yaml @@ -0,0 +1,14 @@ +apiVersion: dapr.io/v1alpha1 +kind: Component +metadata: + name: wf-store +spec: + type: state.redis + version: v1 + metadata: + - name: redisHost + value: localhost:6379 + - name: redisPassword + value: "" + - name: actorStateStore + value: "true" diff --git a/examples/workflow-taskexecutionid/main.go b/examples/workflow-taskexecutionid/main.go new file mode 100644 index 0000000..95bef58 --- /dev/null +++ b/examples/workflow-taskexecutionid/main.go @@ -0,0 +1,108 @@ +package main + +import ( + "context" + "fmt" + "log" + "sync" + "sync/atomic" + "time" + + "github.com/dapr/go-sdk/workflow" +) + +func main() { + w, err := workflow.NewWorker() + if err != nil { + log.Fatalf("failed to initialise worker: %v", err) + } + + if err := w.RegisterWorkflow(TaskExecutionIdWorkflow); err != nil { + log.Fatalf("failed to register workflow: %v", err) + } + if err := w.RegisterActivity(RetryN); err != nil { + log.Fatalf("failed to register activity: %v", err) + } + fmt.Println("Workflow(s) and activities registered.") + + if err := w.Start(); err != nil { + log.Fatalf("failed to start worker") + } + + wfClient, err := workflow.NewClient() + if err != nil { + log.Fatalf("failed to initialise client: %v", err) + } + ctx := context.Background() + id, err := wfClient.ScheduleNewWorkflow(ctx, "TaskExecutionIdWorkflow", workflow.WithInput(5)) + if err != nil { + log.Fatalf("failed to schedule a new workflow: %v", err) + } + + metadata, err := wfClient.WaitForWorkflowCompletion(ctx, id) + if err != nil { + log.Fatalf("failed to get workflow: %v", err) + } + fmt.Printf("workflow status: %s\n", metadata.RuntimeStatus.String()) + + err = wfClient.TerminateWorkflow(ctx, id) + if err != nil { + log.Fatalf("failed to terminate workflow: %v", err) + } + fmt.Println("workflow terminated") + + err = wfClient.PurgeWorkflow(ctx, id) + if err != nil { + log.Fatalf("failed to purge workflow: %v", err) + } + fmt.Println("workflow purged") +} + +var eMap = sync.Map{} + +func TaskExecutionIdWorkflow(ctx *workflow.WorkflowContext) (any, error) { + var retries int + if err := ctx.GetInput(&retries); err != nil { + return 0, err + } + + var workBatch []int + if err := ctx.CallActivity(RetryN, workflow.ActivityRetryPolicy(workflow.RetryPolicy{ + MaxAttempts: retries, + InitialRetryInterval: 100 * time.Millisecond, + BackoffCoefficient: 2, + MaxRetryInterval: 1 * time.Second, + }), workflow.ActivityInput(retries)).Await(&workBatch); err != nil { + return 0, err + } + + if err := ctx.CallActivity(RetryN, workflow.ActivityRetryPolicy(workflow.RetryPolicy{ + MaxAttempts: retries, + InitialRetryInterval: 100 * time.Millisecond, + BackoffCoefficient: 2, + MaxRetryInterval: 1 * time.Second, + }), workflow.ActivityInput(retries)).Await(&workBatch); err != nil { + return 0, err + } + + return 0, nil +} + +func RetryN(ctx workflow.ActivityContext) (any, error) { + taskExecutionID := ctx.GetTaskExecutionID() + counter, _ := eMap.LoadOrStore(taskExecutionID, &atomic.Int32{}) + var retries int32 + if err := ctx.GetInput(&retries); err != nil { + return 0, err + } + + counter.(*atomic.Int32).Add(1) + fmt.Println("RetryN ", counter.(*atomic.Int32).Load()) + + if counter.(*atomic.Int32).Load() < retries-1 { + return nil, fmt.Errorf("failed") + } + + return nil, nil + +} diff --git a/go.mod b/go.mod index bd5152e..374def2 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,10 @@ module github.com/dapr/go-sdk go 1.24.4 require ( - github.com/dapr/dapr v1.15.4-0.20250618123356-78343f18338b - github.com/dapr/durabletask-go v0.7.2 + github.com/dapr/dapr v1.16.0-rc.3 + github.com/dapr/durabletask-go v0.7.3-0.20250711135247-7a35af6fe0e5 github.com/dapr/kit v0.15.4 - github.com/go-chi/chi/v5 v5.1.0 + github.com/go-chi/chi/v5 v5.2.2 github.com/golang/mock v1.6.0 github.com/google/uuid v1.6.0 github.com/stretchr/testify v1.10.0 diff --git a/go.sum b/go.sum index 562997d..84395fe 100644 --- a/go.sum +++ b/go.sum @@ -1,17 +1,17 @@ github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= -github.com/dapr/dapr v1.15.4-0.20250618123356-78343f18338b h1:fR+ae4QXF8R4GqKrzEls7WaibF1wjiVvifUl+IoP37I= -github.com/dapr/dapr v1.15.4-0.20250618123356-78343f18338b/go.mod h1:kx/7l7wDGkKRVoE6CUtuNl1FjKA0hj7bn/6xJ1Fc6HY= -github.com/dapr/durabletask-go v0.7.2 h1:ssNupibV65/o5HNJRceU6x7D4LSyrGsz6nfMFUcI540= -github.com/dapr/durabletask-go v0.7.2/go.mod h1:JhMyDybRUFmmgieGxCPeg9e2cWwtx4LwNXjD+LBtKYk= +github.com/dapr/dapr v1.16.0-rc.3 h1:D99V20GOhb+bZXH1PngME+wgzIZCcBFOvmaP7DOZxGo= +github.com/dapr/dapr v1.16.0-rc.3/go.mod h1:uyKnxMohSg87LSFzZ/oyuiGSo0+qkzeR0eXncPyIV9c= +github.com/dapr/durabletask-go v0.7.3-0.20250711135247-7a35af6fe0e5 h1:l8oBGwcfCwqvSYDZwla0A2fhENmXFc1Wk4lR0VEq+is= +github.com/dapr/durabletask-go v0.7.3-0.20250711135247-7a35af6fe0e5/go.mod h1:0Ts4rXp74JyG19gDWPcwNo5V6NBZzhARzHF5XynmA7Q= github.com/dapr/kit v0.15.4 h1:29DezCR22OuZhXX4yPEc+lqcOf/PNaeAuIEx9nGv394= github.com/dapr/kit v0.15.4/go.mod h1:HwFsBKEbcyLanWlDZE7u/jnaDCD/tU+n3pkFNUctQNw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-chi/chi/v5 v5.1.0 h1:acVI1TYaD+hhedDJ3r54HyA6sExp3HfXq7QWEEY/xMw= -github.com/go-chi/chi/v5 v5.1.0/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8= +github.com/go-chi/chi/v5 v5.2.2 h1:CMwsvRVTbXVytCk1Wd72Zy1LAsAh9GxMmSNWLHCG618= +github.com/go-chi/chi/v5 v5.2.2/go.mod h1:L2yAIGWB3H+phAw1NxKwWM+7eUH/lU8pOMm5hHcoops= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= diff --git a/workflow/activity_context.go b/workflow/activity_context.go index f4f1123..9dbe786 100644 --- a/workflow/activity_context.go +++ b/workflow/activity_context.go @@ -36,6 +36,10 @@ func (wfac *ActivityContext) Context() context.Context { return wfac.ctx.Context() } +func (wfac *ActivityContext) GetTaskExecutionID() string { + return wfac.ctx.GetTaskExecutionID() +} + type callActivityOption func(*callActivityOptions) error type callActivityOptions struct { diff --git a/workflow/activity_context_test.go b/workflow/activity_context_test.go index f033ada..239232b 100644 --- a/workflow/activity_context_test.go +++ b/workflow/activity_context_test.go @@ -28,8 +28,17 @@ import ( ) type testingTaskActivityContext struct { - inputBytes []byte - ctx context.Context + inputBytes []byte + ctx context.Context + taskExecutionID string +} + +func (t *testingTaskActivityContext) GetTaskID() int32 { + return 0 +} + +func (t *testingTaskActivityContext) GetTaskExecutionID() string { + return t.taskExecutionID } func (t *testingTaskActivityContext) GetInput(v any) error { @@ -119,3 +128,19 @@ func TestMarshalData(t *testing.T) { assert.Equal(t, []byte{0x22, 0x74, 0x65, 0x73, 0x74, 0x53, 0x74, 0x72, 0x69, 0x6e, 0x67, 0x22}, out) }) } + +func TestTaskExecutionID(t *testing.T) { + ac := ActivityContext{ctx: &testingTaskActivityContext{ctx: t.Context(), taskExecutionID: "testTaskExecutionID"}} + + t.Run("test getTaskExecutionID", func(t *testing.T) { + assert.Equal(t, "testTaskExecutionID", ac.GetTaskExecutionID()) + }) +} + +func TestTaskID(t *testing.T) { + ac := ActivityContext{ctx: &testingTaskActivityContext{ctx: t.Context(), taskExecutionID: "testTaskExecutionID"}} + + t.Run("test getTaskID", func(t *testing.T) { + assert.EqualValues(t, 0, ac.ctx.GetTaskID()) + }) +}