This adds an etcd health check endpoint to kube-apiserver
addressing https://github.com/kubernetes/kubernetes/issues/48215. Kubernetes-commit: 47d748c5dc989ea46142569bf42636c622fe128a
This commit is contained in:
parent
c377b37cda
commit
7dfcb9c56f
|
@ -62,7 +62,9 @@ go_library(
|
|||
"//vendor/k8s.io/apiserver/pkg/registry/generic:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/server:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/server/storage:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage/etcd3/preflight:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/util/flag:go_default_library",
|
||||
|
|
|
@ -18,6 +18,7 @@ package options
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
"github.com/spf13/pflag"
|
||||
|
||||
|
@ -26,7 +27,9 @@ import (
|
|||
"k8s.io/apiserver/pkg/registry/generic"
|
||||
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
|
||||
"k8s.io/apiserver/pkg/server"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
serverstorage "k8s.io/apiserver/pkg/server/storage"
|
||||
"k8s.io/apiserver/pkg/storage/etcd3/preflight"
|
||||
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||
)
|
||||
|
||||
|
@ -127,15 +130,30 @@ func (s *EtcdOptions) AddFlags(fs *pflag.FlagSet) {
|
|||
}
|
||||
|
||||
func (s *EtcdOptions) ApplyTo(c *server.Config) error {
|
||||
s.addEtcdHealthEndpoint(c)
|
||||
c.RESTOptionsGetter = &SimpleRestOptionsFactory{Options: *s}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
|
||||
s.addEtcdHealthEndpoint(c)
|
||||
c.RESTOptionsGetter = &storageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) {
|
||||
c.HealthzChecks = append(c.HealthzChecks, healthz.NamedCheck("etcd", func(r *http.Request) error {
|
||||
done, err := preflight.EtcdConnection{ServerList: s.StorageConfig.ServerList}.CheckEtcdServers()
|
||||
if !done {
|
||||
return fmt.Errorf("etcd failed")
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}))
|
||||
}
|
||||
|
||||
type SimpleRestOptionsFactory struct {
|
||||
Options EtcdOptions
|
||||
}
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
package(default_visibility = ["//visibility:public"])
|
||||
|
||||
licenses(["notice"])
|
||||
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
"go_test",
|
||||
)
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["checks.go"],
|
||||
tags = ["automanaged"],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["checks_test.go"],
|
||||
library = ":go_default_library",
|
||||
tags = ["automanaged"],
|
||||
deps = ["//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library"],
|
||||
)
|
|
@ -0,0 +1,73 @@
|
|||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package preflight
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/url"
|
||||
"time"
|
||||
)
|
||||
|
||||
const connectionTimeout = 1 * time.Second
|
||||
|
||||
type connection interface {
|
||||
serverReachable(address string) bool
|
||||
parseServerList(serverList []string) error
|
||||
CheckEtcdServers() (bool, error)
|
||||
}
|
||||
|
||||
// EtcdConnection holds the Etcd server list
|
||||
type EtcdConnection struct {
|
||||
ServerList []string
|
||||
}
|
||||
|
||||
func (EtcdConnection) serverReachable(connURL *url.URL) bool {
|
||||
scheme := connURL.Scheme
|
||||
if scheme == "http" || scheme == "https" || scheme == "tcp" {
|
||||
scheme = "tcp"
|
||||
}
|
||||
if conn, err := net.DialTimeout(scheme, connURL.Host, connectionTimeout); err == nil {
|
||||
defer conn.Close()
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func parseServerURI(serverURI string) (*url.URL, error) {
|
||||
connURL, err := url.Parse(serverURI)
|
||||
if err != nil {
|
||||
return &url.URL{}, fmt.Errorf("unable to parse etcd url: %v", err)
|
||||
}
|
||||
return connURL, nil
|
||||
}
|
||||
|
||||
// CheckEtcdServers will attempt to reach all etcd servers once. If any
|
||||
// can be reached, return true.
|
||||
func (con EtcdConnection) CheckEtcdServers() (done bool, err error) {
|
||||
// Attempt to reach every Etcd server in order
|
||||
for _, serverURI := range con.ServerList {
|
||||
host, err := parseServerURI(serverURI)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if con.serverReachable(host) {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
return false, nil
|
||||
}
|
|
@ -0,0 +1,109 @@
|
|||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package preflight
|
||||
|
||||
import (
|
||||
"net/url"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
utilwait "k8s.io/apimachinery/pkg/util/wait"
|
||||
)
|
||||
|
||||
func TestParseServerURIGood(t *testing.T) {
|
||||
connURL, err := parseServerURI("https://127.0.0.1:2379")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
reference := "127.0.0.1:2379"
|
||||
if connURL.Host != reference {
|
||||
t.Fatalf("server uri was not parsed correctly, host %s was invalid", connURL.Host)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseServerURIGoodUnix(t *testing.T) {
|
||||
connURL, err := parseServerURI("unix://127.0.0.1:21002112605")
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
reference := "127.0.0.1:21002112605"
|
||||
if connURL.Host != reference {
|
||||
t.Fatalf("server uri was not parsed correctly, host %s was invalid", connURL.Host)
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseServerURIBad(t *testing.T) {
|
||||
_, err := parseServerURI("-invalid uri$@#%")
|
||||
if err == nil {
|
||||
t.Fatal("expected bad uri to raise parse error")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEtcdConnection(t *testing.T) {
|
||||
etcd := new(EtcdConnection)
|
||||
|
||||
result := etcd.serverReachable(&url.URL{Host: "-not a real network address-", Scheme: "tcp"})
|
||||
if result {
|
||||
t.Fatal("checkConnection should not have succeeded")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCheckEtcdServersEmpty(t *testing.T) {
|
||||
etcd := new(EtcdConnection)
|
||||
result, err := etcd.CheckEtcdServers()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if result {
|
||||
t.Fatal("CheckEtcdServers should not have succeeded")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCheckEtcdServersUri(t *testing.T) {
|
||||
etcd := new(EtcdConnection)
|
||||
etcd.ServerList = []string{"-invalid uri$@#%"}
|
||||
result, err := etcd.CheckEtcdServers()
|
||||
if err == nil {
|
||||
t.Fatalf("expected bad uri to raise parse error")
|
||||
}
|
||||
if result {
|
||||
t.Fatal("CheckEtcdServers should not have succeeded")
|
||||
}
|
||||
}
|
||||
|
||||
func TestCheckEtcdServers(t *testing.T) {
|
||||
etcd := new(EtcdConnection)
|
||||
etcd.ServerList = []string{""}
|
||||
result, err := etcd.CheckEtcdServers()
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %v", err)
|
||||
}
|
||||
if result {
|
||||
t.Fatal("CheckEtcdServers should not have succeeded")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPollCheckServer(t *testing.T) {
|
||||
err := utilwait.PollImmediate(1*time.Microsecond,
|
||||
2*time.Microsecond,
|
||||
EtcdConnection{ServerList: []string{""}}.CheckEtcdServers)
|
||||
if err == nil {
|
||||
t.Fatal("expected check to time out")
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue