Implement genai events for bedrock (streaming) (#13507)
This commit is contained in:
parent
06bd699bbc
commit
2f8017113a
|
@ -152,6 +152,6 @@ public class AwsSdkTelemetry {
|
|||
@NoMuzzle
|
||||
public BedrockRuntimeAsyncClient wrapBedrockRuntimeClient(
|
||||
BedrockRuntimeAsyncClient bedrockClient) {
|
||||
return BedrockRuntimeImpl.wrap(bedrockClient);
|
||||
return BedrockRuntimeImpl.wrap(bedrockClient, eventLogger, genAiCaptureMessageContent);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,14 +24,21 @@ import java.util.Collections;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import javax.annotation.Nullable;
|
||||
import software.amazon.awssdk.core.SdkRequest;
|
||||
import software.amazon.awssdk.core.SdkResponse;
|
||||
import software.amazon.awssdk.core.async.SdkPublisher;
|
||||
import software.amazon.awssdk.core.document.Document;
|
||||
import software.amazon.awssdk.protocols.json.SdkJsonGenerator;
|
||||
import software.amazon.awssdk.protocols.jsoncore.JsonNode;
|
||||
import software.amazon.awssdk.protocols.jsoncore.JsonNodeParser;
|
||||
import software.amazon.awssdk.services.bedrockruntime.BedrockRuntimeAsyncClient;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ContentBlock;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ContentBlockDelta;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ContentBlockDeltaEvent;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ContentBlockStartEvent;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ContentBlockStopEvent;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ConverseRequest;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ConverseResponse;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ConverseStreamMetadataEvent;
|
||||
|
@ -41,11 +48,13 @@ import software.amazon.awssdk.services.bedrockruntime.model.ConverseStreamRespon
|
|||
import software.amazon.awssdk.services.bedrockruntime.model.ConverseStreamResponseHandler;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.InferenceConfiguration;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.Message;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.MessageStartEvent;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.MessageStopEvent;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.StopReason;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.TokenUsage;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ToolResultContentBlock;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ToolUseBlock;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ToolUseBlockStart;
|
||||
import software.amazon.awssdk.thirdparty.jackson.core.JsonFactory;
|
||||
|
||||
/**
|
||||
|
@ -59,6 +68,8 @@ public final class BedrockRuntimeImpl {
|
|||
private static final AttributeKey<String> GEN_AI_SYSTEM = stringKey("gen_ai.system");
|
||||
|
||||
private static final JsonFactory JSON_FACTORY = new JsonFactory();
|
||||
private static final JsonNodeParser JSON_PARSER = JsonNode.parser();
|
||||
private static final DocumentUnmarshaller DOCUMENT_UNMARSHALLER = new DocumentUnmarshaller();
|
||||
|
||||
static boolean isBedrockRuntimeRequest(SdkRequest request) {
|
||||
if (request instanceof ConverseRequest) {
|
||||
|
@ -202,35 +213,54 @@ public final class BedrockRuntimeImpl {
|
|||
static void recordRequestEvents(
|
||||
Context otelContext, Logger eventLogger, SdkRequest request, boolean captureMessageContent) {
|
||||
if (request instanceof ConverseRequest) {
|
||||
for (Message message : ((ConverseRequest) request).messages()) {
|
||||
long numToolResults =
|
||||
message.content().stream().filter(block -> block.toolResult() != null).count();
|
||||
if (numToolResults > 0) {
|
||||
// Tool results are different from others, emitting multiple events for a single message,
|
||||
// so treat them separately.
|
||||
emitToolResultEvents(otelContext, eventLogger, message, captureMessageContent);
|
||||
if (numToolResults == message.content().size()) {
|
||||
continue;
|
||||
}
|
||||
// There are content blocks besides tool results in the same message. While models
|
||||
// generally don't expect such usage, the SDK allows it so go ahead and generate a normal
|
||||
// message too.
|
||||
recordRequestMessageEvents(
|
||||
otelContext, eventLogger, ((ConverseRequest) request).messages(), captureMessageContent);
|
||||
}
|
||||
if (request instanceof ConverseStreamRequest) {
|
||||
recordRequestMessageEvents(
|
||||
otelContext,
|
||||
eventLogger,
|
||||
((ConverseStreamRequest) request).messages(),
|
||||
captureMessageContent);
|
||||
|
||||
// Good a time as any to store the context for a streaming request.
|
||||
TracingConverseStreamResponseHandler.fromContext(otelContext).setOtelContext(otelContext);
|
||||
}
|
||||
}
|
||||
|
||||
private static void recordRequestMessageEvents(
|
||||
Context otelContext,
|
||||
Logger eventLogger,
|
||||
List<Message> messages,
|
||||
boolean captureMessageContent) {
|
||||
for (Message message : messages) {
|
||||
long numToolResults =
|
||||
message.content().stream().filter(block -> block.toolResult() != null).count();
|
||||
if (numToolResults > 0) {
|
||||
// Tool results are different from others, emitting multiple events for a single message,
|
||||
// so treat them separately.
|
||||
emitToolResultEvents(otelContext, eventLogger, message, captureMessageContent);
|
||||
if (numToolResults == message.content().size()) {
|
||||
continue;
|
||||
}
|
||||
LogRecordBuilder event = newEvent(otelContext, eventLogger);
|
||||
switch (message.role()) {
|
||||
case ASSISTANT:
|
||||
event.setAttribute(EVENT_NAME, "gen_ai.assistant.message");
|
||||
break;
|
||||
case USER:
|
||||
event.setAttribute(EVENT_NAME, "gen_ai.user.message");
|
||||
break;
|
||||
default:
|
||||
// unknown role, shouldn't happen in practice
|
||||
continue;
|
||||
}
|
||||
// Requests don't have index or stop reason.
|
||||
event.setBody(convertMessage(message, -1, null, captureMessageContent)).emit();
|
||||
// There are content blocks besides tool results in the same message. While models
|
||||
// generally don't expect such usage, the SDK allows it so go ahead and generate a normal
|
||||
// message too.
|
||||
}
|
||||
LogRecordBuilder event = newEvent(otelContext, eventLogger);
|
||||
switch (message.role()) {
|
||||
case ASSISTANT:
|
||||
event.setAttribute(EVENT_NAME, "gen_ai.assistant.message");
|
||||
break;
|
||||
case USER:
|
||||
event.setAttribute(EVENT_NAME, "gen_ai.user.message");
|
||||
break;
|
||||
default:
|
||||
// unknown role, shouldn't happen in practice
|
||||
continue;
|
||||
}
|
||||
// Requests don't have index or stop reason.
|
||||
event.setBody(convertMessage(message, -1, null, captureMessageContent)).emit();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -248,7 +278,7 @@ public final class BedrockRuntimeImpl {
|
|||
convertMessage(
|
||||
converseResponse.output().message(),
|
||||
0,
|
||||
converseResponse.stopReason(),
|
||||
converseResponse.stopReasonAsString(),
|
||||
captureMessageContent))
|
||||
.emit();
|
||||
}
|
||||
|
@ -270,7 +300,8 @@ public final class BedrockRuntimeImpl {
|
|||
return Double.valueOf(value);
|
||||
}
|
||||
|
||||
public static BedrockRuntimeAsyncClient wrap(BedrockRuntimeAsyncClient asyncClient) {
|
||||
public static BedrockRuntimeAsyncClient wrap(
|
||||
BedrockRuntimeAsyncClient asyncClient, Logger eventLogger, boolean captureMessageContent) {
|
||||
// proxy BedrockRuntimeAsyncClient so we can wrap the subscriber to converseStream to capture
|
||||
// events.
|
||||
return (BedrockRuntimeAsyncClient)
|
||||
|
@ -283,7 +314,9 @@ public final class BedrockRuntimeImpl {
|
|||
&& args[1] instanceof ConverseStreamResponseHandler) {
|
||||
TracingConverseStreamResponseHandler wrapped =
|
||||
new TracingConverseStreamResponseHandler(
|
||||
(ConverseStreamResponseHandler) args[1]);
|
||||
(ConverseStreamResponseHandler) args[1],
|
||||
eventLogger,
|
||||
captureMessageContent);
|
||||
args[1] = wrapped;
|
||||
try (Scope ignored = wrapped.makeCurrent()) {
|
||||
return invokeProxyMethod(method, asyncClient, args);
|
||||
|
@ -318,12 +351,29 @@ public final class BedrockRuntimeImpl {
|
|||
ContextKey.named("bedrock-runtime-converse-stream-response-handler");
|
||||
|
||||
private final ConverseStreamResponseHandler delegate;
|
||||
private final Logger eventLogger;
|
||||
private final boolean captureMessageContent;
|
||||
|
||||
private StringBuilder currentText;
|
||||
|
||||
// The response handler is created and stored into context before the span, so we need to
|
||||
// also pass the later context in for recording events. While subscribers are called from a
|
||||
// single thread, it is not clear if that is guaranteed to be the same as the execution
|
||||
// interceptor so we use volatile.
|
||||
private volatile Context otelContext;
|
||||
|
||||
private List<ToolUseBlock> tools;
|
||||
private ToolUseBlock.Builder currentTool;
|
||||
private StringBuilder currentToolArgs;
|
||||
|
||||
List<String> stopReasons;
|
||||
TokenUsage usage;
|
||||
|
||||
TracingConverseStreamResponseHandler(ConverseStreamResponseHandler delegate) {
|
||||
TracingConverseStreamResponseHandler(
|
||||
ConverseStreamResponseHandler delegate, Logger eventLogger, boolean captureMessageContent) {
|
||||
this.delegate = delegate;
|
||||
this.eventLogger = eventLogger;
|
||||
this.captureMessageContent = captureMessageContent;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -336,19 +386,66 @@ public final class BedrockRuntimeImpl {
|
|||
delegate.onEventStream(
|
||||
sdkPublisher.map(
|
||||
event -> {
|
||||
if (event instanceof MessageStopEvent) {
|
||||
if (stopReasons == null) {
|
||||
stopReasons = new ArrayList<>();
|
||||
}
|
||||
stopReasons.add(((MessageStopEvent) event).stopReasonAsString());
|
||||
}
|
||||
if (event instanceof ConverseStreamMetadataEvent) {
|
||||
usage = ((ConverseStreamMetadataEvent) event).usage();
|
||||
}
|
||||
handleEvent(event);
|
||||
return event;
|
||||
}));
|
||||
}
|
||||
|
||||
private void handleEvent(ConverseStreamOutput event) {
|
||||
if (captureMessageContent && event instanceof MessageStartEvent) {
|
||||
if (currentText == null) {
|
||||
currentText = new StringBuilder();
|
||||
}
|
||||
currentText.setLength(0);
|
||||
}
|
||||
if (event instanceof ContentBlockStartEvent) {
|
||||
ToolUseBlockStart toolUse = ((ContentBlockStartEvent) event).start().toolUse();
|
||||
if (toolUse != null) {
|
||||
if (currentToolArgs == null) {
|
||||
currentToolArgs = new StringBuilder();
|
||||
}
|
||||
currentToolArgs.setLength(0);
|
||||
currentTool = ToolUseBlock.builder().name(toolUse.name()).toolUseId(toolUse.toolUseId());
|
||||
}
|
||||
}
|
||||
if (event instanceof ContentBlockDeltaEvent) {
|
||||
ContentBlockDelta delta = ((ContentBlockDeltaEvent) event).delta();
|
||||
if (captureMessageContent && delta.text() != null) {
|
||||
currentText.append(delta.text());
|
||||
}
|
||||
if (delta.toolUse() != null) {
|
||||
currentToolArgs.append(delta.toolUse().input());
|
||||
}
|
||||
}
|
||||
if (event instanceof ContentBlockStopEvent) {
|
||||
if (currentTool != null) {
|
||||
if (tools == null) {
|
||||
tools = new ArrayList<>();
|
||||
}
|
||||
if (currentToolArgs != null) {
|
||||
Document args = deserializeDocument(currentToolArgs.toString());
|
||||
currentTool.input(args);
|
||||
}
|
||||
tools.add(currentTool.build());
|
||||
currentTool = null;
|
||||
}
|
||||
}
|
||||
if (event instanceof MessageStopEvent) {
|
||||
if (stopReasons == null) {
|
||||
stopReasons = new ArrayList<>();
|
||||
}
|
||||
String stopReason = ((MessageStopEvent) event).stopReasonAsString();
|
||||
stopReasons.add(stopReason);
|
||||
newEvent(otelContext, eventLogger)
|
||||
.setAttribute(EVENT_NAME, "gen_ai.choice")
|
||||
.setBody(convertMessageData(currentText, tools, 0, stopReason, captureMessageContent))
|
||||
.emit();
|
||||
}
|
||||
if (event instanceof ConverseStreamMetadataEvent) {
|
||||
usage = ((ConverseStreamMetadataEvent) event).usage();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionOccurred(Throwable throwable) {
|
||||
delegate.exceptionOccurred(throwable);
|
||||
|
@ -363,6 +460,10 @@ public final class BedrockRuntimeImpl {
|
|||
public Context storeInContext(Context context) {
|
||||
return context.with(KEY, this);
|
||||
}
|
||||
|
||||
void setOtelContext(Context otelContext) {
|
||||
this.otelContext = otelContext;
|
||||
}
|
||||
}
|
||||
|
||||
private static LogRecordBuilder newEvent(Context otelContext, Logger eventLogger) {
|
||||
|
@ -401,9 +502,9 @@ public final class BedrockRuntimeImpl {
|
|||
}
|
||||
|
||||
private static Value<?> convertMessage(
|
||||
Message message, int index, @Nullable StopReason stopReason, boolean captureMessageContent) {
|
||||
Message message, int index, @Nullable String stopReason, boolean captureMessageContent) {
|
||||
StringBuilder text = null;
|
||||
List<Value<?>> toolCalls = null;
|
||||
List<ToolUseBlock> toolCalls = null;
|
||||
for (ContentBlock content : message.content()) {
|
||||
if (captureMessageContent && content.text() != null) {
|
||||
if (text == null) {
|
||||
|
@ -415,15 +516,29 @@ public final class BedrockRuntimeImpl {
|
|||
if (toolCalls == null) {
|
||||
toolCalls = new ArrayList<>();
|
||||
}
|
||||
toolCalls.add(convertToolCall(content.toolUse(), captureMessageContent));
|
||||
toolCalls.add(content.toolUse());
|
||||
}
|
||||
}
|
||||
|
||||
return convertMessageData(text, toolCalls, index, stopReason, captureMessageContent);
|
||||
}
|
||||
|
||||
private static Value<?> convertMessageData(
|
||||
@Nullable StringBuilder text,
|
||||
List<ToolUseBlock> toolCalls,
|
||||
int index,
|
||||
@Nullable String stopReason,
|
||||
boolean captureMessageContent) {
|
||||
Map<String, Value<?>> body = new HashMap<>();
|
||||
if (text != null) {
|
||||
body.put("content", Value.of(text.toString()));
|
||||
}
|
||||
if (toolCalls != null) {
|
||||
body.put("toolCalls", Value.of(toolCalls));
|
||||
List<Value<?>> toolCallValues =
|
||||
toolCalls.stream()
|
||||
.map(tool -> convertToolCall(tool, captureMessageContent))
|
||||
.collect(Collectors.toList());
|
||||
body.put("toolCalls", Value.of(toolCallValues));
|
||||
}
|
||||
if (stopReason != null) {
|
||||
body.put("finish_reason", Value.of(stopReason.toString()));
|
||||
|
@ -451,4 +566,9 @@ public final class BedrockRuntimeImpl {
|
|||
document.accept(marshaller);
|
||||
return new String(generator.getBytes(), StandardCharsets.UTF_8);
|
||||
}
|
||||
|
||||
private static Document deserializeDocument(String json) {
|
||||
JsonNode node = JSON_PARSER.parse(json);
|
||||
return node.visit(DOCUMENT_UNMARSHALLER);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,78 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
// Includes work from:
|
||||
/*
|
||||
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License").
|
||||
* You may not use this file except in compliance with the License.
|
||||
* A copy of the License is located at
|
||||
*
|
||||
* http://aws.amazon.com/apache2.0
|
||||
*
|
||||
* or in the "license" file accompanying this file. This file is distributed
|
||||
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
|
||||
* express or implied. See the License for the specific language governing
|
||||
* permissions and limitations under the License.
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.awssdk.v2_2.internal;
|
||||
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import software.amazon.awssdk.core.document.Document;
|
||||
import software.amazon.awssdk.protocols.jsoncore.JsonNode;
|
||||
import software.amazon.awssdk.protocols.jsoncore.JsonNodeVisitor;
|
||||
|
||||
// Copied as-is from
|
||||
// https://github.com/aws/aws-sdk-java-v2/blob/d5081c25be94f01e91f24d39751eac68e3de38ec/core/imds/src/main/java/software/amazon/awssdk/imds/internal/unmarshall/document/DocumentUnmarshaller.jav
|
||||
final class DocumentUnmarshaller implements JsonNodeVisitor<Document> {
|
||||
@Override
|
||||
public Document visitNull() {
|
||||
return Document.fromNull();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Document visitBoolean(boolean bool) {
|
||||
return Document.fromBoolean(bool);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Document visitNumber(String number) {
|
||||
return Document.fromNumber(number);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Document visitString(String string) {
|
||||
return Document.fromString(string);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Document visitArray(List<JsonNode> array) {
|
||||
return Document.fromList(
|
||||
array.stream().map(node -> node.visit(this)).collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Document visitObject(Map<String, JsonNode> object) {
|
||||
return Document.fromMap(
|
||||
object.entrySet().stream()
|
||||
.collect(
|
||||
Collectors.toMap(
|
||||
Map.Entry::getKey,
|
||||
entry -> entry.getValue().visit(this),
|
||||
(left, right) -> left,
|
||||
LinkedHashMap::new)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Document visitEmbeddedObject(Object embeddedObject) {
|
||||
throw new UnsupportedOperationException(
|
||||
"Embedded objects are not supported within Document types.");
|
||||
}
|
||||
}
|
|
@ -31,18 +31,25 @@ import io.opentelemetry.semconv.incubating.GenAiIncubatingAttributes;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.stream.Collectors;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
|
||||
import software.amazon.awssdk.core.document.Document;
|
||||
import software.amazon.awssdk.protocols.json.internal.unmarshall.document.DocumentUnmarshaller;
|
||||
import software.amazon.awssdk.protocols.jsoncore.JsonNode;
|
||||
import software.amazon.awssdk.services.bedrockruntime.BedrockRuntimeAsyncClient;
|
||||
import software.amazon.awssdk.services.bedrockruntime.BedrockRuntimeAsyncClientBuilder;
|
||||
import software.amazon.awssdk.services.bedrockruntime.BedrockRuntimeClient;
|
||||
import software.amazon.awssdk.services.bedrockruntime.BedrockRuntimeClientBuilder;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ContentBlock;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ConversationRole;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ConverseRequest;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ConverseResponse;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ConverseStreamRequest;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ConverseStreamResponseHandler;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.Message;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.Tool;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ToolConfiguration;
|
||||
|
@ -50,6 +57,9 @@ import software.amazon.awssdk.services.bedrockruntime.model.ToolInputSchema;
|
|||
import software.amazon.awssdk.services.bedrockruntime.model.ToolResultBlock;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ToolResultContentBlock;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ToolSpecification;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ToolUseBlock;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ToolUseBlockDelta;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ToolUseBlockStart;
|
||||
|
||||
class Aws2BedrockRuntimeTest extends AbstractAws2BedrockRuntimeTest {
|
||||
|
||||
|
@ -242,6 +252,7 @@ class Aws2BedrockRuntimeTest extends AbstractAws2BedrockRuntimeTest {
|
|||
KeyValue.of("id", Value.of(sanFranciscoToolUseId)),
|
||||
KeyValue.of("type", Value.of("function"))))))));
|
||||
|
||||
// Clear to allow asserting telemetry of user request and tool result processing separately.
|
||||
getTesting().clearData();
|
||||
|
||||
messages.add(response0.output().message());
|
||||
|
@ -414,6 +425,406 @@ class Aws2BedrockRuntimeTest extends AbstractAws2BedrockRuntimeTest {
|
|||
KeyValue.of("index", Value.of(0)))));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testConverseToolCallStreamNoMessageContent()
|
||||
throws InterruptedException, ExecutionException {
|
||||
BedrockRuntimeAsyncClientBuilder builder = BedrockRuntimeAsyncClient.builder();
|
||||
AwsSdkTelemetry telemetry =
|
||||
AwsSdkTelemetry.builder(testing.getOpenTelemetry())
|
||||
.setGenaiCaptureMessageContent(false)
|
||||
.build();
|
||||
builder.overrideConfiguration(
|
||||
ClientOverrideConfiguration.builder()
|
||||
.addExecutionInterceptor(telemetry.newExecutionInterceptor())
|
||||
.build());
|
||||
configureClient(builder);
|
||||
BedrockRuntimeAsyncClient client = telemetry.wrapBedrockRuntimeClient(builder.build());
|
||||
|
||||
String modelId = "amazon.nova-micro-v1:0";
|
||||
List<Message> messages = new ArrayList<>();
|
||||
messages.add(
|
||||
Message.builder()
|
||||
.role(ConversationRole.USER)
|
||||
.content(
|
||||
ContentBlock.fromText("What is the weather in Seattle and San Francisco today?"))
|
||||
.build());
|
||||
|
||||
StringBuilder responseChunksText = new StringBuilder();
|
||||
List<ToolUseBlock.Builder> responseChunksTools = new ArrayList<>();
|
||||
StringBuilder currentToolArgs = new StringBuilder();
|
||||
|
||||
ConverseStreamResponseHandler responseHandler =
|
||||
ConverseStreamResponseHandler.builder()
|
||||
.subscriber(
|
||||
ConverseStreamResponseHandler.Visitor.builder()
|
||||
.onContentBlockStart(
|
||||
chunk -> {
|
||||
if (!responseChunksTools.isEmpty()) {
|
||||
JsonNode node = JsonNode.parser().parse(currentToolArgs.toString());
|
||||
currentToolArgs.setLength(0);
|
||||
Document document = node.visit(new DocumentUnmarshaller());
|
||||
responseChunksTools.get(responseChunksTools.size() - 1).input(document);
|
||||
}
|
||||
ToolUseBlockStart toolUse = chunk.start().toolUse();
|
||||
if (toolUse != null) {
|
||||
responseChunksTools.add(
|
||||
ToolUseBlock.builder()
|
||||
.name(toolUse.name())
|
||||
.toolUseId(toolUse.toolUseId()));
|
||||
}
|
||||
})
|
||||
.onContentBlockDelta(
|
||||
chunk -> {
|
||||
ToolUseBlockDelta toolUse = chunk.delta().toolUse();
|
||||
if (toolUse != null) {
|
||||
currentToolArgs.append(toolUse.input());
|
||||
}
|
||||
String text = chunk.delta().text();
|
||||
if (text != null) {
|
||||
responseChunksText.append(text);
|
||||
}
|
||||
})
|
||||
.build())
|
||||
.build();
|
||||
|
||||
client
|
||||
.converseStream(
|
||||
ConverseStreamRequest.builder()
|
||||
.modelId(modelId)
|
||||
.messages(messages)
|
||||
.toolConfig(currentWeatherToolConfig())
|
||||
.build(),
|
||||
responseHandler)
|
||||
.get();
|
||||
|
||||
if (currentToolArgs.length() > 0 && !responseChunksTools.isEmpty()) {
|
||||
JsonNode node = JsonNode.parser().parse(currentToolArgs.toString());
|
||||
currentToolArgs.setLength(0);
|
||||
Document document = node.visit(new DocumentUnmarshaller());
|
||||
responseChunksTools.get(responseChunksTools.size() - 1).input(document);
|
||||
}
|
||||
|
||||
List<ToolUseBlock> toolUses =
|
||||
responseChunksTools.stream().map(ToolUseBlock.Builder::build).collect(Collectors.toList());
|
||||
|
||||
String seattleToolUseId0 = "";
|
||||
String sanFranciscoToolUseId0 = "";
|
||||
for (ToolUseBlock toolUse : toolUses) {
|
||||
String toolUseId = toolUse.toolUseId();
|
||||
switch (toolUse.input().asMap().get("location").asString()) {
|
||||
case "Seattle":
|
||||
seattleToolUseId0 = toolUseId;
|
||||
break;
|
||||
case "San Francisco":
|
||||
sanFranciscoToolUseId0 = toolUseId;
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Invalid tool use: " + toolUse);
|
||||
}
|
||||
}
|
||||
String seattleToolUseId = seattleToolUseId0;
|
||||
String sanFranciscoToolUseId = sanFranciscoToolUseId0;
|
||||
|
||||
getTesting()
|
||||
.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("chat amazon.nova-micro-v1:0")
|
||||
.hasKind(SpanKind.CLIENT)
|
||||
.hasAttributesSatisfying(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(
|
||||
GEN_AI_OPERATION_NAME,
|
||||
GenAiIncubatingAttributes.GenAiOperationNameIncubatingValues
|
||||
.CHAT),
|
||||
equalTo(GEN_AI_REQUEST_MODEL, modelId),
|
||||
equalTo(GEN_AI_USAGE_INPUT_TOKENS, 415),
|
||||
equalTo(GEN_AI_USAGE_OUTPUT_TOKENS, 162),
|
||||
equalTo(GEN_AI_RESPONSE_FINISH_REASONS, asList("tool_use")))));
|
||||
|
||||
getTesting()
|
||||
.waitAndAssertMetrics(
|
||||
INSTRUMENTATION_NAME,
|
||||
metric ->
|
||||
metric
|
||||
.hasName("gen_ai.client.token.usage")
|
||||
.hasUnit("{token}")
|
||||
.hasDescription("Measures number of input and output tokens used")
|
||||
.hasHistogramSatisfying(
|
||||
histogram ->
|
||||
histogram.hasPointsSatisfying(
|
||||
point ->
|
||||
point
|
||||
.hasSum(415)
|
||||
.hasCount(1)
|
||||
.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(
|
||||
GEN_AI_TOKEN_TYPE,
|
||||
GenAiIncubatingAttributes
|
||||
.GenAiTokenTypeIncubatingValues.INPUT),
|
||||
equalTo(
|
||||
GEN_AI_OPERATION_NAME,
|
||||
GenAiIncubatingAttributes
|
||||
.GenAiOperationNameIncubatingValues.CHAT),
|
||||
equalTo(GEN_AI_REQUEST_MODEL, modelId)),
|
||||
point ->
|
||||
point
|
||||
.hasSum(162)
|
||||
.hasCount(1)
|
||||
.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(
|
||||
GEN_AI_TOKEN_TYPE,
|
||||
GenAiIncubatingAttributes
|
||||
.GenAiTokenTypeIncubatingValues.COMPLETION),
|
||||
equalTo(
|
||||
GEN_AI_OPERATION_NAME,
|
||||
GenAiIncubatingAttributes
|
||||
.GenAiOperationNameIncubatingValues.CHAT),
|
||||
equalTo(GEN_AI_REQUEST_MODEL, modelId)))),
|
||||
metric ->
|
||||
metric
|
||||
.hasName("gen_ai.client.operation.duration")
|
||||
.hasUnit("s")
|
||||
.hasDescription("GenAI operation duration")
|
||||
.hasHistogramSatisfying(
|
||||
histogram ->
|
||||
histogram.hasPointsSatisfying(
|
||||
point ->
|
||||
point
|
||||
.hasSumGreaterThan(0.0)
|
||||
.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(
|
||||
GEN_AI_OPERATION_NAME,
|
||||
GenAiIncubatingAttributes
|
||||
.GenAiOperationNameIncubatingValues.CHAT),
|
||||
equalTo(GEN_AI_REQUEST_MODEL, modelId)))));
|
||||
|
||||
SpanContext spanCtx0 = getTesting().waitForTraces(1).get(0).get(0).getSpanContext();
|
||||
|
||||
getTesting()
|
||||
.waitAndAssertLogRecords(
|
||||
log ->
|
||||
log.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(EVENT_NAME, "gen_ai.user.message"))
|
||||
.hasSpanContext(spanCtx0)
|
||||
.hasBody(Value.of(Collections.emptyMap())),
|
||||
log ->
|
||||
log.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK), equalTo(EVENT_NAME, "gen_ai.choice"))
|
||||
.hasSpanContext(spanCtx0)
|
||||
.hasBody(
|
||||
Value.of(
|
||||
KeyValue.of("finish_reason", Value.of("tool_use")),
|
||||
KeyValue.of("index", Value.of(0)),
|
||||
KeyValue.of(
|
||||
"toolCalls",
|
||||
Value.of(
|
||||
Value.of(
|
||||
KeyValue.of("name", Value.of("get_current_weather")),
|
||||
KeyValue.of("id", Value.of(seattleToolUseId)),
|
||||
KeyValue.of("type", Value.of("function"))),
|
||||
Value.of(
|
||||
KeyValue.of("name", Value.of("get_current_weather")),
|
||||
KeyValue.of("id", Value.of(sanFranciscoToolUseId)),
|
||||
KeyValue.of("type", Value.of("function"))))))));
|
||||
|
||||
// Clear to allow asserting telemetry of user request and tool result processing separately.
|
||||
getTesting().clearData();
|
||||
|
||||
List<ContentBlock> contentBlocks = new ArrayList<>();
|
||||
contentBlocks.add(ContentBlock.fromText(responseChunksText.toString()));
|
||||
toolUses.stream()
|
||||
.map(toolUse -> ContentBlock.builder().toolUse(toolUse).build())
|
||||
.forEach(contentBlocks::add);
|
||||
messages.add(Message.builder().role(ConversationRole.ASSISTANT).content(contentBlocks).build());
|
||||
messages.add(
|
||||
Message.builder()
|
||||
.role(ConversationRole.USER)
|
||||
.content(
|
||||
ContentBlock.fromToolResult(
|
||||
ToolResultBlock.builder()
|
||||
.content(
|
||||
ToolResultContentBlock.builder()
|
||||
.json(
|
||||
Document.mapBuilder()
|
||||
.putString("weather", "50 degrees and raining")
|
||||
.build())
|
||||
.build())
|
||||
.toolUseId(seattleToolUseId)
|
||||
.build()),
|
||||
ContentBlock.fromToolResult(
|
||||
ToolResultBlock.builder()
|
||||
.content(
|
||||
ToolResultContentBlock.builder()
|
||||
.json(
|
||||
Document.mapBuilder()
|
||||
.putString("weather", "70 degrees and sunny")
|
||||
.build())
|
||||
.build())
|
||||
.toolUseId(sanFranciscoToolUseId)
|
||||
.build()))
|
||||
.build());
|
||||
|
||||
List<String> responseChunks = new ArrayList<>();
|
||||
ConverseStreamResponseHandler responseHandler1 =
|
||||
ConverseStreamResponseHandler.builder()
|
||||
.subscriber(
|
||||
ConverseStreamResponseHandler.Visitor.builder()
|
||||
.onContentBlockDelta(
|
||||
chunk -> {
|
||||
responseChunks.add(chunk.delta().text());
|
||||
})
|
||||
.build())
|
||||
.build();
|
||||
|
||||
client
|
||||
.converseStream(
|
||||
ConverseStreamRequest.builder()
|
||||
.modelId(modelId)
|
||||
.messages(messages)
|
||||
.toolConfig(currentWeatherToolConfig())
|
||||
.build(),
|
||||
responseHandler1)
|
||||
.get();
|
||||
|
||||
assertThat(String.join("", responseChunks))
|
||||
.contains(
|
||||
"The current weather in Seattle is 50 degrees and it is raining. "
|
||||
+ "In San Francisco, the weather is 70 degrees and sunny.");
|
||||
|
||||
getTesting()
|
||||
.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("chat amazon.nova-micro-v1:0")
|
||||
.hasKind(SpanKind.CLIENT)
|
||||
.hasAttributesSatisfying(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(
|
||||
GEN_AI_OPERATION_NAME,
|
||||
GenAiIncubatingAttributes.GenAiOperationNameIncubatingValues
|
||||
.CHAT),
|
||||
equalTo(GEN_AI_REQUEST_MODEL, modelId),
|
||||
equalTo(GEN_AI_USAGE_INPUT_TOKENS, 554),
|
||||
equalTo(GEN_AI_USAGE_OUTPUT_TOKENS, 59),
|
||||
equalTo(GEN_AI_RESPONSE_FINISH_REASONS, asList("end_turn")))));
|
||||
|
||||
getTesting()
|
||||
.waitAndAssertMetrics(
|
||||
INSTRUMENTATION_NAME,
|
||||
metric ->
|
||||
metric
|
||||
.hasName("gen_ai.client.token.usage")
|
||||
.hasUnit("{token}")
|
||||
.hasDescription("Measures number of input and output tokens used")
|
||||
.hasHistogramSatisfying(
|
||||
histogram ->
|
||||
histogram.hasPointsSatisfying(
|
||||
point ->
|
||||
point
|
||||
.hasSum(554)
|
||||
.hasCount(1)
|
||||
.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(
|
||||
GEN_AI_TOKEN_TYPE,
|
||||
GenAiIncubatingAttributes
|
||||
.GenAiTokenTypeIncubatingValues.INPUT),
|
||||
equalTo(
|
||||
GEN_AI_OPERATION_NAME,
|
||||
GenAiIncubatingAttributes
|
||||
.GenAiOperationNameIncubatingValues.CHAT),
|
||||
equalTo(GEN_AI_REQUEST_MODEL, modelId)),
|
||||
point ->
|
||||
point
|
||||
.hasSum(59)
|
||||
.hasCount(1)
|
||||
.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(
|
||||
GEN_AI_TOKEN_TYPE,
|
||||
GenAiIncubatingAttributes
|
||||
.GenAiTokenTypeIncubatingValues.COMPLETION),
|
||||
equalTo(
|
||||
GEN_AI_OPERATION_NAME,
|
||||
GenAiIncubatingAttributes
|
||||
.GenAiOperationNameIncubatingValues.CHAT),
|
||||
equalTo(GEN_AI_REQUEST_MODEL, modelId)))),
|
||||
metric ->
|
||||
metric
|
||||
.hasName("gen_ai.client.operation.duration")
|
||||
.hasUnit("s")
|
||||
.hasDescription("GenAI operation duration")
|
||||
.hasHistogramSatisfying(
|
||||
histogram ->
|
||||
histogram.hasPointsSatisfying(
|
||||
point ->
|
||||
point
|
||||
.hasSumGreaterThan(0.0)
|
||||
.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(
|
||||
GEN_AI_OPERATION_NAME,
|
||||
GenAiIncubatingAttributes
|
||||
.GenAiOperationNameIncubatingValues.CHAT),
|
||||
equalTo(GEN_AI_REQUEST_MODEL, modelId)))));
|
||||
|
||||
SpanContext spanCtx1 = getTesting().waitForTraces(1).get(0).get(0).getSpanContext();
|
||||
|
||||
getTesting()
|
||||
.waitAndAssertLogRecords(
|
||||
log ->
|
||||
log.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(EVENT_NAME, "gen_ai.user.message"))
|
||||
.hasSpanContext(spanCtx1)
|
||||
.hasBody(Value.of(emptyMap())),
|
||||
log ->
|
||||
log.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(EVENT_NAME, "gen_ai.assistant.message"))
|
||||
.hasSpanContext(spanCtx1)
|
||||
.hasBody(
|
||||
Value.of(
|
||||
KeyValue.of(
|
||||
"toolCalls",
|
||||
Value.of(
|
||||
Value.of(
|
||||
KeyValue.of("name", Value.of("get_current_weather")),
|
||||
KeyValue.of("id", Value.of(seattleToolUseId)),
|
||||
KeyValue.of("type", Value.of("function"))),
|
||||
Value.of(
|
||||
KeyValue.of("name", Value.of("get_current_weather")),
|
||||
KeyValue.of("id", Value.of(sanFranciscoToolUseId)),
|
||||
KeyValue.of("type", Value.of("function"))))))),
|
||||
log ->
|
||||
log.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(EVENT_NAME, "gen_ai.tool.message"))
|
||||
.hasSpanContext(spanCtx1)
|
||||
.hasBody(Value.of(KeyValue.of("id", Value.of(seattleToolUseId)))),
|
||||
log ->
|
||||
log.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(EVENT_NAME, "gen_ai.tool.message"))
|
||||
.hasSpanContext(spanCtx1)
|
||||
.hasBody(Value.of(KeyValue.of("id", Value.of(sanFranciscoToolUseId)))),
|
||||
log ->
|
||||
log.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK), equalTo(EVENT_NAME, "gen_ai.choice"))
|
||||
.hasSpanContext(spanCtx1)
|
||||
.hasBody(
|
||||
Value.of(
|
||||
KeyValue.of("finish_reason", Value.of("end_turn")),
|
||||
KeyValue.of("index", Value.of(0)))));
|
||||
}
|
||||
|
||||
private static ToolConfiguration currentWeatherToolConfig() {
|
||||
return ToolConfiguration.builder()
|
||||
.tools(
|
||||
|
|
|
@ -36,6 +36,7 @@ import java.net.URI;
|
|||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.stream.Collectors;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
|
||||
|
@ -43,6 +44,8 @@ import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
|
|||
import software.amazon.awssdk.awscore.client.builder.AwsClientBuilder;
|
||||
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
|
||||
import software.amazon.awssdk.core.document.Document;
|
||||
import software.amazon.awssdk.protocols.json.internal.unmarshall.document.DocumentUnmarshaller;
|
||||
import software.amazon.awssdk.protocols.jsoncore.JsonNode;
|
||||
import software.amazon.awssdk.regions.Region;
|
||||
import software.amazon.awssdk.services.bedrockruntime.BedrockRuntimeAsyncClient;
|
||||
import software.amazon.awssdk.services.bedrockruntime.BedrockRuntimeAsyncClientBuilder;
|
||||
|
@ -62,6 +65,9 @@ import software.amazon.awssdk.services.bedrockruntime.model.ToolInputSchema;
|
|||
import software.amazon.awssdk.services.bedrockruntime.model.ToolResultBlock;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ToolResultContentBlock;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ToolSpecification;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ToolUseBlock;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ToolUseBlockDelta;
|
||||
import software.amazon.awssdk.services.bedrockruntime.model.ToolUseBlockStart;
|
||||
|
||||
public abstract class AbstractAws2BedrockRuntimeTest {
|
||||
protected static final String INSTRUMENTATION_NAME = "io.opentelemetry.aws-sdk-2.2";
|
||||
|
@ -512,6 +518,7 @@ public abstract class AbstractAws2BedrockRuntimeTest {
|
|||
Value.of(
|
||||
"<thinking> The User has asked for the current weather in two locations: Seattle and San Francisco. To provide the requested information, I will use the \"get_current_weather\" tool for each location separately. </thinking>\n")))));
|
||||
|
||||
// Clear to allow asserting telemetry of user request and tool result processing separately.
|
||||
getTesting().clearData();
|
||||
|
||||
messages.add(response0.output().message());
|
||||
|
@ -712,6 +719,440 @@ public abstract class AbstractAws2BedrockRuntimeTest {
|
|||
+ "The current weather in Seattle is 50 degrees and raining. In San Francisco, the weather is 70 degrees and sunny.")))));
|
||||
}
|
||||
|
||||
@Test
|
||||
void testConverseToolCallStream() throws InterruptedException, ExecutionException {
|
||||
BedrockRuntimeAsyncClientBuilder builder = BedrockRuntimeAsyncClient.builder();
|
||||
builder.overrideConfiguration(createOverrideConfigurationBuilder().build());
|
||||
configureClient(builder);
|
||||
BedrockRuntimeAsyncClient client = configureBedrockRuntimeClient(builder.build());
|
||||
|
||||
String modelId = "amazon.nova-micro-v1:0";
|
||||
List<Message> messages = new ArrayList<>();
|
||||
messages.add(
|
||||
Message.builder()
|
||||
.role(ConversationRole.USER)
|
||||
.content(
|
||||
ContentBlock.fromText("What is the weather in Seattle and San Francisco today?"))
|
||||
.build());
|
||||
|
||||
StringBuilder responseChunksText = new StringBuilder();
|
||||
List<ToolUseBlock.Builder> responseChunksTools = new ArrayList<>();
|
||||
StringBuilder currentToolArgs = new StringBuilder();
|
||||
|
||||
ConverseStreamResponseHandler responseHandler =
|
||||
ConverseStreamResponseHandler.builder()
|
||||
.subscriber(
|
||||
ConverseStreamResponseHandler.Visitor.builder()
|
||||
.onContentBlockStart(
|
||||
chunk -> {
|
||||
if (!responseChunksTools.isEmpty()) {
|
||||
JsonNode node = JsonNode.parser().parse(currentToolArgs.toString());
|
||||
currentToolArgs.setLength(0);
|
||||
Document document = node.visit(new DocumentUnmarshaller());
|
||||
responseChunksTools.get(responseChunksTools.size() - 1).input(document);
|
||||
}
|
||||
ToolUseBlockStart toolUse = chunk.start().toolUse();
|
||||
if (toolUse != null) {
|
||||
responseChunksTools.add(
|
||||
ToolUseBlock.builder()
|
||||
.name(toolUse.name())
|
||||
.toolUseId(toolUse.toolUseId()));
|
||||
}
|
||||
})
|
||||
.onContentBlockDelta(
|
||||
chunk -> {
|
||||
ToolUseBlockDelta toolUse = chunk.delta().toolUse();
|
||||
if (toolUse != null) {
|
||||
currentToolArgs.append(toolUse.input());
|
||||
}
|
||||
String text = chunk.delta().text();
|
||||
if (text != null) {
|
||||
responseChunksText.append(text);
|
||||
}
|
||||
})
|
||||
.build())
|
||||
.build();
|
||||
|
||||
client
|
||||
.converseStream(
|
||||
ConverseStreamRequest.builder()
|
||||
.modelId(modelId)
|
||||
.messages(messages)
|
||||
.toolConfig(currentWeatherToolConfig())
|
||||
.build(),
|
||||
responseHandler)
|
||||
.get();
|
||||
|
||||
if (currentToolArgs.length() > 0 && !responseChunksTools.isEmpty()) {
|
||||
JsonNode node = JsonNode.parser().parse(currentToolArgs.toString());
|
||||
currentToolArgs.setLength(0);
|
||||
Document document = node.visit(new DocumentUnmarshaller());
|
||||
responseChunksTools.get(responseChunksTools.size() - 1).input(document);
|
||||
}
|
||||
|
||||
List<ToolUseBlock> toolUses =
|
||||
responseChunksTools.stream().map(ToolUseBlock.Builder::build).collect(Collectors.toList());
|
||||
|
||||
String seattleToolUseId0 = "";
|
||||
String sanFranciscoToolUseId0 = "";
|
||||
for (ToolUseBlock toolUse : toolUses) {
|
||||
String toolUseId = toolUse.toolUseId();
|
||||
switch (toolUse.input().asMap().get("location").asString()) {
|
||||
case "Seattle":
|
||||
seattleToolUseId0 = toolUseId;
|
||||
break;
|
||||
case "San Francisco":
|
||||
sanFranciscoToolUseId0 = toolUseId;
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Invalid tool use: " + toolUse);
|
||||
}
|
||||
}
|
||||
String seattleToolUseId = seattleToolUseId0;
|
||||
String sanFranciscoToolUseId = sanFranciscoToolUseId0;
|
||||
|
||||
getTesting()
|
||||
.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("chat amazon.nova-micro-v1:0")
|
||||
.hasKind(SpanKind.CLIENT)
|
||||
.hasAttributesSatisfying(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(
|
||||
GEN_AI_OPERATION_NAME,
|
||||
GenAiIncubatingAttributes.GenAiOperationNameIncubatingValues
|
||||
.CHAT),
|
||||
equalTo(GEN_AI_REQUEST_MODEL, modelId),
|
||||
equalTo(GEN_AI_USAGE_INPUT_TOKENS, 415),
|
||||
equalTo(GEN_AI_USAGE_OUTPUT_TOKENS, 162),
|
||||
equalTo(GEN_AI_RESPONSE_FINISH_REASONS, asList("tool_use")))));
|
||||
|
||||
getTesting()
|
||||
.waitAndAssertMetrics(
|
||||
INSTRUMENTATION_NAME,
|
||||
metric ->
|
||||
metric
|
||||
.hasName("gen_ai.client.token.usage")
|
||||
.hasUnit("{token}")
|
||||
.hasDescription("Measures number of input and output tokens used")
|
||||
.hasHistogramSatisfying(
|
||||
histogram ->
|
||||
histogram.hasPointsSatisfying(
|
||||
point ->
|
||||
point
|
||||
.hasSum(415)
|
||||
.hasCount(1)
|
||||
.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(
|
||||
GEN_AI_TOKEN_TYPE,
|
||||
GenAiIncubatingAttributes
|
||||
.GenAiTokenTypeIncubatingValues.INPUT),
|
||||
equalTo(
|
||||
GEN_AI_OPERATION_NAME,
|
||||
GenAiIncubatingAttributes
|
||||
.GenAiOperationNameIncubatingValues.CHAT),
|
||||
equalTo(GEN_AI_REQUEST_MODEL, modelId)),
|
||||
point ->
|
||||
point
|
||||
.hasSum(162)
|
||||
.hasCount(1)
|
||||
.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(
|
||||
GEN_AI_TOKEN_TYPE,
|
||||
GenAiIncubatingAttributes
|
||||
.GenAiTokenTypeIncubatingValues.COMPLETION),
|
||||
equalTo(
|
||||
GEN_AI_OPERATION_NAME,
|
||||
GenAiIncubatingAttributes
|
||||
.GenAiOperationNameIncubatingValues.CHAT),
|
||||
equalTo(GEN_AI_REQUEST_MODEL, modelId)))),
|
||||
metric ->
|
||||
metric
|
||||
.hasName("gen_ai.client.operation.duration")
|
||||
.hasUnit("s")
|
||||
.hasDescription("GenAI operation duration")
|
||||
.hasHistogramSatisfying(
|
||||
histogram ->
|
||||
histogram.hasPointsSatisfying(
|
||||
point ->
|
||||
point
|
||||
.hasSumGreaterThan(0.0)
|
||||
.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(
|
||||
GEN_AI_OPERATION_NAME,
|
||||
GenAiIncubatingAttributes
|
||||
.GenAiOperationNameIncubatingValues.CHAT),
|
||||
equalTo(GEN_AI_REQUEST_MODEL, modelId)))));
|
||||
|
||||
SpanContext spanCtx0 = getTesting().waitForTraces(1).get(0).get(0).getSpanContext();
|
||||
|
||||
getTesting()
|
||||
.waitAndAssertLogRecords(
|
||||
log ->
|
||||
log.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(EVENT_NAME, "gen_ai.user.message"))
|
||||
.hasSpanContext(spanCtx0)
|
||||
.hasBody(
|
||||
Value.of(
|
||||
KeyValue.of(
|
||||
"content",
|
||||
Value.of(
|
||||
"What is the weather in Seattle and San Francisco today?")))),
|
||||
log ->
|
||||
log.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK), equalTo(EVENT_NAME, "gen_ai.choice"))
|
||||
.hasSpanContext(spanCtx0)
|
||||
.hasBody(
|
||||
Value.of(
|
||||
KeyValue.of("finish_reason", Value.of("tool_use")),
|
||||
KeyValue.of("index", Value.of(0)),
|
||||
KeyValue.of(
|
||||
"toolCalls",
|
||||
Value.of(
|
||||
Value.of(
|
||||
KeyValue.of("name", Value.of("get_current_weather")),
|
||||
KeyValue.of(
|
||||
"arguments", Value.of("{\"location\":\"Seattle\"}")),
|
||||
KeyValue.of("id", Value.of(seattleToolUseId)),
|
||||
KeyValue.of("type", Value.of("function"))),
|
||||
Value.of(
|
||||
KeyValue.of("name", Value.of("get_current_weather")),
|
||||
KeyValue.of(
|
||||
"arguments",
|
||||
Value.of("{\"location\":\"San Francisco\"}")),
|
||||
KeyValue.of("id", Value.of(sanFranciscoToolUseId)),
|
||||
KeyValue.of("type", Value.of("function"))))),
|
||||
KeyValue.of(
|
||||
"content",
|
||||
Value.of(
|
||||
"<thinking> The User has asked for the current weather in two locations: Seattle and San Francisco. To provide the requested information, I will use the \"get_current_weather\" tool for each location separately. </thinking>\n")))));
|
||||
|
||||
// Clear to allow asserting telemetry of user request and tool result processing separately.
|
||||
getTesting().clearData();
|
||||
|
||||
List<ContentBlock> contentBlocks = new ArrayList<>();
|
||||
contentBlocks.add(ContentBlock.fromText(responseChunksText.toString()));
|
||||
toolUses.stream()
|
||||
.map(toolUse -> ContentBlock.builder().toolUse(toolUse).build())
|
||||
.forEach(contentBlocks::add);
|
||||
messages.add(Message.builder().role(ConversationRole.ASSISTANT).content(contentBlocks).build());
|
||||
messages.add(
|
||||
Message.builder()
|
||||
.role(ConversationRole.USER)
|
||||
.content(
|
||||
ContentBlock.fromToolResult(
|
||||
ToolResultBlock.builder()
|
||||
.content(
|
||||
ToolResultContentBlock.builder()
|
||||
.json(
|
||||
Document.mapBuilder()
|
||||
.putString("weather", "50 degrees and raining")
|
||||
.build())
|
||||
.build())
|
||||
.toolUseId(seattleToolUseId)
|
||||
.build()),
|
||||
ContentBlock.fromToolResult(
|
||||
ToolResultBlock.builder()
|
||||
.content(
|
||||
ToolResultContentBlock.builder()
|
||||
.json(
|
||||
Document.mapBuilder()
|
||||
.putString("weather", "70 degrees and sunny")
|
||||
.build())
|
||||
.build())
|
||||
.toolUseId(sanFranciscoToolUseId)
|
||||
.build()))
|
||||
.build());
|
||||
|
||||
List<String> responseChunks = new ArrayList<>();
|
||||
ConverseStreamResponseHandler responseHandler1 =
|
||||
ConverseStreamResponseHandler.builder()
|
||||
.subscriber(
|
||||
ConverseStreamResponseHandler.Visitor.builder()
|
||||
.onContentBlockDelta(
|
||||
chunk -> {
|
||||
responseChunks.add(chunk.delta().text());
|
||||
})
|
||||
.build())
|
||||
.build();
|
||||
|
||||
client
|
||||
.converseStream(
|
||||
ConverseStreamRequest.builder()
|
||||
.modelId(modelId)
|
||||
.messages(messages)
|
||||
.toolConfig(currentWeatherToolConfig())
|
||||
.build(),
|
||||
responseHandler1)
|
||||
.get();
|
||||
|
||||
assertThat(String.join("", responseChunks))
|
||||
.contains(
|
||||
"The current weather in Seattle is 50 degrees and it is raining. "
|
||||
+ "In San Francisco, the weather is 70 degrees and sunny.");
|
||||
|
||||
getTesting()
|
||||
.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName("chat amazon.nova-micro-v1:0")
|
||||
.hasKind(SpanKind.CLIENT)
|
||||
.hasAttributesSatisfying(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(
|
||||
GEN_AI_OPERATION_NAME,
|
||||
GenAiIncubatingAttributes.GenAiOperationNameIncubatingValues
|
||||
.CHAT),
|
||||
equalTo(GEN_AI_REQUEST_MODEL, modelId),
|
||||
equalTo(GEN_AI_USAGE_INPUT_TOKENS, 554),
|
||||
equalTo(GEN_AI_USAGE_OUTPUT_TOKENS, 59),
|
||||
equalTo(GEN_AI_RESPONSE_FINISH_REASONS, asList("end_turn")))));
|
||||
|
||||
getTesting()
|
||||
.waitAndAssertMetrics(
|
||||
INSTRUMENTATION_NAME,
|
||||
metric ->
|
||||
metric
|
||||
.hasName("gen_ai.client.token.usage")
|
||||
.hasUnit("{token}")
|
||||
.hasDescription("Measures number of input and output tokens used")
|
||||
.hasHistogramSatisfying(
|
||||
histogram ->
|
||||
histogram.hasPointsSatisfying(
|
||||
point ->
|
||||
point
|
||||
.hasSum(554)
|
||||
.hasCount(1)
|
||||
.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(
|
||||
GEN_AI_TOKEN_TYPE,
|
||||
GenAiIncubatingAttributes
|
||||
.GenAiTokenTypeIncubatingValues.INPUT),
|
||||
equalTo(
|
||||
GEN_AI_OPERATION_NAME,
|
||||
GenAiIncubatingAttributes
|
||||
.GenAiOperationNameIncubatingValues.CHAT),
|
||||
equalTo(GEN_AI_REQUEST_MODEL, modelId)),
|
||||
point ->
|
||||
point
|
||||
.hasSum(59)
|
||||
.hasCount(1)
|
||||
.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(
|
||||
GEN_AI_TOKEN_TYPE,
|
||||
GenAiIncubatingAttributes
|
||||
.GenAiTokenTypeIncubatingValues.COMPLETION),
|
||||
equalTo(
|
||||
GEN_AI_OPERATION_NAME,
|
||||
GenAiIncubatingAttributes
|
||||
.GenAiOperationNameIncubatingValues.CHAT),
|
||||
equalTo(GEN_AI_REQUEST_MODEL, modelId)))),
|
||||
metric ->
|
||||
metric
|
||||
.hasName("gen_ai.client.operation.duration")
|
||||
.hasUnit("s")
|
||||
.hasDescription("GenAI operation duration")
|
||||
.hasHistogramSatisfying(
|
||||
histogram ->
|
||||
histogram.hasPointsSatisfying(
|
||||
point ->
|
||||
point
|
||||
.hasSumGreaterThan(0.0)
|
||||
.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(
|
||||
GEN_AI_OPERATION_NAME,
|
||||
GenAiIncubatingAttributes
|
||||
.GenAiOperationNameIncubatingValues.CHAT),
|
||||
equalTo(GEN_AI_REQUEST_MODEL, modelId)))));
|
||||
|
||||
SpanContext spanCtx1 = getTesting().waitForTraces(1).get(0).get(0).getSpanContext();
|
||||
|
||||
getTesting()
|
||||
.waitAndAssertLogRecords(
|
||||
log ->
|
||||
log.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(EVENT_NAME, "gen_ai.user.message"))
|
||||
.hasSpanContext(spanCtx1)
|
||||
.hasBody(
|
||||
Value.of(
|
||||
KeyValue.of(
|
||||
"content",
|
||||
Value.of(
|
||||
"What is the weather in Seattle and San Francisco today?")))),
|
||||
log ->
|
||||
log.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(EVENT_NAME, "gen_ai.assistant.message"))
|
||||
.hasSpanContext(spanCtx1)
|
||||
.hasBody(
|
||||
Value.of(
|
||||
KeyValue.of(
|
||||
"toolCalls",
|
||||
Value.of(
|
||||
Value.of(
|
||||
KeyValue.of("name", Value.of("get_current_weather")),
|
||||
KeyValue.of(
|
||||
"arguments", Value.of("{\"location\":\"Seattle\"}")),
|
||||
KeyValue.of("id", Value.of(seattleToolUseId)),
|
||||
KeyValue.of("type", Value.of("function"))),
|
||||
Value.of(
|
||||
KeyValue.of("name", Value.of("get_current_weather")),
|
||||
KeyValue.of(
|
||||
"arguments",
|
||||
Value.of("{\"location\":\"San Francisco\"}")),
|
||||
KeyValue.of("id", Value.of(sanFranciscoToolUseId)),
|
||||
KeyValue.of("type", Value.of("function"))))),
|
||||
KeyValue.of(
|
||||
"content",
|
||||
Value.of(
|
||||
"<thinking> The User has asked for the current weather in two locations: Seattle and San Francisco. To provide the requested information, I will use the \"get_current_weather\" tool for each location separately. </thinking>\n")))),
|
||||
log ->
|
||||
log.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(EVENT_NAME, "gen_ai.tool.message"))
|
||||
.hasSpanContext(spanCtx1)
|
||||
.hasBody(
|
||||
Value.of(
|
||||
KeyValue.of("id", Value.of(seattleToolUseId)),
|
||||
KeyValue.of(
|
||||
"content", Value.of("{\"weather\":\"50 degrees and raining\"}")))),
|
||||
log ->
|
||||
log.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(EVENT_NAME, "gen_ai.tool.message"))
|
||||
.hasSpanContext(spanCtx1)
|
||||
.hasBody(
|
||||
Value.of(
|
||||
KeyValue.of("id", Value.of(sanFranciscoToolUseId)),
|
||||
KeyValue.of(
|
||||
"content", Value.of("{\"weather\":\"70 degrees and sunny\"}")))),
|
||||
log ->
|
||||
log.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK), equalTo(EVENT_NAME, "gen_ai.choice"))
|
||||
.hasSpanContext(spanCtx1)
|
||||
.hasBody(
|
||||
Value.of(
|
||||
KeyValue.of("finish_reason", Value.of("end_turn")),
|
||||
KeyValue.of("index", Value.of(0)),
|
||||
KeyValue.of(
|
||||
"content",
|
||||
Value.of(
|
||||
"<thinking> The tool has provided the current weather for both locations. Now I will compile the information and present it to the User. </thinking>\n"
|
||||
+ "\n"
|
||||
+ "The current weather in Seattle is 50 degrees and it is raining. In San Francisco, the weather is 70 degrees and sunny.")))));
|
||||
}
|
||||
|
||||
private static ToolConfiguration currentWeatherToolConfig() {
|
||||
return ToolConfiguration.builder()
|
||||
.tools(
|
||||
|
@ -863,6 +1304,26 @@ public abstract class AbstractAws2BedrockRuntimeTest {
|
|||
GenAiIncubatingAttributes
|
||||
.GenAiOperationNameIncubatingValues.CHAT),
|
||||
equalTo(GEN_AI_REQUEST_MODEL, modelId)))));
|
||||
|
||||
SpanContext spanCtx = getTesting().waitForTraces(1).get(0).get(0).getSpanContext();
|
||||
|
||||
getTesting()
|
||||
.waitAndAssertLogRecords(
|
||||
log ->
|
||||
log.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(EVENT_NAME, "gen_ai.user.message"))
|
||||
.hasSpanContext(spanCtx)
|
||||
.hasBody(Value.of(KeyValue.of("content", Value.of("Say this is a test")))),
|
||||
log ->
|
||||
log.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK), equalTo(EVENT_NAME, "gen_ai.choice"))
|
||||
.hasSpanContext(spanCtx)
|
||||
.hasBody(
|
||||
Value.of(
|
||||
KeyValue.of("finish_reason", Value.of("end_turn")),
|
||||
KeyValue.of("index", Value.of(0)),
|
||||
KeyValue.of("content", Value.of("\"Test, test\"")))));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -935,5 +1396,25 @@ public abstract class AbstractAws2BedrockRuntimeTest {
|
|||
equalTo(GEN_AI_USAGE_INPUT_TOKENS, 8),
|
||||
equalTo(GEN_AI_USAGE_OUTPUT_TOKENS, 5),
|
||||
equalTo(GEN_AI_RESPONSE_FINISH_REASONS, asList("max_tokens")))));
|
||||
|
||||
SpanContext spanCtx = getTesting().waitForTraces(1).get(0).get(0).getSpanContext();
|
||||
|
||||
getTesting()
|
||||
.waitAndAssertLogRecords(
|
||||
log ->
|
||||
log.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK),
|
||||
equalTo(EVENT_NAME, "gen_ai.user.message"))
|
||||
.hasSpanContext(spanCtx)
|
||||
.hasBody(Value.of(KeyValue.of("content", Value.of("Say this is a test")))),
|
||||
log ->
|
||||
log.hasAttributesSatisfyingExactly(
|
||||
equalTo(GEN_AI_SYSTEM, AWS_BEDROCK), equalTo(EVENT_NAME, "gen_ai.choice"))
|
||||
.hasSpanContext(spanCtx)
|
||||
.hasBody(
|
||||
Value.of(
|
||||
KeyValue.of("finish_reason", Value.of("max_tokens")),
|
||||
KeyValue.of("index", Value.of(0)),
|
||||
KeyValue.of("content", Value.of("This model")))));
|
||||
}
|
||||
}
|
||||
|
|
File diff suppressed because one or more lines are too long
Loading…
Reference in New Issue