Fix ordering in transaction API for PostgreSQL (#1524)

* fix transaction API ordering

Signed-off-by: Shubham Sharma <shubhash@microsoft.com>

* Update postgres transaction API and refactor tests

Signed-off-by: Shubham Sharma <shubhash@microsoft.com>

* Remove commented tests

Signed-off-by: Shubham Sharma <shubhash@microsoft.com>

* Add more tests

Signed-off-by: Shubham Sharma <shubhash@microsoft.com>

* Fix govet issue

Signed-off-by: Shubham Sharma <shubhash@microsoft.com>

* Fix goimports

Signed-off-by: Shubham Sharma <shubhash@microsoft.com>

* Fix golangci issues

Signed-off-by: Shubham Sharma <shubhash@microsoft.com>

Co-authored-by: Looong Dai <long.dai@intel.com>
Co-authored-by: Artur Souza <artursouza.ms@outlook.com>
This commit is contained in:
Shubham Sharma 2022-03-09 11:16:32 +05:30 committed by GitHub
parent db670d26b9
commit 794e42225a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 583 additions and 150 deletions

View File

@ -21,9 +21,11 @@ import (
type dbAccess interface {
Init(metadata state.Metadata) error
Set(req *state.SetRequest) error
BulkSet(req []state.SetRequest) error
Get(req *state.GetRequest) (*state.GetResponse, error)
Delete(req *state.DeleteRequest) error
ExecuteMulti(sets []state.SetRequest, deletes []state.DeleteRequest) error
BulkDelete(req []state.DeleteRequest) error
ExecuteMulti(req *state.TransactionalStateRequest) error
Query(req *state.QueryRequest) (*state.QueryResponse, error)
Close() error // io.Closer
}

View File

@ -166,6 +166,30 @@ func (p *postgresDBAccess) setValue(req *state.SetRequest) error {
return nil
}
func (p *postgresDBAccess) BulkSet(req []state.SetRequest) error {
p.logger.Debug("Executing BulkSet request")
tx, err := p.db.Begin()
if err != nil {
return err
}
if len(req) > 0 {
for _, s := range req {
sa := s // Fix for gosec G601: Implicit memory aliasing in for loop.
err = p.Set(&sa)
if err != nil {
tx.Rollback()
return err
}
}
}
err = tx.Commit()
return err
}
// Get returns data from the database. If data does not exist for the key an empty state.GetResponse will be returned.
func (p *postgresDBAccess) Get(req *state.GetRequest) (*state.GetResponse, error) {
p.logger.Debug("Getting state value from PostgreSQL")
@ -257,15 +281,15 @@ func (p *postgresDBAccess) deleteValue(req *state.DeleteRequest) error {
return nil
}
func (p *postgresDBAccess) ExecuteMulti(sets []state.SetRequest, deletes []state.DeleteRequest) error {
p.logger.Debug("Executing multiple PostgreSQL operations")
func (p *postgresDBAccess) BulkDelete(req []state.DeleteRequest) error {
p.logger.Debug("Executing BulkDelete request")
tx, err := p.db.Begin()
if err != nil {
return err
}
if len(deletes) > 0 {
for _, d := range deletes {
if len(req) > 0 {
for _, d := range req {
da := d // Fix for gosec G601: Implicit memory aliasing in for loop.
err = p.Delete(&da)
if err != nil {
@ -276,15 +300,54 @@ func (p *postgresDBAccess) ExecuteMulti(sets []state.SetRequest, deletes []state
}
}
if len(sets) > 0 {
for _, s := range sets {
sa := s // Fix for gosec G601: Implicit memory aliasing in for loop.
err = p.Set(&sa)
err = tx.Commit()
return err
}
func (p *postgresDBAccess) ExecuteMulti(request *state.TransactionalStateRequest) error {
p.logger.Debug("Executing PostgreSQL transaction")
tx, err := p.db.Begin()
if err != nil {
return err
}
for _, o := range request.Operations {
switch o.Operation {
case state.Upsert:
var setReq state.SetRequest
setReq, err = getSet(o)
if err != nil {
tx.Rollback()
return err
}
err = p.Set(&setReq)
if err != nil {
tx.Rollback()
return err
}
case state.Delete:
var delReq state.DeleteRequest
delReq, err = getDelete(o)
if err != nil {
tx.Rollback()
return err
}
err = p.Delete(&delReq)
if err != nil {
tx.Rollback()
return err
}
default:
tx.Rollback()
return fmt.Errorf("unsupported operation: %s", o.Operation)
}
}
@ -353,3 +416,31 @@ func tableExists(db *sql.DB, tableName string) (bool, error) {
return exists, err
}
// Returns the set requests.
func getSet(req state.TransactionalStateOperation) (state.SetRequest, error) {
setReq, ok := req.Request.(state.SetRequest)
if !ok {
return setReq, fmt.Errorf("expecting set request")
}
if setReq.Key == "" {
return setReq, fmt.Errorf("missing key in upsert operation")
}
return setReq, nil
}
// Returns the delete requests.
func getDelete(req state.TransactionalStateOperation) (state.DeleteRequest, error) {
delReq, ok := req.Request.(state.DeleteRequest)
if !ok {
return delReq, fmt.Errorf("expecting delete request")
}
if delReq.Key == "" {
return delReq, fmt.Errorf("missing key in upsert operation")
}
return delReq, nil
}

View File

@ -0,0 +1,461 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package postgresql
import (
"database/sql"
"testing"
"github.com/DATA-DOG/go-sqlmock"
"github.com/stretchr/testify/assert"
"github.com/dapr/components-contrib/state"
"github.com/dapr/kit/logger"
)
type mocks struct {
db *sql.DB
mock sqlmock.Sqlmock
pgDba *postgresDBAccess
}
func TestGetSetWithWrongType(t *testing.T) {
t.Parallel()
operation := state.TransactionalStateOperation{
Operation: state.Delete,
Request: state.DeleteRequest{}, // Delete request is not valid for getSets
}
_, err := getSet(operation)
assert.NotNil(t, err)
}
func TestGetSetWithNoKey(t *testing.T) {
t.Parallel()
operation := state.TransactionalStateOperation{
Operation: state.Upsert,
Request: state.SetRequest{Value: "value1"}, // Set request with no key is invalid
}
_, err := getSet(operation)
assert.NotNil(t, err)
}
func TestGetSetValid(t *testing.T) {
t.Parallel()
operation := state.TransactionalStateOperation{
Operation: state.Upsert,
Request: state.SetRequest{Key: "key1", Value: "value1"},
}
set, err := getSet(operation)
assert.Nil(t, err)
assert.Equal(t, "key1", set.Key)
}
func TestGetDeleteWithWrongType(t *testing.T) {
t.Parallel()
operation := state.TransactionalStateOperation{
Operation: state.Upsert,
Request: state.SetRequest{Value: "value1"}, // Set request is not valid for getDeletes
}
_, err := getDelete(operation)
assert.NotNil(t, err)
}
func TestGetDeleteWithNoKey(t *testing.T) {
t.Parallel()
operation := state.TransactionalStateOperation{
Operation: state.Delete,
Request: state.DeleteRequest{}, // Delete request with no key is invalid
}
_, err := getDelete(operation)
assert.NotNil(t, err)
}
func TestGetDeleteValid(t *testing.T) {
t.Parallel()
operation := state.TransactionalStateOperation{
Operation: state.Delete,
Request: state.DeleteRequest{Key: "key1"},
}
delete, err := getDelete(operation)
assert.Nil(t, err)
assert.Equal(t, "key1", delete.Key)
}
func TestMultiWithNoRequests(t *testing.T) {
// Arrange
m, _ := mockDatabase(t)
defer m.db.Close()
m.mock.ExpectBegin()
m.mock.ExpectCommit()
var operations []state.TransactionalStateOperation
// Act
err := m.pgDba.ExecuteMulti(&state.TransactionalStateRequest{
Operations: operations,
})
// Assert
assert.Nil(t, err)
}
func TestInvalidMultiInvalidAction(t *testing.T) {
// Arrange
m, _ := mockDatabase(t)
defer m.db.Close()
m.mock.ExpectBegin()
m.mock.ExpectRollback()
var operations []state.TransactionalStateOperation
operations = append(operations, state.TransactionalStateOperation{
Operation: "Something invalid",
Request: createSetRequest(),
})
// Act
err := m.pgDba.ExecuteMulti(&state.TransactionalStateRequest{
Operations: operations,
})
// Assert
assert.NotNil(t, err)
}
func TestValidSetRequest(t *testing.T) {
// Arrange
m, _ := mockDatabase(t)
defer m.db.Close()
m.mock.ExpectBegin()
m.mock.ExpectExec("INSERT INTO").WillReturnResult(sqlmock.NewResult(1, 1))
m.mock.ExpectCommit()
var operations []state.TransactionalStateOperation
operations = append(operations, state.TransactionalStateOperation{
Operation: state.Upsert,
Request: createSetRequest(),
})
// Act
err := m.pgDba.ExecuteMulti(&state.TransactionalStateRequest{
Operations: operations,
})
// Assert
assert.Nil(t, err)
}
func TestInvalidMultiSetRequest(t *testing.T) {
// Arrange
m, _ := mockDatabase(t)
defer m.db.Close()
m.mock.ExpectBegin()
m.mock.ExpectRollback()
var operations []state.TransactionalStateOperation
operations = append(operations, state.TransactionalStateOperation{
Operation: state.Upsert,
Request: createDeleteRequest(), // Delete request is not valid for Upsert operation
})
// Act
err := m.pgDba.ExecuteMulti(&state.TransactionalStateRequest{
Operations: operations,
})
// Assert
assert.NotNil(t, err)
}
func TestInvalidMultiSetRequestNoKey(t *testing.T) {
// Arrange
m, _ := mockDatabase(t)
defer m.db.Close()
m.mock.ExpectBegin()
m.mock.ExpectRollback()
var operations []state.TransactionalStateOperation
operations = append(operations, state.TransactionalStateOperation{
Operation: state.Upsert,
Request: state.SetRequest{Value: "value1"}, // Set request without key is not valid for Upsert operation
})
// Act
err := m.pgDba.ExecuteMulti(&state.TransactionalStateRequest{
Operations: operations,
})
// Assert
assert.NotNil(t, err)
}
func TestValidMultiDeleteRequest(t *testing.T) {
// Arrange
m, _ := mockDatabase(t)
defer m.db.Close()
m.mock.ExpectBegin()
m.mock.ExpectExec("DELETE FROM").WillReturnResult(sqlmock.NewResult(1, 1))
m.mock.ExpectCommit()
var operations []state.TransactionalStateOperation
operations = append(operations, state.TransactionalStateOperation{
Operation: state.Delete,
Request: createDeleteRequest(),
})
// Act
err := m.pgDba.ExecuteMulti(&state.TransactionalStateRequest{
Operations: operations,
})
// Assert
assert.Nil(t, err)
}
func TestInvalidMultiDeleteRequest(t *testing.T) {
// Arrange
m, _ := mockDatabase(t)
defer m.db.Close()
m.mock.ExpectBegin()
m.mock.ExpectRollback()
var operations []state.TransactionalStateOperation
operations = append(operations, state.TransactionalStateOperation{
Operation: state.Delete,
Request: createSetRequest(), // Set request is not valid for Delete operation
})
// Act
err := m.pgDba.ExecuteMulti(&state.TransactionalStateRequest{
Operations: operations,
})
// Assert
assert.NotNil(t, err)
}
func TestInvalidMultiDeleteRequestNoKey(t *testing.T) {
// Arrange
m, _ := mockDatabase(t)
defer m.db.Close()
m.mock.ExpectBegin()
m.mock.ExpectRollback()
var operations []state.TransactionalStateOperation
operations = append(operations, state.TransactionalStateOperation{
Operation: state.Delete,
Request: state.DeleteRequest{}, // Delete request without key is not valid for Delete operation
})
// Act
err := m.pgDba.ExecuteMulti(&state.TransactionalStateRequest{
Operations: operations,
})
// Assert
assert.NotNil(t, err)
}
func TestMultiOperationOrder(t *testing.T) {
// Arrange
m, _ := mockDatabase(t)
defer m.db.Close()
m.mock.ExpectBegin()
m.mock.ExpectExec("INSERT INTO").WillReturnResult(sqlmock.NewResult(1, 1))
m.mock.ExpectExec("DELETE FROM").WillReturnResult(sqlmock.NewResult(1, 1))
m.mock.ExpectCommit()
var operations []state.TransactionalStateOperation
operations = append(operations,
state.TransactionalStateOperation{
Operation: state.Upsert,
Request: state.SetRequest{Key: "key1", Value: "value1"},
},
state.TransactionalStateOperation{
Operation: state.Delete,
Request: state.DeleteRequest{Key: "key1"},
},
)
// Act
err := m.pgDba.ExecuteMulti(&state.TransactionalStateRequest{
Operations: operations,
})
// Assert
assert.Nil(t, err)
}
func TestInvalidBulkSetNoKey(t *testing.T) {
// Arrange
m, _ := mockDatabase(t)
defer m.db.Close()
m.mock.ExpectBegin()
m.mock.ExpectRollback()
var sets []state.SetRequest
sets = append(sets, state.SetRequest{ // Set request without key is not valid for Set operation
Value: "value1",
})
// Act
err := m.pgDba.BulkSet(sets)
// Assert
assert.NotNil(t, err)
}
func TestInvalidBulkSetEmptyValue(t *testing.T) {
// Arrange
m, _ := mockDatabase(t)
defer m.db.Close()
m.mock.ExpectBegin()
m.mock.ExpectRollback()
var sets []state.SetRequest
sets = append(sets, state.SetRequest{ // Set request without value is not valid for Set operation
Key: "key1",
Value: "",
})
// Act
err := m.pgDba.BulkSet(sets)
// Assert
assert.NotNil(t, err)
}
func TestValidBulkSet(t *testing.T) {
// Arrange
m, _ := mockDatabase(t)
defer m.db.Close()
m.mock.ExpectBegin()
m.mock.ExpectExec("INSERT INTO").WillReturnResult(sqlmock.NewResult(1, 1))
m.mock.ExpectCommit()
var sets []state.SetRequest
sets = append(sets, state.SetRequest{
Key: "key1",
Value: "value1",
})
// Act
err := m.pgDba.BulkSet(sets)
// Assert
assert.Nil(t, err)
}
func TestInvalidBulkDeleteNoKey(t *testing.T) {
// Arrange
m, _ := mockDatabase(t)
defer m.db.Close()
m.mock.ExpectBegin()
m.mock.ExpectRollback()
var deletes []state.DeleteRequest
deletes = append(deletes, state.DeleteRequest{ // Delete request without key is not valid for Delete operation
Key: "",
})
// Act
err := m.pgDba.BulkDelete(deletes)
// Assert
assert.NotNil(t, err)
}
func TestValidBulkDelete(t *testing.T) {
// Arrange
m, _ := mockDatabase(t)
defer m.db.Close()
m.mock.ExpectBegin()
m.mock.ExpectExec("DELETE FROM").WillReturnResult(sqlmock.NewResult(1, 1))
m.mock.ExpectCommit()
var deletes []state.DeleteRequest
deletes = append(deletes, state.DeleteRequest{
Key: "key1",
})
// Act
err := m.pgDba.BulkDelete(deletes)
// Assert
assert.Nil(t, err)
}
func createSetRequest() state.SetRequest {
return state.SetRequest{
Key: randomKey(),
Value: randomJSON(),
}
}
func createDeleteRequest() state.DeleteRequest {
return state.DeleteRequest{
Key: randomKey(),
}
}
func mockDatabase(t *testing.T) (*mocks, error) {
logger := logger.NewLogger("test")
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("an error '%s' was not expected when opening a stub database connection", err)
}
dba := &postgresDBAccess{
logger: logger,
db: db,
}
return &mocks{
db: db,
mock: mock,
pgDba: dba,
}, err
}

View File

@ -14,8 +14,6 @@ limitations under the License.
package postgresql
import (
"fmt"
"github.com/dapr/components-contrib/state"
"github.com/dapr/kit/logger"
)
@ -65,7 +63,7 @@ func (p *PostgreSQL) Delete(req *state.DeleteRequest) error {
// BulkDelete removes multiple entries from the store.
func (p *PostgreSQL) BulkDelete(req []state.DeleteRequest) error {
return p.dbaccess.ExecuteMulti(nil, req)
return p.dbaccess.BulkDelete(req)
}
// Get returns an entity from store.
@ -86,39 +84,12 @@ func (p *PostgreSQL) Set(req *state.SetRequest) error {
// BulkSet adds/updates multiple entities on store.
func (p *PostgreSQL) BulkSet(req []state.SetRequest) error {
return p.dbaccess.ExecuteMulti(req, nil)
return p.dbaccess.BulkSet(req)
}
// Multi handles multiple transactions. Implements TransactionalStore.
func (p *PostgreSQL) Multi(request *state.TransactionalStateRequest) error {
var deletes []state.DeleteRequest
var sets []state.SetRequest
for _, req := range request.Operations {
switch req.Operation {
case state.Upsert:
if setReq, ok := req.Request.(state.SetRequest); ok {
sets = append(sets, setReq)
} else {
return fmt.Errorf("expecting set request")
}
case state.Delete:
if delReq, ok := req.Request.(state.DeleteRequest); ok {
deletes = append(deletes, delReq)
} else {
return fmt.Errorf("expecting delete request")
}
default:
return fmt.Errorf("unsupported operation: %s", req.Operation)
}
}
if len(sets) > 0 || len(deletes) > 0 {
return p.dbaccess.ExecuteMulti(sets, deletes)
}
return nil
return p.dbaccess.ExecuteMulti(request)
}
// Query executes a query against store.

View File

@ -27,10 +27,11 @@ const (
// Fake implementation of interface postgressql.dbaccess.
type fakeDBaccess struct {
logger logger.Logger
initExecuted bool
setExecuted bool
getExecuted bool
logger logger.Logger
initExecuted bool
setExecuted bool
getExecuted bool
deleteExecuted bool
}
func (m *fakeDBaccess) Init(metadata state.Metadata) error {
@ -52,10 +53,20 @@ func (m *fakeDBaccess) Get(req *state.GetRequest) (*state.GetResponse, error) {
}
func (m *fakeDBaccess) Delete(req *state.DeleteRequest) error {
m.deleteExecuted = true
return nil
}
func (m *fakeDBaccess) ExecuteMulti(sets []state.SetRequest, deletes []state.DeleteRequest) error {
func (m *fakeDBaccess) BulkSet(req []state.SetRequest) error {
return nil
}
func (m *fakeDBaccess) BulkDelete(req []state.DeleteRequest) error {
return nil
}
func (m *fakeDBaccess) ExecuteMulti(req *state.TransactionalStateRequest) error {
return nil
}
@ -74,109 +85,6 @@ func TestInitRunsDBAccessInit(t *testing.T) {
assert.True(t, fake.initExecuted)
}
func TestMultiWithNoRequestsReturnsNil(t *testing.T) {
t.Parallel()
var operations []state.TransactionalStateOperation
pgs := createPostgreSQL(t)
err := pgs.Multi(&state.TransactionalStateRequest{
Operations: operations,
})
assert.Nil(t, err)
}
func TestInvalidMultiAction(t *testing.T) {
t.Parallel()
var operations []state.TransactionalStateOperation
operations = append(operations, state.TransactionalStateOperation{
Operation: "Something invalid",
Request: createSetRequest(),
})
pgs := createPostgreSQL(t)
err := pgs.Multi(&state.TransactionalStateRequest{
Operations: operations,
})
assert.NotNil(t, err)
}
func TestValidSetRequest(t *testing.T) {
t.Parallel()
var operations []state.TransactionalStateOperation
operations = append(operations, state.TransactionalStateOperation{
Operation: state.Upsert,
Request: createSetRequest(),
})
pgs := createPostgreSQL(t)
err := pgs.Multi(&state.TransactionalStateRequest{
Operations: operations,
})
assert.Nil(t, err)
}
func TestInvalidMultiSetRequest(t *testing.T) {
t.Parallel()
var operations []state.TransactionalStateOperation
operations = append(operations, state.TransactionalStateOperation{
Operation: state.Upsert,
Request: createDeleteRequest(), // Delete request is not valid for Upsert operation
})
pgs := createPostgreSQL(t)
err := pgs.Multi(&state.TransactionalStateRequest{
Operations: operations,
})
assert.NotNil(t, err)
}
func TestValidMultiDeleteRequest(t *testing.T) {
t.Parallel()
var operations []state.TransactionalStateOperation
operations = append(operations, state.TransactionalStateOperation{
Operation: state.Delete,
Request: createDeleteRequest(),
})
pgs := createPostgreSQL(t)
err := pgs.Multi(&state.TransactionalStateRequest{
Operations: operations,
})
assert.Nil(t, err)
}
func TestInvalidMultiDeleteRequest(t *testing.T) {
t.Parallel()
var operations []state.TransactionalStateOperation
operations = append(operations, state.TransactionalStateOperation{
Operation: state.Delete,
Request: createSetRequest(), // Set request is not valid for Delete operation
})
pgs := createPostgreSQL(t)
err := pgs.Multi(&state.TransactionalStateRequest{
Operations: operations,
})
assert.NotNil(t, err)
}
func createSetRequest() state.SetRequest {
return state.SetRequest{
Key: randomKey(),
Value: randomJSON(),
}
}
func createDeleteRequest() state.DeleteRequest {
return state.DeleteRequest{
Key: randomKey(),
}
}
func createPostgreSQLWithFake(t *testing.T) (*PostgreSQL, *fakeDBaccess) {
pgs := createPostgreSQL(t)
fake := pgs.dbaccess.(*fakeDBaccess)