core: delayedClientCall returns drainPendingCalls runnable in setCall (#8978)

`setCall()` returns drainPendingCalls runnable only when there are calls to drain, otherwise return null. Preserved the behaviour of `start()` and `cancel()`, as they are protected by `delayOrExecute()`.
This commit is contained in:
yifeizhuang 2022-03-15 12:57:24 -07:00 committed by GitHub
parent 87e13daf1a
commit 86b74d9ecc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 110 additions and 22 deletions

View File

@ -141,16 +141,21 @@ public class DelayedClientCall<ReqT, RespT> extends ClientCall<ReqT, RespT> {
* <p>No-op if either this method or {@link #cancel} have already been called.
*/
// When this method returns, passThrough is guaranteed to be true
public final void setCall(ClientCall<ReqT, RespT> call) {
public final Runnable setCall(ClientCall<ReqT, RespT> call) {
synchronized (this) {
// If realCall != null, then either setCall() or cancel() has been called.
if (realCall != null) {
return;
return null;
}
setRealCall(checkNotNull(call, "call"));
}
return new Runnable() {
@Override
public void run() {
drainPendingCalls();
}
};
}
@Override
public final void start(Listener<RespT> listener, final Metadata headers) {

View File

@ -1099,10 +1099,6 @@ final class ManagedChannelImpl extends ManagedChannel implements
/** Called when it's ready to create a real call and reprocess the pending call. */
void reprocess() {
getCallExecutor(callOptions).execute(
new Runnable() {
@Override
public void run() {
ClientCall<ReqT, RespT> realCall;
Context previous = context.attach();
try {
@ -1110,11 +1106,18 @@ final class ManagedChannelImpl extends ManagedChannel implements
} finally {
context.detach(previous);
}
setCall(realCall);
Runnable toRun = setCall(realCall);
if (toRun == null) {
syncContext.execute(new PendingCallRemoval());
} else {
getCallExecutor(callOptions).execute(new Runnable() {
@Override
public void run() {
toRun.run();
syncContext.execute(new PendingCallRemoval());
}
});
}
);
}
@Override

View File

@ -21,6 +21,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import com.google.common.util.concurrent.MoreExecutors;
@ -30,6 +31,7 @@ import io.grpc.Deadline;
import io.grpc.ForwardingTestUtil;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.StatusException;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Arrays;
@ -63,12 +65,13 @@ public class DelayedClientCallTest {
public void allMethodsForwarded() throws Exception {
DelayedClientCall<String, Integer> delayedClientCall =
new DelayedClientCall<>(callExecutor, fakeClock.getScheduledExecutorService(), null);
delayedClientCall.setCall(mockRealCall);
callMeMaybe(delayedClientCall.setCall(mockRealCall));
ForwardingTestUtil.testMethodsForwarded(
ClientCall.class,
mockRealCall,
delayedClientCall,
Arrays.asList(ClientCall.class.getMethod("toString")),
Arrays.asList(ClientCall.class.getMethod("toString"),
ClientCall.class.getMethod("start", Listener.class, Metadata.class)),
new ForwardingTestUtil.ArgumentProvider() {
@Override
public Object get(Method method, int argPos, Class<?> clazz) {
@ -101,7 +104,7 @@ public class DelayedClientCallTest {
DelayedClientCall<String, Integer> delayedClientCall = new DelayedClientCall<>(
callExecutor, fakeClock.getScheduledExecutorService(), Deadline.after(10, SECONDS));
delayedClientCall.start(listener, new Metadata());
delayedClientCall.setCall(mockRealCall);
callMeMaybe(delayedClientCall.setCall(mockRealCall));
ArgumentCaptor<Listener<Integer>> listenerCaptor = ArgumentCaptor.forClass(null);
verify(mockRealCall).start(listenerCaptor.capture(), any(Metadata.class));
Listener<Integer> realCallListener = listenerCaptor.getValue();
@ -119,4 +122,78 @@ public class DelayedClientCallTest {
verify(listener).onClose(statusCaptor.capture(), eq(trailer));
assertThat(statusCaptor.getValue().getCode()).isEqualTo(Status.Code.DATA_LOSS);
}
@Test
public void setCallThenStart() {
DelayedClientCall<String, Integer> delayedClientCall = new DelayedClientCall<>(
callExecutor, fakeClock.getScheduledExecutorService(), null);
callMeMaybe(delayedClientCall.setCall(mockRealCall));
delayedClientCall.start(listener, new Metadata());
delayedClientCall.request(1);
ArgumentCaptor<Listener<Integer>> listenerCaptor = ArgumentCaptor.forClass(null);
verify(mockRealCall).start(listenerCaptor.capture(), any(Metadata.class));
Listener<Integer> realCallListener = listenerCaptor.getValue();
verify(mockRealCall).request(1);
realCallListener.onMessage(1);
verify(listener).onMessage(1);
}
@Test
public void startThenSetCall() {
DelayedClientCall<String, Integer> delayedClientCall = new DelayedClientCall<>(
callExecutor, fakeClock.getScheduledExecutorService(), null);
delayedClientCall.start(listener, new Metadata());
delayedClientCall.request(1);
Runnable r = delayedClientCall.setCall(mockRealCall);
assertThat(r).isNotNull();
r.run();
ArgumentCaptor<Listener<Integer>> listenerCaptor = ArgumentCaptor.forClass(null);
verify(mockRealCall).start(listenerCaptor.capture(), any(Metadata.class));
Listener<Integer> realCallListener = listenerCaptor.getValue();
verify(mockRealCall).request(1);
realCallListener.onMessage(1);
verify(listener).onMessage(1);
}
@Test
@SuppressWarnings("unchecked")
public void cancelThenSetCall() {
DelayedClientCall<String, Integer> delayedClientCall = new DelayedClientCall<>(
callExecutor, fakeClock.getScheduledExecutorService(), null);
delayedClientCall.start(listener, new Metadata());
delayedClientCall.request(1);
delayedClientCall.cancel("cancel", new StatusException(Status.CANCELLED));
Runnable r = delayedClientCall.setCall(mockRealCall);
assertThat(r).isNull();
verify(mockRealCall, never()).start(any(Listener.class), any(Metadata.class));
verify(mockRealCall, never()).request(1);
verify(mockRealCall, never()).cancel(any(), any());
verify(listener).onClose(any(), any());
}
@Test
@SuppressWarnings("unchecked")
public void setCallThenCancel() {
DelayedClientCall<String, Integer> delayedClientCall = new DelayedClientCall<>(
callExecutor, fakeClock.getScheduledExecutorService(), null);
delayedClientCall.start(listener, new Metadata());
delayedClientCall.request(1);
Runnable r = delayedClientCall.setCall(mockRealCall);
assertThat(r).isNotNull();
r.run();
delayedClientCall.cancel("cancel", new StatusException(Status.CANCELLED));
ArgumentCaptor<Listener<Integer>> listenerCaptor = ArgumentCaptor.forClass(null);
verify(mockRealCall).start(listenerCaptor.capture(), any(Metadata.class));
Listener<Integer> realCallListener = listenerCaptor.getValue();
verify(mockRealCall).request(1);
verify(mockRealCall).cancel(any(), any());
realCallListener.onClose(Status.CANCELLED, null);
verify(listener).onClose(Status.CANCELLED, null);
}
private void callMeMaybe(Runnable r) {
if (r != null) {
r.run();
}
}
}

View File

@ -400,7 +400,10 @@ final class FaultFilter implements Filter, ClientInterceptorBuilder {
activeFaultCounter.decrementAndGet();
}
}
setCall(callSupplier.get());
Runnable toRun = setCall(callSupplier.get());
if (toRun != null) {
toRun.run();
}
}
},
delayNanos,