diff --git a/.github/workflows/e2e-multi-network-provider.yaml b/.github/workflows/e2e-multi-network-provider.yaml new file mode 100644 index 0000000..bbabd9d --- /dev/null +++ b/.github/workflows/e2e-multi-network-provider.yaml @@ -0,0 +1,85 @@ +name: E2E-Multiple-NetworkProvider + +on: + push: + branches: + - master + - release-* + pull_request: {} + workflow_dispatch: {} + +env: + # Common versions + GO_VERSION: '1.17' + KIND_IMAGE: 'kindest/node:v1.23.3' + KIND_CLUSTER_NAME: 'ci-testing' + +jobs: + + rollout: + runs-on: ubuntu-20.04 + steps: + - uses: actions/checkout@v2 + with: + submodules: true + - name: Setup Go + uses: actions/setup-go@v2 + with: + go-version: ${{ env.GO_VERSION }} + - name: Setup Kind Cluster + uses: helm/kind-action@v1.2.0 + with: + node_image: ${{ env.KIND_IMAGE }} + cluster_name: ${{ env.KIND_CLUSTER_NAME }} + config: ./test/kind-conf.yaml + - name: Build image + run: | + export IMAGE="openkruise/kruise-rollout:e2e-${GITHUB_RUN_ID}" + docker build --pull --no-cache . -t $IMAGE + kind load docker-image --name=${KIND_CLUSTER_NAME} $IMAGE || { echo >&2 "kind not installed or error loading image: $IMAGE"; exit 1; } + - name: Install Kruise Rollout + run: | + set -ex + kubectl cluster-info + IMG=openkruise/kruise-rollout:e2e-${GITHUB_RUN_ID} ./scripts/deploy_kind.sh + for ((i=1;i<10;i++)); + do + set +e + PODS=$(kubectl get pod -n kruise-rollout | grep '1/1' | wc -l) + set -e + if [ "$PODS" -eq "1" ]; then + break + fi + sleep 3 + done + set +e + PODS=$(kubectl get pod -n kruise-rollout | grep '1/1' | wc -l) + kubectl get node -o yaml + kubectl get all -n kruise-rollout -o yaml + set -e + if [ "$PODS" -eq "1" ]; then + echo "Wait for kruise-rollout ready successfully" + else + echo "Timeout to wait for kruise-rollout ready" + exit 1 + fi + - name: Run E2E Tests + run: | + export KUBECONFIG=/home/runner/.kube/config + kubectl apply -f ./test/e2e/test_data/customNetworkProvider/istio_crd.yaml + kubectl apply -f ./test/e2e/test_data/customNetworkProvider/lua_script_configmap.yaml + make ginkgo + set +e + ./bin/ginkgo -timeout 60m -v --focus='Canary rollout with multiple network providers' test/e2e + retVal=$? + # kubectl get pod -n kruise-rollout --no-headers | grep manager | awk '{print $1}' | xargs kubectl logs -n kruise-rollout + restartCount=$(kubectl get pod -n kruise-rollout --no-headers | awk '{print $4}') + if [ "${restartCount}" -eq "0" ];then + echo "Kruise-rollout has not restarted" + else + kubectl get pod -n kruise-rollout --no-headers + echo "Kruise-rollout has restarted, abort!!!" + kubectl get pod -n kruise-rollout --no-headers| awk '{print $1}' | xargs kubectl logs -p -n kruise-rollout + exit 1 + fi + exit $retVal diff --git a/lua_configuration/trafficrouting_ingress/nginx.lua b/lua_configuration/trafficrouting_ingress/nginx.lua index 16ec4c6..5f0b5d8 100644 --- a/lua_configuration/trafficrouting_ingress/nginx.lua +++ b/lua_configuration/trafficrouting_ingress/nginx.lua @@ -26,19 +26,21 @@ end -- headers & cookie apis -- traverse matches for _,match in ipairs(obj.matches) do - local header = match.headers[1] - -- cookie - if ( header.name == "canary-by-cookie" ) - then - annotations["nginx.ingress.kubernetes.io/canary-by-cookie"] = header.value - else - annotations["nginx.ingress.kubernetes.io/canary-by-header"] = header.name - -- if regular expression - if ( header.type == "RegularExpression" ) + if match.headers and next(match.headers) ~= nil then + local header = match.headers[1] + -- cookie + if ( header.name == "canary-by-cookie" ) then - annotations["nginx.ingress.kubernetes.io/canary-by-header-pattern"] = header.value + annotations["nginx.ingress.kubernetes.io/canary-by-cookie"] = header.value else - annotations["nginx.ingress.kubernetes.io/canary-by-header-value"] = header.value + annotations["nginx.ingress.kubernetes.io/canary-by-header"] = header.name + -- if regular expression + if ( header.type == "RegularExpression" ) + then + annotations["nginx.ingress.kubernetes.io/canary-by-header-pattern"] = header.value + else + annotations["nginx.ingress.kubernetes.io/canary-by-header-value"] = header.value + end end end end diff --git a/pkg/trafficrouting/manager.go b/pkg/trafficrouting/manager.go index b7a3edb..113f643 100644 --- a/pkg/trafficrouting/manager.go +++ b/pkg/trafficrouting/manager.go @@ -336,8 +336,9 @@ func (m *Manager) PatchStableService(c *TrafficRoutingContext) (bool, error) { func newNetworkProvider(c client.Client, con *TrafficRoutingContext, sService, cService string) (network.NetworkProvider, error) { trafficRouting := con.ObjectRef[0] + networkProviders := make([]network.NetworkProvider, 0, 3) if trafficRouting.CustomNetworkRefs != nil { - return custom.NewCustomController(c, custom.Config{ + np, innerErr := custom.NewCustomController(c, custom.Config{ Key: con.Key, RolloutNs: con.Namespace, CanaryService: cService, @@ -347,9 +348,13 @@ func newNetworkProvider(c client.Client, con *TrafficRoutingContext, sService, c //only set for CustomController, never work for Ingress and Gateway DisableGenerateCanaryService: con.DisableGenerateCanaryService, }) + if innerErr != nil { + return nil, innerErr + } + networkProviders = append(networkProviders, np) } if trafficRouting.Ingress != nil { - return ingress.NewIngressTrafficRouting(c, ingress.Config{ + np, innerErr := ingress.NewIngressTrafficRouting(c, ingress.Config{ Key: con.Key, Namespace: con.Namespace, CanaryService: cService, @@ -357,17 +362,30 @@ func newNetworkProvider(c client.Client, con *TrafficRoutingContext, sService, c TrafficConf: trafficRouting.Ingress, OwnerRef: con.OwnerRef, }) + if innerErr != nil { + return nil, innerErr + } + networkProviders = append(networkProviders, np) } if trafficRouting.Gateway != nil { - return gateway.NewGatewayTrafficRouting(c, gateway.Config{ + np, innerErr := gateway.NewGatewayTrafficRouting(c, gateway.Config{ Key: con.Key, Namespace: con.Namespace, CanaryService: cService, StableService: sService, TrafficConf: trafficRouting.Gateway, }) + if innerErr != nil { + return nil, innerErr + } + networkProviders = append(networkProviders, np) } - return nil, fmt.Errorf("TrafficRouting current only support Ingress or Gateway API") + if len(networkProviders) == 0 { + return nil, fmt.Errorf("TrafficRouting current only supports Ingress, Gateway API and CustomNetworkRefs") + } else if len(networkProviders) == 1 { + return networkProviders[0], nil + } + return network.CompositeController(networkProviders), nil } func (m *Manager) createCanaryService(c *TrafficRoutingContext, cService string, spec corev1.ServiceSpec) (*corev1.Service, error) { diff --git a/pkg/trafficrouting/network/composite.go b/pkg/trafficrouting/network/composite.go new file mode 100644 index 0000000..357e0c6 --- /dev/null +++ b/pkg/trafficrouting/network/composite.go @@ -0,0 +1,66 @@ +/* +Copyright 2024 The Kruise Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package network + +import ( + "context" + + "k8s.io/apimachinery/pkg/util/validation/field" + + "github.com/openkruise/rollouts/api/v1beta1" +) + +var ( + _ NetworkProvider = (CompositeController)(nil) +) + +// CompositeController is a set of NetworkProvider +type CompositeController []NetworkProvider + +func (c CompositeController) Initialize(ctx context.Context) error { + for _, provider := range c { + if err := provider.Initialize(ctx); err != nil { + return err + } + } + return nil +} + +func (c CompositeController) EnsureRoutes(ctx context.Context, strategy *v1beta1.TrafficRoutingStrategy) (bool, error) { + done := true + for _, provider := range c { + if innerDone, innerErr := provider.EnsureRoutes(ctx, strategy); innerErr != nil { + return false, innerErr + } else if !innerDone { + done = false + } + } + return done, nil +} + +func (c CompositeController) Finalise(ctx context.Context) (bool, error) { + modified := false + errList := field.ErrorList{} + for _, provider := range c { + if updated, innerErr := provider.Finalise(ctx); innerErr != nil { + errList = append(errList, field.InternalError(field.NewPath("FinalizeChildNetworkProvider"), innerErr)) + } else if updated { + modified = true + } + } + return modified, errList.ToAggregate() +} diff --git a/pkg/trafficrouting/network/interface.go b/pkg/trafficrouting/network/interface.go index 28acdf8..0910a79 100644 --- a/pkg/trafficrouting/network/interface.go +++ b/pkg/trafficrouting/network/interface.go @@ -27,12 +27,11 @@ type NetworkProvider interface { // Initialize only determine if the network resources(ingress & gateway api) exist. // If error is nil, then the network resources exist. Initialize(ctx context.Context) error - // EnsureRoutes check and set canary weight and matches. - // weight indicates percentage of traffic to canary service, and range of values[0,100] - // matches indicates A/B Testing release for headers, cookies - // 1. check if canary has been set desired weight. - // 2. If not, set canary desired weight - // When the first set weight is returned false, mainly to give the provider some time to process, only when again ensure, will return true + // EnsureRoutes check and set routes, e.g. canary weight and match conditions. + // 1. Canary weight specifies the relative proportion of traffic to be forwarded to the canary service within the range of [0,100] + // 2. Match conditions indicates rules to be satisfied for A/B testing scenarios, such as header, cookie, queryParams etc. + // Return true if and only if the route resources have been correctly updated and does not change in this round of reconciliation. + // Otherwise, return false to wait for the eventual consistency. EnsureRoutes(ctx context.Context, strategy *v1beta1.TrafficRoutingStrategy) (bool, error) // Finalise will do some cleanup work after the canary rollout complete, such as delete canary ingress. // if error is nil, the return bool value means if the resources are modified diff --git a/test/e2e/rollout_v1beta1_test.go b/test/e2e/rollout_v1beta1_test.go index 0ee7065..606c282 100644 --- a/test/e2e/rollout_v1beta1_test.go +++ b/test/e2e/rollout_v1beta1_test.go @@ -34,6 +34,7 @@ import ( netv1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/util/retry" @@ -3982,6 +3983,133 @@ var _ = SIGDescribe("Rollout v1beta1", func() { }) }) + KruiseDescribe("Canary rollout with multiple network providers", func() { + It("V1->V2: Route traffic with header/queryParams/path matches and weight using rollout for Istio and Ingress", func() { + By("Creating Rollout...") + rollout := &v1beta1.Rollout{} + Expect(ReadYamlToObject("./test_data/customNetworkProvider/rollout_with_multi_trafficrouting.yaml", rollout)).ToNot(HaveOccurred()) + CreateObject(rollout) + + By("Creating workload and waiting for all pods ready...") + // service + service := &v1.Service{} + Expect(ReadYamlToObject("./test_data/rollout/service.yaml", service)).ToNot(HaveOccurred()) + CreateObject(service) + // istio api + vs := &unstructured.Unstructured{} + Expect(ReadYamlToObject("./test_data/customNetworkProvider/virtualservice_without_destinationrule.yaml", vs)).ToNot(HaveOccurred()) + vs.SetAPIVersion("networking.istio.io/v1alpha3") + vs.SetKind("VirtualService") + CreateObject(vs) + // ingress + ingress := &netv1.Ingress{} + Expect(ReadYamlToObject("./test_data/rollout/nginx_ingress.yaml", ingress)).ToNot(HaveOccurred()) + CreateObject(ingress) + // workload + workload := &apps.Deployment{} + Expect(ReadYamlToObject("./test_data/rollout/deployment.yaml", workload)).ToNot(HaveOccurred()) + workload.Spec.Replicas = utilpointer.Int32(4) + CreateObject(workload) + WaitDeploymentAllPodsReady(workload) + + // check rollout status + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + Expect(rollout.Status.Phase).Should(Equal(v1beta1.RolloutPhaseHealthy)) + By("check rollout status & paused success") + + // v1 -> v2, start rollout action + newEnvs := mergeEnvVar(workload.Spec.Template.Spec.Containers[0].Env, v1.EnvVar{Name: "NODE_NAME", Value: "version2"}) + workload.Spec.Template.Spec.Containers[0].Env = newEnvs + UpdateDeployment(workload) + By("Update deployment env NODE_NAME from(version1) -> to(version2), routing traffic with header agent:pc to new version pods") + time.Sleep(time.Second * 2) + + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(workload.Spec.Paused).Should(BeTrue()) + // wait step 1 complete + WaitRolloutCanaryStepPaused(rollout.Name, 1) + // check rollout status + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + Expect(rollout.Status.CanaryStatus.CanaryReplicas).Should(BeNumerically("==", 1)) + Expect(rollout.Status.CanaryStatus.CanaryReadyReplicas).Should(BeNumerically("==", 1)) + // check virtualservice spec + Expect(GetObject(vs.GetName(), vs)).NotTo(HaveOccurred()) + expectedSpec := `{"gateways":["nginx-gateway"],"hosts":["*"],"http":[{"match":[{"uri":{"prefix":"/pc"}}],"route":[{"destination":{"host":"echoserver-canary"}}]},{"match":[{"queryParams":{"user-agent":{"exact":"pc"}}}],"route":[{"destination":{"host":"echoserver-canary"}}]},{"match":[{"headers":{"user-agent":{"exact":"pc"}}}],"route":[{"destination":{"host":"echoserver-canary"}}]},{"route":[{"destination":{"host":"echoserver"}}]}]}` + Expect(util.DumpJSON(vs.Object["spec"])).Should(Equal(expectedSpec)) + // check original spec annotation + expectedAnno := `{"spec":{"gateways":["nginx-gateway"],"hosts":["*"],"http":[{"route":[{"destination":{"host":"echoserver"}}]}]}}` + Expect(vs.GetAnnotations()[OriginalSpecAnnotation]).Should(Equal(expectedAnno)) + // check canary-ingress spec + cIngress := &netv1.Ingress{} + Expect(GetObject(service.Name+"-canary", cIngress)).NotTo(HaveOccurred()) + Expect(cIngress.Annotations[fmt.Sprintf("%s/canary", nginxIngressAnnotationDefaultPrefix)]).Should(Equal("true")) + Expect(cIngress.Annotations[fmt.Sprintf("%s/canary-weight", nginxIngressAnnotationDefaultPrefix)]).Should(BeEmpty()) + Expect(cIngress.Annotations[fmt.Sprintf("%s/canary-by-header", nginxIngressAnnotationDefaultPrefix)]).Should(Equal("user-agent")) + Expect(cIngress.Annotations[fmt.Sprintf("%s/canary-by-header-value", nginxIngressAnnotationDefaultPrefix)]).Should(Equal("pc")) + + // resume rollout canary + ResumeRolloutCanary(rollout.Name) + By("Resume rollout, and wait next step(2), routing 50% traffic to new version pods") + WaitRolloutCanaryStepPaused(rollout.Name, 2) + // check rollout status + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + Expect(rollout.Status.CanaryStatus.CanaryReplicas).Should(BeNumerically("==", 2)) + Expect(rollout.Status.CanaryStatus.CanaryReadyReplicas).Should(BeNumerically("==", 2)) + // check virtualservice spec + Expect(GetObject(vs.GetName(), vs)).NotTo(HaveOccurred()) + expectedSpec = `{"gateways":["nginx-gateway"],"hosts":["*"],"http":[{"route":[{"destination":{"host":"echoserver"},"weight":50},{"destination":{"host":"echoserver-canary"},"weight":50}]}]}` + Expect(util.DumpJSON(vs.Object["spec"])).Should(Equal(expectedSpec)) + // check original spec annotation + Expect(vs.GetAnnotations()[OriginalSpecAnnotation]).Should(Equal(expectedAnno)) + // check canary-ingress spec + Expect(GetObject(service.Name+"-canary", cIngress)).NotTo(HaveOccurred()) + Expect(cIngress.Annotations[fmt.Sprintf("%s/canary", nginxIngressAnnotationDefaultPrefix)]).Should(Equal("true")) + Expect(cIngress.Annotations[fmt.Sprintf("%s/canary-weight", nginxIngressAnnotationDefaultPrefix)]).Should(Equal(removePercentageSign(*rollout.Spec.Strategy.Canary.Steps[1].Traffic))) + Expect(cIngress.Annotations[fmt.Sprintf("%s/canary-by-header", nginxIngressAnnotationDefaultPrefix)]).Should(BeEmpty()) + Expect(cIngress.Annotations[fmt.Sprintf("%s/canary-by-header-value", nginxIngressAnnotationDefaultPrefix)]).Should(BeEmpty()) + + // resume rollout + ResumeRolloutCanary(rollout.Name) + WaitRolloutStatusPhase(rollout.Name, v1beta1.RolloutPhaseHealthy) + By("rollout completed, and check") + // check ingress & service & virtualservice & deployment + + // ingress + Expect(GetObject(ingress.Name, ingress)).NotTo(HaveOccurred()) + cIngress = &netv1.Ingress{} + Expect(GetObject(fmt.Sprintf("%s-canary", ingress.Name), cIngress)).To(HaveOccurred()) + // virtualservice + Expect(GetObject(vs.GetName(), vs)).NotTo(HaveOccurred()) + expectedSpec = `{"gateways":["nginx-gateway"],"hosts":["*"],"http":[{"route":[{"destination":{"host":"echoserver"}}]}]}` + Expect(util.DumpJSON(vs.Object["spec"])).Should(Equal(expectedSpec)) + Expect(vs.GetAnnotations()[OriginalSpecAnnotation]).Should(Equal("")) + // service + Expect(GetObject(service.Name, service)).NotTo(HaveOccurred()) + Expect(service.Spec.Selector[apps.DefaultDeploymentUniqueLabelKey]).Should(Equal("")) + cService := &v1.Service{} + Expect(GetObject(fmt.Sprintf("%s-canary", service.Name), cService)).To(HaveOccurred()) + // deployment + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + Expect(workload.Spec.Paused).Should(BeFalse()) + Expect(workload.Status.UpdatedReplicas).Should(BeNumerically("==", *workload.Spec.Replicas)) + Expect(workload.Status.Replicas).Should(BeNumerically("==", *workload.Spec.Replicas)) + Expect(workload.Status.ReadyReplicas).Should(BeNumerically("==", *workload.Spec.Replicas)) + for _, env := range workload.Spec.Template.Spec.Containers[0].Env { + if env.Name == "NODE_NAME" { + Expect(env.Value).Should(Equal("version2")) + } + } + // check progressing succeed + Expect(GetObject(rollout.Name, rollout)).NotTo(HaveOccurred()) + cond := getRolloutConditionV1beta1(rollout.Status, v1beta1.RolloutConditionProgressing) + Expect(cond.Reason).Should(Equal(v1beta1.ProgressingReasonCompleted)) + Expect(string(cond.Status)).Should(Equal(string(metav1.ConditionFalse))) + cond = getRolloutConditionV1beta1(rollout.Status, v1beta1.RolloutConditionSucceeded) + Expect(string(cond.Status)).Should(Equal(string(metav1.ConditionTrue))) + Expect(GetObject(workload.Name, workload)).NotTo(HaveOccurred()) + WaitRolloutWorkloadGeneration(rollout.Name, workload.Generation) + }) + }) }) func removePercentageSign(input string) string { diff --git a/test/e2e/test_data/customNetworkProvider/rollout_with_multi_trafficrouting.yaml b/test/e2e/test_data/customNetworkProvider/rollout_with_multi_trafficrouting.yaml new file mode 100644 index 0000000..5fa5c71 --- /dev/null +++ b/test/e2e/test_data/customNetworkProvider/rollout_with_multi_trafficrouting.yaml @@ -0,0 +1,37 @@ + apiVersion: rollouts.kruise.io/v1beta1 + kind: Rollout + metadata: + name: rollouts-demo + spec: + disabled: false + workloadRef: + apiVersion: apps/v1 + kind: Deployment + name: echoserver + strategy: + canary: + enableExtraWorkloadForCanary: true + steps: + - replicas: 1 + matches: + - headers: + - type: Exact + name: user-agent + value: pc + - queryParams: + - type: Exact + name: user-agent + value: pc + - path: + value: /pc + - replicas: "50%" + traffic: "50%" + trafficRoutings: + - service: echoserver + ingress: + classType: nginx + name: echoserver + customNetworkRefs: + - apiVersion: networking.istio.io/v1alpha3 + kind: VirtualService + name: vs-demo diff --git a/test/e2e/test_data/rollout/deployment.yaml b/test/e2e/test_data/rollout/deployment.yaml index 68e1fea..34f8706 100644 --- a/test/e2e/test_data/rollout/deployment.yaml +++ b/test/e2e/test_data/rollout/deployment.yaml @@ -22,6 +22,8 @@ spec: containers: - name: echoserver image: cilium/echoserver:latest + # For ARM-based env + # image: jmalloc/echo-server:latest # imagePullPolicy: IfNotPresent ports: - containerPort: 8080