Convert hystrix 1.4 test from groovy to java (#9217)

This commit is contained in:
Jay DeLuca 2023-08-18 08:48:27 -04:00 committed by GitHub
parent b644a88923
commit cb0d00ae8a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 803 additions and 571 deletions

View File

@ -1,108 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import com.netflix.hystrix.HystrixCommandProperties
import com.netflix.hystrix.HystrixObservableCommand
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import rx.Observable
import rx.schedulers.Schedulers
import static com.netflix.hystrix.HystrixCommandGroupKey.Factory.asKey
class HystrixObservableChainTest extends AgentInstrumentationSpecification {
def "test command #action"() {
setup:
def result = runWithSpan("parent") {
def val = new HystrixObservableCommand<String>(setter("ExampleGroup")) {
private String tracedMethod() {
runWithSpan("tracedMethod") {}
return "Hello"
}
@Override
protected Observable<String> construct() {
Observable.defer {
Observable.just(tracedMethod())
}
.subscribeOn(Schedulers.immediate())
}
}.toObservable()
.subscribeOn(Schedulers.io())
.map {
it.toUpperCase()
}.flatMap { str ->
new HystrixObservableCommand<String>(setter("OtherGroup")) {
private String anotherTracedMethod() {
runWithSpan("anotherTracedMethod") {}
return "$str!"
}
@Override
protected Observable<String> construct() {
Observable.defer {
Observable.just(anotherTracedMethod())
}
.subscribeOn(Schedulers.computation())
}
}.toObservable()
.subscribeOn(Schedulers.trampoline())
}.toBlocking().first()
return val
}
expect:
result == "HELLO!"
assertTraces(1) {
trace(0, 5) {
span(0) {
name "parent"
hasNoParent()
attributes {
}
}
span(1) {
name "ExampleGroup.HystrixObservableChainTest\$1.execute"
childOf span(0)
attributes {
"hystrix.command" "HystrixObservableChainTest\$1"
"hystrix.group" "ExampleGroup"
"hystrix.circuit_open" false
}
}
span(2) {
name "tracedMethod"
childOf span(1)
attributes {
}
}
span(3) {
name "OtherGroup.HystrixObservableChainTest\$2.execute"
childOf span(1)
attributes {
"hystrix.command" "HystrixObservableChainTest\$2"
"hystrix.group" "OtherGroup"
"hystrix.circuit_open" false
}
}
span(4) {
name "anotherTracedMethod"
childOf span(3)
attributes {
}
}
}
}
}
def setter(String key) {
def setter = new HystrixObservableCommand.Setter(asKey(key))
setter.andCommandPropertiesDefaults(new HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(10_000))
return setter
}
}

View File

@ -1,315 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import com.netflix.hystrix.HystrixCommandProperties
import com.netflix.hystrix.HystrixObservable
import com.netflix.hystrix.HystrixObservableCommand
import com.netflix.hystrix.exception.HystrixRuntimeException
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import rx.Observable
import rx.schedulers.Schedulers
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import static com.netflix.hystrix.HystrixCommandGroupKey.Factory.asKey
import static io.opentelemetry.api.trace.StatusCode.ERROR
class HystrixObservableTest extends AgentInstrumentationSpecification {
def "test command #action"() {
setup:
def observeOnFn = observeOn
def subscribeOnFn = subscribeOn
def result = runWithSpan("parent") {
def val = operation new HystrixObservableCommand<String>(setter("ExampleGroup")) {
private String tracedMethod() {
runWithSpan("tracedMethod") {}
return "Hello!"
}
@Override
protected Observable<String> construct() {
def obs = Observable.defer {
Observable.just(tracedMethod()).repeat(1)
}
if (observeOnFn) {
obs = obs.observeOn(observeOnFn)
}
if (subscribeOnFn) {
obs = obs.subscribeOn(subscribeOnFn)
}
return obs
}
}
return val
}
expect:
result == "Hello!"
assertTraces(1) {
trace(0, 3) {
span(0) {
name "parent"
hasNoParent()
attributes {
}
}
span(1) {
name "ExampleGroup.HystrixObservableTest\$1.execute"
childOf span(0)
attributes {
"hystrix.command" "HystrixObservableTest\$1"
"hystrix.group" "ExampleGroup"
"hystrix.circuit_open" false
}
}
span(2) {
name "tracedMethod"
childOf span(1)
attributes {
}
}
}
}
where:
action | observeOn | subscribeOn | operation
"toObservable" | null | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable-I" | Schedulers.immediate() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable-T" | Schedulers.trampoline() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable-C" | Schedulers.computation() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable-IO" | Schedulers.io() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable-NT" | Schedulers.newThread() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable+I" | null | Schedulers.immediate() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable+T" | null | Schedulers.trampoline() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable+C" | null | Schedulers.computation() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable+IO" | null | Schedulers.io() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable+NT" | null | Schedulers.newThread() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"observe" | null | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe-I" | Schedulers.immediate() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe-T" | Schedulers.trampoline() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe-C" | Schedulers.computation() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe-IO" | Schedulers.io() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe-NT" | Schedulers.newThread() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe+I" | null | Schedulers.immediate() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe+T" | null | Schedulers.trampoline() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe+C" | null | Schedulers.computation() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe+IO" | null | Schedulers.io() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe+NT" | null | Schedulers.newThread() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"toObservable block" | Schedulers.computation() | Schedulers.newThread() | { HystrixObservable cmd ->
BlockingQueue queue = new LinkedBlockingQueue()
def subscription = cmd.toObservable().subscribe { next ->
queue.put(next)
}
def val = queue.take()
subscription.unsubscribe()
return val
}
}
def "test command #action fallback"() {
setup:
def observeOnFn = observeOn
def subscribeOnFn = subscribeOn
def result = runWithSpan("parent") {
def val = operation new HystrixObservableCommand<String>(setter("ExampleGroup")) {
@Override
protected Observable<String> construct() {
def err = Observable.defer {
Observable.error(new IllegalArgumentException()).repeat(1)
}
if (observeOnFn) {
err = err.observeOn(observeOnFn)
}
if (subscribeOnFn) {
err = err.subscribeOn(subscribeOnFn)
}
return err
}
protected Observable<String> resumeWithFallback() {
return Observable.just("Fallback!").repeat(1)
}
}
return val
}
expect:
result == "Fallback!"
assertTraces(1) {
trace(0, 3) {
span(0) {
name "parent"
hasNoParent()
attributes {
}
}
span(1) {
name "ExampleGroup.HystrixObservableTest\$2.execute"
childOf span(0)
status ERROR
errorEvent(IllegalArgumentException)
attributes {
"hystrix.command" "HystrixObservableTest\$2"
"hystrix.group" "ExampleGroup"
"hystrix.circuit_open" false
}
}
span(2) {
name "ExampleGroup.HystrixObservableTest\$2.fallback"
childOf span(1)
attributes {
"hystrix.command" "HystrixObservableTest\$2"
"hystrix.group" "ExampleGroup"
"hystrix.circuit_open" false
}
}
}
}
where:
action | observeOn | subscribeOn | operation
"toObservable" | null | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable-I" | Schedulers.immediate() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable-T" | Schedulers.trampoline() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable-C" | Schedulers.computation() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable-IO" | Schedulers.io() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable-NT" | Schedulers.newThread() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable+I" | null | Schedulers.immediate() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable+T" | null | Schedulers.trampoline() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable+C" | null | Schedulers.computation() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable+IO" | null | Schedulers.io() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable+NT" | null | Schedulers.newThread() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"observe" | null | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe-I" | Schedulers.immediate() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe-T" | Schedulers.trampoline() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe-C" | Schedulers.computation() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe-IO" | Schedulers.io() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe-NT" | Schedulers.newThread() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe+I" | null | Schedulers.immediate() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe+T" | null | Schedulers.trampoline() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe+C" | null | Schedulers.computation() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe+IO" | null | Schedulers.io() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe+NT" | null | Schedulers.newThread() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"toObservable block" | null | null | { HystrixObservable cmd ->
BlockingQueue queue = new LinkedBlockingQueue()
def subscription = cmd.toObservable().subscribe { next ->
queue.put(next)
}
def val = queue.take()
subscription.unsubscribe()
return val
}
}
def "test no fallback results in error for #action"() {
setup:
def observeOnFn = observeOn
def subscribeOnFn = subscribeOn
when:
runWithSpan("parent") {
operation new HystrixObservableCommand<String>(setter("FailingGroup")) {
@Override
protected Observable<String> construct() {
def err = Observable.defer {
Observable.error(new IllegalArgumentException())
}
if (observeOnFn) {
err = err.observeOn(observeOnFn)
}
if (subscribeOnFn) {
err = err.subscribeOn(subscribeOnFn)
}
return err
}
}
}
then:
def err = thrown HystrixRuntimeException
err.cause instanceof IllegalArgumentException
assertTraces(1) {
trace(0, 3) {
span(0) {
name "parent"
hasNoParent()
status ERROR
errorEvent(HystrixRuntimeException, "HystrixObservableTest\$3 failed and no fallback available.")
}
span(1) {
name "FailingGroup.HystrixObservableTest\$3.execute"
childOf span(0)
status ERROR
errorEvent(IllegalArgumentException)
attributes {
"hystrix.command" "HystrixObservableTest\$3"
"hystrix.group" "FailingGroup"
"hystrix.circuit_open" false
}
}
span(2) {
name "FailingGroup.HystrixObservableTest\$3.fallback"
childOf span(1)
status ERROR
errorEvent(UnsupportedOperationException, "No fallback available.")
attributes {
"hystrix.command" "HystrixObservableTest\$3"
"hystrix.group" "FailingGroup"
"hystrix.circuit_open" false
}
}
}
}
where:
action | observeOn | subscribeOn | operation
"toObservable" | null | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable-I" | Schedulers.immediate() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable-T" | Schedulers.trampoline() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable-C" | Schedulers.computation() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable-IO" | Schedulers.io() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable-NT" | Schedulers.newThread() | null | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable+I" | null | Schedulers.immediate() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable+T" | null | Schedulers.trampoline() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable+C" | null | Schedulers.computation() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable+IO" | null | Schedulers.io() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"toObservable+NT" | null | Schedulers.newThread() | { HystrixObservable cmd -> cmd.toObservable().toBlocking().first() }
"observe" | null | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe-I" | Schedulers.immediate() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe-T" | Schedulers.trampoline() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe-C" | Schedulers.computation() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe-IO" | Schedulers.io() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe-NT" | Schedulers.newThread() | null | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe+I" | null | Schedulers.immediate() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe+T" | null | Schedulers.trampoline() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe+C" | null | Schedulers.computation() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe+IO" | null | Schedulers.io() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"observe+NT" | null | Schedulers.newThread() | { HystrixObservable cmd -> cmd.observe().toBlocking().first() }
"toObservable block" | Schedulers.computation() | Schedulers.newThread() | { HystrixObservable cmd ->
def queue = new LinkedBlockingQueue<Throwable>()
def subscription = cmd.toObservable().subscribe({ next ->
queue.put(new Exception("Unexpectedly got a next"))
}, { next ->
queue.put(next)
})
Throwable ex = queue.take()
subscription.unsubscribe()
throw ex
}
}
def setter(String key) {
def setter = new HystrixObservableCommand.Setter(asKey(key))
setter.andCommandPropertiesDefaults(new HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(10_000))
return setter
}
}

View File

@ -1,148 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import com.netflix.hystrix.HystrixCommand
import com.netflix.hystrix.HystrixCommandProperties
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import static com.netflix.hystrix.HystrixCommandGroupKey.Factory.asKey
import static io.opentelemetry.api.trace.StatusCode.ERROR
class HystrixTest extends AgentInstrumentationSpecification {
def "test command #action"() {
setup:
def command = new HystrixCommand<String>(setter("ExampleGroup")) {
@Override
protected String run() throws Exception {
return tracedMethod()
}
private String tracedMethod() {
runWithSpan("tracedMethod") {}
return "Hello!"
}
}
def result = runWithSpan("parent") {
operation(command)
}
expect:
result == "Hello!"
assertTraces(1) {
trace(0, 3) {
span(0) {
name "parent"
hasNoParent()
attributes {
}
}
span(1) {
name "ExampleGroup.HystrixTest\$1.execute"
childOf span(0)
attributes {
"hystrix.command" "HystrixTest\$1"
"hystrix.group" "ExampleGroup"
"hystrix.circuit_open" false
}
}
span(2) {
name "tracedMethod"
childOf span(1)
attributes {
}
}
}
}
where:
action | operation
"execute" | { HystrixCommand cmd -> cmd.execute() }
"queue" | { HystrixCommand cmd -> cmd.queue().get() }
"toObservable" | { HystrixCommand cmd -> cmd.toObservable().toBlocking().first() }
"observe" | { HystrixCommand cmd -> cmd.observe().toBlocking().first() }
"observe block" | { HystrixCommand cmd ->
BlockingQueue queue = new LinkedBlockingQueue()
cmd.observe().subscribe { next ->
queue.put(next)
}
queue.take()
}
}
def "test command #action fallback"() {
setup:
def command = new HystrixCommand<String>(setter("ExampleGroup")) {
@Override
protected String run() throws Exception {
throw new IllegalArgumentException()
}
protected String getFallback() {
return "Fallback!"
}
}
def result = runWithSpan("parent") {
operation(command)
}
expect:
result == "Fallback!"
assertTraces(1) {
trace(0, 3) {
span(0) {
name "parent"
hasNoParent()
attributes {
}
}
span(1) {
name "ExampleGroup.HystrixTest\$2.execute"
childOf span(0)
status ERROR
errorEvent(IllegalArgumentException)
attributes {
"hystrix.command" "HystrixTest\$2"
"hystrix.group" "ExampleGroup"
"hystrix.circuit_open" false
}
}
span(2) {
name "ExampleGroup.HystrixTest\$2.fallback"
childOf span(1)
attributes {
"hystrix.command" "HystrixTest\$2"
"hystrix.group" "ExampleGroup"
"hystrix.circuit_open" false
}
}
}
}
where:
action | operation
"execute" | { HystrixCommand cmd -> cmd.execute() }
"queue" | { HystrixCommand cmd -> cmd.queue().get() }
"toObservable" | { HystrixCommand cmd -> cmd.toObservable().toBlocking().first() }
"observe" | { HystrixCommand cmd -> cmd.observe().toBlocking().first() }
"observe block" | { HystrixCommand cmd ->
BlockingQueue queue = new LinkedBlockingQueue()
cmd.observe().subscribe { next ->
queue.put(next)
}
queue.take()
}
}
def setter(String key) {
def setter = new HystrixCommand.Setter(asKey(key))
setter.andCommandPropertiesDefaults(new HystrixCommandProperties.Setter()
.withExecutionTimeoutInMilliseconds(10_000))
return setter
}
}

View File

@ -0,0 +1,123 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.hystrix;
import static io.opentelemetry.api.common.AttributeKey.booleanKey;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static org.assertj.core.api.Assertions.assertThat;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixObservableCommand;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import rx.Observable;
import rx.schedulers.Schedulers;
class HystrixObservableChainTest {
@RegisterExtension
protected static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
@Test
@SuppressWarnings("RxReturnValueIgnored")
void testCommand() {
class TestCommand extends HystrixObservableCommand<String> {
protected TestCommand(Setter setter) {
super(setter);
}
private String tracedMethod() {
testing.runWithSpan("tracedMethod", () -> {});
return "Hello";
}
@Override
protected Observable<String> construct() {
return Observable.defer(() -> Observable.just(tracedMethod()))
.subscribeOn(Schedulers.immediate());
}
}
class AnotherTestCommand extends HystrixObservableCommand<String> {
private final String str;
protected AnotherTestCommand(Setter setter, String str) {
super(setter);
this.str = str;
}
private String anotherTracedMethod() {
testing.runWithSpan("anotherTracedMethod", () -> {});
return str + "!";
}
@Override
protected Observable<String> construct() {
return Observable.defer(() -> Observable.just(anotherTracedMethod()))
.subscribeOn(Schedulers.computation());
}
}
String result =
testing.runWithSpan(
"parent",
() ->
new TestCommand(setter("ExampleGroup"))
.toObservable()
.subscribeOn(Schedulers.io())
.map(String::toUpperCase)
.flatMap(
str ->
new AnotherTestCommand(setter("OtherGroup"), str)
.toObservable()
.subscribeOn(Schedulers.trampoline()))
.toBlocking()
.first());
assertThat(result).isEqualTo("HELLO!");
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasNoParent().hasAttributes(Attributes.empty()),
span ->
span.hasName("ExampleGroup.TestCommand.execute")
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(stringKey("hystrix.command"), "TestCommand"),
equalTo(stringKey("hystrix.group"), "ExampleGroup"),
equalTo(booleanKey("hystrix.circuit_open"), false)),
span ->
span.hasName("tracedMethod")
.hasParent(trace.getSpan(1))
.hasAttributes(Attributes.empty()),
span ->
span.hasName("OtherGroup.AnotherTestCommand.execute")
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
equalTo(stringKey("hystrix.command"), "AnotherTestCommand"),
equalTo(stringKey("hystrix.group"), "OtherGroup"),
equalTo(booleanKey("hystrix.circuit_open"), false)),
span ->
span.hasName("anotherTracedMethod")
.hasParent(trace.getSpan(3))
.hasAttributes(Attributes.empty())));
}
private static HystrixObservableCommand.Setter setter(String key) {
HystrixObservableCommand.Setter setter =
HystrixObservableCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(key));
setter.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(10_000));
return setter;
}
}

