Compare commits
33 Commits
v0.7.0-alp
...
main
Author | SHA1 | Date |
---|---|---|
|
5c0be848e5 | |
|
f3d186c9f2 | |
|
69ba4b7029 | |
|
c4bff1c14a | |
|
61ce8f3d21 | |
|
d8165ff698 | |
|
f5d475e441 | |
|
4e2217c262 | |
|
0d62680e96 | |
|
6f0aa4eb4d | |
|
a4a3750f0a | |
|
4178e9b766 | |
|
ca2caf011d | |
|
d519399fb1 | |
|
38fba26bf7 | |
|
38032a6521 | |
|
d4fab7fb30 | |
|
1207b03315 | |
|
99858934ec | |
|
94be1308ae | |
|
443849bafe | |
|
ba0491030e | |
|
171962b0ea | |
|
06ef5035a0 | |
|
2beda5f509 | |
|
959830923f | |
|
a8e1c62656 | |
|
8317059fb7 | |
|
5dccbc473d | |
|
2d49bb66a0 | |
|
e13d1a0b51 | |
|
d78aeaa3cc | |
|
537bdd0a8e |
|
@ -14,24 +14,24 @@ concurrency:
|
|||
jobs:
|
||||
golangci:
|
||||
name: lint
|
||||
runs-on: ubuntu-20.04
|
||||
runs-on: ubuntu-22.04
|
||||
steps:
|
||||
- name: checkout code
|
||||
uses: actions/checkout@v3
|
||||
- name: install Go
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: 1.19.5
|
||||
go-version: 1.21.10
|
||||
- name: lint
|
||||
run: hack/verify-staticcheck.sh
|
||||
build:
|
||||
runs-on: ubuntu-latest
|
||||
runs-on: ubuntu-22.04
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- name: Set up Go
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: 1.19.5
|
||||
go-version: 1.21.10
|
||||
- name: Build
|
||||
run: go build -v ./...
|
||||
- name: Test
|
||||
|
|
141
.golangci.yml
141
.golangci.yml
|
@ -1,23 +1,11 @@
|
|||
# This files contains all configuration options for analysis running.
|
||||
# More details please refer to: https://golangci-lint.run/usage/configuration/
|
||||
|
||||
version: "2"
|
||||
run:
|
||||
# timeout for analysis, e.g. 30s, 5m, default is 1m
|
||||
# timeout for analysis, e.g. 30s, 5m, default timeout is disabled
|
||||
timeout: 10m
|
||||
|
||||
# which dirs to skip: issues from them won't be reported;
|
||||
# can use regexp here: generated.*, regexp is applied on full path;
|
||||
# default value is empty list, but default dirs are skipped independently
|
||||
# from this option's value (see skip-dirs-use-default).
|
||||
# "/" will be replaced by current OS file path separator to properly work
|
||||
# on Windows.
|
||||
skip-dirs:
|
||||
- (^|/)vendor($|/)
|
||||
|
||||
# default is true. Enables skipping of directories:
|
||||
# vendor$, third_party$, testdata$, examples$, Godeps$, builtin$
|
||||
skip-dirs-use-default: false
|
||||
|
||||
# One of 'readonly' and 'vendor'.
|
||||
# - readonly: the go command is disallowed from the implicit automatic updating of go.mod described above.
|
||||
# Instead, it fails when any changes to go.mod are needed. This setting is most useful to check
|
||||
|
@ -27,42 +15,89 @@ run:
|
|||
modules-download-mode: readonly
|
||||
linters:
|
||||
enable:
|
||||
# linters maintained by golang.org
|
||||
- gofmt
|
||||
- goimports
|
||||
- govet
|
||||
# linters default enabled by golangci-lint .
|
||||
- errcheck
|
||||
- gosimple
|
||||
- ineffassign
|
||||
- staticcheck
|
||||
- typecheck
|
||||
- unused
|
||||
# other linters supported by golangci-lint.
|
||||
- gci
|
||||
- gocyclo
|
||||
- gosec
|
||||
- misspell
|
||||
- whitespace
|
||||
- revive
|
||||
|
||||
linters-settings:
|
||||
goimports:
|
||||
local-prefixes: github.com/karmada-io/multicluster-cloud-provider
|
||||
gocyclo:
|
||||
# minimal cyclomatic complexity to report
|
||||
min-complexity: 15
|
||||
gci:
|
||||
sections:
|
||||
- Standard
|
||||
- Default
|
||||
- Prefix(github.com/karmada-io/multicluster-cloud-provider)
|
||||
|
||||
issues:
|
||||
# The list of ids of default excludes to include or disable. By default it's empty.
|
||||
include:
|
||||
# disable excluding of issues about comments from revive
|
||||
# see https://golangci-lint.run/usage/configuration/#command-line-options for more info
|
||||
- EXC0012
|
||||
- EXC0013
|
||||
- EXC0014
|
||||
- depguard
|
||||
- gocyclo
|
||||
- gosec
|
||||
- misspell
|
||||
- revive
|
||||
- whitespace
|
||||
settings:
|
||||
depguard:
|
||||
rules:
|
||||
main:
|
||||
deny:
|
||||
- pkg: io/ioutil
|
||||
desc: 'replaced by io and os packages since Go 1.16: https://tip.golang.org/doc/go1.16#ioutil'
|
||||
gocyclo:
|
||||
# minimal cyclomatic complexity to report
|
||||
min-complexity: 15
|
||||
revive:
|
||||
rules:
|
||||
# Disable if-return as it is too strict and not always useful.
|
||||
# https://github.com/mgechev/revive/blob/master/RULES_DESCRIPTIONS.md#if-return
|
||||
- name: if-return
|
||||
disabled: true
|
||||
# Disable package-comments for now since most packages in this project are primarily for internal use.
|
||||
# If we decide to provide public packages in the future, we can move them to a separate
|
||||
# repository and revisit adding package-level comments at that time.
|
||||
- name: package-comments
|
||||
disabled: true
|
||||
- name: superfluous-else
|
||||
arguments:
|
||||
- preserveScope
|
||||
- name: error-strings
|
||||
- name: error-return
|
||||
- name: receiver-naming
|
||||
- name: increment-decrement
|
||||
- name: range
|
||||
- name: error-naming
|
||||
- name: dot-imports
|
||||
- name: errorf
|
||||
- name: exported
|
||||
- name: var-declaration
|
||||
- name: blank-imports
|
||||
- name: indent-error-flow
|
||||
- name: unreachable-code
|
||||
- name: var-naming
|
||||
- name: redefines-builtin-id
|
||||
- name: unused-parameter
|
||||
- name: context-as-argument
|
||||
- name: context-keys-type
|
||||
- name: unexported-return
|
||||
- name: time-naming
|
||||
- name: empty-block
|
||||
staticcheck:
|
||||
checks:
|
||||
- all
|
||||
# Disable QF1008 to retain embedded fields for better readability.
|
||||
- "-QF1008"
|
||||
# Disable ST1000 (staticcheck) for now since most packages in this project are primarily for internal use.
|
||||
# If we decide to provide public packages in the future, we can move them to a separate
|
||||
# repository and revisit adding package-level comments at that time.
|
||||
- "-ST1000"
|
||||
exclusions:
|
||||
generated: lax
|
||||
presets:
|
||||
- common-false-positives
|
||||
- legacy
|
||||
- std-error-handling
|
||||
paths:
|
||||
- (^|/)vendor($|/)
|
||||
formatters:
|
||||
enable:
|
||||
- gci
|
||||
- gofmt
|
||||
- goimports
|
||||
settings:
|
||||
gci:
|
||||
sections:
|
||||
- Standard
|
||||
- Default
|
||||
- Prefix(github.com/karmada-io/multicluster-cloud-provider)
|
||||
goimports:
|
||||
local-prefixes:
|
||||
- github.com/karmada-io/multicluster-cloud-provider
|
||||
exclusions:
|
||||
generated: lax
|
||||
paths:
|
||||
- (^|/)vendor($|/)
|
||||
|
|
|
@ -0,0 +1,3 @@
|
|||
# Karmada Community Code of Conduct
|
||||
|
||||
Please refer to our [Karmada Community Code of Conduct](https://github.com/karmada-io/community/blob/main/CODE_OF_CONDUCT.md).
|
|
@ -0,0 +1,8 @@
|
|||
reviewers:
|
||||
- chaunceyjiang
|
||||
- RainbowMango
|
||||
- XiShanYongYe-Chang
|
||||
approvers:
|
||||
- chaunceyjiang
|
||||
- RainbowMango
|
||||
- XiShanYongYe-Chang
|
|
@ -5,6 +5,7 @@ import (
|
|||
|
||||
controllerscontext "github.com/karmada-io/multicluster-cloud-provider/pkg/controllers/context"
|
||||
"github.com/karmada-io/multicluster-cloud-provider/pkg/controllers/crdinstallation"
|
||||
"github.com/karmada-io/multicluster-cloud-provider/pkg/controllers/mciservicelocations"
|
||||
"github.com/karmada-io/multicluster-cloud-provider/pkg/controllers/multiclusteringress"
|
||||
"github.com/karmada-io/multicluster-cloud-provider/pkg/controllers/multiclusterservice"
|
||||
"github.com/karmada-io/multicluster-cloud-provider/pkg/controllers/serviceexportpropagation"
|
||||
|
@ -79,3 +80,14 @@ func startServiceExportPropagationController(ctx controllerscontext.Context) (en
|
|||
}
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func startMCIServiceLocationsController(ctx controllerscontext.Context) (enabled bool, err error) {
|
||||
c := &mciservicelocations.Controller{
|
||||
Client: ctx.Mgr.GetClient(),
|
||||
RateLimiterOptions: ctx.Opts.RateLimiterOptions,
|
||||
}
|
||||
if err = c.SetupWithManager(ctx.Mgr); err != nil {
|
||||
return false, err
|
||||
}
|
||||
return true, nil
|
||||
}
|
|
@ -17,6 +17,7 @@ import (
|
|||
"github.com/karmada-io/karmada/pkg/version/sharedcommand"
|
||||
"github.com/spf13/cobra"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/rest"
|
||||
cliflag "k8s.io/component-base/cli/flag"
|
||||
"k8s.io/component-base/term"
|
||||
"k8s.io/klog/v2"
|
||||
|
@ -24,6 +25,7 @@ import (
|
|||
"sigs.k8s.io/controller-runtime/pkg/cache"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/healthz"
|
||||
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
|
||||
|
||||
multiclusterprovider "github.com/karmada-io/multicluster-cloud-provider"
|
||||
"github.com/karmada-io/multicluster-cloud-provider/options"
|
||||
|
@ -38,6 +40,7 @@ func init() {
|
|||
controllers["multiclusterservice"] = startMCSController
|
||||
controllers["crd-installation"] = startCRDInstallationController
|
||||
controllers["serviceexport-propagation"] = startServiceExportPropagationController
|
||||
controllers["mci-service-locations"] = startMCIServiceLocationsController
|
||||
}
|
||||
|
||||
// InitProviderFunc is used to initialize multicluster provider
|
||||
|
@ -53,7 +56,7 @@ func NewControllerManagerCommand(ctx context.Context,
|
|||
Use: "multicluster-controller-manager",
|
||||
Long: `The MultiCluster controller manager is a daemon that embeds
|
||||
the cloud specific control loops shipped with Karmada.`,
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
RunE: func(_ *cobra.Command, _ []string) error {
|
||||
// validate options
|
||||
if errs := opts.Validate(); len(errs) != 0 {
|
||||
return errs.ToAggregate()
|
||||
|
@ -104,7 +107,7 @@ func Run(ctx context.Context, opts *options.MultiClusterControllerManagerOptions
|
|||
controllerManager, err := controllerruntime.NewManager(config, controllerruntime.Options{
|
||||
Logger: klog.Background(),
|
||||
Scheme: gclient.NewSchema(),
|
||||
SyncPeriod: &opts.ResyncPeriod.Duration,
|
||||
Cache: cache.Options{SyncPeriod: &opts.ResyncPeriod.Duration},
|
||||
LeaderElection: opts.LeaderElection.LeaderElect,
|
||||
LeaderElectionID: opts.LeaderElection.ResourceName,
|
||||
LeaderElectionNamespace: opts.LeaderElection.ResourceNamespace,
|
||||
|
@ -114,14 +117,15 @@ func Run(ctx context.Context, opts *options.MultiClusterControllerManagerOptions
|
|||
LeaderElectionResourceLock: opts.LeaderElection.ResourceLock,
|
||||
HealthProbeBindAddress: net.JoinHostPort(opts.BindAddress, strconv.Itoa(opts.SecurePort)),
|
||||
LivenessEndpointName: "/healthz",
|
||||
MetricsBindAddress: opts.MetricsBindAddress,
|
||||
Metrics: metricsserver.Options{BindAddress: opts.MetricsBindAddress},
|
||||
MapperProvider: restmapper.MapperProvider,
|
||||
BaseContext: func() context.Context {
|
||||
return ctx
|
||||
},
|
||||
NewCache: cache.BuilderWithOptions(cache.Options{
|
||||
DefaultTransform: fedinformer.StripUnusedFields,
|
||||
}),
|
||||
NewCache: func(config *rest.Config, opts cache.Options) (cache.Cache, error) {
|
||||
opts.DefaultTransform = fedinformer.StripUnusedFields
|
||||
return cache.New(config, opts)
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to build controller manager: %v", err)
|
||||
|
@ -150,7 +154,7 @@ func setupControllers(ctx context.Context, mgr controllerruntime.Manager, cloudP
|
|||
restConfig := mgr.GetConfig()
|
||||
dynamicClientSet := dynamic.NewForConfigOrDie(restConfig)
|
||||
|
||||
controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(dynamicClientSet, 0, ctx.Done())
|
||||
controlPlaneInformerManager := genericmanager.NewSingleClusterInformerManager(ctx, dynamicClientSet, 0)
|
||||
|
||||
setupIndexesForMCI(ctx, mgr.GetFieldIndexer())
|
||||
|
||||
|
|
|
@ -25,7 +25,7 @@ spec:
|
|||
imagePullPolicy: IfNotPresent
|
||||
command:
|
||||
- /bin/multicluster-provider-fake
|
||||
- --kubeconfig=/etc/kubeconfig
|
||||
- --kubeconfig=/etc/kubeconfig/karmada.config
|
||||
- --bind-address=0.0.0.0
|
||||
- --secure-port=10368
|
||||
- --multicluster-provider=fake
|
||||
|
@ -41,10 +41,10 @@ spec:
|
|||
periodSeconds: 15
|
||||
timeoutSeconds: 5
|
||||
volumeMounts:
|
||||
- name: kubeconfig
|
||||
subPath: kubeconfig
|
||||
- name: karmada-config
|
||||
mountPath: /etc/kubeconfig
|
||||
readOnly: true
|
||||
volumes:
|
||||
- name: kubeconfig
|
||||
- name: karmada-config
|
||||
secret:
|
||||
secretName: kubeconfig
|
||||
secretName: multicluster-provider-config
|
||||
|
|
|
@ -1,9 +1,7 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"k8s.io/component-base/cli"
|
||||
cliflag "k8s.io/component-base/cli/flag"
|
||||
|
@ -19,8 +17,6 @@ import (
|
|||
)
|
||||
|
||||
func main() {
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
|
||||
ctx := controllerruntime.SetupSignalHandler()
|
||||
opts := options.NewClusterControllerManagerOptions()
|
||||
fss := cliflag.NamedFlagSets{}
|
||||
|
|
16
fake/fake.go
16
fake/fake.go
|
@ -68,7 +68,7 @@ func (p *Provider) LoadBalancer() (multiclusterprovider.LoadBalancer, bool) {
|
|||
}
|
||||
|
||||
// GetLoadBalancer is a stub implementation of LoadBalancer.GetLoadBalancer.
|
||||
func (p *Provider) GetLoadBalancer(ctx context.Context, mci *networkingv1alpha1.MultiClusterIngress) (*networkingv1.IngressLoadBalancerStatus, bool, error) {
|
||||
func (p *Provider) GetLoadBalancer(_ context.Context, mci *networkingv1alpha1.MultiClusterIngress) (*networkingv1.IngressLoadBalancerStatus, bool, error) {
|
||||
p.addCall("get")
|
||||
namespacedName := types.NamespacedName{Namespace: mci.Namespace, Name: mci.Name}.String()
|
||||
for name := range p.Balancers {
|
||||
|
@ -83,7 +83,7 @@ func (p *Provider) GetLoadBalancer(ctx context.Context, mci *networkingv1alpha1.
|
|||
}
|
||||
|
||||
// EnsureLoadBalancer is a stub implementation of LoadBalancer.EnsureLoadBalancer.
|
||||
func (p *Provider) EnsureLoadBalancer(ctx context.Context, mci *networkingv1alpha1.MultiClusterIngress) (status *networkingv1.IngressLoadBalancerStatus, err error) {
|
||||
func (p *Provider) EnsureLoadBalancer(_ context.Context, mci *networkingv1alpha1.MultiClusterIngress) (status *networkingv1.IngressLoadBalancerStatus, err error) {
|
||||
p.addCall("create")
|
||||
if p.Balancers == nil {
|
||||
p.Balancers = make(map[string]Balancer)
|
||||
|
@ -113,7 +113,7 @@ func (p *Provider) EnsureLoadBalancer(ctx context.Context, mci *networkingv1alph
|
|||
}
|
||||
|
||||
// UpdateLoadBalancer is a stub implementation of LoadBalancer.UpdateLoadBalancer.
|
||||
func (p *Provider) UpdateLoadBalancer(ctx context.Context, mci *networkingv1alpha1.MultiClusterIngress) (status *networkingv1.IngressLoadBalancerStatus, err error) {
|
||||
func (p *Provider) UpdateLoadBalancer(_ context.Context, mci *networkingv1alpha1.MultiClusterIngress) (status *networkingv1.IngressLoadBalancerStatus, err error) {
|
||||
p.addCall("update")
|
||||
namespacedName := types.NamespacedName{Namespace: mci.Namespace, Name: mci.Name}.String()
|
||||
lb, exist := p.Balancers[namespacedName]
|
||||
|
@ -135,7 +135,7 @@ func (p *Provider) UpdateLoadBalancer(ctx context.Context, mci *networkingv1alph
|
|||
}
|
||||
|
||||
// EnsureLoadBalancerDeleted is a stub implementation of LoadBalancer.EnsureLoadBalancerDeleted.
|
||||
func (p *Provider) EnsureLoadBalancerDeleted(ctx context.Context, mci *networkingv1alpha1.MultiClusterIngress) error {
|
||||
func (p *Provider) EnsureLoadBalancerDeleted(_ context.Context, mci *networkingv1alpha1.MultiClusterIngress) error {
|
||||
p.addCall("delete")
|
||||
namespacedName := types.NamespacedName{Namespace: mci.Namespace, Name: mci.Name}.String()
|
||||
delete(p.Balancers, namespacedName)
|
||||
|
@ -149,7 +149,7 @@ func (p *Provider) MCSLoadBalancer() (multiclusterprovider.MCSLoadBalancer, bool
|
|||
}
|
||||
|
||||
// GetMCSLoadBalancer is a stub implementation of LoadBalancer.GetMCSLoadBalancer.
|
||||
func (p *Provider) GetMCSLoadBalancer(ctx context.Context, mcs *networkingv1alpha1.MultiClusterService) (status *corev1.LoadBalancerStatus, exist bool, err error) {
|
||||
func (p *Provider) GetMCSLoadBalancer(_ context.Context, mcs *networkingv1alpha1.MultiClusterService) (status *corev1.LoadBalancerStatus, exist bool, err error) {
|
||||
p.addCall("get")
|
||||
namespacedName := types.NamespacedName{Namespace: mcs.Namespace, Name: mcs.Name}.String()
|
||||
for name := range p.MCSBalancers {
|
||||
|
@ -164,7 +164,7 @@ func (p *Provider) GetMCSLoadBalancer(ctx context.Context, mcs *networkingv1alph
|
|||
}
|
||||
|
||||
// EnsureMCSLoadBalancer is a stub implementation of LoadBalancer.EnsureMCSLoadBalancer.
|
||||
func (p *Provider) EnsureMCSLoadBalancer(ctx context.Context, mcs *networkingv1alpha1.MultiClusterService) (status *corev1.LoadBalancerStatus, err error) {
|
||||
func (p *Provider) EnsureMCSLoadBalancer(_ context.Context, mcs *networkingv1alpha1.MultiClusterService) (status *corev1.LoadBalancerStatus, err error) {
|
||||
p.addCall("create")
|
||||
if p.MCSBalancers == nil {
|
||||
p.MCSBalancers = make(map[string]MCSBalancer)
|
||||
|
@ -192,7 +192,7 @@ func (p *Provider) EnsureMCSLoadBalancer(ctx context.Context, mcs *networkingv1a
|
|||
}
|
||||
|
||||
// UpdateMCSLoadBalancer is a stub implementation of LoadBalancer.UpdateMCSLoadBalancer.
|
||||
func (p *Provider) UpdateMCSLoadBalancer(ctx context.Context, mcs *networkingv1alpha1.MultiClusterService) (status *corev1.LoadBalancerStatus, err error) {
|
||||
func (p *Provider) UpdateMCSLoadBalancer(_ context.Context, mcs *networkingv1alpha1.MultiClusterService) (status *corev1.LoadBalancerStatus, err error) {
|
||||
p.addCall("update")
|
||||
namespacedName := types.NamespacedName{Namespace: mcs.Namespace, Name: mcs.Name}.String()
|
||||
lb, exist := p.MCSBalancers[namespacedName]
|
||||
|
@ -212,7 +212,7 @@ func (p *Provider) UpdateMCSLoadBalancer(ctx context.Context, mcs *networkingv1a
|
|||
}
|
||||
|
||||
// EnsureMCSLoadBalancerDeleted is a stub implementation of LoadBalancer.EnsureMCSLoadBalancerDeleted.
|
||||
func (p *Provider) EnsureMCSLoadBalancerDeleted(ctx context.Context, mcs *networkingv1alpha1.MultiClusterService) error {
|
||||
func (p *Provider) EnsureMCSLoadBalancerDeleted(_ context.Context, mcs *networkingv1alpha1.MultiClusterService) error {
|
||||
p.addCall("delete")
|
||||
namespacedName := types.NamespacedName{Namespace: mcs.Namespace, Name: mcs.Name}.String()
|
||||
delete(p.MCSBalancers, namespacedName)
|
||||
|
|
|
@ -7,7 +7,7 @@ import (
|
|||
)
|
||||
|
||||
func init() {
|
||||
multiclusterprovider.RegisterMultiClusterProvider(defaultProviderName, func(config io.Reader) (multiclusterprovider.Interface, error) {
|
||||
multiclusterprovider.RegisterMultiClusterProvider(defaultProviderName, func(_ io.Reader) (multiclusterprovider.Interface, error) {
|
||||
return &Provider{Name: defaultProviderName}, nil
|
||||
})
|
||||
}
|
||||
|
|
129
go.mod
129
go.mod
|
@ -1,89 +1,90 @@
|
|||
module github.com/karmada-io/multicluster-cloud-provider
|
||||
|
||||
go 1.19
|
||||
go 1.23.8
|
||||
|
||||
require (
|
||||
github.com/karmada-io/karmada v1.7.0-alpha.2
|
||||
github.com/karmada-io/karmada v1.14.0
|
||||
github.com/pkg/errors v0.9.1
|
||||
github.com/spf13/cobra v1.6.1
|
||||
github.com/spf13/pflag v1.0.5
|
||||
k8s.io/api v0.26.2
|
||||
k8s.io/apimachinery v0.26.2
|
||||
k8s.io/client-go v0.26.2
|
||||
k8s.io/component-base v0.26.2
|
||||
k8s.io/klog/v2 v2.90.1
|
||||
k8s.io/utils v0.0.0-20230220204549-a5ecb0141aa5
|
||||
sigs.k8s.io/controller-runtime v0.14.5
|
||||
github.com/spf13/cobra v1.8.1
|
||||
github.com/spf13/pflag v1.0.6
|
||||
k8s.io/api v0.32.3
|
||||
k8s.io/apimachinery v0.32.3
|
||||
k8s.io/client-go v0.32.3
|
||||
k8s.io/component-base v0.32.3
|
||||
k8s.io/klog/v2 v2.130.1
|
||||
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738
|
||||
sigs.k8s.io/controller-runtime v0.20.4
|
||||
sigs.k8s.io/mcs-api v0.1.0
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
|
||||
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
|
||||
github.com/MakeNowJust/heredoc v1.0.0 // indirect
|
||||
github.com/beorn7/perks v1.0.1 // indirect
|
||||
github.com/blang/semver v3.5.1+incompatible // indirect
|
||||
github.com/blang/semver/v4 v4.0.0 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.1.2 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/emicklei/go-restful/v3 v3.10.1 // indirect
|
||||
github.com/evanphx/json-patch/v5 v5.6.0 // indirect
|
||||
github.com/fsnotify/fsnotify v1.6.0 // indirect
|
||||
github.com/go-logr/logr v1.2.3 // indirect
|
||||
github.com/go-logr/zapr v1.2.3 // indirect
|
||||
github.com/go-openapi/jsonpointer v0.19.6 // indirect
|
||||
github.com/go-openapi/jsonreference v0.20.1 // indirect
|
||||
github.com/go-openapi/swag v0.22.3 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
|
||||
github.com/emicklei/go-restful/v3 v3.12.1 // indirect
|
||||
github.com/evanphx/json-patch/v5 v5.9.11 // indirect
|
||||
github.com/fsnotify/fsnotify v1.8.0 // indirect
|
||||
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
|
||||
github.com/go-logr/logr v1.4.2 // indirect
|
||||
github.com/go-logr/zapr v1.3.0 // indirect
|
||||
github.com/go-openapi/jsonpointer v0.21.0 // indirect
|
||||
github.com/go-openapi/jsonreference v0.20.4 // indirect
|
||||
github.com/go-openapi/swag v0.23.0 // indirect
|
||||
github.com/gogo/protobuf v1.3.2 // indirect
|
||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
|
||||
github.com/golang/protobuf v1.5.3 // indirect
|
||||
github.com/google/gnostic v0.6.9 // indirect
|
||||
github.com/google/go-cmp v0.5.9 // indirect
|
||||
github.com/golang/protobuf v1.5.4 // indirect
|
||||
github.com/google/btree v1.1.3 // indirect
|
||||
github.com/google/gnostic-models v0.6.8 // indirect
|
||||
github.com/google/go-cmp v0.6.0 // indirect
|
||||
github.com/google/gofuzz v1.2.0 // indirect
|
||||
github.com/google/uuid v1.3.0 // indirect
|
||||
github.com/imdario/mergo v0.3.13 // indirect
|
||||
github.com/inconshreveable/mousetrap v1.0.1 // indirect
|
||||
github.com/google/uuid v1.6.0 // indirect
|
||||
github.com/gorilla/websocket v1.5.0 // indirect
|
||||
github.com/inconshreveable/mousetrap v1.1.0 // indirect
|
||||
github.com/josharian/intern v1.0.0 // indirect
|
||||
github.com/json-iterator/go v1.1.12 // indirect
|
||||
github.com/kr/pretty v0.3.0 // indirect
|
||||
github.com/klauspost/compress v1.17.9 // indirect
|
||||
github.com/kr/pretty v0.3.1 // indirect
|
||||
github.com/kr/text v0.2.0 // indirect
|
||||
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect
|
||||
github.com/mailru/easyjson v0.7.7 // indirect
|
||||
github.com/matttproud/golang_protobuf_extensions v1.0.2 // indirect
|
||||
github.com/mitchellh/go-wordwrap v1.0.0 // indirect
|
||||
github.com/moby/spdystream v0.2.0 // indirect
|
||||
github.com/moby/term v0.0.0-20220808134915-39b0c02b01ae // indirect
|
||||
github.com/mitchellh/go-wordwrap v1.0.1 // indirect
|
||||
github.com/moby/spdystream v0.5.0 // indirect
|
||||
github.com/moby/term v0.5.0 // indirect
|
||||
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
|
||||
github.com/modern-go/reflect2 v1.0.2 // indirect
|
||||
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
|
||||
github.com/prometheus/client_golang v1.14.0 // indirect
|
||||
github.com/prometheus/client_model v0.3.0 // indirect
|
||||
github.com/prometheus/common v0.37.0 // indirect
|
||||
github.com/prometheus/procfs v0.8.0 // indirect
|
||||
github.com/rogpeppe/go-internal v1.8.1 // indirect
|
||||
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f // indirect
|
||||
github.com/prometheus/client_golang v1.20.5 // indirect
|
||||
github.com/prometheus/client_model v0.6.1 // indirect
|
||||
github.com/prometheus/common v0.57.0 // indirect
|
||||
github.com/prometheus/procfs v0.15.1 // indirect
|
||||
github.com/rogpeppe/go-internal v1.12.0 // indirect
|
||||
github.com/russross/blackfriday/v2 v2.1.0 // indirect
|
||||
github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64 // indirect
|
||||
go.uber.org/atomic v1.9.0 // indirect
|
||||
go.uber.org/multierr v1.8.0 // indirect
|
||||
go.uber.org/zap v1.24.0 // indirect
|
||||
golang.org/x/net v0.8.0 // indirect
|
||||
golang.org/x/oauth2 v0.6.0 // indirect
|
||||
golang.org/x/sys v0.6.0 // indirect
|
||||
golang.org/x/term v0.6.0 // indirect
|
||||
golang.org/x/text v0.8.0 // indirect
|
||||
golang.org/x/time v0.3.0 // indirect
|
||||
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
|
||||
google.golang.org/appengine v1.6.7 // indirect
|
||||
google.golang.org/genproto v0.0.0-20221227171554-f9683d7f8bef // indirect
|
||||
google.golang.org/grpc v1.52.0 // indirect
|
||||
google.golang.org/protobuf v1.28.1 // indirect
|
||||
github.com/x448/float16 v0.8.4 // indirect
|
||||
go.opentelemetry.io/otel v1.29.0 // indirect
|
||||
go.opentelemetry.io/otel/trace v1.29.0 // indirect
|
||||
go.uber.org/multierr v1.11.0 // indirect
|
||||
go.uber.org/zap v1.27.0 // indirect
|
||||
golang.org/x/net v0.39.0 // indirect
|
||||
golang.org/x/oauth2 v0.25.0 // indirect
|
||||
golang.org/x/sync v0.13.0 // indirect
|
||||
golang.org/x/sys v0.32.0 // indirect
|
||||
golang.org/x/term v0.31.0 // indirect
|
||||
golang.org/x/text v0.24.0 // indirect
|
||||
golang.org/x/time v0.8.0 // indirect
|
||||
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
|
||||
google.golang.org/protobuf v1.36.1 // indirect
|
||||
gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect
|
||||
gopkg.in/inf.v0 v0.9.1 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
gopkg.in/yaml.v3 v3.0.1 // indirect
|
||||
k8s.io/apiextensions-apiserver v0.26.2 // indirect
|
||||
k8s.io/apiserver v0.26.2 // indirect
|
||||
k8s.io/kube-openapi v0.0.0-20230303024457-afdc3dddf62d // indirect
|
||||
k8s.io/kubectl v0.26.2 // indirect
|
||||
sigs.k8s.io/cluster-api v1.4.0 // indirect
|
||||
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
|
||||
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
|
||||
sigs.k8s.io/yaml v1.3.0 // indirect
|
||||
k8s.io/apiextensions-apiserver v0.32.3 // indirect
|
||||
k8s.io/cli-runtime v0.32.3 // indirect
|
||||
k8s.io/kube-openapi v0.0.0-20241105132330-32ad38e42d3f // indirect
|
||||
k8s.io/kubectl v0.32.3 // indirect
|
||||
sigs.k8s.io/cluster-api v1.7.1 // indirect
|
||||
sigs.k8s.io/json v0.0.0-20241010143419-9aa6b5e7a4b3 // indirect
|
||||
sigs.k8s.io/structured-merge-diff/v4 v4.4.2 // indirect
|
||||
sigs.k8s.io/yaml v1.4.0 // indirect
|
||||
)
|
||||
|
|
|
@ -23,9 +23,12 @@ make image-multicluster-provider-fake GOOS="linux" --directory="${REPO_ROOT}"
|
|||
# step2: load image
|
||||
kind load docker-image "${REGISTRY}/multicluster-provider-fake:${VERSION}" --name="${HOST_CLUSTER_NAME}"
|
||||
|
||||
# step3: deploy multicluster-provider-fake
|
||||
# step3: create multicluster-provider-config secret to access karmada-apiserver
|
||||
kubectl --context="${HOST_CLUSTER_NAME}" create secret generic multicluster-provider-config --from-file=karmada.config="${MAIN_KUBECONFIG}" -n "${KARMADA_SYSTEM_NAMESPACE}"
|
||||
|
||||
# step4: deploy multicluster-provider-fake
|
||||
kubectl --context="${HOST_CLUSTER_NAME}" apply -f "${REPO_ROOT}/artifacts/deploy/multicluster-provider-fake.yaml"
|
||||
util::wait_pod_ready "${HOST_CLUSTER_NAME}" multicluster-provider-fake "${KARMADA_SYSTEM_NAMESPACE}"
|
||||
|
||||
# step4: deploy ingressclass-fake
|
||||
# step5: deploy ingressclass-fake
|
||||
kubectl --context="${KARMADA_APISERVER}" apply -f "${REPO_ROOT}/artifacts/deploy/ingressclass-fake.yaml"
|
||||
|
|
|
@ -5,7 +5,7 @@ set -o nounset
|
|||
set -o pipefail
|
||||
|
||||
REPO_ROOT=$(dirname "${BASH_SOURCE[0]}")/..
|
||||
GOLANGCI_LINT_VER="v1.49.0"
|
||||
GOLANGCI_LINT_VER="v2.0.2"
|
||||
|
||||
cd "${REPO_ROOT}"
|
||||
source "hack/util.sh"
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
||||
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
|
||||
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/equality"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
@ -20,7 +21,6 @@ import (
|
|||
"sigs.k8s.io/controller-runtime/pkg/handler"
|
||||
"sigs.k8s.io/controller-runtime/pkg/predicate"
|
||||
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
||||
"sigs.k8s.io/controller-runtime/pkg/source"
|
||||
)
|
||||
|
||||
// ControllerName is the controller name that will be used when reporting events.
|
||||
|
@ -145,23 +145,36 @@ func clusterPropagationPolicy(clusters []string) *policyv1alpha1.ClusterPropagat
|
|||
Placement: policyv1alpha1.Placement{
|
||||
ClusterAffinity: &policyv1alpha1.ClusterAffinity{
|
||||
ClusterNames: clusters,
|
||||
}}}}
|
||||
},
|
||||
ClusterTolerations: []corev1.Toleration{
|
||||
{
|
||||
Key: clusterv1alpha1.TaintClusterNotReady,
|
||||
Operator: corev1.TolerationOpExists,
|
||||
Effect: corev1.TaintEffectNoExecute,
|
||||
},
|
||||
{
|
||||
Key: clusterv1alpha1.TaintClusterUnreachable,
|
||||
Operator: corev1.TolerationOpExists,
|
||||
Effect: corev1.TaintEffectNoExecute,
|
||||
},
|
||||
},
|
||||
}}}
|
||||
}
|
||||
|
||||
// SetupWithManager creates a controller and register to controller manager.
|
||||
func (r *Controller) SetupWithManager(_ context.Context, mgr controllerruntime.Manager) error {
|
||||
clusterFilter := predicate.Funcs{
|
||||
CreateFunc: func(event event.CreateEvent) bool { return true },
|
||||
CreateFunc: func(_ event.CreateEvent) bool { return true },
|
||||
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
|
||||
return !equality.Semantic.DeepEqual(updateEvent.ObjectOld.GetDeletionTimestamp().IsZero(),
|
||||
updateEvent.ObjectNew.GetDeletionTimestamp().IsZero())
|
||||
},
|
||||
DeleteFunc: func(deleteEvent event.DeleteEvent) bool { return true },
|
||||
GenericFunc: func(genericEvent event.GenericEvent) bool { return false },
|
||||
DeleteFunc: func(_ event.DeleteEvent) bool { return true },
|
||||
GenericFunc: func(_ event.GenericEvent) bool { return false },
|
||||
}
|
||||
|
||||
cppHandlerFn := handler.MapFunc(
|
||||
func(object client.Object) []reconcile.Request {
|
||||
func(_ context.Context, _ client.Object) []reconcile.Request {
|
||||
// return a fictional cluster, triggering to reconcile to recreate the cpp.
|
||||
return []reconcile.Request{
|
||||
{NamespacedName: types.NamespacedName{Name: "no-exist-cluster"}},
|
||||
|
@ -169,21 +182,21 @@ func (r *Controller) SetupWithManager(_ context.Context, mgr controllerruntime.M
|
|||
},
|
||||
)
|
||||
cppFilter := builder.WithPredicates(predicate.Funcs{
|
||||
CreateFunc: func(event event.CreateEvent) bool { return false },
|
||||
UpdateFunc: func(updateEvent event.UpdateEvent) bool { return false },
|
||||
CreateFunc: func(_ event.CreateEvent) bool { return false },
|
||||
UpdateFunc: func(_ event.UpdateEvent) bool { return false },
|
||||
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
|
||||
return deleteEvent.Object.GetName() == clusterPropagationPolicyName
|
||||
},
|
||||
GenericFunc: func(genericEvent event.GenericEvent) bool { return false },
|
||||
GenericFunc: func(_ event.GenericEvent) bool { return false },
|
||||
})
|
||||
|
||||
return controllerruntime.NewControllerManagedBy(mgr).
|
||||
For(&clusterv1alpha1.Cluster{}).
|
||||
Watches(&source.Kind{Type: &policyv1alpha1.ClusterPropagationPolicy{}},
|
||||
Watches(&policyv1alpha1.ClusterPropagationPolicy{},
|
||||
handler.EnqueueRequestsFromMapFunc(cppHandlerFn), cppFilter).
|
||||
WithEventFilter(clusterFilter).
|
||||
WithOptions(controller.Options{
|
||||
RateLimiter: ratelimiterflag.DefaultControllerRateLimiter(r.RateLimiterOptions),
|
||||
RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](r.RateLimiterOptions),
|
||||
}).
|
||||
Complete(r)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,197 @@
|
|||
package mciservicelocations
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"sort"
|
||||
|
||||
networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1"
|
||||
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
|
||||
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
|
||||
"github.com/karmada-io/karmada/pkg/util/names"
|
||||
networkingv1 "k8s.io/api/networking/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/client-go/util/retry"
|
||||
"k8s.io/klog/v2"
|
||||
controllerruntime "sigs.k8s.io/controller-runtime"
|
||||
"sigs.k8s.io/controller-runtime/pkg/builder"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/controller"
|
||||
"sigs.k8s.io/controller-runtime/pkg/event"
|
||||
"sigs.k8s.io/controller-runtime/pkg/handler"
|
||||
"sigs.k8s.io/controller-runtime/pkg/predicate"
|
||||
"sigs.k8s.io/controller-runtime/pkg/reconcile"
|
||||
|
||||
"github.com/karmada-io/multicluster-cloud-provider/pkg/controllers/indexes"
|
||||
)
|
||||
|
||||
// Controller is used to maintain information about the clusters in which
|
||||
// the Service backend of the MultiClusterIngress resource resides.
|
||||
type Controller struct {
|
||||
client.Client
|
||||
RateLimiterOptions ratelimiterflag.Options
|
||||
}
|
||||
|
||||
// Reconcile performs a full reconciliation for the object referred to by the Request.
|
||||
// The Controller will requeue the Request to be processed again if an error is non-nil or
|
||||
// Result.Requeue is true, otherwise upon completion it will remove the work from the queue.
|
||||
func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Request) (controllerruntime.Result, error) {
|
||||
klog.V(4).Infof("Reconciling MultiClusterIngress %s", req.NamespacedName.String())
|
||||
|
||||
mci := &networkingv1alpha1.MultiClusterIngress{}
|
||||
if err := c.Client.Get(ctx, req.NamespacedName, mci); err != nil {
|
||||
if apierrors.IsNotFound(err) {
|
||||
return controllerruntime.Result{}, nil
|
||||
}
|
||||
klog.InfoS("failed to get multiClusterIngress object", "NamespacedName", req.NamespacedName.String())
|
||||
return controllerruntime.Result{}, err
|
||||
}
|
||||
|
||||
svcLocations, err := c.calculateServiceLocations(ctx, mci)
|
||||
if err != nil {
|
||||
return controllerruntime.Result{}, err
|
||||
}
|
||||
|
||||
mci = mci.DeepCopy()
|
||||
err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
||||
if reflect.DeepEqual(svcLocations, mci.Status.ServiceLocations) {
|
||||
return nil
|
||||
}
|
||||
mci.Status.ServiceLocations = svcLocations
|
||||
updateErr := c.Client.Status().Update(ctx, mci)
|
||||
if updateErr == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
updatedMCI := &networkingv1alpha1.MultiClusterIngress{}
|
||||
err = c.Client.Get(ctx, req.NamespacedName, updatedMCI)
|
||||
if err == nil {
|
||||
mci = updatedMCI.DeepCopy()
|
||||
} else {
|
||||
klog.Errorf("Failed to get updated multiClusterIngress(%s): %v", req.NamespacedName.String(), err)
|
||||
}
|
||||
return updateErr
|
||||
})
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to sync multiClusterIngress(%s) service locations: %v", req.NamespacedName.String(), err)
|
||||
return controllerruntime.Result{}, err
|
||||
}
|
||||
klog.V(4).Infof("Success to sync multiClusterIngress(%s) service locations", req.NamespacedName.String())
|
||||
return controllerruntime.Result{}, nil
|
||||
}
|
||||
|
||||
func (c *Controller) calculateServiceLocations(ctx context.Context, mci *networkingv1alpha1.MultiClusterIngress) ([]networkingv1alpha1.ServiceLocation, error) {
|
||||
backendSvcNames := indexes.BuildServiceRefIndexes(mci)
|
||||
sort.Strings(backendSvcNames)
|
||||
|
||||
var svcLocations []networkingv1alpha1.ServiceLocation
|
||||
for _, svcName := range backendSvcNames {
|
||||
svcBinding := &workv1alpha2.ResourceBinding{}
|
||||
svcRBNamespacedName := types.NamespacedName{
|
||||
Namespace: mci.Namespace,
|
||||
Name: names.GenerateBindingName("Service", svcName),
|
||||
}
|
||||
err := c.Client.Get(ctx, svcRBNamespacedName, svcBinding)
|
||||
if err != nil {
|
||||
if apierrors.IsNotFound(err) {
|
||||
continue
|
||||
}
|
||||
klog.ErrorS(err, "failed to get service's related resourceBinding",
|
||||
"ResourceBinding", svcRBNamespacedName.String())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
svcLocations = append(svcLocations, networkingv1alpha1.ServiceLocation{
|
||||
Name: svcName,
|
||||
Clusters: obtainBindingClusters(svcBinding),
|
||||
})
|
||||
}
|
||||
return svcLocations, nil
|
||||
}
|
||||
|
||||
func obtainBindingClusters(rb *workv1alpha2.ResourceBinding) []string {
|
||||
clusters := sets.NewString()
|
||||
for _, cluster := range rb.Spec.Clusters {
|
||||
clusters.Insert(cluster.Name)
|
||||
}
|
||||
for _, requiredBy := range rb.Spec.RequiredBy {
|
||||
for _, cluster := range requiredBy.Clusters {
|
||||
clusters.Insert(cluster.Name)
|
||||
}
|
||||
}
|
||||
return clusters.List()
|
||||
}
|
||||
|
||||
// SetupWithManager creates a controller and register to controller manager.
|
||||
func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error {
|
||||
mciPredicateFuncs := predicate.Funcs{
|
||||
CreateFunc: func(_ event.CreateEvent) bool { return true },
|
||||
DeleteFunc: func(_ event.DeleteEvent) bool { return false },
|
||||
UpdateFunc: func(event event.UpdateEvent) bool {
|
||||
oldMCI := event.ObjectOld.(*networkingv1alpha1.MultiClusterIngress)
|
||||
newMCI := event.ObjectNew.(*networkingv1alpha1.MultiClusterIngress)
|
||||
var oldDefaultBackend, newDefaultBackend networkingv1.IngressBackend
|
||||
if oldMCI.Spec.DefaultBackend != nil {
|
||||
oldDefaultBackend = *oldMCI.Spec.DefaultBackend
|
||||
}
|
||||
if newMCI.Spec.DefaultBackend != nil {
|
||||
newDefaultBackend = *newMCI.Spec.DefaultBackend
|
||||
}
|
||||
return !reflect.DeepEqual(oldDefaultBackend, newDefaultBackend) ||
|
||||
!reflect.DeepEqual(oldMCI.Spec.Rules, newMCI.Spec.Rules)
|
||||
},
|
||||
}
|
||||
|
||||
rbMapFunc := handler.MapFunc(
|
||||
func(_ context.Context, object client.Object) []reconcile.Request {
|
||||
var requests []reconcile.Request
|
||||
|
||||
rb := object.(*workv1alpha2.ResourceBinding)
|
||||
if rb.Spec.Resource.APIVersion != "v1" || rb.Spec.Resource.Kind != "Service" {
|
||||
return nil
|
||||
}
|
||||
|
||||
mciList := &networkingv1alpha1.MultiClusterIngressList{}
|
||||
if err := c.Client.List(context.Background(), mciList,
|
||||
client.InNamespace(rb.GetNamespace()),
|
||||
client.MatchingFields{indexes.IndexKeyServiceRefName: rb.Spec.Resource.Name}); err != nil {
|
||||
klog.Errorf("failed to fetch multiclusteringresses")
|
||||
return nil
|
||||
}
|
||||
|
||||
for index := range mciList.Items {
|
||||
mci := &mciList.Items[index]
|
||||
requests = append(requests, reconcile.Request{
|
||||
NamespacedName: types.NamespacedName{Namespace: mci.Namespace, Name: mci.Name}})
|
||||
}
|
||||
return requests
|
||||
})
|
||||
|
||||
rbPredicateFuncs := predicate.Funcs{
|
||||
CreateFunc: func(event event.CreateEvent) bool {
|
||||
rb := event.Object.(*workv1alpha2.ResourceBinding)
|
||||
return rb.Spec.Resource.APIVersion == "v1" && rb.Spec.Resource.Kind == "Service"
|
||||
},
|
||||
UpdateFunc: func(event event.UpdateEvent) bool {
|
||||
oldRB := event.ObjectOld.(*workv1alpha2.ResourceBinding)
|
||||
newRB := event.ObjectNew.(*workv1alpha2.ResourceBinding)
|
||||
if newRB.Spec.Resource.APIVersion != "v1" || newRB.Spec.Resource.Kind != "Service" {
|
||||
return false
|
||||
}
|
||||
return !reflect.DeepEqual(oldRB.Spec.Clusters, newRB.Spec.Clusters) ||
|
||||
!reflect.DeepEqual(oldRB.Spec.RequiredBy, newRB.Spec.RequiredBy)
|
||||
},
|
||||
DeleteFunc: func(event event.DeleteEvent) bool {
|
||||
rb := event.Object.(*workv1alpha2.ResourceBinding)
|
||||
return rb.Spec.Resource.APIVersion == "v1" && rb.Spec.Resource.Kind == "Service"
|
||||
},
|
||||
}
|
||||
|
||||
return controllerruntime.NewControllerManagedBy(mgr).
|
||||
For(&networkingv1alpha1.MultiClusterIngress{}, builder.WithPredicates(mciPredicateFuncs)).
|
||||
Watches(&workv1alpha2.ResourceBinding{}, handler.EnqueueRequestsFromMapFunc(rbMapFunc), builder.WithPredicates(rbPredicateFuncs)).
|
||||
WithOptions(controller.Options{RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](c.RateLimiterOptions)}).
|
||||
Complete(c)
|
||||
}
|
|
@ -4,7 +4,9 @@ import (
|
|||
"context"
|
||||
"strings"
|
||||
|
||||
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
||||
networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1"
|
||||
remedyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/remedy/v1alpha1"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
discoveryv1 "k8s.io/api/discovery/v1"
|
||||
"k8s.io/apimachinery/pkg/api/equality"
|
||||
|
@ -21,7 +23,7 @@ import (
|
|||
"github.com/karmada-io/multicluster-cloud-provider/pkg/util"
|
||||
)
|
||||
|
||||
func newMultiClusterIngressEventHandler(ctx context.Context, client client.Client, providerClassName string) handler.EventHandler {
|
||||
func newMultiClusterIngressEventHandler(ctx context.Context, client client.Client, providerClassName string) handler.TypedEventHandler[*networkingv1alpha1.MultiClusterIngress, reconcile.Request] {
|
||||
return &multiClusterIngressEventHandler{
|
||||
ctx: ctx,
|
||||
client: client,
|
||||
|
@ -29,7 +31,7 @@ func newMultiClusterIngressEventHandler(ctx context.Context, client client.Clien
|
|||
}
|
||||
}
|
||||
|
||||
var _ handler.EventHandler = (*multiClusterIngressEventHandler)(nil)
|
||||
var _ handler.TypedEventHandler[*networkingv1alpha1.MultiClusterIngress, reconcile.Request] = (*multiClusterIngressEventHandler)(nil)
|
||||
|
||||
type multiClusterIngressEventHandler struct {
|
||||
ctx context.Context
|
||||
|
@ -37,9 +39,9 @@ type multiClusterIngressEventHandler struct {
|
|||
ingClassName string
|
||||
}
|
||||
|
||||
func (h *multiClusterIngressEventHandler) Create(e event.CreateEvent, queue workqueue.RateLimitingInterface) {
|
||||
mci := e.Object.(*networkingv1alpha1.MultiClusterIngress)
|
||||
if !util.CheckIngressClassMatched(h.ctx, h.client, mci, h.ingClassName) {
|
||||
func (h *multiClusterIngressEventHandler) Create(_ context.Context, e event.TypedCreateEvent[*networkingv1alpha1.MultiClusterIngress], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
klog.V(4).Infof("mci(%s/%s) created", e.Object.GetNamespace(), e.Object.GetName())
|
||||
if !util.CheckIngressClassMatched(h.ctx, h.client, e.Object, h.ingClassName) {
|
||||
return
|
||||
}
|
||||
queue.Add(reconcile.Request{NamespacedName: types.NamespacedName{
|
||||
|
@ -48,9 +50,9 @@ func (h *multiClusterIngressEventHandler) Create(e event.CreateEvent, queue work
|
|||
}})
|
||||
}
|
||||
|
||||
func (h *multiClusterIngressEventHandler) Update(e event.UpdateEvent, queue workqueue.RateLimitingInterface) {
|
||||
mciOld := e.ObjectOld.(*networkingv1alpha1.MultiClusterIngress)
|
||||
mciNew := e.ObjectNew.(*networkingv1alpha1.MultiClusterIngress)
|
||||
func (h *multiClusterIngressEventHandler) Update(_ context.Context, e event.TypedUpdateEvent[*networkingv1alpha1.MultiClusterIngress], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
mciOld := e.ObjectOld
|
||||
mciNew := e.ObjectNew
|
||||
if !util.CheckIngressClassMatched(h.ctx, h.client, mciNew, h.ingClassName) {
|
||||
return
|
||||
}
|
||||
|
@ -68,14 +70,13 @@ func (h *multiClusterIngressEventHandler) Update(e event.UpdateEvent, queue work
|
|||
}})
|
||||
}
|
||||
|
||||
func (h *multiClusterIngressEventHandler) Delete(_ event.DeleteEvent, _ workqueue.RateLimitingInterface) {
|
||||
func (h *multiClusterIngressEventHandler) Delete(_ context.Context, _ event.TypedDeleteEvent[*networkingv1alpha1.MultiClusterIngress], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
// Since finalizer is added to the multiClusterIngress object,
|
||||
// the delete event is processed by the update event.
|
||||
}
|
||||
|
||||
func (h *multiClusterIngressEventHandler) Generic(e event.GenericEvent, queue workqueue.RateLimitingInterface) {
|
||||
mci := e.Object.(*networkingv1alpha1.MultiClusterIngress)
|
||||
if !util.CheckIngressClassMatched(h.ctx, h.client, mci, h.ingClassName) {
|
||||
func (h *multiClusterIngressEventHandler) Generic(_ context.Context, e event.TypedGenericEvent[*networkingv1alpha1.MultiClusterIngress], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
if !util.CheckIngressClassMatched(h.ctx, h.client, e.Object, h.ingClassName) {
|
||||
return
|
||||
}
|
||||
queue.Add(reconcile.Request{NamespacedName: types.NamespacedName{
|
||||
|
@ -84,27 +85,28 @@ func (h *multiClusterIngressEventHandler) Generic(e event.GenericEvent, queue wo
|
|||
}})
|
||||
}
|
||||
|
||||
func newServiceEventHandler(mciEventChan chan<- event.GenericEvent, client client.Client) handler.EventHandler {
|
||||
func newServiceEventHandler(mciEventChan chan<- event.TypedGenericEvent[*networkingv1alpha1.MultiClusterIngress], client client.Client) handler.TypedEventHandler[*corev1.Service, reconcile.Request] {
|
||||
return &serviceEventHandler{
|
||||
mciEventChan: mciEventChan,
|
||||
client: client,
|
||||
}
|
||||
}
|
||||
|
||||
var _ handler.EventHandler = (*serviceEventHandler)(nil)
|
||||
var _ handler.TypedEventHandler[*corev1.Service, reconcile.Request] = (*serviceEventHandler)(nil)
|
||||
|
||||
type serviceEventHandler struct {
|
||||
mciEventChan chan<- event.GenericEvent
|
||||
mciEventChan chan<- event.TypedGenericEvent[*networkingv1alpha1.MultiClusterIngress]
|
||||
client client.Client
|
||||
}
|
||||
|
||||
func (h *serviceEventHandler) Create(e event.CreateEvent, _ workqueue.RateLimitingInterface) {
|
||||
func (h *serviceEventHandler) Create(_ context.Context, e event.TypedCreateEvent[*corev1.Service], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
klog.V(4).Infof("service(%s/%s) created", e.Object.GetNamespace(), e.Object.GetName())
|
||||
h.enqueueImpactedMCI(e.Object.GetNamespace(), e.Object.GetName())
|
||||
}
|
||||
|
||||
func (h *serviceEventHandler) Update(e event.UpdateEvent, _ workqueue.RateLimitingInterface) {
|
||||
svcOld := e.ObjectOld.(*corev1.Service)
|
||||
svcNew := e.ObjectNew.(*corev1.Service)
|
||||
func (h *serviceEventHandler) Update(_ context.Context, e event.TypedUpdateEvent[*corev1.Service], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
svcOld := e.ObjectOld
|
||||
svcNew := e.ObjectNew
|
||||
|
||||
// We only care about the update events below:
|
||||
if equality.Semantic.DeepEqual(svcOld.Annotations, svcNew.Annotations) &&
|
||||
|
@ -116,11 +118,11 @@ func (h *serviceEventHandler) Update(e event.UpdateEvent, _ workqueue.RateLimiti
|
|||
h.enqueueImpactedMCI(svcNew.Namespace, svcNew.Name)
|
||||
}
|
||||
|
||||
func (h *serviceEventHandler) Delete(e event.DeleteEvent, _ workqueue.RateLimitingInterface) {
|
||||
func (h *serviceEventHandler) Delete(_ context.Context, e event.TypedDeleteEvent[*corev1.Service], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
h.enqueueImpactedMCI(e.Object.GetNamespace(), e.Object.GetName())
|
||||
}
|
||||
|
||||
func (h *serviceEventHandler) Generic(e event.GenericEvent, _ workqueue.RateLimitingInterface) {
|
||||
func (h *serviceEventHandler) Generic(_ context.Context, e event.TypedGenericEvent[*corev1.Service], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
h.enqueueImpactedMCI(e.Object.GetNamespace(), e.Object.GetName())
|
||||
}
|
||||
|
||||
|
@ -135,7 +137,7 @@ func (h *serviceEventHandler) enqueueImpactedMCI(svcNamespace, svcName string) {
|
|||
|
||||
for index := range mciList.Items {
|
||||
mci := &mciList.Items[index]
|
||||
h.mciEventChan <- event.GenericEvent{
|
||||
h.mciEventChan <- event.TypedGenericEvent[*networkingv1alpha1.MultiClusterIngress]{
|
||||
Object: mci,
|
||||
}
|
||||
}
|
||||
|
@ -144,31 +146,32 @@ func (h *serviceEventHandler) enqueueImpactedMCI(svcNamespace, svcName string) {
|
|||
// endpointSlicePrefix is the prefix of service derived from ServiceImport.
|
||||
const derivedServicePrefix = "derived-"
|
||||
|
||||
func newEndpointSlicesEventHandler(svcEventChan chan<- event.GenericEvent) handler.EventHandler {
|
||||
func newEndpointSlicesEventHandler(svcEventChan chan<- event.TypedGenericEvent[*corev1.Service]) handler.TypedEventHandler[*discoveryv1.EndpointSlice, reconcile.Request] {
|
||||
return &endpointSlicesEventHandler{
|
||||
svcEventChan: svcEventChan,
|
||||
}
|
||||
}
|
||||
|
||||
var _ handler.EventHandler = (*endpointSlicesEventHandler)(nil)
|
||||
var _ handler.TypedEventHandler[*discoveryv1.EndpointSlice, reconcile.Request] = (*endpointSlicesEventHandler)(nil)
|
||||
|
||||
type endpointSlicesEventHandler struct {
|
||||
svcEventChan chan<- event.GenericEvent
|
||||
svcEventChan chan<- event.TypedGenericEvent[*corev1.Service]
|
||||
}
|
||||
|
||||
func (h *endpointSlicesEventHandler) Create(e event.CreateEvent, _ workqueue.RateLimitingInterface) {
|
||||
func (h *endpointSlicesEventHandler) Create(_ context.Context, e event.TypedCreateEvent[*discoveryv1.EndpointSlice], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
klog.V(4).Infof("eps(%s/%s) created", e.Object.GetNamespace(), e.Object.GetName())
|
||||
h.enqueueImpactedSvc(e.Object)
|
||||
}
|
||||
|
||||
func (h *endpointSlicesEventHandler) Update(e event.UpdateEvent, _ workqueue.RateLimitingInterface) {
|
||||
func (h *endpointSlicesEventHandler) Update(_ context.Context, e event.TypedUpdateEvent[*discoveryv1.EndpointSlice], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
h.enqueueImpactedSvc(e.ObjectNew)
|
||||
}
|
||||
|
||||
func (h *endpointSlicesEventHandler) Delete(e event.DeleteEvent, _ workqueue.RateLimitingInterface) {
|
||||
func (h *endpointSlicesEventHandler) Delete(_ context.Context, e event.TypedDeleteEvent[*discoveryv1.EndpointSlice], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
h.enqueueImpactedSvc(e.Object)
|
||||
}
|
||||
|
||||
func (h *endpointSlicesEventHandler) Generic(_ event.GenericEvent, _ workqueue.RateLimitingInterface) {
|
||||
func (h *endpointSlicesEventHandler) Generic(_ context.Context, _ event.TypedGenericEvent[*discoveryv1.EndpointSlice], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
}
|
||||
|
||||
func (h *endpointSlicesEventHandler) enqueueImpactedSvc(obj client.Object) {
|
||||
|
@ -179,7 +182,7 @@ func (h *endpointSlicesEventHandler) enqueueImpactedSvc(obj client.Object) {
|
|||
return
|
||||
}
|
||||
|
||||
h.svcEventChan <- event.GenericEvent{
|
||||
h.svcEventChan <- event.TypedGenericEvent[*corev1.Service]{
|
||||
Object: &corev1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: obj.GetNamespace(),
|
||||
|
@ -187,27 +190,28 @@ func (h *endpointSlicesEventHandler) enqueueImpactedSvc(obj client.Object) {
|
|||
}}}
|
||||
}
|
||||
|
||||
func newSecretEventHandler(mciEventChan chan<- event.GenericEvent, client client.Client) handler.EventHandler {
|
||||
func newSecretEventHandler(mciEventChan chan<- event.TypedGenericEvent[*networkingv1alpha1.MultiClusterIngress], client client.Client) handler.TypedEventHandler[*corev1.Secret, reconcile.Request] {
|
||||
return &secretEventHandler{
|
||||
mciEventChan: mciEventChan,
|
||||
client: client,
|
||||
}
|
||||
}
|
||||
|
||||
var _ handler.EventHandler = (*secretEventHandler)(nil)
|
||||
var _ handler.TypedEventHandler[*corev1.Secret, reconcile.Request] = (*secretEventHandler)(nil)
|
||||
|
||||
type secretEventHandler struct {
|
||||
mciEventChan chan<- event.GenericEvent
|
||||
mciEventChan chan<- event.TypedGenericEvent[*networkingv1alpha1.MultiClusterIngress]
|
||||
client client.Client
|
||||
}
|
||||
|
||||
func (h *secretEventHandler) Create(e event.CreateEvent, _ workqueue.RateLimitingInterface) {
|
||||
func (h *secretEventHandler) Create(_ context.Context, e event.TypedCreateEvent[*corev1.Secret], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
klog.V(4).Infof("secret(%s/%s) created", e.Object.GetNamespace(), e.Object.GetName())
|
||||
h.enqueueImpactedMCI(e.Object.GetNamespace(), e.Object.GetName())
|
||||
}
|
||||
|
||||
func (h *secretEventHandler) Update(e event.UpdateEvent, _ workqueue.RateLimitingInterface) {
|
||||
secretOld := e.ObjectOld.(*corev1.Secret)
|
||||
secretNew := e.ObjectNew.(*corev1.Secret)
|
||||
func (h *secretEventHandler) Update(_ context.Context, e event.TypedUpdateEvent[*corev1.Secret], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
secretOld := e.ObjectOld
|
||||
secretNew := e.ObjectNew
|
||||
|
||||
if equality.Semantic.DeepEqual(secretOld.Annotations, secretNew.Annotations) &&
|
||||
equality.Semantic.DeepEqual(secretOld.Data, secretNew.Data) &&
|
||||
|
@ -218,7 +222,7 @@ func (h *secretEventHandler) Update(e event.UpdateEvent, _ workqueue.RateLimitin
|
|||
h.enqueueImpactedMCI(secretNew.Namespace, secretNew.Name)
|
||||
}
|
||||
|
||||
func (h *secretEventHandler) Delete(e event.DeleteEvent, _ workqueue.RateLimitingInterface) {
|
||||
func (h *secretEventHandler) Delete(_ context.Context, e event.TypedDeleteEvent[*corev1.Secret], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
h.enqueueImpactedMCI(e.Object.GetNamespace(), e.Object.GetName())
|
||||
}
|
||||
|
||||
|
@ -233,11 +237,84 @@ func (h *secretEventHandler) enqueueImpactedMCI(secretNamespace, secretName stri
|
|||
|
||||
for index := range mciList.Items {
|
||||
mci := &mciList.Items[index]
|
||||
h.mciEventChan <- event.GenericEvent{
|
||||
h.mciEventChan <- event.TypedGenericEvent[*networkingv1alpha1.MultiClusterIngress]{
|
||||
Object: mci,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *secretEventHandler) Generic(_ event.GenericEvent, _ workqueue.RateLimitingInterface) {
|
||||
func (h *secretEventHandler) Generic(_ context.Context, _ event.TypedGenericEvent[*corev1.Secret], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
|
||||
}
|
||||
|
||||
func newClusterEventHandler(mciEventChan chan<- event.TypedGenericEvent[*networkingv1alpha1.MultiClusterIngress], client client.Client) handler.TypedEventHandler[*clusterv1alpha1.Cluster, reconcile.Request] {
|
||||
return &clusterEventHandler{
|
||||
client: client,
|
||||
mciEventChan: mciEventChan,
|
||||
}
|
||||
}
|
||||
|
||||
var _ handler.TypedEventHandler[*clusterv1alpha1.Cluster, reconcile.Request] = (*clusterEventHandler)(nil)
|
||||
|
||||
type clusterEventHandler struct {
|
||||
client client.Client
|
||||
mciEventChan chan<- event.TypedGenericEvent[*networkingv1alpha1.MultiClusterIngress]
|
||||
}
|
||||
|
||||
func (h *clusterEventHandler) Create(_ context.Context, _ event.TypedCreateEvent[*clusterv1alpha1.Cluster], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
}
|
||||
|
||||
func (h *clusterEventHandler) Update(_ context.Context, e event.TypedUpdateEvent[*clusterv1alpha1.Cluster], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
oldCluster := e.ObjectOld
|
||||
newCluster := e.ObjectNew
|
||||
oldExist, newExist := false, false
|
||||
for _, action := range oldCluster.Status.RemedyActions {
|
||||
if action == string(remedyv1alpha1.TrafficControl) {
|
||||
oldExist = true
|
||||
break
|
||||
}
|
||||
}
|
||||
for _, action := range newCluster.Status.RemedyActions {
|
||||
if action == string(remedyv1alpha1.TrafficControl) {
|
||||
newExist = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if oldExist == newExist {
|
||||
return
|
||||
}
|
||||
|
||||
mciList := &networkingv1alpha1.MultiClusterIngressList{}
|
||||
if err := h.client.List(context.Background(), mciList); err != nil {
|
||||
klog.Errorf("failed to fetch multiclusteringresses")
|
||||
return
|
||||
}
|
||||
|
||||
for index := range mciList.Items {
|
||||
mci := &mciList.Items[index]
|
||||
if !mciSvcLocationsContainsCluster(mci, newCluster) {
|
||||
continue
|
||||
}
|
||||
h.mciEventChan <- event.TypedGenericEvent[*networkingv1alpha1.MultiClusterIngress]{
|
||||
Object: mci,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (h *clusterEventHandler) Delete(_ context.Context, _ event.TypedDeleteEvent[*clusterv1alpha1.Cluster], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
}
|
||||
|
||||
func (h *clusterEventHandler) Generic(_ context.Context, _ event.TypedGenericEvent[*clusterv1alpha1.Cluster], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
}
|
||||
|
||||
func mciSvcLocationsContainsCluster(mci *networkingv1alpha1.MultiClusterIngress, cluster *clusterv1alpha1.Cluster) bool {
|
||||
for _, location := range mci.Status.ServiceLocations {
|
||||
for _, clusterName := range location.Clusters {
|
||||
if clusterName == cluster.Name {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
|
|
@ -3,14 +3,19 @@ package multiclusteringress
|
|||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"sort"
|
||||
|
||||
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
|
||||
networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1"
|
||||
remedyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/remedy/v1alpha1"
|
||||
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
|
||||
"github.com/karmada-io/karmada/pkg/util/fedinformer/genericmanager"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
discoveryv1 "k8s.io/api/discovery/v1"
|
||||
networkingv1 "k8s.io/api/networking/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/types"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/client-go/util/retry"
|
||||
"k8s.io/klog/v2"
|
||||
|
@ -86,15 +91,21 @@ func (c *MCIController) handleMCIDelete(ctx context.Context, mci *networkingv1al
|
|||
func (c *MCIController) handleMCICreateOrUpdate(ctx context.Context, mci *networkingv1alpha1.MultiClusterIngress) (controllerruntime.Result, error) {
|
||||
klog.V(4).InfoS("Begin to handle multiClusterIngress create or update event", "namespace", mci.Namespace, "name", mci.Name)
|
||||
|
||||
finalizersUpdated := controllerutil.AddFinalizer(mci, MCIControllerFinalizer)
|
||||
if finalizersUpdated {
|
||||
err := c.Client.Update(ctx, mci)
|
||||
if !controllerutil.ContainsFinalizer(mci, MCIControllerFinalizer) {
|
||||
objPatch := client.MergeFrom(mci)
|
||||
modifiedObj := mci.DeepCopy()
|
||||
controllerutil.AddFinalizer(modifiedObj, MCIControllerFinalizer)
|
||||
err := c.Client.Patch(ctx, modifiedObj, objPatch)
|
||||
if err != nil {
|
||||
klog.V(4).ErrorS(err, "failed to update mci with finalizer", "namespace", mci.Namespace, "name", mci.Name)
|
||||
return controllerruntime.Result{}, err
|
||||
}
|
||||
}
|
||||
|
||||
if err := c.updateMCITrafficBlockClusters(ctx, mci); err != nil {
|
||||
return controllerruntime.Result{}, err
|
||||
}
|
||||
|
||||
_, exist, err := c.LoadBalancer.GetLoadBalancer(ctx, mci)
|
||||
if err != nil {
|
||||
klog.ErrorS(err, "failed to get loadBalancer with provider", "namespace", mci.Namespace, "name", mci.Name)
|
||||
|
@ -106,6 +117,60 @@ func (c *MCIController) handleMCICreateOrUpdate(ctx context.Context, mci *networ
|
|||
return c.handleMCICreate(ctx, mci)
|
||||
}
|
||||
|
||||
func (c *MCIController) updateMCITrafficBlockClusters(ctx context.Context, mci *networkingv1alpha1.MultiClusterIngress) error {
|
||||
locatedClusters := sets.NewString()
|
||||
for _, location := range mci.Status.ServiceLocations {
|
||||
locatedClusters.Insert(location.Clusters...)
|
||||
}
|
||||
|
||||
clusterList := &clusterv1alpha1.ClusterList{}
|
||||
if err := c.Client.List(ctx, clusterList); err != nil {
|
||||
klog.Errorf("Failed to list cluster: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
var trafficBlockClusters []string
|
||||
for _, cluster := range clusterList.Items {
|
||||
if !locatedClusters.Has(cluster.Name) {
|
||||
continue
|
||||
}
|
||||
for _, action := range cluster.Status.RemedyActions {
|
||||
if action == string(remedyv1alpha1.TrafficControl) {
|
||||
trafficBlockClusters = append(trafficBlockClusters, cluster.Name)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
sort.Strings(trafficBlockClusters)
|
||||
|
||||
mciNamespacedName := types.NamespacedName{Namespace: mci.Namespace, Name: mci.Name}
|
||||
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
||||
if reflect.DeepEqual(trafficBlockClusters, mci.Status.TrafficBlockClusters) {
|
||||
return nil
|
||||
}
|
||||
mci.Status.TrafficBlockClusters = trafficBlockClusters
|
||||
updateErr := c.Client.Status().Update(ctx, mci)
|
||||
if updateErr == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
updatedMCI := &networkingv1alpha1.MultiClusterIngress{}
|
||||
err := c.Client.Get(ctx, mciNamespacedName, updatedMCI)
|
||||
if err == nil {
|
||||
mci = updatedMCI.DeepCopy()
|
||||
} else {
|
||||
klog.Errorf("Failed to get updated multiClusterIngress(%s): %v", mciNamespacedName.String(), err)
|
||||
}
|
||||
return updateErr
|
||||
})
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to sync multiClusterIngress(%s) trafficBlockClusters: %v", mciNamespacedName.String(), err)
|
||||
return err
|
||||
}
|
||||
klog.V(4).Infof("Success to sync multiClusterIngress(%s) trafficBlockClusters", mciNamespacedName.String())
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *MCIController) handleMCICreate(ctx context.Context, mci *networkingv1alpha1.MultiClusterIngress) (controllerruntime.Result, error) {
|
||||
klog.V(4).InfoS("Begin to handle multiClusterIngress create event", "namespace", mci.Namespace, "name", mci.Name)
|
||||
|
||||
|
@ -169,44 +234,48 @@ func (c *MCIController) SetupWithManager(ctx context.Context, mgr controllerrunt
|
|||
mciController, err := controller.New(ControllerName, mgr,
|
||||
controller.Options{
|
||||
Reconciler: c,
|
||||
RateLimiter: ratelimiterflag.DefaultControllerRateLimiter(c.RateLimiterOptions),
|
||||
RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](c.RateLimiterOptions),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = c.setupWatches(ctx, mciController); err != nil {
|
||||
if err = c.setupWatches(ctx, mciController, mgr); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *MCIController) setupWatches(ctx context.Context, mciController controller.Controller) error {
|
||||
mciEventChan := make(chan event.GenericEvent)
|
||||
svcEventChan := make(chan event.GenericEvent)
|
||||
func (c *MCIController) setupWatches(ctx context.Context, mciController controller.Controller, mgr controllerruntime.Manager) error {
|
||||
mciEventChan := make(chan event.TypedGenericEvent[*networkingv1alpha1.MultiClusterIngress])
|
||||
svcEventChan := make(chan event.TypedGenericEvent[*corev1.Service])
|
||||
|
||||
mciEventHandler := newMultiClusterIngressEventHandler(ctx, c.Client, c.ProviderClassName)
|
||||
svcEventHandler := newServiceEventHandler(mciEventChan, c.Client)
|
||||
epsEventHandler := newEndpointSlicesEventHandler(svcEventChan)
|
||||
secEventHandler := newSecretEventHandler(mciEventChan, c.Client)
|
||||
clusterHandler := newClusterEventHandler(mciEventChan, c.Client)
|
||||
|
||||
if err := mciController.Watch(&source.Kind{Type: &networkingv1alpha1.MultiClusterIngress{}}, mciEventHandler); err != nil {
|
||||
if err := mciController.Watch(source.Kind[*networkingv1alpha1.MultiClusterIngress](mgr.GetCache(), &networkingv1alpha1.MultiClusterIngress{}, mciEventHandler)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := mciController.Watch(&source.Channel{Source: mciEventChan}, mciEventHandler); err != nil {
|
||||
if err := mciController.Watch(source.Channel[*networkingv1alpha1.MultiClusterIngress](mciEventChan, mciEventHandler)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := mciController.Watch(&source.Kind{Type: &corev1.Service{}}, svcEventHandler); err != nil {
|
||||
if err := mciController.Watch(source.Kind[*corev1.Service](mgr.GetCache(), &corev1.Service{}, svcEventHandler)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := mciController.Watch(&source.Channel{Source: svcEventChan}, svcEventHandler); err != nil {
|
||||
if err := mciController.Watch(source.Channel[*corev1.Service](svcEventChan, svcEventHandler)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := mciController.Watch(&source.Kind{Type: &discoveryv1.EndpointSlice{}}, epsEventHandler); err != nil {
|
||||
if err := mciController.Watch(source.Kind[*discoveryv1.EndpointSlice](mgr.GetCache(), &discoveryv1.EndpointSlice{}, epsEventHandler)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := mciController.Watch(&source.Kind{Type: &corev1.Secret{}}, secEventHandler); err != nil {
|
||||
if err := mciController.Watch(source.Kind[*corev1.Secret](mgr.GetCache(), &corev1.Secret{}, secEventHandler)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := mciController.Watch(source.Kind[*clusterv1alpha1.Cluster](mgr.GetCache(), &clusterv1alpha1.Cluster{}, clusterHandler)); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package multiclusterservice
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
|
||||
networkingv1alpha1 "github.com/karmada-io/karmada/pkg/apis/networking/v1alpha1"
|
||||
|
@ -19,30 +20,29 @@ import (
|
|||
"github.com/karmada-io/multicluster-cloud-provider/pkg/util"
|
||||
)
|
||||
|
||||
func newMultiClusterServiceEventHandler() handler.EventHandler {
|
||||
func newMultiClusterServiceEventHandler() handler.TypedEventHandler[*networkingv1alpha1.MultiClusterService, reconcile.Request] {
|
||||
return &multiClusterServiceEventHandler{}
|
||||
}
|
||||
|
||||
var _ handler.EventHandler = (*multiClusterServiceEventHandler)(nil)
|
||||
var _ handler.TypedEventHandler[*networkingv1alpha1.MultiClusterService, reconcile.Request] = (*multiClusterServiceEventHandler)(nil)
|
||||
|
||||
type multiClusterServiceEventHandler struct {
|
||||
}
|
||||
|
||||
func (h *multiClusterServiceEventHandler) Create(e event.CreateEvent, queue workqueue.RateLimitingInterface) {
|
||||
mcs := e.Object.(*networkingv1alpha1.MultiClusterService)
|
||||
if !util.MCSContainLoadBalanceType(mcs) {
|
||||
func (h *multiClusterServiceEventHandler) Create(_ context.Context, e event.TypedCreateEvent[*networkingv1alpha1.MultiClusterService], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
if !util.MCSContainLoadBalanceType(e.Object) {
|
||||
return
|
||||
}
|
||||
|
||||
queue.Add(reconcile.Request{NamespacedName: types.NamespacedName{
|
||||
Namespace: mcs.Namespace,
|
||||
Name: mcs.Name,
|
||||
Namespace: e.Object.Namespace,
|
||||
Name: e.Object.Name,
|
||||
}})
|
||||
}
|
||||
|
||||
func (h *multiClusterServiceEventHandler) Update(e event.UpdateEvent, queue workqueue.RateLimitingInterface) {
|
||||
mcsOld := e.ObjectOld.(*networkingv1alpha1.MultiClusterService)
|
||||
mcsNew := e.ObjectNew.(*networkingv1alpha1.MultiClusterService)
|
||||
func (h *multiClusterServiceEventHandler) Update(_ context.Context, e event.TypedUpdateEvent[*networkingv1alpha1.MultiClusterService], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
mcsOld := e.ObjectOld
|
||||
mcsNew := e.ObjectNew
|
||||
if !util.MCSContainLoadBalanceType(mcsOld) && !util.MCSContainLoadBalanceType(mcsNew) {
|
||||
return
|
||||
}
|
||||
|
@ -60,39 +60,39 @@ func (h *multiClusterServiceEventHandler) Update(e event.UpdateEvent, queue work
|
|||
}})
|
||||
}
|
||||
|
||||
func (h *multiClusterServiceEventHandler) Delete(_ event.DeleteEvent, _ workqueue.RateLimitingInterface) {
|
||||
func (h *multiClusterServiceEventHandler) Delete(_ context.Context, _ event.TypedDeleteEvent[*networkingv1alpha1.MultiClusterService], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
// Since finalizer is added to the MultiClusterService object,
|
||||
// the delete event is processed by the update event.
|
||||
}
|
||||
|
||||
func (h *multiClusterServiceEventHandler) Generic(e event.GenericEvent, queue workqueue.RateLimitingInterface) {
|
||||
func (h *multiClusterServiceEventHandler) Generic(_ context.Context, e event.TypedGenericEvent[*networkingv1alpha1.MultiClusterService], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
queue.Add(reconcile.Request{NamespacedName: types.NamespacedName{
|
||||
Namespace: e.Object.GetNamespace(),
|
||||
Name: e.Object.GetName(),
|
||||
}})
|
||||
}
|
||||
|
||||
func newServiceEventHandler(mcsEventChan chan<- event.GenericEvent, client client.Client) handler.EventHandler {
|
||||
func newServiceEventHandler(mcsEventChan chan<- event.TypedGenericEvent[*networkingv1alpha1.MultiClusterService], client client.Client) handler.TypedEventHandler[*corev1.Service, reconcile.Request] {
|
||||
return &serviceEventHandler{
|
||||
mcsEventChan: mcsEventChan,
|
||||
client: client,
|
||||
}
|
||||
}
|
||||
|
||||
var _ handler.EventHandler = (*serviceEventHandler)(nil)
|
||||
var _ handler.TypedEventHandler[*corev1.Service, reconcile.Request] = (*serviceEventHandler)(nil)
|
||||
|
||||
type serviceEventHandler struct {
|
||||
mcsEventChan chan<- event.GenericEvent
|
||||
mcsEventChan chan<- event.TypedGenericEvent[*networkingv1alpha1.MultiClusterService]
|
||||
client client.Client
|
||||
}
|
||||
|
||||
func (h *serviceEventHandler) Create(e event.CreateEvent, _ workqueue.RateLimitingInterface) {
|
||||
func (h *serviceEventHandler) Create(_ context.Context, e event.TypedCreateEvent[*corev1.Service], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
h.enqueueImpactedMCS(e.Object.GetNamespace(), e.Object.GetName())
|
||||
}
|
||||
|
||||
func (h *serviceEventHandler) Update(e event.UpdateEvent, _ workqueue.RateLimitingInterface) {
|
||||
svcOld := e.ObjectOld.(*corev1.Service)
|
||||
svcNew := e.ObjectNew.(*corev1.Service)
|
||||
func (h *serviceEventHandler) Update(_ context.Context, e event.TypedUpdateEvent[*corev1.Service], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
svcOld := e.ObjectOld
|
||||
svcNew := e.ObjectNew
|
||||
|
||||
// We only care about the update events below:
|
||||
if equality.Semantic.DeepEqual(svcOld.Annotations, svcNew.Annotations) &&
|
||||
|
@ -103,16 +103,16 @@ func (h *serviceEventHandler) Update(e event.UpdateEvent, _ workqueue.RateLimiti
|
|||
h.enqueueImpactedMCS(svcNew.Namespace, svcNew.Name)
|
||||
}
|
||||
|
||||
func (h *serviceEventHandler) Delete(e event.DeleteEvent, _ workqueue.RateLimitingInterface) {
|
||||
func (h *serviceEventHandler) Delete(_ context.Context, e event.TypedDeleteEvent[*corev1.Service], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
h.enqueueImpactedMCS(e.Object.GetNamespace(), e.Object.GetName())
|
||||
}
|
||||
|
||||
func (h *serviceEventHandler) Generic(e event.GenericEvent, _ workqueue.RateLimitingInterface) {
|
||||
func (h *serviceEventHandler) Generic(_ context.Context, e event.TypedGenericEvent[*corev1.Service], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
h.enqueueImpactedMCS(e.Object.GetNamespace(), e.Object.GetName())
|
||||
}
|
||||
|
||||
func (h *serviceEventHandler) enqueueImpactedMCS(svcNamespace, svcName string) {
|
||||
h.mcsEventChan <- event.GenericEvent{
|
||||
h.mcsEventChan <- event.TypedGenericEvent[*networkingv1alpha1.MultiClusterService]{
|
||||
Object: &networkingv1alpha1.MultiClusterService{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: svcNamespace,
|
||||
|
@ -123,42 +123,42 @@ func (h *serviceEventHandler) enqueueImpactedMCS(svcNamespace, svcName string) {
|
|||
// endpointSlicePrefix is the prefix of service derived from ServiceImport.
|
||||
const derivedServicePrefix = "derived-"
|
||||
|
||||
func newEndpointSlicesEventHandler(svcEventChan chan<- event.GenericEvent) handler.EventHandler {
|
||||
func newEndpointSlicesEventHandler(svcEventChan chan<- event.TypedGenericEvent[*corev1.Service]) handler.TypedEventHandler[*discoveryv1.EndpointSlice, reconcile.Request] {
|
||||
return &endpointSlicesEventHandler{
|
||||
svcEventChan: svcEventChan,
|
||||
}
|
||||
}
|
||||
|
||||
var _ handler.EventHandler = (*endpointSlicesEventHandler)(nil)
|
||||
var _ handler.TypedEventHandler[*discoveryv1.EndpointSlice, reconcile.Request] = (*endpointSlicesEventHandler)(nil)
|
||||
|
||||
type endpointSlicesEventHandler struct {
|
||||
svcEventChan chan<- event.GenericEvent
|
||||
svcEventChan chan<- event.TypedGenericEvent[*corev1.Service]
|
||||
}
|
||||
|
||||
func (h *endpointSlicesEventHandler) Create(e event.CreateEvent, _ workqueue.RateLimitingInterface) {
|
||||
func (h *endpointSlicesEventHandler) Create(_ context.Context, e event.TypedCreateEvent[*discoveryv1.EndpointSlice], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
h.enqueueImpactedSvc(e.Object)
|
||||
}
|
||||
|
||||
func (h *endpointSlicesEventHandler) Update(e event.UpdateEvent, _ workqueue.RateLimitingInterface) {
|
||||
func (h *endpointSlicesEventHandler) Update(_ context.Context, e event.TypedUpdateEvent[*discoveryv1.EndpointSlice], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
h.enqueueImpactedSvc(e.ObjectNew)
|
||||
}
|
||||
|
||||
func (h *endpointSlicesEventHandler) Delete(e event.DeleteEvent, _ workqueue.RateLimitingInterface) {
|
||||
func (h *endpointSlicesEventHandler) Delete(_ context.Context, e event.TypedDeleteEvent[*discoveryv1.EndpointSlice], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
h.enqueueImpactedSvc(e.Object)
|
||||
}
|
||||
|
||||
func (h *endpointSlicesEventHandler) Generic(_ event.GenericEvent, _ workqueue.RateLimitingInterface) {
|
||||
func (h *endpointSlicesEventHandler) Generic(_ context.Context, _ event.TypedGenericEvent[*discoveryv1.EndpointSlice], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
}
|
||||
|
||||
func (h *endpointSlicesEventHandler) enqueueImpactedSvc(obj client.Object) {
|
||||
svcName, ok := obj.GetLabels()[discoveryv1.LabelServiceName]
|
||||
if !ok {
|
||||
klog.Warning("Can not get the key(%s) with the endpointSlices object(%s/%s)",
|
||||
klog.Warningf("Can not get the key(%s) with the endpointSlices object(%s/%s)",
|
||||
discoveryv1.LabelServiceName, obj.GetNamespace(), obj.GetName())
|
||||
return
|
||||
}
|
||||
|
||||
h.svcEventChan <- event.GenericEvent{
|
||||
h.svcEventChan <- event.TypedGenericEvent[*corev1.Service]{
|
||||
Object: &corev1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: obj.GetNamespace(),
|
||||
|
|
|
@ -243,40 +243,40 @@ func (c *MCSController) SetupWithManager(_ context.Context, mgr controllerruntim
|
|||
mcsController, err := controller.New(ControllerName, mgr,
|
||||
controller.Options{
|
||||
Reconciler: c,
|
||||
RateLimiter: ratelimiterflag.DefaultControllerRateLimiter(c.RateLimiterOptions),
|
||||
RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](c.RateLimiterOptions),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = c.setupWatches(mcsController); err != nil {
|
||||
if err = c.setupWatches(mcsController, mgr); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *MCSController) setupWatches(mcsController controller.Controller) error {
|
||||
mcsEventChan := make(chan event.GenericEvent)
|
||||
svcEventChan := make(chan event.GenericEvent)
|
||||
func (c *MCSController) setupWatches(mcsController controller.Controller, mgr controllerruntime.Manager) error {
|
||||
mcsEventChan := make(chan event.TypedGenericEvent[*networkingv1alpha1.MultiClusterService])
|
||||
svcEventChan := make(chan event.TypedGenericEvent[*corev1.Service])
|
||||
|
||||
mcsEventHandler := newMultiClusterServiceEventHandler()
|
||||
svcEventHandler := newServiceEventHandler(mcsEventChan, c.Client)
|
||||
epsEventHandler := newEndpointSlicesEventHandler(svcEventChan)
|
||||
|
||||
if err := mcsController.Watch(&source.Kind{Type: &networkingv1alpha1.MultiClusterService{}}, mcsEventHandler); err != nil {
|
||||
if err := mcsController.Watch(source.Kind[*networkingv1alpha1.MultiClusterService](mgr.GetCache(), &networkingv1alpha1.MultiClusterService{}, mcsEventHandler)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := mcsController.Watch(&source.Channel{Source: mcsEventChan}, mcsEventHandler); err != nil {
|
||||
if err := mcsController.Watch(source.Channel[*networkingv1alpha1.MultiClusterService](mcsEventChan, mcsEventHandler)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := mcsController.Watch(&source.Kind{Type: &corev1.Service{}}, svcEventHandler); err != nil {
|
||||
if err := mcsController.Watch(source.Kind[*corev1.Service](mgr.GetCache(), &corev1.Service{}, svcEventHandler)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := mcsController.Watch(&source.Channel{Source: svcEventChan}, svcEventHandler); err != nil {
|
||||
if err := mcsController.Watch(source.Channel[*corev1.Service](svcEventChan, svcEventHandler)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := mcsController.Watch(&source.Kind{Type: &discoveryv1.EndpointSlice{}}, epsEventHandler); err != nil {
|
||||
if err := mcsController.Watch(source.Kind[*discoveryv1.EndpointSlice](mgr.GetCache(), &discoveryv1.EndpointSlice{}, epsEventHandler)); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
|
|
@ -21,21 +21,21 @@ import (
|
|||
"github.com/karmada-io/multicluster-cloud-provider/pkg/util"
|
||||
)
|
||||
|
||||
func newServiceEventHandler(ctx context.Context, client client.Client) handler.EventHandler {
|
||||
func newServiceEventHandler(ctx context.Context, client client.Client) handler.TypedEventHandler[*corev1.Service, reconcile.Request] {
|
||||
return &serviceEventHandler{
|
||||
ctx: ctx,
|
||||
client: client,
|
||||
}
|
||||
}
|
||||
|
||||
var _ handler.EventHandler = (*serviceEventHandler)(nil)
|
||||
var _ handler.TypedEventHandler[*corev1.Service, reconcile.Request] = (*serviceEventHandler)(nil)
|
||||
|
||||
type serviceEventHandler struct {
|
||||
ctx context.Context
|
||||
client client.Client
|
||||
}
|
||||
|
||||
func (h *serviceEventHandler) Create(e event.CreateEvent, queue workqueue.RateLimitingInterface) {
|
||||
func (h *serviceEventHandler) Create(_ context.Context, e event.TypedCreateEvent[*corev1.Service], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
mciList := &networkingv1alpha1.MultiClusterIngressList{}
|
||||
if err := h.client.List(h.ctx, mciList,
|
||||
client.InNamespace(e.Object.GetNamespace()),
|
||||
|
@ -68,17 +68,17 @@ func (h *serviceEventHandler) Create(e event.CreateEvent, queue workqueue.RateLi
|
|||
}})
|
||||
}
|
||||
|
||||
func (h *serviceEventHandler) Update(_ event.UpdateEvent, _ workqueue.RateLimitingInterface) {
|
||||
func (h *serviceEventHandler) Update(_ context.Context, _ event.TypedUpdateEvent[*corev1.Service], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
// We only need to create ServiceExport based on the service and propagate it to
|
||||
// member clusters. Therefore, we do not need to pay attention to service update.
|
||||
}
|
||||
|
||||
func (h *serviceEventHandler) Delete(_ event.DeleteEvent, queue workqueue.RateLimitingInterface) {
|
||||
func (h *serviceEventHandler) Delete(_ context.Context, _ event.TypedDeleteEvent[*corev1.Service], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
// We will add an ownerReference to the service object on the ServiceExport
|
||||
// object, so that cleanup will be handled by gc controller.
|
||||
}
|
||||
|
||||
func (h *serviceEventHandler) Generic(e event.GenericEvent, queue workqueue.RateLimitingInterface) {
|
||||
func (h *serviceEventHandler) Generic(_ context.Context, e event.TypedGenericEvent[*corev1.Service], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
queue.Add(reconcile.Request{
|
||||
NamespacedName: types.NamespacedName{
|
||||
Namespace: e.Object.GetNamespace(),
|
||||
|
@ -86,7 +86,7 @@ func (h *serviceEventHandler) Generic(e event.GenericEvent, queue workqueue.Rate
|
|||
}})
|
||||
}
|
||||
|
||||
func newMultiClusterIngressEventHandler(ctx context.Context, client client.Client, svcEventChan chan<- event.GenericEvent, providerClassName string) handler.EventHandler {
|
||||
func newMultiClusterIngressEventHandler(ctx context.Context, client client.Client, svcEventChan chan<- event.TypedGenericEvent[*corev1.Service], providerClassName string) handler.TypedEventHandler[*networkingv1alpha1.MultiClusterIngress, reconcile.Request] {
|
||||
return &multiClusterIngressEventHandler{
|
||||
ctx: ctx,
|
||||
client: client,
|
||||
|
@ -95,26 +95,25 @@ func newMultiClusterIngressEventHandler(ctx context.Context, client client.Clien
|
|||
}
|
||||
}
|
||||
|
||||
var _ handler.EventHandler = (*multiClusterIngressEventHandler)(nil)
|
||||
var _ handler.TypedEventHandler[*networkingv1alpha1.MultiClusterIngress, reconcile.Request] = (*multiClusterIngressEventHandler)(nil)
|
||||
|
||||
type multiClusterIngressEventHandler struct {
|
||||
ctx context.Context
|
||||
client client.Client
|
||||
svcEventChan chan<- event.GenericEvent
|
||||
svcEventChan chan<- event.TypedGenericEvent[*corev1.Service]
|
||||
ingClassName string
|
||||
}
|
||||
|
||||
func (h *multiClusterIngressEventHandler) Create(e event.CreateEvent, _ workqueue.RateLimitingInterface) {
|
||||
mci := e.Object.(*networkingv1alpha1.MultiClusterIngress)
|
||||
if !util.CheckIngressClassMatched(h.ctx, h.client, mci, h.ingClassName) {
|
||||
func (h *multiClusterIngressEventHandler) Create(_ context.Context, e event.TypedCreateEvent[*networkingv1alpha1.MultiClusterIngress], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
if !util.CheckIngressClassMatched(h.ctx, h.client, e.Object, h.ingClassName) {
|
||||
return
|
||||
}
|
||||
h.enqueueImpactedService(mci)
|
||||
h.enqueueImpactedService(e.Object)
|
||||
}
|
||||
|
||||
func (h *multiClusterIngressEventHandler) Update(e event.UpdateEvent, _ workqueue.RateLimitingInterface) {
|
||||
mciOld := e.ObjectOld.(*networkingv1alpha1.MultiClusterIngress)
|
||||
mciNew := e.ObjectNew.(*networkingv1alpha1.MultiClusterIngress)
|
||||
func (h *multiClusterIngressEventHandler) Update(_ context.Context, e event.TypedUpdateEvent[*networkingv1alpha1.MultiClusterIngress], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
mciOld := e.ObjectOld
|
||||
mciNew := e.ObjectNew
|
||||
if !util.CheckIngressClassMatched(h.ctx, h.client, mciNew, h.ingClassName) {
|
||||
return
|
||||
}
|
||||
|
@ -139,7 +138,7 @@ func (h *multiClusterIngressEventHandler) Update(e event.UpdateEvent, _ workqueu
|
|||
}
|
||||
|
||||
for _, svc := range targetRefs {
|
||||
h.svcEventChan <- event.GenericEvent{
|
||||
h.svcEventChan <- event.TypedGenericEvent[*corev1.Service]{
|
||||
Object: &corev1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: mciNew.Namespace,
|
||||
|
@ -148,21 +147,20 @@ func (h *multiClusterIngressEventHandler) Update(e event.UpdateEvent, _ workqueu
|
|||
}
|
||||
}
|
||||
|
||||
func (h *multiClusterIngressEventHandler) Delete(e event.DeleteEvent, _ workqueue.RateLimitingInterface) {
|
||||
mci := e.Object.(*networkingv1alpha1.MultiClusterIngress)
|
||||
if !util.CheckIngressClassMatched(h.ctx, h.client, mci, h.ingClassName) {
|
||||
func (h *multiClusterIngressEventHandler) Delete(_ context.Context, e event.TypedDeleteEvent[*networkingv1alpha1.MultiClusterIngress], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
if !util.CheckIngressClassMatched(h.ctx, h.client, e.Object, h.ingClassName) {
|
||||
return
|
||||
}
|
||||
h.enqueueImpactedService(mci)
|
||||
h.enqueueImpactedService(e.Object)
|
||||
}
|
||||
|
||||
func (h *multiClusterIngressEventHandler) Generic(_ event.GenericEvent, _ workqueue.RateLimitingInterface) {
|
||||
func (h *multiClusterIngressEventHandler) Generic(_ context.Context, _ event.TypedGenericEvent[*networkingv1alpha1.MultiClusterIngress], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
}
|
||||
|
||||
func (h *multiClusterIngressEventHandler) enqueueImpactedService(mci *networkingv1alpha1.MultiClusterIngress) {
|
||||
svcRefs := indexes.BuildServiceRefIndexes(mci)
|
||||
for _, svc := range svcRefs {
|
||||
h.svcEventChan <- event.GenericEvent{
|
||||
h.svcEventChan <- event.TypedGenericEvent[*corev1.Service]{
|
||||
Object: &corev1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: mci.Namespace,
|
||||
|
@ -174,8 +172,8 @@ func (h *multiClusterIngressEventHandler) enqueueImpactedService(mci *networking
|
|||
func newMultiClusterServiceEventHandler(
|
||||
ctx context.Context,
|
||||
client client.Client,
|
||||
svcEventChan chan<- event.GenericEvent,
|
||||
) handler.EventHandler {
|
||||
svcEventChan chan<- event.TypedGenericEvent[*corev1.Service],
|
||||
) handler.TypedEventHandler[*networkingv1alpha1.MultiClusterService, reconcile.Request] {
|
||||
return &multiClusterServiceEventHandler{
|
||||
ctx: ctx,
|
||||
client: client,
|
||||
|
@ -183,21 +181,21 @@ func newMultiClusterServiceEventHandler(
|
|||
}
|
||||
}
|
||||
|
||||
var _ handler.EventHandler = (*multiClusterServiceEventHandler)(nil)
|
||||
var _ handler.TypedEventHandler[*networkingv1alpha1.MultiClusterService, reconcile.Request] = (*multiClusterServiceEventHandler)(nil)
|
||||
|
||||
type multiClusterServiceEventHandler struct {
|
||||
ctx context.Context
|
||||
client client.Client
|
||||
svcEventChan chan<- event.GenericEvent
|
||||
svcEventChan chan<- event.TypedGenericEvent[*corev1.Service]
|
||||
}
|
||||
|
||||
func (h *multiClusterServiceEventHandler) Create(e event.CreateEvent, _ workqueue.RateLimitingInterface) {
|
||||
func (h *multiClusterServiceEventHandler) Create(_ context.Context, e event.TypedCreateEvent[*networkingv1alpha1.MultiClusterService], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
h.enqueueImpactedService(e.Object.GetNamespace(), e.Object.GetName())
|
||||
}
|
||||
|
||||
func (h *multiClusterServiceEventHandler) Update(e event.UpdateEvent, _ workqueue.RateLimitingInterface) {
|
||||
mcsOld := e.ObjectOld.(*networkingv1alpha1.MultiClusterService)
|
||||
mcsNew := e.ObjectNew.(*networkingv1alpha1.MultiClusterService)
|
||||
func (h *multiClusterServiceEventHandler) Update(_ context.Context, e event.TypedUpdateEvent[*networkingv1alpha1.MultiClusterService], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
mcsOld := e.ObjectOld
|
||||
mcsNew := e.ObjectNew
|
||||
|
||||
// Only care about the update events below:
|
||||
if equality.Semantic.DeepEqual(mcsOld.Annotations, mcsNew.Annotations) &&
|
||||
|
@ -209,15 +207,15 @@ func (h *multiClusterServiceEventHandler) Update(e event.UpdateEvent, _ workqueu
|
|||
h.enqueueImpactedService(mcsNew.Namespace, mcsNew.Name)
|
||||
}
|
||||
|
||||
func (h *multiClusterServiceEventHandler) Delete(e event.DeleteEvent, _ workqueue.RateLimitingInterface) {
|
||||
func (h *multiClusterServiceEventHandler) Delete(_ context.Context, e event.TypedDeleteEvent[*networkingv1alpha1.MultiClusterService], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
h.enqueueImpactedService(e.Object.GetNamespace(), e.Object.GetName())
|
||||
}
|
||||
|
||||
func (h *multiClusterServiceEventHandler) Generic(_ event.GenericEvent, _ workqueue.RateLimitingInterface) {
|
||||
func (h *multiClusterServiceEventHandler) Generic(_ context.Context, _ event.TypedGenericEvent[*networkingv1alpha1.MultiClusterService], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
}
|
||||
|
||||
func (h *multiClusterServiceEventHandler) enqueueImpactedService(namespace, name string) {
|
||||
h.svcEventChan <- event.GenericEvent{
|
||||
h.svcEventChan <- event.TypedGenericEvent[*corev1.Service]{
|
||||
Object: &corev1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: namespace,
|
||||
|
@ -225,36 +223,35 @@ func (h *multiClusterServiceEventHandler) enqueueImpactedService(namespace, name
|
|||
}}}
|
||||
}
|
||||
|
||||
func newResourceBindingEventHandler(svcEventChan chan<- event.GenericEvent) handler.EventHandler {
|
||||
func newResourceBindingEventHandler(svcEventChan chan<- event.TypedGenericEvent[*corev1.Service]) handler.TypedEventHandler[*workv1alpha1.ResourceBinding, reconcile.Request] {
|
||||
return &resourceBindingEventHandler{
|
||||
svcEventChan: svcEventChan,
|
||||
}
|
||||
}
|
||||
|
||||
var _ handler.EventHandler = (*resourceBindingEventHandler)(nil)
|
||||
var _ handler.TypedEventHandler[*workv1alpha1.ResourceBinding, reconcile.Request] = (*resourceBindingEventHandler)(nil)
|
||||
|
||||
type resourceBindingEventHandler struct {
|
||||
svcEventChan chan<- event.GenericEvent
|
||||
svcEventChan chan<- event.TypedGenericEvent[*corev1.Service]
|
||||
}
|
||||
|
||||
func (h *resourceBindingEventHandler) Create(e event.CreateEvent, _ workqueue.RateLimitingInterface) {
|
||||
func (h *resourceBindingEventHandler) Create(_ context.Context, e event.TypedCreateEvent[*workv1alpha1.ResourceBinding], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
// The distribution feature involves directly creating rb objects,
|
||||
// so it is necessary to care about the rb creation event.
|
||||
rb := e.Object.(*workv1alpha1.ResourceBinding)
|
||||
if rb.Spec.Resource.Kind != "Service" {
|
||||
if e.Object.Spec.Resource.Kind != "Service" {
|
||||
return
|
||||
}
|
||||
h.svcEventChan <- event.GenericEvent{
|
||||
h.svcEventChan <- event.TypedGenericEvent[*corev1.Service]{
|
||||
Object: &corev1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: rb.Spec.Resource.Namespace,
|
||||
Name: rb.Spec.Resource.Name,
|
||||
Namespace: e.Object.Spec.Resource.Namespace,
|
||||
Name: e.Object.Spec.Resource.Name,
|
||||
}}}
|
||||
}
|
||||
|
||||
func (h *resourceBindingEventHandler) Update(e event.UpdateEvent, _ workqueue.RateLimitingInterface) {
|
||||
rbOlb := e.ObjectOld.(*workv1alpha1.ResourceBinding)
|
||||
rbNew := e.ObjectNew.(*workv1alpha1.ResourceBinding)
|
||||
func (h *resourceBindingEventHandler) Update(_ context.Context, e event.TypedUpdateEvent[*workv1alpha1.ResourceBinding], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
rbOlb := e.ObjectOld
|
||||
rbNew := e.ObjectNew
|
||||
|
||||
resource := rbNew.Spec.Resource
|
||||
if resource.Kind != "Service" {
|
||||
|
@ -267,7 +264,7 @@ func (h *resourceBindingEventHandler) Update(e event.UpdateEvent, _ workqueue.Ra
|
|||
return
|
||||
}
|
||||
|
||||
h.svcEventChan <- event.GenericEvent{
|
||||
h.svcEventChan <- event.TypedGenericEvent[*corev1.Service]{
|
||||
Object: &corev1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Namespace: resource.Namespace,
|
||||
|
@ -275,10 +272,10 @@ func (h *resourceBindingEventHandler) Update(e event.UpdateEvent, _ workqueue.Ra
|
|||
}}}
|
||||
}
|
||||
|
||||
func (h *resourceBindingEventHandler) Delete(_ event.DeleteEvent, _ workqueue.RateLimitingInterface) {
|
||||
func (h *resourceBindingEventHandler) Delete(_ context.Context, _ event.TypedDeleteEvent[*workv1alpha1.ResourceBinding], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
// The deletion event of the resourceBinding will be
|
||||
// processed by the deletion event of service.
|
||||
}
|
||||
|
||||
func (h *resourceBindingEventHandler) Generic(_ event.GenericEvent, _ workqueue.RateLimitingInterface) {
|
||||
func (h *resourceBindingEventHandler) Generic(_ context.Context, _ event.TypedGenericEvent[*workv1alpha1.ResourceBinding], _ workqueue.TypedRateLimitingInterface[reconcile.Request]) {
|
||||
}
|
||||
|
|
|
@ -287,40 +287,40 @@ func (c *Controller) SetupWithManager(ctx context.Context, mgr controllerruntime
|
|||
serviceExportController, err := controller.New(ControllerName, mgr,
|
||||
controller.Options{
|
||||
Reconciler: c,
|
||||
RateLimiter: ratelimiterflag.DefaultControllerRateLimiter(c.RateLimiterOptions),
|
||||
RateLimiter: ratelimiterflag.DefaultControllerRateLimiter[controllerruntime.Request](c.RateLimiterOptions),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err = c.setupWatches(ctx, serviceExportController); err != nil {
|
||||
if err = c.setupWatches(ctx, serviceExportController, mgr); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Controller) setupWatches(ctx context.Context, serviceExportController controller.Controller) error {
|
||||
svcEventChan := make(chan event.GenericEvent)
|
||||
func (c *Controller) setupWatches(ctx context.Context, serviceExportController controller.Controller, mgr controllerruntime.Manager) error {
|
||||
svcEventChan := make(chan event.TypedGenericEvent[*corev1.Service])
|
||||
|
||||
svcEventHandler := newServiceEventHandler(ctx, c.Client)
|
||||
mciEventHandler := newMultiClusterIngressEventHandler(ctx, c.Client, svcEventChan, c.ProviderClassName)
|
||||
mcsEventHandler := newMultiClusterServiceEventHandler(ctx, c.Client, svcEventChan)
|
||||
rbEventHandler := newResourceBindingEventHandler(svcEventChan)
|
||||
|
||||
if err := serviceExportController.Watch(&source.Kind{Type: &corev1.Service{}}, svcEventHandler); err != nil {
|
||||
if err := serviceExportController.Watch(source.Kind[*corev1.Service](mgr.GetCache(), &corev1.Service{}, svcEventHandler)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := serviceExportController.Watch(&source.Channel{Source: svcEventChan}, svcEventHandler); err != nil {
|
||||
if err := serviceExportController.Watch(source.Channel[*corev1.Service](svcEventChan, svcEventHandler)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := serviceExportController.Watch(&source.Kind{Type: &networkingv1alpha1.MultiClusterIngress{}}, mciEventHandler); err != nil {
|
||||
if err := serviceExportController.Watch(source.Kind[*networkingv1alpha1.MultiClusterIngress](mgr.GetCache(), &networkingv1alpha1.MultiClusterIngress{}, mciEventHandler)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := serviceExportController.Watch(&source.Kind{Type: &networkingv1alpha1.MultiClusterService{}}, mcsEventHandler); err != nil {
|
||||
if err := serviceExportController.Watch(source.Kind[*networkingv1alpha1.MultiClusterService](mgr.GetCache(), &networkingv1alpha1.MultiClusterService{}, mcsEventHandler)); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := serviceExportController.Watch(&source.Kind{Type: &workv1alpha1.ResourceBinding{}}, rbEventHandler); err != nil {
|
||||
if err := serviceExportController.Watch(source.Kind[*workv1alpha1.ResourceBinding](mgr.GetCache(), &workv1alpha1.ResourceBinding{}, rbEventHandler)); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
|
Loading…
Reference in New Issue