From 7402ff5cf9899f8ed566e1d29bafde0a0183158a Mon Sep 17 00:00:00 2001 From: joshvanl Date: Wed, 15 Feb 2023 16:34:05 +0000 Subject: [PATCH 1/8] Add TTL support to mongodb state store Signed-off-by: joshvanl --- state/mongodb/mongodb.go | 39 +++++++++++++-- .../state/mongodb/mongodb_test.go | 47 ++++++++++++++++--- tests/config/state/tests.yml | 8 +++- tests/conformance/state/state.go | 13 ++--- 4 files changed, 87 insertions(+), 20 deletions(-) diff --git a/state/mongodb/mongodb.go b/state/mongodb/mongodb.go index 0ee1aff47..2ebae20e1 100644 --- a/state/mongodb/mongodb.go +++ b/state/mongodb/mongodb.go @@ -35,6 +35,7 @@ import ( "github.com/dapr/components-contrib/metadata" "github.com/dapr/components-contrib/state" "github.com/dapr/components-contrib/state/query" + stateutils "github.com/dapr/components-contrib/state/utils" "github.com/dapr/kit/logger" "github.com/dapr/kit/ptr" ) @@ -51,6 +52,7 @@ const ( id = "_id" value = "value" etag = "_etag" + daprttl = "_dapr_ttl" defaultTimeout = 5 * time.Second defaultDatabaseName = "daprStore" @@ -146,9 +148,18 @@ func (m *MongoDB) Init(metadata state.Metadata) error { m.metadata = *meta opts := options.Collection().SetWriteConcern(wc).SetReadConcern(rc) - collection := m.client.Database(meta.DatabaseName).Collection(meta.CollectionName, opts) + m.collection = m.client.Database(meta.DatabaseName).Collection(meta.CollectionName, opts) - m.collection = collection + // Set expireAfterSeconds index on ttl field with a value of 0 to delete + // values immediately when the TTL value is reached. + // MongoDB TTL Indexes: https://docs.mongodb.com/manual/core/index-ttl/ + // TTL fields are deleted at most 60 seconds after the TTL value is reached. + if _, err := m.collection.Indexes().CreateOne(context.Background(), mongo.IndexModel{ + Keys: bson.M{daprttl: 1}, + Options: options.Index().SetExpireAfterSeconds(0), + }); err != nil { + return fmt.Errorf("error in creating ttl index: %s", err) + } return nil } @@ -195,10 +206,28 @@ func (m *MongoDB) setInternal(ctx context.Context, req *state.SetRequest) error filter[etag] = uuid.NewString() } - update := bson.M{"$set": bson.M{id: req.Key, value: v, etag: uuid.NewString()}} - _, err := m.collection.UpdateOne(ctx, filter, update, options.Update().SetUpsert(true)) + reqTTL, err := stateutils.ParseTTL(req.Metadata) + if err != nil { + return fmt.Errorf("failed to parse TTL: %w", err) + } - return err + var ttl any + if reqTTL != nil { + ttl = primitive.NewDateTimeFromTime(time.Now().Add(time.Second * time.Duration(*reqTTL))) + } + + update := bson.D{{"$set", bson.D{ + {id, req.Key}, + {value, v}, + {etag, uuid.NewString()}, + {daprttl, ttl}, + }}} + _, err = m.collection.UpdateOne(ctx, filter, update, options.Update().SetUpsert(true)) + if err != nil { + return fmt.Errorf("error in updating document: %s", err) + } + + return nil } // Get retrieves state from MongoDB with a key. diff --git a/tests/certification/state/mongodb/mongodb_test.go b/tests/certification/state/mongodb/mongodb_test.go index f7789574e..22416703a 100644 --- a/tests/certification/state/mongodb/mongodb_test.go +++ b/tests/certification/state/mongodb/mongodb_test.go @@ -2,22 +2,24 @@ package mongodb_test import ( "fmt" - "github.com/dapr/components-contrib/tests/certification/flow/network" - "github.com/dapr/go-sdk/client" "testing" "time" + "github.com/dapr/go-sdk/client" + "github.com/dapr/components-contrib/state" stateMongoDB "github.com/dapr/components-contrib/state/mongodb" "github.com/dapr/components-contrib/tests/certification/embedded" "github.com/dapr/components-contrib/tests/certification/flow" "github.com/dapr/components-contrib/tests/certification/flow/dockercompose" + "github.com/dapr/components-contrib/tests/certification/flow/network" "github.com/dapr/components-contrib/tests/certification/flow/sidecar" stateLoader "github.com/dapr/dapr/pkg/components/state" "github.com/dapr/dapr/pkg/runtime" daprTesting "github.com/dapr/dapr/pkg/testing" "github.com/dapr/kit/logger" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) const ( @@ -40,7 +42,6 @@ func TestMongoDB(t *testing.T) { return stateStore }, "mongodb") - // var rdb redis.Client currentGrpcPort := ports[0] currentHTTPPort := ports[1] @@ -65,7 +66,7 @@ func TestMongoDB(t *testing.T) { errUpdate := client.SaveState(ctx, stateStoreName, certificationTestPrefix+"key1", []byte("mongodbCertUpdate"), nil) assert.NoError(t, errUpdate) item, errUpdatedGet := client.GetState(ctx, stateStoreName, certificationTestPrefix+"key1", nil) - assert.NoError(t, errUpdatedGet) + require.NoError(t, errUpdatedGet) assert.Equal(t, "mongodbCertUpdate", string(item.Value)) // delete state @@ -90,9 +91,38 @@ func TestMongoDB(t *testing.T) { return nil } + // Time-To-Live Test + timeToLiveTest := func(ctx flow.Context) error { + client, err := client.NewClientWithPort(fmt.Sprint(currentGrpcPort)) + require.NoError(t, err) + defer client.Close() + + assert.Error(t, client.SaveState(ctx, stateStoreName, certificationTestPrefix+"ttl1", []byte("mongodbCert"), map[string]string{ + "ttlInSeconds": "mock value", + })) + assert.NoError(t, client.SaveState(ctx, stateStoreName, certificationTestPrefix+"ttl2", []byte("mongodbCert2"), map[string]string{ + "ttlInSeconds": "-1", + })) + assert.NoError(t, client.SaveState(ctx, stateStoreName, certificationTestPrefix+"ttl3", []byte("mongodbCert3"), map[string]string{ + "ttlInSeconds": "1", + })) + + // get state + item, err := client.GetState(ctx, stateStoreName, certificationTestPrefix+"ttl3", nil) + assert.NoError(t, err) + assert.Equal(t, "mongodbCert3", string(item.Value)) + assert.Eventually(t, func() bool { + item, err = client.GetState(ctx, stateStoreName, certificationTestPrefix+"ttl3", nil) + assert.NoError(t, err) + return len(item.Value) == 0 + }, time.Second*70, time.Second, "%s", item) + + return nil + } + flow.New(t, "Connecting MongoDB And Verifying majority of the tests for a replica set here"). Step(dockercompose.Run("mongodb", dockerComposeClusterYAML)). - Step("Waiting for component to start...", flow.Sleep(10*time.Second)). + Step("Waiting for component to start...", flow.Sleep(20*time.Second)). Step(sidecar.Run(sidecarNamePrefix+"dockerClusterDefault", embedded.WithoutApp(), embedded.WithDaprGRPCPort(currentGrpcPort), @@ -101,6 +131,7 @@ func TestMongoDB(t *testing.T) { runtime.WithStates(stateRegistry))). Step("Waiting for component to load...", flow.Sleep(10*time.Second)). Step("Run basic test", basicTest). + Step("Run time to live test", timeToLiveTest). Step("Interrupt network", network.InterruptNetwork(5*time.Second, nil, nil, "27017:27017")). // Component should recover at this point. @@ -115,7 +146,7 @@ func TestMongoDB(t *testing.T) { flow.New(t, "Connecting MongoDB And Verifying majority of the tests for a replica set "+ "here with valid read, write concerns and operation timeout"). Step(dockercompose.Run("mongodb", dockerComposeClusterYAML)). - Step("Waiting for component to start...", flow.Sleep(10*time.Second)). + Step("Waiting for component to start...", flow.Sleep(20*time.Second)). Step(sidecar.Run(sidecarNamePrefix+"dockerClusterValidReadWriteConcernAndTimeout", embedded.WithoutApp(), embedded.WithDaprGRPCPort(currentGrpcPort), @@ -124,6 +155,7 @@ func TestMongoDB(t *testing.T) { runtime.WithStates(stateRegistry))). Step("Waiting for component to load...", flow.Sleep(10*time.Second)). Step("Run basic test", basicTest). + Step("Run time to live test", timeToLiveTest). Step("Interrupt network", network.InterruptNetwork(5*time.Second, nil, nil, "27017:27017")). // Component should recover at this point. @@ -138,7 +170,7 @@ func TestMongoDB(t *testing.T) { flow.New(t, "Connecting MongoDB And Verifying majority of the tests here for a single node with valid read, "+ "write concerns and operation timeout"). Step(dockercompose.Run("mongodb", dockerComposeSingleYAML)). - Step("Waiting for component to start...", flow.Sleep(10*time.Second)). + Step("Waiting for component to start...", flow.Sleep(20*time.Second)). Step(sidecar.Run(sidecarNamePrefix+"dockerSingleNode", embedded.WithoutApp(), embedded.WithDaprGRPCPort(currentGrpcPort), @@ -147,6 +179,7 @@ func TestMongoDB(t *testing.T) { runtime.WithStates(stateRegistry))). Step("Waiting for component to load...", flow.Sleep(10*time.Second)). Step("Run basic test", basicTest). + Step("Run time to live test", timeToLiveTest). Step("Interrupt network", network.InterruptNetwork(5*time.Second, nil, nil, "27017:27017")). // Component should recover at this point. diff --git a/tests/config/state/tests.yml b/tests/config/state/tests.yml index e178c9786..d839b206f 100644 --- a/tests/config/state/tests.yml +++ b/tests/config/state/tests.yml @@ -7,7 +7,7 @@ components: allOperations: false operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "transaction", "etag", "first-write" ] - component: mongodb - operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "transaction", "etag", "first-write", "query" ] + operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "transaction", "etag", "first-write", "query", "ttl" ] - component: memcached allOperations: false operations: [ "set", "get", "delete", "bulkset", "bulkdelete", "ttl" ] @@ -44,6 +44,10 @@ components: - component: cloudflare.workerskv allOperations: false # Although this component supports TTLs, the minimum TTL is 60s, which makes it not suitable for our conformance tests + # TODO: perhaps create a special case `ttl60` operation for this component + # where the test would set a TTL of 60s for this particular operation. + # `ttl` and `ttl60` are mutually exclusive, and `allOperations` would + # exclude `ttl60` operations: [ "set", "get", "delete", "bulkset", "bulkdelete"] - component: cockroachdb allOperations: false @@ -59,4 +63,4 @@ components: operations: [ "set", "get", "delete", "etag", "bulkset", "bulkdelete", "first-write" ] - component: aws.dynamodb.terraform allOperations: false - operations: [ "set", "get", "delete", "etag", "bulkset", "bulkdelete", "first-write" ] \ No newline at end of file + operations: [ "set", "get", "delete", "etag", "bulkset", "bulkdelete", "first-write" ] diff --git a/tests/conformance/state/state.go b/tests/conformance/state/state.go index 166ba087e..6a83d9e8c 100644 --- a/tests/conformance/state/state.go +++ b/tests/conformance/state/state.go @@ -801,12 +801,13 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St assertEquals(t, "⏱️", res) // Wait for the object to expire and request again - time.Sleep(3 * time.Second) - res, err = statestore.Get(context.Background(), &state.GetRequest{ - Key: key + "-ttl", - }) - require.NoError(t, err) - assert.Nil(t, res.Data) + assert.Eventually(t, func() bool { + res, err = statestore.Get(context.Background(), &state.GetRequest{ + Key: key + "-ttl", + }) + require.NoError(t, err) + return res.Data == nil + }, time.Second*70, 200*time.Millisecond, "expected object to have been deleted in time") }) } } From 1a3e267d5dbd95932ef6ced43c425ccc2c9d7a63 Mon Sep 17 00:00:00 2001 From: joshvanl Date: Thu, 16 Feb 2023 18:18:23 +0000 Subject: [PATCH 2/8] Adds review comments. Filter on TTL when getting documents. Add test to ensure data is deleted. Signed-off-by: joshvanl --- state/mongodb/mongodb.go | 62 ++++++++++++---- .../state/mongodb/mongodb_test.go | 72 ++++++++++++------- 2 files changed, 95 insertions(+), 39 deletions(-) diff --git a/state/mongodb/mongodb.go b/state/mongodb/mongodb.go index 2ebae20e1..281d32170 100644 --- a/state/mongodb/mongodb.go +++ b/state/mongodb/mongodb.go @@ -53,6 +53,7 @@ const ( value = "value" etag = "_etag" daprttl = "_dapr_ttl" + daprttlDollar = "$" + daprttl defaultTimeout = 5 * time.Second defaultDatabaseName = "daprStore" @@ -154,10 +155,11 @@ func (m *MongoDB) Init(metadata state.Metadata) error { // values immediately when the TTL value is reached. // MongoDB TTL Indexes: https://docs.mongodb.com/manual/core/index-ttl/ // TTL fields are deleted at most 60 seconds after the TTL value is reached. - if _, err := m.collection.Indexes().CreateOne(context.Background(), mongo.IndexModel{ + _, err = m.collection.Indexes().CreateOne(context.TODO(), mongo.IndexModel{ Keys: bson.M{daprttl: 1}, Options: options.Index().SetExpireAfterSeconds(0), - }); err != nil { + }) + if err != nil { return fmt.Errorf("error in creating ttl index: %s", err) } @@ -203,7 +205,11 @@ func (m *MongoDB) setInternal(ctx context.Context, req *state.SetRequest) error if req.ETag != nil { filter[etag] = *req.ETag } else if req.Options.Concurrency == state.FirstWrite { - filter[etag] = uuid.NewString() + uuid, err := uuid.NewRandom() + if err != nil { + return err + } + filter[etag] = uuid.String() } reqTTL, err := stateutils.ParseTTL(req.Metadata) @@ -211,17 +217,33 @@ func (m *MongoDB) setInternal(ctx context.Context, req *state.SetRequest) error return fmt.Errorf("failed to parse TTL: %w", err) } - var ttl any - if reqTTL != nil { - ttl = primitive.NewDateTimeFromTime(time.Now().Add(time.Second * time.Duration(*reqTTL))) + etagV, err := uuid.NewRandom() + if err != nil { + return err + } + + update := mongo.Pipeline{ + {{"$set", bson.D{ + {id, req.Key}, + {value, v}, + {etag, etagV.String()}, + }}}, + } + + if reqTTL != nil { + update = append(update, primitive.D{{"$addFields", bson.D{ + {daprttl, bson.D{ + {"$dateAdd", + bson.D{{"startDate", "$$NOW"}, {"unit", "second"}, {"amount", *reqTTL}}, + }, + }}, + }}}) + } else { + update = append(update, primitive.D{{"$addFields", bson.D{ + {daprttl, nil}, + }}}) } - update := bson.D{{"$set", bson.D{ - {id, req.Key}, - {value, v}, - {etag, uuid.NewString()}, - {daprttl, ttl}, - }}} _, err = m.collection.UpdateOne(ctx, filter, update, options.Update().SetUpsert(true)) if err != nil { return fmt.Errorf("error in updating document: %s", err) @@ -232,9 +254,21 @@ func (m *MongoDB) setInternal(ctx context.Context, req *state.SetRequest) error // Get retrieves state from MongoDB with a key. func (m *MongoDB) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) { + // Since MongoDB doesn't delete the document immediately when the TTL value + // is reached, we need to filter out the documents with TTL value less than + // the current time. + filter := bson.D{ + {"$and", bson.A{ + bson.D{{id, bson.M{"$eq": req.Key}}}, + bson.D{{"$expr", bson.D{ + bson.E{"$or", bson.A{ + bson.D{{"$eq", bson.A{daprttlDollar, primitive.Null{}}}}, + bson.D{{"$gte", bson.A{daprttlDollar, "$$NOW"}}}, + }}, + }}}, + }}, + } var result Item - - filter := bson.M{id: req.Key} err := m.collection.FindOne(ctx, filter).Decode(&result) if err != nil { if err == mongo.ErrNoDocuments { diff --git a/tests/certification/state/mongodb/mongodb_test.go b/tests/certification/state/mongodb/mongodb_test.go index 22416703a..7d89438c9 100644 --- a/tests/certification/state/mongodb/mongodb_test.go +++ b/tests/certification/state/mongodb/mongodb_test.go @@ -1,11 +1,15 @@ package mongodb_test import ( + "errors" "fmt" "testing" "time" "github.com/dapr/go-sdk/client" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" "github.com/dapr/components-contrib/state" stateMongoDB "github.com/dapr/components-contrib/state/mongodb" @@ -92,32 +96,50 @@ func TestMongoDB(t *testing.T) { } // Time-To-Live Test - timeToLiveTest := func(ctx flow.Context) error { - client, err := client.NewClientWithPort(fmt.Sprint(currentGrpcPort)) - require.NoError(t, err) - defer client.Close() + timeToLiveTest := func(sidecarname string) func(ctx flow.Context) error { + return func(ctx flow.Context) error { + client, err := client.NewClientWithPort(fmt.Sprint(currentGrpcPort)) + require.NoError(t, err) + defer client.Close() - assert.Error(t, client.SaveState(ctx, stateStoreName, certificationTestPrefix+"ttl1", []byte("mongodbCert"), map[string]string{ - "ttlInSeconds": "mock value", - })) - assert.NoError(t, client.SaveState(ctx, stateStoreName, certificationTestPrefix+"ttl2", []byte("mongodbCert2"), map[string]string{ - "ttlInSeconds": "-1", - })) - assert.NoError(t, client.SaveState(ctx, stateStoreName, certificationTestPrefix+"ttl3", []byte("mongodbCert3"), map[string]string{ - "ttlInSeconds": "1", - })) + assert.Error(t, client.SaveState(ctx, stateStoreName, certificationTestPrefix+"ttl1", []byte("mongodbCert"), map[string]string{ + "ttlInSeconds": "mock value", + })) + assert.NoError(t, client.SaveState(ctx, stateStoreName, certificationTestPrefix+"ttl2", []byte("mongodbCert2"), map[string]string{ + "ttlInSeconds": "-1", + })) + assert.NoError(t, client.SaveState(ctx, stateStoreName, certificationTestPrefix+"ttl3", []byte("mongodbCert3"), map[string]string{ + "ttlInSeconds": "3", + })) - // get state - item, err := client.GetState(ctx, stateStoreName, certificationTestPrefix+"ttl3", nil) - assert.NoError(t, err) - assert.Equal(t, "mongodbCert3", string(item.Value)) - assert.Eventually(t, func() bool { - item, err = client.GetState(ctx, stateStoreName, certificationTestPrefix+"ttl3", nil) + // Check we have the correct database ID for the TTL test. + cl, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017/?directConnection=true")) + require.NoError(t, err) + resp := cl.Database("admin"). + Collection("daprCollection"). + FindOne(ctx, bson.M{"_id": sidecarname + "||stable-certification-ttl3"}) + require.NoError(t, resp.Err()) + + // get state + item, err := client.GetState(ctx, stateStoreName, certificationTestPrefix+"ttl3", nil) assert.NoError(t, err) - return len(item.Value) == 0 - }, time.Second*70, time.Second, "%s", item) + assert.Equal(t, "mongodbCert3", string(item.Value)) + assert.Eventually(t, func() bool { + item, err = client.GetState(ctx, stateStoreName, certificationTestPrefix+"ttl3", nil) + require.NoError(t, err) + return len(item.Value) == 0 + }, time.Second*7, time.Millisecond*500) - return nil + // MongoDB will delete a document after a maximum of 60 seconds. + assert.Eventually(t, func() bool { + resp := cl.Database("admin"). + Collection("daprCollection"). + FindOne(ctx, bson.M{"_id": sidecarname + "||stable-certification-ttl3"}) + return resp.Err() != nil && errors.Is(resp.Err(), mongo.ErrNoDocuments) + }, time.Second*60, time.Millisecond*500) + + return nil + } } flow.New(t, "Connecting MongoDB And Verifying majority of the tests for a replica set here"). @@ -131,7 +153,7 @@ func TestMongoDB(t *testing.T) { runtime.WithStates(stateRegistry))). Step("Waiting for component to load...", flow.Sleep(10*time.Second)). Step("Run basic test", basicTest). - Step("Run time to live test", timeToLiveTest). + Step("Run time to live test", timeToLiveTest(sidecarNamePrefix+"dockerClusterDefault")). Step("Interrupt network", network.InterruptNetwork(5*time.Second, nil, nil, "27017:27017")). // Component should recover at this point. @@ -155,7 +177,7 @@ func TestMongoDB(t *testing.T) { runtime.WithStates(stateRegistry))). Step("Waiting for component to load...", flow.Sleep(10*time.Second)). Step("Run basic test", basicTest). - Step("Run time to live test", timeToLiveTest). + Step("Run time to live test", timeToLiveTest(sidecarNamePrefix+"dockerClusterValidReadWriteConcernAndTimeout")). Step("Interrupt network", network.InterruptNetwork(5*time.Second, nil, nil, "27017:27017")). // Component should recover at this point. @@ -179,7 +201,7 @@ func TestMongoDB(t *testing.T) { runtime.WithStates(stateRegistry))). Step("Waiting for component to load...", flow.Sleep(10*time.Second)). Step("Run basic test", basicTest). - Step("Run time to live test", timeToLiveTest). + Step("Run time to live test", timeToLiveTest(sidecarNamePrefix+"dockerSingleNode")). Step("Interrupt network", network.InterruptNetwork(5*time.Second, nil, nil, "27017:27017")). // Component should recover at this point. From 9cef173fe6f68e4f47b1d71abb6e15fd53e69ace Mon Sep 17 00:00:00 2001 From: joshvanl Date: Fri, 17 Feb 2023 15:04:18 +0000 Subject: [PATCH 3/8] Fix go vet errors Signed-off-by: joshvanl --- state/mongodb/mongodb.go | 37 ++++++++++++++++++++----------------- 1 file changed, 20 insertions(+), 17 deletions(-) diff --git a/state/mongodb/mongodb.go b/state/mongodb/mongodb.go index 281d32170..fd68bf7e0 100644 --- a/state/mongodb/mongodb.go +++ b/state/mongodb/mongodb.go @@ -223,25 +223,28 @@ func (m *MongoDB) setInternal(ctx context.Context, req *state.SetRequest) error } update := mongo.Pipeline{ - {{"$set", bson.D{ - {id, req.Key}, - {value, v}, - {etag, etagV.String()}, + bson.D{{Key: "$set", Value: bson.D{ + {Key: id, Value: req.Key}, + {Key: value, Value: v}, + {Key: etag, Value: etagV.String()}, }}}, } if reqTTL != nil { - update = append(update, primitive.D{{"$addFields", bson.D{ - {daprttl, bson.D{ - {"$dateAdd", - bson.D{{"startDate", "$$NOW"}, {"unit", "second"}, {"amount", *reqTTL}}, + update = append(update, primitive.D{{Key: "$addFields", Value: bson.D{ + {Key: daprttl, Value: bson.D{ + {Key: "$dateAdd", Value: bson.D{ + {Key: "startDate", Value: "$$NOW"}, + {Key: "unit", Value: "second"}, + {Key: "amount", Value: *reqTTL}}, }, }}, }}}) } else { - update = append(update, primitive.D{{"$addFields", bson.D{ - {daprttl, nil}, - }}}) + update = append(update, primitive.D{ + {Key: "$addFields", Value: bson.D{ + {Key: daprttl, Value: nil}, + }}}) } _, err = m.collection.UpdateOne(ctx, filter, update, options.Update().SetUpsert(true)) @@ -258,12 +261,12 @@ func (m *MongoDB) Get(ctx context.Context, req *state.GetRequest) (*state.GetRes // is reached, we need to filter out the documents with TTL value less than // the current time. filter := bson.D{ - {"$and", bson.A{ - bson.D{{id, bson.M{"$eq": req.Key}}}, - bson.D{{"$expr", bson.D{ - bson.E{"$or", bson.A{ - bson.D{{"$eq", bson.A{daprttlDollar, primitive.Null{}}}}, - bson.D{{"$gte", bson.A{daprttlDollar, "$$NOW"}}}, + {Key: "$and", Value: bson.A{ + bson.D{{Key: id, Value: bson.M{"$eq": req.Key}}}, + bson.D{{Key: "$expr", Value: bson.D{ + {Key: "$or", Value: bson.A{ + bson.D{{Key: "$eq", Value: bson.A{daprttlDollar, primitive.Null{}}}}, + bson.D{{Key: "$gte", Value: bson.A{daprttlDollar, "$$NOW"}}}, }}, }}}, }}, From db56a285810c91d29cd9b7965b0df6d5eff045f7 Mon Sep 17 00:00:00 2001 From: joshvanl Date: Fri, 17 Feb 2023 17:44:53 +0000 Subject: [PATCH 4/8] Don't case extra allocations. Revert 70s state wait for ttl deletion Signed-off-by: joshvanl --- state/mongodb/mongodb.go | 21 ++++++++++----------- tests/conformance/state/state.go | 2 +- 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/state/mongodb/mongodb.go b/state/mongodb/mongodb.go index dfdcaf6f1..ac0a7f3ad 100644 --- a/state/mongodb/mongodb.go +++ b/state/mongodb/mongodb.go @@ -222,16 +222,15 @@ func (m *MongoDB) setInternal(ctx context.Context, req *state.SetRequest) error return err } - update := mongo.Pipeline{ - bson.D{{Key: "$set", Value: bson.D{ - {Key: id, Value: req.Key}, - {Key: value, Value: v}, - {Key: etag, Value: etagV.String()}, - }}}, - } + update := make(mongo.Pipeline, 2) + update[0] = bson.D{{Key: "$set", Value: bson.D{ + {Key: id, Value: req.Key}, + {Key: value, Value: v}, + {Key: etag, Value: etagV.String()}, + }}} if reqTTL != nil { - update = append(update, primitive.D{{Key: "$addFields", Value: bson.D{ + update[1] = primitive.D{{Key: "$addFields", Value: bson.D{ {Key: daprttl, Value: bson.D{ {Key: "$dateAdd", Value: bson.D{ {Key: "startDate", Value: "$$NOW"}, @@ -239,12 +238,12 @@ func (m *MongoDB) setInternal(ctx context.Context, req *state.SetRequest) error {Key: "amount", Value: *reqTTL}}, }, }}, - }}}) + }}} } else { - update = append(update, primitive.D{ + update[1] = primitive.D{ {Key: "$addFields", Value: bson.D{ {Key: daprttl, Value: nil}, - }}}) + }}} } _, err = m.collection.UpdateOne(ctx, filter, update, options.Update().SetUpsert(true)) diff --git a/tests/conformance/state/state.go b/tests/conformance/state/state.go index 9c2542661..c34866e6d 100644 --- a/tests/conformance/state/state.go +++ b/tests/conformance/state/state.go @@ -807,7 +807,7 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St }) require.NoError(t, err) return res.Data == nil - }, time.Second*70, 200*time.Millisecond, "expected object to have been deleted in time") + }, time.Second*3, 200*time.Millisecond, "expected object to have been deleted in time") }) } } From de038d508010f0f3b493544604fe1f9d12d600ca Mon Sep 17 00:00:00 2001 From: joshvanl Date: Mon, 20 Feb 2023 11:11:18 +0000 Subject: [PATCH 5/8] Fix gofumpt linting errors Signed-off-by: joshvanl --- state/mongodb/mongodb.go | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/state/mongodb/mongodb.go b/state/mongodb/mongodb.go index ac0a7f3ad..994125702 100644 --- a/state/mongodb/mongodb.go +++ b/state/mongodb/mongodb.go @@ -232,10 +232,12 @@ func (m *MongoDB) setInternal(ctx context.Context, req *state.SetRequest) error if reqTTL != nil { update[1] = primitive.D{{Key: "$addFields", Value: bson.D{ {Key: daprttl, Value: bson.D{ - {Key: "$dateAdd", Value: bson.D{ - {Key: "startDate", Value: "$$NOW"}, - {Key: "unit", Value: "second"}, - {Key: "amount", Value: *reqTTL}}, + { + Key: "$dateAdd", Value: bson.D{ + {Key: "startDate", Value: "$$NOW"}, + {Key: "unit", Value: "second"}, + {Key: "amount", Value: *reqTTL}, + }, }, }}, }}} @@ -243,7 +245,8 @@ func (m *MongoDB) setInternal(ctx context.Context, req *state.SetRequest) error update[1] = primitive.D{ {Key: "$addFields", Value: bson.D{ {Key: daprttl, Value: nil}, - }}} + }}, + } } _, err = m.collection.UpdateOne(ctx, filter, update, options.Update().SetUpsert(true)) From 77c209e71a1676030e3298eec3d96a5480fd6026 Mon Sep 17 00:00:00 2001 From: joshvanl Date: Tue, 21 Feb 2023 17:22:21 +0000 Subject: [PATCH 6/8] Update go.mod in tests/certification/status/mongodb Signed-off-by: joshvanl --- tests/certification/state/mongodb/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/certification/state/mongodb/go.mod b/tests/certification/state/mongodb/go.mod index f67876c81..b6ed57906 100644 --- a/tests/certification/state/mongodb/go.mod +++ b/tests/certification/state/mongodb/go.mod @@ -9,6 +9,7 @@ require ( github.com/dapr/go-sdk v1.7.0 github.com/dapr/kit v0.0.4 github.com/stretchr/testify v1.8.1 + go.mongodb.org/mongo-driver v1.11.1 ) require ( @@ -103,7 +104,6 @@ require ( github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect - go.mongodb.org/mongo-driver v1.11.1 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/otel v1.11.2 // indirect go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.2 // indirect From 2b3374bd4b5e182e9e4694492364e99937df61d5 Mon Sep 17 00:00:00 2001 From: joshvanl Date: Tue, 21 Feb 2023 18:31:24 +0000 Subject: [PATCH 7/8] Remove `_dapr` prefix from mongodb state store ttl field name Signed-off-by: joshvanl --- state/mongodb/mongodb.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/state/mongodb/mongodb.go b/state/mongodb/mongodb.go index 994125702..216e50ebf 100644 --- a/state/mongodb/mongodb.go +++ b/state/mongodb/mongodb.go @@ -52,8 +52,8 @@ const ( id = "_id" value = "value" etag = "_etag" - daprttl = "_dapr_ttl" - daprttlDollar = "$" + daprttl + ttl = "_ttl" + ttlDollar = "$" + ttl defaultTimeout = 5 * time.Second defaultDatabaseName = "daprStore" @@ -156,7 +156,7 @@ func (m *MongoDB) Init(ctx context.Context, metadata state.Metadata) error { // MongoDB TTL Indexes: https://docs.mongodb.com/manual/core/index-ttl/ // TTL fields are deleted at most 60 seconds after the TTL value is reached. _, err = m.collection.Indexes().CreateOne(context.TODO(), mongo.IndexModel{ - Keys: bson.M{daprttl: 1}, + Keys: bson.M{ttl: 1}, Options: options.Index().SetExpireAfterSeconds(0), }) if err != nil { @@ -231,7 +231,7 @@ func (m *MongoDB) setInternal(ctx context.Context, req *state.SetRequest) error if reqTTL != nil { update[1] = primitive.D{{Key: "$addFields", Value: bson.D{ - {Key: daprttl, Value: bson.D{ + {Key: ttl, Value: bson.D{ { Key: "$dateAdd", Value: bson.D{ {Key: "startDate", Value: "$$NOW"}, @@ -244,7 +244,7 @@ func (m *MongoDB) setInternal(ctx context.Context, req *state.SetRequest) error } else { update[1] = primitive.D{ {Key: "$addFields", Value: bson.D{ - {Key: daprttl, Value: nil}, + {Key: ttl, Value: nil}, }}, } } @@ -267,8 +267,8 @@ func (m *MongoDB) Get(ctx context.Context, req *state.GetRequest) (*state.GetRes bson.D{{Key: id, Value: bson.M{"$eq": req.Key}}}, bson.D{{Key: "$expr", Value: bson.D{ {Key: "$or", Value: bson.A{ - bson.D{{Key: "$eq", Value: bson.A{daprttlDollar, primitive.Null{}}}}, - bson.D{{Key: "$gte", Value: bson.A{daprttlDollar, "$$NOW"}}}, + bson.D{{Key: "$eq", Value: bson.A{ttlDollar, primitive.Null{}}}}, + bson.D{{Key: "$gte", Value: bson.A{ttlDollar, "$$NOW"}}}, }}, }}}, }}, From 6e1efe2cc9b93ccd963b6f9e81ec9579258e6acd Mon Sep 17 00:00:00 2001 From: joshvanl Date: Wed, 22 Feb 2023 13:09:47 +0000 Subject: [PATCH 8/8] Use operator `$add` instead of `$addDate` to have compatibility with older mongodb versions. Signed-off-by: joshvanl --- state/mongodb/mongodb.go | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/state/mongodb/mongodb.go b/state/mongodb/mongodb.go index 216e50ebf..1cf679c8a 100644 --- a/state/mongodb/mongodb.go +++ b/state/mongodb/mongodb.go @@ -230,17 +230,20 @@ func (m *MongoDB) setInternal(ctx context.Context, req *state.SetRequest) error }}} if reqTTL != nil { - update[1] = primitive.D{{Key: "$addFields", Value: bson.D{ - {Key: ttl, Value: bson.D{ + update[1] = primitive.D{{ + Key: "$addFields", Value: bson.D{ { - Key: "$dateAdd", Value: bson.D{ - {Key: "startDate", Value: "$$NOW"}, - {Key: "unit", Value: "second"}, - {Key: "amount", Value: *reqTTL}, + Key: ttl, Value: bson.D{ + { + Key: "$add", Value: bson.A{ + // MongoDB stores time in milliseconds so multiply seconds by 1000. + "$$NOW", *reqTTL * 1000, + }, + }, }, }, - }}, - }}} + }, + }} } else { update[1] = primitive.D{ {Key: "$addFields", Value: bson.D{