diff --git a/observability/src/main/java/io/grpc/observability/GlobalLoggingTags.java b/observability/src/main/java/io/grpc/observability/GlobalLoggingTags.java index fb5452ecda..cc1efe4121 100644 --- a/observability/src/main/java/io/grpc/observability/GlobalLoggingTags.java +++ b/observability/src/main/java/io/grpc/observability/GlobalLoggingTags.java @@ -39,16 +39,23 @@ final class GlobalLoggingTags { private static final Logger logger = Logger.getLogger(GlobalLoggingTags.class.getName()); private static final String ENV_KEY_PREFIX = "GRPC_OBSERVABILITY_"; - private final Map tags; + private final Map locationTags; + private final Map customTags; GlobalLoggingTags() { - ImmutableMap.Builder builder = ImmutableMap.builder(); - populate(builder); - tags = builder.build(); + ImmutableMap.Builder locationTagsBuilder = ImmutableMap.builder(); + ImmutableMap.Builder customTagsBuilder = ImmutableMap.builder(); + populate(locationTagsBuilder, customTagsBuilder); + locationTags = locationTagsBuilder.build(); + customTags = customTagsBuilder.build(); } - Map getTags() { - return tags; + Map getLocationTags() { + return locationTags; + } + + Map getCustomTags() { + return customTags; } @VisibleForTesting @@ -139,10 +146,11 @@ final class GlobalLoggingTags { }); } - static void populate(ImmutableMap.Builder customTags) { + static void populate(ImmutableMap.Builder locationTags, + ImmutableMap.Builder customTags) { populateFromEnvironmentVars(customTags); - populateFromMetadataServer(customTags); - populateFromKubernetesValues(customTags, + populateFromMetadataServer(locationTags); + populateFromKubernetesValues(locationTags, "/var/run/secrets/kubernetes.io/serviceaccount/namespace", "/etc/hostname", "/proc/self/cgroup"); } diff --git a/observability/src/main/java/io/grpc/observability/LoggingChannelProvider.java b/observability/src/main/java/io/grpc/observability/LoggingChannelProvider.java index 5011068c17..10eb64fa5e 100644 --- a/observability/src/main/java/io/grpc/observability/LoggingChannelProvider.java +++ b/observability/src/main/java/io/grpc/observability/LoggingChannelProvider.java @@ -45,7 +45,7 @@ final class LoggingChannelProvider extends ManagedChannelProvider { ManagedChannelRegistry.getDefaultRegistry().register(instance); } - static synchronized void finish() { + static synchronized void shutdown() { if (instance == null) { throw new IllegalStateException("LoggingChannelProvider not initialized!"); } diff --git a/observability/src/main/java/io/grpc/observability/LoggingServerProvider.java b/observability/src/main/java/io/grpc/observability/LoggingServerProvider.java index 5277bcf572..4931a1f7f9 100644 --- a/observability/src/main/java/io/grpc/observability/LoggingServerProvider.java +++ b/observability/src/main/java/io/grpc/observability/LoggingServerProvider.java @@ -45,7 +45,7 @@ final class LoggingServerProvider extends ServerProvider { ServerRegistry.getDefaultRegistry().register(instance); } - static synchronized void finish() { + static synchronized void shutdown() { if (instance == null) { throw new IllegalStateException("LoggingServerProvider not initialized!"); } diff --git a/observability/src/main/java/io/grpc/observability/Observability.java b/observability/src/main/java/io/grpc/observability/Observability.java index 617da68a5c..f5039ce390 100644 --- a/observability/src/main/java/io/grpc/observability/Observability.java +++ b/observability/src/main/java/io/grpc/observability/Observability.java @@ -16,49 +16,71 @@ package io.grpc.observability; +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.common.annotations.VisibleForTesting; import io.grpc.ExperimentalApi; import io.grpc.ManagedChannelProvider.ProviderNotFoundException; import io.grpc.observability.interceptors.InternalLoggingChannelInterceptor; import io.grpc.observability.interceptors.InternalLoggingServerInterceptor; import io.grpc.observability.logging.GcpLogSink; import io.grpc.observability.logging.Sink; +import java.io.IOException; /** The main class for gRPC Observability features. */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/8869") public final class Observability { - private static boolean initialized = false; - private static final String PROJECT_ID = "PROJECT"; + private static Observability instance = null; + private final Sink sink; /** * Initialize grpc-observability. * * @throws ProviderNotFoundException if no underlying channel/server provider is available. */ - public static synchronized void grpcInit() { - if (initialized) { - throw new IllegalStateException("Observability already initialized!"); + public static synchronized Observability grpcInit() throws IOException { + if (instance == null) { + GlobalLoggingTags globalLoggingTags = new GlobalLoggingTags(); + ObservabilityConfigImpl observabilityConfig = ObservabilityConfigImpl.getInstance(); + Sink sink = new GcpLogSink(observabilityConfig.getDestinationProjectId()); + instance = grpcInit(sink, + new InternalLoggingChannelInterceptor.FactoryImpl(sink, + globalLoggingTags.getLocationTags(), globalLoggingTags.getCustomTags(), + observabilityConfig), + new InternalLoggingServerInterceptor.FactoryImpl(sink, + globalLoggingTags.getLocationTags(), globalLoggingTags.getCustomTags(), + observabilityConfig)); } - // TODO(dnvindhya): PROJECT_ID to be replaced with configured destinationProjectId - Sink sink = new GcpLogSink(PROJECT_ID); - LoggingChannelProvider.init(new InternalLoggingChannelInterceptor.FactoryImpl(sink)); - LoggingServerProvider.init(new InternalLoggingServerInterceptor.FactoryImpl(sink)); - // TODO(sanjaypujare): initialize customTags map - initialized = true; + return instance; } - /** Un-initialize or finish grpc-observability. */ - // TODO(sanjaypujare): Once Observability is made into Singleton object, - // close() on sink will be called as part of grpcFinish() - public static synchronized void grpcFinish() { - if (!initialized) { - throw new IllegalStateException("Observability not initialized!"); + @VisibleForTesting static Observability grpcInit(Sink sink, + InternalLoggingChannelInterceptor.Factory channelInterceptorFactory, + InternalLoggingServerInterceptor.Factory serverInterceptorFactory) { + if (instance == null) { + instance = new Observability(sink, channelInterceptorFactory, serverInterceptorFactory); } - LoggingChannelProvider.finish(); - LoggingServerProvider.finish(); - // TODO(sanjaypujare): finish customTags map - initialized = false; + return instance; } - private Observability() { + /** Un-initialize/shutdown grpc-observability. */ + public void grpcShutdown() { + synchronized (Observability.class) { + if (instance == null) { + throw new IllegalStateException("Observability already shutdown!"); + } + LoggingChannelProvider.shutdown(); + LoggingServerProvider.shutdown(); + sink.close(); + instance = null; + } + } + + private Observability(Sink sink, + InternalLoggingChannelInterceptor.Factory channelInterceptorFactory, + InternalLoggingServerInterceptor.Factory serverInterceptorFactory) { + this.sink = checkNotNull(sink); + LoggingChannelProvider.init(checkNotNull(channelInterceptorFactory)); + LoggingServerProvider.init(checkNotNull(serverInterceptorFactory)); } } diff --git a/observability/src/main/java/io/grpc/observability/ObservabilityConfig.java b/observability/src/main/java/io/grpc/observability/ObservabilityConfig.java index 29cb371eb4..61c5f90835 100644 --- a/observability/src/main/java/io/grpc/observability/ObservabilityConfig.java +++ b/observability/src/main/java/io/grpc/observability/ObservabilityConfig.java @@ -16,31 +16,29 @@ package io.grpc.observability; -import static com.google.common.base.Preconditions.checkArgument; - -import io.grpc.internal.JsonParser; -import io.grpc.internal.JsonUtil; -import io.grpc.observabilitylog.v1.GrpcLogRecord; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; -import java.io.IOException; -import java.util.List; -import java.util.Map; -/** gRPC Observability configuration processor. */ -final class ObservabilityConfig { - private static final String CONFIG_ENV_VAR_NAME = "GRPC_CONFIG_OBSERVABILITY"; +public interface ObservabilityConfig { + /** Is Cloud Logging enabled. */ + boolean isEnableCloudLogging(); - private boolean enableCloudLogging = true; - private String destinationProjectId = null; - private LogFilter[] logFilters; - private GrpcLogRecord.EventType[] eventTypes; + /** Get destination project ID - where logs will go. */ + String getDestinationProjectId(); - /** POJO for representing a filter used in configuration. */ - static class LogFilter { + /** Get filters set for logging. */ + LogFilter[] getLogFilters(); + + /** Get event types to log. */ + EventType[] getEventTypes(); + + /** + * POJO for representing a filter used in configuration. + */ + public static class LogFilter { /** Pattern indicating which service/method to log. */ public final String pattern; - /** Number of bytes of each header to log. */ + /** Number of bytes of each header to log. */ public final Integer headerBytes; /** Number of bytes of each header to log. */ @@ -52,88 +50,4 @@ final class ObservabilityConfig { this.messageBytes = messageBytes; } } - - static ObservabilityConfig getInstance() throws IOException { - ObservabilityConfig config = new ObservabilityConfig(); - config.parse(System.getenv(CONFIG_ENV_VAR_NAME)); - return config; - } - - @SuppressWarnings("unchecked") - void parse(String config) throws IOException { - checkArgument(config != null, CONFIG_ENV_VAR_NAME + " value is null!"); - parseLoggingConfig( - JsonUtil.getObject((Map) JsonParser.parse(config), "logging_config")); - } - - private void parseLoggingConfig(Map loggingConfig) { - if (loggingConfig != null) { - Boolean value = JsonUtil.getBoolean(loggingConfig, "enable_cloud_logging"); - if (value != null) { - enableCloudLogging = value; - } - destinationProjectId = JsonUtil.getString(loggingConfig, "destination_project_id"); - List rawList = JsonUtil.getList(loggingConfig, "log_filters"); - if (rawList != null) { - List> jsonLogFilters = JsonUtil.checkObjectList(rawList); - this.logFilters = new LogFilter[jsonLogFilters.size()]; - for (int i = 0; i < jsonLogFilters.size(); i++) { - this.logFilters[i] = parseJsonLogFilter(jsonLogFilters.get(i)); - } - } - rawList = JsonUtil.getList(loggingConfig, "event_types"); - if (rawList != null) { - List jsonEventTypes = JsonUtil.checkStringList(rawList); - this.eventTypes = new GrpcLogRecord.EventType[jsonEventTypes.size()]; - for (int i = 0; i < jsonEventTypes.size(); i++) { - this.eventTypes[i] = convertEventType(jsonEventTypes.get(i)); - } - } - } - } - - private GrpcLogRecord.EventType convertEventType(String val) { - switch (val) { - case "GRPC_CALL_UNKNOWN": - return GrpcLogRecord.EventType.GRPC_CALL_UNKNOWN; - case "GRPC_CALL_REQUEST_HEADER": - return GrpcLogRecord.EventType.GRPC_CALL_REQUEST_HEADER; - case "GRPC_CALL_RESPONSE_HEADER": - return GrpcLogRecord.EventType.GRPC_CALL_RESPONSE_HEADER; - case"GRPC_CALL_REQUEST_MESSAGE": - return GrpcLogRecord.EventType.GRPC_CALL_REQUEST_MESSAGE; - case "GRPC_CALL_RESPONSE_MESSAGE": - return GrpcLogRecord.EventType.GRPC_CALL_RESPONSE_MESSAGE; - case "GRPC_CALL_TRAILER": - return GrpcLogRecord.EventType.GRPC_CALL_TRAILER; - case "GRPC_CALL_HALF_CLOSE": - return GrpcLogRecord.EventType.GRPC_CALL_HALF_CLOSE; - case "GRPC_CALL_CANCEL": - return GrpcLogRecord.EventType.GRPC_CALL_CANCEL; - default: - throw new IllegalArgumentException("Unknown event type value:" + val); - } - } - - private LogFilter parseJsonLogFilter(Map logFilterMap) { - return new LogFilter(JsonUtil.getString(logFilterMap, "pattern"), - JsonUtil.getNumberAsInteger(logFilterMap, "header_bytes"), - JsonUtil.getNumberAsInteger(logFilterMap, "message_bytes")); - } - - public boolean isEnableCloudLogging() { - return enableCloudLogging; - } - - public String getDestinationProjectId() { - return destinationProjectId; - } - - public LogFilter[] getLogFilters() { - return logFilters; - } - - public EventType[] getEventTypes() { - return eventTypes; - } } diff --git a/observability/src/main/java/io/grpc/observability/ObservabilityConfigImpl.java b/observability/src/main/java/io/grpc/observability/ObservabilityConfigImpl.java new file mode 100644 index 0000000000..86e60ed0bc --- /dev/null +++ b/observability/src/main/java/io/grpc/observability/ObservabilityConfigImpl.java @@ -0,0 +1,124 @@ +/* + * Copyright 2022 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.observability; + +import static com.google.common.base.Preconditions.checkArgument; + +import io.grpc.internal.JsonParser; +import io.grpc.internal.JsonUtil; +import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** gRPC Observability configuration processor. */ +final class ObservabilityConfigImpl implements ObservabilityConfig { + private static final String CONFIG_ENV_VAR_NAME = "GRPC_CONFIG_OBSERVABILITY"; + + private boolean enableCloudLogging = true; + private String destinationProjectId = null; + private LogFilter[] logFilters; + private EventType[] eventTypes; + + static ObservabilityConfigImpl getInstance() throws IOException { + ObservabilityConfigImpl config = new ObservabilityConfigImpl(); + config.parse(System.getenv(CONFIG_ENV_VAR_NAME)); + return config; + } + + @SuppressWarnings("unchecked") + void parse(String config) throws IOException { + checkArgument(config != null, CONFIG_ENV_VAR_NAME + " value is null!"); + parseLoggingConfig( + JsonUtil.getObject((Map) JsonParser.parse(config), "logging_config")); + } + + private void parseLoggingConfig(Map loggingConfig) { + if (loggingConfig != null) { + Boolean value = JsonUtil.getBoolean(loggingConfig, "enable_cloud_logging"); + if (value != null) { + enableCloudLogging = value; + } + destinationProjectId = JsonUtil.getString(loggingConfig, "destination_project_id"); + List rawList = JsonUtil.getList(loggingConfig, "log_filters"); + if (rawList != null) { + List> jsonLogFilters = JsonUtil.checkObjectList(rawList); + this.logFilters = new LogFilter[jsonLogFilters.size()]; + for (int i = 0; i < jsonLogFilters.size(); i++) { + this.logFilters[i] = parseJsonLogFilter(jsonLogFilters.get(i)); + } + } + rawList = JsonUtil.getList(loggingConfig, "event_types"); + if (rawList != null) { + List jsonEventTypes = JsonUtil.checkStringList(rawList); + this.eventTypes = new EventType[jsonEventTypes.size()]; + for (int i = 0; i < jsonEventTypes.size(); i++) { + this.eventTypes[i] = convertEventType(jsonEventTypes.get(i)); + } + } + } + } + + private EventType convertEventType(String val) { + switch (val) { + case "GRPC_CALL_UNKNOWN": + return EventType.GRPC_CALL_UNKNOWN; + case "GRPC_CALL_REQUEST_HEADER": + return EventType.GRPC_CALL_REQUEST_HEADER; + case "GRPC_CALL_RESPONSE_HEADER": + return EventType.GRPC_CALL_RESPONSE_HEADER; + case"GRPC_CALL_REQUEST_MESSAGE": + return EventType.GRPC_CALL_REQUEST_MESSAGE; + case "GRPC_CALL_RESPONSE_MESSAGE": + return EventType.GRPC_CALL_RESPONSE_MESSAGE; + case "GRPC_CALL_TRAILER": + return EventType.GRPC_CALL_TRAILER; + case "GRPC_CALL_HALF_CLOSE": + return EventType.GRPC_CALL_HALF_CLOSE; + case "GRPC_CALL_CANCEL": + return EventType.GRPC_CALL_CANCEL; + default: + throw new IllegalArgumentException("Unknown event type value:" + val); + } + } + + private LogFilter parseJsonLogFilter(Map logFilterMap) { + return new LogFilter(JsonUtil.getString(logFilterMap, "pattern"), + JsonUtil.getNumberAsInteger(logFilterMap, "header_bytes"), + JsonUtil.getNumberAsInteger(logFilterMap, "message_bytes")); + } + + @Override + public boolean isEnableCloudLogging() { + return enableCloudLogging; + } + + @Override + public String getDestinationProjectId() { + return destinationProjectId; + } + + @Override + public LogFilter[] getLogFilters() { + return logFilters; + } + + @Override + public EventType[] getEventTypes() { + return eventTypes; + } +} diff --git a/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptor.java b/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptor.java index 63d5280584..6c4ee63f5c 100644 --- a/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptor.java +++ b/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptor.java @@ -31,9 +31,11 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.Status; import io.grpc.internal.TimeProvider; +import io.grpc.observability.ObservabilityConfig; import io.grpc.observability.logging.Sink; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; +import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -58,13 +60,12 @@ public final class InternalLoggingChannelInterceptor implements ClientIntercepto private final Sink sink; private final LogHelper helper; - static LogHelper createLogHelper(Sink sink, TimeProvider provider) { - return new LogHelper(sink, provider); - } - - public FactoryImpl(Sink sink) { + /** Create the {@link Factory} we need to create our {@link ClientInterceptor}s. */ + public FactoryImpl(Sink sink, Map locationTags, Map customTags, + ObservabilityConfig observabilityConfig) { this.sink = sink; - this.helper = createLogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER); + this.helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER, locationTags, customTags, + observabilityConfig); } @Override diff --git a/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingServerInterceptor.java b/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingServerInterceptor.java index 8d3337057c..c9df0f65c4 100644 --- a/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingServerInterceptor.java +++ b/observability/src/main/java/io/grpc/observability/interceptors/InternalLoggingServerInterceptor.java @@ -29,10 +29,12 @@ import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; import io.grpc.Status; import io.grpc.internal.TimeProvider; +import io.grpc.observability.ObservabilityConfig; import io.grpc.observability.logging.Sink; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventLogger; import io.grpc.observabilitylog.v1.GrpcLogRecord.EventType; import java.net.SocketAddress; +import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -55,13 +57,13 @@ public final class InternalLoggingServerInterceptor implements ServerInterceptor private final Sink sink; private final LogHelper helper; - static LogHelper createLogHelper(Sink sink, TimeProvider provider) { - return new LogHelper(sink, provider); - } - - public FactoryImpl(Sink sink) { + /** Create the {@link Factory} we need to create our {@link ServerInterceptor}s. */ + public FactoryImpl(Sink sink, Map locationTags, + Map customTags, + ObservabilityConfig observabilityConfig) { this.sink = sink; - this.helper = createLogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER); + this.helper = new LogHelper(sink, TimeProvider.SYSTEM_TIME_PROVIDER, locationTags, customTags, + observabilityConfig); } @Override diff --git a/observability/src/main/java/io/grpc/observability/interceptors/LogHelper.java b/observability/src/main/java/io/grpc/observability/interceptors/LogHelper.java index d454c84c98..f4e705b470 100644 --- a/observability/src/main/java/io/grpc/observability/interceptors/LogHelper.java +++ b/observability/src/main/java/io/grpc/observability/interceptors/LogHelper.java @@ -30,6 +30,7 @@ import io.grpc.InternalMetadata; import io.grpc.Metadata; import io.grpc.Status; import io.grpc.internal.TimeProvider; +import io.grpc.observability.ObservabilityConfig; import io.grpc.observability.logging.Sink; import io.grpc.observabilitylog.v1.GrpcLogRecord; import io.grpc.observabilitylog.v1.GrpcLogRecord.Address; @@ -41,6 +42,7 @@ import java.net.Inet6Address; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -59,10 +61,18 @@ class LogHelper { private final Sink sink; private final TimeProvider timeProvider; + // TODO(DNvindhya) remove unused annotation once the following 2 are actually used + @SuppressWarnings({"unused"}) private final Map locationTags; + @SuppressWarnings({"unused"}) private final Map customTags; + @SuppressWarnings({"unused"}) private final ObservabilityConfig observabilityConfig; - LogHelper(Sink sink, TimeProvider timeProvider) { + LogHelper(Sink sink, TimeProvider timeProvider, Map locationTags, + Map customTags, ObservabilityConfig observabilityConfig) { this.sink = sink; this.timeProvider = timeProvider; + this.locationTags = locationTags; + this.customTags = customTags; + this.observabilityConfig = observabilityConfig; } /** diff --git a/observability/src/test/java/io/grpc/observability/LoggingChannelProviderTest.java b/observability/src/test/java/io/grpc/observability/LoggingChannelProviderTest.java index 31e1268262..b8de06cf56 100644 --- a/observability/src/test/java/io/grpc/observability/LoggingChannelProviderTest.java +++ b/observability/src/test/java/io/grpc/observability/LoggingChannelProviderTest.java @@ -59,16 +59,17 @@ public class LoggingChannelProviderTest { ManagedChannelProvider prevProvider = ManagedChannelProvider.provider(); assertThat(prevProvider).isNotInstanceOf(LoggingChannelProvider.class); Sink mockSink = mock(GcpLogSink.class); - LoggingChannelProvider.init(new InternalLoggingChannelInterceptor.FactoryImpl(mockSink)); + LoggingChannelProvider.init( + new InternalLoggingChannelInterceptor.FactoryImpl(mockSink, null, null, null)); assertThat(ManagedChannelProvider.provider()).isInstanceOf(LoggingChannelProvider.class); try { LoggingChannelProvider.init( - new InternalLoggingChannelInterceptor.FactoryImpl(mockSink)); + new InternalLoggingChannelInterceptor.FactoryImpl(mockSink, null, null, null)); fail("should have failed for calling init() again"); } catch (IllegalStateException e) { assertThat(e).hasMessageThat().contains("LoggingChannelProvider already initialized!"); } - LoggingChannelProvider.finish(); + LoggingChannelProvider.shutdown(); assertThat(ManagedChannelProvider.provider()).isSameInstanceAs(prevProvider); } @@ -88,7 +89,7 @@ public class LoggingChannelProviderTest { verify(interceptor) .interceptCall(same(method), same(callOptions), ArgumentMatchers.any()); channel.shutdownNow(); - LoggingChannelProvider.finish(); + LoggingChannelProvider.shutdown(); } @Test @@ -107,7 +108,7 @@ public class LoggingChannelProviderTest { verify(interceptor) .interceptCall(same(method), same(callOptions), ArgumentMatchers.any()); channel.shutdownNow(); - LoggingChannelProvider.finish(); + LoggingChannelProvider.shutdown(); } @Test @@ -127,7 +128,7 @@ public class LoggingChannelProviderTest { verify(interceptor) .interceptCall(same(method), same(callOptions), ArgumentMatchers.any()); channel.shutdownNow(); - LoggingChannelProvider.finish(); + LoggingChannelProvider.shutdown(); } private static class NoopInterceptor implements ClientInterceptor { diff --git a/observability/src/test/java/io/grpc/observability/LoggingServerProviderTest.java b/observability/src/test/java/io/grpc/observability/LoggingServerProviderTest.java index 21e86ac182..8f837f44f4 100644 --- a/observability/src/test/java/io/grpc/observability/LoggingServerProviderTest.java +++ b/observability/src/test/java/io/grpc/observability/LoggingServerProviderTest.java @@ -60,15 +60,17 @@ public class LoggingServerProviderTest { ServerProvider prevProvider = ServerProvider.provider(); assertThat(prevProvider).isNotInstanceOf(LoggingServerProvider.class); Sink mockSink = mock(GcpLogSink.class); - LoggingServerProvider.init(new InternalLoggingServerInterceptor.FactoryImpl(mockSink)); + LoggingServerProvider.init( + new InternalLoggingServerInterceptor.FactoryImpl(mockSink, null, null, null)); assertThat(ServerProvider.provider()).isInstanceOf(ServerProvider.class); try { - LoggingServerProvider.init(new InternalLoggingServerInterceptor.FactoryImpl(mockSink)); + LoggingServerProvider.init( + new InternalLoggingServerInterceptor.FactoryImpl(mockSink, null, null, null)); fail("should have failed for calling init() again"); } catch (IllegalStateException e) { assertThat(e).hasMessageThat().contains("LoggingServerProvider already initialized!"); } - LoggingServerProvider.finish(); + LoggingServerProvider.shutdown(); assertThat(ServerProvider.provider()).isSameInstanceAs(prevProvider); } @@ -100,7 +102,7 @@ public class LoggingServerProviderTest { cleanupRule.register(channel)); assertThat(unaryRpc("buddy", stub)).isEqualTo("Hello buddy"); verify(interceptor).interceptCall(any(ServerCall.class), any(Metadata.class), anyCallHandler()); - LoggingServerProvider.finish(); + LoggingServerProvider.shutdown(); } private ServerCallHandler anyCallHandler() { diff --git a/observability/src/test/java/io/grpc/observability/LoggingTest.java b/observability/src/test/java/io/grpc/observability/LoggingTest.java index bceb9bb31d..8c2a0a6965 100644 --- a/observability/src/test/java/io/grpc/observability/LoggingTest.java +++ b/observability/src/test/java/io/grpc/observability/LoggingTest.java @@ -52,12 +52,12 @@ public class LoggingTest { throws IOException { Sink sink = new GcpLogSink(PROJECT_ID); LoggingServerProvider.init( - new InternalLoggingServerInterceptor.FactoryImpl(sink)); + new InternalLoggingServerInterceptor.FactoryImpl(sink, null, null, null)); Server server = ServerBuilder.forPort(0).addService(new LoggingTestHelper.SimpleServiceImpl()) .build().start(); int port = cleanupRule.register(server).getPort(); LoggingChannelProvider.init( - new InternalLoggingChannelInterceptor.FactoryImpl(sink)); + new InternalLoggingChannelInterceptor.FactoryImpl(sink, null, null, null)); ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", port) .usePlaintext().build(); SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc.newBlockingStub( @@ -65,7 +65,7 @@ public class LoggingTest { assertThat(LoggingTestHelper.makeUnaryRpcViaClientStub("buddy", stub)) .isEqualTo("Hello buddy"); sink.close(); - LoggingChannelProvider.finish(); - LoggingServerProvider.finish(); + LoggingChannelProvider.shutdown(); + LoggingServerProvider.shutdown(); } } diff --git a/observability/src/test/java/io/grpc/observability/ObservabilityConfigTest.java b/observability/src/test/java/io/grpc/observability/ObservabilityConfigImplTest.java similarity index 97% rename from observability/src/test/java/io/grpc/observability/ObservabilityConfigTest.java rename to observability/src/test/java/io/grpc/observability/ObservabilityConfigImplTest.java index f4147f0b62..ee29031748 100644 --- a/observability/src/test/java/io/grpc/observability/ObservabilityConfigTest.java +++ b/observability/src/test/java/io/grpc/observability/ObservabilityConfigImplTest.java @@ -29,7 +29,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @RunWith(JUnit4.class) -public class ObservabilityConfigTest { +public class ObservabilityConfigImplTest { private static final String EVENT_TYPES = "{\n" + " \"logging_config\": {\n" + " \"enable_cloud_logging\": false,\n" @@ -66,7 +66,7 @@ public class ObservabilityConfigTest { + " \"enable_cloud_logging\": false\n" + " }\n" + "}"; - ObservabilityConfig observabilityConfig = new ObservabilityConfig(); + ObservabilityConfigImpl observabilityConfig = new ObservabilityConfigImpl(); @Test public void nullConfig() throws IOException { diff --git a/observability/src/test/java/io/grpc/observability/ObservabilityTest.java b/observability/src/test/java/io/grpc/observability/ObservabilityTest.java index 37fadb6a00..79273cd04b 100644 --- a/observability/src/test/java/io/grpc/observability/ObservabilityTest.java +++ b/observability/src/test/java/io/grpc/observability/ObservabilityTest.java @@ -18,7 +18,14 @@ package io.grpc.observability; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import io.grpc.ManagedChannelProvider; +import io.grpc.ServerProvider; +import io.grpc.observability.interceptors.InternalLoggingChannelInterceptor; +import io.grpc.observability.interceptors.InternalLoggingServerInterceptor; +import io.grpc.observability.logging.Sink; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -28,19 +35,30 @@ public class ObservabilityTest { @Test public void initFinish() { - Observability.grpcInit(); + ManagedChannelProvider prevChannelProvider = ManagedChannelProvider.provider(); + ServerProvider prevServerProvider = ServerProvider.provider(); + Sink sink = mock(Sink.class); + InternalLoggingChannelInterceptor.Factory channelInterceptorFactory = mock( + InternalLoggingChannelInterceptor.Factory.class); + InternalLoggingServerInterceptor.Factory serverInterceptorFactory = mock( + InternalLoggingServerInterceptor.Factory.class); + Observability observability = Observability.grpcInit(sink, channelInterceptorFactory, + serverInterceptorFactory); + assertThat(ManagedChannelProvider.provider()).isInstanceOf(LoggingChannelProvider.class); + assertThat(ServerProvider.provider()).isInstanceOf(ServerProvider.class); + Observability observability1 = Observability.grpcInit(sink, channelInterceptorFactory, + serverInterceptorFactory); + assertThat(observability1).isSameInstanceAs(observability); + + observability.grpcShutdown(); + verify(sink).close(); + assertThat(ManagedChannelProvider.provider()).isSameInstanceAs(prevChannelProvider); + assertThat(ServerProvider.provider()).isSameInstanceAs(prevServerProvider); try { - Observability.grpcInit(); - fail("should have failed for calling grpcInit() again"); + observability.grpcShutdown(); + fail("should have failed for calling grpcShutdown() second time"); } catch (IllegalStateException e) { - assertThat(e).hasMessageThat().contains("Observability already initialized!"); - } - Observability.grpcFinish(); - try { - Observability.grpcFinish(); - fail("should have failed for calling grpcFinish() on uninitialized"); - } catch (IllegalStateException e) { - assertThat(e).hasMessageThat().contains("Observability not initialized!"); + assertThat(e).hasMessageThat().contains("Observability already shutdown!"); } } } diff --git a/observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptorTest.java b/observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptorTest.java index 6ddb2abc32..ae14557566 100644 --- a/observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptorTest.java +++ b/observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingChannelInterceptorTest.java @@ -86,7 +86,7 @@ public class InternalLoggingChannelInterceptorTest { @Before public void setup() throws Exception { - factory = new InternalLoggingChannelInterceptor.FactoryImpl(mockSink); + factory = new InternalLoggingChannelInterceptor.FactoryImpl(mockSink, null, null, null); interceptedListener = new AtomicReference<>(); actualClientInitial = new AtomicReference<>(); actualRequest = new AtomicReference<>(); diff --git a/observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingServerInterceptorTest.java b/observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingServerInterceptorTest.java index c567d76335..17813f0a27 100644 --- a/observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingServerInterceptorTest.java +++ b/observability/src/test/java/io/grpc/observability/interceptors/InternalLoggingServerInterceptorTest.java @@ -83,7 +83,7 @@ public class InternalLoggingServerInterceptorTest { @Before @SuppressWarnings("unchecked") public void setup() throws Exception { - factory = new InternalLoggingServerInterceptor.FactoryImpl(mockSink); + factory = new InternalLoggingServerInterceptor.FactoryImpl(mockSink, null, null, null); interceptedLoggingCall = new AtomicReference<>(); mockListener = mock(ServerCall.Listener.class); actualServerInitial = new AtomicReference<>(); diff --git a/observability/src/test/java/io/grpc/observability/interceptors/LogHelperTest.java b/observability/src/test/java/io/grpc/observability/interceptors/LogHelperTest.java index 7998021d80..ec5ed6d3fd 100644 --- a/observability/src/test/java/io/grpc/observability/interceptors/LogHelperTest.java +++ b/observability/src/test/java/io/grpc/observability/interceptors/LogHelperTest.java @@ -35,6 +35,7 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor.Marshaller; import io.grpc.Status; import io.grpc.internal.TimeProvider; +import io.grpc.observability.ObservabilityConfig; import io.grpc.observability.interceptors.LogHelper.PayloadBuilder; import io.grpc.observability.logging.GcpLogSink; import io.grpc.observability.logging.Sink; @@ -53,6 +54,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.charset.Charset; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Test; @@ -101,16 +103,14 @@ public class LogHelperTest { private final Sink sink = mock(GcpLogSink.class); private final Timestamp timestamp = Timestamp.newBuilder().setSeconds(9876).setNanos(54321).build(); - private final TimeProvider timeProvider = new TimeProvider() { - @Override - public long currentTimeNanos() { - return TimeUnit.SECONDS.toNanos(9876) + 54321; - } - }; + private final TimeProvider timeProvider = () -> TimeUnit.SECONDS.toNanos(9876) + 54321; + @SuppressWarnings("unchecked") private final Map locationTags = mock(Map.class); + @SuppressWarnings("unchecked") private final Map customTags = mock(Map.class); + private final ObservabilityConfig observabilityConfig = mock(ObservabilityConfig.class); private final LogHelper logHelper = new LogHelper( sink, - timeProvider); + timeProvider, locationTags, customTags, observabilityConfig); @Before public void setUp() throws Exception {