diff --git a/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java b/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java index 9b301f653d..61647789db 100644 --- a/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java +++ b/gcp-observability/src/main/java/io/grpc/gcp/observability/logging/GcpLogSink.java @@ -24,14 +24,15 @@ import com.google.cloud.logging.Payload.JsonPayload; import com.google.cloud.logging.Severity; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.protobuf.util.JsonFormat; import io.grpc.internal.JsonParser; import io.grpc.observabilitylog.v1.GrpcLogRecord; import java.io.IOException; import java.util.Collections; -import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; @@ -71,15 +72,16 @@ public class GcpLogSink implements Sink { */ public GcpLogSink(String destinationProjectId, Map locationTags, Map customTags, Long flushLimit) { - this(createLoggingClient(destinationProjectId), locationTags, customTags, flushLimit); + this(createLoggingClient(destinationProjectId), destinationProjectId, locationTags, + customTags, flushLimit); } @VisibleForTesting - GcpLogSink(Logging client, Map locationTags, Map customTags, - Long flushLimit) { + GcpLogSink(Logging client, String destinationProjectId, Map locationTags, + Map customTags, Long flushLimit) { this.gcpLoggingClient = client; - this.customTags = customTags != null ? customTags : new HashMap<>(); + this.customTags = getCustomTags(customTags, locationTags, destinationProjectId); this.kubernetesResource = getResource(locationTags); this.flushLimit = flushLimit != null ? flushLimit : FALLBACK_FLUSH_LIMIT; this.flushCounter = 0L; @@ -109,6 +111,7 @@ public class GcpLogSink implements Sink { .setSeverity(logEntrySeverity) .setLogName(DEFAULT_LOG_NAME) .setResource(kubernetesResource); + if (!customTags.isEmpty()) { grpcLogEntryBuilder.setLabels(customTags); } @@ -127,6 +130,21 @@ public class GcpLogSink implements Sink { } } + @VisibleForTesting + static Map getCustomTags(Map customTags, + Map locationTags, String destinationProjectId) { + ImmutableMap.Builder tagsBuilder = ImmutableMap.builder(); + String sourceProjectId = locationTags.get("project_id"); + if (!Strings.isNullOrEmpty(destinationProjectId) + && !Objects.equals(sourceProjectId, destinationProjectId)) { + tagsBuilder.put("source_project_id", sourceProjectId); + } + if (customTags != null) { + tagsBuilder.putAll(customTags); + } + return tagsBuilder.build(); + } + @VisibleForTesting static MonitoredResource getResource(Map resourceTags) { MonitoredResource.Builder builder = MonitoredResource.newBuilder(K8S_MONITORED_RESOURCE_TYPE); diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/ConfigFilterHelperTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/ConfigFilterHelperTest.java index 58bf794f8b..ba6e05e2dc 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/ConfigFilterHelperTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/ConfigFilterHelperTest.java @@ -17,7 +17,6 @@ package io.grpc.gcp.observability.interceptors; import static com.google.common.truth.Truth.assertThat; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.mock; @@ -107,15 +106,15 @@ public class ConfigFilterHelperTest { FilterParams.create(true, 128, 128)); expectedServiceFilters.put("service2", FilterParams.create(true, 2048, 1024)); - assertEquals(configFilterHelper.perServiceFilters, expectedServiceFilters); + assertThat(configFilterHelper.perServiceFilters).isEqualTo(expectedServiceFilters); Map expectedMethodFilters = new HashMap<>(); expectedMethodFilters.put("service1/Method2", FilterParams.create(true, 1024, 1024)); - assertEquals(configFilterHelper.perMethodFilters, expectedMethodFilters); + assertThat(configFilterHelper.perMethodFilters).isEqualTo(expectedMethodFilters); Set expectedLogEventTypeSet = ImmutableSet.copyOf(configEventTypes); - assertEquals(configFilterHelper.logEventTypeSet, expectedLogEventTypeSet); + assertThat(configFilterHelper.logEventTypeSet).isEqualTo(expectedLogEventTypeSet); } @Test @@ -130,7 +129,7 @@ public class ConfigFilterHelperTest { method = builder.setFullMethodName("service1/Method6").build(); FilterParams resultParams = configFilterHelper.isMethodToBeLogged(method); - assertEquals(resultParams, expectedParams); + assertThat(resultParams).isEqualTo(expectedParams); } @Test @@ -146,7 +145,7 @@ public class ConfigFilterHelperTest { method = builder.setFullMethodName("service3/Method3").build(); FilterParams resultParams = configFilterHelper.isMethodToBeLogged(method); - assertEquals(resultParams, expectedParams); + assertThat(resultParams).isEqualTo(expectedParams); } @Test @@ -159,14 +158,14 @@ public class ConfigFilterHelperTest { method = builder.setFullMethodName("service1/Method2").build(); FilterParams resultParams = configFilterHelper.isMethodToBeLogged(method); - assertEquals(resultParams, expectedParams); + assertThat(resultParams).isEqualTo(expectedParams); FilterParams expectedParamsWildCard = FilterParams.create(true, 2048, 1024); method = builder.setFullMethodName("service2/Method1").build(); FilterParams resultParamsWildCard = configFilterHelper.isMethodToBeLogged(method); - assertEquals(resultParamsWildCard, expectedParamsWildCard); + assertThat(resultParamsWildCard).isEqualTo(expectedParamsWildCard); } @Test diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/LogHelperTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/LogHelperTest.java index 1334a91077..209543595d 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/LogHelperTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/interceptors/LogHelperTest.java @@ -18,7 +18,6 @@ package io.grpc.gcp.observability.interceptors; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.truth.Truth.assertThat; -import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; @@ -118,14 +117,13 @@ public class LogHelperTest { InetAddress address = InetAddress.getByName("127.0.0.1"); int port = 12345; InetSocketAddress socketAddress = new InetSocketAddress(address, port); - assertEquals( - Address + assertThat(LogHelper.socketAddressToProto(socketAddress)) + .isEqualTo(Address .newBuilder() .setType(Address.Type.TYPE_IPV4) .setAddress("127.0.0.1") .setIpPort(12345) - .build(), - LogHelper.socketAddressToProto(socketAddress)); + .build()); } @Test @@ -134,14 +132,13 @@ public class LogHelperTest { InetAddress address = InetAddress.getByName("2001:db8:0:0:0:0:2:1"); int port = 12345; InetSocketAddress socketAddress = new InetSocketAddress(address, port); - assertEquals( - Address + assertThat(LogHelper.socketAddressToProto(socketAddress)) + .isEqualTo(Address .newBuilder() .setType(Address.Type.TYPE_IPV6) .setAddress("2001:db8::2:1") // RFC 5952 section 4: ipv6 canonical form required .setIpPort(12345) - .build(), - LogHelper.socketAddressToProto(socketAddress)); + .build()); } @Test @@ -152,31 +149,30 @@ public class LogHelperTest { return "some-socket-address"; } }; - assertEquals( - Address.newBuilder() + assertThat(LogHelper.socketAddressToProto(unknownSocket)) + .isEqualTo(Address.newBuilder() .setType(Address.Type.TYPE_UNKNOWN) .setAddress("some-socket-address") - .build(), - LogHelper.socketAddressToProto(unknownSocket)); + .build()); } @Test public void metadataToProto_empty() { - assertEquals( - GrpcLogRecord.newBuilder() + assertThat(metadataToProtoTestHelper( + EventType.GRPC_CALL_REQUEST_HEADER, new Metadata(), Integer.MAX_VALUE)) + .isEqualTo(GrpcLogRecord.newBuilder() .setEventType(EventType.GRPC_CALL_REQUEST_HEADER) .setMetadata( GrpcLogRecord.Metadata.getDefaultInstance()) - .build(), - metadataToProtoTestHelper( - EventType.GRPC_CALL_REQUEST_HEADER, new Metadata(), Integer.MAX_VALUE)); + .build()); } @Test public void metadataToProto() { int nonEmptyMetadataSize = 30; - assertEquals( - GrpcLogRecord.newBuilder() + assertThat(metadataToProtoTestHelper( + EventType.GRPC_CALL_REQUEST_HEADER, nonEmptyMetadata, Integer.MAX_VALUE)) + .isEqualTo(GrpcLogRecord.newBuilder() .setEventType(EventType.GRPC_CALL_REQUEST_HEADER) .setMetadata( GrpcLogRecord.Metadata @@ -186,9 +182,7 @@ public class LogHelperTest { .addEntry(ENTRY_C) .build()) .setPayloadSize(nonEmptyMetadataSize) - .build(), - metadataToProtoTestHelper( - EventType.GRPC_CALL_REQUEST_HEADER, nonEmptyMetadata, Integer.MAX_VALUE)); + .build()); } @Test @@ -199,51 +193,44 @@ public class LogHelperTest { @Test public void metadataToProto_truncated() { // 0 byte limit not enough for any metadata - assertEquals( - io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata.getDefaultInstance(), - LogHelper.createMetadataProto(nonEmptyMetadata, 0).payload.build()); + assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 0).payload.build()) + .isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata.getDefaultInstance()); // not enough bytes for first key value - assertEquals( - io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata.getDefaultInstance(), - LogHelper.createMetadataProto(nonEmptyMetadata, 9).payload.build()); + assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 9).payload.build()) + .isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata.getDefaultInstance()); // enough for first key value - assertEquals( - io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata + assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 10).payload.build()) + .isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata .newBuilder() .addEntry(ENTRY_A) - .build(), - LogHelper.createMetadataProto(nonEmptyMetadata, 10).payload.build()); + .build()); // Test edge cases for >= 2 key values - assertEquals( - io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata + assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 19).payload.build()) + .isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata .newBuilder() .addEntry(ENTRY_A) - .build(), - LogHelper.createMetadataProto(nonEmptyMetadata, 19).payload.build()); - assertEquals( - io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata + .build()); + assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 20).payload.build()) + .isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata .newBuilder() .addEntry(ENTRY_A) .addEntry(ENTRY_B) - .build(), - LogHelper.createMetadataProto(nonEmptyMetadata, 20).payload.build()); - assertEquals( - io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata + .build()); + assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 29).payload.build()) + .isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata .newBuilder() .addEntry(ENTRY_A) .addEntry(ENTRY_B) - .build(), - LogHelper.createMetadataProto(nonEmptyMetadata, 29).payload.build()); + .build()); // not truncated: enough for all keys - assertEquals( - io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata + assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 30).payload.build()) + .isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata .newBuilder() .addEntry(ENTRY_A) .addEntry(ENTRY_B) .addEntry(ENTRY_C) - .build(), - LogHelper.createMetadataProto(nonEmptyMetadata, 30).payload.build()); + .build()); } @Test @@ -251,12 +238,11 @@ public class LogHelperTest { byte[] bytes = "this is a long message: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".getBytes( StandardCharsets.US_ASCII); - assertEquals( - GrpcLogRecord.newBuilder() + assertThat(messageTestHelper(bytes, Integer.MAX_VALUE)) + .isEqualTo(GrpcLogRecord.newBuilder() .setMessage(ByteString.copyFrom(bytes)) .setPayloadSize(bytes.length) - .build(), - messageTestHelper(bytes, Integer.MAX_VALUE)); + .build()); } @Test @@ -264,22 +250,20 @@ public class LogHelperTest { byte[] bytes = "this is a long message: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".getBytes( StandardCharsets.US_ASCII); - assertEquals( - GrpcLogRecord.newBuilder() + assertThat(messageTestHelper(bytes, 0)) + .isEqualTo(GrpcLogRecord.newBuilder() .setPayloadSize(bytes.length) .setPayloadTruncated(true) - .build(), - messageTestHelper(bytes, 0)); + .build()); int limit = 10; String truncatedMessage = "this is a "; - assertEquals( - GrpcLogRecord.newBuilder() + assertThat(messageTestHelper(bytes, limit)) + .isEqualTo(GrpcLogRecord.newBuilder() .setMessage(ByteString.copyFrom(truncatedMessage.getBytes(StandardCharsets.US_ASCII))) .setPayloadSize(bytes.length) .setPayloadTruncated(true) - .build(), - messageTestHelper(bytes, limit)); + .build()); } @@ -569,10 +553,8 @@ public class LogHelperTest { int zeroHeaderBytes = 0; PayloadBuilder pair = LogHelper.createMetadataProto(metadata, zeroHeaderBytes); - assertEquals( - key.name(), - Objects.requireNonNull(Iterables.getOnlyElement(pair.payload.getEntryBuilderList())) - .getKey()); + assertThat(Objects.requireNonNull(Iterables.getOnlyElement(pair.payload.getEntryBuilderList())) + .getKey()).isEqualTo(key.name()); assertFalse(pair.truncated); } diff --git a/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java b/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java index 626e8c4619..7d100cfb94 100644 --- a/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java +++ b/gcp-observability/src/test/java/io/grpc/gcp/observability/logging/GcpLogSinkTest.java @@ -17,7 +17,6 @@ package io.grpc.gcp.observability.logging; import static com.google.common.truth.Truth.assertThat; -import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.anyIterable; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -67,6 +66,7 @@ public class GcpLogSinkTest { "KEY2", "VALUE2"); private static final long flushLimit = 10L; private final long seqId = 1; + private final String destProjectName = "PROJECT"; private final String serviceName = "service"; private final String methodName = "method"; private final String authority = "authority"; @@ -103,14 +103,16 @@ public class GcpLogSinkTest { @Test public void createSink() { - Sink mockSink = new GcpLogSink(mockLogging, locationTags, customTags, flushLimit); + Sink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags, + customTags, flushLimit); assertThat(mockSink).isInstanceOf(GcpLogSink.class); } @Test @SuppressWarnings("unchecked") public void verifyWrite() throws Exception { - Sink mockSink = new GcpLogSink(mockLogging, locationTags, customTags, flushLimit); + Sink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags, + customTags, flushLimit); mockSink.write(logProto); ArgumentCaptor> logEntrySetCaptor = ArgumentCaptor.forClass( @@ -119,7 +121,7 @@ public class GcpLogSinkTest { for (Iterator it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) { LogEntry entry = it.next(); System.out.println(entry); - assertEquals(entry.getPayload().getData(), expectedStructLogProto); + assertThat(entry.getPayload().getData()).isEqualTo(expectedStructLogProto); } verifyNoMoreInteractions(mockLogging); } @@ -127,7 +129,8 @@ public class GcpLogSinkTest { @Test @SuppressWarnings("unchecked") public void verifyWriteWithTags() { - GcpLogSink mockSink = new GcpLogSink(mockLogging, locationTags, customTags, flushLimit); + GcpLogSink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags, + customTags, flushLimit); MonitoredResource expectedMonitoredResource = GcpLogSink.getResource(locationTags); mockSink.write(logProto); @@ -137,9 +140,9 @@ public class GcpLogSinkTest { System.out.println(logEntrySetCaptor.getValue()); for (Iterator it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) { LogEntry entry = it.next(); - assertEquals(entry.getResource(), expectedMonitoredResource); - assertEquals(entry.getLabels(), customTags); - assertEquals(entry.getPayload().getData(), expectedStructLogProto); + assertThat(entry.getResource()).isEqualTo(expectedMonitoredResource); + assertThat(entry.getLabels()).isEqualTo(customTags); + assertThat(entry.getPayload().getData()).isEqualTo(expectedStructLogProto); } verifyNoMoreInteractions(mockLogging); } @@ -149,7 +152,8 @@ public class GcpLogSinkTest { public void emptyCustomTags_labelsNotSet() { Map emptyCustomTags = null; Map expectedEmptyLabels = new HashMap<>(); - GcpLogSink mockSink = new GcpLogSink(mockLogging, locationTags, emptyCustomTags, flushLimit); + GcpLogSink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags, + emptyCustomTags, flushLimit); mockSink.write(logProto); ArgumentCaptor> logEntrySetCaptor = ArgumentCaptor.forClass( @@ -157,15 +161,37 @@ public class GcpLogSinkTest { verify(mockLogging, times(1)).write(logEntrySetCaptor.capture()); for (Iterator it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) { LogEntry entry = it.next(); - assertEquals(entry.getLabels(), expectedEmptyLabels); - assertEquals(entry.getPayload().getData(), expectedStructLogProto); + assertThat(entry.getLabels()).isEqualTo(expectedEmptyLabels); + assertThat(entry.getPayload().getData()).isEqualTo(expectedStructLogProto); + } + } + + @Test + @SuppressWarnings("unchecked") + public void emptyCustomTags_setSourceProject() { + Map emptyCustomTags = null; + String destinationProjectId = "DESTINATION_PROJECT"; + Map expectedLabels = GcpLogSink.getCustomTags(emptyCustomTags, locationTags, + destinationProjectId); + GcpLogSink mockSink = new GcpLogSink(mockLogging, destinationProjectId, locationTags, + emptyCustomTags, flushLimit); + mockSink.write(logProto); + + ArgumentCaptor> logEntrySetCaptor = ArgumentCaptor.forClass( + (Class) Collection.class); + verify(mockLogging, times(1)).write(logEntrySetCaptor.capture()); + for (Iterator it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) { + LogEntry entry = it.next(); + assertThat(entry.getLabels()).isEqualTo(expectedLabels); + assertThat(entry.getPayload().getData()).isEqualTo(expectedStructLogProto); } } @Test public void verifyFlush() { long lowerFlushLimit = 2L; - GcpLogSink mockSink = new GcpLogSink(mockLogging, locationTags, customTags, lowerFlushLimit); + GcpLogSink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags, + customTags, lowerFlushLimit); mockSink.write(logProto); verify(mockLogging, never()).flush(); mockSink.write(logProto); @@ -177,7 +203,8 @@ public class GcpLogSinkTest { @Test public void verifyClose() throws Exception { - Sink mockSink = new GcpLogSink(mockLogging, locationTags, customTags, flushLimit); + Sink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags, + customTags, flushLimit); mockSink.write(logProto); verify(mockLogging, times(1)).write(anyIterable()); mockSink.close();