View File

@ -0,0 +1,488 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.hystrix;
import static io.opentelemetry.api.common.AttributeKey.booleanKey;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.EXCEPTION_MESSAGE;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.EXCEPTION_STACKTRACE;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.EXCEPTION_TYPE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.catchException;
import static org.junit.jupiter.api.Named.named;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixObservableCommand;
import com.netflix.hystrix.exception.HystrixRuntimeException;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.sdk.trace.data.StatusData;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import java.util.stream.Stream;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import rx.Observable;
import rx.Scheduler;
import rx.Subscription;
import rx.schedulers.Schedulers;
class HystrixObservableTest {
@RegisterExtension
protected static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
@ParameterizedTest
@MethodSource("provideCommandActionArguments")
void testCommands(Parameter parameter) {
class TestCommand extends HystrixObservableCommand<String> {
protected TestCommand(Setter setter) {
super(setter);
}
private String tracedMethod() {
testing.runWithSpan("tracedMethod", () -> {});
return "Hello!";
}
@Override
protected Observable<String> construct() {
Observable<String> obs = Observable.defer(() -> Observable.just(tracedMethod()).repeat(1));
if (parameter.observeOn != null) {
obs = obs.observeOn(parameter.observeOn);
}
if (parameter.subscribeOn != null) {
obs = obs.subscribeOn(parameter.subscribeOn);
}
return obs;
}
}
String result =
testing.runWithSpan(
"parent",
() -> {
HystrixObservableCommand<String> val = new TestCommand(setter("ExampleGroup"));
return parameter.operation.apply(val);
});
assertThat(result).isEqualTo("Hello!");
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasNoParent().hasAttributes(Attributes.empty()),
span ->
span.hasName("ExampleGroup.TestCommand.execute")
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(stringKey("hystrix.command"), "TestCommand"),
equalTo(stringKey("hystrix.group"), "ExampleGroup"),
equalTo(booleanKey("hystrix.circuit_open"), false)),
span ->
span.hasName("tracedMethod")
.hasParent(trace.getSpan(1))
.hasAttributes(Attributes.empty())));
}
private static Stream<Arguments> baseArguments() {
return Stream.of(
Arguments.of(
named(
"toObservable",
new Parameter(null, null, cmd -> cmd.toObservable().toBlocking().first()))),
Arguments.of(
named(
"toObservable-I",
new Parameter(
Schedulers.immediate(), null, cmd -> cmd.toObservable().toBlocking().first()))),
Arguments.of(
named(
"toObservable-T",
new Parameter(
Schedulers.trampoline(),
null,
cmd -> cmd.toObservable().toBlocking().first()))),
Arguments.of(
named(
"toObservable-C",
new Parameter(
Schedulers.computation(),
null,
cmd -> cmd.toObservable().toBlocking().first()))),
Arguments.of(
named(
"toObservable-IO",
new Parameter(
Schedulers.io(), null, cmd -> cmd.toObservable().toBlocking().first()))),
Arguments.of(
named(
"toObservable-NT",
new Parameter(
Schedulers.newThread(), null, cmd -> cmd.toObservable().toBlocking().first()))),
Arguments.of(
named(
"toObservable+I",
new Parameter(
null, Schedulers.immediate(), cmd -> cmd.toObservable().toBlocking().first()))),
Arguments.of(
named(
"toObservable+T",
new Parameter(
null,
Schedulers.trampoline(),
cmd -> cmd.toObservable().toBlocking().first()))),
Arguments.of(
named(
"toObservable+C",
new Parameter(
null,
Schedulers.computation(),
cmd -> cmd.toObservable().toBlocking().first()))),
Arguments.of(
named(
"toObservable+IO",
new Parameter(
null, Schedulers.io(), cmd -> cmd.toObservable().toBlocking().first()))),
Arguments.of(
named(
"toObservable+NT",
new Parameter(
null, Schedulers.newThread(), cmd -> cmd.toObservable().toBlocking().first()))),
Arguments.of(
named("observe", new Parameter(null, null, cmd -> cmd.observe().toBlocking().first()))),
Arguments.of(
named(
"observe-I",
new Parameter(
Schedulers.immediate(), null, cmd -> cmd.observe().toBlocking().first()))),
Arguments.of(
named(
"observe-T",
new Parameter(
Schedulers.trampoline(), null, cmd -> cmd.observe().toBlocking().first()))),
Arguments.of(
named(
"observe-C",
new Parameter(
Schedulers.computation(), null, cmd -> cmd.observe().toBlocking().first()))),
Arguments.of(
named(
"observe-IO",
new Parameter(Schedulers.io(), null, cmd -> cmd.observe().toBlocking().first()))),
Arguments.of(
named(
"observe-NT",
new Parameter(
Schedulers.newThread(), null, cmd -> cmd.observe().toBlocking().first()))),
Arguments.of(
named(
"observe+I",
new Parameter(
null, Schedulers.immediate(), cmd -> cmd.observe().toBlocking().first()))),
Arguments.of(
named(
"observe+T",
new Parameter(
null, Schedulers.trampoline(), cmd -> cmd.observe().toBlocking().first()))),
Arguments.of(
named(
"observe+C",
new Parameter(
null, Schedulers.computation(), cmd -> cmd.observe().toBlocking().first()))),
Arguments.of(
named(
"observe+IO",
new Parameter(null, Schedulers.io(), cmd -> cmd.observe().toBlocking().first()))),
Arguments.of(
named(
"observe+NT",
new Parameter(
null, Schedulers.newThread(), cmd -> cmd.observe().toBlocking().first()))));
}
private static Stream<Arguments> provideCommandActionArguments() {
return Stream.concat(
baseArguments(),
Stream.of(
Arguments.of(
named(
"toObservable block",
new Parameter(
Schedulers.computation(),
Schedulers.newThread(),
cmd -> {
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
Subscription subscription =
cmd.toObservable()
.subscribe(
next -> {
try {
queue.put(next);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
String returnValue;
try {
returnValue = queue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
subscription.unsubscribe();
return returnValue;
})))));
}
@ParameterizedTest
@MethodSource("provideCommandFallbackArguments")
void testCommandFallbacks(Parameter parameter) {
class TestCommand extends HystrixObservableCommand<String> {
protected TestCommand(Setter setter) {
super(setter);
}
@Override
protected Observable<String> construct() {
Observable<String> err =
Observable.defer(() -> Observable.error(new IllegalArgumentException()));
if (parameter.observeOn != null) {
err = err.observeOn(parameter.observeOn);
}
if (parameter.subscribeOn != null) {
err = err.subscribeOn(parameter.subscribeOn);
}
return err;
}
@Override
protected Observable<String> resumeWithFallback() {
return Observable.just("Fallback!").repeat(1);
}
}
String result =
testing.runWithSpan(
"parent",
() -> {
HystrixObservableCommand<String> val = new TestCommand(setter("ExampleGroup"));
return parameter.operation.apply(val);
});
assertThat(result).isEqualTo("Fallback!");
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasNoParent().hasAttributes(Attributes.empty()),
span ->
span.hasName("ExampleGroup.TestCommand.execute")
.hasParent(trace.getSpan(0))
.hasStatus(StatusData.error())
.hasEventsSatisfyingExactly(
event ->
event
.hasName("exception")
.hasAttributesSatisfyingExactly(
equalTo(
EXCEPTION_TYPE, "java.lang.IllegalArgumentException"),
satisfies(
EXCEPTION_STACKTRACE,
val -> val.isInstanceOf(String.class)))),
span ->
span.hasName("ExampleGroup.TestCommand.fallback")
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
equalTo(stringKey("hystrix.command"), "TestCommand"),
equalTo(stringKey("hystrix.group"), "ExampleGroup"),
equalTo(booleanKey("hystrix.circuit_open"), false))));
}
private static Stream<Arguments> provideCommandFallbackArguments() {
return Stream.concat(
baseArguments(),
Stream.of(
Arguments.of(
named(
"toObservable block",
new Parameter(
null,
null,
cmd -> {
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
Subscription subscription =
cmd.toObservable()
.subscribe(
next -> {
try {
queue.put(next);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
String returnValue;
try {
returnValue = queue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
subscription.unsubscribe();
return returnValue;
})))));
}
@ParameterizedTest
@MethodSource("provideCommandNoFallbackResultsInErrorArguments")
void testNoFallbackResultsInErrorForAction(Parameter parameter) {
class TestCommand extends HystrixObservableCommand<String> {
protected TestCommand(Setter setter) {
super(setter);
}
@Override
protected Observable<String> construct() {
Observable<String> err =
Observable.defer(() -> Observable.error(new IllegalArgumentException()));
if (parameter.observeOn != null) {
err = err.observeOn(parameter.observeOn);
}
if (parameter.subscribeOn != null) {
err = err.subscribeOn(parameter.subscribeOn);
}
return err;
}
}
Throwable exception =
catchException(
() ->
testing.runWithSpan(
"parent",
() -> {
HystrixObservableCommand<String> val =
new TestCommand(setter("FailingGroup"));
return parameter.operation.apply(val);
}));
assertThat(exception).hasCauseInstanceOf(IllegalArgumentException.class);
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("parent")
.hasNoParent()
.hasStatus(StatusData.error())
.hasException(exception),
span ->
span.hasName("FailingGroup.TestCommand.execute")
.hasParent(trace.getSpan(0))
.hasStatus(StatusData.error())
.hasException(exception.getCause())
.hasAttributesSatisfyingExactly(
equalTo(stringKey("hystrix.command"), "TestCommand"),
equalTo(stringKey("hystrix.group"), "FailingGroup"),
equalTo(booleanKey("hystrix.circuit_open"), false)),
span ->
span.hasName("FailingGroup.TestCommand.fallback")
.hasParent(trace.getSpan(1))
.hasEventsSatisfyingExactly(
event ->
event
.hasName("exception")
.hasAttributesSatisfyingExactly(
equalTo(
EXCEPTION_TYPE,
"java.lang.UnsupportedOperationException"),
satisfies(
EXCEPTION_STACKTRACE,
val -> val.isInstanceOf(String.class)),
equalTo(EXCEPTION_MESSAGE, "No fallback available.")))
.hasAttributesSatisfyingExactly(
equalTo(stringKey("hystrix.command"), "TestCommand"),
equalTo(stringKey("hystrix.group"), "FailingGroup"),
equalTo(booleanKey("hystrix.circuit_open"), false))));
}
private static Stream<Arguments> provideCommandNoFallbackResultsInErrorArguments() {
return Stream.concat(
baseArguments(),
Stream.of(
Arguments.of(
named(
"toObservable block",
new Parameter(
Schedulers.computation(),
Schedulers.newThread(),
cmd -> {
BlockingQueue<Throwable> queue = new LinkedBlockingQueue<>();
Subscription subscription =
cmd.toObservable()
.subscribe(
next -> {
try {
queue.put(new Exception("Unexpectedly got a next"));
} catch (InterruptedException e) {
throw new IllegalArgumentException(e);
}
},
next -> {
try {
queue.put(next);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
Throwable returnValue;
try {
returnValue = queue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
subscription.unsubscribe();
try {
throw returnValue;
} catch (Throwable e) {
throw (HystrixRuntimeException) e;
}
})))));
}
private static HystrixObservableCommand.Setter setter(String key) {
HystrixObservableCommand.Setter setter =
HystrixObservableCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey(key));
setter.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(10_000));
return setter;
}
private static class Parameter {
public final Scheduler observeOn;
public final Scheduler subscribeOn;
public final Function<HystrixObservableCommand<String>, String> operation;
public Parameter(
Scheduler observeOn,
Scheduler subscribeOn,
Function<HystrixObservableCommand<String>, String> operation) {
this.observeOn = observeOn;
this.subscribeOn = subscribeOn;
this.operation = operation;
}
}
}

View File

@ -0,0 +1,192 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.hystrix;
import static io.opentelemetry.api.common.AttributeKey.booleanKey;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.EXCEPTION_STACKTRACE;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.EXCEPTION_TYPE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Named.named;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.sdk.trace.data.StatusData;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;
import java.util.stream.Stream;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
class HystrixTest {
@RegisterExtension
protected static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
@ParameterizedTest
@MethodSource("provideCommandActionArguments")
void testCommands(Function<HystrixCommand<String>, String> operation) {
class TestCommand extends HystrixCommand<String> {
protected TestCommand(Setter setter) {
super(setter);
}
@Override
protected String run() throws Exception {
return tracedMethod();
}
private String tracedMethod() {
testing.runWithSpan("tracedMethod", () -> {});
return "Hello!";
}
}
HystrixCommand<String> command = new TestCommand(setter());
String result = testing.runWithSpan("parent", () -> operation.apply(command));
assertThat(result).isEqualTo("Hello!");
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasNoParent().hasAttributes(Attributes.empty()),
span ->
span.hasName("ExampleGroup.TestCommand.execute")
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(stringKey("hystrix.command"), "TestCommand"),
equalTo(stringKey("hystrix.group"), "ExampleGroup"),
equalTo(booleanKey("hystrix.circuit_open"), false)),
span ->
span.hasName("tracedMethod")
.hasParent(trace.getSpan(1))
.hasAttributes(Attributes.empty())));
}
@ParameterizedTest
@MethodSource("provideCommandActionArguments")
void testCommandFallbacks(Function<HystrixCommand<String>, String> operation) {
class TestCommand extends HystrixCommand<String> {
protected TestCommand(Setter setter) {
super(setter);
}
@Override
protected String run() throws Exception {
throw new IllegalArgumentException();
}
@Override
protected String getFallback() {
return "Fallback!";
}
}
HystrixCommand<String> command = new TestCommand(setter());
String result = testing.runWithSpan("parent", () -> operation.apply(command));
assertThat(result).isEqualTo("Fallback!");
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasNoParent().hasAttributes(Attributes.empty()),
span ->
span.hasName("ExampleGroup.TestCommand.execute")
.hasParent(trace.getSpan(0))
.hasStatus(StatusData.error())
.hasEventsSatisfyingExactly(
event ->
event
.hasName("exception")
.hasAttributesSatisfyingExactly(
equalTo(
EXCEPTION_TYPE, "java.lang.IllegalArgumentException"),
satisfies(
EXCEPTION_STACKTRACE,
val -> val.isInstanceOf(String.class))))
.hasAttributesSatisfyingExactly(
equalTo(stringKey("hystrix.command"), "TestCommand"),
equalTo(stringKey("hystrix.group"), "ExampleGroup"),
equalTo(booleanKey("hystrix.circuit_open"), false)),
span ->
span.hasName("ExampleGroup.TestCommand.fallback")
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
equalTo(stringKey("hystrix.command"), "TestCommand"),
equalTo(stringKey("hystrix.group"), "ExampleGroup"),
equalTo(booleanKey("hystrix.circuit_open"), false))));
}
private static Stream<Arguments> provideCommandActionArguments() {
return Stream.of(
Arguments.of(
named("execute", (Function<HystrixCommand<String>, String>) HystrixCommand::execute)),
Arguments.of(
named(
"queue",
(Function<HystrixCommand<String>, String>)
cmd -> {
try {
return cmd.queue().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
})),
Arguments.of(
named(
"toObservable",
(Function<HystrixCommand<String>, String>)
cmd -> cmd.toObservable().toBlocking().first())),
Arguments.of(
named(
"observe",
(Function<HystrixCommand<String>, String>)
cmd -> cmd.observe().toBlocking().first())),
Arguments.of(
named(
"observe block",
(Function<HystrixCommand<String>, String>)
cmd -> {
BlockingQueue<String> queue = new LinkedBlockingQueue<>();
cmd.observe()
.subscribe(
next -> {
try {
queue.put(next);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
String returnValue;
try {
returnValue = queue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return returnValue;
})));
}
private static HystrixCommand.Setter setter() {
HystrixCommand.Setter setter =
HystrixCommand.Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
setter.andCommandPropertiesDefaults(
HystrixCommandProperties.Setter().withExecutionTimeoutInMilliseconds(10_000));
return setter;
}
}