Merge branch 'master' into aacrawfi_docsupdate

This commit is contained in:
Aaron Crawfis 2020-03-09 09:58:02 -07:00 committed by GitHub
commit c5f946ff55
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 463 additions and 9 deletions

9
go.mod
View File

@ -38,30 +38,37 @@ require (
github.com/hashicorp/consul/api v1.2.0
github.com/hashicorp/go-multierror v1.0.0
github.com/hazelcast/hazelcast-go-client v0.0.0-20190530123621-6cf767c2f31a
github.com/joomcode/errorx v1.0.1 // indirect
github.com/joomcode/redispipe v0.9.0
github.com/json-iterator/go v1.1.8
github.com/kubernetes-client/go v0.0.0-20190625181339-cd8e39e789c7
github.com/nats-io/gnatsd v1.4.1
github.com/nats-io/go-nats v1.7.2
github.com/nats-io/nats-streaming-server v0.17.0 // indirect
github.com/nats-io/nats.go v1.9.1
github.com/nats-io/stan.go v0.6.0
github.com/openzipkin/zipkin-go v0.1.6
github.com/pkg/errors v0.8.1
github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da
github.com/satori/go.uuid v1.2.0
github.com/sergi/go-diff v1.1.0 // indirect
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271
github.com/stretchr/testify v1.4.0
github.com/tidwall/pretty v1.0.1 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20200122045848-3419fae592fc // indirect
github.com/valyala/fasthttp v1.6.0
go.etcd.io/etcd v3.3.17+incompatible
go.mongodb.org/mongo-driver v1.1.2
go.opencensus.io v0.22.3
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550
golang.org/x/crypto v0.0.0-20200206161412-a0c6ece9d31a
golang.org/x/net v0.0.0-20200202094626-16171245cfb2
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
google.golang.org/api v0.15.0
google.golang.org/genproto v0.0.0-20200122232147-0452cf42e150
google.golang.org/grpc v1.26.0
gopkg.in/couchbase/gocb.v1 v1.6.4
gopkg.in/couchbase/gocbcore.v7 v7.1.16 // indirect
gopkg.in/couchbaselabs/gojcbmock.v1 v1.0.4 // indirect
k8s.io/apimachinery v0.17.0
k8s.io/client-go v0.17.0
)

24
go.sum
View File

