Apache Kafka Source Example
Tutorial on how to build and deploy a KafkaSource Eventing source using a Knative Serving Service.
Prerequisites
- Ensure that you meet the prerequisites listed in the Apache Kafka overview.
- A Kubernetes cluster with Knative Kafka Source installed.
Apache Kafka Topic (Optional)
- If using Strimzi, you can set a topic by modifying source/kafka-topic.yaml with your desired:
  - Topic
  - Cluster Name
  - Partitions
  - Replicas

      apiVersion: kafka.strimzi.io/v1beta1
      kind: KafkaTopic
      metadata:
        name: knative-demo-topic
        namespace: kafka
        labels:
          strimzi.io/cluster: my-cluster
      spec:
        partitions: 3
        replicas: 1
        config:
          retention.ms: 7200000
          segment.bytes: 1073741824
- Deploy the KafkaTopic:

      $ kubectl apply -f kafka-topic.yaml
      kafkatopic.kafka.strimzi.io/knative-demo-topic created
- Ensure the KafkaTopic is running:

      $ kubectl -n kafka get kafkatopics.kafka.strimzi.io
      NAME                 AGE
      knative-demo-topic   16s
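The manifest above hard-codes the topic name, cluster name, partitions, and replicas. As a minimal sketch, the function below renders the same KafkaTopic manifest from arguments so that it can be piped to `kubectl apply -f -`. The name `render_kafka_topic` and its argument order are hypothetical and not part of the sample repository; the `kafka` namespace and the `config` values are copied from the example.

```shell
# Hypothetical helper (not part of the Knative sample): prints a KafkaTopic
# manifest for the given topic and Strimzi cluster.
# Usage: render_kafka_topic TOPIC CLUSTER [PARTITIONS] [REPLICAS]
render_kafka_topic() {
  topic="$1"
  cluster="$2"
  partitions="${3:-3}"   # defaults mirror the example manifest
  replicas="${4:-1}"
  cat <<EOF
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaTopic
metadata:
  name: ${topic}
  namespace: kafka
  labels:
    strimzi.io/cluster: ${cluster}
spec:
  partitions: ${partitions}
  replicas: ${replicas}
  config:
    retention.ms: 7200000
    segment.bytes: 1073741824
EOF
}
```

For example, `render_kafka_topic knative-demo-topic my-cluster 3 1 | kubectl apply -f -` would create the same topic as the file-based step.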
Create the Event Display service
- Download a copy of the code:

      git clone -b "{{< branch >}}" https://github.com/knative/docs knative-docs
      cd knative-docs/docs/eventing/samples/kafka/source
- Build the Event Display Service (event-display.yaml):

      apiVersion: serving.knative.dev/v1
      kind: Service
      metadata:
        name: event-display
        namespace: default
      spec:
        template:
          spec:
            containers:
              - # This corresponds to
                # https://github.com/knative/eventing-contrib/tree/master/cmd/event_display/main.go
                image: gcr.io/knative-releases/knative.dev/eventing-contrib/cmd/event_display
- Deploy the Event Display Service:

      $ kubectl apply --filename event-display.yaml
      ...
      service.serving.knative.dev/event-display created
- Ensure that the Service pod is running. The pod name will be prefixed with event-display.

      $ kubectl get pods
      NAME                                           READY   STATUS    RESTARTS   AGE
      event-display-00001-deployment-5d5df6c7-gv2j4  2/2     Running   0          72s
      ...
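Instead of polling `kubectl get pods` by hand, the readiness check can be scripted. The sketch below wraps the standard `kubectl wait` command; the helper name `wait_for_ksvc` and the 120-second default timeout are assumptions made for illustration.

```shell
# Hypothetical helper: block until a Knative Service reports the Ready
# condition, or fail once the timeout expires.
# Usage: wait_for_ksvc NAME [NAMESPACE] [TIMEOUT]
wait_for_ksvc() {
  name="$1"
  namespace="${2:-default}"
  timeout="${3:-120s}"   # assumed default, adjust for slow image pulls
  kubectl wait "services.serving.knative.dev/${name}" \
    --namespace "${namespace}" \
    --for=condition=Ready \
    --timeout="${timeout}"
}
```

Calling `wait_for_ksvc event-display` returns a non-zero exit status if the Service does not become Ready in time, which makes it usable as a gate in a setup script.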
Apache Kafka Event Source
- Modify source/event-source.yaml with your bootstrap servers, topics, and so on:

      apiVersion: sources.knative.dev/v1beta1
      kind: KafkaSource
      metadata:
        name: kafka-source
      spec:
        consumerGroup: knative-group
        bootstrapServers:
        - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
        topics:
        - knative-demo-topic
        sink:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: event-display
- Deploy the event source:

      $ kubectl apply -f event-source.yaml
      ...
      kafkasource.sources.knative.dev/kafka-source created
- Check that the event source pod is running. The pod name will be prefixed with kafka-source.

      $ kubectl get pods
      NAME                                  READY   STATUS    RESTARTS   AGE
      kafka-source-xlnhq-5544766765-dnl5s   1/1     Running   0          40m
- Ensure the Apache Kafka Event Source started with the necessary configuration:

      $ kubectl logs --selector='knative-eventing-source-name=kafka-source'
      {"level":"info","ts":"2020-05-28T10:39:42.104Z","caller":"adapter/adapter.go:81","msg":"Starting with config: ","Topics":".","ConsumerGroup":"...","SinkURI":"...","Name":".","Namespace":"."}
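To check a single value in the startup line rather than reading the whole JSON record, the log output can be filtered. The following is a rough sketch that assumes each field is a plain JSON string, as in the sample line above; `adapter_config_field` is a hypothetical helper name, and the pattern match is not a full JSON parser.

```shell
# Hypothetical helper: read adapter logs on stdin and print the value of one
# string field from the "Starting with config" line.
# Usage: kubectl logs ... | adapter_config_field FIELD
adapter_config_field() {
  field="$1"
  # Keep only the startup line, then capture the quoted value of the field.
  grep -F '"msg":"Starting with config: "' |
    sed -n "s/.*\"${field}\":\"\([^\"]*\)\".*/\1/p"
}
```

For example, `kubectl logs --selector='knative-eventing-source-name=kafka-source' | adapter_config_field ConsumerGroup` prints the consumer group the adapter started with, and prints nothing if the startup line is absent.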
Verify
- Produce a message ({"msg": "This is a test!"}) to the Apache Kafka topic, as shown below:

      kubectl -n kafka run kafka-producer -ti --image=strimzi/kafka:0.14.0-kafka-2.3.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic knative-demo-topic
      If you don't see a command prompt, try pressing enter.
      >{"msg": "This is a test!"}
- Check that the Apache Kafka Event Source consumed the message and sent it to its sink properly. Because these logs are emitted at debug level, edit the level key inside zap-logger-config of the config-logging ConfigMap in the knative-sources namespace to look like this:

      data:
        loglevel.controller: info
        loglevel.webhook: info
        zap-logger-config: |
          {
            "level": "debug",
            "development": false,
            "outputPaths": ["stdout"],
            "errorOutputPaths": ["stderr"],
            "encoding": "json",
            "encoderConfig": {
              "timeKey": "ts",
              "levelKey": "level",
              "nameKey": "logger",
              "callerKey": "caller",
              "messageKey": "msg",
              "stacktraceKey": "stacktrace",
              "lineEnding": "",
              "levelEncoder": "",
              "timeEncoder": "iso8601",
              "durationEncoder": "",
              "callerEncoder": ""
            }
          }

  Now manually delete the KafkaSource deployment and allow the kafka-controller-manager deployment running in the knative-sources namespace to redeploy it. Debug-level logs should now be visible:

      $ kubectl logs --selector='knative-eventing-source-name=kafka-source'
      ...
      {"level":"debug","ts":"2020-05-28T10:40:29.400Z","caller":"kafka/consumer_handler.go:77","msg":"Message claimed","topic":".","value":"."}
      {"level":"debug","ts":"2020-05-28T10:40:31.722Z","caller":"kafka/consumer_handler.go:89","msg":"Message marked","topic":".","value":"."}
- Ensure the Event Display received the message sent to it by the Event Source:

      $ kubectl logs --selector='serving.knative.dev/service=event-display' -c user-container
      ☁️  cloudevents.Event
      Validation: valid
      Context Attributes,
        specversion: 1.0
        type: dev.knative.kafka.event
        source: /apis/v1/namespaces/default/kafkasources/kafka-source#my-topic
        subject: partition:0#564
        id: partition:0/offset:564
        time: 2020-02-10T18:10:23.861866615Z
        datacontenttype: application/json
      Extensions,
        key:
      Data,
        {
          "msg": "This is a test!"
        }
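The verification steps above are interactive. A rough sketch of a scripted version follows: one function pipes a payload into the console producer, the other checks the Event Display logs for it. The function names are hypothetical, and running `kubectl run` with `-i` and piped stdin instead of `-ti` is an assumption about how you want to drive the producer; the image, broker address, and label selector are copied from the steps above.

```shell
# Hypothetical helper: send one message to a topic without an interactive prompt.
# Usage: produce_message PAYLOAD TOPIC
produce_message() {
  printf '%s\n' "$1" | kubectl -n kafka run kafka-producer -i \
    --image=strimzi/kafka:0.14.0-kafka-2.3.0 --rm=true --restart=Never -- \
    bin/kafka-console-producer.sh \
    --broker-list my-cluster-kafka-bootstrap:9092 --topic "$2"
}

# Hypothetical helper: succeed if the Event Display logs contain the given text.
# Usage: event_display_received TEXT
event_display_received() {
  kubectl logs --selector='serving.knative.dev/service=event-display' \
    -c user-container | grep -q -F -- "$1"
}
```

A script could call `produce_message '{"msg": "This is a test!"}' knative-demo-topic`, wait a few seconds, and then use `event_display_received 'This is a test!'` as the pass/fail check.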
Teardown Steps
- Remove the Apache Kafka Event Source

      $ kubectl delete -f source/event-source.yaml
      kafkasource.sources.knative.dev "kafka-source" deleted
- Remove the Event Display

      $ kubectl delete -f source/event-display.yaml
      service.serving.knative.dev "event-display" deleted
- Remove the Apache Kafka Event Controller

      $ kubectl delete -f https://storage.googleapis.com/knative-releases/eventing-contrib/latest/kafka-source.yaml
      serviceaccount "kafka-controller-manager" deleted
      clusterrole.rbac.authorization.k8s.io "eventing-sources-kafka-controller" deleted
      clusterrolebinding.rbac.authorization.k8s.io "eventing-sources-kafka-controller" deleted
      customresourcedefinition.apiextensions.k8s.io "kafkasources.sources.knative.dev" deleted
      service "kafka-controller" deleted
      statefulset.apps "kafka-controller-manager" deleted
- (Optional) Remove the Apache Kafka Topic

      $ kubectl delete -f kafka-topic.yaml
      kafkatopic.kafka.strimzi.io "knative-demo-topic" deleted
(Optional) Specify the key deserializer
When KafkaSource receives a message from Kafka, it writes the message key to the Event extension called Key and writes the Kafka message headers to extensions whose names start with kafkaheader.
You can specify the key deserializer among four types:
- string (default) for UTF-8 encoded strings
- int for 32-bit and 64-bit signed integers
- float for 32-bit and 64-bit floating-point numbers
- byte-array for a Base64 encoded byte array
To specify it, add the label kafkasources.sources.knative.dev/key-type to the KafkaSource definition, for example:
    apiVersion: sources.knative.dev/v1beta1
    kind: KafkaSource
    metadata:
      name: kafka-source
      labels:
        kafkasources.sources.knative.dev/key-type: int
    spec:
      consumerGroup: knative-group
      bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
      topics:
      - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
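The same label can also be set on an existing KafkaSource from the command line. The sketch below validates the value against the four documented types before calling the standard `kubectl label` command. The helper name `set_key_type` is hypothetical, and whether a running source picks up a label change without being recreated is an assumption that this page does not confirm.

```shell
# Hypothetical helper: set the key deserializer label on a KafkaSource.
# Usage: set_key_type SOURCE_NAME KEY_TYPE
set_key_type() {
  source_name="$1"
  key_type="$2"
  # Only the four documented key types are accepted.
  case "$key_type" in
    string|int|float|byte-array) ;;
    *) echo "unsupported key type: $key_type" >&2; return 1 ;;
  esac
  kubectl label kafkasource "$source_name" \
    "kafkasources.sources.knative.dev/key-type=${key_type}" --overwrite
}
```

For example, `set_key_type kafka-source int` applies the same label as the manifest above, while `set_key_type kafka-source double` is rejected before anything is sent to the cluster.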
Connecting to a TLS enabled Kafka broker
The KafkaSource supports TLS and SASL authentication methods. To enable TLS authentication, you need the following files:
- CA Certificate
- Client Certificate and Key
KafkaSource expects these files to be in PEM format. If they are in another format, such as JKS, convert them to PEM first.
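One possible way to convert JKS stores to PEM is to go through PKCS#12 with `keytool` and `openssl`. The sketch below is an illustration under assumptions: the helper name `jks_to_pem`, the intermediate file `keystore.p12`, and the CA alias argument are hypothetical, and both tools will prompt for store passwords. The output file names match the ones used in the secret creation step.

```shell
# Hypothetical helper: convert a JKS keystore and truststore to PEM files.
# Usage: jks_to_pem KEYSTORE.jks TRUSTSTORE.jks CA_ALIAS
jks_to_pem() {
  keystore="$1"
  truststore="$2"
  ca_alias="$3"
  # Convert the JKS keystore to PKCS#12, which openssl can read.
  keytool -importkeystore -srckeystore "$keystore" -srcstoretype JKS \
    -destkeystore keystore.p12 -deststoretype PKCS12 || return 1
  # Extract the client certificate.
  openssl pkcs12 -in keystore.p12 -clcerts -nokeys -out certificate.pem || return 1
  # Extract the private key without a passphrase.
  openssl pkcs12 -in keystore.p12 -nocerts -nodes -out key.pem || return 1
  # Export the CA certificate from the truststore in PEM form.
  keytool -exportcert -rfc -keystore "$truststore" -alias "$ca_alias" \
    -file caroot.pem || return 1
}
```

Because `key.pem` is written unencrypted, it should be removed from disk once the secret has been created.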
- Create the certificate files as secrets in the namespace where the KafkaSource is going to be set up:

      $ kubectl create secret generic cacert --from-file=caroot.pem
      secret/cacert created
      $ kubectl create secret tls kafka-secret --cert=certificate.pem --key=key.pem
      secret/kafka-secret created
- Apply the KafkaSource, changing bootstrapServers and topics accordingly:

      apiVersion: sources.knative.dev/v1beta1
      kind: KafkaSource
      metadata:
        name: kafka-source-with-tls
      spec:
        net:
          tls:
            enable: true
            cert:
              secretKeyRef:
                key: tls.crt
                name: kafka-secret
            key:
              secretKeyRef:
                key: tls.key
                name: kafka-secret
            caCert:
              secretKeyRef:
                key: caroot.pem
                name: cacert
        consumerGroup: knative-group
        bootstrapServers:
        - my-secure-kafka-bootstrap.kafka:443
        topics:
        - knative-demo-topic
        sink:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: event-display
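The secretKeyRef entries only resolve if each secret contains the referenced data key. `kubectl create secret tls` stores the certificate under tls.crt and the key under tls.key, and `--from-file=caroot.pem` uses the file name as the data key. A minimal sketch for checking this before applying the KafkaSource follows; `secret_has_key` is a hypothetical helper name.

```shell
# Hypothetical helper: succeed if a Secret contains the given data key.
# Usage: secret_has_key SECRET_NAME DATA_KEY
secret_has_key() {
  # Print one data key per line, then look for an exact match.
  kubectl get secret "$1" \
    -o go-template='{{range $k, $v := .data}}{{$k}}{{"\n"}}{{end}}' |
    grep -q -x -F -- "$2"
}
```

For the manifest above, the three checks would be `secret_has_key kafka-secret tls.crt`, `secret_has_key kafka-secret tls.key`, and `secret_has_key cacert caroot.pem`.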