benchmarks: Remove usage of deprecated StreamObservers

This commit is contained in:
Eric Anderson 2023-11-13 16:29:38 -08:00 committed by GitHub
parent ae62785e0b
commit 6257c59566
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 22 additions and 45 deletions

View File

@ -34,7 +34,6 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinWorkerThread;
@ -281,7 +280,6 @@ public class AsyncServer {
}
@Override
@SuppressWarnings("deprecation") // For StreamObservers, ideally we refactor this class out.
public void streamingFromServer(
final Messages.SimpleRequest request,
final StreamObserver<Messages.SimpleResponse> observer) {
@ -289,57 +287,36 @@ public class AsyncServer {
final Messages.SimpleResponse response = Utils.makeResponse(request);
final ServerCallStreamObserver<Messages.SimpleResponse> responseObserver =
(ServerCallStreamObserver<Messages.SimpleResponse>) observer;
// If the client cancels, copyWithFlowControl takes care of calling
// responseObserver.onCompleted() for us
io.grpc.stub.StreamObservers.copyWithFlowControl(
new Iterator<Messages.SimpleResponse>() {
@Override
public boolean hasNext() {
return !shutdown.get() && !responseObserver.isCancelled();
}
@Override
public Messages.SimpleResponse next() {
return response;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
},
responseObserver);
Runnable onReady = () -> {
if (shutdown.get()) {
responseObserver.onCompleted();
return;
}
while (responseObserver.isReady() && !responseObserver.isCancelled()) {
responseObserver.onNext(response);
}
};
responseObserver.setOnReadyHandler(onReady);
onReady.run();
}
@Override
@SuppressWarnings("deprecation") // For StreamObservers, ideally we refactor this class out.
public StreamObserver<Messages.SimpleRequest> streamingBothWays(
final StreamObserver<Messages.SimpleResponse> observer) {
// receive data forever and send data forever until client cancels or we shut down.
final ServerCallStreamObserver<Messages.SimpleResponse> responseObserver =
(ServerCallStreamObserver<Messages.SimpleResponse>) observer;
// If the client cancels, copyWithFlowControl takes care of calling
// responseObserver.onCompleted() for us
io.grpc.stub.StreamObservers.copyWithFlowControl(
new Iterator<Messages.SimpleResponse>() {
@Override
public boolean hasNext() {
return !shutdown.get() && !responseObserver.isCancelled();
}
@Override
public Messages.SimpleResponse next() {
return BIDI_RESPONSE;
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
},
responseObserver
);
Runnable onReady = () -> {
if (shutdown.get()) {
responseObserver.onCompleted();
return;
}
while (responseObserver.isReady() && !responseObserver.isCancelled()) {
responseObserver.onNext(BIDI_RESPONSE);
}
};
responseObserver.setOnReadyHandler(onReady);
onReady.run();
return new StreamObserver<Messages.SimpleRequest>() {
@Override
public void onNext(final Messages.SimpleRequest request) {