From acfb21c6b01831c4f35ee733f7dc7e6e402c8d1a Mon Sep 17 00:00:00 2001 From: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Date: Fri, 31 Mar 2023 20:06:24 +0000 Subject: [PATCH] RethinkDB: Disable transactions that were not working The implementation of the "Multi" method in RethinkDB was not correct, as it caused operations to be executed in the wrong order and it was not using a transaction (per comment on code: `best effort, no transacts supported`) Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- state/rethinkdb/rethinkdb.go | 53 ++++----------------- state/rethinkdb/rethinkdb_test.go | 78 +------------------------------ 2 files changed, 11 insertions(+), 120 deletions(-) diff --git a/state/rethinkdb/rethinkdb.go b/state/rethinkdb/rethinkdb.go index cbd268e17..e842b9943 100644 --- a/state/rethinkdb/rethinkdb.go +++ b/state/rethinkdb/rethinkdb.go @@ -52,10 +52,10 @@ type stateConfig struct { } type stateRecord struct { - ID string `json:"id" rethinkdb:"id"` - TS int64 `json:"timestamp" rethinkdb:"timestamp"` - Hash string `json:"hash,omitempty" rethinkdb:"hash,omitempty"` - Data interface{} `json:"data,omitempty" rethinkdb:"data,omitempty"` + ID string `json:"id" rethinkdb:"id"` + TS int64 `json:"timestamp" rethinkdb:"timestamp"` + Hash string `json:"hash,omitempty" rethinkdb:"hash,omitempty"` + Data any `json:"data,omitempty" rethinkdb:"data,omitempty"` } // NewRethinkDBStateStore returns a new RethinkDB state store. @@ -215,6 +215,7 @@ func (s *RethinkDB) Set(ctx context.Context, req *state.SetRequest) error { // BulkSet performs a bulk save operation. func (s *RethinkDB) BulkSet(ctx context.Context, req []state.SetRequest) error { docs := make([]*stateRecord, len(req)) + now := time.Now().UnixNano() for i, v := range req { var etag string if v.ETag != nil { @@ -223,7 +224,7 @@ func (s *RethinkDB) BulkSet(ctx context.Context, req []state.SetRequest) error { docs[i] = &stateRecord{ ID: v.Key, - TS: time.Now().UTC().UnixNano(), + TS: now, Data: v.Value, Hash: etag, } @@ -278,9 +279,9 @@ func (s *RethinkDB) Delete(ctx context.Context, req *state.DeleteRequest) error // BulkDelete performs a bulk delete operation. func (s *RethinkDB) BulkDelete(ctx context.Context, req []state.DeleteRequest) error { - list := make([]string, 0) - for _, d := range req { - list = append(list, d.Key) + list := make([]string, len(req)) + for i, d := range req { + list[i] = d.Key } c, err := r.Table(s.config.Table).GetAll(r.Args(list)).Delete().Run(s.session, r.RunOpts{Context: ctx}) @@ -292,42 +293,6 @@ func (s *RethinkDB) BulkDelete(ctx context.Context, req []state.DeleteRequest) e return nil } -// Multi performs multiple operations. -func (s *RethinkDB) Multi(ctx context.Context, req *state.TransactionalStateRequest) error { - upserts := make([]state.SetRequest, 0) - deletes := make([]state.DeleteRequest, 0) - - for _, v := range req.Operations { - switch v.Operation { - case state.Upsert: - r, ok := v.Request.(state.SetRequest) - if !ok { - return fmt.Errorf("invalid request type (expected SetRequest, got %t)", v.Request) - } - upserts = append(upserts, r) - case state.Delete: - r, ok := v.Request.(state.DeleteRequest) - if !ok { - return fmt.Errorf("invalid request type (expected DeleteRequest, got %t)", v.Request) - } - deletes = append(deletes, r) - default: - return fmt.Errorf("invalid operation type: %s", v.Operation) - } - } - - // best effort, no transacts supported - if err := s.BulkSet(ctx, upserts); err != nil { - return fmt.Errorf("error saving records to the database: %w", err) - } - - if err := s.BulkDelete(ctx, deletes); err != nil { - return fmt.Errorf("error deleting records to the database: %w", err) - } - - return nil -} - func metadataToConfig(cfg map[string]string, logger logger.Logger) (*stateConfig, error) { // defaults c := stateConfig{ diff --git a/state/rethinkdb/rethinkdb_test.go b/state/rethinkdb/rethinkdb_test.go index 6079b4e3f..b17eaae26 100644 --- a/state/rethinkdb/rethinkdb_test.go +++ b/state/rethinkdb/rethinkdb_test.go @@ -185,7 +185,7 @@ func testBulk(t *testing.T, db *RethinkDB, i int) { // check for the data for _, v := range deleteList { resp, err := db.Get(context.Background(), &state.GetRequest{Key: v.Key}) - assert.Nilf(t, err, " -- run %d", i) + assert.NoErrorf(t, err, " -- run %d", i) assert.NotNil(t, resp) assert.NotNil(t, resp.Data) } @@ -198,86 +198,12 @@ func testBulk(t *testing.T, db *RethinkDB, i int) { // check for the data NOT being there for _, v := range deleteList { resp, err := db.Get(context.Background(), &state.GetRequest{Key: v.Key}) - assert.Nilf(t, err, " -- run %d", i) + assert.NoErrorf(t, err, " -- run %d", i) assert.NotNil(t, resp) assert.Nil(t, resp.Data) } } -// go test -timeout 30s github.com/dapr/components-contrib/state/rethinkdb -run ^TestRethinkDBStateStoreMulti$ -count 1 -v. -func TestRethinkDBStateStoreMulti(t *testing.T) { - if !isLiveTest() { - t.SkipNow() - } - - m := state.Metadata{Base: metadata.Base{Properties: getTestMetadata()}} - db := NewRethinkDBStateStore(logger.NewLogger("test")).(*RethinkDB) - if err := db.Init(context.Background(), m); err != nil { - t.Fatalf("error initializing db: %v", err) - } - - numOfRecords := 4 - recordIDFormat := "multi-%d" - t.Run("With multi", func(t *testing.T) { - // create data list - d := []byte("test") - list := make([]state.SetRequest, numOfRecords) - for i := 0; i < numOfRecords; i++ { - list[i] = state.SetRequest{Key: fmt.Sprintf(recordIDFormat, i), Value: d} - } - if err := db.BulkSet(context.Background(), list); err != nil { - t.Fatalf("error setting multi to db: %v", err) - } - - // test multi - d2 := []byte("test") - req := &state.TransactionalStateRequest{ - Operations: []state.TransactionalStateOperation{ - { - Operation: state.Upsert, - Request: state.SetRequest{ - Key: fmt.Sprintf(recordIDFormat, 0), - Value: d2, - }, - }, - { - Operation: state.Upsert, - Request: state.SetRequest{ - Key: fmt.Sprintf(recordIDFormat, 1), - Value: d2, - }, - }, - { - Operation: state.Delete, - Request: state.DeleteRequest{Key: fmt.Sprintf(recordIDFormat, 2)}, - }, - { - Operation: state.Delete, - Request: state.DeleteRequest{Key: fmt.Sprintf(recordIDFormat, 3)}, - }, - }, - } - - // execute multi - if err := db.Multi(context.Background(), req); err != nil { - t.Fatalf("error setting multi to db: %v", err) - } - - // the one not deleted should be still there - m1, err := db.Get(context.Background(), &state.GetRequest{Key: fmt.Sprintf(recordIDFormat, 1)}) - assert.Nil(t, err) - assert.NotNil(t, m1) - assert.NotNil(t, m1.Data) - assert.Equal(t, string(d2), string(m1.Data)) - - // the one deleted should not - m2, err := db.Get(context.Background(), &state.GetRequest{Key: fmt.Sprintf(recordIDFormat, 3)}) - assert.Nil(t, err) - assert.NotNil(t, m2) - assert.Nil(t, m2.Data) - }) -} - type testObj struct { F1 string `json:"f1"` F2 int `json:"f2"`