Implement bindings for the Zeebe workflow engine (#806)

* Implement bindings for the Zeebe workflow engine

* Fix PR suggestion regarding error handling

* Refactor cancel_instance test to not use a mocking library

* Refactor all tests to not use a mocking library

* Use error vars

* Fix linting error regarding ineffective error assignment

* Check for mandatory jobType

* Remove double method call

* Fix suggestion regarding mock client instantiation

* Check if either bpmnProcessId or workflowKey is given for workflow instantiation

* Changed logger module to dapr/kit. Misc tweaks.

* Fix go.mod

* Fixing some lint errors

* Ignoring linter false positive

Co-authored-by: Phil Kedy <phil.kedy@gmail.com>
This commit is contained in:
Christian Kaps 2021-04-29 02:23:11 +02:00 committed by GitHub
parent b180a43044
commit 83803f575d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 2769 additions and 13 deletions

81
bindings/zeebe/client.go Normal file
View File

@ -0,0 +1,81 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package zeebe
import (
"encoding/json"
"errors"
"github.com/zeebe-io/zeebe/clients/go/pkg/zbc"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/components-contrib/metadata"
"github.com/dapr/kit/logger"
)
var (
ErrMissingGatewayAddr = errors.New("gatewayAddr is a required attribute")
)
// ClientFactory enables injection for testing
type ClientFactory interface {
Get(metadata bindings.Metadata) (zbc.Client, error)
}
type ClientFactoryImpl struct {
logger logger.Logger
}
// https://docs.zeebe.io/operations/authentication.html
type clientMetadata struct {
GatewayAddr string `json:"gatewayAddr"`
GatewayKeepAlive metadata.Duration `json:"gatewayKeepAlive"`
CaCertificatePath string `json:"caCertificatePath"`
UsePlaintextConnection bool `json:"usePlainTextConnection,string"`
}
// NewClientFactoryImpl returns a new ClientFactory instance
func NewClientFactoryImpl(logger logger.Logger) *ClientFactoryImpl {
return &ClientFactoryImpl{logger: logger}
}
func (c *ClientFactoryImpl) Get(metadata bindings.Metadata) (zbc.Client, error) {
meta, err := c.parseMetadata(metadata)
if err != nil {
return nil, err
}
client, err := zbc.NewClient(&zbc.ClientConfig{
GatewayAddress: meta.GatewayAddr,
UsePlaintextConnection: meta.UsePlaintextConnection,
CaCertificatePath: meta.CaCertificatePath,
KeepAlive: meta.GatewayKeepAlive.Duration,
})
if err != nil {
return nil, err
}
return client, nil
}
func (c *ClientFactoryImpl) parseMetadata(metadata bindings.Metadata) (*clientMetadata, error) {
b, err := json.Marshal(metadata.Properties)
if err != nil {
return nil, err
}
var m clientMetadata
err = json.Unmarshal(b, &m)
if err != nil {
return nil, err
}
if m.GatewayAddr == "" {
return nil, ErrMissingGatewayAddr
}
return &m, nil
}

View File

@ -0,0 +1,50 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package zeebe
import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/kit/logger"
)
func TestParseMetadata(t *testing.T) {
m := bindings.Metadata{Properties: map[string]string{
"gatewayAddr": "172.0.0.1:1234",
"gatewayKeepAlive": "5s",
"caCertificatePath": "/cert/path",
"usePlaintextConnection": "true"}}
client := ClientFactoryImpl{logger: logger.NewLogger("test")}
meta, err := client.parseMetadata(m)
assert.NoError(t, err)
assert.Equal(t, "172.0.0.1:1234", meta.GatewayAddr)
assert.Equal(t, 5*time.Second, meta.GatewayKeepAlive.Duration)
assert.Equal(t, "/cert/path", meta.CaCertificatePath)
assert.Equal(t, true, meta.UsePlaintextConnection)
}
func TestGatewayAddrMetadataIsMandatory(t *testing.T) {
m := bindings.Metadata{}
client := ClientFactoryImpl{logger: logger.NewLogger("test")}
meta, err := client.parseMetadata(m)
assert.Nil(t, meta)
assert.Error(t, err)
assert.Equal(t, err, ErrMissingGatewayAddr)
}
func TestParseMetadataDefaultValues(t *testing.T) {
m := bindings.Metadata{Properties: map[string]string{"gatewayAddr": "172.0.0.1:1234"}}
client := ClientFactoryImpl{logger: logger.NewLogger("test")}
meta, err := client.parseMetadata(m)
assert.NoError(t, err)
assert.Equal(t, time.Duration(0), meta.GatewayKeepAlive.Duration)
assert.Equal(t, "", meta.CaCertificatePath)
assert.Equal(t, false, meta.UsePlaintextConnection)
}

View File

@ -0,0 +1,77 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package command
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/components-contrib/metadata"
)
var (
ErrMissingJobType = errors.New("jobType is a required attribute")
ErrMissingMaxJobsToActivate = errors.New("maxJobsToActivate is a required attribute")
)
type activateJobsPayload struct {
JobType string `json:"jobType"`
MaxJobsToActivate *int32 `json:"maxJobsToActivate"`
Timeout metadata.Duration `json:"timeout"`
WorkerName string `json:"workerName"`
FetchVariables []string `json:"fetchVariables"`
}
func (z *ZeebeCommand) activateJobs(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
var payload activateJobsPayload
err := json.Unmarshal(req.Data, &payload)
if err != nil {
return nil, err
}
if payload.JobType == "" {
return nil, ErrMissingJobType
}
if payload.MaxJobsToActivate == nil {
return nil, ErrMissingMaxJobsToActivate
}
cmd := z.client.NewActivateJobsCommand().
JobType(payload.JobType).
MaxJobsToActivate(*payload.MaxJobsToActivate)
if payload.Timeout.Duration != time.Duration(0) {
cmd = cmd.Timeout(payload.Timeout.Duration)
}
if payload.WorkerName != "" {
cmd = cmd.WorkerName(payload.WorkerName)
}
if payload.FetchVariables != nil {
cmd = cmd.FetchVariables(payload.FetchVariables...)
}
ctx := context.Background()
response, err := cmd.Send(ctx)
if err != nil {
return nil, fmt.Errorf("cannot activate jobs for type %s: %w", payload.JobType, err)
}
jsonResponse, err := json.Marshal(response)
if err != nil {
return nil, fmt.Errorf("cannot marshal response to json: %w", err)
}
return &bindings.InvokeResponse{
Data: jsonResponse,
}, nil
}

View File

@ -0,0 +1,160 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package command
import (
"context"
"encoding/json"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/zeebe-io/zeebe/clients/go/pkg/commands"
"github.com/zeebe-io/zeebe/clients/go/pkg/entities"
"github.com/zeebe-io/zeebe/clients/go/pkg/zbc"
"github.com/dapr/components-contrib/bindings"
contrib_metadata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/kit/logger"
)
type mockActivateJobsClient struct {
zbc.Client
cmd1 *mockActivateJobsCommandStep1
}
type mockActivateJobsCommandStep1 struct {
commands.ActivateJobsCommandStep1
cmd2 *mockActivateJobsCommandStep2
jobType string
}
type mockActivateJobsCommandStep2 struct {
commands.ActivateJobsCommandStep2
cmd3 *mockActivateJobsCommandStep3
maxJobsToActivate int32
}
type mockActivateJobsCommandStep3 struct {
commands.ActivateJobsCommandStep3
timeout time.Duration
workerName string
fetchVariables []string
}
func (mc *mockActivateJobsClient) NewActivateJobsCommand() commands.ActivateJobsCommandStep1 {
mc.cmd1 = &mockActivateJobsCommandStep1{
cmd2: &mockActivateJobsCommandStep2{
cmd3: &mockActivateJobsCommandStep3{},
},
}
return mc.cmd1
}
func (cmd1 *mockActivateJobsCommandStep1) JobType(jobType string) commands.ActivateJobsCommandStep2 {
cmd1.jobType = jobType
return cmd1.cmd2
}
func (cmd2 *mockActivateJobsCommandStep2) MaxJobsToActivate(maxJobsToActivate int32) commands.ActivateJobsCommandStep3 {
cmd2.maxJobsToActivate = maxJobsToActivate
return cmd2.cmd3
}
func (cmd3 *mockActivateJobsCommandStep3) Timeout(timeout time.Duration) commands.ActivateJobsCommandStep3 {
cmd3.timeout = timeout
return cmd3
}
func (cmd3 *mockActivateJobsCommandStep3) WorkerName(workerName string) commands.ActivateJobsCommandStep3 {
cmd3.workerName = workerName
return cmd3
}
func (cmd3 *mockActivateJobsCommandStep3) FetchVariables(fetchVariables ...string) commands.ActivateJobsCommandStep3 {
cmd3.fetchVariables = fetchVariables
return cmd3
}
func (cmd3 *mockActivateJobsCommandStep3) Send(context.Context) ([]entities.Job, error) {
return []entities.Job{}, nil
}
func TestActivateJobs(t *testing.T) {
testLogger := logger.NewLogger("test")
t.Run("jobType is mandatory", func(t *testing.T) {
message := ZeebeCommand{logger: testLogger}
req := &bindings.InvokeRequest{Operation: activateJobsOperation}
_, err := message.Invoke(req)
assert.Error(t, err, ErrMissingJobType)
})
t.Run("maxJobsToActivate is mandatory", func(t *testing.T) {
payload := activateJobsPayload{
JobType: "a",
}
data, err := json.Marshal(payload)
assert.NoError(t, err)
message := ZeebeCommand{logger: testLogger}
req := &bindings.InvokeRequest{Data: data, Operation: activateJobsOperation}
_, err = message.Invoke(req)
assert.Error(t, err, ErrMissingMaxJobsToActivate)
})
t.Run("activate jobs with mandatory fields", func(t *testing.T) {
payload := activateJobsPayload{
JobType: "a",
MaxJobsToActivate: new(int32),
}
data, err := json.Marshal(payload)
assert.NoError(t, err)
req := &bindings.InvokeRequest{Data: data, Operation: activateJobsOperation}
var mc mockActivateJobsClient
message := ZeebeCommand{logger: testLogger, client: &mc}
_, err = message.Invoke(req)
assert.NoError(t, err)
assert.Equal(t, payload.JobType, mc.cmd1.jobType)
assert.Equal(t, *payload.MaxJobsToActivate, mc.cmd1.cmd2.maxJobsToActivate)
})
t.Run("send message with optional fields", func(t *testing.T) {
payload := activateJobsPayload{
JobType: "a",
MaxJobsToActivate: new(int32),
Timeout: contrib_metadata.Duration{Duration: 1 * time.Second},
WorkerName: "b",
FetchVariables: []string{"a", "b", "c"},
}
data, err := json.Marshal(payload)
assert.NoError(t, err)
req := &bindings.InvokeRequest{Data: data, Operation: activateJobsOperation}
var mc mockActivateJobsClient
message := ZeebeCommand{logger: testLogger, client: &mc}
_, err = message.Invoke(req)
assert.NoError(t, err)
assert.Equal(t, payload.JobType, mc.cmd1.jobType)
assert.Equal(t, *payload.MaxJobsToActivate, mc.cmd1.cmd2.maxJobsToActivate)
assert.Equal(t, payload.Timeout.Duration, mc.cmd1.cmd2.cmd3.timeout)
assert.Equal(t, payload.WorkerName, mc.cmd1.cmd2.cmd3.workerName)
assert.Equal(t, []string{"a", "b", "c"}, mc.cmd1.cmd2.cmd3.fetchVariables)
})
}

