From 4a2e12c3485f8cf409ed70e322430083c19e4afc Mon Sep 17 00:00:00 2001 From: Guillaume Polaert Date: Mon, 14 Aug 2017 14:00:01 +0200 Subject: [PATCH] Test concurrency --- .../trace/writer/WriterQueueTest.groovy | 102 ++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy b/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy index db703b895b..b5fd656863 100644 --- a/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy +++ b/dd-trace/src/test/groovy/com/datadoghq/trace/writer/WriterQueueTest.groovy @@ -2,6 +2,9 @@ package com.datadoghq.trace.writer import spock.lang.Specification +import java.util.concurrent.Phaser +import java.util.concurrent.atomic.AtomicInteger + class WriterQueueTest extends Specification { def "instantiate a empty queue throws an exception"() { @@ -78,4 +81,103 @@ class WriterQueueTest extends Specification { } + def "check concurrency on writes"() { + setup: + + def phaser_1 = new Phaser() + def phaser_2 = new Phaser() + def queue = new WriterQueue(capacity) + def insertionCount = new AtomicInteger(0) + + phaser_1.register() // global start + phaser_2.register() // global stop + + numberThreads.times { + phaser_1.register() + Thread.start { + phaser_2.register() + phaser_1.arriveAndAwaitAdvance() + numberInsertionsPerThread.times { + queue.add(1) + insertionCount.getAndIncrement() + } + phaser_2.arriveAndAwaitAdvance() + } + } + + when: + phaser_1.arriveAndAwaitAdvance() // allow threads to start + phaser_2.arriveAndAwaitAdvance() // wait till the job is not finished + + then: + queue.size() == capacity + insertionCount.get() == numberInsertionsPerThread * numberThreads + + where: + capacity = 100 + numberThreads << [1, 10, 100] + numberInsertionsPerThread = 100 + + } + + + def "check concurrency on writes and reads"() { + setup: + def phaser_1 = new Phaser() + def phaser_2 = new Phaser() + def queue = new WriterQueue(capacity) + def insertionCount = new AtomicInteger(0) + def droppedCount = new AtomicInteger(0) + def getCount = new AtomicInteger(0) + def numberElements = new AtomicInteger(0) + + phaser_1.register() // global start + phaser_2.register() // global stop + + // writes + numberThreadsWrites.times { + phaser_1.register() + Thread.start { + phaser_2.register() + phaser_1.arriveAndAwaitAdvance() + numberInsertionsPerThread.times { + queue.add(1) != null ? droppedCount.getAndIncrement() : null + insertionCount.getAndIncrement() + } + phaser_2.arriveAndAwaitAdvance() + } + } + + // reads + numberThreadsReads.times { + phaser_1.register() + Thread.start { + phaser_2.register() + phaser_1.arriveAndAwaitAdvance() + numberGetsPerThread.times { + numberElements.getAndAdd(queue.getAll().size()) + getCount.getAndIncrement() + } + phaser_2.arriveAndAwaitAdvance() + } + } + + when: + phaser_1.arriveAndAwaitAdvance() // allow threads to start + phaser_2.arriveAndAwaitAdvance() // wait till the job is not finished + + then: + insertionCount.get() == numberInsertionsPerThread * numberThreadsWrites + getCount.get() == numberGetsPerThread * numberThreadsReads + insertionCount.get() == numberElements + queue.size() + droppedCount + + where: + capacity = 100 + numberThreadsWrites << [1, 10, 100] + numberThreadsReads << [1, 5, 10] + numberInsertionsPerThread = 100 + numberGetsPerThread = 5 + + } + }