Adds Default TTL Option for Redis State Store (#1059)
* Adds Default TTL Option for Redis State Store * Add additional tests * Parse correct property key Co-authored-by: Bernd Verst <me@bernd.dev>
This commit is contained in:
parent
253ef854c5
commit
aa7d2ee1dd
|
|
@ -10,4 +10,5 @@ import "time"
|
|||
type metadata struct {
|
||||
maxRetries int
|
||||
maxRetryBackoff time.Duration
|
||||
ttlInSeconds *int
|
||||
}
|
||||
|
|
|
|||
|
|
@ -87,6 +87,17 @@ func parseRedisMetadata(meta state.Metadata) (metadata, error) {
|
|||
m.maxRetryBackoff = time.Duration(parsedVal)
|
||||
}
|
||||
|
||||
if val, ok := meta.Properties[ttlInSeconds]; ok && val != "" {
|
||||
parsedVal, err := strconv.ParseInt(val, defaultBase, defaultBitSize)
|
||||
if err != nil {
|
||||
return m, fmt.Errorf("redis store error: can't parse ttlInSeconds field: %s", err)
|
||||
}
|
||||
intVal := int(parsedVal)
|
||||
m.ttlInSeconds = &intVal
|
||||
} else {
|
||||
m.ttlInSeconds = nil
|
||||
}
|
||||
|
||||
return m, nil
|
||||
}
|
||||
|
||||
|
|
@ -235,6 +246,10 @@ func (r *StateStore) setValue(req *state.SetRequest) error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("failed to parse ttl from metadata: %s", err)
|
||||
}
|
||||
// apply global TTL
|
||||
if ttl == nil {
|
||||
ttl = r.metadata.ttlInSeconds
|
||||
}
|
||||
|
||||
bt, _ := utils.Marshal(req.Value, r.json.Marshal)
|
||||
|
||||
|
|
@ -295,6 +310,11 @@ func (r *StateStore) Multi(request *state.TransactionalStateRequest) error {
|
|||
if err != nil {
|
||||
return fmt.Errorf("failed to parse ttl from metadata: %s", err)
|
||||
}
|
||||
// apply global TTL
|
||||
if ttl == nil {
|
||||
ttl = r.metadata.ttlInSeconds
|
||||
}
|
||||
|
||||
bt, _ := utils.Marshal(req.Value, r.json.Marshal)
|
||||
pipe.Do(r.ctx, "EVAL", setQuery, 1, req.Key, ver, bt)
|
||||
if ttl != nil && *ttl > 0 {
|
||||
|
|
|
|||
|
|
@ -308,6 +308,101 @@ func TestPing(t *testing.T) {
|
|||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
func TestRequestsWithGlobalTTL(t *testing.T) {
|
||||
s, c := setupMiniredis()
|
||||
defer s.Close()
|
||||
|
||||
globalTTLInSeconds := 100
|
||||
|
||||
ss := &StateStore{
|
||||
client: c,
|
||||
json: jsoniter.ConfigFastest,
|
||||
logger: logger.NewLogger("test"),
|
||||
metadata: metadata{ttlInSeconds: &globalTTLInSeconds},
|
||||
}
|
||||
ss.ctx, ss.cancel = context.WithCancel(context.Background())
|
||||
|
||||
t.Run("TTL: Only global specified", func(t *testing.T) {
|
||||
ss.Set(&state.SetRequest{
|
||||
Key: "weapon100",
|
||||
Value: "deathstar100",
|
||||
})
|
||||
ttl, _ := ss.client.TTL(ss.ctx, "weapon100").Result()
|
||||
|
||||
assert.Equal(t, time.Duration(globalTTLInSeconds)*time.Second, ttl)
|
||||
})
|
||||
|
||||
t.Run("TTL: Global and Request specified", func(t *testing.T) {
|
||||
requestTTL := 200
|
||||
ss.Set(&state.SetRequest{
|
||||
Key: "weapon100",
|
||||
Value: "deathstar100",
|
||||
Metadata: map[string]string{
|
||||
"ttlInSeconds": strconv.Itoa(requestTTL),
|
||||
},
|
||||
})
|
||||
ttl, _ := ss.client.TTL(ss.ctx, "weapon100").Result()
|
||||
|
||||
assert.Equal(t, time.Duration(requestTTL)*time.Second, ttl)
|
||||
})
|
||||
|
||||
t.Run("TTL: Global and Request specified", func(t *testing.T) {
|
||||
err := ss.Multi(&state.TransactionalStateRequest{
|
||||
Operations: []state.TransactionalStateOperation{
|
||||
{
|
||||
Operation: state.Upsert,
|
||||
Request: state.SetRequest{
|
||||
Key: "weapon",
|
||||
Value: "deathstar",
|
||||
},
|
||||
},
|
||||
{
|
||||
Operation: state.Upsert,
|
||||
Request: state.SetRequest{
|
||||
Key: "weapon2",
|
||||
Value: "deathstar2",
|
||||
Metadata: map[string]string{
|
||||
"ttlInSeconds": "123",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Operation: state.Upsert,
|
||||
Request: state.SetRequest{
|
||||
Key: "weapon3",
|
||||
Value: "deathstar3",
|
||||
Metadata: map[string]string{
|
||||
"ttlInSeconds": "-1",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
res, err := c.Do(context.Background(), "HGETALL", "weapon").Result()
|
||||
assert.Equal(t, nil, err)
|
||||
|
||||
vals := res.([]interface{})
|
||||
data, version, err := ss.getKeyVersion(vals)
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, ptr.String("1"), version)
|
||||
assert.Equal(t, `"deathstar"`, data)
|
||||
|
||||
res, err = c.Do(context.Background(), "TTL", "weapon").Result()
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, int64(globalTTLInSeconds), res)
|
||||
|
||||
res, err = c.Do(context.Background(), "TTL", "weapon2").Result()
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, int64(123), res)
|
||||
|
||||
res, err = c.Do(context.Background(), "TTL", "weapon3").Result()
|
||||
assert.Equal(t, nil, err)
|
||||
assert.Equal(t, int64(-1), res)
|
||||
})
|
||||
}
|
||||
|
||||
func TestSetRequestWithTTL(t *testing.T) {
|
||||
s, c := setupMiniredis()
|
||||
defer s.Close()
|
||||
|
|
|
|||
Loading…
Reference in New Issue