From a4c821d94e84d839498b08572c3568adac6bd205 Mon Sep 17 00:00:00 2001 From: Louis Ryan Date: Wed, 26 Aug 2015 14:15:30 -0700 Subject: [PATCH] Rename onValue to onNext in StreamObserver to align with the naming in Rx & the proposed Flow API in Java9 --- .../benchmarks/netty/AbstractBenchmark.java | 14 +++++----- .../io/grpc/benchmarks/qps/AsyncClient.java | 8 +++--- .../io/grpc/benchmarks/qps/AsyncServer.java | 6 ++-- .../grpc/benchmarks/qps/OpenLoopClient.java | 2 +- .../examples/header/CustomHeaderServer.java | 2 +- .../examples/helloworld/HelloWorldServer.java | 2 +- .../examples/routeguide/RouteGuideClient.java | 8 +++--- .../examples/routeguide/RouteGuideServer.java | 12 ++++---- .../integration/AbstractTransportTest.java | 28 +++++++++---------- .../testing/integration/TestServiceImpl.java | 14 +++++----- .../main/java/io/grpc/stub/ClientCalls.java | 4 +-- .../main/java/io/grpc/stub/ServerCalls.java | 4 +-- .../java/io/grpc/stub/StreamObserver.java | 2 +- .../java/io/grpc/stub/StreamRecorder.java | 2 +- 14 files changed, 54 insertions(+), 54 deletions(-) diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/AbstractBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/AbstractBenchmark.java index a04c284314..6b95f4bb18 100644 --- a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/AbstractBenchmark.java +++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/AbstractBenchmark.java @@ -372,7 +372,7 @@ public abstract class AbstractBenchmark { for (int i = 0; i < callsPerChannel; i++) { StreamObserver observer = new StreamObserver() { @Override - public void onValue(ByteBuf value) { + public void onNext(ByteBuf value) { counter.addAndGet(counterDelta); } @@ -414,10 +414,10 @@ public abstract class AbstractBenchmark { streamingCall, new StreamObserver() { @Override - public void onValue(ByteBuf value) { + public void onNext(ByteBuf value) { if (!done.get()) { counter.addAndGet(counterDelta); - requestObserverRef.get().onValue(request.slice()); + requestObserverRef.get().onNext(request.slice()); streamingCall.request(1); } } @@ -432,8 +432,8 @@ public abstract class AbstractBenchmark { } }); requestObserverRef.set(requestObserver); - requestObserver.onValue(request.slice()); - requestObserver.onValue(request.slice()); + requestObserver.onNext(request.slice()); + requestObserver.onNext(request.slice()); } } } @@ -457,7 +457,7 @@ public abstract class AbstractBenchmark { streamingCall, new StreamObserver() { @Override - public void onValue(ByteBuf value) { + public void onNext(ByteBuf value) { if (!done.get()) { counter.addAndGet(counterDelta); streamingCall.request(1); @@ -474,7 +474,7 @@ public abstract class AbstractBenchmark { } }); requestObserverRef.set(requestObserver); - requestObserver.onValue(request.slice()); + requestObserver.onNext(request.slice()); } } } diff --git a/benchmarks/src/main/java/io/grpc/benchmarks/qps/AsyncClient.java b/benchmarks/src/main/java/io/grpc/benchmarks/qps/AsyncClient.java index d002e008c1..4588e7e615 100644 --- a/benchmarks/src/main/java/io/grpc/benchmarks/qps/AsyncClient.java +++ b/benchmarks/src/main/java/io/grpc/benchmarks/qps/AsyncClient.java @@ -179,7 +179,7 @@ public class AsyncClient { long lastCall = System.nanoTime(); @Override - public void onValue(SimpleResponse value) { + public void onNext(SimpleResponse value) { } @Override @@ -220,7 +220,7 @@ public class AsyncClient { StreamObserver requestObserver = stub.streamingCall(responseObserver); responseObserver.requestObserver = requestObserver; - requestObserver.onValue(request); + requestObserver.onNext(request); return future; } @@ -249,14 +249,14 @@ public class AsyncClient { } @Override - public void onValue(SimpleResponse value) { + public void onNext(SimpleResponse value) { long now = System.nanoTime(); // Record the latencies in microseconds histogram.recordValue((now - lastCall) / 1000); lastCall = now; if (endTime > now) { - requestObserver.onValue(request); + requestObserver.onNext(request); } else { requestObserver.onCompleted(); } diff --git a/benchmarks/src/main/java/io/grpc/benchmarks/qps/AsyncServer.java b/benchmarks/src/main/java/io/grpc/benchmarks/qps/AsyncServer.java index d56bc5795c..857de41291 100644 --- a/benchmarks/src/main/java/io/grpc/benchmarks/qps/AsyncServer.java +++ b/benchmarks/src/main/java/io/grpc/benchmarks/qps/AsyncServer.java @@ -184,7 +184,7 @@ public class AsyncServer { @Override public void unaryCall(SimpleRequest request, StreamObserver responseObserver) { SimpleResponse response = buildSimpleResponse(request); - responseObserver.onValue(response); + responseObserver.onNext(response); responseObserver.onCompleted(); } @@ -193,9 +193,9 @@ public class AsyncServer { final StreamObserver responseObserver) { return new StreamObserver() { @Override - public void onValue(SimpleRequest request) { + public void onNext(SimpleRequest request) { SimpleResponse response = buildSimpleResponse(request); - responseObserver.onValue(response); + responseObserver.onNext(response); } @Override diff --git a/benchmarks/src/main/java/io/grpc/benchmarks/qps/OpenLoopClient.java b/benchmarks/src/main/java/io/grpc/benchmarks/qps/OpenLoopClient.java index 8b407a6911..2f65fac9e4 100644 --- a/benchmarks/src/main/java/io/grpc/benchmarks/qps/OpenLoopClient.java +++ b/benchmarks/src/main/java/io/grpc/benchmarks/qps/OpenLoopClient.java @@ -203,7 +203,7 @@ public class OpenLoopClient { private final long start = System.nanoTime(); @Override - public void onValue(SimpleResponse value) { + public void onNext(SimpleResponse value) { } @Override diff --git a/examples/src/main/java/io/grpc/examples/header/CustomHeaderServer.java b/examples/src/main/java/io/grpc/examples/header/CustomHeaderServer.java index 0a68f7ddae..7454f68936 100644 --- a/examples/src/main/java/io/grpc/examples/header/CustomHeaderServer.java +++ b/examples/src/main/java/io/grpc/examples/header/CustomHeaderServer.java @@ -87,7 +87,7 @@ public class CustomHeaderServer { @Override public void sayHello(HelloRequest req, StreamObserver responseObserver) { HelloResponse reply = HelloResponse.newBuilder().setMessage("Hello " + req.getName()).build(); - responseObserver.onValue(reply); + responseObserver.onNext(reply); responseObserver.onCompleted(); } } diff --git a/examples/src/main/java/io/grpc/examples/helloworld/HelloWorldServer.java b/examples/src/main/java/io/grpc/examples/helloworld/HelloWorldServer.java index 5a8a028c68..65a35b7c83 100644 --- a/examples/src/main/java/io/grpc/examples/helloworld/HelloWorldServer.java +++ b/examples/src/main/java/io/grpc/examples/helloworld/HelloWorldServer.java @@ -82,7 +82,7 @@ public class HelloWorldServer { @Override public void sayHello(HelloRequest req, StreamObserver responseObserver) { HelloResponse reply = HelloResponse.newBuilder().setMessage("Hello " + req.getName()).build(); - responseObserver.onValue(reply); + responseObserver.onNext(reply); responseObserver.onCompleted(); } } diff --git a/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideClient.java b/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideClient.java index 39a09590c6..8013f0fd84 100644 --- a/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideClient.java +++ b/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideClient.java @@ -132,7 +132,7 @@ public class RouteGuideClient { final SettableFuture finishFuture = SettableFuture.create(); StreamObserver responseObserver = new StreamObserver() { @Override - public void onValue(RouteSummary summary) { + public void onNext(RouteSummary summary) { info("Finished trip with {0} points. Passed {1} features. " + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(), summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime()); @@ -159,7 +159,7 @@ public class RouteGuideClient { Point point = features.get(index).getLocation(); info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point), RouteGuideUtil.getLongitude(point)); - requestObserver.onValue(point); + requestObserver.onNext(point); // Sleep for a bit before sending the next one. Thread.sleep(rand.nextInt(1000) + 500); if (finishFuture.isDone()) { @@ -188,7 +188,7 @@ public class RouteGuideClient { StreamObserver requestObserver = asyncStub.routeChat(new StreamObserver() { @Override - public void onValue(RouteNote note) { + public void onNext(RouteNote note) { info("Got message \"{0}\" at {1}, {2}", note.getMessage(), note.getLocation() .getLatitude(), note.getLocation().getLongitude()); } @@ -212,7 +212,7 @@ public class RouteGuideClient { for (RouteNote request : requests) { info("Sending message \"{0}\" at {1}, {2}", request.getMessage(), request.getLocation() .getLatitude(), request.getLocation().getLongitude()); - requestObserver.onValue(request); + requestObserver.onNext(request); } requestObserver.onCompleted(); diff --git a/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideServer.java b/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideServer.java index 2cf6157874..78f26d2013 100644 --- a/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideServer.java +++ b/examples/src/main/java/io/grpc/examples/routeguide/RouteGuideServer.java @@ -131,7 +131,7 @@ public class RouteGuideServer { */ @Override public void getFeature(Point request, StreamObserver responseObserver) { - responseObserver.onValue(checkFeature(request)); + responseObserver.onNext(checkFeature(request)); responseObserver.onCompleted(); } @@ -156,7 +156,7 @@ public class RouteGuideServer { int lat = feature.getLocation().getLatitude(); int lon = feature.getLocation().getLongitude(); if (lon >= left && lon <= right && lat >= bottom && lat <= top) { - responseObserver.onValue(feature); + responseObserver.onNext(feature); } } responseObserver.onCompleted(); @@ -179,7 +179,7 @@ public class RouteGuideServer { long startTime = System.nanoTime(); @Override - public void onValue(Point point) { + public void onNext(Point point) { pointCount++; if (RouteGuideUtil.exists(checkFeature(point))) { featureCount++; @@ -200,7 +200,7 @@ public class RouteGuideServer { @Override public void onCompleted() { long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime); - responseObserver.onValue(RouteSummary.newBuilder().setPointCount(pointCount) + responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount) .setFeatureCount(featureCount).setDistance(distance) .setElapsedTime((int) seconds).build()); responseObserver.onCompleted(); @@ -219,12 +219,12 @@ public class RouteGuideServer { public StreamObserver routeChat(final StreamObserver responseObserver) { return new StreamObserver() { @Override - public void onValue(RouteNote note) { + public void onNext(RouteNote note) { List notes = getOrCreateNotes(note.getLocation()); // Respond with all previous notes at this location. for (RouteNote prevNote : notes.toArray(new RouteNote[0])) { - responseObserver.onValue(prevNote); + responseObserver.onNext(prevNote); } // Now add the new note to the list diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractTransportTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractTransportTest.java index 77b04002c5..7bc8d94514 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractTransportTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractTransportTest.java @@ -240,7 +240,7 @@ public abstract class AbstractTransportTest { StreamObserver requestObserver = asyncStub.streamingInputCall(responseObserver); for (StreamingInputCallRequest request : requests) { - requestObserver.onValue(request); + requestObserver.onNext(request); } requestObserver.onCompleted(); assertEquals(goldenResponse, responseObserver.firstValue().get()); @@ -300,8 +300,8 @@ public abstract class AbstractTransportTest { StreamObserver requestObserver = asyncStub.fullDuplexCall(responseObserver); for (int i = 0; i < requests.size(); i++) { - requestObserver.onValue(requests.get(i)); - verify(responseObserver, timeout(OPERATION_TIMEOUT)).onValue(goldenResponses.get(i)); + requestObserver.onNext(requests.get(i)); + verify(responseObserver, timeout(OPERATION_TIMEOUT)).onNext(goldenResponses.get(i)); verifyNoMoreInteractions(responseObserver); } requestObserver.onCompleted(); @@ -349,8 +349,8 @@ public abstract class AbstractTransportTest { StreamObserver responseObserver = mock(StreamObserver.class); StreamObserver requestObserver = asyncStub.fullDuplexCall(responseObserver); - requestObserver.onValue(request); - verify(responseObserver, timeout(OPERATION_TIMEOUT)).onValue(goldenResponse); + requestObserver.onNext(request); + verify(responseObserver, timeout(OPERATION_TIMEOUT)).onNext(goldenResponse); verifyNoMoreInteractions(responseObserver); requestObserver.onError(new RuntimeException()); @@ -378,7 +378,7 @@ public abstract class AbstractTransportTest { final int numRequests = 10; for (int ix = numRequests; ix > 0; --ix) { - requestStream.onValue(request); + requestStream.onNext(request); } requestStream.onCompleted(); recorder.awaitCompletion(); @@ -410,7 +410,7 @@ public abstract class AbstractTransportTest { final int numRequests = 10; for (int ix = numRequests; ix > 0; --ix) { - requestStream.onValue(request); + requestStream.onNext(request); } requestStream.onCompleted(); recorder.awaitCompletion(); @@ -570,7 +570,7 @@ public abstract class AbstractTransportTest { final int numRequests = 10; for (int ix = numRequests; ix > 0; --ix) { - requestStream.onValue(request); + requestStream.onNext(request); } requestStream.onCompleted(); recorder.awaitCompletion(); @@ -695,15 +695,15 @@ public abstract class AbstractTransportTest { StreamObserver responseObserver = mock(StreamObserver.class); StreamObserver requestObserver = asyncStub.fullDuplexCall(responseObserver); - requestObserver.onValue(requests.get(0)); - verify(responseObserver, timeout(OPERATION_TIMEOUT)).onValue(goldenResponses.get(0)); + requestObserver.onNext(requests.get(0)); + verify(responseObserver, timeout(OPERATION_TIMEOUT)).onNext(goldenResponses.get(0)); // Initiate graceful shutdown. channel.shutdown(); - requestObserver.onValue(requests.get(1)); - verify(responseObserver, timeout(OPERATION_TIMEOUT)).onValue(goldenResponses.get(1)); + requestObserver.onNext(requests.get(1)); + verify(responseObserver, timeout(OPERATION_TIMEOUT)).onNext(goldenResponses.get(1)); // The previous ping-pong could have raced with the shutdown, but this one certainly shouldn't. - requestObserver.onValue(requests.get(2)); - verify(responseObserver, timeout(OPERATION_TIMEOUT)).onValue(goldenResponses.get(2)); + requestObserver.onNext(requests.get(2)); + verify(responseObserver, timeout(OPERATION_TIMEOUT)).onNext(goldenResponses.get(2)); requestObserver.onCompleted(); verify(responseObserver, timeout(OPERATION_TIMEOUT)).onCompleted(); verifyNoMoreInteractions(responseObserver); diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java index 1f8ed80bdf..b3de8b6244 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/TestServiceImpl.java @@ -78,7 +78,7 @@ public class TestServiceImpl implements TestServiceGrpc.TestService { @Override public void emptyCall(EmptyProtos.Empty empty, StreamObserver responseObserver) { - responseObserver.onValue(EmptyProtos.Empty.getDefaultInstance()); + responseObserver.onNext(EmptyProtos.Empty.getDefaultInstance()); responseObserver.onCompleted(); } @@ -101,7 +101,7 @@ public class TestServiceImpl implements TestServiceGrpc.TestService { .setType(compressable ? PayloadType.COMPRESSABLE : PayloadType.UNCOMPRESSABLE) .setBody(payload); } - responseObserver.onValue(responseBuilder.build()); + responseObserver.onNext(responseBuilder.build()); responseObserver.onCompleted(); } @@ -127,13 +127,13 @@ public class TestServiceImpl implements TestServiceGrpc.TestService { private int totalPayloadSize; @Override - public void onValue(StreamingInputCallRequest message) { + public void onNext(StreamingInputCallRequest message) { totalPayloadSize += message.getPayload().getBody().size(); } @Override public void onCompleted() { - responseObserver.onValue(StreamingInputCallResponse.newBuilder() + responseObserver.onNext(StreamingInputCallResponse.newBuilder() .setAggregatedPayloadSize(totalPayloadSize).build()); responseObserver.onCompleted(); } @@ -155,7 +155,7 @@ public class TestServiceImpl implements TestServiceGrpc.TestService { final ResponseDispatcher dispatcher = new ResponseDispatcher(responseObserver); return new StreamObserver() { @Override - public void onValue(StreamingOutputCallRequest request) { + public void onNext(StreamingOutputCallRequest request) { dispatcher.enqueue(toChunkQueue(request)); } @@ -182,7 +182,7 @@ public class TestServiceImpl implements TestServiceGrpc.TestService { final Queue chunks = new LinkedList(); return new StreamObserver() { @Override - public void onValue(StreamingOutputCallRequest request) { + public void onNext(StreamingOutputCallRequest request) { chunks.addAll(toChunkQueue(request)); } @@ -271,7 +271,7 @@ public class TestServiceImpl implements TestServiceGrpc.TestService { // Pop off the next chunk and send it to the client. Chunk chunk = chunks.remove(); - responseStream.onValue(chunk.toResponse()); + responseStream.onNext(chunk.toResponse()); } catch (Throwable e) { responseStream.onError(e); diff --git a/stub/src/main/java/io/grpc/stub/ClientCalls.java b/stub/src/main/java/io/grpc/stub/ClientCalls.java index efc09346a0..ecb0d5dd5e 100644 --- a/stub/src/main/java/io/grpc/stub/ClientCalls.java +++ b/stub/src/main/java/io/grpc/stub/ClientCalls.java @@ -206,7 +206,7 @@ public class ClientCalls { } @Override - public void onValue(T value) { + public void onNext(T value) { call.sendMessage(value); } @@ -248,7 +248,7 @@ public class ClientCalls { .asRuntimeException(); } firstResponseReceived = true; - observer.onValue(message); + observer.onNext(message); if (streamingResponse) { // Request delivery of the next inbound message. diff --git a/stub/src/main/java/io/grpc/stub/ServerCalls.java b/stub/src/main/java/io/grpc/stub/ServerCalls.java index 3e2828369f..4033d277a6 100644 --- a/stub/src/main/java/io/grpc/stub/ServerCalls.java +++ b/stub/src/main/java/io/grpc/stub/ServerCalls.java @@ -186,7 +186,7 @@ public class ServerCalls { @Override public void onMessage(ReqT request) { - requestObserver.onValue(request); + requestObserver.onNext(request); // Request delivery of the next inbound message. call.request(1); @@ -227,7 +227,7 @@ public class ServerCalls { } @Override - public void onValue(RespT response) { + public void onNext(RespT response) { if (cancelled) { throw Status.CANCELLED.asRuntimeException(); } diff --git a/stub/src/main/java/io/grpc/stub/StreamObserver.java b/stub/src/main/java/io/grpc/stub/StreamObserver.java index 482af60c9c..32a35b8f2b 100644 --- a/stub/src/main/java/io/grpc/stub/StreamObserver.java +++ b/stub/src/main/java/io/grpc/stub/StreamObserver.java @@ -55,7 +55,7 @@ public interface StreamObserver { * * @param value the value passed to the stream */ - void onValue(V value); + void onNext(V value); /** * Receives a terminating error from the stream. diff --git a/stub/src/main/java/io/grpc/stub/StreamRecorder.java b/stub/src/main/java/io/grpc/stub/StreamRecorder.java index eb473e1f4a..76aee7bb44 100644 --- a/stub/src/main/java/io/grpc/stub/StreamRecorder.java +++ b/stub/src/main/java/io/grpc/stub/StreamRecorder.java @@ -67,7 +67,7 @@ public class StreamRecorder implements StreamObserver { } @Override - public void onValue(T value) { + public void onNext(T value) { if (!firstValue.isDone()) { firstValue.set(value); }