gcp-observability: remove logging channel/server providers (#9424)

This commit is contained in:
DNVindhya 2022-08-09 21:08:01 -07:00 committed by GitHub
parent 050cdb14fe
commit 7bdca0c0ef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 593 additions and 701 deletions

View File

@ -34,6 +34,7 @@ import io.grpc.gcp.observability.interceptors.LogHelper;
import io.grpc.gcp.observability.logging.GcpLogSink;
import io.grpc.gcp.observability.logging.Sink;
import io.grpc.internal.TimeProvider;
import io.opencensus.common.Duration;
import io.opencensus.contrib.grpc.metrics.RpcViews;
import io.opencensus.exporter.stats.stackdriver.StackdriverStatsConfiguration;
import io.opencensus.exporter.stats.stackdriver.StackdriverStatsExporter;
@ -55,6 +56,7 @@ import java.util.stream.Collectors;
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/8869")
public final class GcpObservability implements AutoCloseable {
private static final Logger logger = Logger.getLogger(GcpObservability.class.getName());
private static final int METRICS_EXPORT_INTERVAL = 30;
private static GcpObservability instance = null;
private final Sink sink;
private final ObservabilityConfig config;
@ -76,8 +78,6 @@ public final class GcpObservability implements AutoCloseable {
Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId(),
globalLocationTags.getLocationTags(), observabilityConfig.getCustomTags(),
observabilityConfig.getFlushMessageCount());
// TODO(dnvindhya): Cleanup code for LoggingChannelProvider and LoggingServerProvider
// once ChannelBuilder and ServerBuilder are used
LogHelper helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER);
ConfigFilterHelper configFilterHelper = ConfigFilterHelper.factory(observabilityConfig);
instance = grpcInit(sink, observabilityConfig,
@ -97,13 +97,8 @@ public final class GcpObservability implements AutoCloseable {
InternalLoggingServerInterceptor.Factory serverInterceptorFactory)
throws IOException {
if (instance == null) {
instance =
new GcpObservability(sink, config, channelInterceptorFactory, serverInterceptorFactory);
LogHelper logHelper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER);
ConfigFilterHelper logFilterHelper = ConfigFilterHelper.factory(config);
instance.setProducer(
new InternalLoggingChannelInterceptor.FactoryImpl(logHelper, logFilterHelper),
new InternalLoggingServerInterceptor.FactoryImpl(logHelper, logFilterHelper));
instance = new GcpObservability(sink, config);
instance.setProducer(channelInterceptorFactory, serverInterceptorFactory);
}
return instance;
}
@ -116,13 +111,13 @@ public final class GcpObservability implements AutoCloseable {
throw new IllegalStateException("GcpObservability already closed!");
}
unRegisterStackDriverExporter();
LoggingChannelProvider.shutdown();
LoggingServerProvider.shutdown();
sink.close();
instance = null;
}
}
// TODO(dnvindhya): Remove <channel/server>InterceptorFactory and replace with respective
// interceptors
private void setProducer(
InternalLoggingChannelInterceptor.Factory channelInterceptorFactory,
InternalLoggingServerInterceptor.Factory serverInterceptorFactory) {
@ -145,7 +140,8 @@ public final class GcpObservability implements AutoCloseable {
clientInterceptors, serverInterceptors, tracerFactories);
}
private void registerStackDriverExporter(String projectId, Map<String, String> customTags)
@VisibleForTesting
void registerStackDriverExporter(String projectId, Map<String, String> customTags)
throws IOException {
if (config.isEnableCloudMonitoring()) {
RpcViews.registerAllGrpcViews();
@ -160,6 +156,7 @@ public final class GcpObservability implements AutoCloseable {
e -> LabelValue.create(e.getValue())));
statsConfigurationBuilder.setConstantLabels(constantLabels);
}
statsConfigurationBuilder.setExportInterval(Duration.create(METRICS_EXPORT_INTERVAL, 0));
StackdriverStatsExporter.createAndRegister(statsConfigurationBuilder.build());
metricsEnabled = true;
}
@ -208,13 +205,8 @@ public final class GcpObservability implements AutoCloseable {
private GcpObservability(
Sink sink,
ObservabilityConfig config,
InternalLoggingChannelInterceptor.Factory channelInterceptorFactory,
InternalLoggingServerInterceptor.Factory serverInterceptorFactory) {
ObservabilityConfig config) {
this.sink = checkNotNull(sink);
this.config = checkNotNull(config);
LoggingChannelProvider.init(checkNotNull(channelInterceptorFactory));
LoggingServerProvider.init(checkNotNull(serverInterceptorFactory));
}
}

View File

@ -1,102 +0,0 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.gcp.observability;
import static com.google.common.base.Preconditions.checkNotNull;
import io.grpc.ChannelCredentials;
import io.grpc.InternalManagedChannelProvider;
import io.grpc.ManagedChannelBuilder;
import io.grpc.ManagedChannelProvider;
import io.grpc.ManagedChannelRegistry;
import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collection;
import java.util.Collections;
/** A channel provider that injects logging interceptor. */
final class LoggingChannelProvider extends ManagedChannelProvider {
private final ManagedChannelProvider prevProvider;
private final InternalLoggingChannelInterceptor.Factory clientInterceptorFactory;
private static LoggingChannelProvider instance;
private LoggingChannelProvider(InternalLoggingChannelInterceptor.Factory factory) {
prevProvider = ManagedChannelProvider.provider();
clientInterceptorFactory = factory;
}
static synchronized void init(InternalLoggingChannelInterceptor.Factory factory) {
if (instance != null) {
throw new IllegalStateException("LoggingChannelProvider already initialized!");
}
instance = new LoggingChannelProvider(factory);
ManagedChannelRegistry.getDefaultRegistry().register(instance);
}
static synchronized void shutdown() {
if (instance == null) {
throw new IllegalStateException("LoggingChannelProvider not initialized!");
}
ManagedChannelRegistry.getDefaultRegistry().deregister(instance);
instance = null;
}
@Override
protected boolean isAvailable() {
return true;
}
@Override
protected int priority() {
return 6;
}
private ManagedChannelBuilder<?> addInterceptor(ManagedChannelBuilder<?> builder) {
return builder.intercept(clientInterceptorFactory.create());
}
@Override
protected ManagedChannelBuilder<?> builderForAddress(String name, int port) {
return addInterceptor(
InternalManagedChannelProvider.builderForAddress(prevProvider, name, port));
}
@Override
protected ManagedChannelBuilder<?> builderForTarget(String target) {
return addInterceptor(InternalManagedChannelProvider.builderForTarget(prevProvider, target));
}
@Override
protected NewChannelBuilderResult newChannelBuilder(String target, ChannelCredentials creds) {
NewChannelBuilderResult result = InternalManagedChannelProvider.newChannelBuilder(prevProvider,
target, creds);
ManagedChannelBuilder<?> builder = result.getChannelBuilder();
if (builder != null) {
return NewChannelBuilderResult.channelBuilder(
addInterceptor(builder));
}
checkNotNull(result.getError(), "Expected error to be set!");
return result;
}
@Override
protected Collection<Class<? extends SocketAddress>> getSupportedSocketAddressTypes() {
return Collections.singleton(InetSocketAddress.class);
}
}

View File

@ -1,88 +0,0 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.gcp.observability;
import static com.google.common.base.Preconditions.checkNotNull;
import io.grpc.InternalServerProvider;
import io.grpc.ServerBuilder;
import io.grpc.ServerCredentials;
import io.grpc.ServerProvider;
import io.grpc.ServerRegistry;
import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor;
/** A server provider that injects the logging interceptor. */
final class LoggingServerProvider extends ServerProvider {
private final ServerProvider prevProvider;
private final InternalLoggingServerInterceptor.Factory serverInterceptorFactory;
private static LoggingServerProvider instance;
private LoggingServerProvider(InternalLoggingServerInterceptor.Factory factory) {
prevProvider = ServerProvider.provider();
serverInterceptorFactory = factory;
}
static synchronized void init(InternalLoggingServerInterceptor.Factory factory) {
if (instance != null) {
throw new IllegalStateException("LoggingServerProvider already initialized!");
}
instance = new LoggingServerProvider(factory);
ServerRegistry.getDefaultRegistry().register(instance);
}
static synchronized void shutdown() {
if (instance == null) {
throw new IllegalStateException("LoggingServerProvider not initialized!");
}
ServerRegistry.getDefaultRegistry().deregister(instance);
instance = null;
}
@Override
protected boolean isAvailable() {
return true;
}
@Override
protected int priority() {
return 6;
}
private ServerBuilder<?> addInterceptor(ServerBuilder<?> builder) {
return builder.intercept(serverInterceptorFactory.create());
}
@Override
protected ServerBuilder<?> builderForPort(int port) {
return addInterceptor(InternalServerProvider.builderForPort(prevProvider, port));
}
@Override
protected NewServerBuilderResult newServerBuilderForPort(int port, ServerCredentials creds) {
ServerProvider.NewServerBuilderResult result = InternalServerProvider.newServerBuilderForPort(
prevProvider, port,
creds);
ServerBuilder<?> builder = result.getServerBuilder();
if (builder != null) {
return ServerProvider.NewServerBuilderResult.serverBuilder(
addInterceptor(builder));
}
checkNotNull(result.getError(), "Expected error to be set!");
return result;
}
}

View File

@ -40,7 +40,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;
/**
* A logging interceptor for {@code LoggingChannelProvider}.
* A logging client interceptor for Observability.
*/
@Internal
public final class InternalLoggingChannelInterceptor implements ClientInterceptor {
@ -51,6 +51,7 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto
private final LogHelper helper;
private final ConfigFilterHelper filterHelper;
// TODO(dnvindhya): Remove factory and use interceptors directly
public interface Factory {
ClientInterceptor create();
}

View File

@ -40,7 +40,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;
/**
* A logging interceptor for {@code LoggingServerProvider}.
* A logging server interceptor for Observability.
*/
@Internal
public final class InternalLoggingServerInterceptor implements ServerInterceptor {
@ -51,6 +51,7 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor
private final LogHelper helper;
private final ConfigFilterHelper filterHelper;
// TODO(dnvindhya): Remove factory and use interceptors directly
public interface Factory {
ServerInterceptor create();
}

View File

@ -54,18 +54,20 @@ public class GcpLogSink implements Sink {
= ImmutableSet.of("project_id", "location", "cluster_name", "namespace_name",
"pod_name", "container_name");
private static final long FALLBACK_FLUSH_LIMIT = 100L;
private final String projectId;
private final Map<String, String> customTags;
private final Logging gcpLoggingClient;
private final MonitoredResource kubernetesResource;
private final Long flushLimit;
/** Lazily initialize cloud logging client to avoid circular initialization. Because cloud
* logging APIs also uses gRPC. */
private volatile Logging gcpLoggingClient;
private long flushCounter;
private static Logging createLoggingClient(String projectId) {
LoggingOptions.Builder builder = LoggingOptions.newBuilder();
if (!Strings.isNullOrEmpty(projectId)) {
builder.setProjectId(projectId);
}
return builder.build().getService();
@VisibleForTesting
GcpLogSink(Logging loggingClient, String destinationProjectId, Map<String, String> locationTags,
Map<String, String> customTags, Long flushLimit) {
this(destinationProjectId, locationTags, customTags, flushLimit);
this.gcpLoggingClient = loggingClient;
}
/**
@ -75,15 +77,7 @@ public class GcpLogSink implements Sink {
*/
public GcpLogSink(String destinationProjectId, Map<String, String> locationTags,
Map<String, String> customTags, Long flushLimit) {
this(createLoggingClient(destinationProjectId), destinationProjectId, locationTags,
customTags, flushLimit);
}
@VisibleForTesting
GcpLogSink(Logging client, String destinationProjectId, Map<String, String> locationTags,
Map<String, String> customTags, Long flushLimit) {
this.gcpLoggingClient = client;
this.projectId = destinationProjectId;
this.customTags = getCustomTags(customTags, locationTags, destinationProjectId);
this.kubernetesResource = getResource(locationTags);
this.flushLimit = flushLimit != null ? flushLimit : FALLBACK_FLUSH_LIMIT;
@ -98,8 +92,11 @@ public class GcpLogSink implements Sink {
@Override
public void write(GrpcLogRecord logProto) {
if (gcpLoggingClient == null) {
logger.log(Level.SEVERE, "Attempt to write after GcpLogSink is closed.");
return;
synchronized (this) {
if (gcpLoggingClient == null) {
gcpLoggingClient = createLoggingClient();
}
}
}
if (SERVICE_TO_EXCLUDE.equals(logProto.getServiceName())) {
return;
@ -133,6 +130,14 @@ public class GcpLogSink implements Sink {
}
}
Logging createLoggingClient() {
LoggingOptions.Builder builder = LoggingOptions.newBuilder();
if (!Strings.isNullOrEmpty(projectId)) {
builder.setProjectId(projectId);
}
return builder.build().getService();
}
@VisibleForTesting
static Map<String, String> getCustomTags(Map<String, String> customTags,
Map<String, String> locationTags, String destinationProjectId) {

View File

@ -28,13 +28,11 @@ import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.InternalGlobalInterceptors;
import io.grpc.ManagedChannelProvider;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerProvider;
import io.grpc.StaticTestingClassLoader;
import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor;
import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor;
@ -84,9 +82,6 @@ public class GcpObservabilityTest {
@Override
public void run() {
// TODO(dnvindhya) : Remove usage of Providers on cleaning up Logging*Provider
ManagedChannelProvider prevChannelProvider = ManagedChannelProvider.provider();
ServerProvider prevServerProvider = ServerProvider.provider();
Sink sink = mock(Sink.class);
ObservabilityConfig config = mock(ObservabilityConfig.class);
InternalLoggingChannelInterceptor.Factory channelInterceptorFactory =
@ -98,16 +93,12 @@ public class GcpObservabilityTest {
GcpObservability observability =
GcpObservability.grpcInit(
sink, config, channelInterceptorFactory, serverInterceptorFactory);
assertThat(ManagedChannelProvider.provider()).isInstanceOf(LoggingChannelProvider.class);
assertThat(ServerProvider.provider()).isInstanceOf(ServerProvider.class);
observability1 =
GcpObservability.grpcInit(
sink, config, channelInterceptorFactory, serverInterceptorFactory);
assertThat(observability1).isSameInstanceAs(observability);
observability.close();
verify(sink).close();
assertThat(ManagedChannelProvider.provider()).isSameInstanceAs(prevChannelProvider);
assertThat(ServerProvider.provider()).isSameInstanceAs(prevServerProvider);
try {
observability1.close();
fail("should have failed for calling close() second time");

View File

@ -1,143 +0,0 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.gcp.observability;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Grpc;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.ManagedChannelProvider;
import io.grpc.MethodDescriptor;
import io.grpc.TlsChannelCredentials;
import io.grpc.gcp.observability.interceptors.ConfigFilterHelper;
import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor;
import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor.FactoryImpl;
import io.grpc.gcp.observability.interceptors.LogHelper;
import io.grpc.testing.TestMethodDescriptors;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
@RunWith(JUnit4.class)
public class LoggingChannelProviderTest {
@Rule
public final MockitoRule mocks = MockitoJUnit.rule();
private final MethodDescriptor<Void, Void> method = TestMethodDescriptors.voidMethod();
@Test
public void initTwiceCausesException() {
ManagedChannelProvider prevProvider = ManagedChannelProvider.provider();
assertThat(prevProvider).isNotInstanceOf(LoggingChannelProvider.class);
LogHelper mockLogHelper = mock(LogHelper.class);
ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class);
LoggingChannelProvider.init(
new FactoryImpl(mockLogHelper, mockFilterHelper));
assertThat(ManagedChannelProvider.provider()).isInstanceOf(LoggingChannelProvider.class);
try {
LoggingChannelProvider.init(
new FactoryImpl(mockLogHelper, mockFilterHelper));
fail("should have failed for calling init() again");
} catch (IllegalStateException e) {
assertThat(e).hasMessageThat().contains("LoggingChannelProvider already initialized!");
}
LoggingChannelProvider.shutdown();
assertThat(ManagedChannelProvider.provider()).isSameInstanceAs(prevProvider);
}
@Test
public void forTarget_interceptorCalled() {
ClientInterceptor interceptor = mock(ClientInterceptor.class,
delegatesTo(new NoopInterceptor()));
InternalLoggingChannelInterceptor.Factory factory = mock(
InternalLoggingChannelInterceptor.Factory.class);
when(factory.create()).thenReturn(interceptor);
LoggingChannelProvider.init(factory);
ManagedChannelBuilder<?> builder = ManagedChannelBuilder.forTarget("localhost");
ManagedChannel channel = builder.build();
CallOptions callOptions = CallOptions.DEFAULT;
ClientCall<Void, Void> unused = channel.newCall(method, callOptions);
verify(interceptor)
.interceptCall(same(method), same(callOptions), ArgumentMatchers.<Channel>any());
channel.shutdownNow();
LoggingChannelProvider.shutdown();
}
@Test
public void forAddress_interceptorCalled() {
ClientInterceptor interceptor = mock(ClientInterceptor.class,
delegatesTo(new NoopInterceptor()));
InternalLoggingChannelInterceptor.Factory factory = mock(
InternalLoggingChannelInterceptor.Factory.class);
when(factory.create()).thenReturn(interceptor);
LoggingChannelProvider.init(factory);
ManagedChannelBuilder<?> builder = ManagedChannelBuilder.forAddress("localhost", 80);
ManagedChannel channel = builder.build();
CallOptions callOptions = CallOptions.DEFAULT;
ClientCall<Void, Void> unused = channel.newCall(method, callOptions);
verify(interceptor)
.interceptCall(same(method), same(callOptions), ArgumentMatchers.<Channel>any());
channel.shutdownNow();
LoggingChannelProvider.shutdown();
}
@Test
public void newChannelBuilder_interceptorCalled() {
ClientInterceptor interceptor = mock(ClientInterceptor.class,
delegatesTo(new NoopInterceptor()));
InternalLoggingChannelInterceptor.Factory factory = mock(
InternalLoggingChannelInterceptor.Factory.class);
when(factory.create()).thenReturn(interceptor);
LoggingChannelProvider.init(factory);
ManagedChannelBuilder<?> builder = Grpc.newChannelBuilder("localhost",
TlsChannelCredentials.create());
ManagedChannel channel = builder.build();
CallOptions callOptions = CallOptions.DEFAULT;
ClientCall<Void, Void> unused = channel.newCall(method, callOptions);
verify(interceptor)
.interceptCall(same(method), same(callOptions), ArgumentMatchers.<Channel>any());
channel.shutdownNow();
LoggingChannelProvider.shutdown();
}
private static class NoopInterceptor implements ClientInterceptor {
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions, Channel next) {
return next.newCall(method, callOptions);
}
}
}

View File

@ -1,143 +0,0 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.gcp.observability;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.fail;
import static org.mockito.AdditionalAnswers.delegatesTo;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import io.grpc.Grpc;
import io.grpc.InsecureServerCredentials;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.ServerProvider;
import io.grpc.gcp.observability.interceptors.ConfigFilterHelper;
import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor;
import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor.FactoryImpl;
import io.grpc.gcp.observability.interceptors.LogHelper;
import io.grpc.stub.StreamObserver;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.testing.protobuf.SimpleRequest;
import io.grpc.testing.protobuf.SimpleResponse;
import io.grpc.testing.protobuf.SimpleServiceGrpc;
import java.io.IOException;
import java.util.function.Supplier;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
@RunWith(JUnit4.class)
public class LoggingServerProviderTest {
@Rule public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
@Test
public void initTwiceCausesException() {
ServerProvider prevProvider = ServerProvider.provider();
assertThat(prevProvider).isNotInstanceOf(LoggingServerProvider.class);
LogHelper mockLogHelper = mock(LogHelper.class);
ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class);
LoggingServerProvider.init(
new FactoryImpl(mockLogHelper, mockFilterHelper));
assertThat(ServerProvider.provider()).isInstanceOf(ServerProvider.class);
try {
LoggingServerProvider.init(
new FactoryImpl(mockLogHelper, mockFilterHelper));
fail("should have failed for calling init() again");
} catch (IllegalStateException e) {
assertThat(e).hasMessageThat().contains("LoggingServerProvider already initialized!");
}
LoggingServerProvider.shutdown();
assertThat(ServerProvider.provider()).isSameInstanceAs(prevProvider);
}
@Test
public void forPort_interceptorCalled() throws IOException {
serverBuilder_interceptorCalled(() -> ServerBuilder.forPort(0));
}
@Test
public void newServerBuilder_interceptorCalled() throws IOException {
serverBuilder_interceptorCalled(
() -> Grpc.newServerBuilderForPort(0, InsecureServerCredentials.create()));
}
@SuppressWarnings("unchecked")
private void serverBuilder_interceptorCalled(Supplier<ServerBuilder<?>> serverBuilderSupplier)
throws IOException {
ServerInterceptor interceptor =
mock(ServerInterceptor.class, delegatesTo(new NoopInterceptor()));
InternalLoggingServerInterceptor.Factory factory = mock(
InternalLoggingServerInterceptor.Factory.class);
when(factory.create()).thenReturn(interceptor);
LoggingServerProvider.init(factory);
Server server = serverBuilderSupplier.get().addService(new SimpleServiceImpl()).build().start();
int port = cleanupRule.register(server).getPort();
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", port).usePlaintext()
.build();
SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub(
cleanupRule.register(channel));
assertThat(unaryRpc("buddy", stub)).isEqualTo("Hello buddy");
verify(interceptor).interceptCall(any(ServerCall.class), any(Metadata.class), anyCallHandler());
LoggingServerProvider.shutdown();
}
private ServerCallHandler<String, Integer> anyCallHandler() {
return ArgumentMatchers.any();
}
private static String unaryRpc(
String requestMessage, SimpleServiceGrpc.SimpleServiceBlockingStub blockingStub) {
SimpleRequest request = SimpleRequest.newBuilder().setRequestMessage(requestMessage).build();
SimpleResponse response = blockingStub.unaryRpc(request);
return response.getResponseMessage();
}
private static class SimpleServiceImpl extends SimpleServiceGrpc.SimpleServiceImplBase {
@Override
public void unaryRpc(SimpleRequest req, StreamObserver<SimpleResponse> responseObserver) {
SimpleResponse response =
SimpleResponse.newBuilder()
.setResponseMessage("Hello " + req.getRequestMessage())
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}
private static class NoopInterceptor implements ServerInterceptor {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(
ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next) {
return next.startCall(call, headers);
}
}
}

View File

@ -28,10 +28,11 @@ import io.grpc.ManagedChannelBuilder;
import io.grpc.MethodDescriptor;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.StaticTestingClassLoader;
import io.grpc.gcp.observability.interceptors.ConfigFilterHelper;
import io.grpc.gcp.observability.interceptors.ConfigFilterHelper.FilterParams;
import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor;
import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor.FactoryImpl;
import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor;
import io.grpc.gcp.observability.interceptors.LogHelper;
import io.grpc.gcp.observability.logging.GcpLogSink;
import io.grpc.gcp.observability.logging.Sink;
@ -41,9 +42,9 @@ import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.testing.protobuf.SimpleServiceGrpc;
import java.io.IOException;
import java.util.Map;
import java.util.regex.Pattern;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@ -52,132 +53,196 @@ import org.mockito.Mockito;
@RunWith(JUnit4.class)
public class LoggingTest {
@Rule
public final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
@ClassRule
public static final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
private static final String PROJECT_ID = "PROJECT";
private static final Map<String, String> locationTags = ImmutableMap.of(
private static final ImmutableMap<String, String> LOCATION_TAGS = ImmutableMap.of(
"project_id", "PROJECT",
"location", "us-central1-c",
"cluster_name", "grpc-observability-cluster",
"namespace_name", "default" ,
"pod_name", "app1-6c7c58f897-n92c5");
private static final Map<String, String> customTags = ImmutableMap.of(
private static final ImmutableMap<String, String> CUSTOM_TAGS = ImmutableMap.of(
"KEY1", "Value1",
"KEY2", "VALUE2");
private static final long flushLimit = 100L;
private static final long FLUSH_LIMIT = 100L;
private final StaticTestingClassLoader classLoader =
new StaticTestingClassLoader(getClass().getClassLoader(), Pattern.compile("io\\.grpc\\..*"));
/**
* Cloud logging test using LoggingChannelProvider and LoggingServerProvider.
*
* <p> Ignoring test, because it calls external CLoud Logging APIs.
* To test cloud logging setup,
* 1. Set up Cloud Logging Auth credentials
* 2. Assign permissions to service account to write logs to project specified by
* Cloud logging test using GlobalInterceptors.
*
* <p> Ignoring test, because it calls external Cloud Logging APIs.
* To test cloud logging setup locally,
* 1. Set up Cloud auth credentials
* 2. Assign permissions to service account to write logs to project specified by
* variable PROJECT_ID
* 3. Comment @Ignore annotation
* 4. This test is expected to pass when ran with above setup. This has been verified manually.
* </p>
*/
@Ignore
@Test
public void clientServer_interceptorCalled_logAlways()
throws IOException {
Sink sink = new GcpLogSink(PROJECT_ID, locationTags, customTags, flushLimit);
LogHelper spyLogHelper = spy(new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER));
ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class);
FilterParams logAlwaysFilterParams =
FilterParams.create(true, 0, 0);
when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class)))
.thenReturn(logAlwaysFilterParams);
when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class)))
.thenReturn(true);
LoggingServerProvider.init(
new FactoryImpl(spyLogHelper, mockFilterHelper));
Server server = ServerBuilder.forPort(0).addService(new LoggingTestHelper.SimpleServiceImpl())
.build().start();
int port = cleanupRule.register(server).getPort();
LoggingChannelProvider.init(
new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper));
SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub(
cleanupRule.register(ManagedChannelBuilder.forAddress("localhost", port)
.usePlaintext().build()));
assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub))
.isEqualTo("Hello buddy");
assertThat(Mockito.mockingDetails(spyLogHelper).getInvocations().size()).isGreaterThan(11);
sink.close();
LoggingChannelProvider.shutdown();
LoggingServerProvider.shutdown();
public void clientServer_interceptorCalled_logAlways() throws Exception {
Class<?> runnable =
classLoader.loadClass(LoggingTest.StaticTestingClassEndtoEndLogging.class.getName());
((Runnable) runnable.getDeclaredConstructor().newInstance()).run();
}
@Test
public void clientServer_interceptorCalled_logNever() throws IOException {
Sink mockSink = mock(GcpLogSink.class);
LogHelper spyLogHelper = spy(new LogHelper(mockSink, TimeProvider.SYSTEM_TIME_PROVIDER));
ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class);
FilterParams logNeverFilterParams =
FilterParams.create(false, 0, 0);
when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class)))
.thenReturn(logNeverFilterParams);
when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class)))
.thenReturn(true);
LoggingServerProvider.init(
new FactoryImpl(spyLogHelper, mockFilterHelper));
Server server = ServerBuilder.forPort(0).addService(new LoggingTestHelper.SimpleServiceImpl())
.build().start();
int port = cleanupRule.register(server).getPort();
LoggingChannelProvider.init(
new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper));
SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub(
cleanupRule.register(ManagedChannelBuilder.forAddress("localhost", port)
.usePlaintext().build()));
assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub))
.isEqualTo("Hello buddy");
verifyNoInteractions(spyLogHelper);
verifyNoInteractions(mockSink);
LoggingChannelProvider.shutdown();
LoggingServerProvider.shutdown();
public void clientServer_interceptorCalled_logNever() throws Exception {
Class<?> runnable =
classLoader.loadClass(LoggingTest.StaticTestingClassLogNever.class.getName());
((Runnable) runnable.getDeclaredConstructor().newInstance()).run();
}
@Test
public void clientServer_interceptorCalled_doNotLogMessageEvents() throws IOException {
LogHelper mockLogHelper = mock(LogHelper.class);
ConfigFilterHelper mockFilterHelper2 = mock(ConfigFilterHelper.class);
FilterParams logAlwaysFilterParams =
FilterParams.create(true, 0, 0);
when(mockFilterHelper2.isMethodToBeLogged(any(MethodDescriptor.class)))
.thenReturn(logAlwaysFilterParams);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_HEADER))
.thenReturn(true);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER))
.thenReturn(true);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE))
.thenReturn(true);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_TRAILER))
.thenReturn(true);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_CANCEL))
.thenReturn(true);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_MESSAGE))
.thenReturn(false);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_MESSAGE))
.thenReturn(false);
LoggingServerProvider.init(
new FactoryImpl(mockLogHelper, mockFilterHelper2));
Server server = ServerBuilder.forPort(0).addService(new LoggingTestHelper.SimpleServiceImpl())
.build().start();
int port = cleanupRule.register(server).getPort();
LoggingChannelProvider.init(
new InternalLoggingChannelInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper2));
SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub(
cleanupRule.register(ManagedChannelBuilder.forAddress("localhost", port)
.usePlaintext().build()));
assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub))
.isEqualTo("Hello buddy");
// Total number of calls should have been 14 (6 from client and 6 from server)
// Since cancel is not invoked, it will be 12.
// Request message(Total count:2 (1 from client and 1 from server) and Response message(count:2)
// events are not in the event_types list, i.e 14 - 2(cancel) - 2(req_msg) - 2(resp_msg) = 8
assertThat(Mockito.mockingDetails(mockLogHelper).getInvocations().size()).isEqualTo(8);
LoggingChannelProvider.shutdown();
LoggingServerProvider.shutdown();
public void clientServer_interceptorCalled_logFewEvents() throws Exception {
Class<?> runnable =
classLoader.loadClass(LoggingTest.StaticTestingClassLogFewEvents.class.getName());
((Runnable) runnable.getDeclaredConstructor().newInstance()).run();
}
// UsedReflectively
public static final class StaticTestingClassEndtoEndLogging implements Runnable {
@Override
public void run() {
Sink sink = new GcpLogSink(PROJECT_ID, LOCATION_TAGS, CUSTOM_TAGS, FLUSH_LIMIT);
ObservabilityConfig config = mock(ObservabilityConfig.class);
LogHelper spyLogHelper = spy(new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER));
ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class);
InternalLoggingChannelInterceptor.Factory channelInterceptorFactory =
new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper);
InternalLoggingServerInterceptor.Factory serverInterceptorFactory =
new InternalLoggingServerInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper);
when(config.isEnableCloudLogging()).thenReturn(true);
FilterParams logAlwaysFilterParams = FilterParams.create(true, 0, 0);
when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class)))
.thenReturn(logAlwaysFilterParams);
when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class))).thenReturn(true);
try (GcpObservability unused =
GcpObservability.grpcInit(
sink, config, channelInterceptorFactory, serverInterceptorFactory)) {
Server server =
ServerBuilder.forPort(0)
.addService(new LoggingTestHelper.SimpleServiceImpl())
.build()
.start();
int port = cleanupRule.register(server).getPort();
SimpleServiceGrpc.SimpleServiceBlockingStub stub =
SimpleServiceGrpc.newBlockingStub(
cleanupRule.register(
ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build()));
assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub))
.isEqualTo("Hello buddy");
assertThat(Mockito.mockingDetails(spyLogHelper).getInvocations().size()).isGreaterThan(11);
} catch (IOException e) {
throw new AssertionError("Exception while testing logging", e);
}
}
}
public static final class StaticTestingClassLogNever implements Runnable {
@Override
public void run() {
Sink mockSink = mock(GcpLogSink.class);
ObservabilityConfig config = mock(ObservabilityConfig.class);
LogHelper spyLogHelper = spy(new LogHelper(mockSink, TimeProvider.SYSTEM_TIME_PROVIDER));
ConfigFilterHelper mockFilterHelper = mock(ConfigFilterHelper.class);
InternalLoggingChannelInterceptor.Factory channelInterceptorFactory =
new InternalLoggingChannelInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper);
InternalLoggingServerInterceptor.Factory serverInterceptorFactory =
new InternalLoggingServerInterceptor.FactoryImpl(spyLogHelper, mockFilterHelper);
when(config.isEnableCloudLogging()).thenReturn(true);
FilterParams logNeverFilterParams = FilterParams.create(false, 0, 0);
when(mockFilterHelper.isMethodToBeLogged(any(MethodDescriptor.class)))
.thenReturn(logNeverFilterParams);
when(mockFilterHelper.isEventToBeLogged(any(GrpcLogRecord.EventType.class))).thenReturn(true);
try (GcpObservability unused =
GcpObservability.grpcInit(
mockSink, config, channelInterceptorFactory, serverInterceptorFactory)) {
Server server =
ServerBuilder.forPort(0)
.addService(new LoggingTestHelper.SimpleServiceImpl())
.build()
.start();
int port = cleanupRule.register(server).getPort();
SimpleServiceGrpc.SimpleServiceBlockingStub stub =
SimpleServiceGrpc.newBlockingStub(
cleanupRule.register(
ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build()));
assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub))
.isEqualTo("Hello buddy");
verifyNoInteractions(spyLogHelper);
verifyNoInteractions(mockSink);
} catch (IOException e) {
throw new AssertionError("Exception while testing logging event filter", e);
}
}
}
public static final class StaticTestingClassLogFewEvents implements Runnable {
@Override
public void run() {
Sink mockSink = mock(GcpLogSink.class);
ObservabilityConfig config = mock(ObservabilityConfig.class);
LogHelper mockLogHelper = mock(LogHelper.class);
ConfigFilterHelper mockFilterHelper2 = mock(ConfigFilterHelper.class);
InternalLoggingChannelInterceptor.Factory channelInterceptorFactory =
new InternalLoggingChannelInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper2);
InternalLoggingServerInterceptor.Factory serverInterceptorFactory =
new InternalLoggingServerInterceptor.FactoryImpl(mockLogHelper, mockFilterHelper2);
when(config.isEnableCloudLogging()).thenReturn(true);
FilterParams logAlwaysFilterParams = FilterParams.create(true, 0, 0);
when(mockFilterHelper2.isMethodToBeLogged(any(MethodDescriptor.class)))
.thenReturn(logAlwaysFilterParams);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_HEADER))
.thenReturn(true);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_HEADER))
.thenReturn(true);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_HALF_CLOSE)).thenReturn(true);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_TRAILER)).thenReturn(true);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_CANCEL)).thenReturn(true);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_REQUEST_MESSAGE))
.thenReturn(false);
when(mockFilterHelper2.isEventToBeLogged(EventType.GRPC_CALL_RESPONSE_MESSAGE))
.thenReturn(false);
try (GcpObservability observability =
GcpObservability.grpcInit(
mockSink, config, channelInterceptorFactory, serverInterceptorFactory)) {
Server server =
ServerBuilder.forPort(0)
.addService(new LoggingTestHelper.SimpleServiceImpl())
.build()
.start();
int port = cleanupRule.register(server).getPort();
SimpleServiceGrpc.SimpleServiceBlockingStub stub =
SimpleServiceGrpc.newBlockingStub(
cleanupRule.register(
ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build()));
assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub))
.isEqualTo("Hello buddy");
// Total number of calls should have been 14 (6 from client and 6 from server)
// Since cancel is not invoked, it will be 12.
// Request message(Total count:2 (1 from client and 1 from server) and Response
// message(count:2)
// events are not in the event_types list, i.e 14 - 2(cancel) - 2(req_msg) - 2(resp_msg)
// = 8
assertThat(Mockito.mockingDetails(mockLogHelper).getInvocations().size()).isEqualTo(8);
} catch (IOException e) {
throw new AssertionError("Exception while testing logging event filter", e);
}
}
}
}

