Updated protos and implemented SDK endpoints to support streaming responses from the conversation endpoint.

Signed-off-by: Whit Waldo <whit.waldo@innovian.net>
This commit is contained in:
Whit Waldo 2025-06-21 00:44:29 -05:00
parent 8aa6a7fe86
commit c3b17e8d0b
5 changed files with 253 additions and 9 deletions

View File

@ -0,0 +1,101 @@
// ------------------------------------------------------------------------
// Copyright 2025 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
using System.Runtime.CompilerServices;
using System.Threading.Channels;
using Grpc.Core;
using Autogenerated = Dapr.Client.Autogen.Grpc.v1;
namespace Dapr.AI.Conversation;
/// <summary>
/// Provides the implementation to process the streamed response from a conversation endpoint invocation.
/// </summary>
internal sealed class ConversationStreamProcessor : IDisposable
{
    private bool disposed;
    private readonly Channel<string> outputChannel = Channel.CreateUnbounded<string>();

    /// <summary>
    /// Surfaces any exceptions encountered while asynchronously processing the outbound stream.
    /// </summary>
    internal event EventHandler<Exception>? OnException;

    /// <summary>
    /// Reads the chunks out asynchronously from the streaming source into the channel.
    /// </summary>
    /// <param name="call">The call made to the Dapr sidecar to process the response from.</param>
    /// <param name="cancellationToken">Token used to cancel the ongoing request.</param>
    public Task ProcessStreamAsync(
        AsyncServerStreamingCall<Autogenerated.ConversationStreamResponse> call,
        CancellationToken cancellationToken)
    {
        // Fire-and-forget: pump the gRPC response stream into the channel. All terminal
        // states (success, cancellation, fault) complete the channel writer, so readers
        // of GetProcessedDataAsync are guaranteed to terminate.
        _ = Task.Run(async () =>
        {
            try
            {
                await foreach (var response in call.ResponseStream.ReadAllAsync(cancellationToken))
                {
                    // The response is a oneof: only "chunk" messages carry content. The
                    // terminal "complete" message leaves Chunk null (protobuf oneof
                    // semantics) and must not be dereferenced.
                    if (response.Chunk is not null)
                    {
                        await outputChannel.Writer.WriteAsync(response.Chunk.Content, cancellationToken);
                    }
                }
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Expected cancellation exception
            }
            catch (Exception ex)
            {
                OnException?.Invoke(this, ex);

                // Fault the channel so consumers observe the failure instead of seeing
                // the stream end silently.
                outputChannel.Writer.TryComplete(ex);
            }
            finally
            {
                // No-op if the writer was already completed (e.g. faulted above).
                outputChannel.Writer.TryComplete();
            }
        }, cancellationToken);

        return Task.CompletedTask;
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    private void Dispose(bool disposing)
    {
        if (!disposed)
        {
            if (disposing)
            {
                // Complete the channel (no-op if already completed) so any pending readers unblock.
                outputChannel.Writer.TryComplete();
            }

            disposed = true;
        }
    }

    /// <summary>
    /// Retrieves the processed content from the operation from the Dapr sidecar and returns as an
    /// enumerable stream.
    /// </summary>
    /// <param name="cancellationToken">Cancellation token.</param>
    /// <returns>The streamed content chunks in the order received from the sidecar.</returns>
    public async IAsyncEnumerable<string> GetProcessedDataAsync([EnumeratorCancellation] CancellationToken cancellationToken)
    {
        await foreach (var data in outputChannel.Reader.ReadAllAsync(cancellationToken))
        {
            yield return data;
        }
    }
}

View File

@ -80,4 +80,19 @@ public abstract class DaprConversationClient : DaprAIClient
public abstract Task<DaprConversationResponse> ConverseAsync(string daprConversationComponentName,
IReadOnlyList<DaprConversationInput> inputs, ConversationOptions? options = null,
CancellationToken cancellationToken = default);
/// <summary>
/// Sends various inputs to the large language model via the Conversational building block on the Dapr sidecar
/// and receives the response back as an asynchronous stream.
/// </summary>
/// <param name="daprConversationComponentName">The name of the Dapr conversation component.</param>
/// <param name="inputs">The input values to send.</param>
/// <param name="options">Optional options used to configure the conversation.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The response provided as a stream by the LLM provider.</returns>
public abstract IAsyncEnumerable<string> ConverseAsStreamAsync(
string daprConversationComponentName,
IReadOnlyList<DaprConversationInput> inputs,
ConversationOptions? options = null,
CancellationToken cancellationToken = default);
}

