diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 9298accd80..a9fb34ade2 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -307,27 +307,13 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented idleModeTimerFuture; - // Must be used from channelExecutor - @Nullable - private IdleModeTimer idleModeTimer; - // Must be called from channelExecutor private void shutdownNameResolverAndLoadBalancer(boolean verifyActive) { if (verifyActive) { @@ -360,7 +346,7 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented builder, ClientTransportFactory clientTransportFactory, @@ -570,6 +544,7 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented wakeUp; + + Rescheduler( + Runnable r, + Executor serializingExecutor, + ScheduledExecutorService scheduler, + Stopwatch stopwatch) { + this.runnable = r; + this.serializingExecutor = serializingExecutor; + this.scheduler = scheduler; + this.stopwatch = stopwatch; + stopwatch.start(); + } + + /* must be called from the {@link #serializingExecutor} originally passed in. */ + void reschedule(long delay, TimeUnit timeUnit) { + long delayNanos = timeUnit.toNanos(delay); + long newRunAtNanos = nanoTime() + delayNanos; + enabled = true; + if (newRunAtNanos - runAtNanos < 0 || wakeUp == null) { + if (wakeUp != null) { + wakeUp.cancel(false); + } + wakeUp = scheduler.schedule(new FutureRunnable(this), delayNanos, TimeUnit.NANOSECONDS); + } + runAtNanos = newRunAtNanos; + } + + // must be called from channel executor + void cancel(boolean permanent) { + enabled = false; + if (permanent && wakeUp != null) { + wakeUp.cancel(false); + wakeUp = null; + } + } + + private static final class FutureRunnable implements Runnable { + + private final Rescheduler rescheduler; + + FutureRunnable(Rescheduler rescheduler) { + this.rescheduler = rescheduler; + } + + @Override + public void run() { + rescheduler.serializingExecutor.execute(rescheduler.new ChannelFutureRunnable()); + } + } + + private final class ChannelFutureRunnable implements Runnable { + + @Override + public void run() { + if (!enabled) { + wakeUp = null; + return; + } + long now = nanoTime(); + if (runAtNanos - now > 0) { + wakeUp = scheduler.schedule( + new FutureRunnable(Rescheduler.this), runAtNanos - now, TimeUnit.NANOSECONDS); + } else { + enabled = false; + wakeUp = null; + runnable.run(); + } + } + } + + @VisibleForTesting + static boolean isEnabled(Runnable r) { + return ((FutureRunnable) r).rescheduler.enabled; + } + + private long nanoTime() { + return stopwatch.elapsed(TimeUnit.NANOSECONDS); + } +} diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java index ba04974efb..d2631e179d 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplIdlenessTest.java @@ -51,10 +51,12 @@ import io.grpc.MethodDescriptor.MethodType; import io.grpc.NameResolver; import io.grpc.Status; import io.grpc.StringMarshaller; +import io.grpc.internal.FakeClock.ScheduledTask; import io.grpc.internal.TestUtils.MockClientTransportInfo; import java.net.SocketAddress; import java.net.URI; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; @@ -163,7 +165,10 @@ public class ManagedChannelImplIdlenessTest { @After public void allPendingTasksAreRun() { - assertEquals(timer.getPendingTasks() + " should be empty", 0, timer.numPendingTasks()); + Collection pendingTimerTasks = timer.getPendingTasks(); + for (ScheduledTask a : pendingTimerTasks) { + assertFalse(Rescheduler.isEnabled(a.command)); + } assertEquals(executor.getPendingTasks() + " should be empty", 0, executor.numPendingTasks()); } diff --git a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java index 33abbde160..f5cb01bb01 100644 --- a/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java +++ b/core/src/test/java/io/grpc/internal/ManagedChannelImplTest.java @@ -329,6 +329,7 @@ public class ManagedChannelImplTest { NO_INTERCEPTOR); verify(executorPool).getObject(); verify(executorPool, never()).returnObject(anyObject()); + verify(mockTransportFactory).getScheduledExecutorService(); verifyNoMoreInteractions(mockTransportFactory); channel.shutdown(); assertTrue(channel.isShutdown()); diff --git a/core/src/test/java/io/grpc/internal/ReschedulerTest.java b/core/src/test/java/io/grpc/internal/ReschedulerTest.java new file mode 100644 index 0000000000..4eef0677e3 --- /dev/null +++ b/core/src/test/java/io/grpc/internal/ReschedulerTest.java @@ -0,0 +1,133 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.internal; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link Rescheduler}. + */ +@RunWith(JUnit4.class) +public class ReschedulerTest { + + private final Runner runner = new Runner(); + private final Exec exec = new Exec(); + private final FakeClock scheduler = new FakeClock(); + private final Rescheduler rescheduler = new Rescheduler( + runner, + exec, + scheduler.getScheduledExecutorService(), + scheduler.getStopwatchSupplier().get()); + + @Test + public void runs() { + assertFalse(runner.ran); + rescheduler.reschedule(1, TimeUnit.NANOSECONDS); + assertFalse(runner.ran); + + scheduler.forwardNanos(1); + + assertTrue(runner.ran); + } + + @Test + public void cancels() { + assertFalse(runner.ran); + rescheduler.reschedule(1, TimeUnit.NANOSECONDS); + assertFalse(runner.ran); + rescheduler.cancel(/* permanent= */ false); + + scheduler.forwardNanos(1); + + assertFalse(runner.ran); + assertTrue(exec.executed); + } + + @Test + public void cancelPermanently() { + assertFalse(runner.ran); + rescheduler.reschedule(1, TimeUnit.NANOSECONDS); + assertFalse(runner.ran); + rescheduler.cancel(/* permanent= */ true); + + scheduler.forwardNanos(1); + + assertFalse(runner.ran); + assertFalse(exec.executed); + } + + @Test + public void reschedules() { + assertFalse(runner.ran); + rescheduler.reschedule(1, TimeUnit.NANOSECONDS); + assertFalse(runner.ran); + assertFalse(exec.executed); + rescheduler.reschedule(50, TimeUnit.NANOSECONDS); + assertFalse(runner.ran); + assertFalse(exec.executed); + + scheduler.forwardNanos(1); + assertFalse(runner.ran); + assertTrue(exec.executed); + + scheduler.forwardNanos(50); + + assertTrue(runner.ran); + } + + @Test + public void reschedulesShortDelay() { + assertFalse(runner.ran); + rescheduler.reschedule(50, TimeUnit.NANOSECONDS); + assertFalse(runner.ran); + assertFalse(exec.executed); + rescheduler.reschedule(1, TimeUnit.NANOSECONDS); + assertFalse(runner.ran); + assertFalse(exec.executed); + + scheduler.forwardNanos(1); + assertTrue(runner.ran); + assertTrue(exec.executed); + } + + private static final class Exec implements Executor { + boolean executed; + + @Override + public void execute(Runnable command) { + executed = true; + + command.run(); + } + } + + private static final class Runner implements Runnable { + boolean ran; + + @Override + public void run() { + ran = true; + } + } +}