Rework reactor netty context tracking (#9286)

This commit is contained in:
Lauri Tulmin 2023-09-22 23:08:14 +03:00 committed by GitHub
parent 6ba2f499e4
commit c7617dc068
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 29 additions and 17 deletions

View File

@ -113,6 +113,11 @@ public final class HttpClientAttributesExtractor<REQUEST, RESPONSE>
if (SemconvStability.emitOldHttpSemconv()) { if (SemconvStability.emitOldHttpSemconv()) {
internalSet(attributes, SemanticAttributes.HTTP_URL, fullUrl); internalSet(attributes, SemanticAttributes.HTTP_URL, fullUrl);
} }
int resendCount = resendCountIncrementer.applyAsInt(parentContext);
if (resendCount > 0) {
attributes.put(SemanticAttributes.HTTP_RESEND_COUNT, resendCount);
}
} }
@Override @Override
@ -127,11 +132,6 @@ public final class HttpClientAttributesExtractor<REQUEST, RESPONSE>
internalNetExtractor.onEnd(attributes, request, response); internalNetExtractor.onEnd(attributes, request, response);
internalNetworkExtractor.onEnd(attributes, request, response); internalNetworkExtractor.onEnd(attributes, request, response);
internalServerExtractor.onEnd(attributes, request, response); internalServerExtractor.onEnd(attributes, request, response);
int resendCount = resendCountIncrementer.applyAsInt(context);
if (resendCount > 0) {
attributes.put(SemanticAttributes.HTTP_RESEND_COUNT, resendCount);
}
} }
/** /**

View File

@ -148,7 +148,8 @@ class HttpClientAttributesExtractorTest {
AttributeKey.stringArrayKey("http.request.header.custom_request_header"), AttributeKey.stringArrayKey("http.request.header.custom_request_header"),
asList("123", "456")), asList("123", "456")),
entry(SemanticAttributes.NET_PEER_NAME, "github.com"), entry(SemanticAttributes.NET_PEER_NAME, "github.com"),
entry(SemanticAttributes.NET_PEER_PORT, 123L)); entry(SemanticAttributes.NET_PEER_PORT, 123L),
entry(SemanticAttributes.HTTP_RESEND_COUNT, 2L));
AttributesBuilder endAttributes = Attributes.builder(); AttributesBuilder endAttributes = Attributes.builder();
extractor.onEnd(endAttributes, Context.root(), request, response, null); extractor.onEnd(endAttributes, Context.root(), request, response, null);
@ -157,7 +158,6 @@ class HttpClientAttributesExtractorTest {
entry(SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH, 10L), entry(SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH, 10L),
entry(SemanticAttributes.HTTP_STATUS_CODE, 202L), entry(SemanticAttributes.HTTP_STATUS_CODE, 202L),
entry(SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH, 20L), entry(SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH, 20L),
entry(SemanticAttributes.HTTP_RESEND_COUNT, 2L),
entry( entry(
AttributeKey.stringArrayKey("http.response.header.custom_response_header"), AttributeKey.stringArrayKey("http.response.header.custom_response_header"),
asList("654", "321")), asList("654", "321")),

View File

@ -145,7 +145,8 @@ class HttpClientAttributesExtractorBothSemconvTest {
entry(SemanticAttributes.NET_PEER_NAME, "github.com"), entry(SemanticAttributes.NET_PEER_NAME, "github.com"),
entry(SemanticAttributes.NET_PEER_PORT, 123L), entry(SemanticAttributes.NET_PEER_PORT, 123L),
entry(SemanticAttributes.SERVER_ADDRESS, "github.com"), entry(SemanticAttributes.SERVER_ADDRESS, "github.com"),
entry(SemanticAttributes.SERVER_PORT, 123L)); entry(SemanticAttributes.SERVER_PORT, 123L),
entry(SemanticAttributes.HTTP_RESEND_COUNT, 2L));
AttributesBuilder endAttributes = Attributes.builder(); AttributesBuilder endAttributes = Attributes.builder();
extractor.onEnd(endAttributes, Context.root(), request, response, null); extractor.onEnd(endAttributes, Context.root(), request, response, null);
@ -157,7 +158,6 @@ class HttpClientAttributesExtractorBothSemconvTest {
entry(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE, 202L), entry(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE, 202L),
entry(SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH, 20L), entry(SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH, 20L),
entry(SemanticAttributes.HTTP_RESPONSE_BODY_SIZE, 20L), entry(SemanticAttributes.HTTP_RESPONSE_BODY_SIZE, 20L),
entry(SemanticAttributes.HTTP_RESEND_COUNT, 2L),
entry( entry(
AttributeKey.stringArrayKey("http.response.header.custom_response_header"), AttributeKey.stringArrayKey("http.response.header.custom_response_header"),
asList("654", "321")), asList("654", "321")),

View File

@ -152,7 +152,8 @@ class HttpClientAttributesExtractorStableSemconvTest {
AttributeKey.stringArrayKey("http.request.header.custom_request_header"), AttributeKey.stringArrayKey("http.request.header.custom_request_header"),
asList("123", "456")), asList("123", "456")),
entry(SemanticAttributes.SERVER_ADDRESS, "github.com"), entry(SemanticAttributes.SERVER_ADDRESS, "github.com"),
entry(SemanticAttributes.SERVER_PORT, 123L)); entry(SemanticAttributes.SERVER_PORT, 123L),
entry(SemanticAttributes.HTTP_RESEND_COUNT, 2L));
AttributesBuilder endAttributes = Attributes.builder(); AttributesBuilder endAttributes = Attributes.builder();
extractor.onEnd(endAttributes, Context.root(), request, response, null); extractor.onEnd(endAttributes, Context.root(), request, response, null);
@ -161,7 +162,6 @@ class HttpClientAttributesExtractorStableSemconvTest {
entry(SemanticAttributes.HTTP_REQUEST_BODY_SIZE, 10L), entry(SemanticAttributes.HTTP_REQUEST_BODY_SIZE, 10L),
entry(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE, 202L), entry(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE, 202L),
entry(SemanticAttributes.HTTP_RESPONSE_BODY_SIZE, 20L), entry(SemanticAttributes.HTTP_RESPONSE_BODY_SIZE, 20L),
entry(SemanticAttributes.HTTP_RESEND_COUNT, 2L),
entry( entry(
AttributeKey.stringArrayKey("http.response.header.custom_response_header"), AttributeKey.stringArrayKey("http.response.header.custom_response_header"),
asList("654", "321")), asList("654", "321")),

View File

@ -11,6 +11,7 @@ import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientResendCount; import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientResendCount;
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil; import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
import io.opentelemetry.instrumentation.api.internal.Timer; import io.opentelemetry.instrumentation.api.internal.Timer;
import io.opentelemetry.instrumentation.api.util.VirtualField;
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@ -19,6 +20,8 @@ import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse; import reactor.netty.http.client.HttpClientResponse;
final class InstrumentationContexts { final class InstrumentationContexts {
private static final VirtualField<HttpClientRequest, Context> requestContextVirtualField =
VirtualField.find(HttpClientRequest.class, Context.class);
private static final AtomicReferenceFieldUpdater<InstrumentationContexts, Context> private static final AtomicReferenceFieldUpdater<InstrumentationContexts, Context>
parentContextUpdater = parentContextUpdater =
@ -56,18 +59,27 @@ final class InstrumentationContexts {
Context context = null; Context context = null;
if (instrumenter().shouldStart(parentContext, request)) { if (instrumenter().shouldStart(parentContext, request)) {
context = instrumenter().start(parentContext, request); context = instrumenter().start(parentContext, request);
requestContextVirtualField.set(request, context);
clientContexts.offer(new RequestAndContext(request, context)); clientContexts.offer(new RequestAndContext(request, context));
} }
return context; return context;
} }
// we are synchronizing here to ensure that spans are ended in the oder they are read from the void endClientSpan(@Nullable HttpClientResponse response, @Nullable Throwable error) {
// queue HttpClientRequest request = null;
synchronized void endClientSpan( Context context = null;
@Nullable HttpClientResponse response, @Nullable Throwable error) {
RequestAndContext requestAndContext = clientContexts.poll(); RequestAndContext requestAndContext = clientContexts.poll();
if (requestAndContext != null) { if (response instanceof HttpClientRequest) {
instrumenter().end(requestAndContext.context, requestAndContext.request, response, error); request = (HttpClientRequest) response;
context = requestContextVirtualField.get(request);
} else if (requestAndContext != null) {
// this branch is taken when there was an error (e.g. timeout) and response was null
request = requestAndContext.request;
context = requestAndContext.context;
}
if (request != null && context != null) {
instrumenter().end(context, request, response, error);
} }
} }