Fix codenarc
This commit is contained in:
parent
4a2e12c348
commit
b442302b6a
|
@ -84,30 +84,30 @@ class WriterQueueTest extends Specification {
|
||||||
def "check concurrency on writes"() {
|
def "check concurrency on writes"() {
|
||||||
setup:
|
setup:
|
||||||
|
|
||||||
def phaser_1 = new Phaser()
|
def phaser1 = new Phaser()
|
||||||
def phaser_2 = new Phaser()
|
def phaser2 = new Phaser()
|
||||||
def queue = new WriterQueue<Integer>(capacity)
|
def queue = new WriterQueue<Integer>(capacity)
|
||||||
def insertionCount = new AtomicInteger(0)
|
def insertionCount = new AtomicInteger(0)
|
||||||
|
|
||||||
phaser_1.register() // global start
|
phaser1.register() // global start
|
||||||
phaser_2.register() // global stop
|
phaser2.register() // global stop
|
||||||
|
|
||||||
numberThreads.times {
|
numberThreads.times {
|
||||||
phaser_1.register()
|
phaser1.register()
|
||||||
Thread.start {
|
Thread.start {
|
||||||
phaser_2.register()
|
phaser2.register()
|
||||||
phaser_1.arriveAndAwaitAdvance()
|
phaser1.arriveAndAwaitAdvance()
|
||||||
numberInsertionsPerThread.times {
|
numberInsertionsPerThread.times {
|
||||||
queue.add(1)
|
queue.add(1)
|
||||||
insertionCount.getAndIncrement()
|
insertionCount.getAndIncrement()
|
||||||
}
|
}
|
||||||
phaser_2.arriveAndAwaitAdvance()
|
phaser2.arriveAndAwaitAdvance()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
when:
|
when:
|
||||||
phaser_1.arriveAndAwaitAdvance() // allow threads to start
|
phaser1.arriveAndAwaitAdvance() // allow threads to start
|
||||||
phaser_2.arriveAndAwaitAdvance() // wait till the job is not finished
|
phaser2.arriveAndAwaitAdvance() // wait till the job is not finished
|
||||||
|
|
||||||
then:
|
then:
|
||||||
queue.size() == capacity
|
queue.size() == capacity
|
||||||
|
@ -123,48 +123,48 @@ class WriterQueueTest extends Specification {
|
||||||
|
|
||||||
def "check concurrency on writes and reads"() {
|
def "check concurrency on writes and reads"() {
|
||||||
setup:
|
setup:
|
||||||
def phaser_1 = new Phaser()
|
def phaser1 = new Phaser()
|
||||||
def phaser_2 = new Phaser()
|
def phaser2 = new Phaser()
|
||||||
def queue = new WriterQueue<Integer>(capacity)
|
def queue = new WriterQueue<Integer>(capacity)
|
||||||
def insertionCount = new AtomicInteger(0)
|
def insertionCount = new AtomicInteger(0)
|
||||||
def droppedCount = new AtomicInteger(0)
|
def droppedCount = new AtomicInteger(0)
|
||||||
def getCount = new AtomicInteger(0)
|
def getCount = new AtomicInteger(0)
|
||||||
def numberElements = new AtomicInteger(0)
|
def numberElements = new AtomicInteger(0)
|
||||||
|
|
||||||
phaser_1.register() // global start
|
phaser1.register() // global start
|
||||||
phaser_2.register() // global stop
|
phaser2.register() // global stop
|
||||||
|
|
||||||
// writes
|
// writes
|
||||||
numberThreadsWrites.times {
|
numberThreadsWrites.times {
|
||||||
phaser_1.register()
|
phaser1.register()
|
||||||
Thread.start {
|
Thread.start {
|
||||||
phaser_2.register()
|
phaser2.register()
|
||||||
phaser_1.arriveAndAwaitAdvance()
|
phaser1.arriveAndAwaitAdvance()
|
||||||
numberInsertionsPerThread.times {
|
numberInsertionsPerThread.times {
|
||||||
queue.add(1) != null ? droppedCount.getAndIncrement() : null
|
queue.add(1) != null ? droppedCount.getAndIncrement() : null
|
||||||
insertionCount.getAndIncrement()
|
insertionCount.getAndIncrement()
|
||||||
}
|
}
|
||||||
phaser_2.arriveAndAwaitAdvance()
|
phaser2.arriveAndAwaitAdvance()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// reads
|
// reads
|
||||||
numberThreadsReads.times {
|
numberThreadsReads.times {
|
||||||
phaser_1.register()
|
phaser1.register()
|
||||||
Thread.start {
|
Thread.start {
|
||||||
phaser_2.register()
|
phaser2.register()
|
||||||
phaser_1.arriveAndAwaitAdvance()
|
phaser1.arriveAndAwaitAdvance()
|
||||||
numberGetsPerThread.times {
|
numberGetsPerThread.times {
|
||||||
numberElements.getAndAdd(queue.getAll().size())
|
numberElements.getAndAdd(queue.getAll().size())
|
||||||
getCount.getAndIncrement()
|
getCount.getAndIncrement()
|
||||||
}
|
}
|
||||||
phaser_2.arriveAndAwaitAdvance()
|
phaser2.arriveAndAwaitAdvance()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
when:
|
when:
|
||||||
phaser_1.arriveAndAwaitAdvance() // allow threads to start
|
phaser1.arriveAndAwaitAdvance() // allow threads to start
|
||||||
phaser_2.arriveAndAwaitAdvance() // wait till the job is not finished
|
phaser2.arriveAndAwaitAdvance() // wait till the job is not finished
|
||||||
|
|
||||||
then:
|
then:
|
||||||
insertionCount.get() == numberInsertionsPerThread * numberThreadsWrites
|
insertionCount.get() == numberInsertionsPerThread * numberThreadsWrites
|
||||||
|
|
|
@ -1,65 +0,0 @@
|
||||||
package com.datadoghq.trace.writer.impl;
|
|
||||||
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.verify;
|
|
||||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
import com.datadoghq.trace.DDBaseSpan;
|
|
||||||
import com.datadoghq.trace.DDSpan;
|
|
||||||
import com.datadoghq.trace.DDTracer;
|
|
||||||
import com.datadoghq.trace.writer.DDAgentWriter;
|
|
||||||
import com.datadoghq.trace.writer.DDApi;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
|
||||||
|
|
||||||
public class DDAgentWriterTest {
|
|
||||||
|
|
||||||
DDSpan parent = null;
|
|
||||||
DDApi mockedAPI = null;
|
|
||||||
List<List<DDBaseSpan<?>>> traces = new ArrayList<>();
|
|
||||||
DDAgentWriter ddAgentWriter = null;
|
|
||||||
|
|
||||||
@Before
|
|
||||||
public void setUp() throws Exception {
|
|
||||||
//Setup
|
|
||||||
final DDTracer tracer = new DDTracer();
|
|
||||||
|
|
||||||
parent = tracer.buildSpan("hello-world").withServiceName("service-name").startManual();
|
|
||||||
parent.setBaggageItem("a-baggage", "value");
|
|
||||||
|
|
||||||
Thread.sleep(100);
|
|
||||||
|
|
||||||
final DDSpan child = tracer.buildSpan("hello-world").asChildOf(parent).startManual();
|
|
||||||
Thread.sleep(100);
|
|
||||||
|
|
||||||
child.finish();
|
|
||||||
Thread.sleep(100);
|
|
||||||
parent.finish();
|
|
||||||
|
|
||||||
//Create DDWriter
|
|
||||||
traces.add(new ArrayList<>(parent.context().getTrace()));
|
|
||||||
mockedAPI = mock(DDApi.class);
|
|
||||||
when(mockedAPI.sendTraces(traces)).thenReturn(true);
|
|
||||||
ddAgentWriter = new DDAgentWriter(mockedAPI);
|
|
||||||
ddAgentWriter.start();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testWrite() throws Exception {
|
|
||||||
ddAgentWriter.write(new ArrayList<>(parent.context().getTrace()));
|
|
||||||
Thread.sleep(500);
|
|
||||||
verify(mockedAPI).sendTraces(traces);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testClose() throws Exception {
|
|
||||||
ddAgentWriter.close();
|
|
||||||
|
|
||||||
ddAgentWriter.write(new ArrayList<>(parent.context().getTrace()));
|
|
||||||
Thread.sleep(500);
|
|
||||||
verifyNoMoreInteractions(mockedAPI);
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue