mirror of https://github.com/grpc/grpc-java.git
xds: Fix fallback test FakeClock TSAN failure
d65d3942e increased the test speed of
connect_then_mainServerDown_fallbackServerUp by using FakeClock.
However, it introduced a data race because FakeClock is not thread-safe.
This change injects a single thread for gRPC callbacks such that
syncContext is run on a thread under the test's control.
A simpler approach would be to expose syncContext from XdsClientImpl for
testing. However, this test is in a different package and I wanted to
avoid adding a public method.
```
Read of size 8 at 0x00008dec9d50 by thread T25:
#0 io.grpc.internal.FakeClock$ScheduledExecutorImpl.schedule(Lio/grpc/internal/FakeClock$ScheduledTask;JLjava/util/concurrent/TimeUnit;)V FakeClock.java:140
#1 io.grpc.internal.FakeClock$ScheduledExecutorImpl.schedule(Ljava/lang/Runnable;JLjava/util/concurrent/TimeUnit;)Ljava/util/concurrent/ScheduledFuture; FakeClock.java:150
#2 io.grpc.SynchronizationContext.schedule(Ljava/lang/Runnable;JLjava/util/concurrent/TimeUnit;Ljava/util/concurrent/ScheduledExecutorService;)Lio/grpc/SynchronizationContext$ScheduledHandle; SynchronizationContext.java:153
#3 io.grpc.xds.client.ControlPlaneClient$AdsStream.handleRpcStreamClosed(Lio/grpc/Status;)V ControlPlaneClient.java:491
#4 io.grpc.xds.client.ControlPlaneClient$AdsStream.lambda$onStatusReceived$0(Lio/grpc/Status;)V ControlPlaneClient.java:429
#5 io.grpc.xds.client.ControlPlaneClient$AdsStream$$Lambda+0x00000001004a95d0.run()V ??
#6 io.grpc.SynchronizationContext.drain()V SynchronizationContext.java:96
#7 io.grpc.SynchronizationContext.execute(Ljava/lang/Runnable;)V SynchronizationContext.java:128
#8 io.grpc.xds.client.ControlPlaneClient$AdsStream.onStatusReceived(Lio/grpc/Status;)V ControlPlaneClient.java:428
#9 io.grpc.xds.GrpcXdsTransportFactory$EventHandlerToCallListenerAdapter.onClose(Lio/grpc/Status;Lio/grpc/Metadata;)V GrpcXdsTransportFactory.java:149
#10 io.grpc.PartialForwardingClientCallListener.onClose(Lio/grpc/Status;Lio/grpc/Metadata;)V PartialForwardingClientCallListener.java:39
...
Previous write of size 8 at 0x00008dec9d50 by thread T4 (mutexes: write M0, write M1, write M2, write M3):
#0 io.grpc.internal.FakeClock.forwardTime(JLjava/util/concurrent/TimeUnit;)I FakeClock.java:368
#1 io.grpc.xds.XdsClientFallbackTest.connect_then_mainServerDown_fallbackServerUp()V XdsClientFallbackTest.java:358
...
```
This commit is contained in:
parent
fc86084df5
commit
495a8906b2
|
|
@ -31,6 +31,8 @@ import static org.mockito.Mockito.verify;
|
|||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.grpc.ChannelCredentials;
|
||||
import io.grpc.Grpc;
|
||||
import io.grpc.MetricRecorder;
|
||||
import io.grpc.Status;
|
||||
import io.grpc.internal.ExponentialBackoffPolicy;
|
||||
|
|
@ -43,11 +45,14 @@ import io.grpc.xds.client.XdsClient;
|
|||
import io.grpc.xds.client.XdsClientImpl;
|
||||
import io.grpc.xds.client.XdsClientMetricReporter;
|
||||
import io.grpc.xds.client.XdsInitializationException;
|
||||
import io.grpc.xds.client.XdsTransportFactory;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Map;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.logging.Level;
|
||||
import java.util.logging.Logger;
|
||||
|
|
@ -338,9 +343,21 @@ public class XdsClientFallbackTest {
|
|||
public void connect_then_mainServerDown_fallbackServerUp() throws Exception {
|
||||
mainXdsServer.restartXdsServer();
|
||||
fallbackServer.restartXdsServer();
|
||||
ExecutorService executor = Executors.newFixedThreadPool(1);
|
||||
XdsTransportFactory xdsTransportFactory = new XdsTransportFactory() {
|
||||
@Override
|
||||
public XdsTransport create(Bootstrapper.ServerInfo serverInfo) {
|
||||
ChannelCredentials channelCredentials =
|
||||
(ChannelCredentials) serverInfo.implSpecificConfig();
|
||||
return new GrpcXdsTransportFactory.GrpcXdsTransport(
|
||||
Grpc.newChannelBuilder(serverInfo.target(), channelCredentials)
|
||||
.executor(executor)
|
||||
.build());
|
||||
}
|
||||
};
|
||||
XdsClientImpl xdsClient = CommonBootstrapperTestUtils.createXdsClient(
|
||||
new GrpcBootstrapperImpl().bootstrap(defaultBootstrapOverride()),
|
||||
DEFAULT_XDS_TRANSPORT_FACTORY, fakeClock, new ExponentialBackoffPolicy.Provider(),
|
||||
xdsTransportFactory, fakeClock, new ExponentialBackoffPolicy.Provider(),
|
||||
MessagePrinter.INSTANCE, xdsClientMetricReporter);
|
||||
|
||||
xdsClient.watchXdsResource(XdsListenerResource.getInstance(), MAIN_SERVER, ldsWatcher);
|
||||
|
|
@ -355,7 +372,8 @@ public class XdsClientFallbackTest {
|
|||
// Sleep for the ADS stream disconnect to be processed and for the retry to fail. Between those
|
||||
// two sleeps we need the fakeClock to progress by 1 second to restart the ADS stream.
|
||||
for (int i = 0; i < 5; i++) {
|
||||
fakeClock.forwardTime(1000, TimeUnit.MILLISECONDS);
|
||||
// FakeClock is not thread-safe, and the retry scheduling is concurrent to this test thread
|
||||
executor.submit(() -> fakeClock.forwardTime(1000, TimeUnit.MILLISECONDS)).get();
|
||||
TimeUnit.SECONDS.sleep(1);
|
||||
}
|
||||
|
||||
|
|
@ -393,6 +411,7 @@ public class XdsClientFallbackTest {
|
|||
fakeClock.forwardTime(15000, TimeUnit.MILLISECONDS); // Does not exist timer
|
||||
verify(cdsWatcher2, timeout(5000)).onResourceDoesNotExist(eq(CLUSTER_NAME));
|
||||
xdsClient.shutdown();
|
||||
executor.shutdown();
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
Loading…
Reference in New Issue