Updates to workflow requests and responses

This commit is contained in:
Ryan Lettieri 2022-08-24 15:14:08 -06:00
parent a154700b7e
commit 3066378a62
3 changed files with 10 additions and 8 deletions

View File

@ -61,7 +61,7 @@ func ConformanceTests(t *testing.T, props map[string]string, workflowItem workfl
req := &workflows.StartRequest{
Parameters: 10, // Time that the activity within the workflow runs for
}
req.WorkflowInfo.WorkflowId = "TestWorkflow"
req.WorkflowInfo.InstanceId = "TestWorkflow"
req.Options.TaskQueue = "TestTaskQueue"
wf, err := workflowItem.Start(context.Background(), req)
assert.Nil(t, err)

View File

@ -11,7 +11,6 @@ type StartRequestOptions struct {
}
type WorkflowStruct struct {
WorkflowId string `json:"workflow_id"`
InstanceId string `json:"instance_id"`
}
@ -19,6 +18,7 @@ type WorkflowStruct struct {
type StartRequest struct {
Options StartRequestOptions `json:"workflow_options"`
WorkflowInfo WorkflowStruct `json:"workflow_info"`
WorkflowName string `json:"function_name"`
Parameters interface{} `json:"parameters"`
}

View File

@ -69,23 +69,25 @@ func (c *TemporalWF) Init(metadata workflows.Metadata) error {
func (c *TemporalWF) Start(ctx context.Context, req *workflows.StartRequest) (*workflows.WorkflowStruct, error) {
c.logger.Debugf("starting workflow")
if req.Options.TaskQueue == "" {
c.logger.Debugf("no task queue provided")
return &workflows.WorkflowStruct{}, nil
}
opt := client.StartWorkflowOptions{ID: req.WorkflowInfo.WorkflowId, TaskQueue: req.Options.TaskQueue}
run, err := c.client.ExecuteWorkflow(ctx, opt, req.WorkflowInfo.WorkflowId, req.Parameters)
opt := client.StartWorkflowOptions{ID: req.WorkflowInfo.InstanceId, TaskQueue: req.Options.TaskQueue}
run, err := c.client.ExecuteWorkflow(ctx, opt, req.WorkflowName, req.Parameters)
if err != nil {
c.logger.Debugf("error when starting workflow")
return &workflows.WorkflowStruct{}, err
}
wfStruct := workflows.WorkflowStruct{WorkflowId: run.GetID(), InstanceId: run.GetRunID()}
wfStruct := workflows.WorkflowStruct{InstanceId: run.GetID()}
return &wfStruct, nil
}
func (c *TemporalWF) Terminate(ctx context.Context, req *workflows.WorkflowStruct) error {
c.logger.Debugf("terminating workflow")
err := c.client.TerminateWorkflow(ctx, req.WorkflowId, req.InstanceId, "")
err := c.client.TerminateWorkflow(ctx, req.InstanceId, "", "")
if err != nil {
return err
}
@ -94,13 +96,13 @@ func (c *TemporalWF) Terminate(ctx context.Context, req *workflows.WorkflowStruc
func (c *TemporalWF) Get(ctx context.Context, req *workflows.WorkflowStruct) (*workflows.StateResponse, error) {
c.logger.Debugf("getting workflow data")
resp, err := c.client.DescribeWorkflowExecution(ctx, req.WorkflowId, req.InstanceId)
resp, err := c.client.DescribeWorkflowExecution(ctx, req.InstanceId, "")
if err != nil {
return nil, err
}
// Build the output struct
outputStruct := workflows.StateResponse{
WfInfo: workflows.WorkflowStruct{WorkflowId: req.WorkflowId, InstanceId: req.InstanceId},
WfInfo: workflows.WorkflowStruct{InstanceId: req.InstanceId},
StartTime: resp.WorkflowExecutionInfo.StartTime.String(),
TaskQueue: resp.WorkflowExecutionInfo.GetTaskQueue(),
Status: lookupStatus(resp.WorkflowExecutionInfo.Status),