gcp-observability: add source project id to labels when cross project logging is enabled (#9056)

This commit is contained in:
DNVindhya 2022-04-05 11:29:45 -07:00 committed by GitHub
parent 78308c0c6a
commit adab27bb0f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 117 additions and 91 deletions

View File

@ -24,14 +24,15 @@ import com.google.cloud.logging.Payload.JsonPayload;
import com.google.cloud.logging.Severity;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.protobuf.util.JsonFormat;
import io.grpc.internal.JsonParser;
import io.grpc.observabilitylog.v1.GrpcLogRecord;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
@ -71,15 +72,16 @@ public class GcpLogSink implements Sink {
*/
public GcpLogSink(String destinationProjectId, Map<String, String> locationTags,
Map<String, String> customTags, Long flushLimit) {
this(createLoggingClient(destinationProjectId), locationTags, customTags, flushLimit);
this(createLoggingClient(destinationProjectId), destinationProjectId, locationTags,
customTags, flushLimit);
}
@VisibleForTesting
GcpLogSink(Logging client, Map<String, String> locationTags, Map<String, String> customTags,
Long flushLimit) {
GcpLogSink(Logging client, String destinationProjectId, Map<String, String> locationTags,
Map<String, String> customTags, Long flushLimit) {
this.gcpLoggingClient = client;
this.customTags = customTags != null ? customTags : new HashMap<>();
this.customTags = getCustomTags(customTags, locationTags, destinationProjectId);
this.kubernetesResource = getResource(locationTags);
this.flushLimit = flushLimit != null ? flushLimit : FALLBACK_FLUSH_LIMIT;
this.flushCounter = 0L;
@ -109,6 +111,7 @@ public class GcpLogSink implements Sink {
.setSeverity(logEntrySeverity)
.setLogName(DEFAULT_LOG_NAME)
.setResource(kubernetesResource);
if (!customTags.isEmpty()) {
grpcLogEntryBuilder.setLabels(customTags);
}
@ -127,6 +130,21 @@ public class GcpLogSink implements Sink {
}
}
@VisibleForTesting
static Map<String, String> getCustomTags(Map<String, String> customTags,
Map<String, String> locationTags, String destinationProjectId) {
ImmutableMap.Builder<String, String> tagsBuilder = ImmutableMap.builder();
String sourceProjectId = locationTags.get("project_id");
if (!Strings.isNullOrEmpty(destinationProjectId)
&& !Objects.equals(sourceProjectId, destinationProjectId)) {
tagsBuilder.put("source_project_id", sourceProjectId);
}
if (customTags != null) {
tagsBuilder.putAll(customTags);
}
return tagsBuilder.build();
}
@VisibleForTesting
static MonitoredResource getResource(Map<String, String> resourceTags) {
MonitoredResource.Builder builder = MonitoredResource.newBuilder(K8S_MONITORED_RESOURCE_TYPE);

View File

@ -17,7 +17,6 @@
package io.grpc.gcp.observability.interceptors;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
@ -107,15 +106,15 @@ public class ConfigFilterHelperTest {
FilterParams.create(true, 128, 128));
expectedServiceFilters.put("service2",
FilterParams.create(true, 2048, 1024));
assertEquals(configFilterHelper.perServiceFilters, expectedServiceFilters);
assertThat(configFilterHelper.perServiceFilters).isEqualTo(expectedServiceFilters);
Map<String, FilterParams> expectedMethodFilters = new HashMap<>();
expectedMethodFilters.put("service1/Method2",
FilterParams.create(true, 1024, 1024));
assertEquals(configFilterHelper.perMethodFilters, expectedMethodFilters);
assertThat(configFilterHelper.perMethodFilters).isEqualTo(expectedMethodFilters);
Set<EventType> expectedLogEventTypeSet = ImmutableSet.copyOf(configEventTypes);
assertEquals(configFilterHelper.logEventTypeSet, expectedLogEventTypeSet);
assertThat(configFilterHelper.logEventTypeSet).isEqualTo(expectedLogEventTypeSet);
}
@Test
@ -130,7 +129,7 @@ public class ConfigFilterHelperTest {
method = builder.setFullMethodName("service1/Method6").build();
FilterParams resultParams
= configFilterHelper.isMethodToBeLogged(method);
assertEquals(resultParams, expectedParams);
assertThat(resultParams).isEqualTo(expectedParams);
}
@Test
@ -146,7 +145,7 @@ public class ConfigFilterHelperTest {
method = builder.setFullMethodName("service3/Method3").build();
FilterParams resultParams
= configFilterHelper.isMethodToBeLogged(method);
assertEquals(resultParams, expectedParams);
assertThat(resultParams).isEqualTo(expectedParams);
}
@Test
@ -159,14 +158,14 @@ public class ConfigFilterHelperTest {
method = builder.setFullMethodName("service1/Method2").build();
FilterParams resultParams
= configFilterHelper.isMethodToBeLogged(method);
assertEquals(resultParams, expectedParams);
assertThat(resultParams).isEqualTo(expectedParams);
FilterParams expectedParamsWildCard =
FilterParams.create(true, 2048, 1024);
method = builder.setFullMethodName("service2/Method1").build();
FilterParams resultParamsWildCard
= configFilterHelper.isMethodToBeLogged(method);
assertEquals(resultParamsWildCard, expectedParamsWildCard);
assertThat(resultParamsWildCard).isEqualTo(expectedParamsWildCard);
}
@Test

View File

@ -18,7 +18,6 @@ package io.grpc.gcp.observability.interceptors;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
@ -118,14 +117,13 @@ public class LogHelperTest {
InetAddress address = InetAddress.getByName("127.0.0.1");
int port = 12345;
InetSocketAddress socketAddress = new InetSocketAddress(address, port);
assertEquals(
Address
assertThat(LogHelper.socketAddressToProto(socketAddress))
.isEqualTo(Address
.newBuilder()
.setType(Address.Type.TYPE_IPV4)
.setAddress("127.0.0.1")
.setIpPort(12345)
.build(),
LogHelper.socketAddressToProto(socketAddress));
.build());
}
@Test
@ -134,14 +132,13 @@ public class LogHelperTest {
InetAddress address = InetAddress.getByName("2001:db8:0:0:0:0:2:1");
int port = 12345;
InetSocketAddress socketAddress = new InetSocketAddress(address, port);
assertEquals(
Address
assertThat(LogHelper.socketAddressToProto(socketAddress))
.isEqualTo(Address
.newBuilder()
.setType(Address.Type.TYPE_IPV6)
.setAddress("2001:db8::2:1") // RFC 5952 section 4: ipv6 canonical form required
.setIpPort(12345)
.build(),
LogHelper.socketAddressToProto(socketAddress));
.build());
}
@Test
@ -152,31 +149,30 @@ public class LogHelperTest {
return "some-socket-address";
}
};
assertEquals(
Address.newBuilder()
assertThat(LogHelper.socketAddressToProto(unknownSocket))
.isEqualTo(Address.newBuilder()
.setType(Address.Type.TYPE_UNKNOWN)
.setAddress("some-socket-address")
.build(),
LogHelper.socketAddressToProto(unknownSocket));
.build());
}
@Test
public void metadataToProto_empty() {
assertEquals(
GrpcLogRecord.newBuilder()
assertThat(metadataToProtoTestHelper(
EventType.GRPC_CALL_REQUEST_HEADER, new Metadata(), Integer.MAX_VALUE))
.isEqualTo(GrpcLogRecord.newBuilder()
.setEventType(EventType.GRPC_CALL_REQUEST_HEADER)
.setMetadata(
GrpcLogRecord.Metadata.getDefaultInstance())
.build(),
metadataToProtoTestHelper(
EventType.GRPC_CALL_REQUEST_HEADER, new Metadata(), Integer.MAX_VALUE));
.build());
}
@Test
public void metadataToProto() {
int nonEmptyMetadataSize = 30;
assertEquals(
GrpcLogRecord.newBuilder()
assertThat(metadataToProtoTestHelper(
EventType.GRPC_CALL_REQUEST_HEADER, nonEmptyMetadata, Integer.MAX_VALUE))
.isEqualTo(GrpcLogRecord.newBuilder()
.setEventType(EventType.GRPC_CALL_REQUEST_HEADER)
.setMetadata(
GrpcLogRecord.Metadata
@ -186,9 +182,7 @@ public class LogHelperTest {
.addEntry(ENTRY_C)
.build())
.setPayloadSize(nonEmptyMetadataSize)
.build(),
metadataToProtoTestHelper(
EventType.GRPC_CALL_REQUEST_HEADER, nonEmptyMetadata, Integer.MAX_VALUE));
.build());
}
@Test
@ -199,51 +193,44 @@ public class LogHelperTest {
@Test
public void metadataToProto_truncated() {
// 0 byte limit not enough for any metadata
assertEquals(
io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata.getDefaultInstance(),
LogHelper.createMetadataProto(nonEmptyMetadata, 0).payload.build());
assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 0).payload.build())
.isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata.getDefaultInstance());
// not enough bytes for first key value
assertEquals(
io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata.getDefaultInstance(),
LogHelper.createMetadataProto(nonEmptyMetadata, 9).payload.build());
assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 9).payload.build())
.isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata.getDefaultInstance());
// enough for first key value
assertEquals(
io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata
assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 10).payload.build())
.isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata
.newBuilder()
.addEntry(ENTRY_A)
.build(),
LogHelper.createMetadataProto(nonEmptyMetadata, 10).payload.build());
.build());
// Test edge cases for >= 2 key values
assertEquals(
io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata
assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 19).payload.build())
.isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata
.newBuilder()
.addEntry(ENTRY_A)
.build(),
LogHelper.createMetadataProto(nonEmptyMetadata, 19).payload.build());
assertEquals(
io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata
.build());
assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 20).payload.build())
.isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata
.newBuilder()
.addEntry(ENTRY_A)
.addEntry(ENTRY_B)
.build(),
LogHelper.createMetadataProto(nonEmptyMetadata, 20).payload.build());
assertEquals(
io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata
.build());
assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 29).payload.build())
.isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata
.newBuilder()
.addEntry(ENTRY_A)
.addEntry(ENTRY_B)
.build(),
LogHelper.createMetadataProto(nonEmptyMetadata, 29).payload.build());
.build());
// not truncated: enough for all keys
assertEquals(
io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata
assertThat(LogHelper.createMetadataProto(nonEmptyMetadata, 30).payload.build())
.isEqualTo(io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata
.newBuilder()
.addEntry(ENTRY_A)
.addEntry(ENTRY_B)
.addEntry(ENTRY_C)
.build(),
LogHelper.createMetadataProto(nonEmptyMetadata, 30).payload.build());
.build());
}
@Test
@ -251,12 +238,11 @@ public class LogHelperTest {
byte[] bytes
= "this is a long message: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".getBytes(
StandardCharsets.US_ASCII);
assertEquals(
GrpcLogRecord.newBuilder()
assertThat(messageTestHelper(bytes, Integer.MAX_VALUE))
.isEqualTo(GrpcLogRecord.newBuilder()
.setMessage(ByteString.copyFrom(bytes))
.setPayloadSize(bytes.length)
.build(),
messageTestHelper(bytes, Integer.MAX_VALUE));
.build());
}
@Test
@ -264,22 +250,20 @@ public class LogHelperTest {
byte[] bytes
= "this is a long message: AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA".getBytes(
StandardCharsets.US_ASCII);
assertEquals(
GrpcLogRecord.newBuilder()
assertThat(messageTestHelper(bytes, 0))
.isEqualTo(GrpcLogRecord.newBuilder()
.setPayloadSize(bytes.length)
.setPayloadTruncated(true)
.build(),
messageTestHelper(bytes, 0));
.build());
int limit = 10;
String truncatedMessage = "this is a ";
assertEquals(
GrpcLogRecord.newBuilder()
assertThat(messageTestHelper(bytes, limit))
.isEqualTo(GrpcLogRecord.newBuilder()
.setMessage(ByteString.copyFrom(truncatedMessage.getBytes(StandardCharsets.US_ASCII)))
.setPayloadSize(bytes.length)
.setPayloadTruncated(true)
.build(),
messageTestHelper(bytes, limit));
.build());
}
@ -569,10 +553,8 @@ public class LogHelperTest {
int zeroHeaderBytes = 0;
PayloadBuilder<io.grpc.observabilitylog.v1.GrpcLogRecord.Metadata.Builder> pair =
LogHelper.createMetadataProto(metadata, zeroHeaderBytes);
assertEquals(
key.name(),
Objects.requireNonNull(Iterables.getOnlyElement(pair.payload.getEntryBuilderList()))
.getKey());
assertThat(Objects.requireNonNull(Iterables.getOnlyElement(pair.payload.getEntryBuilderList()))
.getKey()).isEqualTo(key.name());
assertFalse(pair.truncated);
}

