Get rid of `WRITER_PHASER`

We have alternative way of doing the same thing and `Phaser` seems to be
not very correct way of doing this anyway.
This commit is contained in:
Nikolay Martynov 2018-07-23 13:35:27 -04:00
parent 989091847a
commit 3d8e76c2a4
3 changed files with 3 additions and 10 deletions

View File

@ -44,11 +44,10 @@ class KafkaClientTest extends AgentTestRunner {
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>() def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
// setup a Kafka message listener // setup a Kafka message listener
WRITER_PHASER.register()
container.setupMessageListener(new MessageListener<String, String>() { container.setupMessageListener(new MessageListener<String, String>() {
@Override @Override
void onMessage(ConsumerRecord<String, String> record) { void onMessage(ConsumerRecord<String, String> record) {
WRITER_PHASER.arriveAndAwaitAdvance() // ensure consistent ordering of traces TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
records.add(record) records.add(record)
} }
}) })

View File

@ -43,14 +43,13 @@ class KafkaStreamsTest extends AgentTestRunner {
def consumerContainer = new KafkaMessageListenerContainer<>(consumerFactory, new ContainerProperties(STREAM_PROCESSED)) def consumerContainer = new KafkaMessageListenerContainer<>(consumerFactory, new ContainerProperties(STREAM_PROCESSED))
// create a thread safe queue to store the processed message // create a thread safe queue to store the processed message
WRITER_PHASER.register()
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>() def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
// setup a Kafka message listener // setup a Kafka message listener
consumerContainer.setupMessageListener(new MessageListener<String, String>() { consumerContainer.setupMessageListener(new MessageListener<String, String>() {
@Override @Override
void onMessage(ConsumerRecord<String, String> record) { void onMessage(ConsumerRecord<String, String> record) {
WRITER_PHASER.arriveAndAwaitAdvance() // ensure consistent ordering of traces TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
getTestTracer().activeSpan().setTag("testing", 123) getTestTracer().activeSpan().setTag("testing", 123)
records.add(record) records.add(record)
} }
@ -69,7 +68,7 @@ class KafkaStreamsTest extends AgentTestRunner {
.mapValues(new ValueMapper<String, String>() { .mapValues(new ValueMapper<String, String>() {
@Override @Override
String apply(String textLine) { String apply(String textLine) {
WRITER_PHASER.arriveAndAwaitAdvance() // ensure consistent ordering of traces TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
getTestTracer().activeSpan().setTag("asdf", "testing") getTestTracer().activeSpan().setTag("asdf", "testing")
return textLine.toLowerCase() return textLine.toLowerCase()
} }

View File

@ -16,7 +16,6 @@ import java.util.List;
import java.util.ServiceLoader; import java.util.ServiceLoader;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import net.bytebuddy.agent.ByteBuddyAgent; import net.bytebuddy.agent.ByteBuddyAgent;
import net.bytebuddy.agent.builder.AgentBuilder; import net.bytebuddy.agent.builder.AgentBuilder;
@ -69,8 +68,6 @@ public abstract class AgentTestRunner extends Specification {
private static final Instrumentation instrumentation; private static final Instrumentation instrumentation;
private static volatile ClassFileTransformer activeTransformer = null; private static volatile ClassFileTransformer activeTransformer = null;
protected static final Phaser WRITER_PHASER = new Phaser();
static { static {
instrumentation = ByteBuddyAgent.getInstrumentation(); instrumentation = ByteBuddyAgent.getInstrumentation();
@ -82,7 +79,6 @@ public abstract class AgentTestRunner extends Specification {
@Override @Override
public boolean add(final List<DDSpan> trace) { public boolean add(final List<DDSpan> trace) {
final boolean result = super.add(trace); final boolean result = super.add(trace);
WRITER_PHASER.arriveAndDeregister();
return result; return result;
} }
}; };
@ -136,7 +132,6 @@ public abstract class AgentTestRunner extends Specification {
@Before @Before
public void beforeTest() { public void beforeTest() {
TEST_WRITER.start(); TEST_WRITER.start();
WRITER_PHASER.register();
INSTRUMENTATION_ERROR_COUNT.set(0); INSTRUMENTATION_ERROR_COUNT.set(0);
ERROR_LISTENER.activateTest(this); ERROR_LISTENER.activateTest(this);
assert getTestTracer().activeSpan() == null : "Span is active before test has started"; assert getTestTracer().activeSpan() == null : "Span is active before test has started";