components-contrib/state/postgresql/v2/postgresql_integration_test.go

779 lines
20 KiB
Go

/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package postgresql
import (
"encoding/json"
"errors"
"os"
"testing"
"time"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
pgxmock "github.com/pashagolub/pgxmock/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
postgresql "github.com/dapr/components-contrib/common/component/postgresql/v1"
"github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/state"
"github.com/dapr/kit/logger"
)
const (
connectionStringEnvKey = "DAPR_TEST_POSTGRES_CONNSTRING" // Environment variable containing the connection string
)
func TestPostgreSQLIntegration(t *testing.T) {
connectionString := getConnectionString()
if connectionString == "" {
t.Skipf("PostgreSQL state integration tests skipped. To enable define the connection string using environment variable '%s' (example 'export %s=\"host=localhost user=postgres password=example port=5432 connect_timeout=10 database=dapr_test\")", connectionStringEnvKey, connectionStringEnvKey)
}
t.Run("Test init configurations", func(t *testing.T) {
testInitConfiguration(t)
})
metadata := state.Metadata{
Base: metadata.Base{Properties: map[string]string{"connectionString": connectionString}},
}
pgs := NewPostgreSQLStateStore(logger.NewLogger("test")).(*postgresql.PostgreSQL)
t.Cleanup(func() {
defer pgs.Close()
})
error := pgs.Init(t.Context(), metadata)
if error != nil {
t.Fatal(error)
}
t.Run("Get Set Delete one item", func(t *testing.T) {
t.Parallel()
setGetUpdateDeleteOneItem(t, pgs)
})
t.Run("Get item that does not exist", func(t *testing.T) {
t.Parallel()
getItemThatDoesNotExist(t, pgs)
})
t.Run("Get item with no key fails", func(t *testing.T) {
t.Parallel()
getItemWithNoKey(t, pgs)
})
t.Run("Set updates the updatedate field", func(t *testing.T) {
t.Parallel()
setUpdatesTheUpdatedateField(t, pgs)
})
t.Run("Set item with no key fails", func(t *testing.T) {
t.Parallel()
setItemWithNoKey(t, pgs)
})
t.Run("Bulk set and bulk delete", func(t *testing.T) {
t.Parallel()
testBulkSetAndBulkDelete(t, pgs)
})
t.Run("Update and delete with etag succeeds", func(t *testing.T) {
t.Parallel()
updateAndDeleteWithEtagSucceeds(t, pgs)
})
t.Run("Update with old etag fails", func(t *testing.T) {
t.Parallel()
updateWithOldEtagFails(t, pgs)
})
t.Run("Insert with etag fails", func(t *testing.T) {
t.Parallel()
newItemWithEtagFails(t, pgs)
})
t.Run("Delete with invalid etag fails", func(t *testing.T) {
t.Parallel()
deleteWithInvalidEtagFails(t, pgs)
})
t.Run("Delete item with no key fails", func(t *testing.T) {
t.Parallel()
deleteWithNoKeyFails(t, pgs)
})
t.Run("Delete an item that does not exist", func(t *testing.T) {
t.Parallel()
deleteItemThatDoesNotExist(t, pgs)
})
t.Run("Multi with delete and set", func(t *testing.T) {
t.Parallel()
multiWithDeleteAndSet(t, pgs)
})
t.Run("Multi with delete only", func(t *testing.T) {
t.Parallel()
multiWithDeleteOnly(t, pgs)
})
t.Run("Multi with set only", func(t *testing.T) {
t.Parallel()
multiWithSetOnly(t, pgs)
})
}
// setGetUpdateDeleteOneItem validates setting one item, getting it, and deleting it.
func setGetUpdateDeleteOneItem(t *testing.T, pgs *postgresql.PostgreSQL) {
key := randomKey()
value := &fakeItem{Color: "yellow"}
setItem(t, pgs, key, value, nil)
getResponse, outputObject := getItem(t, pgs, key)
assert.Equal(t, value, outputObject)
newValue := &fakeItem{Color: "green"}
setItem(t, pgs, key, newValue, getResponse.ETag)
getResponse, outputObject = getItem(t, pgs, key)
assert.Equal(t, newValue, outputObject)
deleteItem(t, pgs, key, getResponse.ETag)
}
func deleteItemThatDoesNotExist(t *testing.T, pgs *postgresql.PostgreSQL) {
// Delete the item with a key not in the store
deleteReq := &state.DeleteRequest{
Key: randomKey(),
}
err := pgs.Delete(t.Context(), deleteReq)
require.NoError(t, err)
}
func multiWithSetOnly(t *testing.T, pgs *postgresql.PostgreSQL) {
var operations []state.TransactionalStateOperation //nolint:prealloc
var setRequests []state.SetRequest //nolint:prealloc
for range 3 {
req := state.SetRequest{
Key: randomKey(),
Value: randomJSON(),
}
setRequests = append(setRequests, req)
operations = append(operations, req)
}
err := pgs.Multi(t.Context(), &state.TransactionalStateRequest{
Operations: operations,
})
require.NoError(t, err)
for _, set := range setRequests {
assert.True(t, storeItemExists(t, set.Key))
deleteItem(t, pgs, set.Key, nil)
}
}
func multiWithDeleteOnly(t *testing.T, pgs *postgresql.PostgreSQL) {
var operations []state.TransactionalStateOperation //nolint:prealloc
var deleteRequests []state.DeleteRequest //nolint:prealloc
for range 3 {
req := state.DeleteRequest{Key: randomKey()}
// Add the item to the database
setItem(t, pgs, req.Key, randomJSON(), nil) // Add the item to the database
// Add the item to a slice of delete requests
deleteRequests = append(deleteRequests, req)
// Add the item to the multi transaction request
operations = append(operations, req)
}
err := pgs.Multi(t.Context(), &state.TransactionalStateRequest{
Operations: operations,
})
require.NoError(t, err)
for _, delete := range deleteRequests {
assert.False(t, storeItemExists(t, delete.Key))
}
}
func multiWithDeleteAndSet(t *testing.T, pgs *postgresql.PostgreSQL) {
var operations []state.TransactionalStateOperation //nolint:prealloc
var deleteRequests []state.DeleteRequest //nolint:prealloc
for range 3 {
req := state.DeleteRequest{Key: randomKey()}
// Add the item to the database
setItem(t, pgs, req.Key, randomJSON(), nil) // Add the item to the database
// Add the item to a slice of delete requests
deleteRequests = append(deleteRequests, req)
// Add the item to the multi transaction request
operations = append(operations, req)
}
// Create the set requests
var setRequests []state.SetRequest //nolint:prealloc
for range 3 {
req := state.SetRequest{
Key: randomKey(),
Value: randomJSON(),
}
setRequests = append(setRequests, req)
operations = append(operations, req)
}
err := pgs.Multi(t.Context(), &state.TransactionalStateRequest{
Operations: operations,
})
require.NoError(t, err)
for _, delete := range deleteRequests {
assert.False(t, storeItemExists(t, delete.Key))
}
for _, set := range setRequests {
assert.True(t, storeItemExists(t, set.Key))
deleteItem(t, pgs, set.Key, nil)
}
}
func deleteWithInvalidEtagFails(t *testing.T, pgs *postgresql.PostgreSQL) {
// Create new item
key := randomKey()
value := &fakeItem{Color: "mauve"}
setItem(t, pgs, key, value, nil)
etag := "1234"
// Delete the item with a fake etag
deleteReq := &state.DeleteRequest{
Key: key,
ETag: &etag,
}
err := pgs.Delete(t.Context(), deleteReq)
require.Error(t, err)
}
func deleteWithNoKeyFails(t *testing.T, pgs *postgresql.PostgreSQL) {
deleteReq := &state.DeleteRequest{
Key: "",
}
err := pgs.Delete(t.Context(), deleteReq)
require.Error(t, err)
}
// newItemWithEtagFails creates a new item and also supplies an ETag, which is invalid - expect failure.
func newItemWithEtagFails(t *testing.T, pgs *postgresql.PostgreSQL) {
value := &fakeItem{Color: "teal"}
invalidEtag := "12345"
setReq := &state.SetRequest{
Key: randomKey(),
ETag: &invalidEtag,
Value: value,
}
err := pgs.Set(t.Context(), setReq)
require.Error(t, err)
}
func updateWithOldEtagFails(t *testing.T, pgs *postgresql.PostgreSQL) {
// Create and retrieve new item
key := randomKey()
value := &fakeItem{Color: "gray"}
setItem(t, pgs, key, value, nil)
getResponse, _ := getItem(t, pgs, key)
assert.NotNil(t, getResponse.ETag)
originalEtag := getResponse.ETag
// Change the value and get the updated etag
newValue := &fakeItem{Color: "silver"}
setItem(t, pgs, key, newValue, originalEtag)
_, updatedItem := getItem(t, pgs, key)
assert.Equal(t, newValue, updatedItem)
// Update again with the original etag - expect udpate failure
newValue = &fakeItem{Color: "maroon"}
setReq := &state.SetRequest{
Key: key,
ETag: originalEtag,
Value: newValue,
}
err := pgs.Set(t.Context(), setReq)
require.Error(t, err)
}
func updateAndDeleteWithEtagSucceeds(t *testing.T, pgs *postgresql.PostgreSQL) {
// Create and retrieve new item
key := randomKey()
value := &fakeItem{Color: "hazel"}
setItem(t, pgs, key, value, nil)
getResponse, _ := getItem(t, pgs, key)
assert.NotNil(t, getResponse.ETag)
// Change the value and compare
value.Color = "purple"
setItem(t, pgs, key, value, getResponse.ETag)
updateResponse, updatedItem := getItem(t, pgs, key)
assert.Equal(t, value, updatedItem)
// ETag should change when item is updated
assert.NotEqual(t, getResponse.ETag, updateResponse.ETag)
// Delete
deleteItem(t, pgs, key, updateResponse.ETag)
// Item is not in the data store
assert.False(t, storeItemExists(t, key))
}
// getItemThatDoesNotExist validates the behavior of retrieving an item that does not exist.
func getItemThatDoesNotExist(t *testing.T, pgs *postgresql.PostgreSQL) {
key := randomKey()
response, outputObject := getItem(t, pgs, key)
assert.Nil(t, response.Data)
assert.Equal(t, "", outputObject.Color)
}
// getItemWithNoKey validates that attempting a Get operation without providing a key will return an error.
func getItemWithNoKey(t *testing.T, pgs *postgresql.PostgreSQL) {
getReq := &state.GetRequest{
Key: "",
}
response, getErr := pgs.Get(t.Context(), getReq)
require.Error(t, getErr)
assert.Nil(t, response)
}
// setUpdatesTheUpdatedateField proves that the updateddate is set for an update, and not set upon insert.
func setUpdatesTheUpdatedateField(t *testing.T, pgs *postgresql.PostgreSQL) {
key := randomKey()
value := &fakeItem{Color: "orange"}
setItem(t, pgs, key, value, nil)
// insertdate should have a value and updatedate should be nil
_, insertdate, updatedate := getRowData(t, key)
assert.Nil(t, updatedate)
assert.NotNil(t, insertdate)
// insertdate should not change, updatedate should have a value
value = &fakeItem{Color: "aqua"}
setItem(t, pgs, key, value, nil)
_, newinsertdate, updatedate := getRowData(t, key)
assert.NotNil(t, updatedate)
assert.NotNil(t, newinsertdate)
assert.True(t, insertdate.Equal(*newinsertdate)) // The insertdate should not change.
deleteItem(t, pgs, key, nil)
}
func setItemWithNoKey(t *testing.T, pgs *postgresql.PostgreSQL) {
setReq := &state.SetRequest{
Key: "",
}
err := pgs.Set(t.Context(), setReq)
require.Error(t, err)
}
// Tests valid bulk sets and deletes.
func testBulkSetAndBulkDelete(t *testing.T, pgs *postgresql.PostgreSQL) {
setReq := []state.SetRequest{
{
Key: randomKey(),
Value: &fakeItem{Color: "blue"},
},
{
Key: randomKey(),
Value: &fakeItem{Color: "red"},
},
}
err := pgs.BulkSet(t.Context(), setReq, state.BulkStoreOpts{})
require.NoError(t, err)
assert.True(t, storeItemExists(t, setReq[0].Key))
assert.True(t, storeItemExists(t, setReq[1].Key))
deleteReq := []state.DeleteRequest{
{
Key: setReq[0].Key,
},
{
Key: setReq[1].Key,
},
}
err = pgs.BulkDelete(t.Context(), deleteReq, state.BulkStoreOpts{})
require.NoError(t, err)
assert.False(t, storeItemExists(t, setReq[0].Key))
assert.False(t, storeItemExists(t, setReq[1].Key))
}
// testInitConfiguration tests valid and invalid config settings.
func testInitConfiguration(t *testing.T) {
logger := logger.NewLogger("test")
tests := []struct {
name string
props map[string]string
expectedErr error
}{
{
name: "Empty",
props: map[string]string{},
expectedErr: errors.New("missing connection string"),
},
{
name: "Valid connection string",
props: map[string]string{"connectionString": getConnectionString()},
expectedErr: nil,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := NewPostgreSQLStateStore(logger).(*postgresql.PostgreSQL)
defer p.Close()
metadata := state.Metadata{
Base: metadata.Base{Properties: tt.props},
}
err := p.Init(t.Context(), metadata)
if tt.expectedErr == nil {
require.NoError(t, err)
} else {
require.Error(t, err)
assert.Equal(t, tt.expectedErr, err)
}
})
}
}
func getConnectionString() string {
return os.Getenv(connectionStringEnvKey)
}
func setItem(t *testing.T, pgs *postgresql.PostgreSQL, key string, value interface{}, etag *string) {
setReq := &state.SetRequest{
Key: key,
ETag: etag,
Value: value,
}
err := pgs.Set(t.Context(), setReq)
require.NoError(t, err)
itemExists := storeItemExists(t, key)
assert.True(t, itemExists)
}
func getItem(t *testing.T, pgs *postgresql.PostgreSQL, key string) (*state.GetResponse, *fakeItem) {
getReq := &state.GetRequest{
Key: key,
Options: state.GetStateOption{},
}
response, getErr := pgs.Get(t.Context(), getReq)
require.NoError(t, getErr)
assert.NotNil(t, response)
outputObject := &fakeItem{}
_ = json.Unmarshal(response.Data, outputObject)
return response, outputObject
}
func deleteItem(t *testing.T, pgs *postgresql.PostgreSQL, key string, etag *string) {
deleteReq := &state.DeleteRequest{
Key: key,
ETag: etag,
Options: state.DeleteStateOption{},
}
deleteErr := pgs.Delete(t.Context(), deleteReq)
require.NoError(t, deleteErr)
assert.False(t, storeItemExists(t, key))
}
func storeItemExists(t *testing.T, key string) bool {
ctx := t.Context()
db, err := pgx.Connect(ctx, getConnectionString())
require.NoError(t, err)
defer db.Close(ctx)
exists := false
statement := `SELECT EXISTS (SELECT FROM state WHERE key = $1)`
err = db.QueryRow(ctx, statement, key).Scan(&exists)
require.NoError(t, err)
return exists
}
func getRowData(t *testing.T, key string) (returnValue string, insertdate *time.Time, updatedate *time.Time) {
ctx := t.Context()
db, err := pgx.Connect(ctx, getConnectionString())
require.NoError(t, err)
defer db.Close(ctx)
err = db.QueryRow(ctx, "SELECT value, insertdate, updatedate FROM state WHERE key = $1", key).
Scan(&returnValue, &insertdate, &updatedate)
require.NoError(t, err)
return returnValue, insertdate, updatedate
}
type mocks struct {
db pgxmock.PgxPoolIface
pg *PostgreSQL
}
type fakeItem struct {
Color string
}
func TestMultiWithNoRequests(t *testing.T) {
// Arrange
m, _ := mockDatabase(t)
defer m.db.Close()
m.db.ExpectBegin()
m.db.ExpectCommit()
// There's also a rollback called after a commit, which is expected and will not have effect
m.db.ExpectRollback()
var operations []state.TransactionalStateOperation
// Act
err := m.pg.Multi(t.Context(), &state.TransactionalStateRequest{
Operations: operations,
})
// Assert
require.NoError(t, err)
}
func TestValidSetRequest(t *testing.T) {
// Arrange
m, _ := mockDatabase(t)
defer m.db.Close()
setReq := createSetRequest()
val, _ := json.Marshal(setReq.Value)
t.Run("single op", func(t *testing.T) {
operations := []state.TransactionalStateOperation{setReq}
m.db.ExpectExec("INSERT INTO").
WithArgs(setReq.Key, val).
WillReturnResult(pgxmock.NewResult("INSERT", 1))
// Act
err := m.pg.Multi(t.Context(), &state.TransactionalStateRequest{
Operations: operations,
})
// Assert
require.NoError(t, err)
})
t.Run("multiple ops", func(t *testing.T) {
operations := []state.TransactionalStateOperation{setReq, setReq}
m.db.ExpectBegin()
m.db.ExpectExec("INSERT INTO").
WithArgs(setReq.Key, val).
WillReturnResult(pgxmock.NewResult("INSERT", 1))
m.db.ExpectExec("INSERT INTO").
WithArgs(setReq.Key, val).
WillReturnResult(pgxmock.NewResult("INSERT", 1))
m.db.ExpectCommit()
// There's also a rollback called after a commit, which is expected and will not have effect
m.db.ExpectRollback()
// Act
err := m.pg.Multi(t.Context(), &state.TransactionalStateRequest{
Operations: operations,
})
// Assert
require.NoError(t, err)
})
}
func TestInvalidMultiSetRequestNoKey(t *testing.T) {
// Arrange
m, _ := mockDatabase(t)
defer m.db.Close()
m.db.ExpectBegin()
m.db.ExpectRollback()
operations := []state.TransactionalStateOperation{
state.SetRequest{Value: "value1"}, // Set request without key is not valid for Upsert operation
}
// Act
err := m.pg.Multi(t.Context(), &state.TransactionalStateRequest{
Operations: operations,
})
// Assert
require.Error(t, err)
}
func TestValidMultiDeleteRequest(t *testing.T) {
// Arrange
m, _ := mockDatabase(t)
defer m.db.Close()
deleteReq := createDeleteRequest()
t.Run("single op", func(t *testing.T) {
operations := []state.TransactionalStateOperation{deleteReq}
m.db.ExpectExec("DELETE FROM").
WithArgs(deleteReq.Key).
WillReturnResult(pgxmock.NewResult("DELETE", 1))
// Act
err := m.pg.Multi(t.Context(), &state.TransactionalStateRequest{
Operations: operations,
})
// Assert
require.NoError(t, err)
})
t.Run("multiple ops", func(t *testing.T) {
operations := []state.TransactionalStateOperation{deleteReq, deleteReq}
m.db.ExpectBegin()
m.db.ExpectExec("DELETE FROM").
WithArgs(deleteReq.Key).
WillReturnResult(pgxmock.NewResult("DELETE", 1))
m.db.ExpectExec("DELETE FROM").
WithArgs(deleteReq.Key).
WillReturnResult(pgxmock.NewResult("DELETE", 1))
m.db.ExpectCommit()
// There's also a rollback called after a commit, which is expected and will not have effect
m.db.ExpectRollback()
// Act
err := m.pg.Multi(t.Context(), &state.TransactionalStateRequest{
Operations: operations,
})
// Assert
require.NoError(t, err)
})
}
func TestInvalidMultiDeleteRequestNoKey(t *testing.T) {
// Arrange
m, _ := mockDatabase(t)
defer m.db.Close()
m.db.ExpectBegin()
m.db.ExpectRollback()
operations := []state.TransactionalStateOperation{state.DeleteRequest{}} // Delete request without key is not valid for Delete operation
// Act
err := m.pg.Multi(t.Context(), &state.TransactionalStateRequest{
Operations: operations,
})
// Assert
require.Error(t, err)
}
func TestMultiOperationOrder(t *testing.T) {
// Arrange
m, _ := mockDatabase(t)
defer m.db.Close()
operations := []state.TransactionalStateOperation{
state.SetRequest{Key: "key1", Value: "value1"},
state.DeleteRequest{Key: "key1"},
}
m.db.ExpectBegin()
m.db.ExpectExec("INSERT INTO").
WithArgs("key1", []byte(`"value1"`)).
WillReturnResult(pgxmock.NewResult("INSERT", 1))
m.db.ExpectExec("DELETE FROM").
WithArgs("key1").
WillReturnResult(pgxmock.NewResult("DELETE", 1))
m.db.ExpectCommit()
// There's also a rollback called after a commit, which is expected and will not have effect
m.db.ExpectRollback()
// Act
err := m.pg.Multi(t.Context(), &state.TransactionalStateRequest{
Operations: operations,
})
// Assert
require.NoError(t, err)
}
func createSetRequest() state.SetRequest {
return state.SetRequest{
Key: randomKey(),
Value: randomJSON(),
}
}
func createDeleteRequest() state.DeleteRequest {
return state.DeleteRequest{
Key: randomKey(),
}
}
func mockDatabase(t *testing.T) (*mocks, error) {
logger := logger.NewLogger("test")
db, err := pgxmock.NewPool()
if err != nil {
t.Fatalf("an error '%s' was not expected when opening a stub database connection", err)
}
dba := &PostgreSQL{
metadata: pgMetadata{
Timeout: 30 * time.Second,
},
logger: logger,
db: db,
}
return &mocks{
db: db,
pg: dba,
}, err
}
func randomKey() string {
return uuid.New().String()
}
func randomJSON() *fakeItem {
return &fakeItem{Color: randomKey()}
}