Returned a StreamedAsyncHandler instance where appropriate

This commit is contained in:
Laplie Anderson 2020-03-03 17:45:21 -05:00
parent 0c74cf031e
commit fd0e10f0cb
7 changed files with 76 additions and 3 deletions

View File

@ -28,7 +28,11 @@ public class PlayWSClientInstrumentation extends BasePlayWSClientInstrumentation
DECORATE.onRequest(span, request);
propagate().inject(span, request, SETTER);
asyncHandler = new AsyncHandlerWrapper(asyncHandler, span);
if (asyncHandler instanceof StreamedAsyncHandler) {
asyncHandler = new StreamedAsyncHandlerWrapper((StreamedAsyncHandler) asyncHandler, span);
} else {
asyncHandler = new AsyncHandlerWrapper(asyncHandler, span);
}
return span;
}

View File

@ -0,0 +1,20 @@
package datadog.trace.instrumentation.playws1;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import org.reactivestreams.Publisher;
import play.shaded.ahc.org.asynchttpclient.handler.StreamedAsyncHandler;
public class StreamedAsyncHandlerWrapper extends AsyncHandlerWrapper
implements StreamedAsyncHandler {
private final StreamedAsyncHandler streamedDelegate;
public StreamedAsyncHandlerWrapper(final StreamedAsyncHandler delegate, final AgentSpan span) {
super(delegate, span);
streamedDelegate = delegate;
}
@Override
public State onStream(final Publisher publisher) {
return streamedDelegate.onStream(publisher);
}
}

View File

@ -28,7 +28,11 @@ public class PlayWSClientInstrumentation extends BasePlayWSClientInstrumentation
DECORATE.onRequest(span, request);
propagate().inject(span, request, SETTER);
asyncHandler = new AsyncHandlerWrapper(asyncHandler, span);
if (asyncHandler instanceof StreamedAsyncHandler) {
asyncHandler = new StreamedAsyncHandlerWrapper((StreamedAsyncHandler) asyncHandler, span);
} else {
asyncHandler = new AsyncHandlerWrapper(asyncHandler, span);
}
return span;
}

View File

@ -0,0 +1,20 @@
package datadog.trace.instrumentation.playws21;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import org.reactivestreams.Publisher;
import play.shaded.ahc.org.asynchttpclient.handler.StreamedAsyncHandler;
public class StreamedAsyncHandlerWrapper extends AsyncHandlerWrapper
implements StreamedAsyncHandler {
private final StreamedAsyncHandler streamedDelegate;
public StreamedAsyncHandlerWrapper(final StreamedAsyncHandler delegate, final AgentSpan span) {
super(delegate, span);
streamedDelegate = delegate;
}
@Override
public State onStream(final Publisher publisher) {
return streamedDelegate.onStream(publisher);
}
}

View File

@ -28,7 +28,11 @@ public class PlayWSClientInstrumentation extends BasePlayWSClientInstrumentation
DECORATE.onRequest(span, request);
propagate().inject(span, request, SETTER);
asyncHandler = new AsyncHandlerWrapper(asyncHandler, span);
if (asyncHandler instanceof StreamedAsyncHandler) {
asyncHandler = new StreamedAsyncHandlerWrapper((StreamedAsyncHandler) asyncHandler, span);
} else {
asyncHandler = new AsyncHandlerWrapper(asyncHandler, span);
}
return span;
}

View File

@ -0,0 +1,20 @@
package datadog.trace.instrumentation.playws2;
import datadog.trace.bootstrap.instrumentation.api.AgentSpan;
import org.reactivestreams.Publisher;
import play.shaded.ahc.org.asynchttpclient.handler.StreamedAsyncHandler;
public class StreamedAsyncHandlerWrapper extends AsyncHandlerWrapper
implements StreamedAsyncHandler {
private final StreamedAsyncHandler streamedDelegate;
public StreamedAsyncHandlerWrapper(final StreamedAsyncHandler delegate, final AgentSpan span) {
super(delegate, span);
streamedDelegate = delegate;
}
@Override
public State onStream(final Publisher publisher) {
return streamedDelegate.onStream(publisher);
}
}

View File

@ -50,6 +50,7 @@ public abstract class BasePlayWSClientInstrumentation extends Instrumenter.Defau
"datadog.trace.instrumentation.playws.PlayWSClientDecorator",
"datadog.trace.instrumentation.playws.HeadersInjectAdapter",
packageName + ".AsyncHandlerWrapper",
packageName + ".StreamedAsyncHandlerWrapper"
};
}
}