[WIP] PostgreSQL State Store (#353)

* Added postgres state files

* Fix SQL Server integration tests. (#324)

* Initial unit and int tests for postgres

* Implemented dbaccess interface

* Formatting updates

* Create state table if not exists

* Initial set implementation

* Get and set with integration test

* Added delete

* Create date and update date in database

* Integration tests for bulk set and bulk delete

* Etag processing, test refactor

* Added tests for etag processing

* Updated const, removed public metadata fields

* Cleanup edits

* Fixed linting issues

* Ran go mod tidy

* Initial implementation of TransactionalStore

* Fixed linting issues

* Added tests and validation

* Test for creating the state table

* Fixed issue with parsing param value as string

* Update to integration tests

* Fixed linting issues

* Added retry logic from state

* Changed primary key to text

* Updated debug logging

* Review updates

Co-authored-by: Young Bu Park <youngp@microsoft.com>
Co-authored-by: Yaron Schneider <yaronsc@microsoft.com>
This commit is contained in:
Brooke Hamilton 2020-07-09 16:56:08 -04:00 committed by GitHub
parent 6b04b4c9fe
commit a0a293ca1b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 1237 additions and 2 deletions

3
.gitignore vendored
View File

@ -1,2 +1,3 @@
/dist
.idea
.idea
.vscode

3
go.mod
View File

@ -44,6 +44,7 @@ require (
github.com/hashicorp/consul/api v1.2.0
github.com/hashicorp/go-multierror v1.0.0
github.com/hazelcast/hazelcast-go-client v0.0.0-20190530123621-6cf767c2f31a
github.com/jackc/pgx/v4 v4.6.0
github.com/json-iterator/go v1.1.8
github.com/kubernetes-client/go v0.0.0-20190625181339-cd8e39e789c7
github.com/mitchellh/mapstructure v1.3.2 // indirect
@ -61,7 +62,7 @@ require (
github.com/sendgrid/sendgrid-go v3.5.0+incompatible
github.com/sergi/go-diff v1.1.0 // indirect
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271
github.com/stretchr/testify v1.4.0
github.com/stretchr/testify v1.5.1
github.com/tidwall/pretty v1.0.1 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20200122045848-3419fae592fc // indirect
github.com/valyala/fasthttp v1.6.0

78
go.sum
View File

@ -175,6 +175,8 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I=
github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ=
github.com/coreos/bbolt v1.3.3 h1:n6AiVyVRKQFNb6mJlwESEvvLoDyiTzXX7ORAUlkeBdY=
github.com/coreos/bbolt v1.3.3/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
@ -186,11 +188,13 @@ github.com/coreos/go-oidc v2.1.0+incompatible/go.mod h1:CgnwVTmzoESiwO9qyAFEMiHo
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM=
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f h1:JOrtw2xFKzlg+cbHpyrpLDmnN1HqhBfnX7WDiW7eG2c=
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/dapr/components-contrib v0.0.0-20200219164914-5b75f4d0fbc6/go.mod h1:AZi8IGs8LFdywJg/YGwDs7MAxJkvGa8RgHN4NoJSKt0=
github.com/dapr/dapr v0.4.1-0.20200228055659-71892bc0111e h1:njRp/SZ/zgqjSDywmy+Dn9oikkZqkqAHWGbfMarUuwo=
github.com/dapr/dapr v0.4.1-0.20200228055659-71892bc0111e/go.mod h1:c60DJ9TdSdpbLjgqP55A5u4ZCYChFwa9UGYIXd9pmm4=
@ -292,6 +296,8 @@ github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gocql/gocql v0.0.0-20191018090344-07ace3bab0f8 h1:ZyxBBeTImqFLu9mLtQUnXrO8K/SryXE/xjG/ygl0DxQ=
github.com/gocql/gocql v0.0.0-20191018090344-07ace3bab0f8/go.mod h1:DL0ekTmBSTdlNF25Orwt/JMzqIq3EJ4MVa/J/uK64OY=
github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE=
github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM=
github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415 h1:WSBJMqJbLxsn+bTCPyPYZfqHdJmc8MK4wrBjMft6BAM=
github.com/gogo/protobuf v0.0.0-20171007142547-342cbe0a0415/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
@ -443,6 +449,47 @@ github.com/imkira/go-interpol v1.1.0 h1:KIiKr0VSG2CUW1hl1jpiyuzuJeKUUpC8iM1AIE7N
github.com/imkira/go-interpol v1.1.0/go.mod h1:z0h2/2T3XF8kyEPpRgJ3kmNv+C43p+I/CoI+jC3w2iA=
github.com/improbable-eng/go-httpwares v0.0.0-20191126155631-6144c42a79c9/go.mod h1:LE9Hs6fsYQ7RoDuFUQlYmlRAku9vUlSlO++jWNj+D0I=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jackc/chunkreader v1.0.0 h1:4s39bBR8ByfqH+DKm8rQA3E1LHZWB9XWcrz8fqaZbe0=
github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo=
github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk=
github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8=
github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk=
github.com/jackc/pgconn v0.0.0-20190420214824-7e0022ef6ba3/go.mod h1:jkELnwuX+w9qN5YIfX0fl88Ehu4XC3keFuOJJk9pcnA=
github.com/jackc/pgconn v0.0.0-20190824142844-760dd75542eb/go.mod h1:lLjNuW/+OfW9/pnVKPazfWOgNfH2aPem8YQ7ilXGvJE=
github.com/jackc/pgconn v0.0.0-20190831204454-2fabfa3c18b7/go.mod h1:ZJKsE/KZfsUgOEh9hBm+xYTstcNHg7UPMVJqRfQxq4s=
github.com/jackc/pgconn v1.5.0 h1:oFSOilzIZkyg787M1fEmyMfOUUvwj0daqYMfaWwNL4o=
github.com/jackc/pgconn v1.5.0/go.mod h1:QeD3lBfpTFe8WUnPZWN5KY/mB8FGMIYRdd8P8Jr0fAI=
github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE=
github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8=
github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2 h1:JVX6jT/XfzNqIjye4717ITLaNwV9mWbJx0dLCpcRzdA=
github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2/go.mod h1:fGZlG77KXmcq05nJLRkk0+p82V8B8Dw8KN2/V9c/OAE=
github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM=
github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg=
github.com/jackc/pgproto3 v1.1.0 h1:FYYE4yRw+AgI8wXIinMlNjBbp/UitDJwfj5LqqewP1A=
github.com/jackc/pgproto3 v1.1.0/go.mod h1:eR5FA3leWg7p9aeAqi37XOTgTIbkABlvcPB3E5rlc78=
github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db/go.mod h1:bhq50y+xrl9n5mRYyCBFKkpRVTLYJVWeCc+mEAI3yXA=
github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod h1:uH0AWtUmuShn0bcesswc4aBTWGvw0cAxIJp+6OB//Wg=
github.com/jackc/pgproto3/v2 v2.0.0-rc3/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM=
github.com/jackc/pgproto3/v2 v2.0.0-rc3.0.20190831210041-4c03ce451f29/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM=
github.com/jackc/pgproto3/v2 v2.0.1 h1:Rdjp4NFjwHnEslx2b66FfCI2S0LhO4itac3hXz6WX9M=
github.com/jackc/pgproto3/v2 v2.0.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA=
github.com/jackc/pgservicefile v0.0.0-20200307190119-3430c5407db8 h1:Q3tB+ExeflWUW7AFcAhXqk40s9mnNYLk1nOkKNZ5GnU=
github.com/jackc/pgservicefile v0.0.0-20200307190119-3430c5407db8/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E=
github.com/jackc/pgtype v0.0.0-20190421001408-4ed0de4755e0/go.mod h1:hdSHsc1V01CGwFsrv11mJRHWJ6aifDLfdV3aVjFF0zg=
github.com/jackc/pgtype v0.0.0-20190824184912-ab885b375b90/go.mod h1:KcahbBH1nCMSo2DXpzsoWOAfFkdEtEJpPbVLq8eE+mc=
github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59/go.mod h1:MWlu30kVJrUS8lot6TQqcg7mtthZ9T0EoIBFiJcmcyw=
github.com/jackc/pgtype v1.3.0 h1:l8JvKrby3RI7Kg3bYEeU9TA4vqC38QDpFCfcrC7KuN0=
github.com/jackc/pgtype v1.3.0/go.mod h1:b0JqxHvPmljG+HQ5IsvQ0yqeSi4nGcDTVjFoiLDb0Ik=
github.com/jackc/pgx v3.6.2+incompatible h1:2zP5OD7kiyR3xzRYMhOcXVvkDZsImVXfj+yIyTQf3/o=
github.com/jackc/pgx v3.6.2+incompatible/go.mod h1:0ZGrqGqkRlliWnWB4zKnWtjbSWbGkVEFm4TeybAXq+I=
github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08CZQyj1PVQBHy9XOp5p8/SHH6a0psbY9Y=
github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9WuGR0JG/JseM9irFbnEPbuWV2EELPNuM=
github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQnOEnf1dqHGpw7JmHqHc1NxDoalibchSk9/RWuDc=
github.com/jackc/pgx/v4 v4.6.0 h1:Fh0O9GdlG4gYpjpwOqjdEodJUQM9jzN3Hdv7PN0xmm0=
github.com/jackc/pgx/v4 v4.6.0/go.mod h1:vPh43ZzxijXUVJ+t/EmXBtFmbFVO72cuneCT9oAlxAg=
github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jackc/puddle v1.1.0/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk=
github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03 h1:FUwcHNlEqkqLjLBdCp5PRlCFijNjvcYANOZXzCfXwCM=
github.com/jcmturner/gofork v0.0.0-20190328161633-dc7c13fece03/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8=
@ -500,12 +547,15 @@ github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kubernetes-client/go v0.0.0-20190625181339-cd8e39e789c7 h1:NZlvd1Qf3MwoRhh87iVkJSHK3R31fX3D7kQfdJy6LnQ=
github.com/kubernetes-client/go v0.0.0-20190625181339-cd8e39e789c7/go.mod h1:ks4KCmmxdXksTSu2dlnUanEOqNd/dsoyS6/7bay2RQ8=
github.com/kubernetes-client/go v0.0.0-20190928040339-c757968c4c36 h1:/VKCfQgtQxBXEVU9UAJkW/ybm/070TBG57x2wxYUtXI=
github.com/kubernetes-client/go v0.0.0-20190928040339-c757968c4c36/go.mod h1:ks4KCmmxdXksTSu2dlnUanEOqNd/dsoyS6/7bay2RQ8=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.3.0 h1:/qkRGz8zljWiDcFvgpwUpwIAPu3r07TDvs3Rws+o/pU=
@ -518,10 +568,18 @@ github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN
github.com/mailru/easyjson v0.7.0/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs=
github.com/mattn/go-colorable v0.0.9 h1:UVL0vNpWh04HeJXV0KLcaT7r06gOH2l4OW6ddYRUIY4=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ=
github.com/mattn/go-colorable v0.1.2 h1:/bC9yWikZXAL9uJdulbSfyVNIR3n3trXl+v8+1sx8mU=
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-ieproxy v0.0.0-20190610004146-91bb50d98149 h1:HfxbT6/JcvIljmERptWhwa8XzP7H3T+Z2N26gTsaDaA=
github.com/mattn/go-ieproxy v0.0.0-20190610004146-91bb50d98149/go.mod h1:31jz6HNzdxOmlERGGEc4v/dMssOfmp2p5bT/okiKFFc=
github.com/mattn/go-isatty v0.0.3 h1:ns/ykhmWi7G9O+8a448SecJU3nSMBXJfqQkl0upE1jI=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.9 h1:d5US/mDsogSGW37IV293h//ZFaeajb69h+EHFsv2xGg=
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/mediocregopher/radix.v2 v0.0.0-20181115013041-b67df6e626f9 h1:ViNuGS149jgnttqhc6XQNPwdupEMBXqCx9wtlW7P3sA=
@ -676,6 +734,9 @@ github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU=
github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc=
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/russross/blackfriday v2.0.0+incompatible h1:cBXrhZNUf9C+La9/YpS+UHpUT8YD6Td9ZMSU9APFcsk=
github.com/russross/blackfriday v2.0.0+incompatible/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
@ -694,6 +755,8 @@ github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ=
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24 h1:pntxY8Ary0t43dCZ5dqY4YTJCObLY1kIXl0uzMv+7DE=
github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4=
github.com/shurcooL/sanitized_anchor_name v1.0.0 h1:PdmoCO6wvbs+7yrJyMORt4/BmY5IYyJwS/kOiWx8mHo=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.1.1 h1:VzGj7lhU7KEB9e9gMpAV/v5XT2NVSvLJhJLCWbnkgXg=
@ -737,6 +800,8 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4=
github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
github.com/tidwall/pretty v1.0.1 h1:WE4RBSZ1x6McVVC8S/Md+Qse8YUv6HRObAx6ke00NY8=
@ -777,6 +842,7 @@ github.com/yudai/pp v2.0.1+incompatible h1:Q4//iY4pNF6yPLZIigmvcl7k/bPgrcTPIFIcm
github.com/yudai/pp v2.0.1+incompatible/go.mod h1:PuxR/8QJ7cyCkFp/aUDS+JY727OFEZkTdatxwunjIkc=
github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb h1:ZkM6LRnq40pR1Ox0hTHlnpkcOTuFIDQpZ1IN8rKKhX0=
github.com/yuin/gopher-lua v0.0.0-20191220021717-ab39c6098bdb/go.mod h1:gqRgreBUhTSL0GeU64rtZ3Uq3wtjOa/TB2YfrtkCbVQ=
github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
go.etcd.io/bbolt v1.3.3 h1:MUGmc65QhB3pIlaQ5bB4LwqSj6GIonVJXpZiaKNyaKk=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/etcd v3.3.17+incompatible h1:g8iRku1SID8QAW8cDlV0L/PkZlw63LSiYEHYHoE6j/s=
@ -791,11 +857,13 @@ go.opencensus.io v0.22.2 h1:75k/FF0Q2YM8QYo07VPddOLBslDt1MZOdEslOHvmzAs=
go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.3 h1:8sGtKOrtQqkN1bp2AtX+misvLIlOmsEsNd+9NIcPEm8=
go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0 h1:cxzIVoETapQEqDhQu3QfnvXAV4AlzcvUCxkVUFw3+EU=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.2.0 h1:6I+W7f5VwC5SV9dNrZ3qXrDB9mD0dyGOi/ZJmYw03T4=
go.uber.org/multierr v1.2.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.10.0 h1:ORx85nbTijNz8ljznvCMR1ZBIPKFn3jQrag10X2AsuM=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.11.0 h1:gSmpCfs+R47a4yQPAI4xJ0IPDLTRGXskm6UelqNXpqE=
@ -809,6 +877,7 @@ golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnf
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
golang.org/x/crypto v0.0.0-20190411191339-88737f569e3a/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
golang.org/x/crypto v0.0.0-20190418165655-df01cb2cc480/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE=
golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
@ -821,6 +890,7 @@ golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413 h1:ULYEB3JvPRE/IfO+9uO7vK
golang.org/x/crypto v0.0.0-20191206172530-e9b2fee46413/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200206161412-a0c6ece9d31a h1:aczoJ0HPNE92XKa7DrIzkNN6esOKO2TBwiiYoKcINhA=
golang.org/x/crypto v0.0.0-20200206161412-a0c6ece9d31a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200604202706-70a84ac30bf9 h1:vEg9joUBmeBcK9iSJftGNf3coIG4HqZElCPehJsfAYM=
golang.org/x/crypto v0.0.0-20200604202706-70a84ac30bf9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
@ -870,6 +940,7 @@ golang.org/x/net v0.0.0-20190619014844-b5b0513f8c1b/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190812203447-cdfb69ac37fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190923162816-aa69164e4478/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191004110552-13f9640d40b9/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@ -907,6 +978,7 @@ golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190209173611-3b5209105503/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -921,6 +993,7 @@ golang.org/x/sys v0.0.0-20190620070143-6f217b454f45/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191010194322-b09406accb47 h1:/XfQ9z7ib8eEJX2hdgFTZJ/ntt0swNk5oYBziWeTCvY=
golang.org/x/sys v0.0.0-20191010194322-b09406accb47/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -955,6 +1028,7 @@ golang.org/x/tools v0.0.0-20190312151545-0bb0c0a6e846/go.mod h1:LCzVGOaR6xXOjkQ3
golang.org/x/tools v0.0.0-20190312170243-e65039ee4138/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190425163242-31fd60d6bfdc/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190506145303-2d16b83fe98c/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20190606124116-d0a3d012864b/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
@ -962,11 +1036,14 @@ golang.org/x/tools v0.0.0-20190614205625-5aca471b1d59/go.mod h1:/rFqwRUd4F7ZHNgw
golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20190628153133-6cdbf07be9d0/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
golang.org/x/tools v0.0.0-20190816200558-6889da9d5479/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20190823170909-c4a336ef6a2f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20190911174233-4f2ddba30aff/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200117161641-43d50277825c h1:2EA2K0k9bcvvEDlqD8xdlOhCOqq+O/p9Voqi4x9W1YU=
golang.org/x/tools v0.0.0-20200117161641-43d50277825c/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7 h1:9zdDQZ7Thm29KFXgAX/+yaf3eVbP7djjWp/dXAppNCc=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@ -1039,6 +1116,7 @@ gopkg.in/couchbaselabs/jsonx.v1 v1.0.0/go.mod h1:oR201IRovxvLW/eISevH12/+MiKHtNQ
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s=
gopkg.in/inf.v0 v0.9.0 h1:3zYtXIO92bvsdS3ggAdA8Gb4Azj0YU+TVY1uGYNFA8o=
gopkg.in/inf.v0 v0.9.0/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=

View File

@ -0,0 +1,20 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package postgresql
import (
"github.com/dapr/components-contrib/state"
)
// dbAccess is a private interface which enables unit testing of PostgreSQL
type dbAccess interface {
Init(metadata state.Metadata) error
Set(req *state.SetRequest) error
Get(req *state.GetRequest) (*state.GetResponse, error)
Delete(req *state.DeleteRequest) error
ExecuteMulti(sets []state.SetRequest, deletes []state.DeleteRequest) error
Close() error // io.Closer
}

View File

@ -0,0 +1,286 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package postgresql
import (
"database/sql"
"encoding/json"
"errors"
"fmt"
"strconv"
"github.com/dapr/components-contrib/state"
"github.com/dapr/dapr/pkg/logger"
// Blank import for the underlying PostgreSQL driver
_ "github.com/jackc/pgx/v4/stdlib"
)
const (
connectionStringKey = "connectionString"
errMissingConnectionString = "missing connection string"
tableName = "state"
)
// postgresDBAccess implements dbaccess
type postgresDBAccess struct {
logger logger.Logger
metadata state.Metadata
db *sql.DB
connectionString string
}
// newPostgresDBAccess creates a new instance of postgresAccess
func newPostgresDBAccess(logger logger.Logger) *postgresDBAccess {
logger.Debug("Instantiating new PostgreSQL state store")
return &postgresDBAccess{
logger: logger,
}
}
// Init sets up PostgreSQL connection and ensures that the state table exists
func (p *postgresDBAccess) Init(metadata state.Metadata) error {
p.logger.Debug("Initializing PostgreSQL state store")
p.metadata = metadata
if val, ok := metadata.Properties[connectionStringKey]; ok && val != "" {
p.connectionString = val
} else {
p.logger.Error("Missing postgreSQL connection string")
return fmt.Errorf(errMissingConnectionString)
}
db, err := sql.Open("pgx", p.connectionString)
if err != nil {
p.logger.Error(err)
return err
}
p.db = db
pingErr := db.Ping()
if pingErr != nil {
return pingErr
}
err = p.ensureStateTable(tableName)
if err != nil {
return err
}
return nil
}
// Set makes an insert or update to the database.
func (p *postgresDBAccess) Set(req *state.SetRequest) error {
return state.SetWithRetries(p.setValue, req)
}
// setValue is an internal implementation of set to enable passing the logic to state.SetWithRetries as a func.
func (p *postgresDBAccess) setValue(req *state.SetRequest) error {
p.logger.Debug("Setting state value in PostgreSQL")
err := state.CheckSetRequestOptions(req)
if err != nil {
return err
}
if req.Key == "" {
return fmt.Errorf("missing key in set operation")
}
var valueBytes []byte
// Convert to json string
valueBytes, err = json.Marshal(req.Value)
if err != nil {
return err
}
value := string(valueBytes)
var result sql.Result
// Sprintf is required for table name because sql.DB does not substitute parameters for table names.
// Other parameters use sql.DB parameter substitution.
if req.ETag == "" {
result, err = p.db.Exec(fmt.Sprintf(
`INSERT INTO %s (key, value) VALUES ($1, $2)
ON CONFLICT (key) DO UPDATE SET value = $2, updatedate = NOW();`,
tableName), req.Key, value)
} else {
// Convert req.ETag to integer for postgres compatibility
var etag int
etag, err = strconv.Atoi(req.ETag)
if err != nil {
return err
}
// When an etag is provided do an update - no insert
result, err = p.db.Exec(fmt.Sprintf(
`UPDATE %s SET value = $1, updatedate = NOW()
WHERE key = $2 AND xmin = $3;`,
tableName), value, req.Key, etag)
}
return p.returnSingleDBResult(result, err)
}
// Get returns data from the database. If data does not exist for the key an empty state.GetResponse will be returned.
func (p *postgresDBAccess) Get(req *state.GetRequest) (*state.GetResponse, error) {
p.logger.Debug("Getting state value from PostgreSQL")
if req.Key == "" {
return nil, fmt.Errorf("missing key in get operation")
}
var value string
var etag int
err := p.db.QueryRow(fmt.Sprintf("SELECT value, xmin as etag FROM %s WHERE key = $1", tableName), req.Key).Scan(&value, &etag)
if err != nil {
// If no rows exist, return an empty response, otherwise return the error.
if err == sql.ErrNoRows {
return &state.GetResponse{}, nil
}
return nil, err
}
response := &state.GetResponse{
Data: []byte(value),
ETag: strconv.Itoa(etag),
Metadata: req.Metadata,
}
return response, nil
}
// Delete removes an item from the state store.
func (p *postgresDBAccess) Delete(req *state.DeleteRequest) error {
return state.DeleteWithRetries(p.deleteValue, req)
}
// deleteValue is an internal implementation of delete to enable passing the logic to state.DeleteWithRetries as a func.
func (p *postgresDBAccess) deleteValue(req *state.DeleteRequest) error {
p.logger.Debug("Deleting state value from PostgreSQL")
if req.Key == "" {
return fmt.Errorf("missing key in delete operation")
}
var result sql.Result
var err error
if req.ETag == "" {
result, err = p.db.Exec("DELETE FROM state WHERE key = $1", req.Key)
} else {
// Convert req.ETag to integer for postgres compatibility
etag, conversionError := strconv.Atoi(req.ETag)
if conversionError != nil {
return conversionError
}
result, err = p.db.Exec("DELETE FROM state WHERE key = $1 and xmin = $2", req.Key, etag)
}
return p.returnSingleDBResult(result, err)
}
func (p *postgresDBAccess) ExecuteMulti(sets []state.SetRequest, deletes []state.DeleteRequest) error {
p.logger.Debug("Executing multiple PostgreSQL operations")
tx, err := p.db.Begin()
if err != nil {
return err
}
if len(deletes) > 0 {
for _, d := range deletes {
da := d // Fix for gosec G601: Implicit memory aliasing in for loop.
err = p.Delete(&da)
if err != nil {
tx.Rollback()
return err
}
}
}
if len(sets) > 0 {
for _, s := range sets {
sa := s // Fix for gosec G601: Implicit memory aliasing in for loop.
err = p.Set(&sa)
if err != nil {
tx.Rollback()
return err
}
}
}
err = tx.Commit()
return err
}
// Verifies that the sql.Result affected only one row and no errors exist
func (p *postgresDBAccess) returnSingleDBResult(result sql.Result, err error) error {
if err != nil {
p.logger.Debug(err)
return err
}
rowsAffected, resultErr := result.RowsAffected()
if resultErr != nil {
p.logger.Error(resultErr)
return resultErr
}
if rowsAffected == 0 {
noRowsErr := errors.New("database operation failed: no rows match given key and etag")
p.logger.Error(noRowsErr)
return noRowsErr
}
if rowsAffected > 1 {
tooManyRowsErr := errors.New("database operation failed: more than one row affected, expected one")
p.logger.Error(tooManyRowsErr)
return tooManyRowsErr
}
return nil
}
// Close implements io.Close
func (p *postgresDBAccess) Close() error {
if p.db != nil {
return p.db.Close()
}
return nil
}
func (p *postgresDBAccess) ensureStateTable(stateTableName string) error {
exists, err := tableExists(p.db, stateTableName)
if err != nil {
return err
}
if !exists {
p.logger.Info("Creating PostgreSQL state table")
createTable := fmt.Sprintf(`CREATE TABLE %s (
key text NOT NULL PRIMARY KEY,
value json NOT NULL,
insertdate TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
updatedate TIMESTAMP WITH TIME ZONE NULL);`, stateTableName)
_, err = p.db.Exec(createTable)
if err != nil {
return err
}
}
return nil
}
func tableExists(db *sql.DB, tableName string) (bool, error) {
var exists bool = false
err := db.QueryRow("SELECT EXISTS (SELECT FROM pg_tables where tablename = $1)", tableName).Scan(&exists)
return exists, err
}

View File

@ -0,0 +1,105 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package postgresql
import (
"fmt"
"github.com/dapr/components-contrib/state"
"github.com/dapr/dapr/pkg/logger"
)
// PostgreSQL state store
type PostgreSQL struct {
logger logger.Logger
dbaccess dbAccess
}
// NewPostgreSQLStateStore creates a new instance of PostgreSQL state store
func NewPostgreSQLStateStore(logger logger.Logger) *PostgreSQL {
dba := newPostgresDBAccess(logger)
return newPostgreSQLStateStore(logger, dba)
}
// newPostgreSQLStateStore creates a newPostgreSQLStateStore instance of a PostgreSQL state store.
// This unexported constructor allows injecting a dbAccess instance for unit testing.
func newPostgreSQLStateStore(logger logger.Logger, dba dbAccess) *PostgreSQL {
return &PostgreSQL{
logger: logger,
dbaccess: dba,
}
}
// Init initializes the SQL server state store
func (p *PostgreSQL) Init(metadata state.Metadata) error {
return p.dbaccess.Init(metadata)
}
// Delete removes an entity from the store
func (p *PostgreSQL) Delete(req *state.DeleteRequest) error {
return p.dbaccess.Delete(req)
}
// BulkDelete removes multiple entries from the store
func (p *PostgreSQL) BulkDelete(req []state.DeleteRequest) error {
return p.dbaccess.ExecuteMulti(nil, req)
}
// Get returns an entity from store
func (p *PostgreSQL) Get(req *state.GetRequest) (*state.GetResponse, error) {
return p.dbaccess.Get(req)
}
// Set adds/updates an entity on store
func (p *PostgreSQL) Set(req *state.SetRequest) error {
return p.dbaccess.Set(req)
}
// BulkSet adds/updates multiple entities on store
func (p *PostgreSQL) BulkSet(req []state.SetRequest) error {
return p.dbaccess.ExecuteMulti(req, nil)
}
// Multi handles multiple transactions. Implements TransactionalStore.
func (p *PostgreSQL) Multi(reqs []state.TransactionalRequest) error {
var deletes []state.DeleteRequest
var sets []state.SetRequest
for _, req := range reqs {
switch req.Operation {
case state.Upsert:
if setReq, ok := req.Request.(state.SetRequest); ok {
sets = append(sets, setReq)
} else {
return fmt.Errorf("expecting set request")
}
case state.Delete:
if delReq, ok := req.Request.(state.DeleteRequest); ok {
deletes = append(deletes, delReq)
} else {
return fmt.Errorf("expecting delete request")
}
default:
return fmt.Errorf("unsupported operation: %s", req.Operation)
}
}
if len(sets) > 0 || len(deletes) > 0 {
return p.dbaccess.ExecuteMulti(sets, deletes)
}
return nil
}
// Close implements io.Closer
func (p *PostgreSQL) Close() error {
if p.dbaccess != nil {
return p.dbaccess.Close()
}
return nil
}

View File

@ -0,0 +1,566 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package postgresql
import (
"database/sql"
"encoding/json"
"fmt"
"os"
"testing"
"github.com/dapr/components-contrib/state"
"github.com/dapr/dapr/pkg/logger"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
)
const (
connectionStringEnvKey = "DAPR_TEST_POSTGRES_CONNSTRING" // Environment variable containing the connection string
)
type fakeItem struct {
Color string
}
func TestPostgreSQLIntegration(t *testing.T) {
connectionString := getConnectionString()
if connectionString == "" {
t.Skipf("PostgreSQL state integration tests skipped. To enable define the connection string using environment variable '%s' (example 'export %s=\"host=localhost user=postgres password=example port=5432 connect_timeout=10 database=dapr_test\")", connectionStringEnvKey, connectionStringEnvKey)
}
t.Run("Test init configurations", func(t *testing.T) {
testInitConfiguration(t)
})
metadata := state.Metadata{
Properties: map[string]string{connectionStringKey: connectionString},
}
pgs := NewPostgreSQLStateStore(logger.NewLogger("test"))
t.Cleanup(func() {
defer pgs.Close()
})
error := pgs.Init(metadata)
if error != nil {
t.Fatal(error)
}
t.Run("Create table succeeds", func(t *testing.T) {
t.Parallel()
testCreateTable(t, pgs.dbaccess.(*postgresDBAccess))
})
t.Run("Get Set Delete one item", func(t *testing.T) {
t.Parallel()
setGetUpdateDeleteOneItem(t, pgs)
})
t.Run("Get item that does not exist", func(t *testing.T) {
t.Parallel()
getItemThatDoesNotExist(t, pgs)
})
t.Run("Get item with no key fails", func(t *testing.T) {
t.Parallel()
getItemWithNoKey(t, pgs)
})
t.Run("Set updates the updatedate field", func(t *testing.T) {
t.Parallel()
setUpdatesTheUpdatedateField(t, pgs)
})
t.Run("Set item with no key fails", func(t *testing.T) {
t.Parallel()
setItemWithNoKey(t, pgs)
})
t.Run("Bulk set and bulk delete", func(t *testing.T) {
t.Parallel()
testBulkSetAndBulkDelete(t, pgs)
})
t.Run("Update and delete with etag succeeds", func(t *testing.T) {
t.Parallel()
updateAndDeleteWithEtagSucceeds(t, pgs)
})
t.Run("Update with old etag fails", func(t *testing.T) {
t.Parallel()
updateWithOldEtagFails(t, pgs)
})
t.Run("Insert with etag fails", func(t *testing.T) {
t.Parallel()
newItemWithEtagFails(t, pgs)
})
t.Run("Delete with invalid etag fails", func(t *testing.T) {
t.Parallel()
deleteWithInvalidEtagFails(t, pgs)
})
t.Run("Delete item with no key fails", func(t *testing.T) {
t.Parallel()
deleteWithNoKeyFails(t, pgs)
})
t.Run("Delete an item that does not exist", func(t *testing.T) {
t.Parallel()
deleteItemThatDoesNotExist(t, pgs)
})
t.Run("Multi with delete and set", func(t *testing.T) {
t.Parallel()
multiWithDeleteAndSet(t, pgs)
})
t.Run("Multi with delete only", func(t *testing.T) {
t.Parallel()
multiWithDeleteOnly(t, pgs)
})
t.Run("Multi with set only", func(t *testing.T) {
t.Parallel()
multiWithSetOnly(t, pgs)
})
}
// setGetUpdateDeleteOneItem validates setting one item, getting it, and deleting it.
func setGetUpdateDeleteOneItem(t *testing.T, pgs *PostgreSQL) {
key := randomKey()
//value := `{"something": "DKbLaZwrlCAZ"}`
value := &fakeItem{Color: "yellow"}
setItem(t, pgs, key, value, "")
getResponse, outputObject := getItem(t, pgs, key)
assert.Equal(t, value, outputObject)
newValue := &fakeItem{Color: "green"}
setItem(t, pgs, key, newValue, getResponse.ETag)
getResponse, outputObject = getItem(t, pgs, key)
assert.Equal(t, newValue, outputObject)
deleteItem(t, pgs, key, getResponse.ETag)
}
// testCreateTable tests the ability to create the state table.
func testCreateTable(t *testing.T, dba *postgresDBAccess) {
tableName := "test_state"
// Drop the table if it already exists
exists, err := tableExists(dba.db, tableName)
assert.Nil(t, err)
if exists {
dropTable(t, dba.db, tableName)
}
// Create the state table and test for its existence
err = dba.ensureStateTable(tableName)
assert.Nil(t, err)
exists, err = tableExists(dba.db, tableName)
assert.Nil(t, err)
assert.True(t, exists)
// Drop the state table
dropTable(t, dba.db, tableName)
}
func dropTable(t *testing.T, db *sql.DB, tableName string) {
_, err := db.Exec(fmt.Sprintf("DROP TABLE %s", tableName))
assert.Nil(t, err)
}
func deleteItemThatDoesNotExist(t *testing.T, pgs *PostgreSQL) {
// Delete the item with a fake etag
deleteReq := &state.DeleteRequest{
Key: randomKey(),
}
err := pgs.Delete(deleteReq)
assert.NotNil(t, err)
}
func multiWithSetOnly(t *testing.T, pgs *PostgreSQL) {
var multiRequest []state.TransactionalRequest
var setRequests []state.SetRequest
for i := 0; i < 3; i++ {
req := state.SetRequest{
Key: randomKey(),
Value: randomJSON(),
}
setRequests = append(setRequests, req)
multiRequest = append(multiRequest, state.TransactionalRequest{
Operation: state.Upsert,
Request: req,
})
}
err := pgs.Multi(multiRequest)
assert.Nil(t, err)
for _, set := range setRequests {
assert.True(t, storeItemExists(t, set.Key))
deleteItem(t, pgs, set.Key, "")
}
}
func multiWithDeleteOnly(t *testing.T, pgs *PostgreSQL) {
var multiRequest []state.TransactionalRequest
var deleteRequests []state.DeleteRequest
for i := 0; i < 3; i++ {
req := state.DeleteRequest{Key: randomKey()}
// Add the item to the database
setItem(t, pgs, req.Key, randomJSON(), "") // Add the item to the database
// Add the item to a slice of delete requests
deleteRequests = append(deleteRequests, req)
// Add the item to the multi transaction request
multiRequest = append(multiRequest, state.TransactionalRequest{
Operation: state.Delete,
Request: req,
})
}
err := pgs.Multi(multiRequest)
assert.Nil(t, err)
for _, delete := range deleteRequests {
assert.False(t, storeItemExists(t, delete.Key))
}
}
func multiWithDeleteAndSet(t *testing.T, pgs *PostgreSQL) {
var multiRequest []state.TransactionalRequest
var deleteRequests []state.DeleteRequest
for i := 0; i < 3; i++ {
req := state.DeleteRequest{Key: randomKey()}
// Add the item to the database
setItem(t, pgs, req.Key, randomJSON(), "") // Add the item to the database
// Add the item to a slice of delete requests
deleteRequests = append(deleteRequests, req)
// Add the item to the multi transaction request
multiRequest = append(multiRequest, state.TransactionalRequest{
Operation: state.Delete,
Request: req,
})
}
// Create the set requests
var setRequests []state.SetRequest
for i := 0; i < 3; i++ {
req := state.SetRequest{
Key: randomKey(),
Value: randomJSON(),
}
setRequests = append(setRequests, req)
multiRequest = append(multiRequest, state.TransactionalRequest{
Operation: state.Upsert,
Request: req,
})
}
err := pgs.Multi(multiRequest)
assert.Nil(t, err)
for _, delete := range deleteRequests {
assert.False(t, storeItemExists(t, delete.Key))
}
for _, set := range setRequests {
assert.True(t, storeItemExists(t, set.Key))
deleteItem(t, pgs, set.Key, "")
}
}
func deleteWithInvalidEtagFails(t *testing.T, pgs *PostgreSQL) {
// Create new item
key := randomKey()
value := &fakeItem{Color: "mauve"}
setItem(t, pgs, key, value, "")
// Delete the item with a fake etag
deleteReq := &state.DeleteRequest{
Key: key,
ETag: "1234",
}
err := pgs.Delete(deleteReq)
assert.NotNil(t, err)
}
func deleteWithNoKeyFails(t *testing.T, pgs *PostgreSQL) {
deleteReq := &state.DeleteRequest{
Key: "",
}
err := pgs.Delete(deleteReq)
assert.NotNil(t, err)
}
// newItemWithEtagFails creates a new item and also supplies an ETag, which is invalid - expect failure
func newItemWithEtagFails(t *testing.T, pgs *PostgreSQL) {
value := &fakeItem{Color: "teal"}
invalidEtag := "12345"
setReq := &state.SetRequest{
Key: randomKey(),
ETag: invalidEtag,
Value: value,
}
err := pgs.Set(setReq)
assert.NotNil(t, err)
}
func updateWithOldEtagFails(t *testing.T, pgs *PostgreSQL) {
// Create and retrieve new item
key := randomKey()
value := &fakeItem{Color: "gray"}
setItem(t, pgs, key, value, "")
getResponse, _ := getItem(t, pgs, key)
assert.NotNil(t, getResponse.ETag)
originalEtag := getResponse.ETag
// Change the value and get the updated etag
newValue := &fakeItem{Color: "silver"}
setItem(t, pgs, key, newValue, originalEtag)
_, updatedItem := getItem(t, pgs, key)
assert.Equal(t, newValue, updatedItem)
// Update again with the original etag - expect udpate failure
newValue = &fakeItem{Color: "maroon"}
setReq := &state.SetRequest{
Key: key,
ETag: originalEtag,
Value: newValue,
}
err := pgs.Set(setReq)
assert.NotNil(t, err)
}
func updateAndDeleteWithEtagSucceeds(t *testing.T, pgs *PostgreSQL) {
// Create and retrieve new item
key := randomKey()
value := &fakeItem{Color: "hazel"}
setItem(t, pgs, key, value, "")
getResponse, _ := getItem(t, pgs, key)
assert.NotNil(t, getResponse.ETag)
// Change the value and compare
value.Color = "purple"
setItem(t, pgs, key, value, getResponse.ETag)
updateResponse, updatedItem := getItem(t, pgs, key)
assert.Equal(t, value, updatedItem)
// ETag should change when item is updated
assert.NotEqual(t, getResponse.ETag, updateResponse.ETag)
// Delete
deleteItem(t, pgs, key, updateResponse.ETag)
// Item is not in the data store
assert.False(t, storeItemExists(t, key))
}
// getItemThatDoesNotExist validates the behavior of retrieving an item that does not exist.
func getItemThatDoesNotExist(t *testing.T, pgs *PostgreSQL) {
key := randomKey()
response, outputObject := getItem(t, pgs, key)
assert.Nil(t, response.Data)
assert.Equal(t, "", outputObject.Color)
}
// getItemWithNoKey validates that attempting a Get operation without providing a key will return an error.
func getItemWithNoKey(t *testing.T, pgs *PostgreSQL) {
getReq := &state.GetRequest{
Key: "",
}
response, getErr := pgs.Get(getReq)
assert.NotNil(t, getErr)
assert.Nil(t, response)
}
// setUpdatesTheUpdatedateField proves that the updateddate is set for an update, and not set upon insert.
func setUpdatesTheUpdatedateField(t *testing.T, pgs *PostgreSQL) {
key := randomKey()
value := &fakeItem{Color: "orange"}
setItem(t, pgs, key, value, "")
// insertdate should have a value and updatedate should be nil
_, insertdate, updatedate := getRowData(t, key)
assert.NotNil(t, insertdate)
assert.Equal(t, "", updatedate.String)
// insertdate should not change, updatedate should have a value
value = &fakeItem{Color: "aqua"}
setItem(t, pgs, key, value, "")
_, newinsertdate, updatedate := getRowData(t, key)
assert.Equal(t, insertdate, newinsertdate) // The insertdate should not change.
assert.NotEqual(t, "", updatedate.String)
deleteItem(t, pgs, key, "")
}
func setItemWithNoKey(t *testing.T, pgs *PostgreSQL) {
setReq := &state.SetRequest{
Key: "",
}
err := pgs.Set(setReq)
assert.NotNil(t, err)
}
// Tests valid bulk sets and deletes
func testBulkSetAndBulkDelete(t *testing.T, pgs *PostgreSQL) {
setReq := []state.SetRequest{
{
Key: randomKey(),
Value: &fakeItem{Color: "blue"},
},
{
Key: randomKey(),
Value: &fakeItem{Color: "red"},
},
}
err := pgs.BulkSet(setReq)
assert.Nil(t, err)
assert.True(t, storeItemExists(t, setReq[0].Key))
assert.True(t, storeItemExists(t, setReq[1].Key))
deleteReq := []state.DeleteRequest{
{
Key: setReq[0].Key,
},
{
Key: setReq[1].Key,
},
}
err = pgs.BulkDelete(deleteReq)
assert.Nil(t, err)
assert.False(t, storeItemExists(t, setReq[0].Key))
assert.False(t, storeItemExists(t, setReq[1].Key))
}
// testInitConfiguration tests valid and invalid config settings
func testInitConfiguration(t *testing.T) {
logger := logger.NewLogger("test")
tests := []struct {
name string
props map[string]string
expectedErr string
}{
{
name: "Empty",
props: map[string]string{},
expectedErr: errMissingConnectionString,
},
{
name: "Valid connection string",
props: map[string]string{connectionStringKey: getConnectionString()},
expectedErr: "",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := NewPostgreSQLStateStore(logger)
defer p.Close()
metadata := state.Metadata{
Properties: tt.props,
}
err := p.Init(metadata)
if tt.expectedErr == "" {
assert.Nil(t, err)
} else {
assert.NotNil(t, err)
assert.Equal(t, err.Error(), tt.expectedErr)
}
})
}
}
func getConnectionString() string {
return os.Getenv(connectionStringEnvKey)
}
func setItem(t *testing.T, pgs *PostgreSQL, key string, value interface{}, etag string) {
setReq := &state.SetRequest{
Key: key,
ETag: etag,
Value: value,
}
err := pgs.Set(setReq)
assert.Nil(t, err)
itemExists := storeItemExists(t, key)
assert.True(t, itemExists)
}
func getItem(t *testing.T, pgs *PostgreSQL, key string) (*state.GetResponse, *fakeItem) {
getReq := &state.GetRequest{
Key: key,
Options: state.GetStateOption{},
}
response, getErr := pgs.Get(getReq)
assert.Nil(t, getErr)
assert.NotNil(t, response)
outputObject := &fakeItem{}
_ = json.Unmarshal(response.Data, outputObject)
return response, outputObject
}
func deleteItem(t *testing.T, pgs *PostgreSQL, key string, etag string) {
deleteReq := &state.DeleteRequest{
Key: key,
ETag: etag,
Options: state.DeleteStateOption{},
}
deleteErr := pgs.Delete(deleteReq)
assert.Nil(t, deleteErr)
assert.False(t, storeItemExists(t, key))
}
func storeItemExists(t *testing.T, key string) bool {
db, err := sql.Open("pgx", getConnectionString())
assert.Nil(t, err)
defer db.Close()
var exists bool = false
statement := fmt.Sprintf(`SELECT EXISTS (SELECT FROM %s WHERE key = $1)`, tableName)
err = db.QueryRow(statement, key).Scan(&exists)
assert.Nil(t, err)
return exists
}
func getRowData(t *testing.T, key string) (returnValue string, insertdate sql.NullString, updatedate sql.NullString) {
db, err := sql.Open("pgx", getConnectionString())
assert.Nil(t, err)
defer db.Close()
err = db.QueryRow(fmt.Sprintf("SELECT value, insertdate, updatedate FROM %s WHERE key = $1", tableName), key).Scan(&returnValue, &insertdate, &updatedate)
assert.Nil(t, err)
return returnValue, insertdate, updatedate
}
func randomKey() string {
return uuid.New().String()
}
func randomJSON() *fakeItem {
return &fakeItem{Color: randomKey()}
}

View File

@ -0,0 +1,178 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
package postgresql
import (
"testing"
"github.com/dapr/components-contrib/state"
"github.com/dapr/dapr/pkg/logger"
"github.com/stretchr/testify/assert"
)
const (
fakeConnectionString = "not a real connection"
)
// Fake implementation of interface postgressql.dbaccess
type fakeDBaccess struct {
logger logger.Logger
initExecuted bool
setExecuted bool
getExecuted bool
}
func (m *fakeDBaccess) Init(metadata state.Metadata) error {
m.initExecuted = true
return nil
}
func (m *fakeDBaccess) Set(req *state.SetRequest) error {
m.setExecuted = true
return nil
}
func (m *fakeDBaccess) Get(req *state.GetRequest) (*state.GetResponse, error) {
m.getExecuted = true
return nil, nil
}
func (m *fakeDBaccess) Delete(req *state.DeleteRequest) error {
return nil
}
func (m *fakeDBaccess) ExecuteMulti(sets []state.SetRequest, deletes []state.DeleteRequest) error {
return nil
}
func (m *fakeDBaccess) Close() error {
return nil
}
// Proves that the Init method runs the init method
func TestInitRunsDBAccessInit(t *testing.T) {
t.Parallel()
_, fake := createPostgreSQLWithFake(t)
assert.True(t, fake.initExecuted)
}
func TestMultiWithNoRequestsReturnsNil(t *testing.T) {
t.Parallel()
var multiRequest []state.TransactionalRequest
pgs := createPostgreSQL(t)
err := pgs.Multi(multiRequest)
assert.Nil(t, err)
}
func TestInvalidMultiAction(t *testing.T) {
t.Parallel()
var multiRequest []state.TransactionalRequest
multiRequest = append(multiRequest, state.TransactionalRequest{
Operation: "Something invalid",
Request: createSetRequest(),
})
pgs := createPostgreSQL(t)
err := pgs.Multi(multiRequest)
assert.NotNil(t, err)
}
func TestValidSetRequest(t *testing.T) {
t.Parallel()
var multiRequest []state.TransactionalRequest
multiRequest = append(multiRequest, state.TransactionalRequest{
Operation: state.Upsert,
Request: createSetRequest(),
})
pgs := createPostgreSQL(t)
err := pgs.Multi(multiRequest)
assert.Nil(t, err)
}
func TestInvalidMultiSetRequest(t *testing.T) {
t.Parallel()
var multiRequest []state.TransactionalRequest
multiRequest = append(multiRequest, state.TransactionalRequest{
Operation: state.Upsert,
Request: createDeleteRequest(), // Delete request is not valid for Upsert operation
})
pgs := createPostgreSQL(t)
err := pgs.Multi(multiRequest)
assert.NotNil(t, err)
}
func TestValidMultiDeleteRequest(t *testing.T) {
t.Parallel()
var multiRequest []state.TransactionalRequest
multiRequest = append(multiRequest, state.TransactionalRequest{
Operation: state.Delete,
Request: createDeleteRequest(),
})
pgs := createPostgreSQL(t)
err := pgs.Multi(multiRequest)
assert.Nil(t, err)
}
func TestInvalidMultiDeleteRequest(t *testing.T) {
t.Parallel()
var multiRequest []state.TransactionalRequest
multiRequest = append(multiRequest, state.TransactionalRequest{
Operation: state.Delete,
Request: createSetRequest(), // Set request is not valid for Delete operation
})
pgs := createPostgreSQL(t)
err := pgs.Multi(multiRequest)
assert.NotNil(t, err)
}
func createSetRequest() state.SetRequest {
return state.SetRequest{
Key: randomKey(),
Value: randomJSON(),
}
}
func createDeleteRequest() state.DeleteRequest {
return state.DeleteRequest{
Key: randomKey(),
}
}
func createPostgreSQLWithFake(t *testing.T) (*PostgreSQL, *fakeDBaccess) {
pgs := createPostgreSQL(t)
fake := pgs.dbaccess.(*fakeDBaccess)
return pgs, fake
}
func createPostgreSQL(t *testing.T) *PostgreSQL {
logger := logger.NewLogger("test")
dba := &fakeDBaccess{
logger: logger,
}
pgs := newPostgreSQLStateStore(logger, dba)
assert.NotNil(t, pgs)
metadata := &state.Metadata{
Properties: map[string]string{connectionStringKey: fakeConnectionString},
}
err := pgs.Init(*metadata)
assert.Nil(t, err)
assert.NotNil(t, pgs.dbaccess)
return pgs
}