Add Refresh method to cloud provider
This can be used to dynamically update cloud provider config (in particular list of managed NodeGroups and their min/max constraints). Add GKE implementation.
This commit is contained in:
parent
2f661682e2
commit
07511f444a
|
|
@ -165,6 +165,12 @@ func (aws *awsCloudProvider) GetResourceLimiter() (*cloudprovider.ResourceLimite
|
|||
return aws.resourceLimiter, nil
|
||||
}
|
||||
|
||||
// Refresh is called before every main loop and can be used to dynamically update cloud provider state.
|
||||
// In particular the list of node groups returned by NodeGroups can change as a result of CloudProvider.Refresh().
|
||||
func (aws *awsCloudProvider) Refresh() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// AwsRef contains a reference to some entity in AWS/GKE world.
|
||||
type AwsRef struct {
|
||||
Name string
|
||||
|
|
|
|||
|
|
@ -56,6 +56,10 @@ type CloudProvider interface {
|
|||
|
||||
// GetResourceLimiter returns struct containing limits (max, min) for resources (cores, memory etc.).
|
||||
GetResourceLimiter() (*ResourceLimiter, error)
|
||||
|
||||
// Refresh is called before every main loop and can be used to dynamically update cloud provider state.
|
||||
// In particular the list of node groups returned by NodeGroups can change as a result of CloudProvider.Refresh().
|
||||
Refresh() error
|
||||
}
|
||||
|
||||
// ErrNotImplemented is returned if a method is not implemented.
|
||||
|
|
|
|||
|
|
@ -162,6 +162,12 @@ func (gce *GceCloudProvider) GetResourceLimiter() (*cloudprovider.ResourceLimite
|
|||
return gce.resourceLimiter, nil
|
||||
}
|
||||
|
||||
// Refresh is called before every main loop and can be used to dynamically update cloud provider state.
|
||||
// In particular the list of node groups returned by NodeGroups can change as a result of CloudProvider.Refresh().
|
||||
func (gce *GceCloudProvider) Refresh() error {
|
||||
return gce.gceManager.Refresh()
|
||||
}
|
||||
|
||||
// GceRef contains s reference to some entity in GCE/GKE world.
|
||||
type GceRef struct {
|
||||
Project string
|
||||
|
|
|
|||
|
|
@ -72,6 +72,11 @@ func (m *gceManagerMock) GetMigNodes(mig *Mig) ([]string, error) {
|
|||
return args.Get(0).([]string), args.Error(1)
|
||||
}
|
||||
|
||||
func (m *gceManagerMock) Refresh() error {
|
||||
args := m.Called()
|
||||
return args.Error(0)
|
||||
}
|
||||
|
||||
func (m *gceManagerMock) getMigs() []*migInformation {
|
||||
args := m.Called()
|
||||
return args.Get(0).([]*migInformation)
|
||||
|
|
|
|||
|
|
@ -57,6 +57,7 @@ const (
|
|||
operationWaitTimeout = 5 * time.Second
|
||||
gkeOperationWaitTimeout = 120 * time.Second
|
||||
operationPollInterval = 100 * time.Millisecond
|
||||
refreshInterval = 1 * time.Minute
|
||||
nodeAutoprovisioningPrefix = "nap"
|
||||
napMaxNodes = 1000
|
||||
napMinNodes = 0
|
||||
|
|
@ -91,6 +92,8 @@ type GceManager interface {
|
|||
GetMigForInstance(instance *GceRef) (*Mig, error)
|
||||
// GetMigNodes returns mig nodes.
|
||||
GetMigNodes(mig *Mig) ([]string, error)
|
||||
// Refresh updates config by calling GKE API (in GKE mode only).
|
||||
Refresh() error
|
||||
getMigs() []*migInformation
|
||||
createNodePool(mig *Mig) error
|
||||
deleteNodePool(toBeRemoved *Mig) error
|
||||
|
|
@ -117,6 +120,8 @@ type gceManagerImpl struct {
|
|||
clusterName string
|
||||
mode GcpCloudProviderMode
|
||||
templates *templateBuilder
|
||||
|
||||
lastRefresh time.Time
|
||||
}
|
||||
|
||||
// CreateGceManager constructs gceManager object.
|
||||
|
|
@ -215,6 +220,8 @@ func CreateGceManager(configReader io.Reader, mode GcpCloudProviderMode, cluster
|
|||
glog.V(1).Info("Using GKE-NAP mode")
|
||||
}
|
||||
|
||||
manager.lastRefresh = time.Now()
|
||||
|
||||
go wait.Forever(func() {
|
||||
manager.cacheMutex.Lock()
|
||||
defer manager.cacheMutex.Unlock()
|
||||
|
|
@ -691,6 +698,19 @@ func (m *gceManagerImpl) getTemplates() *templateBuilder {
|
|||
return m.templates
|
||||
}
|
||||
|
||||
func (m *gceManagerImpl) Refresh() error {
|
||||
if m.mode == ModeGCE {
|
||||
return nil
|
||||
}
|
||||
if m.lastRefresh.Add(refreshInterval).Before(time.Now()) {
|
||||
err := m.fetchAllNodePools()
|
||||
m.lastRefresh = time.Now()
|
||||
glog.V(2).Infof("Refreshed NodePools list, next refresh after %v", m.lastRefresh.Add(refreshInterval))
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Code borrowed from gce cloud provider. Reuse the original as soon as it becomes public.
|
||||
func getProjectAndZone() (string, string, error) {
|
||||
result, err := metadata.Get("instance/zone")
|
||||
|
|
|
|||
|
|
@ -122,6 +122,12 @@ func (kubemark *KubemarkCloudProvider) GetResourceLimiter() (*cloudprovider.Reso
|
|||
return kubemark.resourceLimiter, nil
|
||||
}
|
||||
|
||||
// Refresh is called before every main loop and can be used to dynamically update cloud provider state.
|
||||
// In particular the list of node groups returned by NodeGroups can change as a result of CloudProvider.Refresh().
|
||||
func (kubemark *KubemarkCloudProvider) Refresh() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// NodeGroup implements NodeGroup interfrace.
|
||||
type NodeGroup struct {
|
||||
Name string
|
||||
|
|
|
|||
|
|
@ -64,3 +64,9 @@ func (kubemark *KubemarkCloudProvider) NewNodeGroup(machineType string, labels m
|
|||
func (kubemark *KubemarkCloudProvider) GetResourceLimiter() (*cloudprovider.ResourceLimiter, error) {
|
||||
return nil, cloudprovider.ErrNotImplemented
|
||||
}
|
||||
|
||||
// Refresh is called before every main loop and can be used to dynamically update cloud provider state.
|
||||
// In particular the list of node groups returned by NodeGroups can change as a result of CloudProvider.Refresh().
|
||||
func (kubemark *KubemarkCloudProvider) Refresh() error {
|
||||
return cloudprovider.ErrNotImplemented
|
||||
}
|
||||
|
|
|
|||
|
|
@ -192,6 +192,12 @@ func (tcp *TestCloudProvider) SetResourceLimiter(resourceLimiter *cloudprovider.
|
|||
tcp.resourceLimiter = resourceLimiter
|
||||
}
|
||||
|
||||
// Refresh is called before every main loop and can be used to dynamically update cloud provider state.
|
||||
// In particular the list of node groups returned by NodeGroups can change as a result of CloudProvider.Refresh().
|
||||
func (tcp *TestCloudProvider) Refresh() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// TestNodeGroup is a node group used by TestCloudProvider.
|
||||
type TestNodeGroup struct {
|
||||
sync.Mutex
|
||||
|
|
|
|||
|
|
@ -95,6 +95,12 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) errors.AutoscalerError
|
|||
|
||||
glog.V(4).Info("Starting main loop")
|
||||
|
||||
err := autoscalingContext.CloudProvider.Refresh()
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to refresh cloud provider config: %v", err)
|
||||
return errors.ToAutoscalerError(errors.CloudProviderError, err)
|
||||
}
|
||||
|
||||
readyNodes, err := readyNodeLister.List()
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to list ready nodes: %v", err)
|
||||
|
|
|
|||
Loading…
Reference in New Issue