mirror of https://github.com/dapr/quickstarts.git
Next iteration of python pub/sub
This commit is contained in:
parent
1f32f0cf36
commit
06767bd857
|
|
@ -0,0 +1,4 @@
|
||||||
|
{
|
||||||
|
"python.linting.flake8Enabled": true,
|
||||||
|
"python.linting.enabled": true
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,10 @@
|
||||||
|
apiVersion: dapr.io/v1alpha1
|
||||||
|
kind: Configuration
|
||||||
|
metadata:
|
||||||
|
name: daprConfig
|
||||||
|
namespace: default
|
||||||
|
spec:
|
||||||
|
tracing:
|
||||||
|
samplingRate: "1"
|
||||||
|
zipkin:
|
||||||
|
endpointAddress: "http://localhost:9411/api/v2/spans"
|
||||||
|
|
@ -10,6 +10,3 @@ spec:
|
||||||
value: localhost:6379
|
value: localhost:6379
|
||||||
- name: redisPassword
|
- name: redisPassword
|
||||||
value: ""
|
value: ""
|
||||||
scopes:
|
|
||||||
- orderprocessing
|
|
||||||
- checkout
|
|
||||||
|
|
|
||||||
|
|
@ -1,17 +0,0 @@
|
||||||
from cloudevents.sdk.event import v1
|
|
||||||
from dapr.ext.grpc import App
|
|
||||||
import logging
|
|
||||||
|
|
||||||
import json
|
|
||||||
|
|
||||||
app = App()
|
|
||||||
|
|
||||||
logging.basicConfig(level = logging.INFO)
|
|
||||||
|
|
||||||
@app.subscribe(pubsub_name='order_pub_sub', topic='orders')
|
|
||||||
def mytopic(event: v1.Event) -> None:
|
|
||||||
data = json.loads(event.Data())
|
|
||||||
logging.info('Subscriber received: ' + str(data))
|
|
||||||
return '', 200
|
|
||||||
|
|
||||||
app.run(6002)
|
|
||||||
|
|
@ -1,25 +0,0 @@
|
||||||
import random
|
|
||||||
from time import sleep
|
|
||||||
import requests
|
|
||||||
import logging
|
|
||||||
import json
|
|
||||||
from dapr.clients import DaprClient
|
|
||||||
|
|
||||||
logging.basicConfig(level = logging.INFO)
|
|
||||||
|
|
||||||
|
|
||||||
while True:
|
|
||||||
sleep(random.randrange(50, 5000) / 1000)
|
|
||||||
orderId = random.randint(1, 1000)
|
|
||||||
PUBSUB_NAME = 'order_pub_sub'
|
|
||||||
TOPIC_NAME = 'orders'
|
|
||||||
with DaprClient() as client:
|
|
||||||
result = client.publish_event(
|
|
||||||
pubsub_name=PUBSUB_NAME,
|
|
||||||
topic_name=TOPIC_NAME,
|
|
||||||
data=json.dumps(orderId),
|
|
||||||
data_content_type='application/json',
|
|
||||||
)
|
|
||||||
logging.info('Published data: ' + str(orderId))
|
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -0,0 +1,97 @@
|
||||||
|
# Dapr pub/sub
|
||||||
|
|
||||||
|
In this quickstart, you'll create a publisher microservice and a subscriber microservice to demonstrate how Dapr enables a publish-subcribe pattern. The publisher will generate messages of a specific topic, while subscribers will listen for messages of specific topics. See [Why Pub-Sub](#why-pub-sub) to understand when this pattern might be a good choice for your software architecture.
|
||||||
|
|
||||||
|
Visit [this](https://docs.dapr.io/developing-applications/building-blocks/pubsub/) link for more information about Dapr and Pub-Sub.
|
||||||
|
|
||||||
|
This quickstart includes one publisher:
|
||||||
|
|
||||||
|
- Python client message generator `checkout`
|
||||||
|
|
||||||
|
And one subscriber:
|
||||||
|
|
||||||
|
- Python subscriber `order-processor`
|
||||||
|
|
||||||
|
### Run Python message publisher with Dapr
|
||||||
|
|
||||||
|
1. Open a new terminal window and navigate to `checkout` directory:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd checkout
|
||||||
|
```
|
||||||
|
|
||||||
|
2. Install dependencies:
|
||||||
|
|
||||||
|
<!-- STEP
|
||||||
|
name: Install python dependencies
|
||||||
|
working_dir: ./checkout
|
||||||
|
-->
|
||||||
|
|
||||||
|
```bash
|
||||||
|
pip3 install -r requirements.txt
|
||||||
|
```
|
||||||
|
|
||||||
|
3. Run the Python publisher app with Dapr:
|
||||||
|
|
||||||
|
<!-- STEP
|
||||||
|
name: Run python publisher
|
||||||
|
expected_stdout_lines:
|
||||||
|
- "You're up and running! Both Dapr and your app logs will appear here."
|
||||||
|
- '== APP == Received message "Message on A" on topic "A"'
|
||||||
|
- '== APP == Received message "Message on C" on topic "C"'
|
||||||
|
- "Exited Dapr successfully"
|
||||||
|
- "Exited App successfully"
|
||||||
|
expected_stderr_lines:
|
||||||
|
output_match_mode: substring
|
||||||
|
working_dir: ./checkout
|
||||||
|
background: true
|
||||||
|
sleep: 10
|
||||||
|
-->
|
||||||
|
|
||||||
|
```bash
|
||||||
|
dapr run --app-id checkoout --components-path ../../components/ -- python3 app.py
|
||||||
|
```
|
||||||
|
|
||||||
|
<!-- END_STEP -->
|
||||||
|
### Run Python message subscriber with Dapr
|
||||||
|
|
||||||
|
1. Open a new terminal window and navigate to `checkout` directory:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd order-processor
|
||||||
|
```
|
||||||
|
|
||||||
|
2. Install dependencies:
|
||||||
|
|
||||||
|
<!-- STEP
|
||||||
|
name: Install python dependencies
|
||||||
|
working_dir: ./order-processor
|
||||||
|
-->
|
||||||
|
|
||||||
|
```bash
|
||||||
|
pip3 install -r requirements.txt
|
||||||
|
```
|
||||||
|
|
||||||
|
3. Run the Python subscriber app with Dapr:
|
||||||
|
|
||||||
|
<!-- STEP
|
||||||
|
name: Run python subscriber
|
||||||
|
expected_stdout_lines:
|
||||||
|
- "You're up and running! Both Dapr and your app logs will appear here."
|
||||||
|
- '== APP == Received message "Message on A" on topic "A"'
|
||||||
|
- '== APP == Received message "Message on C" on topic "C"'
|
||||||
|
- "Exited Dapr successfully"
|
||||||
|
- "Exited App successfully"
|
||||||
|
expected_stderr_lines:
|
||||||
|
output_match_mode: substring
|
||||||
|
working_dir: ./order-processor
|
||||||
|
background: true
|
||||||
|
sleep: 10
|
||||||
|
-->
|
||||||
|
|
||||||
|
|
||||||
|
```bash
|
||||||
|
dapr run --app-id order-processor --components-path ../../components/ --app-port 5001 -- python3 app.py
|
||||||
|
```
|
||||||
|
|
||||||
|
<!-- END_STEP -->
|
||||||
|
|
@ -0,0 +1,21 @@
|
||||||
|
from dapr.clients import DaprClient
|
||||||
|
import json
|
||||||
|
import time
|
||||||
|
import random
|
||||||
|
import logging
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.INFO)
|
||||||
|
|
||||||
|
while True:
|
||||||
|
order = {"orderid": random.randint(1, 1000)}
|
||||||
|
|
||||||
|
with DaprClient() as client:
|
||||||
|
result = client.publish_event( # publish an event using Dapr pub-sub
|
||||||
|
pubsub_name="order_pub_sub",
|
||||||
|
topic_name="orders",
|
||||||
|
data=json.dumps(order),
|
||||||
|
data_content_type="application/json",
|
||||||
|
)
|
||||||
|
|
||||||
|
logging.info("Published data: " + json.dumps(order))
|
||||||
|
time.sleep(1)
|
||||||
File diff suppressed because it is too large
Load Diff
|
|
@ -0,0 +1,29 @@
|
||||||
|
import flask
|
||||||
|
from flask import request, jsonify
|
||||||
|
# from cloudevents.sdk.event import v1
|
||||||
|
# from dapr.ext.grpc import App
|
||||||
|
import json
|
||||||
|
|
||||||
|
app = flask.Flask(__name__)
|
||||||
|
# app = App()
|
||||||
|
# @app.subscribe(pubsub_name='order_pub_sub', topic='orders')
|
||||||
|
# def orders_subscribe(event: v1.Event) -> None:
|
||||||
|
# data = json.loads(event.Data())
|
||||||
|
# logging.info('Subscriber received: ' + str(data))
|
||||||
|
# return json.dumps({'success': True}), 200, {'ContentType': 'application/json'}
|
||||||
|
|
||||||
|
|
||||||
|
@app.route('/dapr/subscribe', methods=['GET'])
|
||||||
|
def subscribe():
|
||||||
|
subscriptions = [{'pubsubname': 'order_pub_sub', 'topic': 'orders', 'route': 'orders'}]
|
||||||
|
return jsonify(subscriptions)
|
||||||
|
|
||||||
|
|
||||||
|
@app.route('/orders', methods=['POST'])
|
||||||
|
def a_subscriber():
|
||||||
|
print(f'orders: {request.json}', flush=True)
|
||||||
|
print('Received message "{}" on topic "{}"'.format(request.json['data']['orderid'], request.json['topic']), flush=True)
|
||||||
|
return json.dumps({'success':True}), 200, {'ContentType':'application/json'}
|
||||||
|
|
||||||
|
|
||||||
|
app.run(port=5001)
|
||||||
|
|
@ -1,8 +0,0 @@
|
||||||
Click==7.0
|
|
||||||
Flask==1.1.1
|
|
||||||
Flask-Cors==3.0.9
|
|
||||||
itsdangerous==1.1.0
|
|
||||||
jinja2>=2.11.3
|
|
||||||
MarkupSafe==1.1.1
|
|
||||||
six==1.12.0
|
|
||||||
Werkzeug==0.15.6
|
|
||||||
Loading…
Reference in New Issue