Use docker to build cassandra jar file
This commit is contained in:
parent
2845293915
commit
cabf8b8e47
|
@ -13,24 +13,29 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
# build the cassandra image.
|
# build the cassandra image.
|
||||||
VERSION=v13
|
VERSION=v14
|
||||||
PROJECT_ID?=google_samples
|
PROJECT_ID?=google_samples
|
||||||
PROJECT=gcr.io/${PROJECT_ID}
|
PROJECT=gcr.io/${PROJECT_ID}
|
||||||
CASSANDRA_VERSION=3.11.2
|
CASSANDRA_VERSION=3.11.2
|
||||||
|
|
||||||
all: kubernetes-cassandra.jar build
|
all: kubernetes-cassandra.jar build
|
||||||
|
|
||||||
kubernetes-cassandra.jar: ../java/* ../java/src/main/java/io/k8s/cassandra/*.java
|
build-go:
|
||||||
cd ../java && mvn clean && mvn package
|
go build -a -installsuffix cgo \
|
||||||
mv ../java/target/kubernetes-cassandra*.jar files/kubernetes-cassandra.jar
|
-ldflags "-s -w" \
|
||||||
cd ../java && mvn clean
|
-o image/files/cassandra-seed.so -buildmode=c-shared go/main.go
|
||||||
|
|
||||||
|
kubernetes-cassandra.jar:
|
||||||
|
@echo "Building kubernetes-cassandra.jar"
|
||||||
|
docker run -v ${PWD}/java:/usr/src/app maven:3-jdk-8-onbuild-alpine mvn clean install
|
||||||
|
cp java/target/kubernetes-cassandra*.jar image/files/kubernetes-cassandra.jar
|
||||||
|
|
||||||
container:
|
container:
|
||||||
@echo "Building ${PROJECT}/cassandra:${VERSION}"
|
@echo "Building ${PROJECT}/cassandra:${VERSION}"
|
||||||
docker build --pull --build-arg "CASSANDRA_VERSION=${CASSANDRA_VERSION}" -t ${PROJECT}/cassandra:${VERSION} .
|
docker build --pull --build-arg "CASSANDRA_VERSION=${CASSANDRA_VERSION}" -t ${PROJECT}/cassandra:${VERSION} image
|
||||||
|
|
||||||
container-dev:
|
container-dev:
|
||||||
docker build --pull --build-arg "CASSANDRA_VERSION=${CASSANDRA_VERSION}" --build-arg "DEV_CONTAINER=true" -t ${PROJECT}/cassandra:${VERSION}-dev .
|
docker build --pull --build-arg "CASSANDRA_VERSION=${CASSANDRA_VERSION}" --build-arg "DEV_CONTAINER=true" -t ${PROJECT}/cassandra:${VERSION}-dev image
|
||||||
|
|
||||||
build: container container-dev
|
build: container container-dev
|
||||||
|
|
|
@ -18,7 +18,7 @@ spec:
|
||||||
terminationGracePeriodSeconds: 1800
|
terminationGracePeriodSeconds: 1800
|
||||||
containers:
|
containers:
|
||||||
- name: cassandra
|
- name: cassandra
|
||||||
image: gcr.io/google-samples/cassandra:v13
|
image: gcr.io/google-samples/cassandra:v14
|
||||||
imagePullPolicy: Always
|
imagePullPolicy: Always
|
||||||
ports:
|
ports:
|
||||||
- containerPort: 7000
|
- containerPort: 7000
|
||||||
|
@ -60,6 +60,8 @@ spec:
|
||||||
value: "DC1-K8Demo"
|
value: "DC1-K8Demo"
|
||||||
- name: CASSANDRA_RACK
|
- name: CASSANDRA_RACK
|
||||||
value: "Rack1-K8Demo"
|
value: "Rack1-K8Demo"
|
||||||
|
- name: CASSANDRA_SEED_PROVIDER
|
||||||
|
value: io.k8s.cassandra.KubernetesSeedProvider
|
||||||
- name: POD_IP
|
- name: POD_IP
|
||||||
valueFrom:
|
valueFrom:
|
||||||
fieldRef:
|
fieldRef:
|
||||||
|
|
|
@ -0,0 +1,82 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
// #include <stdio.h>
|
||||||
|
// #include <stdlib.h>
|
||||||
|
import "C"
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"log"
|
||||||
|
"strings"
|
||||||
|
"unicode"
|
||||||
|
"unsafe"
|
||||||
|
|
||||||
|
"github.com/ericchiang/k8s"
|
||||||
|
corev1 "github.com/ericchiang/k8s/apis/core/v1"
|
||||||
|
)
|
||||||
|
|
||||||
|
type endpoints struct {
|
||||||
|
IPs []string `json:"ips"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetEndpoints searches the endpoints of a service returning a list of IP addresses.
|
||||||
|
//export GetEndpoints
|
||||||
|
func GetEndpoints(namespace, service, defSeeds *C.char) *C.char {
|
||||||
|
ns := C.GoString(namespace)
|
||||||
|
svc := C.GoString(service)
|
||||||
|
seeds := C.GoString(defSeeds)
|
||||||
|
|
||||||
|
s := strings.Map(func(r rune) rune {
|
||||||
|
if unicode.IsSpace(r) {
|
||||||
|
return -1
|
||||||
|
}
|
||||||
|
return r
|
||||||
|
}, seeds)
|
||||||
|
|
||||||
|
nseeds := strings.Split(s, ",")
|
||||||
|
client, err := k8s.NewInClusterClient()
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("unexpected error opening a connection against API server: %v\n", err)
|
||||||
|
log.Printf("returning default seeds: %v\n", nseeds)
|
||||||
|
return buildEndpoints(nseeds)
|
||||||
|
}
|
||||||
|
|
||||||
|
ips := make([]string, 0)
|
||||||
|
|
||||||
|
var endpoints corev1.Endpoints
|
||||||
|
err = client.Get(context.Background(), ns, svc, &endpoints)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("unexpected error obtaining information about service endpoints: %v\n", err)
|
||||||
|
log.Printf("returning default seeds: %v\n", nseeds)
|
||||||
|
return buildEndpoints(nseeds)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, endpoint := range endpoints.Subsets {
|
||||||
|
for _, address := range endpoint.Addresses {
|
||||||
|
ips = append(ips, *address.Ip)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(ips) == 0 {
|
||||||
|
return buildEndpoints(nseeds)
|
||||||
|
}
|
||||||
|
|
||||||
|
return buildEndpoints(ips)
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildEndpoints(ips []string) *C.char {
|
||||||
|
b, err := json.Marshal(&endpoints{ips})
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("unexpected error serializing JSON response: %v\n", err)
|
||||||
|
rc := C.CString(`{"ips":[]}`)
|
||||||
|
defer C.free(unsafe.Pointer(rc))
|
||||||
|
return rc
|
||||||
|
}
|
||||||
|
|
||||||
|
rc := C.CString(string(b))
|
||||||
|
defer C.free(unsafe.Pointer(rc))
|
||||||
|
return rc
|
||||||
|
}
|
||||||
|
|
||||||
|
func main() {}
|
|
@ -55,6 +55,10 @@ else
|
||||||
rm -rf $CASSANDRA_HOME/pylib;
|
rm -rf $CASSANDRA_HOME/pylib;
|
||||||
fi
|
fi
|
||||||
|
|
||||||
|
mv /kubernetes-cassandra.jar /usr/local/apache-cassandra-${CASSANDRA_VERSION}/lib
|
||||||
|
mv /cassandra-seed.so /etc/cassandra/
|
||||||
|
mv /cassandra-seed.h /usr/local/lib/include
|
||||||
|
|
||||||
apt-get -y purge localepurge
|
apt-get -y purge localepurge
|
||||||
apt-get -y autoremove
|
apt-get -y autoremove
|
||||||
apt-get clean
|
apt-get clean
|
||||||
|
|
|
@ -0,0 +1,79 @@
|
||||||
|
/* Created by "go tool cgo" - DO NOT EDIT. */
|
||||||
|
|
||||||
|
/* package command-line-arguments */
|
||||||
|
|
||||||
|
|
||||||
|
#line 1 "cgo-builtin-prolog"
|
||||||
|
|
||||||
|
#include <stddef.h> /* for ptrdiff_t below */
|
||||||
|
|
||||||
|
#ifndef GO_CGO_EXPORT_PROLOGUE_H
|
||||||
|
#define GO_CGO_EXPORT_PROLOGUE_H
|
||||||
|
|
||||||
|
typedef struct { const char *p; ptrdiff_t n; } _GoString_;
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/* Start of preamble from import "C" comments. */
|
||||||
|
|
||||||
|
|
||||||
|
#line 3 "/home/aledbf/go/src/k8s.io/examples/cassandra/go/main.go"
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdlib.h>
|
||||||
|
|
||||||
|
#line 1 "cgo-generated-wrapper"
|
||||||
|
|
||||||
|
|
||||||
|
/* End of preamble from import "C" comments. */
|
||||||
|
|
||||||
|
|
||||||
|
/* Start of boilerplate cgo prologue. */
|
||||||
|
#line 1 "cgo-gcc-export-header-prolog"
|
||||||
|
|
||||||
|
#ifndef GO_CGO_PROLOGUE_H
|
||||||
|
#define GO_CGO_PROLOGUE_H
|
||||||
|
|
||||||
|
typedef signed char GoInt8;
|
||||||
|
typedef unsigned char GoUint8;
|
||||||
|
typedef short GoInt16;
|
||||||
|
typedef unsigned short GoUint16;
|
||||||
|
typedef int GoInt32;
|
||||||
|
typedef unsigned int GoUint32;
|
||||||
|
typedef long long GoInt64;
|
||||||
|
typedef unsigned long long GoUint64;
|
||||||
|
typedef GoInt64 GoInt;
|
||||||
|
typedef GoUint64 GoUint;
|
||||||
|
typedef __SIZE_TYPE__ GoUintptr;
|
||||||
|
typedef float GoFloat32;
|
||||||
|
typedef double GoFloat64;
|
||||||
|
typedef float _Complex GoComplex64;
|
||||||
|
typedef double _Complex GoComplex128;
|
||||||
|
|
||||||
|
/*
|
||||||
|
static assertion to make sure the file is being used on architecture
|
||||||
|
at least with matching size of GoInt.
|
||||||
|
*/
|
||||||
|
typedef char _check_for_64_bit_pointer_matching_GoInt[sizeof(void*)==64/8 ? 1:-1];
|
||||||
|
|
||||||
|
typedef _GoString_ GoString;
|
||||||
|
typedef void *GoMap;
|
||||||
|
typedef void *GoChan;
|
||||||
|
typedef struct { void *t; void *v; } GoInterface;
|
||||||
|
typedef struct { void *data; GoInt len; GoInt cap; } GoSlice;
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/* End of boilerplate cgo prologue. */
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
|
// GetEndpoints searches the endpoints of a service returning a list of IP addresses.
|
||||||
|
|
||||||
|
extern char* GetEndpoints(char* p0, char* p1, char* p2);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
Binary file not shown.
|
@ -17,12 +17,12 @@
|
||||||
<modelVersion>4.0.0</modelVersion>
|
<modelVersion>4.0.0</modelVersion>
|
||||||
<groupId>io.k8s.cassandra</groupId>
|
<groupId>io.k8s.cassandra</groupId>
|
||||||
<artifactId>kubernetes-cassandra</artifactId>
|
<artifactId>kubernetes-cassandra</artifactId>
|
||||||
<version>1.0.2</version>
|
<version>1.0.3</version>
|
||||||
<build>
|
<build>
|
||||||
<plugins>
|
<plugins>
|
||||||
<plugin>
|
<plugin>
|
||||||
<artifactId>maven-compiler-plugin</artifactId>
|
<artifactId>maven-compiler-plugin</artifactId>
|
||||||
<version>3.5.1</version>
|
<version>3.5.1</version>
|
||||||
<configuration>
|
<configuration>
|
||||||
<source>1.8</source>
|
<source>1.8</source>
|
||||||
<target>1.8</target>
|
<target>1.8</target>
|
||||||
|
@ -33,7 +33,7 @@
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<logback.version>1.1.3</logback.version>
|
<logback.version>1.1.3</logback.version>
|
||||||
<cassandra.version>3.9</cassandra.version>
|
<cassandra.version>3.11.2</cassandra.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
<dependencies>
|
<dependencies>
|
||||||
|
@ -61,7 +61,6 @@
|
||||||
<version>${logback.version}</version>
|
<version>${logback.version}</version>
|
||||||
<scope>provided</scope>
|
<scope>provided</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>ch.qos.logback</groupId>
|
<groupId>ch.qos.logback</groupId>
|
||||||
<artifactId>logback-core</artifactId>
|
<artifactId>logback-core</artifactId>
|
||||||
|
@ -72,14 +71,13 @@
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.codehaus.jackson</groupId>
|
<groupId>org.codehaus.jackson</groupId>
|
||||||
<artifactId>jackson-core-asl</artifactId>
|
<artifactId>jackson-core-asl</artifactId>
|
||||||
<version>1.6.3</version>
|
<version>1.9.13</version>
|
||||||
<scope>provided</scope>
|
<scope>provided</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>org.codehaus.jackson</groupId>
|
<groupId>org.codehaus.jackson</groupId>
|
||||||
<artifactId>jackson-mapper-asl</artifactId>
|
<artifactId>jackson-mapper-asl</artifactId>
|
||||||
<version>1.6.3</version>
|
<version>1.9.13</version>
|
||||||
<scope>provided</scope>
|
<scope>provided</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,54 @@
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2018 Google Inc.
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
||||||
|
* use this file except in compliance with the License. You may obtain a copy of
|
||||||
|
* the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
* License for the specific language governing permissions and limitations under
|
||||||
|
* the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package io.k8s.cassandra;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import com.sun.jna.Library;
|
||||||
|
import com.sun.jna.Pointer;
|
||||||
|
import com.sun.jna.Structure;
|
||||||
|
|
||||||
|
public interface GoInterface extends Library {
|
||||||
|
public String GetEndpoints(String namespace, String service, String seeds);
|
||||||
|
|
||||||
|
public class GoSlice extends Structure {
|
||||||
|
public static class ByValue extends GoSlice implements Structure.ByValue {
|
||||||
|
}
|
||||||
|
|
||||||
|
public Pointer data;
|
||||||
|
public long len;
|
||||||
|
public long cap;
|
||||||
|
|
||||||
|
protected List<String> getFieldOrder() {
|
||||||
|
return Arrays.asList(new String[] { "data", "len", "cap" });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public class GoString extends Structure {
|
||||||
|
public static class ByValue extends GoString implements Structure.ByValue {
|
||||||
|
}
|
||||||
|
|
||||||
|
public String p;
|
||||||
|
public long n;
|
||||||
|
|
||||||
|
protected List<String> getFieldOrder() {
|
||||||
|
return Arrays.asList(new String[] { "p", "n" });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,3 +1,4 @@
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Copyright (C) 2015 Google Inc.
|
* Copyright (C) 2015 Google Inc.
|
||||||
*
|
*
|
||||||
|
@ -16,237 +17,87 @@
|
||||||
|
|
||||||
package io.k8s.cassandra;
|
package io.k8s.cassandra;
|
||||||
|
|
||||||
import org.apache.cassandra.config.Config;
|
import java.io.IOException;
|
||||||
import org.apache.cassandra.config.ConfigurationLoader;
|
import java.net.InetAddress;
|
||||||
import org.apache.cassandra.config.YamlConfigurationLoader;
|
import java.util.Collections;
|
||||||
import org.apache.cassandra.exceptions.ConfigurationException;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.cassandra.locator.SeedProvider;
|
import org.apache.cassandra.locator.SeedProvider;
|
||||||
import org.apache.cassandra.locator.SimpleSeedProvider;
|
|
||||||
import org.apache.cassandra.utils.FBUtilities;
|
|
||||||
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
|
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
|
||||||
import org.codehaus.jackson.map.ObjectMapper;
|
import org.codehaus.jackson.map.ObjectMapper;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import javax.net.ssl.*;
|
import com.sun.jna.Native;
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.InetAddress;
|
|
||||||
import java.net.URL;
|
|
||||||
import java.net.UnknownHostException;
|
|
||||||
import java.nio.file.Files;
|
|
||||||
import java.nio.file.Paths;
|
|
||||||
import java.security.SecureRandom;
|
|
||||||
import java.security.cert.X509Certificate;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Self discovery {@link SeedProvider} that creates a list of Cassandra Seeds by
|
* Self discovery {@link SeedProvider} that creates a list of Cassandra Seeds by
|
||||||
* communicating with the Kubernetes API.
|
* communicating with the Kubernetes API.
|
||||||
* <p>Various System Variable can be used to configure this provider:
|
* <p>
|
||||||
|
* Various System Variable can be used to configure this provider:
|
||||||
* <ul>
|
* <ul>
|
||||||
* <li>KUBERNETES_PORT_443_TCP_ADDR defaults to kubernetes.default.svc.cluster.local</li>
|
* <li>CASSANDRA_SERVICE defaults to cassandra</li>
|
||||||
* <li>KUBERNETES_PORT_443_TCP_PORT defaults to 443</li>
|
* <li>POD_NAMESPACE defaults to 'default'</li>
|
||||||
* <li>CASSANDRA_SERVICE defaults to cassandra</li>
|
* <li>CASSANDRA_SERVICE_NUM_SEEDS defaults to 8 seeds</li>
|
||||||
* <li>POD_NAMESPACE defaults to 'default'</li>
|
|
||||||
* <li>CASSANDRA_SERVICE_NUM_SEEDS defaults to 8 seeds</li>
|
|
||||||
* <li>K8S_ACCOUNT_TOKEN defaults to the path for the default token</li>
|
|
||||||
* </ul>
|
* </ul>
|
||||||
*/
|
*/
|
||||||
public class KubernetesSeedProvider implements SeedProvider {
|
public class KubernetesSeedProvider implements SeedProvider {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(KubernetesSeedProvider.class);
|
private static final Logger logger = LoggerFactory.getLogger(KubernetesSeedProvider.class);
|
||||||
|
|
||||||
/**
|
|
||||||
* default seeds to fall back on
|
|
||||||
*/
|
|
||||||
private List<InetAddress> defaultSeeds;
|
|
||||||
|
|
||||||
private TrustManager[] trustAll;
|
/**
|
||||||
|
* Create new seed provider
|
||||||
|
*
|
||||||
|
* @param params
|
||||||
|
*/
|
||||||
|
public KubernetesSeedProvider(Map<String, String> params) {
|
||||||
|
}
|
||||||
|
|
||||||
private HostnameVerifier trustAllHosts;
|
/**
|
||||||
|
* Call Kubernetes API to collect a list of seed providers
|
||||||
|
*
|
||||||
|
* @return list of seed providers
|
||||||
|
*/
|
||||||
|
public List<InetAddress> getSeeds() {
|
||||||
|
GoInterface go = (GoInterface) Native.loadLibrary("cassandra-seed.so", GoInterface.class);
|
||||||
|
|
||||||
/**
|
String service = getEnvOrDefault("CASSANDRA_SERVICE", "cassandra");
|
||||||
* Create new Seeds
|
String namespace = getEnvOrDefault("POD_NAMESPACE", "default");
|
||||||
* @param params
|
|
||||||
*/
|
|
||||||
public KubernetesSeedProvider(Map<String, String> params) {
|
|
||||||
|
|
||||||
// Create default seeds
|
String initialSeeds = getEnvOrDefault("CASSANDRA_SEEDS", "");
|
||||||
defaultSeeds = createDefaultSeeds();
|
if (initialSeeds.equals("")) {
|
||||||
|
initialSeeds = getEnvOrDefault("POD_IP", "");
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: Load the CA cert when it is available on all platforms.
|
String seedSizeVar = getEnvOrDefault("CASSANDRA_SERVICE_NUM_SEEDS", "8");
|
||||||
trustAll = new TrustManager[] {
|
Integer seedSize = Integer.valueOf(seedSizeVar);
|
||||||
new X509TrustManager() {
|
|
||||||
public void checkServerTrusted(X509Certificate[] certs, String authType) {}
|
|
||||||
public void checkClientTrusted(X509Certificate[] certs, String authType) {}
|
|
||||||
public X509Certificate[] getAcceptedIssuers() { return null; }
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
trustAllHosts = new HostnameVerifier() {
|
String data = go.GetEndpoints(namespace, service, initialSeeds);
|
||||||
public boolean verify(String hostname, SSLSession session) {
|
ObjectMapper mapper = new ObjectMapper();
|
||||||
return true;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
try {
|
||||||
* Call kubernetes API to collect a list of seed providers
|
Endpoints endpoints = mapper.readValue(data, Endpoints.class);
|
||||||
* @return list of seed providers
|
logger.info("cassandra seeds: " + endpoints.ips.toString());
|
||||||
*/
|
return Collections.unmodifiableList(endpoints.ips);
|
||||||
public List<InetAddress> getSeeds() {
|
} catch (IOException e) {
|
||||||
|
// This should not happen
|
||||||
|
logger.error("unexpected error building cassandra seeds: " + e.getMessage());
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
String host = getEnvOrDefault("KUBERNETES_PORT_443_TCP_ADDR", "kubernetes.default.svc.cluster.local");
|
private static String getEnvOrDefault(String var, String def) {
|
||||||
String port = getEnvOrDefault("KUBERNETES_PORT_443_TCP_PORT", "443");
|
String val = System.getenv(var);
|
||||||
String serviceName = getEnvOrDefault("CASSANDRA_SERVICE", "cassandra");
|
if (val == null) {
|
||||||
String podNamespace = getEnvOrDefault("POD_NAMESPACE", "default");
|
val = def;
|
||||||
String path = String.format("/api/v1/namespaces/%s/endpoints/", podNamespace);
|
}
|
||||||
String seedSizeVar = getEnvOrDefault("CASSANDRA_SERVICE_NUM_SEEDS", "8");
|
return val;
|
||||||
Integer seedSize = Integer.valueOf(seedSizeVar);
|
}
|
||||||
String accountToken = getEnvOrDefault("K8S_ACCOUNT_TOKEN", "/var/run/secrets/kubernetes.io/serviceaccount/token");
|
|
||||||
|
|
||||||
List<InetAddress> seeds = new ArrayList<InetAddress>();
|
@JsonIgnoreProperties(ignoreUnknown = true)
|
||||||
try {
|
static class Endpoints {
|
||||||
String token = getServiceAccountToken(accountToken);
|
public List<InetAddress> ips;
|
||||||
|
}
|
||||||
SSLContext ctx = SSLContext.getInstance("SSL");
|
|
||||||
ctx.init(null, trustAll, new SecureRandom());
|
|
||||||
|
|
||||||
String PROTO = "https://";
|
|
||||||
URL url = new URL(PROTO + host + ":" + port + path + serviceName);
|
|
||||||
logger.info("Getting endpoints from " + url);
|
|
||||||
HttpsURLConnection conn = (HttpsURLConnection)url.openConnection();
|
|
||||||
|
|
||||||
// TODO: Remove this once the CA cert is propagated everywhere, and replace
|
|
||||||
// with loading the CA cert.
|
|
||||||
conn.setHostnameVerifier(trustAllHosts);
|
|
||||||
|
|
||||||
conn.setSSLSocketFactory(ctx.getSocketFactory());
|
|
||||||
conn.addRequestProperty("Authorization", "Bearer " + token);
|
|
||||||
ObjectMapper mapper = new ObjectMapper();
|
|
||||||
Endpoints endpoints = mapper.readValue(conn.getInputStream(), Endpoints.class);
|
|
||||||
|
|
||||||
if (endpoints != null) {
|
|
||||||
// Here is a problem point, endpoints.subsets can be null in first node cases.
|
|
||||||
if (endpoints.subsets != null && !endpoints.subsets.isEmpty()){
|
|
||||||
for (Subset subset : endpoints.subsets) {
|
|
||||||
if (subset.addresses != null && !subset.addresses.isEmpty()) {
|
|
||||||
for (Address address : subset.addresses) {
|
|
||||||
seeds.add(InetAddress.getByName(address.ip));
|
|
||||||
|
|
||||||
if(seeds.size() >= seedSize) {
|
|
||||||
logger.info("Available num endpoints: " + seeds.size());
|
|
||||||
return Collections.unmodifiableList(seeds);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
logger.info("Available num endpoints: " + seeds.size());
|
|
||||||
} else {
|
|
||||||
logger.warn("Endpoints are not available using default seeds in cassandra.yaml");
|
|
||||||
return Collections.unmodifiableList(defaultSeeds);
|
|
||||||
}
|
|
||||||
} catch (Exception ex) {
|
|
||||||
logger.warn("Request to kubernetes apiserver failed, using default seeds in cassandra.yaml", ex);
|
|
||||||
return Collections.unmodifiableList(defaultSeeds);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (seeds.size() == 0) {
|
|
||||||
// If we got nothing, we might be the first instance, in that case
|
|
||||||
// fall back on the seeds that were passed in cassandra.yaml.
|
|
||||||
logger.warn("Seeds are not available using default seeds in cassandra.yaml");
|
|
||||||
return Collections.unmodifiableList(defaultSeeds);
|
|
||||||
}
|
|
||||||
|
|
||||||
return Collections.unmodifiableList(seeds);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Code taken from {@link SimpleSeedProvider}. This is used as a fall back
|
|
||||||
* incase we don't find seeds
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
protected List<InetAddress> createDefaultSeeds()
|
|
||||||
{
|
|
||||||
Config conf;
|
|
||||||
try {
|
|
||||||
conf = loadConfig();
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
throw new AssertionError(e);
|
|
||||||
}
|
|
||||||
String[] hosts = conf.seed_provider.parameters.get("seeds").split(",", -1);
|
|
||||||
List<InetAddress> seeds = new ArrayList<InetAddress>();
|
|
||||||
for (String host : hosts) {
|
|
||||||
try {
|
|
||||||
seeds.add(InetAddress.getByName(host.trim()));
|
|
||||||
}
|
|
||||||
catch (UnknownHostException ex) {
|
|
||||||
// not fatal... DD will bark if there end up being zero seeds.
|
|
||||||
logger.warn("Seed provider couldn't lookup host {}", host);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if(seeds.size() == 0) {
|
|
||||||
try {
|
|
||||||
seeds.add(InetAddress.getLocalHost());
|
|
||||||
} catch (UnknownHostException e) {
|
|
||||||
logger.warn("Seed provider couldn't lookup localhost");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return Collections.unmodifiableList(seeds);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Code taken from {@link SimpleSeedProvider}
|
|
||||||
* @return
|
|
||||||
*/
|
|
||||||
protected static Config loadConfig() throws ConfigurationException
|
|
||||||
{
|
|
||||||
String loaderClass = System.getProperty("cassandra.config.loader");
|
|
||||||
ConfigurationLoader loader = loaderClass == null
|
|
||||||
? new YamlConfigurationLoader()
|
|
||||||
: FBUtilities.<ConfigurationLoader>construct(loaderClass, "configuration loading");
|
|
||||||
return loader.loadConfig();
|
|
||||||
}
|
|
||||||
|
|
||||||
private static String getEnvOrDefault(String var, String def) {
|
|
||||||
String val = System.getenv(var);
|
|
||||||
if (val == null) {
|
|
||||||
val = def;
|
|
||||||
}
|
|
||||||
return val;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static String getServiceAccountToken(String file) {
|
|
||||||
try {
|
|
||||||
return new String(Files.readAllBytes(Paths.get(file)));
|
|
||||||
} catch (IOException e) {
|
|
||||||
logger.warn("unable to load service account token" + file);
|
|
||||||
throw new RuntimeException("Unable to load services account token " + file);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
protected List<InetAddress> getDefaultSeeds() {
|
|
||||||
return defaultSeeds;
|
|
||||||
}
|
|
||||||
|
|
||||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
|
||||||
static class Address {
|
|
||||||
public String ip;
|
|
||||||
}
|
|
||||||
|
|
||||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
|
||||||
static class Subset {
|
|
||||||
public List<Address> addresses;
|
|
||||||
}
|
|
||||||
|
|
||||||
@JsonIgnoreProperties(ignoreUnknown = true)
|
|
||||||
static class Endpoints {
|
|
||||||
public List<Subset> subsets;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,48 +16,32 @@
|
||||||
|
|
||||||
package io.k8s.cassandra;
|
package io.k8s.cassandra;
|
||||||
|
|
||||||
|
import static org.hamcrest.Matchers.empty;
|
||||||
|
import static org.hamcrest.Matchers.is;
|
||||||
|
import static org.hamcrest.Matchers.not;
|
||||||
|
import static org.junit.Assert.assertThat;
|
||||||
|
|
||||||
|
import java.net.InetAddress;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.cassandra.locator.SeedProvider;
|
import org.apache.cassandra.locator.SeedProvider;
|
||||||
import org.junit.Ignore;
|
import org.junit.Ignore;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import static org.hamcrest.Matchers.*;
|
|
||||||
|
|
||||||
import java.net.InetAddress;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
|
||||||
|
|
||||||
public class KubernetesSeedProviderTest {
|
public class KubernetesSeedProviderTest {
|
||||||
|
|
||||||
private static final Logger logger = LoggerFactory.getLogger(KubernetesSeedProviderTest.class);
|
private static final Logger logger = LoggerFactory.getLogger(KubernetesSeedProviderTest.class);
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@Ignore("has to be run inside of a kube cluster")
|
@Ignore("has to be run inside of a kube cluster")
|
||||||
public void getSeeds() throws Exception {
|
public void getSeeds() throws Exception {
|
||||||
SeedProvider provider = new KubernetesSeedProvider(new HashMap<String, String>());
|
SeedProvider provider = new KubernetesSeedProvider(new HashMap<String, String>());
|
||||||
List<InetAddress> seeds = provider.getSeeds();
|
List<InetAddress> seeds = provider.getSeeds();
|
||||||
|
|
||||||
assertThat(seeds, is(not(empty())));
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testDefaultSeeds() throws Exception {
|
|
||||||
|
|
||||||
KubernetesSeedProvider provider = new KubernetesSeedProvider(new HashMap<String,String>());
|
|
||||||
List<InetAddress> seeds = provider.getDefaultSeeds();
|
|
||||||
List<InetAddress> seedsTest = new ArrayList<>();
|
|
||||||
seedsTest.add(InetAddress.getByName("8.4.4.4"));
|
|
||||||
seedsTest.add(InetAddress.getByName("8.8.8.8"));
|
|
||||||
assertThat(seeds, is(not(empty())));
|
|
||||||
assertThat(seeds, is(seedsTest));
|
|
||||||
logger.debug("seeds loaded {}", seeds);
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
|
assertThat(seeds, is(not(empty())));
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue