mirror of https://github.com/knative/pkg.git
Functions common to HA tests (#1259)
* Functions common to HA tests * Fix format errors * More robust GetEndpoints function * Do not use listOptions in getting endpoints * re-use more existing code * Fix format * Rename GetLeader to WaitForNewLeader * Fix fmt * Simplify WaitForPodDeleted * Remove span from WaitForChangedEndpoints
This commit is contained in:
parent
db40bcad49
commit
e80bcb7e66
|
|
@ -0,0 +1,45 @@
|
|||
/*
|
||||
Copyright 2020 The Knative Authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package ha
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"knative.dev/pkg/test"
|
||||
"knative.dev/pkg/test/logging"
|
||||
)
|
||||
|
||||
// WaitForNewLeader waits until the holder of the given lease is different from the previousLeader.
|
||||
func WaitForNewLeader(client *test.KubeClient, lease, namespace, previousLeader string) (string, error) {
|
||||
span := logging.GetEmitableSpan(context.Background(), "WaitForNewLeader/"+lease)
|
||||
defer span.End()
|
||||
var leader string
|
||||
err := wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
|
||||
lease, err := client.Kube.CoordinationV1().Leases(namespace).Get(lease, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("error getting lease %s: %w", lease, err)
|
||||
}
|
||||
leader = strings.Split(*lease.Spec.HolderIdentity, "_")[0]
|
||||
return leader != previousLeader, nil
|
||||
})
|
||||
return leader, err
|
||||
}
|
||||
|
|
@ -25,8 +25,10 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
appsv1 "k8s.io/api/apps/v1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
apierrs "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
k8styped "k8s.io/client-go/kubernetes/typed/core/v1"
|
||||
|
|
@ -93,6 +95,19 @@ func WaitForPodState(client *KubeClient, inState func(p *corev1.Pod) (bool, erro
|
|||
})
|
||||
}
|
||||
|
||||
// WaitForPodDeleted waits for the given pod to disappear from the given namespace.
|
||||
func WaitForPodDeleted(client *KubeClient, name, namespace string) error {
|
||||
if err := WaitForPodState(client, func(p *corev1.Pod) (bool, error) {
|
||||
// Always return false. We're oly interested in the error which indicates pod deletion or timeout.
|
||||
return false, nil
|
||||
}, name, namespace); err != nil {
|
||||
if !apierrs.IsNotFound(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// WaitForServiceHasAtLeastOneEndpoint polls the status of the specified Service
|
||||
// from client every interval until number of service endpoints = numOfEndpoints
|
||||
func WaitForServiceEndpoints(client *KubeClient, svcName string, svcNamespace string, numOfEndpoints int) error {
|
||||
|
|
@ -121,6 +136,29 @@ func countEndpointsNum(e *corev1.Endpoints) int {
|
|||
return num
|
||||
}
|
||||
|
||||
// GetEndpointAddresses returns addresses of endpoints for the given service.
|
||||
func GetEndpointAddresses(client *KubeClient, svcName, svcNamespace string) ([]string, error) {
|
||||
endpoints, err := client.Kube.CoreV1().Endpoints(svcNamespace).Get(svcName, metav1.GetOptions{})
|
||||
if err != nil || countEndpointsNum(endpoints) == 0 {
|
||||
return nil, fmt.Errorf("no endpoints or error: %w", err)
|
||||
}
|
||||
var hosts []string
|
||||
for _, sub := range endpoints.Subsets {
|
||||
for _, addr := range sub.Addresses {
|
||||
hosts = append(hosts, addr.IP)
|
||||
}
|
||||
}
|
||||
return hosts, nil
|
||||
}
|
||||
|
||||
// WaitForChangedEndpoints waits until the endpoints for the given service differ from origEndpoints.
|
||||
func WaitForChangedEndpoints(client *KubeClient, svcName, svcNamespace string, origEndpoints []string) error {
|
||||
return wait.PollImmediate(1*time.Second, 2*time.Minute, func() (bool, error) {
|
||||
newEndpoints, err := GetEndpointAddresses(client, svcName, svcNamespace)
|
||||
return !cmp.Equal(origEndpoints, newEndpoints), err
|
||||
})
|
||||
}
|
||||
|
||||
// GetConfigMap gets the configmaps for a given namespace
|
||||
func GetConfigMap(client *KubeClient, namespace string) k8styped.ConfigMapInterface {
|
||||
return client.Kube.CoreV1().ConfigMaps(namespace)
|
||||
|
|
@ -176,3 +214,17 @@ func PodsRunning(podList *corev1.PodList) (bool, error) {
|
|||
func PodRunning(pod *corev1.Pod) bool {
|
||||
return pod.Status.Phase == corev1.PodRunning || pod.Status.Phase == corev1.PodSucceeded
|
||||
}
|
||||
|
||||
// WaitForDeploymentScale waits until the given deployment has the expected scale.
|
||||
func WaitForDeploymentScale(client *KubeClient, name, namespace string, scale int) error {
|
||||
return WaitForDeploymentState(
|
||||
client,
|
||||
name,
|
||||
func(d *appsv1.Deployment) (bool, error) {
|
||||
return d.Status.ReadyReplicas == int32(scale), nil
|
||||
},
|
||||
"DeploymentIsScaled",
|
||||
namespace,
|
||||
time.Minute,
|
||||
)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue