mirror of https://github.com/grpc/grpc-java.git
Revert the cascading test for the moment as its flaky
This commit is contained in:
parent
6f1261f288
commit
caad0294b9
|
|
@ -1,324 +0,0 @@
|
|||
/*
|
||||
* Copyright 2015, Google Inc. All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are
|
||||
* met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright
|
||||
* notice, this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the following disclaimer
|
||||
* in the documentation and/or other materials provided with the
|
||||
* distribution.
|
||||
*
|
||||
* * Neither the name of Google Inc. nor the names of its
|
||||
* contributors may be used to endorse or promote products derived from
|
||||
* this software without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
package io.grpc.testing.integration;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import io.grpc.Context;
|
||||
import io.grpc.Metadata;
|
||||
import io.grpc.MethodDescriptor;
|
||||
import io.grpc.ServerCall;
|
||||
import io.grpc.ServerCallHandler;
|
||||
import io.grpc.ServerInterceptor;
|
||||
import io.grpc.ServerInterceptors;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.StatusRuntimeException;
|
||||
import io.grpc.inprocess.InProcessChannelBuilder;
|
||||
import io.grpc.inprocess.InProcessServerBuilder;
|
||||
import io.grpc.internal.ManagedChannelImpl;
|
||||
import io.grpc.internal.ServerImpl;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.JUnit4;
|
||||
import org.mockito.Mock;
|
||||
import org.mockito.MockitoAnnotations;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
/**
|
||||
* Integration test for various forms of cancellation & deadline propagation.
|
||||
*/
|
||||
@RunWith(JUnit4.class)
|
||||
public class CascadingTest {
|
||||
|
||||
@Mock
|
||||
TestServiceGrpc.TestService service;
|
||||
private ManagedChannelImpl channel;
|
||||
private ServerImpl server;
|
||||
private AtomicInteger nodeCount;
|
||||
private AtomicInteger observedCancellations;
|
||||
private AtomicInteger receivedCancellations;
|
||||
private TestServiceGrpc.TestServiceBlockingStub blockingStub;
|
||||
private TestServiceGrpc.TestServiceStub asyncStub;
|
||||
private ScheduledExecutorService scheduler;
|
||||
private ExecutorService otherWork;
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
MockitoAnnotations.initMocks(this);
|
||||
nodeCount = new AtomicInteger();
|
||||
observedCancellations = new AtomicInteger();
|
||||
receivedCancellations = new AtomicInteger();
|
||||
scheduler = Executors.newScheduledThreadPool(1);
|
||||
// Use a cached thread pool as we need a thread for each blocked call
|
||||
otherWork = Executors.newCachedThreadPool();
|
||||
channel = InProcessChannelBuilder.forName("channel").executor(otherWork).build();
|
||||
blockingStub = TestServiceGrpc.newBlockingStub(channel);
|
||||
asyncStub = TestServiceGrpc.newStub(channel);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
Context.ROOT.attach();
|
||||
channel.shutdownNow();
|
||||
server.shutdownNow();
|
||||
otherWork.shutdownNow();
|
||||
scheduler.shutdownNow();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test {@link Context} expiration propagates from the first node in the call chain all the way
|
||||
* to the last.
|
||||
*/
|
||||
@Test
|
||||
public void testCascadingCancellationViaOuterContextExpiration() throws Exception {
|
||||
startChainingServer(3);
|
||||
Context.current().withDeadlineAfter(500, TimeUnit.MILLISECONDS, scheduler).attach();
|
||||
try {
|
||||
blockingStub.unaryCall(Messages.SimpleRequest.getDefaultInstance());
|
||||
fail("Expected cancellation");
|
||||
} catch (StatusRuntimeException sre) {
|
||||
// Wait for the workers to finish
|
||||
Status status = Status.fromThrowable(sre);
|
||||
assertEquals(Status.Code.CANCELLED, status.getCode());
|
||||
|
||||
// Wait for the channel to shutdown so we know all the calls have completed
|
||||
channel.shutdown();
|
||||
channel.awaitTermination(5, TimeUnit.SECONDS);
|
||||
|
||||
// Should have 3 calls before timeout propagates
|
||||
assertEquals(3, nodeCount.get());
|
||||
|
||||
// Should have observed 2 cancellations responses from downstream servers
|
||||
assertEquals(2, observedCancellations.get());
|
||||
// and received 3 cancellations from upstream clients
|
||||
assertEquals(3, receivedCancellations.get());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that cancellation via method deadline propagates down the call.
|
||||
*/
|
||||
@Test
|
||||
public void testCascadingCancellationViaMethodTimeout() throws Exception {
|
||||
startChainingServer(3);
|
||||
try {
|
||||
blockingStub.withDeadlineAfter(500, TimeUnit.MILLISECONDS)
|
||||
.unaryCall(Messages.SimpleRequest.getDefaultInstance());
|
||||
fail("Expected cancellation");
|
||||
} catch (StatusRuntimeException sre) {
|
||||
// Wait for the workers to finish
|
||||
Status status = Status.fromThrowable(sre);
|
||||
// Outermost caller observes deadline exceeded, the descendant RPCs are cancelled so they
|
||||
// receive cancellation.
|
||||
assertEquals(Status.Code.DEADLINE_EXCEEDED, status.getCode());
|
||||
|
||||
// Wait for the channel to shutdown so we know all the calls have completed
|
||||
channel.shutdown();
|
||||
channel.awaitTermination(5, TimeUnit.SECONDS);
|
||||
|
||||
// Should have 3 calls before deadline propagates
|
||||
assertEquals(3, nodeCount.get());
|
||||
// Server should have observed 2 cancellations from downstream calls
|
||||
assertEquals(2, observedCancellations.get());
|
||||
// and received 2 cancellations
|
||||
assertEquals(3, receivedCancellations.get());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that when RPC cancellation propagates up a call chain, the cancellation of the parent
|
||||
* RPC triggers cancellation of all of its children.
|
||||
*/
|
||||
@Test
|
||||
public void testCascadingCancellationViaLeafFailure() throws Exception {
|
||||
startCallTreeServer(3);
|
||||
try {
|
||||
// Use response size limit to control tree nodeCount.
|
||||
blockingStub.unaryCall(Messages.SimpleRequest.newBuilder().setResponseSize(3).build());
|
||||
fail("Expected abort");
|
||||
} catch (StatusRuntimeException sre) {
|
||||
// Wait for the workers to finish
|
||||
Status status = Status.fromThrowable(sre);
|
||||
// Outermost caller observes ABORTED propagating up from the failing leaf,
|
||||
// The descendant RPCs are cancelled so they receive CANCELLED.
|
||||
assertEquals(Status.Code.ABORTED, status.getCode());
|
||||
|
||||
// Wait for the channel to shutdown so we know all the calls have completed
|
||||
channel.shutdown();
|
||||
channel.awaitTermination(5, TimeUnit.SECONDS);
|
||||
|
||||
// All nodes (15) except one edge of the tree (4) will be cancelled.
|
||||
assertEquals(11, observedCancellations.get());
|
||||
assertEquals(11, receivedCancellations.get());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a chain of client to server calls which can be cancelled top down.
|
||||
*/
|
||||
private void startChainingServer(final int depthThreshold)
|
||||
throws IOException {
|
||||
server = InProcessServerBuilder.forName("channel").addService(
|
||||
ServerInterceptors.intercept(TestServiceGrpc.bindService(service),
|
||||
new ServerInterceptor() {
|
||||
@Override
|
||||
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
|
||||
MethodDescriptor<ReqT, RespT> method,
|
||||
final ServerCall<RespT> call,
|
||||
Metadata headers,
|
||||
ServerCallHandler<ReqT, RespT> next) {
|
||||
// Respond with the headers but nothing else.
|
||||
call.sendHeaders(new Metadata());
|
||||
call.request(1);
|
||||
return new ServerCall.Listener<ReqT>() {
|
||||
@Override
|
||||
public void onMessage(final ReqT message) {
|
||||
if (nodeCount.incrementAndGet() == depthThreshold) {
|
||||
// No need to abort so just wait for top-down cancellation
|
||||
return;
|
||||
}
|
||||
|
||||
Context.propagate(otherWork).execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
blockingStub.unaryCall((Messages.SimpleRequest) message);
|
||||
} catch (Exception e) {
|
||||
Status status = Status.fromThrowable(e);
|
||||
if (status.getCode() == Status.Code.CANCELLED) {
|
||||
observedCancellations.incrementAndGet();
|
||||
} else if (status.getCode() == Status.Code.ABORTED) {
|
||||
// Propagate aborted back up
|
||||
call.close(status, new Metadata());
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCancel() {
|
||||
receivedCancellations.incrementAndGet();
|
||||
}
|
||||
};
|
||||
}
|
||||
})
|
||||
).build();
|
||||
server.start();
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a tree of client to server calls where each received call on the server
|
||||
* fans out to two downstream calls. Uses SimpleRequest.response_size to limit the nodeCount
|
||||
* of the tree. One of the leaves will ABORT to trigger cancellation back up to tree.
|
||||
*/
|
||||
private void startCallTreeServer(int depthThreshold) throws IOException {
|
||||
final AtomicInteger nodeCount = new AtomicInteger((2 << depthThreshold) - 1);
|
||||
server = InProcessServerBuilder.forName("channel").addService(
|
||||
ServerInterceptors.intercept(TestServiceGrpc.bindService(service),
|
||||
new ServerInterceptor() {
|
||||
@Override
|
||||
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
|
||||
MethodDescriptor<ReqT, RespT> method,
|
||||
final ServerCall<RespT> call,
|
||||
Metadata headers,
|
||||
ServerCallHandler<ReqT, RespT> next) {
|
||||
// Respond with the headers but nothing else.
|
||||
call.sendHeaders(new Metadata());
|
||||
call.request(1);
|
||||
return new ServerCall.Listener<ReqT>() {
|
||||
@Override
|
||||
public void onMessage(final ReqT message) {
|
||||
Messages.SimpleRequest req = (Messages.SimpleRequest) message;
|
||||
if (nodeCount.decrementAndGet() == 0) {
|
||||
// we are in the final leaf node so trigger an ABORT upwards
|
||||
Context.propagate(otherWork).execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
call.close(Status.ABORTED, new Metadata());
|
||||
}
|
||||
});
|
||||
} else if (req.getResponseSize() != 0) {
|
||||
// We are in a non leaf node so fire off two requests
|
||||
req = req.toBuilder().setResponseSize(req.getResponseSize() - 1).build();
|
||||
for (int i = 0; i < 2; i++) {
|
||||
asyncStub.unaryCall(req,
|
||||
new StreamObserver<Messages.SimpleResponse>() {
|
||||
@Override
|
||||
public void onNext(Messages.SimpleResponse value) {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(Throwable t) {
|
||||
Status status = Status.fromThrowable(t);
|
||||
if (status.getCode() == Status.Code.CANCELLED) {
|
||||
observedCancellations.incrementAndGet();
|
||||
}
|
||||
// Propagate closure upwards.
|
||||
try {
|
||||
call.close(status, new Metadata());
|
||||
} catch (Throwable t2) {
|
||||
// Ignore error if already closed.
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCompleted() {
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onCancel() {
|
||||
receivedCancellations.incrementAndGet();
|
||||
}
|
||||
};
|
||||
}
|
||||
})
|
||||
).build();
|
||||
server.start();
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue