Implementation for Hazelcast based state store (#145)
* hazelcast store impl * go.mod update
This commit is contained in:
parent
9e78671fc0
commit
c4bd2a47ea
2
go.mod
2
go.mod
|
@ -20,6 +20,7 @@ require (
|
|||
github.com/Shopify/sarama v1.23.1
|
||||
github.com/Sirupsen/logrus v1.0.6
|
||||
github.com/a8m/documentdb v1.2.0
|
||||
github.com/apache/thrift v0.13.0 // indirect
|
||||
github.com/aws/aws-sdk-go v1.25.0
|
||||
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
|
||||
github.com/cenkalti/backoff v2.2.1+incompatible // indirect
|
||||
|
@ -40,6 +41,7 @@ require (
|
|||
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
|
||||
github.com/hashicorp/consul/api v1.2.0
|
||||
github.com/hashicorp/go-multierror v1.0.0
|
||||
github.com/hazelcast/hazelcast-go-client v0.0.0-20190530123621-6cf767c2f31a
|
||||
github.com/jonboulle/clockwork v0.1.0 // indirect
|
||||
github.com/joomcode/errorx v1.0.0 // indirect
|
||||
github.com/joomcode/redispipe v0.9.0
|
||||
|
|
4
go.sum
4
go.sum
|
@ -88,6 +88,8 @@ github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuy
|
|||
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
|
||||
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
|
||||
github.com/apache/thrift v0.13.0 h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI=
|
||||
github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ=
|
||||
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
|
||||
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da h1:8GUt8eRujhVEGZFFEjBj46YV4rDjvGrNxb0KMWYkL2I=
|
||||
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
|
||||
|
@ -267,6 +269,8 @@ github.com/hashicorp/memberlist v0.1.3 h1:EmmoJme1matNzb+hMpDuR/0sbJSUisxyqBGG67
|
|||
github.com/hashicorp/memberlist v0.1.3/go.mod h1:ajVTdAv/9Im8oMAAj5G31PhhMCZJV2pPBoIllUwCN7I=
|
||||
github.com/hashicorp/serf v0.8.2 h1:YZ7UKsJv+hKjqGVUUbtE3HNj79Eln2oQ75tniF6iPt0=
|
||||
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
|
||||
github.com/hazelcast/hazelcast-go-client v0.0.0-20190530123621-6cf767c2f31a h1:j6SSiw7fWemWfrJL801xiQ6xRT7ZImika50xvmPN+tg=
|
||||
github.com/hazelcast/hazelcast-go-client v0.0.0-20190530123621-6cf767c2f31a/go.mod h1:VhwtcZ7sg3xq7REqGzEy7ylSWGKz4jZd05eCJropNzI=
|
||||
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
|
||||
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
|
||||
github.com/imdario/mergo v0.3.5 h1:JboBksRwiiAJWvIYJVo46AfV+IAIKZpfrSzVKj42R4Q=
|
||||
|
|
|
@ -1,70 +1,70 @@
|
|||
package rabbitmq
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"github.com/dapr/components-contrib/pubsub"
|
||||
)
|
||||
|
||||
type metadata struct {
|
||||
consumerID string
|
||||
host string
|
||||
durable bool
|
||||
deleteWhenUnused bool
|
||||
autoAck bool
|
||||
requeueInFailure bool
|
||||
deliveryMode uint8 // Transient (0 or 1) or Persistent (2)
|
||||
}
|
||||
|
||||
// createMetadata creates a new instance from the pubsub metadata
|
||||
func createMetadata(pubSubMetadata pubsub.Metadata) (*metadata, error) {
|
||||
result := metadata{deleteWhenUnused: true, autoAck: false}
|
||||
|
||||
if val, found := pubSubMetadata.Properties[metadataHostKey]; found && val != "" {
|
||||
result.host = val
|
||||
} else {
|
||||
return &result, fmt.Errorf("%s missing RabbitMQ host", errorMessagePrefix)
|
||||
}
|
||||
|
||||
if val, found := pubSubMetadata.Properties[metadataConsumerIDKey]; found && val != "" {
|
||||
result.consumerID = val
|
||||
} else {
|
||||
return &result, fmt.Errorf("%s missing RabbitMQ consumerID", errorMessagePrefix)
|
||||
}
|
||||
|
||||
if val, found := pubSubMetadata.Properties[metadataDeliveryModeKey]; found && val != "" {
|
||||
if intVal, err := strconv.Atoi(val); err == nil {
|
||||
if intVal < 0 || intVal > 2 {
|
||||
return &result, fmt.Errorf("%s invalid RabbitMQ delivery mode, accepted values are between 0 and 2", errorMessagePrefix)
|
||||
}
|
||||
result.deliveryMode = uint8(intVal)
|
||||
}
|
||||
}
|
||||
|
||||
if val, found := pubSubMetadata.Properties[metadataDurableKey]; found && val != "" {
|
||||
if boolVal, err := strconv.ParseBool(val); err == nil {
|
||||
result.durable = boolVal
|
||||
}
|
||||
}
|
||||
|
||||
if val, found := pubSubMetadata.Properties[metadataDeleteWhenUnusedKey]; found && val != "" {
|
||||
if boolVal, err := strconv.ParseBool(val); err == nil {
|
||||
result.deleteWhenUnused = boolVal
|
||||
}
|
||||
}
|
||||
|
||||
if val, found := pubSubMetadata.Properties[metadataAutoAckKey]; found && val != "" {
|
||||
if boolVal, err := strconv.ParseBool(val); err == nil {
|
||||
result.autoAck = boolVal
|
||||
}
|
||||
}
|
||||
|
||||
if val, found := pubSubMetadata.Properties[metadataRequeueInFailureKey]; found && val != "" {
|
||||
if boolVal, err := strconv.ParseBool(val); err == nil {
|
||||
result.requeueInFailure = boolVal
|
||||
}
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
package rabbitmq
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"github.com/dapr/components-contrib/pubsub"
|
||||
)
|
||||
|
||||
type metadata struct {
|
||||
consumerID string
|
||||
host string
|
||||
durable bool
|
||||
deleteWhenUnused bool
|
||||
autoAck bool
|
||||
requeueInFailure bool
|
||||
deliveryMode uint8 // Transient (0 or 1) or Persistent (2)
|
||||
}
|
||||
|
||||
// createMetadata creates a new instance from the pubsub metadata
|
||||
func createMetadata(pubSubMetadata pubsub.Metadata) (*metadata, error) {
|
||||
result := metadata{deleteWhenUnused: true, autoAck: false}
|
||||
|
||||
if val, found := pubSubMetadata.Properties[metadataHostKey]; found && val != "" {
|
||||
result.host = val
|
||||
} else {
|
||||
return &result, fmt.Errorf("%s missing RabbitMQ host", errorMessagePrefix)
|
||||
}
|
||||
|
||||
if val, found := pubSubMetadata.Properties[metadataConsumerIDKey]; found && val != "" {
|
||||
result.consumerID = val
|
||||
} else {
|
||||
return &result, fmt.Errorf("%s missing RabbitMQ consumerID", errorMessagePrefix)
|
||||
}
|
||||
|
||||
if val, found := pubSubMetadata.Properties[metadataDeliveryModeKey]; found && val != "" {
|
||||
if intVal, err := strconv.Atoi(val); err == nil {
|
||||
if intVal < 0 || intVal > 2 {
|
||||
return &result, fmt.Errorf("%s invalid RabbitMQ delivery mode, accepted values are between 0 and 2", errorMessagePrefix)
|
||||
}
|
||||
result.deliveryMode = uint8(intVal)
|
||||
}
|
||||
}
|
||||
|
||||
if val, found := pubSubMetadata.Properties[metadataDurableKey]; found && val != "" {
|
||||
if boolVal, err := strconv.ParseBool(val); err == nil {
|
||||
result.durable = boolVal
|
||||
}
|
||||
}
|
||||
|
||||
if val, found := pubSubMetadata.Properties[metadataDeleteWhenUnusedKey]; found && val != "" {
|
||||
if boolVal, err := strconv.ParseBool(val); err == nil {
|
||||
result.deleteWhenUnused = boolVal
|
||||
}
|
||||
}
|
||||
|
||||
if val, found := pubSubMetadata.Properties[metadataAutoAckKey]; found && val != "" {
|
||||
if boolVal, err := strconv.ParseBool(val); err == nil {
|
||||
result.autoAck = boolVal
|
||||
}
|
||||
}
|
||||
|
||||
if val, found := pubSubMetadata.Properties[metadataRequeueInFailureKey]; found && val != "" {
|
||||
if boolVal, err := strconv.ParseBool(val); err == nil {
|
||||
result.requeueInFailure = boolVal
|
||||
}
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
|
|
@ -1,207 +1,207 @@
|
|||
package rabbitmq
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/dapr/components-contrib/pubsub"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func getFakeProperties() map[string]string {
|
||||
props := map[string]string{}
|
||||
props[metadataHostKey] = "fakehost"
|
||||
props[metadataConsumerIDKey] = "fakeConsumerID"
|
||||
|
||||
return props
|
||||
}
|
||||
|
||||
func TestCreateMetadata(t *testing.T) {
|
||||
|
||||
var booleanFlagTests = []struct {
|
||||
in string
|
||||
expected bool
|
||||
}{
|
||||
{"true", true},
|
||||
{"TRUE", true},
|
||||
{"false", false},
|
||||
{"FALSE", false},
|
||||
}
|
||||
|
||||
t.Run("metadata is correct", func(t *testing.T) {
|
||||
fakeProperties := getFakeProperties()
|
||||
|
||||
fakeMetaData := pubsub.Metadata{
|
||||
Properties: fakeProperties,
|
||||
}
|
||||
|
||||
// act
|
||||
m, err := createMetadata(fakeMetaData)
|
||||
|
||||
// assert
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, fakeProperties[metadataHostKey], m.host)
|
||||
assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID)
|
||||
assert.Equal(t, false, m.durable)
|
||||
assert.Equal(t, false, m.autoAck)
|
||||
assert.Equal(t, false, m.requeueInFailure)
|
||||
assert.Equal(t, true, m.deleteWhenUnused)
|
||||
assert.Equal(t, uint8(0), m.deliveryMode)
|
||||
})
|
||||
|
||||
t.Run("host is not given", func(t *testing.T) {
|
||||
fakeProperties := getFakeProperties()
|
||||
|
||||
fakeMetaData := pubsub.Metadata{
|
||||
Properties: fakeProperties,
|
||||
}
|
||||
fakeMetaData.Properties[metadataHostKey] = ""
|
||||
|
||||
// act
|
||||
m, err := createMetadata(fakeMetaData)
|
||||
|
||||
// assert
|
||||
assert.EqualError(t, err, "rabbitmq pub/sub error: missing RabbitMQ host")
|
||||
assert.Empty(t, m.host)
|
||||
assert.Empty(t, m.consumerID)
|
||||
})
|
||||
|
||||
t.Run("consumerID is not given", func(t *testing.T) {
|
||||
fakeProperties := getFakeProperties()
|
||||
|
||||
fakeMetaData := pubsub.Metadata{
|
||||
Properties: fakeProperties,
|
||||
}
|
||||
fakeMetaData.Properties[metadataConsumerIDKey] = ""
|
||||
|
||||
// act
|
||||
m, err := createMetadata(fakeMetaData)
|
||||
|
||||
// assert
|
||||
assert.EqualError(t, err, "rabbitmq pub/sub error: missing RabbitMQ consumerID")
|
||||
assert.Equal(t, fakeProperties[metadataHostKey], m.host)
|
||||
assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID)
|
||||
assert.Empty(t, m.consumerID)
|
||||
})
|
||||
|
||||
var invalidDeliveryModes = []string{"3", "10", "-1"}
|
||||
|
||||
for _, deliveryMode := range invalidDeliveryModes {
|
||||
t.Run(fmt.Sprintf("deliveryMode value=%s", deliveryMode), func(t *testing.T) {
|
||||
fakeProperties := getFakeProperties()
|
||||
|
||||
fakeMetaData := pubsub.Metadata{
|
||||
Properties: fakeProperties,
|
||||
}
|
||||
fakeMetaData.Properties[metadataDeliveryModeKey] = deliveryMode
|
||||
|
||||
// act
|
||||
m, err := createMetadata(fakeMetaData)
|
||||
|
||||
// assert
|
||||
assert.EqualError(t, err, "rabbitmq pub/sub error: invalid RabbitMQ delivery mode, accepted values are between 0 and 2")
|
||||
assert.Equal(t, fakeProperties[metadataHostKey], m.host)
|
||||
assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID)
|
||||
assert.Equal(t, uint8(0), m.deliveryMode)
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("deliveryMode is set", func(t *testing.T) {
|
||||
fakeProperties := getFakeProperties()
|
||||
|
||||
fakeMetaData := pubsub.Metadata{
|
||||
Properties: fakeProperties,
|
||||
}
|
||||
fakeMetaData.Properties[metadataDeliveryModeKey] = "2"
|
||||
|
||||
// act
|
||||
m, err := createMetadata(fakeMetaData)
|
||||
|
||||
// assert
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, fakeProperties[metadataHostKey], m.host)
|
||||
assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID)
|
||||
assert.Equal(t, uint8(2), m.deliveryMode)
|
||||
})
|
||||
|
||||
for _, tt := range booleanFlagTests {
|
||||
|
||||
t.Run(fmt.Sprintf("autoAck value=%s", tt.in), func(t *testing.T) {
|
||||
fakeProperties := getFakeProperties()
|
||||
|
||||
fakeMetaData := pubsub.Metadata{
|
||||
Properties: fakeProperties,
|
||||
}
|
||||
fakeMetaData.Properties[metadataAutoAckKey] = tt.in
|
||||
|
||||
// act
|
||||
m, err := createMetadata(fakeMetaData)
|
||||
|
||||
// assert
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, fakeProperties[metadataHostKey], m.host)
|
||||
assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID)
|
||||
assert.Equal(t, tt.expected, m.autoAck)
|
||||
})
|
||||
}
|
||||
|
||||
for _, tt := range booleanFlagTests {
|
||||
t.Run(fmt.Sprintf("requeueInFailure value=%s", tt.in), func(t *testing.T) {
|
||||
fakeProperties := getFakeProperties()
|
||||
|
||||
fakeMetaData := pubsub.Metadata{
|
||||
Properties: fakeProperties,
|
||||
}
|
||||
fakeMetaData.Properties[metadataRequeueInFailureKey] = tt.in
|
||||
|
||||
// act
|
||||
m, err := createMetadata(fakeMetaData)
|
||||
|
||||
// assert
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, fakeProperties[metadataHostKey], m.host)
|
||||
assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID)
|
||||
assert.Equal(t, tt.expected, m.requeueInFailure)
|
||||
})
|
||||
}
|
||||
|
||||
for _, tt := range booleanFlagTests {
|
||||
t.Run(fmt.Sprintf("deleteWhenUnused value=%s", tt.in), func(t *testing.T) {
|
||||
fakeProperties := getFakeProperties()
|
||||
|
||||
fakeMetaData := pubsub.Metadata{
|
||||
Properties: fakeProperties,
|
||||
}
|
||||
fakeMetaData.Properties[metadataDeleteWhenUnusedKey] = tt.in
|
||||
|
||||
// act
|
||||
m, err := createMetadata(fakeMetaData)
|
||||
|
||||
// assert
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, fakeProperties[metadataHostKey], m.host)
|
||||
assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID)
|
||||
assert.Equal(t, tt.expected, m.deleteWhenUnused)
|
||||
})
|
||||
}
|
||||
|
||||
for _, tt := range booleanFlagTests {
|
||||
t.Run(fmt.Sprintf("durable value=%s", tt.in), func(t *testing.T) {
|
||||
fakeProperties := getFakeProperties()
|
||||
|
||||
fakeMetaData := pubsub.Metadata{
|
||||
Properties: fakeProperties,
|
||||
}
|
||||
fakeMetaData.Properties[metadataDurableKey] = tt.in
|
||||
|
||||
// act
|
||||
m, err := createMetadata(fakeMetaData)
|
||||
|
||||
// assert
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, fakeProperties[metadataHostKey], m.host)
|
||||
assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID)
|
||||
assert.Equal(t, tt.expected, m.durable)
|
||||
})
|
||||
}
|
||||
}
|
||||
package rabbitmq
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/dapr/components-contrib/pubsub"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func getFakeProperties() map[string]string {
|
||||
props := map[string]string{}
|
||||
props[metadataHostKey] = "fakehost"
|
||||
props[metadataConsumerIDKey] = "fakeConsumerID"
|
||||
|
||||
return props
|
||||
}
|
||||
|
||||
func TestCreateMetadata(t *testing.T) {
|
||||
|
||||
var booleanFlagTests = []struct {
|
||||
in string
|
||||
expected bool
|
||||
}{
|
||||
{"true", true},
|
||||
{"TRUE", true},
|
||||
{"false", false},
|
||||
{"FALSE", false},
|
||||
}
|
||||
|
||||
t.Run("metadata is correct", func(t *testing.T) {
|
||||
fakeProperties := getFakeProperties()
|
||||
|
||||
fakeMetaData := pubsub.Metadata{
|
||||
Properties: fakeProperties,
|
||||
}
|
||||
|
||||
// act
|
||||
m, err := createMetadata(fakeMetaData)
|
||||
|
||||
// assert
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, fakeProperties[metadataHostKey], m.host)
|
||||
assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID)
|
||||
assert.Equal(t, false, m.durable)
|
||||
assert.Equal(t, false, m.autoAck)
|
||||
assert.Equal(t, false, m.requeueInFailure)
|
||||
assert.Equal(t, true, m.deleteWhenUnused)
|
||||
assert.Equal(t, uint8(0), m.deliveryMode)
|
||||
})
|
||||
|
||||
t.Run("host is not given", func(t *testing.T) {
|
||||
fakeProperties := getFakeProperties()
|
||||
|
||||
fakeMetaData := pubsub.Metadata{
|
||||
Properties: fakeProperties,
|
||||
}
|
||||
fakeMetaData.Properties[metadataHostKey] = ""
|
||||
|
||||
// act
|
||||
m, err := createMetadata(fakeMetaData)
|
||||
|
||||
// assert
|
||||
assert.EqualError(t, err, "rabbitmq pub/sub error: missing RabbitMQ host")
|
||||
assert.Empty(t, m.host)
|
||||
assert.Empty(t, m.consumerID)
|
||||
})
|
||||
|
||||
t.Run("consumerID is not given", func(t *testing.T) {
|
||||
fakeProperties := getFakeProperties()
|
||||
|
||||
fakeMetaData := pubsub.Metadata{
|
||||
Properties: fakeProperties,
|
||||
}
|
||||
fakeMetaData.Properties[metadataConsumerIDKey] = ""
|
||||
|
||||
// act
|
||||
m, err := createMetadata(fakeMetaData)
|
||||
|
||||
// assert
|
||||
assert.EqualError(t, err, "rabbitmq pub/sub error: missing RabbitMQ consumerID")
|
||||
assert.Equal(t, fakeProperties[metadataHostKey], m.host)
|
||||
assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID)
|
||||
assert.Empty(t, m.consumerID)
|
||||
})
|
||||
|
||||
var invalidDeliveryModes = []string{"3", "10", "-1"}
|
||||
|
||||
for _, deliveryMode := range invalidDeliveryModes {
|
||||
t.Run(fmt.Sprintf("deliveryMode value=%s", deliveryMode), func(t *testing.T) {
|
||||
fakeProperties := getFakeProperties()
|
||||
|
||||
fakeMetaData := pubsub.Metadata{
|
||||
Properties: fakeProperties,
|
||||
}
|
||||
fakeMetaData.Properties[metadataDeliveryModeKey] = deliveryMode
|
||||
|
||||
// act
|
||||
m, err := createMetadata(fakeMetaData)
|
||||
|
||||
// assert
|
||||
assert.EqualError(t, err, "rabbitmq pub/sub error: invalid RabbitMQ delivery mode, accepted values are between 0 and 2")
|
||||
assert.Equal(t, fakeProperties[metadataHostKey], m.host)
|
||||
assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID)
|
||||
assert.Equal(t, uint8(0), m.deliveryMode)
|
||||
})
|
||||
}
|
||||
|
||||
t.Run("deliveryMode is set", func(t *testing.T) {
|
||||
fakeProperties := getFakeProperties()
|
||||
|
||||
fakeMetaData := pubsub.Metadata{
|
||||
Properties: fakeProperties,
|
||||
}
|
||||
fakeMetaData.Properties[metadataDeliveryModeKey] = "2"
|
||||
|
||||
// act
|
||||
m, err := createMetadata(fakeMetaData)
|
||||
|
||||
// assert
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, fakeProperties[metadataHostKey], m.host)
|
||||
assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID)
|
||||
assert.Equal(t, uint8(2), m.deliveryMode)
|
||||
})
|
||||
|
||||
for _, tt := range booleanFlagTests {
|
||||
|
||||
t.Run(fmt.Sprintf("autoAck value=%s", tt.in), func(t *testing.T) {
|
||||
fakeProperties := getFakeProperties()
|
||||
|
||||
fakeMetaData := pubsub.Metadata{
|
||||
Properties: fakeProperties,
|
||||
}
|
||||
fakeMetaData.Properties[metadataAutoAckKey] = tt.in
|
||||
|
||||
// act
|
||||
m, err := createMetadata(fakeMetaData)
|
||||
|
||||
// assert
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, fakeProperties[metadataHostKey], m.host)
|
||||
assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID)
|
||||
assert.Equal(t, tt.expected, m.autoAck)
|
||||
})
|
||||
}
|
||||
|
||||
for _, tt := range booleanFlagTests {
|
||||
t.Run(fmt.Sprintf("requeueInFailure value=%s", tt.in), func(t *testing.T) {
|
||||
fakeProperties := getFakeProperties()
|
||||
|
||||
fakeMetaData := pubsub.Metadata{
|
||||
Properties: fakeProperties,
|
||||
}
|
||||
fakeMetaData.Properties[metadataRequeueInFailureKey] = tt.in
|
||||
|
||||
// act
|
||||
m, err := createMetadata(fakeMetaData)
|
||||
|
||||
// assert
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, fakeProperties[metadataHostKey], m.host)
|
||||
assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID)
|
||||
assert.Equal(t, tt.expected, m.requeueInFailure)
|
||||
})
|
||||
}
|
||||
|
||||
for _, tt := range booleanFlagTests {
|
||||
t.Run(fmt.Sprintf("deleteWhenUnused value=%s", tt.in), func(t *testing.T) {
|
||||
fakeProperties := getFakeProperties()
|
||||
|
||||
fakeMetaData := pubsub.Metadata{
|
||||
Properties: fakeProperties,
|
||||
}
|
||||
fakeMetaData.Properties[metadataDeleteWhenUnusedKey] = tt.in
|
||||
|
||||
// act
|
||||
m, err := createMetadata(fakeMetaData)
|
||||
|
||||
// assert
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, fakeProperties[metadataHostKey], m.host)
|
||||
assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID)
|
||||
assert.Equal(t, tt.expected, m.deleteWhenUnused)
|
||||
})
|
||||
}
|
||||
|
||||
for _, tt := range booleanFlagTests {
|
||||
t.Run(fmt.Sprintf("durable value=%s", tt.in), func(t *testing.T) {
|
||||
fakeProperties := getFakeProperties()
|
||||
|
||||
fakeMetaData := pubsub.Metadata{
|
||||
Properties: fakeProperties,
|
||||
}
|
||||
fakeMetaData.Properties[metadataDurableKey] = tt.in
|
||||
|
||||
// act
|
||||
m, err := createMetadata(fakeMetaData)
|
||||
|
||||
// assert
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, fakeProperties[metadataHostKey], m.host)
|
||||
assert.Equal(t, fakeProperties[metadataConsumerIDKey], m.consumerID)
|
||||
assert.Equal(t, tt.expected, m.durable)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,175 +1,175 @@
|
|||
package rabbitmq
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/dapr/components-contrib/pubsub"
|
||||
"github.com/streadway/amqp"
|
||||
)
|
||||
|
||||
const (
|
||||
fanoutExchangeKind = "fanout"
|
||||
logMessagePrefix = "rabbitmq pub/sub:"
|
||||
errorMessagePrefix = "rabbitmq pub/sub error:"
|
||||
|
||||
metadataHostKey = "host"
|
||||
metadataConsumerIDKey = "consumerID"
|
||||
metadataDurableKey = "durable"
|
||||
metadataDeleteWhenUnusedKey = "deletedWhenUnused"
|
||||
metadataAutoAckKey = "autoAck"
|
||||
metadataDeliveryModeKey = "deliveryMode"
|
||||
metadataRequeueInFailureKey = "requeueInFailure"
|
||||
)
|
||||
|
||||
// RabbitMQ allows sending/receiving messages in pub/sub format
|
||||
type rabbitMQ struct {
|
||||
connection *amqp.Connection
|
||||
channel *amqp.Channel
|
||||
metadata *metadata
|
||||
declaredExchanges map[string]bool
|
||||
}
|
||||
|
||||
// NewRabbitMQ creates a new RabbitMQ pub/sub
|
||||
func NewRabbitMQ() pubsub.PubSub {
|
||||
return createRabbitMQ()
|
||||
}
|
||||
|
||||
func createRabbitMQ() *rabbitMQ {
|
||||
return &rabbitMQ{declaredExchanges: make(map[string]bool)}
|
||||
}
|
||||
|
||||
// Init does metadata parsing and connection creation
|
||||
func (r *rabbitMQ) Init(metadata pubsub.Metadata) error {
|
||||
meta, err := createMetadata(metadata)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.metadata = meta
|
||||
|
||||
conn, err := amqp.Dial(meta.host)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.connection = conn
|
||||
r.channel = ch
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *rabbitMQ) Publish(req *pubsub.PublishRequest) error {
|
||||
err := r.ensureExchangeDeclared(req.Topic)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debugf("%s publishing message to topic '%s'", logMessagePrefix, req.Topic)
|
||||
|
||||
err = r.channel.Publish(req.Topic, "", false, false, amqp.Publishing{
|
||||
ContentType: "text/plain",
|
||||
Body: req.Data,
|
||||
DeliveryMode: r.metadata.deliveryMode,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *rabbitMQ) Subscribe(req pubsub.SubscribeRequest, handler func(msg *pubsub.NewMessage) error) error {
|
||||
|
||||
err := r.ensureExchangeDeclared(req.Topic)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
queueName := fmt.Sprintf("%s-%s", r.metadata.consumerID, req.Topic)
|
||||
|
||||
log.Debugf("%s declaring queue '%s'", logMessagePrefix, queueName)
|
||||
q, err := r.channel.QueueDeclare(queueName, r.metadata.durable, r.metadata.deleteWhenUnused, true, false, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debugf("%s binding queue '%s' to exchange '%s'", logMessagePrefix, q.Name, req.Topic)
|
||||
err = r.channel.QueueBind(q.Name, "", req.Topic, false, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
msgs, err := r.channel.Consume(
|
||||
q.Name,
|
||||
queueName, // consumerId
|
||||
r.metadata.autoAck, // autoAck
|
||||
!r.metadata.durable, // exclusive
|
||||
false, // noLocal
|
||||
false, // noWait
|
||||
nil,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go r.listenMessages(msgs, req.Topic, handler)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *rabbitMQ) listenMessages(msgs <-chan amqp.Delivery, topic string, handler func(msg *pubsub.NewMessage) error) {
|
||||
for d := range msgs {
|
||||
r.handleMessage(d, topic, handler)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *rabbitMQ) handleMessage(d amqp.Delivery, topic string, handler func(msg *pubsub.NewMessage) error) {
|
||||
pubsubMsg := &pubsub.NewMessage{
|
||||
Data: d.Body,
|
||||
Topic: topic,
|
||||
}
|
||||
|
||||
err := handler(pubsubMsg)
|
||||
if err != nil {
|
||||
log.Errorf("%s error handling message from topic '%s', %s", logMessagePrefix, topic, err)
|
||||
}
|
||||
|
||||
// if message is not auto acked we need to ack/nack
|
||||
if !r.metadata.autoAck {
|
||||
if err != nil {
|
||||
requeue := r.metadata.requeueInFailure && !d.Redelivered
|
||||
|
||||
log.Debugf("%s nacking message '%s' from topic '%s', requeue=%t", logMessagePrefix, d.MessageId, topic, requeue)
|
||||
if err = r.channel.Nack(d.DeliveryTag, false, requeue); err != nil {
|
||||
log.Errorf("%s error nacking message '%s' from topic '%s', %s", logMessagePrefix, d.MessageId, topic, err)
|
||||
}
|
||||
} else {
|
||||
log.Debugf("%s acking message '%s' from topic '%s'", logMessagePrefix, d.MessageId, topic)
|
||||
if err = r.channel.Ack(d.DeliveryTag, false); err != nil {
|
||||
log.Errorf("%s error acking message '%s' from topic '%s', %s", logMessagePrefix, d.MessageId, topic, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *rabbitMQ) ensureExchangeDeclared(exchange string) error {
|
||||
|
||||
if _, exists := r.declaredExchanges[exchange]; !exists {
|
||||
log.Debugf("%s declaring exchange '%s' of kind '%s'", logMessagePrefix, exchange, fanoutExchangeKind)
|
||||
err := r.channel.ExchangeDeclare(exchange, fanoutExchangeKind, true, false, false, false, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.declaredExchanges[exchange] = true
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
package rabbitmq
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/dapr/components-contrib/pubsub"
|
||||
"github.com/streadway/amqp"
|
||||
)
|
||||
|
||||
const (
|
||||
fanoutExchangeKind = "fanout"
|
||||
logMessagePrefix = "rabbitmq pub/sub:"
|
||||
errorMessagePrefix = "rabbitmq pub/sub error:"
|
||||
|
||||
metadataHostKey = "host"
|
||||
metadataConsumerIDKey = "consumerID"
|
||||
metadataDurableKey = "durable"
|
||||
metadataDeleteWhenUnusedKey = "deletedWhenUnused"
|
||||
metadataAutoAckKey = "autoAck"
|
||||
metadataDeliveryModeKey = "deliveryMode"
|
||||
metadataRequeueInFailureKey = "requeueInFailure"
|
||||
)
|
||||
|
||||
// RabbitMQ allows sending/receiving messages in pub/sub format
|
||||
type rabbitMQ struct {
|
||||
connection *amqp.Connection
|
||||
channel *amqp.Channel
|
||||
metadata *metadata
|
||||
declaredExchanges map[string]bool
|
||||
}
|
||||
|
||||
// NewRabbitMQ creates a new RabbitMQ pub/sub
|
||||
func NewRabbitMQ() pubsub.PubSub {
|
||||
return createRabbitMQ()
|
||||
}
|
||||
|
||||
func createRabbitMQ() *rabbitMQ {
|
||||
return &rabbitMQ{declaredExchanges: make(map[string]bool)}
|
||||
}
|
||||
|
||||
// Init does metadata parsing and connection creation
|
||||
func (r *rabbitMQ) Init(metadata pubsub.Metadata) error {
|
||||
meta, err := createMetadata(metadata)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.metadata = meta
|
||||
|
||||
conn, err := amqp.Dial(meta.host)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ch, err := conn.Channel()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.connection = conn
|
||||
r.channel = ch
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *rabbitMQ) Publish(req *pubsub.PublishRequest) error {
|
||||
err := r.ensureExchangeDeclared(req.Topic)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debugf("%s publishing message to topic '%s'", logMessagePrefix, req.Topic)
|
||||
|
||||
err = r.channel.Publish(req.Topic, "", false, false, amqp.Publishing{
|
||||
ContentType: "text/plain",
|
||||
Body: req.Data,
|
||||
DeliveryMode: r.metadata.deliveryMode,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *rabbitMQ) Subscribe(req pubsub.SubscribeRequest, handler func(msg *pubsub.NewMessage) error) error {
|
||||
|
||||
err := r.ensureExchangeDeclared(req.Topic)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
queueName := fmt.Sprintf("%s-%s", r.metadata.consumerID, req.Topic)
|
||||
|
||||
log.Debugf("%s declaring queue '%s'", logMessagePrefix, queueName)
|
||||
q, err := r.channel.QueueDeclare(queueName, r.metadata.durable, r.metadata.deleteWhenUnused, true, false, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debugf("%s binding queue '%s' to exchange '%s'", logMessagePrefix, q.Name, req.Topic)
|
||||
err = r.channel.QueueBind(q.Name, "", req.Topic, false, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
msgs, err := r.channel.Consume(
|
||||
q.Name,
|
||||
queueName, // consumerId
|
||||
r.metadata.autoAck, // autoAck
|
||||
!r.metadata.durable, // exclusive
|
||||
false, // noLocal
|
||||
false, // noWait
|
||||
nil,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
go r.listenMessages(msgs, req.Topic, handler)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *rabbitMQ) listenMessages(msgs <-chan amqp.Delivery, topic string, handler func(msg *pubsub.NewMessage) error) {
|
||||
for d := range msgs {
|
||||
r.handleMessage(d, topic, handler)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *rabbitMQ) handleMessage(d amqp.Delivery, topic string, handler func(msg *pubsub.NewMessage) error) {
|
||||
pubsubMsg := &pubsub.NewMessage{
|
||||
Data: d.Body,
|
||||
Topic: topic,
|
||||
}
|
||||
|
||||
err := handler(pubsubMsg)
|
||||
if err != nil {
|
||||
log.Errorf("%s error handling message from topic '%s', %s", logMessagePrefix, topic, err)
|
||||
}
|
||||
|
||||
// if message is not auto acked we need to ack/nack
|
||||
if !r.metadata.autoAck {
|
||||
if err != nil {
|
||||
requeue := r.metadata.requeueInFailure && !d.Redelivered
|
||||
|
||||
log.Debugf("%s nacking message '%s' from topic '%s', requeue=%t", logMessagePrefix, d.MessageId, topic, requeue)
|
||||
if err = r.channel.Nack(d.DeliveryTag, false, requeue); err != nil {
|
||||
log.Errorf("%s error nacking message '%s' from topic '%s', %s", logMessagePrefix, d.MessageId, topic, err)
|
||||
}
|
||||
} else {
|
||||
log.Debugf("%s acking message '%s' from topic '%s'", logMessagePrefix, d.MessageId, topic)
|
||||
if err = r.channel.Ack(d.DeliveryTag, false); err != nil {
|
||||
log.Errorf("%s error acking message '%s' from topic '%s', %s", logMessagePrefix, d.MessageId, topic, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *rabbitMQ) ensureExchangeDeclared(exchange string) error {
|
||||
|
||||
if _, exists := r.declaredExchanges[exchange]; !exists {
|
||||
log.Debugf("%s declaring exchange '%s' of kind '%s'", logMessagePrefix, exchange, fanoutExchangeKind)
|
||||
err := r.channel.ExchangeDeclare(exchange, fanoutExchangeKind, true, false, false, false, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r.declaredExchanges[exchange] = true
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -1,45 +1,45 @@
|
|||
package rabbitmq
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/dapr/components-contrib/pubsub"
|
||||
"github.com/streadway/amqp"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func createAmqpMessage(body string) amqp.Delivery {
|
||||
return amqp.Delivery{Body: []byte(body)}
|
||||
}
|
||||
|
||||
func TestProcessSubscriberMessage(t *testing.T) {
|
||||
testMetadata := &metadata{autoAck: true}
|
||||
testRabbitMQSubscriber := createRabbitMQ()
|
||||
testRabbitMQSubscriber.metadata = testMetadata
|
||||
|
||||
const topic = "testTopic"
|
||||
|
||||
ch := make(chan amqp.Delivery)
|
||||
defer close(ch)
|
||||
|
||||
messageCount := 0
|
||||
|
||||
fakeHandler := func(msg *pubsub.NewMessage) error {
|
||||
messageCount++
|
||||
|
||||
assert.Equal(t, topic, msg.Topic)
|
||||
assert.NotNil(t, msg.Data)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
go testRabbitMQSubscriber.listenMessages(ch, topic, fakeHandler)
|
||||
assert.Equal(t, messageCount, 0)
|
||||
ch <- createAmqpMessage("{ \"msg\": \"1\"}")
|
||||
ch <- createAmqpMessage("{ \"msg\": \"2\"}")
|
||||
assert.GreaterOrEqual(t, messageCount, 1)
|
||||
assert.LessOrEqual(t, messageCount, 2)
|
||||
ch <- createAmqpMessage("{ \"msg\": \"3\"}")
|
||||
assert.GreaterOrEqual(t, messageCount, 2)
|
||||
assert.LessOrEqual(t, messageCount, 3)
|
||||
}
|
||||
package rabbitmq
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/dapr/components-contrib/pubsub"
|
||||
"github.com/streadway/amqp"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func createAmqpMessage(body string) amqp.Delivery {
|
||||
return amqp.Delivery{Body: []byte(body)}
|
||||
}
|
||||
|
||||
func TestProcessSubscriberMessage(t *testing.T) {
|
||||
testMetadata := &metadata{autoAck: true}
|
||||
testRabbitMQSubscriber := createRabbitMQ()
|
||||
testRabbitMQSubscriber.metadata = testMetadata
|
||||
|
||||
const topic = "testTopic"
|
||||
|
||||
ch := make(chan amqp.Delivery)
|
||||
defer close(ch)
|
||||
|
||||
messageCount := 0
|
||||
|
||||
fakeHandler := func(msg *pubsub.NewMessage) error {
|
||||
messageCount++
|
||||
|
||||
assert.Equal(t, topic, msg.Topic)
|
||||
assert.NotNil(t, msg.Data)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
go testRabbitMQSubscriber.listenMessages(ch, topic, fakeHandler)
|
||||
assert.Equal(t, messageCount, 0)
|
||||
ch <- createAmqpMessage("{ \"msg\": \"1\"}")
|
||||
ch <- createAmqpMessage("{ \"msg\": \"2\"}")
|
||||
assert.GreaterOrEqual(t, messageCount, 1)
|
||||
assert.LessOrEqual(t, messageCount, 2)
|
||||
ch <- createAmqpMessage("{ \"msg\": \"3\"}")
|
||||
assert.GreaterOrEqual(t, messageCount, 2)
|
||||
assert.LessOrEqual(t, messageCount, 3)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,144 @@
|
|||
package hazelcast
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/hazelcast/hazelcast-go-client/core"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
|
||||
"github.com/dapr/components-contrib/state"
|
||||
"github.com/hazelcast/hazelcast-go-client"
|
||||
)
|
||||
|
||||
const (
|
||||
hazelcastServers = "hazelcastServers"
|
||||
hazelcastMap = "hazelcastMap"
|
||||
)
|
||||
|
||||
//Hazelcast state store
|
||||
type Hazelcast struct {
|
||||
hzMap core.Map
|
||||
json jsoniter.API
|
||||
}
|
||||
|
||||
// NewHazelcastStore returns a new hazelcast backed state store
|
||||
func NewHazelcastStore() *Hazelcast {
|
||||
return &Hazelcast{json: jsoniter.ConfigFastest}
|
||||
}
|
||||
|
||||
func validateMetadata(metadata state.Metadata) error {
|
||||
if metadata.Properties[hazelcastServers] == "" {
|
||||
return errors.New("hazelcast error: missing hazelcast servers")
|
||||
}
|
||||
if metadata.Properties[hazelcastMap] == "" {
|
||||
return errors.New("hazelcast error: missing hazelcast map name")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Init does metadata and connection parsing
|
||||
func (store *Hazelcast) Init(metadata state.Metadata) error {
|
||||
err := validateMetadata(metadata)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
servers := metadata.Properties[hazelcastServers]
|
||||
|
||||
hzConfig := hazelcast.NewConfig()
|
||||
hzConfig.NetworkConfig().AddAddress(strings.Split(servers, ",")...)
|
||||
|
||||
client, err := hazelcast.NewClientWithConfig(hzConfig)
|
||||
if err != nil {
|
||||
return fmt.Errorf("hazelcast error: %v", err)
|
||||
}
|
||||
store.hzMap, err = client.GetMap(metadata.Properties[hazelcastMap])
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("hazelcast error: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
//Set stores value for a key to Hazelcast
|
||||
func (store *Hazelcast) Set(req *state.SetRequest) error {
|
||||
err := state.CheckSetRequestOptions(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var value string
|
||||
b, ok := req.Value.([]byte)
|
||||
if ok {
|
||||
value = string(b)
|
||||
} else {
|
||||
value, err = store.json.MarshalToString(req.Value)
|
||||
if err != nil {
|
||||
return fmt.Errorf("hazelcast error: failed to set key %s: %s", req.Key, err)
|
||||
}
|
||||
}
|
||||
_, err = store.hzMap.Put(req.Key, value)
|
||||
|
||||
if err != nil {
|
||||
return fmt.Errorf("hazelcast error: failed to set key %s: %s", req.Key, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// BulkSet performs a bulks save operation
|
||||
func (store *Hazelcast) BulkSet(req []state.SetRequest) error {
|
||||
for _, s := range req {
|
||||
err := store.Set(&s)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get retrieves state from Hazelcast with a key
|
||||
func (store *Hazelcast) Get(req *state.GetRequest) (*state.GetResponse, error) {
|
||||
resp, err := store.hzMap.Get(req.Key)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("hazelcast error: failed to get value for %s: %s", req.Key, err)
|
||||
}
|
||||
if resp == nil {
|
||||
return nil, fmt.Errorf("hazelcast error: key %s does not exist in store", req.Key)
|
||||
}
|
||||
value, err := store.json.Marshal(&resp)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("hazelcast error: %v", err)
|
||||
}
|
||||
|
||||
return &state.GetResponse{
|
||||
Data: value,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Delete performs a delete operation
|
||||
func (store *Hazelcast) Delete(req *state.DeleteRequest) error {
|
||||
err := state.CheckDeleteRequestOptions(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = store.hzMap.Delete(req.Key)
|
||||
if err != nil {
|
||||
return fmt.Errorf("hazelcast error: failed to delete key - %s", req.Key)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// BulkDelete performs a bulk delete operation
|
||||
func (store *Hazelcast) BulkDelete(req []state.DeleteRequest) error {
|
||||
for _, re := range req {
|
||||
err := store.Delete(&re)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
package hazelcast
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/dapr/components-contrib/state"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestValidateMetadata(t *testing.T) {
|
||||
t.Run("without required configuration", func(t *testing.T) {
|
||||
properties := map[string]string{}
|
||||
m := state.Metadata{
|
||||
Properties: properties,
|
||||
}
|
||||
err := validateMetadata(m)
|
||||
assert.NotNil(t, err)
|
||||
})
|
||||
|
||||
t.Run("without server configuration", func(t *testing.T) {
|
||||
properties := map[string]string{
|
||||
"hazelcastMap": "foo-map",
|
||||
}
|
||||
m := state.Metadata{
|
||||
Properties: properties,
|
||||
}
|
||||
err := validateMetadata(m)
|
||||
assert.NotNil(t, err)
|
||||
})
|
||||
|
||||
t.Run("without map configuration", func(t *testing.T) {
|
||||
properties := map[string]string{
|
||||
"hazelcastServers": "hz1:5701",
|
||||
}
|
||||
m := state.Metadata{
|
||||
Properties: properties,
|
||||
}
|
||||
err := validateMetadata(m)
|
||||
assert.NotNil(t, err)
|
||||
})
|
||||
|
||||
t.Run("with valid configuration", func(t *testing.T) {
|
||||
properties := map[string]string{
|
||||
"hazelcastServers": "hz1:5701",
|
||||
"hazelcastMap": "foo-map",
|
||||
}
|
||||
m := state.Metadata{
|
||||
Properties: properties,
|
||||
}
|
||||
err := validateMetadata(m)
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
}
|
Loading…
Reference in New Issue