View File

@ -0,0 +1,44 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package command
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/dapr/components-contrib/bindings"
)
var (
ErrMissingWorkflowInstanceKey = errors.New("workflowInstanceKey is a required attribute")
)
type cancelInstancePayload struct {
WorkflowInstanceKey *int64 `json:"workflowInstanceKey"`
}
func (z *ZeebeCommand) cancelInstance(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
var payload cancelInstancePayload
err := json.Unmarshal(req.Data, &payload)
if err != nil {
return nil, err
}
if payload.WorkflowInstanceKey == nil {
return nil, ErrMissingWorkflowInstanceKey
}
_, err = z.client.NewCancelInstanceCommand().
WorkflowInstanceKey(*payload.WorkflowInstanceKey).
Send(context.Background())
if err != nil {
return nil, fmt.Errorf("cannot cancel instance for workflow instance key %d: %w", payload.WorkflowInstanceKey, err)
}
return &bindings.InvokeResponse{}, nil
}

View File

@ -0,0 +1,82 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package command
import (
"context"
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeebe-io/zeebe/clients/go/pkg/commands"
"github.com/zeebe-io/zeebe/clients/go/pkg/pb"
"github.com/zeebe-io/zeebe/clients/go/pkg/zbc"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/kit/logger"
)
type mockCancelInstanceClient struct {
zbc.Client
cmd1 *mockCancelInstanceStep1
}
type mockCancelInstanceStep1 struct {
commands.CancelInstanceStep1
cmd2 *mockDispatchCancelWorkflowInstanceCommand
}
type mockDispatchCancelWorkflowInstanceCommand struct {
commands.DispatchCancelWorkflowInstanceCommand
workflowInstanceKey int64
}
func (mc *mockCancelInstanceClient) NewCancelInstanceCommand() commands.CancelInstanceStep1 {
mc.cmd1 = &mockCancelInstanceStep1{
cmd2: &mockDispatchCancelWorkflowInstanceCommand{},
}
return mc.cmd1
}
func (cmd1 *mockCancelInstanceStep1) WorkflowInstanceKey(workflowInstanceKey int64) commands.DispatchCancelWorkflowInstanceCommand {
cmd1.cmd2.workflowInstanceKey = workflowInstanceKey
return cmd1.cmd2
}
func (cmd2 *mockDispatchCancelWorkflowInstanceCommand) Send(context.Context) (*pb.CancelWorkflowInstanceResponse, error) {
return &pb.CancelWorkflowInstanceResponse{}, nil
}
func TestCancelInstance(t *testing.T) {
testLogger := logger.NewLogger("test")
t.Run("workflowInstanceKey is mandatory", func(t *testing.T) {
message := ZeebeCommand{logger: testLogger}
req := &bindings.InvokeRequest{Operation: cancelInstanceOperation}
_, err := message.Invoke(req)
assert.Error(t, err, ErrMissingWorkflowInstanceKey)
})
t.Run("cancel a command", func(t *testing.T) {
payload := cancelInstancePayload{
WorkflowInstanceKey: new(int64),
}
data, err := json.Marshal(payload)
assert.NoError(t, err)
req := &bindings.InvokeRequest{Data: data, Operation: cancelInstanceOperation}
var mc mockCancelInstanceClient
message := ZeebeCommand{logger: testLogger, client: &mc}
_, err = message.Invoke(req)
assert.NoError(t, err)
assert.Equal(t, *payload.WorkflowInstanceKey, mc.cmd1.cmd2.workflowInstanceKey)
})
}

View File

@ -0,0 +1,120 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package command
import (
"errors"
"fmt"
"github.com/zeebe-io/zeebe/clients/go/pkg/zbc"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/components-contrib/bindings/zeebe"
"github.com/dapr/kit/logger"
)
const (
// operations
topologyOperation bindings.OperationKind = "topology"
deployWorkflowOperation bindings.OperationKind = "deploy-workflow"
createInstanceOperation bindings.OperationKind = "create-instance"
cancelInstanceOperation bindings.OperationKind = "cancel-instance"
setVariablesOperation bindings.OperationKind = "set-variables"
resolveIncidentOperation bindings.OperationKind = "resolve-incident"
publishMessageOperation bindings.OperationKind = "publish-message"
activateJobsOperation bindings.OperationKind = "activate-jobs"
completeJobOperation bindings.OperationKind = "complete-job"
failJobOperation bindings.OperationKind = "fail-job"
updateJobRetriesOperation bindings.OperationKind = "update-job-retries"
throwErrorOperation bindings.OperationKind = "throw-error"
)
var (
ErrMissingJobKey = errors.New("jobKey is a required attribute")
ErrUnsupportedOperation = func(operation bindings.OperationKind) error {
return fmt.Errorf("unsupported operation: %v", operation)
}
)
// ZeebeCommand executes Zeebe commands
type ZeebeCommand struct {
clientFactory zeebe.ClientFactory
client zbc.Client
logger logger.Logger
}
// NewZeebeCommand returns a new ZeebeCommand instance
func NewZeebeCommand(logger logger.Logger) *ZeebeCommand {
return &ZeebeCommand{clientFactory: zeebe.NewClientFactoryImpl(logger), logger: logger}
}
// Init does metadata parsing and connection creation
func (z *ZeebeCommand) Init(metadata bindings.Metadata) error {
client, err := z.clientFactory.Get(metadata)
if err != nil {
return err
}
z.client = client
return nil
}
func (z *ZeebeCommand) Operations() []bindings.OperationKind {
return []bindings.OperationKind{
topologyOperation,
deployWorkflowOperation,
createInstanceOperation,
cancelInstanceOperation,
setVariablesOperation,
resolveIncidentOperation,
publishMessageOperation,
activateJobsOperation,
completeJobOperation,
failJobOperation,
updateJobRetriesOperation,
throwErrorOperation,
}
}
func (z *ZeebeCommand) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
switch req.Operation {
case topologyOperation:
return z.topology()
case deployWorkflowOperation:
return z.deployWorkflow(req)
case createInstanceOperation:
return z.createInstance(req)
case cancelInstanceOperation:
return z.cancelInstance(req)
case setVariablesOperation:
return z.setVariables(req)
case resolveIncidentOperation:
return z.resolveIncident(req)
case publishMessageOperation:
return z.publishMessage(req)
case activateJobsOperation:
return z.activateJobs(req)
case completeJobOperation:
return z.completeJob(req)
case failJobOperation:
return z.failJob(req)
case updateJobRetriesOperation:
return z.updateJobRetries(req)
case throwErrorOperation:
return z.throwError(req)
case bindings.GetOperation:
fallthrough
case bindings.CreateOperation:
fallthrough
case bindings.DeleteOperation:
fallthrough
case bindings.ListOperation:
fallthrough
default:
return nil, ErrUnsupportedOperation(req.Operation)
}
}

View File

@ -0,0 +1,100 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package command
import (
"errors"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zeebe-io/zeebe/clients/go/pkg/zbc"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/components-contrib/bindings/zeebe"
"github.com/dapr/kit/logger"
)
type mockClientFactory struct {
zeebe.ClientFactory
metadata bindings.Metadata
error error
}
type mockClient struct {
zbc.Client
}
func (mcf mockClientFactory) Get(metadata bindings.Metadata) (zbc.Client, error) {
mcf.metadata = metadata
if mcf.error != nil {
return nil, mcf.error
}
return mockClient{}, nil
}
func TestInit(t *testing.T) {
testLogger := logger.NewLogger("test")
t.Run("returns error if client could not be instantiated properly", func(t *testing.T) {
errParsing := errors.New("error on parsing metadata")
metadata := bindings.Metadata{}
mcf := mockClientFactory{
error: errParsing,
}
command := ZeebeCommand{clientFactory: mcf, logger: testLogger}
err := command.Init(metadata)
assert.Error(t, err, errParsing)
})
t.Run("sets client from client factory", func(t *testing.T) {
metadata := bindings.Metadata{}
mcf := mockClientFactory{}
command := ZeebeCommand{clientFactory: mcf, logger: testLogger}
err := command.Init(metadata)
assert.NoError(t, err)
mc, err := mcf.Get(metadata)
assert.NoError(t, err)
assert.Equal(t, mc, command.client)
assert.Equal(t, metadata, mcf.metadata)
})
}
func TestInvoke(t *testing.T) {
testLogger := logger.NewLogger("test")
t.Run("operation must be supported", func(t *testing.T) {
message := ZeebeCommand{logger: testLogger}
req := &bindings.InvokeRequest{Operation: bindings.DeleteOperation}
_, err := message.Invoke(req)
assert.Error(t, err, ErrUnsupportedOperation(bindings.DeleteOperation))
})
}
func TestOperations(t *testing.T) {
testBinding := ZeebeCommand{logger: logger.NewLogger("test")}
operations := testBinding.Operations()
require.Equal(t, 12, len(operations))
assert.Equal(t, topologyOperation, operations[0])
assert.Equal(t, deployWorkflowOperation, operations[1])
assert.Equal(t, createInstanceOperation, operations[2])
assert.Equal(t, cancelInstanceOperation, operations[3])
assert.Equal(t, setVariablesOperation, operations[4])
assert.Equal(t, resolveIncidentOperation, operations[5])
assert.Equal(t, publishMessageOperation, operations[6])
assert.Equal(t, activateJobsOperation, operations[7])
assert.Equal(t, completeJobOperation, operations[8])
assert.Equal(t, failJobOperation, operations[9])
assert.Equal(t, updateJobRetriesOperation, operations[10])
assert.Equal(t, throwErrorOperation, operations[11])
}

View File

@ -0,0 +1,50 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package command
import (
"context"
"encoding/json"
"fmt"
"github.com/zeebe-io/zeebe/clients/go/pkg/commands"
"github.com/dapr/components-contrib/bindings"
)
type completeJobPayload struct {
JobKey *int64 `json:"jobKey"`
Variables interface{} `json:"variables"`
}
func (z *ZeebeCommand) completeJob(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
var payload completeJobPayload
err := json.Unmarshal(req.Data, &payload)
if err != nil {
return nil, err
}
if payload.JobKey == nil {
return nil, ErrMissingJobKey
}
cmd1 := z.client.NewCompleteJobCommand()
cmd2 := cmd1.JobKey(*payload.JobKey)
var cmd3 commands.DispatchCompleteJobCommand = cmd2
if payload.Variables != nil {
cmd3, err = cmd2.VariablesFromObject(payload.Variables)
if err != nil {
return nil, err
}
}
_, err = cmd3.Send(context.Background())
if err != nil {
return nil, fmt.Errorf("cannot complete job for key %d: %w", payload.JobKey, err)
}
return &bindings.InvokeResponse{}, nil
}

View File

