Use actual etcd client for /healthz/etcd checks

Kubernetes-commit: b39cd00982c1696d8ae8afc99931919894044ee2
This commit is contained in:
Jordan Liggitt 2018-06-12 14:33:48 -04:00 committed by Kubernetes Publisher
parent a2b2abd503
commit 8d6d8aa36e
4 changed files with 95 additions and 15 deletions

View File

@ -32,8 +32,8 @@ import (
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/healthz"
serverstorage "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storage/etcd3/preflight"
"k8s.io/apiserver/pkg/storage/storagebackend"
storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory"
)
type EtcdOptions struct {
@ -181,29 +181,30 @@ func (s *EtcdOptions) ApplyTo(c *server.Config) error {
if s == nil {
return nil
}
s.addEtcdHealthEndpoint(c)
if err := s.addEtcdHealthEndpoint(c); err != nil {
return err
}
c.RESTOptionsGetter = &SimpleRestOptionsFactory{Options: *s}
return nil
}
func (s *EtcdOptions) ApplyWithStorageFactoryTo(factory serverstorage.StorageFactory, c *server.Config) error {
s.addEtcdHealthEndpoint(c)
if err := s.addEtcdHealthEndpoint(c); err != nil {
return err
}
c.RESTOptionsGetter = &storageFactoryRestOptionsFactory{Options: *s, StorageFactory: factory}
return nil
}
func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) {
func (s *EtcdOptions) addEtcdHealthEndpoint(c *server.Config) error {
healthCheck, err := storagefactory.CreateHealthCheck(s.StorageConfig)
if err != nil {
return err
}
c.HealthzChecks = append(c.HealthzChecks, healthz.NamedCheck("etcd", func(r *http.Request) error {
done, err := preflight.EtcdConnection{ServerList: s.StorageConfig.ServerList}.CheckEtcdServers()
if !done {
return fmt.Errorf("etcd failed")
}
if err != nil {
return err
}
return nil
return healthCheck()
}))
return nil
}
type SimpleRestOptionsFactory struct {

View File

@ -17,6 +17,8 @@ limitations under the License.
package factory
import (
"context"
"fmt"
"net"
"net/http"
"time"
@ -30,6 +32,29 @@ import (
"k8s.io/apiserver/pkg/storage/storagebackend"
)
func newETCD2HealthCheck(c storagebackend.Config) (func() error, error) {
tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile)
if err != nil {
return nil, err
}
client, err := newETCD2Client(tr, c.ServerList)
if err != nil {
return nil, err
}
members := etcd2client.NewMembersAPI(client)
return func() error {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if _, err := members.List(ctx); err != nil {
return fmt.Errorf("error listing etcd members: %v", err)
}
return nil
}, nil
}
func newETCD2Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
tr, err := newTransportForETCD2(c.CertFile, c.KeyFile, c.CAFile)
if err != nil {

View File

@ -18,11 +18,14 @@ package factory
import (
"context"
"fmt"
"sync/atomic"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/pkg/transport"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/etcd3"
"k8s.io/apiserver/pkg/storage/storagebackend"
@ -38,7 +41,41 @@ var (
dialTimeout = 10 * time.Second
)
func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
func newETCD3HealthCheck(c storagebackend.Config) (func() error, error) {
// constructing the etcd v3 client blocks and times out if etcd is not available.
// retry in a loop in the background until we successfully create the client, storing the client or error encountered
clientValue := &atomic.Value{}
clientErrMsg := &atomic.Value{}
clientErrMsg.Store("etcd client connection not yet established")
go wait.PollUntil(time.Second, func() (bool, error) {
client, err := newETCD3Client(c)
if err != nil {
clientErrMsg.Store(err.Error())
return false, nil
}
clientValue.Store(client)
clientErrMsg.Store("")
return true, nil
}, wait.NeverStop)
return func() error {
if errMsg := clientErrMsg.Load().(string); len(errMsg) > 0 {
return fmt.Errorf(errMsg)
}
client := clientValue.Load().(*clientv3.Client)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
if _, err := client.Cluster.MemberList(ctx); err != nil {
return fmt.Errorf("error listing etcd members: %v", err)
}
return nil
}, nil
}
func newETCD3Client(c storagebackend.Config) (*clientv3.Client, error) {
tlsInfo := transport.TLSInfo{
CertFile: c.CertFile,
KeyFile: c.KeyFile,
@ -46,7 +83,7 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e
}
tlsConfig, err := tlsInfo.ClientConfig()
if err != nil {
return nil, nil, err
return nil, err
}
// NOTE: Client relies on nil tlsConfig
// for non-secure connections, update the implicit variable
@ -61,6 +98,11 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e
TLS: tlsConfig,
}
client, err := clientv3.New(cfg)
return client, err
}
func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
client, err := newETCD3Client(c)
if err != nil {
return nil, nil, err
}

View File

@ -41,3 +41,15 @@ func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}
// CreateHealthCheck creates a healthcheck function based on given config.
func CreateHealthCheck(c storagebackend.Config) (func() error, error) {
switch c.Type {
case storagebackend.StorageTypeETCD2:
return newETCD2HealthCheck(c)
case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
return newETCD3HealthCheck(c)
default:
return nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
}