Merge remote-tracking branch 'upstream/master' into merge-release1.11-master
commit d467890cf8
@@ -38,22 +38,26 @@ type SendGrid struct {
 // Our metadata holds standard email properties.
 type sendGridMetadata struct {
-	APIKey        string `mapstructure:"apiKey"`
-	EmailFrom     string `mapstructure:"emailFrom"`
-	EmailFromName string `mapstructure:"emailFromName"`
-	EmailTo       string `mapstructure:"emailTo"`
-	EmailToName   string `mapstructure:"emailToName"`
-	Subject       string `mapstructure:"subject"`
-	EmailCc       string `mapstructure:"emailCc"`
-	EmailBcc      string `mapstructure:"emailBcc"`
+	APIKey              string `mapstructure:"apiKey"`
+	EmailFrom           string `mapstructure:"emailFrom"`
+	EmailFromName       string `mapstructure:"emailFromName"`
+	EmailTo             string `mapstructure:"emailTo"`
+	EmailToName         string `mapstructure:"emailToName"`
+	Subject             string `mapstructure:"subject"`
+	EmailCc             string `mapstructure:"emailCc"`
+	EmailBcc            string `mapstructure:"emailBcc"`
+	DynamicTemplateData string `mapstructure:"dynamicTemplateData"`
+	DynamicTemplateID   string `mapstructure:"dynamicTemplateId"`
+
+	dynamicTemplateDataCache map[string]any // Cache the unmarshalled dynamic template data
 }

 // Wrapper to help decode SendGrid API errors.
 type sendGridRestError struct {
 	Errors []struct {
-		Field   interface{} `json:"field"`
-		Message interface{} `json:"message"`
-		Help    interface{} `json:"help"`
+		Field   any `json:"field"`
+		Message any `json:"message"`
+		Help    any `json:"help"`
 	} `json:"errors"`
 }

@@ -73,7 +77,15 @@ func (sg *SendGrid) parseMetadata(meta bindings.Metadata) (sendGridMetadata, err

 	// Required properties
 	if sgMeta.APIKey == "" {
-		return sgMeta, errors.New("SendGrid binding error: apiKey field is required in metadata")
+		return sgMeta, errors.New("apiKey field is required in metadata")
 	}

+	// Cache the unmarshalled dynamic template data if present
+	if sgMeta.DynamicTemplateData != "" {
+		templateError := UnmarshalDynamicTemplateData(sgMeta.DynamicTemplateData, &sgMeta.dynamicTemplateDataCache)
+		if templateError != nil {
+			return sgMeta, templateError
+		}
+	}
+
 	return sgMeta, nil

@@ -180,6 +192,25 @@ func (sg *SendGrid) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*b
 		bccAddress = mail.NewEmail("", req.Metadata["emailBcc"])
 	}

+	// Build email Dynamic Template Id, this is optional
+	var templateID string
+	if req.Metadata["dynamicTemplateId"] != "" {
+		templateID = req.Metadata["dynamicTemplateId"]
+	} else if sg.metadata.DynamicTemplateID != "" {
+		templateID = sg.metadata.DynamicTemplateID
+	}
+
+	// Build email dynamic template, this is optional
+	var templateData map[string]any
+	if req.Metadata["dynamicTemplateData"] != "" {
+		templateError := UnmarshalDynamicTemplateData(req.Metadata["dynamicTemplateData"], &templateData)
+		if templateError != nil {
+			return nil, templateError
+		}
+	} else if sg.metadata.dynamicTemplateDataCache != nil {
+		templateData = sg.metadata.dynamicTemplateDataCache
+	}
+
 	// Email body is held in req.Data, after we tidy it up a bit
 	emailBody, err := strconv.Unquote(string(req.Data))
 	if err != nil {

@@ -202,13 +233,20 @@ func (sg *SendGrid) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*b
 	if bccAddress != nil {
 		personalization.AddBCCs(bccAddress)
 	}
+	if templateID != "" {
+		email.TemplateID = templateID
+	}
+	if templateData != nil {
+		personalization.DynamicTemplateData = templateData
+	}

 	email.AddPersonalizations(personalization)

 	// Send the email
 	client := sendgrid.NewSendClient(sg.metadata.APIKey)
 	resp, err := client.SendWithContext(ctx, email)
 	if err != nil {
-		return nil, fmt.Errorf("error from SendGrid, sending email failed: %+v", err)
+		return nil, fmt.Errorf("error from SendGrid: sending email failed: %w", err)
 	}

 	// Check SendGrid response is OK

@@ -217,7 +255,7 @@ func (sg *SendGrid) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*b
 	sendGridError := sendGridRestError{}
 	json.NewDecoder(strings.NewReader(resp.Body)).Decode(&sendGridError)
 	// Pass it back to the caller, so they have some idea what went wrong
-	return nil, fmt.Errorf("error from SendGrid, sending email failed: %d %+v", resp.StatusCode, sendGridError)
+	return nil, fmt.Errorf("error from SendGrid: sending email failed: %d %+v", resp.StatusCode, sendGridError)
 }

 sg.logger.Info("sent email with SendGrid")

@@ -232,3 +270,12 @@ func (sg *SendGrid) GetComponentMetadata() map[string]string {
 	metadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, metadata.BindingType)
 	return metadataInfo
 }
+
+// Function that unmarshals the Dynamic Template Data JSON String into a map[string]any.
+func UnmarshalDynamicTemplateData(jsonString string, result *map[string]any) error {
+	err := json.Unmarshal([]byte(jsonString), &result)
+	if err != nil {
+		return fmt.Errorf("error from SendGrid binding, dynamic template data is not valid JSON: %w", err)
+	}
+	return nil
+}
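The `UnmarshalDynamicTemplateData` helper added above is a thin wrapper over `encoding/json`. A minimal standalone sketch of the same decode-and-wrap behavior — the `unmarshalTemplateData` name and the `main` harness here are illustrative, not part of the component:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Mirrors the helper added in this commit: decode a JSON string into a
// map[string]any, wrapping decode failures with a descriptive error.
func unmarshalTemplateData(jsonString string, result *map[string]any) error {
	if err := json.Unmarshal([]byte(jsonString), result); err != nil {
		return fmt.Errorf("dynamic template data is not valid JSON: %w", err)
	}
	return nil
}

func main() {
	var data map[string]any

	// Valid nested JSON, like the test fixture in this commit.
	if err := unmarshalTemplateData(`{"name":{"first":"MyFirst","last":"MyLast"}}`, &data); err != nil {
		panic(err)
	}
	fmt.Println(data["name"]) // map[first:MyFirst last:MyLast]

	// Invalid JSON returns a wrapped error instead of silently succeeding.
	err := unmarshalTemplateData(`{"wrong"}`, &data)
	fmt.Println(err != nil) // true
}
```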
@@ -14,6 +14,7 @@ limitations under the License.
 package sendgrid

 import (
+	"encoding/json"
 	"testing"

 	"github.com/stretchr/testify/assert"

@@ -42,8 +43,24 @@ func TestParseMetadataWithOptionalNames(t *testing.T) {
 	logger := logger.NewLogger("test")

 	t.Run("Has correct metadata", func(t *testing.T) {
+		// Sample nested JSON with Dynamic Template Data
+		dynamicTemplateData, _ := json.Marshal(map[string]any{
+			"name": map[string]any{
+				"first": "MyFirst",
+				"last":  "MyLast",
+			},
+		})
+
 		m := bindings.Metadata{}
-		m.Properties = map[string]string{"apiKey": "123", "emailFrom": "test1@example.net", "emailFromName": "test 1", "emailTo": "test2@example.net", "emailToName": "test 2", "subject": "hello"}
+		m.Properties = map[string]string{
+			"apiKey":              "123",
+			"emailFrom":           "test1@example.net",
+			"emailFromName":       "test 1",
+			"emailTo":             "test2@example.net",
+			"emailToName":         "test 2", "subject": "hello",
+			"dynamicTemplateData": string(dynamicTemplateData),
+			"dynamicTemplateId":   "456",
+		}
 		r := SendGrid{logger: logger}
 		sgMeta, err := r.parseMetadata(m)
 		assert.Nil(t, err)

@@ -53,5 +70,43 @@ func TestParseMetadataWithOptionalNames(t *testing.T) {
 		assert.Equal(t, "test2@example.net", sgMeta.EmailTo)
 		assert.Equal(t, "test 2", sgMeta.EmailToName)
 		assert.Equal(t, "hello", sgMeta.Subject)
+		assert.Equal(t, `{"name":{"first":"MyFirst","last":"MyLast"}}`, sgMeta.DynamicTemplateData)
+		assert.Equal(t, "456", sgMeta.DynamicTemplateID)
 	})
+
+	t.Run("Has incorrect template data metadata", func(t *testing.T) {
+		m := bindings.Metadata{}
+		m.Properties = map[string]string{
+			"apiKey":              "123",
+			"emailFrom":           "test1@example.net",
+			"emailFromName":       "test 1",
+			"emailTo":             "test2@example.net",
+			"emailToName":         "test 2",
+			"subject":             "hello",
+			"dynamicTemplateData": `{"wrong"}`,
+			"dynamicTemplateId":   "456",
+		}
+		r := SendGrid{logger: logger}
+		_, err := r.parseMetadata(m)
+		assert.Error(t, err)
+	})
 }
+
+// Test UnmarshalDynamicTemplateData function
+func TestUnmarshalDynamicTemplateData(t *testing.T) {
+	t.Run("Test Template Data", func(t *testing.T) {
+		// Sample nested JSON with Dynamic Template Data
+		dynamicTemplateData, _ := json.Marshal(map[string]interface{}{"name": map[string]interface{}{"first": "MyFirst", "last": "MyLast"}})
+
+		var data map[string]interface{}
+
+		// Test valid JSON
+		err := UnmarshalDynamicTemplateData(string(dynamicTemplateData), &data)
+		assert.NoError(t, err)
+		assert.Equal(t, map[string]interface{}{"first": "MyFirst", "last": "MyLast"}, data["name"])
+
+		// Test invalid JSON
+		err = UnmarshalDynamicTemplateData("{\"wrong\"}", &data)
+		assert.Error(t, err)
+	})
+}
@@ -43,7 +43,7 @@ metadata:
   # Docs mention "pemContents" but there is no mention of this metadata in the code
   # in this whole repository
   - name: schemaName
-    description: "The schema name to use. Will be created if schema does not exist."
+    description: "The schema name (database) to use. Will be created if schema does not exist."
    type: string
    default: "dapr_state_store"
    example: '"custom_schema"'

@@ -56,4 +56,4 @@ metadata:
    description: "Timeout for all database operations (in seconds)."
    type: number
    default: "20"
-    example: "30"
+    example: "30"
@@ -0,0 +1,26 @@
+apiVersion: dapr.io/v1alpha1
+kind: Component
+metadata:
+  name: messagebus
+spec:
+  type: bindings.kafka
+  metadata:
+    # Kafka broker connection setting
+    - name: brokers
+      value: localhost:9092
+    # consumer configuration: topic and consumer group
+    - name: topics
+      value: sample
+    - name: consumerGroup
+      value: group1
+    # publisher configuration: topic
+    - name: publishTopic
+      value: sample
+    - name: authRequired
+      value: "true"
+    - name: saslUsername
+      value: admin
+    - name: saslPassword
+      value: admin-secret
+    - name: disableTls
+      value: "true"
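For reference, the SASL settings in the component above (`authRequired`, `saslUsername`, `saslPassword`, `disableTls`) map roughly onto the following client configuration in sarama, the Kafka library the Dapr Kafka components build on. This is a hedged sketch, not the component's actual wiring; the import path may be `github.com/Shopify/sarama` on older pins, and the broker address and credentials simply mirror the component definition above:

```go
package main

import (
	"log"

	"github.com/IBM/sarama"
)

func main() {
	// Roughly what authRequired=true + saslUsername/saslPassword +
	// disableTls=true translate to for a sarama-based client.
	cfg := sarama.NewConfig()
	cfg.Net.SASL.Enable = true
	cfg.Net.SASL.Mechanism = sarama.SASLTypePlaintext
	cfg.Net.SASL.User = "admin"
	cfg.Net.SASL.Password = "admin-secret"
	cfg.Net.TLS.Enable = false // disableTls: "true"
	cfg.Producer.Return.Successes = true // required by SyncProducer

	producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatalf("connect: %v", err)
	}
	defer producer.Close()

	// Publish one message to the topic the component above consumes.
	_, _, err = producer.SendMessage(&sarama.ProducerMessage{
		Topic: "sample",
		Value: sarama.StringEncoder("sasl password auth test message"),
	})
	if err != nil {
		log.Fatalf("send: %v", err)
	}
}
```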
@@ -51,26 +51,27 @@ import (
 )

 const (
-	sidecarName1      = "dapr-1"
-	sidecarName2      = "dapr-2"
-	sidecarName3      = "dapr-3"
-	appID1            = "app-1"
-	appID2            = "app-2"
-	appID3            = "app-3"
-	clusterName       = "kafkacertification"
-	dockerComposeYAML = "docker-compose.yml"
-	numMessages       = 1000
-	appPort           = 8000
-	portOffset        = 2
-	messageKey        = "partitionKey"
+	sidecarName1          = "dapr-1"
+	sidecarName2          = "dapr-2"
+	sidecarName3          = "dapr-3"
+	sidecarName4          = "dapr-4"
+	appID1                = "app-1"
+	appID2                = "app-2"
+	appID3                = "app-3"
+	appID4                = "app-4"
+	clusterName           = "kafkacertification"
+	dockerComposeYAML     = "docker-compose.yml"
+	dockerComposeYAMLSasl = "sasl-docker/docker-compose.yml"
+	numMessages           = 1000
+	appPort               = 8000
+	portOffset            = 2
+	messageKey            = "partitionKey"

 	bindingName = "messagebus"
 	topicName   = "neworder"
 )

-var (
-	brokers = []string{"localhost:19092", "localhost:29092", "localhost:39092"}
-)
+var brokers = []string{"localhost:19092", "localhost:29092", "localhost:39092"}

 func TestKafka_with_retry(t *testing.T) {
 	// For Kafka, we should ensure messages are received in order.

@@ -152,6 +153,20 @@ func TestKafka_with_retry(t *testing.T) {
 		}
 	}

+	simpleSendTest := func(metadata map[string]string) flow.Runnable {
+		return func(ctx flow.Context) error {
+			client := sidecar.GetClient(ctx, sidecarName4)
+			err := client.InvokeOutputBinding(ctx, &dapr.InvokeBindingRequest{
+				Name:      bindingName,
+				Operation: string(bindings.CreateOperation),
+				Data:      []byte("sasl password auth test message"),
+				Metadata:  metadata,
+			})
+			require.NoError(ctx, err, "error publishing message")
+			return nil
+		}
+	}
+
 	// sendMessagesInBackground and assertMessages are
 	// Runnables for testing publishing and consuming
 	// messages reliably when infrastructure and network

@@ -245,7 +260,7 @@ func TestKafka_with_retry(t *testing.T) {
 		//
 		// Run the Dapr sidecar with the Kafka component.
 		Step(sidecar.Run(sidecarName1,
-			embedded.WithComponentsPath("./components/consumer1"),
+			embedded.WithResourcesPath("./components/consumer1"),
 			embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
 			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort),
 			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort),

@@ -258,7 +273,7 @@ func TestKafka_with_retry(t *testing.T) {
 		//
 		// Run the Dapr sidecar with the Kafka component.
 		Step(sidecar.Run(sidecarName2,
-			embedded.WithComponentsPath("./components/consumer2"),
+			embedded.WithResourcesPath("./components/consumer2"),
 			embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset),
 			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset),
 			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset),

@@ -276,7 +291,7 @@ func TestKafka_with_retry(t *testing.T) {
 		//
 		// Run the Dapr sidecar with the Kafka component.
 		Step(sidecar.Run(sidecarName3,
-			embedded.WithComponentsPath("./components/consumer2"),
+			embedded.WithResourcesPath("./components/consumer2"),
 			embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset*2),
 			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset*2),
 			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset*2),

@@ -340,6 +355,25 @@ func TestKafka_with_retry(t *testing.T) {
 		Step("wait", flow.Sleep(30*time.Second)).
 		Step("assert messages(consumer rebalance)", assertMessages(consumerGroup2)).
 		Run()
+
+	flow.New(t, "kafka with sals password auth - no tls - wurstmeister").
+		// Run Kafka using Docker Compose.
+		Step(dockercompose.Run(clusterName, dockerComposeYAMLSasl)).
+		Step("wait for broker sockets",
+			network.WaitForAddresses(5*time.Minute, "localhost:9092")).
+		Step("wait", flow.Sleep(20*time.Second)).
+		// Run the Dapr sidecar with the Kafka component.
+		Step(sidecar.Run(sidecarName4,
+			embedded.WithResourcesPath("./components/sasl-password"),
+			embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+5*portOffset),
+			embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+5*portOffset),
+			embedded.WithoutApp(),
+			componentRuntimeOptions(),
+		)).
+		Step("simple send test", simpleSendTest(metadata)).
+		Step("wait", flow.Sleep(10*time.Second)).
+		Step("stop sidecar 1", sidecar.Stop(sidecarName4)).
+		Run()
 }

 func componentRuntimeOptions() []runtime.Option {
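The `simpleSendTest` runnable above drives the output binding through the sidecar client provided by the flow framework. Outside that framework, a roughly equivalent one-shot publish with the Dapr Go SDK looks like this — a sketch assuming a local sidecar with the `messagebus` component loaded; the `partitionKey` metadata value is illustrative:

```go
package main

import (
	"context"
	"log"

	dapr "github.com/dapr/go-sdk/client"
)

func main() {
	// Connects to the local Dapr sidecar over gRPC (DAPR_GRPC_PORT).
	client, err := dapr.NewClient()
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Same shape as simpleSendTest above: a one-shot "create" operation on
	// the messagebus output binding; Metadata can carry e.g. partitionKey.
	err = client.InvokeOutputBinding(context.Background(), &dapr.InvokeBindingRequest{
		Name:      "messagebus",
		Operation: "create",
		Data:      []byte("sasl password auth test message"),
		Metadata:  map[string]string{"partitionKey": "key1"},
	})
	if err != nil {
		log.Fatalf("error publishing message: %v", err)
	}
}
```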
@@ -0,0 +1,42 @@
+# ------------------------------------------------------------
+# Copyright 2021 The Dapr Authors
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#     http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ------------------------------------------------------------
+
+version: '3.7'
+services:
+  zookeeper:
+    image: wurstmeister/zookeeper:latest
+    ports:
+      - 2181:2181
+    environment:
+      JVMFLAGS: "-Djava.security.auth.login.config=/etc/zookeeper/zookeeper_jaas.conf"
+    volumes:
+      - ./zookeeper_jaas.conf:/etc/zookeeper/zookeeper_jaas.conf
+  kafka:
+    image: wurstmeister/kafka:latest
+    depends_on:
+      - zookeeper
+    ports:
+      - "9092:9092"
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
+      ALLOW_PLAINTEXT_LISTENER: 'yes'
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
+      KAFKA_LISTENERS: EXT://:9092,INT://:9093
+      KAFKA_ADVERTISED_LISTENERS: EXT://localhost:9092,INT://kafka:9093
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: EXT:SASL_PLAINTEXT,INT:SASL_PLAINTEXT
+      KAFKA_INTER_BROKER_LISTENER_NAME: INT
+      KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
+      KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
+      KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_jaas.conf"
+    volumes:
+      - ./kafka_server_jaas.conf:/etc/kafka/kafka_jaas.conf
@@ -0,0 +1,12 @@
+KafkaServer {
+    org.apache.kafka.common.security.plain.PlainLoginModule required
+    username="admin"
+    password="admin-secret"
+    user_admin="admin-secret";
+};
+
+Client {
+    org.apache.kafka.common.security.plain.PlainLoginModule required
+    username="admin"
+    password="admin-secret";
+};
@@ -0,0 +1,4 @@
+Server {
+    org.apache.zookeeper.server.auth.DigestLoginModule required
+    user_admin="admin-secret";
+};
@@ -1,4 +1,4 @@
-# Supported operations: create, operations and read
+# Supported operations: create, operations, read
 # Config map:
 ## output: A map of strings that will be part of the request for the output binding
 ## readBindingTimeout : timeout to wait to receive test event

@@ -74,7 +74,6 @@ components:
   - component: kubemq
     operations: [ "create", "operations", "read" ]
   - component: postgres
-    allOperations: false
     operations: [ "exec", "query", "close", "operations" ]
   - component: aws.s3.docker
     operations: ["create", "operations", "get", "list"]
@@ -1,9 +1,9 @@
-# Supported operation: get, subscribe, unsubscribe
+# Supported additional operation: (none)
 componentType: configuration
 components:
   - component: redis.v6
-    allOperations: true
+    operations: []
   - component: redis.v7
-    allOperations: true
+    operations: []
   - component: postgres
-    allOperations: true
+    operations: []
@@ -4,7 +4,7 @@
 componentType: crypto
 components:
   - component: localstorage
-    allOperations: true
+    operations: ["public", "symmetric"]
     config:
       keys:
         - algorithms: ["EdDSA"]

@@ -29,7 +29,7 @@ components:
         type: symmetric
         name: symmetric-256.b64
   - component: jwks
-    allOperations: true
+    operations: ["public", "symmetric"]
     config:
       keys:
         - algorithms: ["EdDSA"]

@@ -55,7 +55,6 @@ components:
         name: symmetric-256
   - component: azure.keyvault
     # Althoguh Azure Key Vault supports symmetric keys, those are only available in "Managed HSMs", which are too impractical for our tests
-    allOperations: false
     operations: []
     config:
       keys:
@@ -1,18 +1,18 @@
-# Supported operation: publish, subscribe, multiplehandlers, bulkpublish, bulksubscribe
-# bulkpublish should only be run for components that implement pubsub.BulkPublisher interface
-# bulksubscribe should only be run for components that implement pubsub.BulkSubscriber interface
+# Supported additional operation:
+# - bulkpublish (should only be run for components that implement pubsub.BulkPublisher interface)
+# - bulksubscribe (should only be run for components that implement pubsub.BulkSubscriber interface)
 # Config map:
-## pubsubName : name of the pubsub
-## testTopicName: name of the test topic to use
-## publishMetadata: A map of strings that will be part of the publish metadata in the Publish call
-## subscribeMetadata: A map of strings that will be part of the subscribe metadata in the Subscribe call
-## maxReadDuration: duration to wait for read to complete
-## messageCount: no. of messages to publish
-## checkInOrderProcessing: false disables in-order message processing checking
+# - pubsubName : name of the pubsub
+# - testTopicName: name of the test topic to use
+# - publishMetadata: A map of strings that will be part of the publish metadata in the Publish call
+# - subscribeMetadata: A map of strings that will be part of the subscribe metadata in the Subscribe call
+# - maxReadDuration: duration to wait for read to complete
+# - messageCount: no. of messages to publish
+# - checkInOrderProcessing: false disables in-order message processing checking
 componentType: pubsub
 components:
   - component: azure.eventhubs
-    operations: ['publish', 'subscribe', 'multiplehandlers', 'bulkpublish']
+    operations: ['bulkpublish']
     config:
       pubsubName: azure-eventhubs
       testTopicName: eventhubs-pubsub-topic

@@ -24,7 +24,7 @@ components:
       publishMetadata:
         partitionKey: abcd
   - component: azure.servicebus.topics
-    allOperations: true
+    operations: ['bulkpublish', 'bulksubscribe']
     config:
       pubsubName: azure-servicebus
       testTopicName: dapr-conf-test

@@ -33,7 +33,7 @@ components:
       testMultiTopic2Name: dapr-conf-test-multi2
       checkInOrderProcessing: false
   - component: azure.servicebus.queues
-    allOperations: true
+    operations: ['bulkpublish', 'bulksubscribe']
     config:
       pubsubName: azure-servicebus
       testTopicName: dapr-conf-queue

@@ -42,43 +42,43 @@ components:
       testMultiTopic2Name: dapr-conf-queue-multi2
       checkInOrderProcessing: false
   - component: redis.v6
-    operations: ["publish", "subscribe", "multiplehandlers"]
+    operations: []
     config:
       checkInOrderProcessing: false
   - component: redis.v7
-    operations: ["publish", "subscribe", "multiplehandlers"]
+    operations: []
     config:
       checkInOrderProcessing: false
   - component: natsstreaming
-    operations: ['publish', 'subscribe', 'multiplehandlers']
+    operations: []
   - component: jetstream
-    operations: ['publish', 'subscribe', 'multiplehandlers']
+    operations: []
   - component: kafka
-    allOperations: true
+    operations: ['bulkpublish', 'bulksubscribe']
   - component: kafka
     profile: wurstmeister
-    allOperations: true
+    operations: ['bulkpublish', 'bulksubscribe']
   - component: kafka
     profile: confluent
-    allOperations: true
+    operations: ['bulkpublish', 'bulksubscribe']
   - component: pulsar
-    operations: ['publish', 'subscribe', 'multiplehandlers']
+    operations: []
   - component: solace.amqp
-    operations: ['publish', 'subscribe']
+    operations: []
   - component: mqtt3
     profile: emqx
-    operations: ['publish', 'subscribe', 'multiplehandlers']
+    operations: []
   - component: mqtt3
     profile: vernemq
-    operations: ['publish', 'subscribe', 'multiplehandlers']
+    operations: []
   - component: rabbitmq
-    operations: ['publish', 'subscribe', 'multiplehandlers']
+    operations: []
     config:
       checkInOrderProcessing: false
   - component: in-memory
-    operations: ["publish", "subscribe", "multiplehandlers"]
+    operations: []
   - component: aws.snssqs.terraform
-    operations: ["publish", "subscribe", "multiplehandlers"]
+    operations: []
     config:
       pubsubName: aws-snssqs
      testTopicName: ${{PUBSUB_AWS_SNSSQS_TOPIC}}

@@ -86,14 +86,14 @@ components:
       testMultiTopic2Name: ${{PUBSUB_AWS_SNSSQS_TOPIC_MULTI_2}}
       checkInOrderProcessing: false
   - component: aws.snssqs.docker
-    operations: ["publish", "subscribe", "multiplehandlers"]
+    operations: []
     config:
       pubsubName: aws-snssqs
       checkInOrderProcessing: false
   - component: kubemq
-    operations: ['publish', 'subscribe', 'multiplehandlers']
+    operations: []
   - component: gcp.pubsub.terraform
-    operations: ["publish", "subscribe", "multiplehandlers"]
+    operations: []
     config:
       pubsubName: gcp-pubsub
       testTopicName: ${{PUBSUB_GCP_TOPIC}}

@@ -102,7 +102,7 @@ components:
       testMultiTopic2Name: ${{PUBSUB_GCP_TOPIC_MULTI_2}}
       checkInOrderProcessing: false
   - component: gcp.pubsub.docker
-    operations: ["publish", "subscribe", "multiplehandlers"]
+    operations: []
     config:
       pubsubName: gcp-pubsub
       checkInOrderProcessing: false
@@ -1,16 +1,16 @@
-# Supported operations: get, bulkget
+# Supported additional operations: (none)
 componentType: secretstores
 components:
   - component: local.env
-    allOperations: true
+    operations: []
   - component: local.file
-    allOperations: true
+    operations: []
   - component: azure.keyvault.certificate
-    allOperations: true
+    operations: []
   - component: azure.keyvault.serviceprincipal
-    allOperations: true
+    operations: []
   - component: kubernetes
-    allOperations: true
+    operations: []
   - component: hashicorp.vault
-    allOperations: true
+    operations: []
@@ -4,103 +4,78 @@
 componentType: state
 components:
   - component: redis.v6
-    allOperations: false
     operations: [ "transaction", "etag", "first-write", "query", "ttl" ]
     config:
       # This component requires etags to be numeric
       badEtag: "9999999"
   - component: redis.v7
-    allOperations: false
     # "query" is not included because redisjson hasn't been updated to Redis v7 yet
     operations: [ "transaction", "etag", "first-write", "ttl" ]
     config:
       # This component requires etags to be numeric
       badEtag: "9999999"
   - component: mongodb
-    allOperations: false
     operations: [ "transaction", "etag", "first-write", "query", "ttl" ]
   - component: memcached
-    allOperations: false
     operations: [ "ttl" ]
   - component: azure.cosmosdb
-    allOperations: false
     operations: [ "transaction", "etag", "first-write", "query", "ttl" ]
   - component: azure.blobstorage
-    allOperations: false
     operations: [ "etag", "first-write" ]
   - component: azure.sql
-    allOperations: false
     operations: [ "transaction", "etag", "first-write", "ttl" ]
     config:
       # This component requires etags to be hex-encoded numbers
       badEtag: "FFFF"
   - component: sqlserver
-    allOperations: false
     operations: [ "transaction", "etag", "first-write", "ttl" ]
     config:
       # This component requires etags to be hex-encoded numbers
       badEtag: "FFFF"
   - component: postgresql
-    allOperations: false
     operations: [ "transaction", "etag", "first-write", "query", "ttl" ]
     config:
       # This component requires etags to be numeric
       badEtag: "1"
   - component: sqlite
-    allOperations: false
     operations: [ "transaction", "etag", "first-write", "ttl" ]
   - component: mysql.mysql
-    allOperations: false
     operations: [ "transaction", "etag", "first-write", "ttl" ]
   - component: mysql.mariadb
-    allOperations: false
     operations: [ "transaction", "etag", "first-write", "ttl" ]
   - component: azure.tablestorage.storage
-    allOperations: false
     operations: [ "etag", "first-write"]
     config:
       # This component requires etags to be in this format
       badEtag: "W/\"datetime'2023-05-09T12%3A28%3A54.1442151Z'\""
   - component: azure.tablestorage.cosmosdb
-    allOperations: false
     operations: [ "etag", "first-write"]
     config:
       # This component requires etags to be in this format
       badEtag: "W/\"datetime'2023-05-09T12%3A28%3A54.1442151Z'\""
   - component: oracledatabase
-    allOperations: false
     operations: [ "transaction", "etag", "first-write", "ttl" ]
   - component: cassandra
-    allOperations: false
     operations: [ "ttl" ]
   - component: cloudflare.workerskv
-    allOperations: false
     # Although this component supports TTLs, the minimum TTL is 60s, which makes it not suitable for our conformance tests
     operations: []
   - component: cockroachdb
-    allOperations: false
     operations: [ "transaction", "etag", "first-write", "query", "ttl" ]
     config:
       # This component requires etags to be numeric
       badEtag: "9999999"
   - component: rethinkdb
-    allOperations: false
     operations: []
   - component: in-memory
-    allOperations: false
     operations: [ "transaction", "etag", "first-write", "ttl" ]
   - component: aws.dynamodb.docker
-    allOperations: false
     operations: [ "transaction", "etag", "first-write" ]
   - component: aws.dynamodb.terraform
-    allOperations: false
     operations: [ "transaction", "etag", "first-write" ]
   - component: etcd
-    allOperations: false
     operations: [ "transaction", "etag", "first-write", "ttl" ]
   - component: gcp.firestore.docker
-    allOperations: false
     operations: []
   - component: gcp.firestore.cloud
-    allOperations: false
     operations: []
@@ -1,6 +1,5 @@
-# Supported operations: start, get, terminate
+# Supported additional operations: (none)
 componentType: workflows
 components:
   - component: temporal
-    allOperations: false
-    operations: [ "start", "get", "terminate"]
+    operations: []
@@ -5,12 +5,14 @@
 1. `tests/` directory contains the configuration and the test definition for conformance tests.
 2. All the conformance tests are within the `tests/conformance` directory.
 3. All the configurations are in the `tests/config` directory.
-4. Each of the component specific `component` definition are in their specific `component type` folder in the `tests/config` folder. E.g. `redis` statestore component definition within `state` directory. The component types are `bindings`, `state`, `secretstores`, `pubsub`. Cloud specific components will be within their own `cloud` directory within the `component type` folder, e.g. `pubsub/azure/servicebus`.
+4. Each of the component specific `component` definition are in their specific `component type` folder in the `tests/config` folder. For example, the `redis` statestore component definition within `state` directory.
+   - The component types are: `bindings`, `configuration`, `crypto`, `pubsub`, `state`, `secretstores`, `workflows`.
+   - Cloud specific components will be within their own `cloud` directory within the `component type` folder, e.g. `pubsub/azure/servicebus`.
 5. Similar to the component definitions, each component type has its own set of the conformance tests definitions.
-6. Each `component type` contains a `tests.yml` definition that defines the component to be tested along with component specific test configuration. Nested folder names have their `/` in path replaced by `.` in the component name in `tests.yml`, e.g. `azure/servicebus` should be `azure.servicebus`
+6. Each `component type` contains a `tests.yml` definition that defines the component to be tested along with component specific test configuration. Nested folder names have their `/` in path replaced by `.` in the component name in `tests.yml`, e.g. `azure/servicebus/topics` should be `azure.servicebus.topics`
 7. All the tests configurations are defined in `common.go` file.
 8. Each `component type` has its own `_test` file to trigger the conformance tests. E.g. `bindings_test.go`.
-9. Each test added will also need to be added to the `conformance.yml` workflow file.
+9. Each test added will also need to be added to the `component type/tests.yml` workflow file.

 ## Conformance test workflow

@@ -48,10 +50,12 @@
 ```yaml
 componentType: binding
 components:
-  ## All other components
-  - component: <COMPONENT>
-    allOperations: <true/false>
-    operations: <List of operations if needed>
+  # For each component
+  - component: <COMPONENT>
+    # If the component supports additional (optional) operations
+    operations: [ '<operation1>', '<operation2'> ]
+    # If the component does NOT support additional operations
+    operations: []
 ```

 5. Any UUID generation for keys can be specified using `$((uuid))`. E.g. see [/tests/config/bindings/tests.yml](../config/bindings/tests.yml)
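With `allOperations` gone, a `tests.yml` entry decodes into the slimmer `TestComponent` shape shown later in this diff. A hedged round-trip sketch using `gopkg.in/yaml.v3`, with the structs trimmed to the fields relevant here:

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// Mirrors TestComponent after this change: no AllOperations field; an
// empty operations list now means "no additional operations".
type TestComponent struct {
	Component  string                 `yaml:"component,omitempty"`
	Profile    string                 `yaml:"profile,omitempty"`
	Operations []string               `yaml:"operations,omitempty"`
	Config     map[string]interface{} `yaml:"config,omitempty"`
}

// Field set trimmed for the sketch; the real struct may carry more.
type TestConfiguration struct {
	ComponentType string          `yaml:"componentType,omitempty"`
	Components    []TestComponent `yaml:"components,omitempty"`
}

func main() {
	doc := `componentType: pubsub
components:
  - component: kafka
    profile: wurstmeister
    operations: ['bulkpublish', 'bulksubscribe']
`
	var tc TestConfiguration
	if err := yaml.Unmarshal([]byte(doc), &tc); err != nil {
		panic(err)
	}
	fmt.Println(tc.Components[0].Component, tc.Components[0].Operations)
	// kafka [bulkpublish bulksubscribe]
}
```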
@@ -59,13 +59,12 @@ type TestConfig struct {
 	ReadBindingWait time.Duration `mapstructure:"readBindingWait"`
 }

-func NewTestConfig(name string, allOperations bool, operations []string, configMap map[string]interface{}) (TestConfig, error) {
+func NewTestConfig(name string, operations []string, configMap map[string]interface{}) (TestConfig, error) {
 	waitForSetup = false
 	testConfig := TestConfig{
 		CommonConfig: utils.CommonConfig{
 			ComponentType: "bindings",
 			ComponentName: name,
-			AllOperations: allOperations,
 			Operations:    utils.NewStringSet(operations...),
 		},
 		InputMetadata: make(map[string]string),
@@ -139,11 +139,10 @@ type TestConfiguration struct {
 }

 type TestComponent struct {
-	Component     string                 `yaml:"component,omitempty"`
-	Profile       string                 `yaml:"profile,omitempty"`
-	AllOperations bool                   `yaml:"allOperations,omitempty"`
-	Operations    []string               `yaml:"operations,omitempty"`
-	Config        map[string]interface{} `yaml:"config,omitempty"`
+	Component  string                 `yaml:"component,omitempty"`
+	Profile    string                 `yaml:"profile,omitempty"`
+	Operations []string               `yaml:"operations,omitempty"`
+	Config     map[string]interface{} `yaml:"config,omitempty"`
 }

 // NewTestConfiguration reads the tests.yml and loads the TestConfiguration.
@@ -368,7 +367,7 @@ func (tc *TestConfiguration) Run(t *testing.T) {
 		require.NoErrorf(t, err, "error running conformance test for component %s", comp.Component)
 		store := loadStateStore(comp)
 		require.NotNilf(t, store, "error running conformance test for component %s", comp.Component)
-		storeConfig, err := conf_state.NewTestConfig(comp.Component, comp.AllOperations, comp.Operations, comp.Config)
+		storeConfig, err := conf_state.NewTestConfig(comp.Component, comp.Operations, comp.Config)
 		require.NoErrorf(t, err, "error running conformance test for component %s", comp.Component)
 		conf_state.ConformanceTests(t, props, store, storeConfig)
 	case "secretstores":

@@ -377,7 +376,7 @@ func (tc *TestConfiguration) Run(t *testing.T) {
 		require.NoErrorf(t, err, "error running conformance test for component %s", comp.Component)
 		store := loadSecretStore(comp)
 		require.NotNilf(t, store, "error running conformance test for component %s", comp.Component)
-		storeConfig := conf_secret.NewTestConfig(comp.Component, comp.AllOperations, comp.Operations)
+		storeConfig := conf_secret.NewTestConfig(comp.Component, comp.Operations)
 		conf_secret.ConformanceTests(t, props, store, storeConfig)
 	case "pubsub":
 		filepath := fmt.Sprintf("../config/pubsub/%s", componentConfigPath)

@@ -385,7 +384,7 @@ func (tc *TestConfiguration) Run(t *testing.T) {
 		require.NoErrorf(t, err, "error running conformance test for component %s", comp.Component)
 		pubsub := loadPubSub(comp)
 		require.NotNil(t, pubsub, "error running conformance test for component %s", comp.Component)
-		pubsubConfig, err := conf_pubsub.NewTestConfig(comp.Component, comp.AllOperations, comp.Operations, comp.Config)
+		pubsubConfig, err := conf_pubsub.NewTestConfig(comp.Component, comp.Operations, comp.Config)
 		require.NoErrorf(t, err, "error running conformance test for component %s", comp.Component)
 		conf_pubsub.ConformanceTests(t, props, pubsub, pubsubConfig)
 	case "bindings":

@@ -395,7 +394,7 @@ func (tc *TestConfiguration) Run(t *testing.T) {
 		inputBinding := loadInputBindings(comp)
 		outputBinding := loadOutputBindings(comp)
 		require.True(t, inputBinding != nil || outputBinding != nil)
-		bindingsConfig, err := conf_bindings.NewTestConfig(comp.Component, comp.AllOperations, comp.Operations, comp.Config)
+		bindingsConfig, err := conf_bindings.NewTestConfig(comp.Component, comp.Operations, comp.Config)
 		require.NoErrorf(t, err, "error running conformance test for component %s", comp.Component)
 		conf_bindings.ConformanceTests(t, props, inputBinding, outputBinding, bindingsConfig)
 	case "workflows":

@@ -403,7 +402,7 @@ func (tc *TestConfiguration) Run(t *testing.T) {
 		props, err := tc.loadComponentsAndProperties(t, filepath)
 		require.NoErrorf(t, err, "error running conformance test for component %s", comp.Component)
 		wf := loadWorkflow(comp)
-		wfConfig := conf_workflows.NewTestConfig(comp.Component, comp.AllOperations, comp.Operations, comp.Config)
+		wfConfig := conf_workflows.NewTestConfig(comp.Component, comp.Operations, comp.Config)
 		conf_workflows.ConformanceTests(t, props, wf, wfConfig)
 	case "crypto":
 		filepath := fmt.Sprintf("../config/crypto/%s", componentConfigPath)

@@ -411,7 +410,7 @@ func (tc *TestConfiguration) Run(t *testing.T) {
 		require.NoErrorf(t, err, "error running conformance test for component %s", comp.Component)
 		component := loadCryptoProvider(comp)
 		require.NotNil(t, component, "error running conformance test for component %s", comp.Component)
-		cryptoConfig, err := conf_crypto.NewTestConfig(comp.Component, comp.AllOperations, comp.Operations, comp.Config)
+		cryptoConfig, err := conf_crypto.NewTestConfig(comp.Component, comp.Operations, comp.Config)
 		require.NoErrorf(t, err, "error running conformance test for component %s", comp.Component)
 		conf_crypto.ConformanceTests(t, props, component, cryptoConfig)
 	case "configuration":

@@ -421,7 +420,7 @@ func (tc *TestConfiguration) Run(t *testing.T) {
 		store, updater := loadConfigurationStore(comp)
 		require.NotNil(t, store, "error running conformance test for component %s", comp.Component)
 		require.NotNil(t, updater, "error running conformance test for component %s", comp.Component)
-		configurationConfig := conf_configuration.NewTestConfig(comp.Component, comp.AllOperations, comp.Operations, comp.Config)
+		configurationConfig := conf_configuration.NewTestConfig(comp.Component, comp.Operations, comp.Config)
 		conf_configuration.ConformanceTests(t, props, store, updater, configurationConfig, comp.Component)
 	default:
 		t.Fatalf("unknown component type %s", tc.ComponentType)
@@ -29,8 +29,7 @@ func TestDecodeYaml(t *testing.T) {
 		yam := `componentType: state
 components:
   - component: redis
-    allOperations: false
-    operations: ["init", "set"]
+    operations: ["foo", "bar"]
     config:
       maxInitDurationInMs: 20
       maxSetDurationInMs: 20

@@ -41,9 +40,8 @@ components:
 		assert.NoError(t, err)
 		assert.NotNil(t, config)
 		assert.Equal(t, 1, len(config.Components))
-		assert.False(t, config.Components[0].AllOperations)
 		assert.Equal(t, "state", config.ComponentType)
-		assert.Equal(t, 2, len(config.Components[0].Operations))
+		assert.Equal(t, []string{"foo", "bar"}, config.Components[0].Operations)
 		assert.Equal(t, 5, len(config.Components[0].Config))
 	})
@@ -46,12 +46,11 @@ type TestConfig struct {
 	utils.CommonConfig
 }

-func NewTestConfig(componentName string, allOperations bool, operations []string, configMap map[string]interface{}) TestConfig {
+func NewTestConfig(componentName string, operations []string, configMap map[string]interface{}) TestConfig {
 	tc := TestConfig{
 		utils.CommonConfig{
 			ComponentType: "configuration",
 			ComponentName: componentName,
-			AllOperations: allOperations,
 			Operations:    utils.NewStringSet(operations...),
 		},
 	}
@@ -177,7 +176,7 @@ func ConformanceTests(t *testing.T, props map[string]string, store configuration
 		require.NoError(t, err, "expected no error on adding keys")
 	})

-	if config.HasOperation("get") {
-		t.Run("get", func(t *testing.T) {
+	t.Run("get", func(t *testing.T) {
 		t.Run("get with non-empty key list", func(t *testing.T) {
 			keys := getKeys(initValues1)

@@ -218,9 +217,9 @@ func ConformanceTests(t *testing.T, props map[string]string, store configuration
 			require.NoError(t, err)
 			assert.Equal(t, expectedResponse, resp.Items)
 		})
-		})
-	}
+	})

-	if config.HasOperation("subscribe") {
-		t.Run("subscribe", func(t *testing.T) {
+	t.Run("subscribe", func(t *testing.T) {
 		subscribeMetadata := make(map[string]string)
 		if component == postgresComponent {
 			subscribeMetadata[pgNotifyChannelKey] = pgNotifyChannel

@@ -318,9 +317,9 @@ func ConformanceTests(t *testing.T, props map[string]string, store configuration
 		verifyMessagesReceived(t, processedC2, awaitingMessages2)
 		verifyMessagesReceived(t, processedC3, awaitingMessages3)
-		})
-	}
+	})

-	if config.HasOperation("unsubscribe") {
-		t.Run("unsubscribe", func(t *testing.T) {
+	t.Run("unsubscribe", func(t *testing.T) {
 		t.Run("unsubscribe subscriber 1", func(t *testing.T) {
 			ID1 := subscribeIDs[0]
 			err := store.Unsubscribe(context.Background(),

@@ -382,7 +381,7 @@ func ConformanceTests(t *testing.T, props map[string]string, store configuration

 		verifyNoMessagesReceived(t, processedC3)
-		})
-	}
+	})
 }

 func verifyNoMessagesReceived(t *testing.T, processedChan chan *configuration.UpdateEvent) {
@@ -71,12 +71,11 @@ type TestConfig struct {
 	Keys []testConfigKey `mapstructure:"keys"`
 }

-func NewTestConfig(name string, allOperations bool, operations []string, configMap map[string]interface{}) (TestConfig, error) {
+func NewTestConfig(name string, operations []string, configMap map[string]interface{}) (TestConfig, error) {
 	testConfig := TestConfig{
 		CommonConfig: utils.CommonConfig{
 			ComponentType: "crypto",
 			ComponentName: name,
-			AllOperations: allOperations,
 			Operations:    utils.NewStringSet(operations...),
 		},
 	}
@@ -69,13 +69,12 @@ type TestConfig struct {
 	TestProjectID string `mapstructure:"testProjectID"`
 }

-func NewTestConfig(componentName string, allOperations bool, operations []string, configMap map[string]interface{}) (TestConfig, error) {
+func NewTestConfig(componentName string, operations []string, configMap map[string]interface{}) (TestConfig, error) {
 	// Populate defaults
 	tc := TestConfig{
 		CommonConfig: utils.CommonConfig{
 			ComponentType: "pubsub",
 			ComponentName: componentName,
-			AllOperations: allOperations,
 			Operations:    utils.NewStringSet(operations...),
 		},
 		PubsubName: defaultPubsubName,
@@ -143,79 +142,77 @@ func ConformanceTests(t *testing.T, props map[string]string, ps pubsub.PubSub, c
 	var muBulk sync.Mutex

 	// Subscribe
-	if config.HasOperation("subscribe") { //nolint:nestif
 	t.Run("subscribe", func(t *testing.T) {
 		var counter int
 		var lastSequence int
 		err := ps.Subscribe(ctx, pubsub.SubscribeRequest{
 			Topic:    config.TestTopicName,
 			Metadata: config.SubscribeMetadata,
 		}, func(ctx context.Context, msg *pubsub.NewMessage) error {
 			dataString := string(msg.Data)
 			if !strings.HasPrefix(dataString, dataPrefix) {
 				t.Logf("Ignoring message without expected prefix")

 				return nil
 			}

 			sequence, err := strconv.Atoi(dataString[len(dataPrefix):])
 			if err != nil {
 				t.Logf("Message did not contain a sequence number")
 				assert.Fail(t, "message did not contain a sequence number")

 				return err
 			}

 			// Ignore already processed messages
 			// in case we receive a redelivery from the broker
 			// during retries.
 			mu.Lock()
 			_, alreadyProcessed := processedMessages[sequence]
 			mu.Unlock()
 			if alreadyProcessed {
 				t.Logf("Message was already processed: %d", sequence)

 				return nil
 			}

 			counter++

 			// Only consider order when we receive a message for the first time
 			// Messages that fail and are re-queued will naturally come out of order
 			if errorCount == 0 {
 				if sequence < lastSequence {
 					outOfOrder = true
 					t.Logf("Message received out of order: expected sequence >= %d, got %d", lastSequence, sequence)
 				}

 				lastSequence = sequence
 			}

 			// This behavior is standard to repro a failure of one message in a batch.
 			if errorCount < 2 || counter%5 == 0 {
 				// First message errors just to give time for more messages to pile up.
 				// Second error is to force an error in a batch.
 				errorCount++
 				// Sleep to allow messages to pile up and be delivered as a batch.
 				time.Sleep(1 * time.Second)
 				t.Logf("Simulating subscriber error")
 				return errors.New("conf test simulated error")
 			}

 			t.Logf("Simulating subscriber success")
 			actualReadCount++

 			mu.Lock()
 			processedMessages[sequence] = struct{}{}
 			mu.Unlock()

 			processedC <- dataString

 			return nil
 		})
 		assert.NoError(t, err, "expected no error on subscribe")
 	})
-	}

 	// Bulk Subscribe
 	if config.HasOperation("bulksubscribe") { //nolint:nestif

@@ -316,45 +313,44 @@ func ConformanceTests(t *testing.T, props map[string]string, ps pubsub.PubSub, c
 	}

 	// Publish
-	if config.HasOperation("publish") {
 	t.Run("publish", func(t *testing.T) {
 		// Some pubsub, like Kafka need to wait for Subscriber to be up before messages can be consumed.
 		// So, wait for some time here.
 		time.Sleep(config.WaitDurationToPublish)
 		for k := 1; k <= config.MessageCount; k++ {
 			data := []byte(fmt.Sprintf("%s%d", dataPrefix, k))
 			err := ps.Publish(ctx, &pubsub.PublishRequest{
 				Data:       data,
 				PubsubName: config.PubsubName,
 				Topic:      config.TestTopicName,
 				Metadata:   config.PublishMetadata,
 			})
 			if err == nil {
 				awaitingMessages[string(data)] = struct{}{}
 			}
 			assert.NoError(t, err, "expected no error on publishing data %s on topic %s", data, config.TestTopicName)
 		}
 		if config.HasOperation("bulksubscribe") {
 			_, ok := ps.(pubsub.BulkSubscriber)
 			if !ok {
 				t.Fatalf("cannot run bulkSubscribe conformance, BulkSubscriber interface not implemented by the component %s", config.ComponentName)
 			}
 			for k := bulkSubStartingKey; k <= (bulkSubStartingKey + config.MessageCount); k++ {
 				data := []byte(fmt.Sprintf("%s%d", dataPrefix, k))
 				err := ps.Publish(ctx, &pubsub.PublishRequest{
 					Data:       data,
 					PubsubName: config.PubsubName,
-					Topic:      config.TestTopicName,
+					Topic:      config.TestTopicForBulkSub,
 					Metadata:   config.PublishMetadata,
 				})
 				if err == nil {
-					awaitingMessages[string(data)] = struct{}{}
+					awaitingMessagesBulk[string(data)] = struct{}{}
 				}
-				assert.NoError(t, err, "expected no error on publishing data %s on topic %s", data, config.TestTopicName)
+				assert.NoError(t, err, "expected no error on publishing data %s on topic %s", data, config.TestTopicForBulkSub)
 			}
 		}
 	})
-	}

 	// assumes that publish operation is run only once for publishing config.MessageCount number of events
 	// bulkpublish needs to be run after publish operation

@@ -410,29 +406,27 @@ func ConformanceTests(t *testing.T, props map[string]string, ps pubsub.PubSub, c
 	}

 	// Verify read
-	if (config.HasOperation("publish") || config.HasOperation("bulkpublish")) && config.HasOperation("subscribe") {
 	t.Run("verify read", func(t *testing.T) {
 		t.Logf("waiting for %v to complete read", config.MaxReadDuration)
 		timeout := time.After(config.MaxReadDuration)
 		waiting := true
 		for waiting {
 			select {
 			case processed := <-processedC:
 				t.Logf("deleting %s processed message", processed)
 				delete(awaitingMessages, processed)
 				waiting = len(awaitingMessages) > 0
 			case <-timeout:
 				// Break out after the mamimum read duration has elapsed
 				waiting = false
 			}
 		}
 		assert.False(t, config.CheckInOrderProcessing && outOfOrder, "received messages out of order")
 		assert.Empty(t, awaitingMessages, "expected to read %v messages", config.MessageCount)
 	})
-	}

 	// Verify read on bulk subscription
-	if config.HasOperation("publish") && config.HasOperation("bulksubscribe") {
+	if config.HasOperation("bulksubscribe") {
 		t.Run("verify read on bulk subscription", func(t *testing.T) {
 			_, ok := ps.(pubsub.BulkSubscriber)
 			if !ok {

@@ -457,7 +451,7 @@ func ConformanceTests(t *testing.T, props map[string]string, ps pubsub.PubSub, c
 	}

 	// Multiple handlers
-	if config.HasOperation("multiplehandlers") {
 	t.Run("multiple handlers", func(t *testing.T) {
 		received1Ch := make(chan string)
 		received2Ch := make(chan string)
 		subscribe1Ctx, subscribe1Cancel := context.WithCancel(context.Background())

@@ -560,7 +554,7 @@ func ConformanceTests(t *testing.T, props map[string]string, ps pubsub.PubSub, c
 			<-wait
 		}
-		})
-	}
+	})
 }

 func receiveInBackground(t *testing.T, timeout time.Duration, received1Ch <-chan string, received2Ch <-chan string, sent1Ch <-chan string, sent2Ch <-chan string, allSentCh <-chan bool) <-chan struct{} {
@@ -29,12 +29,11 @@ type TestConfig struct {
 	utils.CommonConfig
 }

-func NewTestConfig(name string, allOperations bool, operations []string) TestConfig {
+func NewTestConfig(name string, operations []string) TestConfig {
 	tc := TestConfig{
 		CommonConfig: utils.CommonConfig{
 			ComponentType: "secretstores",
 			ComponentName: name,
-			AllOperations: allOperations,
 			Operations:    utils.NewStringSet(operations...),
 		},
 	}
@@ -69,7 +68,7 @@ func ConformanceTests(t *testing.T, props map[string]string, store secretstores.
 	})

 	// Get
-	if config.HasOperation("get") {
-		t.Run("get", func(t *testing.T) {
+	t.Run("get", func(t *testing.T) {
 		getSecretRequest := secretstores.GetSecretRequest{
 			Name: "conftestsecret",
 		}

@@ -86,10 +85,10 @@ func ConformanceTests(t *testing.T, props map[string]string, store secretstores.
 		assert.NotNil(t, resp.Data, "expected value to be returned")
 		assert.Equal(t, getSecretResponse.Data, resp.Data, "expected values to be equal")
-		})
-	}
+	})

 	// Bulkget
-	if config.HasOperation("bulkget") {
-		t.Run("bulkGet", func(t *testing.T) {
+	t.Run("bulkGet", func(t *testing.T) {
 		bulkReq := secretstores.BulkGetSecretRequest{}
 		expectedData := map[string]map[string]string{
 			"conftestsecret": {

@@ -117,5 +116,5 @@ func ConformanceTests(t *testing.T, props map[string]string, store secretstores.
 			assert.Equal(t, m, resp.Data[k], "expected values to be equal")
 		}
-		})
-	}
+	})
 }
@@ -65,12 +65,11 @@ type TestConfig struct {
 	BadEtag string `mapstructure:"badEtag"`
 }

-func NewTestConfig(component string, allOperations bool, operations []string, configMap map[string]interface{}) (TestConfig, error) {
+func NewTestConfig(component string, operations []string, configMap map[string]interface{}) (TestConfig, error) {
 	testConfig := TestConfig{
 		CommonConfig: utils.CommonConfig{
 			ComponentType: "state",
 			ComponentName: component,
-			AllOperations: allOperations,
 			Operations:    utils.NewStringSet(operations...),
 		},
 		BadEtag: "bad-etag",
@@ -30,7 +30,6 @@ import (
 type CommonConfig struct {
 	ComponentType string
 	ComponentName string
-	AllOperations bool
 	Operations    map[string]struct{}
 }

@@ -45,11 +44,7 @@ var (
 )

 func (cc CommonConfig) HasOperation(operation string) bool {
-	if cc.AllOperations {
-		return true
-	}
 	_, exists := cc.Operations[operation]

 	return exists
 }

@@ -58,7 +53,6 @@ func (cc CommonConfig) CopyMap(config map[string]string) map[string]string {
 	for k, v := range config {
 		m[k] = v
 	}

 	return m
 }

@@ -134,13 +128,3 @@ func NewStringSet(values ...string) map[string]struct{} {

 	return set
 }
-
-func Contains[V comparable](arr []V, str V) bool {
-	for _, a := range arr {
-		if a == str {
-			return true
-		}
-	}
-
-	return false
-}
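After this change `HasOperation` reduces to plain set membership: the conformance suites run mandatory operations unconditionally, and the `operations` list in `tests.yml` gates only the optional extras. A self-contained sketch of the new semantics (`newStringSet` mirrors the package's `NewStringSet`):

```go
package main

import "fmt"

// Simplified CommonConfig after this change: the AllOperations escape
// hatch is gone; HasOperation is a plain set-membership check.
type CommonConfig struct {
	ComponentType string
	ComponentName string
	Operations    map[string]struct{}
}

func (cc CommonConfig) HasOperation(operation string) bool {
	_, exists := cc.Operations[operation]
	return exists
}

func newStringSet(values ...string) map[string]struct{} {
	set := make(map[string]struct{}, len(values))
	for _, v := range values {
		set[v] = struct{}{}
	}
	return set
}

func main() {
	cc := CommonConfig{
		ComponentType: "pubsub",
		ComponentName: "kafka",
		Operations:    newStringSet("bulkpublish", "bulksubscribe"),
	}
	fmt.Println(cc.HasOperation("bulkpublish")) // true
	fmt.Println(cc.HasOperation("publish"))     // false: core ops run unconditionally now
}
```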
@@ -21,14 +21,6 @@ import (
 )

 func TestHasOperation(t *testing.T) {
-	t.Run("all operations", func(t *testing.T) {
-		cc := CommonConfig{
-			ComponentType: "state",
-			ComponentName: "redis",
-			AllOperations: true,
-		}
-		assert.True(t, cc.HasOperation("op"))
-	})
 	t.Run("operations list", func(t *testing.T) {
 		cc := CommonConfig{
 			ComponentType: "state",

@@ -45,7 +37,6 @@ func TestCopyMap(t *testing.T) {
 	cc := CommonConfig{
 		ComponentType: "state",
 		ComponentName: "redis",
-		AllOperations: true,
 	}
 	in := map[string]string{
 		"k": "v",
@@ -35,12 +35,11 @@ type TestConfig struct {
 	utils.CommonConfig
 }

-func NewTestConfig(component string, allOperations bool, operations []string, conf map[string]interface{}) TestConfig {
+func NewTestConfig(component string, operations []string, conf map[string]interface{}) TestConfig {
 	tc := TestConfig{
 		CommonConfig: utils.CommonConfig{
 			ComponentType: "workflows",
 			ComponentName: component,
-			AllOperations: allOperations,
 			Operations:    utils.NewStringSet(operations...),
 		},
 	}
@@ -59,7 +58,7 @@ func ConformanceTests(t *testing.T, props map[string]string, workflowItem workfl
 	})

 	// Everything is within the same task since the workflow needs to persist between operations
-	if config.HasOperation("start") {
-		t.Run("start", func(t *testing.T) {
+	t.Run("start", func(t *testing.T) {
 		testLogger.Info("Start test running...")

 		inputBytes, _ := json.Marshal(10) // Time that the activity within the workflow runs for

@@ -111,5 +110,5 @@ func ConformanceTests(t *testing.T, props map[string]string, workflowItem workfl
 		assert.Equal(t, "TestID", resp.Workflow.InstanceID)
 	})
 	testLogger.Info("Start test done.")
-	}
+	})
 }