Adds Redis stream trimming by time(stream ID) (#3710)

Signed-off-by: Elena Kolevska <elena@kolevska.com>
Co-authored-by: Cassie Coyle <cassie@diagrid.io>
Co-authored-by: Nelson Parente <nelson_parente@live.com.pt>
Co-authored-by: Yaron Schneider <schneider.yaron@live.com>
Co-authored-by: Bernd Verst <github@bernd.dev>
This commit is contained in:
Elena Kolevska 2025-05-06 18:59:30 +01:00 committed by GitHub
parent 14921af0e1
commit 5f17025027
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 69 additions and 9 deletions

View File

@ -82,7 +82,7 @@ type RedisClient interface {
ConfigurationSubscribe(ctx context.Context, args *ConfigurationSubscribeArgs)
SetNX(ctx context.Context, key string, value interface{}, expiration time.Duration) (*bool, error)
EvalInt(ctx context.Context, script string, keys []string, args ...interface{}) (*int, error, error)
XAdd(ctx context.Context, stream string, maxLenApprox int64, values map[string]interface{}) (string, error)
XAdd(ctx context.Context, stream string, maxLenApprox int64, streamTTL string, values map[string]interface{}) (string, error)
XGroupCreateMkStream(ctx context.Context, stream string, group string, start string) error
XAck(ctx context.Context, stream string, group string, messageID string) error
XReadGroupResult(ctx context.Context, group string, consumer string, streams []string, count int64, block time.Duration) ([]RedisXStream, error)

View File

@ -102,6 +102,9 @@ type Settings struct {
// The max len of stream
MaxLenApprox int64 `mapstructure:"maxLenApprox" mdonly:"pubsub"`
// The TTL of stream entries
StreamTTL time.Duration `mapstructure:"streamTTL" mdonly:"pubsub"`
// EntraID / AzureAD Authentication based on the shared code which essentially uses the DefaultAzureCredential
// from the official Azure Identity SDK for Go
UseEntraID bool `mapstructure:"useEntraID" mapstructurealiases:"useAzureAD"`
@ -127,6 +130,15 @@ func (s *Settings) SetCertificate(fn func(cert *tls.Certificate)) error {
return nil
}
func (s *Settings) GetMinID(now time.Time) string {
// If StreamTTL is not set, return empty string (no trimming)
if s.StreamTTL == 0 {
return ""
}
return fmt.Sprintf("%d-1", now.Add(-s.StreamTTL).UnixMilli())
}
type Duration time.Duration
func (r *Duration) DecodeString(value string) error {

View File

@ -3,6 +3,7 @@ package redis
import (
"crypto/tls"
"testing"
"time"
"github.com/stretchr/testify/require"
)
@ -39,4 +40,34 @@ func TestSettings(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, c)
})
t.Run("stream TTL", func(t *testing.T) {
fixedTime := time.Date(2025, 3, 14, 0o1, 59, 26, 0, time.UTC)
tests := []struct {
name string
streamTTL time.Duration
want string
}{
{
name: "with one hour TTL",
streamTTL: time.Hour,
want: "1741913966000-1",
},
{
name: "with zero TTL",
streamTTL: 0,
want: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
settings := &Settings{
StreamTTL: tt.streamTTL,
}
require.Equal(t, tt.want, settings.GetMinID(fixedTime))
})
}
})
}

View File

@ -161,7 +161,7 @@ func (c v8Client) SetNX(ctx context.Context, key string, value interface{}, expi
return &val, nx.Err()
}
func (c v8Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, values map[string]interface{}) (string, error) {
func (c v8Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, minIDApprox string, values map[string]interface{}) (string, error) {
var writeCtx context.Context
if c.writeTimeout > 0 {
timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout))
@ -171,9 +171,11 @@ func (c v8Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, v
writeCtx = ctx
}
return c.client.XAdd(writeCtx, &v8.XAddArgs{
Stream: stream,
Values: values,
MaxLenApprox: maxLenApprox,
Stream: stream,
Values: values,
MaxLen: maxLenApprox,
MinID: minIDApprox,
Approx: true,
}).Result()
}

View File

@ -161,7 +161,7 @@ func (c v9Client) SetNX(ctx context.Context, key string, value interface{}, expi
return &val, nx.Err()
}
func (c v9Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, values map[string]interface{}) (string, error) {
func (c v9Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, minIDApprox string, values map[string]interface{}) (string, error) {
var writeCtx context.Context
if c.writeTimeout > 0 {
timeoutCtx, cancel := context.WithTimeout(ctx, time.Duration(c.writeTimeout))
@ -174,6 +174,7 @@ func (c v9Client) XAdd(ctx context.Context, stream string, maxLenApprox int64, v
Stream: stream,
Values: values,
MaxLen: maxLenApprox,
MinID: minIDApprox,
Approx: true,
}).Result()
}

View File

@ -165,7 +165,7 @@ metadata:
- name: failover
required: false
description: |
Property to enabled failover configuration. Needs sentinalMasterName to be set. Defaults to "false"
Property to enabled failover configuration. Needs sentinelMasterName to be set. Defaults to "false"
example: "false"
type: bool
- name: sentinelMasterName
@ -175,9 +175,19 @@ metadata:
type: string
- name: maxLenApprox
required: false
description: Maximum number of items inside a stream.The old entries are automatically evicted when the specified length is reached, so that the stream is left at a constant size. Defaults to unlimited.
description: Maximum number of items inside a stream. The old entries are automatically evicted when the specified length is reached, so that the stream is left at a constant size. Defaults to unlimited.
example: "10000"
type: number
- name: streamTTL
required: false
description: |
TTL duration for stream entries. Entries older than this duration will be evicted.
This is an approximate value, as it's implemented using Redis stream's MINID trimming with the '~' modifier.
The actual retention may include slightly more entries than strictly defined by the TTL,
as Redis optimizes the trimming operation for efficiency by potentially keeping some additional entries.
example: "30d"
type: duration
builtinAuthenticationProfiles:
- name: "azuread"
metadata:

View File

@ -38,6 +38,7 @@ const (
queueDepth = "queueDepth"
concurrency = "concurrency"
maxLenApprox = "maxLenApprox"
streamTTL = "streamTTL"
)
// redisStreams handles consuming from a Redis stream using
@ -112,7 +113,7 @@ func (r *redisStreams) Publish(ctx context.Context, req *pubsub.PublishRequest)
redisPayload["metadata"] = serializedMetadata
}
_, err := r.client.XAdd(ctx, req.Topic, r.clientSettings.MaxLenApprox, redisPayload)
_, err := r.client.XAdd(ctx, req.Topic, r.clientSettings.MaxLenApprox, r.clientSettings.GetMinID(time.Now()), redisPayload)
if err != nil {
return fmt.Errorf("redis streams: error from publish: %s", err)
}

View File

@ -19,6 +19,7 @@ import (
"strconv"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -35,6 +36,7 @@ func getFakeProperties() map[string]string {
consumerID: "fakeConsumer",
enableTLS: "true",
maxLenApprox: "1000",
streamTTL: "1h",
}
}
@ -54,6 +56,7 @@ func TestParseRedisMetadata(t *testing.T) {
require.NoError(t, err)
assert.Equal(t, fakeProperties[consumerID], m.ConsumerID)
assert.Equal(t, int64(1000), m.MaxLenApprox)
assert.Equal(t, 1*time.Hour, m.StreamTTL)
})
// TODO: fix the code to return the error for the missing property to make this test work