Added pub sub example (#20)

* Added appcall proto and server implemenation

* added wrapper for appcallback

* WIP: working pubsub example

* added publisher code, some code cleanup

* fixed README typo

* code clean up, added comments

* resolved PR comments

* update README

* replaced thread::sleep with tokio::delay_for
This commit is contained in:
Gurpreet Singh 2020-07-13 18:22:42 -07:00 committed by GitHub
parent 9f503019e5
commit ef8ca517b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 417 additions and 46 deletions

View File

@ -15,4 +15,16 @@ async-trait = "0.1.24"
tonic-build = "0.2"
[dev-dependencies]
tokio = {version = "0.2", features = ["full"] }
tokio = {version = "0.2", features = ["full"] }
[[example]]
name = "client"
path = "examples/client/client.rs"
[[example]]
name = "publisher"
path = "examples/pubsub/publisher.rs"
[[example]]
name = "subscriber"
path = "examples/pubsub/subscriber.rs"

View File

@ -27,6 +27,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut client = dapr::Client::<dapr::client::TonicClient>::connect(addr).await?;
```
## Try out examples
[Examples](./examples)
## Building
To build

View File

@ -2,10 +2,11 @@
fn main() -> Result<(), std::io::Error> {
// env::set_var("OUT_DIR", "src");
tonic_build::configure().build_server(false).compile(
tonic_build::configure().build_server(true).compile(
&[
"dapr/proto/common/v1/common.proto",
"dapr/proto/runtime/v1/dapr.proto",
"dapr/proto/runtime/v1/appcallback.proto",
],
&["."],
)?;

View File

@ -0,0 +1,131 @@
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
syntax = "proto3";
package dapr.proto.runtime.v1;
import "google/protobuf/empty.proto";
import "dapr/proto/common/v1/common.proto";
option csharp_namespace = "Dapr.AppCallback.Autogen.Grpc.v1";
option java_outer_classname = "DaprAppCallbackProtos";
option java_package = "io.dapr.v1";
option go_package = "github.com/dapr/dapr/pkg/proto/runtime/v1;runtime";
// AppCallback V1 allows user application to interact with Dapr runtime.
// User application needs to implement AppCallback service if it needs to
// receive message from dapr runtime.
service AppCallback {
// Invokes service method with InvokeRequest.
rpc OnInvoke (common.v1.InvokeRequest) returns (common.v1.InvokeResponse) {}
// Lists all topics subscribed by this app.
rpc ListTopicSubscriptions(google.protobuf.Empty) returns (ListTopicSubscriptionsResponse) {}
// Subscribes events from Pubsub
rpc OnTopicEvent(TopicEventRequest) returns (google.protobuf.Empty) {}
// Lists all input bindings subscribed by this app.
rpc ListInputBindings(google.protobuf.Empty) returns (ListInputBindingsResponse) {}
// Listens events from the input bindings
//
// User application can save the states or send the events to the output
// bindings optionally by returning BindingEventResponse.
rpc OnBindingEvent(BindingEventRequest) returns (BindingEventResponse) {}
}
// TopicEventRequest message is compatiable with CloudEvent spec v1.0
// https://github.com/cloudevents/spec/blob/v1.0/spec.md
message TopicEventRequest {
// id identifies the event. Producers MUST ensure that source + id
// is unique for each distinct event. If a duplicate event is re-sent
// (e.g. due to a network error) it MAY have the same id.
string id = 1;
// source identifies the context in which an event happened.
// Often this will include information such as the type of the
// event source, the organization publishing the event or the process
// that produced the event. The exact syntax and semantics behind
// the data encoded in the URI is defined by the event producer.
string source = 2;
// The type of event related to the originating occurrence.
string type = 3;
// The version of the CloudEvents specification.
string spec_version = 4;
// The content type of data value.
string data_content_type = 5;
// The content of the event.
bytes data = 7;
// The pubsub topic which publisher sent to.
string topic = 6;
}
// BindingEventRequest represents input bindings event.
message BindingEventRequest {
// Requried. The name of the input binding component.
string name = 1;
// Required. The payload that the input bindings sent
bytes data = 2;
// The metadata set by the input binging components.
map<string,string> metadata = 3;
}
// BindingEventResponse includes operations to save state or
// send data to output bindings optionally.
message BindingEventResponse {
// The name of state store where states are saved.
string store_name = 1;
// The state key values which will be stored in store_name.
repeated common.v1.StateItem states = 2;
// BindingEventConcurrency is the kind of concurrency
enum BindingEventConcurrency {
// SEQUENTIAL sends data to output bindings specified in "to" sequentially.
SEQUENTIAL = 0;
// PARALLEL sends data to output bindings specified in "to" in parallel.
PARALLEL = 1;
}
// The list of output bindings.
repeated string to = 3;
// The content which will be sent to "to" output bindings.
bytes data = 4;
// The concurrency of output bindings to send data to
// "to" output bindings list. The default is SEQUENTIAL.
BindingEventConcurrency concurrency = 5;
}
// ListTopicSubscriptionsResponse is the message including the list of the subscribing topics.
message ListTopicSubscriptionsResponse {
// The list of topics.
repeated TopicSubscription subscriptions = 1;
}
// TopicSubscription represents topic and metadata.
message TopicSubscription {
// Required. The name of topic which will be subscribed
string topic = 1;
// The optional properties used for this topic's subscribtion e.g. session id
map<string,string> metadata = 2;
}
// ListInputBindingsResponse is the message including the list of input bindings.
message ListInputBindingsResponse {
// The list of input bindings.
repeated string bindings = 1;
}

View File

@ -1,24 +1,18 @@
Before you run the example make sure local redis state store is running by executing:
```
docker ps
```
# Dapr Rust SDK - Examples
1. To run the example we need to first build the examples using the following command:
These examples demonstrates how to use Dapr rust sdk.
```
cargo build --examples
```
* [client](./client)
* Simple dapr client example that saves, gets, and deletes state from the state stores
* [pubsub](./pubsub)
* Publishes and subscribes to events
2. Run the example with dapr using the following command:
## Adding new examples
```
dapr run --app-id=rustapp --grpc-port 3500 cargo run -- --example client
```
If everything went well you should see the following output along with dapr logs:
```
Successfully saved!
Value is "world"
Deleted value: []
```
To add new examples, `Cargo.toml` would have to be updated as follows:
```rust
[[example]]
name = "example-name"
path = "examples/example-name/example.rs"
```

24
examples/client/README.md Normal file
View File

@ -0,0 +1,24 @@
Before you run the example make sure local redis state store is running by executing:
```
docker ps
```
1. To run the example we need to first build the examples using the following command:
```
cargo build --examples
```
2. Run the example with dapr using the following command:
```
dapr run --app-id=rustapp --grpc-port 3500 cargo run -- --example client
```
If everything went well you should see the following output along with dapr logs:
```
Successfully saved!
Value is "world"
Deleted value: []
```

View File

@ -1,6 +1,3 @@
extern crate async_trait;
extern crate dapr;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// TODO: Handle this issue in the sdk
@ -16,7 +13,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let key = String::from("hello");
let val = String::from("world").as_bytes().to_vec();
let val = String::from("world").into_bytes();
let store_name = String::from("statestore");

27
examples/pubsub/README.md Normal file
View File

@ -0,0 +1,27 @@
# Pub/Sub Example
This is a simple example that demonstrates Dapr's pub/sub capabilities. To implement pub/sub in your rust application, you need to implement `AppCallback` server for subscribing to events. Specifically, the following two methods need to be implemented for pub/sub to work:
1. `list_topic_subscriptions` - Dapr runtime calls this method to get list of topics the application is subscribed to.
2. `on_topic_event` - Defines how the application handles the topic event.
> **Note:** Make sure to use latest version of proto bindings.
## Running
> Before you run the example make sure local redis state store is running by executing:
> ```
> docker ps
> ```
To run this example:
1. Start Subscriber (expose gRPC server receiver on port 50051):
```bash
dapr run --app-id rust-subscriber --protocol grpc --app-port 50051 cargo run -- --example subscriber
```
2. Start Publisher:
```bash
dapr run --app-id python-publisher --protocol grpc cargo run -- --example publisher
```

View File

@ -0,0 +1,29 @@
use std::{thread, time::Duration};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// TODO: Handle this issue in the sdk
// Introduce delay so that dapr grpc port is assigned before app tries to connect
thread::sleep(Duration::from_secs(2));
// Get the Dapr port and create a connection
let port: u16 = std::env::var("DAPR_GRPC_PORT")?.parse()?;
let addr = format!("https://127.0.0.1:{}", port);
// Create the client
let mut client = dapr::Client::<dapr::client::TonicClient>::connect(addr).await?;
// topic to publish message to
let topic = "A".to_string();
for count in 0..100 {
let message = format!("{} => hello from rust!", &count).into_bytes();
client.publish_event(&topic, message).await?;
// sleep for 2 secs to simulate delay b/w two events
tokio::time::delay_for(Duration::from_secs(2)).await;
}
Ok(())
}

View File

@ -0,0 +1,83 @@
use tonic::{transport::Server, Request, Response, Status};
use dapr::{
appcallback::*,
dapr::dapr::proto::runtime::v1::app_callback_server::{AppCallback, AppCallbackServer},
};
#[derive(Default)]
pub struct AppCallbackService {}
#[tonic::async_trait]
impl AppCallback for AppCallbackService {
/// Invokes service method with InvokeRequest.
async fn on_invoke(
&self,
_request: Request<InvokeRequest>,
) -> Result<Response<InvokeResponse>, Status> {
Ok(Response::new(InvokeResponse::default()))
}
/// Lists all topics subscribed by this app.
///
/// NOTE: Dapr runtime will call this method to get
/// the list of topics the app wants to subscribe to.
/// In this example, the app is subscribing to topic `A`.
async fn list_topic_subscriptions(
&self,
_request: Request<()>,
) -> Result<Response<ListTopicSubscriptionsResponse>, Status> {
let topic = "A".to_string();
let list_subscriptions = ListTopicSubscriptionsResponse::topic(topic);
Ok(Response::new(list_subscriptions))
}
/// Subscribes events from Pubsub.
async fn on_topic_event(
&self,
request: Request<TopicEventRequest>,
) -> Result<Response<()>, Status> {
let data = &request.into_inner().data;
let message = String::from_utf8_lossy(&data);
println!("Message: {}", &message);
Ok(Response::new(()))
}
/// Lists all input bindings subscribed by this app.
async fn list_input_bindings(
&self,
_request: Request<()>,
) -> Result<Response<ListInputBindingsResponse>, Status> {
Ok(Response::new(ListInputBindingsResponse::default()))
}
/// Listens events from the input bindings.
async fn on_binding_event(
&self,
_request: Request<BindingEventRequest>,
) -> Result<Response<BindingEventResponse>, Status> {
Ok(Response::new(BindingEventResponse::default()))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::]:50051".parse().unwrap();
let callback_service = AppCallbackService::default();
println!("AppCallback server listening on: {}", addr);
// Create a gRPC server with the callback_service.
Server::builder()
.add_service(AppCallbackServer::new(callback_service))
.serve(addr)
.await?;
Ok(())
}

1
rustfmt.toml Normal file
View File

@ -0,0 +1 @@
edition = "2018"

58
src/appcallback.rs Normal file
View File

@ -0,0 +1,58 @@
use std::collections::HashMap;
use dapr::proto::{common::v1 as common_v1, runtime::v1 as dapr_v1};
use crate::dapr::*;
/// InvokeRequest is the message to invoke a method with the data.
pub type InvokeRequest = common_v1::InvokeRequest;
/// InvokeResponse is the response message inclduing data and its content type
/// from app callback.
pub type InvokeResponse = common_v1::InvokeResponse;
/// ListTopicSubscriptionsResponse is the message including the list of the subscribing topics.
pub type ListTopicSubscriptionsResponse = dapr_v1::ListTopicSubscriptionsResponse;
/// TopicSubscription represents a topic and it's metadata (session id etc.)
pub type TopicSubscription = dapr_v1::TopicSubscription;
/// TopicEventRequest message is compatiable with CloudEvent spec v1.0.
pub type TopicEventRequest = dapr_v1::TopicEventRequest;
/// ListInputBindingsResponse is the message including the list of input bindings.
pub type ListInputBindingsResponse = dapr_v1::ListInputBindingsResponse;
/// BindingEventRequest represents input bindings event.
pub type BindingEventRequest = dapr_v1::BindingEventRequest;
/// BindingEventResponse includes operations to save state or
/// send data to output bindings optionally.
pub type BindingEventResponse = dapr_v1::BindingEventResponse;
impl ListTopicSubscriptionsResponse {
/// Create `ListTopicSubscriptionsResponse` with a topic.
pub fn topic(topic: String) -> Self {
let topic_subscription = TopicSubscription::new(topic, None);
Self {
subscriptions: vec![topic_subscription],
}
}
}
impl TopicSubscription {
/// Create a new `TopicSubscription` for a give topic.
pub fn new(topic: String, metadata: Option<HashMap<String, String>>) -> Self {
let mut topic_subscription = TopicSubscription {
topic,
..Default::default()
};
if let Some(metadata) = metadata {
topic_subscription.metadata = metadata;
}
topic_subscription
}
}

View File

@ -1,10 +1,9 @@
use std::marker::Sized;
use async_trait::async_trait;
use dapr::proto::{common::v1 as common_v1, runtime::v1 as dapr_v1};
use prost_types::Any;
use tonic::{transport::Channel as TonicChannel, Request};
use crate::dapr::*;
use crate::error::Error;
pub struct Client<T>(T);
@ -55,7 +54,11 @@ impl<T: DaprInterface> Client<T> {
///
/// * `name` - The name of the output binding to invoke.
/// * `data` - The data which will be sent to the output binding.
pub async fn invoke_binding<S>(&mut self, name: S, data: Vec<u8>) -> Result<InvokeBindingResponse, Error>
pub async fn invoke_binding<S>(
&mut self,
name: S,
data: Vec<u8>,
) -> Result<InvokeBindingResponse, Error>
where
S: Into<String>,
{
@ -173,7 +176,10 @@ pub trait DaprInterface: Sized {
&mut self,
request: InvokeServiceRequest,
) -> Result<InvokeServiceResponse, Error>;
async fn invoke_binding(&mut self, request: InvokeBindingRequest) -> Result<InvokeBindingResponse, Error>;
async fn invoke_binding(
&mut self,
request: InvokeBindingRequest,
) -> Result<InvokeBindingResponse, Error>;
async fn get_secret(&mut self, request: GetSecretRequest) -> Result<GetSecretResponse, Error>;
async fn get_state(&mut self, request: GetStateRequest) -> Result<GetStateResponse, Error>;
async fn save_state(&mut self, request: SaveStateRequest) -> Result<(), Error>;
@ -196,7 +202,10 @@ impl DaprInterface for dapr_v1::dapr_client::DaprClient<TonicChannel> {
.into_inner())
}
async fn invoke_binding(&mut self, request: InvokeBindingRequest) -> Result<InvokeBindingResponse, Error> {
async fn invoke_binding(
&mut self,
request: InvokeBindingRequest,
) -> Result<InvokeBindingResponse, Error> {
Ok(self
.invoke_binding(Request::new(request))
.await?
@ -227,21 +236,6 @@ impl DaprInterface for dapr_v1::dapr_client::DaprClient<TonicChannel> {
}
}
pub mod dapr {
pub mod proto {
pub mod common {
pub mod v1 {
tonic::include_proto!("dapr.proto.common.v1");
}
}
pub mod runtime {
pub mod v1 {
tonic::include_proto!("dapr.proto.runtime.v1");
}
}
}
}
/// A request from invoking a service
pub type InvokeServiceRequest = dapr_v1::InvokeServiceRequest;
@ -276,7 +270,7 @@ pub type GetSecretRequest = dapr_v1::GetSecretRequest;
pub type GetSecretResponse = dapr_v1::GetSecretResponse;
/// A tonic based gRPC client
pub type TonicClient = dapr_v1::dapr_client::DaprClient<tonic::transport::Channel>;
pub type TonicClient = dapr_v1::dapr_client::DaprClient<TonicChannel>;
impl<K> From<(K, Vec<u8>)> for common_v1::StateItem
where

14
src/dapr.rs Normal file
View File

@ -0,0 +1,14 @@
pub mod dapr {
pub mod proto {
pub mod common {
pub mod v1 {
tonic::include_proto!("dapr.proto.common.v1");
}
}
pub mod runtime {
pub mod v1 {
tonic::include_proto!("dapr.proto.runtime.v1");
}
}
}
}

View File

@ -1,4 +1,6 @@
pub mod appcallback;
pub mod client;
pub mod dapr;
pub mod error;
pub use client::Client;