diff --git a/cassandra/image/Makefile b/cassandra/Makefile similarity index 72% rename from cassandra/image/Makefile rename to cassandra/Makefile index 9d5774ee..98049622 100644 --- a/cassandra/image/Makefile +++ b/cassandra/Makefile @@ -13,24 +13,29 @@ # limitations under the License. # build the cassandra image. -VERSION=v13 +VERSION=v14 PROJECT_ID?=google_samples PROJECT=gcr.io/${PROJECT_ID} CASSANDRA_VERSION=3.11.2 all: kubernetes-cassandra.jar build -kubernetes-cassandra.jar: ../java/* ../java/src/main/java/io/k8s/cassandra/*.java - cd ../java && mvn clean && mvn package - mv ../java/target/kubernetes-cassandra*.jar files/kubernetes-cassandra.jar - cd ../java && mvn clean +build-go: + go build -a -installsuffix cgo \ + -ldflags "-s -w" \ + -o image/files/cassandra-seed.so -buildmode=c-shared go/main.go + +kubernetes-cassandra.jar: + @echo "Building kubernetes-cassandra.jar" + docker run -v ${PWD}/java:/usr/src/app maven:3-jdk-8-onbuild-alpine mvn clean install + cp java/target/kubernetes-cassandra*.jar image/files/kubernetes-cassandra.jar container: @echo "Building ${PROJECT}/cassandra:${VERSION}" - docker build --pull --build-arg "CASSANDRA_VERSION=${CASSANDRA_VERSION}" -t ${PROJECT}/cassandra:${VERSION} . + docker build --pull --build-arg "CASSANDRA_VERSION=${CASSANDRA_VERSION}" -t ${PROJECT}/cassandra:${VERSION} image container-dev: - docker build --pull --build-arg "CASSANDRA_VERSION=${CASSANDRA_VERSION}" --build-arg "DEV_CONTAINER=true" -t ${PROJECT}/cassandra:${VERSION}-dev . + docker build --pull --build-arg "CASSANDRA_VERSION=${CASSANDRA_VERSION}" --build-arg "DEV_CONTAINER=true" -t ${PROJECT}/cassandra:${VERSION}-dev image build: container container-dev diff --git a/cassandra/cassandra-statefulset.yaml b/cassandra/cassandra-statefulset.yaml index 9cdb4611..02f7f7b9 100644 --- a/cassandra/cassandra-statefulset.yaml +++ b/cassandra/cassandra-statefulset.yaml @@ -18,7 +18,7 @@ spec: terminationGracePeriodSeconds: 1800 containers: - name: cassandra - image: gcr.io/google-samples/cassandra:v13 + image: gcr.io/google-samples/cassandra:v14 imagePullPolicy: Always ports: - containerPort: 7000 @@ -60,6 +60,8 @@ spec: value: "DC1-K8Demo" - name: CASSANDRA_RACK value: "Rack1-K8Demo" + - name: CASSANDRA_SEED_PROVIDER + value: io.k8s.cassandra.KubernetesSeedProvider - name: POD_IP valueFrom: fieldRef: diff --git a/cassandra/go/main.go b/cassandra/go/main.go new file mode 100644 index 00000000..a6b7e1a7 --- /dev/null +++ b/cassandra/go/main.go @@ -0,0 +1,82 @@ +package main + +// #include +// #include +import "C" + +import ( + "context" + "encoding/json" + "log" + "strings" + "unicode" + "unsafe" + + "github.com/ericchiang/k8s" + corev1 "github.com/ericchiang/k8s/apis/core/v1" +) + +type endpoints struct { + IPs []string `json:"ips"` +} + +// GetEndpoints searches the endpoints of a service returning a list of IP addresses. +//export GetEndpoints +func GetEndpoints(namespace, service, defSeeds *C.char) *C.char { + ns := C.GoString(namespace) + svc := C.GoString(service) + seeds := C.GoString(defSeeds) + + s := strings.Map(func(r rune) rune { + if unicode.IsSpace(r) { + return -1 + } + return r + }, seeds) + + nseeds := strings.Split(s, ",") + client, err := k8s.NewInClusterClient() + if err != nil { + log.Printf("unexpected error opening a connection against API server: %v\n", err) + log.Printf("returning default seeds: %v\n", nseeds) + return buildEndpoints(nseeds) + } + + ips := make([]string, 0) + + var endpoints corev1.Endpoints + err = client.Get(context.Background(), ns, svc, &endpoints) + if err != nil { + log.Printf("unexpected error obtaining information about service endpoints: %v\n", err) + log.Printf("returning default seeds: %v\n", nseeds) + return buildEndpoints(nseeds) + } + + for _, endpoint := range endpoints.Subsets { + for _, address := range endpoint.Addresses { + ips = append(ips, *address.Ip) + } + } + + if len(ips) == 0 { + return buildEndpoints(nseeds) + } + + return buildEndpoints(ips) +} + +func buildEndpoints(ips []string) *C.char { + b, err := json.Marshal(&endpoints{ips}) + if err != nil { + log.Printf("unexpected error serializing JSON response: %v\n", err) + rc := C.CString(`{"ips":[]}`) + defer C.free(unsafe.Pointer(rc)) + return rc + } + + rc := C.CString(string(b)) + defer C.free(unsafe.Pointer(rc)) + return rc +} + +func main() {} diff --git a/cassandra/image/files/build.sh b/cassandra/image/files/build.sh index 36333b2e..4ad10775 100755 --- a/cassandra/image/files/build.sh +++ b/cassandra/image/files/build.sh @@ -55,6 +55,10 @@ else rm -rf $CASSANDRA_HOME/pylib; fi +mv /kubernetes-cassandra.jar /usr/local/apache-cassandra-${CASSANDRA_VERSION}/lib +mv /cassandra-seed.so /etc/cassandra/ +mv /cassandra-seed.h /usr/local/lib/include + apt-get -y purge localepurge apt-get -y autoremove apt-get clean diff --git a/cassandra/image/files/cassandra-seed.h b/cassandra/image/files/cassandra-seed.h new file mode 100644 index 00000000..83dbdfe3 --- /dev/null +++ b/cassandra/image/files/cassandra-seed.h @@ -0,0 +1,79 @@ +/* Created by "go tool cgo" - DO NOT EDIT. */ + +/* package command-line-arguments */ + + +#line 1 "cgo-builtin-prolog" + +#include /* for ptrdiff_t below */ + +#ifndef GO_CGO_EXPORT_PROLOGUE_H +#define GO_CGO_EXPORT_PROLOGUE_H + +typedef struct { const char *p; ptrdiff_t n; } _GoString_; + +#endif + +/* Start of preamble from import "C" comments. */ + + +#line 3 "/home/aledbf/go/src/k8s.io/examples/cassandra/go/main.go" + #include + #include + +#line 1 "cgo-generated-wrapper" + + +/* End of preamble from import "C" comments. */ + + +/* Start of boilerplate cgo prologue. */ +#line 1 "cgo-gcc-export-header-prolog" + +#ifndef GO_CGO_PROLOGUE_H +#define GO_CGO_PROLOGUE_H + +typedef signed char GoInt8; +typedef unsigned char GoUint8; +typedef short GoInt16; +typedef unsigned short GoUint16; +typedef int GoInt32; +typedef unsigned int GoUint32; +typedef long long GoInt64; +typedef unsigned long long GoUint64; +typedef GoInt64 GoInt; +typedef GoUint64 GoUint; +typedef __SIZE_TYPE__ GoUintptr; +typedef float GoFloat32; +typedef double GoFloat64; +typedef float _Complex GoComplex64; +typedef double _Complex GoComplex128; + +/* + static assertion to make sure the file is being used on architecture + at least with matching size of GoInt. +*/ +typedef char _check_for_64_bit_pointer_matching_GoInt[sizeof(void*)==64/8 ? 1:-1]; + +typedef _GoString_ GoString; +typedef void *GoMap; +typedef void *GoChan; +typedef struct { void *t; void *v; } GoInterface; +typedef struct { void *data; GoInt len; GoInt cap; } GoSlice; + +#endif + +/* End of boilerplate cgo prologue. */ + +#ifdef __cplusplus +extern "C" { +#endif + + +// GetEndpoints searches the endpoints of a service returning a list of IP addresses. + +extern char* GetEndpoints(char* p0, char* p1, char* p2); + +#ifdef __cplusplus +} +#endif diff --git a/cassandra/image/files/kubernetes-cassandra.jar b/cassandra/image/files/kubernetes-cassandra.jar index ed262197..4f7db786 100644 Binary files a/cassandra/image/files/kubernetes-cassandra.jar and b/cassandra/image/files/kubernetes-cassandra.jar differ diff --git a/cassandra/java/pom.xml b/cassandra/java/pom.xml index f5478203..fcf20941 100644 --- a/cassandra/java/pom.xml +++ b/cassandra/java/pom.xml @@ -17,12 +17,12 @@ 4.0.0 io.k8s.cassandra kubernetes-cassandra - 1.0.2 + 1.0.3 maven-compiler-plugin - 3.5.1 + 3.5.1 1.8 1.8 @@ -33,7 +33,7 @@ 1.1.3 - 3.9 + 3.11.2 @@ -61,7 +61,6 @@ ${logback.version} provided - ch.qos.logback logback-core @@ -72,14 +71,13 @@ org.codehaus.jackson jackson-core-asl - 1.6.3 + 1.9.13 provided - org.codehaus.jackson jackson-mapper-asl - 1.6.3 + 1.9.13 provided diff --git a/cassandra/java/src/main/java/io/k8s/cassandra/GoInterface.java b/cassandra/java/src/main/java/io/k8s/cassandra/GoInterface.java new file mode 100644 index 00000000..17d9b4e0 --- /dev/null +++ b/cassandra/java/src/main/java/io/k8s/cassandra/GoInterface.java @@ -0,0 +1,54 @@ + +/* + * Copyright (C) 2018 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.k8s.cassandra; + +import java.util.Arrays; +import java.util.List; + +import com.sun.jna.Library; +import com.sun.jna.Pointer; +import com.sun.jna.Structure; + +public interface GoInterface extends Library { + public String GetEndpoints(String namespace, String service, String seeds); + + public class GoSlice extends Structure { + public static class ByValue extends GoSlice implements Structure.ByValue { + } + + public Pointer data; + public long len; + public long cap; + + protected List getFieldOrder() { + return Arrays.asList(new String[] { "data", "len", "cap" }); + } + } + + public class GoString extends Structure { + public static class ByValue extends GoString implements Structure.ByValue { + } + + public String p; + public long n; + + protected List getFieldOrder() { + return Arrays.asList(new String[] { "p", "n" }); + } + } +} \ No newline at end of file diff --git a/cassandra/java/src/main/java/io/k8s/cassandra/KubernetesSeedProvider.java b/cassandra/java/src/main/java/io/k8s/cassandra/KubernetesSeedProvider.java index a8940d3a..b0a94f0e 100644 --- a/cassandra/java/src/main/java/io/k8s/cassandra/KubernetesSeedProvider.java +++ b/cassandra/java/src/main/java/io/k8s/cassandra/KubernetesSeedProvider.java @@ -1,3 +1,4 @@ + /* * Copyright (C) 2015 Google Inc. * @@ -16,237 +17,87 @@ package io.k8s.cassandra; -import org.apache.cassandra.config.Config; -import org.apache.cassandra.config.ConfigurationLoader; -import org.apache.cassandra.config.YamlConfigurationLoader; -import org.apache.cassandra.exceptions.ConfigurationException; +import java.io.IOException; +import java.net.InetAddress; +import java.util.Collections; +import java.util.List; +import java.util.Map; + import org.apache.cassandra.locator.SeedProvider; -import org.apache.cassandra.locator.SimpleSeedProvider; -import org.apache.cassandra.utils.FBUtilities; import org.codehaus.jackson.annotate.JsonIgnoreProperties; import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.*; -import java.io.IOException; -import java.net.InetAddress; -import java.net.URL; -import java.net.UnknownHostException; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.security.SecureRandom; -import java.security.cert.X509Certificate; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; +import com.sun.jna.Native; /** * Self discovery {@link SeedProvider} that creates a list of Cassandra Seeds by * communicating with the Kubernetes API. - *

