Update transaction API to execute sequentially (#1542)

This commit is contained in:
Shubham Sharma 2022-03-10 01:07:28 +05:30 committed by GitHub
parent 794e42225a
commit eecb3015c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 23 additions and 53 deletions

View File

@ -343,50 +343,46 @@ func (s *SQLServer) Features() []state.Feature {
// Multi performs multiple updates on a Sql server store.
func (s *SQLServer) Multi(request *state.TransactionalStateRequest) error {
keyMap := make(map[string]struct{})
var sets []state.SetRequest
var deletes []state.DeleteRequest
tx, err := s.db.Begin()
if err != nil {
return err
}
// The order of unique key operations does not matter in an atomic transaction.
// Only the latest operation for any unique key is selected for execution.
// The other operations are redundant, and hence ignored.
for i := len(request.Operations) - 1; i >= 0; i-- {
req := request.Operations[i]
for _, req := range request.Operations {
switch req.Operation {
case state.Upsert:
setReq, err := s.getSets(req)
if err != nil {
tx.Rollback()
return err
}
_, ok := keyMap[setReq.Key]
if !ok {
sets = append(sets, setReq)
keyMap[setReq.Key] = struct{}{}
err = s.executeSet(tx, &setReq)
if err != nil {
tx.Rollback()
return err
}
case state.Delete:
delReq, err := s.getDeletes(req)
if err != nil {
tx.Rollback()
return err
}
_, ok := keyMap[delReq.Key]
if !ok {
deletes = append(deletes, delReq)
keyMap[delReq.Key] = struct{}{}
err = s.executeDelete(tx, &delReq)
if err != nil {
tx.Rollback()
return err
}
default:
tx.Rollback()
return fmt.Errorf("unsupported operation: %s", req.Operation)
}
}
if len(sets) > 0 || len(deletes) > 0 {
return s.executeMulti(sets, deletes)
}
return nil
return tx.Commit()
}
// Returns the set requests.
@ -417,38 +413,12 @@ func (s *SQLServer) getDeletes(req state.TransactionalStateOperation) (state.Del
return delReq, nil
}
func (s *SQLServer) executeMulti(sets []state.SetRequest, deletes []state.DeleteRequest) error {
tx, err := s.db.Begin()
if err != nil {
return err
}
if len(deletes) > 0 {
switch err = s.executeBulkDelete(tx, deletes); err.(type) {
case nil, *state.BulkDeleteRowMismatchError:
default:
tx.Rollback()
return err
}
}
if len(sets) > 0 {
for i := range sets {
err = s.executeSet(tx, &sets[i])
if err != nil {
tx.Rollback()
return err
}
}
}
return tx.Commit()
}
// Delete removes an entity from the store.
func (s *SQLServer) Delete(req *state.DeleteRequest) error {
return s.executeDelete(s.db, req)
}
func (s *SQLServer) executeDelete(db dbExecutor, req *state.DeleteRequest) error {
var err error
var res sql.Result
if req.ETag != nil {
@ -458,9 +428,9 @@ func (s *SQLServer) Delete(req *state.DeleteRequest) error {
return state.NewETagError(state.ETagInvalid, err)
}
res, err = s.db.Exec(s.deleteWithETagCommand, sql.Named(keyColumnName, req.Key), sql.Named(rowVersionColumnName, b))
res, err = db.Exec(s.deleteWithETagCommand, sql.Named(keyColumnName, req.Key), sql.Named(rowVersionColumnName, b))
} else {
res, err = s.db.Exec(s.deleteWithoutETagCommand, sql.Named(keyColumnName, req.Key))
res, err = db.Exec(s.deleteWithoutETagCommand, sql.Named(keyColumnName, req.Key))
}
// err represents errors thrown by the stored procedure or the database itself