okhttp: throws correct exception in AsyncSink when write/flush happened before

connected. This change will suppress NPE logs when the transport encountered
any error during connection phrase.
This commit is contained in:
Jihun Cho 2019-01-18 14:09:14 -08:00
parent abdae25c8d
commit 0e32152229
2 changed files with 73 additions and 23 deletions

View File

@ -85,19 +85,15 @@ final class AsyncSink implements Sink {
} }
writeEnqueued = true; writeEnqueued = true;
} }
serializingExecutor.execute(new Runnable() { serializingExecutor.execute(new WriteRunnable() {
@Override @Override
public void run() { public void doRun() throws IOException {
Buffer buf = new Buffer(); Buffer buf = new Buffer();
synchronized (lock) { synchronized (lock) {
buf.write(buffer, buffer.completeSegmentByteCount()); buf.write(buffer, buffer.completeSegmentByteCount());
writeEnqueued = false; writeEnqueued = false;
} }
try {
sink.write(buf, buf.size()); sink.write(buf, buf.size());
} catch (IOException e) {
transportExceptionHandler.onException(e);
}
} }
}); });
} }
@ -113,20 +109,16 @@ final class AsyncSink implements Sink {
} }
flushEnqueued = true; flushEnqueued = true;
} }
serializingExecutor.execute(new Runnable() { serializingExecutor.execute(new WriteRunnable() {
@Override @Override
public void run() { public void doRun() throws IOException {
Buffer buf = new Buffer(); Buffer buf = new Buffer();
synchronized (lock) { synchronized (lock) {
buf.write(buffer, buffer.size()); buf.write(buffer, buffer.size());
flushEnqueued = false; flushEnqueued = false;
} }
try {
sink.write(buf, buf.size()); sink.write(buf, buf.size());
sink.flush(); sink.flush();
} catch (IOException e) {
transportExceptionHandler.onException(e);
}
} }
}); });
} }
@ -147,16 +139,36 @@ final class AsyncSink implements Sink {
public void run() { public void run() {
buffer.close(); buffer.close();
try { try {
if (sink != null) {
sink.close(); sink.close();
}
} catch (IOException e) { } catch (IOException e) {
transportExceptionHandler.onException(e); transportExceptionHandler.onException(e);
} }
try { try {
if (socket != null) {
socket.close(); socket.close();
}
} catch (IOException e) { } catch (IOException e) {
transportExceptionHandler.onException(e); transportExceptionHandler.onException(e);
} }
} }
}); });
} }
private abstract class WriteRunnable implements Runnable {
@Override
public final void run() {
try {
if (sink == null) {
throw new IOException("Unable to perform write due to unavailable sink.");
}
doRun();
} catch (Exception e) {
transportExceptionHandler.onException(e);
}
}
public abstract void doRun() throws IOException;
}
} }

View File

@ -41,10 +41,10 @@ import java.util.concurrent.Executor;
import okio.Buffer; import okio.Buffer;
import okio.Sink; import okio.Sink;
import okio.Timeout; import okio.Timeout;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.junit.runners.JUnit4; import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.InOrder; import org.mockito.InOrder;
/** Tests for {@link AsyncSink}. */ /** Tests for {@link AsyncSink}. */
@ -58,14 +58,10 @@ public class AsyncSinkTest {
private final AsyncSink sink = private final AsyncSink sink =
AsyncSink.sink(new SerializingExecutor(queueingExecutor), exceptionHandler); AsyncSink.sink(new SerializingExecutor(queueingExecutor), exceptionHandler);
@Before
public void setUp() throws Exception {
sink.becomeConnected(mockedSink, socket);
}
@Test @Test
public void noCoalesceRequired() throws IOException { public void noCoalesceRequired() throws IOException {
Buffer buffer = new Buffer(); Buffer buffer = new Buffer();
sink.becomeConnected(mockedSink, socket);
sink.write(buffer.writeUtf8("hello"), buffer.size()); sink.write(buffer.writeUtf8("hello"), buffer.size());
sink.flush(); sink.flush();
queueingExecutor.runAll(); queueingExecutor.runAll();
@ -80,6 +76,7 @@ public class AsyncSinkTest {
byte[] firstData = "a string".getBytes(Charsets.UTF_8); byte[] firstData = "a string".getBytes(Charsets.UTF_8);
byte[] secondData = "a longer string".getBytes(Charsets.UTF_8); byte[] secondData = "a longer string".getBytes(Charsets.UTF_8);
sink.becomeConnected(mockedSink, socket);
Buffer buffer = new Buffer(); Buffer buffer = new Buffer();
sink.write(buffer.write(firstData), buffer.size()); sink.write(buffer.write(firstData), buffer.size());
sink.flush(); sink.flush();
@ -101,6 +98,7 @@ public class AsyncSinkTest {
byte[] firstData = "a string".getBytes(Charsets.UTF_8); byte[] firstData = "a string".getBytes(Charsets.UTF_8);
byte[] secondData = "a longer string".getBytes(Charsets.UTF_8); byte[] secondData = "a longer string".getBytes(Charsets.UTF_8);
Buffer buffer = new Buffer().write(firstData); Buffer buffer = new Buffer().write(firstData);
sink.becomeConnected(mockedSink, socket);
sink.write(buffer, buffer.size()); sink.write(buffer, buffer.size());
sink.flush(); sink.flush();
buffer = new Buffer().write(secondData); buffer = new Buffer().write(secondData);
@ -120,6 +118,7 @@ public class AsyncSinkTest {
byte[] firstData = "a string".getBytes(Charsets.UTF_8); byte[] firstData = "a string".getBytes(Charsets.UTF_8);
byte[] secondData = "a longer string".getBytes(Charsets.UTF_8); byte[] secondData = "a longer string".getBytes(Charsets.UTF_8);
Buffer buffer = new Buffer(); Buffer buffer = new Buffer();
sink.becomeConnected(mockedSink, socket);
sink.write(buffer.write(firstData), buffer.size()); sink.write(buffer.write(firstData), buffer.size());
sink.write(buffer.write(secondData), buffer.size()); sink.write(buffer.write(secondData), buffer.size());
sink.flush(); sink.flush();
@ -138,6 +137,7 @@ public class AsyncSinkTest {
.when(mockedSink).write(any(Buffer.class), anyLong()); .when(mockedSink).write(any(Buffer.class), anyLong());
Buffer buffer = new Buffer(); Buffer buffer = new Buffer();
buffer.writeUtf8("any message"); buffer.writeUtf8("any message");
sink.becomeConnected(mockedSink, socket);
sink.write(buffer, buffer.size()); sink.write(buffer, buffer.size());
sink.flush(); sink.flush();
queueingExecutor.runAll(); queueingExecutor.runAll();
@ -166,6 +166,7 @@ public class AsyncSinkTest {
.when(mockedSink).write(any(Buffer.class), anyLong()); .when(mockedSink).write(any(Buffer.class), anyLong());
Buffer buffer = new Buffer(); Buffer buffer = new Buffer();
buffer.writeUtf8("any message"); buffer.writeUtf8("any message");
sink.becomeConnected(mockedSink, socket);
sink.write(buffer, buffer.size()); sink.write(buffer, buffer.size());
sink.close(); sink.close();
queueingExecutor.runAll(); queueingExecutor.runAll();
@ -180,6 +181,7 @@ public class AsyncSinkTest {
@Test @Test
public void close_flushShouldThrowException() throws IOException { public void close_flushShouldThrowException() throws IOException {
sink.becomeConnected(mockedSink, socket);
sink.close(); sink.close();
queueingExecutor.runAll(); queueingExecutor.runAll();
try { try {
@ -195,6 +197,7 @@ public class AsyncSinkTest {
public void flush_shouldThrowIfAlreadyClosed() throws IOException { public void flush_shouldThrowIfAlreadyClosed() throws IOException {
Buffer buffer = new Buffer(); Buffer buffer = new Buffer();
buffer.writeUtf8("any message"); buffer.writeUtf8("any message");
sink.becomeConnected(mockedSink, socket);
sink.write(buffer, buffer.size()); sink.write(buffer, buffer.size());
sink.close(); sink.close();
queueingExecutor.runAll(); queueingExecutor.runAll();
@ -210,6 +213,7 @@ public class AsyncSinkTest {
@Test @Test
public void write_callSinkIfBufferIsLargerThanSegmentSize() throws IOException { public void write_callSinkIfBufferIsLargerThanSegmentSize() throws IOException {
Buffer buffer = new Buffer(); Buffer buffer = new Buffer();
sink.becomeConnected(mockedSink, socket);
// OkHttp is using 8192 as segment size. // OkHttp is using 8192 as segment size.
int payloadSize = 8192 * 2 - 1; int payloadSize = 8192 * 2 - 1;
int padding = 10; int padding = 10;
@ -240,6 +244,40 @@ public class AsyncSinkTest {
verify(mockedSink).flush(); verify(mockedSink).flush();
} }
@Test
public void writeAndFlush_beforeConnected() throws IOException {
Buffer buffer = new Buffer();
sink.write(buffer.writeUtf8("hello"), buffer.size());
sink.flush();
queueingExecutor.runAll();
verify(mockedSink, never()).write(any(Buffer.class), anyLong());
verify(mockedSink, never()).flush();
ArgumentCaptor<Throwable> captor = ArgumentCaptor.forClass(Throwable.class);
verify(exceptionHandler).onException(captor.capture());
Throwable t = captor.getValue();
assertThat(t).isInstanceOf(IOException.class);
assertThat(t).hasMessageThat().contains("unavailable sink");
}
@Test
public void close_multipleCloseShouldNotThrow() throws IOException {
sink.becomeConnected(mockedSink, socket);
sink.close();
queueingExecutor.runAll();
verify(exceptionHandler, never()).onException(any(Throwable.class));
sink.close();
queueingExecutor.runAll();
verify(exceptionHandler, never()).onException(any(Throwable.class));
}
/** /**
* Executor queues incoming runnables instead of running it. Runnables can be invoked via {@link * Executor queues incoming runnables instead of running it. Runnables can be invoked via {@link
* QueueingExecutor#runAll} in serial order. * QueueingExecutor#runAll} in serial order.