View File

@ -0,0 +1,159 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.gcp.observability;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.google.cloud.monitoring.v3.MetricServiceClient;
import com.google.cloud.monitoring.v3.MetricServiceClient.ListTimeSeriesPagedResponse;
import com.google.monitoring.v3.ListTimeSeriesRequest;
import com.google.monitoring.v3.ProjectName;
import com.google.monitoring.v3.TimeInterval;
import com.google.monitoring.v3.TimeSeries;
import com.google.protobuf.util.Timestamps;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.StaticTestingClassLoader;
import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor;
import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor;
import io.grpc.gcp.observability.logging.GcpLogSink;
import io.grpc.gcp.observability.logging.Sink;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.testing.protobuf.SimpleServiceGrpc;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class MetricsTest {
@ClassRule
public static final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
private static final String PROJECT_ID = "PROJECT";
private static final String TEST_CLIENT_METHOD = "grpc.testing.SimpleService/UnaryRpc";
private static final String CUSTOM_TAG_KEY = "Version";
private static final String CUSTOM_TAG_VALUE =
String.format("C67J9A-%s", String.valueOf(System.currentTimeMillis()));
private static final Map<String, String> CUSTOM_TAGS = Collections.singletonMap(CUSTOM_TAG_KEY,
CUSTOM_TAG_VALUE);
private final StaticTestingClassLoader classLoader =
new StaticTestingClassLoader(getClass().getClassLoader(),
Pattern.compile("io\\.grpc\\..*|io\\.opencensus\\..*"));
/**
* End to end cloud monitoring test.
*
* <p>Ignoring test, because it calls external Cloud Monitoring APIs. To test cloud monitoring
* setup locally,
* 1. Set up Cloud auth credentials
* 2. Assign permissions to service account to write metrics to project specified by variable
* PROJECT_ID
* 3. Comment @Ignore annotation
* 4. This test is expected to pass when ran with above setup. This has been verified manually.
*/
@Ignore
@Test
public void testMetricsExporter() throws Exception {
Class<?> runnable =
classLoader.loadClass(MetricsTest.StaticTestingClassTestMetricsExporter.class.getName());
((Runnable) runnable.getDeclaredConstructor().newInstance()).run();
}
public static final class StaticTestingClassTestMetricsExporter implements Runnable {
@Override
public void run() {
Sink mockSink = mock(GcpLogSink.class);
ObservabilityConfig mockConfig = mock(ObservabilityConfig.class);
InternalLoggingChannelInterceptor.Factory mockChannelInterceptorFactory =
mock(InternalLoggingChannelInterceptor.Factory.class);
InternalLoggingServerInterceptor.Factory mockServerInterceptorFactory =
mock(InternalLoggingServerInterceptor.Factory.class);
when(mockConfig.isEnableCloudMonitoring()).thenReturn(true);
when(mockConfig.getDestinationProjectId()).thenReturn(PROJECT_ID);
try {
GcpObservability observability =
GcpObservability.grpcInit(
mockSink, mockConfig, mockChannelInterceptorFactory, mockServerInterceptorFactory);
observability.registerStackDriverExporter(PROJECT_ID, CUSTOM_TAGS);
Server server =
ServerBuilder.forPort(0)
.addService(new LoggingTestHelper.SimpleServiceImpl())
.build()
.start();
int port = cleanupRule.register(server).getPort();
SimpleServiceGrpc.SimpleServiceBlockingStub stub =
SimpleServiceGrpc.newBlockingStub(
cleanupRule.register(
ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build()));
assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub))
.isEqualTo("Hello buddy");
// Adding sleep to ensure metrics are exported before querying cloud monitoring backend
TimeUnit.SECONDS.sleep(40);
// This checks Cloud monitoring for the new metrics that was just exported.
MetricServiceClient metricServiceClient = MetricServiceClient.create();
// Restrict time to last 1 minute
long startMillis = System.currentTimeMillis() - ((60 * 1) * 1000);
TimeInterval interval =
TimeInterval.newBuilder()
.setStartTime(Timestamps.fromMillis(startMillis))
.setEndTime(Timestamps.fromMillis(System.currentTimeMillis()))
.build();
// Timeseries data
String metricsFilter =
String.format(
"metric.type=\"custom.googleapis.com/opencensus/grpc.io/client/completed_rpcs\""
+ " AND metric.labels.grpc_client_method=\"%s\""
+ " AND metric.labels.%s=%s",
TEST_CLIENT_METHOD, CUSTOM_TAG_KEY, CUSTOM_TAG_VALUE);
ListTimeSeriesRequest metricsRequest =
ListTimeSeriesRequest.newBuilder()
.setName(ProjectName.of(PROJECT_ID).toString())
.setFilter(metricsFilter)
.setInterval(interval)
.build();
ListTimeSeriesPagedResponse response = metricServiceClient.listTimeSeries(metricsRequest);
assertThat(response.iterateAll()).isNotEmpty();
for (TimeSeries ts : response.iterateAll()) {
assertThat(ts.getMetric().getLabelsMap().get("grpc_client_method"))
.isEqualTo(TEST_CLIENT_METHOD);
assertThat(ts.getMetric().getLabelsMap().get("grpc_client_status")).isEqualTo("OK");
assertThat(ts.getPoints(0).getValue().getInt64Value()).isEqualTo(1);
}
observability.close();
} catch (IOException | InterruptedException e) {
throw new AssertionError("Exception while testing metrics", e);
}
}
}
}