Various System Variable can be used to configure this provider: + *

+ * Various System Variable can be used to configure this provider: *

    - *
  • KUBERNETES_PORT_443_TCP_ADDR defaults to kubernetes.default.svc.cluster.local
  • - *
  • KUBERNETES_PORT_443_TCP_PORT defaults to 443
  • - *
  • CASSANDRA_SERVICE defaults to cassandra
  • - *
  • POD_NAMESPACE defaults to 'default'
  • - *
  • CASSANDRA_SERVICE_NUM_SEEDS defaults to 8 seeds
  • - *
  • K8S_ACCOUNT_TOKEN defaults to the path for the default token
  • + *
  • CASSANDRA_SERVICE defaults to cassandra
  • + *
  • POD_NAMESPACE defaults to 'default'
  • + *
  • CASSANDRA_SERVICE_NUM_SEEDS defaults to 8 seeds
  • *
*/ public class KubernetesSeedProvider implements SeedProvider { - private static final Logger logger = LoggerFactory.getLogger(KubernetesSeedProvider.class); + private static final Logger logger = LoggerFactory.getLogger(KubernetesSeedProvider.class); - /** - * default seeds to fall back on - */ - private List defaultSeeds; - private TrustManager[] trustAll; + /** + * Create new seed provider + * + * @param params + */ + public KubernetesSeedProvider(Map params) { + } - private HostnameVerifier trustAllHosts; + /** + * Call Kubernetes API to collect a list of seed providers + * + * @return list of seed providers + */ + public List getSeeds() { + GoInterface go = (GoInterface) Native.loadLibrary("cassandra-seed.so", GoInterface.class); - /** - * Create new Seeds - * @param params - */ - public KubernetesSeedProvider(Map params) { + String service = getEnvOrDefault("CASSANDRA_SERVICE", "cassandra"); + String namespace = getEnvOrDefault("POD_NAMESPACE", "default"); - // Create default seeds - defaultSeeds = createDefaultSeeds(); + String initialSeeds = getEnvOrDefault("CASSANDRA_SEEDS", ""); + if (initialSeeds.equals("")) { + initialSeeds = getEnvOrDefault("POD_IP", ""); + } - // TODO: Load the CA cert when it is available on all platforms. - trustAll = new TrustManager[] { - new X509TrustManager() { - public void checkServerTrusted(X509Certificate[] certs, String authType) {} - public void checkClientTrusted(X509Certificate[] certs, String authType) {} - public X509Certificate[] getAcceptedIssuers() { return null; } - } - }; + String seedSizeVar = getEnvOrDefault("CASSANDRA_SERVICE_NUM_SEEDS", "8"); + Integer seedSize = Integer.valueOf(seedSizeVar); - trustAllHosts = new HostnameVerifier() { - public boolean verify(String hostname, SSLSession session) { - return true; - } - }; - } + String data = go.GetEndpoints(namespace, service, initialSeeds); + ObjectMapper mapper = new ObjectMapper(); - /** - * Call kubernetes API to collect a list of seed providers - * @return list of seed providers - */ - public List getSeeds() { + try { + Endpoints endpoints = mapper.readValue(data, Endpoints.class); + logger.info("cassandra seeds: " + endpoints.ips.toString()); + return Collections.unmodifiableList(endpoints.ips); + } catch (IOException e) { + // This should not happen + logger.error("unexpected error building cassandra seeds: " + e.getMessage()); + return Collections.emptyList(); + } + } - String host = getEnvOrDefault("KUBERNETES_PORT_443_TCP_ADDR", "kubernetes.default.svc.cluster.local"); - String port = getEnvOrDefault("KUBERNETES_PORT_443_TCP_PORT", "443"); - String serviceName = getEnvOrDefault("CASSANDRA_SERVICE", "cassandra"); - String podNamespace = getEnvOrDefault("POD_NAMESPACE", "default"); - String path = String.format("/api/v1/namespaces/%s/endpoints/", podNamespace); - String seedSizeVar = getEnvOrDefault("CASSANDRA_SERVICE_NUM_SEEDS", "8"); - Integer seedSize = Integer.valueOf(seedSizeVar); - String accountToken = getEnvOrDefault("K8S_ACCOUNT_TOKEN", "/var/run/secrets/kubernetes.io/serviceaccount/token"); + private static String getEnvOrDefault(String var, String def) { + String val = System.getenv(var); + if (val == null) { + val = def; + } + return val; + } - List seeds = new ArrayList(); - try { - String token = getServiceAccountToken(accountToken); - - SSLContext ctx = SSLContext.getInstance("SSL"); - ctx.init(null, trustAll, new SecureRandom()); - - String PROTO = "https://"; - URL url = new URL(PROTO + host + ":" + port + path + serviceName); - logger.info("Getting endpoints from " + url); - HttpsURLConnection conn = (HttpsURLConnection)url.openConnection(); - - // TODO: Remove this once the CA cert is propagated everywhere, and replace - // with loading the CA cert. - conn.setHostnameVerifier(trustAllHosts); - - conn.setSSLSocketFactory(ctx.getSocketFactory()); - conn.addRequestProperty("Authorization", "Bearer " + token); - ObjectMapper mapper = new ObjectMapper(); - Endpoints endpoints = mapper.readValue(conn.getInputStream(), Endpoints.class); - - if (endpoints != null) { - // Here is a problem point, endpoints.subsets can be null in first node cases. - if (endpoints.subsets != null && !endpoints.subsets.isEmpty()){ - for (Subset subset : endpoints.subsets) { - if (subset.addresses != null && !subset.addresses.isEmpty()) { - for (Address address : subset.addresses) { - seeds.add(InetAddress.getByName(address.ip)); - - if(seeds.size() >= seedSize) { - logger.info("Available num endpoints: " + seeds.size()); - return Collections.unmodifiableList(seeds); - } - } - } - } - } - logger.info("Available num endpoints: " + seeds.size()); - } else { - logger.warn("Endpoints are not available using default seeds in cassandra.yaml"); - return Collections.unmodifiableList(defaultSeeds); - } - } catch (Exception ex) { - logger.warn("Request to kubernetes apiserver failed, using default seeds in cassandra.yaml", ex); - return Collections.unmodifiableList(defaultSeeds); - } - - if (seeds.size() == 0) { - // If we got nothing, we might be the first instance, in that case - // fall back on the seeds that were passed in cassandra.yaml. - logger.warn("Seeds are not available using default seeds in cassandra.yaml"); - return Collections.unmodifiableList(defaultSeeds); - } - - return Collections.unmodifiableList(seeds); - } - - /** - * Code taken from {@link SimpleSeedProvider}. This is used as a fall back - * incase we don't find seeds - * @return - */ - protected List createDefaultSeeds() - { - Config conf; - try { - conf = loadConfig(); - } - catch (Exception e) { - throw new AssertionError(e); - } - String[] hosts = conf.seed_provider.parameters.get("seeds").split(",", -1); - List seeds = new ArrayList(); - for (String host : hosts) { - try { - seeds.add(InetAddress.getByName(host.trim())); - } - catch (UnknownHostException ex) { - // not fatal... DD will bark if there end up being zero seeds. - logger.warn("Seed provider couldn't lookup host {}", host); - } - } - - if(seeds.size() == 0) { - try { - seeds.add(InetAddress.getLocalHost()); - } catch (UnknownHostException e) { - logger.warn("Seed provider couldn't lookup localhost"); - } - } - return Collections.unmodifiableList(seeds); - } - - /** - * Code taken from {@link SimpleSeedProvider} - * @return - */ - protected static Config loadConfig() throws ConfigurationException - { - String loaderClass = System.getProperty("cassandra.config.loader"); - ConfigurationLoader loader = loaderClass == null - ? new YamlConfigurationLoader() - : FBUtilities.construct(loaderClass, "configuration loading"); - return loader.loadConfig(); - } - - private static String getEnvOrDefault(String var, String def) { - String val = System.getenv(var); - if (val == null) { - val = def; - } - return val; - } - - private static String getServiceAccountToken(String file) { - try { - return new String(Files.readAllBytes(Paths.get(file))); - } catch (IOException e) { - logger.warn("unable to load service account token" + file); - throw new RuntimeException("Unable to load services account token " + file); - } - } - - protected List getDefaultSeeds() { - return defaultSeeds; - } - - @JsonIgnoreProperties(ignoreUnknown = true) - static class Address { - public String ip; - } - - @JsonIgnoreProperties(ignoreUnknown = true) - static class Subset { - public List
addresses; - } - - @JsonIgnoreProperties(ignoreUnknown = true) - static class Endpoints { - public List subsets; - } + @JsonIgnoreProperties(ignoreUnknown = true) + static class Endpoints { + public List ips; + } } diff --git a/cassandra/java/src/test/java/io/k8s/cassandra/KubernetesSeedProviderTest.java b/cassandra/java/src/test/java/io/k8s/cassandra/KubernetesSeedProviderTest.java index e4a4ab86..2ef3c313 100644 --- a/cassandra/java/src/test/java/io/k8s/cassandra/KubernetesSeedProviderTest.java +++ b/cassandra/java/src/test/java/io/k8s/cassandra/KubernetesSeedProviderTest.java @@ -16,48 +16,32 @@ package io.k8s.cassandra; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; + +import java.net.InetAddress; +import java.util.HashMap; +import java.util.List; + import org.apache.cassandra.locator.SeedProvider; import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.hamcrest.Matchers.*; - -import java.net.InetAddress; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; - -import static org.junit.Assert.*; - public class KubernetesSeedProviderTest { - private static final Logger logger = LoggerFactory.getLogger(KubernetesSeedProviderTest.class); + private static final Logger logger = LoggerFactory.getLogger(KubernetesSeedProviderTest.class); - @Test - @Ignore("has to be run inside of a kube cluster") - public void getSeeds() throws Exception { - SeedProvider provider = new KubernetesSeedProvider(new HashMap()); - List seeds = provider.getSeeds(); - - assertThat(seeds, is(not(empty()))); - - } - - @Test - public void testDefaultSeeds() throws Exception { - - KubernetesSeedProvider provider = new KubernetesSeedProvider(new HashMap()); - List seeds = provider.getDefaultSeeds(); - List seedsTest = new ArrayList<>(); - seedsTest.add(InetAddress.getByName("8.4.4.4")); - seedsTest.add(InetAddress.getByName("8.8.8.8")); - assertThat(seeds, is(not(empty()))); - assertThat(seeds, is(seedsTest)); - logger.debug("seeds loaded {}", seeds); - - } + @Test + @Ignore("has to be run inside of a kube cluster") + public void getSeeds() throws Exception { + SeedProvider provider = new KubernetesSeedProvider(new HashMap()); + List seeds = provider.getSeeds(); + assertThat(seeds, is(not(empty()))); + } } \ No newline at end of file