diff --git a/bindings/rabbitmq/rabbitmq.go b/bindings/rabbitmq/rabbitmq.go index eb3f1002f..274b216da 100644 --- a/bindings/rabbitmq/rabbitmq.go +++ b/bindings/rabbitmq/rabbitmq.go @@ -236,7 +236,7 @@ func (r *RabbitMQ) declareQueue() (amqp.Queue, error) { return r.channel.QueueDeclare(r.metadata.QueueName, r.metadata.Durable, r.metadata.DeleteWhenUnused, r.metadata.Exclusive, false, args) } -func (r *RabbitMQ) Read(handler func(*bindings.ReadResponse) ([]byte, error)) error { +func (r *RabbitMQ) Read(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error { msgs, err := r.channel.Consume( r.queue.Name, "", @@ -254,7 +254,7 @@ func (r *RabbitMQ) Read(handler func(*bindings.ReadResponse) ([]byte, error)) er go func() { for d := range msgs { - _, err := handler(&bindings.ReadResponse{ + _, err := handler(context.TODO(), &bindings.ReadResponse{ Data: d.Body, }) if err == nil { diff --git a/bindings/zeebe/command/activate_jobs.go b/bindings/zeebe/command/activate_jobs.go index b8b446a9a..686b65363 100644 --- a/bindings/zeebe/command/activate_jobs.go +++ b/bindings/zeebe/command/activate_jobs.go @@ -37,7 +37,7 @@ type activateJobsPayload struct { FetchVariables []string `json:"fetchVariables"` } -func (z *ZeebeCommand) activateJobs(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { +func (z *ZeebeCommand) activateJobs(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { var payload activateJobsPayload err := json.Unmarshal(req.Data, &payload) if err != nil { @@ -68,7 +68,6 @@ func (z *ZeebeCommand) activateJobs(req *bindings.InvokeRequest) (*bindings.Invo cmd = cmd.FetchVariables(payload.FetchVariables...) } - ctx := context.Background() response, err := cmd.Send(ctx) if err != nil { return nil, fmt.Errorf("cannot activate jobs for type %s: %w", payload.JobType, err) diff --git a/bindings/zeebe/command/activate_jobs_test.go b/bindings/zeebe/command/activate_jobs_test.go index 5ef81e5f2..c85242204 100644 --- a/bindings/zeebe/command/activate_jobs_test.go +++ b/bindings/zeebe/command/activate_jobs_test.go @@ -103,7 +103,7 @@ func TestActivateJobs(t *testing.T) { t.Run("jobType is mandatory", func(t *testing.T) { cmd := ZeebeCommand{logger: testLogger} req := &bindings.InvokeRequest{Operation: ActivateJobsOperation} - _, err := cmd.Invoke(req) + _, err := cmd.Invoke(context.TODO(), req) assert.Error(t, err, ErrMissingJobType) }) @@ -116,7 +116,7 @@ func TestActivateJobs(t *testing.T) { cmd := ZeebeCommand{logger: testLogger} req := &bindings.InvokeRequest{Data: data, Operation: ActivateJobsOperation} - _, err = cmd.Invoke(req) + _, err = cmd.Invoke(context.TODO(), req) assert.Error(t, err, ErrMissingMaxJobsToActivate) }) @@ -133,7 +133,7 @@ func TestActivateJobs(t *testing.T) { var mc mockActivateJobsClient cmd := ZeebeCommand{logger: testLogger, client: &mc} - _, err = cmd.Invoke(req) + _, err = cmd.Invoke(context.TODO(), req) assert.NoError(t, err) assert.Equal(t, payload.JobType, mc.cmd1.jobType) @@ -156,7 +156,7 @@ func TestActivateJobs(t *testing.T) { var mc mockActivateJobsClient cmd := ZeebeCommand{logger: testLogger, client: &mc} - _, err = cmd.Invoke(req) + _, err = cmd.Invoke(context.TODO(), req) assert.NoError(t, err) assert.Equal(t, payload.JobType, mc.cmd1.jobType) diff --git a/bindings/zeebe/command/cancel_instance.go b/bindings/zeebe/command/cancel_instance.go index e2b81567f..cae6b76d9 100644 --- a/bindings/zeebe/command/cancel_instance.go +++ b/bindings/zeebe/command/cancel_instance.go @@ -28,7 +28,7 @@ type cancelInstancePayload struct { ProcessInstanceKey *int64 `json:"processInstanceKey"` } -func (z *ZeebeCommand) cancelInstance(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { +func (z *ZeebeCommand) cancelInstance(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { var payload cancelInstancePayload err := json.Unmarshal(req.Data, &payload) if err != nil { @@ -41,7 +41,7 @@ func (z *ZeebeCommand) cancelInstance(req *bindings.InvokeRequest) (*bindings.In _, err = z.client.NewCancelInstanceCommand(). ProcessInstanceKey(*payload.ProcessInstanceKey). - Send(context.Background()) + Send(ctx) if err != nil { return nil, fmt.Errorf("cannot cancel instance for process instance key %d: %w", payload.ProcessInstanceKey, err) } diff --git a/bindings/zeebe/command/cancel_instance_test.go b/bindings/zeebe/command/cancel_instance_test.go index 4ba85848e..e380d4f15 100644 --- a/bindings/zeebe/command/cancel_instance_test.go +++ b/bindings/zeebe/command/cancel_instance_test.go @@ -66,7 +66,7 @@ func TestCancelInstance(t *testing.T) { t.Run("processInstanceKey is mandatory", func(t *testing.T) { cmd := ZeebeCommand{logger: testLogger} req := &bindings.InvokeRequest{Operation: CancelInstanceOperation} - _, err := cmd.Invoke(req) + _, err := cmd.Invoke(context.TODO(), req) assert.Error(t, err, ErrMissingProcessInstanceKey) }) @@ -82,7 +82,7 @@ func TestCancelInstance(t *testing.T) { var mc mockCancelInstanceClient cmd := ZeebeCommand{logger: testLogger, client: &mc} - _, err = cmd.Invoke(req) + _, err = cmd.Invoke(context.TODO(), req) assert.NoError(t, err) assert.Equal(t, *payload.ProcessInstanceKey, mc.cmd1.cmd2.processInstanceKey) diff --git a/bindings/zeebe/command/command.go b/bindings/zeebe/command/command.go index 1e54e73ae..69ae8be59 100644 --- a/bindings/zeebe/command/command.go +++ b/bindings/zeebe/command/command.go @@ -14,6 +14,7 @@ limitations under the License. package command import ( + "context" "errors" "fmt" @@ -88,30 +89,30 @@ func (z *ZeebeCommand) Operations() []bindings.OperationKind { } } -func (z *ZeebeCommand) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { +func (z *ZeebeCommand) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { switch req.Operation { case TopologyOperation: - return z.topology() + return z.topology(ctx) case DeployProcessOperation: - return z.deployProcess(req) + return z.deployProcess(ctx, req) case CreateInstanceOperation: - return z.createInstance(req) + return z.createInstance(ctx, req) case CancelInstanceOperation: - return z.cancelInstance(req) + return z.cancelInstance(ctx, req) case SetVariablesOperation: - return z.setVariables(req) + return z.setVariables(ctx, req) case ResolveIncidentOperation: - return z.resolveIncident(req) + return z.resolveIncident(ctx, req) case PublishMessageOperation: - return z.publishMessage(req) + return z.publishMessage(ctx, req) case ActivateJobsOperation: - return z.activateJobs(req) + return z.activateJobs(ctx, req) case CompleteJobOperation: - return z.completeJob(req) + return z.completeJob(ctx, req) case FailJobOperation: - return z.failJob(req) + return z.failJob(ctx, req) case UpdateJobRetriesOperation: - return z.updateJobRetries(req) + return z.updateJobRetries(ctx, req) case ThrowErrorOperation: return z.throwError(req) case bindings.GetOperation: diff --git a/bindings/zeebe/command/command_test.go b/bindings/zeebe/command/command_test.go index 57bf34928..25239f521 100644 --- a/bindings/zeebe/command/command_test.go +++ b/bindings/zeebe/command/command_test.go @@ -14,6 +14,7 @@ limitations under the License. package command import ( + "context" "errors" "testing" @@ -84,7 +85,7 @@ func TestInvoke(t *testing.T) { t.Run("operation must be supported", func(t *testing.T) { cmd := ZeebeCommand{logger: testLogger} req := &bindings.InvokeRequest{Operation: bindings.DeleteOperation} - _, err := cmd.Invoke(req) + _, err := cmd.Invoke(context.TODO(), req) assert.Error(t, err, ErrUnsupportedOperation(bindings.DeleteOperation)) }) } diff --git a/bindings/zeebe/command/complete_job.go b/bindings/zeebe/command/complete_job.go index 4e71b80b9..6d8920392 100644 --- a/bindings/zeebe/command/complete_job.go +++ b/bindings/zeebe/command/complete_job.go @@ -28,7 +28,7 @@ type completeJobPayload struct { Variables interface{} `json:"variables"` } -func (z *ZeebeCommand) completeJob(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { +func (z *ZeebeCommand) completeJob(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { var payload completeJobPayload err := json.Unmarshal(req.Data, &payload) if err != nil { @@ -49,7 +49,7 @@ func (z *ZeebeCommand) completeJob(req *bindings.InvokeRequest) (*bindings.Invok } } - _, err = cmd3.Send(context.Background()) + _, err = cmd3.Send(ctx) if err != nil { return nil, fmt.Errorf("cannot complete job for key %d: %w", payload.JobKey, err) } diff --git a/bindings/zeebe/command/complete_job_test.go b/bindings/zeebe/command/complete_job_test.go index e6e2bbae7..20c0c8734 100644 --- a/bindings/zeebe/command/complete_job_test.go +++ b/bindings/zeebe/command/complete_job_test.go @@ -80,7 +80,7 @@ func TestCompleteJob(t *testing.T) { t.Run("elementInstanceKey is mandatory", func(t *testing.T) { cmd := ZeebeCommand{logger: testLogger} req := &bindings.InvokeRequest{Operation: CompleteJobOperation} - _, err := cmd.Invoke(req) + _, err := cmd.Invoke(context.TODO(), req) assert.Error(t, err, ErrMissingJobKey) }) @@ -99,7 +99,7 @@ func TestCompleteJob(t *testing.T) { var mc mockCompleteJobClient cmd := ZeebeCommand{logger: testLogger, client: &mc} - _, err = cmd.Invoke(req) + _, err = cmd.Invoke(context.TODO(), req) assert.NoError(t, err) assert.Equal(t, *payload.JobKey, mc.cmd1.jobKey) diff --git a/bindings/zeebe/command/create_instance.go b/bindings/zeebe/command/create_instance.go index 9b985fab1..632924f2f 100644 --- a/bindings/zeebe/command/create_instance.go +++ b/bindings/zeebe/command/create_instance.go @@ -36,7 +36,7 @@ type createInstancePayload struct { Variables interface{} `json:"variables"` } -func (z *ZeebeCommand) createInstance(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { +func (z *ZeebeCommand) createInstance(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { var payload createInstancePayload err := json.Unmarshal(req.Data, &payload) if err != nil { @@ -75,7 +75,7 @@ func (z *ZeebeCommand) createInstance(req *bindings.InvokeRequest) (*bindings.In } } - response, err := cmd3.Send(context.Background()) + response, err := cmd3.Send(ctx) if err != nil { return nil, fmt.Errorf("cannot create instane for %s: %w", errorDetail, err) } diff --git a/bindings/zeebe/command/create_instance_test.go b/bindings/zeebe/command/create_instance_test.go index 0409d0569..5e760fb7e 100644 --- a/bindings/zeebe/command/create_instance_test.go +++ b/bindings/zeebe/command/create_instance_test.go @@ -112,7 +112,7 @@ func TestCreateInstance(t *testing.T) { var mc mockCreateInstanceClient cmd := ZeebeCommand{logger: testLogger, client: &mc} - _, err = cmd.Invoke(req) + _, err = cmd.Invoke(context.TODO(), req) assert.Error(t, err, ErrAmbiguousCreationVars) }) @@ -126,7 +126,7 @@ func TestCreateInstance(t *testing.T) { var mc mockCreateInstanceClient cmd := ZeebeCommand{logger: testLogger, client: &mc} - _, err = cmd.Invoke(req) + _, err = cmd.Invoke(context.TODO(), req) assert.Error(t, err, ErrMissingCreationVars) }) @@ -143,7 +143,7 @@ func TestCreateInstance(t *testing.T) { var mc mockCreateInstanceClient cmd := ZeebeCommand{logger: testLogger, client: &mc} - _, err = cmd.Invoke(req) + _, err = cmd.Invoke(context.TODO(), req) assert.NoError(t, err) assert.Equal(t, payload.BpmnProcessID, mc.cmd1.bpmnProcessID) @@ -162,7 +162,7 @@ func TestCreateInstance(t *testing.T) { var mc mockCreateInstanceClient cmd := ZeebeCommand{logger: testLogger, client: &mc} - _, err = cmd.Invoke(req) + _, err = cmd.Invoke(context.TODO(), req) assert.NoError(t, err) assert.Equal(t, payload.BpmnProcessID, mc.cmd1.bpmnProcessID) @@ -181,7 +181,7 @@ func TestCreateInstance(t *testing.T) { var mc mockCreateInstanceClient cmd := ZeebeCommand{logger: testLogger, client: &mc} - _, err = cmd.Invoke(req) + _, err = cmd.Invoke(context.TODO(), req) assert.NoError(t, err) assert.Equal(t, *payload.ProcessDefinitionKey, mc.cmd1.processDefinitionKey) @@ -202,7 +202,7 @@ func TestCreateInstance(t *testing.T) { var mc mockCreateInstanceClient cmd := ZeebeCommand{logger: testLogger, client: &mc} - _, err = cmd.Invoke(req) + _, err = cmd.Invoke(context.TODO(), req) assert.NoError(t, err) assert.Equal(t, *payload.ProcessDefinitionKey, mc.cmd1.processDefinitionKey) diff --git a/bindings/zeebe/command/deploy_process.go b/bindings/zeebe/command/deploy_process.go index 20fb9796d..9aea21da8 100644 --- a/bindings/zeebe/command/deploy_process.go +++ b/bindings/zeebe/command/deploy_process.go @@ -29,7 +29,7 @@ const ( var ErrMissingFileName = errors.New("fileName is a required attribute") -func (z *ZeebeCommand) deployProcess(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { +func (z *ZeebeCommand) deployProcess(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { var deployFileName string if val, ok := req.Metadata[fileName]; ok && val != "" { @@ -40,7 +40,7 @@ func (z *ZeebeCommand) deployProcess(req *bindings.InvokeRequest) (*bindings.Inv response, err := z.client.NewDeployProcessCommand(). AddResource(req.Data, deployFileName). - Send(context.Background()) + Send(ctx) if err != nil { return nil, fmt.Errorf("cannot deploy process with fileName %s: %w", deployFileName, err) } diff --git a/bindings/zeebe/command/deploy_process_test.go b/bindings/zeebe/command/deploy_process_test.go index e92c47f85..5b479ce5f 100644 --- a/bindings/zeebe/command/deploy_process_test.go +++ b/bindings/zeebe/command/deploy_process_test.go @@ -14,6 +14,7 @@ limitations under the License. package command import ( + "context" "testing" "github.com/stretchr/testify/assert" @@ -28,7 +29,7 @@ func TestDeployProcess(t *testing.T) { t.Run("fileName is mandatory", func(t *testing.T) { cmd := ZeebeCommand{logger: testLogger} req := &bindings.InvokeRequest{Operation: DeployProcessOperation} - _, err := cmd.Invoke(req) + _, err := cmd.Invoke(context.TODO(), req) assert.Error(t, err, ErrMissingFileName) }) } diff --git a/bindings/zeebe/command/fail_job.go b/bindings/zeebe/command/fail_job.go index a2140a4c5..3220b3dea 100644 --- a/bindings/zeebe/command/fail_job.go +++ b/bindings/zeebe/command/fail_job.go @@ -30,7 +30,7 @@ type failJobPayload struct { ErrorMessage string `json:"errorMessage"` } -func (z *ZeebeCommand) failJob(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { +func (z *ZeebeCommand) failJob(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { var payload failJobPayload err := json.Unmarshal(req.Data, &payload) if err != nil { @@ -53,7 +53,7 @@ func (z *ZeebeCommand) failJob(req *bindings.InvokeRequest) (*bindings.InvokeRes cmd = cmd.ErrorMessage(payload.ErrorMessage) } - _, err = cmd.Send(context.Background()) + _, err = cmd.Send(ctx) if err != nil { return nil, fmt.Errorf("cannot fail job for key %d: %w", payload.JobKey, err) } diff --git a/bindings/zeebe/command/fail_job_test.go b/bindings/zeebe/command/fail_job_test.go index 5f701880f..1b4c19543 100644 --- a/bindings/zeebe/command/fail_job_test.go +++ b/bindings/zeebe/command/fail_job_test.go @@ -87,7 +87,7 @@ func TestFailJob(t *testing.T) { t.Run("jobKey is mandatory", func(t *testing.T) { cmd := ZeebeCommand{logger: testLogger} req := &bindings.InvokeRequest{Operation: FailJobOperation} - _, err := cmd.Invoke(req) + _, err := cmd.Invoke(context.TODO(), req) assert.Error(t, err, ErrMissingJobKey) }) @@ -100,7 +100,7 @@ func TestFailJob(t *testing.T) { cmd := ZeebeCommand{logger: testLogger} req := &bindings.InvokeRequest{Data: data, Operation: FailJobOperation} - _, err = cmd.Invoke(req) + _, err = cmd.Invoke(context.TODO(), req) assert.Error(t, err, ErrMissingRetries) }) @@ -118,7 +118,7 @@ func TestFailJob(t *testing.T) { var mc mockFailJobClient cmd := ZeebeCommand{logger: testLogger, client: &mc} - _, err = cmd.Invoke(req) + _, err = cmd.Invoke(context.TODO(), req) assert.NoError(t, err) assert.Equal(t, *payload.JobKey, mc.cmd1.jobKey) diff --git a/bindings/zeebe/command/publish_message.go b/bindings/zeebe/command/publish_message.go index 0e1d035d1..cf6e61fe2 100644 --- a/bindings/zeebe/command/publish_message.go +++ b/bindings/zeebe/command/publish_message.go @@ -34,7 +34,7 @@ type publishMessagePayload struct { Variables interface{} `json:"variables"` } -func (z *ZeebeCommand) publishMessage(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { +func (z *ZeebeCommand) publishMessage(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { var payload publishMessagePayload err := json.Unmarshal(req.Data, &payload) if err != nil { @@ -64,7 +64,6 @@ func (z *ZeebeCommand) publishMessage(req *bindings.InvokeRequest) (*bindings.In } } - ctx := context.Background() response, err := cmd.Send(ctx) if err != nil { return nil, fmt.Errorf("cannot publish message with name %s: %w", payload.MessageName, err) diff --git a/bindings/zeebe/command/publish_message_test.go b/bindings/zeebe/command/publish_message_test.go index c17f63c79..c9fa860a1 100644 --- a/bindings/zeebe/command/publish_message_test.go +++ b/bindings/zeebe/command/publish_message_test.go @@ -104,7 +104,7 @@ func TestPublishMessage(t *testing.T) { t.Run("messageName is mandatory", func(t *testing.T) { cmd := ZeebeCommand{logger: testLogger} req := &bindings.InvokeRequest{Operation: PublishMessageOperation} - _, err := cmd.Invoke(req) + _, err := cmd.Invoke(context.TODO(), req) assert.Error(t, err, ErrMissingMessageName) }) @@ -120,7 +120,7 @@ func TestPublishMessage(t *testing.T) { var mc mockPublishMessageClient cmd := ZeebeCommand{logger: testLogger, client: &mc} - _, err = cmd.Invoke(req) + _, err = cmd.Invoke(context.TODO(), req) assert.NoError(t, err) assert.Equal(t, payload.MessageName, mc.cmd1.messageName) @@ -145,7 +145,7 @@ func TestPublishMessage(t *testing.T) { var mc mockPublishMessageClient cmd := ZeebeCommand{logger: testLogger, client: &mc} - _, err = cmd.Invoke(req) + _, err = cmd.Invoke(context.TODO(), req) assert.NoError(t, err) assert.Equal(t, payload.MessageName, mc.cmd1.messageName) diff --git a/bindings/zeebe/command/resolve_incident.go b/bindings/zeebe/command/resolve_incident.go index 9c737c539..a4cdbeb30 100644 --- a/bindings/zeebe/command/resolve_incident.go +++ b/bindings/zeebe/command/resolve_incident.go @@ -28,7 +28,7 @@ type resolveIncidentPayload struct { IncidentKey *int64 `json:"incidentKey"` } -func (z *ZeebeCommand) resolveIncident(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { +func (z *ZeebeCommand) resolveIncident(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { var payload resolveIncidentPayload err := json.Unmarshal(req.Data, &payload) if err != nil { @@ -41,7 +41,7 @@ func (z *ZeebeCommand) resolveIncident(req *bindings.InvokeRequest) (*bindings.I _, err = z.client.NewResolveIncidentCommand(). IncidentKey(*payload.IncidentKey). - Send(context.Background()) + Send(ctx) if err != nil { return nil, fmt.Errorf("cannot resolve incident for key %d: %w", payload.IncidentKey, err) } diff --git a/bindings/zeebe/command/resolve_incident_test.go b/bindings/zeebe/command/resolve_incident_test.go index 786198994..893918382 100644 --- a/bindings/zeebe/command/resolve_incident_test.go +++ b/bindings/zeebe/command/resolve_incident_test.go @@ -66,7 +66,7 @@ func TestResolveIncident(t *testing.T) { t.Run("incidentKey is mandatory", func(t *testing.T) { cmd := ZeebeCommand{logger: testLogger} req := &bindings.InvokeRequest{Operation: ResolveIncidentOperation} - _, err := cmd.Invoke(req) + _, err := cmd.Invoke(context.TODO(), req) assert.Error(t, err, ErrMissingIncidentKey) }) @@ -82,7 +82,7 @@ func TestResolveIncident(t *testing.T) { var mc mockResolveIncidentClient cmd := ZeebeCommand{logger: testLogger, client: &mc} - _, err = cmd.Invoke(req) + _, err = cmd.Invoke(context.TODO(), req) assert.NoError(t, err) assert.Equal(t, *payload.IncidentKey, mc.cmd1.incidentKey) diff --git a/bindings/zeebe/command/set_variables.go b/bindings/zeebe/command/set_variables.go index ebc078dd2..b186972aa 100644 --- a/bindings/zeebe/command/set_variables.go +++ b/bindings/zeebe/command/set_variables.go @@ -33,7 +33,7 @@ type setVariablesPayload struct { Variables interface{} `json:"variables"` } -func (z *ZeebeCommand) setVariables(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { +func (z *ZeebeCommand) setVariables(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { var payload setVariablesPayload err := json.Unmarshal(req.Data, &payload) if err != nil { @@ -55,7 +55,7 @@ func (z *ZeebeCommand) setVariables(req *bindings.InvokeRequest) (*bindings.Invo return nil, err } - response, err := cmd.Local(payload.Local).Send(context.Background()) + response, err := cmd.Local(payload.Local).Send(ctx) if err != nil { return nil, fmt.Errorf("cannot set variables for element instance key %d: %w", payload.ElementInstanceKey, err) } diff --git a/bindings/zeebe/command/set_variables_test.go b/bindings/zeebe/command/set_variables_test.go index 7019b90b5..863766f1b 100644 --- a/bindings/zeebe/command/set_variables_test.go +++ b/bindings/zeebe/command/set_variables_test.go @@ -87,7 +87,7 @@ func TestSetVariables(t *testing.T) { t.Run("elementInstanceKey is mandatory", func(t *testing.T) { cmd := ZeebeCommand{logger: testLogger} req := &bindings.InvokeRequest{Operation: SetVariablesOperation} - _, err := cmd.Invoke(req) + _, err := cmd.Invoke(context.TODO(), req) assert.Error(t, err, ErrMissingElementInstanceKey) }) @@ -100,7 +100,7 @@ func TestSetVariables(t *testing.T) { cmd := ZeebeCommand{logger: testLogger} req := &bindings.InvokeRequest{Data: data, Operation: SetVariablesOperation} - _, err = cmd.Invoke(req) + _, err = cmd.Invoke(context.TODO(), req) assert.Error(t, err, ErrMissingVariables) }) @@ -119,7 +119,7 @@ func TestSetVariables(t *testing.T) { var mc mockSetVariableClient cmd := ZeebeCommand{logger: testLogger, client: &mc} - _, err = cmd.Invoke(req) + _, err = cmd.Invoke(context.TODO(), req) assert.NoError(t, err) assert.Equal(t, *payload.ElementInstanceKey, mc.cmd1.elementInstanceKey) @@ -143,7 +143,7 @@ func TestSetVariables(t *testing.T) { var mc mockSetVariableClient cmd := ZeebeCommand{logger: testLogger, client: &mc} - _, err = cmd.Invoke(req) + _, err = cmd.Invoke(context.TODO(), req) assert.NoError(t, err) assert.Equal(t, *payload.ElementInstanceKey, mc.cmd1.elementInstanceKey) diff --git a/bindings/zeebe/command/throw_error_test.go b/bindings/zeebe/command/throw_error_test.go index a03143163..3c9e9e709 100644 --- a/bindings/zeebe/command/throw_error_test.go +++ b/bindings/zeebe/command/throw_error_test.go @@ -87,7 +87,7 @@ func TestThrowError(t *testing.T) { t.Run("jobKey is mandatory", func(t *testing.T) { cmd := ZeebeCommand{logger: testLogger} req := &bindings.InvokeRequest{Operation: ThrowErrorOperation} - _, err := cmd.Invoke(req) + _, err := cmd.Invoke(context.TODO(), req) assert.Error(t, err, ErrMissingJobKey) }) @@ -100,7 +100,7 @@ func TestThrowError(t *testing.T) { cmd := ZeebeCommand{logger: testLogger} req := &bindings.InvokeRequest{Data: data, Operation: ThrowErrorOperation} - _, err = cmd.Invoke(req) + _, err = cmd.Invoke(context.TODO(), req) assert.Error(t, err, ErrMissingErrorCode) }) @@ -118,7 +118,7 @@ func TestThrowError(t *testing.T) { var mc mockThrowErrorClient cmd := ZeebeCommand{logger: testLogger, client: &mc} - _, err = cmd.Invoke(req) + _, err = cmd.Invoke(context.TODO(), req) assert.NoError(t, err) assert.Equal(t, *payload.JobKey, mc.cmd1.jobKey) diff --git a/bindings/zeebe/command/topology.go b/bindings/zeebe/command/topology.go index b7b954039..beece6667 100644 --- a/bindings/zeebe/command/topology.go +++ b/bindings/zeebe/command/topology.go @@ -21,8 +21,8 @@ import ( "github.com/dapr/components-contrib/bindings" ) -func (z *ZeebeCommand) topology() (*bindings.InvokeResponse, error) { - response, err := z.client.NewTopologyCommand().Send(context.Background()) +func (z *ZeebeCommand) topology(ctx context.Context) (*bindings.InvokeResponse, error) { + response, err := z.client.NewTopologyCommand().Send(ctx) if err != nil { return nil, fmt.Errorf("cannot get zeebe toplogy: %w", err) } diff --git a/bindings/zeebe/command/update_job_retries.go b/bindings/zeebe/command/update_job_retries.go index 38991666c..0f4650db3 100644 --- a/bindings/zeebe/command/update_job_retries.go +++ b/bindings/zeebe/command/update_job_retries.go @@ -28,7 +28,7 @@ type updateJobRetriesPayload struct { Retries *int32 `json:"retries"` } -func (z *ZeebeCommand) updateJobRetries(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { +func (z *ZeebeCommand) updateJobRetries(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { var payload updateJobRetriesPayload err := json.Unmarshal(req.Data, &payload) if err != nil { @@ -45,7 +45,7 @@ func (z *ZeebeCommand) updateJobRetries(req *bindings.InvokeRequest) (*bindings. cmd2 = cmd1.Retries(*payload.Retries) } - _, err = cmd2.Send(context.Background()) + _, err = cmd2.Send(ctx) if err != nil { return nil, fmt.Errorf("cannot uodate job retries for key %d: %w", payload.JobKey, err) } diff --git a/bindings/zeebe/command/update_job_retries_test.go b/bindings/zeebe/command/update_job_retries_test.go index 3ce4cbbb4..a01f5bf48 100644 --- a/bindings/zeebe/command/update_job_retries_test.go +++ b/bindings/zeebe/command/update_job_retries_test.go @@ -73,7 +73,7 @@ func TestUpdateJobRetries(t *testing.T) { t.Run("jobKey is mandatory", func(t *testing.T) { cmd := ZeebeCommand{logger: testLogger} req := &bindings.InvokeRequest{Operation: UpdateJobRetriesOperation} - _, err := cmd.Invoke(req) + _, err := cmd.Invoke(context.TODO(), req) assert.Error(t, err, ErrMissingJobKey) }) @@ -90,7 +90,7 @@ func TestUpdateJobRetries(t *testing.T) { var mc mockUpdateJobRetriesClient cmd := ZeebeCommand{logger: testLogger, client: &mc} - _, err = cmd.Invoke(req) + _, err = cmd.Invoke(context.TODO(), req) assert.NoError(t, err) assert.Equal(t, *payload.JobKey, mc.cmd1.jobKey) diff --git a/bindings/zeebe/jobworker/jobworker.go b/bindings/zeebe/jobworker/jobworker.go index 945566c72..5ced3ab19 100644 --- a/bindings/zeebe/jobworker/jobworker.go +++ b/bindings/zeebe/jobworker/jobworker.go @@ -58,7 +58,7 @@ type jobWorkerMetadata struct { } type jobHandler struct { - callback func(*bindings.ReadResponse) ([]byte, error) + callback func(context.Context, *bindings.ReadResponse) ([]byte, error) logger logger.Logger } @@ -89,7 +89,7 @@ func (z *ZeebeJobWorker) Init(metadata bindings.Metadata) error { return nil } -func (z *ZeebeJobWorker) Read(handler func(*bindings.ReadResponse) ([]byte, error)) error { +func (z *ZeebeJobWorker) Read(handler func(context.Context, *bindings.ReadResponse) ([]byte, error)) error { h := jobHandler{ callback: handler, logger: z.logger, @@ -174,7 +174,7 @@ func (h *jobHandler) handleJob(client worker.JobClient, job entities.Job) { headers["X-Zeebe-Retries"] = strconv.FormatInt(int64(job.Retries), 10) headers["X-Zeebe-Deadline"] = strconv.FormatInt(job.Deadline, 10) - resultVariables, err := h.callback(&bindings.ReadResponse{ + resultVariables, err := h.callback(context.TODO(), &bindings.ReadResponse{ Data: []byte(job.Variables), Metadata: headers, }) diff --git a/tests/e2e/bindings/zeebe/helper.go b/tests/e2e/bindings/zeebe/helper.go index 4e8e51dda..c67a286d9 100644 --- a/tests/e2e/bindings/zeebe/helper.go +++ b/tests/e2e/bindings/zeebe/helper.go @@ -14,6 +14,7 @@ limitations under the License. package zeebe import ( + "context" "encoding/json" "errors" "fmt" @@ -190,7 +191,7 @@ func DeployProcess( metadata := map[string]string{"fileName": fileName} req := &bindings.InvokeRequest{Data: data, Metadata: metadata, Operation: command.DeployProcessOperation} - res, err := cmd.Invoke(req) + res, err := cmd.Invoke(context.TODO(), req) if err != nil { return nil, err } @@ -228,7 +229,7 @@ func CreateProcessInstance( } req := &bindings.InvokeRequest{Data: data, Operation: command.CreateInstanceOperation} - res, err := cmd.Invoke(req) + res, err := cmd.Invoke(context.TODO(), req) if err != nil { return nil, err } @@ -252,7 +253,7 @@ func ActicateJob( } req := &bindings.InvokeRequest{Data: data, Operation: command.ActivateJobsOperation} - res, err := cmd.Invoke(req) + res, err := cmd.Invoke(context.TODO(), req) if err != nil { return nil, err } @@ -300,7 +301,7 @@ func CalcWorker(request *bindings.ReadResponse) ([]byte, error) { func InitTestProcess( cmd *command.ZeebeCommand, id string, - testWorker func(*bindings.ReadResponse) ([]byte, error), + testWorker func(context.Context, *bindings.ReadResponse) ([]byte, error), additionalMetadata ...MetadataPair, ) error { testJobType := id + "-test"