Merge pull request #2535 from JoshVanL/state-store-ttl-mongodb

Add TTL support to mongodb state store
This commit is contained in:
Bernd Verst 2023-02-22 12:20:10 -08:00 committed by GitHub
commit 3bc8eaeeac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 155 additions and 24 deletions

View File

@ -35,6 +35,7 @@ import (
"github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/state"
"github.com/dapr/components-contrib/state/query"
stateutils "github.com/dapr/components-contrib/state/utils"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/ptr"
)
@ -51,6 +52,8 @@ const (
id = "_id"
value = "value"
etag = "_etag"
ttl = "_ttl"
ttlDollar = "$" + ttl
defaultTimeout = 5 * time.Second
defaultDatabaseName = "daprStore"
@ -146,9 +149,19 @@ func (m *MongoDB) Init(ctx context.Context, metadata state.Metadata) error {
m.metadata = *meta
opts := options.Collection().SetWriteConcern(wc).SetReadConcern(rc)
collection := m.client.Database(meta.DatabaseName).Collection(meta.CollectionName, opts)
m.collection = m.client.Database(meta.DatabaseName).Collection(meta.CollectionName, opts)
m.collection = collection
// Set expireAfterSeconds index on ttl field with a value of 0 to delete
// values immediately when the TTL value is reached.
// MongoDB TTL Indexes: https://docs.mongodb.com/manual/core/index-ttl/
// TTL fields are deleted at most 60 seconds after the TTL value is reached.
_, err = m.collection.Indexes().CreateOne(context.TODO(), mongo.IndexModel{
Keys: bson.M{ttl: 1},
Options: options.Index().SetExpireAfterSeconds(0),
})
if err != nil {
return fmt.Errorf("error in creating ttl index: %s", err)
}
return nil
}
@ -192,20 +205,78 @@ func (m *MongoDB) setInternal(ctx context.Context, req *state.SetRequest) error
if req.ETag != nil {
filter[etag] = *req.ETag
} else if req.Options.Concurrency == state.FirstWrite {
filter[etag] = uuid.NewString()
uuid, err := uuid.NewRandom()
if err != nil {
return err
}
filter[etag] = uuid.String()
}
update := bson.M{"$set": bson.M{id: req.Key, value: v, etag: uuid.NewString()}}
_, err := m.collection.UpdateOne(ctx, filter, update, options.Update().SetUpsert(true))
reqTTL, err := stateutils.ParseTTL(req.Metadata)
if err != nil {
return fmt.Errorf("failed to parse TTL: %w", err)
}
return err
etagV, err := uuid.NewRandom()
if err != nil {
return err
}
update := make(mongo.Pipeline, 2)
update[0] = bson.D{{Key: "$set", Value: bson.D{
{Key: id, Value: req.Key},
{Key: value, Value: v},
{Key: etag, Value: etagV.String()},
}}}
if reqTTL != nil {
update[1] = primitive.D{{
Key: "$addFields", Value: bson.D{
{
Key: ttl, Value: bson.D{
{
Key: "$add", Value: bson.A{
// MongoDB stores time in milliseconds so multiply seconds by 1000.
"$$NOW", *reqTTL * 1000,
},
},
},
},
},
}}
} else {
update[1] = primitive.D{
{Key: "$addFields", Value: bson.D{
{Key: ttl, Value: nil},
}},
}
}
_, err = m.collection.UpdateOne(ctx, filter, update, options.Update().SetUpsert(true))
if err != nil {
return fmt.Errorf("error in updating document: %s", err)
}
return nil
}
// Get retrieves state from MongoDB with a key.
func (m *MongoDB) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
// Since MongoDB doesn't delete the document immediately when the TTL value
// is reached, we need to filter out the documents with TTL value less than
// the current time.
filter := bson.D{
{Key: "$and", Value: bson.A{
bson.D{{Key: id, Value: bson.M{"$eq": req.Key}}},
bson.D{{Key: "$expr", Value: bson.D{
{Key: "$or", Value: bson.A{
bson.D{{Key: "$eq", Value: bson.A{ttlDollar, primitive.Null{}}}},
bson.D{{Key: "$gte", Value: bson.A{ttlDollar, "$$NOW"}}},
}},
}}},
}},
}
var result Item
filter := bson.M{id: req.Key}
err := m.collection.FindOne(ctx, filter).Decode(&result)
if err != nil {
if err == mongo.ErrNoDocuments {

View File

@ -9,6 +9,7 @@ require (
github.com/dapr/go-sdk v1.7.0
github.com/dapr/kit v0.0.4
github.com/stretchr/testify v1.8.1
go.mongodb.org/mongo-driver v1.11.1
)
require (
@ -103,7 +104,6 @@ require (
github.com/xdg-go/scram v1.1.2 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect
go.mongodb.org/mongo-driver v1.11.1 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel v1.11.2 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.2 // indirect

View File

@ -1,23 +1,29 @@
package mongodb_test
import (
"errors"
"fmt"
"github.com/dapr/components-contrib/tests/certification/flow/network"
"github.com/dapr/go-sdk/client"
"testing"
"time"
"github.com/dapr/go-sdk/client"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"github.com/dapr/components-contrib/state"
stateMongoDB "github.com/dapr/components-contrib/state/mongodb"
"github.com/dapr/components-contrib/tests/certification/embedded"
"github.com/dapr/components-contrib/tests/certification/flow"
"github.com/dapr/components-contrib/tests/certification/flow/dockercompose"
"github.com/dapr/components-contrib/tests/certification/flow/network"
"github.com/dapr/components-contrib/tests/certification/flow/sidecar"
stateLoader "github.com/dapr/dapr/pkg/components/state"
"github.com/dapr/dapr/pkg/runtime"
daprTesting "github.com/dapr/dapr/pkg/testing"
"github.com/dapr/kit/logger"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const (
@ -40,7 +46,6 @@ func TestMongoDB(t *testing.T) {
return stateStore
}, "mongodb")
// var rdb redis.Client
currentGrpcPort := ports[0]
currentHTTPPort := ports[1]
@ -65,7 +70,7 @@ func TestMongoDB(t *testing.T) {
errUpdate := client.SaveState(ctx, stateStoreName, certificationTestPrefix+"key1", []byte("mongodbCertUpdate"), nil)
assert.NoError(t, errUpdate)
item, errUpdatedGet := client.GetState(ctx, stateStoreName, certificationTestPrefix+"key1", nil)
assert.NoError(t, errUpdatedGet)
require.NoError(t, errUpdatedGet)
assert.Equal(t, "mongodbCertUpdate", string(item.Value))
// delete state
@ -90,9 +95,56 @@ func TestMongoDB(t *testing.T) {
return nil
}
// Time-To-Live Test
timeToLiveTest := func(sidecarname string) func(ctx flow.Context) error {
return func(ctx flow.Context) error {
client, err := client.NewClientWithPort(fmt.Sprint(currentGrpcPort))
require.NoError(t, err)
defer client.Close()
assert.Error(t, client.SaveState(ctx, stateStoreName, certificationTestPrefix+"ttl1", []byte("mongodbCert"), map[string]string{
"ttlInSeconds": "mock value",
}))
assert.NoError(t, client.SaveState(ctx, stateStoreName, certificationTestPrefix+"ttl2", []byte("mongodbCert2"), map[string]string{
"ttlInSeconds": "-1",
}))
assert.NoError(t, client.SaveState(ctx, stateStoreName, certificationTestPrefix+"ttl3", []byte("mongodbCert3"), map[string]string{
"ttlInSeconds": "3",
}))
// Check we have the correct database ID for the TTL test.
cl, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017/?directConnection=true"))
require.NoError(t, err)
resp := cl.Database("admin").
Collection("daprCollection").
FindOne(ctx, bson.M{"_id": sidecarname + "||stable-certification-ttl3"})
require.NoError(t, resp.Err())
// get state
item, err := client.GetState(ctx, stateStoreName, certificationTestPrefix+"ttl3", nil)
assert.NoError(t, err)
assert.Equal(t, "mongodbCert3", string(item.Value))
assert.Eventually(t, func() bool {
item, err = client.GetState(ctx, stateStoreName, certificationTestPrefix+"ttl3", nil)
require.NoError(t, err)
return len(item.Value) == 0
}, time.Second*7, time.Millisecond*500)
// MongoDB will delete a document after a maximum of 60 seconds.
assert.Eventually(t, func() bool {
resp := cl.Database("admin").
Collection("daprCollection").
FindOne(ctx, bson.M{"_id": sidecarname + "||stable-certification-ttl3"})
return resp.Err() != nil && errors.Is(resp.Err(), mongo.ErrNoDocuments)
}, time.Second*60, time.Millisecond*500)
return nil
}
}
flow.New(t, "Connecting MongoDB And Verifying majority of the tests for a replica set here").
Step(dockercompose.Run("mongodb", dockerComposeClusterYAML)).
Step("Waiting for component to start...", flow.Sleep(10*time.Second)).
Step("Waiting for component to start...", flow.Sleep(20*time.Second)).
Step(sidecar.Run(sidecarNamePrefix+"dockerClusterDefault",
embedded.WithoutApp(),
embedded.WithDaprGRPCPort(currentGrpcPort),
@ -101,6 +153,7 @@ func TestMongoDB(t *testing.T) {
runtime.WithStates(stateRegistry))).
Step("Waiting for component to load...", flow.Sleep(10*time.Second)).
Step("Run basic test", basicTest).
Step("Run time to live test", timeToLiveTest(sidecarNamePrefix+"dockerClusterDefault")).
Step("Interrupt network",
network.InterruptNetwork(5*time.Second, nil, nil, "27017:27017")).
// Component should recover at this point.
@ -115,7 +168,7 @@ func TestMongoDB(t *testing.T) {
flow.New(t, "Connecting MongoDB And Verifying majority of the tests for a replica set "+
"here with valid read, write concerns and operation timeout").
Step(dockercompose.Run("mongodb", dockerComposeClusterYAML)).
Step("Waiting for component to start...", flow.Sleep(10*time.Second)).
Step("Waiting for component to start...", flow.Sleep(20*time.Second)).
Step(sidecar.Run(sidecarNamePrefix+"dockerClusterValidReadWriteConcernAndTimeout",
embedded.WithoutApp(),
embedded.WithDaprGRPCPort(currentGrpcPort),
@ -124,6 +177,7 @@ func TestMongoDB(t *testing.T) {
runtime.WithStates(stateRegistry))).
Step("Waiting for component to load...", flow.Sleep(10*time.Second)).
Step("Run basic test", basicTest).
Step("Run time to live test", timeToLiveTest(sidecarNamePrefix+"dockerClusterValidReadWriteConcernAndTimeout")).
Step("Interrupt network",
network.InterruptNetwork(5*time.Second, nil, nil, "27017:27017")).
// Component should recover at this point.
@ -138,7 +192,7 @@ func TestMongoDB(t *testing.T) {
flow.New(t, "Connecting MongoDB And Verifying majority of the tests here for a single node with valid read, "+
"write concerns and operation timeout").
Step(dockercompose.Run("mongodb", dockerComposeSingleYAML)).
Step("Waiting for component to start...", flow.Sleep(10*time.Second)).
Step("Waiting for component to start...", flow.Sleep(20*time.Second)).
Step(sidecar.Run(sidecarNamePrefix+"dockerSingleNode",
embedded.WithoutApp(),
embedded.WithDaprGRPCPort(currentGrpcPort),
@ -147,6 +201,7 @@ func TestMongoDB(t *testing.T) {
runtime.WithStates(stateRegistry))).
Step("Waiting for component to load...", flow.Sleep(10*time.Second)).
Step("Run basic test", basicTest).
Step("Run time to live test", timeToLiveTest(sidecarNamePrefix+"dockerSingleNode")).
Step("Interrupt network",
network.InterruptNetwork(5*time.Second, nil, nil, "27017:27017")).
// Component should recover at this point.

View File

@ -7,7 +7,7 @@ components:
allOperations: false
operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "transaction", "etag", "first-write" ]
- component: mongodb
operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "transaction", "etag", "first-write", "query" ]
operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "transaction", "etag", "first-write", "query", "ttl" ]
- component: memcached
allOperations: false
operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "ttl" ]
@ -44,6 +44,10 @@ components:
- component: cloudflare.workerskv
allOperations: false
# Although this component supports TTLs, the minimum TTL is 60s, which makes it not suitable for our conformance tests
# TODO: perhaps create a special case `ttl60` operation for this component
# where the test would set a TTL of 60s for this particular operation.
# `ttl` and `ttl60` are mutually exclusive, and `allOperations` would
# exclude `ttl60`
operations: [ "set", "get", "delete", "bulkset", "bulkdelete"]
- component: cockroachdb
allOperations: false
@ -59,4 +63,4 @@ components:
operations: [ "set", "get", "delete", "etag", "bulkset", "bulkdelete", "first-write" ]
- component: aws.dynamodb.terraform
allOperations: false
operations: [ "set", "get", "delete", "etag", "bulkset", "bulkdelete", "first-write" ]
operations: [ "set", "get", "delete", "etag", "bulkset", "bulkdelete", "first-write" ]

View File

@ -801,12 +801,13 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St
assertEquals(t, "⏱️", res)
// Wait for the object to expire and request again
time.Sleep(3 * time.Second)
res, err = statestore.Get(context.Background(), &state.GetRequest{
Key: key + "-ttl",
})
require.NoError(t, err)
assert.Nil(t, res.Data)
assert.Eventually(t, func() bool {
res, err = statestore.Get(context.Background(), &state.GetRequest{
Key: key + "-ttl",
})
require.NoError(t, err)
return res.Data == nil
}, time.Second*3, 200*time.Millisecond, "expected object to have been deleted in time")
})
}
}