AWS SDK instrumentation - DynamoDB attributes (#2262)

* AWS SDK instrumentation - DynamoDB attributes

* aws sdk attributes mapping - dynamoDb

* some clean-up, swithced to AWS SDK marshalling for SdkPojos

* more tests, slight refactoring of different request type fields handling

* code review changes

* code review changes

* code review changes
This commit is contained in:
Jakub Wach 2021-02-22 10:38:35 +01:00 committed by GitHub
parent 2497a03523
commit 17aae4dfaa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 806 additions and 148 deletions

View File

@ -4,6 +4,10 @@ dependencies {
implementation deps.opentelemetryExtAws
library group: 'software.amazon.awssdk', name: 'aws-core', version: '2.2.0'
library group: 'software.amazon.awssdk', name: 'aws-json-protocol', version: '2.2.0'
testImplementation project(':instrumentation:aws-sdk:aws-sdk-2.2:testing')
testImplementation deps.assertj
testImplementation group: 'org.mockito', name: 'mockito-core', version: '3.6.0'
}

View File

@ -0,0 +1,155 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import static io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkRequestType.DynamoDB;
import static io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkRequestType.Kinesis;
import static io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkRequestType.S3;
import static io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkRequestType.SQS;
import static io.opentelemetry.instrumentation.awssdk.v2_2.FieldMapping.request;
import static io.opentelemetry.instrumentation.awssdk.v2_2.FieldMapping.response;
import java.util.List;
import java.util.Map;
import org.checkerframework.checker.nullness.qual.Nullable;
import software.amazon.awssdk.core.SdkRequest;
/**
* Temporary solution - maps only DynamoDB attributes. Final solution should be generated from AWS
* SDK automatically
* (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/2291).
*/
enum AwsSdkRequest {
// generic requests
DynamoDbRequest(DynamoDB, "DynamoDbRequest"),
S3Request(S3, "S3Request"),
SqsRequest(SQS, "SqsRequest"),
KinesisRequest(Kinesis, "KinesisRequest"),
// specific requests
BatchGetItem(
DynamoDB,
"BatchGetItemRequest",
request("aws.dynamodb.table_names", "RequestItems"),
response("aws.dynamodb.consumed_capacity", "ConsumedCapacity")),
BatchWriteItem(
DynamoDB,
"BatchWriteItemRequest",
request("aws.dynamodb.table_names", "RequestItems"),
response("aws.dynamodb.consumed_capacity", "ConsumedCapacity"),
response("aws.dynamodb.item_collection_metrics", "ItemCollectionMetrics")),
CreateTable(
DynamoDB,
"CreateTableRequest",
request("aws.dynamodb.global_secondary_indexes", "GlobalSecondaryIndexes"),
request("aws.dynamodb.local_secondary_indexes", "LocalSecondaryIndexes"),
request(
"aws.dynamodb.provisioned_throughput.read_capacity_units",
"ProvisionedThroughput.ReadCapacityUnits"),
request(
"aws.dynamodb.provisioned_throughput.write_capacity_units",
"ProvisionedThroughput.WriteCapacityUnits")),
DeleteItem(
DynamoDB,
"DeleteItemRequest",
response("aws.dynamodb.consumed_capacity", "ConsumedCapacity"),
response("aws.dynamodb.item_collection_metrics", "ItemCollectionMetrics")),
GetItem(
DynamoDB,
"GetItemRequest",
request("aws.dynamodb.projection_expression", "ProjectionExpression"),
response("aws.dynamodb.consumed_capacity", "ConsumedCapacity"),
request("aws.dynamodb.consistent_read", "ConsistentRead")),
ListTables(
DynamoDB,
"ListTablesRequest",
request("aws.dynamodb.exclusive_start_table_name", "ExclusiveStartTableName"),
response("aws.dynamodb.table_count", "TableNames"),
request("aws.dynamodb.limit", "Limit")),
PutItem(
DynamoDB,
"PutItemRequest",
response("aws.dynamodb.consumed_capacity", "ConsumedCapacity"),
response("aws.dynamodb.item_collection_metrics", "ItemCollectionMetrics")),
Query(
DynamoDB,
"QueryRequest",
request("aws.dynamodb.attributes_to_get", "AttributesToGet"),
request("aws.dynamodb.consistent_read", "ConsistentRead"),
request("aws.dynamodb.index_name", "IndexName"),
request("aws.dynamodb.limit", "Limit"),
request("aws.dynamodb.projection_expression", "ProjectionExpression"),
request("aws.dynamodb.scan_index_forward", "ScanIndexForward"),
request("aws.dynamodb.select", "Select"),
response("aws.dynamodb.consumed_capacity", "ConsumedCapacity")),
Scan(
DynamoDB,
"ScanRequest",
request("aws.dynamodb.attributes_to_get", "AttributesToGet"),
request("aws.dynamodb.consistent_read", "ConsistentRead"),
request("aws.dynamodb.index_name", "IndexName"),
request("aws.dynamodb.limit", "Limit"),
request("aws.dynamodb.projection_expression", "ProjectionExpression"),
request("aws.dynamodb.segment", "Segment"),
request("aws.dynamodb.select", "Select"),
request("aws.dynamodb.total_segments", "TotalSegments"),
response("aws.dynamodb.consumed_capacity", "ConsumedCapacity"),
response("aws.dynamodb.count", "Count"),
response("aws.dynamodb.scanned_count", "ScannedCount")),
UpdateItem(
DynamoDB,
"UpdateItemRequest",
response("aws.dynamodb.consumed_capacity", "ConsumedCapacity"),
response("aws.dynamodb.item_collection_metrics", "ItemCollectionMetrics")),
UpdateTable(
DynamoDB,
"UpdateTableRequest",
request("aws.dynamodb.attribute_definitions", "AttributeDefinitions"),
request("aws.dynamodb.global_secondary_index_updates", "GlobalSecondaryIndexUpdates"),
request(
"aws.dynamodb.provisioned_throughput.read_capacity_units",
"ProvisionedThroughput.ReadCapacityUnits"),
request(
"aws.dynamodb.provisioned_throughput.write_capacity_units",
"ProvisionedThroughput.WriteCapacityUnits"));
private final AwsSdkRequestType type;
private final String requestClass;
private final Map<FieldMapping.Type, List<FieldMapping>> fields;
AwsSdkRequest(AwsSdkRequestType type, String requestClass, FieldMapping... fields) {
this.type = type;
this.requestClass = requestClass;
this.fields = FieldMapping.groupByType(fields);
}
@Nullable
static AwsSdkRequest ofSdkRequest(SdkRequest request) {
// try request type
AwsSdkRequest result = ofType(request.getClass().getSimpleName());
// try parent - generic
if (result == null) {
result = ofType(request.getClass().getSuperclass().getSimpleName());
}
return result;
}
private static AwsSdkRequest ofType(String typeName) {
for (AwsSdkRequest type : values()) {
if (type.requestClass.equals(typeName)) {
return type;
}
}
return null;
}
List<FieldMapping> fields(FieldMapping.Type type) {
return fields.get(type);
}
AwsSdkRequestType type() {
return type;
}
}

View File

@ -0,0 +1,31 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import static io.opentelemetry.instrumentation.awssdk.v2_2.FieldMapping.request;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.List;
import java.util.Map;
enum AwsSdkRequestType {
S3(request("aws.bucket.name", "Bucket")),
SQS(request("aws.queue.url", "QueueUrl"), request("aws.queue.name", "QueueName")),
Kinesis(request("aws.stream.name", "StreamName")),
DynamoDB(
request("aws.table.name", "TableName"),
request(SemanticAttributes.DB_NAME.getKey(), "TableName"));
private final Map<FieldMapping.Type, List<FieldMapping>> fields;
AwsSdkRequestType(FieldMapping... fieldMappings) {
this.fields = FieldMapping.groupByType(fieldMappings);
}
List<FieldMapping> fields(FieldMapping.Type type) {
return fields.get(type);
}
}

View File

@ -1,30 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.SdkExecutionAttribute;
final class DbRequestDecorator implements SdkRequestDecorator {
@Override
public void decorate(Span span, SdkRequest sdkRequest, ExecutionAttributes attributes) {
span.setAttribute(SemanticAttributes.DB_SYSTEM, "dynamodb");
// decorate with TableName as db.name (DynamoDB equivalent - not for batch)
sdkRequest
.getValueForField("TableName", String.class)
.ifPresent(val -> span.setAttribute(SemanticAttributes.DB_NAME, val));
String operation = attributes.getAttribute(SdkExecutionAttribute.OPERATION_NAME);
if (operation != null) {
span.setAttribute(SemanticAttributes.DB_OPERATION, operation);
}
}
}

View File

@ -0,0 +1,85 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import io.opentelemetry.api.trace.Span;
import java.util.List;
import java.util.function.Function;
import org.checkerframework.checker.nullness.qual.Nullable;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.utils.StringUtils;
class FieldMapper {
private final Serializer serializer;
private final MethodHandleFactory methodHandleFactory;
FieldMapper() {
serializer = new Serializer();
methodHandleFactory = new MethodHandleFactory();
}
FieldMapper(Serializer serializer, MethodHandleFactory methodHandleFactory) {
this.methodHandleFactory = methodHandleFactory;
this.serializer = serializer;
}
void mapToAttributes(SdkRequest sdkRequest, AwsSdkRequest request, Span span) {
mapToAttributes(
field -> sdkRequest.getValueForField(field, Object.class).orElse(null),
FieldMapping.Type.REQUEST,
request,
span);
}
void mapToAttributes(SdkResponse sdkResponse, AwsSdkRequest request, Span span) {
mapToAttributes(
field -> sdkResponse.getValueForField(field, Object.class).orElse(null),
FieldMapping.Type.RESPONSE,
request,
span);
}
private void mapToAttributes(
Function<String, Object> fieldValueProvider,
FieldMapping.Type type,
AwsSdkRequest request,
Span span) {
for (FieldMapping fieldMapping : request.fields(type)) {
mapToAttributes(fieldValueProvider, fieldMapping, span);
}
for (FieldMapping fieldMapping : request.type().fields(type)) {
mapToAttributes(fieldValueProvider, fieldMapping, span);
}
}
private void mapToAttributes(
Function<String, Object> fieldValueProvider, FieldMapping fieldMapping, Span span) {
// traverse path
List<String> path = fieldMapping.getFields();
Object target = fieldValueProvider.apply(path.get(0));
for (int i = 1; i < path.size() && target != null; i++) {
target = next(target, path.get(i));
}
if (target != null) {
String value = serializer.serialize(target);
if (!StringUtils.isEmpty(value)) {
span.setAttribute(fieldMapping.getAttribute(), value);
}
}
}
@Nullable
private Object next(Object current, String fieldName) {
try {
return methodHandleFactory.forField(current.getClass(), fieldName).invoke(current);
} catch (Throwable t) {
// ignore
}
return null;
}
}

View File

@ -0,0 +1,63 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
class FieldMapping {
enum Type {
REQUEST,
RESPONSE;
}
private final Type type;
private final String attribute;
private final List<String> fields;
static FieldMapping request(String attribute, String fieldPath) {
return new FieldMapping(Type.REQUEST, attribute, fieldPath);
}
static FieldMapping response(String attribute, String fieldPath) {
return new FieldMapping(Type.RESPONSE, attribute, fieldPath);
}
FieldMapping(Type type, String attribute, String fieldPath) {
this.type = type;
this.attribute = attribute;
this.fields = Collections.unmodifiableList(Arrays.asList(fieldPath.split("\\.")));
}
String getAttribute() {
return attribute;
}
List<String> getFields() {
return fields;
}
Type getType() {
return type;
}
static final Map<Type, List<FieldMapping>> groupByType(FieldMapping[] fieldMappings) {
EnumMap<Type, List<FieldMapping>> fields = new EnumMap<>(Type.class);
for (FieldMapping.Type type : FieldMapping.Type.values()) {
fields.put(type, new ArrayList<>());
}
for (FieldMapping fieldMapping : fieldMappings) {
fields.get(fieldMapping.getType()).add(fieldMapping);
}
return fields;
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.ConcurrentHashMap;
class MethodHandleFactory {
private String lowerCase(String string) {
return string.substring(0, 1).toLowerCase() + string.substring(1);
}
private final ClassValue<ConcurrentHashMap<String, MethodHandle>> getterCache =
new ClassValue<ConcurrentHashMap<String, MethodHandle>>() {
@Override
protected ConcurrentHashMap<String, MethodHandle> computeValue(Class<?> type) {
return new ConcurrentHashMap<>();
}
};
MethodHandle forField(Class clazz, String fieldName)
throws NoSuchMethodException, IllegalAccessException {
MethodHandle methodHandle = getterCache.get(clazz).get(fieldName);
if (methodHandle == null) {
// getter in AWS SDK is lowercased field name
methodHandle = MethodHandles.publicLookup().unreflect(clazz.getMethod(lowerCase(fieldName)));
getterCache.get(clazz).put(fieldName, methodHandle);
}
return methodHandle;
}
}

View File

@ -1,44 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import org.checkerframework.checker.nullness.qual.Nullable;
import software.amazon.awssdk.core.SdkRequest;
enum RequestType {
S3("S3Request", "Bucket"),
SQS("SqsRequest", "QueueUrl", "QueueName"),
Kinesis("KinesisRequest", "StreamName"),
DynamoDB("DynamoDbRequest", "TableName");
private final String requestClass;
private final String[] fields;
RequestType(String requestClass, String... fields) {
this.requestClass = requestClass;
this.fields = fields;
}
String[] getFields() {
return fields;
}
@Nullable
static RequestType ofSdkRequest(SdkRequest request) {
// exact request class should be 1st level child of request type
String typeName =
(request.getClass().getSuperclass() == null
? request.getClass()
: request.getClass().getSuperclass())
.getSimpleName();
for (RequestType type : values()) {
if (type.requestClass.equals(typeName)) {
return type;
}
}
return null;
}
}

View File

@ -1,15 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import io.opentelemetry.api.trace.Span;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
interface SdkRequestDecorator {
void decorate(Span span, SdkRequest sdkRequest, ExecutionAttributes attributes);
}

View File

@ -0,0 +1,89 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.checkerframework.checker.nullness.qual.Nullable;
import software.amazon.awssdk.core.SdkPojo;
import software.amazon.awssdk.core.client.config.SdkClientConfiguration;
import software.amazon.awssdk.core.client.config.SdkClientOption;
import software.amazon.awssdk.http.ContentStreamProvider;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.http.SdkHttpMethod;
import software.amazon.awssdk.protocols.core.OperationInfo;
import software.amazon.awssdk.protocols.core.ProtocolMarshaller;
import software.amazon.awssdk.protocols.json.AwsJsonProtocolFactory;
import software.amazon.awssdk.utils.IoUtils;
import software.amazon.awssdk.utils.StringUtils;
class Serializer {
private final AwsJsonProtocolFactory awsJsonProtocolFactory;
Serializer() {
awsJsonProtocolFactory =
AwsJsonProtocolFactory.builder()
.clientConfiguration(
SdkClientConfiguration.builder()
// AwsJsonProtocolFactory requires any URI to be present
.option(SdkClientOption.ENDPOINT, URI.create("http://empty"))
.build())
.build();
}
@Nullable
String serialize(Object target) {
if (target == null) {
return null;
}
if (target instanceof SdkPojo) {
return serialize((SdkPojo) target);
}
if (target instanceof Collection) {
return serialize((Collection<Object>) target);
}
if (target instanceof Map) {
return serialize(((Map) target).keySet());
}
// simple type
return target.toString();
}
@Nullable
private String serialize(SdkPojo sdkPojo) {
Optional<ContentStreamProvider> optional =
createMarshaller().marshall(sdkPojo).contentStreamProvider();
return optional
.map(
csp -> {
try (InputStream cspIs = csp.newStream()) {
return IoUtils.toUtf8String(cspIs);
} catch (IOException e) {
return null;
}
})
.orElse(null);
}
private String serialize(Collection<Object> collection) {
String serialized = collection.stream().map(this::serialize).collect(Collectors.joining(","));
return (StringUtils.isEmpty(serialized) ? null : "[" + serialized + "]");
}
private ProtocolMarshaller<SdkHttpFullRequest> createMarshaller() {
// apparently AWS SDK serializers can't be reused (throwing NPEs on second use)
return awsJsonProtocolFactory.createProtocolMarshaller(
OperationInfo.builder().hasPayloadMembers(true).httpMethod(SdkHttpMethod.POST).build());
}
}

View File

@ -7,15 +7,11 @@ package io.opentelemetry.instrumentation.awssdk.v2_2;
import static io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdk.getContext;
import static io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkHttpClientTracer.tracer;
import static io.opentelemetry.instrumentation.awssdk.v2_2.RequestType.ofSdkRequest;
import static io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkRequestType.DynamoDB;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.Map;
import org.checkerframework.checker.nullness.qual.Nullable;
import software.amazon.awssdk.awscore.AwsResponse;
import software.amazon.awssdk.core.ClientType;
import software.amazon.awssdk.core.SdkRequest;
@ -36,39 +32,12 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".Context");
static final ExecutionAttribute<Scope> SCOPE_ATTRIBUTE =
new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".Scope");
static final ExecutionAttribute<RequestType> REQUEST_TYPE_ATTRIBUTE =
new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".RequestType");
static final ExecutionAttribute<AwsSdkRequest> AWS_SDK_REQUEST_ATTRIBUTE =
new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".AwsSdkRequest");
static final String COMPONENT_NAME = "java-aws-sdk";
private static final Map<RequestType, SdkRequestDecorator> TYPE_TO_DECORATOR = mapDecorators();
private static final Map<String, String> FIELD_TO_ATTRIBUTE = mapFieldToAttribute();
private static Map<RequestType, SdkRequestDecorator> mapDecorators() {
Map<RequestType, SdkRequestDecorator> result = new EnumMap<>(RequestType.class);
result.put(RequestType.DynamoDB, new DbRequestDecorator());
return result;
}
private static Map<String, String> mapFieldToAttribute() {
Map<String, String> result = new HashMap<>();
result.put("QueueUrl", "aws.queue.url");
result.put("Bucket", "aws.bucket.name");
result.put("QueueName", "aws.queue.name");
result.put("StreamName", "aws.stream.name");
result.put("TableName", "aws.table.name");
return result;
}
@Nullable
private SdkRequestDecorator decorator(ExecutionAttributes executionAttributes) {
RequestType type = getTypeFromAttributes(executionAttributes);
return TYPE_TO_DECORATOR.get(type);
}
private RequestType getTypeFromAttributes(ExecutionAttributes executionAttributes) {
return executionAttributes.getAttribute(REQUEST_TYPE_ATTRIBUTE);
}
private final FieldMapper fieldMapper = new FieldMapper();
@Override
public void beforeExecution(
@ -80,10 +49,6 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
io.opentelemetry.context.Context otelContext =
tracer().startSpan(parentOtelContext, executionAttributes);
executionAttributes.putAttribute(CONTEXT_ATTRIBUTE, otelContext);
RequestType type = ofSdkRequest(context.request());
if (type != null) {
executionAttributes.putAttribute(REQUEST_TYPE_ATTRIBUTE, type);
}
if (executionAttributes
.getAttribute(SdkExecutionAttribute.CLIENT_TYPE)
.equals(ClientType.SYNC)) {
@ -116,27 +81,33 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
Span span = Span.fromContext(otelContext);
tracer().onRequest(span, context.httpRequest());
SdkRequestDecorator decorator = decorator(executionAttributes);
if (decorator != null) {
decorator.decorate(span, context.request(), executionAttributes);
AwsSdkRequest awsSdkRequest = AwsSdkRequest.ofSdkRequest(context.request());
if (awsSdkRequest != null) {
executionAttributes.putAttribute(AWS_SDK_REQUEST_ATTRIBUTE, awsSdkRequest);
populateRequestAttributes(span, awsSdkRequest, context.request(), executionAttributes);
}
decorateWithGenericRequestData(span, context.request());
decorateWithExAttributesData(span, executionAttributes);
populateGenericAttributes(span, executionAttributes);
}
private void decorateWithGenericRequestData(Span span, SdkRequest request) {
private void populateRequestAttributes(
Span span,
AwsSdkRequest awsSdkRequest,
SdkRequest sdkRequest,
ExecutionAttributes attributes) {
RequestType type = ofSdkRequest(request);
if (type != null) {
for (String field : type.getFields()) {
request
.getValueForField(field, String.class)
.ifPresent(val -> span.setAttribute(FIELD_TO_ATTRIBUTE.get(field), val));
fieldMapper.mapToAttributes(sdkRequest, awsSdkRequest, span);
if (awsSdkRequest.type() == DynamoDB) {
span.setAttribute(SemanticAttributes.DB_SYSTEM, "dynamodb");
String operation = attributes.getAttribute(SdkExecutionAttribute.OPERATION_NAME);
if (operation != null) {
span.setAttribute(SemanticAttributes.DB_OPERATION, operation);
}
}
}
private void decorateWithExAttributesData(Span span, ExecutionAttributes attributes) {
private void populateGenericAttributes(Span span, ExecutionAttributes attributes) {
String awsServiceName = attributes.getAttribute(SdkExecutionAttribute.SERVICE_NAME);
String awsOperation = attributes.getAttribute(SdkExecutionAttribute.OPERATION_NAME);
@ -157,7 +128,7 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
clearAttributes(executionAttributes);
Span span = Span.fromContext(otelContext);
onUserAgentHeaderAvailable(span, context.httpRequest());
onSdkResponse(span, context.response());
onSdkResponse(span, context.response(), executionAttributes);
tracer().end(otelContext, context.httpResponse());
}
@ -167,10 +138,15 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
SemanticAttributes.HTTP_USER_AGENT, tracer().requestHeader(request, "User-Agent"));
}
private void onSdkResponse(Span span, SdkResponse response) {
private void onSdkResponse(
Span span, SdkResponse response, ExecutionAttributes executionAttributes) {
if (response instanceof AwsResponse) {
span.setAttribute("aws.requestId", ((AwsResponse) response).responseMetadata().requestId());
}
AwsSdkRequest sdkRequest = executionAttributes.getAttribute(AWS_SDK_REQUEST_ATTRIBUTE);
if (sdkRequest != null) {
fieldMapper.mapToAttributes(response, sdkRequest, span);
}
}
@Override
@ -183,6 +159,6 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
private void clearAttributes(ExecutionAttributes executionAttributes) {
executionAttributes.putAttribute(CONTEXT_ATTRIBUTE, null);
executionAttributes.putAttribute(REQUEST_TYPE_ATTRIBUTE, null);
executionAttributes.putAttribute(AWS_SDK_REQUEST_ATTRIBUTE, null);
}
}

View File

@ -0,0 +1,104 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import static io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkRequest.BatchWriteItem;
import static io.opentelemetry.instrumentation.awssdk.v2_2.AwsSdkRequest.UpdateTable;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.mock;
import static org.mockito.BDDMockito.verify;
import static org.mockito.BDDMockito.verifyNoMoreInteractions;
import io.opentelemetry.api.trace.Span;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.junit.Test;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.BatchWriteItemResponse;
import software.amazon.awssdk.services.dynamodb.model.ConsumedCapacity;
import software.amazon.awssdk.services.dynamodb.model.ItemCollectionMetrics;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
import software.amazon.awssdk.services.dynamodb.model.UpdateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
public class FieldMapperTest {
@Test
public void shouldMapNestedField() {
// given
AwsSdkRequest awsSdkRequest = UpdateTable;
MethodHandleFactory methodHandleFactory = new MethodHandleFactory();
Serializer serializer = mock(Serializer.class);
FieldMapper underTest = new FieldMapper(serializer, methodHandleFactory);
Map<String, Collection<WriteRequest>> items = new HashMap();
UpdateTableRequest sdkRequest =
UpdateTableRequest.builder()
.provisionedThroughput(
ProvisionedThroughput.builder()
.readCapacityUnits(55L)
.writeCapacityUnits(77L)
.build())
.build();
given(serializer.serialize(55L)).willReturn("55");
given(serializer.serialize(77L)).willReturn("77");
Span span = mock(Span.class);
// when
underTest.mapToAttributes(sdkRequest, awsSdkRequest, span);
// then
verify(span).setAttribute("aws.dynamodb.provisioned_throughput.read_capacity_units", "55");
verify(span).setAttribute("aws.dynamodb.provisioned_throughput.write_capacity_units", "77");
verifyNoMoreInteractions(span);
}
@Test
public void shouldMapRequestFieldsOnly() {
// given
AwsSdkRequest awsSdkRequest = BatchWriteItem;
MethodHandleFactory methodHandleFactory = new MethodHandleFactory();
Serializer serializer = mock(Serializer.class);
FieldMapper underTest = new FieldMapper(serializer, methodHandleFactory);
Map<String, Collection<WriteRequest>> items = new HashMap();
BatchWriteItemRequest sdkRequest = BatchWriteItemRequest.builder().requestItems(items).build();
given(serializer.serialize(items)).willReturn("firstTable,secondTable");
Span span = mock(Span.class);
// when
underTest.mapToAttributes(sdkRequest, awsSdkRequest, span);
// then
verify(span).setAttribute("aws.dynamodb.table_names", "firstTable,secondTable");
verifyNoMoreInteractions(span);
}
@Test
public void shouldMapResponseFieldsOnly() {
// given
AwsSdkRequest awsSdkRequest = BatchWriteItem;
MethodHandleFactory methodHandleFactory = new MethodHandleFactory();
Serializer serializer = mock(Serializer.class);
FieldMapper underTest = new FieldMapper(serializer, methodHandleFactory);
Map<String, Collection<ItemCollectionMetrics>> items = new HashMap();
BatchWriteItemResponse sdkResponse =
BatchWriteItemResponse.builder()
.consumedCapacity(ConsumedCapacity.builder().build())
.itemCollectionMetrics(items)
.build();
given(serializer.serialize(sdkResponse.consumedCapacity())).willReturn("consumedCapacity");
given(serializer.serialize(items)).willReturn("itemCollectionMetrics");
Span span = mock(Span.class);
// when
underTest.mapToAttributes(sdkResponse, awsSdkRequest, span);
// then
verify(span).setAttribute("aws.dynamodb.consumed_capacity", "consumedCapacity");
verify(span).setAttribute("aws.dynamodb.item_collection_metrics", "itemCollectionMetrics");
verifyNoMoreInteractions(span);
}
}

View File

@ -0,0 +1,73 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.junit.Test;
import software.amazon.awssdk.core.SdkPojo;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
public class SerializerTest {
@Test
public void shouldSerializeSimpleString() {
// given
// when
String serialized = new Serializer().serialize("simpleString");
// then
assertThat(serialized).isEqualTo("simpleString");
}
@Test
public void shouldSerializeSdkPojo() {
// given
SdkPojo sdkPojo =
ProvisionedThroughput.builder().readCapacityUnits(1L).writeCapacityUnits(2L).build();
// when
String serialized = new Serializer().serialize(sdkPojo);
// then
assertThat(serialized).isEqualTo("{\"ReadCapacityUnits\":1,\"WriteCapacityUnits\":2}");
}
@Test
public void shouldSerializeCollection() {
// given
List<String> collection = Arrays.asList("one", "two", "three");
// when
String serialized = new Serializer().serialize(collection);
// then
assertThat(serialized).isEqualTo("[one,two,three]");
}
@Test
public void shouldSerializeEmptyCollectionAsNull() {
// given
List<String> collection = Arrays.asList();
// when
String serialized = new Serializer().serialize(collection);
// then
assertThat(serialized).isNull();
}
@Test
public void shouldSerializeMapAsKeyCollection() {
// given
Map<String, Object> map = new HashMap<>();
map.put("uno", Long.valueOf(1));
map.put("dos", new LinkedHashMap<>());
map.put("tres", "cuatro");
// when
String serialized = new Serializer().serialize(map);
// then
assertThat(serialized).isEqualTo("[uno,dos,tres]");
}
}

View File

@ -9,8 +9,8 @@ import static com.google.common.collect.ImmutableMap.of
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.instrumentation.test.server.http.TestHttpServer.httpServer
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import java.time.Duration
import java.util.concurrent.Future
import java.util.concurrent.atomic.AtomicReference
@ -29,6 +29,9 @@ import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest
import software.amazon.awssdk.services.dynamodb.model.GlobalSecondaryIndex
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest
import software.amazon.awssdk.services.dynamodb.model.QueryRequest
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest
@ -97,7 +100,16 @@ abstract class AbstractAws2ClientTest extends InstrumentationSpecification {
expect:
response != null
response.class.simpleName.startsWith(operation)
assertDynamoDbRequest(service, operation, path, method, requestId)
switch (operation) {
case "CreateTable":
assertCreateTableRequest(path, method, requestId)
break
case "Query":
assertQueryRequest(path, method, requestId)
break
default:
assertDynamoDbRequest(service, operation, path, method, requestId)
}
where:
[service, operation, method, path, requestId, builder, call] << dynamoDbRequestDataTable(DynamoDbClient.builder())
@ -120,12 +132,92 @@ abstract class AbstractAws2ClientTest extends InstrumentationSpecification {
expect:
response != null
assertDynamoDbRequest(service, operation, path, method, requestId)
switch (operation) {
case "CreateTable":
assertCreateTableRequest(path, method, requestId)
break
case "Query":
assertQueryRequest(path, method, requestId)
break
default:
assertDynamoDbRequest(service, operation, path, method, requestId)
}
where:
[service, operation, method, path, requestId, builder, call] << dynamoDbRequestDataTable(DynamoDbAsyncClient.builder())
}
def assertCreateTableRequest(path, method, requestId) {
assertTraces(1) {
trace(0, 1) {
span(0) {
name "DynamoDb.CreateTable"
kind CLIENT
errored false
hasNoParent()
attributes {
"${SemanticAttributes.NET_TRANSPORT.key}" "IP.TCP"
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
"${SemanticAttributes.NET_PEER_PORT.key}" server.address.port
"${SemanticAttributes.HTTP_URL.key}" { it.startsWith("${server.address}${path}") }
"${SemanticAttributes.HTTP_METHOD.key}" "$method"
"${SemanticAttributes.HTTP_STATUS_CODE.key}" 200
"${SemanticAttributes.HTTP_USER_AGENT.key}" { it.startsWith("aws-sdk-java/") }
"${SemanticAttributes.HTTP_FLAVOR.key}" "1.1"
"aws.service" "DynamoDb"
"aws.operation" "CreateTable"
"aws.agent" "java-aws-sdk"
"aws.requestId" "$requestId"
"aws.table.name" "sometable"
"${SemanticAttributes.DB_SYSTEM.key}" "dynamodb"
"${SemanticAttributes.DB_NAME.key}" "sometable"
"${SemanticAttributes.DB_OPERATION.key}" "CreateTable"
"aws.dynamodb.global_secondary_indexes" "[{\"IndexName\":\"globalIndex\",\"KeySchema\":[{\"AttributeName\":\"attribute\"}],\"ProvisionedThroughput\":{\"ReadCapacityUnits\":10,\"WriteCapacityUnits\":12}},{\"IndexName\":\"globalIndexSecondary\",\"KeySchema\":[{\"AttributeName\":\"attributeSecondary\"}],\"ProvisionedThroughput\":{\"ReadCapacityUnits\":7,\"WriteCapacityUnits\":8}}]"
"aws.dynamodb.provisioned_throughput.read_capacity_units" "1"
"aws.dynamodb.provisioned_throughput.write_capacity_units" "1"
}
}
}
}
server.lastRequest.headers.get("X-Amzn-Trace-Id") != null
server.lastRequest.headers.get("traceparent") == null
}
def assertQueryRequest(path, method, requestId) {
assertTraces(1) {
trace(0, 1) {
span(0) {
name "DynamoDb.Query"
kind CLIENT
errored false
hasNoParent()
attributes {
"${SemanticAttributes.NET_TRANSPORT.key}" "IP.TCP"
"${SemanticAttributes.NET_PEER_NAME.key}" "localhost"
"${SemanticAttributes.NET_PEER_PORT.key}" server.address.port
"${SemanticAttributes.HTTP_URL.key}" { it.startsWith("${server.address}${path}") }
"${SemanticAttributes.HTTP_METHOD.key}" "$method"
"${SemanticAttributes.HTTP_STATUS_CODE.key}" 200
"${SemanticAttributes.HTTP_USER_AGENT.key}" { it.startsWith("aws-sdk-java/") }
"${SemanticAttributes.HTTP_FLAVOR.key}" "1.1"
"aws.service" "DynamoDb"
"aws.operation" "Query"
"aws.agent" "java-aws-sdk"
"aws.requestId" "$requestId"
"aws.table.name" "sometable"
"${SemanticAttributes.DB_SYSTEM.key}" "dynamodb"
"${SemanticAttributes.DB_NAME.key}" "sometable"
"${SemanticAttributes.DB_OPERATION.key}" "Query"
"aws.dynamodb.limit" "10"
"aws.dynamodb.select" "ALL_ATTRIBUTES"
}
}
}
}
server.lastRequest.headers.get("X-Amzn-Trace-Id") != null
server.lastRequest.headers.get("traceparent") == null
}
def assertDynamoDbRequest(service, operation, path, method, requestId) {
assertTraces(1) {
trace(0, 1) {
@ -162,7 +254,7 @@ abstract class AbstractAws2ClientTest extends InstrumentationSpecification {
static dynamoDbRequestDataTable(client) {
[
["DynamoDb", "CreateTable", "POST", "/", "UNKNOWN", client,
{ c -> c.createTable(CreateTableRequest.builder().tableName("sometable").build()) }],
{ c -> c.createTable(createTableRequest()) }],
["DynamoDb", "DeleteItem", "POST", "/", "UNKNOWN", client,
{ c -> c.deleteItem(DeleteItemRequest.builder().tableName("sometable").key(of("anotherKey", val("value"), "key", val("value"))).conditionExpression("property in (:one :two)").build()) }],
["DynamoDb", "DeleteTable", "POST", "/", "UNKNOWN", client,
@ -178,6 +270,45 @@ abstract class AbstractAws2ClientTest extends InstrumentationSpecification {
]
}
static CreateTableRequest createTableRequest() {
return CreateTableRequest.builder()
.tableName("sometable")
.globalSecondaryIndexes(Arrays.asList(
GlobalSecondaryIndex.builder()
.indexName("globalIndex")
.keySchema(
KeySchemaElement.builder()
.attributeName("attribute")
.build())
.provisionedThroughput(
ProvisionedThroughput.builder()
.readCapacityUnits(10)
.writeCapacityUnits(12)
.build()
)
.build(),
GlobalSecondaryIndex.builder()
.indexName("globalIndexSecondary")
.keySchema(
KeySchemaElement.builder()
.attributeName("attributeSecondary")
.build())
.provisionedThroughput(
ProvisionedThroughput.builder()
.readCapacityUnits(7)
.writeCapacityUnits(8)
.build()
)
.build()))
.provisionedThroughput(
ProvisionedThroughput.builder()
.readCapacityUnits(1)
.writeCapacityUnits(1)
.build()
)
.build()
}
static val(String value) {
return AttributeValue.builder().s(value).build()
}