From b387fcdff3ea75cc17d998fb5ca0bc01cecaf0e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EB=AC=B8=EC=B0=AC=EC=9A=A9?= Date: Wed, 26 May 2021 15:48:46 +0900 Subject: [PATCH] add Ping operation to`store` interface. (#757) * add `Ping` operation. * add `Ping` implementation of GA components * add context to an argument for Ping * add `Ping` implementation of blobstorage * fix error string for passing lint * Update blobstorage.go Co-authored-by: Artur Souza --- state/aerospike/aerospike.go | 4 ++++ state/aws/dynamodb/dynamodb.go | 4 ++++ state/azure/blobstorage/blobstorage.go | 10 ++++++++++ state/azure/cosmosdb/cosmosdb.go | 20 ++++++++++++++++++++ state/azure/tablestorage/tablestorage.go | 4 ++++ state/cassandra/cassandra.go | 4 ++++ state/cloudstate/cloudstate_crdt.go | 4 ++++ state/couchbase/couchbase.go | 4 ++++ state/gcp/firestore/firestore.go | 4 ++++ state/hashicorp/consul/consul.go | 9 ++++++--- state/hazelcast/hazelcast.go | 4 ++++ state/memcached/memcached.go | 4 ++++ state/mongodb/mongodb.go | 10 ++++++++++ state/mysql/mysql.go | 4 ++++ state/postgresql/postgresql.go | 4 ++++ state/redis/redis.go | 8 ++++++++ state/redis/redis_test.go | 18 ++++++++++++++++++ state/rethinkdb/rethinkdb.go | 4 ++++ state/sqlserver/sqlserver.go | 4 ++++ state/store.go | 1 + state/store_test.go | 8 ++++++++ state/zookeeper/zk.go | 4 ++++ tests/conformance/state/state.go | 5 +++++ 23 files changed, 142 insertions(+), 3 deletions(-) diff --git a/state/aerospike/aerospike.go b/state/aerospike/aerospike.go index 985f12e20..f08da531c 100644 --- a/state/aerospike/aerospike.go +++ b/state/aerospike/aerospike.go @@ -230,6 +230,10 @@ func (aspike *Aerospike) Delete(req *state.DeleteRequest) error { return nil } +func (aspike *Aerospike) Ping() error { + return nil +} + func parseHosts(hostsMeta string) ([]*as.Host, error) { hostPorts := []*as.Host{} for _, hostPort := range strings.Split(hostsMeta, ",") { diff --git a/state/aws/dynamodb/dynamodb.go b/state/aws/dynamodb/dynamodb.go index a0df7b0c7..9a80ca90b 100644 --- a/state/aws/dynamodb/dynamodb.go +++ b/state/aws/dynamodb/dynamodb.go @@ -51,6 +51,10 @@ func (d *StateStore) Init(metadata state.Metadata) error { return nil } +func (d *StateStore) Ping() error { + return nil +} + // Features returns the features available in this state store func (d *StateStore) Features() []state.Feature { return nil diff --git a/state/azure/blobstorage/blobstorage.go b/state/azure/blobstorage/blobstorage.go index ed2ec1f4d..286b41aa3 100644 --- a/state/azure/blobstorage/blobstorage.go +++ b/state/azure/blobstorage/blobstorage.go @@ -138,6 +138,16 @@ func (r *StateStore) Set(req *state.SetRequest) error { return r.writeFile(req) } +func (r *StateStore) Ping() error { + accessConditions := azblob.BlobAccessConditions{} + + if _, err := r.containerURL.GetProperties(context.Background(), accessConditions.LeaseAccessConditions); err != nil { + return fmt.Errorf("blob storage: error connecting to Blob storage at %s: %s", r.containerURL.URL().Host, err) + } + + return nil +} + // NewAzureBlobStorageStore instance func NewAzureBlobStorageStore(logger logger.Logger) *StateStore { s := &StateStore{ diff --git a/state/azure/cosmosdb/cosmosdb.go b/state/azure/cosmosdb/cosmosdb.go index 59f03876f..2d48682b1 100644 --- a/state/azure/cosmosdb/cosmosdb.go +++ b/state/azure/cosmosdb/cosmosdb.go @@ -28,6 +28,7 @@ type StateStore struct { collection *documentdb.Collection db *documentdb.Database sp *documentdb.Sproc + metadata metadata contentType string features []state.Feature @@ -139,6 +140,7 @@ func (c *StateStore) Init(meta state.Metadata) error { return fmt.Errorf("collection %s for CosmosDB state store not found. This must be created before Dapr uses it", m.Collection) } + c.metadata = m c.collection = &colls[0] c.client = client c.contentType = m.ContentType @@ -359,6 +361,24 @@ func (c *StateStore) Multi(request *state.TransactionalStateRequest) error { return nil } +func (c *StateStore) Ping() error { + m := c.metadata + + colls, err := c.client.QueryCollections(c.db.Self, &documentdb.Query{ + Query: "SELECT * FROM ROOT r WHERE r.id=@id", + Parameters: []documentdb.Parameter{ + {Name: "@id", Value: m.Collection}, + }, + }) + if err != nil { + return err + } else if len(colls) == 0 { + return fmt.Errorf("collection %s for CosmosDB state store not found", m.Collection) + } + + return nil +} + func createUpsertItem(contentType string, req state.SetRequest, partitionKey string) CosmosItem { byteArray, isBinary := req.Value.([]uint8) if isBinary { diff --git a/state/azure/tablestorage/tablestorage.go b/state/azure/tablestorage/tablestorage.go index 51ad632c2..d87a89986 100644 --- a/state/azure/tablestorage/tablestorage.go +++ b/state/azure/tablestorage/tablestorage.go @@ -236,6 +236,10 @@ func (r *StateStore) deleteRow(req *state.DeleteRequest) error { return entity.Delete(true, nil) } +func (r *StateStore) Ping() error { + return nil +} + func getPartitionAndRowKey(key string) (string, string) { pr := strings.Split(key, keyDelimiter) if len(pr) != 2 { diff --git a/state/cassandra/cassandra.go b/state/cassandra/cassandra.go index 1872820f0..224c8d271 100644 --- a/state/cassandra/cassandra.go +++ b/state/cassandra/cassandra.go @@ -289,6 +289,10 @@ func (c *Cassandra) Set(req *state.SetRequest) error { return session.Query("INSERT INTO ? (key, value) VALUES (?, ?)", c.table, req.Key, bt).Exec() } +func (c *Cassandra) Ping() error { + return nil +} + func (c *Cassandra) createSession(consistency gocql.Consistency) (*gocql.Session, error) { session, err := c.cluster.CreateSession() if err != nil { diff --git a/state/cloudstate/cloudstate_crdt.go b/state/cloudstate/cloudstate_crdt.go index 6f5d38393..946ee069c 100644 --- a/state/cloudstate/cloudstate_crdt.go +++ b/state/cloudstate/cloudstate_crdt.go @@ -265,6 +265,10 @@ func (c *CRDT) Init(metadata state.Metadata) error { return nil } +func (c *CRDT) Ping() error { + return nil +} + // Features returns the features available in this state store func (c *CRDT) Features() []state.Feature { return c.features diff --git a/state/couchbase/couchbase.go b/state/couchbase/couchbase.go index 72e0670b7..2cfc969fa 100644 --- a/state/couchbase/couchbase.go +++ b/state/couchbase/couchbase.go @@ -228,6 +228,10 @@ func (cbs *Couchbase) Delete(req *state.DeleteRequest) error { return nil } +func (cbs *Couchbase) Ping() error { + return nil +} + // converts string etag sent by the application into a gocb.Cas object, which can then be used for optimistic locking for set and delete operations func eTagToCas(eTag string) (gocb.Cas, error) { var cas gocb.Cas = 0 diff --git a/state/gcp/firestore/firestore.go b/state/gcp/firestore/firestore.go index a23fe2575..ac88b410a 100644 --- a/state/gcp/firestore/firestore.go +++ b/state/gcp/firestore/firestore.go @@ -136,6 +136,10 @@ func (f *Firestore) Set(req *state.SetRequest) error { return state.SetWithOptions(f.setValue, req) } +func (f *Firestore) Ping() error { + return nil +} + func (f *Firestore) deleteValue(req *state.DeleteRequest) error { ctx := context.Background() key := datastore.NameKey(f.entityKind, req.Key, nil) diff --git a/state/hashicorp/consul/consul.go b/state/hashicorp/consul/consul.go index b70b57d31..d5b5de331 100644 --- a/state/hashicorp/consul/consul.go +++ b/state/hashicorp/consul/consul.go @@ -10,11 +10,10 @@ import ( "fmt" "github.com/agrea/ptr" - "github.com/hashicorp/consul/api" - "github.com/pkg/errors" - "github.com/dapr/components-contrib/state" "github.com/dapr/kit/logger" + "github.com/hashicorp/consul/api" + "github.com/pkg/errors" ) // Consul is a state store implementation for HashiCorp Consul. @@ -138,6 +137,10 @@ func (c *Consul) Set(req *state.SetRequest) error { return nil } +func (c *Consul) Ping() error { + return nil +} + // Delete performes a Consul KV delete operation func (c *Consul) Delete(req *state.DeleteRequest) error { keyWithPath := fmt.Sprintf("%s/%s", c.keyPrefixPath, req.Key) diff --git a/state/hazelcast/hazelcast.go b/state/hazelcast/hazelcast.go index 5d38d0e0e..c3f1da28f 100644 --- a/state/hazelcast/hazelcast.go +++ b/state/hazelcast/hazelcast.go @@ -123,6 +123,10 @@ func (store *Hazelcast) Get(req *state.GetRequest) (*state.GetResponse, error) { }, nil } +func (store *Hazelcast) Ping() error { + return nil +} + // Delete performs a delete operation func (store *Hazelcast) Delete(req *state.DeleteRequest) error { err := state.CheckRequestOptions(req.Options) diff --git a/state/memcached/memcached.go b/state/memcached/memcached.go index 9f3c840d4..0cb5e9d03 100644 --- a/state/memcached/memcached.go +++ b/state/memcached/memcached.go @@ -141,3 +141,7 @@ func (m *Memcached) Get(req *state.GetRequest) (*state.GetResponse, error) { func (m *Memcached) Set(req *state.SetRequest) error { return state.SetWithOptions(m.setValue, req) } + +func (m *Memcached) Ping() error { + return nil +} diff --git a/state/mongodb/mongodb.go b/state/mongodb/mongodb.go index 016ddad50..3282fe220 100644 --- a/state/mongodb/mongodb.go +++ b/state/mongodb/mongodb.go @@ -58,6 +58,7 @@ type MongoDB struct { client *mongo.Client collection *mongo.Collection operationTimeout time.Duration + metadata mongoDBMetadata features []state.Feature logger logger.Logger @@ -125,6 +126,7 @@ func (m *MongoDB) Init(metadata state.Metadata) error { return fmt.Errorf("error in getting read concern object: %s", err) } + m.metadata = *meta opts := options.Collection().SetWriteConcern(wc).SetReadConcern(rc) collection := m.client.Database(meta.databaseName).Collection(meta.collectionName, opts) @@ -151,6 +153,14 @@ func (m *MongoDB) Set(req *state.SetRequest) error { return nil } +func (m *MongoDB) Ping() error { + if err := m.client.Ping(context.Background(), nil); err != nil { + return fmt.Errorf("mongoDB store: error connecting to mongoDB at %s: %s", m.metadata.host, err) + } + + return nil +} + func (m *MongoDB) setInternal(ctx context.Context, req *state.SetRequest) error { var vStr string b, ok := req.Value.([]byte) diff --git a/state/mysql/mysql.go b/state/mysql/mysql.go index aabab44ba..9d4b0939a 100644 --- a/state/mysql/mysql.go +++ b/state/mysql/mysql.go @@ -149,6 +149,10 @@ func (m *MySQL) Init(metadata state.Metadata) error { return m.finishInit(db, err) } +func (m *MySQL) Ping() error { + return nil +} + // Features returns the features available in this state store func (m *MySQL) Features() []state.Feature { return m.features diff --git a/state/postgresql/postgresql.go b/state/postgresql/postgresql.go index acf28575a..9cf3a4114 100644 --- a/state/postgresql/postgresql.go +++ b/state/postgresql/postgresql.go @@ -41,6 +41,10 @@ func (p *PostgreSQL) Init(metadata state.Metadata) error { return p.dbaccess.Init(metadata) } +func (p *PostgreSQL) Ping() error { + return nil +} + // Features returns the features available in this state store func (p *PostgreSQL) Features() []state.Feature { return p.features diff --git a/state/redis/redis.go b/state/redis/redis.go index 9632368c0..1cdfc5842 100644 --- a/state/redis/redis.go +++ b/state/redis/redis.go @@ -130,6 +130,14 @@ func parseRedisMetadata(meta state.Metadata) (metadata, error) { return m, nil } +func (r *StateStore) Ping() error { + if _, err := r.client.Ping(context.Background()).Result(); err != nil { + return fmt.Errorf("redis store: error connecting to redis at %s: %s", r.metadata.host, err) + } + + return nil +} + // Init does metadata and connection parsing func (r *StateStore) Init(metadata state.Metadata) error { m, err := parseRedisMetadata(metadata) diff --git a/state/redis/redis_test.go b/state/redis/redis_test.go index 614b85926..6649939a0 100644 --- a/state/redis/redis_test.go +++ b/state/redis/redis_test.go @@ -181,6 +181,24 @@ func TestTransactionalDelete(t *testing.T) { assert.Equal(t, 0, len(vals)) } +func TestPing(t *testing.T) { + s, c := setupMiniredis() + + ss := &StateStore{ + client: c, + json: jsoniter.ConfigFastest, + logger: logger.NewLogger("test"), + } + + err := ss.Ping() + assert.Nil(t, err) + + s.Close() + + err = ss.Ping() + assert.NotNil(t, err) +} + func TestTransactionalDeleteNoEtag(t *testing.T) { s, c := setupMiniredis() defer s.Close() diff --git a/state/rethinkdb/rethinkdb.go b/state/rethinkdb/rethinkdb.go index 809e7a187..a293ce69d 100644 --- a/state/rethinkdb/rethinkdb.go +++ b/state/rethinkdb/rethinkdb.go @@ -123,6 +123,10 @@ func (s *RethinkDB) Init(metadata state.Metadata) error { return nil } +func (s *RethinkDB) Ping() error { + return nil +} + // Features returns the features available in this state store func (s *RethinkDB) Features() []state.Feature { return s.features diff --git a/state/sqlserver/sqlserver.go b/state/sqlserver/sqlserver.go index 935918fb9..b62103e5c 100644 --- a/state/sqlserver/sqlserver.go +++ b/state/sqlserver/sqlserver.go @@ -253,6 +253,10 @@ func (s *SQLServer) Init(metadata state.Metadata) error { return nil } +func (s *SQLServer) Ping() error { + return nil +} + // Features returns the features available in this state store func (s *SQLServer) Features() []state.Feature { return s.features diff --git a/state/store.go b/state/store.go index ef51ac013..44cce7951 100644 --- a/state/store.go +++ b/state/store.go @@ -13,6 +13,7 @@ type Store interface { Delete(req *DeleteRequest) error Get(req *GetRequest) (*GetResponse, error) Set(req *SetRequest) error + Ping() error } // BulkStore is an interface to perform bulk operations on store diff --git a/state/store_test.go b/state/store_test.go index 9cc78863d..57b27bebe 100644 --- a/state/store_test.go +++ b/state/store_test.go @@ -115,6 +115,10 @@ func (s *Store1) Set(req *SetRequest) error { return nil } +func (s *Store1) Ping() error { + return nil +} + // example of store which supports bulk method type Store2 struct { // DefaultBulkStore @@ -150,6 +154,10 @@ func (s *Store2) Set(req *SetRequest) error { return nil } +func (s *Store2) Ping() error { + return nil +} + func (s *Store2) BulkGet(req []GetRequest) (bool, []BulkGetResponse, error) { if s.supportBulkGet { s.bulkCount++ diff --git a/state/zookeeper/zk.go b/state/zookeeper/zk.go index cdc8beb56..2dad51eee 100644 --- a/state/zookeeper/zk.go +++ b/state/zookeeper/zk.go @@ -295,6 +295,10 @@ func (s *StateStore) BulkSet(reqs []state.SetRequest) error { } } +func (s *StateStore) Ping() error { + return nil +} + func (s *StateStore) newCreateRequest(req *zk.SetDataRequest) *zk.CreateRequest { return &zk.CreateRequest{Path: req.Path, Data: req.Data} } diff --git a/tests/conformance/state/state.go b/tests/conformance/state/state.go index 28eeb5f38..60cbbb121 100644 --- a/tests/conformance/state/state.go +++ b/tests/conformance/state/state.go @@ -196,6 +196,11 @@ func ConformanceTests(t *testing.T, props map[string]string, statestore state.St assert.Nil(t, err) }) + t.Run("ping", func(t *testing.T) { + err := statestore.Ping() + assert.Nil(t, err) + }) + if config.HasOperation("set") { t.Run("set", func(t *testing.T) { for _, scenario := range scenarios {