From 8aa121259ff6182ae8b67fe1e9dcbdf65793b6bd Mon Sep 17 00:00:00 2001 From: Whit Waldo Date: Mon, 16 Sep 2024 14:47:24 -0500 Subject: [PATCH 1/8] Added .NET implementation detail for streaming subscription support Signed-off-by: Whit Waldo --- .../pubsub/subscription-methods.md | 42 ++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md b/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md index 436c16295..7c90a7c58 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md +++ b/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md @@ -204,7 +204,47 @@ As messages are sent to the given message handler code, there is no concept of r The example below shows the different ways to stream subscribe to a topic. -{{< tabs Go>}} +{{< tabs ".NET" Go>}} + +{{% codetab %}} + +```csharp +using Dapr.Messaging.PublishSubscribe; + +var clientBuilder = new DaprPublishSubscribeClientBuilder(); +var daprMessagingClient = clientBuilder.Build(); + +async Task HandleMessage(TopicMessage message, CancellationToken cancellationToken = default) +{ + try + { + //Do something with the message + Console.WriteLine(Encoding.UTF8.GetString(message.Data.Span)); + + return await Task.FromResult(TopicResponseAction.Success); + } + catch + { + return await Task.FromResult(TopicResponseAction.Retry); + } +} + +//Create a dynamic streaming subscription +var subscription = daprMessagingClient.Register("pubsub", "myTopic", + new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(15), TopicResponseAction.Retry)), + HandleMessage, CancellationToken.None); + +//Subscribe to messages on it with a timeout of 30 seconds +var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); +await subscription.SubscribeAsync(cancellationTokenSource.Token); + +await Task.Delay(TimeSpan.FromMinutes(1)); + +//When you're done with the subscription, simply dispose of it +await subscription.DisposeAsync(); +``` + +{{% /codetab %}} {{% codetab %}} From c8dc1a9664a06c2ff678e62a6d475babc9d26714 Mon Sep 17 00:00:00 2001 From: Hannah Hunter Date: Mon, 28 Oct 2024 15:31:08 -0400 Subject: [PATCH 2/8] add AI generated python code example Signed-off-by: Hannah Hunter --- .../building-blocks/pubsub/pubsub-bulk.md | 57 +++++++++++++++++-- 1 file changed, 53 insertions(+), 4 deletions(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-bulk.md b/daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-bulk.md index 89ca63fe8..e54fca8f4 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-bulk.md +++ b/daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-bulk.md @@ -336,14 +336,13 @@ Status | Description `RETRY` | Message to be retried by Dapr `DROP` | Warning is logged and message is dropped -Please refer [Expected HTTP Response for Bulk Subscribe]({{< ref pubsub_api.md >}}) for further insights on response. +Refer to [Expected HTTP Response for Bulk Subscribe]({{< ref pubsub_api.md >}}) for further insights on response. ### Example -Please refer following code samples for how to use Bulk Subscribe: - -{{< tabs "Java" "JavaScript" ".NET" >}} +The following code examples demonstrate how to use Bulk Subscribe. +{{< tabs "Java" "JavaScript" ".NET" "Python" >}} {{% codetab %}} ```java @@ -471,7 +470,57 @@ public class BulkMessageController : ControllerBase {{% /codetab %}} +{{% codetab %}} + +```python +import requests +import json +from flask import Flask, request + +app = Flask(__name__) + +# Define the Dapr sidecar URL +DAPR_URL = "http://localhost:3500/v1.0" + +# Define the bulk subscribe endpoint +BULK_SUBSCRIBE_ENDPOINT = f"{DAPR_URL}/subscribe/bulk" + +# Define the subscription details +subscription = { + "pubsubname": "my-pubsub-name", + "topic": "topic-a", + "route": "/events", + "metadata": { + "bulkSubscribe": "true", + "maxMessagesCount": "100", + "maxAwaitDurationMs": "40" + } +} + +# Register the subscription +response = requests.post(BULK_SUBSCRIBE_ENDPOINT, json=subscription) + +if response.status_code == 200: + print("Bulk subscription registered successfully!") +else: + print(f"Failed to register bulk subscription: {response.status_code} - {response.text}") + +# Define the event handler +@app.route('/events', methods=['POST']) +def handle_events(): + events = request.json + for event in events: + print(f"Received event: {event}") + return '', 200 + +if __name__ == '__main__': + app.run(port=5000) +``` + +{{% /codetab %}} + {{< /tabs >}} + ## How components handle publishing and subscribing to bulk messages For event publish/subscribe, two kinds of network transfers are involved. From 6f47d2c74811227968719b084113ee96bda8c173 Mon Sep 17 00:00:00 2001 From: Hannah Hunter Date: Fri, 1 Nov 2024 12:25:33 -0400 Subject: [PATCH 3/8] update and add a little comment Signed-off-by: Hannah Hunter --- .../building-blocks/pubsub/pubsub-bulk.md | 48 ++++++++++--------- 1 file changed, 25 insertions(+), 23 deletions(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-bulk.md b/daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-bulk.md index e54fca8f4..462350800 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-bulk.md +++ b/daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-bulk.md @@ -471,46 +471,48 @@ public class BulkMessageController : ControllerBase {{% /codetab %}} {{% codetab %}} +Currently, you can only bulk subscribe in Python using an HTTP client. ```python import requests import json -from flask import Flask, request - -app = Flask(__name__) # Define the Dapr sidecar URL DAPR_URL = "http://localhost:3500/v1.0" -# Define the bulk subscribe endpoint -BULK_SUBSCRIBE_ENDPOINT = f"{DAPR_URL}/subscribe/bulk" +# Define the subscription endpoint +SUBSCRIBE_URL = f"{DAPR_URL}/subscribe" -# Define the subscription details +# Define the bulk subscribe configuration subscription = { - "pubsubname": "my-pubsub-name", - "topic": "topic-a", - "route": "/events", - "metadata": { - "bulkSubscribe": "true", - "maxMessagesCount": "100", - "maxAwaitDurationMs": "40" + "pubsubname": "order-pub-sub", + "topic": "orders", + "route": "/checkout", + "bulkSubscribe": { + "enabled": True, + "maxMessagesCount": 100, + "maxAwaitDurationMs": 40 } } -# Register the subscription -response = requests.post(BULK_SUBSCRIBE_ENDPOINT, json=subscription) +# Send the subscription request +response = requests.post(SUBSCRIBE_URL, json=subscription) if response.status_code == 200: - print("Bulk subscription registered successfully!") + print("Bulk subscription created successfully!") else: - print(f"Failed to register bulk subscription: {response.status_code} - {response.text}") + print(f"Failed to create bulk subscription: {response.status_code} - {response.text}") -# Define the event handler -@app.route('/events', methods=['POST']) -def handle_events(): - events = request.json - for event in events: - print(f"Received event: {event}") +# Define the endpoint to handle incoming messages +from flask import Flask, request + +app = Flask(__name__) + +@app.route('/checkout', methods=['POST']) +def checkout(): + messages = request.json + for message in messages: + print(f"Received message: {message}") return '', 200 if __name__ == '__main__': From 6f8fcb2a555c8da7951abc2cf31fe699541bcd81 Mon Sep 17 00:00:00 2001 From: Fernando Rocha Date: Fri, 8 Nov 2024 13:35:28 -0800 Subject: [PATCH 4/8] Update setup-eks.md (#4423) * Update setup-eks.md Signed-off-by: Fernando Rocha * Update setup-eks.md Signed-off-by: Fernando Rocha * Update setup-eks.md Signed-off-by: Fernando Rocha * Update setup-eks.md Signed-off-by: Fernando Rocha --------- Signed-off-by: Fernando Rocha --- .../hosting/kubernetes/cluster/setup-eks.md | 78 +++++++++++++++++-- 1 file changed, 71 insertions(+), 7 deletions(-) diff --git a/daprdocs/content/en/operations/hosting/kubernetes/cluster/setup-eks.md b/daprdocs/content/en/operations/hosting/kubernetes/cluster/setup-eks.md index b6e9b7747..6a87484cc 100644 --- a/daprdocs/content/en/operations/hosting/kubernetes/cluster/setup-eks.md +++ b/daprdocs/content/en/operations/hosting/kubernetes/cluster/setup-eks.md @@ -16,6 +16,7 @@ This guide walks you through installing an Elastic Kubernetes Service (EKS) clus - [AWS CLI](https://aws.amazon.com/cli/) - [eksctl](https://eksctl.io/) - [An existing VPC and subnets](https://docs.aws.amazon.com/eks/latest/userguide/network_reqs.html) + - [Dapr CLI](https://docs.dapr.io/getting-started/install-dapr-cli/) ## Deploy an EKS cluster @@ -25,20 +26,57 @@ This guide walks you through installing an Elastic Kubernetes Service (EKS) clus aws configure ``` -1. Create an EKS cluster. To use a specific version of Kubernetes, use `--version` (1.13.x or newer version required). +1. Create a new file called `cluster-config.yaml` and add the content below to it, replacing `[your_cluster_name]`, `[your_cluster_region]`, and `[your_k8s_version]` with the appropriate values: + + ```yaml + apiVersion: eksctl.io/v1alpha5 + kind: ClusterConfig + + metadata: + name: [your_cluster_name] + region: [your_cluster_region] + version: [your_k8s_version] + tags: + karpenter.sh/discovery: [your_cluster_name] + + iam: + withOIDC: true + + managedNodeGroups: + - name: mng-od-4vcpu-8gb + desiredCapacity: 2 + minSize: 1 + maxSize: 5 + instanceType: c5.xlarge + privateNetworking: true + + addons: + - name: vpc-cni + attachPolicyARNs: + - arn:aws:iam::aws:policy/AmazonEKS_CNI_Policy + - name: coredns + version: latest + - name: kube-proxy + version: latest + - name: aws-ebs-csi-driver + wellKnownPolicies: + ebsCSIController: true + ``` + +1. Create the cluster by running the following command: ```bash - eksctl create cluster --name [your_eks_cluster_name] --region [your_aws_region] --version [kubernetes_version] --vpc-private-subnets [subnet_list_seprated_by_comma] --without-nodegroup + eksctl create cluster -f cluster.yaml ``` - - Change the values for `vpc-private-subnets` to meet your requirements. You can also add additional IDs. You must specify at least two subnet IDs. If you'd rather specify public subnets, you can change `--vpc-private-subnets` to `--vpc-public-subnets`. - -1. Verify kubectl context: + +1. Verify the kubectl context: ```bash kubectl config current-context ``` +## Add Dapr requirements for sidecar access and default storage class: + 1. Update the security group rule to allow the EKS cluster to communicate with the Dapr Sidecar by creating an inbound rule for port 4000. ```bash @@ -49,11 +87,37 @@ This guide walks you through installing an Elastic Kubernetes Service (EKS) clus --source-group [your_security_group] ``` +2. Add a default storage class if you don't have one: + + ```bash + kubectl patch storageclass gp2 -p '{"metadata": {"annotations":{"storageclass.kubernetes.io/is-default-class":"true"}}}' + ``` + +## Install Dapr + +Install Dapr on your cluster by running: + +```bash +dapr init -k +``` + +You should see the following response: + +```bash +⌛ Making the jump to hyperspace... +ℹ️ Note: To install Dapr using Helm, see here: https://docs.dapr.io/getting-started/install-dapr-kubernetes/#install-with-helm-advanced + +ℹ️ Container images will be pulled from Docker Hub +✅ Deploying the Dapr control plane with latest version to your cluster... +✅ Deploying the Dapr dashboard with latest version to your cluster... +✅ Success! Dapr has been installed to namespace dapr-system. To verify, run `dapr status -k' in your terminal. To get started, go here: https://docs.dapr.io/getting-started +``` + ## Troubleshooting ### Access permissions -If you face any access permissions, make sure you are using the same AWS profile that was used to create the cluster. If needed, update the kubectl configuration with the correct profile: +If you face any access permissions, make sure you are using the same AWS profile that was used to create the cluster. If needed, update the kubectl configuration with the correct profile. More information [here](https://repost.aws/knowledge-center/eks-api-server-unauthorized-error): ```bash aws eks --region [your_aws_region] update-kubeconfig --name [your_eks_cluster_name] --profile [your_profile_name] From 1ab3eca476b056b82d748acf2b55daebdc9ad680 Mon Sep 17 00:00:00 2001 From: Patrick Assuied Date: Wed, 13 Nov 2024 08:05:01 -0800 Subject: [PATCH 5/8] Added missing information about kafka-pubsub special metadata headers Signed-off-by: Patrick Assuied --- .../supported-pubsub/setup-apache-kafka.md | 48 ++++++++++++++++++- 1 file changed, 46 insertions(+), 2 deletions(-) diff --git a/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md b/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md index 75b1b758c..002aa4d69 100644 --- a/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md +++ b/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md @@ -468,7 +468,7 @@ Apache Kafka supports the following bulk metadata options: When invoking the Kafka pub/sub, its possible to provide an optional partition key by using the `metadata` query param in the request url. -The param name is `partitionKey`. +The param name can either `partitionKey` or `__key` Example: @@ -484,7 +484,7 @@ curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.partiti ### Message headers -All other metadata key/value pairs (that are not `partitionKey`) are set as headers in the Kafka message. Here is an example setting a `correlationId` for the message. +All other metadata key/value pairs (that are not `partitionKey` or `__key`) are set as headers in the Kafka message. Here is an example setting a `correlationId` for the message. ```shell curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correlationId=myCorrelationID&metadata.partitionKey=key1 \ @@ -495,7 +495,51 @@ curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correla } }' ``` +### Kafka Pubsub special message headers received on consumer side +When consuming messages, special message metadata are being automatically passed as headers. These are: +- `__key`: the message key if applicable +- `__topic`: the topic for the message +- `__partition`: the partition number for the message +- `__offset`: the offset of the message in the partition +- `__timestamp`: the timestamp for the message + +You can access them within the consumer endpoint as follows: +{{< tabs "Python (FastAPI)" >}} + +{{% codetab %}} + +```python +from fastapi import APIRouter, Body, Response, status +import json +import sys + +app = FastAPI() + +router = APIRouter() + + +@router.get('/dapr/subscribe') +def subscribe(): + subscriptions = [{'pubsubname': 'pubsub', + 'topic': 'my-topic', + 'route': 'my_topic_subscriber', + }] + return subscriptions + +@router.post('/my_topic_subscriber') +def my_topic_subscriber( + key: Annotated[str, Header(alias="__key")], + offset: Annotated[int, Header(alias="__offset")], + event_data=Body()): + print(f"key={key} - offset={offset} - data={event_data}", flush=True) + return Response(status_code=status.HTTP_200_OK) + +app.include_router(router) + +``` + +{{% /codetab %}} ## Receiving message headers with special characters The consumer application may be required to receive message headers that include special characters, which may cause HTTP protocol validation errors. From 224c7de4ce7aeef00b056f1605c67493beb3507c Mon Sep 17 00:00:00 2001 From: Patrick Assuied Date: Wed, 13 Nov 2024 08:17:48 -0800 Subject: [PATCH 6/8] Typo Signed-off-by: Patrick Assuied --- .../supported-pubsub/setup-apache-kafka.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md b/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md index 002aa4d69..c6f718883 100644 --- a/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md +++ b/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md @@ -468,7 +468,7 @@ Apache Kafka supports the following bulk metadata options: When invoking the Kafka pub/sub, its possible to provide an optional partition key by using the `metadata` query param in the request url. -The param name can either `partitionKey` or `__key` +The param name can either be `partitionKey` or `__key` Example: @@ -498,7 +498,7 @@ curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correla ### Kafka Pubsub special message headers received on consumer side When consuming messages, special message metadata are being automatically passed as headers. These are: -- `__key`: the message key if applicable +- `__key`: the message key if available - `__topic`: the topic for the message - `__partition`: the partition number for the message - `__offset`: the offset of the message in the partition From a7e578aae42c871ddd60cdec5f8c3ce732394db6 Mon Sep 17 00:00:00 2001 From: Hannah Hunter <94493363+hhunter-ms@users.noreply.github.com> Date: Thu, 14 Nov 2024 12:13:23 -0500 Subject: [PATCH 7/8] Update daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-bulk.md Co-authored-by: Elena Kolevska Signed-off-by: Hannah Hunter <94493363+hhunter-ms@users.noreply.github.com> --- .../building-blocks/pubsub/pubsub-bulk.md | 53 ++++++++----------- 1 file changed, 22 insertions(+), 31 deletions(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-bulk.md b/daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-bulk.md index 462350800..5131d9080 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-bulk.md +++ b/daprdocs/content/en/developing-applications/building-blocks/pubsub/pubsub-bulk.md @@ -474,49 +474,40 @@ public class BulkMessageController : ControllerBase Currently, you can only bulk subscribe in Python using an HTTP client. ```python -import requests import json - -# Define the Dapr sidecar URL -DAPR_URL = "http://localhost:3500/v1.0" - -# Define the subscription endpoint -SUBSCRIBE_URL = f"{DAPR_URL}/subscribe" - -# Define the bulk subscribe configuration -subscription = { - "pubsubname": "order-pub-sub", - "topic": "orders", - "route": "/checkout", - "bulkSubscribe": { - "enabled": True, - "maxMessagesCount": 100, - "maxAwaitDurationMs": 40 - } -} - -# Send the subscription request -response = requests.post(SUBSCRIBE_URL, json=subscription) - -if response.status_code == 200: - print("Bulk subscription created successfully!") -else: - print(f"Failed to create bulk subscription: {response.status_code} - {response.text}") - -# Define the endpoint to handle incoming messages -from flask import Flask, request +from flask import Flask, request, jsonify app = Flask(__name__) +@app.route('/dapr/subscribe', methods=['GET']) +def subscribe(): + # Define the bulk subscribe configuration + subscriptions = [{ + "pubsubname": "pubsub", + "topic": "TOPIC_A", + "route": "/checkout", + "bulkSubscribe": { + "enabled": True, + "maxMessagesCount": 3, + "maxAwaitDurationMs": 40 + } + }] + print('Dapr pub/sub is subscribed to: ' + json.dumps(subscriptions)) + return jsonify(subscriptions) + + +# Define the endpoint to handle incoming messages @app.route('/checkout', methods=['POST']) def checkout(): messages = request.json + print(messages) for message in messages: print(f"Received message: {message}") - return '', 200 + return json.dumps({'success': True}), 200, {'ContentType': 'application/json'} if __name__ == '__main__': app.run(port=5000) + ``` {{% /codetab %}} From f76c3fa7072e89e6853d9f0442b3f2a9d9e1f54b Mon Sep 17 00:00:00 2001 From: Hannah Hunter Date: Mon, 18 Nov 2024 11:27:21 -0500 Subject: [PATCH 8/8] roll back changes Signed-off-by: Hannah Hunter --- .../pubsub/subscription-methods.md | 42 +------------------ 1 file changed, 1 insertion(+), 41 deletions(-) diff --git a/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md b/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md index ad5745abc..62ed2811e 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md +++ b/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md @@ -203,47 +203,7 @@ As messages are sent to the given message handler code, there is no concept of r The example below shows the different ways to stream subscribe to a topic. -{{< tabs ".NET" Go>}} - -{{% codetab %}} - -```csharp -using Dapr.Messaging.PublishSubscribe; - -var clientBuilder = new DaprPublishSubscribeClientBuilder(); -var daprMessagingClient = clientBuilder.Build(); - -async Task HandleMessage(TopicMessage message, CancellationToken cancellationToken = default) -{ - try - { - //Do something with the message - Console.WriteLine(Encoding.UTF8.GetString(message.Data.Span)); - - return await Task.FromResult(TopicResponseAction.Success); - } - catch - { - return await Task.FromResult(TopicResponseAction.Retry); - } -} - -//Create a dynamic streaming subscription -var subscription = daprMessagingClient.Register("pubsub", "myTopic", - new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(15), TopicResponseAction.Retry)), - HandleMessage, CancellationToken.None); - -//Subscribe to messages on it with a timeout of 30 seconds -var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); -await subscription.SubscribeAsync(cancellationTokenSource.Token); - -await Task.Delay(TimeSpan.FromMinutes(1)); - -//When you're done with the subscription, simply dispose of it -await subscription.DisposeAsync(); -``` - -{{% /codetab %}} +{{< tabs Go>}} {{% codetab %}}