diff --git a/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md b/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md index 62ed2811e..5c31057ee 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md +++ b/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md @@ -203,7 +203,112 @@ As messages are sent to the given message handler code, there is no concept of r The example below shows the different ways to stream subscribe to a topic. -{{< tabs Go>}} +{{< tabs Python Go >}} + +{{% codetab %}} + +You can use the `subscribe` method, which returns a `Subscription` object and allows you to pull messages from the stream by calling the `next_message` method. This runs in and may block the main thread while waiting for messages. + +```python +import time + +from dapr.clients import DaprClient +from dapr.clients.grpc.subscription import StreamInactiveError + +counter = 0 + + +def process_message(message): + global counter + counter += 1 + # Process the message here + print(f'Processing message: {message.data()} from {message.topic()}...') + return 'success' + + +def main(): + with DaprClient() as client: + global counter + + subscription = client.subscribe( + pubsub_name='pubsub', topic='orders', dead_letter_topic='orders_dead' + ) + + try: + while counter < 5: + try: + message = subscription.next_message() + + except StreamInactiveError as e: + print('Stream is inactive. Retrying...') + time.sleep(1) + continue + if message is None: + print('No message received within timeout period.') + continue + + # Process the message + response_status = process_message(message) + + if response_status == 'success': + subscription.respond_success(message) + elif response_status == 'retry': + subscription.respond_retry(message) + elif response_status == 'drop': + subscription.respond_drop(message) + + finally: + print("Closing subscription...") + subscription.close() + + +if __name__ == '__main__': + main() + +``` + +You can also use the `subscribe_with_handler` method, which accepts a callback function executed for each message received from the stream. This runs in a separate thread, so it doesn't block the main thread. + +```python +import time + +from dapr.clients import DaprClient +from dapr.clients.grpc._response import TopicEventResponse + +counter = 0 + + +def process_message(message): + # Process the message here + global counter + counter += 1 + print(f'Processing message: {message.data()} from {message.topic()}...') + return TopicEventResponse('success') + + +def main(): + with (DaprClient() as client): + # This will start a new thread that will listen for messages + # and process them in the `process_message` function + close_fn = client.subscribe_with_handler( + pubsub_name='pubsub', topic='orders', handler_fn=process_message, + dead_letter_topic='orders_dead' + ) + + while counter < 5: + time.sleep(1) + + print("Closing subscription...") + close_fn() + + +if __name__ == '__main__': + main() +``` + +[Learn more about streaming subscriptions using the Python SDK client.]({{< ref "python-client.md#streaming-message-subscription" >}}) + +{{% /codetab %}} {{% codetab %}}