416 lines
14 KiB
Go
416 lines
14 KiB
Go
/*
|
|
Copyright 2021 The Dapr Authors
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package postgres_test
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"strconv"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/dapr/components-contrib/configuration"
|
|
config_postgres "github.com/dapr/components-contrib/configuration/postgres"
|
|
"github.com/dapr/components-contrib/tests/certification/embedded"
|
|
"github.com/dapr/components-contrib/tests/certification/flow"
|
|
"github.com/dapr/components-contrib/tests/certification/flow/dockercompose"
|
|
"github.com/dapr/components-contrib/tests/certification/flow/network"
|
|
"github.com/dapr/components-contrib/tests/certification/flow/retry"
|
|
"github.com/dapr/components-contrib/tests/certification/flow/sidecar"
|
|
cu_postgres "github.com/dapr/components-contrib/tests/utils/configupdater/postgres"
|
|
configuration_loader "github.com/dapr/dapr/pkg/components/configuration"
|
|
"github.com/dapr/dapr/pkg/runtime"
|
|
dapr_testing "github.com/dapr/dapr/pkg/testing"
|
|
dapr "github.com/dapr/go-sdk/client"
|
|
"github.com/dapr/kit/logger"
|
|
"github.com/google/uuid"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/dapr/components-contrib/metadata"
|
|
"github.com/dapr/components-contrib/tests/certification/flow/watcher"
|
|
)
|
|
|
|
const (
|
|
dockerComposeYAML = "docker-compose.yml"
|
|
storeName = "configstore"
|
|
key1 = "key1"
|
|
key2 = "key2"
|
|
Val1 = "val1"
|
|
Val2 = "val2"
|
|
sidecarName1 = "dapr-1"
|
|
sidecarName2 = "dapr-2"
|
|
connectionString = "host=localhost user=postgres password=example port=5432 connect_timeout=10 database=dapr_test"
|
|
configTable = "configtable"
|
|
connectionStringKey = "connectionString"
|
|
configTableKey = "table"
|
|
pgNotifyChannelKey = "pgNotifyChannel"
|
|
pgNotifyChannel = "config"
|
|
portOffset = 2
|
|
channel1 = "channel1"
|
|
channel2 = "channel2"
|
|
)
|
|
|
|
var (
|
|
runID = strings.ReplaceAll(uuid.Must(uuid.NewRandom()).String(), "-", "_")
|
|
counter = 0
|
|
subscribeIDs = make(map[string][]string)
|
|
)
|
|
|
|
var updater *cu_postgres.ConfigUpdater
|
|
|
|
// Cast go-sdk ConfigurationItem to contrib ConfigurationItem
|
|
func castConfigurationItems(items map[string]*dapr.ConfigurationItem) map[string]*configuration.Item {
|
|
configItems := make(map[string]*configuration.Item)
|
|
for key, item := range items {
|
|
configItems[key] = &configuration.Item{
|
|
Value: item.Value,
|
|
Version: item.Version,
|
|
Metadata: item.Metadata,
|
|
}
|
|
}
|
|
return configItems
|
|
}
|
|
|
|
// Create UpdateEvent struct from given key, val pair
|
|
func getUpdateEvent(key string, val string) configuration.UpdateEvent {
|
|
expectedUpdateEvent := configuration.UpdateEvent{
|
|
Items: map[string]*configuration.Item{
|
|
key: {
|
|
Value: val,
|
|
},
|
|
},
|
|
}
|
|
return expectedUpdateEvent
|
|
}
|
|
|
|
func expectMessage(message *watcher.Watcher, key string, val string, version string) {
|
|
updateEvent := configuration.UpdateEvent{
|
|
Items: map[string]*configuration.Item{
|
|
key: {
|
|
Value: val,
|
|
Version: version,
|
|
},
|
|
},
|
|
}
|
|
updateEventInJson, _ := json.Marshal(updateEvent)
|
|
message.Expect(string(updateEventInJson))
|
|
}
|
|
|
|
func addKey(key, val, version string) error {
|
|
items := make(map[string]*configuration.Item)
|
|
items[key] = &configuration.Item{
|
|
Value: val,
|
|
Version: version,
|
|
}
|
|
err := updater.AddKey(items)
|
|
if err != nil {
|
|
return fmt.Errorf("error adding key: %s", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func TestPostgres(t *testing.T) {
|
|
log := logger.NewLogger("dapr.components")
|
|
|
|
// Updater client to update config table
|
|
updater = cu_postgres.NewPostgresConfigUpdater(log).(*cu_postgres.ConfigUpdater)
|
|
|
|
ports, err := dapr_testing.GetFreePorts(2)
|
|
require.NoError(t, err)
|
|
currentGrpcPort := ports[0]
|
|
currentHTTPPort := ports[1]
|
|
|
|
watcher1 := watcher.NewUnordered()
|
|
watcher2 := watcher.NewUnordered()
|
|
watcher3 := watcher.NewUnordered()
|
|
watcher4 := watcher.NewUnordered()
|
|
|
|
// Initialize the updater
|
|
initUpdater := func(ctx flow.Context) error {
|
|
err := updater.Init(map[string]string{
|
|
connectionStringKey: connectionString,
|
|
configTableKey: configTable,
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Create triggers for given channels
|
|
createTriggers := func(channels []string) flow.Runnable {
|
|
return func(ctx flow.Context) error {
|
|
for _, channel := range channels {
|
|
err := updater.CreateTrigger(channel)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Tests store init in different scenarios
|
|
initTest := func(ctx flow.Context) error {
|
|
md := configuration.Metadata{
|
|
Base: metadata.Base{
|
|
Name: "inittest",
|
|
Properties: map[string]string{
|
|
connectionStringKey: connectionString,
|
|
},
|
|
},
|
|
}
|
|
t.Run("Init with non-existing table", func(t *testing.T) {
|
|
md.Base.Properties[configTableKey] = "configtable2"
|
|
storeobj := config_postgres.NewPostgresConfigurationStore(log).(*config_postgres.ConfigurationStore)
|
|
err := storeobj.Init(ctx, md)
|
|
require.Error(t, err)
|
|
require.Equal(t, err.Error(), "postgreSQL configuration table 'configtable2' does not exist")
|
|
})
|
|
t.Run("Init with upper cased tablename", func(t *testing.T) {
|
|
upperCasedConfigTable := strings.ToUpper(configTable)
|
|
md.Base.Properties[configTableKey] = upperCasedConfigTable
|
|
storeobj := config_postgres.NewPostgresConfigurationStore(log).(*config_postgres.ConfigurationStore)
|
|
err := storeobj.Init(ctx, md)
|
|
require.Error(t, err)
|
|
require.Equal(t, err.Error(), "invalid table name 'CONFIGTABLE'. non-alphanumerics or upper cased table names are not supported")
|
|
})
|
|
t.Run("Init with existing table", func(t *testing.T) {
|
|
md.Base.Properties[configTableKey] = configTable
|
|
storeobj := config_postgres.NewPostgresConfigurationStore(log).(*config_postgres.ConfigurationStore)
|
|
err := storeobj.Init(ctx, md)
|
|
require.NoError(t, err)
|
|
})
|
|
return nil
|
|
}
|
|
|
|
// Subscribes to given keys and channel
|
|
subscribefn := func(keys []string, channel string, sidecarName string, message *watcher.Watcher) flow.Runnable {
|
|
return func(ctx flow.Context) error {
|
|
client := sidecar.GetClient(ctx, sidecarName)
|
|
message.Reset()
|
|
subscribeID, errSubscribe := client.SubscribeConfigurationItems(ctx, storeName, keys, func(id string, items map[string]*dapr.ConfigurationItem) {
|
|
updateEvent := &configuration.UpdateEvent{
|
|
Items: castConfigurationItems(items),
|
|
}
|
|
updateEventInJson, err := json.Marshal(updateEvent)
|
|
require.NoError(t, err)
|
|
message.Observe(string(updateEventInJson))
|
|
}, func(md map[string]string) {
|
|
md[pgNotifyChannelKey] = channel
|
|
})
|
|
if subscribeIDs[sidecarName] == nil {
|
|
subscribeIDs[sidecarName] = make([]string, 0)
|
|
}
|
|
subscribeIDs[sidecarName] = append(subscribeIDs[sidecarName], subscribeID)
|
|
return errSubscribe
|
|
}
|
|
}
|
|
|
|
testGet := func(ctx flow.Context) error {
|
|
client := sidecar.GetClient(ctx, sidecarName1)
|
|
// Delete key1 if it exists
|
|
err := updater.DeleteKey([]string{key1})
|
|
require.NoError(t, err, "error deleting key")
|
|
|
|
// Add key1 without any version
|
|
err = addKey(key1, "val-no-version", "")
|
|
require.NoError(t, err, "error adding key")
|
|
items, err := client.GetConfigurationItems(ctx, storeName, []string{key1})
|
|
require.NoError(t, err)
|
|
require.Equal(t, 1, len(items))
|
|
// Should return the value added
|
|
require.Equal(t, "val-no-version", items[key1].Value)
|
|
|
|
// Add key1 with version 1
|
|
err = addKey(key1, "val-version-1", "1")
|
|
require.NoError(t, err, "error adding key")
|
|
items, err = client.GetConfigurationItems(ctx, storeName, []string{key1})
|
|
require.NoError(t, err)
|
|
require.Equal(t, 1, len(items))
|
|
// Should return the value with version 1
|
|
require.Equal(t, "val-version-1", items[key1].Value)
|
|
|
|
// Add key1 with version 2
|
|
err = addKey(key1, "val-version-2", "2")
|
|
require.NoError(t, err, "error adding key")
|
|
items, err = client.GetConfigurationItems(ctx, storeName, []string{key1})
|
|
require.NoError(t, err)
|
|
require.Equal(t, 1, len(items))
|
|
// Should return the value with version 2
|
|
require.Equal(t, "val-version-2", items[key1].Value)
|
|
|
|
// Add key1 with non-numeric version
|
|
err = addKey(key1, "val-non-numeric-version", "non-numeric")
|
|
require.NoError(t, err, "error adding key")
|
|
items, err = client.GetConfigurationItems(ctx, storeName, []string{key1})
|
|
require.NoError(t, err)
|
|
require.Equal(t, 1, len(items))
|
|
// Should return the value with version 2
|
|
require.Equal(t, "val-version-2", items[key1].Value)
|
|
|
|
// Delete key1
|
|
err = updater.DeleteKey([]string{key1})
|
|
require.NoError(t, err, "error deleting key")
|
|
return nil
|
|
}
|
|
|
|
// Tests subscribe by adding key and checking if the update event is received
|
|
testSubscribe := func(messages ...*watcher.Watcher) flow.Runnable {
|
|
return func(ctx flow.Context) error {
|
|
for _, message := range messages {
|
|
message.Reset()
|
|
expectMessage(message, key1, Val1, "")
|
|
expectMessage(message, key2, Val2, "")
|
|
}
|
|
|
|
err := addKey(key1, Val1, "")
|
|
require.NoError(t, err, "error adding key")
|
|
err = addKey(key2, Val2, "")
|
|
require.NoError(t, err, "error adding key")
|
|
|
|
for _, message := range messages {
|
|
message.Assert(t, 10*time.Second)
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
// Saves a key into config table before restart
|
|
saveBeforeRestart := func(ctx flow.Context) error {
|
|
items := map[string]*configuration.Item{
|
|
key1: {
|
|
Value: Val1,
|
|
},
|
|
}
|
|
err := updater.AddKey(items)
|
|
return err
|
|
}
|
|
|
|
// Checks if the key is present after restart
|
|
getAfterRestart := func(ctx flow.Context) error {
|
|
client := sidecar.GetClient(ctx, sidecarName1)
|
|
|
|
item, err := client.GetConfigurationItem(ctx, storeName, key1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
require.Equal(t, item.Value, Val1)
|
|
return nil
|
|
|
|
}
|
|
|
|
// Checks if the connection to postgres db is successful
|
|
checkPostgresConnection := func(ctx flow.Context) error {
|
|
config, err := pgxpool.ParseConfig(connectionString)
|
|
if err != nil {
|
|
return fmt.Errorf("postgres configuration store connection error : %w", err)
|
|
}
|
|
pool, err := pgxpool.NewWithConfig(ctx, config)
|
|
if err != nil {
|
|
return fmt.Errorf("postgres configuration store connection error : %w", err)
|
|
}
|
|
err = pool.Ping(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("postgres configuration store ping error : %w", err)
|
|
}
|
|
log.Info("Setup for Postgres done")
|
|
return nil
|
|
}
|
|
|
|
stopSubscribers := func(sidecars []string) flow.Runnable {
|
|
return func(ctx flow.Context) error {
|
|
for _, sidecarName := range sidecars {
|
|
client := sidecar.GetClient(ctx, sidecarName)
|
|
for _, subscribeID := range subscribeIDs[sidecarName] {
|
|
err := client.UnsubscribeConfigurationItems(ctx, storeName, subscribeID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
subscribeIDs[sidecarName] = nil
|
|
}
|
|
return nil
|
|
}
|
|
}
|
|
|
|
flow.New(t, "postgres certification test").
|
|
// Run Postgres server
|
|
Step(dockercompose.Run("db", dockerComposeYAML)).
|
|
Step("wait for postgres to be ready", retry.Do(time.Second*3, 10, checkPostgresConnection)).
|
|
Step("initialize updater", initUpdater).
|
|
Step("Init test", initTest).
|
|
//Running Dapr Sidecar `dapr-1`
|
|
Step(sidecar.Run(sidecarName1,
|
|
append(componentRuntimeOptions(),
|
|
embedded.WithoutApp(),
|
|
embedded.WithDaprGRPCPort(strconv.Itoa(currentGrpcPort)),
|
|
embedded.WithDaprHTTPPort(strconv.Itoa(currentHTTPPort)),
|
|
embedded.WithComponentsPath("components/default"),
|
|
)...,
|
|
)).
|
|
Step("Test get", testGet).
|
|
// Creating triggers for subscribers
|
|
Step("Creating trigger", createTriggers([]string{channel1, channel2})).
|
|
//Running Dapr Sidecar `dapr-2`
|
|
Step(sidecar.Run(sidecarName2,
|
|
append(componentRuntimeOptions(),
|
|
embedded.WithoutApp(),
|
|
embedded.WithDaprGRPCPort(strconv.Itoa(currentGrpcPort+portOffset)),
|
|
embedded.WithDaprHTTPPort(strconv.Itoa(currentHTTPPort+portOffset)),
|
|
embedded.WithComponentsPath("components/default"),
|
|
embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+portOffset)),
|
|
)...,
|
|
)).
|
|
//
|
|
// Start subscribers subscribing to {key1} using different channels
|
|
Step("start subscriber 1", subscribefn([]string{key1, key2}, channel1, sidecarName1, watcher1)).
|
|
Step("start subscriber 2", subscribefn([]string{key1, key2}, channel2, sidecarName1, watcher2)).
|
|
Step("start subscriber 3", subscribefn([]string{key1, key2}, channel1, sidecarName2, watcher3)).
|
|
Step("start subscriber 4", subscribefn([]string{key1, key2}, channel2, sidecarName2, watcher4)).
|
|
Step("wait for subscriber to be ready", flow.Sleep(5*time.Second)).
|
|
// Add key in postgres and verify the update event is received by the subscriber
|
|
Step("testSubscribe", testSubscribe(watcher1, watcher2, watcher3, watcher4)).
|
|
Step("reset", flow.Reset(watcher1, watcher2, watcher3, watcher4)).
|
|
// Simulate network interruptions and verify the connection still remains intact
|
|
Step("interrupt network",
|
|
network.InterruptNetwork(10*time.Second, nil, nil, "5432:5432")).
|
|
Step("testSubscribe", testSubscribe(watcher1, watcher2, watcher3, watcher4)).
|
|
Step("stop subscribers", stopSubscribers([]string{sidecarName1, sidecarName2})).
|
|
Step("reset", flow.Reset(watcher1, watcher2, watcher3, watcher4)).
|
|
// Save a key before restarting postgres server
|
|
Step("Save before restart", saveBeforeRestart).
|
|
Step("stop postgresql", dockercompose.Stop("db", dockerComposeYAML, "db")).
|
|
Step("wait for component to stop", flow.Sleep(5*time.Second)).
|
|
Step("start postgresql", dockercompose.Start("db", dockerComposeYAML, "db")).
|
|
Step("wait for component to start", flow.Sleep(5*time.Second)).
|
|
// Verify the key is still present after restart
|
|
Step("run connection test", getAfterRestart).
|
|
Run()
|
|
|
|
}
|
|
func componentRuntimeOptions() []embedded.Option {
|
|
log := logger.NewLogger("dapr.components")
|
|
|
|
configurationRegistry := configuration_loader.NewRegistry()
|
|
configurationRegistry.Logger = log
|
|
configurationRegistry.RegisterComponent(config_postgres.NewPostgresConfigurationStore, "postgres")
|
|
|
|
return []embedded.Option{
|
|
embedded.WithConfigurations(configurationRegistry),
|
|
}
|
|
}
|