764 lines
20 KiB
Go
764 lines
20 KiB
Go
/*
|
|
Copyright 2021 The Dapr Authors
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
package mysql
|
|
|
|
import (
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"sort"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/go-sql-driver/mysql"
|
|
"github.com/google/uuid"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
"github.com/dapr/components-contrib/metadata"
|
|
"github.com/dapr/components-contrib/state"
|
|
"github.com/dapr/kit/logger"
|
|
"github.com/dapr/kit/ptr"
|
|
)
|
|
|
|
// Names of the environment variables that configure these integration tests.
const (
	// connectionStringEnvKey names the variable that holds the MySQL
	// connection string.
	connectionStringEnvKey = "DAPR_TEST_MYSQL_CONNSTRING"

	// pemPathEnvKey names the variable that holds the path of the PEM file
	// required to connect to MySQL over SSL.
	pemPathEnvKey = "DAPR_TEST_MYSQL_PEMPATH"
)
|
|
|
|
// fakeItem is the sample value the tests write to and read from the store.
type fakeItem struct {
	// Color is the only payload carried by the item.
	Color string
}
|
|
|
|
func (f fakeItem) MarshalJSON() ([]byte, error) {
|
|
return json.Marshal(f.Color)
|
|
}
|
|
|
|
func (f *fakeItem) UnmarshalJSON(data []byte) error {
|
|
return json.Unmarshal(data, &f.Color)
|
|
}
|
|
|
|
func TestMySQLIntegration(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
// When the connection string is not set these tests will simply be skipped.
|
|
// This makes sure the test do not try to run during any CI builds.
|
|
connectionString := getConnectionString("")
|
|
if connectionString == "" {
|
|
t.Skipf(
|
|
`MySQL state integration tests skipped.
|
|
To enable define the connection string
|
|
using environment variable '%s'
|
|
(example 'export %s="root:password@tcp(localhost:3306)/")`,
|
|
connectionStringEnvKey, connectionStringEnvKey)
|
|
}
|
|
|
|
t.Run("Test init configurations", func(t *testing.T) {
|
|
// Tests valid and invalid config settings.
|
|
logger := logger.NewLogger("test")
|
|
|
|
// define a struct the contain the metadata and create
|
|
// two instances of it in a tests slice
|
|
tests := []struct {
|
|
name string
|
|
props map[string]string
|
|
expectedErr string
|
|
}{
|
|
{
|
|
name: "Empty",
|
|
props: map[string]string{},
|
|
expectedErr: errMissingConnectionString,
|
|
},
|
|
{
|
|
name: "Valid connection string",
|
|
props: map[string]string{
|
|
keyConnectionString: getConnectionString(""),
|
|
keyPemPath: getPemPath(),
|
|
},
|
|
expectedErr: "",
|
|
},
|
|
{
|
|
name: "Valid table name",
|
|
props: map[string]string{
|
|
keyConnectionString: getConnectionString(""),
|
|
keyPemPath: getPemPath(),
|
|
keyTableName: "stateStore",
|
|
},
|
|
expectedErr: "",
|
|
},
|
|
}
|
|
|
|
for _, tt := range tests {
|
|
t.Run(tt.name, func(t *testing.T) {
|
|
p := NewMySQLStateStore(logger).(*MySQL)
|
|
defer p.Close()
|
|
|
|
metadata := state.Metadata{
|
|
Base: metadata.Base{Properties: tt.props},
|
|
}
|
|
|
|
err := p.Init(t.Context(), metadata)
|
|
|
|
if tt.expectedErr == "" {
|
|
require.NoError(t, err)
|
|
} else {
|
|
require.Error(t, err)
|
|
assert.Equal(t, tt.expectedErr, err.Error())
|
|
}
|
|
})
|
|
}
|
|
})
|
|
|
|
pemPath := getPemPath()
|
|
|
|
metadata := state.Metadata{
|
|
Base: metadata.Base{Properties: map[string]string{keyConnectionString: connectionString, keyPemPath: pemPath}},
|
|
}
|
|
|
|
mys := NewMySQLStateStore(logger.NewLogger("test")).(*MySQL)
|
|
t.Cleanup(func() {
|
|
defer mys.Close()
|
|
})
|
|
|
|
error := mys.Init(t.Context(), metadata)
|
|
if error != nil {
|
|
t.Fatal(error)
|
|
}
|
|
|
|
t.Run("Create table succeeds", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
tableName := "test_state"
|
|
|
|
// Drop the table if it already exists
|
|
exists, err := tableExists(t.Context(), mys.db, "dapr_state_store", tableName, 10*time.Second)
|
|
require.NoError(t, err)
|
|
if exists {
|
|
dropTable(t, mys.db, tableName)
|
|
}
|
|
|
|
// Create the state table and test for its existence
|
|
// There should be no error
|
|
err = mys.ensureStateTable(t.Context(), "dapr_state_store", tableName)
|
|
require.NoError(t, err)
|
|
|
|
// Now create it and make sure there are no errors
|
|
exists, err = tableExists(t.Context(), mys.db, "dapr_state_store", tableName, 10*time.Second)
|
|
require.NoError(t, err)
|
|
assert.True(t, exists)
|
|
|
|
// Drop the state table
|
|
dropTable(t, mys.db, tableName)
|
|
})
|
|
|
|
t.Run("Get Set Delete one item", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
// Validates setting one item, getting it, and deleting it.
|
|
key := randomKey()
|
|
value := &fakeItem{Color: "yellow"}
|
|
|
|
setItem(t, mys, key, value, nil)
|
|
|
|
getResponse, outputObject := getItem(t, mys, key)
|
|
assert.Equal(t, value, outputObject)
|
|
|
|
newValue := &fakeItem{Color: "green"}
|
|
setItem(t, mys, key, newValue, getResponse.ETag)
|
|
getResponse, outputObject = getItem(t, mys, key)
|
|
assert.Equal(t, newValue, outputObject)
|
|
|
|
deleteItem(t, mys, key, getResponse.ETag)
|
|
})
|
|
|
|
t.Run("Get item that does not exist", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
// Validates the behavior of retrieving an item that does not exist.
|
|
key := randomKey()
|
|
response, outputObject := getItem(t, mys, key)
|
|
assert.Nil(t, response.Data)
|
|
assert.Equal(t, "", outputObject.Color)
|
|
})
|
|
|
|
t.Run("Get item with no key fails", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
// Validates that attempting a Get operation without providing a key will return an error.
|
|
getReq := &state.GetRequest{
|
|
Key: "",
|
|
}
|
|
|
|
response, getErr := mys.Get(t.Context(), getReq)
|
|
require.Error(t, getErr)
|
|
assert.Nil(t, response)
|
|
})
|
|
|
|
t.Run("Set updates the updatedate field", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
// Proves that the updatedate is set for an
|
|
// update, and set upon insert. The updatedate is used as the eTag so must be
|
|
// set. It is also auto updated on update by MySQL.
|
|
key := randomKey()
|
|
value := &fakeItem{Color: "orange"}
|
|
setItem(t, mys, key, value, nil)
|
|
|
|
// insertdate and updatedate should have a value
|
|
_, insertdate, updatedate, eTag := getRowData(t, key)
|
|
assert.NotNil(t, insertdate, "insertdate was not set")
|
|
assert.NotNil(t, updatedate, "updatedate was not set")
|
|
|
|
// insertdate should not change, updatedate should have a value
|
|
value = &fakeItem{Color: "aqua"}
|
|
setItem(t, mys, key, value, nil)
|
|
_, newinsertdate, _, newETag := getRowData(t, key)
|
|
assert.Equal(t, insertdate, newinsertdate, "InsertDate was changed")
|
|
assert.NotEqual(t, eTag, newETag, "eTag was not updated")
|
|
|
|
deleteItem(t, mys, key, nil)
|
|
})
|
|
|
|
t.Run("Set item with no key fails", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
setReq := &state.SetRequest{
|
|
Key: "",
|
|
}
|
|
|
|
err := mys.Set(t.Context(), setReq)
|
|
require.Error(t, err, "Error was not nil when setting item with no key.")
|
|
})
|
|
|
|
t.Run("Bulk set and bulk delete", func(t *testing.T) {
|
|
t.Parallel()
|
|
testBulkSetAndBulkDelete(t, mys)
|
|
})
|
|
|
|
t.Run("Get and BulkGet with ttl", func(t *testing.T) {
|
|
t.Parallel()
|
|
testGetExpireTime(t, mys)
|
|
testGetBulkExpireTime(t, mys)
|
|
})
|
|
|
|
t.Run("Update and delete with eTag succeeds", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
// Create and retrieve new item
|
|
key := randomKey()
|
|
value := &fakeItem{Color: "hazel"}
|
|
setItem(t, mys, key, value, nil)
|
|
getResponse, _ := getItem(t, mys, key)
|
|
assert.NotNil(t, getResponse.ETag)
|
|
|
|
// Change the value and compare
|
|
value.Color = "purple"
|
|
setItem(t, mys, key, value, getResponse.ETag)
|
|
updateResponse, updatedItem := getItem(t, mys, key)
|
|
assert.Equal(t, value, updatedItem, "Item should have been updated")
|
|
assert.NotEqual(t, getResponse.ETag, updateResponse.ETag,
|
|
"ETag should change when item is updated")
|
|
|
|
// Delete
|
|
deleteItem(t, mys, key, updateResponse.ETag)
|
|
|
|
assert.False(t, storeItemExists(t, key), "Item is not in the data store")
|
|
})
|
|
|
|
t.Run("Update with old eTag fails", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
// Create and retrieve new item
|
|
key := randomKey()
|
|
value := &fakeItem{Color: "gray"}
|
|
setItem(t, mys, key, value, nil)
|
|
|
|
getResponse, _ := getItem(t, mys, key)
|
|
assert.NotNil(t, getResponse.ETag)
|
|
originalEtag := getResponse.ETag
|
|
|
|
// Change the value and get the updated eTag
|
|
newValue := &fakeItem{Color: "silver"}
|
|
setItem(t, mys, key, newValue, originalEtag)
|
|
|
|
_, updatedItem := getItem(t, mys, key)
|
|
assert.Equal(t, newValue, updatedItem)
|
|
|
|
// Update again with the original eTag - expect update failure
|
|
newValue = &fakeItem{Color: "maroon"}
|
|
setReq := &state.SetRequest{
|
|
Key: key,
|
|
ETag: originalEtag,
|
|
Value: newValue,
|
|
}
|
|
|
|
err := mys.Set(t.Context(), setReq)
|
|
require.Error(t, err, "Error was not thrown using old eTag")
|
|
})
|
|
|
|
t.Run("Insert with eTag fails", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
value := &fakeItem{Color: "teal"}
|
|
invalidETag := "12345"
|
|
|
|
setReq := &state.SetRequest{
|
|
Key: randomKey(),
|
|
ETag: &invalidETag,
|
|
Value: value,
|
|
}
|
|
|
|
err := mys.Set(t.Context(), setReq)
|
|
require.Error(t, err)
|
|
})
|
|
|
|
t.Run("Delete with invalid eTag fails", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
// Create new item
|
|
key := randomKey()
|
|
value := &fakeItem{Color: "mauve"}
|
|
setItem(t, mys, key, value, nil)
|
|
|
|
eTag := "1234"
|
|
|
|
// Delete the item with a fake eTag
|
|
deleteReq := &state.DeleteRequest{
|
|
Key: key,
|
|
ETag: &eTag,
|
|
}
|
|
|
|
err := mys.Delete(t.Context(), deleteReq)
|
|
require.Error(t, err)
|
|
})
|
|
|
|
t.Run("Delete item with no key fails", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
deleteReq := &state.DeleteRequest{
|
|
Key: "",
|
|
}
|
|
|
|
err := mys.Delete(t.Context(), deleteReq)
|
|
require.Error(t, err)
|
|
})
|
|
|
|
t.Run("Delete an item that does not exist", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
// Delete the item with a key not in the store
|
|
deleteReq := &state.DeleteRequest{
|
|
Key: randomKey(),
|
|
}
|
|
|
|
err := mys.Delete(t.Context(), deleteReq)
|
|
require.NoError(t, err)
|
|
})
|
|
|
|
t.Run("Inserts with first-write-wins", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
// Insert without an etag should work on new keys
|
|
key := randomKey()
|
|
setReq := &state.SetRequest{
|
|
Key: key,
|
|
Value: &fakeItem{Color: "teal"},
|
|
Options: state.SetStateOption{
|
|
Concurrency: state.FirstWrite,
|
|
},
|
|
}
|
|
|
|
err := mys.Set(t.Context(), setReq)
|
|
require.NoError(t, err)
|
|
|
|
// Get the etag
|
|
getResponse, _ := getItem(t, mys, key)
|
|
assert.NotNil(t, getResponse)
|
|
assert.NotNil(t, getResponse.ETag)
|
|
originalEtag := getResponse.ETag
|
|
|
|
// Insert without an etag should fail on existing keys
|
|
setReq = &state.SetRequest{
|
|
Key: key,
|
|
Value: &fakeItem{Color: "gray or grey"},
|
|
Options: state.SetStateOption{
|
|
Concurrency: state.FirstWrite,
|
|
},
|
|
}
|
|
|
|
err = mys.Set(t.Context(), setReq)
|
|
require.ErrorContains(t, err, "Duplicate entry")
|
|
|
|
// Insert with invalid etag should fail on existing keys
|
|
setReq = &state.SetRequest{
|
|
Key: key,
|
|
Value: &fakeItem{Color: "pink"},
|
|
ETag: ptr.Of("no-etag"),
|
|
Options: state.SetStateOption{
|
|
Concurrency: state.FirstWrite,
|
|
},
|
|
}
|
|
|
|
err = mys.Set(t.Context(), setReq)
|
|
require.ErrorContains(t, err, "possible etag mismatch")
|
|
|
|
// Insert with valid etag should succeed on existing keys
|
|
setReq = &state.SetRequest{
|
|
Key: key,
|
|
Value: &fakeItem{Color: "scarlet"},
|
|
ETag: originalEtag,
|
|
Options: state.SetStateOption{
|
|
Concurrency: state.FirstWrite,
|
|
},
|
|
}
|
|
|
|
err = mys.Set(t.Context(), setReq)
|
|
require.NoError(t, err)
|
|
|
|
// Insert with an etag should fail on new keys
|
|
setReq = &state.SetRequest{
|
|
Key: randomKey(),
|
|
Value: &fakeItem{Color: "greige"},
|
|
ETag: ptr.Of("myetag"),
|
|
Options: state.SetStateOption{
|
|
Concurrency: state.FirstWrite,
|
|
},
|
|
}
|
|
|
|
err = mys.Set(t.Context(), setReq)
|
|
require.ErrorContains(t, err, "possible etag mismatch")
|
|
})
|
|
|
|
t.Run("Multi with delete and set", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
var operations []state.TransactionalStateOperation
|
|
var deleteRequests []state.DeleteRequest
|
|
for range 3 {
|
|
req := state.DeleteRequest{Key: randomKey()}
|
|
|
|
// Add the item to the database
|
|
setItem(t, mys, req.Key, randomJSON(), nil)
|
|
|
|
// Add the item to a slice of delete requests
|
|
deleteRequests = append(deleteRequests, req)
|
|
|
|
// Add the item to the multi transaction request
|
|
operations = append(operations, req)
|
|
}
|
|
|
|
// Create the set requests
|
|
var setRequests []state.SetRequest
|
|
for range 3 {
|
|
req := state.SetRequest{
|
|
Key: randomKey(),
|
|
Value: randomJSON(),
|
|
}
|
|
setRequests = append(setRequests, req)
|
|
operations = append(operations, req)
|
|
}
|
|
|
|
err := mys.Multi(t.Context(), &state.TransactionalStateRequest{
|
|
Operations: operations,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
for _, delete := range deleteRequests {
|
|
assert.False(t, storeItemExists(t, delete.Key))
|
|
}
|
|
|
|
for _, set := range setRequests {
|
|
assert.True(t, storeItemExists(t, set.Key))
|
|
deleteItem(t, mys, set.Key, nil)
|
|
}
|
|
})
|
|
|
|
t.Run("Multi with delete only", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
var operations []state.TransactionalStateOperation
|
|
var deleteRequests []state.DeleteRequest
|
|
for range 3 {
|
|
req := state.DeleteRequest{Key: randomKey()}
|
|
|
|
// Add the item to the database
|
|
setItem(t, mys, req.Key, randomJSON(), nil)
|
|
|
|
// Add the item to a slice of delete requests
|
|
deleteRequests = append(deleteRequests, req)
|
|
|
|
// Add the item to the multi transaction request
|
|
operations = append(operations, req)
|
|
}
|
|
|
|
err := mys.Multi(t.Context(), &state.TransactionalStateRequest{
|
|
Operations: operations,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
for _, delete := range deleteRequests {
|
|
assert.False(t, storeItemExists(t, delete.Key))
|
|
}
|
|
})
|
|
|
|
t.Run("Multi with set only", func(t *testing.T) {
|
|
t.Parallel()
|
|
|
|
var operations []state.TransactionalStateOperation
|
|
var setRequests []state.SetRequest
|
|
for range 3 {
|
|
req := state.SetRequest{
|
|
Key: randomKey(),
|
|
Value: randomJSON(),
|
|
}
|
|
setRequests = append(setRequests, req)
|
|
operations = append(operations, req)
|
|
}
|
|
|
|
err := mys.Multi(t.Context(), &state.TransactionalStateRequest{
|
|
Operations: operations,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
for _, set := range setRequests {
|
|
assert.True(t, storeItemExists(t, set.Key))
|
|
deleteItem(t, mys, set.Key, nil)
|
|
}
|
|
})
|
|
}
|
|
|
|
// Tests valid bulk sets and deletes.
|
|
func testBulkSetAndBulkDelete(t *testing.T, mys *MySQL) {
|
|
setReq := []state.SetRequest{
|
|
{
|
|
Key: randomKey(),
|
|
Value: &fakeItem{Color: "blue"},
|
|
},
|
|
{
|
|
Key: randomKey(),
|
|
Value: &fakeItem{Color: "red"},
|
|
},
|
|
}
|
|
|
|
err := mys.BulkSet(t.Context(), setReq, state.BulkStoreOpts{})
|
|
require.NoError(t, err)
|
|
assert.True(t, storeItemExists(t, setReq[0].Key))
|
|
assert.True(t, storeItemExists(t, setReq[1].Key))
|
|
|
|
deleteReq := []state.DeleteRequest{
|
|
{
|
|
Key: setReq[0].Key,
|
|
},
|
|
{
|
|
Key: setReq[1].Key,
|
|
},
|
|
}
|
|
|
|
err = mys.BulkDelete(t.Context(), deleteReq, state.BulkStoreOpts{})
|
|
require.NoError(t, err)
|
|
assert.False(t, storeItemExists(t, setReq[0].Key))
|
|
assert.False(t, storeItemExists(t, setReq[1].Key))
|
|
}
|
|
|
|
func testGetExpireTime(t *testing.T, mys *MySQL) {
|
|
key1 := randomKey()
|
|
require.NoError(t, mys.Set(t.Context(), &state.SetRequest{
|
|
Key: key1,
|
|
Value: "123",
|
|
Metadata: map[string]string{
|
|
"ttlInSeconds": "1000",
|
|
},
|
|
}))
|
|
|
|
resp, err := mys.Get(t.Context(), &state.GetRequest{Key: key1})
|
|
require.NoError(t, err)
|
|
assert.Equal(t, `"123"`, string(resp.Data))
|
|
require.Len(t, resp.Metadata, 1)
|
|
expireTime, err := time.Parse(time.RFC3339, resp.Metadata["ttlExpireTime"])
|
|
require.NoError(t, err)
|
|
assert.InDelta(t, time.Now().Add(time.Second*1000).Unix(), expireTime.Unix(), 5)
|
|
}
|
|
|
|
func testGetBulkExpireTime(t *testing.T, mys *MySQL) {
|
|
key1 := randomKey()
|
|
key2 := randomKey()
|
|
|
|
require.NoError(t, mys.Set(t.Context(), &state.SetRequest{
|
|
Key: key1,
|
|
Value: "123",
|
|
Metadata: map[string]string{
|
|
"ttlInSeconds": "1000",
|
|
},
|
|
}))
|
|
require.NoError(t, mys.Set(t.Context(), &state.SetRequest{
|
|
Key: key2,
|
|
Value: "456",
|
|
Metadata: map[string]string{
|
|
"ttlInSeconds": "2001",
|
|
},
|
|
}))
|
|
|
|
resp, err := mys.BulkGet(t.Context(), []state.GetRequest{
|
|
{Key: key1}, {Key: key2},
|
|
}, state.BulkGetOpts{})
|
|
require.NoError(t, err)
|
|
assert.Len(t, resp, 2)
|
|
sort.Slice(resp, func(i, j int) bool {
|
|
return string(resp[i].Data) < string(resp[j].Data)
|
|
})
|
|
|
|
assert.Equal(t, `"123"`, string(resp[0].Data))
|
|
assert.Equal(t, `"456"`, string(resp[1].Data))
|
|
require.Len(t, resp[0].Metadata, 1)
|
|
require.Len(t, resp[1].Metadata, 1)
|
|
expireTime, err := time.Parse(time.RFC3339, resp[0].Metadata["ttlExpireTime"])
|
|
require.NoError(t, err)
|
|
assert.InDelta(t, time.Now().Add(time.Second*1000).Unix(), expireTime.Unix(), 5)
|
|
expireTime, err = time.Parse(time.RFC3339, resp[1].Metadata["ttlExpireTime"])
|
|
require.NoError(t, err)
|
|
assert.InDelta(t, time.Now().Add(time.Second*2001).Unix(), expireTime.Unix(), 5)
|
|
}
|
|
|
|
func dropTable(t *testing.T, db *sql.DB, tableName string) {
|
|
_, err := db.Exec(fmt.Sprintf(
|
|
`DROP TABLE %s;`,
|
|
tableName))
|
|
require.NoError(t, err)
|
|
}
|
|
|
|
func setItem(t *testing.T, mys *MySQL, key string, value interface{}, eTag *string) {
|
|
setReq := &state.SetRequest{
|
|
Key: key,
|
|
ETag: eTag,
|
|
Value: value,
|
|
}
|
|
|
|
err := mys.Set(t.Context(), setReq)
|
|
require.NoError(t, err, "Error setting an item")
|
|
itemExists := storeItemExists(t, key)
|
|
assert.True(t, itemExists, "Item does not exist after being set")
|
|
}
|
|
|
|
func getItem(t *testing.T, mys *MySQL, key string) (*state.GetResponse, *fakeItem) {
|
|
getReq := &state.GetRequest{
|
|
Key: key,
|
|
Options: state.GetStateOption{},
|
|
}
|
|
|
|
response, getErr := mys.Get(t.Context(), getReq)
|
|
require.NoError(t, getErr)
|
|
assert.NotNil(t, response)
|
|
outputObject := &fakeItem{}
|
|
_ = json.Unmarshal(response.Data, outputObject)
|
|
|
|
return response, outputObject
|
|
}
|
|
|
|
func deleteItem(t *testing.T, mys *MySQL, key string, eTag *string) {
|
|
deleteReq := &state.DeleteRequest{
|
|
Key: key,
|
|
ETag: eTag,
|
|
Options: state.DeleteStateOption{},
|
|
}
|
|
|
|
deleteErr := mys.Delete(t.Context(), deleteReq)
|
|
require.NoError(t, deleteErr, "There was an error deleting a record")
|
|
assert.False(t, storeItemExists(t, key), "Item still exists after delete")
|
|
}
|
|
|
|
func storeItemExists(t *testing.T, key string) bool {
|
|
db, err := connectToDB(t)
|
|
require.NoError(t, err)
|
|
defer db.Close()
|
|
|
|
exists := false
|
|
statement := fmt.Sprintf(
|
|
`SELECT EXISTS (SELECT * FROM %s WHERE id = ?)`,
|
|
defaultTableName)
|
|
err = db.QueryRow(statement, key).Scan(&exists)
|
|
require.NoError(t, err)
|
|
|
|
return exists
|
|
}
|
|
|
|
func getRowData(t *testing.T, key string) (returnValue string, insertdate sql.NullString, updatedate sql.NullString, eTag string) {
|
|
db, err := connectToDB(t)
|
|
require.NoError(t, err)
|
|
defer db.Close()
|
|
|
|
err = db.QueryRow(fmt.Sprintf(
|
|
`SELECT value, insertdate, updatedate, eTag FROM %s WHERE id = ?`,
|
|
defaultTableName), key).Scan(&returnValue, &insertdate, &updatedate, &eTag)
|
|
require.NoError(t, err)
|
|
|
|
return returnValue, insertdate, updatedate, eTag
|
|
}
|
|
|
|
// Connects to MySQL using SSL if required.
|
|
func connectToDB(t *testing.T) (*sql.DB, error) {
|
|
val := getPemPath()
|
|
if val != "" {
|
|
rootCertPool := x509.NewCertPool()
|
|
pem, readErr := os.ReadFile(val)
|
|
|
|
require.NoError(t, readErr, "Could not read PEM file")
|
|
|
|
ok := rootCertPool.AppendCertsFromPEM(pem)
|
|
|
|
assert.True(t, ok, "failed to append PEM")
|
|
|
|
mysql.RegisterTLSConfig("custom", &tls.Config{RootCAs: rootCertPool, MinVersion: tls.VersionTLS12})
|
|
}
|
|
|
|
db, err := sql.Open("mysql", getConnectionString(defaultSchemaName))
|
|
|
|
return db, err
|
|
}
|
|
|
|
func randomKey() string {
|
|
return uuid.New().String()
|
|
}
|
|
|
|
func randomJSON() *fakeItem {
|
|
return &fakeItem{Color: randomKey()}
|
|
}
|
|
|
|
// Returns the connection string
|
|
// The value is read from an environment variable.
|
|
func getConnectionString(database string) string {
|
|
connectionString := os.Getenv(connectionStringEnvKey)
|
|
|
|
if database != "" {
|
|
parts := strings.Split(connectionString, "/")
|
|
connectionString = fmt.Sprintf("%s/%s%s", parts[0], database, parts[1])
|
|
}
|
|
|
|
return connectionString
|
|
}
|
|
|
|
// Returns the full path to the PEM file used to connect to Azure MySQL over
|
|
// SSL. The connection string must end with &tls=custom
|
|
// The value is read from an environment variable.
|
|
func getPemPath() string {
|
|
return os.Getenv(pemPathEnvKey)
|
|
}
|