From 9ed84258aa3ead42747ce87fa5a7b428bda72626 Mon Sep 17 00:00:00 2001 From: Carl Mastrangelo Date: Wed, 4 Apr 2018 16:02:02 -0700 Subject: [PATCH] core: don't reschedule idle timer if it is already active Benchmark results (3 runs each) non direct ``` Before: 50.0%ile Latency (in nanos): 157471 90.0%ile Latency (in nanos): 185927 95.0%ile Latency (in nanos): 195135 99.0%ile Latency (in nanos): 218815 99.9%ile Latency (in nanos): 1188735 100.0%ile Latency (in nanos): 18333695 QPS: 6126 50.0%ile Latency (in nanos): 160407 90.0%ile Latency (in nanos): 188551 95.0%ile Latency (in nanos): 197487 99.0%ile Latency (in nanos): 219575 99.9%ile Latency (in nanos): 390239 100.0%ile Latency (in nanos): 18338815 QPS: 6106 50.0%ile Latency (in nanos): 157831 90.0%ile Latency (in nanos): 186439 95.0%ile Latency (in nanos): 195815 99.0%ile Latency (in nanos): 216951 99.9%ile Latency (in nanos): 281167 100.0%ile Latency (in nanos): 5384447 QPS: 6235 After: 50.0%ile Latency (in nanos): 152255 90.0%ile Latency (in nanos): 180551 95.0%ile Latency (in nanos): 188943 99.0%ile Latency (in nanos): 209623 99.9%ile Latency (in nanos): 1184831 100.0%ile Latency (in nanos): 4351999 QPS: 6313 50.0%ile Latency (in nanos): 153663 90.0%ile Latency (in nanos): 181671 95.0%ile Latency (in nanos): 189991 99.0%ile Latency (in nanos): 210495 99.9%ile Latency (in nanos): 278895 100.0%ile Latency (in nanos): 18283519 QPS: 6300 50.0%ile Latency (in nanos): 152767 90.0%ile Latency (in nanos): 180839 95.0%ile Latency (in nanos): 189791 99.0%ile Latency (in nanos): 211719 99.9%ile Latency (in nanos): 280927 100.0%ile Latency (in nanos): 12231167 QPS: 6381 ``` direct: ``` Before: 50.0%ile Latency (in nanos): 133943 90.0%ile Latency (in nanos): 153671 95.0%ile Latency (in nanos): 163655 99.0%ile Latency (in nanos): 188871 99.9%ile Latency (in nanos): 235791 100.0%ile Latency (in nanos): 7864575 QPS: 7134 50.0%ile Latency (in nanos): 131623 90.0%ile Latency (in nanos): 151863 95.0%ile Latency (in nanos): 162095 99.0%ile Latency (in nanos): 187719 99.9%ile Latency (in nanos): 234983 100.0%ile Latency (in nanos): 17836031 QPS: 7250 50.0%ile Latency (in nanos): 131223 90.0%ile Latency (in nanos): 150823 95.0%ile Latency (in nanos): 161311 99.0%ile Latency (in nanos): 187719 99.9%ile Latency (in nanos): 237471 100.0%ile Latency (in nanos): 4416255 QPS: 7273 After: 50.0%ile Latency (in nanos): 122751 90.0%ile Latency (in nanos): 140967 95.0%ile Latency (in nanos): 148911 99.0%ile Latency (in nanos): 173215 99.9%ile Latency (in nanos): 214823 100.0%ile Latency (in nanos): 18509823 QPS: 7774 50.0%ile Latency (in nanos): 124507 90.0%ile Latency (in nanos): 145855 95.0%ile Latency (in nanos): 156623 99.0%ile Latency (in nanos): 183111 99.9%ile Latency (in nanos): 235679 100.0%ile Latency (in nanos): 18289663 QPS: 7625 50.0%ile Latency (in nanos): 124295 90.0%ile Latency (in nanos): 145071 95.0%ile Latency (in nanos): 156439 99.0%ile Latency (in nanos): 183919 99.9%ile Latency (in nanos): 232447 100.0%ile Latency (in nanos): 3712383 QPS: 7632 ``` --- .../io/grpc/internal/ManagedChannelImpl.java | 60 ++++---- .../java/io/grpc/internal/Rescheduler.java | 119 ++++++++++++++++ .../ManagedChannelImplIdlenessTest.java | 7 +- .../grpc/internal/ManagedChannelImplTest.java | 1 + .../io/grpc/internal/ReschedulerTest.java | 133 ++++++++++++++++++ 5 files changed, 284 insertions(+), 36 deletions(-) create mode 100644 core/src/main/java/io/grpc/internal/Rescheduler.java create mode 100644 core/src/test/java/io/grpc/internal/ReschedulerTest.java diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 9298accd80..a9fb34ade2 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -307,27 +307,13 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented idleModeTimerFuture; - // Must be used from channelExecutor - @Nullable - private IdleModeTimer idleModeTimer; - // Must be called from channelExecutor private void shutdownNameResolverAndLoadBalancer(boolean verifyActive) { if (verifyActive) { @@ -360,7 +346,7 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented builder, ClientTransportFactory clientTransportFactory, @@ -570,6 +544,7 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented wakeUp; + + Rescheduler( + Runnable r, + Executor serializingExecutor, + ScheduledExecutorService scheduler, + Stopwatch stopwatch) { + this.runnable = r; + this.serializingExecutor = serializingExecutor; + this.scheduler = scheduler; + this.stopwatch = stopwatch; + stopwatch.start(); + } + + /* must be called from the {@link #serializingExecutor} originally passed in. */ + void reschedule(long delay, TimeUnit timeUnit) { + long delayNanos = timeUnit.toNanos(delay); + long newRunAtNanos = nanoTime() + delayNanos; + enabled = true; + if (newRunAtNanos - runAtNanos < 0 || wakeUp == null) { + if (wakeUp != null) { + wakeUp.cancel(false); + } + wakeUp = scheduler.schedule(new FutureRunnable(this), delayNanos, TimeUnit.NANOSECONDS); + } + runAtNanos = newRunAtNanos; + } + + // must be called from channel executor + void cancel(boolean permanent) { + enabled = false; + if (permanent && wakeUp != null) { + wakeUp.cancel(false); + wakeUp = null; + } + } + + private static final class FutureRunnable implements Runnable { + + private final Rescheduler rescheduler; + + FutureRunnable(Rescheduler rescheduler) { + this.rescheduler = rescheduler; + } + + @Override + public void run() { + rescheduler.serializingExecutor.execute(rescheduler.new ChannelFutureRunnable()); + } + } + + private final class ChannelFutureRunnable implements Runnable { + + @Override + public void run() { + if (!enabled) { + wakeUp = null; + return; + } + long now = nanoTime(); + if (runAtNanos - now > 0) { + wakeUp = scheduler.schedule( + new FutureRunnable(Rescheduler.this), runAtNanos - now, TimeUnit.NANOSECONDS); + } else { + enabled = false; + wakeUp = null; + runnable.run(); + } + } + } + + @VisibleForTesting + static boolean isEnabled(Runnable r) { + return ((FutureRunnable) r).rescheduler.enabled; + } + + private long nanoTime() { + return stopwatch.elapsed(TimeUnit.NANOSECONDS); + } +} diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java index ba04974efb..d2631e179d 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java @@ -51,10 +51,12 @@ import io.grpc.MethodDescriptor.MethodType; import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.StringMarshaller; +import io.grpc.internal.FakeClock.ScheduledTask; import io.grpc.internal.TestUtils.MockClientTransportInfo; import java.net.SocketAddress; import java.net.URI; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -163,7 +165,10 @@ public class ManagedChannelImplIdlenessTest { @After public void allPendingTasksAreRun() { - assertEquals(timer.getPendingTasks() + " should be empty", 0, timer.numPendingTasks()); + Collection pendingTimerTasks = timer.getPendingTasks(); + for (ScheduledTask a : pendingTimerTasks) { + assertFalse(Rescheduler.isEnabled(a.command)); + } assertEquals(executor.getPendingTasks() + " should be empty", 0, executor.numPendingTasks()); } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 33abbde160..f5cb01bb01 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -329,6 +329,7 @@ public class ManagedChannelImplTest { NO_INTERCEPTOR); verify(executorPool).getObject(); verify(executorPool, never()).returnObject(anyObject()); + verify(mockTransportFactory).getScheduledExecutorService(); verifyNoMoreInteractions(mockTransportFactory); channel.shutdown(); assertTrue(channel.isShutdown()); diff --git a/core/src/test/java/io/grpc/internal/ReschedulerTest.java b/core/src/test/java/io/grpc/internal/ReschedulerTest.java new file mode 100644 index 0000000000..4eef0677e3 --- /dev/null +++ b/core/src/test/java/io/grpc/internal/ReschedulerTest.java @@ -0,0 +1,133 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link Rescheduler}. + */ +@RunWith(JUnit4.class) +public class ReschedulerTest { + + private final Runner runner = new Runner(); + private final Exec exec = new Exec(); + private final FakeClock scheduler = new FakeClock(); + private final Rescheduler rescheduler = new Rescheduler( + runner, + exec, + scheduler.getScheduledExecutorService(), + scheduler.getStopwatchSupplier().get()); + + @Test + public void runs() { + assertFalse(runner.ran); + rescheduler.reschedule(1, TimeUnit.NANOSECONDS); + assertFalse(runner.ran); + + scheduler.forwardNanos(1); + + assertTrue(runner.ran); + } + + @Test + public void cancels() { + assertFalse(runner.ran); + rescheduler.reschedule(1, TimeUnit.NANOSECONDS); + assertFalse(runner.ran); + rescheduler.cancel(/* permanent= */ false); + + scheduler.forwardNanos(1); + + assertFalse(runner.ran); + assertTrue(exec.executed); + } + + @Test + public void cancelPermanently() { + assertFalse(runner.ran); + rescheduler.reschedule(1, TimeUnit.NANOSECONDS); + assertFalse(runner.ran); + rescheduler.cancel(/* permanent= */ true); + + scheduler.forwardNanos(1); + + assertFalse(runner.ran); + assertFalse(exec.executed); + } + + @Test + public void reschedules() { + assertFalse(runner.ran); + rescheduler.reschedule(1, TimeUnit.NANOSECONDS); + assertFalse(runner.ran); + assertFalse(exec.executed); + rescheduler.reschedule(50, TimeUnit.NANOSECONDS); + assertFalse(runner.ran); + assertFalse(exec.executed); + + scheduler.forwardNanos(1); + assertFalse(runner.ran); + assertTrue(exec.executed); + + scheduler.forwardNanos(50); + + assertTrue(runner.ran); + } + + @Test + public void reschedulesShortDelay() { + assertFalse(runner.ran); + rescheduler.reschedule(50, TimeUnit.NANOSECONDS); + assertFalse(runner.ran); + assertFalse(exec.executed); + rescheduler.reschedule(1, TimeUnit.NANOSECONDS); + assertFalse(runner.ran); + assertFalse(exec.executed); + + scheduler.forwardNanos(1); + assertTrue(runner.ran); + assertTrue(exec.executed); + } + + private static final class Exec implements Executor { + boolean executed; + + @Override + public void execute(Runnable command) { + executed = true; + + command.run(); + } + } + + private static final class Runner implements Runnable { + boolean ran; + + @Override + public void run() { + ran = true; + } + } +}