Introducing Oracle Database backed state store component (#1454)

* Introducing Oracle Database backed state store component

Signed-off-by: lucasjellema <lucasjellema@gmail.com>

* Implement Ping method with proper database ping

Signed-off-by: lucasjellema <lucasjellema@gmail.com>

* adding results from make modtidy-all - updated go.sum files
Signed-off-by: lucasjellema <lucasjellema@gmail.com>

* etag only applied when first write policy requested - applied in integration test and component

Signed-off-by: lucasjellema <lucasjellema@gmail.com>

Co-authored-by: Looong Dai <long.dai@intel.com>
Co-authored-by: Yaron Schneider <schneider.yaron@live.com>
This commit is contained in:
Lucas Jellema 2022-02-08 21:00:34 +01:00 committed by GitHub
parent a383697ef5
commit 625f955fee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1669 additions and 1534 deletions

1
go.mod
View File

@ -158,6 +158,7 @@ require (
github.com/alibabacloud-go/tea v1.1.17
github.com/dgrijalva/jwt-go v3.2.1-0.20210802184156-9742bd7fca1c+incompatible // indirect
github.com/oracle/oci-go-sdk/v54 v54.0.0
github.com/sijms/go-ora/v2 v2.2.22
)
require github.com/nats-io/nkeys v0.3.0

2
go.sum
View File

@ -1147,6 +1147,8 @@ github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 h1:pntxY8Ary0t4
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sijms/go-ora/v2 v2.2.22 h1:XmJAEwgokgLfe43QZtYWi6+ziYKz4yHmqKQlbdNVFmA=
github.com/sijms/go-ora/v2 v2.2.22/go.mod h1:jzfAFD+4CXHE+LjGWFl6cPrtiIpQVxakI2gvrMF2w6Y=
github.com/sirupsen/logrus v1.0.6/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=

View File

@ -0,0 +1,29 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package oracledatabase
import (
"github.com/dapr/components-contrib/state"
)
// dbAccess is a private interface which enables unit testing of Oracle Database.
type dbAccess interface {
Init(metadata state.Metadata) error
Ping() error
Set(req *state.SetRequest) error
Get(req *state.GetRequest) (*state.GetResponse, error)
Delete(req *state.DeleteRequest) error
ExecuteMulti(sets []state.SetRequest, deletes []state.DeleteRequest) error
Close() error // io.Closer.
}

View File

@ -0,0 +1,131 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package oracledatabase
import (
"fmt"
"github.com/dapr/components-contrib/state"
"github.com/dapr/kit/logger"
)
// Oracle Database state store.
type OracleDatabase struct {
features []state.Feature
logger logger.Logger
dbaccess dbAccess
}
// NewOracleDatabaseStateStore creates a new instance of OracleDatabase state store.
func NewOracleDatabaseStateStore(logger logger.Logger) *OracleDatabase {
dba := newOracleDatabaseAccess(logger)
return newOracleDatabaseStateStore(logger, dba)
}
// newOracleDatabaseStateStore creates a newOracleDatabaseStateStore instance of an OracleDatabase state store.
// This unexported constructor allows injecting a dbAccess instance for unit testing.
func newOracleDatabaseStateStore(logger logger.Logger, dba dbAccess) *OracleDatabase {
return &OracleDatabase{
features: []state.Feature{state.FeatureETag, state.FeatureTransactional},
logger: logger,
dbaccess: dba,
}
}
// Init initializes the SQL server state store.
func (o *OracleDatabase) Init(metadata state.Metadata) error {
return o.dbaccess.Init(metadata)
}
func (o *OracleDatabase) Ping() error {
return o.dbaccess.Ping()
}
// Features returns the features available in this state store.
func (o *OracleDatabase) Features() []state.Feature {
return o.features
}
// Delete removes an entity from the store.
func (o *OracleDatabase) Delete(req *state.DeleteRequest) error {
return o.dbaccess.Delete(req)
}
// BulkDelete removes multiple entries from the store.
func (o *OracleDatabase) BulkDelete(req []state.DeleteRequest) error {
return o.dbaccess.ExecuteMulti(nil, req)
}
// Get returns an entity from store.
func (o *OracleDatabase) Get(req *state.GetRequest) (*state.GetResponse, error) {
return o.dbaccess.Get(req)
}
// BulkGet performs a bulks get operations.
func (o *OracleDatabase) BulkGet(req []state.GetRequest) (bool, []state.BulkGetResponse, error) {
// TODO: replace with ExecuteMulti for performance.
return false, nil, nil
}
// Set adds/updates an entity on store.
func (o *OracleDatabase) Set(req *state.SetRequest) error {
return o.dbaccess.Set(req)
}
// BulkSet adds/updates multiple entities on store.
func (o *OracleDatabase) BulkSet(req []state.SetRequest) error {
return o.dbaccess.ExecuteMulti(req, nil)
}
// Multi handles multiple transactions. Implements TransactionalStore.
func (o *OracleDatabase) Multi(request *state.TransactionalStateRequest) error {
var deletes []state.DeleteRequest
var sets []state.SetRequest
for _, req := range request.Operations {
switch req.Operation {
case state.Upsert:
if setReq, ok := req.Request.(state.SetRequest); ok {
sets = append(sets, setReq)
} else {
return fmt.Errorf("expecting set request")
}
case state.Delete:
if delReq, ok := req.Request.(state.DeleteRequest); ok {
deletes = append(deletes, delReq)
} else {
return fmt.Errorf("expecting delete request")
}
default:
return fmt.Errorf("unsupported operation: %s", req.Operation)
}
}
if len(sets) > 0 || len(deletes) > 0 {
return o.dbaccess.ExecuteMulti(sets, deletes)
}
return nil
}
// Close implements io.Closer.
func (o *OracleDatabase) Close() error {
if o.dbaccess != nil {
return o.dbaccess.Close()
}
return nil
}

View File

@ -0,0 +1,895 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package oracledatabase
import (
"database/sql"
"encoding/json"
"fmt"
"net/url"
"os"
"strconv"
"testing"
"time"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"github.com/dapr/components-contrib/state"
"github.com/dapr/kit/logger"
)
const (
connectionStringEnvKey = "DAPR_TEST_ORACLE_DATABASE_CONNECTSTRING" // Environment variable containing the connection string.
oracleWalletLocationEnvKey = "DAPR_TEST_ORACLE_WALLET_LOCATION" // Environment variable containing the directory that contains the Oracle Wallet contents.
)
type fakeItem struct {
Color string
}
func TestOracleDatabaseIntegration(t *testing.T) {
connectionString := getConnectionString()
if connectionString == "" {
// first run export DAPR_TEST_ORACLE_DATABASE_CONNECTSTRING="oracle://demo:demo@localhost:1521/xe".
// for autonomous first run: export DAPR_TEST_ORACLE_DATABASE_CONNECTSTRING="oracle://demo:Modem123mode@adb.us-ashburn-1.oraclecloud.com:1522/k8j2fvxbaujdcfy_daprdb_low.adb.oraclecloud.com".
// then also run: export DAPR_TEST_ORACLE_WALLET_LOCATION="/home/lucas/dapr-work/components-contrib/state/oracledatabase/Wallet_daprDB/".
t.Skipf("Oracle Database state integration tests skipped. To enable define the connection string using environment variable '%s' (example 'export %s=\"oracle://username:password@host:port/servicename\")", connectionStringEnvKey, connectionStringEnvKey)
}
t.Run("Test init configurations", func(t *testing.T) {
testInitConfiguration(t)
})
oracleWalletLocation := getWalletLocation()
metadata := state.Metadata{
Properties: map[string]string{connectionStringKey: connectionString, oracleWalletLocationKey: oracleWalletLocation},
}
ods := NewOracleDatabaseStateStore(logger.NewLogger("test"))
t.Cleanup(func() {
defer ods.Close()
})
if initerror := ods.Init(metadata); initerror != nil {
t.Fatal(initerror)
}
t.Run("Create table succeeds", func(t *testing.T) {
testCreateTable(t, ods.dbaccess.(*oracleDatabaseAccess))
})
t.Run("Get Set Delete one item", func(t *testing.T) {
t.Parallel()
setGetUpdateDeleteOneItem(t, ods)
})
t.Run("Get item that does not exist", func(t *testing.T) {
t.Parallel()
getItemThatDoesNotExist(t, ods)
})
t.Run("Get item with no key fails", func(t *testing.T) {
t.Parallel()
getItemWithNoKey(t, ods)
})
t.Run("Set item with invalid (non numeric) TTL", func(t *testing.T) {
t.Parallel()
testSetItemWithInvalidTTL(t, ods)
})
t.Run("Set item with negative TTL", func(t *testing.T) {
t.Parallel()
testSetItemWithNegativeTTL(t, ods)
})
t.Run("Set with TTL updates the expiration field", func(t *testing.T) {
setTTLUpdatesExpiry(t, ods)
})
t.Run("Set with TTL followed by set without TTL resets the expiration field", func(t *testing.T) {
setNoTTLUpdatesExpiry(t, ods)
})
t.Run("Expired item cannot be read", func(t *testing.T) {
t.Parallel()
expiredStateCannotBeRead(t, ods)
})
t.Run("Unexpired item be read", func(t *testing.T) {
t.Parallel()
unexpiredStateCanBeRead(t, ods)
})
t.Run("Set updates the updatedate field", func(t *testing.T) {
setUpdatesTheUpdatedateField(t, ods)
})
t.Run("Set item with no key fails", func(t *testing.T) {
t.Parallel()
setItemWithNoKey(t, ods)
})
t.Run("Bulk set and bulk delete", func(t *testing.T) {
// t.Parallel()
testBulkSetAndBulkDelete(t, ods)
})
t.Run("Update and delete with etag succeeds", func(t *testing.T) {
// t.Parallel()
updateAndDeleteWithEtagSucceeds(t, ods)
})
t.Run("Update with old etag fails", func(t *testing.T) {
// t.Parallel()
updateWithOldEtagFails(t, ods)
})
t.Run("Insert with etag fails", func(t *testing.T) {
t.Parallel()
newItemWithEtagFails(t, ods)
})
t.Run("Delete with invalid etag fails when first write is enforced", func(t *testing.T) {
t.Parallel()
deleteWithInvalidEtagFails(t, ods)
})
t.Run("Update and Delete with invalid etag and no first write policy enforced succeeds", func(t *testing.T) {
t.Parallel()
updateAndDeleteWithWrongEtagAndNoFirstWriteSucceeds(t, ods)
})
t.Run("Delete item with no key fails", func(t *testing.T) {
t.Parallel()
deleteWithNoKeyFails(t, ods)
})
t.Run("Delete an item that does not exist", func(t *testing.T) {
t.Parallel()
deleteItemThatDoesNotExist(t, ods)
})
t.Run("Multi with delete and set", func(t *testing.T) {
// t.Parallel()
multiWithDeleteAndSet(t, ods)
})
t.Run("Multi with delete only", func(t *testing.T) {
multiWithDeleteOnly(t, ods)
})
t.Run("Multi with set only", func(t *testing.T) {
multiWithSetOnly(t, ods)
})
}
// setGetUpdateDeleteOneItem validates setting one item, getting it, and deleting it.
func setGetUpdateDeleteOneItem(t *testing.T, ods *OracleDatabase) {
key := randomKey()
value := &fakeItem{Color: "yellow"}
setItem(t, ods, key, value, nil)
getResponse, outputObject := getItem(t, ods, key)
assert.Equal(t, value, outputObject)
newValue := &fakeItem{Color: "green"}
setItem(t, ods, key, newValue, getResponse.ETag)
getResponse, outputObject = getItem(t, ods, key)
assert.Equal(t, newValue, outputObject)
deleteItem(t, ods, key, getResponse.ETag)
}
// testCreateTable tests the ability to create the state table.
func testCreateTable(t *testing.T, dba *oracleDatabaseAccess) {
tableName := "test_state"
// Drop the table if it already exists.
exists, err := tableExists(dba.db, tableName)
assert.Nil(t, err)
if exists {
dropTable(t, dba.db, tableName)
}
// Create the state table and test for its existence.
err = dba.ensureStateTable(tableName)
assert.Nil(t, err)
exists, err = tableExists(dba.db, tableName)
assert.Nil(t, err)
assert.True(t, exists)
// Drop the state table.
dropTable(t, dba.db, tableName)
}
func dropTable(t *testing.T, db *sql.DB, tableName string) {
_, err := db.Exec(fmt.Sprintf("DROP TABLE %s", tableName))
assert.Nil(t, err)
}
func deleteItemThatDoesNotExist(t *testing.T, ods *OracleDatabase) {
// Delete the item with a key not in the store.
deleteReq := &state.DeleteRequest{
Key: randomKey(),
}
err := ods.Delete(deleteReq)
assert.Nil(t, err)
}
func multiWithSetOnly(t *testing.T, ods *OracleDatabase) {
var operations []state.TransactionalStateOperation
var setRequests []state.SetRequest
for i := 0; i < 3; i++ {
req := state.SetRequest{
Key: randomKey(),
Value: randomJSON(),
}
setRequests = append(setRequests, req)
operations = append(operations, state.TransactionalStateOperation{
Operation: state.Upsert,
Request: req,
})
}
err := ods.Multi(&state.TransactionalStateRequest{
Operations: operations,
})
assert.Nil(t, err)
for _, set := range setRequests {
assert.True(t, storeItemExists(t, set.Key))
deleteItem(t, ods, set.Key, nil)
}
}
func multiWithDeleteOnly(t *testing.T, ods *OracleDatabase) {
var operations []state.TransactionalStateOperation
var deleteRequests []state.DeleteRequest
for i := 0; i < 3; i++ {
req := state.DeleteRequest{Key: randomKey()}
// Add the item to the database.
setItem(t, ods, req.Key, randomJSON(), nil) // Add the item to the database.
// Add the item to a slice of delete requests.
deleteRequests = append(deleteRequests, req)
// Add the item to the multi transaction request.
operations = append(operations, state.TransactionalStateOperation{
Operation: state.Delete,
Request: req,
})
}
err := ods.Multi(&state.TransactionalStateRequest{
Operations: operations,
})
assert.Nil(t, err)
for _, delete := range deleteRequests {
assert.False(t, storeItemExists(t, delete.Key))
}
}
func multiWithDeleteAndSet(t *testing.T, ods *OracleDatabase) {
var operations []state.TransactionalStateOperation
var deleteRequests []state.DeleteRequest
for i := 0; i < 3; i++ {
req := state.DeleteRequest{Key: randomKey()}
// Add the item to the database.
setItem(t, ods, req.Key, randomJSON(), nil) // Add the item to the database.
// Add the item to a slice of delete requests.
deleteRequests = append(deleteRequests, req)
// Add the item to the multi transaction request.
operations = append(operations, state.TransactionalStateOperation{
Operation: state.Delete,
Request: req,
})
}
// Create the set requests.
var setRequests []state.SetRequest
for i := 0; i < 3; i++ {
req := state.SetRequest{
Key: randomKey(),
Value: randomJSON(),
}
setRequests = append(setRequests, req)
operations = append(operations, state.TransactionalStateOperation{
Operation: state.Upsert,
Request: req,
})
}
err := ods.Multi(&state.TransactionalStateRequest{
Operations: operations,
})
assert.Nil(t, err)
for _, delete := range deleteRequests {
assert.False(t, storeItemExists(t, delete.Key))
}
for _, set := range setRequests {
assert.True(t, storeItemExists(t, set.Key))
deleteItem(t, ods, set.Key, nil)
}
}
func deleteWithInvalidEtagFails(t *testing.T, ods *OracleDatabase) {
// Create new item.
key := randomKey()
value := &fakeItem{Color: "mauvebrown"}
setItem(t, ods, key, value, nil)
etag := "1234"
// Delete the item with a fake etag.
deleteReq := &state.DeleteRequest{
Key: key,
ETag: &etag,
Options: state.DeleteStateOption{
Concurrency: state.FirstWrite,
},
}
err := ods.Delete(deleteReq)
assert.NotNil(t, err, "Deleting an item with the wrong etag while enforcing FirstWrite policy should fail")
}
func deleteWithNoKeyFails(t *testing.T, ods *OracleDatabase) {
deleteReq := &state.DeleteRequest{
Key: "",
}
err := ods.Delete(deleteReq)
assert.NotNil(t, err)
}
// newItemWithEtagFails creates a new item and also supplies a non existent ETag and requests FirstWrite, which is invalid - expect failure.
func newItemWithEtagFails(t *testing.T, ods *OracleDatabase) {
value := &fakeItem{Color: "teal"}
invalidEtag := "12345"
setReq := &state.SetRequest{
Key: randomKey(),
ETag: &invalidEtag,
Value: value,
Options: state.SetStateOption{
Concurrency: state.FirstWrite,
},
}
err := ods.Set(setReq)
assert.NotNil(t, err)
}
func updateWithOldEtagFails(t *testing.T, ods *OracleDatabase) {
// Create and retrieve new item.
key := randomKey()
value := &fakeItem{Color: "gray"}
setItem(t, ods, key, value, nil)
getResponse, _ := getItem(t, ods, key)
assert.NotNil(t, getResponse.ETag)
originalEtag := getResponse.ETag
// Change the value and get the updated etag.
newValue := &fakeItem{Color: "silver"}
setItem(t, ods, key, newValue, originalEtag)
_, updatedItem := getItem(t, ods, key)
assert.Equal(t, newValue, updatedItem)
getResponse, _ = getItem(t, ods, key)
assert.NotNil(t, getResponse.ETag)
// Update again with the original etag - expect update failure.
newValue = &fakeItem{Color: "maroon"}
setReq := &state.SetRequest{
Key: key,
ETag: originalEtag,
Value: newValue,
Options: state.SetStateOption{
Concurrency: state.FirstWrite,
},
}
err := ods.Set(setReq)
assert.NotNil(t, err)
}
func updateAndDeleteWithEtagSucceeds(t *testing.T, ods *OracleDatabase) {
// Create and retrieve new item.
key := randomKey()
value := &fakeItem{Color: "hazel"}
setItem(t, ods, key, value, nil)
getResponse, _ := getItem(t, ods, key)
assert.NotNil(t, getResponse.ETag)
// Change the value and compare.
value.Color = "purple"
setReq := &state.SetRequest{
Key: key,
ETag: getResponse.ETag,
Value: value,
Options: state.SetStateOption{
Concurrency: state.FirstWrite,
},
}
err := ods.Set(setReq)
assert.Nil(t, err, "Setting the item should be successful")
updateResponse, updatedItem := getItem(t, ods, key)
assert.Equal(t, value, updatedItem)
// ETag should change when item is updated..
assert.NotEqual(t, getResponse.ETag, updateResponse.ETag)
// Delete.
deleteReq := &state.DeleteRequest{
Key: key,
ETag: updateResponse.ETag,
Options: state.DeleteStateOption{
Concurrency: state.FirstWrite,
},
}
err = ods.Delete(deleteReq)
assert.Nil(t, err, "Deleting an item with the right etag while enforcing FirstWrite policy should succeed")
// Item is not in the data store.
assert.False(t, storeItemExists(t, key))
}
func updateAndDeleteWithWrongEtagAndNoFirstWriteSucceeds(t *testing.T, ods *OracleDatabase) {
// Create and retrieve new item.
key := randomKey()
value := &fakeItem{Color: "hazel"}
setItem(t, ods, key, value, nil)
getResponse, _ := getItem(t, ods, key)
assert.NotNil(t, getResponse.ETag)
// Change the value and compare.
value.Color = "purple"
someInvalidEtag := "1234581736145"
setReq := &state.SetRequest{
Key: key,
ETag: &someInvalidEtag,
Value: value,
Options: state.SetStateOption{
Concurrency: state.LastWrite,
},
}
err := ods.Set(setReq)
assert.Nil(t, err, "Setting the item should be successful")
_, updatedItem := getItem(t, ods, key)
assert.Equal(t, value, updatedItem)
// Delete.
deleteReq := &state.DeleteRequest{
Key: key,
ETag: &someInvalidEtag,
Options: state.DeleteStateOption{
Concurrency: state.LastWrite,
},
}
err = ods.Delete(deleteReq)
assert.Nil(t, err, "Deleting an item with the wrong etag but not enforcing FirstWrite policy should succeed")
// Item is not in the data store.
assert.False(t, storeItemExists(t, key))
}
// getItemThatDoesNotExist validates the behavior of retrieving an item that does not exist.
func getItemThatDoesNotExist(t *testing.T, ods *OracleDatabase) {
key := randomKey()
response, outputObject := getItem(t, ods, key)
assert.Nil(t, response.Data)
assert.Equal(t, "", outputObject.Color)
}
// getItemWithNoKey validates that attempting a Get operation without providing a key will return an error.
func getItemWithNoKey(t *testing.T, ods *OracleDatabase) {
getReq := &state.GetRequest{
Key: "",
}
response, getErr := ods.Get(getReq)
assert.NotNil(t, getErr)
assert.Nil(t, response)
}
// setUpdatesTheUpdatedateField proves that the updateddate is set for an update, and not set upon insert.
func setUpdatesTheUpdatedateField(t *testing.T, ods *OracleDatabase) {
key := randomKey()
value := &fakeItem{Color: "orange"}
setItem(t, ods, key, value, nil)
connectionString := getConnectionString()
if getWalletLocation() != "" {
connectionString += "?TRACE FILE=trace.log&SSL=enable&SSL Verify=false&WALLET=" + url.QueryEscape(getWalletLocation())
}
db, err := sql.Open("oracle", connectionString)
assert.Nil(t, err)
defer db.Close()
// insertdate should have a value and updatedate should be nil.
_, insertdate, updatedate := getRowData(t, key)
assert.NotNil(t, insertdate)
assert.Equal(t, "", updatedate.String)
// insertdate should not change, updatedate should have a value.
value = &fakeItem{Color: "aqua"}
setItem(t, ods, key, value, nil)
_, newinsertdate, updatedate := getRowData(t, key)
assert.Equal(t, insertdate, newinsertdate) // The insertdate should not change.
assert.NotEqual(t, "", updatedate.String)
deleteItem(t, ods, key, nil)
}
// setTTLUpdatesExpiry proves that the expirydate is set when a TTL is passed for a key.
func setTTLUpdatesExpiry(t *testing.T, ods *OracleDatabase) {
key := randomKey()
value := &fakeItem{Color: "darkgray"}
setOptions := state.SetStateOption{}
setReq := &state.SetRequest{
Key: key,
ETag: nil,
Value: value,
Options: setOptions,
Metadata: map[string]string{
"ttlInSeconds": "1000",
},
}
err := ods.Set(setReq)
assert.Nil(t, err)
connectionString := getConnectionString()
if getWalletLocation() != "" {
connectionString += "?TRACE FILE=trace.log&SSL=enable&SSL Verify=false&WALLET=" + url.QueryEscape(getWalletLocation())
}
db, err := sql.Open("oracle", connectionString)
assert.Nil(t, err)
defer db.Close()
// expirationTime should be set (to a date in the future).
_, _, expirationTime := getTimesForRow(t, key)
assert.NotNil(t, expirationTime)
assert.True(t, expirationTime.Valid, "Expiration Time should have a value after set with TTL value")
deleteItem(t, ods, key, nil)
}
// setNoTTLUpdatesExpiry proves that the expirydate is reset when a state element with expiration time (TTL) loses TTL upon second set without TTL.
func setNoTTLUpdatesExpiry(t *testing.T, ods *OracleDatabase) {
key := randomKey()
value := &fakeItem{Color: "darkorange"}
setOptions := state.SetStateOption{}
setReq := &state.SetRequest{
Key: key,
ETag: nil,
Value: value,
Options: setOptions,
Metadata: map[string]string{
"ttlInSeconds": "1000",
},
}
err := ods.Set(setReq)
assert.Nil(t, err)
delete(setReq.Metadata, "ttlInSeconds")
err = ods.Set(setReq)
assert.Nil(t, err)
connectionString := getConnectionString()
if getWalletLocation() != "" {
connectionString += "?TRACE FILE=trace.log&SSL=enable&SSL Verify=false&WALLET=" + url.QueryEscape(getWalletLocation())
}
db, err := sql.Open("oracle", connectionString)
assert.Nil(t, err)
defer db.Close()
// expirationTime should not be set.
_, _, expirationTime := getTimesForRow(t, key)
assert.True(t, !expirationTime.Valid, "Expiration Time should not have a value after first being set with TTL value and then being set without TTL value")
deleteItem(t, ods, key, nil)
}
// expiredStateCannotBeRead proves that an expired state element can not be read.
func expiredStateCannotBeRead(t *testing.T, ods *OracleDatabase) {
key := randomKey()
value := &fakeItem{Color: "darkgray"}
setOptions := state.SetStateOption{}
setReq := &state.SetRequest{
Key: key,
ETag: nil,
Value: value,
Options: setOptions,
Metadata: map[string]string{
"ttlInSeconds": "1",
},
}
err := ods.Set(setReq)
assert.Nil(t, err)
time.Sleep(time.Second * time.Duration(2))
getResponse, err := ods.Get(&state.GetRequest{Key: key})
assert.Equal(t, &state.GetResponse{}, getResponse, "Response must be empty")
assert.NoError(t, err, "Expired element must not be treated as error")
deleteItem(t, ods, key, nil)
}
// unexpiredStateCanBeRead proves that a state element with TTL - but no yet expired - can be read.
func unexpiredStateCanBeRead(t *testing.T, ods *OracleDatabase) {
key := randomKey()
value := &fakeItem{Color: "dark white"}
setOptions := state.SetStateOption{}
setReq := &state.SetRequest{
Key: key,
ETag: nil,
Value: value,
Options: setOptions,
Metadata: map[string]string{
"ttlInSeconds": "10000",
},
}
err := ods.Set(setReq)
assert.Nil(t, err)
_, getValue := getItem(t, ods, key)
assert.Equal(t, value.Color, getValue.Color, "Response must be as set")
assert.NoError(t, err, "Unexpired element with future expiration time must not be treated as error")
deleteItem(t, ods, key, nil)
}
func setItemWithNoKey(t *testing.T, ods *OracleDatabase) {
setReq := &state.SetRequest{
Key: "",
}
err := ods.Set(setReq)
assert.NotNil(t, err)
}
func TestParseTTL(t *testing.T) {
t.Parallel()
t.Run("TTL Not an integer", func(t *testing.T) {
t.Parallel()
ttlInSeconds := "not an integer"
ttl, err := parseTTL(map[string]string{
"ttlInSeconds": ttlInSeconds,
})
assert.Error(t, err)
assert.Nil(t, ttl)
})
t.Run("TTL specified with wrong key", func(t *testing.T) {
t.Parallel()
ttlInSeconds := 12345
ttl, err := parseTTL(map[string]string{
"expirationTime": strconv.Itoa(ttlInSeconds),
})
assert.NoError(t, err)
assert.Nil(t, ttl)
})
t.Run("TTL is a number", func(t *testing.T) {
t.Parallel()
ttlInSeconds := 12345
ttl, err := parseTTL(map[string]string{
"ttlInSeconds": strconv.Itoa(ttlInSeconds),
})
assert.NoError(t, err)
assert.Equal(t, *ttl, ttlInSeconds)
})
t.Run("TTL not set", func(t *testing.T) {
t.Parallel()
ttl, err := parseTTL(map[string]string{})
assert.NoError(t, err)
assert.Nil(t, ttl)
})
}
func testSetItemWithInvalidTTL(t *testing.T, ods *OracleDatabase) {
setReq := &state.SetRequest{
Key: randomKey(),
Value: &fakeItem{Color: "oceanblue"},
Metadata: (map[string]string{
"ttlInSeconds": "XX",
}),
}
err := ods.Set(setReq)
assert.NotNil(t, err, "Setting a value with a proper key and a incorrect TTL value should be produce an error")
}
func testSetItemWithNegativeTTL(t *testing.T, ods *OracleDatabase) {
setReq := &state.SetRequest{
Key: randomKey(),
Value: &fakeItem{Color: "oceanblue"},
Metadata: (map[string]string{
"ttlInSeconds": "-10",
}),
}
err := ods.Set(setReq)
assert.NotNil(t, err, "Setting a value with a proper key and a negative (other than -1) TTL value should be produce an error")
}
// Tests valid bulk sets and deletes.
func testBulkSetAndBulkDelete(t *testing.T, ods *OracleDatabase) {
setReq := []state.SetRequest{
{
Key: randomKey(),
Value: &fakeItem{Color: "oceanblue"},
},
{
Key: randomKey(),
Value: &fakeItem{Color: "livingwhite"},
},
}
err := ods.BulkSet(setReq)
assert.Nil(t, err)
assert.True(t, storeItemExists(t, setReq[0].Key))
assert.True(t, storeItemExists(t, setReq[1].Key))
deleteReq := []state.DeleteRequest{
{
Key: setReq[0].Key,
},
{
Key: setReq[1].Key,
},
}
err = ods.BulkDelete(deleteReq)
assert.Nil(t, err)
assert.False(t, storeItemExists(t, setReq[0].Key))
assert.False(t, storeItemExists(t, setReq[1].Key))
}
// testInitConfiguration tests valid and invalid config settings.
func testInitConfiguration(t *testing.T) {
logger := logger.NewLogger("test")
tests := []struct {
name string
props map[string]string
expectedErr string
}{
{
name: "Empty",
props: map[string]string{},
expectedErr: errMissingConnectionString,
},
{
name: "Valid connection string",
props: map[string]string{connectionStringKey: getConnectionString(), oracleWalletLocationKey: getWalletLocation()},
expectedErr: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := NewOracleDatabaseStateStore(logger)
defer p.Close()
metadata := state.Metadata{
Properties: tt.props,
}
err := p.Init(metadata)
if tt.expectedErr == "" {
assert.Nil(t, err)
} else {
assert.NotNil(t, err)
assert.Equal(t, err.Error(), tt.expectedErr)
}
})
}
}
func getConnectionString() string {
return os.Getenv(connectionStringEnvKey)
}
func getWalletLocation() string {
return os.Getenv(oracleWalletLocationEnvKey)
}
func setItem(t *testing.T, ods *OracleDatabase, key string, value interface{}, etag *string) {
setOptions := state.SetStateOption{}
if etag != nil {
setOptions.Concurrency = state.FirstWrite
}
setReq := &state.SetRequest{
Key: key,
ETag: etag,
Value: value,
Options: setOptions,
}
err := ods.Set(setReq)
assert.Nil(t, err)
itemExists := storeItemExists(t, key)
assert.True(t, itemExists, "Item should exist after set has been executed ")
}
func getItem(t *testing.T, ods *OracleDatabase, key string) (*state.GetResponse, *fakeItem) {
getReq := &state.GetRequest{
Key: key,
Options: state.GetStateOption{},
}
response, getErr := ods.Get(getReq)
assert.Nil(t, getErr)
assert.NotNil(t, response)
outputObject := &fakeItem{}
_ = json.Unmarshal(response.Data, outputObject)
return response, outputObject
}
func deleteItem(t *testing.T, ods *OracleDatabase, key string, etag *string) {
deleteReq := &state.DeleteRequest{
Key: key,
ETag: etag,
Options: state.DeleteStateOption{},
}
deleteErr := ods.Delete(deleteReq)
assert.Nil(t, deleteErr)
assert.False(t, storeItemExists(t, key), "item should no longer exist after delete has been performed")
}
func storeItemExists(t *testing.T, key string) bool {
connectionString := getConnectionString()
if getWalletLocation() != "" {
connectionString += "?TRACE FILE=trace.log&SSL=enable&SSL Verify=false&WALLET=" + url.QueryEscape(getWalletLocation())
}
db, err := sql.Open("oracle", connectionString)
assert.Nil(t, err)
defer db.Close()
var rowCount int32
statement := fmt.Sprintf(`SELECT count(key) FROM %s WHERE key = :key`, tableName)
err = db.QueryRow(statement, key).Scan(&rowCount)
assert.Nil(t, err)
exists := rowCount > 0
return exists
}
func getRowData(t *testing.T, key string) (returnValue string, insertdate sql.NullString, updatedate sql.NullString) {
connectionString := getConnectionString()
if getWalletLocation() != "" {
connectionString += "?TRACE FILE=trace.log&SSL=enable&SSL Verify=false&WALLET=" + url.QueryEscape(getWalletLocation())
}
db, err := sql.Open("oracle", connectionString)
assert.Nil(t, err)
defer db.Close()
err = db.QueryRow(fmt.Sprintf("SELECT value, creation_time, update_time FROM %s WHERE key = :key", tableName), key).Scan(&returnValue, &insertdate, &updatedate)
assert.Nil(t, err)
return returnValue, insertdate, updatedate
}
func getTimesForRow(t *testing.T, key string) (insertdate sql.NullString, updatedate sql.NullString, expirationtime sql.NullString) {
connectionString := getConnectionString()
if getWalletLocation() != "" {
connectionString += "?TRACE FILE=trace.log&SSL=enable&SSL Verify=false&WALLET=" + url.QueryEscape(getWalletLocation())
}
db, err := sql.Open("oracle", connectionString)
assert.Nil(t, err)
defer db.Close()
err = db.QueryRow(fmt.Sprintf("SELECT creation_time, update_time, expiration_time FROM %s WHERE key = :key", tableName), key).Scan(&insertdate, &updatedate, &expirationtime)
assert.Nil(t, err)
return insertdate, updatedate, expirationtime
}
func randomKey() string {
return uuid.New().String()
}
func randomJSON() *fakeItem {
return &fakeItem{Color: randomKey()}
}

