mirror of https://github.com/grpc/grpc-java.git
rls: add logging for rls lb
This commit is contained in:
parent
a547e23f5e
commit
f59cd0a599
|
|
@ -26,6 +26,8 @@ import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.ImmutableMap;
|
import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.util.concurrent.ListenableFuture;
|
import com.google.common.util.concurrent.ListenableFuture;
|
||||||
import com.google.common.util.concurrent.SettableFuture;
|
import com.google.common.util.concurrent.SettableFuture;
|
||||||
|
import io.grpc.ChannelLogger;
|
||||||
|
import io.grpc.ChannelLogger.ChannelLogLevel;
|
||||||
import io.grpc.ConnectivityState;
|
import io.grpc.ConnectivityState;
|
||||||
import io.grpc.LoadBalancer;
|
import io.grpc.LoadBalancer;
|
||||||
import io.grpc.LoadBalancer.Helper;
|
import io.grpc.LoadBalancer.Helper;
|
||||||
|
|
@ -114,6 +116,7 @@ final class CachingRlsLbClient {
|
||||||
private final RlsPicker rlsPicker;
|
private final RlsPicker rlsPicker;
|
||||||
private final ResolvedAddressFactory childLbResolvedAddressFactory;
|
private final ResolvedAddressFactory childLbResolvedAddressFactory;
|
||||||
private final RefCountedChildPolicyWrapperFactory refCountedChildPolicyWrapperFactory;
|
private final RefCountedChildPolicyWrapperFactory refCountedChildPolicyWrapperFactory;
|
||||||
|
private final ChannelLogger logger;
|
||||||
|
|
||||||
private CachingRlsLbClient(Builder builder) {
|
private CachingRlsLbClient(Builder builder) {
|
||||||
helper = checkNotNull(builder.helper, "helper");
|
helper = checkNotNull(builder.helper, "helper");
|
||||||
|
|
@ -136,7 +139,12 @@ final class CachingRlsLbClient {
|
||||||
rlsPicker = new RlsPicker(requestFactory);
|
rlsPicker = new RlsPicker(requestFactory);
|
||||||
ManagedChannelBuilder<?> rlsChannelBuilder =
|
ManagedChannelBuilder<?> rlsChannelBuilder =
|
||||||
helper.createResolvingOobChannelBuilder(rlsConfig.getLookupService());
|
helper.createResolvingOobChannelBuilder(rlsConfig.getLookupService());
|
||||||
|
logger = helper.getChannelLogger();
|
||||||
if (enableOobChannelDirectPath) {
|
if (enableOobChannelDirectPath) {
|
||||||
|
logger.log(
|
||||||
|
ChannelLogLevel.DEBUG,
|
||||||
|
"RLS channel direct path enabled. RLS channel service config: {0}",
|
||||||
|
getDirectpathServiceConfig());
|
||||||
rlsChannelBuilder.defaultServiceConfig(getDirectpathServiceConfig());
|
rlsChannelBuilder.defaultServiceConfig(getDirectpathServiceConfig());
|
||||||
rlsChannelBuilder.disableServiceConfigLookUp();
|
rlsChannelBuilder.disableServiceConfigLookUp();
|
||||||
}
|
}
|
||||||
|
|
@ -151,6 +159,7 @@ final class CachingRlsLbClient {
|
||||||
refCountedChildPolicyWrapperFactory =
|
refCountedChildPolicyWrapperFactory =
|
||||||
new RefCountedChildPolicyWrapperFactory(
|
new RefCountedChildPolicyWrapperFactory(
|
||||||
childLbHelperProvider, new BackoffRefreshListener());
|
childLbHelperProvider, new BackoffRefreshListener());
|
||||||
|
logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient created");
|
||||||
}
|
}
|
||||||
|
|
||||||
private static ImmutableMap<String, Object> getDirectpathServiceConfig() {
|
private static ImmutableMap<String, Object> getDirectpathServiceConfig() {
|
||||||
|
|
@ -170,20 +179,25 @@ final class CachingRlsLbClient {
|
||||||
private ListenableFuture<RouteLookupResponse> asyncRlsCall(RouteLookupRequest request) {
|
private ListenableFuture<RouteLookupResponse> asyncRlsCall(RouteLookupRequest request) {
|
||||||
final SettableFuture<RouteLookupResponse> response = SettableFuture.create();
|
final SettableFuture<RouteLookupResponse> response = SettableFuture.create();
|
||||||
if (throttler.shouldThrottle()) {
|
if (throttler.shouldThrottle()) {
|
||||||
|
logger.log(ChannelLogLevel.DEBUG, "Request is throttled");
|
||||||
response.setException(new ThrottledException());
|
response.setException(new ThrottledException());
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
io.grpc.lookup.v1.RouteLookupRequest routeLookupRequest = REQUEST_CONVERTER.convert(request);
|
||||||
|
logger.log(ChannelLogLevel.DEBUG, "Sending RouteLookupRequest: {0}", routeLookupRequest);
|
||||||
rlsStub.withDeadlineAfter(callTimeoutNanos, TimeUnit.NANOSECONDS)
|
rlsStub.withDeadlineAfter(callTimeoutNanos, TimeUnit.NANOSECONDS)
|
||||||
.routeLookup(
|
.routeLookup(
|
||||||
REQUEST_CONVERTER.convert(request),
|
routeLookupRequest,
|
||||||
new StreamObserver<io.grpc.lookup.v1.RouteLookupResponse>() {
|
new StreamObserver<io.grpc.lookup.v1.RouteLookupResponse>() {
|
||||||
@Override
|
@Override
|
||||||
public void onNext(io.grpc.lookup.v1.RouteLookupResponse value) {
|
public void onNext(io.grpc.lookup.v1.RouteLookupResponse value) {
|
||||||
|
logger.log(ChannelLogLevel.DEBUG, "Received RouteLookupResponse: {0}", value);
|
||||||
response.set(RESPONSE_CONVERTER.reverse().convert(value));
|
response.set(RESPONSE_CONVERTER.reverse().convert(value));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onError(Throwable t) {
|
public void onError(Throwable t) {
|
||||||
|
logger.log(ChannelLogLevel.DEBUG, "Error looking up route:", t);
|
||||||
response.setException(t);
|
response.setException(t);
|
||||||
throttler.registerBackendResponse(false);
|
throttler.registerBackendResponse(false);
|
||||||
}
|
}
|
||||||
|
|
@ -212,6 +226,7 @@ final class CachingRlsLbClient {
|
||||||
|
|
||||||
if (cacheEntry instanceof DataCacheEntry) {
|
if (cacheEntry instanceof DataCacheEntry) {
|
||||||
// cache hit, initiate async-refresh if entry is staled
|
// cache hit, initiate async-refresh if entry is staled
|
||||||
|
logger.log(ChannelLogLevel.DEBUG, "Cache hit for the request");
|
||||||
DataCacheEntry dataEntry = ((DataCacheEntry) cacheEntry);
|
DataCacheEntry dataEntry = ((DataCacheEntry) cacheEntry);
|
||||||
if (dataEntry.isStaled(timeProvider.currentTimeNanos())) {
|
if (dataEntry.isStaled(timeProvider.currentTimeNanos())) {
|
||||||
dataEntry.maybeRefresh();
|
dataEntry.maybeRefresh();
|
||||||
|
|
@ -224,6 +239,7 @@ final class CachingRlsLbClient {
|
||||||
|
|
||||||
/** Performs any pending maintenance operations needed by the cache. */
|
/** Performs any pending maintenance operations needed by the cache. */
|
||||||
void close() {
|
void close() {
|
||||||
|
logger.log(ChannelLogLevel.DEBUG, "CachingRlsLbClient closed");
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
// all childPolicyWrapper will be returned via AutoCleaningEvictionListener
|
// all childPolicyWrapper will be returned via AutoCleaningEvictionListener
|
||||||
linkedHashLruCache.close();
|
linkedHashLruCache.close();
|
||||||
|
|
@ -408,12 +424,17 @@ final class CachingRlsLbClient {
|
||||||
|
|
||||||
private void transitionToDataEntry(RouteLookupResponse routeLookupResponse) {
|
private void transitionToDataEntry(RouteLookupResponse routeLookupResponse) {
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
|
logger.log(
|
||||||
|
ChannelLogLevel.DEBUG,
|
||||||
|
"Transition to data cache: routeLookupResponse={0}",
|
||||||
|
routeLookupResponse);
|
||||||
linkedHashLruCache.cache(request, new DataCacheEntry(request, routeLookupResponse));
|
linkedHashLruCache.cache(request, new DataCacheEntry(request, routeLookupResponse));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void transitionToBackOff(Status status) {
|
private void transitionToBackOff(Status status) {
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
|
logger.log(ChannelLogLevel.DEBUG, "Transition to back off: status={0}", status);
|
||||||
linkedHashLruCache.cache(request, new BackoffCacheEntry(request, status, backoffPolicy));
|
linkedHashLruCache.cache(request, new BackoffCacheEntry(request, status, backoffPolicy));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -482,6 +503,10 @@ final class CachingRlsLbClient {
|
||||||
childPolicy.getEffectiveChildPolicy(childPolicyWrapper.getTarget()));
|
childPolicy.getEffectiveChildPolicy(childPolicyWrapper.getTarget()));
|
||||||
|
|
||||||
LoadBalancer lb = lbProvider.newLoadBalancer(childPolicyWrapper.getHelper());
|
LoadBalancer lb = lbProvider.newLoadBalancer(childPolicyWrapper.getHelper());
|
||||||
|
logger.log(
|
||||||
|
ChannelLogLevel.DEBUG,
|
||||||
|
"RLS child lb created. config: {0}",
|
||||||
|
lbConfig.getConfig());
|
||||||
lb.handleResolvedAddresses(childLbResolvedAddressFactory.create(lbConfig.getConfig()));
|
lb.handleResolvedAddresses(childLbResolvedAddressFactory.create(lbConfig.getConfig()));
|
||||||
lb.requestConnection();
|
lb.requestConnection();
|
||||||
}
|
}
|
||||||
|
|
@ -848,6 +873,9 @@ final class CachingRlsLbClient {
|
||||||
@Override
|
@Override
|
||||||
public PickResult pickSubchannel(PickSubchannelArgs args) {
|
public PickResult pickSubchannel(PickSubchannelArgs args) {
|
||||||
String[] methodName = args.getMethodDescriptor().getFullMethodName().split("/", 2);
|
String[] methodName = args.getMethodDescriptor().getFullMethodName().split("/", 2);
|
||||||
|
logger.log(ChannelLogLevel.DEBUG,
|
||||||
|
"Creating lookup request for service={0}, method={1}, headers={2}",
|
||||||
|
new Object[]{methodName[0], methodName[1], args.getHeaders()});
|
||||||
RouteLookupRequest request =
|
RouteLookupRequest request =
|
||||||
requestFactory.create(methodName[0], methodName[1], args.getHeaders());
|
requestFactory.create(methodName[0], methodName[1], args.getHeaders());
|
||||||
final CachedRouteLookupResponse response = CachingRlsLbClient.this.get(request);
|
final CachedRouteLookupResponse response = CachingRlsLbClient.this.get(request);
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkNotNull;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.MoreObjects;
|
import com.google.common.base.MoreObjects;
|
||||||
|
import io.grpc.ChannelLogger;
|
||||||
import io.grpc.ChannelLogger.ChannelLogLevel;
|
import io.grpc.ChannelLogger.ChannelLogLevel;
|
||||||
import io.grpc.ConnectivityState;
|
import io.grpc.ConnectivityState;
|
||||||
import io.grpc.LoadBalancer;
|
import io.grpc.LoadBalancer;
|
||||||
|
|
@ -31,6 +32,7 @@ import javax.annotation.Nullable;
|
||||||
*/
|
*/
|
||||||
final class RlsLoadBalancer extends LoadBalancer {
|
final class RlsLoadBalancer extends LoadBalancer {
|
||||||
|
|
||||||
|
private final ChannelLogger logger;
|
||||||
private final Helper helper;
|
private final Helper helper;
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
CachingRlsLbClientBuilderProvider cachingRlsLbClientBuilderProvider =
|
CachingRlsLbClientBuilderProvider cachingRlsLbClientBuilderProvider =
|
||||||
|
|
@ -42,10 +44,13 @@ final class RlsLoadBalancer extends LoadBalancer {
|
||||||
|
|
||||||
RlsLoadBalancer(Helper helper) {
|
RlsLoadBalancer(Helper helper) {
|
||||||
this.helper = checkNotNull(helper, "helper");
|
this.helper = checkNotNull(helper, "helper");
|
||||||
|
logger = helper.getChannelLogger();
|
||||||
|
logger.log(ChannelLogLevel.DEBUG, "Rls lb created. Authority: {0}", helper.getAuthority());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
|
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
|
||||||
|
logger.log(ChannelLogLevel.DEBUG, "Received resolution result: {0}", resolvedAddresses);
|
||||||
LbPolicyConfiguration lbPolicyConfiguration =
|
LbPolicyConfiguration lbPolicyConfiguration =
|
||||||
(LbPolicyConfiguration) resolvedAddresses.getLoadBalancingPolicyConfig();
|
(LbPolicyConfiguration) resolvedAddresses.getLoadBalancingPolicyConfig();
|
||||||
checkNotNull(lbPolicyConfiguration, "Missing rls lb config");
|
checkNotNull(lbPolicyConfiguration, "Missing rls lb config");
|
||||||
|
|
@ -66,12 +71,12 @@ final class RlsLoadBalancer extends LoadBalancer {
|
||||||
new ChildLbResolvedAddressFactory(
|
new ChildLbResolvedAddressFactory(
|
||||||
resolvedAddresses.getAddresses(), resolvedAddresses.getAttributes()))
|
resolvedAddresses.getAddresses(), resolvedAddresses.getAttributes()))
|
||||||
.build();
|
.build();
|
||||||
|
logger.log(
|
||||||
|
ChannelLogLevel.DEBUG, "LbPolicyConfiguration updated to {0}", lbPolicyConfiguration);
|
||||||
}
|
}
|
||||||
// TODO(creamsoup) allow incremental service config update. for initial use case, it is
|
// TODO(creamsoup) allow incremental service config update. for initial use case, it is
|
||||||
// not required.
|
// not required.
|
||||||
this.lbPolicyConfiguration = lbPolicyConfiguration;
|
this.lbPolicyConfiguration = lbPolicyConfiguration;
|
||||||
helper.getChannelLogger()
|
|
||||||
.log(ChannelLogLevel.INFO, "LbPolicyConfiguration updated to {0}", lbPolicyConfiguration);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -82,6 +87,7 @@ final class RlsLoadBalancer extends LoadBalancer {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handleNameResolutionError(final Status error) {
|
public void handleNameResolutionError(final Status error) {
|
||||||
|
logger.log(ChannelLogLevel.DEBUG, "Received resolution error: {0}", error);
|
||||||
class ErrorPicker extends SubchannelPicker {
|
class ErrorPicker extends SubchannelPicker {
|
||||||
@Override
|
@Override
|
||||||
public PickResult pickSubchannel(PickSubchannelArgs args) {
|
public PickResult pickSubchannel(PickSubchannelArgs args) {
|
||||||
|
|
@ -106,6 +112,7 @@ final class RlsLoadBalancer extends LoadBalancer {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void shutdown() {
|
public void shutdown() {
|
||||||
|
logger.log(ChannelLogLevel.DEBUG, "Rls lb shutdown");
|
||||||
if (routeLookupClient != null) {
|
if (routeLookupClient != null) {
|
||||||
routeLookupClient.close();
|
routeLookupClient.close();
|
||||||
routeLookupClient = null;
|
routeLookupClient = null;
|
||||||
|
|
|
||||||
|
|
@ -35,6 +35,7 @@ import com.google.common.collect.ImmutableMap;
|
||||||
import com.google.common.util.concurrent.SettableFuture;
|
import com.google.common.util.concurrent.SettableFuture;
|
||||||
import io.grpc.Attributes;
|
import io.grpc.Attributes;
|
||||||
import io.grpc.CallOptions;
|
import io.grpc.CallOptions;
|
||||||
|
import io.grpc.ChannelLogger;
|
||||||
import io.grpc.ConnectivityState;
|
import io.grpc.ConnectivityState;
|
||||||
import io.grpc.EquivalentAddressGroup;
|
import io.grpc.EquivalentAddressGroup;
|
||||||
import io.grpc.ForwardingChannelBuilder;
|
import io.grpc.ForwardingChannelBuilder;
|
||||||
|
|
@ -557,6 +558,11 @@ public class CachingRlsLbClientTest {
|
||||||
public SynchronizationContext getSynchronizationContext() {
|
public SynchronizationContext getSynchronizationContext() {
|
||||||
return syncContext;
|
return syncContext;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChannelLogger getChannelLogger() {
|
||||||
|
return mock(ChannelLogger.class);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final class FakeThrottler implements Throttler {
|
private static final class FakeThrottler implements Throttler {
|
||||||
|
|
|
||||||
|
|
@ -406,7 +406,7 @@ public class RlsLoadBalancerTest {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String getAuthority() {
|
public String getAuthority() {
|
||||||
throw new UnsupportedOperationException();
|
return "fake-bigtable.googleapis.com";
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue