support aws eks attributes (#1671)

* support aws eks attributes

* call k8s api by java native sdk

* fix code review

* fix code review

* fix code review
This commit is contained in:
Lei Wang 2020-09-27 12:24:26 -07:00 committed by GitHub
parent 8b7ea5e366
commit 056e63f354
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 446 additions and 0 deletions

View File

@ -16,6 +16,10 @@ dependencies {
libraries.guava, libraries.guava,
platform(boms.guava) platform(boms.guava)
implementation 'com.fasterxml.jackson.core:jackson-databind:2.11.0',
libraries.guava,
platform(boms.guava)
testImplementation 'com.github.tomakehurst:wiremock-jre8:2.26.3' testImplementation 'com.github.tomakehurst:wiremock-jre8:2.26.3'
signature "org.codehaus.mojo.signature:java18:1.0@signature" signature "org.codehaus.mojo.signature:java18:1.0@signature"

View File

@ -0,0 +1,136 @@
/*
* Copyright 2020, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.sdk.extensions.trace.aws.resource;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Strings;
import com.google.common.io.Files;
import io.opentelemetry.common.Attributes;
import io.opentelemetry.sdk.resources.ResourceAttributes;
import io.opentelemetry.sdk.resources.ResourceProvider;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
public class EksResource extends ResourceProvider {
static final String K8S_SVC_URL = "https://kubernetes.default.svc";
static final String AUTH_CONFIGMAP_PATH = "/api/v1/namespaces/kube-system/configmaps/aws-auth";
static final String CW_CONFIGMAP_PATH =
"/api/v1/namespaces/amazon-cloudwatch/configmaps/cluster-info";
private static final String K8S_TOKEN_PATH =
"/var/run/secrets/kubernetes.io/serviceaccount/token";
private static final String K8S_CERT_PATH =
"/var/run/secrets/kubernetes.io/serviceaccount/ca.crt";
private static final Logger logger = Logger.getLogger(EksResource.class.getName());
private final JdkHttpClient jdkHttpClient;
private final DockerHelper dockerHelper;
private final String k8sTokenPath;
private final String k8sKeystorePath;
public EksResource() {
this(new JdkHttpClient(), new DockerHelper(), K8S_TOKEN_PATH, K8S_CERT_PATH);
}
@VisibleForTesting
EksResource(
JdkHttpClient jdkHttpClient,
DockerHelper dockerHelper,
String k8sTokenPath,
String k8sKeystorePath) {
this.jdkHttpClient = jdkHttpClient;
this.dockerHelper = dockerHelper;
this.k8sTokenPath = k8sTokenPath;
this.k8sKeystorePath = k8sKeystorePath;
}
@Override
protected Attributes getAttributes() {
if (!isEks()) {
return Attributes.empty();
}
Attributes.Builder attrBuilders = Attributes.newBuilder();
String clusterName = getClusterName();
if (!Strings.isNullOrEmpty(clusterName)) {
attrBuilders.setAttribute(ResourceAttributes.K8S_CLUSTER, clusterName);
}
String containerId = dockerHelper.getContainerId();
if (!Strings.isNullOrEmpty(containerId)) {
attrBuilders.setAttribute(ResourceAttributes.CONTAINER_ID, containerId);
}
return attrBuilders.build();
}
private boolean isEks() {
if (!isK8s()) {
logger.log(Level.FINE, "Not running on k8s.");
return false;
}
Map<String, String> requestProperties = new HashMap<>();
requestProperties.put("Authorization", getK8sCredHeader());
String awsAuth =
jdkHttpClient.fetchString(
"GET", K8S_SVC_URL + AUTH_CONFIGMAP_PATH, requestProperties, K8S_CERT_PATH);
return !Strings.isNullOrEmpty(awsAuth);
}
private boolean isK8s() {
File k8sTokeyFile = new File(this.k8sTokenPath);
File k8sKeystoreFile = new File(this.k8sKeystorePath);
return k8sTokeyFile.exists() && k8sKeystoreFile.exists();
}
private String getClusterName() {
Map<String, String> requestProperties = new HashMap<>();
requestProperties.put("Authorization", getK8sCredHeader());
String json =
jdkHttpClient.fetchString(
"GET", K8S_SVC_URL + CW_CONFIGMAP_PATH, requestProperties, K8S_CERT_PATH);
try {
ObjectMapper mapper = new ObjectMapper();
return mapper.readTree(json).at("/data/cluster.name").asText();
} catch (JsonProcessingException e) {
logger.log(Level.WARNING, "Can't get cluster name on EKS.", e);
}
return "";
}
private static String getK8sCredHeader() {
try {
File file = new File(K8S_TOKEN_PATH);
String content = Files.asCharSource(file, Charsets.UTF_8).read();
return "Bearer " + content;
} catch (IOException e) {
logger.log(Level.WARNING, "Unable to load K8s client token.", e);
}
return "";
}
}

View File

@ -0,0 +1,148 @@
/*
* Copyright 2020, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.sdk.extensions.trace.aws.resource;
import com.google.common.io.ByteStreams;
import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.KeyStore;
import java.security.cert.Certificate;
import java.security.cert.CertificateFactory;
import java.util.Collection;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManagerFactory;
class JdkHttpClient {
private static final Logger logger = Logger.getLogger(JdkHttpClient.class.getName());
private static final int TIMEOUT_MILLIS = 2000;
String fetchString(
String httpMethod, String urlStr, Map<String, String> requestPropertyMap, String certPath) {
final HttpURLConnection connection;
try {
if (urlStr.startsWith("https")) {
connection = (HttpURLConnection) new URL(urlStr).openConnection();
KeyStore keyStore = getKeystoreForTrustedCert(certPath);
if (keyStore != null) {
((HttpsURLConnection) connection).setSSLSocketFactory(buildSslSocketFactory(keyStore));
}
} else {
connection = (HttpURLConnection) new URL(urlStr).openConnection();
}
connection.setRequestMethod(httpMethod);
connection.setConnectTimeout(TIMEOUT_MILLIS);
connection.setReadTimeout(TIMEOUT_MILLIS);
for (Map.Entry<String, String> requestProperty : requestPropertyMap.entrySet()) {
connection.setRequestProperty(requestProperty.getKey(), requestProperty.getValue());
}
int responseCode = connection.getResponseCode();
if (responseCode != 200) {
logger.log(
Level.FINE,
"Error reponse from "
+ urlStr
+ " code ("
+ responseCode
+ ") text "
+ readResponseString(connection));
return "";
}
return readResponseString(connection).trim();
} catch (IOException e) {
logger.log(Level.FINE, "JdkHttpClient fetch string failed.", e);
}
return "";
}
private static String readResponseString(HttpURLConnection connection) {
ByteArrayOutputStream os = new ByteArrayOutputStream();
try (InputStream is = connection.getInputStream()) {
ByteStreams.copy(is, os);
} catch (IOException e) {
// Only best effort read if we can.
}
try (InputStream is = connection.getErrorStream()) {
if (is != null) {
ByteStreams.copy(is, os);
}
} catch (IOException e) {
// Only best effort read if we can.
}
try {
return os.toString(StandardCharsets.UTF_8.name());
} catch (UnsupportedEncodingException e) {
logger.log(Level.WARNING, "UTF-8 not supported can't happen.", e);
}
return "";
}
private static SSLSocketFactory buildSslSocketFactory(KeyStore keyStore) {
try {
String tmfAlgorithm = TrustManagerFactory.getDefaultAlgorithm();
TrustManagerFactory tmf = TrustManagerFactory.getInstance(tmfAlgorithm);
tmf.init(keyStore);
SSLContext context = SSLContext.getInstance("TLS");
context.init(null, tmf.getTrustManagers(), null);
return context.getSocketFactory();
} catch (Exception e) {
logger.log(Level.WARNING, "Build SslSocketFactory for K8s restful client exception.", e);
}
return null;
}
private static KeyStore getKeystoreForTrustedCert(String certPath) {
try (FileInputStream fis = new FileInputStream(certPath)) {
KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
trustStore.load(null, null);
CertificateFactory certificateFactory = CertificateFactory.getInstance("X.509");
Collection<? extends Certificate> certificates = certificateFactory.generateCertificates(fis);
int i = 0;
for (Certificate certificate : certificates) {
trustStore.setCertificateEntry("cert_" + i, certificate);
i++;
}
return trustStore;
} catch (Exception e) {
logger.log(Level.WARNING, "Cannot load KeyStore from " + certPath);
return null;
}
}
}

View File

@ -1,3 +1,4 @@
io.opentelemetry.sdk.extensions.trace.aws.resource.BeanstalkResource io.opentelemetry.sdk.extensions.trace.aws.resource.BeanstalkResource
io.opentelemetry.sdk.extensions.trace.aws.resource.Ec2Resource io.opentelemetry.sdk.extensions.trace.aws.resource.Ec2Resource
io.opentelemetry.sdk.extensions.trace.aws.resource.EcsResource io.opentelemetry.sdk.extensions.trace.aws.resource.EcsResource
io.opentelemetry.sdk.extensions.trace.aws.resource.EksResource

View File

@ -0,0 +1,93 @@
/*
* Copyright 2020, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.sdk.extensions.trace.aws.resource;
import static io.opentelemetry.sdk.extensions.trace.aws.resource.EksResource.AUTH_CONFIGMAP_PATH;
import static io.opentelemetry.sdk.extensions.trace.aws.resource.EksResource.CW_CONFIGMAP_PATH;
import static io.opentelemetry.sdk.extensions.trace.aws.resource.EksResource.K8S_SVC_URL;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
import io.opentelemetry.common.Attributes;
import io.opentelemetry.sdk.resources.ResourceAttributes;
import io.opentelemetry.sdk.resources.ResourceProvider;
import java.io.File;
import java.io.IOException;
import java.util.ServiceLoader;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.jupiter.MockitoExtension;
@ExtendWith(MockitoExtension.class)
public class EksResourceTest {
@Mock private DockerHelper mockDockerHelper;
@Mock private JdkHttpClient jdkHttpClient;
@Test
void testEks(@TempDir File tempFolder) throws IOException {
File mockK8sTokenFile = new File(tempFolder, "k8sToken");
String token = "token123";
Files.write(token.getBytes(Charsets.UTF_8), mockK8sTokenFile);
File mockK8sKeystoreFile = new File(tempFolder, "k8sCert");
String truststore = "truststore123";
Files.write(truststore.getBytes(Charsets.UTF_8), mockK8sKeystoreFile);
when(jdkHttpClient.fetchString(
any(), Mockito.eq(K8S_SVC_URL + AUTH_CONFIGMAP_PATH), any(), any()))
.thenReturn("not empty");
when(jdkHttpClient.fetchString(
any(), Mockito.eq(K8S_SVC_URL + CW_CONFIGMAP_PATH), any(), any()))
.thenReturn("{\"data\":{\"cluster.name\":\"my-cluster\"}}");
when(mockDockerHelper.getContainerId()).thenReturn("0123456789A");
EksResource eksResource =
new EksResource(
jdkHttpClient,
mockDockerHelper,
mockK8sTokenFile.getPath(),
mockK8sKeystoreFile.getPath());
Attributes attributes = eksResource.getAttributes();
assertThat(attributes)
.isEqualTo(
Attributes.of(
ResourceAttributes.K8S_CLUSTER, "my-cluster",
ResourceAttributes.CONTAINER_ID, "0123456789A"));
}
@Test
void testNotEks() {
EksResource eksResource = new EksResource(jdkHttpClient, mockDockerHelper, "", "");
Attributes attributes = eksResource.getAttributes();
assertThat(attributes.isEmpty()).isTrue();
}
@Test
void inServiceLoader() {
// No practical way to test the attributes themselves so at least check the service loader picks
// it up.
assertThat(ServiceLoader.load(ResourceProvider.class)).anyMatch(EksResource.class::isInstance);
}
}

View File

@ -0,0 +1,64 @@
/*
* Copyright 2020, OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.opentelemetry.sdk.extensions.trace.aws.resource;
import static com.github.tomakehurst.wiremock.client.WireMock.any;
import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
import static com.github.tomakehurst.wiremock.client.WireMock.ok;
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
import static com.github.tomakehurst.wiremock.client.WireMock.verify;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
import static org.assertj.core.api.Assertions.assertThat;
import com.github.tomakehurst.wiremock.junit.WireMockClassRule;
import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Map;
import org.junit.ClassRule;
import org.junit.Test;
public class JdkHttpClientTest {
@ClassRule
public static WireMockClassRule server = new WireMockClassRule(wireMockConfig().dynamicPort());
@Test
public void testFetchString() throws IOException {
stubFor(any(urlPathEqualTo("/path")).willReturn(ok("expected result")));
Map<String, String> requestPropertyMap = ImmutableMap.of("key1", "value1", "key2", "value2");
String urlStr = String.format("http://localhost:%s%s", server.port(), "/path");
JdkHttpClient jdkHttpClient = new JdkHttpClient();
String result = jdkHttpClient.fetchString("GET", urlStr, requestPropertyMap, null);
assertThat(result).isEqualTo("expected result");
verify(getRequestedFor(urlEqualTo("/path")).withHeader("key1", equalTo("value1")));
verify(getRequestedFor(urlEqualTo("/path")).withHeader("key2", equalTo("value2")));
}
@Test
public void testFailedFetchString() {
Map<String, String> requestPropertyMap = ImmutableMap.of("key1", "value1", "key2", "value2");
String urlStr = String.format("http://localhost:%s%s", server.port(), "/path");
JdkHttpClient jdkHttpClient = new JdkHttpClient();
String result = jdkHttpClient.fetchString("GET", urlStr, requestPropertyMap, null);
assertThat(result).isEmpty();
}
}