@ -0,0 +1,100 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package command
import (
"context"
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeebe-io/zeebe/clients/go/pkg/commands"
"github.com/zeebe-io/zeebe/clients/go/pkg/pb"
"github.com/zeebe-io/zeebe/clients/go/pkg/zbc"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/kit/logger"
)
type mockCompleteJobClient struct {
zbc.Client
cmd1 *mockCompleteJobCommandStep1
}
type mockCompleteJobCommandStep1 struct {
commands.CompleteJobCommandStep1
cmd2 *mockCompleteJobCommandStep2
jobKey int64
}
type mockCompleteJobCommandStep2 struct {
commands.CompleteJobCommandStep2
cmd3 *mockDispatchCompleteJobCommand
variables interface{}
}
type mockDispatchCompleteJobCommand struct {
commands.DispatchCompleteJobCommand
}
func (mc *mockCompleteJobClient) NewCompleteJobCommand() commands.CompleteJobCommandStep1 {
mc.cmd1 = &mockCompleteJobCommandStep1{
cmd2: &mockCompleteJobCommandStep2{
cmd3: &mockDispatchCompleteJobCommand{},
},
}
return mc.cmd1
}
func (cmd1 *mockCompleteJobCommandStep1) JobKey(jobKey int64) commands.CompleteJobCommandStep2 {
cmd1.jobKey = jobKey
return cmd1.cmd2
}
func (cmd2 *mockCompleteJobCommandStep2) VariablesFromObject(variables interface{}) (commands.DispatchCompleteJobCommand, error) {
cmd2.variables = variables
return cmd2.cmd3, nil
}
func (cmd3 *mockDispatchCompleteJobCommand) Send(context.Context) (*pb.CompleteJobResponse, error) {
return &pb.CompleteJobResponse{}, nil
}
func TestCompleteJob(t *testing.T) {
testLogger := logger.NewLogger("test")
t.Run("elementInstanceKey is mandatory", func(t *testing.T) {
message := ZeebeCommand{logger: testLogger}
req := &bindings.InvokeRequest{Operation: completeJobOperation}
_, err := message.Invoke(req)
assert.Error(t, err, ErrMissingJobKey)
})
t.Run("complete a job", func(t *testing.T) {
payload := completeJobPayload{
JobKey: new(int64),
Variables: map[string]interface{}{
"key": "value",
},
}
data, err := json.Marshal(payload)
assert.NoError(t, err)
req := &bindings.InvokeRequest{Data: data, Operation: completeJobOperation}
var mc mockCompleteJobClient
message := ZeebeCommand{logger: testLogger, client: &mc}
_, err = message.Invoke(req)
assert.NoError(t, err)
assert.Equal(t, *payload.JobKey, mc.cmd1.jobKey)
assert.Equal(t, payload.Variables, mc.cmd1.cmd2.variables)
})
}

View File

@ -0,0 +1,83 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package command
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/zeebe-io/zeebe/clients/go/pkg/commands"
"github.com/dapr/components-contrib/bindings"
)
var (
ErrAmbiguousCreationVars = errors.New("either 'bpmnProcessId' or 'workflowKey' must be passed, not both at the same time")
ErrMissingCreationVars = errors.New("either 'bpmnProcessId' or 'workflowKey' must be passed")
)
type createInstancePayload struct {
BpmnProcessID string `json:"bpmnProcessId"`
WorkflowKey *int64 `json:"workflowKey"`
Version *int32 `json:"version"`
Variables interface{} `json:"variables"`
}
func (z *ZeebeCommand) createInstance(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
var payload createInstancePayload
err := json.Unmarshal(req.Data, &payload)
if err != nil {
return nil, err
}
cmd1 := z.client.NewCreateInstanceCommand()
var cmd2 commands.CreateInstanceCommandStep2
var cmd3 commands.CreateInstanceCommandStep3
var errorDetail string
if payload.BpmnProcessID != "" { //nolint:nestif
if payload.WorkflowKey != nil {
return nil, ErrAmbiguousCreationVars
}
cmd2 = cmd1.BPMNProcessId(payload.BpmnProcessID)
if payload.Version != nil {
cmd3 = cmd2.Version(*payload.Version)
errorDetail = fmt.Sprintf("bpmnProcessId %s and version %d", payload.BpmnProcessID, payload.Version)
} else {
cmd3 = cmd2.LatestVersion()
errorDetail = fmt.Sprintf("bpmnProcessId %s and lates version", payload.BpmnProcessID)
}
} else if payload.WorkflowKey != nil {
cmd3 = cmd1.WorkflowKey(*payload.WorkflowKey)
errorDetail = fmt.Sprintf("workflowKey %d", payload.WorkflowKey)
} else {
return nil, ErrMissingCreationVars
}
if payload.Variables != nil {
cmd3, err = cmd3.VariablesFromObject(payload.Variables)
if err != nil {
return nil, err
}
}
response, err := cmd3.Send(context.Background())
if err != nil {
return nil, fmt.Errorf("cannot create instane for %s: %w", errorDetail, err)
}
jsonResponse, err := json.Marshal(response)
if err != nil {
return nil, fmt.Errorf("cannot marshal response to json: %w", err)
}
return &bindings.InvokeResponse{
Data: jsonResponse,
}, nil
}

View File

@ -0,0 +1,203 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package command
import (
"context"
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeebe-io/zeebe/clients/go/pkg/commands"
"github.com/zeebe-io/zeebe/clients/go/pkg/pb"
"github.com/zeebe-io/zeebe/clients/go/pkg/zbc"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/kit/logger"
)
type mockCreateInstanceClient struct {
zbc.Client
cmd1 *mockCreateInstanceCommandStep1
}
type mockCreateInstanceCommandStep1 struct {
commands.CreateInstanceCommandStep1
cmd2 *mockCreateInstanceCommandStep2
bpmnProcessID string
workflowKey int64
}
type mockCreateInstanceCommandStep2 struct {
commands.CreateInstanceCommandStep2
cmd3 *mockCreateInstanceCommandStep3
version int32
latestVersion bool
}
type mockCreateInstanceCommandStep3 struct {
commands.CreateInstanceCommandStep3
variables interface{}
}
func (mc *mockCreateInstanceClient) NewCreateInstanceCommand() commands.CreateInstanceCommandStep1 {
mc.cmd1 = &mockCreateInstanceCommandStep1{
cmd2: &mockCreateInstanceCommandStep2{
cmd3: &mockCreateInstanceCommandStep3{},
},
}
return mc.cmd1
}
//nolint // BPMNProcessId comes from the Zeebe client API and cannot be written as BPMNProcessID
func (cmd1 *mockCreateInstanceCommandStep1) BPMNProcessId(bpmnProcessID string) commands.CreateInstanceCommandStep2 {
cmd1.bpmnProcessID = bpmnProcessID
return cmd1.cmd2
}
func (cmd1 *mockCreateInstanceCommandStep1) WorkflowKey(workflowKey int64) commands.CreateInstanceCommandStep3 {
cmd1.workflowKey = workflowKey
return cmd1.cmd2.cmd3
}
func (cmd2 *mockCreateInstanceCommandStep2) Version(version int32) commands.CreateInstanceCommandStep3 {
cmd2.version = version
return cmd2.cmd3
}
func (cmd2 *mockCreateInstanceCommandStep2) LatestVersion() commands.CreateInstanceCommandStep3 {
cmd2.latestVersion = true
return cmd2.cmd3
}
func (cmd3 *mockCreateInstanceCommandStep3) VariablesFromObject(variables interface{}) (commands.CreateInstanceCommandStep3, error) {
cmd3.variables = variables
return cmd3, nil
}
func (cmd3 *mockCreateInstanceCommandStep3) Send(context.Context) (*pb.CreateWorkflowInstanceResponse, error) {
return &pb.CreateWorkflowInstanceResponse{}, nil
}
func TestCreateInstance(t *testing.T) {
testLogger := logger.NewLogger("test")
t.Run("bpmnProcessId and workflowKey are not allowed at the same time", func(t *testing.T) {
payload := createInstancePayload{
BpmnProcessID: "some-id",
WorkflowKey: new(int64),
}
data, err := json.Marshal(payload)
assert.Nil(t, err)
req := &bindings.InvokeRequest{Data: data, Operation: createInstanceOperation}
var mc mockCreateInstanceClient
message := ZeebeCommand{logger: testLogger, client: &mc}
_, err = message.Invoke(req)
assert.Error(t, err, ErrAmbiguousCreationVars)
})
t.Run("either bpmnProcessId or workflowKey must be given", func(t *testing.T) {
payload := createInstancePayload{}
data, err := json.Marshal(payload)
assert.NoError(t, err)
req := &bindings.InvokeRequest{Data: data, Operation: createInstanceOperation}
var mc mockCreateInstanceClient
message := ZeebeCommand{logger: testLogger, client: &mc}
_, err = message.Invoke(req)
assert.Error(t, err, ErrMissingCreationVars)
})
t.Run("create command with bpmnProcessId and specific version", func(t *testing.T) {
payload := createInstancePayload{
BpmnProcessID: "some-id",
Version: new(int32),
}
data, err := json.Marshal(payload)
assert.NoError(t, err)
req := &bindings.InvokeRequest{Data: data, Operation: createInstanceOperation}
var mc mockCreateInstanceClient
message := ZeebeCommand{logger: testLogger, client: &mc}
_, err = message.Invoke(req)
assert.NoError(t, err)
assert.Equal(t, payload.BpmnProcessID, mc.cmd1.bpmnProcessID)
assert.Equal(t, *payload.Version, mc.cmd1.cmd2.version)
})
t.Run("create command with bpmnProcessId and latest version", func(t *testing.T) {
payload := createInstancePayload{
BpmnProcessID: "some-id",
}
data, err := json.Marshal(payload)
assert.NoError(t, err)
req := &bindings.InvokeRequest{Data: data, Operation: createInstanceOperation}
var mc mockCreateInstanceClient
message := ZeebeCommand{logger: testLogger, client: &mc}
_, err = message.Invoke(req)
assert.NoError(t, err)
assert.Equal(t, payload.BpmnProcessID, mc.cmd1.bpmnProcessID)
assert.Equal(t, true, mc.cmd1.cmd2.latestVersion)
})
t.Run("create command with workflowKey", func(t *testing.T) {
payload := createInstancePayload{
WorkflowKey: new(int64),
}
data, err := json.Marshal(payload)
assert.NoError(t, err)
req := &bindings.InvokeRequest{Data: data, Operation: createInstanceOperation}
var mc mockCreateInstanceClient
message := ZeebeCommand{logger: testLogger, client: &mc}
_, err = message.Invoke(req)
assert.NoError(t, err)
assert.Equal(t, *payload.WorkflowKey, mc.cmd1.workflowKey)
})
t.Run("create command with variables", func(t *testing.T) {
payload := createInstancePayload{
WorkflowKey: new(int64),
Variables: map[string]interface{}{
"key": "value",
},
}
data, err := json.Marshal(payload)
assert.NoError(t, err)
req := &bindings.InvokeRequest{Data: data, Operation: createInstanceOperation}
var mc mockCreateInstanceClient
message := ZeebeCommand{logger: testLogger, client: &mc}
_, err = message.Invoke(req)
assert.NoError(t, err)
assert.Equal(t, *payload.WorkflowKey, mc.cmd1.workflowKey)
assert.Equal(t, payload.Variables, mc.cmd1.cmd2.cmd3.variables)
})
}

View File

