Addressing review comments for workflow building block API

Signed-off-by: Ryan Lettieri <ryanLettieri@microsoft.com>
This commit is contained in:
Ryan Lettieri 2022-09-08 16:17:19 -06:00
parent bd1be3f73e
commit 1205859e3e
7 changed files with 52 additions and 38 deletions

View File

@ -17,6 +17,8 @@ import (
"context"
"time"
"github.com/dapr/kit/logger"
"github.com/zouyx/agollo/v3/component/log"
"go.temporal.io/sdk/activity"
"go.temporal.io/sdk/client"
"go.temporal.io/sdk/worker"
@ -26,6 +28,7 @@ import (
func main() {
// Sleep for a bit so the docker container can spin up
time.Sleep(30 * time.Second)
logger logger.Logger
TaskQueueString := "TestTaskQueue"
// construct client here
@ -35,6 +38,7 @@ func main() {
// Create the workflow client
clientTwo, err := client.Dial(cOpt)
if err != nil {
log.Error("Unable to create client.")
return
}
wOpt := worker.Options{}
@ -47,6 +51,7 @@ func main() {
err = w.Start()
if err != nil {
log.Error("Unable to start worker.")
return
}
w.Run(worker.InterruptCh())
@ -63,9 +68,9 @@ func TestWorkflow(ctx workflow.Context, runtimeSeconds int) error {
}
ctx = workflow.WithActivityOptions(ctx, options)
newCtx, _ := workflow.NewDisconnectedContext(ctx)
err := workflow.ExecuteActivity(newCtx, ExampleActivity, runtimeSeconds).Get(ctx, nil)
err := workflow.ExecuteActivity(ctx, ExampleActivity, runtimeSeconds).Get(ctx, nil)
if err != nil {
log.Error("Unable to execute activity.")
return err
}

View File

@ -50,7 +50,7 @@ func ConformanceTests(t *testing.T, props map[string]string, workflowItem workfl
err := workflowItem.Init(workflows.Metadata{
Properties: props,
})
assert.Nil(t, err)
assert.NoError(t, err)
})
// Everything is within the same task since the workflow needs to persist between operations
@ -61,21 +61,22 @@ func ConformanceTests(t *testing.T, props map[string]string, workflowItem workfl
Parameters: 10, // Time that the activity within the workflow runs for
WorkflowName: "TestWorkflow",
}
req.WorkflowInfo.InstanceId = "TestID"
req.Options.TaskQueue = "TestTaskQueue"
req.WorkflowInfo.InstanceID = "TestID"
req.Options = make(map[string]string)
req.Options["task_queue"] = "TestTaskQueue"
wf, err := workflowItem.Start(context.Background(), req)
assert.Nil(t, err)
assert.NoError(t, err)
resp, err := workflowItem.Get(context.Background(), wf)
assert.Nil(t, err)
assert.NoError(t, err)
assert.Equal(t, "Running", resp.Status)
time.Sleep(5 * time.Second)
resp, err = workflowItem.Get(context.Background(), wf)
assert.Nil(t, err)
assert.NoError(t, err)
assert.Equal(t, resp.Status, "Running")
err = workflowItem.Terminate(context.Background(), wf)
assert.Nil(t, err)
assert.NoError(t, err)
resp, err = workflowItem.Get(context.Background(), wf)
assert.Nil(t, err)
assert.NoError(t, err)
assert.Equal(t, "Terminated", resp.Status)
})
testLogger.Info("Start test done.")

View File

@ -5,3 +5,7 @@ A workflow is custom application logic that consists of a set of tasks and or st
## Implementing a new Workflow
A compliant workflow needs to implement the `Workflow` inteface included in the [`workflow.go`](workflow.go) file.
## Using Temporal
When using temporal as the workflow, the task queue must be provided as an Option in the start request struct with the key: `task_queue`

View File

@ -6,20 +6,16 @@ import (
"go.temporal.io/sdk/workflow"
)
type StartRequestOptions struct {
TaskQueue string `json:"task_queue"`
}
type WorkflowStruct struct {
InstanceId string `json:"instance_id"`
type WorkflowReference struct {
InstanceID string `json:"instance_id"`
}
// StartRequest is the object describing a Start Workflow request.
type StartRequest struct {
Options StartRequestOptions `json:"workflow_options"`
WorkflowInfo WorkflowStruct `json:"workflow_info"`
WorkflowName string `json:"function_name"`
Parameters interface{} `json:"parameters"`
Options map[string]string `json:"workflow_options"`
WorkflowInfo WorkflowReference `json:"workflow_info"`
WorkflowName string `json:"function_name"`
Parameters interface{} `json:"parameters"`
}
// CreateWorkerRequest is the object describing a Create Worker request.

View File

@ -1,7 +1,7 @@
package workflows
type StateResponse struct {
WfInfo WorkflowStruct
WFInfo WorkflowReference
StartTime string `json:"start_time"`
TaskQueue string `json:"task_queue"`
Status string `json:"status"`

View File

@ -16,6 +16,7 @@ package temporal
import (
"context"
"encoding/json"
"errors"
"github.com/dapr/components-contrib/workflows"
"github.com/dapr/kit/logger"
@ -67,42 +68,49 @@ func (c *TemporalWF) Init(metadata workflows.Metadata) error {
return nil
}
func (c *TemporalWF) Start(ctx context.Context, req *workflows.StartRequest) (*workflows.WorkflowStruct, error) {
func (c *TemporalWF) Start(ctx context.Context, req *workflows.StartRequest) (*workflows.WorkflowReference, error) {
c.logger.Debugf("starting workflow")
if req.Options.TaskQueue == "" {
c.logger.Debugf("no task queue provided")
return &workflows.WorkflowStruct{}, nil
if len(req.Options) == 0 {
c.logger.Debugf("no options provided")
return &workflows.WorkflowReference{}, errors.New("no options provided. At the very least, a task queue is needed")
}
opt := client.StartWorkflowOptions{ID: req.WorkflowInfo.InstanceId, TaskQueue: req.Options.TaskQueue}
if _, ok := req.Options["task_queue"]; !ok {
c.logger.Debugf("no task queue provided")
return &workflows.WorkflowReference{}, errors.New("no task queue provided")
}
taskQ := req.Options["task_queue"]
opt := client.StartWorkflowOptions{ID: req.WorkflowInfo.InstanceID, TaskQueue: taskQ}
run, err := c.client.ExecuteWorkflow(ctx, opt, req.WorkflowName, req.Parameters)
if err != nil {
c.logger.Debugf("error when starting workflow")
return &workflows.WorkflowStruct{}, err
return &workflows.WorkflowReference{}, err
}
wfStruct := workflows.WorkflowStruct{InstanceId: run.GetID()}
wfStruct := workflows.WorkflowReference{InstanceID: run.GetID()}
return &wfStruct, nil
}
func (c *TemporalWF) Terminate(ctx context.Context, req *workflows.WorkflowStruct) error {
func (c *TemporalWF) Terminate(ctx context.Context, req *workflows.WorkflowReference) error {
c.logger.Debugf("terminating workflow")
err := c.client.TerminateWorkflow(ctx, req.InstanceId, "", "")
err := c.client.TerminateWorkflow(ctx, req.InstanceID, "", "")
if err != nil {
return err
}
return nil
}
func (c *TemporalWF) Get(ctx context.Context, req *workflows.WorkflowStruct) (*workflows.StateResponse, error) {
func (c *TemporalWF) Get(ctx context.Context, req *workflows.WorkflowReference) (*workflows.StateResponse, error) {
c.logger.Debugf("getting workflow data")
resp, err := c.client.DescribeWorkflowExecution(ctx, req.InstanceId, "")
resp, err := c.client.DescribeWorkflowExecution(ctx, req.InstanceID, "")
if err != nil {
return nil, err
}
// Build the output struct
outputStruct := workflows.StateResponse{
WfInfo: workflows.WorkflowStruct{InstanceId: req.InstanceId},
WFInfo: workflows.WorkflowReference{InstanceID: req.InstanceID},
StartTime: resp.WorkflowExecutionInfo.StartTime.String(),
TaskQueue: resp.WorkflowExecutionInfo.GetTaskQueue(),
Status: lookupStatus(resp.WorkflowExecutionInfo.Status),
@ -116,14 +124,14 @@ func (c *TemporalWF) Close() {
c.client.Close()
}
func (c *TemporalWF) parseMetadata(metadata workflows.Metadata) (*temporalMetaData, error) {
func (c *TemporalWF) parseMetadata(metadata workflows.Metadata) (*temporalMetadata, error) {
connInfo := metadata.Properties
b, err := json.Marshal(connInfo)
if err != nil {
return nil, err
}
var creds temporalMetaData
var creds temporalMetadata
err = json.Unmarshal(b, &creds)
if err != nil {
return nil, err

View File

@ -18,7 +18,7 @@ import "context"
// Workflow is an interface to perform operations on Workflow.
type Workflow interface {
Init(metadata Metadata) error
Start(ctx context.Context, req *StartRequest) (*WorkflowStruct, error)
Terminate(ctx context.Context, req *WorkflowStruct) error
Get(ctx context.Context, req *WorkflowStruct) (*StateResponse, error)
Start(ctx context.Context, req *StartRequest) (*WorkflowReference, error)
Terminate(ctx context.Context, req *WorkflowReference) error
Get(ctx context.Context, req *WorkflowReference) (*StateResponse, error)
}