Sql component should save db object on Init instead of creating a new (#399)
one per operation. Separately, if data arrives as []byte (as in grpc case) don't Marshal it. Co-authored-by: LM <lemai>
This commit is contained in:
parent
5892feca63
commit
65575593c1
|
@ -97,6 +97,7 @@ type SQLServer struct {
|
|||
deleteWithoutETagCommand string
|
||||
|
||||
logger logger.Logger
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
func isLetterOrNumber(c rune) bool {
|
||||
|
@ -234,6 +235,11 @@ func (s *SQLServer) Init(metadata state.Metadata) error {
|
|||
s.deleteWithETagCommand = mr.deleteWithETagCommand
|
||||
s.deleteWithoutETagCommand = mr.deleteWithoutETagCommand
|
||||
|
||||
s.db, err = sql.Open("sqlserver", s.connectionString)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -281,14 +287,7 @@ func (s *SQLServer) Multi(reqs []state.TransactionalRequest) error {
|
|||
}
|
||||
|
||||
func (s *SQLServer) executeMulti(sets []state.SetRequest, deletes []state.DeleteRequest) error {
|
||||
db, err := sql.Open("sqlserver", s.connectionString)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer db.Close()
|
||||
|
||||
tx, err := db.Begin()
|
||||
tx, err := s.db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -316,13 +315,7 @@ func (s *SQLServer) executeMulti(sets []state.SetRequest, deletes []state.Delete
|
|||
|
||||
// Delete removes an entity from the store
|
||||
func (s *SQLServer) Delete(req *state.DeleteRequest) error {
|
||||
db, err := sql.Open("sqlserver", s.connectionString)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer db.Close()
|
||||
|
||||
var err error
|
||||
var res sql.Result
|
||||
if req.ETag != "" {
|
||||
var b []byte
|
||||
|
@ -331,9 +324,9 @@ func (s *SQLServer) Delete(req *state.DeleteRequest) error {
|
|||
return err
|
||||
}
|
||||
|
||||
res, err = db.Exec(s.deleteWithETagCommand, sql.Named(keyColumnName, req.Key), sql.Named(rowVersionColumnName, b))
|
||||
res, err = s.db.Exec(s.deleteWithETagCommand, sql.Named(keyColumnName, req.Key), sql.Named(rowVersionColumnName, b))
|
||||
} else {
|
||||
res, err = db.Exec(s.deleteWithoutETagCommand, sql.Named(keyColumnName, req.Key))
|
||||
res, err = s.db.Exec(s.deleteWithoutETagCommand, sql.Named(keyColumnName, req.Key))
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
|
@ -360,14 +353,7 @@ type TvpDeleteTableStringKey struct {
|
|||
|
||||
// BulkDelete removes multiple entries from the store
|
||||
func (s *SQLServer) BulkDelete(req []state.DeleteRequest) error {
|
||||
db, err := sql.Open("sqlserver", s.connectionString)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer db.Close()
|
||||
|
||||
tx, err := db.Begin()
|
||||
tx, err := s.db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -422,14 +408,7 @@ func (s *SQLServer) executeBulkDelete(db dbExecutor, req []state.DeleteRequest)
|
|||
|
||||
// Get returns an entity from store
|
||||
func (s *SQLServer) Get(req *state.GetRequest) (*state.GetResponse, error) {
|
||||
db, err := sql.Open("sqlserver", s.connectionString)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer db.Close()
|
||||
|
||||
rows, err := db.Query(s.getCommand, sql.Named(keyColumnName, req.Key))
|
||||
rows, err := s.db.Query(s.getCommand, sql.Named(keyColumnName, req.Key))
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -462,14 +441,7 @@ func (s *SQLServer) Get(req *state.GetRequest) (*state.GetResponse, error) {
|
|||
|
||||
// Set adds/updates an entity on store
|
||||
func (s *SQLServer) Set(req *state.SetRequest) error {
|
||||
db, err := sql.Open("sqlserver", s.connectionString)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer db.Close()
|
||||
|
||||
return s.executeSet(db, req)
|
||||
return s.executeSet(s.db, req)
|
||||
}
|
||||
|
||||
// dbExecutor implements a common functionality implemented by db or tx
|
||||
|
@ -478,9 +450,16 @@ type dbExecutor interface {
|
|||
}
|
||||
|
||||
func (s *SQLServer) executeSet(db dbExecutor, req *state.SetRequest) error {
|
||||
json, err := json.Marshal(req.Value)
|
||||
if err != nil {
|
||||
return err
|
||||
var err error
|
||||
var bytes []byte
|
||||
b, ok := req.Value.([]byte)
|
||||
if ok {
|
||||
bytes = b
|
||||
} else {
|
||||
bytes, err = json.Marshal(req.Value)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
etag := sql.Named(rowVersionColumnName, nil)
|
||||
|
@ -492,7 +471,7 @@ func (s *SQLServer) executeSet(db dbExecutor, req *state.SetRequest) error {
|
|||
}
|
||||
etag.Value = b
|
||||
}
|
||||
res, err := db.Exec(s.upsertCommand, sql.Named(keyColumnName, req.Key), sql.Named("Data", string(json)), etag)
|
||||
res, err := db.Exec(s.upsertCommand, sql.Named(keyColumnName, req.Key), sql.Named("Data", string(bytes)), etag)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -511,14 +490,7 @@ func (s *SQLServer) executeSet(db dbExecutor, req *state.SetRequest) error {
|
|||
|
||||
// BulkSet adds/updates multiple entities on store
|
||||
func (s *SQLServer) BulkSet(req []state.SetRequest) error {
|
||||
db, err := sql.Open("sqlserver", s.connectionString)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer db.Close()
|
||||
|
||||
tx, err := db.Begin()
|
||||
tx, err := s.db.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue