Add example for Input and output Bindings (#57)

* Add example for Input and output Bindings

* fix: add example tag

* chore: lint and fix

Signed-off-by: mikeee <hey@mike.ee>

* feat: implement validation

* chore: lint

Signed-off-by: mikeee <hey@mike.ee>

* chore: bump timeout

Signed-off-by: mikeee <hey@mike.ee>

* chore: ignore return code for kafka setup

Signed-off-by: mikeee <hey@mike.ee>

* chore: fix kafka step

Signed-off-by: mikeee <hey@mike.ee>

---------

Signed-off-by: Mike Nguyen <hey@mike.ee>
Signed-off-by: mikeee <hey@mike.ee>
Co-authored-by: Mike Nguyen <hey@mike.ee>
This commit is contained in:
Rafael Merlin 2024-07-07 03:56:03 +10:00 committed by GitHub
parent ea644546fb
commit 5764c6cf3d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 258 additions and 3 deletions

View File

@ -144,7 +144,7 @@ jobs:
fail-fast: false
matrix:
examples:
[ "actors", "client", "configuration", "crypto", "invoke/grpc", "invoke/grpc-proxying", "pubsub", "query_state", "secrets-bulk" ]
[ "actors", "bindings", "client", "configuration", "crypto", "invoke/grpc", "invoke/grpc-proxying", "pubsub", "query_state", "secrets-bulk" ]
steps:
- name: Check out code
uses: actions/checkout@v4

View File

@ -82,6 +82,14 @@ path = "examples/pubsub/publisher.rs"
name = "subscriber"
path = "examples/pubsub/subscriber.rs"
[[example]]
name = "output-bindings"
path = "examples/bindings/output.rs"
[[example]]
name = "input-bindings"
path = "examples/bindings/input.rs"
[[example]]
name = "query_state_q1"
path = "examples/query_state/query1.rs"

View File

@ -0,0 +1,70 @@
# Input and Output Bindings Example
This is a simple example that demonstrates Dapr's binding capabilities. To implement input bindings in your rust application, you need to implement `AppCallback` server for subscribing to events. Specifically, the following two methods need to be implemented for input bindings to work:
1. `list_input_bindings` - Dapr runtime calls this method to get list of bindings the application is subscribed to.
2. `on_binding_event` - Defines how the application handles the input binding event.
> **Note:** Make sure to use latest version of proto bindings.
In order to have both examples working with the same binding configuration ServiceBus was used here. If you don't have it available you can change to a binding that works for both Input and Output from [this list](https://docs.dapr.io/reference/components-reference/supported-bindings/)
## Running
To run this example:
1. Run a kafka container
<!-- STEP
name: Run kafka instance
background: true
sleep: 60
timeout_seconds: 120
expected_return_code:
expected_stderr_lines:
-->
```bash
docker run -p 9092:9092 apache/kafka:3.7.1
```
<!-- END_STEP -->
2. Run the multi-app run template (`dapr.yaml`)
<!-- STEP
name: Run Multi-app Run
output_match_mode: substring
match_order: sequential
expected_stdout_lines:
- '== APP - rust-input-b == Binding Name: binding-example'
- '== APP - rust-input-b == Message: 0 => hello from rust!'
- '== APP - rust-input-b == Binding Name: binding-example'
- '== APP - rust-input-b == Message: 1 => hello from rust!'
- '== APP - rust-input-b == Binding Name: binding-example'
- '== APP - rust-input-b == Message: 2 => hello from rust!'
- '== APP - rust-input-b == Binding Name: binding-example'
- '== APP - rust-input-b == Message: 3 => hello from rust!'
- '== APP - rust-input-b == Binding Name: binding-example'
- '== APP - rust-input-b == Message: 4 => hello from rust!'
- '== APP - rust-input-b == Binding Name: binding-example'
- '== APP - rust-input-b == Message: 5 => hello from rust!'
- '== APP - rust-input-b == Binding Name: binding-example'
- '== APP - rust-input-b == Message: 6 => hello from rust!'
- '== APP - rust-input-b == Binding Name: binding-example'
- '== APP - rust-input-b == Message: 7 => hello from rust!'
- '== APP - rust-input-b == Binding Name: binding-example'
- '== APP - rust-input-b == Message: 8 => hello from rust!'
- '== APP - rust-input-b == Binding Name: binding-example'
- '== APP - rust-input-b == Message: 9 => hello from rust!'
background: true
sleep: 30
timeout_seconds: 90
-->
```bash
dapr run -f .
```
<!-- END_STEP -->

View File

@ -0,0 +1,23 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: binding-example
spec:
type: bindings.kafka
metadata:
- name: direction
value: "input, output"
# Kafka broker connection setting
- name: brokers
value: localhost:9092
# consumer configuration: topic and consumer group
- name: topics
value: sample
- name: consumerGroup
value: group1
# publisher configuration: topic
- name: publishTopic
value: sample
- name: authType
value: "none"

View File

@ -0,0 +1,16 @@
version: 1
common:
resourcesPath: ./components/
daprdLogDestination: console
apps:
- appID: rust-input-b
appDirPath: ./
appProtocol: grpc
appPort: 50051
logLevel: debug
command: ["cargo", "run", "--example", "input-bindings"]
- appID: rust-output-b
appDirPath: ./
appProtocol: grpc
logLevel: debug
command: ["cargo", "run", "--example", "output-bindings"]

View File

@ -0,0 +1,87 @@
use tonic::{transport::Server, Request, Response, Status};
use dapr::dapr::dapr::proto::common::v1::{InvokeRequest, InvokeResponse};
use dapr::dapr::dapr::proto::runtime::v1::app_callback_server::{AppCallback, AppCallbackServer};
use dapr::dapr::dapr::proto::runtime::v1::{
BindingEventRequest, BindingEventResponse, ListInputBindingsResponse,
ListTopicSubscriptionsResponse, TopicEventRequest, TopicEventResponse,
};
#[derive(Default)]
pub struct AppCallbackService {}
#[tonic::async_trait]
impl AppCallback for AppCallbackService {
/// Invokes service method with InvokeRequest.
async fn on_invoke(
&self,
_request: Request<InvokeRequest>,
) -> Result<Response<InvokeResponse>, Status> {
Ok(Response::new(InvokeResponse::default()))
}
/// Lists all topics subscribed by this app.
async fn list_topic_subscriptions(
&self,
_request: Request<()>,
) -> Result<Response<ListTopicSubscriptionsResponse>, Status> {
Ok(Response::new(ListTopicSubscriptionsResponse::default()))
}
/// Subscribes events from Pubsub.
async fn on_topic_event(
&self,
_request: Request<TopicEventRequest>,
) -> Result<Response<TopicEventResponse>, Status> {
Ok(Response::new(TopicEventResponse::default()))
}
/// Lists all input bindings subscribed by this app.
/// NOTE: Dapr runtime will call this method to get
/// the list of bindings the app wants to subscribe to.
/// In this example, the app is subscribing to a local pubsub binding named "binding-example"
async fn list_input_bindings(
&self,
_request: Request<()>,
) -> Result<Response<ListInputBindingsResponse>, Status> {
let list_bindings = ListInputBindingsResponse {
bindings: vec![String::from("binding-example")],
};
Ok(Response::new(list_bindings))
}
/// Listens events from the input bindings.
async fn on_binding_event(
&self,
request: Request<BindingEventRequest>,
) -> Result<Response<BindingEventResponse>, Status> {
let r = request.into_inner();
let name = &r.name;
let data = &r.data;
let message = String::from_utf8_lossy(&data);
println!("Binding Name: {}", &name);
println!("Message: {}", &message);
Ok(Response::new(BindingEventResponse::default()))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let addr = "[::]:50051".parse().unwrap();
let callback_service = AppCallbackService::default();
println!("AppCallback server listening on: {}", addr);
// Create a gRPC server with the callback_service.
Server::builder()
.add_service(AppCallbackServer::new(callback_service))
.serve(addr)
.await?;
Ok(())
}

View File

@ -0,0 +1,35 @@
use std::{collections::HashMap, thread, time::Duration};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// TODO: Handle this issue in the sdk
// Introduce delay so that dapr grpc port is assigned before app tries to connect
thread::sleep(Duration::from_secs(2));
// Get the Dapr port and create a connection
let addr = "https://127.0.0.1".to_string();
// Create the client
let mut client = dapr::Client::<dapr::client::TonicClient>::connect(addr).await?;
// name of the component
let binding_name = "binding-example";
for count in 0..10 {
// message metadata
let mut metadata = HashMap::<String, String>::new();
metadata.insert("count".to_string(), count.to_string());
// message
let message = format!("{} => hello from rust!", &count).into_bytes();
client
.invoke_binding(binding_name, message, "create", Some(metadata))
.await?;
// sleep for 500ms to simulate delay b/w two events
tokio::time::sleep(Duration::from_millis(500)).await;
}
Ok(())
}

View File

@ -66,6 +66,14 @@ impl TopicSubscription {
}
}
impl ListInputBindingsResponse {
pub fn binding(binding_name: String) -> Self {
Self {
bindings: vec![binding_name],
}
}
}
pub struct AppCallbackService {
handlers: Vec<Handler>,
}

View File

@ -70,15 +70,23 @@ impl<T: DaprInterface> Client<T> {
&mut self,
name: S,
data: Vec<u8>,
operation: S,
metadata: Option<HashMap<String, String>>,
) -> Result<InvokeBindingResponse, Error>
where
S: Into<String>,
{
let mut mdata = HashMap::<String, String>::new();
if let Some(m) = metadata {
mdata = m;
}
self.0
.invoke_binding(InvokeBindingRequest {
name: name.into(),
data,
..Default::default()
operation: operation.into(),
metadata: mdata,
})
.await
}

View File

@ -168,7 +168,7 @@ impl ActorTypeRegistration {
pub fn register_method<T>(
mut self,
method_name: &str,
handler: impl Handler<T, ActorState> + Send + Sync,
handler: impl Handler<T, ActorState> + Sync,
) -> Self
where
T: 'static,