benchmarks: Avoid implementing Future

It's a lot of code and there are classes in Guava are better. This was noticed
with a lint checker. This commit does change the error-handling behavior, as
previous the code wrongly cancelled the Future instead of setting it to have an
exception.
This commit is contained in:
Eric Anderson 2020-08-03 13:15:14 -07:00 committed by Eric Anderson
parent e92b2275f9
commit 8c4088a9e9
1 changed files with 9 additions and 73 deletions

View File

@ -34,11 +34,10 @@ import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TLS;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.TRANSPORT;
import static io.grpc.benchmarks.qps.ClientConfiguration.ClientParam.WARMUP_DURATION;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.ByteString;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.Status;
import io.grpc.benchmarks.proto.BenchmarkServiceGrpc;
import io.grpc.benchmarks.proto.BenchmarkServiceGrpc.BenchmarkServiceStub;
import io.grpc.benchmarks.proto.Messages.Payload;
@ -47,7 +46,6 @@ import io.grpc.benchmarks.proto.Messages.SimpleResponse;
import io.grpc.stub.StreamObserver;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.HdrHistogram.Histogram;
@ -152,7 +150,7 @@ public class AsyncClient {
final long endTime) {
final BenchmarkServiceStub stub = BenchmarkServiceGrpc.newStub(channel);
final Histogram histogram = new Histogram(HISTOGRAM_MAX_VALUE, HISTOGRAM_PRECISION);
final HistogramFuture future = new HistogramFuture(histogram);
final SettableFuture<Histogram> future = SettableFuture.create();
stub.unaryCall(request, new StreamObserver<SimpleResponse>() {
long lastCall = System.nanoTime();
@ -163,11 +161,7 @@ public class AsyncClient {
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
System.err.println("Encountered an error in unaryCall. Status is " + status);
t.printStackTrace();
future.cancel(true);
future.setException(new RuntimeException("Encountered an error in unaryCall", t));
}
@Override
@ -180,7 +174,7 @@ public class AsyncClient {
if (endTime - now > 0) {
stub.unaryCall(request, this);
} else {
future.done();
future.set(histogram);
}
}
});
@ -192,7 +186,7 @@ public class AsyncClient {
final long endTime) {
final BenchmarkServiceStub stub = BenchmarkServiceGrpc.newStub(channel);
final Histogram histogram = new Histogram(HISTOGRAM_MAX_VALUE, HISTOGRAM_PRECISION);
final HistogramFuture future = new HistogramFuture(histogram);
final SettableFuture<Histogram> future = SettableFuture.create();
ThisIsAHackStreamObserver responseObserver =
new ThisIsAHackStreamObserver(request, histogram, future, endTime);
@ -211,7 +205,7 @@ public class AsyncClient {
final SimpleRequest request;
final Histogram histogram;
final HistogramFuture future;
final SettableFuture<Histogram> future;
final long endTime;
long lastCall = System.nanoTime();
@ -219,7 +213,7 @@ public class AsyncClient {
ThisIsAHackStreamObserver(SimpleRequest request,
Histogram histogram,
HistogramFuture future,
SettableFuture<Histogram> future,
long endTime) {
this.request = request;
this.histogram = histogram;
@ -243,16 +237,12 @@ public class AsyncClient {
@Override
public void onError(Throwable t) {
Status status = Status.fromThrowable(t);
System.err.println("Encountered an error in streamingCall. Status is " + status);
t.printStackTrace();
future.cancel(true);
future.setException(new RuntimeException("Encountered an error in streamingCall", t));
}
@Override
public void onCompleted() {
future.done();
future.set(histogram);
}
}
@ -318,58 +308,4 @@ public class AsyncClient {
AsyncClient client = new AsyncClient(config);
client.run();
}
private static class HistogramFuture implements Future<Histogram> {
private final Histogram histogram;
private boolean canceled;
private boolean done;
HistogramFuture(Histogram histogram) {
Preconditions.checkNotNull(histogram, "histogram");
this.histogram = histogram;
}
@Override
public synchronized boolean cancel(boolean mayInterruptIfRunning) {
if (!done && !canceled) {
canceled = true;
notifyAll();
return true;
}
return false;
}
@Override
public synchronized boolean isCancelled() {
return canceled;
}
@Override
public synchronized boolean isDone() {
return done || canceled;
}
@Override
public synchronized Histogram get() throws InterruptedException {
while (!isDone() && !isCancelled()) {
wait();
}
if (isCancelled()) {
throw new CancellationException();
}
return histogram;
}
@Override
public Histogram get(long timeout, TimeUnit unit) {
throw new UnsupportedOperationException();
}
private synchronized void done() {
done = true;
notifyAll();
}
}
}