Add context in bindings
Signed-off-by: pigletfly <wangbing.adam@gmail.com>
This commit is contained in:
parent
77de5439da
commit
84759553bc
|
@ -236,7 +236,7 @@ func (r *RabbitMQ) declareQueue() (amqp.Queue, error) {
|
|||
return r.channel.QueueDeclare(r.metadata.QueueName, r.metadata.Durable, r.metadata.DeleteWhenUnused, r.metadata.Exclusive, false, args)
|
||||
}
|
||||
|
||||
func (r *RabbitMQ) Read(handler func(*bindings.ReadResponse) ([]byte, error)) error {
|
||||
func (r *RabbitMQ) Read(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error {
|
||||
msgs, err := r.channel.Consume(
|
||||
r.queue.Name,
|
||||
"",
|
||||
|
@ -254,7 +254,7 @@ func (r *RabbitMQ) Read(handler func(*bindings.ReadResponse) ([]byte, error)) er
|
|||
|
||||
go func() {
|
||||
for d := range msgs {
|
||||
_, err := handler(&bindings.ReadResponse{
|
||||
_, err := handler(context.TODO(), &bindings.ReadResponse{
|
||||
Data: d.Body,
|
||||
})
|
||||
if err == nil {
|
||||
|
|
|
@ -37,7 +37,7 @@ type activateJobsPayload struct {
|
|||
FetchVariables []string `json:"fetchVariables"`
|
||||
}
|
||||
|
||||
func (z *ZeebeCommand) activateJobs(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
func (z *ZeebeCommand) activateJobs(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
var payload activateJobsPayload
|
||||
err := json.Unmarshal(req.Data, &payload)
|
||||
if err != nil {
|
||||
|
@ -68,7 +68,6 @@ func (z *ZeebeCommand) activateJobs(req *bindings.InvokeRequest) (*bindings.Invo
|
|||
cmd = cmd.FetchVariables(payload.FetchVariables...)
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
response, err := cmd.Send(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot activate jobs for type %s: %w", payload.JobType, err)
|
||||
|
|
|
@ -103,7 +103,7 @@ func TestActivateJobs(t *testing.T) {
|
|||
t.Run("jobType is mandatory", func(t *testing.T) {
|
||||
cmd := ZeebeCommand{logger: testLogger}
|
||||
req := &bindings.InvokeRequest{Operation: ActivateJobsOperation}
|
||||
_, err := cmd.Invoke(req)
|
||||
_, err := cmd.Invoke(context.TODO(), req)
|
||||
assert.Error(t, err, ErrMissingJobType)
|
||||
})
|
||||
|
||||
|
@ -116,7 +116,7 @@ func TestActivateJobs(t *testing.T) {
|
|||
|
||||
cmd := ZeebeCommand{logger: testLogger}
|
||||
req := &bindings.InvokeRequest{Data: data, Operation: ActivateJobsOperation}
|
||||
_, err = cmd.Invoke(req)
|
||||
_, err = cmd.Invoke(context.TODO(), req)
|
||||
assert.Error(t, err, ErrMissingMaxJobsToActivate)
|
||||
})
|
||||
|
||||
|
@ -133,7 +133,7 @@ func TestActivateJobs(t *testing.T) {
|
|||
var mc mockActivateJobsClient
|
||||
|
||||
cmd := ZeebeCommand{logger: testLogger, client: &mc}
|
||||
_, err = cmd.Invoke(req)
|
||||
_, err = cmd.Invoke(context.TODO(), req)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, payload.JobType, mc.cmd1.jobType)
|
||||
|
@ -156,7 +156,7 @@ func TestActivateJobs(t *testing.T) {
|
|||
var mc mockActivateJobsClient
|
||||
|
||||
cmd := ZeebeCommand{logger: testLogger, client: &mc}
|
||||
_, err = cmd.Invoke(req)
|
||||
_, err = cmd.Invoke(context.TODO(), req)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, payload.JobType, mc.cmd1.jobType)
|
||||
|
|
|
@ -28,7 +28,7 @@ type cancelInstancePayload struct {
|
|||
ProcessInstanceKey *int64 `json:"processInstanceKey"`
|
||||
}
|
||||
|
||||
func (z *ZeebeCommand) cancelInstance(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
func (z *ZeebeCommand) cancelInstance(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
var payload cancelInstancePayload
|
||||
err := json.Unmarshal(req.Data, &payload)
|
||||
if err != nil {
|
||||
|
@ -41,7 +41,7 @@ func (z *ZeebeCommand) cancelInstance(req *bindings.InvokeRequest) (*bindings.In
|
|||
|
||||
_, err = z.client.NewCancelInstanceCommand().
|
||||
ProcessInstanceKey(*payload.ProcessInstanceKey).
|
||||
Send(context.Background())
|
||||
Send(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot cancel instance for process instance key %d: %w", payload.ProcessInstanceKey, err)
|
||||
}
|
||||
|
|
|
@ -66,7 +66,7 @@ func TestCancelInstance(t *testing.T) {
|
|||
t.Run("processInstanceKey is mandatory", func(t *testing.T) {
|
||||
cmd := ZeebeCommand{logger: testLogger}
|
||||
req := &bindings.InvokeRequest{Operation: CancelInstanceOperation}
|
||||
_, err := cmd.Invoke(req)
|
||||
_, err := cmd.Invoke(context.TODO(), req)
|
||||
assert.Error(t, err, ErrMissingProcessInstanceKey)
|
||||
})
|
||||
|
||||
|
@ -82,7 +82,7 @@ func TestCancelInstance(t *testing.T) {
|
|||
var mc mockCancelInstanceClient
|
||||
|
||||
cmd := ZeebeCommand{logger: testLogger, client: &mc}
|
||||
_, err = cmd.Invoke(req)
|
||||
_, err = cmd.Invoke(context.TODO(), req)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, *payload.ProcessInstanceKey, mc.cmd1.cmd2.processInstanceKey)
|
||||
|
|
|
@ -14,6 +14,7 @@ limitations under the License.
|
|||
package command
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
|
@ -88,30 +89,30 @@ func (z *ZeebeCommand) Operations() []bindings.OperationKind {
|
|||
}
|
||||
}
|
||||
|
||||
func (z *ZeebeCommand) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
func (z *ZeebeCommand) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
switch req.Operation {
|
||||
case TopologyOperation:
|
||||
return z.topology()
|
||||
return z.topology(ctx)
|
||||
case DeployProcessOperation:
|
||||
return z.deployProcess(req)
|
||||
return z.deployProcess(ctx, req)
|
||||
case CreateInstanceOperation:
|
||||
return z.createInstance(req)
|
||||
return z.createInstance(ctx, req)
|
||||
case CancelInstanceOperation:
|
||||
return z.cancelInstance(req)
|
||||
return z.cancelInstance(ctx, req)
|
||||
case SetVariablesOperation:
|
||||
return z.setVariables(req)
|
||||
return z.setVariables(ctx, req)
|
||||
case ResolveIncidentOperation:
|
||||
return z.resolveIncident(req)
|
||||
return z.resolveIncident(ctx, req)
|
||||
case PublishMessageOperation:
|
||||
return z.publishMessage(req)
|
||||
return z.publishMessage(ctx, req)
|
||||
case ActivateJobsOperation:
|
||||
return z.activateJobs(req)
|
||||
return z.activateJobs(ctx, req)
|
||||
case CompleteJobOperation:
|
||||
return z.completeJob(req)
|
||||
return z.completeJob(ctx, req)
|
||||
case FailJobOperation:
|
||||
return z.failJob(req)
|
||||
return z.failJob(ctx, req)
|
||||
case UpdateJobRetriesOperation:
|
||||
return z.updateJobRetries(req)
|
||||
return z.updateJobRetries(ctx, req)
|
||||
case ThrowErrorOperation:
|
||||
return z.throwError(req)
|
||||
case bindings.GetOperation:
|
||||
|
|
|
@ -14,6 +14,7 @@ limitations under the License.
|
|||
package command
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
|
@ -84,7 +85,7 @@ func TestInvoke(t *testing.T) {
|
|||
t.Run("operation must be supported", func(t *testing.T) {
|
||||
cmd := ZeebeCommand{logger: testLogger}
|
||||
req := &bindings.InvokeRequest{Operation: bindings.DeleteOperation}
|
||||
_, err := cmd.Invoke(req)
|
||||
_, err := cmd.Invoke(context.TODO(), req)
|
||||
assert.Error(t, err, ErrUnsupportedOperation(bindings.DeleteOperation))
|
||||
})
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ type completeJobPayload struct {
|
|||
Variables interface{} `json:"variables"`
|
||||
}
|
||||
|
||||
func (z *ZeebeCommand) completeJob(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
func (z *ZeebeCommand) completeJob(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
var payload completeJobPayload
|
||||
err := json.Unmarshal(req.Data, &payload)
|
||||
if err != nil {
|
||||
|
@ -49,7 +49,7 @@ func (z *ZeebeCommand) completeJob(req *bindings.InvokeRequest) (*bindings.Invok
|
|||
}
|
||||
}
|
||||
|
||||
_, err = cmd3.Send(context.Background())
|
||||
_, err = cmd3.Send(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot complete job for key %d: %w", payload.JobKey, err)
|
||||
}
|
||||
|
|
|
@ -80,7 +80,7 @@ func TestCompleteJob(t *testing.T) {
|
|||
t.Run("elementInstanceKey is mandatory", func(t *testing.T) {
|
||||
cmd := ZeebeCommand{logger: testLogger}
|
||||
req := &bindings.InvokeRequest{Operation: CompleteJobOperation}
|
||||
_, err := cmd.Invoke(req)
|
||||
_, err := cmd.Invoke(context.TODO(), req)
|
||||
assert.Error(t, err, ErrMissingJobKey)
|
||||
})
|
||||
|
||||
|
@ -99,7 +99,7 @@ func TestCompleteJob(t *testing.T) {
|
|||
var mc mockCompleteJobClient
|
||||
|
||||
cmd := ZeebeCommand{logger: testLogger, client: &mc}
|
||||
_, err = cmd.Invoke(req)
|
||||
_, err = cmd.Invoke(context.TODO(), req)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, *payload.JobKey, mc.cmd1.jobKey)
|
||||
|
|
|
@ -36,7 +36,7 @@ type createInstancePayload struct {
|
|||
Variables interface{} `json:"variables"`
|
||||
}
|
||||
|
||||
func (z *ZeebeCommand) createInstance(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
func (z *ZeebeCommand) createInstance(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
var payload createInstancePayload
|
||||
err := json.Unmarshal(req.Data, &payload)
|
||||
if err != nil {
|
||||
|
@ -75,7 +75,7 @@ func (z *ZeebeCommand) createInstance(req *bindings.InvokeRequest) (*bindings.In
|
|||
}
|
||||
}
|
||||
|
||||
response, err := cmd3.Send(context.Background())
|
||||
response, err := cmd3.Send(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot create instane for %s: %w", errorDetail, err)
|
||||
}
|
||||
|
|
|
@ -112,7 +112,7 @@ func TestCreateInstance(t *testing.T) {
|
|||
var mc mockCreateInstanceClient
|
||||
|
||||
cmd := ZeebeCommand{logger: testLogger, client: &mc}
|
||||
_, err = cmd.Invoke(req)
|
||||
_, err = cmd.Invoke(context.TODO(), req)
|
||||
assert.Error(t, err, ErrAmbiguousCreationVars)
|
||||
})
|
||||
|
||||
|
@ -126,7 +126,7 @@ func TestCreateInstance(t *testing.T) {
|
|||
var mc mockCreateInstanceClient
|
||||
|
||||
cmd := ZeebeCommand{logger: testLogger, client: &mc}
|
||||
_, err = cmd.Invoke(req)
|
||||
_, err = cmd.Invoke(context.TODO(), req)
|
||||
assert.Error(t, err, ErrMissingCreationVars)
|
||||
})
|
||||
|
||||
|
@ -143,7 +143,7 @@ func TestCreateInstance(t *testing.T) {
|
|||
var mc mockCreateInstanceClient
|
||||
|
||||
cmd := ZeebeCommand{logger: testLogger, client: &mc}
|
||||
_, err = cmd.Invoke(req)
|
||||
_, err = cmd.Invoke(context.TODO(), req)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, payload.BpmnProcessID, mc.cmd1.bpmnProcessID)
|
||||
|
@ -162,7 +162,7 @@ func TestCreateInstance(t *testing.T) {
|
|||
var mc mockCreateInstanceClient
|
||||
|
||||
cmd := ZeebeCommand{logger: testLogger, client: &mc}
|
||||
_, err = cmd.Invoke(req)
|
||||
_, err = cmd.Invoke(context.TODO(), req)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, payload.BpmnProcessID, mc.cmd1.bpmnProcessID)
|
||||
|
@ -181,7 +181,7 @@ func TestCreateInstance(t *testing.T) {
|
|||
var mc mockCreateInstanceClient
|
||||
|
||||
cmd := ZeebeCommand{logger: testLogger, client: &mc}
|
||||
_, err = cmd.Invoke(req)
|
||||
_, err = cmd.Invoke(context.TODO(), req)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, *payload.ProcessDefinitionKey, mc.cmd1.processDefinitionKey)
|
||||
|
@ -202,7 +202,7 @@ func TestCreateInstance(t *testing.T) {
|
|||
var mc mockCreateInstanceClient
|
||||
|
||||
cmd := ZeebeCommand{logger: testLogger, client: &mc}
|
||||
_, err = cmd.Invoke(req)
|
||||
_, err = cmd.Invoke(context.TODO(), req)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, *payload.ProcessDefinitionKey, mc.cmd1.processDefinitionKey)
|
||||
|
|
|
@ -29,7 +29,7 @@ const (
|
|||
|
||||
var ErrMissingFileName = errors.New("fileName is a required attribute")
|
||||
|
||||
func (z *ZeebeCommand) deployProcess(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
func (z *ZeebeCommand) deployProcess(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
var deployFileName string
|
||||
|
||||
if val, ok := req.Metadata[fileName]; ok && val != "" {
|
||||
|
@ -40,7 +40,7 @@ func (z *ZeebeCommand) deployProcess(req *bindings.InvokeRequest) (*bindings.Inv
|
|||
|
||||
response, err := z.client.NewDeployProcessCommand().
|
||||
AddResource(req.Data, deployFileName).
|
||||
Send(context.Background())
|
||||
Send(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot deploy process with fileName %s: %w", deployFileName, err)
|
||||
}
|
||||
|
|
|
@ -14,6 +14,7 @@ limitations under the License.
|
|||
package command
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
@ -28,7 +29,7 @@ func TestDeployProcess(t *testing.T) {
|
|||
t.Run("fileName is mandatory", func(t *testing.T) {
|
||||
cmd := ZeebeCommand{logger: testLogger}
|
||||
req := &bindings.InvokeRequest{Operation: DeployProcessOperation}
|
||||
_, err := cmd.Invoke(req)
|
||||
_, err := cmd.Invoke(context.TODO(), req)
|
||||
assert.Error(t, err, ErrMissingFileName)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ type failJobPayload struct {
|
|||
ErrorMessage string `json:"errorMessage"`
|
||||
}
|
||||
|
||||
func (z *ZeebeCommand) failJob(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
func (z *ZeebeCommand) failJob(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
var payload failJobPayload
|
||||
err := json.Unmarshal(req.Data, &payload)
|
||||
if err != nil {
|
||||
|
@ -53,7 +53,7 @@ func (z *ZeebeCommand) failJob(req *bindings.InvokeRequest) (*bindings.InvokeRes
|
|||
cmd = cmd.ErrorMessage(payload.ErrorMessage)
|
||||
}
|
||||
|
||||
_, err = cmd.Send(context.Background())
|
||||
_, err = cmd.Send(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot fail job for key %d: %w", payload.JobKey, err)
|
||||
}
|
||||
|
|
|
@ -87,7 +87,7 @@ func TestFailJob(t *testing.T) {
|
|||
t.Run("jobKey is mandatory", func(t *testing.T) {
|
||||
cmd := ZeebeCommand{logger: testLogger}
|
||||
req := &bindings.InvokeRequest{Operation: FailJobOperation}
|
||||
_, err := cmd.Invoke(req)
|
||||
_, err := cmd.Invoke(context.TODO(), req)
|
||||
assert.Error(t, err, ErrMissingJobKey)
|
||||
})
|
||||
|
||||
|
@ -100,7 +100,7 @@ func TestFailJob(t *testing.T) {
|
|||
|
||||
cmd := ZeebeCommand{logger: testLogger}
|
||||
req := &bindings.InvokeRequest{Data: data, Operation: FailJobOperation}
|
||||
_, err = cmd.Invoke(req)
|
||||
_, err = cmd.Invoke(context.TODO(), req)
|
||||
assert.Error(t, err, ErrMissingRetries)
|
||||
})
|
||||
|
||||
|
@ -118,7 +118,7 @@ func TestFailJob(t *testing.T) {
|
|||
var mc mockFailJobClient
|
||||
|
||||
cmd := ZeebeCommand{logger: testLogger, client: &mc}
|
||||
_, err = cmd.Invoke(req)
|
||||
_, err = cmd.Invoke(context.TODO(), req)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, *payload.JobKey, mc.cmd1.jobKey)
|
||||
|
|
|
@ -34,7 +34,7 @@ type publishMessagePayload struct {
|
|||
Variables interface{} `json:"variables"`
|
||||
}
|
||||
|
||||
func (z *ZeebeCommand) publishMessage(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
func (z *ZeebeCommand) publishMessage(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
var payload publishMessagePayload
|
||||
err := json.Unmarshal(req.Data, &payload)
|
||||
if err != nil {
|
||||
|
@ -64,7 +64,6 @@ func (z *ZeebeCommand) publishMessage(req *bindings.InvokeRequest) (*bindings.In
|
|||
}
|
||||
}
|
||||
|
||||
ctx := context.Background()
|
||||
response, err := cmd.Send(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot publish message with name %s: %w", payload.MessageName, err)
|
||||
|
|
|
@ -104,7 +104,7 @@ func TestPublishMessage(t *testing.T) {
|
|||
t.Run("messageName is mandatory", func(t *testing.T) {
|
||||
cmd := ZeebeCommand{logger: testLogger}
|
||||
req := &bindings.InvokeRequest{Operation: PublishMessageOperation}
|
||||
_, err := cmd.Invoke(req)
|
||||
_, err := cmd.Invoke(context.TODO(), req)
|
||||
assert.Error(t, err, ErrMissingMessageName)
|
||||
})
|
||||
|
||||
|
@ -120,7 +120,7 @@ func TestPublishMessage(t *testing.T) {
|
|||
var mc mockPublishMessageClient
|
||||
|
||||
cmd := ZeebeCommand{logger: testLogger, client: &mc}
|
||||
_, err = cmd.Invoke(req)
|
||||
_, err = cmd.Invoke(context.TODO(), req)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, payload.MessageName, mc.cmd1.messageName)
|
||||
|
@ -145,7 +145,7 @@ func TestPublishMessage(t *testing.T) {
|
|||
var mc mockPublishMessageClient
|
||||
|
||||
cmd := ZeebeCommand{logger: testLogger, client: &mc}
|
||||
_, err = cmd.Invoke(req)
|
||||
_, err = cmd.Invoke(context.TODO(), req)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, payload.MessageName, mc.cmd1.messageName)
|
||||
|
|
|
@ -28,7 +28,7 @@ type resolveIncidentPayload struct {
|
|||
IncidentKey *int64 `json:"incidentKey"`
|
||||
}
|
||||
|
||||
func (z *ZeebeCommand) resolveIncident(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
func (z *ZeebeCommand) resolveIncident(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
var payload resolveIncidentPayload
|
||||
err := json.Unmarshal(req.Data, &payload)
|
||||
if err != nil {
|
||||
|
@ -41,7 +41,7 @@ func (z *ZeebeCommand) resolveIncident(req *bindings.InvokeRequest) (*bindings.I
|
|||
|
||||
_, err = z.client.NewResolveIncidentCommand().
|
||||
IncidentKey(*payload.IncidentKey).
|
||||
Send(context.Background())
|
||||
Send(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot resolve incident for key %d: %w", payload.IncidentKey, err)
|
||||
}
|
||||
|
|
|
@ -66,7 +66,7 @@ func TestResolveIncident(t *testing.T) {
|
|||
t.Run("incidentKey is mandatory", func(t *testing.T) {
|
||||
cmd := ZeebeCommand{logger: testLogger}
|
||||
req := &bindings.InvokeRequest{Operation: ResolveIncidentOperation}
|
||||
_, err := cmd.Invoke(req)
|
||||
_, err := cmd.Invoke(context.TODO(), req)
|
||||
assert.Error(t, err, ErrMissingIncidentKey)
|
||||
})
|
||||
|
||||
|
@ -82,7 +82,7 @@ func TestResolveIncident(t *testing.T) {
|
|||
var mc mockResolveIncidentClient
|
||||
|
||||
cmd := ZeebeCommand{logger: testLogger, client: &mc}
|
||||
_, err = cmd.Invoke(req)
|
||||
_, err = cmd.Invoke(context.TODO(), req)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, *payload.IncidentKey, mc.cmd1.incidentKey)
|
||||
|
|
|
@ -33,7 +33,7 @@ type setVariablesPayload struct {
|
|||
Variables interface{} `json:"variables"`
|
||||
}
|
||||
|
||||
func (z *ZeebeCommand) setVariables(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
func (z *ZeebeCommand) setVariables(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
var payload setVariablesPayload
|
||||
err := json.Unmarshal(req.Data, &payload)
|
||||
if err != nil {
|
||||
|
@ -55,7 +55,7 @@ func (z *ZeebeCommand) setVariables(req *bindings.InvokeRequest) (*bindings.Invo
|
|||
return nil, err
|
||||
}
|
||||
|
||||
response, err := cmd.Local(payload.Local).Send(context.Background())
|
||||
response, err := cmd.Local(payload.Local).Send(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot set variables for element instance key %d: %w", payload.ElementInstanceKey, err)
|
||||
}
|
||||
|
|
|
@ -87,7 +87,7 @@ func TestSetVariables(t *testing.T) {
|
|||
t.Run("elementInstanceKey is mandatory", func(t *testing.T) {
|
||||
cmd := ZeebeCommand{logger: testLogger}
|
||||
req := &bindings.InvokeRequest{Operation: SetVariablesOperation}
|
||||
_, err := cmd.Invoke(req)
|
||||
_, err := cmd.Invoke(context.TODO(), req)
|
||||
assert.Error(t, err, ErrMissingElementInstanceKey)
|
||||
})
|
||||
|
||||
|
@ -100,7 +100,7 @@ func TestSetVariables(t *testing.T) {
|
|||
|
||||
cmd := ZeebeCommand{logger: testLogger}
|
||||
req := &bindings.InvokeRequest{Data: data, Operation: SetVariablesOperation}
|
||||
_, err = cmd.Invoke(req)
|
||||
_, err = cmd.Invoke(context.TODO(), req)
|
||||
assert.Error(t, err, ErrMissingVariables)
|
||||
})
|
||||
|
||||
|
@ -119,7 +119,7 @@ func TestSetVariables(t *testing.T) {
|
|||
var mc mockSetVariableClient
|
||||
|
||||
cmd := ZeebeCommand{logger: testLogger, client: &mc}
|
||||
_, err = cmd.Invoke(req)
|
||||
_, err = cmd.Invoke(context.TODO(), req)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, *payload.ElementInstanceKey, mc.cmd1.elementInstanceKey)
|
||||
|
@ -143,7 +143,7 @@ func TestSetVariables(t *testing.T) {
|
|||
var mc mockSetVariableClient
|
||||
|
||||
cmd := ZeebeCommand{logger: testLogger, client: &mc}
|
||||
_, err = cmd.Invoke(req)
|
||||
_, err = cmd.Invoke(context.TODO(), req)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, *payload.ElementInstanceKey, mc.cmd1.elementInstanceKey)
|
||||
|
|
|
@ -87,7 +87,7 @@ func TestThrowError(t *testing.T) {
|
|||
t.Run("jobKey is mandatory", func(t *testing.T) {
|
||||
cmd := ZeebeCommand{logger: testLogger}
|
||||
req := &bindings.InvokeRequest{Operation: ThrowErrorOperation}
|
||||
_, err := cmd.Invoke(req)
|
||||
_, err := cmd.Invoke(context.TODO(), req)
|
||||
assert.Error(t, err, ErrMissingJobKey)
|
||||
})
|
||||
|
||||
|
@ -100,7 +100,7 @@ func TestThrowError(t *testing.T) {
|
|||
|
||||
cmd := ZeebeCommand{logger: testLogger}
|
||||
req := &bindings.InvokeRequest{Data: data, Operation: ThrowErrorOperation}
|
||||
_, err = cmd.Invoke(req)
|
||||
_, err = cmd.Invoke(context.TODO(), req)
|
||||
assert.Error(t, err, ErrMissingErrorCode)
|
||||
})
|
||||
|
||||
|
@ -118,7 +118,7 @@ func TestThrowError(t *testing.T) {
|
|||
var mc mockThrowErrorClient
|
||||
|
||||
cmd := ZeebeCommand{logger: testLogger, client: &mc}
|
||||
_, err = cmd.Invoke(req)
|
||||
_, err = cmd.Invoke(context.TODO(), req)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, *payload.JobKey, mc.cmd1.jobKey)
|
||||
|
|
|
@ -21,8 +21,8 @@ import (
|
|||
"github.com/dapr/components-contrib/bindings"
|
||||
)
|
||||
|
||||
func (z *ZeebeCommand) topology() (*bindings.InvokeResponse, error) {
|
||||
response, err := z.client.NewTopologyCommand().Send(context.Background())
|
||||
func (z *ZeebeCommand) topology(ctx context.Context) (*bindings.InvokeResponse, error) {
|
||||
response, err := z.client.NewTopologyCommand().Send(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot get zeebe toplogy: %w", err)
|
||||
}
|
||||
|
|
|
@ -28,7 +28,7 @@ type updateJobRetriesPayload struct {
|
|||
Retries *int32 `json:"retries"`
|
||||
}
|
||||
|
||||
func (z *ZeebeCommand) updateJobRetries(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
func (z *ZeebeCommand) updateJobRetries(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
var payload updateJobRetriesPayload
|
||||
err := json.Unmarshal(req.Data, &payload)
|
||||
if err != nil {
|
||||
|
@ -45,7 +45,7 @@ func (z *ZeebeCommand) updateJobRetries(req *bindings.InvokeRequest) (*bindings.
|
|||
cmd2 = cmd1.Retries(*payload.Retries)
|
||||
}
|
||||
|
||||
_, err = cmd2.Send(context.Background())
|
||||
_, err = cmd2.Send(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot uodate job retries for key %d: %w", payload.JobKey, err)
|
||||
}
|
||||
|
|
|
@ -73,7 +73,7 @@ func TestUpdateJobRetries(t *testing.T) {
|
|||
t.Run("jobKey is mandatory", func(t *testing.T) {
|
||||
cmd := ZeebeCommand{logger: testLogger}
|
||||
req := &bindings.InvokeRequest{Operation: UpdateJobRetriesOperation}
|
||||
_, err := cmd.Invoke(req)
|
||||
_, err := cmd.Invoke(context.TODO(), req)
|
||||
assert.Error(t, err, ErrMissingJobKey)
|
||||
})
|
||||
|
||||
|
@ -90,7 +90,7 @@ func TestUpdateJobRetries(t *testing.T) {
|
|||
var mc mockUpdateJobRetriesClient
|
||||
|
||||
cmd := ZeebeCommand{logger: testLogger, client: &mc}
|
||||
_, err = cmd.Invoke(req)
|
||||
_, err = cmd.Invoke(context.TODO(), req)
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, *payload.JobKey, mc.cmd1.jobKey)
|
||||
|
|
|
@ -58,7 +58,7 @@ type jobWorkerMetadata struct {
|
|||
}
|
||||
|
||||
type jobHandler struct {
|
||||
callback func(*bindings.ReadResponse) ([]byte, error)
|
||||
callback func(context.Context, *bindings.ReadResponse) ([]byte, error)
|
||||
logger logger.Logger
|
||||
}
|
||||
|
||||
|
@ -89,7 +89,7 @@ func (z *ZeebeJobWorker) Init(metadata bindings.Metadata) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (z *ZeebeJobWorker) Read(handler func(*bindings.ReadResponse) ([]byte, error)) error {
|
||||
func (z *ZeebeJobWorker) Read(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error {
|
||||
h := jobHandler{
|
||||
callback: handler,
|
||||
logger: z.logger,
|
||||
|
@ -174,7 +174,7 @@ func (h *jobHandler) handleJob(client worker.JobClient, job entities.Job) {
|
|||
headers["X-Zeebe-Retries"] = strconv.FormatInt(int64(job.Retries), 10)
|
||||
headers["X-Zeebe-Deadline"] = strconv.FormatInt(job.Deadline, 10)
|
||||
|
||||
resultVariables, err := h.callback(&bindings.ReadResponse{
|
||||
resultVariables, err := h.callback(context.TODO(), &bindings.ReadResponse{
|
||||
Data: []byte(job.Variables),
|
||||
Metadata: headers,
|
||||
})
|
||||
|
|
|
@ -14,6 +14,7 @@ limitations under the License.
|
|||
package zeebe
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
@ -190,7 +191,7 @@ func DeployProcess(
|
|||
|
||||
metadata := map[string]string{"fileName": fileName}
|
||||
req := &bindings.InvokeRequest{Data: data, Metadata: metadata, Operation: command.DeployProcessOperation}
|
||||
res, err := cmd.Invoke(req)
|
||||
res, err := cmd.Invoke(context.TODO(), req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -228,7 +229,7 @@ func CreateProcessInstance(
|
|||
}
|
||||
|
||||
req := &bindings.InvokeRequest{Data: data, Operation: command.CreateInstanceOperation}
|
||||
res, err := cmd.Invoke(req)
|
||||
res, err := cmd.Invoke(context.TODO(), req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -252,7 +253,7 @@ func ActicateJob(
|
|||
}
|
||||
|
||||
req := &bindings.InvokeRequest{Data: data, Operation: command.ActivateJobsOperation}
|
||||
res, err := cmd.Invoke(req)
|
||||
res, err := cmd.Invoke(context.TODO(), req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -300,7 +301,7 @@ func CalcWorker(request *bindings.ReadResponse) ([]byte, error) {
|
|||
func InitTestProcess(
|
||||
cmd *command.ZeebeCommand,
|
||||
id string,
|
||||
testWorker func(*bindings.ReadResponse) ([]byte, error),
|
||||
testWorker func(context.Context, *bindings.ReadResponse) ([]byte, error),
|
||||
additionalMetadata ...MetadataPair,
|
||||
) error {
|
||||
testJobType := id + "-test"
|
||||
|
|
Loading…
Reference in New Issue