Convert aws lambda integration to Instrumenter (#4362)

* Convert AwsLambdaMessageTracer to Instrumenter

* Convert AwsLambdaTracer to Instrumenter

* Rename class

* Polish

* Polish

* Move classes to internal package
This commit is contained in:
Nikita Salnikov-Tarnovski 2021-10-15 22:31:08 +03:00 committed by GitHub
parent 2904578875
commit 699a5adb00
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 648 additions and 425 deletions

View File

@ -5,23 +5,26 @@
package io.opentelemetry.javaagent.instrumentation.awslambda.v1_0;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.instrumentation.awslambda.v1_0.AwsLambdaMessageTracer;
import io.opentelemetry.instrumentation.awslambda.v1_0.AwsLambdaTracer;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.awslambda.v1_0.internal.AwsLambdaFunctionInstrumenter;
import io.opentelemetry.instrumentation.awslambda.v1_0.internal.AwsLambdaFunctionInstrumenterFactory;
import io.opentelemetry.instrumentation.awslambda.v1_0.internal.AwsLambdaSqsInstrumenterFactory;
public final class AwsLambdaInstrumentationHelper {
private static final AwsLambdaTracer FUNCTION_TRACER =
new AwsLambdaTracer(GlobalOpenTelemetry.get());
private static final AwsLambdaFunctionInstrumenter FUNCTION_INSTRUMENTER =
AwsLambdaFunctionInstrumenterFactory.createInstrumenter(GlobalOpenTelemetry.get());
public static AwsLambdaTracer functionTracer() {
return FUNCTION_TRACER;
public static AwsLambdaFunctionInstrumenter functionInstrumenter() {
return FUNCTION_INSTRUMENTER;
}
private static final AwsLambdaMessageTracer MESSAGE_TRACER =
new AwsLambdaMessageTracer(GlobalOpenTelemetry.get());
private static final Instrumenter<SQSEvent, Void> MESSAGE_TRACER =
AwsLambdaSqsInstrumenterFactory.forEvent(GlobalOpenTelemetry.get());
public static AwsLambdaMessageTracer messageTracer() {
public static Instrumenter<SQSEvent, Void> messageInstrumenter() {
return MESSAGE_TRACER;
}

View File

@ -7,8 +7,8 @@ package io.opentelemetry.javaagent.instrumentation.awslambda.v1_0;
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.implementsInterface;
import static io.opentelemetry.javaagent.instrumentation.awslambda.v1_0.AwsLambdaInstrumentationHelper.functionTracer;
import static io.opentelemetry.javaagent.instrumentation.awslambda.v1_0.AwsLambdaInstrumentationHelper.messageTracer;
import static io.opentelemetry.javaagent.instrumentation.awslambda.v1_0.AwsLambdaInstrumentationHelper.functionInstrumenter;
import static io.opentelemetry.javaagent.instrumentation.awslambda.v1_0.AwsLambdaInstrumentationHelper.messageInstrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
@ -16,11 +16,12 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.awslambda.v1_0.AwsLambdaRequest;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.api.OpenTelemetrySdkAccess;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
@ -56,21 +57,34 @@ public class AwsLambdaRequestHandlerInstrumentation implements TypeInstrumentati
public static void onEnter(
@Advice.Argument(value = 0, typing = Typing.DYNAMIC) Object arg,
@Advice.Argument(1) Context context,
@Advice.Local("otelInput") AwsLambdaRequest input,
@Advice.Local("otelFunctionContext") io.opentelemetry.context.Context functionContext,
@Advice.Local("otelFunctionScope") Scope functionScope,
@Advice.Local("otelMessageContext") io.opentelemetry.context.Context messageContext,
@Advice.Local("otelMessageScope") Scope messageScope) {
functionContext = functionTracer().startSpan(context, SpanKind.SERVER, arg);
input = AwsLambdaRequest.create(context, arg, Collections.emptyMap());
io.opentelemetry.context.Context parentContext = functionInstrumenter().extract(input);
if (!functionInstrumenter().shouldStart(parentContext, input)) {
return;
}
functionContext = functionInstrumenter().start(parentContext, input);
functionScope = functionContext.makeCurrent();
if (arg instanceof SQSEvent) {
messageContext = messageTracer().startSpan(functionContext, (SQSEvent) arg);
messageScope = messageContext.makeCurrent();
if (messageInstrumenter().shouldStart(functionContext, (SQSEvent) arg)) {
messageContext = messageInstrumenter().start(functionContext, (SQSEvent) arg);
messageScope = messageContext.makeCurrent();
}
}
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Argument(value = 0, typing = Typing.DYNAMIC) Object arg,
@Advice.Thrown Throwable throwable,
@Advice.Local("otelInput") AwsLambdaRequest input,
@Advice.Local("otelFunctionContext") io.opentelemetry.context.Context functionContext,
@Advice.Local("otelFunctionScope") Scope functionScope,
@Advice.Local("otelMessageContext") io.opentelemetry.context.Context messageContext,
@ -78,19 +92,14 @@ public class AwsLambdaRequestHandlerInstrumentation implements TypeInstrumentati
if (messageScope != null) {
messageScope.close();
if (throwable != null) {
messageTracer().endExceptionally(messageContext, throwable);
} else {
messageTracer().end(messageContext);
}
messageInstrumenter().end(messageContext, (SQSEvent) arg, null, throwable);
}
functionScope.close();
if (throwable != null) {
functionTracer().endExceptionally(functionContext, throwable);
} else {
functionTracer().end(functionContext);
if (functionScope != null) {
functionScope.close();
functionInstrumenter().end(functionContext, input, null, throwable);
}
OpenTelemetrySdkAccess.forceFlush(1, TimeUnit.SECONDS);
}
}

View File

@ -6,6 +6,9 @@ dependencies {
compileOnly("io.opentelemetry:opentelemetry-sdk")
compileOnly("io.opentelemetry:opentelemetry-sdk-extension-autoconfigure")
compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")
library("com.amazonaws:aws-lambda-java-core:1.0.0")
// First version to includes support for SQSEvent, currently the most popular message queue used
// with lambda.

View File

@ -1,85 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambda.v1_0;
import static io.opentelemetry.api.trace.SpanKind.CONSUMER;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
public class AwsLambdaMessageTracer extends BaseTracer {
private static final String AWS_TRACE_HEADER_SQS_ATTRIBUTE_KEY = "AWSTraceHeader";
public AwsLambdaMessageTracer(OpenTelemetry openTelemetry) {
super(openTelemetry);
}
public Context startSpan(Context parentContext, SQSEvent event) {
// Use event source in name if all messages have the same source, otherwise use placeholder.
String source = "multiple_sources";
if (!event.getRecords().isEmpty()) {
String messageSource = event.getRecords().get(0).getEventSource();
for (int i = 1; i < event.getRecords().size(); i++) {
SQSMessage message = event.getRecords().get(i);
if (!message.getEventSource().equals(messageSource)) {
messageSource = null;
break;
}
}
if (messageSource != null) {
source = messageSource;
}
}
SpanBuilder span = spanBuilder(parentContext, source + " process", CONSUMER);
span.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS");
span.setAttribute(SemanticAttributes.MESSAGING_OPERATION, "process");
for (SQSMessage message : event.getRecords()) {
addLinkToMessageParent(message, span);
}
return parentContext.with(span.startSpan());
}
public Context startSpan(Context parentContext, SQSMessage message) {
SpanBuilder span = spanBuilder(parentContext, message.getEventSource() + " process", CONSUMER);
span.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS");
span.setAttribute(SemanticAttributes.MESSAGING_OPERATION, "process");
span.setAttribute(SemanticAttributes.MESSAGING_MESSAGE_ID, message.getMessageId());
span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, message.getEventSource());
addLinkToMessageParent(message, span);
return parentContext.with(span.startSpan());
}
private static void addLinkToMessageParent(SQSMessage message, SpanBuilder span) {
String parentHeader = message.getAttributes().get(AWS_TRACE_HEADER_SQS_ATTRIBUTE_KEY);
if (parentHeader != null) {
SpanContext parentCtx =
Span.fromContext(ParentContextExtractor.fromXrayHeader(parentHeader)).getSpanContext();
if (parentCtx.isValid()) {
span.addLink(parentCtx);
}
}
}
@Override
protected String getInstrumentationName() {
return "io.opentelemetry.aws-lambda-1.0";
}
}

View File

@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambda.v1_0;
import com.amazonaws.services.lambda.runtime.Context;
import com.google.auto.value.AutoValue;
import java.util.Map;
@AutoValue
public abstract class AwsLambdaRequest {
public static AwsLambdaRequest create(
Context awsContext, Object input, Map<String, String> headers) {
return new AutoValue_AwsLambdaRequest(awsContext, input, headers);
}
public abstract Context getAwsContext();
public abstract Object getInput();
public abstract Map<String, String> getHeaders();
}

View File

@ -1,139 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambda.v1_0;
import static io.opentelemetry.semconv.resource.attributes.ResourceAttributes.CLOUD_ACCOUNT_ID;
import static io.opentelemetry.semconv.resource.attributes.ResourceAttributes.FAAS_ID;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.FAAS_EXECUTION;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.FAAS_TRIGGER;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyResponseEvent;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes.FaasTriggerValues;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.util.Collections;
import java.util.Map;
import org.checkerframework.checker.nullness.qual.Nullable;
// Context is defined in both OTel and Lambda
@SuppressWarnings("ParameterPackage")
public class AwsLambdaTracer extends BaseTracer {
@Nullable private static final MethodHandle GET_FUNCTION_ARN;
static {
MethodHandles.Lookup lookup = MethodHandles.publicLookup();
MethodHandle getFunctionArn;
try {
getFunctionArn =
lookup.findVirtual(
Context.class, "getInvokedFunctionArn", MethodType.methodType(String.class));
} catch (NoSuchMethodException | IllegalAccessException e) {
getFunctionArn = null;
}
GET_FUNCTION_ARN = getFunctionArn;
}
// cached accountId value
private volatile String accountId;
public AwsLambdaTracer(OpenTelemetry openTelemetry) {
super(openTelemetry);
}
private void setAttributes(SpanBuilder span, Context context, Object input) {
setCommonAttributes(span, context);
if (input instanceof APIGatewayProxyRequestEvent) {
span.setAttribute(FAAS_TRIGGER, FaasTriggerValues.HTTP);
HttpSpanAttributes.onRequest(span, (APIGatewayProxyRequestEvent) input);
}
}
private void setCommonAttributes(SpanBuilder span, Context context) {
span.setAttribute(FAAS_EXECUTION, context.getAwsRequestId());
String arn = getFunctionArn(context);
if (arn != null) {
span.setAttribute(FAAS_ID, arn);
}
String accountId = getAccountId(arn);
if (accountId != null) {
span.setAttribute(CLOUD_ACCOUNT_ID, accountId);
}
}
@Nullable
private static String getFunctionArn(Context context) {
if (GET_FUNCTION_ARN == null) {
return null;
}
try {
return (String) GET_FUNCTION_ARN.invoke(context);
} catch (Throwable throwable) {
return null;
}
}
@Nullable
private String getAccountId(@Nullable String arn) {
if (arn == null) {
return null;
}
if (accountId == null) {
synchronized (this) {
if (accountId == null) {
String[] arnParts = arn.split(":");
if (arnParts.length >= 5) {
accountId = arnParts[4];
}
}
}
}
return accountId;
}
private static String spanName(Context context, Object input) {
String name = null;
if (input instanceof APIGatewayProxyRequestEvent) {
name = ((APIGatewayProxyRequestEvent) input).getResource();
}
return name == null ? context.getFunctionName() : name;
}
public io.opentelemetry.context.Context startSpan(
Context awsContext, SpanKind kind, Object input) {
return startSpan(awsContext, kind, input, Collections.emptyMap());
}
public io.opentelemetry.context.Context startSpan(
Context awsContext, SpanKind kind, Object input, Map<String, String> headers) {
io.opentelemetry.context.Context parentContext = ParentContextExtractor.extract(headers, this);
SpanBuilder spanBuilder = spanBuilder(parentContext, spanName(awsContext, input), kind);
setAttributes(spanBuilder, awsContext, input);
return withServerSpan(parentContext, spanBuilder.startSpan());
}
public void onOutput(io.opentelemetry.context.Context context, Object output) {
if (output instanceof APIGatewayProxyResponseEvent) {
HttpSpanAttributes.onResponse(
Span.fromContext(context), (APIGatewayProxyResponseEvent) output);
}
}
@Override
protected String getInstrumentationName() {
return "io.opentelemetry.aws-lambda-1.0";
}
}

View File

@ -1,82 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambda.v1_0;
import static io.opentelemetry.instrumentation.awslambda.v1_0.MapUtils.emptyIfNull;
import static io.opentelemetry.instrumentation.awslambda.v1_0.MapUtils.lowercaseMap;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_METHOD;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_STATUS_CODE;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_URL;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_USER_AGENT;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyResponseEvent;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
final class HttpSpanAttributes {
static void onRequest(SpanBuilder span, APIGatewayProxyRequestEvent request) {
String httpMethod = request.getHttpMethod();
if (httpMethod != null) {
span.setAttribute(HTTP_METHOD, httpMethod);
}
Map<String, String> headers = lowercaseMap(request.getHeaders());
String userAgent = headers.get("user-agent");
if (userAgent != null) {
span.setAttribute(HTTP_USER_AGENT, userAgent);
}
String url = getHttpUrl(request, headers);
if (!url.isEmpty()) {
span.setAttribute(HTTP_URL, url);
}
}
private static String getHttpUrl(
APIGatewayProxyRequestEvent request, Map<String, String> headers) {
StringBuilder str = new StringBuilder();
String scheme = headers.get("x-forwarded-proto");
if (scheme != null) {
str.append(scheme).append("://");
}
String host = headers.get("host");
if (host != null) {
str.append(host);
}
String path = request.getPath();
if (path != null) {
str.append(path);
}
try {
boolean first = true;
for (Map.Entry<String, String> entry :
emptyIfNull(request.getQueryStringParameters()).entrySet()) {
String key = URLEncoder.encode(entry.getKey(), StandardCharsets.UTF_8.name());
String value = URLEncoder.encode(entry.getValue(), StandardCharsets.UTF_8.name());
str.append(first ? '?' : '&').append(key).append('=').append(value);
first = false;
}
} catch (UnsupportedEncodingException ignored) {
// Ignore
}
return str.toString();
}
static void onResponse(Span span, APIGatewayProxyResponseEvent response) {
Integer statusCode = response.getStatusCode();
if (statusCode != null) {
span.setAttribute(HTTP_STATUS_CODE, statusCode);
}
}
private HttpSpanAttributes() {}
}

View File

@ -8,8 +8,9 @@ package io.opentelemetry.instrumentation.awslambda.v1_0;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.awslambda.v1_0.internal.AwsLambdaFunctionInstrumenter;
import io.opentelemetry.instrumentation.awslambda.v1_0.internal.AwsLambdaFunctionInstrumenterFactory;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import java.time.Duration;
import java.util.Collections;
@ -25,7 +26,7 @@ public abstract class TracingRequestHandler<I, O> implements RequestHandler<I, O
protected static final Duration DEFAULT_FLUSH_TIMEOUT = Duration.ofSeconds(1);
private final AwsLambdaTracer tracer;
private final AwsLambdaFunctionInstrumenter instrumenter;
private final OpenTelemetrySdk openTelemetrySdk;
private final long flushTimeoutNanos;
@ -43,47 +44,55 @@ public abstract class TracingRequestHandler<I, O> implements RequestHandler<I, O
* invocation.
*/
protected TracingRequestHandler(OpenTelemetrySdk openTelemetrySdk, Duration flushTimeout) {
this(openTelemetrySdk, flushTimeout, new AwsLambdaTracer(openTelemetrySdk));
this(
openTelemetrySdk,
flushTimeout,
AwsLambdaFunctionInstrumenterFactory.createInstrumenter(openTelemetrySdk));
}
/**
* Creates a new {@link TracingRequestHandler} which flushes the provided {@link
* OpenTelemetrySdk}, has a timeout of {@code flushTimeout} when flushing at the end of an
* invocation, and traces using the provided {@link AwsLambdaTracer}.
* invocation, and traces using the provided {@link AwsLambdaFunctionInstrumenter}.
*/
protected TracingRequestHandler(
OpenTelemetrySdk openTelemetrySdk, Duration flushTimeout, AwsLambdaTracer tracer) {
OpenTelemetrySdk openTelemetrySdk,
Duration flushTimeout,
AwsLambdaFunctionInstrumenter instrumenter) {
this.openTelemetrySdk = openTelemetrySdk;
this.flushTimeoutNanos = flushTimeout.toNanos();
this.tracer = tracer;
this.instrumenter = instrumenter;
}
private Map<String, String> getHeaders(I input) {
Map<String, String> result = null;
if (input instanceof APIGatewayProxyRequestEvent) {
APIGatewayProxyRequestEvent event = (APIGatewayProxyRequestEvent) input;
return event.getHeaders();
result = event.getHeaders();
}
return Collections.emptyMap();
return result == null ? Collections.emptyMap() : result;
}
@Override
public final O handleRequest(I input, Context context) {
io.opentelemetry.context.Context otelContext =
tracer.startSpan(context, SpanKind.SERVER, input, getHeaders(input));
AwsLambdaRequest request = AwsLambdaRequest.create(context, input, getHeaders(input));
io.opentelemetry.context.Context parentContext = instrumenter.extract(request);
if (!instrumenter.shouldStart(parentContext, request)) {
return doHandleRequest(input, context);
}
io.opentelemetry.context.Context otelContext = instrumenter.start(parentContext, request);
Throwable error = null;
O output = null;
try (Scope ignored = otelContext.makeCurrent()) {
O output = doHandleRequest(input, context);
tracer.onOutput(otelContext, output);
output = doHandleRequest(input, context);
return output;
} catch (Throwable t) {
error = t;
throw t;
} finally {
if (error != null) {
tracer.endExceptionally(otelContext, error);
} else {
tracer.end(otelContext);
}
instrumenter.end(otelContext, request, output, error);
LambdaUtils.forceFlush(openTelemetrySdk, flushTimeoutNanos, TimeUnit.NANOSECONDS);
}
}

View File

@ -7,8 +7,10 @@ package io.opentelemetry.instrumentation.awslambda.v1_0;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.awslambda.v1_0.internal.ApiGatewayProxyRequest;
import io.opentelemetry.instrumentation.awslambda.v1_0.internal.AwsLambdaFunctionInstrumenter;
import io.opentelemetry.instrumentation.awslambda.v1_0.internal.AwsLambdaFunctionInstrumenterFactory;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import java.io.IOException;
import java.io.InputStream;
@ -26,7 +28,7 @@ public abstract class TracingRequestStreamHandler implements RequestStreamHandle
private final OpenTelemetrySdk openTelemetrySdk;
private final long flushTimeoutNanos;
private final AwsLambdaTracer tracer;
private final AwsLambdaFunctionInstrumenter instrumenter;
/**
* Creates a new {@link TracingRequestStreamHandler} which traces using the provided {@link
@ -42,19 +44,24 @@ public abstract class TracingRequestStreamHandler implements RequestStreamHandle
* invocation.
*/
protected TracingRequestStreamHandler(OpenTelemetrySdk openTelemetrySdk, Duration flushTimeout) {
this(openTelemetrySdk, flushTimeout, new AwsLambdaTracer(openTelemetrySdk));
this(
openTelemetrySdk,
flushTimeout,
AwsLambdaFunctionInstrumenterFactory.createInstrumenter(openTelemetrySdk));
}
/**
* Creates a new {@link TracingRequestStreamHandler} which flushes the provided {@link
* OpenTelemetrySdk}, has a timeout of {@code flushTimeout} when flushing at the end of an
* invocation, and traces using the provided {@link AwsLambdaTracer}.
* invocation, and traces using the provided {@link AwsLambdaFunctionInstrumenter}.
*/
protected TracingRequestStreamHandler(
OpenTelemetrySdk openTelemetrySdk, Duration flushTimeout, AwsLambdaTracer tracer) {
OpenTelemetrySdk openTelemetrySdk,
Duration flushTimeout,
AwsLambdaFunctionInstrumenter instrumenter) {
this.openTelemetrySdk = openTelemetrySdk;
this.flushTimeoutNanos = flushTimeout.toNanos();
this.tracer = tracer;
this.instrumenter = instrumenter;
}
@Override
@ -62,16 +69,23 @@ public abstract class TracingRequestStreamHandler implements RequestStreamHandle
throws IOException {
ApiGatewayProxyRequest proxyRequest = ApiGatewayProxyRequest.forStream(input);
io.opentelemetry.context.Context otelContext =
tracer.startSpan(context, SpanKind.SERVER, input, proxyRequest.getHeaders());
AwsLambdaRequest request =
AwsLambdaRequest.create(context, proxyRequest, proxyRequest.getHeaders());
io.opentelemetry.context.Context parentContext = instrumenter.extract(request);
if (!instrumenter.shouldStart(parentContext, request)) {
doHandleRequest(proxyRequest.freshStream(), output, context);
return;
}
io.opentelemetry.context.Context otelContext = instrumenter.start(parentContext, request);
try (Scope ignored = otelContext.makeCurrent()) {
doHandleRequest(
proxyRequest.freshStream(),
new OutputStreamWrapper(output, otelContext, openTelemetrySdk),
new OutputStreamWrapper(output, otelContext, request, openTelemetrySdk),
context);
} catch (Throwable t) {
tracer.endExceptionally(otelContext, t);
instrumenter.end(otelContext, request, null, t);
LambdaUtils.forceFlush(openTelemetrySdk, flushTimeoutNanos, TimeUnit.NANOSECONDS);
throw t;
}
@ -84,14 +98,17 @@ public abstract class TracingRequestStreamHandler implements RequestStreamHandle
private final OutputStream delegate;
private final io.opentelemetry.context.Context otelContext;
private final AwsLambdaRequest request;
private final OpenTelemetrySdk openTelemetrySdk;
private OutputStreamWrapper(
OutputStream delegate,
io.opentelemetry.context.Context otelContext,
AwsLambdaRequest request,
OpenTelemetrySdk openTelemetrySdk) {
this.delegate = delegate;
this.otelContext = otelContext;
this.request = request;
this.openTelemetrySdk = openTelemetrySdk;
}
@ -118,7 +135,7 @@ public abstract class TracingRequestStreamHandler implements RequestStreamHandle
@Override
public void close() throws IOException {
delegate.close();
tracer.end(otelContext);
instrumenter.end(otelContext, request, null, null);
LambdaUtils.forceFlush(openTelemetrySdk, flushTimeoutNanos, TimeUnit.NANOSECONDS);
}
}

View File

@ -8,12 +8,15 @@ package io.opentelemetry.instrumentation.awslambda.v1_0;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.awslambda.v1_0.internal.AwsLambdaFunctionInstrumenter;
import io.opentelemetry.instrumentation.awslambda.v1_0.internal.AwsLambdaSqsInstrumenterFactory;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import java.time.Duration;
public abstract class TracingSqsEventHandler extends TracingRequestHandler<SQSEvent, Void> {
private final AwsLambdaMessageTracer tracer;
private final Instrumenter<SQSEvent, Void> instrumenter;
/**
* Creates a new {@link TracingSqsEventHandler} which traces using the provided {@link
@ -29,36 +32,39 @@ public abstract class TracingSqsEventHandler extends TracingRequestHandler<SQSEv
* invocation.
*/
protected TracingSqsEventHandler(OpenTelemetrySdk openTelemetrySdk, Duration flushTimeout) {
this(openTelemetrySdk, flushTimeout, new AwsLambdaMessageTracer(openTelemetrySdk));
this(
openTelemetrySdk, flushTimeout, AwsLambdaSqsInstrumenterFactory.forEvent(openTelemetrySdk));
}
/**
* Creates a new {@link TracingSqsEventHandler} which flushes the provided {@link
* OpenTelemetrySdk}, has a timeout of {@code flushTimeout} when flushing at the end of an
* invocation, and traces using the provided {@link AwsLambdaTracer}.
* invocation, and traces using the provided {@link AwsLambdaFunctionInstrumenter}.
*/
protected TracingSqsEventHandler(
OpenTelemetrySdk openTelemetrySdk, Duration flushTimeout, AwsLambdaMessageTracer tracer) {
OpenTelemetrySdk openTelemetrySdk,
Duration flushTimeout,
Instrumenter<SQSEvent, Void> instrumenter) {
super(openTelemetrySdk, flushTimeout);
this.tracer = tracer;
this.instrumenter = instrumenter;
}
@Override
public Void doHandleRequest(SQSEvent event, Context context) {
io.opentelemetry.context.Context parentContext = io.opentelemetry.context.Context.current();
io.opentelemetry.context.Context otelContext = tracer.startSpan(parentContext, event);
Throwable error = null;
try (Scope ignored = otelContext.makeCurrent()) {
handleEvent(event, context);
} catch (Throwable t) {
error = t;
throw t;
} finally {
if (error != null) {
tracer.endExceptionally(otelContext, error);
} else {
tracer.end(otelContext);
if (instrumenter.shouldStart(parentContext, event)) {
io.opentelemetry.context.Context otelContext = instrumenter.start(parentContext, event);
Throwable error = null;
try (Scope ignored = otelContext.makeCurrent()) {
handleEvent(event, context);
} catch (Throwable t) {
error = t;
throw t;
} finally {
instrumenter.end(otelContext, event, null, error);
}
} else {
handleEvent(event, context);
}
return null;
}
@ -68,9 +74,4 @@ public abstract class TracingSqsEventHandler extends TracingRequestHandler<SQSEv
* processing of incoming SQS messages.
*/
protected abstract void handleEvent(SQSEvent event, Context context);
// We use in SQS message handler too.
AwsLambdaMessageTracer getTracer() {
return tracer;
}
}

View File

@ -9,17 +9,21 @@ import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.awslambda.v1_0.internal.AwsLambdaSqsInstrumenterFactory;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import java.time.Duration;
public abstract class TracingSqsMessageHandler extends TracingSqsEventHandler {
private final Instrumenter<SQSMessage, Void> messageInstrumenter;
/**
* Creates a new {@link TracingSqsMessageHandler} which traces using the provided {@link
* OpenTelemetrySdk} and has a timeout of 1s when flushing at the end of an invocation.
*/
protected TracingSqsMessageHandler(OpenTelemetrySdk openTelemetrySdk) {
super(openTelemetrySdk);
this(openTelemetrySdk, DEFAULT_FLUSH_TIMEOUT);
}
/**
@ -28,36 +32,60 @@ public abstract class TracingSqsMessageHandler extends TracingSqsEventHandler {
* invocation.
*/
protected TracingSqsMessageHandler(OpenTelemetrySdk openTelemetrySdk, Duration flushTimeout) {
super(openTelemetrySdk, flushTimeout);
this(
openTelemetrySdk, flushTimeout, AwsLambdaSqsInstrumenterFactory.forEvent(openTelemetrySdk));
}
/**
* Creates a new {@link TracingSqsMessageHandler} which flushes the provided {@link
* OpenTelemetrySdk}, has a timeout of {@code flushTimeout} when flushing at the end of an
* invocation, and traces using the provided {@link AwsLambdaTracer}.
* invocation, and instruments {@link SQSEvent} using the provided {@code Instrumenter<SQSEvent,
* Void>}.
*/
protected TracingSqsMessageHandler(
OpenTelemetrySdk openTelemetrySdk, Duration flushTimeout, AwsLambdaMessageTracer tracer) {
super(openTelemetrySdk, flushTimeout, tracer);
OpenTelemetrySdk openTelemetrySdk,
Duration flushTimeout,
Instrumenter<SQSEvent, Void> eventInstrumenter) {
this(
openTelemetrySdk,
flushTimeout,
eventInstrumenter,
AwsLambdaSqsInstrumenterFactory.forMessage(openTelemetrySdk));
}
/**
* Creates a new {@link TracingSqsMessageHandler} which flushes the provided {@link
* OpenTelemetrySdk}, has a timeout of {@code flushTimeout} when flushing at the end of an
* invocation, and traces using the provided {@code Instrumenter<SQSEvent, Void>} and {@code
* Instrumenter<SQSMessage, Void>}.
*/
protected TracingSqsMessageHandler(
OpenTelemetrySdk openTelemetrySdk,
Duration flushTimeout,
Instrumenter<SQSEvent, Void> eventInstrumenter,
Instrumenter<SQSMessage, Void> messageInstrumenter) {
super(openTelemetrySdk, flushTimeout, eventInstrumenter);
this.messageInstrumenter = messageInstrumenter;
}
@Override
protected final void handleEvent(SQSEvent event, Context context) {
io.opentelemetry.context.Context parentContext = io.opentelemetry.context.Context.current();
for (SQSMessage message : event.getRecords()) {
io.opentelemetry.context.Context otelContext = getTracer().startSpan(parentContext, message);
Throwable error = null;
try (Scope ignored = otelContext.makeCurrent()) {
handleMessage(message, context);
} catch (Throwable t) {
error = t;
throw t;
} finally {
if (error != null) {
getTracer().endExceptionally(otelContext, error);
} else {
getTracer().end(otelContext);
if (messageInstrumenter.shouldStart(parentContext, message)) {
io.opentelemetry.context.Context otelContext =
messageInstrumenter.start(parentContext, message);
Throwable error = null;
try (Scope ignored = otelContext.makeCurrent()) {
handleMessage(message, context);
} catch (Throwable t) {
error = t;
throw t;
} finally {
messageInstrumenter.end(otelContext, message, null, error);
}
} else {
handleMessage(message, context);
}
}
}

View File

@ -0,0 +1,93 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambda.v1_0.internal;
import static io.opentelemetry.instrumentation.awslambda.v1_0.internal.MapUtils.emptyIfNull;
import static io.opentelemetry.instrumentation.awslambda.v1_0.internal.MapUtils.lowercaseMap;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.FAAS_TRIGGER;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_METHOD;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_STATUS_CODE;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_URL;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.HTTP_USER_AGENT;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyResponseEvent;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.awslambda.v1_0.AwsLambdaRequest;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.checkerframework.checker.nullness.qual.Nullable;
final class ApiGatewayProxyAttributesExtractor
implements AttributesExtractor<AwsLambdaRequest, Object> {
@Override
public void onStart(AttributesBuilder attributes, AwsLambdaRequest request) {
if (request.getInput() instanceof APIGatewayProxyRequestEvent) {
set(attributes, FAAS_TRIGGER, SemanticAttributes.FaasTriggerValues.HTTP);
onRequest(attributes, (APIGatewayProxyRequestEvent) request.getInput());
}
}
void onRequest(AttributesBuilder attributes, APIGatewayProxyRequestEvent request) {
set(attributes, HTTP_METHOD, request.getHttpMethod());
Map<String, String> headers = lowercaseMap(request.getHeaders());
set(attributes, HTTP_USER_AGENT, headers.get("user-agent"));
set(attributes, HTTP_URL, getHttpUrl(request, headers));
}
private static String getHttpUrl(
APIGatewayProxyRequestEvent request, Map<String, String> headers) {
StringBuilder str = new StringBuilder();
String scheme = headers.get("x-forwarded-proto");
if (scheme != null) {
str.append(scheme).append("://");
}
String host = headers.get("host");
if (host != null) {
str.append(host);
}
String path = request.getPath();
if (path != null) {
str.append(path);
}
try {
boolean first = true;
for (Map.Entry<String, String> entry :
emptyIfNull(request.getQueryStringParameters()).entrySet()) {
String key = URLEncoder.encode(entry.getKey(), StandardCharsets.UTF_8.name());
String value = URLEncoder.encode(entry.getValue(), StandardCharsets.UTF_8.name());
str.append(first ? '?' : '&').append(key).append('=').append(value);
first = false;
}
} catch (UnsupportedEncodingException ignored) {
// Ignore
}
return str.length() == 0 ? null : str.toString();
}
@Override
public void onEnd(
AttributesBuilder attributes,
AwsLambdaRequest request,
@Nullable Object response,
@Nullable Throwable error) {
if (response instanceof APIGatewayProxyResponseEvent) {
Integer statusCode = ((APIGatewayProxyResponseEvent) response).getStatusCode();
if (statusCode != null) {
attributes.put(HTTP_STATUS_CODE, statusCode);
}
}
}
ApiGatewayProxyAttributesExtractor() {}
}

View File

@ -3,9 +3,9 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambda.v1_0;
package io.opentelemetry.instrumentation.awslambda.v1_0.internal;
import static io.opentelemetry.instrumentation.awslambda.v1_0.HeadersFactory.ofStream;
import static io.opentelemetry.instrumentation.awslambda.v1_0.internal.HeadersFactory.ofStream;
import io.opentelemetry.api.GlobalOpenTelemetry;
import java.io.ByteArrayInputStream;
@ -17,7 +17,7 @@ import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.checkerframework.checker.nullness.qual.Nullable;
abstract class ApiGatewayProxyRequest {
public abstract class ApiGatewayProxyRequest {
// TODO(anuraaga): We should create a RequestFactory type of class instead of evaluating this
// for every request.
@ -34,7 +34,7 @@ abstract class ApiGatewayProxyRequest {
fields.iterator().next());
}
static ApiGatewayProxyRequest forStream(InputStream source) throws IOException {
public static ApiGatewayProxyRequest forStream(InputStream source) throws IOException {
if (noHttpPropagationNeeded()) {
return new NoopRequest(source);
@ -48,12 +48,12 @@ abstract class ApiGatewayProxyRequest {
}
@Nullable
Map<String, String> getHeaders() throws IOException {
public Map<String, String> getHeaders() throws IOException {
Map<String, String> headers = ofStream(freshStream());
return (headers == null ? Collections.emptyMap() : headers);
}
abstract InputStream freshStream() throws IOException;
public abstract InputStream freshStream() throws IOException;
private static class NoopRequest extends ApiGatewayProxyRequest {
@ -64,12 +64,12 @@ abstract class ApiGatewayProxyRequest {
}
@Override
InputStream freshStream() {
public InputStream freshStream() {
return stream;
}
@Override
Map<String, String> getHeaders() {
public Map<String, String> getHeaders() {
return Collections.emptyMap();
}
}
@ -84,7 +84,7 @@ abstract class ApiGatewayProxyRequest {
}
@Override
InputStream freshStream() throws IOException {
public InputStream freshStream() throws IOException {
inputStream.reset();
inputStream.mark(Integer.MAX_VALUE);
@ -101,7 +101,7 @@ abstract class ApiGatewayProxyRequest {
}
@Override
InputStream freshStream() {
public InputStream freshStream() {
return new ByteArrayInputStream(data);
}
}

View File

@ -0,0 +1,86 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambda.v1_0.internal;
import static io.opentelemetry.semconv.resource.attributes.ResourceAttributes.CLOUD_ACCOUNT_ID;
import static io.opentelemetry.semconv.resource.attributes.ResourceAttributes.FAAS_ID;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.FAAS_EXECUTION;
import com.amazonaws.services.lambda.runtime.Context;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.awslambda.v1_0.AwsLambdaRequest;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import org.checkerframework.checker.nullness.qual.Nullable;
class AwsLambdaFunctionAttributesExtractor
implements AttributesExtractor<AwsLambdaRequest, Object> {
@Nullable private static final MethodHandle GET_FUNCTION_ARN;
static {
MethodHandles.Lookup lookup = MethodHandles.publicLookup();
MethodHandle getFunctionArn;
try {
getFunctionArn =
lookup.findVirtual(
Context.class, "getInvokedFunctionArn", MethodType.methodType(String.class));
} catch (NoSuchMethodException | IllegalAccessException e) {
getFunctionArn = null;
}
GET_FUNCTION_ARN = getFunctionArn;
}
// cached accountId value
private volatile String accountId;
@Override
public void onStart(AttributesBuilder attributes, AwsLambdaRequest request) {
Context context = request.getAwsContext();
set(attributes, FAAS_EXECUTION, context.getAwsRequestId());
set(attributes, FAAS_ID, getFunctionArn(context));
set(attributes, CLOUD_ACCOUNT_ID, getAccountId(getFunctionArn(context)));
}
@Override
public void onEnd(
AttributesBuilder attributes,
AwsLambdaRequest request,
@Nullable Object response,
@Nullable Throwable error) {}
@Nullable
private static String getFunctionArn(Context context) {
if (GET_FUNCTION_ARN == null) {
return null;
}
try {
return (String) GET_FUNCTION_ARN.invoke(context);
} catch (Throwable throwable) {
return null;
}
}
@Nullable
private String getAccountId(@Nullable String arn) {
if (arn == null) {
return null;
}
if (accountId == null) {
synchronized (this) {
if (accountId == null) {
String[] arnParts = arn.split(":");
if (arnParts.length >= 5) {
accountId = arnParts[4];
}
}
}
}
return accountId;
}
}

View File

@ -0,0 +1,56 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambda.v1_0.internal;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.internal.ContextPropagationDebug;
import io.opentelemetry.instrumentation.awslambda.v1_0.AwsLambdaRequest;
import java.util.Map;
import org.checkerframework.checker.nullness.qual.Nullable;
public class AwsLambdaFunctionInstrumenter {
private final OpenTelemetry openTelemetry;
final Instrumenter<AwsLambdaRequest, Object> instrumenter;
AwsLambdaFunctionInstrumenter(
OpenTelemetry openTelemetry, Instrumenter<AwsLambdaRequest, Object> instrumenter) {
this.openTelemetry = openTelemetry;
this.instrumenter = instrumenter;
}
public boolean shouldStart(Context parentContext, AwsLambdaRequest input) {
return instrumenter.shouldStart(parentContext, input);
}
public Context start(Context parentContext, AwsLambdaRequest input) {
return instrumenter.start(parentContext, input);
}
public void end(
Context context,
AwsLambdaRequest input,
@Nullable Object response,
@Nullable Throwable error) {
instrumenter.end(context, input, response, error);
}
public Context extract(AwsLambdaRequest input) {
return ParentContextExtractor.extract(input.getHeaders(), this);
}
public Context extract(Map<String, String> headers, TextMapGetter<Map<String, String>> getter) {
ContextPropagationDebug.debugContextLeakIfEnabled();
return openTelemetry
.getPropagators()
.getTextMapPropagator()
.extract(Context.root(), headers, getter);
}
}

View File

@ -0,0 +1,36 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambda.v1_0.internal;
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.awslambda.v1_0.AwsLambdaRequest;
public class AwsLambdaFunctionInstrumenterFactory {
public static AwsLambdaFunctionInstrumenter createInstrumenter(OpenTelemetry openTelemetry) {
return new AwsLambdaFunctionInstrumenter(
openTelemetry,
Instrumenter.newBuilder(
openTelemetry,
"io.opentelemetry.aws-lambda-1.0",
AwsLambdaFunctionInstrumenterFactory::spanName)
.addAttributesExtractors(
new AwsLambdaFunctionAttributesExtractor(),
new ApiGatewayProxyAttributesExtractor())
.newInstrumenter(SpanKindExtractor.alwaysServer()));
}
private static String spanName(AwsLambdaRequest input) {
String name = null;
if (input.getInput() instanceof APIGatewayProxyRequestEvent) {
name = ((APIGatewayProxyRequestEvent) input.getInput()).getResource();
}
return name == null ? input.getAwsContext().getFunctionName() : name;
}
}

View File

@ -0,0 +1,54 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambda.v1_0.internal;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
public class AwsLambdaSqsInstrumenterFactory {
public static Instrumenter<SQSEvent, Void> forEvent(OpenTelemetry openTelemetry) {
return Instrumenter.<SQSEvent, Void>newBuilder(
openTelemetry,
"io.opentelemetry.aws-lambda-1.0",
AwsLambdaSqsInstrumenterFactory::spanName)
.addAttributesExtractors(new SqsEventAttributesExtractor())
.addSpanLinksExtractor(new SqsEventSpanLinksExtractor())
.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}
public static Instrumenter<SQSMessage, Void> forMessage(OpenTelemetry openTelemetry) {
return Instrumenter.<SQSMessage, Void>newBuilder(
openTelemetry,
"io.opentelemetry.aws-lambda-1.0",
message -> message.getEventSource() + " process")
.addAttributesExtractors(new SqsMessageAttributesExtractor())
.addSpanLinksExtractor(new SqsMessageSpanLinksExtractor())
.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}
private static String spanName(SQSEvent event) {
String source = "multiple_sources";
if (!event.getRecords().isEmpty()) {
String messageSource = event.getRecords().get(0).getEventSource();
for (int i = 1; i < event.getRecords().size(); i++) {
SQSMessage message = event.getRecords().get(i);
if (!message.getEventSource().equals(messageSource)) {
messageSource = null;
break;
}
}
if (messageSource != null) {
source = messageSource;
}
}
return source + " process";
}
}

View File

@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambda.v1_0;
package io.opentelemetry.instrumentation.awslambda.v1_0.internal;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;

View File

@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambda.v1_0;
package io.opentelemetry.instrumentation.awslambda.v1_0.internal;
import java.util.Collections;
import java.util.Locale;

View File

@ -3,25 +3,24 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambda.v1_0;
package io.opentelemetry.instrumentation.awslambda.v1_0.internal;
import static io.opentelemetry.instrumentation.awslambda.v1_0.MapUtils.lowercaseMap;
import static io.opentelemetry.instrumentation.awslambda.v1_0.internal.MapUtils.lowercaseMap;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.extension.aws.AwsXrayPropagator;
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
public final class ParentContextExtractor {
final class ParentContextExtractor {
private static final String AWS_TRACE_HEADER_ENV_KEY = "_X_AMZN_TRACE_ID";
static Context extract(Map<String, String> headers, BaseTracer tracer) {
static Context extract(Map<String, String> headers, AwsLambdaFunctionInstrumenter instrumenter) {
Context parentContext = null;
String parentTraceHeader = System.getenv(AWS_TRACE_HEADER_ENV_KEY);
if (parentTraceHeader != null) {
@ -29,7 +28,7 @@ public final class ParentContextExtractor {
}
if (!isValidAndSampled(parentContext)) {
// try http
parentContext = fromHttpHeaders(headers, tracer);
parentContext = fromHttpHeaders(headers, instrumenter);
}
return parentContext;
}
@ -43,8 +42,9 @@ public final class ParentContextExtractor {
return (parentSpanContext.isValid() && parentSpanContext.isSampled());
}
private static Context fromHttpHeaders(Map<String, String> headers, BaseTracer tracer) {
return tracer.extract(lowercaseMap(headers), MapGetter.INSTANCE);
private static Context fromHttpHeaders(
Map<String, String> headers, AwsLambdaFunctionInstrumenter instrumenter) {
return instrumenter.extract(lowercaseMap(headers), MapGetter.INSTANCE);
}
// lower-case map getter used for extraction

View File

@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambda.v1_0.internal;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import org.checkerframework.checker.nullness.qual.Nullable;
class SqsEventAttributesExtractor implements AttributesExtractor<SQSEvent, Void> {
@Override
public void onStart(AttributesBuilder attributes, SQSEvent event) {
attributes.put(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS");
attributes.put(SemanticAttributes.MESSAGING_OPERATION, "process");
}
@Override
public void onEnd(
AttributesBuilder attributes,
SQSEvent event,
@Nullable Void unused,
@Nullable Throwable error) {}
}

View File

@ -0,0 +1,23 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambda.v1_0.internal;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
class SqsEventSpanLinksExtractor implements SpanLinksExtractor<SQSEvent> {
private static final SqsMessageSpanLinksExtractor messageSpanLinksExtractor =
new SqsMessageSpanLinksExtractor();
@Override
public void extract(SpanLinksBuilder spanLinks, Context parentContext, SQSEvent event) {
for (SQSEvent.SQSMessage message : event.getRecords()) {
messageSpanLinksExtractor.extract(spanLinks, parentContext, message);
}
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambda.v1_0.internal;
import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import org.checkerframework.checker.nullness.qual.Nullable;
class SqsMessageAttributesExtractor implements AttributesExtractor<SQSMessage, Void> {
@Override
public void onStart(AttributesBuilder attributes, SQSMessage message) {
attributes.put(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS");
attributes.put(SemanticAttributes.MESSAGING_OPERATION, "process");
attributes.put(SemanticAttributes.MESSAGING_MESSAGE_ID, message.getMessageId());
attributes.put(SemanticAttributes.MESSAGING_DESTINATION, message.getEventSource());
}
@Override
public void onEnd(
AttributesBuilder attributes,
SQSMessage message,
@Nullable Void unused,
@Nullable Throwable error) {}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambda.v1_0.internal;
import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
class SqsMessageSpanLinksExtractor implements SpanLinksExtractor<SQSMessage> {
private static final String AWS_TRACE_HEADER_SQS_ATTRIBUTE_KEY = "AWSTraceHeader";
@Override
public void extract(SpanLinksBuilder spanLinks, Context parentContext, SQSMessage message) {
String parentHeader = message.getAttributes().get(AWS_TRACE_HEADER_SQS_ATTRIBUTE_KEY);
if (parentHeader != null) {
SpanContext parentCtx =
Span.fromContext(ParentContextExtractor.fromXrayHeader(parentHeader)).getSpanContext();
if (parentCtx.isValid()) {
spanLinks.addLink(parentCtx);
}
}
}
}

View File

@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambda.v1_0;
package io.opentelemetry.instrumentation.awslambda.v1_0.internal;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;

View File

@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambda.v1_0;
package io.opentelemetry.instrumentation.awslambda.v1_0.internal;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.data.MapEntry.entry;

View File

@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambda.v1_0;
package io.opentelemetry.instrumentation.awslambda.v1_0.internal;
import static org.assertj.core.api.Assertions.assertThat;
@ -30,7 +30,8 @@ public class ParentContextExtractorTest {
private static final OpenTelemetry OTEL =
OpenTelemetry.propagating(ContextPropagators.create(B3Propagator.injectingSingleHeader()));
private static final AwsLambdaTracer TRACER = new AwsLambdaTracer(OTEL);
private static final AwsLambdaFunctionInstrumenter INSTRUMENTER =
AwsLambdaFunctionInstrumenterFactory.createInstrumenter(OTEL);
@Test
public void shouldUseHttpIfAwsParentNotSampled() {
@ -48,7 +49,7 @@ public class ParentContextExtractorTest {
"Root=1-8a3c60f7-d188f8fa79d48a391a778fa6;Parent=0000000000000456;Sampled=0");
// when
Context context = ParentContextExtractor.extract(headers, TRACER);
Context context = ParentContextExtractor.extract(headers, INSTRUMENTER);
// then
Span span = Span.fromContext(context);
SpanContext spanContext = span.getSpanContext();
@ -74,7 +75,7 @@ public class ParentContextExtractorTest {
"Root=1-8a3c60f7-d188f8fa79d48a391a778fa6;Parent=0000000000000456;Sampled=1");
// when
Context context = ParentContextExtractor.extract(headers, TRACER);
Context context = ParentContextExtractor.extract(headers, INSTRUMENTER);
// then
Span span = Span.fromContext(context);
SpanContext spanContext = span.getSpanContext();
@ -97,7 +98,7 @@ public class ParentContextExtractorTest {
"true");
// when
Context context = ParentContextExtractor.extract(headers, TRACER);
Context context = ParentContextExtractor.extract(headers, INSTRUMENTER);
// then
Span span = Span.fromContext(context);
SpanContext spanContext = span.getSpanContext();