@ -0,0 +1,81 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package command
import (
"context"
"encoding/json"
"errors"
"fmt"
"path/filepath"
"github.com/zeebe-io/zeebe/clients/go/pkg/pb"
"github.com/dapr/components-contrib/bindings"
)
const (
// metadata
fileName = "fileName"
fileType = "fileType"
)
var (
ErrMissingFileName = errors.New("fileName is a required attribute")
ErrMissingFileType = errors.New("cannot determine file type from file name. Please specify a fileType")
ErrInvalidFileType = errors.New("fileType must be either 'bpmn' of 'file'")
)
func (z *ZeebeCommand) deployWorkflow(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
var deployFileName string
var deployFileType string
if val, ok := req.Metadata[fileName]; ok && val != "" {
deployFileName = val
} else {
return nil, ErrMissingFileName
}
if val, ok := req.Metadata[fileType]; ok && val != "" {
deployFileType = val
} else {
var extension = filepath.Ext(deployFileName)
if extension == "" {
return nil, ErrMissingFileType
}
if extension == ".bpmn" {
deployFileType = "bpmn"
} else {
deployFileType = "file"
}
}
var resourceType pb.WorkflowRequestObject_ResourceType
if deployFileType == "bpmn" {
resourceType = pb.WorkflowRequestObject_BPMN
} else if deployFileType == "file" {
resourceType = pb.WorkflowRequestObject_FILE
} else {
return nil, ErrInvalidFileType
}
response, err := z.client.NewDeployWorkflowCommand().
AddResource(req.Data, deployFileName, resourceType).
Send(context.Background())
if err != nil {
return nil, fmt.Errorf("cannot deploy workflow with fileName %s: %w", deployFileName, err)
}
jsonResponse, err := json.Marshal(response)
if err != nil {
return nil, fmt.Errorf("cannot marshal response to json: %w", err)
}
return &bindings.InvokeResponse{
Data: jsonResponse,
}, nil
}

View File

@ -0,0 +1,45 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package command
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/kit/logger"
)
func TestDeployWorkflow(t *testing.T) {
testLogger := logger.NewLogger("test")
t.Run("fileName is mandatory", func(t *testing.T) {
message := ZeebeCommand{logger: testLogger}
req := &bindings.InvokeRequest{Operation: deployWorkflowOperation}
_, err := message.Invoke(req)
assert.Error(t, err, ErrMissingFileName)
})
t.Run("return error if file type recognition doesn't work because of missing file extension", func(t *testing.T) {
message := ZeebeCommand{logger: testLogger}
req := &bindings.InvokeRequest{Operation: deployWorkflowOperation, Metadata: map[string]string{
"fileName": "test",
}}
_, err := message.Invoke(req)
assert.Error(t, err, ErrMissingFileType)
})
t.Run("return error if file type isn't supported", func(t *testing.T) {
message := ZeebeCommand{logger: testLogger}
req := &bindings.InvokeRequest{Operation: deployWorkflowOperation, Metadata: map[string]string{
"fileName": "test.bpmn",
"fileType": "unsupported",
}}
_, err := message.Invoke(req)
assert.Error(t, err, ErrInvalidFileType)
})
}

View File

@ -0,0 +1,56 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package command
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/dapr/components-contrib/bindings"
)
var (
ErrMissingRetries = errors.New("retries is a required attribute")
)
type failJobPayload struct {
JobKey *int64 `json:"jobKey"`
Retries *int32 `json:"retries"`
ErrorMessage string `json:"errorMessage"`
}
func (z *ZeebeCommand) failJob(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
var payload failJobPayload
err := json.Unmarshal(req.Data, &payload)
if err != nil {
return nil, err
}
if payload.JobKey == nil {
return nil, ErrMissingJobKey
}
if payload.Retries == nil {
return nil, ErrMissingRetries
}
cmd := z.client.NewFailJobCommand().
JobKey(*payload.JobKey).
Retries(*payload.Retries)
if payload.ErrorMessage != "" {
cmd = cmd.ErrorMessage(payload.ErrorMessage)
}
_, err = cmd.Send(context.Background())
if err != nil {
return nil, fmt.Errorf("cannot fail job for key %d: %w", payload.JobKey, err)
}
return &bindings.InvokeResponse{}, nil
}

View File

@ -0,0 +1,120 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package command
import (
"context"
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeebe-io/zeebe/clients/go/pkg/commands"
"github.com/zeebe-io/zeebe/clients/go/pkg/pb"
"github.com/zeebe-io/zeebe/clients/go/pkg/zbc"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/kit/logger"
)
type mockFailJobClient struct {
zbc.Client
cmd1 *mockFailJobCommandStep1
}
type mockFailJobCommandStep1 struct {
commands.FailJobCommandStep1
cmd2 *mockFailJobCommandStep2
jobKey int64
}
type mockFailJobCommandStep2 struct {
commands.FailJobCommandStep2
cmd3 *mockFailJobCommandStep3
retries int32
}
type mockFailJobCommandStep3 struct {
commands.FailJobCommandStep3
errorMessage string
}
func (mc *mockFailJobClient) NewFailJobCommand() commands.FailJobCommandStep1 {
mc.cmd1 = &mockFailJobCommandStep1{
cmd2: &mockFailJobCommandStep2{
cmd3: &mockFailJobCommandStep3{},
},
}
return mc.cmd1
}
func (cmd1 *mockFailJobCommandStep1) JobKey(jobKey int64) commands.FailJobCommandStep2 {
cmd1.jobKey = jobKey
return cmd1.cmd2
}
func (cmd2 *mockFailJobCommandStep2) Retries(retries int32) commands.FailJobCommandStep3 {
cmd2.retries = retries
return cmd2.cmd3
}
func (cmd3 *mockFailJobCommandStep3) ErrorMessage(errorMessage string) commands.FailJobCommandStep3 {
cmd3.errorMessage = errorMessage
return cmd3
}
func (cmd3 *mockFailJobCommandStep3) Send(context.Context) (*pb.FailJobResponse, error) {
return &pb.FailJobResponse{}, nil
}
func TestFailJob(t *testing.T) {
testLogger := logger.NewLogger("test")
t.Run("jobKey is mandatory", func(t *testing.T) {
message := ZeebeCommand{logger: testLogger}
req := &bindings.InvokeRequest{Operation: failJobOperation}
_, err := message.Invoke(req)
assert.Error(t, err, ErrMissingJobKey)
})
t.Run("retries is mandatory", func(t *testing.T) {
payload := failJobPayload{
JobKey: new(int64),
}
data, err := json.Marshal(payload)
assert.NoError(t, err)
message := ZeebeCommand{logger: testLogger}
req := &bindings.InvokeRequest{Data: data, Operation: failJobOperation}
_, err = message.Invoke(req)
assert.Error(t, err, ErrMissingRetries)
})
t.Run("fail a job", func(t *testing.T) {
payload := failJobPayload{
JobKey: new(int64),
Retries: new(int32),
ErrorMessage: "a",
}
data, err := json.Marshal(payload)
assert.NoError(t, err)
req := &bindings.InvokeRequest{Data: data, Operation: failJobOperation}
var mc mockFailJobClient
message := ZeebeCommand{logger: testLogger, client: &mc}
_, err = message.Invoke(req)
assert.NoError(t, err)
assert.Equal(t, *payload.JobKey, mc.cmd1.jobKey)
assert.Equal(t, *payload.Retries, mc.cmd1.cmd2.retries)
assert.Equal(t, payload.ErrorMessage, mc.cmd1.cmd2.cmd3.errorMessage)
})
}

View File

@ -0,0 +1,75 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package command
import (
"context"
"encoding/json"
"errors"
"fmt"
"time"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/components-contrib/metadata"
)
var (
ErrMissingMessageName = errors.New("messageName is a required attribute")
)
type publishMessagePayload struct {
MessageName string `json:"messageName"`
CorrelationKey string `json:"correlationKey"`
MessageID string `json:"messageId"`
TimeToLive metadata.Duration `json:"timeToLive"`
Variables interface{} `json:"variables"`
}
func (z *ZeebeCommand) publishMessage(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
var payload publishMessagePayload
err := json.Unmarshal(req.Data, &payload)
if err != nil {
return nil, err
}
if payload.MessageName == "" {
return nil, ErrMissingMessageName
}
cmd := z.client.NewPublishMessageCommand().
MessageName(payload.MessageName).
CorrelationKey(payload.CorrelationKey)
if payload.MessageID != "" {
cmd = cmd.MessageId(payload.MessageID)
}
if payload.TimeToLive.Duration != time.Duration(0) {
cmd = cmd.TimeToLive(payload.TimeToLive.Duration)
}
if payload.Variables != nil {
cmd, err = cmd.VariablesFromObject(payload.Variables)
if err != nil {
return nil, err
}
}
ctx := context.Background()
response, err := cmd.Send(ctx)
if err != nil {
return nil, fmt.Errorf("cannot publish message with name %s: %w", payload.MessageName, err)
}
jsonResponse, err := json.Marshal(response)
if err != nil {
return nil, fmt.Errorf("cannot marshal response to json: %w", err)
}
return &bindings.InvokeResponse{
Data: jsonResponse,
}, nil
}

View File