@ -242,6 +242,8 @@ github.com/go-redis/redis/v7 v7.0.1 h1:AVkqXtvak6eXAvqIA+0rDlh6St/M7/vaf67NEqPhP
github.com/go-redis/redis/v7 v7.0.1/go.mod h1:JDNMw23GTyLNC4GZu9njt15ctBQVn7xjRfnwdHj/Dcg=
github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA=
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs=
github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gocql/gocql v0.0.0-20191018090344-07ace3bab0f8 h1:ZyxBBeTImqFLu9mLtQUnXrO8K/SryXE/xjG/ygl0DxQ=
@ -256,6 +258,8 @@ github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zV
github.com/gogo/protobuf v1.2.2-0.20190723190241-65acae22fc9d/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/protobuf v1.3.0 h1:G8O7TerXerS4F6sx9OV7/nRfJdnXgHZu/S/7F2SN+UE=
github.com/gogo/protobuf v1.3.0/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe h1:lXe2qZdvpiX5WZkZR4hgp4KJVfY3nMkvmwbVkpv1rVY=
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe/go.mod h1:8vg3r2VgvsThLBIFL93Qb5yWzgyZWhEmBwUJWevAkK0=
github.com/golang/gddo v0.0.0-20190815223733-287de01127ef/go.mod h1:xEhNfoBDX1hzLm2Nf80qUvZ2sVwoMZ8d6IE2SrsQfh4=
@ -407,6 +411,8 @@ github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/joomcode/errorx v1.0.0 h1:RJAKLTy1Sv2Tszhu14m5RZP4VGRlhXutG/XlL1En5VM=
github.com/joomcode/errorx v1.0.0/go.mod h1:kgco15ekB6cs+4Xjzo7SPeXzx38PbJzBwbnu9qfVNHQ=
github.com/joomcode/errorx v1.0.1 h1:CalpDWz14ZHd68fIqluJasJosAewpz2TFaJALrUxjrk=
github.com/joomcode/errorx v1.0.1/go.mod h1:kgco15ekB6cs+4Xjzo7SPeXzx38PbJzBwbnu9qfVNHQ=
github.com/joomcode/redispipe v0.9.0 h1:NukwwIvxhg6r2lVxa1RJhEZXYPZZF/OX9WZJk+2cK1Q=
github.com/joomcode/redispipe v0.9.0/go.mod h1:4S/gpBCZ62pB/3+XLNWDH7jQnB0vxmpddAMBva2adpM=
github.com/jpillora/backoff v0.0.0-20180909062703-3050d21c67d7 h1:K//n/AqR5HjG3qxbrBCL4vJPW0MVFSs9CPK1OOJdRME=
@ -456,6 +462,8 @@ github.com/kubernetes-client/go v0.0.0-20190928040339-c757968c4c36 h1:/VKCfQgtQx
github.com/kubernetes-client/go v0.0.0-20190928040339-c757968c4c36/go.mod h1:ks4KCmmxdXksTSu2dlnUanEOqNd/dsoyS6/7bay2RQ8=
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.3.0 h1:/qkRGz8zljWiDcFvgpwUpwIAPu3r07TDvs3Rws+o/pU=
github.com/lib/pq v1.3.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE=
github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
@ -513,8 +521,12 @@ github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL
github.com/nats-io/nats-server/v2 v2.0.4/go.mod h1:AWdGEVbjKRS9ZIx4DSP5eKW48nfFm7q3uiSkP/1KD7M=
github.com/nats-io/nats-server/v2 v2.1.2 h1:i2Ly0B+1+rzNZHHWtD4ZwKi+OU5l+uQo1iDHZ2PmiIc=
github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k=
github.com/nats-io/nats-server/v2 v2.1.4 h1:BILRnsJ2Yb/fefiFbBWADpViGF69uh4sxe8poVDQ06g=
github.com/nats-io/nats-server/v2 v2.1.4/go.mod h1:Jw1Z28soD/QasIA2uWjXyM9El1jly3YwyFOuR8tH1rg=
github.com/nats-io/nats-streaming-server v0.16.2 h1:RyTg8dZ+A8LaDEEmh9BoHFxWJSuSrIGJ4xjsr0fLMeY=
github.com/nats-io/nats-streaming-server v0.16.2/go.mod h1:P12vTqmBpT6Ufs+cu0W1C4N2wmISqa6G4xdLQeO2e2s=
github.com/nats-io/nats-streaming-server v0.17.0 h1:eYhSmjRmRsCYNsoUshmZ+RgKbhq6B+7FvMHXo3M5yMs=
github.com/nats-io/nats-streaming-server v0.17.0/go.mod h1:ewPBEsmp62Znl3dcRsYtlcfwudxHEdYMtYqUQSt4fE0=
github.com/nats-io/nats.go v1.8.1/go.mod h1:BrFz9vVn0fU3AcH9Vn4Kd7W0NpJ651tD5omQ3M8LwxM=
github.com/nats-io/nats.go v1.9.1 h1:ik3HbLhZ0YABLto7iX80pZLPw/6dx3T+++MZJwLnMrQ=
github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w=
@ -617,6 +629,8 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUt
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.1.1 h1:VzGj7lhU7KEB9e9gMpAV/v5XT2NVSvLJhJLCWbnkgXg=
@ -657,8 +671,12 @@ github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJy
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.0.1 h1:WE4RBSZ1x6McVVC8S/Md+Qse8YUv6HRObAx6ke00NY8=
github.com/tidwall/pretty v1.0.1/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5 h1:LnC5Kc/wtumK+WB441p7ynQJzVuNRJiqddSIE3IlSEQ=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20200122045848-3419fae592fc h1:yUaosFVTJwnltaHbSNC3i82I92quFs+OFPRl8kNMVwo=
github.com/tmc/grpc-websocket-proxy v0.0.0-20200122045848-3419fae592fc/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
@ -727,6 +745,8 @@ golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550 h1:ObdrDkeb4kJdCP557AjRjq69pTHfNouLtWZG7j9rPN8=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200206161412-a0c6ece9d31a h1:aczoJ0HPNE92XKa7DrIzkNN6esOKO2TBwiiYoKcINhA=
golang.org/x/crypto v0.0.0-20200206161412-a0c6ece9d31a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@ -924,10 +944,14 @@ gopkg.in/couchbase/gocb.v1 v1.6.4 h1:vAworfH5ZKDbonmayrwbGiD9jkAMroWmHXDf1GAIqMM
gopkg.in/couchbase/gocb.v1 v1.6.4/go.mod h1:Ri5Qok4ZKiwmPr75YxZ0uELQy45XJgUSzeUnK806gTY=
gopkg.in/couchbase/gocbcore.v7 v7.1.15 h1:2nhfrqKz6TBex0Vcc+iq9UnAZltfCGklnM4mgdf2I3o=
gopkg.in/couchbase/gocbcore.v7 v7.1.15/go.mod h1:48d2Be0MxRtsyuvn+mWzqmoGUG9uA00ghopzOs148/E=
gopkg.in/couchbase/gocbcore.v7 v7.1.16 h1:n2foZ0Lg2ooBhgzgIpIxQ7VmlUbL6x0GwrP0GQcmTYo=
gopkg.in/couchbase/gocbcore.v7 v7.1.16/go.mod h1:48d2Be0MxRtsyuvn+mWzqmoGUG9uA00ghopzOs148/E=
gopkg.in/couchbaselabs/gocbconnstr.v1 v1.0.4 h1:VVVoIV/nSw1w9ZnTEOjmkeJVcAzaCyxEujKglarxz7U=
gopkg.in/couchbaselabs/gocbconnstr.v1 v1.0.4/go.mod h1:ZjII0iKx4Veo6N6da+pEZu/ptNyKLg9QTVt7fFmR6sw=
gopkg.in/couchbaselabs/gojcbmock.v1 v1.0.3 h1:7n5tht07rEuogME6fmi6FVglIAUFD8m4t8ZQbnUq0sA=
gopkg.in/couchbaselabs/gojcbmock.v1 v1.0.3/go.mod h1:jl/gd/aQ2S8whKVSTnsPs6n7BPeaAuw9UglBD/OF7eo=
gopkg.in/couchbaselabs/gojcbmock.v1 v1.0.4 h1:r5WoWGyeTJQiNGsoWAsMJfz0JFF14xc2TJrYSs09VXk=
gopkg.in/couchbaselabs/gojcbmock.v1 v1.0.4/go.mod h1:jl/gd/aQ2S8whKVSTnsPs6n7BPeaAuw9UglBD/OF7eo=
gopkg.in/couchbaselabs/jsonx.v1 v1.0.0 h1:SJGarb8dXAsVZWizC26rxBkBYEKhSUxVh5wAnyzBVaI=
gopkg.in/couchbaselabs/jsonx.v1 v1.0.0/go.mod h1:oR201IRovxvLW/eISevH12/+MiKHtNQAKfcX8iWZvJY=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=

