Improve dynamic cert file change detection

DynamicFileCAContent and DynamicCertKeyPairContent used periodical job
to check whether the file content has changed, leading to 1 minute of
delay in worst case. This patch improves it by leveraging fsnotify
watcher. The content change will be reflected immediately.

Kubernetes-commit: 3cfe3d048ff37c1c6994d131ed8557f3c8bddc8a
This commit is contained in:
Quan Tian 2021-08-03 21:28:01 +08:00 committed by Kubernetes Publisher
parent 5038711d39
commit 0737519ac6
4 changed files with 127 additions and 34 deletions

18
go.mod
View File

@ -11,6 +11,7 @@ require (
github.com/davecgh/go-spew v1.1.1
github.com/emicklei/go-restful v2.9.5+incompatible
github.com/evanphx/json-patch v4.11.0+incompatible
github.com/fsnotify/fsnotify v1.4.9
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.19.5 // indirect
github.com/go-openapi/swag v0.19.14 // indirect
@ -43,10 +44,10 @@ require (
google.golang.org/grpc v1.38.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/square/go-jose.v2 v2.2.2
k8s.io/api v0.0.0-20210806000319-499b6f90564c
k8s.io/apimachinery v0.0.0-20210805051055-f7769293e6f1
k8s.io/client-go v0.0.0-20210806000600-0f5acb8c39dd
k8s.io/component-base v0.0.0-20210805120716-92bebfd2c985
k8s.io/api v0.0.0
k8s.io/apimachinery v0.0.0
k8s.io/client-go v0.0.0
k8s.io/component-base v0.0.0
k8s.io/klog/v2 v2.9.0
k8s.io/kube-openapi v0.0.0-20210421082810-95288971da7e
k8s.io/utils v0.0.0-20210707171843-4b05e18ac7d9
@ -56,8 +57,9 @@ require (
)
replace (
k8s.io/api => k8s.io/api v0.0.0-20210806000319-499b6f90564c
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20210805051055-f7769293e6f1
k8s.io/client-go => k8s.io/client-go v0.0.0-20210806000600-0f5acb8c39dd
k8s.io/component-base => k8s.io/component-base v0.0.0-20210805120716-92bebfd2c985
k8s.io/api => ../api
k8s.io/apimachinery => ../apimachinery
k8s.io/apiserver => ../apiserver
k8s.io/client-go => ../client-go
k8s.io/component-base => ../component-base
)

8
go.sum
View File

@ -780,14 +780,6 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.0.0-20210806000319-499b6f90564c h1:/TwdwBIiHSFv2VnETc8qgoi/skGGBo1qlXszp8N/BMo=
k8s.io/api v0.0.0-20210806000319-499b6f90564c/go.mod h1:9J6nkHavSazyXmPeuA4f1YO9Ztdjw7nDibPjT4P+wsY=
k8s.io/apimachinery v0.0.0-20210805051055-f7769293e6f1 h1:cVpwhaGeh/tNPBeYbFff3tjx5AxwG5zwImhz+eusG3k=
k8s.io/apimachinery v0.0.0-20210805051055-f7769293e6f1/go.mod h1:O3oNtNadZdeOMxHFVxOreoznohCpy0z6mocxbZr7oJ0=
k8s.io/client-go v0.0.0-20210806000600-0f5acb8c39dd h1:rJtT7gZ4LkZG+xtM6/iJvnzOx5XW+fNicRxMOYT0u/w=
k8s.io/client-go v0.0.0-20210806000600-0f5acb8c39dd/go.mod h1:AptIKtgqoSsdZsOhDruqHKBn7GeBrnbH5CgVkI7tpYU=
k8s.io/component-base v0.0.0-20210805120716-92bebfd2c985 h1:5AICcEUyTnjI3rCqtY3n7sjZRLrCtI8Ejt9jnY+24kQ=
k8s.io/component-base v0.0.0-20210805120716-92bebfd2c985/go.mod h1:uuSJv3vcTE8Ays2CdTgQdaDyg2nYrkK3jVrLDSV3uAo=
k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/klog/v2 v2.9.0 h1:D7HV+n1V57XeZ0m6tdRkfknthUaM06VFbWldOFh8kzM=

View File

@ -24,6 +24,7 @@ import (
"sync/atomic"
"time"
"github.com/fsnotify/fsnotify"
"k8s.io/client-go/util/cert"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
@ -44,7 +45,7 @@ type ControllerRunner interface {
Run(workers int, stopCh <-chan struct{})
}
// DynamicFileCAContent provies a CAContentProvider that can dynamically react to new file content
// DynamicFileCAContent provides a CAContentProvider that can dynamically react to new file content
// It also fulfills the authenticator interface to provide verifyoptions
type DynamicFileCAContent struct {
name string
@ -147,7 +148,7 @@ func (c *DynamicFileCAContent) RunOnce() error {
return c.loadCABundle()
}
// Run starts the kube-apiserver and blocks until stopCh is closed.
// Run starts the controller and blocks until stopCh is closed.
func (c *DynamicFileCAContent) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
@ -158,17 +159,62 @@ func (c *DynamicFileCAContent) Run(workers int, stopCh <-chan struct{}) {
// doesn't matter what workers say, only start one.
go wait.Until(c.runWorker, time.Second, stopCh)
// start timer that rechecks every minute, just in case. this also serves to prime the controller quickly.
go wait.PollImmediateUntil(FileRefreshDuration, func() (bool, error) {
c.queue.Add(workItemKey)
return false, nil
}, stopCh)
// TODO this can be wired to an fsnotifier as well.
// start the loop that watches the CA file until stopCh is closed.
go wait.Until(func() {
if err := c.watchCAFile(stopCh); err != nil {
klog.ErrorS(err, "Failed to watch CA file, will retry later")
}
}, time.Minute, stopCh)
<-stopCh
}
func (c *DynamicFileCAContent) watchCAFile(stopCh <-chan struct{}) error {
// Trigger a check here to ensure the content will be checked periodically even if the following watch fails.
c.queue.Add(workItemKey)
w, err := fsnotify.NewWatcher()
if err != nil {
return fmt.Errorf("error creating fsnotify watcher: %v", err)
}
defer w.Close()
if err = w.Add(c.filename); err != nil {
return fmt.Errorf("error adding watch for file %s: %v", c.filename, err)
}
// Trigger a check in case the file is updated before the watch starts.
c.queue.Add(workItemKey)
for {
select {
case e := <-w.Events:
if err := c.handleWatchEvent(e, w); err != nil {
return err
}
case err := <-w.Errors:
return fmt.Errorf("received fsnotify error: %v", err)
case <-stopCh:
return nil
}
}
}
// handleWatchEvent triggers reloading the CA file, and restarts a new watch if it's a Remove or Rename event.
func (c *DynamicFileCAContent) handleWatchEvent(e fsnotify.Event, w *fsnotify.Watcher) error {
// This should be executed after restarting the watch (if applicable) to ensure no file event will be missing.
defer c.queue.Add(workItemKey)
if e.Op&(fsnotify.Remove|fsnotify.Rename) == 0 {
return nil
}
if err := w.Remove(c.filename); err != nil {
klog.InfoS("Failed to remove file watch, it may have been deleted", "file", c.filename, "err", err)
}
if err := w.Add(c.filename); err != nil {
return fmt.Errorf("error adding watch for file %s: %v", c.filename, err)
}
return nil
}
func (c *DynamicFileCAContent) runWorker() {
for c.processNextWorkItem() {
}

View File

@ -23,6 +23,8 @@ import (
"sync/atomic"
"time"
"github.com/fsnotify/fsnotify"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
@ -38,7 +40,7 @@ type DynamicCertKeyPairContent struct {
// keyFile is the name of the key file to read.
keyFile string
// servingCert is a certKeyContent that contains the last read, non-zero length content of the key and cert
// certKeyPair is a certKeyContent that contains the last read, non-zero length content of the key and cert
certKeyPair atomic.Value
listeners []Listener
@ -75,7 +77,7 @@ func (c *DynamicCertKeyPairContent) AddListener(listener Listener) {
c.listeners = append(c.listeners, listener)
}
// loadServingCert determines the next set of content for the file.
// loadCertKeyPair determines the next set of content for the file.
func (c *DynamicCertKeyPairContent) loadCertKeyPair() error {
cert, err := ioutil.ReadFile(c.certFile)
if err != nil {
@ -132,17 +134,68 @@ func (c *DynamicCertKeyPairContent) Run(workers int, stopCh <-chan struct{}) {
// doesn't matter what workers say, only start one.
go wait.Until(c.runWorker, time.Second, stopCh)
// start timer that rechecks every minute, just in case. this also serves to prime the controller quickly.
go wait.PollImmediateUntil(FileRefreshDuration, func() (bool, error) {
c.queue.Add(workItemKey)
return false, nil
}, stopCh)
// TODO this can be wired to an fsnotifier as well.
// start the loop that watches the cert and key files until stopCh is closed.
go wait.Until(func() {
if err := c.watchCertKeyFile(stopCh); err != nil {
klog.ErrorS(err, "Failed to watch cert and key file, will retry later")
}
}, time.Minute, stopCh)
<-stopCh
}
func (c *DynamicCertKeyPairContent) watchCertKeyFile(stopCh <-chan struct{}) error {
// Trigger a check here to ensure the content will be checked periodically even if the following watch fails.
c.queue.Add(workItemKey)
w, err := fsnotify.NewWatcher()
if err != nil {
return fmt.Errorf("error creating fsnotify watcher: %v", err)
}
defer w.Close()
if err := w.Add(c.certFile); err != nil {
return fmt.Errorf("error adding watch for file %s: %v", c.certFile, err)
}
if err := w.Add(c.keyFile); err != nil {
return fmt.Errorf("error adding watch for file %s: %v", c.keyFile, err)
}
// Trigger a check in case the file is updated before the watch starts.
c.queue.Add(workItemKey)
for {
select {
case e := <-w.Events:
if err := c.handleWatchEvent(e, w); err != nil {
return err
}
case err := <-w.Errors:
return fmt.Errorf("received fsnotify error: %v", err)
case <-stopCh:
return nil
}
}
}
// handleWatchEvent triggers reloading the cert and key file, and restarts a new watch if it's a Remove or Rename event.
// If one file is updated before the other, the loadCertKeyPair method will catch the mismatch and will not apply the
// change. When an event of the other file is received, it will trigger reloading the files again and the new content
// will be loaded and used.
func (c *DynamicCertKeyPairContent) handleWatchEvent(e fsnotify.Event, w *fsnotify.Watcher) error {
// This should be executed after restarting the watch (if applicable) to ensure no file event will be missing.
defer c.queue.Add(workItemKey)
if e.Op&(fsnotify.Remove|fsnotify.Rename) == 0 {
return nil
}
if err := w.Remove(e.Name); err != nil {
klog.InfoS("Failed to remove file watch, it may have been deleted", "file", e.Name, "err", err)
}
if err := w.Add(e.Name); err != nil {
return fmt.Errorf("error adding watch for file %s: %v", e.Name, err)
}
return nil
}
func (c *DynamicCertKeyPairContent) runWorker() {
for c.processNextWorkItem() {
}