Add Kafka Binding SASL Password cert test (#2831)

Signed-off-by: Bernd Verst <github@bernd.dev>
This commit is contained in:
Bernd Verst 2023-05-16 19:38:24 -05:00 committed by GitHub
parent 5118b73cfc
commit eabeadf044
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 136 additions and 18 deletions

View File

@ -0,0 +1,26 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: messagebus
spec:
type: bindings.kafka
metadata:
# Kafka broker connection setting
- name: brokers
value: localhost:9092
# consumer configuration: topic and consumer group
- name: topics
value: sample
- name: consumerGroup
value: group1
# publisher configuration: topic
- name: publishTopic
value: sample
- name: authRequired
value: "true"
- name: saslUsername
value: admin
- name: saslPassword
value: admin-secret
- name: disableTls
value: "true"

View File

@ -51,26 +51,27 @@ import (
)
const (
sidecarName1 = "dapr-1"
sidecarName2 = "dapr-2"
sidecarName3 = "dapr-3"
appID1 = "app-1"
appID2 = "app-2"
appID3 = "app-3"
clusterName = "kafkacertification"
dockerComposeYAML = "docker-compose.yml"
numMessages = 1000
appPort = 8000
portOffset = 2
messageKey = "partitionKey"
sidecarName1 = "dapr-1"
sidecarName2 = "dapr-2"
sidecarName3 = "dapr-3"
sidecarName4 = "dapr-4"
appID1 = "app-1"
appID2 = "app-2"
appID3 = "app-3"
appID4 = "app-4"
clusterName = "kafkacertification"
dockerComposeYAML = "docker-compose.yml"
dockerComposeYAMLSasl = "sasl-docker/docker-compose.yml"
numMessages = 1000
appPort = 8000
portOffset = 2
messageKey = "partitionKey"
bindingName = "messagebus"
topicName = "neworder"
)
var (
brokers = []string{"localhost:19092", "localhost:29092", "localhost:39092"}
)
var brokers = []string{"localhost:19092", "localhost:29092", "localhost:39092"}
func TestKafka_with_retry(t *testing.T) {
// For Kafka, we should ensure messages are received in order.
@ -152,6 +153,20 @@ func TestKafka_with_retry(t *testing.T) {
}
}
simpleSendTest := func(metadata map[string]string) flow.Runnable {
return func(ctx flow.Context) error {
client := sidecar.GetClient(ctx, sidecarName4)
err := client.InvokeOutputBinding(ctx, &dapr.InvokeBindingRequest{
Name: bindingName,
Operation: string(bindings.CreateOperation),
Data: []byte("sasl password auth test message"),
Metadata: metadata,
})
require.NoError(ctx, err, "error publishing message")
return nil
}
}
// sendMessagesInBackground and assertMessages are
// Runnables for testing publishing and consuming
// messages reliably when infrastructure and network
@ -245,7 +260,7 @@ func TestKafka_with_retry(t *testing.T) {
//
// Run the Dapr sidecar with the Kafka component.
Step(sidecar.Run(sidecarName1,
embedded.WithComponentsPath("./components/consumer1"),
embedded.WithResourcesPath("./components/consumer1"),
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort),
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort),
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort),
@ -258,7 +273,7 @@ func TestKafka_with_retry(t *testing.T) {
//
// Run the Dapr sidecar with the Kafka component.
Step(sidecar.Run(sidecarName2,
embedded.WithComponentsPath("./components/consumer2"),
embedded.WithResourcesPath("./components/consumer2"),
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset),
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset),
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset),
@ -276,7 +291,7 @@ func TestKafka_with_retry(t *testing.T) {
//
// Run the Dapr sidecar with the Kafka component.
Step(sidecar.Run(sidecarName3,
embedded.WithComponentsPath("./components/consumer2"),
embedded.WithResourcesPath("./components/consumer2"),
embedded.WithAppProtocol(runtime.HTTPProtocol, appPort+portOffset*2),
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+portOffset*2),
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+portOffset*2),
@ -340,6 +355,25 @@ func TestKafka_with_retry(t *testing.T) {
Step("wait", flow.Sleep(30*time.Second)).
Step("assert messages(consumer rebalance)", assertMessages(consumerGroup2)).
Run()
flow.New(t, "kafka with sals password auth - no tls - wurstmeister").
// Run Kafka using Docker Compose.
Step(dockercompose.Run(clusterName, dockerComposeYAMLSasl)).
Step("wait for broker sockets",
network.WaitForAddresses(5*time.Minute, "localhost:9092")).
Step("wait", flow.Sleep(20*time.Second)).
// Run the Dapr sidecar with the Kafka component.
Step(sidecar.Run(sidecarName4,
embedded.WithResourcesPath("./components/sasl-password"),
embedded.WithDaprGRPCPort(runtime.DefaultDaprAPIGRPCPort+5*portOffset),
embedded.WithDaprHTTPPort(runtime.DefaultDaprHTTPPort+5*portOffset),
embedded.WithoutApp(),
componentRuntimeOptions(),
)).
Step("simple send test", simpleSendTest(metadata)).
Step("wait", flow.Sleep(10*time.Second)).
Step("stop sidecar 1", sidecar.Stop(sidecarName4)).
Run()
}
func componentRuntimeOptions() []runtime.Option {

View File

@ -0,0 +1,42 @@
# ------------------------------------------------------------
# Copyright 2021 The Dapr Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ------------------------------------------------------------
version: '3.7'
services:
zookeeper:
image: wurstmeister/zookeeper:latest
ports:
- 2181:2181
environment:
JVMFLAGS: "-Djava.security.auth.login.config=/etc/zookeeper/zookeeper_jaas.conf"
volumes:
- ./zookeeper_jaas.conf:/etc/zookeeper/zookeeper_jaas.conf
kafka:
image: wurstmeister/kafka:latest
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
ALLOW_PLAINTEXT_LISTENER: 'yes'
KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
KAFKA_LISTENERS: EXT://:9092,INT://:9093
KAFKA_ADVERTISED_LISTENERS: EXT://localhost:9092,INT://kafka:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: EXT:SASL_PLAINTEXT,INT:SASL_PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INT
KAFKA_SASL_ENABLED_MECHANISMS: PLAIN
KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_jaas.conf"
volumes:
- ./kafka_server_jaas.conf:/etc/kafka/kafka_jaas.conf

View File

@ -0,0 +1,12 @@
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret"
user_admin="admin-secret";
};
Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin-secret";
};

View File

@ -0,0 +1,4 @@
Server {
org.apache.zookeeper.server.auth.DigestLoginModule required
user_admin="admin-secret";
};