View File

@ -0,0 +1,217 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package oracledatabase
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/dapr/components-contrib/state"
"github.com/dapr/kit/logger"
)
const (
fakeConnectionString = "not a real connection"
)
// Fake implementation of interface oracledatabase.dbaccess.
type fakeDBaccess struct {
logger logger.Logger
pingExecuted bool
initExecuted bool
setExecuted bool
getExecuted bool
}
func (m *fakeDBaccess) Ping() error {
m.pingExecuted = true
return nil
}
func (m *fakeDBaccess) Init(metadata state.Metadata) error {
m.initExecuted = true
return nil
}
func (m *fakeDBaccess) Set(req *state.SetRequest) error {
m.setExecuted = true
return nil
}
func (m *fakeDBaccess) Get(req *state.GetRequest) (*state.GetResponse, error) {
m.getExecuted = true
return nil, nil
}
func (m *fakeDBaccess) Delete(req *state.DeleteRequest) error {
return nil
}
func (m *fakeDBaccess) ExecuteMulti(sets []state.SetRequest, deletes []state.DeleteRequest) error {
return nil
}
func (m *fakeDBaccess) Close() error {
return nil
}
// Proves that the Init method runs the init method.
func TestInitRunsDBAccessInit(t *testing.T) {
t.Parallel()
ods, fake := createOracleDatabaseWithFake(t)
ods.Ping()
assert.True(t, fake.initExecuted)
}
func TestMultiWithNoRequestsReturnsNil(t *testing.T) {
t.Parallel()
var operations []state.TransactionalStateOperation
ods := createOracleDatabase(t)
err := ods.Multi(&state.TransactionalStateRequest{
Operations: operations,
})
assert.Nil(t, err)
}
func TestInvalidMultiAction(t *testing.T) {
t.Parallel()
var operations []state.TransactionalStateOperation
operations = append(operations, state.TransactionalStateOperation{
Operation: "Something invalid",
Request: createSetRequest(),
})
ods := createOracleDatabase(t)
err := ods.Multi(&state.TransactionalStateRequest{
Operations: operations,
})
assert.NotNil(t, err)
}
func TestValidSetRequest(t *testing.T) {
t.Parallel()
var operations []state.TransactionalStateOperation
operations = append(operations, state.TransactionalStateOperation{
Operation: state.Upsert,
Request: createSetRequest(),
})
ods := createOracleDatabase(t)
err := ods.Multi(&state.TransactionalStateRequest{
Operations: operations,
})
assert.Nil(t, err)
}
func TestInvalidMultiSetRequest(t *testing.T) {
t.Parallel()
var operations []state.TransactionalStateOperation
operations = append(operations, state.TransactionalStateOperation{
Operation: state.Upsert,
Request: createDeleteRequest(), // Delete request is not valid for Upsert operation.
})
ods := createOracleDatabase(t)
err := ods.Multi(&state.TransactionalStateRequest{
Operations: operations,
})
assert.NotNil(t, err)
}
func TestValidMultiDeleteRequest(t *testing.T) {
t.Parallel()
var operations []state.TransactionalStateOperation
operations = append(operations, state.TransactionalStateOperation{
Operation: state.Delete,
Request: createDeleteRequest(),
})
ods := createOracleDatabase(t)
err := ods.Multi(&state.TransactionalStateRequest{
Operations: operations,
})
assert.Nil(t, err)
}
func TestInvalidMultiDeleteRequest(t *testing.T) {
t.Parallel()
var operations []state.TransactionalStateOperation
operations = append(operations, state.TransactionalStateOperation{
Operation: state.Delete,
Request: createSetRequest(), // Set request is not valid for Delete operation.
})
ods := createOracleDatabase(t)
err := ods.Multi(&state.TransactionalStateRequest{
Operations: operations,
})
assert.NotNil(t, err)
}
func createSetRequest() state.SetRequest {
return state.SetRequest{
Key: randomKey(),
Value: randomJSON(),
}
}
func createDeleteRequest() state.DeleteRequest {
return state.DeleteRequest{
Key: randomKey(),
}
}
func createOracleDatabaseWithFake(t *testing.T) (*OracleDatabase, *fakeDBaccess) {
ods := createOracleDatabase(t)
fake := ods.dbaccess.(*fakeDBaccess)
return ods, fake
}
// Proves that the Ping method runs the ping method.
func TestPingRunsDBAccessPing(t *testing.T) {
t.Parallel()
odb, fake := createOracleDatabaseWithFake(t)
odb.Ping()
assert.True(t, fake.pingExecuted)
}
func createOracleDatabase(t *testing.T) *OracleDatabase {
logger := logger.NewLogger("test")
dba := &fakeDBaccess{
logger: logger,
}
odb := newOracleDatabaseStateStore(logger, dba)
assert.NotNil(t, odb)
metadata := &state.Metadata{
Properties: map[string]string{connectionStringKey: fakeConnectionString},
}
err := odb.Init(*metadata)
assert.Nil(t, err)
assert.NotNil(t, odb.dbaccess)
return odb
}

