AWS lambda - configurable flush timeout (#1960)

This commit is contained in:
Jakub Wach 2021-01-11 19:50:00 +01:00 committed by GitHub
parent 4bf86bec74
commit 179b2257f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 77 additions and 8 deletions

View File

@ -3,9 +3,11 @@
This package contains libraries to help instrument AWS lambda functions in your code. This package contains libraries to help instrument AWS lambda functions in your code.
## Using wrappers ## Using wrappers
To use the instrumentation, configure `OTEL_LAMBDA_HANDLER` env property to your lambda handler method in following format `package.ClassName::methodName` To use the instrumentation, configure `OTEL_INSTRUMENTATION_AWS_LAMBDA_HANDLER` env property to your lambda handler method in following format `package.ClassName::methodName`
and use one of wrappers as your lambda `Handler`. and use one of wrappers as your lambda `Handler`.
In order to configure a span flush timeout (default is set to 1 second), please configure `OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT` env property. The value is in seconds.
Available wrappers: Available wrappers:
- `io.opentelemetry.instrumentation.awslambda.v1_0.TracingRequestWrapper` - for wrapping regular handlers (implementing `RequestHandler`) - `io.opentelemetry.instrumentation.awslambda.v1_0.TracingRequestWrapper` - for wrapping regular handlers (implementing `RequestHandler`)
- `io.opentelemetry.instrumentation.awslambda.v1_0.TracingRequestApiGatewayWrapper` - for wrapping regular handlers (implementing `RequestHandler`) proxied through API Gateway, enabling HTTP context propagation - `io.opentelemetry.instrumentation.awslambda.v1_0.TracingRequestApiGatewayWrapper` - for wrapping regular handlers (implementing `RequestHandler`) proxied through API Gateway, enabling HTTP context propagation

View File

@ -24,11 +24,21 @@ import java.util.concurrent.TimeUnit;
*/ */
public abstract class TracingRequestHandler<I, O> implements RequestHandler<I, O> { public abstract class TracingRequestHandler<I, O> implements RequestHandler<I, O> {
private static final long DEFAULT_FLUSH_TIMEOUT_SECONDS = 1;
private final AwsLambdaTracer tracer; private final AwsLambdaTracer tracer;
private final long flushTimeout;
/** Creates a new {@link TracingRequestHandler} which traces using the default {@link Tracer}. */
protected TracingRequestHandler(long flushTimeout) {
this.tracer = new AwsLambdaTracer();
this.flushTimeout = flushTimeout;
}
/** Creates a new {@link TracingRequestHandler} which traces using the default {@link Tracer}. */ /** Creates a new {@link TracingRequestHandler} which traces using the default {@link Tracer}. */
protected TracingRequestHandler() { protected TracingRequestHandler() {
this.tracer = new AwsLambdaTracer(); this.tracer = new AwsLambdaTracer();
this.flushTimeout = DEFAULT_FLUSH_TIMEOUT_SECONDS;
} }
/** /**
@ -36,6 +46,7 @@ public abstract class TracingRequestHandler<I, O> implements RequestHandler<I, O
*/ */
protected TracingRequestHandler(Tracer tracer) { protected TracingRequestHandler(Tracer tracer) {
this.tracer = new AwsLambdaTracer(tracer); this.tracer = new AwsLambdaTracer(tracer);
this.flushTimeout = DEFAULT_FLUSH_TIMEOUT_SECONDS;
} }
/** /**
@ -44,6 +55,7 @@ public abstract class TracingRequestHandler<I, O> implements RequestHandler<I, O
*/ */
protected TracingRequestHandler(AwsLambdaTracer tracer) { protected TracingRequestHandler(AwsLambdaTracer tracer) {
this.tracer = tracer; this.tracer = tracer;
this.flushTimeout = DEFAULT_FLUSH_TIMEOUT_SECONDS;
} }
private Map<String, String> getHeaders(I input) { private Map<String, String> getHeaders(I input) {
@ -71,7 +83,9 @@ public abstract class TracingRequestHandler<I, O> implements RequestHandler<I, O
} else { } else {
tracer.end(span); tracer.end(span);
} }
OpenTelemetrySdk.getGlobalTracerManagement().forceFlush().join(1, TimeUnit.SECONDS); OpenTelemetrySdk.getGlobalTracerManagement()
.forceFlush()
.join(flushTimeout, TimeUnit.SECONDS);
} }
} }

View File

