mirror of https://github.com/grpc/grpc-java.git
core: don't reschedule idle timer if it is already active
Benchmark results (3 runs each) non direct ``` Before: 50.0%ile Latency (in nanos): 157471 90.0%ile Latency (in nanos): 185927 95.0%ile Latency (in nanos): 195135 99.0%ile Latency (in nanos): 218815 99.9%ile Latency (in nanos): 1188735 100.0%ile Latency (in nanos): 18333695 QPS: 6126 50.0%ile Latency (in nanos): 160407 90.0%ile Latency (in nanos): 188551 95.0%ile Latency (in nanos): 197487 99.0%ile Latency (in nanos): 219575 99.9%ile Latency (in nanos): 390239 100.0%ile Latency (in nanos): 18338815 QPS: 6106 50.0%ile Latency (in nanos): 157831 90.0%ile Latency (in nanos): 186439 95.0%ile Latency (in nanos): 195815 99.0%ile Latency (in nanos): 216951 99.9%ile Latency (in nanos): 281167 100.0%ile Latency (in nanos): 5384447 QPS: 6235 After: 50.0%ile Latency (in nanos): 152255 90.0%ile Latency (in nanos): 180551 95.0%ile Latency (in nanos): 188943 99.0%ile Latency (in nanos): 209623 99.9%ile Latency (in nanos): 1184831 100.0%ile Latency (in nanos): 4351999 QPS: 6313 50.0%ile Latency (in nanos): 153663 90.0%ile Latency (in nanos): 181671 95.0%ile Latency (in nanos): 189991 99.0%ile Latency (in nanos): 210495 99.9%ile Latency (in nanos): 278895 100.0%ile Latency (in nanos): 18283519 QPS: 6300 50.0%ile Latency (in nanos): 152767 90.0%ile Latency (in nanos): 180839 95.0%ile Latency (in nanos): 189791 99.0%ile Latency (in nanos): 211719 99.9%ile Latency (in nanos): 280927 100.0%ile Latency (in nanos): 12231167 QPS: 6381 ``` direct: ``` Before: 50.0%ile Latency (in nanos): 133943 90.0%ile Latency (in nanos): 153671 95.0%ile Latency (in nanos): 163655 99.0%ile Latency (in nanos): 188871 99.9%ile Latency (in nanos): 235791 100.0%ile Latency (in nanos): 7864575 QPS: 7134 50.0%ile Latency (in nanos): 131623 90.0%ile Latency (in nanos): 151863 95.0%ile Latency (in nanos): 162095 99.0%ile Latency (in nanos): 187719 99.9%ile Latency (in nanos): 234983 100.0%ile Latency (in nanos): 17836031 QPS: 7250 50.0%ile Latency (in nanos): 131223 90.0%ile Latency (in nanos): 150823 95.0%ile Latency (in nanos): 161311 99.0%ile Latency (in nanos): 187719 99.9%ile Latency (in nanos): 237471 100.0%ile Latency (in nanos): 4416255 QPS: 7273 After: 50.0%ile Latency (in nanos): 122751 90.0%ile Latency (in nanos): 140967 95.0%ile Latency (in nanos): 148911 99.0%ile Latency (in nanos): 173215 99.9%ile Latency (in nanos): 214823 100.0%ile Latency (in nanos): 18509823 QPS: 7774 50.0%ile Latency (in nanos): 124507 90.0%ile Latency (in nanos): 145855 95.0%ile Latency (in nanos): 156623 99.0%ile Latency (in nanos): 183111 99.9%ile Latency (in nanos): 235679 100.0%ile Latency (in nanos): 18289663 QPS: 7625 50.0%ile Latency (in nanos): 124295 90.0%ile Latency (in nanos): 145071 95.0%ile Latency (in nanos): 156439 99.0%ile Latency (in nanos): 183919 99.9%ile Latency (in nanos): 232447 100.0%ile Latency (in nanos): 3712383 QPS: 7632 ```
This commit is contained in:
parent
276586a4fb
commit
9ed84258aa
|
|
@ -307,27 +307,13 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
|
|||
|
||||
// Run from channelExecutor
|
||||
private class IdleModeTimer implements Runnable {
|
||||
// Only mutated from channelExecutor
|
||||
boolean cancelled;
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (cancelled) {
|
||||
// Race detected: this task was scheduled on channelExecutor before cancelIdleTimer()
|
||||
// could cancel the timer.
|
||||
return;
|
||||
}
|
||||
enterIdleMode();
|
||||
}
|
||||
}
|
||||
|
||||
// Must be used from channelExecutor
|
||||
@Nullable
|
||||
private ScheduledFuture<?> idleModeTimerFuture;
|
||||
// Must be used from channelExecutor
|
||||
@Nullable
|
||||
private IdleModeTimer idleModeTimer;
|
||||
|
||||
// Must be called from channelExecutor
|
||||
private void shutdownNameResolverAndLoadBalancer(boolean verifyActive) {
|
||||
if (verifyActive) {
|
||||
|
|
@ -360,7 +346,7 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
|
|||
if (inUseStateAggregator.isInUse()) {
|
||||
// Cancel the timer now, so that a racing due timer will not put Channel on idleness
|
||||
// when the caller of exitIdleMode() is about to use the returned loadBalancer.
|
||||
cancelIdleTimer();
|
||||
cancelIdleTimer(false);
|
||||
} else {
|
||||
// exitIdleMode() may be called outside of inUseStateAggregator.handleNotInUse() while
|
||||
// isInUse() == false, in which case we still need to schedule the timer.
|
||||
|
|
@ -396,13 +382,8 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
|
|||
}
|
||||
|
||||
// Must be run from channelExecutor
|
||||
private void cancelIdleTimer() {
|
||||
if (idleModeTimerFuture != null) {
|
||||
idleModeTimerFuture.cancel(false);
|
||||
idleModeTimer.cancelled = true;
|
||||
idleModeTimerFuture = null;
|
||||
idleModeTimer = null;
|
||||
}
|
||||
private void cancelIdleTimer(boolean permanent) {
|
||||
idleTimer.cancel(permanent);
|
||||
}
|
||||
|
||||
// Always run from channelExecutor
|
||||
|
|
@ -410,16 +391,7 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
|
|||
if (idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
|
||||
return;
|
||||
}
|
||||
cancelIdleTimer();
|
||||
idleModeTimer = new IdleModeTimer();
|
||||
idleModeTimerFuture = transportFactory.getScheduledExecutorService().schedule(
|
||||
new LogExceptionRunnable(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
channelExecutor.executeLater(idleModeTimer).drain();
|
||||
}
|
||||
}),
|
||||
idleTimeoutMillis, TimeUnit.MILLISECONDS);
|
||||
idleTimer.reschedule(idleTimeoutMillis, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
// Run from channelExecutor
|
||||
|
|
@ -537,6 +509,8 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
|
|||
}
|
||||
};
|
||||
|
||||
private final Rescheduler idleTimer;
|
||||
|
||||
ManagedChannelImpl(
|
||||
AbstractManagedChannelImplBuilder<?> builder,
|
||||
ClientTransportFactory clientTransportFactory,
|
||||
|
|
@ -570,6 +544,7 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
|
|||
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
|
||||
if (builder.idleTimeoutMillis == IDLE_TIMEOUT_MILLIS_DISABLE) {
|
||||
this.idleTimeoutMillis = builder.idleTimeoutMillis;
|
||||
|
||||
} else {
|
||||
checkArgument(
|
||||
builder.idleTimeoutMillis
|
||||
|
|
@ -577,6 +552,21 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
|
|||
"invalid idleTimeoutMillis %s", builder.idleTimeoutMillis);
|
||||
this.idleTimeoutMillis = builder.idleTimeoutMillis;
|
||||
}
|
||||
|
||||
final class AutoDrainChannelExecutor implements Executor {
|
||||
|
||||
@Override
|
||||
public void execute(Runnable command) {
|
||||
channelExecutor.executeLater(command);
|
||||
channelExecutor.drain();
|
||||
}
|
||||
}
|
||||
|
||||
idleTimer = new Rescheduler(
|
||||
new IdleModeTimer(),
|
||||
new AutoDrainChannelExecutor(),
|
||||
transportFactory.getScheduledExecutorService(),
|
||||
stopwatchSupplier.get());
|
||||
this.fullStreamDecompression = builder.fullStreamDecompression;
|
||||
this.decompressorRegistry = checkNotNull(builder.decompressorRegistry, "decompressorRegistry");
|
||||
this.compressorRegistry = checkNotNull(builder.compressorRegistry, "compressorRegistry");
|
||||
|
|
@ -666,7 +656,7 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
|
|||
channelExecutor.executeLater(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
cancelIdleTimer();
|
||||
cancelIdleTimer(/* permanent= */ true);
|
||||
}
|
||||
}).drain();
|
||||
logger.log(Level.FINE, "[{0}] Shutting down", getLogId());
|
||||
|
|
@ -704,7 +694,7 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
|
|||
return;
|
||||
}
|
||||
panicMode = true;
|
||||
cancelIdleTimer();
|
||||
cancelIdleTimer(/* permanent= */ true);
|
||||
shutdownNameResolverAndLoadBalancer(false);
|
||||
SubchannelPicker newPicker = new SubchannelPicker() {
|
||||
final PickResult panicPickResult =
|
||||
|
|
@ -868,7 +858,7 @@ final class ManagedChannelImpl extends ManagedChannel implements Instrumented<Ch
|
|||
if (shutdown.get() || lbHelper == null) {
|
||||
return;
|
||||
}
|
||||
cancelIdleTimer();
|
||||
cancelIdleTimer(/* permanent= */ false);
|
||||
enterIdleMode();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,119 @@
|
|||
/*
|
||||
* Copyright 2018, gRPC Authors All rights reserved.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.internal;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Stopwatch;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/**
|
||||
* Reschedules a runnable lazily.
|
||||
*/
|
||||
final class Rescheduler {
|
||||
|
||||
// deps
|
||||
private final ScheduledExecutorService scheduler;
|
||||
private final Executor serializingExecutor;
|
||||
private final Runnable runnable;
|
||||
|
||||
// state
|
||||
private final Stopwatch stopwatch;
|
||||
private long runAtNanos;
|
||||
private boolean enabled;
|
||||
private ScheduledFuture<?> wakeUp;
|
||||
|
||||
Rescheduler(
|
||||
Runnable r,
|
||||
Executor serializingExecutor,
|
||||
ScheduledExecutorService scheduler,
|
||||
Stopwatch stopwatch) {
|
||||
this.runnable = r;
|
||||
this.serializingExecutor = serializingExecutor;
|
||||
this.scheduler = scheduler;
|
||||
this.stopwatch = stopwatch;
|
||||
stopwatch.start();
|
||||
}
|
||||
|
||||
/* must be called from the {@link #serializingExecutor} originally passed in. */
|
||||
void reschedule(long delay, TimeUnit timeUnit) {
|
||||
long delayNanos = timeUnit.toNanos(delay);
|
||||
long newRunAtNanos = nanoTime() + delayNanos;
|
||||
enabled = true;
|
||||
if (newRunAtNanos - runAtNanos < 0 || wakeUp == null) {
|
||||
if (wakeUp != null) {
|
||||
wakeUp.cancel(false);
|
||||
}
|
||||
wakeUp = scheduler.schedule(new FutureRunnable(this), delayNanos, TimeUnit.NANOSECONDS);
|
||||
}
|
||||
runAtNanos = newRunAtNanos;
|
||||
}
|
||||
|
||||
// must be called from channel executor
|
||||
void cancel(boolean permanent) {
|
||||
enabled = false;
|
||||
if (permanent && wakeUp != null) {
|
||||
wakeUp.cancel(false);
|
||||
wakeUp = null;
|
||||
}
|
||||
}
|
||||
|
||||
private static final class FutureRunnable implements Runnable {
|
||||
|
||||
private final Rescheduler rescheduler;
|
||||
|
||||
FutureRunnable(Rescheduler rescheduler) {
|
||||
this.rescheduler = rescheduler;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
rescheduler.serializingExecutor.execute(rescheduler.new ChannelFutureRunnable());
|
||||
}
|
||||
}
|
||||
|
||||
private final class ChannelFutureRunnable implements Runnable {
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (!enabled) {
|
||||
wakeUp = null;
|
||||
return;
|
||||
}
|
||||
long now = nanoTime();
|
||||
if (runAtNanos - now > 0) {
|
||||
wakeUp = scheduler.schedule(
|
||||
new FutureRunnable(Rescheduler.this), runAtNanos - now, TimeUnit.NANOSECONDS);
|
||||
} else {
|
||||
enabled = false;
|
||||
wakeUp = null;
|
||||
runnable.run();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
static boolean isEnabled(Runnable r) {
|
||||
return ((FutureRunnable) r).rescheduler.enabled;
|
||||
}
|
||||
|
||||
private long nanoTime() {
|
||||
return stopwatch.elapsed(TimeUnit.NANOSECONDS);
|
||||
}
|
||||
}
|
||||
|
|
@ -51,10 +51,12 @@ import io.grpc.MethodDescriptor.MethodType;
|
|||
import io.grpc.NameResolver;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StringMarshaller;
|
||||
import io.grpc.internal.FakeClock.ScheduledTask;
|
||||
import io.grpc.internal.TestUtils.MockClientTransportInfo;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.URI;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
|
|
@ -163,7 +165,10 @@ public class ManagedChannelImplIdlenessTest {
|
|||
|
||||
@After
|
||||
public void allPendingTasksAreRun() {
|
||||
assertEquals(timer.getPendingTasks() + " should be empty", 0, timer.numPendingTasks());
|
||||
Collection<ScheduledTask> pendingTimerTasks = timer.getPendingTasks();
|
||||
for (ScheduledTask a : pendingTimerTasks) {
|
||||
assertFalse(Rescheduler.isEnabled(a.command));
|
||||
}
|
||||
assertEquals(executor.getPendingTasks() + " should be empty", 0, executor.numPendingTasks());
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -329,6 +329,7 @@ public class ManagedChannelImplTest {
|
|||
NO_INTERCEPTOR);
|
||||
verify(executorPool).getObject();
|
||||
verify(executorPool, never()).returnObject(anyObject());
|
||||
verify(mockTransportFactory).getScheduledExecutorService();
|
||||
verifyNoMoreInteractions(mockTransportFactory);
|
||||
channel.shutdown();
|
||||
assertTrue(channel.isShutdown());
|
||||
|
|
|
|||
|
|
@ -0,0 +1,133 @@
|
|||
/*
|
||||
* Copyright 2018, gRPC Authors All rights reserved.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package io.grpc.internal;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
|
||||
/**
|
||||
* Tests for {@link Rescheduler}.
|
||||
*/
|
||||
@RunWith(JUnit4.class)
|
||||
public class ReschedulerTest {
|
||||
|
||||
private final Runner runner = new Runner();
|
||||
private final Exec exec = new Exec();
|
||||
private final FakeClock scheduler = new FakeClock();
|
||||
private final Rescheduler rescheduler = new Rescheduler(
|
||||
runner,
|
||||
exec,
|
||||
scheduler.getScheduledExecutorService(),
|
||||
scheduler.getStopwatchSupplier().get());
|
||||
|
||||
@Test
|
||||
public void runs() {
|
||||
assertFalse(runner.ran);
|
||||
rescheduler.reschedule(1, TimeUnit.NANOSECONDS);
|
||||
assertFalse(runner.ran);
|
||||
|
||||
scheduler.forwardNanos(1);
|
||||
|
||||
assertTrue(runner.ran);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void cancels() {
|
||||
assertFalse(runner.ran);
|
||||
rescheduler.reschedule(1, TimeUnit.NANOSECONDS);
|
||||
assertFalse(runner.ran);
|
||||
rescheduler.cancel(/* permanent= */ false);
|
||||
|
||||
scheduler.forwardNanos(1);
|
||||
|
||||
assertFalse(runner.ran);
|
||||
assertTrue(exec.executed);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void cancelPermanently() {
|
||||
assertFalse(runner.ran);
|
||||
rescheduler.reschedule(1, TimeUnit.NANOSECONDS);
|
||||
assertFalse(runner.ran);
|
||||
rescheduler.cancel(/* permanent= */ true);
|
||||
|
||||
scheduler.forwardNanos(1);
|
||||
|
||||
assertFalse(runner.ran);
|
||||
assertFalse(exec.executed);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void reschedules() {
|
||||
assertFalse(runner.ran);
|
||||
rescheduler.reschedule(1, TimeUnit.NANOSECONDS);
|
||||
assertFalse(runner.ran);
|
||||
assertFalse(exec.executed);
|
||||
rescheduler.reschedule(50, TimeUnit.NANOSECONDS);
|
||||
assertFalse(runner.ran);
|
||||
assertFalse(exec.executed);
|
||||
|
||||
scheduler.forwardNanos(1);
|
||||
assertFalse(runner.ran);
|
||||
assertTrue(exec.executed);
|
||||
|
||||
scheduler.forwardNanos(50);
|
||||
|
||||
assertTrue(runner.ran);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void reschedulesShortDelay() {
|
||||
assertFalse(runner.ran);
|
||||
rescheduler.reschedule(50, TimeUnit.NANOSECONDS);
|
||||
assertFalse(runner.ran);
|
||||
assertFalse(exec.executed);
|
||||
rescheduler.reschedule(1, TimeUnit.NANOSECONDS);
|
||||
assertFalse(runner.ran);
|
||||
assertFalse(exec.executed);
|
||||
|
||||
scheduler.forwardNanos(1);
|
||||
assertTrue(runner.ran);
|
||||
assertTrue(exec.executed);
|
||||
}
|
||||
|
||||
private static final class Exec implements Executor {
|
||||
boolean executed;
|
||||
|
||||
@Override
|
||||
public void execute(Runnable command) {
|
||||
executed = true;
|
||||
|
||||
command.run();
|
||||
}
|
||||
}
|
||||
|
||||
private static final class Runner implements Runnable {
|
||||
boolean ran;
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
ran = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue