fix add add a ctx context variable in the Redis struct. And implement `Close() error` function.
This commit is contained in:
parent
c55d88cd98
commit
b1c81523c2
|
|
@ -53,6 +53,9 @@ type StateStore struct {
|
|||
|
||||
features []state.Feature
|
||||
logger logger.Logger
|
||||
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// NewRedisStateStore returns a new redis state store
|
||||
|
|
@ -141,7 +144,9 @@ func (r *StateStore) Init(metadata state.Metadata) error {
|
|||
r.client = r.newClient(m)
|
||||
}
|
||||
|
||||
if _, err = r.client.Ping(context.Background()).Result(); err != nil {
|
||||
r.ctx, r.cancel = context.WithCancel(context.Background())
|
||||
|
||||
if _, err = r.client.Ping(r.ctx).Result(); err != nil {
|
||||
return fmt.Errorf("redis store: error connecting to redis at %s: %s", m.host, err)
|
||||
}
|
||||
|
||||
|
|
@ -196,7 +201,7 @@ func (r *StateStore) newFailoverClient(m metadata) *redis.Client {
|
|||
}
|
||||
|
||||
func (r *StateStore) getConnectedSlaves() (int, error) {
|
||||
res, err := r.client.Do(context.Background(), "INFO", "replication").Result()
|
||||
res, err := r.client.Do(r.ctx, "INFO", "replication").Result()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
|
|
@ -229,7 +234,7 @@ func (r *StateStore) deleteValue(req *state.DeleteRequest) error {
|
|||
etag := "0"
|
||||
req.ETag = &etag
|
||||
}
|
||||
_, err := r.client.Do(context.Background(), "EVAL", delQuery, 1, req.Key, *req.ETag).Result()
|
||||
_, err := r.client.Do(r.ctx, "EVAL", delQuery, 1, req.Key, *req.ETag).Result()
|
||||
if err != nil {
|
||||
return state.NewETagError(state.ETagMismatch, err)
|
||||
}
|
||||
|
|
@ -248,7 +253,7 @@ func (r *StateStore) Delete(req *state.DeleteRequest) error {
|
|||
}
|
||||
|
||||
func (r *StateStore) directGet(req *state.GetRequest) (*state.GetResponse, error) {
|
||||
res, err := r.client.Do(context.Background(), "GET", req.Key).Result()
|
||||
res, err := r.client.Do(r.ctx, "GET", req.Key).Result()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -266,7 +271,7 @@ func (r *StateStore) directGet(req *state.GetRequest) (*state.GetResponse, error
|
|||
|
||||
// Get retrieves state from redis with a key
|
||||
func (r *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
|
||||
res, err := r.client.Do(context.Background(), "HGETALL", req.Key).Result() // Prefer values with ETags
|
||||
res, err := r.client.Do(r.ctx, "HGETALL", req.Key).Result() // Prefer values with ETags
|
||||
if err != nil {
|
||||
return r.directGet(req) // Falls back to original get for backward compats.
|
||||
}
|
||||
|
|
@ -301,7 +306,7 @@ func (r *StateStore) setValue(req *state.SetRequest) error {
|
|||
|
||||
bt, _ := utils.Marshal(req.Value, r.json.Marshal)
|
||||
|
||||
_, err = r.client.Do(context.Background(), "EVAL", setQuery, 1, req.Key, ver, bt).Result()
|
||||
_, err = r.client.Do(r.ctx, "EVAL", setQuery, 1, req.Key, ver, bt).Result()
|
||||
if err != nil {
|
||||
if req.ETag != nil {
|
||||
return state.NewETagError(state.ETagMismatch, err)
|
||||
|
|
@ -311,7 +316,7 @@ func (r *StateStore) setValue(req *state.SetRequest) error {
|
|||
}
|
||||
|
||||
if req.Options.Consistency == state.Strong && r.replicas > 0 {
|
||||
_, err = r.client.Do(context.Background(), "WAIT", r.replicas, 1000).Result()
|
||||
_, err = r.client.Do(r.ctx, "WAIT", r.replicas, 1000).Result()
|
||||
if err != nil {
|
||||
return fmt.Errorf("redis waiting for %v replicas to acknowledge write, err: %s", r.replicas, err.Error())
|
||||
}
|
||||
|
|
@ -336,18 +341,18 @@ func (r *StateStore) Multi(request *state.TransactionalStateRequest) error {
|
|||
return err
|
||||
}
|
||||
bt, _ := utils.Marshal(req.Value, r.json.Marshal)
|
||||
pipe.Do(context.Background(), "EVAL", setQuery, 1, req.Key, ver, bt)
|
||||
pipe.Do(r.ctx, "EVAL", setQuery, 1, req.Key, ver, bt)
|
||||
} else if o.Operation == state.Delete {
|
||||
req := o.Request.(state.DeleteRequest)
|
||||
if req.ETag == nil {
|
||||
etag := "0"
|
||||
req.ETag = &etag
|
||||
}
|
||||
pipe.Do(context.Background(), "EVAL", delQuery, 1, req.Key, *req.ETag)
|
||||
pipe.Do(r.ctx, "EVAL", delQuery, 1, req.Key, *req.ETag)
|
||||
}
|
||||
}
|
||||
|
||||
_, err := pipe.Exec(context.Background())
|
||||
_, err := pipe.Exec(r.ctx)
|
||||
|
||||
return err
|
||||
}
|
||||
|
|
@ -385,3 +390,9 @@ func (r *StateStore) parseETag(req *state.SetRequest) (int, error) {
|
|||
|
||||
return ver, nil
|
||||
}
|
||||
|
||||
func (r *StateStore) Close() error {
|
||||
r.cancel()
|
||||
|
||||
return r.client.Close()
|
||||
}
|
||||
|
|
@ -122,6 +122,7 @@ func TestTransactionalUpsert(t *testing.T) {
|
|||
json: jsoniter.ConfigFastest,
|
||||
logger: logger.NewLogger("test"),
|
||||
}
|
||||
ss.ctx, ss.cancel = context.WithCancel(context.Background())
|
||||
|
||||
err := ss.Multi(&state.TransactionalStateRequest{
|
||||
Operations: []state.TransactionalStateOperation{{
|
||||
|
|
@ -153,6 +154,7 @@ func TestTransactionalDelete(t *testing.T) {
|
|||
json: jsoniter.ConfigFastest,
|
||||
logger: logger.NewLogger("test"),
|
||||
}
|
||||
ss.ctx, ss.cancel = context.WithCancel(context.Background())
|
||||
|
||||
// Insert a record first.
|
||||
ss.Set(&state.SetRequest{
|
||||
|
|
@ -188,6 +190,7 @@ func TestTransactionalDeleteNoEtag(t *testing.T) {
|
|||
json: jsoniter.ConfigFastest,
|
||||
logger: logger.NewLogger("test"),
|
||||
}
|
||||
ss.ctx, ss.cancel = context.WithCancel(context.Background())
|
||||
|
||||
// Insert a record first.
|
||||
ss.Set(&state.SetRequest{
|
||||
|
|
|
|||
Loading…
Reference in New Issue