View File

@ -27,10 +27,11 @@ type CloudEventsEnvelope struct {
SpecVersion string `json:"specversion"`
DataContentType string `json:"datacontenttype"`
Data interface{} `json:"data"`
Subject string `json:"subject"`
}
// NewCloudEventsEnvelope returns a new CloudEventsEnvelope
func NewCloudEventsEnvelope(id, source, eventType string, data []byte) *CloudEventsEnvelope {
func NewCloudEventsEnvelope(id, source, eventType, subject string, data []byte) *CloudEventsEnvelope {
if eventType == "" {
eventType = DefaultCloudEventType
}
@ -52,5 +53,6 @@ func NewCloudEventsEnvelope(id, source, eventType string, data []byte) *CloudEve
Data: i,
SpecVersion: CloudEventsSpecVersion,
DataContentType: contentType,
Subject: subject,
}
}

View File

@ -12,39 +12,39 @@ import (
)
func TestCreateCloudEventsEnvelope(t *testing.T) {
envelope := NewCloudEventsEnvelope("a", "source", "eventType", nil)
envelope := NewCloudEventsEnvelope("a", "source", "eventType", "", nil)
assert.NotNil(t, envelope)
}
func TestCreateCloudEventsEnvelopeDefaults(t *testing.T) {
t.Run("default event type", func(t *testing.T) {
envelope := NewCloudEventsEnvelope("a", "source", "", nil)
envelope := NewCloudEventsEnvelope("a", "source", "", "", nil)
assert.Equal(t, DefaultCloudEventType, envelope.Type)
})
t.Run("non-default event type", func(t *testing.T) {
envelope := NewCloudEventsEnvelope("a", "source", "e1", nil)
envelope := NewCloudEventsEnvelope("a", "source", "e1", "", nil)
assert.Equal(t, "e1", envelope.Type)
})
t.Run("spec version", func(t *testing.T) {
envelope := NewCloudEventsEnvelope("a", "source", "", nil)
envelope := NewCloudEventsEnvelope("a", "source", "", "", nil)
assert.Equal(t, CloudEventsSpecVersion, envelope.SpecVersion)
})
t.Run("has data", func(t *testing.T) {
envelope := NewCloudEventsEnvelope("a", "source", "", []byte("data"))
envelope := NewCloudEventsEnvelope("a", "source", "", "", []byte("data"))
assert.Equal(t, "data", envelope.Data.(string))
})
t.Run("string data content type", func(t *testing.T) {
envelope := NewCloudEventsEnvelope("a", "source", "", []byte("data"))
envelope := NewCloudEventsEnvelope("a", "source", "", "", []byte("data"))
assert.Equal(t, "text/plain", envelope.DataContentType)
})
t.Run("json data content type", func(t *testing.T) {
str := `{ "data": "1" }`
envelope := NewCloudEventsEnvelope("a", "source", "", []byte(str))
envelope := NewCloudEventsEnvelope("a", "source", "", "", []byte(str))
assert.Equal(t, "application/json", envelope.DataContentType)
})
}