View File

@ -0,0 +1,165 @@
/*
* Copyright 2022 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.grpc.gcp.observability;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.google.cloud.trace.v1.TraceServiceClient;
import com.google.cloud.trace.v1.TraceServiceClient.ListTracesPagedResponse;
import com.google.devtools.cloudtrace.v1.GetTraceRequest;
import com.google.devtools.cloudtrace.v1.ListTracesRequest;
import com.google.devtools.cloudtrace.v1.Trace;
import com.google.devtools.cloudtrace.v1.TraceSpan;
import com.google.protobuf.util.Timestamps;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.StaticTestingClassLoader;
import io.grpc.gcp.observability.interceptors.InternalLoggingChannelInterceptor;
import io.grpc.gcp.observability.interceptors.InternalLoggingServerInterceptor;
import io.grpc.gcp.observability.logging.GcpLogSink;
import io.grpc.gcp.observability.logging.Sink;
import io.grpc.testing.GrpcCleanupRule;
import io.grpc.testing.protobuf.SimpleServiceGrpc;
import io.opencensus.trace.samplers.Samplers;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class TracesTest {
@ClassRule
public static final GrpcCleanupRule cleanupRule = new GrpcCleanupRule();
private static final String PROJECT_ID = "PROJECT";
private static final String CUSTOM_TAG_KEY = "service";
private static final String CUSTOM_TAG_VALUE =
String.format("payment-%s", String.valueOf(System.currentTimeMillis()));
private static final Map<String, String> CUSTOM_TAGS =
Collections.singletonMap(CUSTOM_TAG_KEY, CUSTOM_TAG_VALUE);
private final StaticTestingClassLoader classLoader =
new StaticTestingClassLoader(getClass().getClassLoader(),
Pattern.compile("io\\.grpc\\..*|io\\.opencensus\\..*"));
/**
* End to end cloud trace test.
*
* <p>Ignoring test, because it calls external Cloud Tracing APIs. To test cloud trace setup
* locally,
* 1. Set up Cloud auth credentials
* 2. Assign permissions to service account to write traces to project specified by variable
* PROJECT_ID
* 3. Comment @Ignore annotation
* 4. This test is expected to pass when ran with above setup. This has been verified manually.
*/
@Ignore
@Test
public void testTracesExporter() throws Exception {
Class<?> runnable =
classLoader.loadClass(TracesTest.StaticTestingClassTestTracesExporter.class.getName());
((Runnable) runnable.getDeclaredConstructor().newInstance()).run();
}
public static final class StaticTestingClassTestTracesExporter implements Runnable {
@Override
public void run() {
Sink mockSink = mock(GcpLogSink.class);
ObservabilityConfig mockConfig = mock(ObservabilityConfig.class);
InternalLoggingChannelInterceptor.Factory mockChannelInterceptorFactory =
mock(InternalLoggingChannelInterceptor.Factory.class);
InternalLoggingServerInterceptor.Factory mockServerInterceptorFactory =
mock(InternalLoggingServerInterceptor.Factory.class);
when(mockConfig.isEnableCloudTracing()).thenReturn(true);
when(mockConfig.getSampler()).thenReturn(Samplers.alwaysSample());
when(mockConfig.getDestinationProjectId()).thenReturn(PROJECT_ID);
try {
GcpObservability observability =
GcpObservability.grpcInit(
mockSink, mockConfig, mockChannelInterceptorFactory, mockServerInterceptorFactory);
observability.registerStackDriverExporter(PROJECT_ID, CUSTOM_TAGS);
Server server =
ServerBuilder.forPort(0)
.addService(new LoggingTestHelper.SimpleServiceImpl())
.build()
.start();
int port = cleanupRule.register(server).getPort();
SimpleServiceGrpc.SimpleServiceBlockingStub stub =
SimpleServiceGrpc.newBlockingStub(
cleanupRule.register(
ManagedChannelBuilder.forAddress("localhost", port).usePlaintext().build()));
assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub))
.isEqualTo("Hello buddy");
// Adding sleep to ensure traces are exported before querying cloud tracing backend
TimeUnit.SECONDS.sleep(10);
TraceServiceClient traceServiceClient = TraceServiceClient.create();
String traceFilter =
String.format(
"span:Sent.grpc.testing.SimpleService +%s:%s", CUSTOM_TAG_KEY, CUSTOM_TAG_VALUE);
String traceOrder = "start";
// Restrict time to last 1 minute
long startMillis = System.currentTimeMillis() - ((60 * 1) * 1000);
ListTracesRequest traceRequest =
ListTracesRequest.newBuilder()
.setProjectId(PROJECT_ID)
.setStartTime(Timestamps.fromMillis(startMillis))
.setEndTime(Timestamps.fromMillis(System.currentTimeMillis()))
.setFilter(traceFilter)
.setOrderBy(traceOrder)
.build();
ListTracesPagedResponse traceResponse = traceServiceClient.listTraces(traceRequest);
assertThat(traceResponse.iterateAll()).isNotEmpty();
List<String> traceIdList = new ArrayList<>();
for (Trace t : traceResponse.iterateAll()) {
traceIdList.add(t.getTraceId());
}
for (String traceId : traceIdList) {
// This checks Cloud trace for the new trace that was just created.
GetTraceRequest getTraceRequest =
GetTraceRequest.newBuilder().setProjectId(PROJECT_ID).setTraceId(traceId).build();
Trace trace = traceServiceClient.getTrace(getTraceRequest);
assertThat(trace.getSpansList()).hasSize(3);
for (TraceSpan span : trace.getSpansList()) {
assertThat(span.getName()).contains("grpc.testing.SimpleService.UnaryRpc");
assertThat(span.getLabelsMap().get(CUSTOM_TAG_KEY)).isEqualTo(CUSTOM_TAG_VALUE);
}
}
observability.close();
} catch (IOException | InterruptedException e) {
throw new AssertionError("Exception while testing traces", e);
}
}
}
}

