diff --git a/go.mod b/go.mod index aad1c8495..9df100f6a 100644 --- a/go.mod +++ b/go.mod @@ -38,30 +38,37 @@ require ( github.com/hashicorp/consul/api v1.2.0 github.com/hashicorp/go-multierror v1.0.0 github.com/hazelcast/hazelcast-go-client v0.0.0-20190530123621-6cf767c2f31a + github.com/joomcode/errorx v1.0.1 // indirect github.com/joomcode/redispipe v0.9.0 github.com/json-iterator/go v1.1.8 github.com/kubernetes-client/go v0.0.0-20190625181339-cd8e39e789c7 github.com/nats-io/gnatsd v1.4.1 github.com/nats-io/go-nats v1.7.2 + github.com/nats-io/nats-streaming-server v0.17.0 // indirect github.com/nats-io/nats.go v1.9.1 github.com/nats-io/stan.go v0.6.0 github.com/openzipkin/zipkin-go v0.1.6 github.com/pkg/errors v0.8.1 github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da github.com/satori/go.uuid v1.2.0 + github.com/sergi/go-diff v1.1.0 // indirect github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271 github.com/stretchr/testify v1.4.0 + github.com/tidwall/pretty v1.0.1 // indirect + github.com/tmc/grpc-websocket-proxy v0.0.0-20200122045848-3419fae592fc // indirect github.com/valyala/fasthttp v1.6.0 go.etcd.io/etcd v3.3.17+incompatible go.mongodb.org/mongo-driver v1.1.2 go.opencensus.io v0.22.3 - golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 + golang.org/x/crypto v0.0.0-20200206161412-a0c6ece9d31a golang.org/x/net v0.0.0-20200202094626-16171245cfb2 golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d google.golang.org/api v0.15.0 google.golang.org/genproto v0.0.0-20200122232147-0452cf42e150 google.golang.org/grpc v1.26.0 gopkg.in/couchbase/gocb.v1 v1.6.4 + gopkg.in/couchbase/gocbcore.v7 v7.1.16 // indirect + gopkg.in/couchbaselabs/gojcbmock.v1 v1.0.4 // indirect k8s.io/apimachinery v0.17.0 k8s.io/client-go v0.17.0 ) diff --git a/go.sum b/go.sum index 323bfdeff..a70654d87 100644 --- a/go.sum +++ b/go.sum @@ -242,6 +242,8 @@ github.com/go-redis/redis/v7 v7.0.1 h1:AVkqXtvak6eXAvqIA+0rDlh6St/M7/vaf67NEqPhP github.com/go-redis/redis/v7 v7.0.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg= github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= +github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/gocql/gocql v0.0.0-20191018090344-07ace3bab0f8 h1:ZyxBBeTImqFLu9mLtQUnXrO8K/SryXE/xjG/ygl0DxQ= @@ -256,6 +258,8 @@ github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zV github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/gogo/protobuf v1.3.0 h1:G8O7TerXerS4F6sx9OV7/nRfJdnXgHZu/S/7F2SN+UE= github.com/gogo/protobuf v1.3.0/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= +github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls= +github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY= github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0= github.com/golang/gddo v0.0.0-20190815223733-287de01127ef/go.mod h1:xEhNfoBDX1hzLm2Nf80qUvZ2sVwoMZ8d6IE2SrsQfh4= @@ -407,6 +411,8 @@ github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0 github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/joomcode/errorx v1.0.0 h1:RJAKLTy1Sv2Tszhu14m5RZP4VGRlhXutG/XlL1En5VM= github.com/joomcode/errorx v1.0.0/go.mod h1:kgco15ekB6cs+4Xjzo7SPeXzx38PbJzBwbnu9qfVNHQ= +github.com/joomcode/errorx v1.0.1 h1:CalpDWz14ZHd68fIqluJasJosAewpz2TFaJALrUxjrk= +github.com/joomcode/errorx v1.0.1/go.mod h1:kgco15ekB6cs+4Xjzo7SPeXzx38PbJzBwbnu9qfVNHQ= github.com/joomcode/redispipe v0.9.0 h1:NukwwIvxhg6r2lVxa1RJhEZXYPZZF/OX9WZJk+2cK1Q= github.com/joomcode/redispipe v0.9.0/go.mod h1:4S/gpBCZ62pB/3+XLNWDH7jQnB0vxmpddAMBva2adpM= github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7 h1:K//n/AqR5HjG3qxbrBCL4vJPW0MVFSs9CPK1OOJdRME= @@ -456,6 +462,8 @@ github.com/kubernetes-client/go v0.0.0-20190928040339-c757968c4c36 h1:/VKCfQgtQx github.com/kubernetes-client/go v0.0.0-20190928040339-c757968c4c36/go.mod h1:ks4KCmmxdXksTSu2dlnUanEOqNd/dsoyS6/7bay2RQ8= github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0= github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.3.0 h1:/qkRGz8zljWiDcFvgpwUpwIAPu3r07TDvs3Rws+o/pU= +github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= @@ -513,8 +521,12 @@ github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL github.com/nats-io/nats-server/v2 v2.0.4/go.mod h1:AWdGEVbjKRS9ZIx4DSP5eKW48nfFm7q3uiSkP/1KD7M= github.com/nats-io/nats-server/v2 v2.1.2 h1:i2Ly0B+1+rzNZHHWtD4ZwKi+OU5l+uQo1iDHZ2PmiIc= github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k= +github.com/nats-io/nats-server/v2 v2.1.4 h1:BILRnsJ2Yb/fefiFbBWADpViGF69uh4sxe8poVDQ06g= +github.com/nats-io/nats-server/v2 v2.1.4/go.mod h1:Jw1Z28soD/QasIA2uWjXyM9El1jly3YwyFOuR8tH1rg= github.com/nats-io/nats-streaming-server v0.16.2 h1:RyTg8dZ+A8LaDEEmh9BoHFxWJSuSrIGJ4xjsr0fLMeY= github.com/nats-io/nats-streaming-server v0.16.2/go.mod h1:P12vTqmBpT6Ufs+cu0W1C4N2wmISqa6G4xdLQeO2e2s= +github.com/nats-io/nats-streaming-server v0.17.0 h1:eYhSmjRmRsCYNsoUshmZ+RgKbhq6B+7FvMHXo3M5yMs= +github.com/nats-io/nats-streaming-server v0.17.0/go.mod h1:ewPBEsmp62Znl3dcRsYtlcfwudxHEdYMtYqUQSt4fE0= github.com/nats-io/nats.go v1.8.1/go.mod h1:BrFz9vVn0fU3AcH9Vn4Kd7W0NpJ651tD5omQ3M8LwxM= github.com/nats-io/nats.go v1.9.1 h1:ik3HbLhZ0YABLto7iX80pZLPw/6dx3T+++MZJwLnMrQ= github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= @@ -617,6 +629,8 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUt github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= +github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0= +github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.1.1 h1:VzGj7lhU7KEB9e9gMpAV/v5XT2NVSvLJhJLCWbnkgXg= @@ -657,8 +671,12 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/tidwall/pretty v1.0.1 h1:WE4RBSZ1x6McVVC8S/Md+Qse8YUv6HRObAx6ke00NY8= +github.com/tidwall/pretty v1.0.1/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= +github.com/tmc/grpc-websocket-proxy v0.0.0-20200122045848-3419fae592fc h1:yUaosFVTJwnltaHbSNC3i82I92quFs+OFPRl8kNMVwo= +github.com/tmc/grpc-websocket-proxy v0.0.0-20200122045848-3419fae592fc/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= @@ -727,6 +745,8 @@ golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 h1:ObdrDkeb4kJdCP557AjRjq69pTHfNouLtWZG7j9rPN8= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20200206161412-a0c6ece9d31a h1:aczoJ0HPNE92XKa7DrIzkNN6esOKO2TBwiiYoKcINhA= +golang.org/x/crypto v0.0.0-20200206161412-a0c6ece9d31a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= @@ -924,10 +944,14 @@ gopkg.in/couchbase/gocb.v1 v1.6.4 h1:vAworfH5ZKDbonmayrwbGiD9jkAMroWmHXDf1GAIqMM gopkg.in/couchbase/gocb.v1 v1.6.4/go.mod h1:Ri5Qok4ZKiwmPr75YxZ0uELQy45XJgUSzeUnK806gTY= gopkg.in/couchbase/gocbcore.v7 v7.1.15 h1:2nhfrqKz6TBex0Vcc+iq9UnAZltfCGklnM4mgdf2I3o= gopkg.in/couchbase/gocbcore.v7 v7.1.15/go.mod h1:48d2Be0MxRtsyuvn+mWzqmoGUG9uA00ghopzOs148/E= +gopkg.in/couchbase/gocbcore.v7 v7.1.16 h1:n2foZ0Lg2ooBhgzgIpIxQ7VmlUbL6x0GwrP0GQcmTYo= +gopkg.in/couchbase/gocbcore.v7 v7.1.16/go.mod h1:48d2Be0MxRtsyuvn+mWzqmoGUG9uA00ghopzOs148/E= gopkg.in/couchbaselabs/gocbconnstr.v1 v1.0.4 h1:VVVoIV/nSw1w9ZnTEOjmkeJVcAzaCyxEujKglarxz7U= gopkg.in/couchbaselabs/gocbconnstr.v1 v1.0.4/go.mod h1:ZjII0iKx4Veo6N6da+pEZu/ptNyKLg9QTVt7fFmR6sw= gopkg.in/couchbaselabs/gojcbmock.v1 v1.0.3 h1:7n5tht07rEuogME6fmi6FVglIAUFD8m4t8ZQbnUq0sA= gopkg.in/couchbaselabs/gojcbmock.v1 v1.0.3/go.mod h1:jl/gd/aQ2S8whKVSTnsPs6n7BPeaAuw9UglBD/OF7eo= +gopkg.in/couchbaselabs/gojcbmock.v1 v1.0.4 h1:r5WoWGyeTJQiNGsoWAsMJfz0JFF14xc2TJrYSs09VXk= +gopkg.in/couchbaselabs/gojcbmock.v1 v1.0.4/go.mod h1:jl/gd/aQ2S8whKVSTnsPs6n7BPeaAuw9UglBD/OF7eo= gopkg.in/couchbaselabs/jsonx.v1 v1.0.0 h1:SJGarb8dXAsVZWizC26rxBkBYEKhSUxVh5wAnyzBVaI= gopkg.in/couchbaselabs/jsonx.v1 v1.0.0/go.mod h1:oR201IRovxvLW/eISevH12/+MiKHtNQAKfcX8iWZvJY= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= diff --git a/pubsub/envelope.go b/pubsub/envelope.go index e6aa3f751..713989184 100644 --- a/pubsub/envelope.go +++ b/pubsub/envelope.go @@ -27,10 +27,11 @@ type CloudEventsEnvelope struct { SpecVersion string `json:"specversion"` DataContentType string `json:"datacontenttype"` Data interface{} `json:"data"` + Subject string `json:"subject"` } // NewCloudEventsEnvelope returns a new CloudEventsEnvelope -func NewCloudEventsEnvelope(id, source, eventType string, data []byte) *CloudEventsEnvelope { +func NewCloudEventsEnvelope(id, source, eventType, subject string, data []byte) *CloudEventsEnvelope { if eventType == "" { eventType = DefaultCloudEventType } @@ -52,5 +53,6 @@ func NewCloudEventsEnvelope(id, source, eventType string, data []byte) *CloudEve Data: i, SpecVersion: CloudEventsSpecVersion, DataContentType: contentType, + Subject: subject, } } diff --git a/pubsub/envelope_test.go b/pubsub/envelope_test.go index 6f0c2d0bf..6d1cf8ffd 100644 --- a/pubsub/envelope_test.go +++ b/pubsub/envelope_test.go @@ -12,39 +12,39 @@ import ( ) func TestCreateCloudEventsEnvelope(t *testing.T) { - envelope := NewCloudEventsEnvelope("a", "source", "eventType", nil) + envelope := NewCloudEventsEnvelope("a", "source", "eventType", "", nil) assert.NotNil(t, envelope) } func TestCreateCloudEventsEnvelopeDefaults(t *testing.T) { t.Run("default event type", func(t *testing.T) { - envelope := NewCloudEventsEnvelope("a", "source", "", nil) + envelope := NewCloudEventsEnvelope("a", "source", "", "", nil) assert.Equal(t, DefaultCloudEventType, envelope.Type) }) t.Run("non-default event type", func(t *testing.T) { - envelope := NewCloudEventsEnvelope("a", "source", "e1", nil) + envelope := NewCloudEventsEnvelope("a", "source", "e1", "", nil) assert.Equal(t, "e1", envelope.Type) }) t.Run("spec version", func(t *testing.T) { - envelope := NewCloudEventsEnvelope("a", "source", "", nil) + envelope := NewCloudEventsEnvelope("a", "source", "", "", nil) assert.Equal(t, CloudEventsSpecVersion, envelope.SpecVersion) }) t.Run("has data", func(t *testing.T) { - envelope := NewCloudEventsEnvelope("a", "source", "", []byte("data")) + envelope := NewCloudEventsEnvelope("a", "source", "", "", []byte("data")) assert.Equal(t, "data", envelope.Data.(string)) }) t.Run("string data content type", func(t *testing.T) { - envelope := NewCloudEventsEnvelope("a", "source", "", []byte("data")) + envelope := NewCloudEventsEnvelope("a", "source", "", "", []byte("data")) assert.Equal(t, "text/plain", envelope.DataContentType) }) t.Run("json data content type", func(t *testing.T) { str := `{ "data": "1" }` - envelope := NewCloudEventsEnvelope("a", "source", "", []byte(str)) + envelope := NewCloudEventsEnvelope("a", "source", "", "", []byte(str)) assert.Equal(t, "application/json", envelope.DataContentType) }) } diff --git a/secretstores/Readme.md b/secretstores/Readme.md index 356b0ead8..3eb62da88 100644 --- a/secretstores/Readme.md +++ b/secretstores/Readme.md @@ -5,6 +5,7 @@ Secret Stores provide a common way to interact with different secret stores, clo Currently supported secret stores are: * Kubernetes +* Hashicorp Vault * Azure KeyVault * AWS Secret manager * GCP Cloud KMS diff --git a/state/Readme.md b/state/Readme.md index 9a1027b63..91c74dad2 100644 --- a/state/Readme.md +++ b/state/Readme.md @@ -4,6 +4,7 @@ State Stores provide a common way to interact with different data store implemen Currently supported state stores are: +* AWS DynamoDB * Azure CosmosDB * Azure Table Storage * Cassandra @@ -18,6 +19,8 @@ Currently supported state stores are: * Redis * SQL Server * Zookeeper +* Cloud Firestore (Datastore mode) +* Couchbase ## Implementing a new State Store diff --git a/state/aws/dynamodb/dynamodb.go b/state/aws/dynamodb/dynamodb.go new file mode 100644 index 000000000..246d462ae --- /dev/null +++ b/state/aws/dynamodb/dynamodb.go @@ -0,0 +1,169 @@ +package dynamodb + +import ( + "encoding/json" + "fmt" + + "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute" + "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/dynamodb" + "github.com/dapr/components-contrib/state" +) + +// StateStore is a DynamoDB state store +type StateStore struct { + client dynamodbiface.DynamoDBAPI + table string +} + +type dynamoDBMetadata struct { + Region string `json:"region"` + AccessKey string `json:"accessKey"` + SecretKey string `json:"secretKey"` + SessionToken string `json:"sessionToken"` + Table string `json:"table"` +} + +// NewDynamoDBStateStore returns a new dynamoDB state store +func NewDynamoDBStateStore() *StateStore { + return &StateStore{} +} + +// Init does metadata and connection parsing +func (d *StateStore) Init(metadata state.Metadata) error { + meta, err := d.getDynamoDBMetadata(metadata) + if err != nil { + return err + } + + client, err := d.getClient(meta) + if err != nil { + return err + } + + d.client = client + d.table = meta.Table + return nil +} + +// Get retrieves a dynamoDB item +func (d *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) { + input := &dynamodb.GetItemInput{ + ConsistentRead: aws.Bool(req.Options.Consistency == state.Strong), + TableName: aws.String(d.table), + Key: map[string]*dynamodb.AttributeValue{ + "key": { + S: aws.String(req.Key), + }, + }, + } + + result, err := d.client.GetItem(input) + if err != nil { + return nil, err + } + + if len(result.Item) == 0 { + return &state.GetResponse{}, nil + } + + var output string + if err = dynamodbattribute.Unmarshal(result.Item["value"], &output); err != nil { + return nil, err + } + return &state.GetResponse{ + Data: []byte(output), + }, nil +} + +// Set saves a dynamoDB item +func (d *StateStore) Set(req *state.SetRequest) error { + item := map[string]*dynamodb.AttributeValue{ + "key": { + S: aws.String(req.Key), + }, + "value": { + S: aws.String(fmt.Sprintf("%v", req.Value)), + }, + } + + input := &dynamodb.PutItemInput{ + Item: item, + TableName: &d.table, + } + + _, e := d.client.PutItem(input) + return e +} + +// BulkSet performs a bulk set operation +func (d *StateStore) BulkSet(req []state.SetRequest) error { + for _, r := range req { + err := d.Set(&r) + if err != nil { + return err + } + } + return nil +} + +// Delete performs a delete operation +func (d *StateStore) Delete(req *state.DeleteRequest) error { + input := &dynamodb.DeleteItemInput{ + Key: map[string]*dynamodb.AttributeValue{ + "key": { + S: aws.String(req.Key), + }, + }, + TableName: aws.String(d.table), + } + _, err := d.client.DeleteItem(input) + return err +} + +// BulkDelete performs a bulk delete operation +func (d *StateStore) BulkDelete(req []state.DeleteRequest) error { + for _, r := range req { + err := d.Delete(&r) + if err != nil { + return err + } + } + return nil +} + +func (d *StateStore) getDynamoDBMetadata(metadata state.Metadata) (*dynamoDBMetadata, error) { + b, err := json.Marshal(metadata.Properties) + if err != nil { + return nil, err + } + + var meta dynamoDBMetadata + err = json.Unmarshal(b, &meta) + if err != nil { + return nil, err + } + + if meta.SecretKey == "" || meta.AccessKey == "" || meta.Region == "" || meta.SessionToken == "" { + return nil, fmt.Errorf("missing aws credentials in metadata") + } + + return &meta, nil +} + +func (d *StateStore) getClient(meta *dynamoDBMetadata) (*dynamodb.DynamoDB, error) { + sess, err := session.NewSession(&aws.Config{ + Region: aws.String(meta.Region), + Credentials: credentials.NewStaticCredentials(meta.AccessKey, meta.SecretKey, meta.SessionToken), + }) + if err != nil { + return nil, err + } + + c := dynamodb.New(sess) + return c, nil +} diff --git a/state/aws/dynamodb/dynamodb_test.go b/state/aws/dynamodb/dynamodb_test.go new file mode 100644 index 000000000..0982bc740 --- /dev/null +++ b/state/aws/dynamodb/dynamodb_test.go @@ -0,0 +1,248 @@ +// ------------------------------------------------------------ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. +// ------------------------------------------------------------ +package dynamodb + +import ( + "fmt" + "testing" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/service/dynamodb" + "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface" + "github.com/dapr/components-contrib/state" + + "github.com/stretchr/testify/assert" +) + +type mockedDynamoDB struct { + GetItemFn func(input *dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error) + PutItemFn func(input *dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error) + DeleteItemFn func(input *dynamodb.DeleteItemInput) (*dynamodb.DeleteItemOutput, error) + dynamodbiface.DynamoDBAPI +} + +func (m *mockedDynamoDB) GetItem(input *dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error) { + return m.GetItemFn(input) +} + +func (m *mockedDynamoDB) PutItem(input *dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error) { + return m.PutItemFn(input) +} + +func (m *mockedDynamoDB) DeleteItem(input *dynamodb.DeleteItemInput) (*dynamodb.DeleteItemOutput, error) { + return m.DeleteItemFn(input) +} + +func TestInit(t *testing.T) { + m := state.Metadata{} + s := NewDynamoDBStateStore() + t.Run("Init with valid metadata", func(t *testing.T) { + m.Properties = map[string]string{ + "AccessKey": "a", + "Region": "a", + "SecretKey": "a", + "SessionToken": "a", + } + err := s.Init(m) + assert.Nil(t, err) + }) + + t.Run("Init with missing metadata", func(t *testing.T) { + m.Properties = map[string]string{ + "Dummy": "a", + } + err := s.Init(m) + assert.NotNil(t, err) + assert.Equal(t, err, fmt.Errorf("missing aws credentials in metadata")) + }) +} + +func TestGet(t *testing.T) { + t.Run("Successfully retrieve item", func(t *testing.T) { + ss := StateStore{ + client: &mockedDynamoDB{ + GetItemFn: func(input *dynamodb.GetItemInput) (output *dynamodb.GetItemOutput, err error) { + return &dynamodb.GetItemOutput{ + Item: map[string]*dynamodb.AttributeValue{ + "key": { + S: aws.String("key"), + }, + "value": { + S: aws.String("value"), + }, + }, + }, nil + }, + }, + } + req := &state.GetRequest{ + Key: "key", + Metadata: nil, + Options: state.GetStateOption{ + Consistency: "strong", + }, + } + out, err := ss.Get(req) + assert.Nil(t, err) + assert.Equal(t, []byte("value"), out.Data) + }) + t.Run("Unsuccessfully get item", func(t *testing.T) { + ss := StateStore{ + client: &mockedDynamoDB{ + GetItemFn: func(input *dynamodb.GetItemInput) (output *dynamodb.GetItemOutput, err error) { + return nil, fmt.Errorf("failed to retrieve data") + }, + }, + } + req := &state.GetRequest{ + Key: "key", + Metadata: nil, + Options: state.GetStateOption{ + Consistency: "strong", + }, + } + out, err := ss.Get(req) + assert.NotNil(t, err) + assert.Nil(t, out) + }) + t.Run("Unsuccessfully with empty response", func(t *testing.T) { + ss := StateStore{ + client: &mockedDynamoDB{ + GetItemFn: func(input *dynamodb.GetItemInput) (output *dynamodb.GetItemOutput, err error) { + return &dynamodb.GetItemOutput{ + Item: map[string]*dynamodb.AttributeValue{}, + }, nil + }, + }, + } + req := &state.GetRequest{ + Key: "key", + Metadata: nil, + Options: state.GetStateOption{ + Consistency: "strong", + }, + } + out, err := ss.Get(req) + assert.Nil(t, err) + assert.Nil(t, out.Data) + }) + t.Run("Unsuccessfully with no required key", func(t *testing.T) { + ss := StateStore{ + client: &mockedDynamoDB{ + GetItemFn: func(input *dynamodb.GetItemInput) (output *dynamodb.GetItemOutput, err error) { + return &dynamodb.GetItemOutput{ + Item: map[string]*dynamodb.AttributeValue{ + "value2": { + S: aws.String("value"), + }, + }, + }, nil + }, + }, + } + req := &state.GetRequest{ + Key: "key", + Metadata: nil, + Options: state.GetStateOption{ + Consistency: "strong", + }, + } + out, err := ss.Get(req) + assert.Nil(t, err) + assert.Empty(t, out.Data) + }) +} + +func TestSet(t *testing.T) { + type value struct { + Value string + } + + t.Run("Successfully set item", func(t *testing.T) { + ss := StateStore{ + client: &mockedDynamoDB{ + PutItemFn: func(input *dynamodb.PutItemInput) (output *dynamodb.PutItemOutput, err error) { + assert.Equal(t, map[string]*dynamodb.AttributeValue{ + "key": { + S: aws.String("key"), + }, + "value": { + S: aws.String("{value}"), + }}, input.Item) + return &dynamodb.PutItemOutput{ + Attributes: map[string]*dynamodb.AttributeValue{ + "key": { + S: aws.String("value"), + }, + }, + }, nil + }, + }, + } + req := &state.SetRequest{ + Key: "key", + Value: value{ + Value: "value", + }, + } + err := ss.Set(req) + assert.Nil(t, err) + }) + t.Run("Un-successfully set item", func(t *testing.T) { + ss := StateStore{ + client: &mockedDynamoDB{ + PutItemFn: func(input *dynamodb.PutItemInput) (output *dynamodb.PutItemOutput, err error) { + return nil, fmt.Errorf("unable to put item") + }, + }, + } + req := &state.SetRequest{ + Key: "key", + Value: value{ + Value: "value", + }, + } + err := ss.Set(req) + assert.NotNil(t, err) + }) +} + +func TestDelete(t *testing.T) { + t.Run("Successfully delete item", func(t *testing.T) { + req := &state.DeleteRequest{ + Key: "key", + } + + ss := StateStore{ + client: &mockedDynamoDB{ + DeleteItemFn: func(input *dynamodb.DeleteItemInput) (output *dynamodb.DeleteItemOutput, err error) { + assert.Equal(t, map[string]*dynamodb.AttributeValue{ + "key": { + S: aws.String(req.Key), + }, + }, input.Key) + return nil, nil + }, + }, + } + err := ss.Delete(req) + assert.Nil(t, err) + }) + + t.Run("Un-successfully delete item", func(t *testing.T) { + ss := StateStore{ + client: &mockedDynamoDB{ + DeleteItemFn: func(input *dynamodb.DeleteItemInput) (output *dynamodb.DeleteItemOutput, err error) { + return nil, fmt.Errorf("unable to delete item") + }, + }, + } + req := &state.DeleteRequest{ + Key: "key", + } + err := ss.Delete(req) + assert.NotNil(t, err) + }) +}