mirror of https://github.com/dapr/docs.git
Merge branch 'v1.14' into issue_4219-3
commit c167d30d84
@ -336,14 +336,13 @@ Status | Description
`RETRY` | Message to be retried by Dapr
`DROP` | Warning is logged and message is dropped

Refer to [Expected HTTP Response for Bulk Subscribe]({{< ref pubsub_api.md >}}) for further insights on the response.
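
For instance, the endpoint handling a bulk-subscribed topic can acknowledge each entry individually with one of the statuses above. The following is a minimal Flask sketch of that idea; the route name and the `entries`/`statuses` payload shapes follow the bulk subscribe reference linked above, so treat it as illustrative rather than definitive:

```python
from flask import Flask, jsonify, request

app = Flask(__name__)

@app.route('/checkout', methods=['POST'])
def checkout():
    # Report a status ("SUCCESS", "RETRY", or "DROP") for every received
    # entry; each entryId must echo the entryId of the corresponding
    # message in the request body.
    statuses = [
        {"entryId": entry["entryId"], "status": "SUCCESS"}
        for entry in request.json.get("entries", [])
    ]
    return jsonify({"statuses": statuses})
```
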
### Example

The following code examples demonstrate how to use Bulk Subscribe.

{{< tabs "Java" "JavaScript" ".NET" "Python" >}}

{{% codetab %}}

```java
@ -471,7 +470,50 @@ public class BulkMessageController : ControllerBase
{{% /codetab %}}

{{% codetab %}}

Currently, you can only bulk subscribe in Python using an HTTP client.

```python
import json
from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
    # Define the bulk subscribe configuration
    subscriptions = [{
        "pubsubname": "pubsub",
        "topic": "TOPIC_A",
        "route": "/checkout",
        "bulkSubscribe": {
            "enabled": True,
            "maxMessagesCount": 3,
            "maxAwaitDurationMs": 40
        }
    }]
    print('Dapr pub/sub is subscribed to: ' + json.dumps(subscriptions))
    return jsonify(subscriptions)


# Define the endpoint to handle incoming messages
@app.route('/checkout', methods=['POST'])
def checkout():
    messages = request.json
    print(messages)
    for message in messages:
        print(f"Received message: {message}")
    return json.dumps({'success': True}), 200, {'ContentType': 'application/json'}

if __name__ == '__main__':
    app.run(port=5000)

```
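
To try this locally, you can run the app behind a Dapr sidecar, for example: `dapr run --app-id my-app --app-port 5000 -- python3 app.py` (the app ID and filename here are placeholders).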

{{% /codetab %}}

{{< /tabs >}}

## How components handle publishing and subscribing to bulk messages

For event publish/subscribe, two kinds of network transfers are involved.

@ -16,6 +16,7 @@ This guide walks you through installing an Elastic Kubernetes Service (EKS) clus
- [AWS CLI](https://aws.amazon.com/cli/)
- [eksctl](https://eksctl.io/)
- [An existing VPC and subnets](https://docs.aws.amazon.com/eks/latest/userguide/network_reqs.html)
- [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/)

## Deploy an EKS cluster
@ -25,20 +26,57 @@ This guide walks you through installing an Elastic Kubernetes Service (EKS) clus
aws configure
```

1. Create a new file called `cluster-config.yaml` and add the content below to it, replacing `[your_cluster_name]`, `[your_cluster_region]`, and `[your_k8s_version]` with the appropriate values:

    ```yaml
    apiVersion: eksctl.io/v1alpha5
    kind: ClusterConfig

    metadata:
      name: [your_cluster_name]
      region: [your_cluster_region]
      version: [your_k8s_version]
      tags:
        karpenter.sh/discovery: [your_cluster_name]

    iam:
      withOIDC: true

    managedNodeGroups:
      - name: mng-od-4vcpu-8gb
        desiredCapacity: 2
        minSize: 1
        maxSize: 5
        instanceType: c5.xlarge
        privateNetworking: true

    addons:
      - name: vpc-cni
        attachPolicyARNs:
          - arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy
      - name: coredns
        version: latest
      - name: kube-proxy
        version: latest
      - name: aws-ebs-csi-driver
        wellKnownPolicies:
          ebsCSIController: true
    ```

1. Create the cluster by running the following command:

    ```bash
    eksctl create cluster -f cluster-config.yaml
    ```

1. Verify the kubectl context:

    ```bash
    kubectl config current-context
    ```

## Add Dapr requirements for sidecar access and default storage class

1. Update the security group rule to allow the EKS cluster to communicate with the Dapr Sidecar by creating an inbound rule for port 4000.
```bash
@ -49,11 +87,37 @@ This guide walks you through installing an Elastic Kubernetes Service (EKS) clus
--source-group [your_security_group]
```

2. Add a default storage class if you don't have one:

    ```bash
    kubectl patch storageclass gp2 -p '{"metadata": {"annotations":{"storageclass.kubernetes.io/is-default-class":"true"}}}'
    ```
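
    You can confirm with `kubectl get storageclass`, which should now show `gp2` marked as the default.
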
## Install Dapr

Install Dapr on your cluster by running:

```bash
dapr init -k
```

You should see the following response:

```bash
⌛ Making the jump to hyperspace...
ℹ️ Note: To install Dapr using Helm, see here: https://docs.dapr.io/getting-started/install-dapr-kubernetes/#install-with-helm-advanced

ℹ️ Container images will be pulled from Docker Hub
✅ Deploying the Dapr control plane with latest version to your cluster...
✅ Deploying the Dapr dashboard with latest version to your cluster...
✅ Success! Dapr has been installed to namespace dapr-system. To verify, run `dapr status -k' in your terminal. To get started, go here: https://docs.dapr.io/getting-started
```

## Troubleshooting

### Access permissions

If you face any access permission issues, make sure you are using the same AWS profile that was used to create the cluster. If needed, update the kubectl configuration with the correct profile. More information is available [here](https://repost.aws/knowledge-center/eks-api-server-unauthorized-error):

```bash
aws eks --region [your_aws_region] update-kubeconfig --name [your_eks_cluster_name] --profile [your_profile_name]
```
@ -468,7 +468,7 @@ Apache Kafka supports the following bulk metadata options:
When invoking the Kafka pub/sub, it's possible to provide an optional partition key by using the `metadata` query parameter in the request URL.

The param name can either be `partitionKey` or `__key`.

Example:
@ -484,7 +484,7 @@ curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.partiti
### Message headers

All other metadata key/value pairs (that are not `partitionKey` or `__key`) are set as headers in the Kafka message. Here is an example setting a `correlationId` for the message.

```shell
curl -X POST "http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correlationId=myCorrelationID&metadata.partitionKey=key1" \
@ -495,7 +495,51 @@ curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correla
}
}'
```

### Kafka pub/sub special message headers received on the consumer side
When consuming messages, special message metadata is automatically passed as headers. These are:
- `__key`: the message key if available
- `__topic`: the topic for the message
- `__partition`: the partition number for the message
- `__offset`: the offset of the message in the partition
- `__timestamp`: the timestamp for the message

You can access them within the consumer endpoint as follows:

{{< tabs "Python (FastAPI)" >}}

{{% codetab %}}

```python
from typing import Annotated

from fastapi import APIRouter, Body, FastAPI, Header, Response, status

app = FastAPI()

router = APIRouter()


@router.get('/dapr/subscribe')
def subscribe():
    subscriptions = [{'pubsubname': 'pubsub',
                      'topic': 'my-topic',
                      'route': 'my_topic_subscriber',
                      }]
    return subscriptions


@router.post('/my_topic_subscriber')
def my_topic_subscriber(
        key: Annotated[str, Header(alias="__key")],
        offset: Annotated[int, Header(alias="__offset")],
        event_data=Body()):
    print(f"key={key} - offset={offset} - data={event_data}", flush=True)
    return Response(status_code=status.HTTP_200_OK)


app.include_router(router)
```
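
To serve this app you might run it with Uvicorn behind a Dapr sidecar, for example `uvicorn app:app --port 5000` (the module name is assumed), passing the same port to `dapr run` via `--app-port`.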
{{% /codetab %}}

{{< /tabs >}}

## Receiving message headers with special characters

The consumer application may be required to receive message headers that include special characters, which may cause HTTP protocol validation errors.