@ -0,0 +1,149 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package command
import (
"context"
"encoding/json"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/zeebe-io/zeebe/clients/go/pkg/commands"
"github.com/zeebe-io/zeebe/clients/go/pkg/pb"
"github.com/zeebe-io/zeebe/clients/go/pkg/zbc"
"github.com/dapr/components-contrib/bindings"
contrib_metadata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/kit/logger"
)
type mockPublishMessageClient struct {
zbc.Client
cmd1 *mockPublishMessageCommandStep1
}
type mockPublishMessageCommandStep1 struct {
commands.PublishMessageCommandStep1
cmd2 *mockPublishMessageCommandStep2
messageName string
}
type mockPublishMessageCommandStep2 struct {
commands.PublishMessageCommandStep2
cmd3 *mockPublishMessageCommandStep3
correlationKey string
}
type mockPublishMessageCommandStep3 struct {
commands.PublishMessageCommandStep3
messageID string
timeToLive time.Duration
variables interface{}
}
func (mc *mockPublishMessageClient) NewPublishMessageCommand() commands.PublishMessageCommandStep1 {
mc.cmd1 = &mockPublishMessageCommandStep1{
cmd2: &mockPublishMessageCommandStep2{
cmd3: &mockPublishMessageCommandStep3{},
},
}
return mc.cmd1
}
func (cmd1 *mockPublishMessageCommandStep1) MessageName(messageName string) commands.PublishMessageCommandStep2 {
cmd1.messageName = messageName
return cmd1.cmd2
}
func (cmd2 *mockPublishMessageCommandStep2) CorrelationKey(correlationKey string) commands.PublishMessageCommandStep3 {
cmd2.correlationKey = correlationKey
return cmd2.cmd3
}
//nolint // MessageId comes from the Zeebe client API and cannot be written as MessageID
func (cmd3 *mockPublishMessageCommandStep3) MessageId(messageID string) commands.PublishMessageCommandStep3 {
cmd3.messageID = messageID
return cmd3
}
func (cmd3 *mockPublishMessageCommandStep3) TimeToLive(timeToLive time.Duration) commands.PublishMessageCommandStep3 {
cmd3.timeToLive = timeToLive
return cmd3
}
func (cmd3 *mockPublishMessageCommandStep3) VariablesFromObject(variables interface{}) (commands.PublishMessageCommandStep3, error) {
cmd3.variables = variables
return cmd3, nil
}
func (cmd3 *mockPublishMessageCommandStep3) Send(context.Context) (*pb.PublishMessageResponse, error) {
return &pb.PublishMessageResponse{}, nil
}
func TestPublishMessage(t *testing.T) {
testLogger := logger.NewLogger("test")
t.Run("messageName is mandatory", func(t *testing.T) {
message := ZeebeCommand{logger: testLogger}
req := &bindings.InvokeRequest{Operation: publishMessageOperation}
_, err := message.Invoke(req)
assert.Error(t, err, ErrMissingMessageName)
})
t.Run("send message with mandatory fields", func(t *testing.T) {
payload := publishMessagePayload{
MessageName: "a",
}
data, err := json.Marshal(payload)
assert.NoError(t, err)
req := &bindings.InvokeRequest{Data: data, Operation: publishMessageOperation}
var mc mockPublishMessageClient
message := ZeebeCommand{logger: testLogger, client: &mc}
_, err = message.Invoke(req)
assert.NoError(t, err)
assert.Equal(t, payload.MessageName, mc.cmd1.messageName)
assert.Equal(t, payload.CorrelationKey, mc.cmd1.cmd2.correlationKey)
})
t.Run("send message with optional fields", func(t *testing.T) {
payload := publishMessagePayload{
MessageName: "a",
CorrelationKey: "b",
MessageID: "c",
TimeToLive: contrib_metadata.Duration{Duration: 1 * time.Second},
Variables: map[string]interface{}{
"key": "value",
},
}
data, err := json.Marshal(payload)
assert.NoError(t, err)
req := &bindings.InvokeRequest{Data: data, Operation: publishMessageOperation}
var mc mockPublishMessageClient
message := ZeebeCommand{logger: testLogger, client: &mc}
_, err = message.Invoke(req)
assert.NoError(t, err)
assert.Equal(t, payload.MessageName, mc.cmd1.messageName)
assert.Equal(t, payload.CorrelationKey, mc.cmd1.cmd2.correlationKey)
assert.Equal(t, payload.MessageID, mc.cmd1.cmd2.cmd3.messageID)
assert.Equal(t, payload.TimeToLive.Duration, mc.cmd1.cmd2.cmd3.timeToLive)
assert.Equal(t, payload.Variables, mc.cmd1.cmd2.cmd3.variables)
})
}

View File

@ -0,0 +1,44 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package command
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/dapr/components-contrib/bindings"
)
var (
ErrMissingIncidentKey = errors.New("incidentKey is a required attribute")
)
type resolveIncidentPayload struct {
IncidentKey *int64 `json:"incidentKey"`
}
func (z *ZeebeCommand) resolveIncident(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
var payload resolveIncidentPayload
err := json.Unmarshal(req.Data, &payload)
if err != nil {
return nil, err
}
if payload.IncidentKey == nil {
return nil, ErrMissingIncidentKey
}
_, err = z.client.NewResolveIncidentCommand().
IncidentKey(*payload.IncidentKey).
Send(context.Background())
if err != nil {
return nil, fmt.Errorf("cannot resolve incident for key %d: %w", payload.IncidentKey, err)
}
return &bindings.InvokeResponse{}, nil
}

View File

@ -0,0 +1,82 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package command
import (
"context"
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeebe-io/zeebe/clients/go/pkg/commands"
"github.com/zeebe-io/zeebe/clients/go/pkg/pb"
"github.com/zeebe-io/zeebe/clients/go/pkg/zbc"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/kit/logger"
)
type mockResolveIncidentClient struct {
zbc.Client
cmd1 *mockResolveIncidentCommandStep1
}
type mockResolveIncidentCommandStep1 struct {
commands.ResolveIncidentCommandStep1
cmd2 *mockResolveIncidentCommandStep2
incidentKey int64
}
type mockResolveIncidentCommandStep2 struct {
commands.ResolveIncidentCommandStep2
}
func (mc *mockResolveIncidentClient) NewResolveIncidentCommand() commands.ResolveIncidentCommandStep1 {
mc.cmd1 = &mockResolveIncidentCommandStep1{
cmd2: &mockResolveIncidentCommandStep2{},
}
return mc.cmd1
}
func (cmd1 *mockResolveIncidentCommandStep1) IncidentKey(incidentKey int64) commands.ResolveIncidentCommandStep2 {
cmd1.incidentKey = incidentKey
return cmd1.cmd2
}
func (cmd2 *mockResolveIncidentCommandStep2) Send(context.Context) (*pb.ResolveIncidentResponse, error) {
return &pb.ResolveIncidentResponse{}, nil
}
func TestResolveIncident(t *testing.T) {
testLogger := logger.NewLogger("test")
t.Run("incidentKey is mandatory", func(t *testing.T) {
message := ZeebeCommand{logger: testLogger}
req := &bindings.InvokeRequest{Operation: resolveIncidentOperation}
_, err := message.Invoke(req)
assert.Error(t, err, ErrMissingIncidentKey)
})
t.Run("resolve a incident", func(t *testing.T) {
payload := resolveIncidentPayload{
IncidentKey: new(int64),
}
data, err := json.Marshal(payload)
assert.NoError(t, err)
req := &bindings.InvokeRequest{Data: data, Operation: resolveIncidentOperation}
var mc mockResolveIncidentClient
message := ZeebeCommand{logger: testLogger, client: &mc}
_, err = message.Invoke(req)
assert.NoError(t, err)
assert.Equal(t, *payload.IncidentKey, mc.cmd1.incidentKey)
})
}

View File

@ -0,0 +1,63 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package command
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/dapr/components-contrib/bindings"
)
var (
ErrMissingElementInstanceKey = errors.New("elementInstanceKey is a required attribute")
ErrMissingVariables = errors.New("variables is a required attribute")
)
type setVariablesPayload struct {
ElementInstanceKey *int64 `json:"elementInstanceKey"`
Local bool `json:"local"`
Variables interface{} `json:"variables"`
}
func (z *ZeebeCommand) setVariables(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
var payload setVariablesPayload
err := json.Unmarshal(req.Data, &payload)
if err != nil {
return nil, err
}
if payload.ElementInstanceKey == nil {
return nil, ErrMissingElementInstanceKey
}
if payload.Variables == nil {
return nil, ErrMissingVariables
}
cmd, err := z.client.NewSetVariablesCommand().
ElementInstanceKey(*payload.ElementInstanceKey).
VariablesFromObject(payload.Variables)
if err != nil {
return nil, err
}
response, err := cmd.Local(payload.Local).Send(context.Background())
if err != nil {
return nil, fmt.Errorf("cannot set variables for element instance key %d: %w", payload.ElementInstanceKey, err)
}
jsonResponse, err := json.Marshal(response)
if err != nil {
return nil, fmt.Errorf("cannot marshal response to json: %w", err)
}
return &bindings.InvokeResponse{
Data: jsonResponse,
}, nil
}

View File

@ -0,0 +1,145 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package command
import (
"context"
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeebe-io/zeebe/clients/go/pkg/commands"
"github.com/zeebe-io/zeebe/clients/go/pkg/pb"
"github.com/zeebe-io/zeebe/clients/go/pkg/zbc"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/kit/logger"
)
type mockSetVariableClient struct {
zbc.Client
cmd1 *mockSetVariablesCommandStep1
}
type mockSetVariablesCommandStep1 struct {
commands.SetVariablesCommandStep1
cmd2 *mockSetVariablesCommandStep2
elementInstanceKey int64
}
type mockSetVariablesCommandStep2 struct {
commands.SetVariablesCommandStep2
cmd3 *mockDispatchSetVariablesCommand
variables interface{}
}
type mockDispatchSetVariablesCommand struct {
commands.DispatchSetVariablesCommand
local bool
}
func (mc *mockSetVariableClient) NewSetVariablesCommand() commands.SetVariablesCommandStep1 {
mc.cmd1 = &mockSetVariablesCommandStep1{
cmd2: &mockSetVariablesCommandStep2{
cmd3: &mockDispatchSetVariablesCommand{},
},
}
return mc.cmd1
}
func (cmd1 *mockSetVariablesCommandStep1) ElementInstanceKey(elementInstanceKey int64) commands.SetVariablesCommandStep2 {
cmd1.elementInstanceKey = elementInstanceKey
return cmd1.cmd2
}
func (cmd2 *mockSetVariablesCommandStep2) VariablesFromObject(variables interface{}) (commands.DispatchSetVariablesCommand, error) {
cmd2.variables = variables
return cmd2.cmd3, nil
}
func (cmd3 *mockDispatchSetVariablesCommand) Local(local bool) commands.DispatchSetVariablesCommand {
cmd3.local = local
return cmd3
}
func (cmd3 *mockDispatchSetVariablesCommand) Send(context.Context) (*pb.SetVariablesResponse, error) {
return &pb.SetVariablesResponse{}, nil
}
func TestSetVariables(t *testing.T) {
testLogger := logger.NewLogger("test")
t.Run("elementInstanceKey is mandatory", func(t *testing.T) {
message := ZeebeCommand{logger: testLogger}
req := &bindings.InvokeRequest{Operation: setVariablesOperation}
_, err := message.Invoke(req)
assert.Error(t, err, ErrMissingElementInstanceKey)
})
t.Run("variables is mandatory", func(t *testing.T) {
payload := setVariablesPayload{
ElementInstanceKey: new(int64),
}
data, err := json.Marshal(payload)
assert.NoError(t, err)
message := ZeebeCommand{logger: testLogger}
req := &bindings.InvokeRequest{Data: data, Operation: setVariablesOperation}
_, err = message.Invoke(req)
assert.Error(t, err, ErrMissingVariables)
})
t.Run("set variables", func(t *testing.T) {
payload := setVariablesPayload{
ElementInstanceKey: new(int64),
Variables: map[string]interface{}{
"key": "value",
},
}
data, err := json.Marshal(payload)
assert.NoError(t, err)
req := &bindings.InvokeRequest{Data: data, Operation: setVariablesOperation}
var mc mockSetVariableClient
message := ZeebeCommand{logger: testLogger, client: &mc}
_, err = message.Invoke(req)
assert.NoError(t, err)
assert.Equal(t, *payload.ElementInstanceKey, mc.cmd1.elementInstanceKey)
assert.Equal(t, payload.Variables, mc.cmd1.cmd2.variables)
assert.Equal(t, false, mc.cmd1.cmd2.cmd3.local)
})
t.Run("set local variables", func(t *testing.T) {
payload := setVariablesPayload{
ElementInstanceKey: new(int64),
Variables: map[string]interface{}{
"key": "value",
},
Local: true,
}
data, err := json.Marshal(payload)
assert.NoError(t, err)
req := &bindings.InvokeRequest{Data: data, Operation: setVariablesOperation}
var mc mockSetVariableClient
message := ZeebeCommand{logger: testLogger, client: &mc}
_, err = message.Invoke(req)
assert.NoError(t, err)
assert.Equal(t, *payload.ElementInstanceKey, mc.cmd1.elementInstanceKey)
assert.Equal(t, payload.Variables, mc.cmd1.cmd2.variables)
assert.Equal(t, true, mc.cmd1.cmd2.cmd3.local)
})
}

View File

@ -0,0 +1,56 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package command
import (
"context"
"encoding/json"
"errors"
"fmt"
"github.com/dapr/components-contrib/bindings"
)
var (
ErrMissingErrorCode = errors.New("errorCode is a required attribute")
)
type throwErrorPayload struct {
JobKey *int64 `json:"jobKey"`
ErrorCode string `json:"errorCode"`
ErrorMessage string `json:"errorMessage"`
}
func (z *ZeebeCommand) throwError(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
var payload throwErrorPayload
err := json.Unmarshal(req.Data, &payload)
if err != nil {
return nil, err
}
if payload.JobKey == nil {
return nil, ErrMissingJobKey
}
if payload.ErrorCode == "" {
return nil, ErrMissingErrorCode
}
cmd := z.client.NewThrowErrorCommand().
JobKey(*payload.JobKey).
ErrorCode(payload.ErrorCode)
if payload.ErrorMessage != "" {
cmd = cmd.ErrorMessage(payload.ErrorMessage)
}
_, err = cmd.Send(context.Background())
if err != nil {
return nil, fmt.Errorf("cannot throw error for job key %d: %w", payload.JobKey, err)
}
return &bindings.InvokeResponse{}, nil
}

View File

@ -0,0 +1,120 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package command
import (
"context"
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeebe-io/zeebe/clients/go/pkg/commands"
"github.com/zeebe-io/zeebe/clients/go/pkg/pb"
"github.com/zeebe-io/zeebe/clients/go/pkg/zbc"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/kit/logger"
)
type mockThrowErrorClient struct {
zbc.Client
cmd1 *mockThrowErrorCommandStep1
}
type mockThrowErrorCommandStep1 struct {
commands.ThrowErrorCommandStep1
cmd2 *mockThrowErrorCommandStep2
jobKey int64
}
type mockThrowErrorCommandStep2 struct {
commands.ThrowErrorCommandStep2
cmd3 *mockDispatchThrowErrorCommand
errorCode string
}
type mockDispatchThrowErrorCommand struct {
commands.DispatchThrowErrorCommand
errorMessage string
}
func (mc *mockThrowErrorClient) NewThrowErrorCommand() commands.ThrowErrorCommandStep1 {
mc.cmd1 = &mockThrowErrorCommandStep1{
cmd2: &mockThrowErrorCommandStep2{
cmd3: &mockDispatchThrowErrorCommand{},
},
}
return mc.cmd1
}
func (cmd1 *mockThrowErrorCommandStep1) JobKey(jobKey int64) commands.ThrowErrorCommandStep2 {
cmd1.jobKey = jobKey
return cmd1.cmd2
}
func (cmd2 *mockThrowErrorCommandStep2) ErrorCode(errorCode string) commands.DispatchThrowErrorCommand {
cmd2.errorCode = errorCode
return cmd2.cmd3
}
func (cmd3 *mockDispatchThrowErrorCommand) ErrorMessage(errorMessage string) commands.DispatchThrowErrorCommand {
cmd3.errorMessage = errorMessage
return cmd3
}
func (cmd3 *mockDispatchThrowErrorCommand) Send(context.Context) (*pb.ThrowErrorResponse, error) {
return &pb.ThrowErrorResponse{}, nil
}
func TestThrowError(t *testing.T) {
testLogger := logger.NewLogger("test")
t.Run("jobKey is mandatory", func(t *testing.T) {
message := ZeebeCommand{logger: testLogger}
req := &bindings.InvokeRequest{Operation: throwErrorOperation}
_, err := message.Invoke(req)
assert.Error(t, err, ErrMissingJobKey)
})
t.Run("errorCode is mandatory", func(t *testing.T) {
payload := throwErrorPayload{
JobKey: new(int64),
}
data, err := json.Marshal(payload)
assert.NoError(t, err)
message := ZeebeCommand{logger: testLogger}
req := &bindings.InvokeRequest{Data: data, Operation: throwErrorOperation}
_, err = message.Invoke(req)
assert.Error(t, err, ErrMissingErrorCode)
})
t.Run("throw an error", func(t *testing.T) {
payload := throwErrorPayload{
JobKey: new(int64),
ErrorCode: "a",
ErrorMessage: "b",
}
data, err := json.Marshal(payload)
assert.NoError(t, err)
req := &bindings.InvokeRequest{Data: data, Operation: throwErrorOperation}
var mc mockThrowErrorClient
message := ZeebeCommand{logger: testLogger, client: &mc}
_, err = message.Invoke(req)
assert.NoError(t, err)
assert.Equal(t, *payload.JobKey, mc.cmd1.jobKey)
assert.Equal(t, payload.ErrorCode, mc.cmd1.cmd2.errorCode)
assert.Equal(t, payload.ErrorMessage, mc.cmd1.cmd2.cmd3.errorMessage)
})
}

View File

@ -0,0 +1,30 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package command
import (
"context"
"encoding/json"
"fmt"
"github.com/dapr/components-contrib/bindings"
)
func (z *ZeebeCommand) topology() (*bindings.InvokeResponse, error) {
response, err := z.client.NewTopologyCommand().Send(context.Background())
if err != nil {
return nil, fmt.Errorf("cannot get zeebe toplogy: %w", err)
}
jsonResponse, err := json.Marshal(response)
if err != nil {
return nil, fmt.Errorf("cannot marshal response to json: %w", err)
}
return &bindings.InvokeResponse{
Data: jsonResponse,
}, nil
}

View File

@ -0,0 +1,46 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package command
import (
"context"
"encoding/json"
"fmt"
"github.com/zeebe-io/zeebe/clients/go/pkg/commands"
"github.com/dapr/components-contrib/bindings"
)
type updateJobRetriesPayload struct {
JobKey *int64 `json:"jobKey"`
Retries *int32 `json:"retries"`
}
func (z *ZeebeCommand) updateJobRetries(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
var payload updateJobRetriesPayload
err := json.Unmarshal(req.Data, &payload)
if err != nil {
return nil, err
}
if payload.JobKey == nil {
return nil, ErrMissingJobKey
}
cmd1 := z.client.NewUpdateJobRetriesCommand().JobKey(*payload.JobKey)
var cmd2 commands.DispatchUpdateJobRetriesCommand = cmd1
if payload.Retries != nil {
cmd2 = cmd1.Retries(*payload.Retries)
}
_, err = cmd2.Send(context.Background())
if err != nil {
return nil, fmt.Errorf("cannot uodate job retries for key %d: %w", payload.JobKey, err)
}
return &bindings.InvokeResponse{}, nil
}

View File

@ -0,0 +1,91 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package command
import (
"context"
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeebe-io/zeebe/clients/go/pkg/commands"
"github.com/zeebe-io/zeebe/clients/go/pkg/pb"
"github.com/zeebe-io/zeebe/clients/go/pkg/zbc"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/kit/logger"
)
type mockUpdateJobRetriesClient struct {
zbc.Client
cmd1 *mockUpdateJobRetriesCommandStep1
}
type mockUpdateJobRetriesCommandStep1 struct {
commands.UpdateJobRetriesCommandStep1
cmd2 *mockUpdateJobRetriesCommandStep2
jobKey int64
}
type mockUpdateJobRetriesCommandStep2 struct {
commands.UpdateJobRetriesCommandStep2
retries int32
}
func (mc *mockUpdateJobRetriesClient) NewUpdateJobRetriesCommand() commands.UpdateJobRetriesCommandStep1 {
mc.cmd1 = &mockUpdateJobRetriesCommandStep1{
cmd2: &mockUpdateJobRetriesCommandStep2{},
}
return mc.cmd1
}
func (cmd1 *mockUpdateJobRetriesCommandStep1) JobKey(jobKey int64) commands.UpdateJobRetriesCommandStep2 {
cmd1.jobKey = jobKey
return cmd1.cmd2
}
func (cmd2 *mockUpdateJobRetriesCommandStep2) Retries(retries int32) commands.DispatchUpdateJobRetriesCommand {
cmd2.retries = retries
return cmd2
}
func (cmd2 *mockUpdateJobRetriesCommandStep2) Send(context.Context) (*pb.UpdateJobRetriesResponse, error) {
return &pb.UpdateJobRetriesResponse{}, nil
}
func TestUpdateJobRetries(t *testing.T) {
testLogger := logger.NewLogger("test")
t.Run("jobKey is mandatory", func(t *testing.T) {
message := ZeebeCommand{logger: testLogger}
req := &bindings.InvokeRequest{Operation: updateJobRetriesOperation}
_, err := message.Invoke(req)
assert.Error(t, err, ErrMissingJobKey)
})
t.Run("update job retries", func(t *testing.T) {
payload := updateJobRetriesPayload{
JobKey: new(int64),
Retries: new(int32),
}
data, err := json.Marshal(payload)
assert.NoError(t, err)
req := &bindings.InvokeRequest{Data: data, Operation: updateJobRetriesOperation}
var mc mockUpdateJobRetriesClient
message := ZeebeCommand{logger: testLogger, client: &mc}
_, err = message.Invoke(req)
assert.NoError(t, err)
assert.Equal(t, *payload.JobKey, mc.cmd1.jobKey)
assert.Equal(t, *payload.Retries, mc.cmd1.cmd2.retries)
})
}

9
bindings/zeebe/helper.go Normal file
View File

@ -0,0 +1,9 @@
package zeebe
import (
"strings"
)
func VariableStringToArray(variableString string) []string {
return strings.Split(strings.ReplaceAll(variableString, " ", ""), ",")
}

View File

@ -0,0 +1,21 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package zeebe
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestVariableStringToArrayRemovesSpaces(t *testing.T) {
vars := VariableStringToArray(" a, b, c ")
require.Equal(t, 3, len(vars))
assert.Equal(t, "a", vars[0])
assert.Equal(t, "b", vars[1])
assert.Equal(t, "c", vars[2])
}

View File

@ -0,0 +1,186 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package jobworker
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"os/signal"
"syscall"
"github.com/zeebe-io/zeebe/clients/go/pkg/entities"
"github.com/zeebe-io/zeebe/clients/go/pkg/worker"
"github.com/zeebe-io/zeebe/clients/go/pkg/zbc"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/components-contrib/bindings/zeebe"
"github.com/dapr/components-contrib/metadata"
"github.com/dapr/kit/logger"
)
var (
ErrMissingJobType = errors.New("jobType is a required attribute")
)
// ZeebeJobWorker allows handling jobs from the Zeebe command engine
type ZeebeJobWorker struct {
clientFactory zeebe.ClientFactory
client zbc.Client
metadata *jobWorkerMetadata
logger logger.Logger
}
// https://docs.zeebe.io/basics/job-workers.html
type jobWorkerMetadata struct {
WorkerName string `json:"workerName"`
WorkerTimeout metadata.Duration `json:"workerTimeout"`
RequestTimeout metadata.Duration `json:"requestTimeout"`
JobType string `json:"jobType"`
MaxJobsActive int `json:"maxJobsActive,string"`
Concurrency int `json:"concurrency,string"`
PollInterval metadata.Duration `json:"pollInterval"`
PollThreshold float64 `json:"pollThreshold,string"`
FetchVariables string `json:"fetchVariables"`
}
type jobHandler struct {
callback func(*bindings.ReadResponse) ([]byte, error)
logger logger.Logger
}
// NewZeebeJobWorker returns a new ZeebeJobWorker instance
func NewZeebeJobWorker(logger logger.Logger) *ZeebeJobWorker {
return &ZeebeJobWorker{clientFactory: zeebe.NewClientFactoryImpl(logger), logger: logger}
}
// Init does metadata parsing and connection creation
func (z *ZeebeJobWorker) Init(metadata bindings.Metadata) error {
meta, err := z.parseMetadata(metadata)
if err != nil {
return err
}
if meta.JobType == "" {
return ErrMissingJobType
}
client, err := z.clientFactory.Get(metadata)
if err != nil {
return err
}
z.metadata = meta
z.client = client
return nil
}
func (z *ZeebeJobWorker) Read(handler func(*bindings.ReadResponse) ([]byte, error)) error {
h := jobHandler{
callback: handler,
logger: z.logger,
}
jobWorker := z.getJobWorker(h)
exitChan := make(chan os.Signal, 1)
signal.Notify(exitChan, os.Interrupt, syscall.SIGTERM)
<-exitChan
jobWorker.Close()
jobWorker.AwaitClose()
return z.client.Close()
}
func (z *ZeebeJobWorker) parseMetadata(metadata bindings.Metadata) (*jobWorkerMetadata, error) {
b, err := json.Marshal(metadata.Properties)
if err != nil {
return nil, err
}
var m jobWorkerMetadata
err = json.Unmarshal(b, &m)
if err != nil {
return nil, err
}
return &m, nil
}
func (z *ZeebeJobWorker) getJobWorker(handler jobHandler) worker.JobWorker {
return z.client.
NewJobWorker().
JobType(z.metadata.JobType).
Handler(handler.handleJob).
Name(z.metadata.WorkerName).
Timeout(z.metadata.WorkerTimeout.Duration).
RequestTimeout(z.metadata.RequestTimeout.Duration).
MaxJobsActive(z.metadata.MaxJobsActive).
Concurrency(z.metadata.Concurrency).
PollInterval(z.metadata.PollInterval.Duration).
PollThreshold(z.metadata.PollThreshold).
FetchVariables(zeebe.VariableStringToArray(z.metadata.FetchVariables)...).
Open()
}
func (h *jobHandler) handleJob(client worker.JobClient, job entities.Job) {
headers, err := job.GetCustomHeadersAsMap()
if err != nil {
h.failJob(client, job, err)
return
}
resultVariables, err := h.callback(&bindings.ReadResponse{
Data: []byte(job.Variables),
Metadata: headers,
})
if err != nil {
h.failJob(client, job, err)
return
}
variablesMap := make(map[string]interface{})
err = json.Unmarshal(resultVariables, &variablesMap)
if err != nil {
h.failJob(client, job, fmt.Errorf("cannot parse variables from binding result %s; got error %w", string(resultVariables), err))
return
}
jobKey := job.GetKey()
request, err := client.NewCompleteJobCommand().JobKey(jobKey).VariablesFromMap(variablesMap)
if err != nil {
h.failJob(client, job, err)
return
}
h.logger.Debugf("Complete job %s of type %s", jobKey, job.Type)
ctx := context.Background()
_, err = request.Send(ctx)
if err != nil {
panic(err)
}
h.logger.Debug("Successfully completed job")
}
func (h *jobHandler) failJob(client worker.JobClient, job entities.Job, reason error) {
h.logger.Errorf("Failed to complete job `%s` reason: %w", job.GetKey(), reason)
ctx := context.Background()
_, err := client.NewFailJobCommand().JobKey(job.GetKey()).Retries(job.Retries - 1).Send(ctx)
if err != nil {
panic(err)
}
}

View File

@ -0,0 +1,103 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package jobworker
import (
"errors"
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeebe-io/zeebe/clients/go/pkg/zbc"
"github.com/dapr/components-contrib/bindings"
"github.com/dapr/components-contrib/bindings/zeebe"
"github.com/dapr/kit/logger"
)
type mockClientFactory struct {
zeebe.ClientFactory
metadata bindings.Metadata
error error
}
type mockClient struct {
zbc.Client
}
func (mcf mockClientFactory) Get(metadata bindings.Metadata) (zbc.Client, error) {
mcf.metadata = metadata
if mcf.error != nil {
return nil, mcf.error
}
return mockClient{}, nil
}
func TestInit(t *testing.T) {
testLogger := logger.NewLogger("test")
t.Run("jobType is mandatory", func(t *testing.T) {
metadata := bindings.Metadata{}
var mcf mockClientFactory
jobWorker := ZeebeJobWorker{clientFactory: &mcf, logger: testLogger}
err := jobWorker.Init(metadata)
assert.Error(t, err, ErrMissingJobType)
})
t.Run("sets client from client factory", func(t *testing.T) {
metadata := bindings.Metadata{
Properties: map[string]string{"jobType": "a"},
}
mcf := mockClientFactory{
metadata: metadata,
}
jobWorker := ZeebeJobWorker{clientFactory: mcf, logger: testLogger}
err := jobWorker.Init(metadata)
assert.NoError(t, err)
mc, err := mcf.Get(metadata)
assert.NoError(t, err)
assert.Equal(t, mc, jobWorker.client)
assert.Equal(t, metadata, mcf.metadata)
})
t.Run("returns error if client could not be instantiated properly", func(t *testing.T) {
errParsing := errors.New("error on parsing metadata")
metadata := bindings.Metadata{}
mcf := mockClientFactory{
error: errParsing,
}
jobWorker := ZeebeJobWorker{clientFactory: mcf, logger: testLogger}
err := jobWorker.Init(metadata)
assert.Error(t, err, errParsing)
})
t.Run("sets client from client factory", func(t *testing.T) {
metadata := bindings.Metadata{
Properties: map[string]string{"jobType": "a"},
}
mcf := mockClientFactory{
metadata: metadata,
}
jobWorker := ZeebeJobWorker{clientFactory: mcf, logger: testLogger}
err := jobWorker.Init(metadata)
assert.NoError(t, err)
mc, err := mcf.Get(metadata)
assert.NoError(t, err)
assert.Equal(t, mc, jobWorker.client)
assert.Equal(t, metadata, mcf.metadata)
})
}

10
go.mod
View File

