Add HTTP attributes support for AWS lambda function wrapper (#1780)
This commit is contained in:
parent
71f1a79100
commit
fff6eb7004
|
@ -61,7 +61,7 @@ public class AwsLambdaRequestHandlerInstrumentation implements TypeInstrumentati
|
|||
@Advice.Local("otelFunctionScope") Scope functionScope,
|
||||
@Advice.Local("otelMessageSpan") Span messageSpan,
|
||||
@Advice.Local("otelMessageScope") Scope messageScope) {
|
||||
functionSpan = functionTracer().startSpan(context, Kind.SERVER);
|
||||
functionSpan = functionTracer().startSpan(context, arg, Kind.SERVER);
|
||||
functionScope = functionTracer().startScope(functionSpan);
|
||||
if (arg instanceof SQSEvent) {
|
||||
messageSpan = messageTracer().startSpan(context, (SQSEvent) arg);
|
||||
|
|
|
@ -5,13 +5,18 @@
|
|||
|
||||
package io.opentelemetry.instrumentation.awslambda.v1_0;
|
||||
|
||||
import static io.opentelemetry.api.trace.attributes.SemanticAttributes.FAAS_EXECUTION;
|
||||
import static io.opentelemetry.api.trace.attributes.SemanticAttributes.FAAS_TRIGGER;
|
||||
|
||||
import com.amazonaws.services.lambda.runtime.Context;
|
||||
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
|
||||
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyResponseEvent;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.Span.Kind;
|
||||
import io.opentelemetry.api.trace.SpanBuilder;
|
||||
import io.opentelemetry.api.trace.SpanContext;
|
||||
import io.opentelemetry.api.trace.Tracer;
|
||||
import io.opentelemetry.api.trace.attributes.SemanticAttributes;
|
||||
import io.opentelemetry.api.trace.attributes.SemanticAttributes.FaasTriggerValues;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
|
||||
import java.util.Collections;
|
||||
|
@ -21,6 +26,8 @@ public class AwsLambdaTracer extends BaseTracer {
|
|||
|
||||
private static final String AWS_TRACE_HEADER_ENV_KEY = "_X_AMZN_TRACE_ID";
|
||||
|
||||
private final HttpSpanAttributes httpSpanAttributes = new HttpSpanAttributes();
|
||||
|
||||
public AwsLambdaTracer() {}
|
||||
|
||||
public AwsLambdaTracer(Tracer tracer) {
|
||||
|
@ -51,9 +58,9 @@ public class AwsLambdaTracer extends BaseTracer {
|
|||
return parentContext;
|
||||
}
|
||||
|
||||
SpanBuilder createSpan(Context context, Map<String, String> headers) {
|
||||
SpanBuilder span = tracer.spanBuilder(context.getFunctionName());
|
||||
span.setAttribute(SemanticAttributes.FAAS_EXECUTION, context.getAwsRequestId());
|
||||
private SpanBuilder createSpan(Context context, Object input, Map<String, String> headers) {
|
||||
SpanBuilder span = tracer.spanBuilder(spanName(context, input));
|
||||
setAttributes(span, context, input);
|
||||
io.opentelemetry.context.Context parent = parent(headers);
|
||||
if (parent != null) {
|
||||
span.setParent(parent);
|
||||
|
@ -61,12 +68,28 @@ public class AwsLambdaTracer extends BaseTracer {
|
|||
return span;
|
||||
}
|
||||
|
||||
public Span startSpan(Context context, Kind kind, Map<String, String> headers) {
|
||||
return createSpan(context, headers).setSpanKind(kind).startSpan();
|
||||
private void setAttributes(SpanBuilder span, Context context, Object input) {
|
||||
span.setAttribute(FAAS_EXECUTION, context.getAwsRequestId());
|
||||
if (input instanceof APIGatewayProxyRequestEvent) {
|
||||
span.setAttribute(FAAS_TRIGGER, FaasTriggerValues.HTTP.getValue());
|
||||
httpSpanAttributes.onRequest(span, (APIGatewayProxyRequestEvent) input);
|
||||
}
|
||||
}
|
||||
|
||||
public Span startSpan(Context context, Kind kind) {
|
||||
return createSpan(context, Collections.emptyMap()).setSpanKind(kind).startSpan();
|
||||
private String spanName(Context context, Object input) {
|
||||
String name = null;
|
||||
if (input instanceof APIGatewayProxyRequestEvent) {
|
||||
name = ((APIGatewayProxyRequestEvent) input).getResource();
|
||||
}
|
||||
return name == null ? context.getFunctionName() : name;
|
||||
}
|
||||
|
||||
public Span startSpan(Context context, Kind kind, Object input, Map<String, String> headers) {
|
||||
return createSpan(context, input, headers).setSpanKind(kind).startSpan();
|
||||
}
|
||||
|
||||
public Span startSpan(Context context, Object input, Kind kind) {
|
||||
return createSpan(context, input, Collections.emptyMap()).setSpanKind(kind).startSpan();
|
||||
}
|
||||
|
||||
/** Creates new scoped context with the given span. */
|
||||
|
@ -78,6 +101,12 @@ public class AwsLambdaTracer extends BaseTracer {
|
|||
return newContext.makeCurrent();
|
||||
}
|
||||
|
||||
public void onOutput(Span span, Object output) {
|
||||
if (output instanceof APIGatewayProxyResponseEvent) {
|
||||
httpSpanAttributes.onResponse(span, (APIGatewayProxyResponseEvent) output);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String getInstrumentationName() {
|
||||
return "io.opentelemetry.aws-lambda";
|
||||
|
|
|
@ -0,0 +1,78 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.awslambda.v1_0;
|
||||
|
||||
import static io.opentelemetry.api.trace.attributes.SemanticAttributes.HTTP_METHOD;
|
||||
import static io.opentelemetry.api.trace.attributes.SemanticAttributes.HTTP_STATUS_CODE;
|
||||
import static io.opentelemetry.api.trace.attributes.SemanticAttributes.HTTP_URL;
|
||||
import static io.opentelemetry.api.trace.attributes.SemanticAttributes.HTTP_USER_AGENT;
|
||||
import static io.opentelemetry.instrumentation.awslambda.v1_0.MapUtils.emptyIfNull;
|
||||
import static io.opentelemetry.instrumentation.awslambda.v1_0.MapUtils.lowercaseMap;
|
||||
|
||||
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyRequestEvent;
|
||||
import com.amazonaws.services.lambda.runtime.events.APIGatewayProxyResponseEvent;
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.SpanBuilder;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.net.URLEncoder;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.Map;
|
||||
|
||||
final class HttpSpanAttributes {
|
||||
void onRequest(SpanBuilder span, APIGatewayProxyRequestEvent request) {
|
||||
String httpMethod = request.getHttpMethod();
|
||||
if (httpMethod != null) {
|
||||
span.setAttribute(HTTP_METHOD, httpMethod);
|
||||
}
|
||||
|
||||
Map<String, String> headers = lowercaseMap(request.getHeaders());
|
||||
String userAgent = headers.get("user-agent");
|
||||
if (userAgent != null) {
|
||||
span.setAttribute(HTTP_USER_AGENT, userAgent);
|
||||
}
|
||||
String url = getHttpUrl(request, headers);
|
||||
if (!url.isEmpty()) {
|
||||
span.setAttribute(HTTP_URL, url);
|
||||
}
|
||||
}
|
||||
|
||||
private String getHttpUrl(APIGatewayProxyRequestEvent request, Map<String, String> headers) {
|
||||
StringBuilder str = new StringBuilder();
|
||||
|
||||
String scheme = headers.get("x-forwarded-proto");
|
||||
if (scheme != null) {
|
||||
str.append(scheme).append("://");
|
||||
}
|
||||
String host = headers.get("host");
|
||||
if (host != null) {
|
||||
str.append(host);
|
||||
}
|
||||
String path = request.getPath();
|
||||
if (path != null) {
|
||||
str.append(path);
|
||||
}
|
||||
|
||||
try {
|
||||
boolean first = true;
|
||||
for (Map.Entry<String, String> entry :
|
||||
emptyIfNull(request.getQueryStringParameters()).entrySet()) {
|
||||
String key = URLEncoder.encode(entry.getKey(), StandardCharsets.UTF_8.name());
|
||||
String value = URLEncoder.encode(entry.getValue(), StandardCharsets.UTF_8.name());
|
||||
str.append(first ? '?' : '&').append(key).append('=').append(value);
|
||||
first = false;
|
||||
}
|
||||
} catch (UnsupportedEncodingException ignored) {
|
||||
}
|
||||
return str.toString();
|
||||
}
|
||||
|
||||
void onResponse(Span span, APIGatewayProxyResponseEvent response) {
|
||||
Integer statusCode = response.getStatusCode();
|
||||
if (statusCode != null) {
|
||||
span.setAttribute(HTTP_STATUS_CODE, statusCode);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,24 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.awslambda.v1_0;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
final class MapUtils {
|
||||
static Map<String, String> lowercaseMap(Map<String, String> source) {
|
||||
return emptyIfNull(source).entrySet().stream()
|
||||
.filter(e -> e.getKey() != null)
|
||||
.collect(Collectors.toMap(e -> e.getKey().toLowerCase(), Map.Entry::getValue));
|
||||
}
|
||||
|
||||
static Map<String, String> emptyIfNull(Map<String, String> map) {
|
||||
return map == null ? Collections.emptyMap() : map;
|
||||
}
|
||||
|
||||
private MapUtils() {}
|
||||
}
|
|
@ -5,13 +5,13 @@
|
|||
|
||||
package io.opentelemetry.instrumentation.awslambda.v1_0;
|
||||
|
||||
import static io.opentelemetry.instrumentation.awslambda.v1_0.MapUtils.lowercaseMap;
|
||||
|
||||
import io.opentelemetry.api.OpenTelemetry;
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.propagation.TextMapPropagator;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class ParentContextExtractor {
|
||||
|
||||
|
@ -22,12 +22,6 @@ public class ParentContextExtractor {
|
|||
io.opentelemetry.context.Context.current(), lowercaseMap(headers), MapGetter.INSTANCE);
|
||||
}
|
||||
|
||||
private static Map<String, String> lowercaseMap(Map<String, String> source) {
|
||||
return source.entrySet().stream()
|
||||
.filter(e -> e.getKey() != null)
|
||||
.collect(Collectors.toMap(e -> e.getKey().toLowerCase(), Entry::getValue));
|
||||
}
|
||||
|
||||
// lower-case map getter used for extraction
|
||||
static final String AWS_TRACE_HEADER_PROPAGATOR_KEY = "x-amzn-trace-id";
|
||||
|
||||
|
|
|
@ -56,10 +56,12 @@ public abstract class TracingRequestHandler<I, O> implements RequestHandler<I, O
|
|||
|
||||
@Override
|
||||
public final O handleRequest(I input, Context context) {
|
||||
Span span = tracer.startSpan(context, Kind.SERVER, getHeaders(input));
|
||||
Span span = tracer.startSpan(context, Kind.SERVER, input, getHeaders(input));
|
||||
Throwable error = null;
|
||||
try (Scope ignored = tracer.startScope(span)) {
|
||||
return doHandleRequest(input, context);
|
||||
O output = doHandleRequest(input, context);
|
||||
tracer.onOutput(span, output);
|
||||
return output;
|
||||
} catch (Throwable t) {
|
||||
error = t;
|
||||
throw t;
|
||||
|
|
|
@ -54,7 +54,7 @@ public abstract class TracingRequestStreamHandler implements RequestStreamHandle
|
|||
throws IOException {
|
||||
|
||||
ApiGatewayProxyRequest proxyRequest = ApiGatewayProxyRequest.forStream(input);
|
||||
Span span = tracer.startSpan(context, Kind.SERVER, proxyRequest.getHeaders());
|
||||
Span span = tracer.startSpan(context, Kind.SERVER, input, proxyRequest.getHeaders());
|
||||
|
||||
try (Scope ignored = tracer.startScope(span)) {
|
||||
doHandleRequest(proxyRequest.freshStream(), new OutputStreamWrapper(output, span), context);
|
||||
|
|
|
@ -24,7 +24,11 @@ class TracingRequestApiGatewayWrapperTest extends TracingRequestWrapperTestBase
|
|||
@Override
|
||||
APIGatewayProxyResponseEvent handleRequest(APIGatewayProxyRequestEvent input, Context context) {
|
||||
if (input.getBody() == "hello") {
|
||||
return new APIGatewayProxyResponseEvent().withBody("world")
|
||||
return new APIGatewayProxyResponseEvent()
|
||||
.withStatusCode(200)
|
||||
.withBody("world")
|
||||
} else if (input.getBody() == "empty") {
|
||||
return new APIGatewayProxyResponseEvent()
|
||||
}
|
||||
throw new RuntimeException("bad request")
|
||||
}
|
||||
|
@ -43,7 +47,20 @@ class TracingRequestApiGatewayWrapperTest extends TracingRequestWrapperTestBase
|
|||
def "handler traced with trace propagation"() {
|
||||
given:
|
||||
setLambda(TestApiGatewayHandler.getName() + "::handleRequest", TracingRequestApiGatewayWrapper)
|
||||
def input = new APIGatewayProxyRequestEvent().withBody("hello").withHeaders(propagationHeaders())
|
||||
|
||||
def headers = ImmutableMap.builder()
|
||||
.putAll(propagationHeaders())
|
||||
.put("User-Agent", "Test Client")
|
||||
.put("host", "localhost:123")
|
||||
.put("X-FORWARDED-PROTO", "http")
|
||||
.build()
|
||||
def input = new APIGatewayProxyRequestEvent()
|
||||
.withHttpMethod("GET")
|
||||
.withResource("/hello/{param}")
|
||||
.withPath("/hello/world")
|
||||
.withBody("hello")
|
||||
.withQueryStringParamters(["a": "b", "c": "d"])
|
||||
.withHeaders(headers)
|
||||
|
||||
when:
|
||||
APIGatewayProxyResponseEvent result = wrapper.handleRequest(input, context)
|
||||
|
@ -55,10 +72,41 @@ class TracingRequestApiGatewayWrapperTest extends TracingRequestWrapperTestBase
|
|||
span(0) {
|
||||
parentSpanId("0000000000000456")
|
||||
traceId("4fd0b6131f19f39af59518d127b0cafe")
|
||||
name("/hello/{param}")
|
||||
kind SERVER
|
||||
attributes {
|
||||
"$SemanticAttributes.FAAS_EXECUTION.key" "1-22-333"
|
||||
"$SemanticAttributes.FAAS_TRIGGER.key" "http"
|
||||
"$SemanticAttributes.HTTP_METHOD.key" "GET"
|
||||
"$SemanticAttributes.HTTP_USER_AGENT.key" "Test Client"
|
||||
"$SemanticAttributes.HTTP_URL.key" "http://localhost:123/hello/world?a=b&c=d"
|
||||
"$SemanticAttributes.HTTP_STATUS_CODE.key" 200
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
def "test empty request & response"() {
|
||||
given:
|
||||
setLambda(TestApiGatewayHandler.getName() + "::handleRequest", TracingRequestApiGatewayWrapper)
|
||||
|
||||
def input = new APIGatewayProxyRequestEvent()
|
||||
.withBody("empty")
|
||||
|
||||
when:
|
||||
APIGatewayProxyResponseEvent result = wrapper.handleRequest(input, context)
|
||||
|
||||
then:
|
||||
result.body == null
|
||||
assertTraces(1) {
|
||||
trace(0, 1) {
|
||||
span(0) {
|
||||
name("my_function")
|
||||
kind SERVER
|
||||
attributes {
|
||||
"${SemanticAttributes.FAAS_EXECUTION.key}" "1-22-333"
|
||||
"$SemanticAttributes.FAAS_EXECUTION.key" "1-22-333"
|
||||
"$SemanticAttributes.FAAS_TRIGGER.key" "http"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue