Initial creation of workflows building block
Signed-off-by: Ryan Lettieri <ryanLettieri@microsoft.com>
This commit is contained in:
parent
e639685e83
commit
7233fefcb3
39
.github/infrastructure/conformance/temporal/server/dynamicconfig/README.md
vendored
Executable file
39
.github/infrastructure/conformance/temporal/server/dynamicconfig/README.md
vendored
Executable file
|
@ -0,0 +1,39 @@
|
|||
Use `docker.yaml` file to override the default dynamic config value (they are specified
|
||||
when creating the service config).
|
||||
|
||||
Each key can have zero or more values and each value can have zero or more
|
||||
constraints. There are only three types of constraint:
|
||||
1. `namespace`: `string`
|
||||
2. `taskQueueName`: `string`
|
||||
3. `taskType`: `int` (`1`:`Workflow`, `2`:`Activity`)
|
||||
A value will be selected and returned if all its has exactly the same constraints
|
||||
as the ones specified in query filters (including the number of constraints).
|
||||
|
||||
Please use the following format:
|
||||
```
|
||||
testGetBoolPropertyKey:
|
||||
- value: false
|
||||
- value: true
|
||||
constraints:
|
||||
namespace: "global-samples-namespace"
|
||||
- value: false
|
||||
constraints:
|
||||
namespace: "samples-namespace"
|
||||
testGetDurationPropertyKey:
|
||||
- value: "1m"
|
||||
constraints:
|
||||
namespace: "samples-namespace"
|
||||
taskQueueName: "longIdleTimeTaskqueue"
|
||||
testGetFloat64PropertyKey:
|
||||
- value: 12.0
|
||||
constraints:
|
||||
namespace: "samples-namespace"
|
||||
testGetMapPropertyKey:
|
||||
- value:
|
||||
key1: 1
|
||||
key2: "value 2"
|
||||
key3:
|
||||
- false
|
||||
- key4: true
|
||||
key5: 2.0
|
||||
```
|
3
.github/infrastructure/conformance/temporal/server/dynamicconfig/development-cass.yaml
vendored
Executable file
3
.github/infrastructure/conformance/temporal/server/dynamicconfig/development-cass.yaml
vendored
Executable file
|
@ -0,0 +1,3 @@
|
|||
system.forceSearchAttributesCacheRefreshOnRead:
|
||||
- value: true # Dev setup only. Please don't turn this on in production.
|
||||
constraints: {}
|
6
.github/infrastructure/conformance/temporal/server/dynamicconfig/development-sql.yaml
vendored
Executable file
6
.github/infrastructure/conformance/temporal/server/dynamicconfig/development-sql.yaml
vendored
Executable file
|
@ -0,0 +1,6 @@
|
|||
limit.maxIDLength:
|
||||
- value: 255
|
||||
constraints: {}
|
||||
system.forceSearchAttributesCacheRefreshOnRead:
|
||||
- value: true # Dev setup only. Please don't turn this on in production.
|
||||
constraints: {}
|
0
.github/infrastructure/conformance/temporal/server/dynamicconfig/docker.yaml
vendored
Executable file
0
.github/infrastructure/conformance/temporal/server/dynamicconfig/docker.yaml
vendored
Executable file
|
@ -0,0 +1,20 @@
|
|||
# The base go-image
|
||||
FROM golang:1.18-alpine
|
||||
|
||||
# Create a directory for the app
|
||||
RUN mkdir /app
|
||||
|
||||
# Copy all files from the current directory to the app directory
|
||||
COPY . /app
|
||||
|
||||
# Set working directory
|
||||
WORKDIR /app
|
||||
|
||||
RUN go get
|
||||
|
||||
# Run command as described:
|
||||
# go build will build an executable file named server in the current directory
|
||||
RUN go build -o server .
|
||||
|
||||
# Run the server executable
|
||||
CMD [ "/app/server" ]
|
|
@ -0,0 +1,32 @@
|
|||
module github/dapr/workflow/worker
|
||||
|
||||
go 1.18
|
||||
|
||||
require go.temporal.io/sdk v1.15.0
|
||||
|
||||
require (
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
|
||||
github.com/gogo/googleapis v1.4.1 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/gogo/status v1.1.0 // indirect
|
||||
github.com/golang/mock v1.6.0 // indirect
|
||||
github.com/golang/protobuf v1.5.2 // indirect
|
||||
github.com/google/uuid v1.3.0 // indirect
|
||||
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
|
||||
github.com/pborman/uuid v1.2.1 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
github.com/robfig/cron v1.2.0 // indirect
|
||||
github.com/stretchr/objx v0.3.0 // indirect
|
||||
github.com/stretchr/testify v1.7.1 // indirect
|
||||
go.temporal.io/api v1.8.0 // indirect
|
||||
go.uber.org/atomic v1.9.0 // indirect
|
||||
golang.org/x/net v0.0.0-20220531201128-c960675eff93 // indirect
|
||||
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
|
||||
golang.org/x/text v0.3.7 // indirect
|
||||
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
|
||||
google.golang.org/genproto v0.0.0-20220602131408-e326c6e8e9c8 // indirect
|
||||
google.golang.org/grpc v1.47.0 // indirect
|
||||
google.golang.org/protobuf v1.28.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
)
|
|
@ -0,0 +1,88 @@
|
|||
/*
|
||||
Copyright 2021 The Dapr Authors
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"go.temporal.io/sdk/activity"
|
||||
"go.temporal.io/sdk/client"
|
||||
"go.temporal.io/sdk/worker"
|
||||
"go.temporal.io/sdk/workflow"
|
||||
)
|
||||
|
||||
func main() {
|
||||
// Sleep for a bit so the docker container can spin up
|
||||
time.Sleep(30 * time.Second)
|
||||
TaskQueueString := "TestTaskQueue"
|
||||
|
||||
// construct client here
|
||||
cOpt := client.Options{}
|
||||
cOpt.HostPort = "temporal:7233"
|
||||
cOpt.Identity = "TemporalTestClient"
|
||||
// Create the workflow client
|
||||
clientTwo, err := client.Dial(cOpt)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
wOpt := worker.Options{}
|
||||
// Make default options for task q and worker options and workflow options
|
||||
w := worker.New(clientTwo, TaskQueueString, wOpt)
|
||||
|
||||
// Register workflows and activities
|
||||
w.RegisterWorkflow(TestWorkflow)
|
||||
w.RegisterActivity(ExampleActivity)
|
||||
|
||||
err = w.Start()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
w.Run(worker.InterruptCh())
|
||||
}
|
||||
|
||||
func TestWorkflow(ctx workflow.Context, runtimeSeconds int) error {
|
||||
options := workflow.ActivityOptions{
|
||||
TaskQueue: "TestTaskQueue",
|
||||
ScheduleToCloseTimeout: time.Second * 60,
|
||||
ScheduleToStartTimeout: time.Second * 60,
|
||||
StartToCloseTimeout: time.Second * 60,
|
||||
HeartbeatTimeout: time.Second * 5,
|
||||
WaitForCancellation: false,
|
||||
}
|
||||
|
||||
ctx = workflow.WithActivityOptions(ctx, options)
|
||||
newCtx, _ := workflow.NewDisconnectedContext(ctx)
|
||||
err := workflow.ExecuteActivity(newCtx, ExampleActivity, runtimeSeconds).Get(ctx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func ExampleActivity(ctx context.Context, runtimeSeconds int) error {
|
||||
counter := 0
|
||||
for i := 0; i <= runtimeSeconds; i++ {
|
||||
|
||||
select {
|
||||
case <-time.After(1 * time.Second):
|
||||
counter++
|
||||
activity.RecordHeartbeat(ctx, "")
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
Binary file not shown.
|
@ -0,0 +1,40 @@
|
|||
version: "3.5"
|
||||
services:
|
||||
worker:
|
||||
build: ./conformance/temporal/worker
|
||||
networks:
|
||||
- temporal-network
|
||||
depends_on:
|
||||
- temporal
|
||||
postgresql:
|
||||
container_name: temporal-postgresql
|
||||
environment:
|
||||
POSTGRES_PASSWORD: temporal
|
||||
POSTGRES_USER: temporal
|
||||
image: postgres:13
|
||||
networks:
|
||||
- temporal-network
|
||||
ports:
|
||||
- 5432:5432
|
||||
temporal:
|
||||
container_name: temporal
|
||||
depends_on:
|
||||
- postgresql
|
||||
environment:
|
||||
- DB=postgresql
|
||||
- DB_PORT=5432
|
||||
- POSTGRES_USER=temporal
|
||||
- POSTGRES_PWD=temporal
|
||||
- POSTGRES_SEEDS=postgresql
|
||||
- DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml
|
||||
image: temporalio/auto-setup:1.17.1
|
||||
networks:
|
||||
- temporal-network
|
||||
ports:
|
||||
- 7233:7233
|
||||
volumes:
|
||||
- ./conformance/temporal/server/dynamicconfig:/etc/temporal/config/dynamicconfig
|
||||
networks:
|
||||
temporal-network:
|
||||
driver: bridge
|
||||
name: temporal-network
|
|
@ -68,6 +68,7 @@ jobs:
|
|||
- state.redis
|
||||
- state.sqlserver
|
||||
- state.cockroachdb
|
||||
- workflows.temporal
|
||||
EOF
|
||||
)
|
||||
echo "::set-output name=pr-components::$PR_COMPONENTS"
|
||||
|
@ -221,6 +222,10 @@ jobs:
|
|||
run: docker-compose -f ./.github/infrastructure/docker-compose-redisjson.yml -p redis up -d
|
||||
if: contains(matrix.component, 'redis')
|
||||
|
||||
- name: Start Temporal
|
||||
run: docker-compose -f ./.github/infrastructure/docker-compose-temporal.yml -p temporal up -d
|
||||
if: contains(matrix.component, 'temporal')
|
||||
|
||||
- name: Start MongoDB
|
||||
uses: supercharge/mongodb-github-action@1.3.0
|
||||
with:
|
||||
|
|
|
@ -0,0 +1,8 @@
|
|||
COMPOSE_PROJECT_NAME=temporal
|
||||
CASSANDRA_VERSION=3.11.9
|
||||
ELASTICSEARCH_VERSION=7.16.2
|
||||
MYSQL_VERSION=8
|
||||
POSTGRESQL_VERSION=13
|
||||
TEMPORAL_VERSION=1.17.1
|
||||
TEMPORAL_WEB_VERSION=1.15.0
|
||||
TEMPORAL_UI_VERSION=2.2.3
|
|
@ -0,0 +1,34 @@
|
|||
version: "3.5"
|
||||
services:
|
||||
postgresql:
|
||||
container_name: temporal-postgresql
|
||||
environment:
|
||||
POSTGRES_PASSWORD: temporal
|
||||
POSTGRES_USER: temporal
|
||||
image: postgres:${POSTGRESQL_VERSION}
|
||||
networks:
|
||||
- temporal-network
|
||||
ports:
|
||||
- 5432:5432
|
||||
temporal:
|
||||
container_name: temporal
|
||||
depends_on:
|
||||
- postgresql
|
||||
environment:
|
||||
- DB=postgresql
|
||||
- DB_PORT=5432
|
||||
- POSTGRES_USER=temporal
|
||||
- POSTGRES_PWD=temporal
|
||||
- POSTGRES_SEEDS=postgresql
|
||||
- DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development-sql.yaml
|
||||
image: temporalio/auto-setup:${TEMPORAL_VERSION}
|
||||
networks:
|
||||
- temporal-network
|
||||
ports:
|
||||
- 7233:7233
|
||||
volumes:
|
||||
- ./dynamicconfig:/etc/temporal/config/dynamicconfig
|
||||
networks:
|
||||
temporal-network:
|
||||
driver: bridge
|
||||
name: temporal-network
|
|
@ -0,0 +1,39 @@
|
|||
Use `docker.yaml` file to override the default dynamic config value (they are specified
|
||||
when creating the service config).
|
||||
|
||||
Each key can have zero or more values and each value can have zero or more
|
||||
constraints. There are only three types of constraint:
|
||||
1. `namespace`: `string`
|
||||
2. `taskQueueName`: `string`
|
||||
3. `taskType`: `int` (`1`:`Workflow`, `2`:`Activity`)
|
||||
A value will be selected and returned if all its has exactly the same constraints
|
||||
as the ones specified in query filters (including the number of constraints).
|
||||
|
||||
Please use the following format:
|
||||
```
|
||||
testGetBoolPropertyKey:
|
||||
- value: false
|
||||
- value: true
|
||||
constraints:
|
||||
namespace: "global-samples-namespace"
|
||||
- value: false
|
||||
constraints:
|
||||
namespace: "samples-namespace"
|
||||
testGetDurationPropertyKey:
|
||||
- value: "1m"
|
||||
constraints:
|
||||
namespace: "samples-namespace"
|
||||
taskQueueName: "longIdleTimeTaskqueue"
|
||||
testGetFloat64PropertyKey:
|
||||
- value: 12.0
|
||||
constraints:
|
||||
namespace: "samples-namespace"
|
||||
testGetMapPropertyKey:
|
||||
- value:
|
||||
key1: 1
|
||||
key2: "value 2"
|
||||
key3:
|
||||
- false
|
||||
- key4: true
|
||||
key5: 2.0
|
||||
```
|
|
@ -0,0 +1,3 @@
|
|||
system.forceSearchAttributesCacheRefreshOnRead:
|
||||
- value: true # Dev setup only. Please don't turn this on in production.
|
||||
constraints: {}
|
|
@ -0,0 +1,6 @@
|
|||
limit.maxIDLength:
|
||||
- value: 255
|
||||
constraints: {}
|
||||
system.forceSearchAttributesCacheRefreshOnRead:
|
||||
- value: true # Dev setup only. Please don't turn this on in production.
|
||||
constraints: {}
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
Copyright 2021 The Dapr Authors
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package temporal_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
// "github.com/dapr/components-contrib/workflows"
|
||||
)
|
||||
|
||||
func TestTemporal(t *testing.T) {
|
||||
log := logger.NewLogger("dapr.components")
|
||||
ports, err := dapr_testing.GetFreePorts(2)
|
||||
assert.NoError(t, err)
|
||||
|
||||
currentGrpcPort := ports[0]
|
||||
currentHTTPPort := ports[1]
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
apiVersion: dapr.io/v1alpha1
|
||||
kind: Component
|
||||
metadata:
|
||||
name: workflow
|
||||
spec:
|
||||
type: workflows.temporal
|
||||
version: v1
|
||||
metadata:
|
||||
- name: Identity
|
||||
value: TemporalTestClient
|
||||
- name: HostPort
|
||||
value: localhost:7233
|
|
@ -0,0 +1,6 @@
|
|||
# Supported operations: start, get, terminate
|
||||
componentType: workflows
|
||||
components:
|
||||
- component: temporal
|
||||
allOperations: false
|
||||
operations: [ "start", "get", "terminate"]
|
|
@ -33,6 +33,7 @@ import (
|
|||
"github.com/dapr/components-contrib/pubsub"
|
||||
"github.com/dapr/components-contrib/secretstores"
|
||||
"github.com/dapr/components-contrib/state"
|
||||
"github.com/dapr/components-contrib/workflows"
|
||||
"github.com/dapr/kit/logger"
|
||||
|
||||
b_azure_blobstorage "github.com/dapr/components-contrib/bindings/azure/blobstorage"
|
||||
|
@ -77,6 +78,8 @@ import (
|
|||
conf_pubsub "github.com/dapr/components-contrib/tests/conformance/pubsub"
|
||||
conf_secret "github.com/dapr/components-contrib/tests/conformance/secretstores"
|
||||
conf_state "github.com/dapr/components-contrib/tests/conformance/state"
|
||||
conf_workflows "github.com/dapr/components-contrib/tests/conformance/workflows"
|
||||
wf_temporal "github.com/dapr/components-contrib/workflows/temporal"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -343,6 +346,16 @@ func (tc *TestConfiguration) Run(t *testing.T) {
|
|||
break
|
||||
}
|
||||
conf_bindings.ConformanceTests(t, props, inputBinding, outputBinding, bindingsConfig)
|
||||
case "workflows":
|
||||
filepath := fmt.Sprintf("../config/workflows/%s", componentConfigPath)
|
||||
props, err := tc.loadComponentsAndProperties(t, filepath)
|
||||
if err != nil {
|
||||
t.Errorf("error running conformance test for %s: %s", comp.Component, err)
|
||||
break
|
||||
}
|
||||
wf := loadWorkflow(comp)
|
||||
wfConfig := conf_workflows.NewTestConfig(comp.Component, comp.AllOperations, comp.Operations, comp.Config)
|
||||
conf_workflows.ConformanceTests(t, props, wf, wfConfig)
|
||||
default:
|
||||
t.Errorf("unknown component type %s", tc.ComponentType)
|
||||
}
|
||||
|
@ -498,6 +511,19 @@ func loadInputBindings(tc TestComponent) bindings.InputBinding {
|
|||
return binding
|
||||
}
|
||||
|
||||
func loadWorkflow(tc TestComponent) workflows.Workflow {
|
||||
var wf workflows.Workflow
|
||||
|
||||
switch tc.Component {
|
||||
case "temporal":
|
||||
wf = wf_temporal.NewTemporalWorkflow(testLogger)
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
|
||||
return wf
|
||||
}
|
||||
|
||||
func atLeastOne(t *testing.T, predicate func(interface{}) bool, items ...interface{}) {
|
||||
met := false
|
||||
|
||||
|
|
|
@ -0,0 +1,30 @@
|
|||
//go:build conftests
|
||||
// +build conftests
|
||||
|
||||
/*
|
||||
Copyright 2021 The Dapr Authors
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package conformance
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestWorkflowConformance(t *testing.T) {
|
||||
tc, err := NewTestConfiguration("../config/workflows/tests.yml")
|
||||
assert.NoError(t, err)
|
||||
assert.NotNil(t, tc)
|
||||
tc.Run(t)
|
||||
}
|
|
@ -0,0 +1,83 @@
|
|||
/*
|
||||
Copyright 2021 The Dapr Authors
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package workflows
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/dapr/components-contrib/tests/conformance/utils"
|
||||
"github.com/dapr/components-contrib/workflows"
|
||||
"github.com/dapr/kit/logger"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"go.temporal.io/api/enums/v1"
|
||||
)
|
||||
|
||||
var testLogger = logger.NewLogger("workflowsTest")
|
||||
|
||||
type TestConfig struct {
|
||||
utils.CommonConfig
|
||||
}
|
||||
|
||||
func NewTestConfig(component string, allOperations bool, operations []string, conf map[string]interface{}) TestConfig {
|
||||
tc := TestConfig{
|
||||
CommonConfig: utils.CommonConfig{
|
||||
ComponentType: "workflows",
|
||||
ComponentName: component,
|
||||
AllOperations: allOperations,
|
||||
Operations: utils.NewStringSet(operations...),
|
||||
},
|
||||
}
|
||||
|
||||
return tc
|
||||
}
|
||||
|
||||
// ConformanceTests runs conf tests for workflows.
|
||||
func ConformanceTests(t *testing.T, props map[string]string, workflowItem workflows.Workflow, config TestConfig) {
|
||||
// Test vars
|
||||
t.Run("init", func(t *testing.T) {
|
||||
err := workflowItem.Init(workflows.Metadata{
|
||||
Properties: props,
|
||||
})
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
|
||||
// Everything is within the same task since the workflow needs to persist between operations
|
||||
if config.HasOperation("start") {
|
||||
t.Run("start", func(t *testing.T) {
|
||||
testLogger.Info("Start test running...")
|
||||
req := &workflows.StartRequest{
|
||||
WorkflowName: "TestWorkflow",
|
||||
Parameters: 10, // Time that the activity within the workflow runs for
|
||||
}
|
||||
req.Options.TaskQueue = "TestTaskQueue"
|
||||
wf, err := workflowItem.Start(context.Background(), req)
|
||||
assert.Nil(t, err)
|
||||
resp, err := workflowItem.Get(context.Background(), wf)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, resp.Status, enums.WORKFLOW_EXECUTION_STATUS_RUNNING)
|
||||
time.Sleep(5 * time.Second)
|
||||
resp, err = workflowItem.Get(context.Background(), wf)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, resp.Status, enums.WORKFLOW_EXECUTION_STATUS_RUNNING)
|
||||
err = workflowItem.Terminate(context.Background(), wf)
|
||||
assert.Nil(t, err)
|
||||
resp, err = workflowItem.Get(context.Background(), wf)
|
||||
assert.Nil(t, err)
|
||||
assert.Equal(t, resp.Status, enums.WORKFLOW_EXECUTION_STATUS_TERMINATED)
|
||||
})
|
||||
testLogger.Info("Start test done.")
|
||||
}
|
||||
}
|
|
@ -0,0 +1,19 @@
|
|||
/*
|
||||
Copyright 2021 The Dapr Authors
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package workflows
|
||||
|
||||
// Metadata represents a set of binding specific properties.
|
||||
type Metadata struct {
|
||||
Properties map[string]string `json:"properties"`
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
package workflows
|
||||
|
||||
import (
|
||||
"go.temporal.io/sdk/activity"
|
||||
"go.temporal.io/sdk/client"
|
||||
"go.temporal.io/sdk/worker"
|
||||
"go.temporal.io/sdk/workflow"
|
||||
)
|
||||
|
||||
type WorkflowStruct struct {
|
||||
WorkflowId string `json:"workflow_id"`
|
||||
WorkflowRunId string `json:"workflow_run_id"`
|
||||
}
|
||||
|
||||
// StartRequest is the object describing a Start Workflow request.
|
||||
type StartRequest struct {
|
||||
Options client.StartWorkflowOptions `json:"workflow_options"`
|
||||
WorkflowName interface{} `json:"workflow_name"`
|
||||
Parameters interface{} `json:"parameters"`
|
||||
}
|
||||
|
||||
// CreateWorkerRequest is the object describing a Create Worker request.
|
||||
type CreateWorkerRequest struct {
|
||||
TaskQueue string `json:"task_queue"`
|
||||
WorkerOptions worker.Options `json:"worker_options"`
|
||||
}
|
||||
|
||||
// RegisterWorkflowAndActivitiesRequest is the object describing the wfs and acts to assign to a worker
|
||||
type RegisterWorkflowAndActivitiesRequest struct {
|
||||
WorkerName string `json:"worker_name"`
|
||||
WorkflowFunc interface{} `json:"workflow_func_name"`
|
||||
ActivityFunc interface{} `json:"activity_func_name"`
|
||||
WorkflowOptions workflow.RegisterOptions `json:"workflow_register_options"`
|
||||
ActivityOptions activity.RegisterOptions `json:"activity_register_options"`
|
||||
}
|
|
@ -0,0 +1,10 @@
|
|||
package workflows
|
||||
|
||||
import "go.temporal.io/api/enums/v1"
|
||||
|
||||
type StateResponse struct {
|
||||
WfInfo WorkflowStruct
|
||||
StartTime string `json:"start_time"`
|
||||
TaskQueue string `json:"task_queue"`
|
||||
Status enums.WorkflowExecutionStatus `json:"status"`
|
||||
}
|
|
@ -0,0 +1,125 @@
|
|||
/*
|
||||
Copyright 2021 The Dapr Authors
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package temporal
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
|
||||
"github.com/dapr/components-contrib/workflows"
|
||||
"github.com/dapr/kit/logger"
|
||||
"go.temporal.io/sdk/client"
|
||||
)
|
||||
|
||||
// Placeholder string for the task queue
|
||||
const TaskQueueString = "TestTaskQueue"
|
||||
|
||||
type TemporalWF struct {
|
||||
client client.Client
|
||||
logger logger.Logger
|
||||
}
|
||||
|
||||
type temporalMetaData struct {
|
||||
Identity string `json:"url"`
|
||||
HostPort string `json:"masterKey"`
|
||||
}
|
||||
|
||||
// NewTemporalWorkflow returns a new CosmosDB state store.
|
||||
func NewTemporalWorkflow(logger logger.Logger) *TemporalWF {
|
||||
s := &TemporalWF{
|
||||
logger: logger,
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (c *TemporalWF) Init(metadata workflows.Metadata) error {
|
||||
c.logger.Debugf("Temporal init start")
|
||||
m, err := c.parseMetadata(metadata)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cOpt := client.Options{}
|
||||
if m.HostPort != "" {
|
||||
cOpt.HostPort = m.HostPort
|
||||
}
|
||||
if m.Identity != "" {
|
||||
cOpt.Identity = m.Identity
|
||||
}
|
||||
// Create the workflow client
|
||||
client, err := client.Dial(cOpt)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.client = client
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *TemporalWF) Start(ctx context.Context, req *workflows.StartRequest) (*workflows.WorkflowStruct, error) {
|
||||
c.logger.Debugf("starting workflow")
|
||||
run, err := c.client.ExecuteWorkflow(ctx, req.Options, req.WorkflowName, req.Parameters)
|
||||
if err != nil {
|
||||
c.logger.Debugf("error when starting workflow")
|
||||
return &workflows.WorkflowStruct{}, err
|
||||
}
|
||||
wfStruct := workflows.WorkflowStruct{WorkflowId: run.GetID(), WorkflowRunId: run.GetRunID()}
|
||||
return &wfStruct, nil
|
||||
}
|
||||
|
||||
func (c *TemporalWF) Terminate(ctx context.Context, req *workflows.WorkflowStruct) error {
|
||||
c.logger.Debugf("terminating workflow")
|
||||
err := c.client.TerminateWorkflow(ctx, req.WorkflowId, req.WorkflowRunId, "")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *TemporalWF) Get(ctx context.Context, req *workflows.WorkflowStruct) (*workflows.StateResponse, error) {
|
||||
c.logger.Debugf("getting workflow data")
|
||||
resp, err := c.client.DescribeWorkflowExecution(ctx, req.WorkflowId, req.WorkflowRunId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Build the output struct
|
||||
outputStruct := workflows.StateResponse{
|
||||
WfInfo: workflows.WorkflowStruct{WorkflowId: req.WorkflowId, WorkflowRunId: req.WorkflowRunId},
|
||||
StartTime: resp.WorkflowExecutionInfo.StartTime.String(),
|
||||
TaskQueue: resp.WorkflowExecutionInfo.GetTaskQueue(),
|
||||
Status: resp.WorkflowExecutionInfo.Status,
|
||||
}
|
||||
|
||||
return &outputStruct, nil
|
||||
}
|
||||
|
||||
func (c *TemporalWF) Close() {
|
||||
|
||||
c.client.Close()
|
||||
}
|
||||
|
||||
func (c *TemporalWF) parseMetadata(metadata workflows.Metadata) (*temporalMetaData, error) {
|
||||
connInfo := metadata.Properties
|
||||
b, err := json.Marshal(connInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var creds temporalMetaData
|
||||
err = json.Unmarshal(b, &creds)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &creds, nil
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
Copyright 2021 The Dapr Authors
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package temporal
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
workflows "github.com/dapr/components-contrib/workflows"
|
||||
"github.com/dapr/kit/logger"
|
||||
"gotest.tools/assert"
|
||||
)
|
||||
|
||||
func TestStartTemporalClient(t *testing.T) {
|
||||
m := workflows.Metadata{}
|
||||
m.Properties = map[string]string{
|
||||
"Identity": "TemporalTestClient",
|
||||
"HostPort": "localhost:7233",
|
||||
}
|
||||
temporal := TemporalWF{logger: logger.NewLogger("TemporalTestLogger")}
|
||||
err := temporal.Init(m)
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
/*
|
||||
Copyright 2021 The Dapr Authors
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package workflows
|
||||
|
||||
import "context"
|
||||
|
||||
// Workflow is an interface to perform operations on Workflow.
|
||||
type Workflow interface {
|
||||
Init(metadata Metadata) error
|
||||
Start(ctx context.Context, req *StartRequest) (*WorkflowStruct, error)
|
||||
Terminate(ctx context.Context, req *WorkflowStruct) error
|
||||
Get(ctx context.Context, req *WorkflowStruct) (*StateResponse, error)
|
||||
}
|
Loading…
Reference in New Issue