mirror of https://github.com/grpc/grpc-java.git
interop: Fix flake in cascading test due to sleeps
The value of nodeCount depended on deadlines expiring after the chain was constructed. This is effectively the same as using Thread.sleep() and would commonly fail if the machine was under load. Instead of checking nodeCount after the deadline expires, we now wait for the chain to be constructed and then cancel the RPC. This also ensures that the cancel propagates instead of each hop just enforcing the deadline. As a bonus, this also reduces test execution time by one second. A new test was added for deadline propagation.

Fixes #1852
parent 2506e9396a
commit 563baa4a20
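The fix described in the commit message boils down to a synchronization pattern: have the worker signal readiness through a future, then cancel explicitly and wait for the cancellation to be observed, rather than sleeping and relying on a deadline firing at the right moment. Below is a minimal, self-contained sketch of that pattern using only JDK concurrency primitives; the class and variable names are illustrative and are not part of the actual test, which uses Guava's SettableFuture, gRPC's CancellableContext, and CountDownLatches as shown in the diff that follows.

// A minimal, self-contained sketch (not the actual test code) of the synchronization
// pattern this commit adopts: the worker signals readiness through a future, and the
// test then cancels explicitly and waits for the cancellation to be observed, instead
// of sleeping and hoping a deadline fires at the right moment. All names are illustrative.
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class WaitThenCancelSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService worker = Executors.newSingleThreadExecutor();
    CompletableFuture<Void> chainReady = new CompletableFuture<>();
    CountDownLatch observedCancellation = new CountDownLatch(1);

    Future<?> work = worker.submit(() -> {
      chainReady.complete(null);          // signal: the "call chain" is fully constructed
      try {
        Thread.sleep(Long.MAX_VALUE);     // stand-in for the blocked downstream RPC
      } catch (InterruptedException e) {
        observedCancellation.countDown(); // the cancellation propagated to the worker
      }
    });

    chainReady.get(5, TimeUnit.SECONDS);  // deterministic readiness, no Thread.sleep()
    work.cancel(true);                    // now cancel explicitly and assert propagation
    if (!observedCancellation.await(5, TimeUnit.SECONDS)) {
      throw new AssertionError("cancellation was not observed");
    }
    worker.shutdownNow();
  }
}

In the diff itself, startChainingServer() now returns a Future that completes once the last server in the chain has been reached, and the test cancels the RPC only after that future resolves, so the observed cancellations come from explicit cancel propagation rather than from each hop enforcing a deadline.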
@@ -31,10 +31,18 @@

package io.grpc.testing.integration;

import static com.google.common.truth.Truth.assertAbout;
import static io.grpc.testing.DeadlineSubject.deadline;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.common.util.concurrent.SettableFuture;

import io.grpc.Context;
import io.grpc.Context.CancellableContext;
import io.grpc.Deadline;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
@@ -46,7 +54,10 @@ import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import io.grpc.internal.ManagedChannelImpl;
import io.grpc.internal.ServerImpl;
import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.integration.Messages.SimpleRequest;
import io.grpc.testing.integration.Messages.SimpleResponse;

import org.junit.After;
import org.junit.Before;
@@ -59,14 +70,15 @@ import org.mockito.MockitoAnnotations;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Integration test for various forms of cancellation & deadline propagation.
 * Integration test for various forms of cancellation and deadline propagation.
 */
@RunWith(JUnit4.class)
public class CascadingTest {
@@ -75,57 +87,58 @@ public class CascadingTest {
  TestServiceGrpc.TestServiceImplBase service;
  private ManagedChannelImpl channel;
  private ServerImpl server;
  private AtomicInteger nodeCount;
  private CountDownLatch observedCancellations;
  private CountDownLatch receivedCancellations;
  private TestServiceGrpc.TestServiceBlockingStub blockingStub;
  private TestServiceGrpc.TestServiceStub asyncStub;
  private ScheduledExecutorService scheduler;
  private TestServiceGrpc.TestServiceFutureStub futureStub;
  private ExecutorService otherWork;

  @Before
  public void setUp() throws Exception {
    MockitoAnnotations.initMocks(this);
    Mockito.when(service.bindService()).thenCallRealMethod();
    nodeCount = new AtomicInteger();
    scheduler = Executors.newScheduledThreadPool(1);
    // Use a cached thread pool as we need a thread for each blocked call
    otherWork = Executors.newCachedThreadPool();
    channel = InProcessChannelBuilder.forName("channel").executor(otherWork).build();
    blockingStub = TestServiceGrpc.newBlockingStub(channel);
    asyncStub = TestServiceGrpc.newStub(channel);
    futureStub = TestServiceGrpc.newFutureStub(channel);
  }

  @After
  public void tearDown() {
    Context.ROOT.attach();
    channel.shutdownNow();
    server.shutdownNow();
    otherWork.shutdownNow();
    scheduler.shutdownNow();
  }

  /**
   * Test {@link Context} expiration propagates from the first node in the call chain all the way
   * Test {@link Context} cancellation propagates from the first node in the call chain all the way
   * to the last.
   */
  @Test
  public void testCascadingCancellationViaOuterContextExpiration() throws Exception {
  public void testCascadingCancellationViaOuterContextCancellation() throws Exception {
    observedCancellations = new CountDownLatch(2);
    receivedCancellations = new CountDownLatch(3);
    startChainingServer(3);

    Context.current().withDeadlineAfter(500, TimeUnit.MILLISECONDS, scheduler).attach();
    Future<?> chainReady = startChainingServer(3);
    CancellableContext context = Context.current().withCancellation();
    Future<SimpleResponse> future;
    Context prevContext = context.attach();
    try {
      blockingStub.unaryCall(Messages.SimpleRequest.getDefaultInstance());
      fail("Expected cancellation");
    } catch (StatusRuntimeException sre) {
      // Wait for the workers to finish
      Status status = Status.fromThrowable(sre);
      assertEquals(Status.Code.DEADLINE_EXCEEDED, status.getCode());
      future = futureStub.unaryCall(SimpleRequest.getDefaultInstance());
    } finally {
      context.detach(prevContext);
    }
    chainReady.get(5, TimeUnit.SECONDS);

      // Should have 3 calls before timeout propagates
      assertEquals(3, nodeCount.get());
    context.cancel(null);
    try {
      future.get(5, TimeUnit.SECONDS);
      fail("Expected cancellation");
    } catch (ExecutionException ex) {
      Status status = Status.fromThrowable(ex);
      assertEquals(Status.Code.CANCELLED, status.getCode());

      // Should have observed 2 cancellations responses from downstream servers
      if (!observedCancellations.await(5, TimeUnit.SECONDS)) {
@@ -138,33 +151,23 @@ public class CascadingTest {
  }

  /**
   * Test that cancellation via method deadline propagates down the call.
   * Test that cancellation via call cancellation propagates down the call.
   */
  @Test
  public void testCascadingCancellationViaMethodTimeout() throws Exception {
  public void testCascadingCancellationViaRpcCancel() throws Exception {
    observedCancellations = new CountDownLatch(2);
    receivedCancellations = new CountDownLatch(3);
    startChainingServer(3);
    Future<?> chainReady = startChainingServer(3);
    Future<SimpleResponse> future = futureStub.unaryCall(SimpleRequest.getDefaultInstance());
    chainReady.get(5, TimeUnit.SECONDS);

    try {
      blockingStub.withDeadlineAfter(500, TimeUnit.MILLISECONDS)
          .unaryCall(Messages.SimpleRequest.getDefaultInstance());
      fail("Expected cancellation");
    } catch (StatusRuntimeException sre) {
      // Wait for the workers to finish
      Status status = Status.fromThrowable(sre);
      // Outermost caller observes deadline exceeded, the descendant RPCs are cancelled so they
      // receive cancellation.
      assertEquals(Status.Code.DEADLINE_EXCEEDED, status.getCode());

      // Should have 3 calls before deadline propagates
      assertEquals(3, nodeCount.get());
      if (!observedCancellations.await(5, TimeUnit.SECONDS)) {
        fail("Expected number of cancellations not observed by clients");
      }
      if (!receivedCancellations.await(5, TimeUnit.SECONDS)) {
        fail("Expected number of cancellations to be received by servers not observed");
      }
    future.cancel(true);
    assertTrue(future.isCancelled());
    if (!observedCancellations.await(5, TimeUnit.SECONDS)) {
      fail("Expected number of cancellations not observed by clients");
    }
    if (!receivedCancellations.await(5, TimeUnit.SECONDS)) {
      fail("Expected number of cancellations to be received by servers not observed");
    }
  }

@@ -184,7 +187,7 @@ public class CascadingTest {
      fail("Expected abort");
    } catch (StatusRuntimeException sre) {
      // Wait for the workers to finish
      Status status = Status.fromThrowable(sre);
      Status status = sre.getStatus();
      // Outermost caller observes ABORTED propagating up from the failing leaf,
      // The descendant RPCs are cancelled so they receive CANCELLED.
      assertEquals(Status.Code.ABORTED, status.getCode());
@@ -198,58 +201,91 @@ public class CascadingTest {
    }
  }

  @Test
  public void testDeadlinePropagation() throws Exception {
    final AtomicInteger recursionDepthRemaining = new AtomicInteger(3);
    final SettableFuture<Deadline> finalDeadline = SettableFuture.create();
    class DeadlineSaver extends TestServiceGrpc.TestServiceImplBase {
      @Override
      public void unaryCall(final SimpleRequest request,
          final StreamObserver<SimpleResponse> responseObserver) {
        Context.currentContextExecutor(otherWork).execute(new Runnable() {
          @Override
          public void run() {
            try {
              if (recursionDepthRemaining.decrementAndGet() == 0) {
                finalDeadline.set(Context.current().getDeadline());
                responseObserver.onNext(SimpleResponse.getDefaultInstance());
              } else {
                responseObserver.onNext(blockingStub.unaryCall(request));
              }
              responseObserver.onCompleted();
            } catch (Exception ex) {
              responseObserver.onError(ex);
            }
          }
        });
      }
    }

    server = InProcessServerBuilder.forName("channel").executor(otherWork)
        .addService(new DeadlineSaver())
        .build().start();

    Deadline initialDeadline = Deadline.after(1, TimeUnit.MINUTES);
    blockingStub.withDeadline(initialDeadline).unaryCall(SimpleRequest.getDefaultInstance());
    assertNotSame(initialDeadline, finalDeadline);
    // Since deadline is re-calculated at each hop, some variance is acceptable and expected.
    assertAbout(deadline())
        .that(finalDeadline.get()).isWithin(1, TimeUnit.SECONDS).of(initialDeadline);
  }

  /**
   * Create a chain of client to server calls which can be cancelled top down.
   *
   * @return a Future that completes when call chain is created
   */
  private void startChainingServer(final int depthThreshold)
      throws IOException {
    server = InProcessServerBuilder.forName("channel").executor(otherWork).addService(
        ServerInterceptors.intercept(service,
            new ServerInterceptor() {
              @Override
              public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
                  final ServerCall<ReqT, RespT> call,
                  Metadata headers,
                  ServerCallHandler<ReqT, RespT> next) {
                // Respond with the headers but nothing else.
                call.sendHeaders(new Metadata());
                call.request(1);
                return new ServerCall.Listener<ReqT>() {
                  @Override
                  public void onMessage(final ReqT message) {
                    if (nodeCount.incrementAndGet() == depthThreshold) {
                      // No need to abort so just wait for top-down cancellation
                      return;
                    }
  private Future<?> startChainingServer(final int depthThreshold) throws IOException {
    final AtomicInteger serversReady = new AtomicInteger();
    final SettableFuture<Void> chainReady = SettableFuture.create();
    class ChainingService extends TestServiceGrpc.TestServiceImplBase {
      @Override
      public void unaryCall(final SimpleRequest request,
          final StreamObserver<SimpleResponse> responseObserver) {
        ((ServerCallStreamObserver) responseObserver).setOnCancelHandler(new Runnable() {
          @Override
          public void run() {
            receivedCancellations.countDown();
          }
        });
        if (serversReady.incrementAndGet() == depthThreshold) {
          // Stop recursion
          chainReady.set(null);
          return;
        }

                    Context.currentContextExecutor(otherWork).execute(new Runnable() {
                      @Override
                      public void run() {
                        try {
                          blockingStub.unaryCall((Messages.SimpleRequest) message);
                        } catch (Exception e) {
                          Status status = Status.fromThrowable(e);
                          if (status.getCode() == Status.Code.CANCELLED
                              || status.getCode() == Status.Code.DEADLINE_EXCEEDED) {
                            observedCancellations.countDown();
                          } else if (status.getCode() == Status.Code.ABORTED) {
                            // Propagate aborted back up
                            call.close(status, new Metadata());
                          }
                        }
                      }
                    });
                  }

                  @Override
                  public void onCancel() {
                    receivedCancellations.countDown();
                  }
                };
        Context.currentContextExecutor(otherWork).execute(new Runnable() {
          @Override
          public void run() {
            try {
              blockingStub.unaryCall(request);
            } catch (StatusRuntimeException e) {
              Status status = e.getStatus();
              if (status.getCode() == Status.Code.CANCELLED) {
                observedCancellations.countDown();
              } else {
                responseObserver.onError(e);
              }
            })
        ).build();
    server.start();
  }
            }
          }
        });
      }
    }

    server = InProcessServerBuilder.forName("channel").executor(otherWork)
        .addService(new ChainingService())
        .build().start();
    return chainReady;
  }

  /**
@@ -290,7 +326,6 @@ public class CascadingTest {
        new StreamObserver<Messages.SimpleResponse>() {
          @Override
          public void onNext(Messages.SimpleResponse value) {

          }

          @Override
@@ -302,7 +337,7 @@ public class CascadingTest {
            // Propagate closure upwards.
            try {
              call.close(status, new Metadata());
            } catch (Throwable t2) {
            } catch (IllegalStateException t2) {
              // Ignore error if already closed.
            }
          }