alts: create handshaker RPC lazily (#7630)

* alts: create handshaker RPC lazily

* alts: address review comments
This commit is contained in:
Jiangtao Li 2020-11-17 17:36:09 -08:00 committed by GitHub
parent d7a00e6047
commit 24e4d68282
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 17 additions and 7 deletions

View File

@ -29,7 +29,8 @@ import java.util.concurrent.atomic.AtomicReference;
/** An interface to the ALTS handshaker service. */
class AltsHandshakerStub {
private final StreamObserver<HandshakerResp> reader = new Reader();
private final StreamObserver<HandshakerReq> writer;
private StreamObserver<HandshakerReq> writer;
private final HandshakerServiceStub serviceStub;
private final ArrayBlockingQueue<Optional<HandshakerResp>> responseQueue =
new ArrayBlockingQueue<>(1);
private final AtomicReference<String> exceptionMessage = new AtomicReference<>();
@ -37,20 +38,18 @@ class AltsHandshakerStub {
private static final long HANDSHAKE_RPC_DEADLINE_SECS = 20;
AltsHandshakerStub(HandshakerServiceStub serviceStub) {
this.writer =
serviceStub
.withDeadlineAfter(HANDSHAKE_RPC_DEADLINE_SECS, SECONDS)
.doHandshake(this.reader);
this.serviceStub = serviceStub;
}
@VisibleForTesting
AltsHandshakerStub() {
writer = null;
serviceStub = null;
}
@VisibleForTesting
AltsHandshakerStub(StreamObserver<HandshakerReq> writer) {
this.writer = writer;
serviceStub = null;
}
@VisibleForTesting
@ -60,6 +59,7 @@ class AltsHandshakerStub {
/** Send a handshaker request and return the handshaker response. */
public HandshakerResp send(HandshakerReq req) throws InterruptedException, IOException {
createWriterIfNull();
maybeThrowIoException();
if (!responseQueue.isEmpty()) {
throw new IOException("Received an unexpected response.");
@ -72,6 +72,14 @@ class AltsHandshakerStub {
return result.get();
}
/** Create a new writer if the writer is null. */
private void createWriterIfNull() {
if (writer == null) {
writer =
serviceStub.withDeadlineAfter(HANDSHAKE_RPC_DEADLINE_SECS, SECONDS).doHandshake(reader);
}
}
/** Throw exception if there is an outstanding exception. */
private void maybeThrowIoException() throws IOException {
if (exceptionMessage.get() != null) {
@ -81,7 +89,9 @@ class AltsHandshakerStub {
/** Close the connection. */
public void close() {
writer.onCompleted();
if (writer != null) {
writer.onCompleted();
}
}
private class Reader implements StreamObserver<HandshakerResp> {