View File

@ -5,6 +5,7 @@ Secret Stores provide a common way to interact with different secret stores, clo
Currently supported secret stores are:
* Kubernetes
* Hashicorp Vault
* Azure KeyVault
* AWS Secret manager
* GCP Cloud KMS

View File

@ -4,6 +4,7 @@ State Stores provide a common way to interact with different data store implemen
Currently supported state stores are:
* AWS DynamoDB
* Azure CosmosDB
* Azure Table Storage
* Cassandra
@ -18,6 +19,8 @@ Currently supported state stores are:
* Redis
* SQL Server
* Zookeeper
* Cloud Firestore (Datastore mode)
* Couchbase
## Implementing a new State Store

View File

@ -0,0 +1,169 @@
package dynamodb
import (
"encoding/json"
"fmt"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/dapr/components-contrib/state"
)
// StateStore is a DynamoDB state store
type StateStore struct {
client dynamodbiface.DynamoDBAPI
table string
}
type dynamoDBMetadata struct {
Region string `json:"region"`
AccessKey string `json:"accessKey"`
SecretKey string `json:"secretKey"`
SessionToken string `json:"sessionToken"`
Table string `json:"table"`
}
// NewDynamoDBStateStore returns a new dynamoDB state store
func NewDynamoDBStateStore() *StateStore {
return &StateStore{}
}
// Init does metadata and connection parsing
func (d *StateStore) Init(metadata state.Metadata) error {
meta, err := d.getDynamoDBMetadata(metadata)
if err != nil {
return err
}
client, err := d.getClient(meta)
if err != nil {
return err
}
d.client = client
d.table = meta.Table
return nil
}
// Get retrieves a dynamoDB item
func (d *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
input := &dynamodb.GetItemInput{
ConsistentRead: aws.Bool(req.Options.Consistency == state.Strong),
TableName: aws.String(d.table),
Key: map[string]*dynamodb.AttributeValue{
"key": {
S: aws.String(req.Key),
},
},
}
result, err := d.client.GetItem(input)
if err != nil {
return nil, err
}
if len(result.Item) == 0 {
return &state.GetResponse{}, nil
}
var output string
if err = dynamodbattribute.Unmarshal(result.Item["value"], &output); err != nil {
return nil, err
}
return &state.GetResponse{
Data: []byte(output),
}, nil
}
// Set saves a dynamoDB item
func (d *StateStore) Set(req *state.SetRequest) error {
item := map[string]*dynamodb.AttributeValue{
"key": {
S: aws.String(req.Key),
},
"value": {
S: aws.String(fmt.Sprintf("%v", req.Value)),
},
}
input := &dynamodb.PutItemInput{
Item: item,
TableName: &d.table,
}
_, e := d.client.PutItem(input)
return e
}
// BulkSet performs a bulk set operation
func (d *StateStore) BulkSet(req []state.SetRequest) error {
for _, r := range req {
err := d.Set(&r)
if err != nil {
return err
}
}
return nil
}
// Delete performs a delete operation
func (d *StateStore) Delete(req *state.DeleteRequest) error {
input := &dynamodb.DeleteItemInput{
Key: map[string]*dynamodb.AttributeValue{
"key": {
S: aws.String(req.Key),
},
},
TableName: aws.String(d.table),
}
_, err := d.client.DeleteItem(input)
return err
}
// BulkDelete performs a bulk delete operation
func (d *StateStore) BulkDelete(req []state.DeleteRequest) error {
for _, r := range req {
err := d.Delete(&r)
if err != nil {
return err
}
}
return nil
}
func (d *StateStore) getDynamoDBMetadata(metadata state.Metadata) (*dynamoDBMetadata, error) {
b, err := json.Marshal(metadata.Properties)
if err != nil {
return nil, err
}
var meta dynamoDBMetadata
err = json.Unmarshal(b, &meta)
if err != nil {
return nil, err
}
if meta.SecretKey == "" || meta.AccessKey == "" || meta.Region == "" || meta.SessionToken == "" {
return nil, fmt.Errorf("missing aws credentials in metadata")
}
return &meta, nil
}
func (d *StateStore) getClient(meta *dynamoDBMetadata) (*dynamodb.DynamoDB, error) {
sess, err := session.NewSession(&aws.Config{
Region: aws.String(meta.Region),
Credentials: credentials.NewStaticCredentials(meta.AccessKey, meta.SecretKey, meta.SessionToken),
})
if err != nil {
return nil, err
}
c := dynamodb.New(sess)
return c, nil
}