@ -23,7 +23,19 @@ import java.util.concurrent.TimeUnit;
*/ */
public abstract class TracingRequestStreamHandler implements RequestStreamHandler { public abstract class TracingRequestStreamHandler implements RequestStreamHandler {
private static final long DEFAULT_FLUSH_TIMEOUT_SECONDS = 1;
private final AwsLambdaTracer tracer; private final AwsLambdaTracer tracer;
private final long flushTimeout;
/**
* Creates a new {@link TracingRequestStreamHandler} which traces using the default {@link
* Tracer}.
*/
protected TracingRequestStreamHandler(long flushTimeout) {
this.tracer = new AwsLambdaTracer();
this.flushTimeout = flushTimeout;
}
/** /**
* Creates a new {@link TracingRequestStreamHandler} which traces using the default {@link * Creates a new {@link TracingRequestStreamHandler} which traces using the default {@link
@ -31,6 +43,7 @@ public abstract class TracingRequestStreamHandler implements RequestStreamHandle
*/ */
protected TracingRequestStreamHandler() { protected TracingRequestStreamHandler() {
this.tracer = new AwsLambdaTracer(); this.tracer = new AwsLambdaTracer();
this.flushTimeout = DEFAULT_FLUSH_TIMEOUT_SECONDS;
} }
/** /**
@ -39,6 +52,7 @@ public abstract class TracingRequestStreamHandler implements RequestStreamHandle
*/ */
protected TracingRequestStreamHandler(Tracer tracer) { protected TracingRequestStreamHandler(Tracer tracer) {
this.tracer = new AwsLambdaTracer(tracer); this.tracer = new AwsLambdaTracer(tracer);
this.flushTimeout = DEFAULT_FLUSH_TIMEOUT_SECONDS;
} }
/** /**
@ -47,6 +61,7 @@ public abstract class TracingRequestStreamHandler implements RequestStreamHandle
*/ */
protected TracingRequestStreamHandler(AwsLambdaTracer tracer) { protected TracingRequestStreamHandler(AwsLambdaTracer tracer) {
this.tracer = tracer; this.tracer = tracer;
this.flushTimeout = DEFAULT_FLUSH_TIMEOUT_SECONDS;
} }
@Override @Override
@ -60,7 +75,9 @@ public abstract class TracingRequestStreamHandler implements RequestStreamHandle
doHandleRequest(proxyRequest.freshStream(), new OutputStreamWrapper(output, span), context); doHandleRequest(proxyRequest.freshStream(), new OutputStreamWrapper(output, span), context);
} catch (Throwable t) { } catch (Throwable t) {
tracer.endExceptionally(span, t); tracer.endExceptionally(span, t);
OpenTelemetrySdk.getGlobalTracerManagement().forceFlush().join(1, TimeUnit.SECONDS); OpenTelemetrySdk.getGlobalTracerManagement()
.forceFlush()
.join(flushTimeout, TimeUnit.SECONDS);
throw t; throw t;
} }
} }

View File

@ -13,15 +13,19 @@ import java.io.OutputStream;
/** /**
* Wrapper for {@link TracingRequestStreamHandler}. Allows for wrapping a regular lambda, enabling * Wrapper for {@link TracingRequestStreamHandler}. Allows for wrapping a regular lambda, enabling
* single span tracing. Main lambda class should be configured as env property OTEL_LAMBDA_HANDLER * single span tracing. Main lambda class should be configured as env property
* in package.ClassName::methodName format. Lambda class must implement {@link * OTEL_INSTRUMENTATION_AWS_LAMBDA_HANDLER in package.ClassName::methodName format. Lambda class
* RequestStreamHandler}. * must implement {@link RequestStreamHandler}.
*/ */
public class TracingRequestStreamWrapper extends TracingRequestStreamHandler { public class TracingRequestStreamWrapper extends TracingRequestStreamHandler {
// visible for testing // visible for testing
static WrappedLambda WRAPPED_LAMBDA = WrappedLambda.fromConfiguration(); static WrappedLambda WRAPPED_LAMBDA = WrappedLambda.fromConfiguration();
public TracingRequestStreamWrapper() {
super(WrapperConfiguration.flushTimeout());
}
@Override @Override
protected void doHandleRequest(InputStream inputStream, OutputStream output, Context context) protected void doHandleRequest(InputStream inputStream, OutputStream output, Context context)
throws IOException { throws IOException {

View File

@ -11,10 +11,14 @@ import java.lang.reflect.Method;
/** /**
* Base abstract wrapper for {@link TracingRequestHandler}. Provides: - delegation to a lambda via * Base abstract wrapper for {@link TracingRequestHandler}. Provides: - delegation to a lambda via
* env property OTEL_LAMBDA_HANDLER in package.ClassName::methodName format * env property OTEL_INSTRUMENTATION_AWS_LAMBDA_HANDLER in package.ClassName::methodName format
*/ */
abstract class TracingRequestWrapperBase<I, O> extends TracingRequestHandler<I, O> { abstract class TracingRequestWrapperBase<I, O> extends TracingRequestHandler<I, O> {
protected TracingRequestWrapperBase() {
super(WrapperConfiguration.flushTimeout());
}
// visible for testing // visible for testing
static WrappedLambda WRAPPED_LAMBDA = WrappedLambda.fromConfiguration(); static WrappedLambda WRAPPED_LAMBDA = WrappedLambda.fromConfiguration();

View File

@ -17,7 +17,8 @@ import java.util.Optional;
/** Model for wrapped lambda function (object, class, method). */ /** Model for wrapped lambda function (object, class, method). */
class WrappedLambda { class WrappedLambda {
public static final String OTEL_LAMBDA_HANDLER_ENV_KEY = "OTEL_LAMBDA_HANDLER"; public static final String OTEL_LAMBDA_HANDLER_ENV_KEY =
"OTEL_INSTRUMENTATION_AWS_LAMBDA_HANDLER";
private final Object targetObject; private final Object targetObject;
private final Class<?> targetClass; private final Class<?> targetClass;

View File

@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awslambda.v1_0;
public final class WrapperConfiguration {
private WrapperConfiguration() {}
public static final String OTEL_LAMBDA_FLUSH_TIMEOUT_ENV_KEY =
"OTEL_INSTRUMENTATION_AWS_LAMBDA_FLUSH_TIMEOUT";
public static final long OTEL_LAMBDA_FLUSH_TIMEOUT_DEFAULT = 1;
public static final long flushTimeout() {
String lambdaFlushTimeout = System.getenv(OTEL_LAMBDA_FLUSH_TIMEOUT_ENV_KEY);
if (lambdaFlushTimeout != null && !lambdaFlushTimeout.isEmpty()) {
try {
return Long.parseLong(lambdaFlushTimeout);
} catch (NumberFormatException nfe) {
// ignored - default used
}
}
return OTEL_LAMBDA_FLUSH_TIMEOUT_DEFAULT;
}
}