Cluster-autoscaler: update code for 1.6 k8s sync

This commit is contained in:
Marcin Wielgus 2017-03-02 14:15:30 +01:00
parent 4cab3d70bf
commit 72a47dc2b2
27 changed files with 111 additions and 118 deletions

View File

@ -21,7 +21,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
kube_record "k8s.io/kubernetes/pkg/client/record"
kube_record "k8s.io/client-go/tools/record"
)
// ConfigFetcher fetches the up-to-date dynamic configuration from the apiserver

View File

@ -23,9 +23,9 @@ import (
"strconv"
"k8s.io/apimachinery/pkg/runtime/schema"
kube_rest "k8s.io/kubernetes/pkg/client/restclient"
kube_client_cmd "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
kube_client_cmd_api "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
kube_rest "k8s.io/client-go/rest"
kube_client_cmd "k8s.io/client-go/tools/clientcmd"
kube_client_cmd_api "k8s.io/client-go/tools/clientcmd/api"
)
// This code was borrowed from Heapster to push the work forward and contains some functionality

View File

@ -20,8 +20,7 @@ import (
"time"
kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
kube_record "k8s.io/kubernetes/pkg/client/record"
kube_record "k8s.io/client-go/tools/record"
"k8s.io/contrib/cluster-autoscaler/config/dynamic"
"k8s.io/contrib/cluster-autoscaler/simulator"
kube_util "k8s.io/contrib/cluster-autoscaler/utils/kubernetes"

View File

@ -18,6 +18,8 @@ package core
import (
"testing"
"fmt"
"time"
"k8s.io/contrib/cluster-autoscaler/config/dynamic"
@ -25,15 +27,11 @@ import (
"k8s.io/apimachinery/pkg/runtime"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
"k8s.io/kubernetes/pkg/client/testing/core"
core "k8s.io/client-go/testing"
. "k8s.io/contrib/cluster-autoscaler/utils/test"
"fmt"
"github.com/stretchr/testify/assert"
"k8s.io/contrib/cluster-autoscaler/simulator"
kube_util "k8s.io/contrib/cluster-autoscaler/utils/kubernetes"
"time"
)
func TestNewAutoscalerStatic(t *testing.T) {

View File

@ -17,17 +17,16 @@ limitations under the License.
package core
import (
"time"
"k8s.io/contrib/cluster-autoscaler/cloudprovider/builder"
"k8s.io/contrib/cluster-autoscaler/expander"
"k8s.io/contrib/cluster-autoscaler/simulator"
kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
kube_record "k8s.io/kubernetes/pkg/client/record"
kube_record "k8s.io/client-go/tools/record"
"k8s.io/contrib/cluster-autoscaler/cloudprovider"
"k8s.io/contrib/cluster-autoscaler/clusterstate"
"k8s.io/contrib/cluster-autoscaler/expander/factory"
"time"
)
// AutoscalingContext contains user-configurable constant and configuration-related objects passed to

View File

@ -22,10 +22,8 @@ import (
"k8s.io/contrib/cluster-autoscaler/config/dynamic"
"k8s.io/contrib/cluster-autoscaler/metrics"
kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
kube_record "k8s.io/kubernetes/pkg/client/record"
kube_record "k8s.io/client-go/tools/record"
"github.com/golang/glog"
"k8s.io/contrib/cluster-autoscaler/simulator"
kube_util "k8s.io/contrib/cluster-autoscaler/utils/kubernetes"

View File

@ -32,7 +32,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
kube_record "k8s.io/kubernetes/pkg/client/record"
kube_record "k8s.io/client-go/tools/record"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
"github.com/golang/glog"
@ -344,7 +344,7 @@ func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface
maxGraceful64 := int64(maxGratefulTerminationSec)
for _, pod := range pods {
recorder.Eventf(pod, apiv1.EventTypeNormal, "ScaleDown", "deleting pod for node scale down")
err := client.Core().Pods(pod.Namespace).Delete(pod.Name, &apiv1.DeleteOptions{
err := client.Core().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{
GracePeriodSeconds: &maxGraceful64,
})
if err != nil {

View File

@ -26,14 +26,13 @@ import (
"k8s.io/contrib/cluster-autoscaler/simulator"
kube_util "k8s.io/contrib/cluster-autoscaler/utils/kubernetes"
. "k8s.io/contrib/cluster-autoscaler/utils/test"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
"k8s.io/kubernetes/pkg/client/testing/core"
core "k8s.io/client-go/testing"
"github.com/stretchr/testify/assert"
)

View File

@ -28,12 +28,10 @@ import (
"k8s.io/contrib/cluster-autoscaler/simulator"
kube_util "k8s.io/contrib/cluster-autoscaler/utils/kubernetes"
. "k8s.io/contrib/cluster-autoscaler/utils/test"
"k8s.io/apimachinery/pkg/runtime"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
"k8s.io/kubernetes/pkg/client/testing/core"
core "k8s.io/client-go/testing"
"github.com/stretchr/testify/assert"
"k8s.io/contrib/cluster-autoscaler/estimator"
)

View File

@ -27,7 +27,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
kube_record "k8s.io/kubernetes/pkg/client/record"
kube_record "k8s.io/client-go/tools/record"
"github.com/golang/glog"
"k8s.io/contrib/cluster-autoscaler/simulator"
@ -266,7 +266,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) {
// ExitCleanUp removes status configmap.
func (a *StaticAutoscaler) ExitCleanUp() {
maps := a.AutoscalingContext.ClientSet.CoreV1().ConfigMaps(StatusConfigMapNamespace)
err := maps.Delete(StatusConfigMapName, &apiv1.DeleteOptions{})
err := maps.Delete(StatusConfigMapName, &metav1.DeleteOptions{})
if err != nil {
// Nothing else we could do at this point
glog.Error("Failed to delete status configmap")

View File

@ -20,7 +20,7 @@ import (
"sort"
"k8s.io/contrib/cluster-autoscaler/simulator"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/apimachinery/pkg/api/resource"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

View File

@ -22,7 +22,7 @@ import (
"k8s.io/contrib/cluster-autoscaler/simulator"
. "k8s.io/contrib/cluster-autoscaler/utils/test"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/apimachinery/pkg/api/resource"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"

View File

@ -21,7 +21,7 @@ import (
"fmt"
"math"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/apimachinery/pkg/api/resource"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

View File

@ -19,7 +19,7 @@ package estimator
import (
"testing"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/apimachinery/pkg/api/resource"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"

View File

@ -20,7 +20,7 @@ import (
"github.com/golang/glog"
"k8s.io/contrib/cluster-autoscaler/expander"
"k8s.io/contrib/cluster-autoscaler/expander/random"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/apimachinery/pkg/api/resource"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

View File

@ -24,7 +24,7 @@ import (
"github.com/stretchr/testify/assert"
"k8s.io/contrib/cluster-autoscaler/expander"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/apimachinery/pkg/api/resource"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

View File

@ -35,14 +35,14 @@ import (
kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
kube_leaderelection "k8s.io/kubernetes/pkg/client/leaderelection"
"k8s.io/kubernetes/pkg/client/leaderelection/resourcelock"
kube_flag "k8s.io/kubernetes/pkg/util/flag"
kube_flag "k8s.io/apiserver/pkg/util/flag"
"k8s.io/contrib/cluster-autoscaler/estimator"
"k8s.io/contrib/cluster-autoscaler/metrics"
"k8s.io/contrib/cluster-autoscaler/simulator"
"github.com/golang/glog"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/pflag"
"k8s.io/contrib/cluster-autoscaler/estimator"
"k8s.io/contrib/cluster-autoscaler/metrics"
"k8s.io/contrib/cluster-autoscaler/simulator"
)
// MultiStringFlag is a flag for passing multiple parameters using same flag

View File

@ -23,7 +23,7 @@ import (
"math/rand"
"time"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/apimachinery/pkg/api/resource"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"

View File

@ -22,8 +22,9 @@ import (
"k8s.io/contrib/cluster-autoscaler/utils/drain"
api "k8s.io/kubernetes/pkg/api"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
@ -33,7 +34,7 @@ import (
func GetRequiredPodsForNode(nodename string, client kube_client.Interface) ([]*apiv1.Pod, error) {
podListResult, err := client.Core().Pods(apiv1.NamespaceAll).List(
apiv1.ListOptions{FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodename}).String()})
metav1.ListOptions{FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodename}).String()})
if err != nil {
return []*apiv1.Pod{}, err
}

View File

@ -23,7 +23,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
"k8s.io/kubernetes/pkg/client/testing/core"
core "k8s.io/client-go/testing"
"k8s.io/kubernetes/pkg/kubelet/types"
"github.com/stretchr/testify/assert"

View File

@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/plugin/pkg/scheduler/factory"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
// We need to import provider to intialize default scheduler.
_ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider"
@ -42,7 +43,20 @@ func NewPredicateChecker(kubeClient kube_client.Interface) (*PredicateChecker, e
if err != nil {
return nil, err
}
schedulerConfigFactory := factory.NewConfigFactory(kubeClient, "", apiv1.DefaultHardPodAffinitySymmetricWeight, apiv1.DefaultFailureDomains)
informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
schedulerConfigFactory := factory.NewConfigFactory(
"cluster-autoscaler",
kubeClient,
informerFactory.Core().V1().Nodes(),
informerFactory.Core().V1().PersistentVolumes(),
informerFactory.Core().V1().PersistentVolumeClaims(),
informerFactory.Core().V1().ReplicationControllers(),
informerFactory.Extensions().V1beta1().ReplicaSets(),
informerFactory.Core().V1().Services(),
apiv1.DefaultHardPodAffinitySymmetricWeight,
)
predicates, err := schedulerConfigFactory.GetPredicates(provider.FitPredicateKeys)
predicates["ready"] = isNodeReadyAndSchedulablePredicate
if err != nil {

View File

@ -17,7 +17,6 @@ limitations under the License.
package deletetaint
import (
"encoding/json"
"fmt"
"time"
@ -55,42 +54,23 @@ func MarkToBeDeleted(node *apiv1.Node, client kube_client.Interface) error {
}
func addToBeDeletedTaint(node *apiv1.Node) (bool, error) {
taints, err := apiv1.GetTaintsFromNodeAnnotations(node.Annotations)
if err != nil {
glog.Warningf("Error while getting Taints for node %v: %v", node.Name, err)
return false, err
}
for _, taint := range taints {
for _, taint := range node.Spec.Taints {
if taint.Key == ToBeDeletedTaint {
glog.Infof("ToBeDeletedTaint already present on on node %v", taint, node.Name)
return false, nil
}
}
taints = append(taints, apiv1.Taint{
node.Spec.Taints = append(node.Spec.Taints, apiv1.Taint{
Key: ToBeDeletedTaint,
Value: time.Now().String(),
Effect: apiv1.TaintEffectNoSchedule,
})
taintsJson, err := json.Marshal(taints)
if err != nil {
glog.Warningf("Error while adding taints on node %v: %v", node.Name, err)
return false, err
}
if node.Annotations == nil {
node.Annotations = make(map[string]string)
}
node.Annotations[apiv1.TaintsAnnotationKey] = string(taintsJson)
return true, nil
}
// HasToBeDeletedTaint returns true if ToBeDeleted taint is applied on the node.
func HasToBeDeletedTaint(node *apiv1.Node) bool {
taints, err := apiv1.GetTaintsFromNodeAnnotations(node.Annotations)
if err != nil {
glog.Warningf("Node %v has incorrect taint annotation: %v", err)
return false
}
for _, taint := range taints {
for _, taint := range node.Spec.Taints {
if taint.Key == ToBeDeletedTaint {
return true
}
@ -100,14 +80,8 @@ func HasToBeDeletedTaint(node *apiv1.Node) bool {
// CleanToBeDeleted cleans ToBeDeleted taint.
func CleanToBeDeleted(node *apiv1.Node, client kube_client.Interface) (bool, error) {
taints, err := apiv1.GetTaintsFromNodeAnnotations(node.Annotations)
if err != nil {
glog.Warningf("Error while getting Taints for node %v: %v", node.Name, err)
return false, err
}
newTaints := make([]apiv1.Taint, 0)
for _, taint := range taints {
for _, taint := range node.Spec.Taints {
if taint.Key == ToBeDeletedTaint {
glog.V(1).Infof("Releasing taint %+v on node %v", taint, node.Name)
} else {
@ -115,17 +89,9 @@ func CleanToBeDeleted(node *apiv1.Node, client kube_client.Interface) (bool, err
}
}
if len(newTaints) != len(taints) {
taintsJson, err := json.Marshal(newTaints)
if err != nil {
glog.Warningf("Error while releasing taints on node %v: %v", node.Name, err)
return false, err
}
if node.Annotations == nil {
node.Annotations = make(map[string]string)
}
node.Annotations[apiv1.TaintsAnnotationKey] = string(taintsJson)
_, err = client.Core().Nodes().Update(node)
if len(newTaints) != len(node.Spec.Taints) {
node.Spec.Taints = newTaints
_, err := client.Core().Nodes().Update(node)
if err != nil {
glog.Warningf("Error while releasing taints on node %v: %v", node.Name, err)
return false, err

View File

@ -26,7 +26,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
"k8s.io/kubernetes/pkg/client/testing/core"
core "k8s.io/client-go/testing"
"github.com/stretchr/testify/assert"
)

View File

@ -31,7 +31,7 @@ import (
batchv1 "k8s.io/kubernetes/pkg/apis/batch/v1"
extensions "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
"k8s.io/kubernetes/pkg/client/testing/core"
core "k8s.io/client-go/testing"
)
func TestDrain(t *testing.T) {

View File

@ -17,18 +17,36 @@ limitations under the License.
package kubernetes
import (
apiv1 "k8s.io/kubernetes/pkg/api/v1"
kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
v1core "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1"
kube_record "k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/api"
clientv1 "k8s.io/client-go/pkg/api/v1"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
kube_record "k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
"github.com/golang/glog"
)
// CreateEventRecorder creates an event recorder to send custom events to Kubernetes to be recorded for targeted Kubernetes objects
func CreateEventRecorder(kubeClient kube_client.Interface) kube_record.EventRecorder {
func CreateEventRecorder(kubeClient clientset.Interface) kube_record.EventRecorder {
eventBroadcaster := kube_record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
return eventBroadcaster.NewRecorder(apiv1.EventSource{Component: "cluster-autoscaler"})
if _, isfake := kubeClient.(*fake.Clientset); !isfake {
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")})
}
return eventBroadcaster.NewRecorder(api.Scheme,clientv1.EventSource{Component: "cluster-autoscaler"})
}
type TestEventSink struct {}
func (fes *TestEventSink) Create(event *clientv1.Event) (*clientv1.Event, error) {
return event, nil
}
func (fes *TestEventSink) Update(event *clientv1.Event) (*clientv1.Event, error) {
return event, nil
}
func (fes *TestEventSink) Patch(event *clientv1.Event, data []byte) (*clientv1.Event, error) {
return event, nil
}

View File

@ -19,11 +19,12 @@ package kubernetes
import (
"time"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/labels"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/client-go/tools/cache"
client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/apimachinery/pkg/fields"
v1lister "k8s.io/kubernetes/pkg/client/listers/core/v1"
)
// ListerRegistry is a registry providing various listers to list pods or nodes matching conditions
@ -82,7 +83,7 @@ func (r listerRegistryImpl) UnschedulablePodLister() *UnschedulablePodLister {
// UnschedulablePodLister lists unscheduled pods
type UnschedulablePodLister struct {
podLister *cache.StoreToPodLister
podLister v1lister.PodLister
}
// List returns all unscheduled pods.
@ -113,7 +114,7 @@ func NewUnschedulablePodInNamespaceLister(kubeClient client.Interface, namespace
string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed))
podListWatch := cache.NewListWatchFromClient(kubeClient.Core().RESTClient(), "pods", namespace, selector)
store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
podLister := &cache.StoreToPodLister{store}
podLister := v1lister.NewPodLister(store)
podReflector := cache.NewReflector(podListWatch, &apiv1.Pod{}, store, time.Hour)
podReflector.RunUntil(stopchannel)
return &UnschedulablePodLister{
@ -123,7 +124,7 @@ func NewUnschedulablePodInNamespaceLister(kubeClient client.Interface, namespace
// ScheduledPodLister lists scheduled pods.
type ScheduledPodLister struct {
podLister *cache.StoreToPodLister
podLister v1lister.PodLister
}
// List returns all scheduled pods.
@ -138,7 +139,7 @@ func NewScheduledPodLister(kubeClient client.Interface, stopchannel <-chan struc
string(apiv1.PodSucceeded) + ",status.phase!=" + string(apiv1.PodFailed))
podListWatch := cache.NewListWatchFromClient(kubeClient.Core().RESTClient(), "pods", apiv1.NamespaceAll, selector)
store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
podLister := &cache.StoreToPodLister{store}
podLister := v1lister.NewPodLister(store)
podReflector := cache.NewReflector(podListWatch, &apiv1.Pod{}, store, time.Hour)
podReflector.RunUntil(stopchannel)
@ -149,19 +150,19 @@ func NewScheduledPodLister(kubeClient client.Interface, stopchannel <-chan struc
// ReadyNodeLister lists ready nodes.
type ReadyNodeLister struct {
nodeLister *cache.StoreToNodeLister
nodeLister v1lister.NodeLister
}
// List returns ready nodes.
func (readyNodeLister *ReadyNodeLister) List() ([]*apiv1.Node, error) {
nodes, err := readyNodeLister.nodeLister.List()
nodes, err := readyNodeLister.nodeLister.List(labels.Everything())
if err != nil {
return []*apiv1.Node{}, err
}
readyNodes := make([]*apiv1.Node, 0, len(nodes.Items))
for i, node := range nodes.Items {
if IsNodeReadyAndSchedulable(&node) {
readyNodes = append(readyNodes, &nodes.Items[i])
readyNodes := make([]*apiv1.Node, 0, len(nodes))
for _, node := range nodes {
if IsNodeReadyAndSchedulable(node) {
readyNodes = append(readyNodes, node)
break
}
}
@ -171,8 +172,9 @@ func (readyNodeLister *ReadyNodeLister) List() ([]*apiv1.Node, error) {
// NewReadyNodeLister builds a node lister.
func NewReadyNodeLister(kubeClient client.Interface, stopChannel <-chan struct{}) *ReadyNodeLister {
listWatcher := cache.NewListWatchFromClient(kubeClient.Core().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything())
nodeLister := &cache.StoreToNodeLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}
reflector := cache.NewReflector(listWatcher, &apiv1.Node{}, nodeLister.Store, time.Hour)
store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
nodeLister := v1lister.NewNodeLister(store)
reflector := cache.NewReflector(listWatcher, &apiv1.Node{}, store, time.Hour)
reflector.RunUntil(stopChannel)
return &ReadyNodeLister{
nodeLister: nodeLister,
@ -181,18 +183,18 @@ func NewReadyNodeLister(kubeClient client.Interface, stopChannel <-chan struct{}
// AllNodeLister lists all nodes
type AllNodeLister struct {
nodeLister *cache.StoreToNodeLister
nodeLister v1lister.NodeLister
}
// List returns all nodes
func (allNodeLister *AllNodeLister) List() ([]*apiv1.Node, error) {
nodes, err := allNodeLister.nodeLister.List()
nodes, err := allNodeLister.nodeLister.List(labels.Everything())
if err != nil {
return []*apiv1.Node{}, err
}
allNodes := make([]*apiv1.Node, 0, len(nodes.Items))
for i := range nodes.Items {
allNodes = append(allNodes, &nodes.Items[i])
allNodes := make([]*apiv1.Node, 0, len(nodes))
for _,node := range nodes {
allNodes = append(allNodes, node)
}
return allNodes, nil
}
@ -200,8 +202,9 @@ func (allNodeLister *AllNodeLister) List() ([]*apiv1.Node, error) {
// NewAllNodeLister builds a node lister that returns all nodes (ready and unready)
func NewAllNodeLister(kubeClient client.Interface, stopchannel <-chan struct{}) *AllNodeLister {
listWatcher := cache.NewListWatchFromClient(kubeClient.Core().RESTClient(), "nodes", apiv1.NamespaceAll, fields.Everything())
nodeLister := &cache.StoreToNodeLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}
reflector := cache.NewReflector(listWatcher, &apiv1.Node{}, nodeLister.Store, time.Hour)
store := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
nodeLister := v1lister.NewNodeLister(store)
reflector := cache.NewReflector(listWatcher, &apiv1.Node{}, store, time.Hour)
reflector.RunUntil(stopchannel)
return &AllNodeLister{
nodeLister: nodeLister,

View File

@ -21,9 +21,9 @@ import (
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/api/resource"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/kubernetes/pkg/api"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/kubernetes/pkg/api/testapi"
)
@ -111,7 +111,7 @@ func SetNodeReadyState(node *apiv1.Node, ready bool, lastTransition time.Time) {
// RefJSON builds string reference to
func RefJSON(o runtime.Object) string {
ref, err := apiv1.GetReference(o)
ref, err := apiv1.GetReference(api.Scheme, o)
if err != nil {
panic(err)
}