View File

@ -11,8 +11,10 @@
// limitations under the License.
// ------------------------------------------------------------------------
using System.Runtime.CompilerServices;
using Dapr.Common;
using Dapr.Common.Extensions;
using Grpc.Core;
using Autogenerated = Dapr.Client.Autogen.Grpc.v1;
namespace Dapr.AI.Conversation;
@ -35,6 +37,63 @@ internal sealed class DaprConversationGrpcClient(Autogenerated.Dapr.DaprClient c
/// <returns>The response(s) provided by the LLM provider.</returns>
public override async Task<DaprConversationResponse> ConverseAsync(string daprConversationComponentName, IReadOnlyList<DaprConversationInput> inputs, ConversationOptions? options = null,
    CancellationToken cancellationToken = default)
{
    ArgumentException.ThrowIfNullOrEmpty(daprConversationComponentName);
    // Fail fast with a clear exception rather than an NRE inside BuildRequest
    ArgumentNullException.ThrowIfNull(inputs);

    // Build out the common request and gRPC options to the endpoint
    var (request, grpcCallOptions) = BuildRequest(daprConversationComponentName, inputs, options, cancellationToken);

    var result = await Client.ConverseAlpha1Async(request, grpcCallOptions).ConfigureAwait(false);

    // Map the generated gRPC outputs onto the SDK's public result types
    var outputs = result.Outputs.Select(output => new DaprConversationResult(output.Result)
    {
        Parameters = output.Parameters.ToDictionary(kvp => kvp.Key, parameter => parameter.Value)
    }).ToList();

    return new DaprConversationResponse(outputs);
}
/// <summary>
/// Sends various inputs to the large language model via the Conversational building block on the Dapr sidecar
/// and receives the response back as an asynchronous stream.
/// </summary>
/// <param name="daprConversationComponentName">The name of the Dapr conversation component.</param>
/// <param name="inputs">The input values to send.</param>
/// <param name="options">Optional options used to configure the conversation.</param>
/// <param name="cancellationToken">Cancellation token.</param>
/// <returns>The response provided as a stream by the LLM provider.</returns>
public override async IAsyncEnumerable<string> ConverseAsStreamAsync(
    string daprConversationComponentName,
    IReadOnlyList<DaprConversationInput> inputs,
    ConversationOptions? options = null,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    ArgumentException.ThrowIfNullOrEmpty(daprConversationComponentName);
    // Fail fast with a clear exception rather than an NRE inside BuildRequest
    ArgumentNullException.ThrowIfNull(inputs);

    // Capture any background-processing failure so it can be rethrown on the consumer's
    // enumeration; throwing from inside the event handler would surface the exception on
    // the background task where nothing can observe it.
    Exception? streamException = null;
    EventHandler<Exception> exceptionHandler = (_, ex) => streamException = ex;

    // Build out the common request and gRPC options to the endpoint
    var (request, grpcCallOptions) = BuildRequest(daprConversationComponentName, inputs, options, cancellationToken);

    // Dispose the streaming call when enumeration finishes (or the consumer abandons it)
    // so the underlying gRPC resources are released.
    using var streamResponse = Client.ConverseStreamAlpha1(request, grpcCallOptions);
    using var streamProcessor = new ConversationStreamProcessor();
    try
    {
        streamProcessor.OnException += exceptionHandler;
        await streamProcessor.ProcessStreamAsync(streamResponse, cancellationToken).ConfigureAwait(false);

        await foreach (var content in streamProcessor.GetProcessedDataAsync(cancellationToken).ConfigureAwait(false))
        {
            yield return content;
        }

        // Surface any failure encountered while reading from the sidecar stream
        if (streamException is not null)
        {
            throw streamException;
        }
    }
    finally
    {
        streamProcessor.OnException -= exceptionHandler;
    }
}
private (Autogenerated.ConversationRequest request, CallOptions grpcCallOptions) BuildRequest(string daprConversationComponentName,
IReadOnlyList<DaprConversationInput> inputs, ConversationOptions? options, CancellationToken cancellationToken)
{
var request = new Autogenerated.ConversationRequest
{
@ -70,18 +129,12 @@ internal sealed class DaprConversationGrpcClient(Autogenerated.Dapr.DaprClient c
Role = input.Role.GetValueFromEnumMember()
});
}
var grpCCallOptions =
var grpcCallOptions =
DaprClientUtilities.ConfigureGrpcCallOptions(typeof(DaprConversationClient).Assembly, this.DaprApiToken,
cancellationToken);
var result = await Client.ConverseAlpha1Async(request, grpCCallOptions).ConfigureAwait(false);
var outputs = result.Outputs.Select(output => new DaprConversationResult(output.Result)
{
Parameters = output.Parameters.ToDictionary(kvp => kvp.Key, parameter => parameter.Value)
}).ToList();
return new DaprConversationResponse(outputs);
return (request, grpcCallOptions);
}
/// <inheritdoc />

