Refactored KubernetesSeedProvider and added unit tests. Updated Docker image and bumped Cassandra version

This commit is contained in:
chrislovecnm 2016-04-28 13:26:45 -06:00
parent 04ef3fdb57
commit 66c8f6f536
13 changed files with 642 additions and 290 deletions

View File

@ -38,6 +38,7 @@ Documentation for other releases can be found at
## Table of Contents
- [Prerequisites](#prerequisites)
- [Cassandra Docker](#cassandra-docker)
- [tl;dr Quickstart](#tldr-quickstart)
- [Step 1: Create a Cassandra Service](#step-1-create-a-cassandra-service)
- [Step 2: Use a Replication Controller to create Cassandra node pods](#step-2-use-a-replication-controller-to-create-cassandra-node-pods)
@ -74,6 +75,38 @@ This example also has a few code and configuration files needed. To avoid
typing these out, you can `git clone` the Kubernetes repository to your local
computer.
## Cassandra Docker
The pods use the [```gcr.io/google-samples/cassandra:v9```](image/Dockerfile)
image from Google's [container registry](https://cloud.google.com/container-registry/docs/).
The docker is based on `debian:jessie` and includes OpenJDK 8. This image
includes a standard Cassandra installation from the Apache Debian repo.
### Custom Seed Provider
A custom [`SeedProvider`](https://svn.apache.org/repos/asf/cassandra/trunk/src/java/org/apache/cassandra/locator/SeedProvider.java)
is included for running Cassandra on top of Kubernetes. In Cassandra, a
`SeedProvider` bootstraps the gossip protocol that Cassandra uses to find other
Cassandra nodes. Seed addresses are hosts deemed as contact points. Cassandra
instances use the seed list to find each other and learn the topology of the
ring. The [`KubernetesSeedProvider`](java/src/main/java/io/k8s/cassandra/KubernetesSeedProvider.java)
discovers Cassandra seeds IP addresses vis the Kubernetes API, those Cassandra
instances are defined within the Cassandra Service.
Refer to the custom seed provider [README](java/README.md) for further
`KubernetesSeedProvider` configurations. For this example you should not need
to customize the Seed Provider configurations.
See the [image](image/) directory of this example for specifics on
how the container docker image was built and what it contains.
You may also note that we are setting some Cassandra parameters (`MAX_HEAP_SIZE`
and `HEAP_NEWSIZE`), and adding information about the
[namespace](../../docs/user-guide/namespaces.md).
We also tell Kubernetes that the container exposes
both the `CQL` and `Thrift` API ports. Finally, we tell the cluster
manager that we need 0.1 cpu (0.1 core).
## tl;dr Quickstart
If you want to jump straight to the commands we will run,
@ -103,8 +136,6 @@ kubectl delete service -l app=cassandra
kubectl delete daemonset cassandra
```
## Step 1: Create a Cassandra Service
A Kubernetes _[Service](../../docs/user-guide/services.md)_ describes a set of
@ -239,38 +270,24 @@ by the Service."
The `replicas` attribute specifies the desired number of replicas, in this
case 2 initially. We'll scale up to more shortly.
The replica's pods are using the [```gcr.io/google-samples/cassandra:v8```](image/Dockerfile)
image from Google's [container registry](https://cloud.google.com/container-registry/docs/).
This is a standard Cassandra installation on top of Debian. However, it also
adds a custom
[`SeedProvider`](https://svn.apache.org/repos/asf/cassandra/trunk/src/java/org/apache/cassandra/locator/SeedProvider.java) to Cassandra. In
Cassandra, a ```SeedProvider``` bootstraps the gossip protocol that Cassandra
uses to find other nodes.
The [`KubernetesSeedProvider`](java/src/io/k8s/cassandra/KubernetesSeedProvider.java)
discovers the Kubernetes API Server using the built in Kubernetes
discovery service, and then uses the Kubernetes API to find new nodes.
See the [image](image/) directory of this example for specifics on
how the container image was built and what it contains.
You may also note that we are setting some Cassandra parameters (`MAX_HEAP_SIZE`
and `HEAP_NEWSIZE`), and adding information about the
[namespace](../../docs/user-guide/namespaces.md).
We also tell Kubernetes that the container exposes
both the `CQL` and `Thrift` API ports. Finally, we tell the cluster
manager that we need 0.1 cpu (0.1 core).
Create the Replication Controller:
```console
$ kubectl create -f examples/cassandra/cassandra-controller.yaml
```
You can list the new controller:
```console
$ kubectl get rc -o wide
NAME DESIRED CURRENT AGE CONTAINER(S) IMAGE(S) SELECTOR
cassandra 2 2 11s cassandra gcr.io/google-samples/cassandra:v8 app=cassandra
```
Now if you list the pods in your cluster, and filter to the label
@ -278,10 +295,12 @@ Now if you list the pods in your cluster, and filter to the label
you see which Kubernetes nodes the pods were scheduled onto.)
```console
$ kubectl get pods -l="app=cassandra" -o wide
NAME READY STATUS RESTARTS AGE NODE
cassandra-21qyy 1/1 Running 0 1m kubernetes-minion-b286
cassandra-q6sz7 1/1 Running 0 1m kubernetes-minion-9ye5
```
Because these pods have the label `app=cassandra`, they map to the service we
@ -290,6 +309,7 @@ defined in Step 1.
You can check that the Pods are visible to the Service using the following service endpoints query:
```console
$ kubectl get endpoints cassandra -o yaml
apiVersion: v1
kind: Endpoints
@ -314,6 +334,7 @@ subsets:
ports:
- port: 9042
protocol: TCP
```
To show that the `SeedProvider` logic is working as intended, you can use the
@ -323,6 +344,7 @@ Cassandra pods. Again, substitute `cassandra-xxxxx` with the actual name of one
of your pods.
```console
$ kubectl exec -ti cassandra-xxxxx -- nodetool status
Datacenter: datacenter1
=======================
@ -331,6 +353,7 @@ Status=Up/Down
-- Address Load Tokens Owns (effective) Host ID Rack
UN 10.244.0.5 74.09 KB 256 100.0% 86feda0f-f070-4a5b-bda1-2eeb0ad08b77 rack1
UN 10.244.3.3 51.28 KB 256 100.0% dafe3154-1d67-42e1-ac1d-78e7e80dce2b rack1
```
## Step 3: Scale up the Cassandra cluster
@ -339,24 +362,29 @@ Now let's scale our Cassandra cluster to 4 pods. We do this by telling the
Replication Controller that we now want 4 replicas.
```sh
$ kubectl scale rc cassandra --replicas=4
```
You can see the new pods listed:
```console
$ kubectl get pods -l="app=cassandra" -o wide
NAME READY STATUS RESTARTS AGE NODE
cassandra-21qyy 1/1 Running 0 6m kubernetes-minion-b286
cassandra-81m2l 1/1 Running 0 47s kubernetes-minion-b286
cassandra-8qoyp 1/1 Running 0 47s kubernetes-minion-9ye5
cassandra-q6sz7 1/1 Running 0 6m kubernetes-minion-9ye5
```
In a few moments, you can examine the Cassandra cluster status again, and see
that the new pods have been detected by the custom `SeedProvider`:
```console
$ kubectl exec -ti cassandra-xxxxx -- nodetool status
Datacenter: datacenter1
=======================
@ -367,6 +395,7 @@ UN 10.244.0.6 51.67 KB 256 48.9% d07b23a5-56a1-4b0b-952d-68a
UN 10.244.1.5 84.71 KB 256 50.7% e060df1f-faa2-470c-923d-ca049b0f3f38 rack1
UN 10.244.1.6 84.71 KB 256 47.0% 83ca1580-4f3c-4ec5-9b38-75036b7a297f rack1
UN 10.244.0.5 68.2 KB 256 53.4% 72ca27e2-c72c-402a-9313-1e4b61c2f839 rack1
```
## Step 4: Delete the Replication Controller
@ -374,7 +403,9 @@ UN 10.244.0.5 68.2 KB 256 53.4% 72ca27e2-c72c-402a-9313-1e4
Before you start Step 5, __delete the replication controller__ you created above:
```sh
$ kubectl delete rc cassandra
```
## Step 5: Use a DaemonSet instead of a Replication Controller
@ -459,21 +490,27 @@ pod relationship.
Create this daemonset:
```console
$ kubectl create -f examples/cassandra/cassandra-daemonset.yaml
```
You may need to disable config file validation, like so:
```console
$ kubectl create -f examples/cassandra/cassandra-daemonset.yaml --validate=false
```
You can see the daemonset running:
```console
$ kubectl get daemonset
NAME DESIRED CURRENT NODE-SELECTOR
cassandra 3 3 <none>
```
Now, if you list the pods in your cluster, and filter to the label
@ -481,11 +518,13 @@ Now, if you list the pods in your cluster, and filter to the label
node in your network.
```console
$ kubectl get pods -l="app=cassandra" -o wide
NAME READY STATUS RESTARTS AGE NODE
cassandra-ico4r 1/1 Running 0 4s kubernetes-minion-rpo1
cassandra-kitfh 1/1 Running 0 1s kubernetes-minion-9ye5
cassandra-tzw89 1/1 Running 0 2s kubernetes-minion-b286
```
To prove that this all worked as intended, you can again use the `nodetool`
@ -493,6 +532,7 @@ command to examine the status of the cluster. To do this, use the `kubectl
exec` command to run `nodetool` in one of your newly-launched cassandra pods.
```console
$ kubectl exec -ti cassandra-xxxxx -- nodetool status
Datacenter: datacenter1
=======================
@ -502,6 +542,7 @@ Status=Up/Down
UN 10.244.0.5 74.09 KB 256 100.0% 86feda0f-f070-4a5b-bda1-2eeb0ad08b77 rack1
UN 10.244.4.2 32.45 KB 256 100.0% 0b1be71a-6ffb-4895-ac3e-b9791299c141 rack1
UN 10.244.3.3 51.28 KB 256 100.0% dafe3154-1d67-42e1-ac1d-78e7e80dce2b rack1
```
**Note**: This example had you delete the cassandra Replication Controller before
@ -518,15 +559,12 @@ both together by using additional labels and selectors.
When you are ready to take down your resources, do the following:
```console
$ kubectl delete service -l app=cassandra
$ kubectl delete daemonset cassandra
```
## Seed Provider Source
The Seed Provider source is
[here](java/src/io/k8s/cassandra/KubernetesSeedProvider.java).
<!-- BEGIN MUNGE: GENERATED_ANALYTICS -->
[![Analytics](https://kubernetes-site.appspot.com/UA-36037335-10/GitHub/examples/cassandra/README.md?pixel)]()

View File

@ -12,32 +12,33 @@
# See the License for the specific language governing permissions and
# limitations under the License.
FROM google/debian:wheezy
FROM google/debian:jessie
COPY cassandra.list /etc/apt/sources.list.d/cassandra.list
COPY run.sh /run.sh
RUN gpg --keyserver pgp.mit.edu --recv-keys F758CE318D77295D
RUN gpg --export --armor F758CE318D77295D | apt-key add -
RUN gpg --keyserver pgp.mit.edu --recv-keys 2B5C1B00
RUN gpg --export --armor 2B5C1B00 | apt-key add -
RUN gpg --keyserver pgp.mit.edu --recv-keys 0353B12C
RUN gpg --export --armor 0353B12C | apt-key add -
RUN apt-get update
RUN apt-get -qq -y install procps cassandra
RUN gpg --keyserver pgp.mit.edu --recv-keys F758CE318D77295D && \
gpg --export --armor F758CE318D77295D | apt-key add - && \
gpg --keyserver pgp.mit.edu --recv-keys 2B5C1B00 && \
gpg --export --armor 2B5C1B00 | apt-key add - && \
gpg --keyserver pgp.mit.edu --recv-keys 0353B12C && \
gpg --export --armor 0353B12C | apt-key add - && \
apt-get update && \
apt-get -qq -y install procps cassandra openjdk-8-jre-headless && \
chmod a+rx /run.sh && \
mkdir -p /cassandra_data/data && \
chown -R cassandra.cassandra /etc/cassandra /cassandra_data && \
chmod o+w -R /etc/cassandra /cassandra_data && \
rm -rf /var/lib/apt/lists/* && \
rm -rf /usr/share/doc/ && \
rm -rf /usr/share/doc-base/ && \
rm -rf /usr/share/man/ && \
rm -rf /tmp/*
COPY cassandra.yaml /etc/cassandra/cassandra.yaml
COPY logback.xml /etc/cassandra/logback.xml
COPY run.sh /run.sh
COPY kubernetes-cassandra.jar /kubernetes-cassandra.jar
RUN chmod a+rx /run.sh && \
mkdir -p /cassandra_data/data && \
chown -R cassandra.cassandra /etc/cassandra /cassandra_data && \
chmod o+w -R /etc/cassandra /cassandra_data
VOLUME ["/cassandra_data/data"]
USER cassandra

View File

@ -14,11 +14,11 @@
# build the cassandra image.
VERSION=v8
VERSION=v9
all: build
kubernetes-cassandra.jar: ../java/* ../java/src/io/k8s/cassandra/*.java
kubernetes-cassandra.jar: ../java/* ../java/src/main/java/io/k8s/cassandra/*.java
cd ../java && mvn package
mv ../java/target/kubernetes-cassandra*.jar kubernetes-cassandra.jar
cd ../java && mvn clean

View File

@ -1,3 +1,5 @@
deb http://www.apache.org/dist/cassandra/debian 21x main
deb-src http://www.apache.org/dist/cassandra/debian 21x main
deb http://www.apache.org/dist/cassandra/debian 34x main
deb-src http://www.apache.org/dist/cassandra/debian 34x main
# for jre8
deb http://http.debian.net/debian jessie-backports main

View File

@ -1,4 +1,4 @@
# Cassandra storage config YAML
# Cassandra storage config YAML
# NOTE:
# See http://wiki.apache.org/cassandra/StorageConfiguration for
@ -20,13 +20,13 @@ cluster_name: 'Test Cluster'
# Specifying initial_token will override this setting on the node's initial start,
# on subsequent starts, this setting will apply even if initial token is set.
#
# If you already have a cluster with 1 token per node, and wish to migrate to
# If you already have a cluster with 1 token per node, and wish to migrate to
# multiple tokens per node, see http://wiki.apache.org/cassandra/Operations
num_tokens: 256
# initial_token allows you to specify tokens manually. While you can use # it with
# vnodes (num_tokens > 1, above) -- in which case you should provide a
# comma-separated list -- it's primarily used when adding nodes # to legacy clusters
# vnodes (num_tokens > 1, above) -- in which case you should provide a
# comma-separated list -- it's primarily used when adding nodes # to legacy clusters
# that do not have vnodes enabled.
# initial_token:
@ -157,7 +157,7 @@ key_cache_save_period: 14400
# NOTE: if you reduce the size, you may not get you hottest keys loaded on startup.
#
# Default value is 0, to disable row caching.
row_cache_size_in_mb: 0
row_cache_size_in_mb: 16
# Duration in seconds after which Cassandra should
# save the row cache. Caches are saved to saved_caches_directory as specified
@ -204,7 +204,7 @@ counter_cache_save_period: 7200
# well as caches. Experiments show that JEMAlloc saves some memory
# than the native GCC allocator (i.e., JEMalloc is more
# fragmentation-resistant).
#
#
# Supported values are: NativeAllocator, JEMallocAllocator
#
# If you intend to use JEMallocAllocator you have to install JEMalloc as library and
@ -217,14 +217,14 @@ counter_cache_save_period: 7200
# If not set, the default directory is $CASSANDRA_HOME/data/saved_caches.
saved_caches_directory: /cassandra_data/saved_caches
# commitlog_sync may be either "periodic" or "batch."
# commitlog_sync may be either "periodic" or "batch."
# When in batch mode, Cassandra won't ack writes until the commit log
# has been fsynced to disk. It will wait up to
# commitlog_sync_batch_window_in_ms milliseconds for other writes, before
# performing the sync.
#
# commitlog_sync: batch
# commitlog_sync_batch_window_in_ms: 50
# commitlog_sync_batch_window_in_ms: 1.0
#
# the other option is "periodic" where writes may be acked immediately
# and the CommitLog is simply synced every commitlog_sync_period_in_ms
@ -239,7 +239,7 @@ commitlog_sync_period_in_ms: 10000
# The size of the individual commitlog file segments. A commitlog
# segment may be archived, deleted, or recycled once all the data
# in it (potentially from each columnfamily in the system) has been
# flushed to sstables.
# flushed to sstables.
#
# The default size is 32, which is almost always fine, but if you are
# archiving commitlog segments (see commitlog_archiving.properties),
@ -250,11 +250,11 @@ commitlog_segment_size_in_mb: 32
# any class that implements the SeedProvider interface and has a
# constructor that takes a Map<String, String> of parameters will do.
seed_provider:
# Addresses of hosts that are deemed contact points.
# Addresses of hosts that are deemed contact points.
# Cassandra nodes use this list of hosts to find each other and learn
# the topology of the ring. You must change this if you are running
# multiple nodes!
- class_name: io.k8s.cassandra.KubernetesSeedProvider
- class_name: io.k8s.cassandra.KubernetesSeedProvider
parameters:
# seeds is actually a comma-delimited list of addresses.
# Ex: "<ip1>,<ip2>,<ip3>"
@ -279,7 +279,7 @@ concurrent_counter_writes: 32
# the smaller of 1/4 of heap or 512MB.
# file_cache_size_in_mb: 512
# Total permitted memory to use for memtables. Cassandra will stop
# Total permitted memory to use for memtables. Cassandra will stop
# accepting writes when the limit is exceeded until a flush completes,
# and will trigger a flush based on memtable_cleanup_threshold
# If omitted, Cassandra will set both to 1/4 the size of the heap.
@ -300,7 +300,7 @@ concurrent_counter_writes: 32
# heap_buffers: on heap nio buffers
# offheap_buffers: off heap (direct) nio buffers
# offheap_objects: native memory, eliminating nio buffer heap overhead
memtable_allocation_type: heap_buffers
memtable_allocation_type: offheap_objects
# Total space to use for commitlogs. Since commitlog segments are
# mmapped, and hence use up address space, the default size is 32
@ -314,11 +314,11 @@ memtable_allocation_type: heap_buffers
# This sets the amount of memtable flush writer threads. These will
# be blocked by disk io, and each one will hold a memtable in memory
# while blocked.
# while blocked.
#
# memtable_flush_writers defaults to the smaller of (number of disks,
# number of cores), with a minimum of 2 and a maximum of 8.
#
#
# If your data directories are backed by SSD, you should increase this
# to the number of cores.
#memtable_flush_writers: 8
@ -476,7 +476,7 @@ thrift_framed_transport_size_in_mb: 15
# flushed or streamed locally in a backups/ subdirectory of the
# keyspace data. Removing these links is the operator's
# responsibility.
incremental_backups: false
incremental_backups: true
# Whether or not to take a snapshot before each compaction. Be
# careful using this option, since Cassandra won't clean up the
@ -485,7 +485,7 @@ incremental_backups: false
snapshot_before_compaction: false
# Whether or not a snapshot is taken of the data before keyspace truncation
# or dropping of column families. The STRONGLY advised default of true
# or dropping of column families. The STRONGLY advised default of true
# should be used to provide data safety. If you set this flag to false, you will
# lose data on truncation or drop.
auto_snapshot: true
@ -529,9 +529,10 @@ batch_size_warn_threshold_in_kb: 5
#
# concurrent_compactors defaults to the smaller of (number of disks,
# number of cores), with a minimum of 2 and a maximum of 8.
#
#
# If your data directories are backed by SSD, you should increase this
# to the number of cores.
# TODO: set this based on env??
#concurrent_compactors: 1
# Throttles compaction to the given total throughput across the entire
@ -544,7 +545,7 @@ compaction_throughput_mb_per_sec: 16
# When compacting, the replacement sstable(s) can be opened before they
# are completely written, and used in place of the prior sstables for
# any range that has been written. This helps to smoothly transfer reads
# any range that has been written. This helps to smoothly transfer reads
# between the sstables, reducing page cache churn and keeping hot rows hot
sstable_preemptive_open_interval_in_mb: 50
@ -582,7 +583,7 @@ request_timeout_in_ms: 10000
# Enable operation timeout information exchange between nodes to accurately
# measure request timeouts. If disabled, replicas will assume that requests
# were forwarded to them instantly by the coordinator, which means that
# under overload conditions we will waste that much extra time processing
# under overload conditions we will waste that much extra time processing
# already-timed-out requests.
#
# Warning: before enabling this property make sure to ntp is installed
@ -654,7 +655,7 @@ endpoint_snitch: SimpleSnitch
# controls how often to perform the more expensive part of host score
# calculation
dynamic_snitch_update_interval_in_ms: 100
dynamic_snitch_update_interval_in_ms: 100
# controls how often to reset all host scores, allowing a bad host to
# possibly recover
dynamic_snitch_reset_interval_in_ms: 600000
@ -678,13 +679,15 @@ dynamic_snitch_badness_threshold: 0.1
# client requests to a node with a separate queue for each
# request_scheduler_id. The scheduler is further customized by
# request_scheduler_options as described below.
request_scheduler: org.apache.cassandra.scheduler.NoScheduler
request_scheduler: org.apache.cassandra.scheduler.RoundRobinScheduler
request_scheduler_id: keyspace
# Scheduler Options vary based on the type of scheduler
# NoScheduler - Has no options
# RoundRobin
# - throttle_limit -- The throttle_limit is the number of in-flight
# requests per client. Requests beyond
# requests per client. Requests beyond
# that limit are queued up until
# running requests can complete.
# The value of 80 here is twice the number of
@ -762,3 +765,10 @@ internode_compression: all
# reducing overhead from the TCP protocol itself, at the cost of increasing
# latency if you block for cross-datacenter responses.
inter_dc_tcp_nodelay: false
disk_access_mode: mmap
row_cache_class_name: org.apache.cassandra.cache.OHCProvider
# Not till 3.5
#enable_user_defined_functions: true
#enable_scripted_user_defined_functions: tru

1
cassandra/java/.gitignore vendored Normal file
View File

@ -0,0 +1 @@
target

View File

@ -1,47 +1,93 @@
<!--
Copyright (C) 2015 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of
the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations under
the License.
-->
<project>
<modelVersion>4.0.0</modelVersion>
<groupId>io.k8s.cassandra</groupId>
<artifactId>kubernetes-cassandra</artifactId>
<version>0.0.5</version>
<build>
<sourceDirectory>src</sourceDirectory>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.6.3</version>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>2.0.11</version>
</dependency>
</dependencies>
<modelVersion>4.0.0</modelVersion>
<groupId>io.k8s.cassandra</groupId>
<artifactId>kubernetes-cassandra</artifactId>
<version>1.0.0</version>
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
<properties>
<logback.version>1.1.3</logback.version>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.5</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>${logback.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-core-asl</artifactId>
<version>1.6.3</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
<version>1.6.3</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra-all</artifactId>
<version>3.4</version>
<scope>provided</scope>
</dependency>
</dependencies>
</project>

View File

@ -1,175 +0,0 @@
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package io.k8s.cassandra;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.net.URL;
import java.net.URLConnection;
import java.security.cert.X509Certificate;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.net.ssl.HostnameVerifier;
import javax.net.ssl.HttpsURLConnection;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSession;
import javax.net.ssl.TrustManager;
import javax.net.ssl.X509TrustManager;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.ObjectMapper;
import org.apache.cassandra.locator.SeedProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KubernetesSeedProvider implements SeedProvider {
@JsonIgnoreProperties(ignoreUnknown = true)
static class Address {
public String ip;
}
@JsonIgnoreProperties(ignoreUnknown = true)
static class Subset {
public List<Address> addresses;
}
@JsonIgnoreProperties(ignoreUnknown = true)
static class Endpoints {
public List<Subset> subsets;
}
private static String getEnvOrDefault(String var, String def) {
String val = System.getenv(var);
if (val == null) {
val = def;
}
return val;
}
private static String getServiceAccountToken() throws IOException {
String file = "/var/run/secrets/kubernetes.io/serviceaccount/token";
return new String(Files.readAllBytes(Paths.get(file)));
}
private static final Logger logger = LoggerFactory.getLogger(KubernetesSeedProvider.class);
private List defaultSeeds;
private TrustManager[] trustAll;
private HostnameVerifier trustAllHosts;
public KubernetesSeedProvider(Map<String, String> params) {
// Taken from SimpleSeedProvider.java
// These are used as a fallback, if we get nothing from k8s.
String[] hosts = params.get("seeds").split(",", -1);
defaultSeeds = new ArrayList<InetAddress>(hosts.length);
for (String host : hosts)
{
try {
defaultSeeds.add(InetAddress.getByName(host.trim()));
}
catch (UnknownHostException ex)
{
// not fatal... DD will bark if there end up being zero seeds.
logger.warn("Seed provider couldn't lookup host " + host);
}
}
// TODO: Load the CA cert when it is available on all platforms.
trustAll = new TrustManager[] {
new X509TrustManager() {
public void checkServerTrusted(X509Certificate[] certs, String authType) {}
public void checkClientTrusted(X509Certificate[] certs, String authType) {}
public X509Certificate[] getAcceptedIssuers() { return null; }
}
};
trustAllHosts = new HostnameVerifier() {
public boolean verify(String hostname, SSLSession session) {
return true;
}
};
}
public List<InetAddress> getSeeds() {
List<InetAddress> list = new ArrayList<InetAddress>();
//String host = "https://kubernetes.default.svc.cluster.local";
String proto = "https://";
String host = getEnvOrDefault("KUBERNETES_PORT_443_TCP_ADDR", "kubernetes.default.svc.cluster.local");
String port = getEnvOrDefault("KUBERNETES_PORT_443_TCP_PORT", "443");
String serviceName = getEnvOrDefault("CASSANDRA_SERVICE", "cassandra");
String podNamespace = getEnvOrDefault("POD_NAMESPACE", "default");
String path = String.format("/api/v1/namespaces/%s/endpoints/", podNamespace);
try {
String token = getServiceAccountToken();
SSLContext ctx = SSLContext.getInstance("SSL");
ctx.init(null, trustAll, new SecureRandom());
URL url = new URL(proto + host + ":" + port + path + serviceName);
logger.info("Getting endpoints from " + url);
HttpsURLConnection conn = (HttpsURLConnection)url.openConnection();
// TODO: Remove this once the CA cert is propagated everywhere, and replace
// with loading the CA cert.
conn.setSSLSocketFactory(ctx.getSocketFactory());
conn.setHostnameVerifier(trustAllHosts);
conn.addRequestProperty("Authorization", "Bearer " + token);
ObjectMapper mapper = new ObjectMapper();
Endpoints endpoints = mapper.readValue(conn.getInputStream(), Endpoints.class);
if (endpoints != null) {
// Here is a problem point, endpoints.subsets can be null in first node cases.
if (endpoints.subsets != null && !endpoints.subsets.isEmpty()){
for (Subset subset : endpoints.subsets) {
if (subset.addresses != null && !subset.addresses.isEmpty()) {
for (Address address : subset.addresses) {
list.add(InetAddress.getByName(address.ip));
}
}
}
}
logger.info("Available endpoints: " + list);
} else {
logger.warn("Endpoints are not available");
}
} catch (IOException | NoSuchAlgorithmException | KeyManagementException ex) {
logger.warn("Request to kubernetes apiserver failed", ex);
}
if (list.size() == 0) {
// If we got nothing, we might be the first instance, in that case
// fall back on the seeds that were passed in cassandra.yaml.
return defaultSeeds;
}
return list;
}
// Simple main to test the implementation
public static void main(String[] args) {
SeedProvider provider = new KubernetesSeedProvider(new HashMap<String, String>());
System.out.println(provider.getSeeds());
}
}

View File

@ -0,0 +1,274 @@
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package io.k8s.cassandra;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.ConfigurationLoader;
import org.apache.cassandra.config.YamlConfigurationLoader;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.SeedProvider;
import org.apache.cassandra.locator.SimpleSeedProvider;
import org.apache.cassandra.utils.FBUtilities;
import org.codehaus.jackson.annotate.JsonIgnoreProperties;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.*;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyManagementException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
/**
* Self discovery {@link SeedProvider} that creates a list of Cassandra Seeds by
* communicating with the Kubernetes API.
* <p>Various System Variable can be used to configure this provider:
* <ul>
* <li>KUBERNETES_PORT_443_TCP_ADDR defaults to kubernetes.default.svc.cluster.local</li>
* <li>KUBERNETES_PORT_443_TCP_PORT defaults to 443</li>
* <li>CASSANDRA_SERVICE defaults to cassandra</li>
* <li>POD_NAMESPACE defaults to 'default'</li>
* <li>CASSANDRA_SERVICE_NUM_SEEDS defaults to 8 seeds</li>
* </ul>
*/
public class KubernetesSeedProvider implements SeedProvider {
private static final Logger logger = LoggerFactory.getLogger(KubernetesSeedProvider.class);
/**
* default seeds to fall back on
*/
private List<InetAddress> defaultSeeds;
private TrustManager[] trustAll;
private HostnameVerifier trustAllHosts;
/**
* Create new Seeds
* @param params
*/
public KubernetesSeedProvider(Map<String, String> params) {
// Create default seeds
defaultSeeds = createDefaultSeeds();
// TODO: Load the CA cert when it is available on all platforms.
trustAll = new TrustManager[] {
new X509TrustManager() {
public void checkServerTrusted(X509Certificate[] certs, String authType) {}
public void checkClientTrusted(X509Certificate[] certs, String authType) {}
public X509Certificate[] getAcceptedIssuers() { return null; }
}
};
trustAllHosts = new HostnameVerifier() {
public boolean verify(String hostname, SSLSession session) {
return true;
}
};
}
/**
* Call kubernetes API to collect a list of seed providers
* @return list of seed providers
*/
public List<InetAddress> getSeeds() {
String host = getEnvOrDefault("KUBERNETES_PORT_443_TCP_ADDR", "kubernetes.default.svc.cluster.local");
String port = getEnvOrDefault("KUBERNETES_PORT_443_TCP_PORT", "443");
String serviceName = getEnvOrDefault("CASSANDRA_SERVICE", "cassandra");
String podNamespace = getEnvOrDefault("POD_NAMESPACE", "default");
String path = String.format("/api/v1/namespaces/%s/endpoints/", podNamespace);
String seedSizeVar = getEnvOrDefault("CASSANDRA_SERVICE_NUM_SEEDS", "8");
Integer seedSize = Integer.valueOf(seedSizeVar);
List<InetAddress> seeds = new ArrayList<InetAddress>();
try {
String token = getServiceAccountToken();
SSLContext ctx = SSLContext.getInstance("SSL");
ctx.init(null, trustAll, new SecureRandom());
String PROTO = "https://";
URL url = new URL(PROTO + host + ":" + port + path + serviceName);
logger.info("Getting endpoints from " + url);
HttpsURLConnection conn = (HttpsURLConnection)url.openConnection();
// TODO: Remove this once the CA cert is propagated everywhere, and replace
// with loading the CA cert.
conn.setHostnameVerifier(trustAllHosts);
conn.setSSLSocketFactory(ctx.getSocketFactory());
conn.addRequestProperty("Authorization", "Bearer " + token);
ObjectMapper mapper = new ObjectMapper();
Endpoints endpoints = mapper.readValue(conn.getInputStream(), Endpoints.class);
if (endpoints != null) {
// Here is a problem point, endpoints.subsets can be null in first node cases.
if (endpoints.subsets != null && !endpoints.subsets.isEmpty()){
for (Subset subset : endpoints.subsets) {
if (subset.addresses != null && !subset.addresses.isEmpty()) {
for (Address address : subset.addresses) {
seeds.add(InetAddress.getByName(address.ip));
if(seeds.size() >= seedSize) {
logger.info("Available num endpoints: " + seeds.size());
return Collections.unmodifiableList(seeds);
}
}
}
}
}
logger.info("Available num endpoints: " + seeds.size());
} else {
logger.warn("Endpoints are not available using default seeds in cassandra.yaml");
return Collections.unmodifiableList(defaultSeeds);
}
} catch (IOException | NoSuchAlgorithmException | KeyManagementException ex) {
logger.warn("Request to kubernetes apiserver failed, using default seeds in cassandra.yaml", ex);
return Collections.unmodifiableList(defaultSeeds);
}
if (seeds.size() == 0) {
// If we got nothing, we might be the first instance, in that case
// fall back on the seeds that were passed in cassandra.yaml.
logger.warn("Seeds are not available using default seeds in cassandra.yaml");
return Collections.unmodifiableList(defaultSeeds);
}
return Collections.unmodifiableList(seeds);
}
/**
* Code taken from {@link SimpleSeedProvider}. This is used as a fall back
* incase we don't find seeds
* @return
*/
protected List<InetAddress> createDefaultSeeds()
{
Config conf;
try
{
conf = loadConfig();
}
catch (Exception e)
{
throw new AssertionError(e);
}
String[] hosts = conf.seed_provider.parameters.get("seeds").split(",", -1);
List<InetAddress> seeds = new ArrayList<InetAddress>();
for (String host : hosts)
{
try
{
seeds.add(InetAddress.getByName(host.trim()));
}
catch (UnknownHostException ex)
{
// not fatal... DD will bark if there end up being zero seeds.
logger.warn("Seed provider couldn't lookup host {}", host);
}
}
if(seeds.size() == 0) {
try {
seeds.add(InetAddress.getLocalHost());
} catch (UnknownHostException e) {
logger.warn("Seed provider couldn't lookup localhost");
}
}
return Collections.unmodifiableList(seeds);
}
/**
* Code taken from {@link SimpleSeedProvider}
* @return
*/
protected static Config loadConfig() throws ConfigurationException
{
String loaderClass = System.getProperty("cassandra.config.loader");
ConfigurationLoader loader = loaderClass == null
? new YamlConfigurationLoader()
: FBUtilities.<ConfigurationLoader>construct(loaderClass, "configuration loading");
return loader.loadConfig();
}
private static String getEnvOrDefault(String var, String def) {
String val = System.getenv(var);
if (val == null) {
val = def;
}
return val;
}
private static String getServiceAccountToken() throws IOException {
String file = "/var/run/secrets/kubernetes.io/serviceaccount/token";
try {
return new String(Files.readAllBytes(Paths.get(file)));
} catch (IOException e) {
logger.warn("unable to load service account token");
throw e;
}
}
protected List<InetAddress> getDefaultSeeds() {
return defaultSeeds;
}
@JsonIgnoreProperties(ignoreUnknown = true)
static class Address {
public String ip;
}
@JsonIgnoreProperties(ignoreUnknown = true)
static class Subset {
public List<Address> addresses;
}
@JsonIgnoreProperties(ignoreUnknown = true)
static class Endpoints {
public List<Subset> subsets;
}
}

View File

@ -0,0 +1,64 @@
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package io.k8s.cassandra;
import com.google.common.collect.ImmutableMap;
import org.apache.cassandra.locator.SeedProvider;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.hamcrest.Matchers.*;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import static org.junit.Assert.*;
public class KubernetesSeedProviderTest {
private static final Logger logger = LoggerFactory.getLogger(KubernetesSeedProviderTest.class);
@Test
@Ignore("has to be run inside of a kube cluster")
public void getSeeds() throws Exception {
SeedProvider provider = new KubernetesSeedProvider(new HashMap<String, String>());
List<InetAddress> seeds = provider.getSeeds();
assertThat(seeds, is(not(empty())));
}
@Test
public void testDefaultSeeds() throws Exception {
KubernetesSeedProvider provider = new KubernetesSeedProvider(new HashMap<String,String>());
List<InetAddress> seeds = provider.getDefaultSeeds();
List<InetAddress> seedsTest = new ArrayList<>();
seedsTest.add(InetAddress.getByName("8.4.4.4"));
seedsTest.add(InetAddress.getByName("8.8.8.8"));
assertThat(seeds, is(not(empty())));
assertThat(seeds, is(seedsTest));
logger.debug("seeds loaded {}", seeds);
}
}

View File

@ -0,0 +1,57 @@
# Copyright (C) 2015 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy of
# the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
#
# Warning!
# Consider the effects on 'o.a.c.i.s.LegacySSTableTest' before changing schemas in this file.
#
cluster_name: Test Cluster
# memtable_allocation_type: heap_buffers
memtable_allocation_type: offheap_objects
commitlog_sync: batch
commitlog_sync_batch_window_in_ms: 1.0
commitlog_segment_size_in_mb: 5
commitlog_directory: target/cassandra/commitlog
hints_directory: target/cassandra/hints
partitioner: org.apache.cassandra.dht.ByteOrderedPartitioner
listen_address: 127.0.0.1
storage_port: 7010
rpc_port: 9170
start_native_transport: true
native_transport_port: 9042
column_index_size_in_kb: 4
saved_caches_directory: target/cassandra/saved_caches
data_file_directories:
- target/cassandra/data
disk_access_mode: mmap
seed_provider:
- class_name: io.k8s.cassandra.KubernetesSeedProvider
parameters:
- seeds: "8.4.4.4,8.8.8.8"
endpoint_snitch: org.apache.cassandra.locator.SimpleSnitch
dynamic_snitch: true
request_scheduler: org.apache.cassandra.scheduler.RoundRobinScheduler
request_scheduler_id: keyspace
server_encryption_options:
internode_encryption: none
keystore: conf/.keystore
keystore_password: cassandra
truststore: conf/.truststore
truststore_password: cassandra
incremental_backups: true
concurrent_compactors: 4
compaction_throughput_mb_per_sec: 0
row_cache_class_name: org.apache.cassandra.cache.OHCProvider
row_cache_size_in_mb: 16
enable_user_defined_functions: true
enable_scripted_user_defined_functions: true

View File

@ -0,0 +1,34 @@
<!--
Copyright (C) 2015 Google Inc.
Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of
the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations under
the License.
-->
<configuration debug="false" scan="true">
<appender name="STDOUT" target="System.out" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%-5level %date{HH:mm:ss,SSS} %msg%n</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<level>DEBUG</level>
</filter>
</appender>
<logger name="io.k8s.cassandra" level="DEBUG"/>
<root level="INFO">
<appender-ref ref="STDOUT" />
</root>
</configuration>