Updated Kafka binding component to add SASL authentication (#77)

* updated implementation

* removed InsecureSkipVerify
This commit is contained in:
Abhishek Gupta 2019-10-30 12:10:30 +05:30 committed by Yaron Schneider
parent b2c37fba02
commit 9f90855bf7
2 changed files with 299 additions and 10 deletions

View File

@ -7,8 +7,11 @@ package kafka
import (
"context"
"crypto/tls"
"errors"
"os"
"os/signal"
"strconv"
"strings"
"sync"
"syscall"
@ -25,6 +28,9 @@ type Kafka struct {
consumerGroup string
brokers []string
publishTopic string
authRequired bool
saslUsername string
saslPassword string
}
type kafkaMetadata struct {
@ -32,6 +38,9 @@ type kafkaMetadata struct {
Topics []string `json:"topics"`
PublishTopic string `json:"publishTopic"`
ConsumerGroup string `json:"consumerGroup"`
AuthRequired bool `json:"authRequired"`
SaslUsername string `json:"saslUsername"`
SaslPassword string `json:"saslPassword"`
}
type consumer struct {
@ -80,6 +89,13 @@ func (k *Kafka) Init(metadata bindings.Metadata) error {
k.topics = meta.Topics
k.publishTopic = meta.PublishTopic
k.consumerGroup = meta.ConsumerGroup
k.authRequired = meta.AuthRequired
//ignore SASL properties if authRequired is false
if meta.AuthRequired {
k.saslUsername = meta.SaslUsername
k.saslPassword = meta.SaslPassword
}
return nil
}
@ -107,6 +123,36 @@ func (k *Kafka) getKafkaMetadata(metadata bindings.Metadata) (*kafkaMetadata, er
if val, ok := metadata.Properties["topics"]; ok && val != "" {
meta.Topics = strings.Split(val, ",")
}
val, ok := metadata.Properties["authRequired"]
if !ok {
return nil, errors.New("kafka error: missing 'authRequired' attribute")
}
if val == "" {
return nil, errors.New("kafka error: 'authRequired' attribute was empty")
}
validAuthRequired, err := strconv.ParseBool(val)
if err != nil {
return nil, errors.New("kafka error: invalid value for 'authRequired' attribute")
}
meta.AuthRequired = validAuthRequired
//ignore SASL properties if authRequired is false
if meta.AuthRequired {
if val, ok := metadata.Properties["saslUsername"]; ok && val != "" {
meta.SaslUsername = val
} else {
return nil, errors.New("kafka error: missing SASL Username")
}
if val, ok := metadata.Properties["saslPassword"]; ok && val != "" {
meta.SaslPassword = val
} else {
return nil, errors.New("kafka error: missing SASL Password")
}
}
return &meta, nil
}
@ -115,6 +161,12 @@ func (k *Kafka) getSyncProducer(meta *kafkaMetadata) (sarama.SyncProducer, error
config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.Retry.Max = 5
config.Producer.Return.Successes = true
config.Version = sarama.V1_0_0_0
//ignore SASL properties if authRequired is false
if meta.AuthRequired {
updateAuthInfo(config, meta.SaslUsername, meta.SaslPassword)
}
producer, err := sarama.NewSyncProducer(meta.Brokers, config)
if err != nil {
@ -126,7 +178,10 @@ func (k *Kafka) getSyncProducer(meta *kafkaMetadata) (sarama.SyncProducer, error
func (k *Kafka) Read(handler func(*bindings.ReadResponse) error) error {
config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0
//ignore SASL properties if authRequired is false
if k.authRequired {
updateAuthInfo(config, k.saslUsername, k.saslPassword)
}
c := consumer{
callback: handler,
ready: make(chan bool),
@ -171,3 +226,15 @@ func (k *Kafka) Read(handler func(*bindings.ReadResponse) error) error {
func (consumer *consumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
func updateAuthInfo(config *sarama.Config, saslUsername, saslPassword string) {
config.Net.SASL.Enable = true
config.Net.SASL.User = saslUsername
config.Net.SASL.Password = saslPassword
config.Net.SASL.Mechanism = sarama.SASLTypePlaintext
config.Net.TLS.Enable = true
config.Net.TLS.Config = &tls.Config{
//InsecureSkipVerify: true,
ClientAuth: 0,
}
}

View File

@ -6,6 +6,7 @@
package kafka
import (
"errors"
"testing"
"github.com/dapr/components-contrib/bindings"
@ -13,13 +14,234 @@ import (
)
func TestParseMetadata(t *testing.T) {
m := bindings.Metadata{}
m.Properties = map[string]string{"consumerGroup": "a", "publishTopic": "a", "brokers": "a", "topics": "a"}
k := Kafka{}
meta, err := k.getKafkaMetadata(m)
assert.Nil(t, err)
assert.Equal(t, "a", meta.Brokers[0])
assert.Equal(t, "a", meta.ConsumerGroup)
assert.Equal(t, "a", meta.PublishTopic)
assert.Equal(t, "a", meta.Topics[0])
t.Run("correct metadata (authRequired false)", func(t *testing.T) {
m := bindings.Metadata{}
m.Properties = map[string]string{"consumerGroup": "a", "publishTopic": "a", "brokers": "a", "topics": "a", "authRequired": "false"}
k := Kafka{}
meta, err := k.getKafkaMetadata(m)
assert.Nil(t, err)
assert.Equal(t, "a", meta.Brokers[0])
assert.Equal(t, "a", meta.ConsumerGroup)
assert.Equal(t, "a", meta.PublishTopic)
assert.Equal(t, "a", meta.Topics[0])
assert.False(t, meta.AuthRequired)
})
t.Run("correct metadata (authRequired FALSE)", func(t *testing.T) {
m := bindings.Metadata{}
m.Properties = map[string]string{"consumerGroup": "a", "publishTopic": "a", "brokers": "a", "topics": "a", "authRequired": "FALSE"}
k := Kafka{}
meta, err := k.getKafkaMetadata(m)
assert.Nil(t, err)
assert.Equal(t, "a", meta.Brokers[0])
assert.Equal(t, "a", meta.ConsumerGroup)
assert.Equal(t, "a", meta.PublishTopic)
assert.Equal(t, "a", meta.Topics[0])
assert.False(t, meta.AuthRequired)
})
t.Run("correct metadata (authRequired False)", func(t *testing.T) {
m := bindings.Metadata{}
m.Properties = map[string]string{"consumerGroup": "a", "publishTopic": "a", "brokers": "a", "topics": "a", "authRequired": "False"}
k := Kafka{}
meta, err := k.getKafkaMetadata(m)
assert.Nil(t, err)
assert.Equal(t, "a", meta.Brokers[0])
assert.Equal(t, "a", meta.ConsumerGroup)
assert.Equal(t, "a", meta.PublishTopic)
assert.Equal(t, "a", meta.Topics[0])
assert.False(t, meta.AuthRequired)
})
t.Run("correct metadata (authRequired F)", func(t *testing.T) {
m := bindings.Metadata{}
m.Properties = map[string]string{"consumerGroup": "a", "publishTopic": "a", "brokers": "a", "topics": "a", "authRequired": "F"}
k := Kafka{}
meta, err := k.getKafkaMetadata(m)
assert.Nil(t, err)
assert.Equal(t, "a", meta.Brokers[0])
assert.Equal(t, "a", meta.ConsumerGroup)
assert.Equal(t, "a", meta.PublishTopic)
assert.Equal(t, "a", meta.Topics[0])
assert.False(t, meta.AuthRequired)
})
t.Run("correct metadata (authRequired f)", func(t *testing.T) {
m := bindings.Metadata{}
m.Properties = map[string]string{"consumerGroup": "a", "publishTopic": "a", "brokers": "a", "topics": "a", "authRequired": "f"}
k := Kafka{}
meta, err := k.getKafkaMetadata(m)
assert.Nil(t, err)
assert.Equal(t, "a", meta.Brokers[0])
assert.Equal(t, "a", meta.ConsumerGroup)
assert.Equal(t, "a", meta.PublishTopic)
assert.Equal(t, "a", meta.Topics[0])
assert.False(t, meta.AuthRequired)
})
t.Run("correct metadata (authRequired 0)", func(t *testing.T) {
m := bindings.Metadata{}
m.Properties = map[string]string{"consumerGroup": "a", "publishTopic": "a", "brokers": "a", "topics": "a", "authRequired": "0"}
k := Kafka{}
meta, err := k.getKafkaMetadata(m)
assert.Nil(t, err)
assert.Equal(t, "a", meta.Brokers[0])
assert.Equal(t, "a", meta.ConsumerGroup)
assert.Equal(t, "a", meta.PublishTopic)
assert.Equal(t, "a", meta.Topics[0])
assert.False(t, meta.AuthRequired)
})
t.Run("correct metadata (authRequired F)", func(t *testing.T) {
m := bindings.Metadata{}
m.Properties = map[string]string{"consumerGroup": "a", "publishTopic": "a", "brokers": "a", "topics": "a", "authRequired": "F"}
k := Kafka{}
meta, err := k.getKafkaMetadata(m)
assert.Nil(t, err)
assert.Equal(t, "a", meta.Brokers[0])
assert.Equal(t, "a", meta.ConsumerGroup)
assert.Equal(t, "a", meta.PublishTopic)
assert.Equal(t, "a", meta.Topics[0])
assert.False(t, meta.AuthRequired)
})
t.Run("correct metadata (authRequired true)", func(t *testing.T) {
m := bindings.Metadata{}
m.Properties = map[string]string{"consumerGroup": "a", "publishTopic": "a", "brokers": "a", "topics": "a", "authRequired": "true", "saslUsername": "foo", "saslPassword": "bar"}
k := Kafka{}
meta, err := k.getKafkaMetadata(m)
assert.Nil(t, err)
assert.Equal(t, "a", meta.Brokers[0])
assert.Equal(t, "a", meta.ConsumerGroup)
assert.Equal(t, "a", meta.PublishTopic)
assert.Equal(t, "a", meta.Topics[0])
assert.True(t, meta.AuthRequired)
assert.Equal(t, "foo", meta.SaslUsername)
assert.Equal(t, "bar", meta.SaslPassword)
})
t.Run("correct metadata (authRequired TRUE)", func(t *testing.T) {
m := bindings.Metadata{}
m.Properties = map[string]string{"consumerGroup": "a", "publishTopic": "a", "brokers": "a", "topics": "a", "authRequired": "TRUE", "saslUsername": "foo", "saslPassword": "bar"}
k := Kafka{}
meta, err := k.getKafkaMetadata(m)
assert.Nil(t, err)
assert.Equal(t, "a", meta.Brokers[0])
assert.Equal(t, "a", meta.ConsumerGroup)
assert.Equal(t, "a", meta.PublishTopic)
assert.Equal(t, "a", meta.Topics[0])
assert.True(t, meta.AuthRequired)
assert.Equal(t, "foo", meta.SaslUsername)
assert.Equal(t, "bar", meta.SaslPassword)
})
t.Run("correct metadata (authRequired True)", func(t *testing.T) {
m := bindings.Metadata{}
m.Properties = map[string]string{"consumerGroup": "a", "publishTopic": "a", "brokers": "a", "topics": "a", "authRequired": "True", "saslUsername": "foo", "saslPassword": "bar"}
k := Kafka{}
meta, err := k.getKafkaMetadata(m)
assert.Nil(t, err)
assert.Equal(t, "a", meta.Brokers[0])
assert.Equal(t, "a", meta.ConsumerGroup)
assert.Equal(t, "a", meta.PublishTopic)
assert.Equal(t, "a", meta.Topics[0])
assert.True(t, meta.AuthRequired)
assert.Equal(t, "foo", meta.SaslUsername)
assert.Equal(t, "bar", meta.SaslPassword)
})
t.Run("correct metadata (authRequired T)", func(t *testing.T) {
m := bindings.Metadata{}
m.Properties = map[string]string{"consumerGroup": "a", "publishTopic": "a", "brokers": "a", "topics": "a", "authRequired": "T", "saslUsername": "foo", "saslPassword": "bar"}
k := Kafka{}
meta, err := k.getKafkaMetadata(m)
assert.Nil(t, err)
assert.Equal(t, "a", meta.Brokers[0])
assert.Equal(t, "a", meta.ConsumerGroup)
assert.Equal(t, "a", meta.PublishTopic)
assert.Equal(t, "a", meta.Topics[0])
assert.True(t, meta.AuthRequired)
assert.Equal(t, "foo", meta.SaslUsername)
assert.Equal(t, "bar", meta.SaslPassword)
})
t.Run("correct metadata (authRequired t)", func(t *testing.T) {
m := bindings.Metadata{}
m.Properties = map[string]string{"consumerGroup": "a", "publishTopic": "a", "brokers": "a", "topics": "a", "authRequired": "t", "saslUsername": "foo", "saslPassword": "bar"}
k := Kafka{}
meta, err := k.getKafkaMetadata(m)
assert.Nil(t, err)
assert.Equal(t, "a", meta.Brokers[0])
assert.Equal(t, "a", meta.ConsumerGroup)
assert.Equal(t, "a", meta.PublishTopic)
assert.Equal(t, "a", meta.Topics[0])
assert.True(t, meta.AuthRequired)
assert.Equal(t, "foo", meta.SaslUsername)
assert.Equal(t, "bar", meta.SaslPassword)
})
t.Run("correct metadata (authRequired 1)", func(t *testing.T) {
m := bindings.Metadata{}
m.Properties = map[string]string{"consumerGroup": "a", "publishTopic": "a", "brokers": "a", "topics": "a", "authRequired": "1", "saslUsername": "foo", "saslPassword": "bar"}
k := Kafka{}
meta, err := k.getKafkaMetadata(m)
assert.Nil(t, err)
assert.Equal(t, "a", meta.Brokers[0])
assert.Equal(t, "a", meta.ConsumerGroup)
assert.Equal(t, "a", meta.PublishTopic)
assert.Equal(t, "a", meta.Topics[0])
assert.True(t, meta.AuthRequired)
assert.Equal(t, "foo", meta.SaslUsername)
assert.Equal(t, "bar", meta.SaslPassword)
})
t.Run("missing authRequired", func(t *testing.T) {
m := bindings.Metadata{}
m.Properties = map[string]string{"consumerGroup": "a", "publishTopic": "a", "brokers": "a", "topics": "a"}
k := Kafka{}
meta, err := k.getKafkaMetadata(m)
assert.Error(t, errors.New("kafka error: missing 'authRequired' attribute"), err)
assert.Nil(t, meta)
})
t.Run("empty authRequired", func(t *testing.T) {
m := bindings.Metadata{}
m.Properties = map[string]string{"authRequired": "", "consumerGroup": "a", "publishTopic": "a", "brokers": "a", "topics": "a"}
k := Kafka{}
meta, err := k.getKafkaMetadata(m)
assert.Error(t, errors.New("kafka error: 'authRequired' attribute was empty"), err)
assert.Nil(t, meta)
})
t.Run("invalid authRequired", func(t *testing.T) {
m := bindings.Metadata{}
m.Properties = map[string]string{"authRequired": "not_sure", "consumerGroup": "a", "publishTopic": "a", "brokers": "a", "topics": "a"}
k := Kafka{}
meta, err := k.getKafkaMetadata(m)
assert.Error(t, errors.New("kafka error: invalid value for 'authRequired' attribute. use true or false"), err)
assert.Nil(t, meta)
})
t.Run("SASL username required if authRequired is true", func(t *testing.T) {
m := bindings.Metadata{}
m.Properties = map[string]string{"authRequired": "true", "saslPassword": "t0ps3cr3t", "consumerGroup": "a", "publishTopic": "a", "brokers": "a", "topics": "a"}
k := Kafka{}
meta, err := k.getKafkaMetadata(m)
assert.Error(t, errors.New("kafka error: missing SASL Username"), err)
assert.Nil(t, meta)
})
t.Run("SASL password required if authRequired is true", func(t *testing.T) {
m := bindings.Metadata{}
m.Properties = map[string]string{"authRequired": "true", "saslUsername": "foobar", "consumerGroup": "a", "publishTopic": "a", "brokers": "a", "topics": "a"}
k := Kafka{}
meta, err := k.getKafkaMetadata(m)
assert.Error(t, errors.New("kafka error: missing SASL Password"), err)
assert.Nil(t, meta)
})
}