From dcac7689fa3ec46bdbee744e36ce51fc3e911650 Mon Sep 17 00:00:00 2001 From: Larry Safran <107004254+larry-safran@users.noreply.github.com> Date: Thu, 21 Jul 2022 11:41:02 -0700 Subject: [PATCH] rls: Change AdaptiveThrottler to use Ticker instead of TimeProvider (#9390) rls: Change AdaptiveThrottler to use Ticker instead of TimeProvider * Use a slot being null to mark invalid rather than relying on the slot's endNanos value. Fixes #9048 --- .../java/io/grpc/rls/AdaptiveThrottler.java | 55 +++++++++--------- .../io/grpc/rls/AdaptiveThrottlerTest.java | 58 ++++++++++++------- 2 files changed, 63 insertions(+), 50 deletions(-) diff --git a/rls/src/main/java/io/grpc/rls/AdaptiveThrottler.java b/rls/src/main/java/io/grpc/rls/AdaptiveThrottler.java index 55f3f72453..ff5789c517 100644 --- a/rls/src/main/java/io/grpc/rls/AdaptiveThrottler.java +++ b/rls/src/main/java/io/grpc/rls/AdaptiveThrottler.java @@ -21,7 +21,7 @@ import static com.google.common.base.Preconditions.checkNotNull; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.MoreObjects; -import io.grpc.internal.TimeProvider; +import com.google.common.base.Ticker; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLongFieldUpdater; @@ -60,7 +60,7 @@ final class AdaptiveThrottler implements Throttler { * is currently accepting. */ private final float ratioForAccepts; - private final TimeProvider timeProvider; + private final Ticker ticker; /** * The number of requests attempted by the client during the Adaptive Throttler instance's * history of calls. This includes requests throttled at the client. The history period defaults @@ -79,10 +79,10 @@ final class AdaptiveThrottler implements Throttler { this.historySeconds = builder.historySeconds; this.requestsPadding = builder.requestsPadding; this.ratioForAccepts = builder.ratioForAccepts; - this.timeProvider = builder.timeProvider; + this.ticker = builder.ticker; long internalNanos = TimeUnit.SECONDS.toNanos(historySeconds); - this.requestStat = new TimeBasedAccumulator(internalNanos, timeProvider); - this.throttledStat = new TimeBasedAccumulator(internalNanos, timeProvider); + this.requestStat = new TimeBasedAccumulator(internalNanos, ticker); + this.throttledStat = new TimeBasedAccumulator(internalNanos, ticker); } @Override @@ -92,7 +92,7 @@ final class AdaptiveThrottler implements Throttler { @VisibleForTesting boolean shouldThrottle(float random) { - long nowNanos = timeProvider.currentTimeNanos(); + long nowNanos = ticker.read(); if (getThrottleProbability(nowNanos) <= random) { return false; } @@ -118,7 +118,7 @@ final class AdaptiveThrottler implements Throttler { @Override public void registerBackendResponse(boolean throttled) { - long now = timeProvider.currentTimeNanos(); + long now = ticker.read(); requestStat.increment(now); if (throttled) { throttledStat.increment(now); @@ -150,7 +150,7 @@ final class AdaptiveThrottler implements Throttler { private float ratioForAccepts = DEFAULT_RATIO_FOR_ACCEPT; private int historySeconds = DEFAULT_HISTORY_SECONDS; private int requestsPadding = DEFAULT_REQUEST_PADDING; - private TimeProvider timeProvider = TimeProvider.SYSTEM_TIME_PROVIDER; + private Ticker ticker = Ticker.systemTicker(); public Builder setRatioForAccepts(float ratioForAccepts) { this.ratioForAccepts = ratioForAccepts; @@ -167,8 +167,8 @@ final class AdaptiveThrottler implements Throttler { return this; } - public Builder setTimeProvider(TimeProvider timeProvider) { - this.timeProvider = checkNotNull(timeProvider, "timeProvider"); + public Builder setTicker(Ticker ticker) { + this.ticker = checkNotNull(ticker, "ticker"); return this; } @@ -205,9 +205,6 @@ final class AdaptiveThrottler implements Throttler { } } - // Represents a slot which is not initialized and is unusable. - private static final Slot NULL_SLOT = new Slot(-1); - /** The array of slots. */ private final AtomicReferenceArray slots = new AtomicReferenceArray<>(NUM_SLOTS); @@ -224,7 +221,7 @@ final class AdaptiveThrottler implements Throttler { */ private volatile int currentIndex; - private final TimeProvider timeProvider; + private final Ticker ticker; /** * Interval constructor. @@ -232,7 +229,7 @@ final class AdaptiveThrottler implements Throttler { * @param internalNanos is the stat interval in nanoseconds * @throws IllegalArgumentException if the supplied interval is too small to be effective */ - TimeBasedAccumulator(long internalNanos, TimeProvider timeProvider) { + TimeBasedAccumulator(long internalNanos, Ticker ticker) { checkArgument( internalNanos >= NUM_SLOTS, "Interval must be greater than %s", @@ -240,30 +237,27 @@ final class AdaptiveThrottler implements Throttler { this.interval = internalNanos; this.slotNanos = internalNanos / NUM_SLOTS; this.currentIndex = 0; - for (int i = 0; i < NUM_SLOTS; i++) { - slots.set(i, NULL_SLOT); - } - this.timeProvider = checkNotNull(timeProvider, "ticker"); + this.ticker = checkNotNull(ticker, "ticker"); } /** Gets the current slot. */ private Slot getSlot(long now) { Slot currentSlot = slots.get(currentIndex); - if (now < currentSlot.endNanos) { + if (currentSlot != null && now - currentSlot.endNanos < 0) { return currentSlot; } else { long slotBoundary = getSlotEndTime(now); synchronized (this) { int index = currentIndex; currentSlot = slots.get(index); - if (now < currentSlot.endNanos) { + if (currentSlot != null && now - currentSlot.endNanos < 0) { return currentSlot; } int newIndex = (index == NUM_SLOTS - 1) ? 0 : index + 1; Slot nextSlot = new Slot(slotBoundary); slots.set(newIndex, nextSlot); // Set currentIndex only after assigning the new slot to slots, otherwise - // racing readers will see NULL_SLOT or an old slot. + // racing readers will see null or an old slot. currentIndex = newIndex; return nextSlot; } @@ -294,7 +288,7 @@ final class AdaptiveThrottler implements Throttler { * * @param now is the time used to increment the count */ - final void increment(long now) { + void increment(long now) { getSlot(now).increment(); } @@ -304,28 +298,33 @@ final class AdaptiveThrottler implements Throttler { * @param now the current time * @return the statistic count */ - final long get(long now) { + long get(long now) { long intervalEnd = getSlotEndTime(now); long intervalStart = intervalEnd - interval; // This is the point at which increments to new slots will be ignored. int index = currentIndex; long accumulated = 0L; - long prevSlotEnd = Long.MAX_VALUE; + Long prevSlotEnd = null; for (int i = 0; i < NUM_SLOTS; i++) { if (index < 0) { index = NUM_SLOTS - 1; } Slot currentSlot = slots.get(index); index--; + if (currentSlot == null) { + continue; + } + long currentSlotEnd = currentSlot.endNanos; - if (currentSlotEnd <= intervalStart || currentSlotEnd > prevSlotEnd) { + if (currentSlotEnd - intervalStart <= 0 + || (prevSlotEnd != null && currentSlotEnd - prevSlotEnd > 0)) { break; } prevSlotEnd = currentSlotEnd; - if (currentSlotEnd > intervalEnd) { + if (currentSlotEnd - intervalEnd > 0) { continue; } accumulated = accumulated + currentSlot.count; @@ -337,7 +336,7 @@ final class AdaptiveThrottler implements Throttler { public String toString() { return MoreObjects.toStringHelper(this) .add("interval", interval) - .add("current_count", get(timeProvider.currentTimeNanos())) + .add("current_count", get(ticker.read())) .toString(); } } diff --git a/rls/src/test/java/io/grpc/rls/AdaptiveThrottlerTest.java b/rls/src/test/java/io/grpc/rls/AdaptiveThrottlerTest.java index 54482e8de6..6852b2479d 100644 --- a/rls/src/test/java/io/grpc/rls/AdaptiveThrottlerTest.java +++ b/rls/src/test/java/io/grpc/rls/AdaptiveThrottlerTest.java @@ -18,8 +18,8 @@ package io.grpc.rls; import static com.google.common.truth.Truth.assertThat; +import com.google.common.base.Ticker; import io.grpc.internal.FakeClock; -import io.grpc.internal.TimeProvider; import java.util.concurrent.TimeUnit; import org.junit.Test; import org.junit.runner.RunWith; @@ -30,13 +30,13 @@ public class AdaptiveThrottlerTest { private static final float TOLERANCE = 0.0001f; private final FakeClock fakeClock = new FakeClock(); - private final TimeProvider fakeTimeProvider = fakeClock.getTimeProvider(); + private final Ticker fakeTicker = fakeClock.getTicker(); private final AdaptiveThrottler throttler = new AdaptiveThrottler.Builder() .setHistorySeconds(1) .setRatioForAccepts(1.0f) .setRequestsPadding(1) - .setTimeProvider(fakeTimeProvider) + .setTicker(fakeTicker) .build(); @Test @@ -44,9 +44,9 @@ public class AdaptiveThrottlerTest { long startTime = fakeClock.currentTimeMillis(); // initial states - assertThat(throttler.requestStat.get(fakeTimeProvider.currentTimeNanos())).isEqualTo(0L); - assertThat(throttler.throttledStat.get(fakeTimeProvider.currentTimeNanos())).isEqualTo(0L); - assertThat(throttler.getThrottleProbability(fakeTimeProvider.currentTimeNanos())) + assertThat(throttler.requestStat.get(fakeTicker.read())).isEqualTo(0L); + assertThat(throttler.throttledStat.get(fakeTicker.read())).isEqualTo(0L); + assertThat(throttler.getThrottleProbability(fakeTicker.read())) .isWithin(TOLERANCE).of(0.0f); // Request 1, allowed by all. @@ -54,10 +54,10 @@ public class AdaptiveThrottlerTest { fakeClock.forwardTime(1L, TimeUnit.MILLISECONDS); throttler.registerBackendResponse(false); - assertThat(throttler.requestStat.get(fakeTimeProvider.currentTimeNanos())) + assertThat(throttler.requestStat.get(fakeTicker.read())) .isEqualTo(1L); - assertThat(throttler.throttledStat.get(fakeTimeProvider.currentTimeNanos())).isEqualTo(0L); - assertThat(throttler.getThrottleProbability(fakeTimeProvider.currentTimeNanos())) + assertThat(throttler.throttledStat.get(fakeTicker.read())).isEqualTo(0L); + assertThat(throttler.getThrottleProbability(fakeTicker.read())) .isWithin(TOLERANCE).of(0.0f); // Request 2, throttled by backend @@ -65,25 +65,26 @@ public class AdaptiveThrottlerTest { fakeClock.forwardTime(1L, TimeUnit.MILLISECONDS); throttler.registerBackendResponse(true); - assertThat(throttler.requestStat.get(fakeTimeProvider.currentTimeNanos())) + assertThat(throttler.requestStat.get(fakeTicker.read())) .isEqualTo(2L); - assertThat(throttler.throttledStat.get(fakeTimeProvider.currentTimeNanos())) + assertThat(throttler.throttledStat.get(fakeTicker.read())) .isEqualTo(1L); - assertThat(throttler.getThrottleProbability(fakeTimeProvider.currentTimeNanos())) + assertThat(throttler.getThrottleProbability(fakeTicker.read())) .isWithin(TOLERANCE) .of(1.0f / 3.0f); // Skip to half second mark from the beginning (half the duration). - fakeClock.forwardTime(500 - (fakeClock.currentTimeMillis() - startTime), TimeUnit.MILLISECONDS); + fakeClock.forwardTime(500 - (fakeClock.currentTimeMillis() - startTime), + TimeUnit.MILLISECONDS); // Request 3, throttled by backend assertThat(throttler.shouldThrottle(0.4f)).isFalse(); fakeClock.forwardTime(1L, TimeUnit.MILLISECONDS); throttler.registerBackendResponse(true); - assertThat(throttler.requestStat.get(fakeTimeProvider.currentTimeNanos())).isEqualTo(3L); - assertThat(throttler.throttledStat.get(fakeTimeProvider.currentTimeNanos())).isEqualTo(2L); - assertThat(throttler.getThrottleProbability(fakeTimeProvider.currentTimeNanos())) + assertThat(throttler.requestStat.get(fakeTicker.read())).isEqualTo(3L); + assertThat(throttler.throttledStat.get(fakeTicker.read())).isEqualTo(2L); + assertThat(throttler.getThrottleProbability(fakeTicker.read())) .isWithin(TOLERANCE) .of(2.0f / 4.0f); @@ -91,9 +92,9 @@ public class AdaptiveThrottlerTest { assertThat(throttler.shouldThrottle(0.4f)).isTrue(); fakeClock.forwardTime(1L, TimeUnit.MILLISECONDS); - assertThat(throttler.requestStat.get(fakeTimeProvider.currentTimeNanos())).isEqualTo(4L); - assertThat(throttler.throttledStat.get(fakeTimeProvider.currentTimeNanos())).isEqualTo(3L); - assertThat(throttler.getThrottleProbability(fakeTimeProvider.currentTimeNanos())) + assertThat(throttler.requestStat.get(fakeTicker.read())).isEqualTo(4L); + assertThat(throttler.throttledStat.get(fakeTicker.read())).isEqualTo(3L); + assertThat(throttler.getThrottleProbability(fakeTicker.read())) .isWithin(TOLERANCE) .of(3.0f / 5.0f); @@ -101,10 +102,23 @@ public class AdaptiveThrottlerTest { fakeClock.forwardTime( 1250 - (fakeClock.currentTimeMillis() - startTime), TimeUnit.MILLISECONDS); - assertThat(throttler.requestStat.get(fakeTimeProvider.currentTimeNanos())).isEqualTo(2L); - assertThat(throttler.throttledStat.get(fakeTimeProvider.currentTimeNanos())).isEqualTo(2L); - assertThat(throttler.getThrottleProbability(fakeTimeProvider.currentTimeNanos())) + assertThat(throttler.requestStat.get(fakeTicker.read())).isEqualTo(2L); + assertThat(throttler.throttledStat.get(fakeTicker.read())).isEqualTo(2L); + assertThat(throttler.getThrottleProbability(fakeTicker.read())) .isWithin(TOLERANCE) .of(2.0f / 3.0f); } + + /** + * Check that when the ticker returns a negative value for now that the slot detection logic + * is correctly handled and then when the value transitions from negative to positive that things + * continue to work correctly. + */ + @Test + public void negativeTickerValues() { + long rewindAmount = TimeUnit.MILLISECONDS.toNanos(300) + fakeClock.getTicker().read(); + fakeClock.forwardTime(-1 * rewindAmount, TimeUnit.NANOSECONDS); + assertThat(fakeClock.getTicker().read()).isEqualTo(TimeUnit.MILLISECONDS.toNanos(-300)); + shouldThrottle(); + } }