RethinkDB: Disable transactions that were not working (#2733)
This commit is contained in:
commit
c65319b85c
|
@ -52,10 +52,10 @@ type stateConfig struct {
|
|||
}
|
||||
|
||||
type stateRecord struct {
|
||||
ID string `json:"id" rethinkdb:"id"`
|
||||
TS int64 `json:"timestamp" rethinkdb:"timestamp"`
|
||||
Hash string `json:"hash,omitempty" rethinkdb:"hash,omitempty"`
|
||||
Data interface{} `json:"data,omitempty" rethinkdb:"data,omitempty"`
|
||||
ID string `json:"id" rethinkdb:"id"`
|
||||
TS int64 `json:"timestamp" rethinkdb:"timestamp"`
|
||||
Hash string `json:"hash,omitempty" rethinkdb:"hash,omitempty"`
|
||||
Data any `json:"data,omitempty" rethinkdb:"data,omitempty"`
|
||||
}
|
||||
|
||||
// NewRethinkDBStateStore returns a new RethinkDB state store.
|
||||
|
@ -215,6 +215,7 @@ func (s *RethinkDB) Set(ctx context.Context, req *state.SetRequest) error {
|
|||
// BulkSet performs a bulk save operation.
|
||||
func (s *RethinkDB) BulkSet(ctx context.Context, req []state.SetRequest) error {
|
||||
docs := make([]*stateRecord, len(req))
|
||||
now := time.Now().UnixNano()
|
||||
for i, v := range req {
|
||||
var etag string
|
||||
if v.ETag != nil {
|
||||
|
@ -223,7 +224,7 @@ func (s *RethinkDB) BulkSet(ctx context.Context, req []state.SetRequest) error {
|
|||
|
||||
docs[i] = &stateRecord{
|
||||
ID: v.Key,
|
||||
TS: time.Now().UTC().UnixNano(),
|
||||
TS: now,
|
||||
Data: v.Value,
|
||||
Hash: etag,
|
||||
}
|
||||
|
@ -278,9 +279,9 @@ func (s *RethinkDB) Delete(ctx context.Context, req *state.DeleteRequest) error
|
|||
|
||||
// BulkDelete performs a bulk delete operation.
|
||||
func (s *RethinkDB) BulkDelete(ctx context.Context, req []state.DeleteRequest) error {
|
||||
list := make([]string, 0)
|
||||
for _, d := range req {
|
||||
list = append(list, d.Key)
|
||||
list := make([]string, len(req))
|
||||
for i, d := range req {
|
||||
list[i] = d.Key
|
||||
}
|
||||
|
||||
c, err := r.Table(s.config.Table).GetAll(r.Args(list)).Delete().Run(s.session, r.RunOpts{Context: ctx})
|
||||
|
@ -292,42 +293,6 @@ func (s *RethinkDB) BulkDelete(ctx context.Context, req []state.DeleteRequest) e
|
|||
return nil
|
||||
}
|
||||
|
||||
// Multi performs multiple operations.
|
||||
func (s *RethinkDB) Multi(ctx context.Context, req *state.TransactionalStateRequest) error {
|
||||
upserts := make([]state.SetRequest, 0)
|
||||
deletes := make([]state.DeleteRequest, 0)
|
||||
|
||||
for _, v := range req.Operations {
|
||||
switch v.Operation {
|
||||
case state.Upsert:
|
||||
r, ok := v.Request.(state.SetRequest)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid request type (expected SetRequest, got %t)", v.Request)
|
||||
}
|
||||
upserts = append(upserts, r)
|
||||
case state.Delete:
|
||||
r, ok := v.Request.(state.DeleteRequest)
|
||||
if !ok {
|
||||
return fmt.Errorf("invalid request type (expected DeleteRequest, got %t)", v.Request)
|
||||
}
|
||||
deletes = append(deletes, r)
|
||||
default:
|
||||
return fmt.Errorf("invalid operation type: %s", v.Operation)
|
||||
}
|
||||
}
|
||||
|
||||
// best effort, no transacts supported
|
||||
if err := s.BulkSet(ctx, upserts); err != nil {
|
||||
return fmt.Errorf("error saving records to the database: %w", err)
|
||||
}
|
||||
|
||||
if err := s.BulkDelete(ctx, deletes); err != nil {
|
||||
return fmt.Errorf("error deleting records to the database: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func metadataToConfig(cfg map[string]string, logger logger.Logger) (*stateConfig, error) {
|
||||
// defaults
|
||||
c := stateConfig{
|
||||
|
|
|
@ -185,7 +185,7 @@ func testBulk(t *testing.T, db *RethinkDB, i int) {
|
|||
// check for the data
|
||||
for _, v := range deleteList {
|
||||
resp, err := db.Get(context.Background(), &state.GetRequest{Key: v.Key})
|
||||
assert.Nilf(t, err, " -- run %d", i)
|
||||
assert.NoErrorf(t, err, " -- run %d", i)
|
||||
assert.NotNil(t, resp)
|
||||
assert.NotNil(t, resp.Data)
|
||||
}
|
||||
|
@ -198,86 +198,12 @@ func testBulk(t *testing.T, db *RethinkDB, i int) {
|
|||
// check for the data NOT being there
|
||||
for _, v := range deleteList {
|
||||
resp, err := db.Get(context.Background(), &state.GetRequest{Key: v.Key})
|
||||
assert.Nilf(t, err, " -- run %d", i)
|
||||
assert.NoErrorf(t, err, " -- run %d", i)
|
||||
assert.NotNil(t, resp)
|
||||
assert.Nil(t, resp.Data)
|
||||
}
|
||||
}
|
||||
|
||||
// go test -timeout 30s github.com/dapr/components-contrib/state/rethinkdb -run ^TestRethinkDBStateStoreMulti$ -count 1 -v.
|
||||
func TestRethinkDBStateStoreMulti(t *testing.T) {
|
||||
if !isLiveTest() {
|
||||
t.SkipNow()
|
||||
}
|
||||
|
||||
m := state.Metadata{Base: metadata.Base{Properties: getTestMetadata()}}
|
||||
db := NewRethinkDBStateStore(logger.NewLogger("test")).(*RethinkDB)
|
||||
if err := db.Init(context.Background(), m); err != nil {
|
||||
t.Fatalf("error initializing db: %v", err)
|
||||
}
|
||||
|
||||
numOfRecords := 4
|
||||
recordIDFormat := "multi-%d"
|
||||
t.Run("With multi", func(t *testing.T) {
|
||||
// create data list
|
||||
d := []byte("test")
|
||||
list := make([]state.SetRequest, numOfRecords)
|
||||
for i := 0; i < numOfRecords; i++ {
|
||||
list[i] = state.SetRequest{Key: fmt.Sprintf(recordIDFormat, i), Value: d}
|
||||
}
|
||||
if err := db.BulkSet(context.Background(), list); err != nil {
|
||||
t.Fatalf("error setting multi to db: %v", err)
|
||||
}
|
||||
|
||||
// test multi
|
||||
d2 := []byte("test")
|
||||
req := &state.TransactionalStateRequest{
|
||||
Operations: []state.TransactionalStateOperation{
|
||||
{
|
||||
Operation: state.Upsert,
|
||||
Request: state.SetRequest{
|
||||
Key: fmt.Sprintf(recordIDFormat, 0),
|
||||
Value: d2,
|
||||
},
|
||||
},
|
||||
{
|
||||
Operation: state.Upsert,
|
||||
Request: state.SetRequest{
|
||||
Key: fmt.Sprintf(recordIDFormat, 1),
|
||||
Value: d2,
|
||||
},
|
||||
},
|
||||
{
|
||||
Operation: state.Delete,
|
||||
Request: state.DeleteRequest{Key: fmt.Sprintf(recordIDFormat, 2)},
|
||||
},
|
||||
{
|
||||
Operation: state.Delete,
|
||||
Request: state.DeleteRequest{Key: fmt.Sprintf(recordIDFormat, 3)},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
// execute multi
|
||||
if err := db.Multi(context.Background(), req); err != nil {
|
||||
t.Fatalf("error setting multi to db: %v", err)
|
||||
}
|
||||
|
||||
// the one not deleted should be still there
|
||||
m1, err := db.Get(context.Background(), &state.GetRequest{Key: fmt.Sprintf(recordIDFormat, 1)})
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, m1)
|
||||
assert.NotNil(t, m1.Data)
|
||||
assert.Equal(t, string(d2), string(m1.Data))
|
||||
|
||||
// the one deleted should not
|
||||
m2, err := db.Get(context.Background(), &state.GetRequest{Key: fmt.Sprintf(recordIDFormat, 3)})
|
||||
assert.Nil(t, err)
|
||||
assert.NotNil(t, m2)
|
||||
assert.Nil(t, m2.Data)
|
||||
})
|
||||
}
|
||||
|
||||
type testObj struct {
|
||||
F1 string `json:"f1"`
|
||||
F2 int `json:"f2"`
|
||||
|
|
Loading…
Reference in New Issue