Add priority for rabbitmq (#3282)

* add priority for rabbitmq

Signed-off-by: yaron2 <schneider.yaron@live.com>

* update subscription version

Signed-off-by: yaron2 <schneider.yaron@live.com>

---------

Signed-off-by: yaron2 <schneider.yaron@live.com>
This commit is contained in:
Yaron Schneider 2023-03-23 08:36:22 -07:00 committed by GitHub
parent eeccdd1261
commit d8672a9aeb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 172 additions and 8 deletions

View File

@ -183,14 +183,15 @@ Setting `exchangeKind` to `"topic"` uses the topic exchanges, which are commonly
Messages with a `routing key` will be routed to one or many queues based on the `routing key` defined in the metadata when subscribing.
The routing key is defined by the `routingKey` metadata. For example, if an app is configured with a routing key `keyA`:
```
apiVersion: dapr.io/v1alpha1
```yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: order_pub_sub
name: orderspubsub
spec:
topic: B
route: /B
routes:
default: /B
pubsubname: pubsub
metadata:
routingKey: keyA
@ -210,14 +211,15 @@ client.PublishEvent(context.Background(), "pubsub", "B", []byte("this is another
Multiple routing keys can be separated by commas.
The example below binds three `routingKey`: `keyA`, `keyB`, and `""`. Note the binding method of empty keys.
```
apiVersion: dapr.io/v1alpha1
```yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: order_pub_sub
name: orderspubsub
spec:
topic: B
route: /B
routes:
default: /B
pubsubname: pubsub
metadata:
routingKey: keyA,keyB,
@ -226,6 +228,168 @@ spec:
For more information see [rabbitmq exchanges](https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchanges).
## Use priority queues
Dapr supports RabbitMQ [priority queues](https://www.rabbitmq.com/priority.html). To set a priority for a queue, use the `maxPriority` topic subscription metadata.
### Declarative priority queue example
```yaml
apiVersion: dapr.io/v2alpha1
kind: Subscription
metadata:
name: pubsub
spec:
topic: checkout
routes:
default: /orders
pubsubname: order-pub-sub
metadata:
maxPriority: 3
```
### Programmatic priority queue example
{{< tabs Python JavaScript Go>}}
{{% codetab %}}
```python
@app.route('/dapr/subscribe', methods=['GET'])
def subscribe():
subscriptions = [
{
'pubsubname': 'pubsub',
'topic': 'checkout',
'routes': {
'default': '/orders'
},
'metadata': {'maxPriority': '3'}
}
]
return jsonify(subscriptions)
```
{{% /codetab %}}
{{% codetab %}}
```javascript
const express = require('express')
const bodyParser = require('body-parser')
const app = express()
app.use(bodyParser.json({ type: 'application/*+json' }));
const port = 3000
app.get('/dapr/subscribe', (req, res) => {
res.json([
{
pubsubname: "pubsub",
topic: "checkout",
routes: {
default: '/orders'
},
metadata: {
maxPriority: '3'
}
}
]);
})
```
{{% /codetab %}}
{{% codetab %}}
```go
package main
"encoding/json"
"net/http"
const appPort = 3000
type subscription struct {
PubsubName string `json:"pubsubname"`
Topic string `json:"topic"`
Metadata map[string]string `json:"metadata,omitempty"`
Routes routes `json:"routes"`
}
type routes struct {
Rules []rule `json:"rules,omitempty"`
Default string `json:"default,omitempty"`
}
// This handles /dapr/subscribe
func configureSubscribeHandler(w http.ResponseWriter, _ *http.Request) {
t := []subscription{
{
PubsubName: "pubsub",
Topic: "checkout",
Routes: routes{
Default: "/orders",
},
Metadata: map[string]string{
"maxPriority": "3"
},
},
}
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(t)
}
```
{{% /codetab %}}
{{< /tabs >}}
### Setting a priority when publishing a message
To set a priority on a message, add the publish metadata key `maxPriority` to the publish endpoint or SDK method.
{{< tabs "HTTP API (Bash)" Python JavaScript Go>}}
{{% codetab %}}
```bash
curl -X POST http://localhost:3601/v1.0/publish/order-pub-sub/orders?metadata.maxPriority=3 -H "Content-Type: application/json" -d '{"orderId": "100"}'
```
{{% /codetab %}}
{{% codetab %}}
```python
with DaprClient() as client:
result = client.publish_event(
pubsub_name=PUBSUB_NAME,
topic_name=TOPIC_NAME,
data=json.dumps(orderId),
data_content_type='application/json',
metadata= { 'maxPriority': '3' })
```
{{% /codetab %}}
{{% codetab %}}
```javascript
await client.pubsub.publish(PUBSUB_NAME, TOPIC_NAME, orderId, { 'maxPriority': '3' });
```
{{% /codetab %}}
{{% codetab %}}
```go
client.PublishEvent(ctx, PUBSUB_NAME, TOPIC_NAME, []byte(strconv.Itoa(orderId)), map[string]string{"maxPriority": "3"})
```
{{% /codetab %}}
{{< /tabs >}}
## Related links
- [Basic schema for a Dapr component]({{< ref component-schema >}}) in the Related links section