mirror of https://github.com/dapr/docs.git
Feature/add-kafka-pubsub-schema-registry (#3946)
* Added doc for kafka pubsub Avro schema registry support
* Update daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md
* Update setup-apache-kafka.md

Signed-off-by: Patrick Assuied <patrick.assuied@elationhealth.com>
Co-authored-by: Hannah Hunter <94493363+hhunter-ms@users.noreply.github.com>
Co-authored-by: Mark Fussell <markfussell@gmail.com>
This commit is contained in:
parent
cc3d110373
commit
ae4e13754b
@@ -49,6 +49,16 @@ spec:
    value: "2.0.0"
  - name: direction
    value: "input, output"
  - name: schemaRegistryURL # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry URL.
    value: http://localhost:8081
  - name: schemaRegistryAPIKey # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Key.
    value: XYAXXAZ
  - name: schemaRegistryAPISecret # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Secret.
    value: "ABCDEFGMEADFF"
  - name: schemaCachingEnabled # Optional. When using Schema Registry Avro serialization/deserialization. Enables caching for schemas.
    value: true
  - name: schemaLatestVersionCacheTTL # Optional. When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with the latest schema available.
    value: 5m
```

## Spec metadata fields

@@ -75,6 +85,11 @@ spec:
| `version` | N | Input/Output | Kafka cluster version. Defaults to `2.0.0`. Note that this must be set to `1.0.0` when using Azure Event Hubs with Kafka. | `"1.0.0"` |
| `direction` | N | Input/Output | The direction of the binding. | `"input"`, `"output"`, `"input, output"` |
| `oidcExtensions` | N | Input/Output | String containing a JSON-encoded dictionary of OAuth2/OIDC extensions to request with the access token. | `{"cluster":"kafka","poolid":"kafkapool"}` |
| `schemaRegistryURL` | N | Input/Output | Required when using Schema Registry Avro serialization/deserialization. The Schema Registry URL. | `http://localhost:8081` |
| `schemaRegistryAPIKey` | N | Input/Output | When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Key. | `XYAXXAZ` |
| `schemaRegistryAPISecret` | N | Input/Output | When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Secret. | `ABCDEFGMEADFF` |
| `schemaCachingEnabled` | N | Input/Output | When using Schema Registry Avro serialization/deserialization. Enables caching for schemas. Default is `true`. | `true` |
| `schemaLatestVersionCacheTTL` | N | Input/Output | When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with the latest schema available. Default is 5 minutes. | `5m` |

#### Note
The metadata `version` must be set to `1.0.0` when using Azure Event Hubs with Kafka.

@@ -49,6 +49,17 @@ spec:
    value: 2.0.0
  - name: disableTls # Optional. Disable TLS. This is not safe for production!! You should read the `Mutual TLS` section for how to use TLS.
    value: "true"
  - name: schemaRegistryURL # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry URL.
    value: http://localhost:8081
  - name: schemaRegistryAPIKey # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Key.
    value: XYAXXAZ
  - name: schemaRegistryAPISecret # Optional. When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Secret.
    value: "ABCDEFGMEADFF"
  - name: schemaCachingEnabled # Optional. When using Schema Registry Avro serialization/deserialization. Enables caching for schemas.
    value: true
  - name: schemaLatestVersionCacheTTL # Optional. When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with the latest schema available.
    value: 5m

```

> For details on using `secretKeyRef`, see the guide on [how to reference secrets in components]({{< ref component-secrets.md >}}).

@@ -81,6 +92,11 @@ spec:
| oidcClientSecret | N | The OAuth2 client secret that has been provisioned in the identity provider. Required when `authType` is set to `oidc`. | `"KeFg23!"` |
| oidcScopes | N | Comma-delimited list of OAuth2/OIDC scopes to request with the access token. Recommended when `authType` is set to `oidc`. Defaults to `"openid"`. | `"openid,kafka-prod"` |
| oidcExtensions | N | String containing a JSON-encoded dictionary of OAuth2/OIDC extensions to request with the access token. | `{"cluster":"kafka","poolid":"kafkapool"}` |
| schemaRegistryURL | N | Required when using Schema Registry Avro serialization/deserialization. The Schema Registry URL. | `http://localhost:8081` |
| schemaRegistryAPIKey | N | When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Key. | `XYAXXAZ` |
| schemaRegistryAPISecret | N | When using Schema Registry Avro serialization/deserialization. The Schema Registry credentials API Secret. | `ABCDEFGMEADFF` |
| schemaCachingEnabled | N | When using Schema Registry Avro serialization/deserialization. Enables caching for schemas. Default is `true`. | `true` |
| schemaLatestVersionCacheTTL | N | When using Schema Registry Avro serialization/deserialization. The TTL for schema caching when publishing a message with the latest schema available. Default is 5 minutes. | `5m` |

The `secretKeyRef` above is referencing a [Kubernetes secrets store]({{< ref kubernetes-secret-store.md >}}) to access the TLS information. Visit [here]({{< ref setup-secret-store.md >}}) to learn more about how to configure a secret store component.

@@ -348,6 +364,102 @@ curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correla
}'
```

## Avro Schema Registry serialization/deserialization

You can configure pub/sub to publish or consume data encoded using [Avro binary serialization](https://avro.apache.org/docs/), leveraging a schema registry (for example, [Confluent Schema Registry](https://developer.confluent.io/courses/apache-kafka/schema-registry/), [Apicurio](https://www.apicur.io/registry/)).

### Configuration

{{% alert title="Important" color="warning" %}}
Currently, only message value serialization/deserialization is supported. Since cloud events are not supported, the `rawPayload=true` metadata must be passed.
{{% /alert %}}

When configuring the Kafka pub/sub component metadata, you must define:
- The schema registry URL
- The API key/secret, if applicable

Schema subjects are automatically derived from topic names, using the standard naming convention. For example, for a topic named `my-topic`, the schema subject will be `my-topic-value`.
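
For illustration only (this is not part of the Dapr component itself): assuming a Confluent-compatible registry at `http://localhost:8081`, a minimal Avro schema matching this section's example payloads could be registered under the `my-topic-value` subject as sketched below. The schema, record name, and use of the `requests` client are all illustrative.

```python
import json

import requests  # third-party HTTP client; any client works

# Hypothetical schema for the example payloads used in this section
order_schema = {
    "type": "record",
    "name": "Order",
    "fields": [
        {"name": "order_number", "type": "string"},
        # timestamp-millis is transported as epoch milliseconds
        {"name": "created_date",
         "type": {"type": "long", "logicalType": "timestamp-millis"}},
    ],
}

# Subject name follows the <topic>-value convention described above
resp = requests.post(
    "http://localhost:8081/subjects/my-topic-value/versions",
    headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    json={"schema": json.dumps(order_schema)},
)
print(resp.json())  # for example: {"id": 1}
```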

When interacting with the message payload within the service, it is in JSON format. The payload is transparently serialized/deserialized within the Dapr component.
Date/Datetime fields must be passed as their [Epoch Unix timestamp](https://en.wikipedia.org/wiki/Unix_time) equivalent (rather than typical ISO 8601), as shown in the sketch after this list. For example:
- `2024-01-10T04:36:05.986Z` should be passed as `1704861365986` (the number of milliseconds since Jan 1st, 1970)
- `2024-01-10` should be passed as `19732` (the number of days since Jan 1st, 1970)
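
A minimal sketch of these conversions in Python (variable names are arbitrary):

```python
from datetime import date, datetime, timezone

# Datetime fields: milliseconds since the Unix epoch
created = datetime(2024, 1, 10, 4, 36, 5, 986000, tzinfo=timezone.utc)
created_millis = round(created.timestamp() * 1000)  # 1704861365986

# Date fields: days since the Unix epoch
order_day = date(2024, 1, 10)
epoch_days = (order_day - date(1970, 1, 1)).days  # 19732
```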

### Publishing Avro messages

In order to indicate to the Kafka pub/sub component that the message should be serialized using Avro, the `valueSchemaType` metadata must be set to `Avro`.

{{< tabs curl "Python SDK" >}}

{{% codetab %}}
```bash
curl -X "POST" "http://localhost:3500/v1.0/publish/pubsub/my-topic?metadata.rawPayload=true&metadata.valueSchemaType=Avro" -H "Content-Type: application/json" -d '{"order_number": "345", "created_date": 1704861365986}'
```
{{% /codetab %}}

{{% codetab %}}
```python
import json

from dapr.clients import DaprClient

with DaprClient() as d:
    req_data = {
        'order_number': '345',
        'created_date': 1704861365986
    }
    # Publish with Avro serialization enabled via metadata
    resp = d.publish_event(
        pubsub_name='pubsub',
        topic_name='my-topic',
        data=json.dumps(req_data),
        publish_metadata={'rawPayload': 'true', 'valueSchemaType': 'Avro'}
    )
    # Print the request
    print(req_data, flush=True)
```
{{% /codetab %}}

{{< /tabs >}}

### Subscribing to Avro topics

In order to indicate to the Kafka pub/sub component that the message should be deserialized using Avro, the `valueSchemaType` metadata must be set to `Avro` in the subscription metadata.

{{< tabs "Python (FastAPI)" >}}

{{% codetab %}}

```python
import json
import sys

from fastapi import APIRouter, Body, FastAPI, Response, status

app = FastAPI()

router = APIRouter()


# Programmatic subscription endpoint: declares the topic, route,
# and the Avro deserialization metadata for the Dapr sidecar
@router.get('/dapr/subscribe')
def subscribe():
    subscriptions = [{'pubsubname': 'pubsub',
                      'topic': 'my-topic',
                      'route': 'my_topic_subscriber',
                      'metadata': {
                          'rawPayload': 'true',
                          'valueSchemaType': 'Avro',
                      }}]
    return subscriptions


# The handler receives the Avro-decoded payload as JSON
@router.post('/my_topic_subscriber')
def my_topic_subscriber(event_data=Body()):
    print(event_data, flush=True)
    return Response(status_code=status.HTTP_200_OK)


app.include_router(router)
```

{{% /codetab %}}

{{< /tabs >}}

## Create a Kafka instance

{{< tabs "Self-Hosted" "Kubernetes" >}}