Fix downgrade 1.15 to 1.13 scenario with 0 scheduler pods

Signed-off-by: Anton Troshin <anton@diagrid.io>
This commit is contained in:
Anton Troshin 2025-02-27 11:44:47 -06:00
parent 68e5a09e2d
commit 4e1bede9a4
No known key found for this signature in database
GPG Key ID: 9F8A96ACA9EB6363
2 changed files with 27 additions and 13 deletions

View File

@ -216,7 +216,11 @@ func Upgrade(conf UpgradeConfig) error {
// wait for the deletion of the scheduler pods to finish
if downgradeDeletionChan != nil {
<-downgradeDeletionChan
select {
case <-downgradeDeletionChan:
case <-time.After(3 * time.Minute):
return errors.New("timed out waiting for downgrade deletion")
}
}
if dashboardChart != nil {
@ -242,11 +246,6 @@ func Upgrade(conf UpgradeConfig) error {
}
func deleteSchedulerPods(namespace string, currentVersion *semver.Version, targetVersion *semver.Version) error {
_, client, err := GetKubeConfigClient()
if err != nil {
return err
}
ctxWithTimeout, cancel := context.WithTimeout(context.Background(), time.Second*30)
defer cancel()
@ -259,13 +258,22 @@ func deleteSchedulerPods(namespace string, currentVersion *semver.Version, targe
if foundTargetVersion {
break
}
pods, err = client.CoreV1().Pods(namespace).List(ctxWithTimeout, meta_v1.ListOptions{
LabelSelector: "app=dapr-scheduler-server",
})
k8sClient, err := Client()
if err != nil {
return err
}
pods, err = k8sClient.CoreV1().Pods(namespace).List(ctxWithTimeout, meta_v1.ListOptions{
LabelSelector: "app=dapr-scheduler-server",
})
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
return err
}
if len(pods.Items) == 0 {
return nil
}
for _, pod := range pods.Items {
pv, ok := pod.Labels["app.kubernetes.io/version"]
if ok {
@ -276,19 +284,25 @@ func deleteSchedulerPods(namespace string, currentVersion *semver.Version, targe
}
}
}
time.Sleep(time.Second)
time.Sleep(5 * time.Second)
}
if pods == nil {
return errors.New("no scheduler pods found")
}
// get a fresh client to ensure we have the latest state of the cluster
k8sClient, err := Client()
if err != nil {
return err
}
// delete scheduler pods of the current version, i.e. >1.15.0
for _, pod := range pods.Items {
if pv, ok := pod.Labels["app.kubernetes.io/version"]; ok {
podVersion, err := semver.NewVersion(pv)
if err == nil && podVersion.Equal(currentVersion) {
err = client.CoreV1().Pods(namespace).Delete(ctxWithTimeout, pod.Name, meta_v1.DeleteOptions{})
err = k8sClient.CoreV1().Pods(namespace).Delete(ctxWithTimeout, pod.Name, meta_v1.DeleteOptions{})
if err != nil {
return fmt.Errorf("failed to delete pod %s during downgrade: %w", pod.Name, err)
}

View File

@ -1193,7 +1193,7 @@ func waitPodDeletion(t *testing.T, done, podsDeleted chan struct{}) {
if len(list.Items) == 0 {
podsDeleted <- struct{}{}
}
time.Sleep(15 * time.Second)
time.Sleep(5 * time.Second)
}
}
@ -1239,7 +1239,7 @@ func waitAllPodsRunning(t *testing.T, namespace string, haEnabled bool, done, po
podsRunning <- struct{}{}
}
time.Sleep(15 * time.Second)
time.Sleep(5 * time.Second)
}
}