View File

@ -0,0 +1,381 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package oracledatabase
import (
"database/sql"
"encoding/base64"
"encoding/json"
"fmt"
"net/url"
"strconv"
"github.com/google/uuid"
"github.com/dapr/components-contrib/state"
"github.com/dapr/components-contrib/state/utils"
"github.com/dapr/kit/logger"
// Blank import for the underlying Oracle Database driver.
_ "github.com/sijms/go-ora/v2"
)
const (
connectionStringKey = "connectionString"
oracleWalletLocationKey = "oracleWalletLocation"
metadataTTLKey = "ttlInSeconds"
errMissingConnectionString = "missing connection string"
tableName = "state"
)
// oracleDatabaseAccess implements dbaccess.
type oracleDatabaseAccess struct {
logger logger.Logger
metadata state.Metadata
db *sql.DB
connectionString string
tx *sql.Tx
}
// newOracleDatabaseAccess creates a new instance of oracleDatabaseAccess.
func newOracleDatabaseAccess(logger logger.Logger) *oracleDatabaseAccess {
logger.Debug("Instantiating new Oracle Database state store")
return &oracleDatabaseAccess{
logger: logger,
}
}
func (o *oracleDatabaseAccess) Ping() error {
return o.db.Ping()
}
// Init sets up OracleDatabase connection and ensures that the state table exists.
func (o *oracleDatabaseAccess) Init(metadata state.Metadata) error {
o.logger.Debug("Initializing OracleDatabase state store")
o.metadata = metadata
if val, ok := metadata.Properties[connectionStringKey]; ok && val != "" {
o.connectionString = val
} else {
o.logger.Error("Missing Oracle Database connection string")
return fmt.Errorf(errMissingConnectionString)
}
if val, ok := o.metadata.Properties[oracleWalletLocationKey]; ok && val != "" {
o.connectionString += "?TRACE FILE=trace.log&SSL=enable&SSL Verify=false&WALLET=" + url.QueryEscape(val)
}
db, err := sql.Open("oracle", o.connectionString)
if err != nil {
o.logger.Error(err)
return err
}
o.db = db
if pingErr := db.Ping(); pingErr != nil {
return pingErr
}
err = o.ensureStateTable(tableName)
if err != nil {
return err
}
return nil
}
// Set makes an insert or update to the database.
func (o *oracleDatabaseAccess) Set(req *state.SetRequest) error {
return state.SetWithOptions(o.setValue, req)
}
func parseTTL(requestMetadata map[string]string) (*int, error) {
if val, found := requestMetadata[metadataTTLKey]; found && val != "" {
parsedVal, err := strconv.ParseInt(val, 10, 0)
if err != nil {
return nil, fmt.Errorf("error in parsing ttl metadata : %w", err)
}
parsedInt := int(parsedVal)
return &parsedInt, nil
}
return nil, nil
}
// setValue is an internal implementation of set to enable passing the logic to state.SetWithRetries as a func.
func (o *oracleDatabaseAccess) setValue(req *state.SetRequest) error {
o.logger.Debug("Setting state value in OracleDatabase")
err := state.CheckRequestOptions(req.Options)
if err != nil {
return err
}
if req.Key == "" {
return fmt.Errorf("missing key in set operation")
}
if v, ok := req.Value.(string); ok && v == "" {
return fmt.Errorf("empty string is not allowed in set operation")
}
if req.Options.Concurrency == state.FirstWrite && (req.ETag == nil || len(*req.ETag) == 0) {
o.logger.Debugf("when FirstWrite is to be enforced, a value must be provided for the ETag")
return fmt.Errorf("when FirstWrite is to be enforced, a value must be provided for the ETag")
}
var ttlSeconds int
ttl, ttlerr := parseTTL(req.Metadata)
if ttlerr != nil {
return fmt.Errorf("error in parsing TTL %w", ttlerr)
}
if ttl != nil {
if *ttl == -1 {
o.logger.Debugf("TTL is set to -1; this means: never expire. ")
} else {
if *ttl < -1 {
return fmt.Errorf("incorrect value for %s %d", metadataTTLKey, *ttl)
}
ttlSeconds = *ttl
}
}
requestValue := req.Value
byteArray, isBinary := req.Value.([]uint8)
binaryYN := "N"
if isBinary {
requestValue = base64.StdEncoding.EncodeToString(byteArray)
binaryYN = "Y"
}
// Convert to json string.
bt, _ := utils.Marshal(requestValue, json.Marshal)
value := string(bt)
var result sql.Result
var tx *sql.Tx
if o.tx == nil { // not joining a preexisting transaction.
tx, err = o.db.Begin()
if err != nil {
return fmt.Errorf("failed to start database transaction : %w", err)
}
} else { // join the transaction passed in.
tx = o.tx
}
etag := uuid.New().String()
// Only check for etag if FirstWrite specified - as per Discord message thread https://discord.com/channels/778680217417809931/901141713089863710/938520959562952735.
if req.Options.Concurrency != state.FirstWrite {
// Sprintf is required for table name because sql.DB does not substitute parameters for table names.
// Other parameters use sql.DB parameter substitution.
// As per Discord Thread https://discord.com/channels/778680217417809931/901141713089863710/938520959562952735 expiration time is reset in case of an update.
mergeStatement := fmt.Sprintf(
`MERGE INTO %s t using (select :key key, :value value, :binary_yn binary_yn, :etag etag , :ttl_in_seconds ttl_in_seconds from dual) new_state_to_store
ON (t.key = new_state_to_store.key )
WHEN MATCHED THEN UPDATE SET value = new_state_to_store.value, binary_yn = new_state_to_store.binary_yn, update_time = systimestamp, etag = new_state_to_store.etag, t.expiration_time = case when new_state_to_store.ttl_in_seconds >0 then systimestamp + numtodsinterval(new_state_to_store.ttl_in_seconds, 'SECOND') end
WHEN NOT MATCHED THEN INSERT (t.key, t.value, t.binary_yn, t.etag, t.expiration_time) values (new_state_to_store.key, new_state_to_store.value, new_state_to_store.binary_yn, new_state_to_store.etag, case when new_state_to_store.ttl_in_seconds >0 then systimestamp + numtodsinterval(new_state_to_store.ttl_in_seconds, 'SECOND') end ) `,
tableName)
result, err = tx.Exec(mergeStatement, req.Key, value, binaryYN, etag, ttlSeconds)
} else {
// when first write policy is indicated, an existing record has to be updated - one that has the etag provided.
updateStatement := fmt.Sprintf(
`UPDATE %s SET value = :value, binary_yn = :binary_yn, etag = :new_etag
WHERE key = :key AND etag = :etag`,
tableName)
result, err = tx.Exec(updateStatement, value, binaryYN, etag, req.Key, *req.ETag)
}
if err != nil {
if req.ETag != nil && *req.ETag != "" {
return state.NewETagError(state.ETagMismatch, err)
}
if o.tx == nil { // not in a preexisting transaction so rollback the local, failed tx.
tx.Rollback()
}
return err
}
rows, err := result.RowsAffected()
if err != nil {
return err
}
if o.tx == nil { // local transaction, take responsibility.
tx.Commit()
}
if rows != 1 {
return fmt.Errorf("no item was updated")
}
return nil
}
// Get returns data from the database. If data does not exist for the key an empty state.GetResponse will be returned.
func (o *oracleDatabaseAccess) Get(req *state.GetRequest) (*state.GetResponse, error) {
o.logger.Debug("Getting state value from OracleDatabase")
if req.Key == "" {
return nil, fmt.Errorf("missing key in get operation")
}
var value string
var binaryYN string
var etag string
err := o.db.QueryRow(fmt.Sprintf("SELECT value, binary_yn, etag FROM %s WHERE key = :key and (expiration_time is null or expiration_time > systimestamp)", tableName), req.Key).Scan(&value, &binaryYN, &etag)
if err != nil {
// If no rows exist, return an empty response, otherwise return the error.
if err == sql.ErrNoRows {
return &state.GetResponse{}, nil
}
return nil, err
}
if binaryYN == "Y" {
var s string
var data []byte
if err = json.Unmarshal([]byte(value), &s); err != nil {
return nil, err
}
if data, err = base64.StdEncoding.DecodeString(s); err != nil {
return nil, err
}
return &state.GetResponse{
Data: data,
ETag: &etag,
Metadata: req.Metadata,
}, nil
}
return &state.GetResponse{
Data: []byte(value),
ETag: &etag,
Metadata: req.Metadata,
}, nil
}
// Delete removes an item from the state store.
func (o *oracleDatabaseAccess) Delete(req *state.DeleteRequest) error {
return state.DeleteWithOptions(o.deleteValue, req)
}
// deleteValue is an internal implementation of delete to enable passing the logic to state.DeleteWithRetries as a func.
func (o *oracleDatabaseAccess) deleteValue(req *state.DeleteRequest) error {
o.logger.Debug("Deleting state value from OracleDatabase")
if req.Key == "" {
return fmt.Errorf("missing key in delete operation")
}
if req.Options.Concurrency == state.FirstWrite && (req.ETag == nil || len(*req.ETag) == 0) {
o.logger.Debugf("when FirstWrite is to be enforced, a value must be provided for the ETag")
return fmt.Errorf("when FirstWrite is to be enforced, a value must be provided for the ETag")
}
var result sql.Result
var err error
var tx *sql.Tx
if o.tx == nil { // not joining a preexisting transaction.
tx, err = o.db.Begin()
if err != nil {
return err
}
} else { // join the transaction passed in.
tx = o.tx
}
// QUESTION: only check for etag if FirstWrite specified - or always when etag is supplied??
if req.Options.Concurrency != state.FirstWrite {
result, err = tx.Exec("DELETE FROM state WHERE key = :key", req.Key)
} else {
result, err = tx.Exec("DELETE FROM state WHERE key = :key and etag = :etag", req.Key, *req.ETag)
}
if err != nil {
if o.tx == nil { // not joining a preexisting transaction.
tx.Rollback()
}
return err
}
if o.tx == nil { // not joining a preexisting transaction.
tx.Commit()
}
rows, err := result.RowsAffected()
if err != nil {
return err
}
if rows != 1 && req.ETag != nil && *req.ETag != "" && req.Options.Concurrency == state.FirstWrite {
return state.NewETagError(state.ETagMismatch, nil)
}
return nil
}
func (o *oracleDatabaseAccess) ExecuteMulti(sets []state.SetRequest, deletes []state.DeleteRequest) error {
o.logger.Debug("Executing multiple OracleDatabase operations, within a single transaction")
tx, err := o.db.Begin()
if err != nil {
return err
}
o.tx = tx
if len(deletes) > 0 {
for _, d := range deletes {
da := d // Fix for gosec G601: Implicit memory aliasing in for looo.
err = o.Delete(&da)
if err != nil {
tx.Rollback()
return err
}
}
}
if len(sets) > 0 {
for _, s := range sets {
sa := s // Fix for gosec G601: Implicit memory aliasing in for looo.
err = o.Set(&sa)
if err != nil {
tx.Rollback()
return err
}
}
}
err = tx.Commit()
o.tx = nil
return err
}
// Close implements io.Close.
func (o *oracleDatabaseAccess) Close() error {
if o.db != nil {
return o.db.Close()
}
return nil
}
func (o *oracleDatabaseAccess) ensureStateTable(stateTableName string) error {
exists, err := tableExists(o.db, stateTableName)
if err != nil {
return err
}
if !exists {
o.logger.Info("Creating OracleDatabase state table")
createTable := fmt.Sprintf(`CREATE TABLE %s (
key varchar2(100) NOT NULL PRIMARY KEY,
value clob NOT NULL,
binary_yn varchar2(1) NOT NULL,
etag varchar2(50) NOT NULL,
creation_time TIMESTAMP WITH TIME ZONE DEFAULT SYSTIMESTAMP NOT NULL ,
expiration_time TIMESTAMP WITH TIME ZONE NULL,
update_time TIMESTAMP WITH TIME ZONE NULL)`, stateTableName)
_, err = o.db.Exec(createTable)
if err != nil {
return err
}
}
return nil
}
func tableExists(db *sql.DB, tableName string) (bool, error) {
var tblCount int32
err := db.QueryRow("SELECT count(table_name) tbl_count FROM user_tables where table_name = upper(:tablename)", tableName).Scan(&tblCount)
exists := tblCount > 0
return exists, err
}
// func handleError(msg string, err error) {
// if err != nil {
// fmt.Println(msg, err)
// }
// }

View File

@ -1087,6 +1087,7 @@ github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNX
github.com/shirou/gopsutil/v3 v3.21.6/go.mod h1:JfVbDpIBLVzT8oKbvMg9P3wEIMDDpVn+LwHTKj0ST88=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sijms/go-ora/v2 v2.2.22/go.mod h1:jzfAFD+4CXHE+LjGWFl6cPrtiIpQVxakI2gvrMF2w6Y=
github.com/sirupsen/logrus v1.0.6/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=

View File

@ -1081,6 +1081,7 @@ github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNX
github.com/shirou/gopsutil/v3 v3.21.6/go.mod h1:JfVbDpIBLVzT8oKbvMg9P3wEIMDDpVn+LwHTKj0ST88=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sijms/go-ora/v2 v2.2.22/go.mod h1:jzfAFD+4CXHE+LjGWFl6cPrtiIpQVxakI2gvrMF2w6Y=
github.com/sirupsen/logrus v1.0.6/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=

View File

@ -1107,6 +1107,7 @@ github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNX
github.com/shirou/gopsutil/v3 v3.21.6/go.mod h1:JfVbDpIBLVzT8oKbvMg9P3wEIMDDpVn+LwHTKj0ST88=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sijms/go-ora/v2 v2.2.22/go.mod h1:jzfAFD+4CXHE+LjGWFl6cPrtiIpQVxakI2gvrMF2w6Y=
github.com/sirupsen/logrus v1.0.6/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=