View File

@ -18,7 +18,6 @@ package io.grpc.gcp.observability.logging;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.anyIterable;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
@ -39,12 +38,12 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
@ -57,75 +56,65 @@ public class GcpLogSinkTest {
@Rule
public final MockitoRule mockito = MockitoJUnit.rule();
private static final Map<String, String> locationTags = ImmutableMap.of("project_id", "PROJECT",
private static final ImmutableMap<String, String> LOCATION_TAGS =
ImmutableMap.of("project_id", "PROJECT",
"location", "us-central1-c",
"cluster_name", "grpc-observability-cluster",
"namespace_name", "default" ,
"pod_name", "app1-6c7c58f897-n92c5");
private static final Map<String, String> customTags = ImmutableMap.of("KEY1", "Value1",
private static final ImmutableMap<String, String> CUSTOM_TAGS =
ImmutableMap.of("KEY1", "Value1",
"KEY2", "VALUE2");
private static final long flushLimit = 10L;
// gRPC is expected to alway use this log name when reporting to GCP cloud logging.
private static final String expectedLogName =
private static final long FLUSH_LIMIT = 10L;
// gRPC is expected to always use this log name when reporting to GCP cloud logging.
private static final String EXPECTED_LOG_NAME =
"microservices.googleapis.com%2Fobservability%2Fgrpc";
private final long seqId = 1;
private final String destProjectName = "PROJECT";
private final String serviceName = "service";
private final String methodName = "method";
private final String authority = "authority";
private final Duration timeout = Durations.fromMillis(1234);
private final String rpcId = "d155e885-9587-4e77-81f7-3aa5a443d47f";
private final GrpcLogRecord logProto = GrpcLogRecord.newBuilder()
.setSequenceId(seqId)
.setServiceName(serviceName)
.setMethodName(methodName)
.setAuthority(authority)
.setTimeout(timeout)
private static final long SEQ_ID = 1;
private static final String DEST_PROJECT_NAME = "PROJECT";
private static final String SERVICE_NAME = "service";
private static final String METHOD_NAME = "method";
private static final String AUTHORITY = "authority";
private static final Duration TIMEOUT = Durations.fromMillis(1234);
private static final String RPC_ID = "d155e885-9587-4e77-81f7-3aa5a443d47f";
private static final GrpcLogRecord LOG_PROTO = GrpcLogRecord.newBuilder()
.setSequenceId(SEQ_ID)
.setServiceName(SERVICE_NAME)
.setMethodName(METHOD_NAME)
.setAuthority(AUTHORITY)
.setTimeout(TIMEOUT)
.setEventType(EventType.GRPC_CALL_REQUEST_HEADER)
.setEventLogger(EventLogger.LOGGER_CLIENT)
.setRpcId(rpcId)
.setRpcId(RPC_ID)
.build();
private final Struct expectedStructLogProto = Struct.newBuilder()
.putFields("sequence_id", Value.newBuilder().setStringValue(String.valueOf(seqId)).build())
.putFields("service_name", Value.newBuilder().setStringValue(serviceName).build())
.putFields("method_name", Value.newBuilder().setStringValue(methodName).build())
.putFields("authority", Value.newBuilder().setStringValue(authority).build())
private static final Struct EXPECTED_STRUCT_LOG_PROTO = Struct.newBuilder()
.putFields("sequence_id", Value.newBuilder().setStringValue(String.valueOf(SEQ_ID)).build())
.putFields("service_name", Value.newBuilder().setStringValue(SERVICE_NAME).build())
.putFields("method_name", Value.newBuilder().setStringValue(METHOD_NAME).build())
.putFields("authority", Value.newBuilder().setStringValue(AUTHORITY).build())
.putFields("timeout", Value.newBuilder().setStringValue("1.234s").build())
.putFields("event_type", Value.newBuilder().setStringValue(
String.valueOf(EventType.GRPC_CALL_REQUEST_HEADER)).build())
.putFields("event_logger", Value.newBuilder().setStringValue(
String.valueOf(EventLogger.LOGGER_CLIENT)).build())
.putFields("rpc_id", Value.newBuilder().setStringValue(rpcId).build())
.putFields("rpc_id", Value.newBuilder().setStringValue(RPC_ID).build())
.build();
@Mock
private Logging mockLogging;
@Before
public void setUp() {
mockLogging = mock(Logging.class);
}
@Test
public void createSink() {
Sink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags,
customTags, flushLimit);
assertThat(mockSink).isInstanceOf(GcpLogSink.class);
}
@Test
@SuppressWarnings("unchecked")
public void verifyWrite() throws Exception {
Sink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags,
customTags, flushLimit);
mockSink.write(logProto);
GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS,
CUSTOM_TAGS, FLUSH_LIMIT);
sink.write(LOG_PROTO);
ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
(Class) Collection.class);
verify(mockLogging, times(1)).write(logEntrySetCaptor.capture());
for (Iterator<LogEntry> it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) {
LogEntry entry = it.next();
System.out.println(entry);
assertThat(entry.getPayload().getData()).isEqualTo(expectedStructLogProto);
assertThat(entry.getLogName()).isEqualTo(expectedLogName);
assertThat(entry.getPayload().getData()).isEqualTo(EXPECTED_STRUCT_LOG_PROTO);
assertThat(entry.getLogName()).isEqualTo(EXPECTED_LOG_NAME);
}
verifyNoMoreInteractions(mockLogging);
}
@ -133,10 +122,10 @@ public class GcpLogSinkTest {
@Test
@SuppressWarnings("unchecked")
public void verifyWriteWithTags() {
GcpLogSink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags,
customTags, flushLimit);
MonitoredResource expectedMonitoredResource = GcpLogSink.getResource(locationTags);
mockSink.write(logProto);
GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS,
CUSTOM_TAGS, FLUSH_LIMIT);
MonitoredResource expectedMonitoredResource = GcpLogSink.getResource(LOCATION_TAGS);
sink.write(LOG_PROTO);
ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
(Class) Collection.class);
@ -145,9 +134,9 @@ public class GcpLogSinkTest {
for (Iterator<LogEntry> it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) {
LogEntry entry = it.next();
assertThat(entry.getResource()).isEqualTo(expectedMonitoredResource);
assertThat(entry.getLabels()).isEqualTo(customTags);
assertThat(entry.getPayload().getData()).isEqualTo(expectedStructLogProto);
assertThat(entry.getLogName()).isEqualTo(expectedLogName);
assertThat(entry.getLabels()).isEqualTo(CUSTOM_TAGS);
assertThat(entry.getPayload().getData()).isEqualTo(EXPECTED_STRUCT_LOG_PROTO);
assertThat(entry.getLogName()).isEqualTo(EXPECTED_LOG_NAME);
}
verifyNoMoreInteractions(mockLogging);
}
@ -157,9 +146,9 @@ public class GcpLogSinkTest {
public void emptyCustomTags_labelsNotSet() {
Map<String, String> emptyCustomTags = null;
Map<String, String> expectedEmptyLabels = new HashMap<>();
GcpLogSink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags,
emptyCustomTags, flushLimit);
mockSink.write(logProto);
GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS,
emptyCustomTags, FLUSH_LIMIT);
sink.write(LOG_PROTO);
ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
(Class) Collection.class);
@ -167,7 +156,7 @@ public class GcpLogSinkTest {
for (Iterator<LogEntry> it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) {
LogEntry entry = it.next();
assertThat(entry.getLabels()).isEqualTo(expectedEmptyLabels);
assertThat(entry.getPayload().getData()).isEqualTo(expectedStructLogProto);
assertThat(entry.getPayload().getData()).isEqualTo(EXPECTED_STRUCT_LOG_PROTO);
}
}
@ -176,11 +165,11 @@ public class GcpLogSinkTest {
public void emptyCustomTags_setSourceProject() {
Map<String, String> emptyCustomTags = null;
String destinationProjectId = "DESTINATION_PROJECT";
Map<String, String> expectedLabels = GcpLogSink.getCustomTags(emptyCustomTags, locationTags,
Map<String, String> expectedLabels = GcpLogSink.getCustomTags(emptyCustomTags, LOCATION_TAGS,
destinationProjectId);
GcpLogSink mockSink = new GcpLogSink(mockLogging, destinationProjectId, locationTags,
emptyCustomTags, flushLimit);
mockSink.write(logProto);
GcpLogSink sink = new GcpLogSink(mockLogging, destinationProjectId, LOCATION_TAGS,
emptyCustomTags, FLUSH_LIMIT);
sink.write(LOG_PROTO);
ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
(Class) Collection.class);
@ -188,31 +177,31 @@ public class GcpLogSinkTest {
for (Iterator<LogEntry> it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) {
LogEntry entry = it.next();
assertThat(entry.getLabels()).isEqualTo(expectedLabels);
assertThat(entry.getPayload().getData()).isEqualTo(expectedStructLogProto);
assertThat(entry.getPayload().getData()).isEqualTo(EXPECTED_STRUCT_LOG_PROTO);
}
}
@Test
public void verifyFlush() {
long lowerFlushLimit = 2L;
GcpLogSink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags,
customTags, lowerFlushLimit);
mockSink.write(logProto);
GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS,
CUSTOM_TAGS, lowerFlushLimit);
sink.write(LOG_PROTO);
verify(mockLogging, never()).flush();
mockSink.write(logProto);
sink.write(LOG_PROTO);
verify(mockLogging, times(1)).flush();
mockSink.write(logProto);
mockSink.write(logProto);
sink.write(LOG_PROTO);
sink.write(LOG_PROTO);
verify(mockLogging, times(2)).flush();
}
@Test
public void verifyClose() throws Exception {
Sink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags,
customTags, flushLimit);
mockSink.write(logProto);
GcpLogSink sink = new GcpLogSink(mockLogging, DEST_PROJECT_NAME, LOCATION_TAGS,
CUSTOM_TAGS, FLUSH_LIMIT);
sink.write(LOG_PROTO);
verify(mockLogging, times(1)).write(anyIterable());
mockSink.close();
sink.close();
verify(mockLogging).close();
verifyNoMoreInteractions(mockLogging);
}