View File

@ -0,0 +1,248 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package dynamodb
import (
"fmt"
"testing"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"github.com/dapr/components-contrib/state"
"github.com/stretchr/testify/assert"
)
type mockedDynamoDB struct {
GetItemFn func(input *dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error)
PutItemFn func(input *dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error)
DeleteItemFn func(input *dynamodb.DeleteItemInput) (*dynamodb.DeleteItemOutput, error)
dynamodbiface.DynamoDBAPI
}
func (m *mockedDynamoDB) GetItem(input *dynamodb.GetItemInput) (*dynamodb.GetItemOutput, error) {
return m.GetItemFn(input)
}
func (m *mockedDynamoDB) PutItem(input *dynamodb.PutItemInput) (*dynamodb.PutItemOutput, error) {
return m.PutItemFn(input)
}
func (m *mockedDynamoDB) DeleteItem(input *dynamodb.DeleteItemInput) (*dynamodb.DeleteItemOutput, error) {
return m.DeleteItemFn(input)
}
func TestInit(t *testing.T) {
m := state.Metadata{}
s := NewDynamoDBStateStore()
t.Run("Init with valid metadata", func(t *testing.T) {
m.Properties = map[string]string{
"AccessKey": "a",
"Region": "a",
"SecretKey": "a",
"SessionToken": "a",
}
err := s.Init(m)
assert.Nil(t, err)
})
t.Run("Init with missing metadata", func(t *testing.T) {
m.Properties = map[string]string{
"Dummy": "a",
}
err := s.Init(m)
assert.NotNil(t, err)
assert.Equal(t, err, fmt.Errorf("missing aws credentials in metadata"))
})
}
func TestGet(t *testing.T) {
t.Run("Successfully retrieve item", func(t *testing.T) {
ss := StateStore{
client: &mockedDynamoDB{
GetItemFn: func(input *dynamodb.GetItemInput) (output *dynamodb.GetItemOutput, err error) {
return &dynamodb.GetItemOutput{
Item: map[string]*dynamodb.AttributeValue{
"key": {
S: aws.String("key"),
},
"value": {
S: aws.String("value"),
},
},
}, nil
},
},
}
req := &state.GetRequest{
Key: "key",
Metadata: nil,
Options: state.GetStateOption{
Consistency: "strong",
},
}
out, err := ss.Get(req)
assert.Nil(t, err)
assert.Equal(t, []byte("value"), out.Data)
})
t.Run("Unsuccessfully get item", func(t *testing.T) {
ss := StateStore{
client: &mockedDynamoDB{
GetItemFn: func(input *dynamodb.GetItemInput) (output *dynamodb.GetItemOutput, err error) {
return nil, fmt.Errorf("failed to retrieve data")
},
},
}
req := &state.GetRequest{
Key: "key",
Metadata: nil,
Options: state.GetStateOption{
Consistency: "strong",
},
}
out, err := ss.Get(req)
assert.NotNil(t, err)
assert.Nil(t, out)
})
t.Run("Unsuccessfully with empty response", func(t *testing.T) {
ss := StateStore{
client: &mockedDynamoDB{
GetItemFn: func(input *dynamodb.GetItemInput) (output *dynamodb.GetItemOutput, err error) {
return &dynamodb.GetItemOutput{
Item: map[string]*dynamodb.AttributeValue{},
}, nil
},
},
}
req := &state.GetRequest{
Key: "key",
Metadata: nil,
Options: state.GetStateOption{
Consistency: "strong",
},
}
out, err := ss.Get(req)
assert.Nil(t, err)
assert.Nil(t, out.Data)
})
t.Run("Unsuccessfully with no required key", func(t *testing.T) {
ss := StateStore{
client: &mockedDynamoDB{
GetItemFn: func(input *dynamodb.GetItemInput) (output *dynamodb.GetItemOutput, err error) {
return &dynamodb.GetItemOutput{
Item: map[string]*dynamodb.AttributeValue{
"value2": {
S: aws.String("value"),
},
},
}, nil
},
},
}
req := &state.GetRequest{
Key: "key",
Metadata: nil,
Options: state.GetStateOption{
Consistency: "strong",
},
}
out, err := ss.Get(req)
assert.Nil(t, err)
assert.Empty(t, out.Data)
})
}
func TestSet(t *testing.T) {
type value struct {
Value string
}
t.Run("Successfully set item", func(t *testing.T) {
ss := StateStore{
client: &mockedDynamoDB{
PutItemFn: func(input *dynamodb.PutItemInput) (output *dynamodb.PutItemOutput, err error) {
assert.Equal(t, map[string]*dynamodb.AttributeValue{
"key": {
S: aws.String("key"),
},
"value": {
S: aws.String("{value}"),
}}, input.Item)
return &dynamodb.PutItemOutput{
Attributes: map[string]*dynamodb.AttributeValue{
"key": {
S: aws.String("value"),
},
},
}, nil
},
},
}
req := &state.SetRequest{
Key: "key",
Value: value{
Value: "value",
},
}
err := ss.Set(req)
assert.Nil(t, err)
})
t.Run("Un-successfully set item", func(t *testing.T) {
ss := StateStore{
client: &mockedDynamoDB{
PutItemFn: func(input *dynamodb.PutItemInput) (output *dynamodb.PutItemOutput, err error) {
return nil, fmt.Errorf("unable to put item")
},
},
}
req := &state.SetRequest{
Key: "key",
Value: value{
Value: "value",
},
}
err := ss.Set(req)
assert.NotNil(t, err)
})
}
func TestDelete(t *testing.T) {
t.Run("Successfully delete item", func(t *testing.T) {
req := &state.DeleteRequest{
Key: "key",
}
ss := StateStore{
client: &mockedDynamoDB{
DeleteItemFn: func(input *dynamodb.DeleteItemInput) (output *dynamodb.DeleteItemOutput, err error) {
assert.Equal(t, map[string]*dynamodb.AttributeValue{
"key": {
S: aws.String(req.Key),
},
}, input.Key)
return nil, nil
},
},
}
err := ss.Delete(req)
assert.Nil(t, err)
})
t.Run("Un-successfully delete item", func(t *testing.T) {
ss := StateStore{
client: &mockedDynamoDB{
DeleteItemFn: func(input *dynamodb.DeleteItemInput) (output *dynamodb.DeleteItemOutput, err error) {
return nil, fmt.Errorf("unable to delete item")
},
},
}
req := &state.DeleteRequest{
Key: "key",
}
err := ss.Delete(req)
assert.NotNil(t, err)
})
}