components-contrib/tests/certification/configuration/postgres/postgres_test.go

416 lines
14 KiB
Go

/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package postgres_test
import (
"encoding/json"
"fmt"
"strconv"
"strings"
"testing"
"time"
"github.com/dapr/components-contrib/configuration"
config_postgres "github.com/dapr/components-contrib/configuration/postgres"
"github.com/dapr/components-contrib/tests/certification/embedded"
"github.com/dapr/components-contrib/tests/certification/flow"
"github.com/dapr/components-contrib/tests/certification/flow/dockercompose"
"github.com/dapr/components-contrib/tests/certification/flow/network"
"github.com/dapr/components-contrib/tests/certification/flow/retry"
"github.com/dapr/components-contrib/tests/certification/flow/sidecar"
cu_postgres "github.com/dapr/components-contrib/tests/utils/configupdater/postgres"
configuration_loader "github.com/dapr/dapr/pkg/components/configuration"
"github.com/dapr/dapr/pkg/runtime"
dapr_testing "github.com/dapr/dapr/pkg/testing"
dapr "github.com/dapr/go-sdk/client"
"github.com/dapr/kit/logger"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/stretchr/testify/require"
"github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/tests/certification/flow/watcher"
)
const (
dockerComposeYAML = "docker-compose.yml"
storeName = "configstore"
key1 = "key1"
key2 = "key2"
Val1 = "val1"
Val2 = "val2"
sidecarName1 = "dapr-1"
sidecarName2 = "dapr-2"
connectionString = "host=localhost user=postgres password=example port=5432 connect_timeout=10 database=dapr_test"
configTable = "configtable"
connectionStringKey = "connectionString"
configTableKey = "table"
pgNotifyChannelKey = "pgNotifyChannel"
pgNotifyChannel = "config"
portOffset = 2
channel1 = "channel1"
channel2 = "channel2"
)
var (
runID = strings.ReplaceAll(uuid.Must(uuid.NewRandom()).String(), "-", "_")
counter = 0
subscribeIDs = make(map[string][]string)
)
var updater *cu_postgres.ConfigUpdater
// Cast go-sdk ConfigurationItem to contrib ConfigurationItem
func castConfigurationItems(items map[string]*dapr.ConfigurationItem) map[string]*configuration.Item {
configItems := make(map[string]*configuration.Item)
for key, item := range items {
configItems[key] = &configuration.Item{
Value: item.Value,
Version: item.Version,
Metadata: item.Metadata,
}
}
return configItems
}
// Create UpdateEvent struct from given key, val pair
func getUpdateEvent(key string, val string) configuration.UpdateEvent {
expectedUpdateEvent := configuration.UpdateEvent{
Items: map[string]*configuration.Item{
key: {
Value: val,
},
},
}
return expectedUpdateEvent
}
func expectMessage(message *watcher.Watcher, key string, val string, version string) {
updateEvent := configuration.UpdateEvent{
Items: map[string]*configuration.Item{
key: {
Value: val,
Version: version,
},
},
}
updateEventInJson, _ := json.Marshal(updateEvent)
message.Expect(string(updateEventInJson))
}
func addKey(key, val, version string) error {
items := make(map[string]*configuration.Item)
items[key] = &configuration.Item{
Value: val,
Version: version,
}
err := updater.AddKey(items)
if err != nil {
return fmt.Errorf("error adding key: %s", err)
}
return nil
}
func TestPostgres(t *testing.T) {
log := logger.NewLogger("dapr.components")
// Updater client to update config table
updater = cu_postgres.NewPostgresConfigUpdater(log).(*cu_postgres.ConfigUpdater)
ports, err := dapr_testing.GetFreePorts(2)
require.NoError(t, err)
currentGrpcPort := ports[0]
currentHTTPPort := ports[1]
watcher1 := watcher.NewUnordered()
watcher2 := watcher.NewUnordered()
watcher3 := watcher.NewUnordered()
watcher4 := watcher.NewUnordered()
// Initialize the updater
initUpdater := func(ctx flow.Context) error {
err := updater.Init(map[string]string{
connectionStringKey: connectionString,
configTableKey: configTable,
})
if err != nil {
return err
}
return nil
}
// Create triggers for given channels
createTriggers := func(channels []string) flow.Runnable {
return func(ctx flow.Context) error {
for _, channel := range channels {
err := updater.CreateTrigger(channel)
if err != nil {
return err
}
}
return nil
}
}
// Tests store init in different scenarios
initTest := func(ctx flow.Context) error {
md := configuration.Metadata{
Base: metadata.Base{
Name: "inittest",
Properties: map[string]string{
connectionStringKey: connectionString,
},
},
}
t.Run("Init with non-existing table", func(t *testing.T) {
md.Base.Properties[configTableKey] = "configtable2"
storeobj := config_postgres.NewPostgresConfigurationStore(log).(*config_postgres.ConfigurationStore)
err := storeobj.Init(ctx, md)
require.Error(t, err)
require.Equal(t, err.Error(), "postgreSQL configuration table 'configtable2' does not exist")
})
t.Run("Init with upper cased tablename", func(t *testing.T) {
upperCasedConfigTable := strings.ToUpper(configTable)
md.Base.Properties[configTableKey] = upperCasedConfigTable
storeobj := config_postgres.NewPostgresConfigurationStore(log).(*config_postgres.ConfigurationStore)
err := storeobj.Init(ctx, md)
require.Error(t, err)
require.Equal(t, err.Error(), "invalid table name 'CONFIGTABLE'. non-alphanumerics or upper cased table names are not supported")
})
t.Run("Init with existing table", func(t *testing.T) {
md.Base.Properties[configTableKey] = configTable
storeobj := config_postgres.NewPostgresConfigurationStore(log).(*config_postgres.ConfigurationStore)
err := storeobj.Init(ctx, md)
require.NoError(t, err)
})
return nil
}
// Subscribes to given keys and channel
subscribefn := func(keys []string, channel string, sidecarName string, message *watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
client := sidecar.GetClient(ctx, sidecarName)
message.Reset()
subscribeID, errSubscribe := client.SubscribeConfigurationItems(ctx, storeName, keys, func(id string, items map[string]*dapr.ConfigurationItem) {
updateEvent := &configuration.UpdateEvent{
Items: castConfigurationItems(items),
}
updateEventInJson, err := json.Marshal(updateEvent)
require.NoError(t, err)
message.Observe(string(updateEventInJson))
}, func(md map[string]string) {
md[pgNotifyChannelKey] = channel
})
if subscribeIDs[sidecarName] == nil {
subscribeIDs[sidecarName] = make([]string, 0)
}
subscribeIDs[sidecarName] = append(subscribeIDs[sidecarName], subscribeID)
return errSubscribe
}
}
testGet := func(ctx flow.Context) error {
client := sidecar.GetClient(ctx, sidecarName1)
// Delete key1 if it exists
err := updater.DeleteKey([]string{key1})
require.NoError(t, err, "error deleting key")
// Add key1 without any version
err = addKey(key1, "val-no-version", "")
require.NoError(t, err, "error adding key")
items, err := client.GetConfigurationItems(ctx, storeName, []string{key1})
require.NoError(t, err)
require.Equal(t, 1, len(items))
// Should return the value added
require.Equal(t, "val-no-version", items[key1].Value)
// Add key1 with version 1
err = addKey(key1, "val-version-1", "1")
require.NoError(t, err, "error adding key")
items, err = client.GetConfigurationItems(ctx, storeName, []string{key1})
require.NoError(t, err)
require.Equal(t, 1, len(items))
// Should return the value with version 1
require.Equal(t, "val-version-1", items[key1].Value)
// Add key1 with version 2
err = addKey(key1, "val-version-2", "2")
require.NoError(t, err, "error adding key")
items, err = client.GetConfigurationItems(ctx, storeName, []string{key1})
require.NoError(t, err)
require.Equal(t, 1, len(items))
// Should return the value with version 2
require.Equal(t, "val-version-2", items[key1].Value)
// Add key1 with non-numeric version
err = addKey(key1, "val-non-numeric-version", "non-numeric")
require.NoError(t, err, "error adding key")
items, err = client.GetConfigurationItems(ctx, storeName, []string{key1})
require.NoError(t, err)
require.Equal(t, 1, len(items))
// Should return the value with version 2
require.Equal(t, "val-version-2", items[key1].Value)
// Delete key1
err = updater.DeleteKey([]string{key1})
require.NoError(t, err, "error deleting key")
return nil
}
// Tests subscribe by adding key and checking if the update event is received
testSubscribe := func(messages ...*watcher.Watcher) flow.Runnable {
return func(ctx flow.Context) error {
for _, message := range messages {
message.Reset()
expectMessage(message, key1, Val1, "")
expectMessage(message, key2, Val2, "")
}
err := addKey(key1, Val1, "")
require.NoError(t, err, "error adding key")
err = addKey(key2, Val2, "")
require.NoError(t, err, "error adding key")
for _, message := range messages {
message.Assert(t, 10*time.Second)
}
return nil
}
}
// Saves a key into config table before restart
saveBeforeRestart := func(ctx flow.Context) error {
items := map[string]*configuration.Item{
key1: {
Value: Val1,
},
}
err := updater.AddKey(items)
return err
}
// Checks if the key is present after restart
getAfterRestart := func(ctx flow.Context) error {
client := sidecar.GetClient(ctx, sidecarName1)
item, err := client.GetConfigurationItem(ctx, storeName, key1)
if err != nil {
return err
}
require.Equal(t, item.Value, Val1)
return nil
}
// Checks if the connection to postgres db is successful
checkPostgresConnection := func(ctx flow.Context) error {
config, err := pgxpool.ParseConfig(connectionString)
if err != nil {
return fmt.Errorf("postgres configuration store connection error : %w", err)
}
pool, err := pgxpool.NewWithConfig(ctx, config)
if err != nil {
return fmt.Errorf("postgres configuration store connection error : %w", err)
}
err = pool.Ping(ctx)
if err != nil {
return fmt.Errorf("postgres configuration store ping error : %w", err)
}
log.Info("Setup for Postgres done")
return nil
}
stopSubscribers := func(sidecars []string) flow.Runnable {
return func(ctx flow.Context) error {
for _, sidecarName := range sidecars {
client := sidecar.GetClient(ctx, sidecarName)
for _, subscribeID := range subscribeIDs[sidecarName] {
err := client.UnsubscribeConfigurationItems(ctx, storeName, subscribeID)
if err != nil {
return err
}
}
subscribeIDs[sidecarName] = nil
}
return nil
}
}
flow.New(t, "postgres certification test").
// Run Postgres server
Step(dockercompose.Run("db", dockerComposeYAML)).
Step("wait for postgres to be ready", retry.Do(time.Second*3, 10, checkPostgresConnection)).
Step("initialize updater", initUpdater).
Step("Init test", initTest).
//Running Dapr Sidecar `dapr-1`
Step(sidecar.Run(sidecarName1,
append(componentRuntimeOptions(),
embedded.WithoutApp(),
embedded.WithDaprGRPCPort(strconv.Itoa(currentGrpcPort)),
embedded.WithDaprHTTPPort(strconv.Itoa(currentHTTPPort)),
embedded.WithComponentsPath("components/default"),
)...,
)).
Step("Test get", testGet).
// Creating triggers for subscribers
Step("Creating trigger", createTriggers([]string{channel1, channel2})).
//Running Dapr Sidecar `dapr-2`
Step(sidecar.Run(sidecarName2,
append(componentRuntimeOptions(),
embedded.WithoutApp(),
embedded.WithDaprGRPCPort(strconv.Itoa(currentGrpcPort+portOffset)),
embedded.WithDaprHTTPPort(strconv.Itoa(currentHTTPPort+portOffset)),
embedded.WithComponentsPath("components/default"),
embedded.WithProfilePort(strconv.Itoa(runtime.DefaultProfilePort+portOffset)),
)...,
)).
//
// Start subscribers subscribing to {key1} using different channels
Step("start subscriber 1", subscribefn([]string{key1, key2}, channel1, sidecarName1, watcher1)).
Step("start subscriber 2", subscribefn([]string{key1, key2}, channel2, sidecarName1, watcher2)).
Step("start subscriber 3", subscribefn([]string{key1, key2}, channel1, sidecarName2, watcher3)).
Step("start subscriber 4", subscribefn([]string{key1, key2}, channel2, sidecarName2, watcher4)).
Step("wait for subscriber to be ready", flow.Sleep(5*time.Second)).
// Add key in postgres and verify the update event is received by the subscriber
Step("testSubscribe", testSubscribe(watcher1, watcher2, watcher3, watcher4)).
Step("reset", flow.Reset(watcher1, watcher2, watcher3, watcher4)).
// Simulate network interruptions and verify the connection still remains intact
Step("interrupt network",
network.InterruptNetwork(10*time.Second, nil, nil, "5432:5432")).
Step("testSubscribe", testSubscribe(watcher1, watcher2, watcher3, watcher4)).
Step("stop subscribers", stopSubscribers([]string{sidecarName1, sidecarName2})).
Step("reset", flow.Reset(watcher1, watcher2, watcher3, watcher4)).
// Save a key before restarting postgres server
Step("Save before restart", saveBeforeRestart).
Step("stop postgresql", dockercompose.Stop("db", dockerComposeYAML, "db")).
Step("wait for component to stop", flow.Sleep(5*time.Second)).
Step("start postgresql", dockercompose.Start("db", dockerComposeYAML, "db")).
Step("wait for component to start", flow.Sleep(5*time.Second)).
// Verify the key is still present after restart
Step("run connection test", getAfterRestart).
Run()
}
func componentRuntimeOptions() []embedded.Option {
log := logger.NewLogger("dapr.components")
configurationRegistry := configuration_loader.NewRegistry()
configurationRegistry.Logger = log
configurationRegistry.RegisterComponent(config_postgres.NewPostgresConfigurationStore, "postgres")
return []embedded.Option{
embedded.WithConfigurations(configurationRegistry),
}
}