Add http client response attributes to aws sqs process spans (#10074)

This commit is contained in:
Lauri Tulmin 2023-12-14 18:07:54 +02:00 committed by GitHub
parent 30ddf6a66c
commit 37ca15b76f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 105 additions and 101 deletions

View File

@ -3,7 +3,6 @@
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.api.semconv.http.internal.HttpAttributes
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.semconv.SemanticAttributes
import spock.lang.Shared
@ -190,6 +189,7 @@ class S3TracingTest extends AgentInstrumentationSpecification {
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$SemanticAttributes.HTTP_REQUEST_METHOD" "POST"
"$SemanticAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$SemanticAttributes.URL_FULL" { it.startsWith("http://") }
"$SemanticAttributes.SERVER_ADDRESS" String
"$SemanticAttributes.SERVER_PORT" { it == null || Number }
@ -197,7 +197,7 @@ class S3TracingTest extends AgentInstrumentationSpecification {
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "s3ToSqsTestQueue"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$HttpAttributes.ERROR_TYPE" "_OTHER"
"$SemanticAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
}
}
span(2) {
@ -523,6 +523,7 @@ class S3TracingTest extends AgentInstrumentationSpecification {
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$SemanticAttributes.HTTP_REQUEST_METHOD" "POST"
"$SemanticAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$SemanticAttributes.URL_FULL" { it.startsWith("http://") }
"$SemanticAttributes.SERVER_ADDRESS" String
"$SemanticAttributes.SERVER_PORT" { it == null || Number }
@ -530,7 +531,7 @@ class S3TracingTest extends AgentInstrumentationSpecification {
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "s3ToSnsToSqsTestQueue"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$HttpAttributes.ERROR_TYPE" "_OTHER"
"$SemanticAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
}
}
span(1) {

View File

@ -3,7 +3,6 @@
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.api.semconv.http.internal.HttpAttributes
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.semconv.SemanticAttributes
import spock.lang.Shared
@ -179,6 +178,7 @@ class SnsTracingTest extends AgentInstrumentationSpecification {
"rpc.service" "AmazonSQS"
"rpc.method" "ReceiveMessage"
"$SemanticAttributes.HTTP_REQUEST_METHOD" "POST"
"$SemanticAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$SemanticAttributes.URL_FULL" String
"$SemanticAttributes.SERVER_ADDRESS" String
"$SemanticAttributes.SERVER_PORT" { it == null || Number }
@ -186,7 +186,7 @@ class SnsTracingTest extends AgentInstrumentationSpecification {
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "snsToSqsTestQueue"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$HttpAttributes.ERROR_TYPE" "_OTHER"
"$SemanticAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
}
}
span(2) {

View File

@ -28,7 +28,6 @@ import io.opentelemetry.instrumentation.api.semconv.http.HttpClientAttributesExt
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import javax.annotation.Nullable;
final class AwsSdkInstrumenterFactory {
@ -98,23 +97,23 @@ final class AwsSdkInstrumenterFactory {
openTelemetry,
MessagingSpanNameExtractor.create(getter, operation),
SpanKindExtractor.alwaysConsumer(),
toSqsRequestExtractors(attributesExtractors(), Function.identity()),
toSqsRequestExtractors(attributesExtractors()),
singletonList(messagingAttributeExtractor),
messagingReceiveInstrumentationEnabled);
}
Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter() {
Instrumenter<SqsProcessRequest, Response<?>> consumerProcessInstrumenter() {
MessageOperation operation = MessageOperation.PROCESS;
SqsProcessRequestAttributesGetter getter = SqsProcessRequestAttributesGetter.INSTANCE;
AttributesExtractor<SqsProcessRequest, Void> messagingAttributeExtractor =
AttributesExtractor<SqsProcessRequest, Response<?>> messagingAttributeExtractor =
messagingAttributesExtractor(getter, operation);
InstrumenterBuilder<SqsProcessRequest, Void> builder =
Instrumenter.<SqsProcessRequest, Void>builder(
InstrumenterBuilder<SqsProcessRequest, Response<?>> builder =
Instrumenter.<SqsProcessRequest, Response<?>>builder(
openTelemetry,
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractors(toSqsRequestExtractors(attributesExtractors(), unused -> null))
.addAttributesExtractors(toSqsRequestExtractors(attributesExtractors()))
.addAttributesExtractor(messagingAttributeExtractor);
if (messagingReceiveInstrumentationEnabled) {
@ -128,14 +127,12 @@ final class AwsSdkInstrumenterFactory {
return builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
}
private static <RESPONSE>
List<AttributesExtractor<AbstractSqsRequest, RESPONSE>> toSqsRequestExtractors(
List<AttributesExtractor<Request<?>, Response<?>>> extractors,
Function<RESPONSE, Response<?>> responseConverter) {
List<AttributesExtractor<AbstractSqsRequest, RESPONSE>> result = new ArrayList<>();
private static List<AttributesExtractor<AbstractSqsRequest, Response<?>>> toSqsRequestExtractors(
List<AttributesExtractor<Request<?>, Response<?>>> extractors) {
List<AttributesExtractor<AbstractSqsRequest, Response<?>>> result = new ArrayList<>();
for (AttributesExtractor<Request<?>, Response<?>> extractor : extractors) {
result.add(
new AttributesExtractor<AbstractSqsRequest, RESPONSE>() {
new AttributesExtractor<AbstractSqsRequest, Response<?>>() {
@Override
public void onStart(
AttributesBuilder attributes,
@ -149,14 +146,9 @@ final class AwsSdkInstrumenterFactory {
AttributesBuilder attributes,
Context context,
AbstractSqsRequest sqsRequest,
@Nullable RESPONSE response,
@Nullable Response<?> response,
@Nullable Throwable error) {
extractor.onEnd(
attributes,
context,
sqsRequest.getRequest(),
responseConverter.apply(response),
error);
extractor.onEnd(attributes, context, sqsRequest.getRequest(), response, error);
}
});
}

View File

@ -47,7 +47,7 @@ public class AwsSdkTelemetry {
private final Instrumenter<Request<?>, Response<?>> requestInstrumenter;
private final Instrumenter<SqsReceiveRequest, Response<?>> consumerReceiveInstrumenter;
private final Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter;
private final Instrumenter<SqsProcessRequest, Response<?>> consumerProcessInstrumenter;
private final Instrumenter<Request<?>, Response<?>> producerInstrumenter;
AwsSdkTelemetry(

View File

@ -57,7 +57,7 @@ final class SqsImpl {
Instrumenter<SqsReceiveRequest, Response<?>> consumerReceiveInstrumenter =
requestHandler.getConsumerReceiveInstrumenter();
Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter =
Instrumenter<SqsProcessRequest, Response<?>> consumerProcessInstrumenter =
requestHandler.getConsumerProcessInstrumenter();
Context receiveContext = null;
@ -75,7 +75,8 @@ final class SqsImpl {
timer.now());
}
addTracing(receiveMessageResult, request, consumerProcessInstrumenter, receiveContext);
addTracing(
receiveMessageResult, request, response, consumerProcessInstrumenter, receiveContext);
}
private static final Field messagesField = getMessagesField();
@ -93,7 +94,8 @@ final class SqsImpl {
private static void addTracing(
ReceiveMessageResult receiveMessageResult,
Request<?> request,
Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter,
Response<?> response,
Instrumenter<SqsProcessRequest, Response<?>> consumerProcessInstrumenter,
Context receiveContext) {
if (messagesField == null) {
return;
@ -107,6 +109,7 @@ final class SqsImpl {
receiveMessageResult.getMessages(),
consumerProcessInstrumenter,
request,
response,
receiveContext));
} catch (IllegalAccessException ignored) {
// should not happen, we call setAccessible on the field

View File

@ -5,13 +5,14 @@
package io.opentelemetry.instrumentation.awssdk.v1_11;
import com.amazonaws.Response;
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
enum SqsProcessRequestAttributesGetter
implements MessagingAttributesGetter<SqsProcessRequest, Void> {
implements MessagingAttributesGetter<SqsProcessRequest, Response<?>> {
INSTANCE;
@Override
@ -52,7 +53,7 @@ enum SqsProcessRequestAttributesGetter
@Override
@Nullable
public String getMessageId(SqsProcessRequest request, @Nullable Void response) {
public String getMessageId(SqsProcessRequest request, @Nullable Response<?> response) {
return request.getMessage().getMessageId();
}

View File

@ -6,6 +6,7 @@
package io.opentelemetry.instrumentation.awssdk.v1_11;
import com.amazonaws.Request;
import com.amazonaws.Response;
import com.amazonaws.services.sqs.model.Message;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
@ -16,8 +17,9 @@ import javax.annotation.Nullable;
class TracingIterator implements Iterator<Message> {
private final Iterator<Message> delegateIterator;
private final Instrumenter<SqsProcessRequest, Void> instrumenter;
private final Instrumenter<SqsProcessRequest, Response<?>> instrumenter;
private final Request<?> request;
private final Response<?> response;
private final Context receiveContext;
/*
@ -30,21 +32,24 @@ class TracingIterator implements Iterator<Message> {
private TracingIterator(
Iterator<Message> delegateIterator,
Instrumenter<SqsProcessRequest, Void> instrumenter,
Instrumenter<SqsProcessRequest, Response<?>> instrumenter,
Request<?> request,
Response<?> response,
Context receiveContext) {
this.delegateIterator = delegateIterator;
this.instrumenter = instrumenter;
this.request = request;
this.response = response;
this.receiveContext = receiveContext;
}
public static Iterator<Message> wrap(
Iterator<Message> delegateIterator,
Instrumenter<SqsProcessRequest, Void> instrumenter,
Instrumenter<SqsProcessRequest, Response<?>> instrumenter,
Request<?> request,
Response<?> response,
Context receiveContext) {
return new TracingIterator(delegateIterator, instrumenter, request, receiveContext);
return new TracingIterator(delegateIterator, instrumenter, request, response, receiveContext);
}
@Override
@ -80,7 +85,7 @@ class TracingIterator implements Iterator<Message> {
private void closeScopeAndEndSpan() {
if (currentScope != null) {
currentScope.close();
instrumenter.end(currentContext, currentRequest, null, null);
instrumenter.end(currentContext, currentRequest, response, null);
currentScope = null;
currentRequest = null;
currentContext = null;

View File

@ -6,6 +6,7 @@
package io.opentelemetry.instrumentation.awssdk.v1_11;
import com.amazonaws.Request;
import com.amazonaws.Response;
import com.amazonaws.internal.SdkInternalList;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.Message;
@ -18,28 +19,32 @@ import java.util.function.Consumer;
class TracingList extends SdkInternalList<Message> {
private static final long serialVersionUID = 1L;
private final transient Instrumenter<SqsProcessRequest, Void> instrumenter;
private final transient Instrumenter<SqsProcessRequest, Response<?>> instrumenter;
private final transient Request<?> request;
private final transient Response<?> response;
private final transient Context receiveContext;
private boolean firstIterator = true;
private TracingList(
List<Message> list,
Instrumenter<SqsProcessRequest, Void> instrumenter,
Instrumenter<SqsProcessRequest, Response<?>> instrumenter,
Request<?> request,
Response<?> response,
Context receiveContext) {
super(list);
this.instrumenter = instrumenter;
this.request = request;
this.response = response;
this.receiveContext = receiveContext;
}
public static SdkInternalList<Message> wrap(
List<Message> list,
Instrumenter<SqsProcessRequest, Void> instrumenter,
Instrumenter<SqsProcessRequest, Response<?>> instrumenter,
Request<?> request,
Response<?> response,
Context receiveContext) {
return new TracingList(list, instrumenter, request, receiveContext);
return new TracingList(list, instrumenter, request, response, receiveContext);
}
@Override
@ -49,7 +54,7 @@ class TracingList extends SdkInternalList<Message> {
// However, this is not thread-safe, but usually the first (hopefully only) traversal of
// List is performed in the same thread that called receiveMessage()
if (firstIterator && !inAwsClient()) {
it = TracingIterator.wrap(super.iterator(), instrumenter, request, receiveContext);
it = TracingIterator.wrap(super.iterator(), instrumenter, request, response, receiveContext);
firstIterator = false;
} else {
it = super.iterator();

View File

@ -34,13 +34,13 @@ final class TracingRequestHandler extends RequestHandler2 {
private final Instrumenter<Request<?>, Response<?>> requestInstrumenter;
private final Instrumenter<SqsReceiveRequest, Response<?>> consumerReceiveInstrumenter;
private final Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter;
private final Instrumenter<SqsProcessRequest, Response<?>> consumerProcessInstrumenter;
private final Instrumenter<Request<?>, Response<?>> producerInstrumenter;
TracingRequestHandler(
Instrumenter<Request<?>, Response<?>> requestInstrumenter,
Instrumenter<SqsReceiveRequest, Response<?>> consumerReceiveInstrumenter,
Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter,
Instrumenter<SqsProcessRequest, Response<?>> consumerProcessInstrumenter,
Instrumenter<Request<?>, Response<?>> producerInstrumenter) {
this.requestInstrumenter = requestInstrumenter;
this.consumerReceiveInstrumenter = consumerReceiveInstrumenter;
@ -103,7 +103,7 @@ final class TracingRequestHandler extends RequestHandler2 {
return consumerReceiveInstrumenter;
}
Instrumenter<SqsProcessRequest, Void> getConsumerProcessInstrumenter() {
Instrumenter<SqsProcessRequest, Response<?>> getConsumerProcessInstrumenter() {
return consumerProcessInstrumenter;
}

View File

@ -12,7 +12,6 @@ import com.amazonaws.services.sqs.AmazonSQSAsyncClient
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder
import com.amazonaws.services.sqs.model.ReceiveMessageRequest
import com.amazonaws.services.sqs.model.SendMessageRequest
import io.opentelemetry.instrumentation.api.semconv.http.internal.HttpAttributes
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.instrumentation.test.utils.PortUtils
import io.opentelemetry.semconv.SemanticAttributes
@ -121,6 +120,7 @@ abstract class AbstractSqsSuppressReceiveSpansTest extends InstrumentationSpecif
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$SemanticAttributes.HTTP_REQUEST_METHOD" "POST"
"$SemanticAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$SemanticAttributes.URL_FULL" "http://localhost:$sqsPort"
"$SemanticAttributes.SERVER_ADDRESS" "localhost"
"$SemanticAttributes.SERVER_PORT" sqsPort
@ -128,7 +128,7 @@ abstract class AbstractSqsSuppressReceiveSpansTest extends InstrumentationSpecif
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$HttpAttributes.ERROR_TYPE" "_OTHER"
"$SemanticAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
}
}
span(2) {
@ -213,6 +213,7 @@ abstract class AbstractSqsSuppressReceiveSpansTest extends InstrumentationSpecif
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$SemanticAttributes.HTTP_REQUEST_METHOD" "POST"
"$SemanticAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$SemanticAttributes.URL_FULL" "http://localhost:$sqsPort"
"$SemanticAttributes.SERVER_ADDRESS" "localhost"
"$SemanticAttributes.SERVER_PORT" sqsPort
@ -220,7 +221,7 @@ abstract class AbstractSqsSuppressReceiveSpansTest extends InstrumentationSpecif
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$HttpAttributes.ERROR_TYPE" "_OTHER"
"$SemanticAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
}
}
span(2) {

View File

@ -13,7 +13,6 @@ import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder
import com.amazonaws.services.sqs.model.MessageAttributeValue
import com.amazonaws.services.sqs.model.ReceiveMessageRequest
import com.amazonaws.services.sqs.model.SendMessageRequest
import io.opentelemetry.instrumentation.api.semconv.http.internal.HttpAttributes
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.instrumentation.test.utils.PortUtils
import io.opentelemetry.sdk.trace.data.SpanData
@ -172,6 +171,7 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification {
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$SemanticAttributes.HTTP_REQUEST_METHOD" "POST"
"$SemanticAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$SemanticAttributes.URL_FULL" "http://localhost:$sqsPort"
"$SemanticAttributes.SERVER_ADDRESS" "localhost"
"$SemanticAttributes.SERVER_PORT" sqsPort
@ -179,10 +179,10 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification {
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$SemanticAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
if (testCaptureHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
}
"$HttpAttributes.ERROR_TYPE" "_OTHER"
}
}
span(2) {
@ -335,6 +335,7 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification {
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$SemanticAttributes.HTTP_REQUEST_METHOD" "POST"
"$SemanticAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$SemanticAttributes.URL_FULL" "http://localhost:$sqsPort"
"$SemanticAttributes.SERVER_ADDRESS" "localhost"
"$SemanticAttributes.SERVER_PORT" sqsPort
@ -342,7 +343,7 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification {
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$HttpAttributes.ERROR_TYPE" "_OTHER"
"$SemanticAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
}
}
span(4) {

View File

@ -27,7 +27,6 @@ import io.opentelemetry.instrumentation.api.semconv.http.HttpClientAttributesExt
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import javax.annotation.Nullable;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.SdkExecutionAttribute;
@ -129,22 +128,21 @@ final class AwsSdkInstrumenterFactory {
openTelemetry,
MessagingSpanNameExtractor.create(getter, operation),
SpanKindExtractor.alwaysConsumer(),
toSqsRequestExtractors(consumerAttributesExtractors(), Function.identity()),
toSqsRequestExtractors(consumerAttributesExtractors()),
singletonList(messagingAttributeExtractor),
messagingReceiveInstrumentationEnabled);
}
Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter() {
Instrumenter<SqsProcessRequest, Response> consumerProcessInstrumenter() {
MessageOperation operation = MessageOperation.PROCESS;
SqsProcessRequestAttributesGetter getter = SqsProcessRequestAttributesGetter.INSTANCE;
InstrumenterBuilder<SqsProcessRequest, Void> builder =
Instrumenter.<SqsProcessRequest, Void>builder(
InstrumenterBuilder<SqsProcessRequest, Response> builder =
Instrumenter.<SqsProcessRequest, Response>builder(
openTelemetry,
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractors(
toSqsRequestExtractors(consumerAttributesExtractors(), unused -> null))
.addAttributesExtractors(toSqsRequestExtractors(consumerAttributesExtractors()))
.addAttributesExtractor(messagingAttributesExtractor(getter, operation));
if (messagingReceiveInstrumentationEnabled) {
@ -159,14 +157,12 @@ final class AwsSdkInstrumenterFactory {
return builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
}
private static <RESPONSE>
List<AttributesExtractor<AbstractSqsRequest, RESPONSE>> toSqsRequestExtractors(
List<AttributesExtractor<ExecutionAttributes, Response>> extractors,
Function<RESPONSE, Response> responseConverter) {
List<AttributesExtractor<AbstractSqsRequest, RESPONSE>> result = new ArrayList<>();
private static List<AttributesExtractor<AbstractSqsRequest, Response>> toSqsRequestExtractors(
List<AttributesExtractor<ExecutionAttributes, Response>> extractors) {
List<AttributesExtractor<AbstractSqsRequest, Response>> result = new ArrayList<>();
for (AttributesExtractor<ExecutionAttributes, Response> extractor : extractors) {
result.add(
new AttributesExtractor<AbstractSqsRequest, RESPONSE>() {
new AttributesExtractor<AbstractSqsRequest, Response>() {
@Override
public void onStart(
AttributesBuilder attributes,
@ -180,14 +176,9 @@ final class AwsSdkInstrumenterFactory {
AttributesBuilder attributes,
Context context,
AbstractSqsRequest sqsRequest,
@Nullable RESPONSE response,
@Nullable Response response,
@Nullable Throwable error) {
extractor.onEnd(
attributes,
context,
sqsRequest.getRequest(),
responseConverter.apply(response),
error);
extractor.onEnd(attributes, context, sqsRequest.getRequest(), response, error);
}
});
}

View File

@ -46,7 +46,7 @@ public class AwsSdkTelemetry {
private final Instrumenter<ExecutionAttributes, Response> requestInstrumenter;
private final Instrumenter<SqsReceiveRequest, Response> consumerReceiveInstrumenter;
private final Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter;
private final Instrumenter<SqsProcessRequest, Response> consumerProcessInstrumenter;
private final Instrumenter<ExecutionAttributes, Response> producerInstrumenter;
private final boolean captureExperimentalSpanAttributes;
@Nullable private final TextMapPropagator messagingPropagator;

View File

@ -102,6 +102,7 @@ final class SqsImpl {
response.messages(),
config.getConsumerProcessInstrumenter(),
copy,
new Response(context.httpResponse(), response),
config,
receiveContext);

View File

@ -12,7 +12,7 @@ import javax.annotation.Nullable;
import software.amazon.awssdk.core.SdkRequest;
enum SqsProcessRequestAttributesGetter
implements MessagingAttributesGetter<SqsProcessRequest, Void> {
implements MessagingAttributesGetter<SqsProcessRequest, Response> {
INSTANCE;
@Override
@ -59,7 +59,7 @@ enum SqsProcessRequestAttributesGetter
@Override
@Nullable
public String getMessageId(SqsProcessRequest request, @Nullable Void response) {
public String getMessageId(SqsProcessRequest request, @Nullable Response response) {
return request.getMessage().getMessageId();
}

View File

@ -65,7 +65,7 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
private final Instrumenter<ExecutionAttributes, Response> requestInstrumenter;
private final Instrumenter<SqsReceiveRequest, Response> consumerReceiveInstrumenter;
private final Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter;
private final Instrumenter<SqsProcessRequest, Response> consumerProcessInstrumenter;
private final Instrumenter<ExecutionAttributes, Response> producerInstrumenter;
private final boolean captureExperimentalSpanAttributes;
@ -77,7 +77,7 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
return consumerReceiveInstrumenter;
}
Instrumenter<SqsProcessRequest, Void> getConsumerProcessInstrumenter() {
Instrumenter<SqsProcessRequest, Response> getConsumerProcessInstrumenter() {
return consumerProcessInstrumenter;
}
@ -98,7 +98,7 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
TracingExecutionInterceptor(
Instrumenter<ExecutionAttributes, Response> requestInstrumenter,
Instrumenter<SqsReceiveRequest, Response> consumerReceiveInstrumenter,
Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter,
Instrumenter<SqsProcessRequest, Response> consumerProcessInstrumenter,
Instrumenter<ExecutionAttributes, Response> producerInstrumenter,
boolean captureExperimentalSpanAttributes,
TextMapPropagator messagingPropagator,

View File

@ -16,8 +16,9 @@ import software.amazon.awssdk.services.sqs.model.Message;
class TracingIterator implements Iterator<Message> {
private final Iterator<Message> delegateIterator;
private final Instrumenter<SqsProcessRequest, Void> instrumenter;
private final Instrumenter<SqsProcessRequest, Response> instrumenter;
private final ExecutionAttributes request;
private final Response response;
private final TracingExecutionInterceptor config;
private final Context receiveContext;
@ -31,24 +32,28 @@ class TracingIterator implements Iterator<Message> {
private TracingIterator(
Iterator<Message> delegateIterator,
Instrumenter<SqsProcessRequest, Void> instrumenter,
Instrumenter<SqsProcessRequest, Response> instrumenter,
ExecutionAttributes request,
Response response,
TracingExecutionInterceptor config,
Context receiveContext) {
this.delegateIterator = delegateIterator;
this.instrumenter = instrumenter;
this.request = request;
this.response = response;
this.config = config;
this.receiveContext = receiveContext;
}
public static Iterator<Message> wrap(
Iterator<Message> delegateIterator,
Instrumenter<SqsProcessRequest, Void> instrumenter,
Instrumenter<SqsProcessRequest, Response> instrumenter,
ExecutionAttributes request,
Response response,
TracingExecutionInterceptor config,
Context receiveContext) {
return new TracingIterator(delegateIterator, instrumenter, request, config, receiveContext);
return new TracingIterator(
delegateIterator, instrumenter, request, response, config, receiveContext);
}
@Override
@ -85,7 +90,7 @@ class TracingIterator implements Iterator<Message> {
private void closeScopeAndEndSpan() {
if (currentScope != null) {
currentScope.close();
instrumenter.end(currentContext, currentRequest, null, null);
instrumenter.end(currentContext, currentRequest, response, null);
currentScope = null;
currentRequest = null;
currentContext = null;

View File

@ -17,32 +17,36 @@ import software.amazon.awssdk.services.sqs.model.Message;
class TracingList extends ArrayList<Message> {
private static final long serialVersionUID = 1L;
private final Instrumenter<SqsProcessRequest, Void> instrumenter;
private final Instrumenter<SqsProcessRequest, Response> instrumenter;
private final ExecutionAttributes request;
private final Response response;
private final TracingExecutionInterceptor config;
private final Context receiveContext;
private boolean firstIterator = true;
private TracingList(
List<Message> list,
Instrumenter<SqsProcessRequest, Void> instrumenter,
Instrumenter<SqsProcessRequest, Response> instrumenter,
ExecutionAttributes request,
Response response,
TracingExecutionInterceptor config,
Context receiveContext) {
super(list);
this.instrumenter = instrumenter;
this.request = request;
this.response = response;
this.config = config;
this.receiveContext = receiveContext;
}
public static TracingList wrap(
List<Message> list,
Instrumenter<SqsProcessRequest, Void> instrumenter,
Instrumenter<SqsProcessRequest, Response> instrumenter,
ExecutionAttributes request,
Response response,
TracingExecutionInterceptor config,
Context receiveContext) {
return new TracingList(list, instrumenter, request, config, receiveContext);
return new TracingList(list, instrumenter, request, response, config, receiveContext);
}
@Override
@ -52,7 +56,9 @@ class TracingList extends ArrayList<Message> {
// However, this is not thread-safe, but usually the first (hopefully only) traversal of
// List is performed in the same thread that called receiveMessage()
if (firstIterator) {
it = TracingIterator.wrap(super.iterator(), instrumenter, request, config, receiveContext);
it =
TracingIterator.wrap(
super.iterator(), instrumenter, request, response, config, receiveContext);
firstIterator = false;
} else {
it = super.iterator();

View File

@ -5,7 +5,6 @@
package io.opentelemetry.instrumentation.awssdk.v2_2
import io.opentelemetry.instrumentation.api.semconv.http.internal.HttpAttributes
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.semconv.SemanticAttributes
import org.elasticmq.rest.sqs.SQSRestServerBuilder
@ -172,6 +171,7 @@ abstract class AbstractAws2SqsSuppressReceiveSpansTest extends InstrumentationSp
"rpc.system" "aws-api"
"rpc.service" "Sqs"
"$SemanticAttributes.HTTP_REQUEST_METHOD" "POST"
"$SemanticAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$SemanticAttributes.URL_FULL" { it.startsWith("http://localhost:$sqsPort") }
"$SemanticAttributes.SERVER_ADDRESS" "localhost"
"$SemanticAttributes.SERVER_PORT" sqsPort
@ -179,7 +179,6 @@ abstract class AbstractAws2SqsSuppressReceiveSpansTest extends InstrumentationSp
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$HttpAttributes.ERROR_TYPE" "_OTHER"
}
}
span(2) {
@ -345,6 +344,7 @@ abstract class AbstractAws2SqsSuppressReceiveSpansTest extends InstrumentationSp
"rpc.system" "aws-api"
"rpc.service" "Sqs"
"$SemanticAttributes.HTTP_REQUEST_METHOD" "POST"
"$SemanticAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$SemanticAttributes.URL_FULL" { it.startsWith("http://localhost:$sqsPort") }
"$SemanticAttributes.SERVER_ADDRESS" "localhost"
"$SemanticAttributes.SERVER_PORT" sqsPort
@ -352,7 +352,6 @@ abstract class AbstractAws2SqsSuppressReceiveSpansTest extends InstrumentationSp
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$HttpAttributes.ERROR_TYPE" "_OTHER"
}
}
}
@ -373,6 +372,7 @@ abstract class AbstractAws2SqsSuppressReceiveSpansTest extends InstrumentationSp
"rpc.system" "aws-api"
"rpc.service" "Sqs"
"$SemanticAttributes.HTTP_REQUEST_METHOD" "POST"
"$SemanticAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$SemanticAttributes.URL_FULL" { it.startsWith("http://localhost:$sqsPort") }
"$SemanticAttributes.SERVER_ADDRESS" "localhost"
"$SemanticAttributes.SERVER_PORT" sqsPort
@ -380,7 +380,6 @@ abstract class AbstractAws2SqsSuppressReceiveSpansTest extends InstrumentationSp
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$HttpAttributes.ERROR_TYPE" "_OTHER"
}
}
}

View File

@ -5,7 +5,6 @@
package io.opentelemetry.instrumentation.awssdk.v2_2
import io.opentelemetry.instrumentation.api.semconv.http.internal.HttpAttributes
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.semconv.SemanticAttributes
@ -237,6 +236,7 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
"rpc.system" "aws-api"
"rpc.service" "Sqs"
"$SemanticAttributes.HTTP_REQUEST_METHOD" "POST"
"$SemanticAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$SemanticAttributes.URL_FULL" { it.startsWith("http://localhost:$sqsPort") }
"$SemanticAttributes.SERVER_ADDRESS" "localhost"
"$SemanticAttributes.SERVER_PORT" sqsPort
@ -247,7 +247,6 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
if (captureHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
}
"$HttpAttributes.ERROR_TYPE" "_OTHER"
}
}
span(2 + offset) {
@ -450,6 +449,7 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
"rpc.system" "aws-api"
"rpc.service" "Sqs"
"$SemanticAttributes.HTTP_REQUEST_METHOD" "POST"
"$SemanticAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$SemanticAttributes.URL_FULL" { it.startsWith("http://localhost:$sqsPort") }
"$SemanticAttributes.SERVER_ADDRESS" "localhost"
"$SemanticAttributes.SERVER_PORT" sqsPort
@ -457,7 +457,6 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$HttpAttributes.ERROR_TYPE" "_OTHER"
}
}
span(1 + 2*i + 1) {

View File

@ -12,7 +12,6 @@ import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satis
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.api.semconv.http.internal.HttpAttributes;
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
import io.opentelemetry.sdk.testing.assertj.SpanDataAssert;
import io.opentelemetry.semconv.SemanticAttributes;
@ -67,6 +66,7 @@ class AwsSpanAssertions {
val.satisfiesAnyOf(
v -> assertThat(v).isEqualTo(queueUrl), v -> assertThat(v).isNull())),
equalTo(SemanticAttributes.HTTP_REQUEST_METHOD, "POST"),
equalTo(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE, 200),
satisfies(SemanticAttributes.URL_FULL, val -> val.isInstanceOf(String.class)),
satisfies(
SemanticAttributes.SERVER_ADDRESS,
@ -77,16 +77,11 @@ class AwsSpanAssertions {
val.satisfiesAnyOf(
v -> assertThat(v).isNull(),
v -> assertThat(v).isInstanceOf(Number.class))),
equalTo(SemanticAttributes.NETWORK_PROTOCOL_VERSION, "1.1"),
equalTo(stringKey("rpc.system"), "aws-api"),
satisfies(stringKey("rpc.method"), stringAssert -> stringAssert.isEqualTo(rpcMethod)),
equalTo(stringKey("rpc.service"), "AmazonSQS")));
if (!spanName.endsWith("process")) {
attributeAssertions.addAll(
Arrays.asList(
equalTo(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE, 200),
equalTo(SemanticAttributes.NETWORK_PROTOCOL_VERSION, "1.1")));
}
if (spanName.endsWith("receive")
|| spanName.endsWith("process")
|| spanName.endsWith("publish")) {
@ -100,7 +95,6 @@ class AwsSpanAssertions {
attributeAssertions.add(equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"));
attributeAssertions.add(
satisfies(SemanticAttributes.MESSAGING_MESSAGE_ID, val -> assertThat(val).isNotNull()));
attributeAssertions.add(equalTo(HttpAttributes.ERROR_TYPE, "_OTHER"));
} else if (spanName.endsWith("publish")) {
attributeAssertions.add(equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"));
attributeAssertions.add(