mirror of https://github.com/dapr/docs.git
Added missing information about kafka-pubsub special metadata headers
Signed-off-by: Patrick Assuied <patrick.assuied@elationhealth.com>
This commit is contained in:
parent
6f8fcb2a55
commit
1ab3eca476
|
@ -468,7 +468,7 @@ Apache Kafka supports the following bulk metadata options:
|
||||||
|
|
||||||
When invoking the Kafka pub/sub, its possible to provide an optional partition key by using the `metadata` query param in the request url.
|
When invoking the Kafka pub/sub, its possible to provide an optional partition key by using the `metadata` query param in the request url.
|
||||||
|
|
||||||
The param name is `partitionKey`.
|
The param name can either `partitionKey` or `__key`
|
||||||
|
|
||||||
Example:
|
Example:
|
||||||
|
|
||||||
|
@ -484,7 +484,7 @@ curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.partiti
|
||||||
|
|
||||||
### Message headers
|
### Message headers
|
||||||
|
|
||||||
All other metadata key/value pairs (that are not `partitionKey`) are set as headers in the Kafka message. Here is an example setting a `correlationId` for the message.
|
All other metadata key/value pairs (that are not `partitionKey` or `__key`) are set as headers in the Kafka message. Here is an example setting a `correlationId` for the message.
|
||||||
|
|
||||||
```shell
|
```shell
|
||||||
curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correlationId=myCorrelationID&metadata.partitionKey=key1 \
|
curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correlationId=myCorrelationID&metadata.partitionKey=key1 \
|
||||||
|
@ -495,7 +495,51 @@ curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correla
|
||||||
}
|
}
|
||||||
}'
|
}'
|
||||||
```
|
```
|
||||||
|
### Kafka Pubsub special message headers received on consumer side
|
||||||
|
|
||||||
|
When consuming messages, special message metadata are being automatically passed as headers. These are:
|
||||||
|
- `__key`: the message key if applicable
|
||||||
|
- `__topic`: the topic for the message
|
||||||
|
- `__partition`: the partition number for the message
|
||||||
|
- `__offset`: the offset of the message in the partition
|
||||||
|
- `__timestamp`: the timestamp for the message
|
||||||
|
|
||||||
|
You can access them within the consumer endpoint as follows:
|
||||||
|
{{< tabs "Python (FastAPI)" >}}
|
||||||
|
|
||||||
|
{{% codetab %}}
|
||||||
|
|
||||||
|
```python
|
||||||
|
from fastapi import APIRouter, Body, Response, status
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
|
||||||
|
app = FastAPI()
|
||||||
|
|
||||||
|
router = APIRouter()
|
||||||
|
|
||||||
|
|
||||||
|
@router.get('/dapr/subscribe')
|
||||||
|
def subscribe():
|
||||||
|
subscriptions = [{'pubsubname': 'pubsub',
|
||||||
|
'topic': 'my-topic',
|
||||||
|
'route': 'my_topic_subscriber',
|
||||||
|
}]
|
||||||
|
return subscriptions
|
||||||
|
|
||||||
|
@router.post('/my_topic_subscriber')
|
||||||
|
def my_topic_subscriber(
|
||||||
|
key: Annotated[str, Header(alias="__key")],
|
||||||
|
offset: Annotated[int, Header(alias="__offset")],
|
||||||
|
event_data=Body()):
|
||||||
|
print(f"key={key} - offset={offset} - data={event_data}", flush=True)
|
||||||
|
return Response(status_code=status.HTTP_200_OK)
|
||||||
|
|
||||||
|
app.include_router(router)
|
||||||
|
|
||||||
|
```
|
||||||
|
|
||||||
|
{{% /codetab %}}
|
||||||
## Receiving message headers with special characters
|
## Receiving message headers with special characters
|
||||||
|
|
||||||
The consumer application may be required to receive message headers that include special characters, which may cause HTTP protocol validation errors.
|
The consumer application may be required to receive message headers that include special characters, which may cause HTTP protocol validation errors.
|
||||||
|
|
Loading…
Reference in New Issue