From ae4e13754b46c86f4aff55419f3fab06b2be2eea Mon Sep 17 00:00:00 2001 From: Patrick Assuied Date: Fri, 12 Jan 2024 11:25:17 -0800 Subject: [PATCH] Feature/add-kafka-pubsub-schema-registry (#3946) * Added doc for kafka pubsub Avro schema registry support Signed-off-by: Patrick Assuied * Update daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md Co-authored-by: Hannah Hunter <94493363+hhunter-ms@users.noreply.github.com> Signed-off-by: Patrick Assuied * Update daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md Co-authored-by: Hannah Hunter <94493363+hhunter-ms@users.noreply.github.com> Signed-off-by: Patrick Assuied * Update daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md Co-authored-by: Hannah Hunter <94493363+hhunter-ms@users.noreply.github.com> Signed-off-by: Patrick Assuied * Update daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md Co-authored-by: Hannah Hunter <94493363+hhunter-ms@users.noreply.github.com> Signed-off-by: Patrick Assuied * Update daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md Co-authored-by: Hannah Hunter <94493363+hhunter-ms@users.noreply.github.com> Signed-off-by: Patrick Assuied * Update daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md Co-authored-by: Hannah Hunter <94493363+hhunter-ms@users.noreply.github.com> Signed-off-by: Patrick Assuied * Update daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md Co-authored-by: Hannah Hunter <94493363+hhunter-ms@users.noreply.github.com> Signed-off-by: Patrick Assuied * Update daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md Co-authored-by: Hannah Hunter <94493363+hhunter-ms@users.noreply.github.com> Signed-off-by: Patrick Assuied * Update daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md Co-authored-by: Hannah Hunter <94493363+hhunter-ms@users.noreply.github.com> Signed-off-by: Patrick Assuied * Update daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md Co-authored-by: Hannah Hunter <94493363+hhunter-ms@users.noreply.github.com> Signed-off-by: Patrick Assuied * Update daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md Co-authored-by: Hannah Hunter <94493363+hhunter-ms@users.noreply.github.com> Signed-off-by: Patrick Assuied * Update daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md Co-authored-by: Hannah Hunter <94493363+hhunter-ms@users.noreply.github.com> Signed-off-by: Patrick Assuied * Update daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md Co-authored-by: Hannah Hunter <94493363+hhunter-ms@users.noreply.github.com> Signed-off-by: Patrick Assuied * Update setup-apache-kafka.md Signed-off-by: Patrick Assuied * Update daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md Co-authored-by: Mark Fussell Signed-off-by: Patrick Assuied --------- Signed-off-by: Patrick Assuied Co-authored-by: Hannah Hunter <94493363+hhunter-ms@users.noreply.github.com> Co-authored-by: Mark Fussell --- .../supported-bindings/kafka.md | 15 +++ .../supported-pubsub/setup-apache-kafka.md | 112 ++++++++++++++++++ 2 files changed, 127 insertions(+) diff --git a/daprdocs/content/en/reference/components-reference/supported-bindings/kafka.md b/daprdocs/content/en/reference/components-reference/supported-bindings/kafka.md index 43f030799..510e73c2d 100644 --- a/daprdocs/content/en/reference/components-reference/supported-bindings/kafka.md +++ b/daprdocs/content/en/reference/components-reference/supported-bindings/kafka.md @@ -49,6 +49,16 @@ spec: value: "2.0.0" - name: direction value: "input, output" + - name: schemaRegistryURL # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry URL. + value: http://localhost:8081 + - name: schemaRegistryAPIKey # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry API Key. + value: XYAXXAZ + - name: schemaRegistryAPISecret # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Secret. + value: "ABCDEFGMEADFF" + - name: schemaCachingEnabled # Optional. When using Schema Registry Avro serialization/deserialization. Enables caching for schemas. + value: true + - name: schemaLatestVersionCacheTTL # Optional. When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with latest schema available. + value: 5m ``` ## Spec metadata fields @@ -75,6 +85,11 @@ spec: | `version` | N | Input/Output | Kafka cluster version. Defaults to 2.0.0. Please note that this needs to be mandatorily set to `1.0.0` for EventHubs with Kafka. | `"1.0.0"` | | `direction` | N | Input/Output | The direction of the binding. | `"input"`, `"output"`, `"input, output"` | | `oidcExtensions` | N | Input/Output | String containing a JSON-encoded dictionary of OAuth2/OIDC extensions to request with the access token | `{"cluster":"kafka","poolid":"kafkapool"}` | +| `schemaRegistryURL` | N | Required when using Schema Registry Avro serialization/deserialization. The Schema Registry URL. | `http://localhost:8081` | +| `schemaRegistryAPIKey` | N | When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Key. | `XYAXXAZ` | +| `schemaRegistryAPISecret` | N | When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Secret. | `ABCDEFGMEADFF` | +| `schemaCachingEnabled` | N | When using Schema Registry Avro serialization/deserialization. Enables caching for schemas. Default is `true` | `true` | +| `schemaLatestVersionCacheTTL` | N | When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with latest schema available. Default is 5 min | `5m` | #### Note The metadata `version` must be set to `1.0.0` when using Azure EventHubs with Kafka. diff --git a/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md b/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md index 05ff83520..7c39f01af 100644 --- a/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md +++ b/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md @@ -49,6 +49,17 @@ spec: value: 2.0.0 - name: disableTls # Optional. Disable TLS. This is not safe for production!! You should read the `Mutual TLS` section for how to use TLS. value: "true" + - name: schemaRegistryURL # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry URL. + value: http://localhost:8081 + - name: schemaRegistryAPIKey # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry API Key. + value: XYAXXAZ + - name: schemaRegistryAPISecret # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Secret. + value: "ABCDEFGMEADFF" + - name: schemaCachingEnabled # Optional. When using Schema Registry Avro serialization/deserialization. Enables caching for schemas. + value: true + - name: schemaLatestVersionCacheTTL # Optional. When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with latest schema available. + value: 5m + ``` > For details on using `secretKeyRef`, see the guide on [how to reference secrets in components]({{< ref component-secrets.md >}}). @@ -81,6 +92,11 @@ spec: | oidcClientSecret | N | The OAuth2 client secret that has been provisioned in the identity provider: Required when `authType` is set to `oidc` | `"KeFg23!"` | | oidcScopes | N | Comma-delimited list of OAuth2/OIDC scopes to request with the access token. Recommended when `authType` is set to `oidc`. Defaults to `"openid"` | `"openid,kafka-prod"` | | oidcExtensions | N | Input/Output | String containing a JSON-encoded dictionary of OAuth2/OIDC extensions to request with the access token | `{"cluster":"kafka","poolid":"kafkapool"}` | +| schemaRegistryURL | N | Required when using Schema Registry Avro serialization/deserialization. The Schema Registry URL. | `http://localhost:8081` | +| schemaRegistryAPIKey | N | When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Key. | `XYAXXAZ` | +| schemaRegistryAPISecret | N | When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Secret. | `ABCDEFGMEADFF` | +| schemaCachingEnabled | N | When using Schema Registry Avro serialization/deserialization. Enables caching for schemas. Default is `true` | `true` | +| schemaLatestVersionCacheTTL | N | When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with latest schema available. Default is 5 min | `5m` | The `secretKeyRef` above is referencing a [kubernetes secrets store]({{< ref kubernetes-secret-store.md >}}) to access the tls information. Visit [here]({{< ref setup-secret-store.md >}}) to learn more about how to configure a secret store component. @@ -348,6 +364,102 @@ curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correla }' ``` +## Avro Schema Registry serialization/deserialization +You can configure pub/sub to publish or consume data encoded using [Avro binary serialization](https://avro.apache.org/docs/), leveraging an [Apache Schema Registry](https://developer.confluent.io/courses/apache-kafka/schema-registry/) (for example, [Confluent Schema Registry](https://developer.confluent.io/courses/apache-kafka/schema-registry/), [Apicurio](https://www.apicur.io/registry/)). + +### Configuration + +{{% alert title="Important" color="warning" %}} +Currently, only message value serialization/deserialization is supported. Since cloud events are not supported, the `rawPayload=true` metadata must be passed. +{{% /alert %}} + +When configuring the Kafka pub/sub component metadata, you must define: +- The schema registry URL +- The API key/secret, if applicable + +Schema subjects are automatically derived from topic names, using the standard naming convention. For example, for a topic named `my-topic`, the schema subject will be `my-topic-value`. +When interacting with the message payload within the service, it is in JSON format. The payload is transparently serialized/deserialized within the Dapr component. +Date/Datetime fields must be passed as their [Epoch Unix timestamp](https://en.wikipedia.org/wiki/Unix_time) equivalent (rather than typical Iso8601). For example: +- `2024-01-10T04:36:05.986Z` should be passed as `1704861365986` (the number of milliseconds since Jan 1st, 1970) +- `2024-01-10` should be passed as `19732` (the number of days since Jan 1st, 1970) + +### Publishing Avro messages +In order to indicate to the Kafka pub/sub component that the message should be using Avro serialization, the `valueSchemaType` metadata must be set to `Avro`. + +{{< tabs curl "Python SDK">}} + +{{% codetab %}} +```bash +curl -X "POST" http://localhost:3500/v1.0/publish/pubsub/my-topic?metadata.rawPayload=true&metadata.valueSchemaType=Avro -H "Content-Type: application/json" -d '{"order_number": "345", "created_date": 1704861365986}' +``` +{{% /codetab %}} + +{{% codetab %}} +```python +from dapr.clients import DaprClient + +with DaprClient() as d: + req_data = { + 'order_number': '345', + 'created_date': 1704861365986 + } + # Create a typed message with content type and body + resp = d.publish_event( + pubsub_name='pubsub', + topic_name='my-topic', + data=json.dumps(req_data), + publish_metadata={'rawPayload': 'true', 'valueSchemaType': 'Avro'} + ) + # Print the request + print(req_data, flush=True) +``` +{{% /codetab %}} + +{{< /tabs >}} + + +### Subscribing to Avro topics +In order to indicate to the Kafka pub/sub component that the message should be deserialized using Avro, the `valueSchemaType` metadata must be set to `Avro` in the subscription metadata. + +{{< tabs "Python (FastAPI)" >}} + +{{% codetab %}} + +```python +from fastapi import APIRouter, Body, Response, status +import json +import sys + +app = FastAPI() + +router = APIRouter() + + +@router.get('/dapr/subscribe') +def subscribe(): + subscriptions = [{'pubsubname': 'pubsub', + 'topic': 'my-topic', + 'route': 'my_topic_subscriber', + 'metadata': { + 'rawPayload': 'true', + 'valueSchemaType': 'Avro', + } }] + return subscriptions + +@router.post('/my_topic_subscriber') +def my_topic_subscriber(event_data=Body()): + print(event_data, flush=True) + return Response(status_code=status.HTTP_200_OK) +``` + +app.include_router(router) + +{{% /codetab %}} + +{{< /tabs >}} + + + ## Create a Kafka instance {{< tabs "Self-Hosted" "Kubernetes">}}