View File

@ -1062,6 +1062,7 @@ github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNX
github.com/shirou/gopsutil/v3 v3.21.6/go.mod h1:JfVbDpIBLVzT8oKbvMg9P3wEIMDDpVn+LwHTKj0ST88=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sijms/go-ora/v2 v2.2.22/go.mod h1:jzfAFD+4CXHE+LjGWFl6cPrtiIpQVxakI2gvrMF2w6Y=
github.com/sirupsen/logrus v1.0.6/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=

View File

@ -1072,6 +1072,7 @@ github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNX
github.com/shirou/gopsutil/v3 v3.21.6/go.mod h1:JfVbDpIBLVzT8oKbvMg9P3wEIMDDpVn+LwHTKj0ST88=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sijms/go-ora/v2 v2.2.22/go.mod h1:jzfAFD+4CXHE+LjGWFl6cPrtiIpQVxakI2gvrMF2w6Y=
github.com/sirupsen/logrus v1.0.6/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=

View File

@ -1063,6 +1063,7 @@ github.com/shivamkm07/paho.mqtt.golang v1.3.6-0.20220106130409-e28a1db639f8 h1:B
github.com/shivamkm07/paho.mqtt.golang v1.3.6-0.20220106130409-e28a1db639f8/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sijms/go-ora/v2 v2.2.22/go.mod h1:jzfAFD+4CXHE+LjGWFl6cPrtiIpQVxakI2gvrMF2w6Y=
github.com/sirupsen/logrus v1.0.6/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=