@ -68,7 +68,7 @@ require (
github.com/mitchellh/mapstructure v1.3.3
github.com/moul/http2curl v1.0.0 // indirect
github.com/nats-io/nats-server/v2 v2.2.1 // indirect
github.com/nats-io/nats-streaming-server v0.21.1 // indirect
github.com/nats-io/nats-streaming-server v0.21.2 // indirect
github.com/nats-io/nats.go v1.10.1-0.20210330225420-a0b1f60162f8
github.com/nats-io/stan.go v0.8.3
github.com/open-policy-agent/opa v0.23.2
@ -83,17 +83,18 @@ require (
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271
github.com/stretchr/testify v1.7.0
github.com/tidwall/pretty v1.1.0 // indirect
github.com/valyala/fasthttp v1.19.0
github.com/valyala/fasthttp v1.21.0
github.com/vmware/vmware-go-kcl v0.0.0-20191104173950-b6c74c3fe74e
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
github.com/yalp/jsonpath v0.0.0-20180802001716-5cc68e5049a0 // indirect
github.com/yudai/gojsondiff v1.0.0 // indirect
github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 // indirect
github.com/zeebe-io/zeebe/clients/go v0.26.3
go.mongodb.org/mongo-driver v1.1.2
goji.io v2.0.2+incompatible // indirect
golang.org/x/crypto v0.0.0-20210314154223-e6e6c4f2bb5b
golang.org/x/net v0.0.0-20210331212208-0fccb6fa2b5c
golang.org/x/oauth2 v0.0.0-20200902213428-5d25da1a8d43
golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5
google.golang.org/api v0.32.0
google.golang.org/genproto v0.0.0-20201204160425-06b3db808446
google.golang.org/grpc v1.34.0
@ -107,8 +108,7 @@ require (
gopkg.in/gorethink/gorethink.v4 v4.1.0 // indirect
gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect
gopkg.in/kataras/go-serializer.v0 v0.0.4 // indirect
gopkg.in/yaml.v2 v2.3.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776
k8s.io/api v0.20.0
k8s.io/apiextensions-apiserver v0.20.0
k8s.io/apimachinery v0.20.0

56
go.sum
View File

@ -111,6 +111,9 @@ github.com/DATA-DOG/go-sqlmock v1.5.0/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q
github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798 h1:2T/jmrHeTezcCM58lvEQXs0UpQJCo5SoGAcg+mbSTIg=
github.com/DataDog/zstd v1.3.6-0.20190409195224-796139022798/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/Microsoft/go-winio v0.4.11/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA=
github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA=
github.com/Microsoft/hcsshim v0.8.6/go.mod h1:Op3hHsoHPAvb6lceZHDtd9OkTew38wNoXnJs8iY7rUg=
github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/OneOfOne/xxhash v1.2.7 h1:fzrmmkskv067ZQbd9wERNGuxckWw67dyzoMG62p7LMo=
@ -151,6 +154,8 @@ github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 h1:EFSB7Zo9Eg91v7
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/asaskevich/govalidator v0.0.0-20200108200545-475eaeb16496 h1:zV3ejI06GQ59hwDQAvmK1qxOQGB3WuVTRoY0okPTAv0=
github.com/asaskevich/govalidator v0.0.0-20200108200545-475eaeb16496/go.mod h1:oGkLhpf+kjZl6xBf758TQhh5XrAeiJv/7FRz/2spLIg=
github.com/aws/aws-sdk-go v1.19.38/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/aws/aws-sdk-go v1.27.0 h1:0xphMHGMLBrPMfxR2AmVjZKcMEESEgWF8Kru94BNByk=
github.com/aws/aws-sdk-go v1.27.0/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
@ -175,8 +180,9 @@ github.com/boltdb/bolt v1.3.1/go.mod h1:clJnj/oiGkjum5o1McbSZDSLxVThjynRyGBgiAx2
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b h1:L/QXpzIa3pOvUGt1D1lA5KjYhPBAN/3iWdP7xeFS9F0=
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b/go.mod h1:H0wQNHz2YrLsuXOZozoeDmnHXkNCRmMW0gwFWDfEZDA=
github.com/cenkalti/backoff v2.0.0+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff v2.1.1+incompatible h1:tKJnvO2kl0zmb/jA5UKAt4VoEVw1qxKWjE/Bpp46npY=
github.com/cenkalti/backoff v2.1.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.1.0 h1:c8LkOFQTzuO0WBM/ae5HdGQuZPfPxp7lqBRwQRm4fSc=
github.com/cenkalti/backoff/v4 v4.1.0/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
@ -195,6 +201,9 @@ github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354/go.mod h1:WmhPx2Nbnht
github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/cockroachdb/datadriven v0.0.0-20190809214429-80d97fb3cbaa/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
github.com/containerd/containerd v1.4.1/go.mod h1:bC6axHOhabU15QhwfG7w5PipXdVtMXFTttgp+kVtyUA=
github.com/containerd/continuity v0.0.0-20190426062206-aaeac12a7ffc/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y=
github.com/containerd/continuity v0.0.0-20190827140505-75bee3e2ccb6/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-oidc v2.1.0+incompatible h1:sdJrfw8akMnCuUlaZU3tE/uYXFgfqom8DBE9so9EBsM=
@ -241,6 +250,12 @@ github.com/dimchansky/utfbom v1.1.0 h1:FcM3g+nofKgUteL8dm/UpdRXNC9KmADgTpLKsu0TR
github.com/dimchansky/utfbom v1.1.0/go.mod h1:rO41eb7gLfo8SF1jd9F8HplJm1Fewwi4mQvIirEdv+8=
github.com/dnaeon/go-vcr v1.1.0 h1:ReYa/UBrRyQdant9B4fNHGoCNKw6qh6P0fsdGmZpR7c=
github.com/dnaeon/go-vcr v1.1.0/go.mod h1:M7tiix8f0r6mKKJ3Yq/kqU1OYf3MnfmBWVbPx/yU9ko=
github.com/docker/distribution v2.7.1-0.20190205005809-0d3efadf0154+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
github.com/docker/distribution v2.7.1+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w=
github.com/docker/docker v17.12.0-ce-rc1.0.20200916142827-bd33bbf0497b+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk=
github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/docker/spdystream v0.0.0-20160310174837-449fdfce4d96/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
@ -304,6 +319,8 @@ github.com/go-openapi/jsonreference v0.19.3/go.mod h1:rjx6GuL8TTa9VaixXglHmQmIL9
github.com/go-openapi/spec v0.19.3/go.mod h1:FpwSN1ksY1eteniUU7X0N/BgJ7a4WvBFVA8Lj9mJglo=
github.com/go-openapi/swag v0.19.2/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk=
github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk=
github.com/go-ozzo/ozzo-validation/v4 v4.3.0 h1:byhDUpfEwjsVQb1vBunvIjh2BHQ9ead57VkAEY4V+Es=
github.com/go-ozzo/ozzo-validation/v4 v4.3.0/go.mod h1:2NKgrcHl3z6cJs+3Oo940FPRiTzuqKbvfrL2RxCj6Ew=
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q=
github.com/go-playground/locales v0.13.0/go.mod h1:taPMhCMXrRLJO55olJkUXHZBHCxTMfnGwq/HNwmWNS8=
@ -311,6 +328,8 @@ github.com/go-playground/universal-translator v0.17.0 h1:icxd5fm+REJzpZx7ZfpaD87
github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+Scu5vgOQjsIJAF8j9muTVoKLVtA=
github.com/go-playground/validator/v10 v10.2.0 h1:KgJ0snyC2R9VXYN2rneOtQcw5aHQB1Vv0sFl1UcHBOY=
github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI=
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis/v7 v7.0.1 h1:AVkqXtvak6eXAvqIA+0rDlh6St/M7/vaf67NEqPhP2w=
github.com/go-redis/redis/v7 v7.0.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
@ -330,6 +349,7 @@ github.com/gocql/gocql v0.0.0-20191018090344-07ace3bab0f8/go.mod h1:DL0ekTmBSTdl
github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE=
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/gogo/protobuf v1.3.0/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
@ -413,9 +433,11 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m
github.com/googleapis/gnostic v0.4.1 h1:DLJCy1n/vrD4HPjOvYcT8aYQXpPIzoRZONaYwyycI+I=
github.com/googleapis/gnostic v0.4.1/go.mod h1:LRhVm6pbyptWbWbuZ38d1eyptfvIytN3ir6b65WBswg=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/css v1.0.0 h1:BQqNyPTi50JCFMTw/b67hByjMVXZRwGha6wxVGkeihY=
github.com/gorilla/css v1.0.0/go.mod h1:Dn721qIggHpt4+EFCcTLTU/vk5ySda2ReITrtgBl60c=
github.com/gorilla/mux v0.0.0-20181024020800-521ea7b17d02/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.3 h1:gnP5JzjVOuiZD07fKKToCAOjS0yOpj/qPETTXCCS6hw=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
@ -648,6 +670,7 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modocache/gover v0.0.0-20171022184752-b58185e213c5/go.mod h1:caMODM3PzxT8aQXRPkAt8xlV/e7d7w8GM5g0fa5F0D8=
github.com/morikuni/aec v0.0.0-20170113033406-39771216ff4c/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
github.com/moul/http2curl v1.0.0 h1:dRMWoAtb+ePxMlLkrCbAqh4TlPHXvoGUSQ323/9Zahs=
github.com/moul/http2curl v1.0.0/go.mod h1:8UbvGypXm98wA/IqH45anm5Y2Z6ep6O31QGOAZ3H0fQ=
github.com/munnerz/goautoneg v0.0.0-20120707110453-a547fc61f48d/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
@ -675,8 +698,8 @@ github.com/nats-io/nats-server/v2 v2.1.9/go.mod h1:9qVyoewoYXzG1ME9ox0HwkkzyYvnl
github.com/nats-io/nats-server/v2 v2.2.1-0.20210330155036-61cbd74e213d/go.mod h1:eKlAaGmSQHZMFQA6x56AaP5/Bl9N3mWF4awyT2TTpzc=
github.com/nats-io/nats-server/v2 v2.2.1 h1:QaWKih9qAa1kod7xXy0G1ry0AEUGmDEaptaiqzuO1e8=
github.com/nats-io/nats-server/v2 v2.2.1/go.mod h1:A+5EOqdnhH7FvLxtAK6SEDx6hyHriVOwf+FT/eEV99c=
github.com/nats-io/nats-streaming-server v0.21.1 h1:jb/osnXmFJtKDS9DFghDjX82v1NT9IhaoR/r6s6toNg=
github.com/nats-io/nats-streaming-server v0.21.1/go.mod h1:2W8QfNVOtcFpmf0bRiwuLtRb0/hkX4NuOxPOFNOThVQ=
github.com/nats-io/nats-streaming-server v0.21.2 h1:chyaVdWlPdBcSbLq3cpyCYcuXA+7bVXJmM4yWrdqL/8=
github.com/nats-io/nats-streaming-server v0.21.2/go.mod h1:2W8QfNVOtcFpmf0bRiwuLtRb0/hkX4NuOxPOFNOThVQ=
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I=
github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4=
@ -700,14 +723,19 @@ github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:v
github.com/olekukonko/tablewriter v0.0.1/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/onsi/ginkgo v0.0.0-20170829012221-11459a886d9c/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.11.0 h1:JAKSXpt1YjtLA7YpPiqO9ss6sNXEsPfSGdwN0UHqzrw=
github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/open-policy-agent/opa v0.23.2 h1:co9fPjnLPwnvaEThBJjCb5E2iAyvW95Qq2PvSOEIwGE=
github.com/open-policy-agent/opa v0.23.2/go.mod h1:rrwxoT/b011T0cyj+gg2VvxqTtn6N3gp/jzmr3fjW44=
github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s=
github.com/opencontainers/image-spec v1.0.1/go.mod h1:BtxoFyWECRxE4U/7sNtV5W15zMzWCbyJoFRP3s7yZA0=
github.com/opencontainers/runc v0.1.1/go.mod h1:qT5XzbpPznkRYVz/mWwUaVBUv2rmF59PVA73FjuZG0U=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
@ -836,6 +864,7 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/testcontainers/testcontainers-go v0.9.0/go.mod h1:b22BFXhRbg4PJmeMVWh6ftqjyZHgiIl3w274e9r3C2E=
github.com/tidwall/pretty v1.1.0 h1:K3hMW5epkdAVwibsQEfR/7Zj0Qgt4DxtNumTq/VloO8=
github.com/tidwall/pretty v1.1.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
@ -848,8 +877,8 @@ github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLY
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.19.0 h1:PfTS4PeH3xDr3WomrDS2ID8lU2GskK1xS3YG6gIpibU=
github.com/valyala/fasthttp v1.19.0/go.mod h1:jjraHZVbKOXftJfsOYoAjaeygpj5hr8ermTRJNroD7A=
github.com/valyala/fasthttp v1.21.0 h1:fJjaQ7cXdaSF9vDBujlHLDGj7AgoMTMIXvICeePzYbU=
github.com/valyala/fasthttp v1.21.0/go.mod h1:jjraHZVbKOXftJfsOYoAjaeygpj5hr8ermTRJNroD7A=
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
github.com/valyala/fasttemplate v1.1.0 h1:RZqt0yGBsps8NGvLSGW804QQqCUYYLsaOjTVHy1Ocw4=
github.com/valyala/fasttemplate v1.1.0/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
@ -881,6 +910,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb h1:ZkM6LRnq40pR1Ox0hTHlnpkcOTuFIDQpZ1IN8rKKhX0=
github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ=
github.com/zeebe-io/zeebe/clients/go v0.26.3 h1:VAESg/b9bOz5lcyd3/RHpSrFWlad5/kppNrZApKwb0g=
github.com/zeebe-io/zeebe/clients/go v0.26.3/go.mod h1:XfnxtvL1qYlSymFFb7/ZI3KpeklhsjVCiWpplkzTLII=
github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
@ -1004,6 +1035,7 @@ golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81R
golang.org/x/net v0.0.0-20201016165138-7b1cca2348c0/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210331212208-0fccb6fa2b5c h1:KHUzaHIpjWVlVVNh65G3hhuj3KB1HnjY6Cq5cTvRQT8=
golang.org/x/net v0.0.0-20210331212208-0fccb6fa2b5c/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
@ -1012,8 +1044,9 @@ golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4Iltr
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20191202225959-858c2ad4c8b6/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20200902213428-5d25da1a8d43 h1:ld7aEMNHoBnnDAX15v1T6z31v8HwR2A9FYOuAhWqkwc=
golang.org/x/oauth2 v0.0.0-20200902213428-5d25da1a8d43/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5 h1:Lm4OryKCca1vehdsWogr9N4t7NfZxLbJoc/H0w4K4S4=
golang.org/x/oauth2 v0.0.0-20201208152858-08078c50e5b5/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@ -1102,6 +1135,8 @@ golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e h1:EHBhcS0mlXEAVwNyO2dLfjToGsyY4j24pTs2ScHnX7s=
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180810170437-e96c4e24768d/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
@ -1225,6 +1260,7 @@ google.golang.org/genproto v0.0.0-20201110150050-8816d57aaa9a/go.mod h1:FWY/as6D
google.golang.org/genproto v0.0.0-20201204160425-06b3db808446 h1:65ppmIPdaZE+BO34gntwqexoTYr30IRNGmS0OGOHu3A=
google.golang.org/genproto v0.0.0-20201204160425-06b3db808446/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
@ -1308,12 +1344,16 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools v0.0.0-20181223230014-1083505acf35/go.mod h1:R//lfYlUuTOTfblYI3lGoAAAebUdzjvbmQsuB7Ykd90=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

44
metadata/duration.go Normal file
View File

@ -0,0 +1,44 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package metadata
// JSON marshaling and unmarshaling methods for time.Duration based on https://stackoverflow.com/a/48051946
import (
"encoding/json"
"errors"
"time"
)
type Duration struct {
time.Duration
}
func (d Duration) MarshalJSON() ([]byte, error) {
return json.Marshal(d.String())
}
func (d *Duration) UnmarshalJSON(b []byte) error {
var v interface{}
if err := json.Unmarshal(b, &v); err != nil {
return err
}
switch value := v.(type) {
case float64:
d.Duration = time.Duration(value)
return nil
case string:
var err error
d.Duration, err = time.ParseDuration(value)
if err != nil {
return err
}
return nil
default:
return errors.New("invalid duration")
}
}