View File

@ -17,7 +17,6 @@
package io.grpc.gcp.observability.logging;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.anyIterable;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@ -67,6 +66,7 @@ public class GcpLogSinkTest {
"KEY2", "VALUE2");
private static final long flushLimit = 10L;
private final long seqId = 1;
private final String destProjectName = "PROJECT";
private final String serviceName = "service";
private final String methodName = "method";
private final String authority = "authority";
@ -103,14 +103,16 @@ public class GcpLogSinkTest {
@Test
public void createSink() {
Sink mockSink = new GcpLogSink(mockLogging, locationTags, customTags, flushLimit);
Sink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags,
customTags, flushLimit);
assertThat(mockSink).isInstanceOf(GcpLogSink.class);
}
@Test
@SuppressWarnings("unchecked")
public void verifyWrite() throws Exception {
Sink mockSink = new GcpLogSink(mockLogging, locationTags, customTags, flushLimit);
Sink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags,
customTags, flushLimit);
mockSink.write(logProto);
ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
@ -119,7 +121,7 @@ public class GcpLogSinkTest {
for (Iterator<LogEntry> it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) {
LogEntry entry = it.next();
System.out.println(entry);
assertEquals(entry.getPayload().getData(), expectedStructLogProto);
assertThat(entry.getPayload().getData()).isEqualTo(expectedStructLogProto);
}
verifyNoMoreInteractions(mockLogging);
}
@ -127,7 +129,8 @@ public class GcpLogSinkTest {
@Test
@SuppressWarnings("unchecked")
public void verifyWriteWithTags() {
GcpLogSink mockSink = new GcpLogSink(mockLogging, locationTags, customTags, flushLimit);
GcpLogSink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags,
customTags, flushLimit);
MonitoredResource expectedMonitoredResource = GcpLogSink.getResource(locationTags);
mockSink.write(logProto);
@ -137,9 +140,9 @@ public class GcpLogSinkTest {
System.out.println(logEntrySetCaptor.getValue());
for (Iterator<LogEntry> it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) {
LogEntry entry = it.next();
assertEquals(entry.getResource(), expectedMonitoredResource);
assertEquals(entry.getLabels(), customTags);
assertEquals(entry.getPayload().getData(), expectedStructLogProto);
assertThat(entry.getResource()).isEqualTo(expectedMonitoredResource);
assertThat(entry.getLabels()).isEqualTo(customTags);
assertThat(entry.getPayload().getData()).isEqualTo(expectedStructLogProto);
}
verifyNoMoreInteractions(mockLogging);
}
@ -149,7 +152,8 @@ public class GcpLogSinkTest {
public void emptyCustomTags_labelsNotSet() {
Map<String, String> emptyCustomTags = null;
Map<String, String> expectedEmptyLabels = new HashMap<>();
GcpLogSink mockSink = new GcpLogSink(mockLogging, locationTags, emptyCustomTags, flushLimit);
GcpLogSink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags,
emptyCustomTags, flushLimit);
mockSink.write(logProto);
ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
@ -157,15 +161,37 @@ public class GcpLogSinkTest {
verify(mockLogging, times(1)).write(logEntrySetCaptor.capture());
for (Iterator<LogEntry> it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) {
LogEntry entry = it.next();
assertEquals(entry.getLabels(), expectedEmptyLabels);
assertEquals(entry.getPayload().getData(), expectedStructLogProto);
assertThat(entry.getLabels()).isEqualTo(expectedEmptyLabels);
assertThat(entry.getPayload().getData()).isEqualTo(expectedStructLogProto);
}
}
@Test
@SuppressWarnings("unchecked")
public void emptyCustomTags_setSourceProject() {
Map<String, String> emptyCustomTags = null;
String destinationProjectId = "DESTINATION_PROJECT";
Map<String, String> expectedLabels = GcpLogSink.getCustomTags(emptyCustomTags, locationTags,
destinationProjectId);
GcpLogSink mockSink = new GcpLogSink(mockLogging, destinationProjectId, locationTags,
emptyCustomTags, flushLimit);
mockSink.write(logProto);
ArgumentCaptor<Collection<LogEntry>> logEntrySetCaptor = ArgumentCaptor.forClass(
(Class) Collection.class);
verify(mockLogging, times(1)).write(logEntrySetCaptor.capture());
for (Iterator<LogEntry> it = logEntrySetCaptor.getValue().iterator(); it.hasNext(); ) {
LogEntry entry = it.next();
assertThat(entry.getLabels()).isEqualTo(expectedLabels);
assertThat(entry.getPayload().getData()).isEqualTo(expectedStructLogProto);
}
}
@Test
public void verifyFlush() {
long lowerFlushLimit = 2L;
GcpLogSink mockSink = new GcpLogSink(mockLogging, locationTags, customTags, lowerFlushLimit);
GcpLogSink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags,
customTags, lowerFlushLimit);
mockSink.write(logProto);
verify(mockLogging, never()).flush();
mockSink.write(logProto);
@ -177,7 +203,8 @@ public class GcpLogSinkTest {
@Test
public void verifyClose() throws Exception {
Sink mockSink = new GcpLogSink(mockLogging, locationTags, customTags, flushLimit);
Sink mockSink = new GcpLogSink(mockLogging, destProjectName, locationTags,
customTags, flushLimit);
mockSink.write(logProto);
verify(mockLogging, times(1)).write(anyIterable());
mockSink.close();