Merge pull request #234 from mikeee/conversation-api

feat: conversation api initial implementation
Mike Nguyen 2024-11-07 19:29:12 +00:00 committed by GitHub
commit 04375f4fd2
10 changed files with 401 additions and 12 deletions

View File

@ -46,7 +46,7 @@ jobs:
DAPR_INSTALL_URL: https://raw.githubusercontent.com/dapr/cli/master/install/install.sh
DAPR_CLI_REF: ${{ github.event.inputs.daprcli_commit }}
DAPR_CLI_VERSION: ${{ github.event.inputs.daprcli_version }}
DAPR_REF: ${{ github.event.inputs.dapr_commit }}
DAPR_REF: 334ae9eea43d487a7b29a0e4aef904e3eba57a10
DAPR_RUNTIME_VERSION: ${{ github.event.inputs.dapr_version }}
CHECKOUT_REPO: ${{ github.repository }}
CHECKOUT_REF: ${{ github.ref }}
@ -220,7 +220,7 @@ jobs:
fail-fast: false
matrix:
examples:
[ "actors", "bindings", "client", "configuration", "crypto", "invoke/grpc", "invoke/grpc-proxying", "jobs", "pubsub", "query_state", "secrets-bulk" ]
[ "actors", "bindings", "client", "configuration", "conversation", "crypto", "invoke/grpc", "invoke/grpc-proxying", "jobs", "pubsub", "query_state", "secrets-bulk" ]
steps:
- name: Check out code
uses: actions/checkout@v4

View File

@ -533,6 +533,18 @@ impl<T: DaprInterface> Client<T> {
};
self.0.delete_job_alpha1(request).await
}
/// Converse with an LLM
///
/// # Arguments
///
/// * `request` - A `ConversationRequest` containing the inputs to send to the LLM
pub async fn converse_alpha1(
&mut self,
request: ConversationRequest,
) -> Result<ConversationResponse, Error> {
self.0.converse_alpha1(request).await
}
}
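
For orientation, a minimal calling sketch (not part of the diff): it assumes a connected `dapr::Client<TonicClient>` and a request built with the builders added below; errors are boxed for brevity.

```rust
use dapr::client::{ConversationRequest, TonicClient};

/// Minimal sketch: send a prepared request and print each result.
async fn send_conversation(
    client: &mut dapr::Client<TonicClient>,
    request: ConversationRequest,
) -> Result<(), Box<dyn std::error::Error>> {
    let response = client.converse_alpha1(request).await?;
    for (i, output) in response.outputs.iter().enumerate() {
        println!("output {i}: {}", output.result);
    }
    Ok(())
}
```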
#[async_trait]
@ -595,6 +607,11 @@ pub trait DaprInterface: Sized {
&mut self,
request: DeleteJobRequest,
) -> Result<DeleteJobResponse, Error>;
async fn converse_alpha1(
&mut self,
request: ConversationRequest,
) -> Result<ConversationResponse, Error>;
}
#[async_trait]
@ -789,6 +806,16 @@ impl DaprInterface for dapr_v1::dapr_client::DaprClient<TonicChannel> {
.await?
.into_inner())
}
async fn converse_alpha1(
&mut self,
request: ConversationRequest,
) -> Result<ConversationResponse, Error> {
Ok(self
.converse_alpha1(Request::new(request))
.await?
.into_inner())
}
}
/// A request from invoking a service
@ -907,6 +934,18 @@ pub type DeleteJobRequest = crate::dapr::proto::runtime::v1::DeleteJobRequest;
/// A response from a delete job request
pub type DeleteJobResponse = crate::dapr::proto::runtime::v1::DeleteJobResponse;
/// A request to converse with an LLM
pub type ConversationRequest = crate::dapr::proto::runtime::v1::ConversationRequest;
/// A response from conversing with an LLM
pub type ConversationResponse = crate::dapr::proto::runtime::v1::ConversationResponse;
/// A result from interacting with an LLM
pub type ConversationResult = crate::dapr::proto::runtime::v1::ConversationResult;
/// An input to the conversation
pub type ConversationInput = crate::dapr::proto::runtime::v1::ConversationInput;
type StreamPayload = crate::dapr::proto::common::v1::StreamPayload;
impl<K> From<(K, Vec<u8>)> for common_v1::StateItem
where
@ -987,3 +1026,62 @@ impl JobBuilder {
}
}
}
/// Builder for a single `ConversationInput`
pub struct ConversationInputBuilder {
message: String,
role: Option<String>,
scrub_pii: Option<bool>,
}
impl ConversationInputBuilder {
pub fn new(message: &str) -> Self {
ConversationInputBuilder {
message: message.to_string(),
role: None,
scrub_pii: None,
}
}
pub fn build(self) -> ConversationInput {
ConversationInput {
message: self.message,
role: self.role,
scrub_pii: self.scrub_pii,
}
}
}
/// Builder for a `ConversationRequest` addressed to a named conversation component
pub struct ConversationRequestBuilder {
name: String,
context_id: Option<String>,
inputs: Vec<ConversationInput>,
parameters: HashMap<String, Any>,
metadata: HashMap<String, String>,
scrub_pii: Option<bool>,
temperature: Option<f64>,
}
impl ConversationRequestBuilder {
pub fn new(name: &str, inputs: Vec<ConversationInput>) -> Self {
ConversationRequestBuilder {
name: name.to_string(),
context_id: None,
inputs,
parameters: Default::default(),
metadata: Default::default(),
scrub_pii: None,
temperature: None,
}
}
pub fn build(self) -> ConversationRequest {
ConversationRequest {
name: self.name,
context_id: self.context_id,
inputs: self.inputs,
parameters: self.parameters,
metadata: self.metadata,
scrub_pii: self.scrub_pii,
temperature: self.temperature,
}
}
}
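
A small sketch of the builder flow above (not part of the diff): only `new` and `build` appear in this hunk, so the optional fields keep their `None`/empty defaults; the `echo` component name matches the example further down.

```rust
use dapr::client::{ConversationInputBuilder, ConversationRequest, ConversationRequestBuilder};

/// Build a request carrying two inputs for the "echo" component.
fn build_echo_request() -> ConversationRequest {
    let inputs = vec![
        ConversationInputBuilder::new("What is Dapr?").build(),
        ConversationInputBuilder::new("Answer in one sentence.").build(),
    ];
    // The first argument is the component name the sidecar routes to.
    ConversationRequestBuilder::new("echo", inputs).build()
}
```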

View File

@ -3154,7 +3154,7 @@ pub struct Job {
///
/// Systemd timer style cron accepts 6 fields:
/// seconds | minutes | hours | day of month | month | day of week
/// 0-59 | 0-59 | 0-23 | 1-31 | 1-12/jan-dec | 0-7/sun-sat
/// 0-59 | 0-59 | 0-23 | 1-31 | 1-12/jan-dec | 0-6/sun-sat
///
/// "0 30 * * * *" - every hour on the half hour
/// "0 15 3 * * *" - every day at 03:15
@ -3228,6 +3228,72 @@ pub struct DeleteJobRequest {
/// Empty
#[derive(Clone, Copy, PartialEq, ::prost::Message)]
pub struct DeleteJobResponse {}
/// ConversationRequest is the request object for Conversation.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ConversationRequest {
/// The name of the conversation component
#[prost(string, tag = "1")]
pub name: ::prost::alloc::string::String,
/// The ID of an existing chat (like in ChatGPT)
#[prost(string, optional, tag = "2")]
pub context_id: ::core::option::Option<::prost::alloc::string::String>,
/// Inputs for the conversation; multiple inputs may be sent in a single request.
#[prost(message, repeated, tag = "3")]
pub inputs: ::prost::alloc::vec::Vec<ConversationInput>,
/// Parameters for all custom fields.
#[prost(map = "string, message", tag = "4")]
pub parameters: ::std::collections::HashMap<
::prost::alloc::string::String,
::prost_types::Any,
>,
/// The metadata passed to conversation components.
#[prost(map = "string, string", tag = "5")]
pub metadata: ::std::collections::HashMap<
::prost::alloc::string::String,
::prost::alloc::string::String,
>,
/// Scrub PII data that comes back from the LLM
#[prost(bool, optional, tag = "6")]
pub scrub_pii: ::core::option::Option<bool>,
/// Temperature for the LLM to optimize for creativity or predictability
#[prost(double, optional, tag = "7")]
pub temperature: ::core::option::Option<f64>,
}
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ConversationInput {
/// The message to send to the LLM
#[prost(string, tag = "1")]
pub message: ::prost::alloc::string::String,
/// The role to set for the message
#[prost(string, optional, tag = "2")]
pub role: ::core::option::Option<::prost::alloc::string::String>,
/// Scrub PII data that goes into the LLM
#[prost(bool, optional, tag = "3")]
pub scrub_pii: ::core::option::Option<bool>,
}
/// ConversationResult is the result for one input.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ConversationResult {
/// The result for one conversation input.
#[prost(string, tag = "1")]
pub result: ::prost::alloc::string::String,
/// Parameters for all custom fields.
#[prost(map = "string, message", tag = "2")]
pub parameters: ::std::collections::HashMap<
::prost::alloc::string::String,
::prost_types::Any,
>,
}
/// ConversationResponse is the response for Conversation.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ConversationResponse {
/// The ID of an existing chat (like in ChatGPT)
#[prost(string, optional, tag = "1")]
pub context_id: ::core::option::Option<::prost::alloc::string::String>,
/// An array of results.
#[prost(message, repeated, tag = "2")]
pub outputs: ::prost::alloc::vec::Vec<ConversationResult>,
}
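
The `parameters` map carries provider-specific fields as `prost_types::Any`, which the builders above do not expose; below is an illustrative sketch (not part of the diff) that fills the generated struct directly. It assumes the `prost-types` crate is a dependency, and the `max_tokens` key and its payload encoding are placeholders, not a documented component parameter.

```rust
use std::collections::HashMap;

use dapr::client::{ConversationInput, ConversationRequest};

/// Illustrative only: construct a request with a custom parameter.
fn request_with_parameter() -> ConversationRequest {
    let mut parameters = HashMap::new();
    // Any is a raw (type_url, bytes) envelope; the payload below is a
    // placeholder, not a correctly serialized protobuf value.
    parameters.insert(
        "max_tokens".to_string(),
        prost_types::Any {
            type_url: "type.googleapis.com/google.protobuf.Int64Value".to_string(),
            value: Vec::new(),
        },
    );
    ConversationRequest {
        name: "echo".to_string(),
        context_id: None,
        inputs: vec![ConversationInput {
            message: "hello".to_string(),
            role: None,
            scrub_pii: None,
        }],
        parameters,
        metadata: HashMap::new(),
        scrub_pii: None,
        temperature: Some(0.7),
    }
}
```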
/// PubsubSubscriptionType indicates the type of subscription
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
@ -4871,6 +4937,31 @@ pub mod dapr_client {
);
self.inner.unary(req, path, codec).await
}
/// Converse with an LLM service
pub async fn converse_alpha1(
&mut self,
request: impl tonic::IntoRequest<super::ConversationRequest>,
) -> std::result::Result<
tonic::Response<super::ConversationResponse>,
tonic::Status,
> {
self.inner
.ready()
.await
.map_err(|e| {
tonic::Status::unknown(
format!("Service was not ready: {}", e.into()),
)
})?;
let codec = tonic::codec::ProstCodec::default();
let path = http::uri::PathAndQuery::from_static(
"/dapr.proto.runtime.v1.Dapr/ConverseAlpha1",
);
let mut req = request.into_request();
req.extensions_mut()
.insert(GrpcMethod::new("dapr.proto.runtime.v1.Dapr", "ConverseAlpha1"));
self.inner.unary(req, path, codec).await
}
}
}
/// Generated server implementations.
@ -5318,6 +5409,14 @@ pub mod dapr_server {
tonic::Response<super::DeleteJobResponse>,
tonic::Status,
>;
/// Converse with an LLM service
async fn converse_alpha1(
&self,
request: tonic::Request<super::ConversationRequest>,
) -> std::result::Result<
tonic::Response<super::ConversationResponse>,
tonic::Status,
>;
}
/// Dapr service provides APIs to user application to access Dapr building blocks.
#[derive(Debug)]
@ -8000,6 +8099,49 @@ pub mod dapr_server {
};
Box::pin(fut)
}
"/dapr.proto.runtime.v1.Dapr/ConverseAlpha1" => {
#[allow(non_camel_case_types)]
struct ConverseAlpha1Svc<T: Dapr>(pub Arc<T>);
impl<T: Dapr> tonic::server::UnaryService<super::ConversationRequest>
for ConverseAlpha1Svc<T> {
type Response = super::ConversationResponse;
type Future = BoxFuture<
tonic::Response<Self::Response>,
tonic::Status,
>;
fn call(
&mut self,
request: tonic::Request<super::ConversationRequest>,
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
<T as Dapr>::converse_alpha1(&inner, request).await
};
Box::pin(fut)
}
}
let accept_compression_encodings = self.accept_compression_encodings;
let send_compression_encodings = self.send_compression_encodings;
let max_decoding_message_size = self.max_decoding_message_size;
let max_encoding_message_size = self.max_encoding_message_size;
let inner = self.inner.clone();
let fut = async move {
let method = ConverseAlpha1Svc(inner);
let codec = tonic::codec::ProstCodec::default();
let mut grpc = tonic::server::Grpc::new(codec)
.apply_compression_config(
accept_compression_encodings,
send_compression_encodings,
)
.apply_max_message_size_config(
max_decoding_message_size,
max_encoding_message_size,
);
let res = grpc.unary(method, req).await;
Ok(res)
};
Box::pin(fut)
}
_ => {
Box::pin(async move {
let mut response = http::Response::new(empty_body());

Binary file not shown.

View File

@ -47,6 +47,10 @@ path = "src/client/client.rs"
name = "configuration"
path = "src/configuration/main.rs"
[[example]]
name = "conversation"
path = "src/conversation/main.rs"
[[example]]
name = "crypto"
path = "src/crypto/main.rs"

View File

@ -0,0 +1,53 @@
# Dapr Conversation Example with the Rust-SDK
This example uses the echo component to send a request; the component echoes back exactly the message it receives.
## Steps
### Prepare
- Dapr installed
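
If Dapr is not yet installed and initialized, the usual CLI setup is (see the Dapr docs):

```bash
dapr init       # downloads the runtime and sets up a local environment
dapr --version  # confirm CLI and runtime versions
```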
### Run Conversation Example
1. To run the example, first build the examples using the following command:
<!-- STEP
name: Build
background: false
sleep: 30
timeout: 60
-->
```bash
cargo build --examples
```
<!-- END_STEP -->
2. Run the example using the Dapr CLI
<!-- STEP
name: Run Conversation
output_match_mode: substring
expected_stdout_lines:
- 'conversation input: "hello world"'
- 'conversation output: "hello world"'
background: true
sleep: 15
timeout_seconds: 30
-->
```bash
dapr run --app-id=conversation --resources-path ./config --dapr-grpc-port 3500 -- cargo run --example conversation
```
<!-- END_STEP -->
## Result
```
- 'conversation input: "hello world"'
- 'conversation output: "hello world"'
```

View File

@ -0,0 +1,7 @@
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: echo
spec:
type: conversation.echo
version: v1
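
The echo component needs no metadata. A provider-backed component would keep the same shape with a different `type` plus credentials; the sketch below is purely illustrative (the component type and metadata key are assumptions, check the Dapr conversation component docs):

```yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: openai
spec:
  type: conversation.openai   # assumed component type
  version: v1
  metadata:
    - name: key               # assumed metadata key for the API key
      value: "<YOUR_API_KEY>"
```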

View File

@ -0,0 +1,30 @@
use dapr::client::{ConversationInputBuilder, ConversationRequestBuilder};
use std::thread;
use std::time::Duration;
type DaprClient = dapr::Client<dapr::client::TonicClient>;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Sleep to allow for the server to become available
thread::sleep(Duration::from_secs(5));
// Set the Dapr address
let address = "https://127.0.0.1".to_string();
let mut client = DaprClient::connect(address).await?;
let input = ConversationInputBuilder::new("hello world").build();
let conversation_component = "echo";
let request =
ConversationRequestBuilder::new(conversation_component, vec![input.clone()]).build();
println!("conversation input: {:?}", input.message);
let response = client.converse_alpha1(request).await?;
println!("conversation output: {:?}", response.outputs[0].result);
Ok(())
}
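
Note that the address passed to `connect` carries no explicit port: the SDK typically resolves the gRPC port at connection time from the `DAPR_GRPC_PORT` environment variable, which `dapr run --dapr-grpc-port 3500` exports for the app process.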

View File

@ -58,7 +58,7 @@ service AppCallbackHealthCheck {
// AppCallbackAlpha V1 is an optional extension to AppCallback V1 to opt
// for Alpha RPCs.
service AppCallbackAlpha {
// Subscribes bulk events from Pubsub
rpc OnBulkTopicEventAlpha1(TopicEventBulkRequest) returns (TopicEventBulkResponse) {}
// Sends job back to the app's endpoint at trigger time.
@ -185,14 +185,14 @@ message TopicEventBulkRequestEntry {
// content type of the event contained.
string content_type = 4;
// The metadata associated with the event.
map<string,string> metadata = 5;
}
// TopicEventBulkRequest represents request for bulk message
message TopicEventBulkRequest {
// Unique identifier for the bulk request.
string id = 1;
// The list of items inside this bulk request.
@ -203,10 +203,10 @@ message TopicEventBulkRequest {
// The pubsub topic which publisher sent to.
string topic = 4;
// The name of the pubsub the publisher sent to.
string pubsub_name = 5;
// The type of event related to the originating occurrence.
string type = 6;
@ -310,8 +310,8 @@ message TopicRoutes {
message TopicRule {
// The optional CEL expression used to match the event.
// If the match is not specified, then the route is considered
// the default.
string match = 1;
// The path used to identify matches for this subscription.
@ -340,4 +340,4 @@ message ListInputBindingsResponse {
// HealthCheckResponse is the message with the response to the health check.
// This message is currently empty and used as a placeholder.
message HealthCheckResponse {}

View File

@ -202,6 +202,9 @@ service Dapr {
// Delete a job
rpc DeleteJobAlpha1(DeleteJobRequest) returns (DeleteJobResponse) {}
// Converse with an LLM service
rpc ConverseAlpha1(ConversationRequest) returns (ConversationResponse) {}
}
// InvokeServiceRequest represents the request message for Service invocation.
@ -1206,7 +1209,7 @@ message Job {
//
// Systemd timer style cron accepts 6 fields:
// seconds | minutes | hours | day of month | month | day of week
// 0-59 | 0-59 | 0-23 | 1-31 | 1-12/jan-dec | 0-7/sun-sat
// 0-59 | 0-59 | 0-23 | 1-31 | 1-12/jan-dec | 0-6/sun-sat
//
// "0 30 * * * *" - every hour on the half hour
// "0 15 3 * * *" - every day at 03:15
@ -1274,4 +1277,56 @@ message DeleteJobRequest {
// DeleteJobResponse is the message response to delete the job by name.
message DeleteJobResponse {
// Empty
}
// ConversationRequest is the request object for Conversation.
message ConversationRequest {
// The name of the conversation component
string name = 1;
// The ID of an existing chat (like in ChatGPT)
optional string contextID = 2;
// Inputs for the conversation; multiple inputs may be sent in a single request.
repeated ConversationInput inputs = 3;
// Parameters for all custom fields.
map<string, google.protobuf.Any> parameters = 4;
// The metadata passed to conversation components.
map<string, string> metadata = 5;
// Scrub PII data that comes back from the LLM
optional bool scrubPII = 6;
// Temperature for the LLM to optimize for creativity or predictability
optional double temperature = 7;
}
message ConversationInput {
// The message to send to the LLM
string message = 1;
// The role to set for the message
optional string role = 2;
// Scrub PII data that goes into the LLM
optional bool scrubPII = 3;
}
// ConversationResult is the result for one input.
message ConversationResult {
// The result for one conversation input.
string result = 1;
// Parameters for all custom fields.
map<string, google.protobuf.Any> parameters = 2;
}
// ConversationResponse is the response for Conversation.
message ConversationResponse {
// The ID of an existing chat (like in ChatGPT)
optional string contextID = 1;
// An array of results.
repeated ConversationResult outputs = 2;
}