View File

@ -0,0 +1,35 @@
// ------------------------------------------------------------------------
// Copyright 2025 The Dapr Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// ------------------------------------------------------------------------
namespace Dapr.AI.Conversation;
/// <summary>
/// Represents the token usage statistics for a conversation exchange. Values may be
/// null when the provider does not report usage information.
/// </summary>
public record DaprConversationUsage
{
/// <summary>
/// The number of tokens in the prompt, if reported.
/// </summary>
public int? PromptTokens { get; init; }
/// <summary>
/// The number of tokens in the completion, if reported.
/// </summary>
public int? CompletionTokens { get; init; }
/// <summary>
/// The total number of tokens used, if reported.
/// </summary>
public int? TotalTokens { get; init; }
}

View File

@ -220,6 +220,9 @@ service Dapr {
// Converse with a LLM service
rpc ConverseAlpha1(ConversationRequest) returns (ConversationResponse) {}
// Converse with a LLM service using streaming
rpc ConverseStreamAlpha1(ConversationRequest) returns (stream ConversationStreamResponse) {}
}
// InvokeServiceRequest represents the request message for Service invocation.
@ -1357,4 +1360,41 @@ message ConversationResponse {
// An array of results.
repeated ConversationResult outputs = 2;
// Usage statistics if available
optional ConversationUsage usage = 3;
}
// ConversationStreamResponse is the streaming response for Conversation.
// Each message on the stream carries exactly one of the variants below.
message ConversationStreamResponse {
oneof response_type {
// An incremental piece of generated content.
ConversationStreamChunk chunk = 1;
// Terminal marker indicating the stream has finished.
ConversationStreamComplete complete = 2;
}
}
// ConversationStreamChunk represents a streaming content chunk.
message ConversationStreamChunk {
// A partial segment of the generated response content. Presumably chunks are
// concatenated in arrival order to reconstruct the full output — confirm with
// the provider implementations.
string content = 1;
}
// ConversationStreamComplete indicates the streaming conversation has completed.
message ConversationStreamComplete {
// Final context ID for continuing the conversation.
// NOTE(review): field name is camelCase; proto3 style prefers snake_case
// (context_id) — verify against the sidecar's canonical proto before changing,
// since renaming alters the generated code and JSON mapping.
optional string contextID = 1;
// Usage statistics if available
optional ConversationUsage usage = 2;
}
// ConversationUsage represents token usage statistics.
// All fields are optional; providers that do not report usage leave them unset.
message ConversationUsage {
// Number of tokens in the prompt
optional int32 prompt_tokens = 1 [json_name = "promptTokens"];
// Number of tokens in the completion
optional int32 completion_tokens = 2 [json_name = "completionTokens"];
// Total number of tokens used
optional int32 total_tokens = 3 [json_name = "totalTokens"];
}