From 90aefb26e71fcc8d3f45fa5580dd3131b262db27 Mon Sep 17 00:00:00 2001 From: Eric Anderson Date: Wed, 29 Jan 2025 14:26:35 -0800 Subject: [PATCH] core: Propagate authority override from LB exactly once Setting the authority is only useful when creating a real stream, as there will be a following pick otherwise. In addition, DelayedStream will buffer each call to setAuthority() in a list and we don't want that memory usage. Note that no LBs are using this feature yet, so users would not have been exposed to the memory use. We also needed to setAuthority() when the LB selected a subchannel on the first pick attempt. --- .../grpc/internal/DelayedClientTransport.java | 23 +++++--- .../internal/DelayedClientTransportTest.java | 54 +++++++------------ 2 files changed, 36 insertions(+), 41 deletions(-) diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 2ff94b7804..f3faa92d4a 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -140,9 +140,15 @@ final class DelayedClientTransport implements ManagedClientTransport { ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, callOptions.isWaitForReady()); if (transport != null) { - return transport.newStream( + ClientStream stream = transport.newStream( args.getMethodDescriptor(), args.getHeaders(), callOptions, tracers); + // User code provided authority takes precedence over the LB provided one; this will be + // overwritten by ClientCallImpl if the application sets an authority override + if (pickResult.getAuthorityOverride() != null) { + stream.setAuthority(pickResult.getAuthorityOverride()); + } + return stream; } } // This picker's conclusion is "buffer". If there hasn't been a newer picker set (possible @@ -287,10 +293,6 @@ final class DelayedClientTransport implements ManagedClientTransport { for (final PendingStream stream : toProcess) { PickResult pickResult = picker.pickSubchannel(stream.args); CallOptions callOptions = stream.args.getCallOptions(); - // User code provided authority takes precedence over the LB provided one. - if (callOptions.getAuthority() == null && pickResult.getAuthorityOverride() != null) { - stream.setAuthority(pickResult.getAuthorityOverride()); - } final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, callOptions.isWaitForReady()); if (transport != null) { @@ -301,7 +303,7 @@ final class DelayedClientTransport implements ManagedClientTransport { if (callOptions.getExecutor() != null) { executor = callOptions.getExecutor(); } - Runnable runnable = stream.createRealStream(transport); + Runnable runnable = stream.createRealStream(transport, pickResult.getAuthorityOverride()); if (runnable != null) { executor.execute(runnable); } @@ -354,7 +356,7 @@ final class DelayedClientTransport implements ManagedClientTransport { } /** Runnable may be null. */ - private Runnable createRealStream(ClientTransport transport) { + private Runnable createRealStream(ClientTransport transport, String authorityOverride) { ClientStream realStream; Context origContext = context.attach(); try { @@ -364,6 +366,13 @@ final class DelayedClientTransport implements ManagedClientTransport { } finally { context.detach(origContext); } + if (authorityOverride != null) { + // User code provided authority takes precedence over the LB provided one; this will be + // overwritten by an enqueud call from ClientCallImpl if the application sets an authority + // override. We must call the real stream directly because stream.start() has likely already + // been called on the delayed stream. + realStream.setAuthority(authorityOverride); + } return setStream(realStream); } diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index a5160552a9..f65e6abcf1 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -503,26 +503,11 @@ public class DelayedClientTransportTest { } @Test - public void reprocess_authorityOverridePresentInCallOptions_authorityOverrideFromLbIsIgnored() { - DelayedStream delayedStream = (DelayedStream) delayedTransport.newStream( - method, headers, callOptions, tracers); - delayedStream.start(mock(ClientStreamListener.class)); - SubchannelPicker picker = mock(SubchannelPicker.class); - PickResult pickResult = PickResult.withSubchannel( - mockSubchannel, null, "authority-override-hostname-from-lb"); - when(picker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(pickResult); - - delayedTransport.reprocess(picker); - fakeExecutor.runDueTasks(); - - verify(mockRealStream, never()).setAuthority("authority-override-hostname-from-lb"); - } - - @Test - public void - reprocess_authorityOverrideNotInCallOptions_authorityOverrideFromLbIsSetIntoStream() { + public void reprocess_authorityOverrideFromLb() { + InOrder inOrder = inOrder(mockRealStream); DelayedStream delayedStream = (DelayedStream) delayedTransport.newStream( method, headers, callOptions.withAuthority(null), tracers); + delayedStream.setAuthority("authority-override-from-calloptions"); delayedStream.start(mock(ClientStreamListener.class)); SubchannelPicker picker = mock(SubchannelPicker.class); PickResult pickResult = PickResult.withSubchannel( @@ -536,7 +521,10 @@ public class DelayedClientTransportTest { delayedTransport.reprocess(picker); fakeExecutor.runDueTasks(); - verify(mockRealStream).setAuthority("authority-override-hostname-from-lb"); + // Must be set before start(), and may be overwritten + inOrder.verify(mockRealStream).setAuthority("authority-override-hostname-from-lb"); + inOrder.verify(mockRealStream).setAuthority("authority-override-from-calloptions"); + inOrder.verify(mockRealStream).start(any(ClientStreamListener.class)); } @Test @@ -563,28 +551,26 @@ public class DelayedClientTransportTest { } @Test - public void newStream_assignsTransport_authorityFromCallOptionsSupersedesAuthorityFromLB() { + public void newStream_authorityOverrideFromLb() { + InOrder inOrder = inOrder(mockRealStream); SubchannelPicker picker = mock(SubchannelPicker.class); - AbstractSubchannel subchannel = mock(AbstractSubchannel.class); - when(subchannel.getInternalSubchannel()).thenReturn(mockInternalSubchannel); PickResult pickResult = PickResult.withSubchannel( - subchannel, null, "authority-override-hostname-from-lb"); + mockSubchannel, null, "authority-override-hostname-from-lb"); when(picker.pickSubchannel(any(PickSubchannelArgs.class))).thenReturn(pickResult); - ArgumentCaptor callOptionsArgumentCaptor = - ArgumentCaptor.forClass(CallOptions.class); when(mockRealTransport.newStream( - any(MethodDescriptor.class), any(Metadata.class), callOptionsArgumentCaptor.capture(), - ArgumentMatchers.any())) + any(MethodDescriptor.class), any(Metadata.class), any(CallOptions.class), any())) .thenReturn(mockRealStream); delayedTransport.reprocess(picker); - verifyNoMoreInteractions(picker); - verifyNoMoreInteractions(transportListener); - CallOptions callOptions = - CallOptions.DEFAULT.withAuthority("authority-override-hosstname-from-calloptions"); - delayedTransport.newStream(method, headers, callOptions, tracers); - assertThat(callOptionsArgumentCaptor.getValue().getAuthority()).isEqualTo( - "authority-override-hosstname-from-calloptions"); + ClientStream stream = delayedTransport.newStream(method, headers, callOptions, tracers); + assertThat(stream).isSameInstanceAs(mockRealStream); + stream.setAuthority("authority-override-from-calloptions"); + stream.start(mock(ClientStreamListener.class)); + + // Must be set before start(), and may be overwritten + inOrder.verify(mockRealStream).setAuthority("authority-override-hostname-from-lb"); + inOrder.verify(mockRealStream).setAuthority("authority-override-from-calloptions"); + inOrder.verify(mockRealStream).start(any(ClientStreamListener.class)); } @Test