View File

@ -1062,6 +1062,7 @@ github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNX
github.com/shirou/gopsutil/v3 v3.21.6/go.mod h1:JfVbDpIBLVzT8oKbvMg9P3wEIMDDpVn+LwHTKj0ST88=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sijms/go-ora/v2 v2.2.22/go.mod h1:jzfAFD+4CXHE+LjGWFl6cPrtiIpQVxakI2gvrMF2w6Y=
github.com/sirupsen/logrus v1.0.6/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=

View File

@ -1083,6 +1083,7 @@ github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNX
github.com/shirou/gopsutil/v3 v3.21.6/go.mod h1:JfVbDpIBLVzT8oKbvMg9P3wEIMDDpVn+LwHTKj0ST88=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sijms/go-ora/v2 v2.2.22/go.mod h1:jzfAFD+4CXHE+LjGWFl6cPrtiIpQVxakI2gvrMF2w6Y=
github.com/sirupsen/logrus v1.0.6/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=

View File

@ -1063,6 +1063,7 @@ github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNX
github.com/shirou/gopsutil/v3 v3.21.6/go.mod h1:JfVbDpIBLVzT8oKbvMg9P3wEIMDDpVn+LwHTKj0ST88=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sijms/go-ora/v2 v2.2.22/go.mod h1:jzfAFD+4CXHE+LjGWFl6cPrtiIpQVxakI2gvrMF2w6Y=
github.com/sirupsen/logrus v1.0.6/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=

View File

@ -2,11 +2,13 @@ module github.com/dapr/components-contrib/tests/e2e/pubsub/jetstream
go 1.17
require github.com/dapr/components-contrib v1.5.1
require (
github.com/dapr/components-contrib v1.5.1
github.com/dapr/kit v0.0.2-0.20210614175626-b9074b64d233
)
require (
github.com/cenkalti/backoff/v4 v4.1.1 // indirect
github.com/dapr/kit v0.0.2-0.20210614175626-b9074b64d233 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/json-iterator/go v1.1.11 // indirect
github.com/minio/highwayhash v1.0.2 // indirect

File diff suppressed because it is too large Load Diff