mirror of https://github.com/grpc/grpc-java.git
core: keepaliveManager not to use Ping.onSuccess; ragard onDataReceive as ping Ack
Preparing to support server side keepalive. For the convience on server side, not to use Ping `onSuccess()` callback to cancle shutdownFuture any more, instead, regard `onDataReceived()` as ping Ack and cancel shutdownFuture in it.
This commit is contained in:
parent
3390b6ae8d
commit
c44a4b24dd
|
|
@ -162,6 +162,21 @@ public class KeepAliveManager {
|
||||||
// keep one sendPing task always in flight when there're active rpcs.
|
// keep one sendPing task always in flight when there're active rpcs.
|
||||||
if (state == State.PING_SCHEDULED) {
|
if (state == State.PING_SCHEDULED) {
|
||||||
state = State.PING_DELAYED;
|
state = State.PING_DELAYED;
|
||||||
|
} else if (state == State.PING_SENT || state == State.IDLE_AND_PING_SENT) {
|
||||||
|
// Ping acked or effectively ping acked. Cancel shutdown, and then if not idle,
|
||||||
|
// schedule a new keep-alive ping.
|
||||||
|
if (shutdownFuture != null) {
|
||||||
|
shutdownFuture.cancel(false);
|
||||||
|
}
|
||||||
|
if (state == State.IDLE_AND_PING_SENT) {
|
||||||
|
// not to schedule new pings until onTransportActive
|
||||||
|
state = State.IDLE;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// schedule a new ping
|
||||||
|
state = State.PING_SCHEDULED;
|
||||||
|
pingFuture =
|
||||||
|
scheduler.schedule(sendPing, keepAliveDelayInNanos, TimeUnit.NANOSECONDS);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -171,7 +186,7 @@ public class KeepAliveManager {
|
||||||
public synchronized void onTransportActive() {
|
public synchronized void onTransportActive() {
|
||||||
if (state == State.IDLE) {
|
if (state == State.IDLE) {
|
||||||
// When the transport goes active, we do not reset the nextKeepaliveTime. This allows us to
|
// When the transport goes active, we do not reset the nextKeepaliveTime. This allows us to
|
||||||
// quickly check whether the conneciton is still working.
|
// quickly check whether the connection is still working.
|
||||||
state = State.PING_SCHEDULED;
|
state = State.PING_SCHEDULED;
|
||||||
pingFuture = scheduler.schedule(sendPing, nextKeepaliveTime - ticker.read(),
|
pingFuture = scheduler.schedule(sendPing, nextKeepaliveTime - ticker.read(),
|
||||||
TimeUnit.NANOSECONDS);
|
TimeUnit.NANOSECONDS);
|
||||||
|
|
@ -210,23 +225,7 @@ public class KeepAliveManager {
|
||||||
private class KeepAlivePingCallback implements ClientTransport.PingCallback {
|
private class KeepAlivePingCallback implements ClientTransport.PingCallback {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onSuccess(long roundTripTimeNanos) {
|
public void onSuccess(long roundTripTimeNanos) {}
|
||||||
synchronized (KeepAliveManager.this) {
|
|
||||||
shutdownFuture.cancel(false);
|
|
||||||
nextKeepaliveTime = ticker.read() + keepAliveDelayInNanos;
|
|
||||||
if (state == State.PING_SENT) {
|
|
||||||
// We have received the ping response so there's no need to shutdown the transport.
|
|
||||||
// Schedule a new keepalive ping.
|
|
||||||
pingFuture = scheduler.schedule(sendPing, keepAliveDelayInNanos, TimeUnit.NANOSECONDS);
|
|
||||||
state = State.PING_SCHEDULED;
|
|
||||||
}
|
|
||||||
if (state == State.IDLE_AND_PING_SENT) {
|
|
||||||
// Transport went idle after we had sent out the ping. We don't need to schedule a new
|
|
||||||
// ping.
|
|
||||||
state = State.IDLE;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Throwable cause) {
|
public void onFailure(Throwable cause) {
|
||||||
|
|
|
||||||
|
|
@ -95,7 +95,6 @@ public final class KeepAliveManagerTest {
|
||||||
ArgumentCaptor<ClientTransport.PingCallback> pingCallbackCaptor =
|
ArgumentCaptor<ClientTransport.PingCallback> pingCallbackCaptor =
|
||||||
ArgumentCaptor.forClass(ClientTransport.PingCallback.class);
|
ArgumentCaptor.forClass(ClientTransport.PingCallback.class);
|
||||||
verify(transport).ping(pingCallbackCaptor.capture(), isA(Executor.class));
|
verify(transport).ping(pingCallbackCaptor.capture(), isA(Executor.class));
|
||||||
ClientTransport.PingCallback pingCallback = pingCallbackCaptor.getValue();
|
|
||||||
verify(scheduler, times(2)).schedule(isA(Runnable.class), delayCaptor.capture(),
|
verify(scheduler, times(2)).schedule(isA(Runnable.class), delayCaptor.capture(),
|
||||||
isA(TimeUnit.class));
|
isA(TimeUnit.class));
|
||||||
delay = delayCaptor.getValue();
|
delay = delayCaptor.getValue();
|
||||||
|
|
@ -104,7 +103,7 @@ public final class KeepAliveManagerTest {
|
||||||
|
|
||||||
// Ping succeeds. Reschedule another ping.
|
// Ping succeeds. Reschedule another ping.
|
||||||
ticker.time = 1100;
|
ticker.time = 1100;
|
||||||
pingCallback.onSuccess(100);
|
keepAliveManager.onDataReceived();
|
||||||
verify(scheduler, times(3)).schedule(isA(Runnable.class), delayCaptor.capture(),
|
verify(scheduler, times(3)).schedule(isA(Runnable.class), delayCaptor.capture(),
|
||||||
isA(TimeUnit.class));
|
isA(TimeUnit.class));
|
||||||
// Shutdown task has been cancelled.
|
// Shutdown task has been cancelled.
|
||||||
|
|
@ -280,13 +279,12 @@ public final class KeepAliveManagerTest {
|
||||||
ArgumentCaptor<ClientTransport.PingCallback> pingCallbackCaptor =
|
ArgumentCaptor<ClientTransport.PingCallback> pingCallbackCaptor =
|
||||||
ArgumentCaptor.forClass(ClientTransport.PingCallback.class);
|
ArgumentCaptor.forClass(ClientTransport.PingCallback.class);
|
||||||
verify(transport).ping(pingCallbackCaptor.capture(), isA(Executor.class));
|
verify(transport).ping(pingCallbackCaptor.capture(), isA(Executor.class));
|
||||||
ClientTransport.PingCallback pingCallback = pingCallbackCaptor.getValue();
|
|
||||||
verify(scheduler, times(2)).schedule(isA(Runnable.class), isA(Long.class), isA(TimeUnit.class));
|
verify(scheduler, times(2)).schedule(isA(Runnable.class), isA(Long.class), isA(TimeUnit.class));
|
||||||
|
|
||||||
// Transport becomes idle. No more ping should be scheduled after we receive a ping response.
|
// Transport becomes idle. No more ping should be scheduled after we receive a ping response.
|
||||||
keepAliveManager.onTransportIdle();
|
keepAliveManager.onTransportIdle();
|
||||||
ticker.time = 1100;
|
ticker.time = 1100;
|
||||||
pingCallback.onSuccess(100);
|
keepAliveManager.onDataReceived();
|
||||||
verify(scheduler, times(2)).schedule(isA(Runnable.class), isA(Long.class), isA(TimeUnit.class));
|
verify(scheduler, times(2)).schedule(isA(Runnable.class), isA(Long.class), isA(TimeUnit.class));
|
||||||
// Shutdown task has been cancelled.
|
// Shutdown task has been cancelled.
|
||||||
verify(shutdownFuture).cancel(isA(Boolean.class));
|
verify(shutdownFuture).cancel(isA(Boolean.class));
|
||||||
|
|
@ -399,13 +397,12 @@ public final class KeepAliveManagerTest {
|
||||||
ArgumentCaptor<ClientTransport.PingCallback> pingCallbackCaptor =
|
ArgumentCaptor<ClientTransport.PingCallback> pingCallbackCaptor =
|
||||||
ArgumentCaptor.forClass(ClientTransport.PingCallback.class);
|
ArgumentCaptor.forClass(ClientTransport.PingCallback.class);
|
||||||
verify(transport).ping(pingCallbackCaptor.capture(), isA(Executor.class));
|
verify(transport).ping(pingCallbackCaptor.capture(), isA(Executor.class));
|
||||||
ClientTransport.PingCallback pingCallback = pingCallbackCaptor.getValue();
|
|
||||||
|
|
||||||
keepAliveManager.onTransportIdle();
|
keepAliveManager.onTransportIdle();
|
||||||
|
|
||||||
keepAliveManager.onTransportActive();
|
keepAliveManager.onTransportActive();
|
||||||
|
|
||||||
pingCallback.onSuccess(100);
|
keepAliveManager.onDataReceived();
|
||||||
|
|
||||||
// another ping scheduled
|
// another ping scheduled
|
||||||
verify(scheduler, times(3))
|
verify(scheduler, times(3))
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue