mirror of https://github.com/grpc/grpc-java.git
core: fix old ClientStreamTracer.Factory creating tracers twice (#8381)
Fix a bug introduced in #8355 : old ClientStreamTracer.Factory implementation creates tracers twice.
This commit is contained in:
parent
0d80c33bce
commit
c77083f013
|
|
@ -65,7 +65,6 @@ import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
|
||||||
import java.util.logging.Level;
|
import java.util.logging.Level;
|
||||||
import java.util.logging.Logger;
|
import java.util.logging.Logger;
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
|
@ -785,15 +784,22 @@ public final class GrpcUtil {
|
||||||
} else {
|
} else {
|
||||||
streamTracer = new ForwardingClientStreamTracer() {
|
streamTracer = new ForwardingClientStreamTracer() {
|
||||||
final ClientStreamTracer noop = new ClientStreamTracer() {};
|
final ClientStreamTracer noop = new ClientStreamTracer() {};
|
||||||
AtomicReference<ClientStreamTracer> delegate = new AtomicReference<>(noop);
|
volatile ClientStreamTracer delegate = noop;
|
||||||
|
|
||||||
void maybeInit(StreamInfo info, Metadata headers) {
|
void maybeInit(StreamInfo info, Metadata headers) {
|
||||||
delegate.compareAndSet(noop, streamTracerFactory.newClientStreamTracer(info, headers));
|
if (delegate != noop) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
synchronized (this) {
|
||||||
|
if (delegate == noop) {
|
||||||
|
delegate = streamTracerFactory.newClientStreamTracer(info, headers);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected ClientStreamTracer delegate() {
|
protected ClientStreamTracer delegate() {
|
||||||
return delegate.get();
|
return delegate;
|
||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
|
|
|
||||||
|
|
@ -38,6 +38,7 @@ import io.grpc.Status;
|
||||||
import io.grpc.internal.ClientStreamListener.RpcProgress;
|
import io.grpc.internal.ClientStreamListener.RpcProgress;
|
||||||
import io.grpc.internal.GrpcUtil.Http2Error;
|
import io.grpc.internal.GrpcUtil.Http2Error;
|
||||||
import io.grpc.testing.TestMethodDescriptors;
|
import io.grpc.testing.TestMethodDescriptors;
|
||||||
|
import java.util.ArrayDeque;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
@ -301,12 +302,14 @@ public class GrpcUtilTest {
|
||||||
final AtomicReference<Attributes> transportAttrsRef = new AtomicReference<>();
|
final AtomicReference<Attributes> transportAttrsRef = new AtomicReference<>();
|
||||||
final ClientStreamTracer mockTracer = mock(ClientStreamTracer.class);
|
final ClientStreamTracer mockTracer = mock(ClientStreamTracer.class);
|
||||||
final Metadata.Key<String> key = Metadata.Key.of("fake-key", Metadata.ASCII_STRING_MARSHALLER);
|
final Metadata.Key<String> key = Metadata.Key.of("fake-key", Metadata.ASCII_STRING_MARSHALLER);
|
||||||
|
final ArrayDeque<ClientStreamTracer> tracers = new ArrayDeque<>();
|
||||||
ClientStreamTracer.Factory oldFactoryImpl = new ClientStreamTracer.Factory() {
|
ClientStreamTracer.Factory oldFactoryImpl = new ClientStreamTracer.Factory() {
|
||||||
@SuppressWarnings("deprecation")
|
@SuppressWarnings("deprecation")
|
||||||
@Override
|
@Override
|
||||||
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
|
public ClientStreamTracer newClientStreamTracer(StreamInfo info, Metadata headers) {
|
||||||
transportAttrsRef.set(info.getTransportAttrs());
|
transportAttrsRef.set(info.getTransportAttrs());
|
||||||
headers.put(key, "fake-value");
|
headers.put(key, "fake-value");
|
||||||
|
tracers.offer(mockTracer);
|
||||||
return mockTracer;
|
return mockTracer;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
@ -318,8 +321,12 @@ public class GrpcUtilTest {
|
||||||
Attributes.newBuilder().set(Attributes.Key.<String>create("foo"), "bar").build();
|
Attributes.newBuilder().set(Attributes.Key.<String>create("foo"), "bar").build();
|
||||||
ClientStreamTracer tracer = GrpcUtil.newClientStreamTracer(oldFactoryImpl, info, metadata);
|
ClientStreamTracer tracer = GrpcUtil.newClientStreamTracer(oldFactoryImpl, info, metadata);
|
||||||
tracer.streamCreated(transAttrs, metadata);
|
tracer.streamCreated(transAttrs, metadata);
|
||||||
|
assertThat(tracers.poll()).isSameInstanceAs(mockTracer);
|
||||||
assertThat(transportAttrsRef.get()).isEqualTo(transAttrs);
|
assertThat(transportAttrsRef.get()).isEqualTo(transAttrs);
|
||||||
assertThat(metadata.get(key)).isEqualTo("fake-value");
|
assertThat(metadata.get(key)).isEqualTo("fake-value");
|
||||||
|
|
||||||
|
tracer.streamClosed(Status.UNAVAILABLE);
|
||||||
|
// verify that newClientStreamTracer() is called no more than once
|
||||||
|
assertThat(tracers).isEmpty();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue