Merge pull request #1059 from fluxcd/watch-label-selector

Add reconciler sharding capability based on label selector
This commit is contained in:
Hidde Beydals 2023-03-29 14:57:58 +02:00 committed by GitHub
commit 7a271f1aeb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 181 additions and 151 deletions

View File

@ -94,7 +94,8 @@ test-api: ## Run api tests
cd api; go test $(GO_TEST_ARGS) ./... -coverprofile cover.out
run: generate fmt vet manifests ## Run against the configured Kubernetes cluster in ~/.kube/config
go run $(GO_STATIC_FLAGS) ./main.go
@mkdir -p $(PWD)/bin/data
go run $(GO_STATIC_FLAGS) ./main.go --storage-adv-addr=:0 --storage-path=$(PWD)/bin/data
install: manifests ## Install CRDs into a cluster
kustomize build config/crd | kubectl apply -f -

16
go.mod
View File

@ -29,7 +29,7 @@ require (
github.com/fluxcd/pkg/lockedfile v0.1.0
github.com/fluxcd/pkg/masktoken v0.2.0
github.com/fluxcd/pkg/oci v0.21.1
github.com/fluxcd/pkg/runtime v0.33.0
github.com/fluxcd/pkg/runtime v0.35.0
github.com/fluxcd/pkg/sourceignore v0.3.3
github.com/fluxcd/pkg/ssh v0.7.3
github.com/fluxcd/pkg/testserver v0.4.0
@ -42,7 +42,7 @@ require (
github.com/google/go-containerregistry/pkg/authn/k8schain v0.0.0-20230307034325-57f010d26af8
github.com/google/uuid v1.3.0
github.com/minio/minio-go/v7 v7.0.49
github.com/onsi/gomega v1.27.2
github.com/onsi/gomega v1.27.5
github.com/opencontainers/go-digest v1.0.0
github.com/opencontainers/go-digest/blake3 v0.0.0-20220411205349-bde1400a84be
github.com/ory/dockertest/v3 v3.9.1
@ -60,10 +60,10 @@ require (
helm.sh/helm/v3 v3.11.2
k8s.io/api v0.26.3
k8s.io/apimachinery v0.26.3
k8s.io/client-go v0.26.2
k8s.io/client-go v0.26.3
k8s.io/utils v0.0.0-20230220204549-a5ecb0141aa5
sigs.k8s.io/cli-utils v0.34.0
sigs.k8s.io/controller-runtime v0.14.5
sigs.k8s.io/controller-runtime v0.14.6
sigs.k8s.io/yaml v1.3.0
)
@ -202,7 +202,7 @@ require (
github.com/golang/glog v1.0.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/gomodule/redigo v1.8.2 // indirect
github.com/google/btree v1.1.2 // indirect
@ -362,14 +362,14 @@ require (
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/exp v0.0.0-20220823124025-807a23277127 // indirect
golang.org/x/mod v0.8.0 // indirect
golang.org/x/mod v0.9.0 // indirect
golang.org/x/net v0.8.0 // indirect
golang.org/x/oauth2 v0.5.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/term v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
golang.org/x/time v0.3.0 // indirect
golang.org/x/tools v0.6.0 // indirect
golang.org/x/tools v0.7.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
@ -387,7 +387,7 @@ require (
k8s.io/apiextensions-apiserver v0.26.1 // indirect
k8s.io/apiserver v0.26.1 // indirect
k8s.io/cli-runtime v0.26.0 // indirect
k8s.io/component-base v0.26.2 // indirect
k8s.io/component-base v0.26.3 // indirect
k8s.io/klog/v2 v2.90.1 // indirect
k8s.io/kube-openapi v0.0.0-20221110221610-a28e98eb7c70 // indirect
k8s.io/kubectl v0.26.0 // indirect

35
go.sum
View File

@ -550,8 +550,8 @@ github.com/fluxcd/pkg/masktoken v0.2.0 h1:HoSPTk4l1fz5Fevs2vVRvZGru33blfMwWSZKsH
github.com/fluxcd/pkg/masktoken v0.2.0/go.mod h1:EA7GleAHL33kN6kTW06m5R3/Q26IyuGO7Ef/0CtpDI0=
github.com/fluxcd/pkg/oci v0.21.1 h1:9kn19wkabE2xB77NRlOtMJlSYhZmUjdloZCzlHdAS6s=
github.com/fluxcd/pkg/oci v0.21.1/go.mod h1:9E2DBlQII7YmeWt2ieTh38wwkiBqx3yg5NEJ51uefaA=
github.com/fluxcd/pkg/runtime v0.33.0 h1:y6mFOj22mU/BXAxSTucTlT7vrWUjd0+iccK0pRN5CF0=
github.com/fluxcd/pkg/runtime v0.33.0/go.mod h1:oDTerqMMtOQVNZeidwAPG7g/ai2xuidUduJzQh1IBVI=
github.com/fluxcd/pkg/runtime v0.35.0 h1:9PYLcul8qdfLYQArcYpHe/QuMqyhAGGFN9F7uY/QVX4=
github.com/fluxcd/pkg/runtime v0.35.0/go.mod h1:sAaSTH8RHj3Y99xj0AtAndDTe5cv0DP4enyLV62EO78=
github.com/fluxcd/pkg/sourceignore v0.3.3 h1:Ue29JAuPECEYdvIqdpXpQaDxpeySn7amarLArp7XoIs=
github.com/fluxcd/pkg/sourceignore v0.3.3/go.mod h1:yuJzKggph0Bdbk9LgXjJQhvJZSTJV/1vS7mJuB7mPa0=
github.com/fluxcd/pkg/ssh v0.7.3 h1:Dhs+nXdp806lBriUJtPyRi0SVIVWbJafJGD/qQ71GiY=
@ -680,8 +680,8 @@ github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfC
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/go-stack/stack v1.8.1/go.mod h1:dcoOX6HbPZSZptuspn9bctJ+N/CnF5gGygcUP3XYfe4=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0 h1:p104kn46Q8WdvHunIJ9dAyjPVtrBPhSr3KT2yUst43I=
github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
github.com/go-test/deep v1.1.0 h1:WOcxcdHcvdgThNXjw0t76K42FXTU7HpNQWHpA2HHNlg=
github.com/gobuffalo/attrs v0.0.0-20190224210810-a9411de4debd/go.mod h1:4duuawTqi2wkkpB4ePgWMaai6/Kc6WEz83bhFwpHzj0=
github.com/gobuffalo/depgen v0.0.0-20190329151759-d478694a28d3/go.mod h1:3STtPUQYuzV0gBVOY3vy6CfMm/ljR4pABfrTeHNLHUY=
@ -773,8 +773,9 @@ github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.1/go.mod h1:DopwsBzvsk0Fs44TXzsVbJyPhcCPeIwnvohx4u74HPM=
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.2/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
@ -1250,15 +1251,15 @@ github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vv
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/ginkgo/v2 v2.1.3/go.mod h1:vw5CSIxN1JObi/U8gcbwft7ZxR2dgaR70JSE3/PpL4c=
github.com/onsi/ginkgo/v2 v2.8.4 h1:gf5mIQ8cLFieruNLAdgijHF1PYfLphKm2dxxcUtcqK0=
github.com/onsi/ginkgo/v2 v2.9.2 h1:BA2GMJOtfGAfagzYtrAlufIP0lq6QERkFmHLMLPwFSU=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.17.0/go.mod h1:HnhC7FXeEQY45zxNK3PPoIUhzk/80Xly9PcubAlGdZY=
github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro=
github.com/onsi/gomega v1.27.2 h1:SKU0CXeKE/WVgIV1T61kSa3+IRE8Ekrv9rdXDwwTqnY=
github.com/onsi/gomega v1.27.2/go.mod h1:5mR3phAHpkAVIDkHEUBY6HGVsU+cpcEscrGPB4oPlZI=
github.com/onsi/gomega v1.27.5 h1:T/X6I0RNFw/kTqgfkZPcQ5KU6vCnWNBGdtrIx2dpGeQ=
github.com/onsi/gomega v1.27.5/go.mod h1:PIQNjfQwkP3aQAH7lf7j87O/5FiNr+ZR8+ipb+qQlhg=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
github.com/opencontainers/go-digest v1.0.1-0.20220411205349-bde1400a84be h1:f2PlhC9pm5sqpBZFvnAoKj+KzXRzbjFMA+TqXfJdgho=
github.com/opencontainers/go-digest v1.0.1-0.20220411205349-bde1400a84be/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM=
@ -1831,8 +1832,8 @@ golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.5.0/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro=
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0 h1:LUYupSeNrTNCGzR/hVBk2NHZO4hXcVaW1k4Qx7rjPx8=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.9.0 h1:KENHtAZL2y3NLMYZeHY9DW8HW8V+kQyJsY/V9JlKvCs=
golang.org/x/mod v0.9.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -2195,8 +2196,8 @@ golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.7/go.mod h1:LGqMHiF4EqQNHR1JncWGqT5BVaXmza+X+BDGol+dOxo=
golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0 h1:BOw41kyTf3PuCW1pVQf8+Cyg8pMlkYB1oo9iJ6D/lKM=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.7.0 h1:W4OVu8VVOaIO0yzWMNdepAulS7YfoS3Zabrm8DOXXU4=
golang.org/x/tools v0.7.0/go.mod h1:4pg6aUX35JBAogB10C9AtvVL+qowtN4pT3CGSQex14s=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@ -2499,10 +2500,10 @@ k8s.io/apiserver v0.26.1 h1:6vmnAqCDO194SVCPU3MU8NcDgSqsUA62tBUSWrFXhsc=
k8s.io/apiserver v0.26.1/go.mod h1:wr75z634Cv+sifswE9HlAo5FQ7UoUauIICRlOE+5dCg=
k8s.io/cli-runtime v0.26.0 h1:aQHa1SyUhpqxAw1fY21x2z2OS5RLtMJOCj7tN4oq8mw=
k8s.io/cli-runtime v0.26.0/go.mod h1:o+4KmwHzO/UK0wepE1qpRk6l3o60/txUZ1fEXWGIKTY=
k8s.io/client-go v0.26.2 h1:s1WkVujHX3kTp4Zn4yGNFK+dlDXy1bAAkIl+cFAiuYI=
k8s.io/client-go v0.26.2/go.mod h1:u5EjOuSyBa09yqqyY7m3abZeovO/7D/WehVVlZ2qcqU=
k8s.io/component-base v0.26.2 h1:IfWgCGUDzrD6wLLgXEstJKYZKAFS2kO+rBRi0p3LqcI=
k8s.io/component-base v0.26.2/go.mod h1:DxbuIe9M3IZPRxPIzhch2m1eT7uFrSBJUBuVCQEBivs=
k8s.io/client-go v0.26.3 h1:k1UY+KXfkxV2ScEL3gilKcF7761xkYsSD6BC9szIu8s=
k8s.io/client-go v0.26.3/go.mod h1:ZPNu9lm8/dbRIPAgteN30RSXea6vrCpFvq+MateTUuQ=
k8s.io/component-base v0.26.3 h1:oC0WMK/ggcbGDTkdcqefI4wIZRYdK3JySx9/HADpV0g=
k8s.io/component-base v0.26.3/go.mod h1:5kj1kZYwSC6ZstHJN7oHBqcJC6yyn41eR+Sqa/mQc8E=
k8s.io/klog/v2 v2.90.1 h1:m4bYOKall2MmOiRaR1J+We67Do7vm9KiQVlT96lnHUw=
k8s.io/klog/v2 v2.90.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/kube-openapi v0.0.0-20221110221610-a28e98eb7c70 h1:zfqQc1V6/ZgGpvrOVvr62OjiqQX4lZjfznK34NQwkqw=
@ -2519,8 +2520,8 @@ rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0=
rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA=
sigs.k8s.io/cli-utils v0.34.0 h1:zCUitt54f0/MYj/ajVFnG6XSXMhpZ72O/3RewIchW8w=
sigs.k8s.io/cli-utils v0.34.0/go.mod h1:EXyMwPMu9OL+LRnj0JEMsGG/fRvbgFadcVlSnE8RhFs=
sigs.k8s.io/controller-runtime v0.14.5 h1:6xaWFqzT5KuAQ9ufgUaj1G/+C4Y1GRkhrxl+BJ9i+5s=
sigs.k8s.io/controller-runtime v0.14.5/go.mod h1:WqIdsAY6JBsjfc/CqO0CORmNtoCtE4S6qbPc9s68h+0=
sigs.k8s.io/controller-runtime v0.14.6 h1:oxstGVvXGNnMvY7TAESYk+lzr6S3V5VFxQ6d92KcwQA=
sigs.k8s.io/controller-runtime v0.14.6/go.mod h1:WqIdsAY6JBsjfc/CqO0CORmNtoCtE4S6qbPc9s68h+0=
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo=
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0=
sigs.k8s.io/kustomize/api v0.12.1 h1:7YM7gW3kYBwtKvoY216ZzY+8hM+lV53LUayghNRJ0vM=

278
main.go
View File

@ -21,10 +21,8 @@ import (
"net"
"net/http"
"os"
"path/filepath"
"time"
"github.com/go-logr/logr"
flag "github.com/spf13/pflag"
"helm.sh/helm/v3/pkg/getter"
corev1 "k8s.io/api/core/v1"
@ -32,7 +30,9 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/client-go/tools/record"
ctrl "sigs.k8s.io/controller-runtime"
ctrlcache "sigs.k8s.io/controller-runtime/pkg/cache"
ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
"github.com/fluxcd/pkg/git"
@ -45,16 +45,16 @@ import (
"github.com/fluxcd/pkg/runtime/pprof"
"github.com/fluxcd/pkg/runtime/probes"
"github.com/fluxcd/source-controller/internal/digest"
"github.com/fluxcd/source-controller/internal/features"
"github.com/fluxcd/source-controller/internal/helm/registry"
"github.com/fluxcd/source-controller/api/v1"
"github.com/fluxcd/source-controller/api/v1beta2"
// +kubebuilder:scaffold:imports
v1 "github.com/fluxcd/source-controller/api/v1"
v1beta2 "github.com/fluxcd/source-controller/api/v1beta2"
"github.com/fluxcd/source-controller/controllers"
"github.com/fluxcd/source-controller/internal/cache"
intdigest "github.com/fluxcd/source-controller/internal/digest"
"github.com/fluxcd/source-controller/internal/features"
"github.com/fluxcd/source-controller/internal/helm"
// +kubebuilder:scaffold:imports
"github.com/fluxcd/source-controller/internal/helm/registry"
)
const controllerName = "source-controller"
@ -92,7 +92,6 @@ func main() {
storageAdvAddr string
concurrent int
requeueDependency time.Duration
watchAllNamespaces bool
helmIndexLimit int64
helmChartLimit int64
helmChartFileLimit int64
@ -101,6 +100,7 @@ func main() {
leaderElectionOptions leaderelection.Options
rateLimiterOptions helper.RateLimiterOptions
featureGates feathelper.FeatureGates
watchOptions helper.WatchOptions
helmCacheMaxSize int
helmCacheTTL string
helmCachePurgeInterval string
@ -121,8 +121,6 @@ func main() {
flag.StringVar(&storageAdvAddr, "storage-adv-addr", envOrDefault("STORAGE_ADV_ADDR", ""),
"The advertised address of the static file server.")
flag.IntVar(&concurrent, "concurrent", 2, "The number of concurrent reconciles per controller.")
flag.BoolVar(&watchAllNamespaces, "watch-all-namespaces", true,
"Watch for custom resources in all namespaces, if set to false it will only watch the runtime namespace.")
flag.Int64Var(&helmIndexLimit, "helm-index-max-size", helm.MaxIndexSize,
"The max allowed size in bytes of a Helm repository index file.")
flag.Int64Var(&helmChartLimit, "helm-chart-max-size", helm.MaxChartSize,
@ -145,7 +143,7 @@ func main() {
"The duration of time that artifacts from previous reconciliations will be kept in storage before being garbage collected.")
flag.IntVar(&artifactRetentionRecords, "artifact-retention-records", 2,
"The maximum number of artifacts to be kept in storage after a garbage collection.")
flag.StringVar(&artifactDigestAlgo, "artifact-digest-algo", digest.Canonical.String(),
flag.StringVar(&artifactDigestAlgo, "artifact-digest-algo", intdigest.Canonical.String(),
"The algorithm to use to calculate the digest of artifacts.")
clientOptions.BindFlags(flag.CommandLine)
@ -153,88 +151,34 @@ func main() {
leaderElectionOptions.BindFlags(flag.CommandLine)
rateLimiterOptions.BindFlags(flag.CommandLine)
featureGates.BindFlags(flag.CommandLine)
watchOptions.BindFlags(flag.CommandLine)
flag.Parse()
logger.SetLogger(logger.NewLogger(logOptions))
err := featureGates.WithLogger(setupLog).
SupportedFeatures(features.FeatureGates())
if err != nil {
if err := featureGates.WithLogger(setupLog).SupportedFeatures(features.FeatureGates()); err != nil {
setupLog.Error(err, "unable to load feature gates")
os.Exit(1)
}
if artifactDigestAlgo != digest.Canonical.String() {
algo, err := digest.AlgorithmForName(artifactDigestAlgo)
if err != nil {
setupLog.Error(err, "unable to configure canonical digest algorithm")
os.Exit(1)
}
digest.Canonical = algo
}
helm.MaxIndexSize = helmIndexLimit
helm.MaxChartSize = helmChartLimit
helm.MaxChartFileSize = helmChartFileLimit
watchNamespace := ""
if !watchAllNamespaces {
watchNamespace = os.Getenv("RUNTIME_NAMESPACE")
}
var disableCacheFor []ctrlclient.Object
shouldCache, err := features.Enabled(features.CacheSecretsAndConfigMaps)
if err != nil {
setupLog.Error(err, "unable to check feature gate "+features.CacheSecretsAndConfigMaps)
os.Exit(1)
}
if !shouldCache {
disableCacheFor = append(disableCacheFor, &corev1.Secret{}, &corev1.ConfigMap{})
}
restConfig := client.GetConfigOrDie(clientOptions)
mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
HealthProbeBindAddress: healthAddr,
Port: 9443,
LeaderElection: leaderElectionOptions.Enable,
LeaderElectionReleaseOnCancel: leaderElectionOptions.ReleaseOnCancel,
LeaseDuration: &leaderElectionOptions.LeaseDuration,
RenewDeadline: &leaderElectionOptions.RenewDeadline,
RetryPeriod: &leaderElectionOptions.RetryPeriod,
LeaderElectionID: fmt.Sprintf("%s-leader-election", controllerName),
Namespace: watchNamespace,
Logger: ctrl.Log,
ClientDisableCacheFor: disableCacheFor,
})
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}
mgr := mustSetupManager(metricsAddr, healthAddr, watchOptions, clientOptions, leaderElectionOptions)
probes.SetupChecks(mgr, setupLog)
pprof.SetupHandlers(mgr, setupLog)
var eventRecorder *events.Recorder
if eventRecorder, err = events.NewRecorder(mgr, ctrl.Log, eventsAddr, controllerName); err != nil {
setupLog.Error(err, "unable to create event recorder")
os.Exit(1)
}
metrics := helper.MustMakeMetrics(mgr)
cacheRecorder := cache.MustMakeMetrics()
eventRecorder := mustSetupEventRecorder(mgr, eventsAddr, controllerName)
storage := mustInitStorage(storagePath, storageAdvAddr, artifactRetentionTTL, artifactRetentionRecords, artifactDigestAlgo)
metricsH := helper.MustMakeMetrics(mgr)
mustSetupHelmLimits(helmIndexLimit, helmChartLimit, helmChartFileLimit)
helmIndexCache, helmIndexCacheItemTTL := mustInitHelmCache(helmCacheMaxSize, helmCacheTTL, helmCachePurgeInterval)
if storageAdvAddr == "" {
storageAdvAddr = determineAdvStorageAddr(storageAddr, setupLog)
}
storage := mustInitStorage(storagePath, storageAdvAddr, artifactRetentionTTL, artifactRetentionRecords, setupLog)
if err = (&controllers.GitRepositoryReconciler{
if err := (&controllers.GitRepositoryReconciler{
Client: mgr.GetClient(),
EventRecorder: eventRecorder,
Metrics: metricsH,
Metrics: metrics,
Storage: storage,
ControllerName: controllerName,
}).SetupWithManagerAndOptions(mgr, controllers.GitRepositoryReconcilerOptions{
@ -246,10 +190,10 @@ func main() {
os.Exit(1)
}
if err = (&controllers.HelmRepositoryOCIReconciler{
if err := (&controllers.HelmRepositoryOCIReconciler{
Client: mgr.GetClient(),
EventRecorder: eventRecorder,
Metrics: metricsH,
Metrics: metrics,
Getters: getters,
ControllerName: controllerName,
RegistryClientGenerator: registry.ClientGenerator,
@ -261,35 +205,15 @@ func main() {
os.Exit(1)
}
var c *cache.Cache
var ttl time.Duration
if helmCacheMaxSize > 0 {
interval, err := time.ParseDuration(helmCachePurgeInterval)
if err != nil {
setupLog.Error(err, "unable to parse cache purge interval")
os.Exit(1)
}
ttl, err = time.ParseDuration(helmCacheTTL)
if err != nil {
setupLog.Error(err, "unable to parse cache TTL")
os.Exit(1)
}
c = cache.New(helmCacheMaxSize, interval)
}
cacheRecorder := cache.MustMakeMetrics()
if err = (&controllers.HelmRepositoryReconciler{
if err := (&controllers.HelmRepositoryReconciler{
Client: mgr.GetClient(),
EventRecorder: eventRecorder,
Metrics: metricsH,
Metrics: metrics,
Storage: storage,
Getters: getters,
ControllerName: controllerName,
Cache: c,
TTL: ttl,
Cache: helmIndexCache,
TTL: helmIndexCacheItemTTL,
CacheRecorder: cacheRecorder,
}).SetupWithManagerAndOptions(mgr, controllers.HelmRepositoryReconcilerOptions{
MaxConcurrentReconciles: concurrent,
@ -299,16 +223,16 @@ func main() {
os.Exit(1)
}
if err = (&controllers.HelmChartReconciler{
if err := (&controllers.HelmChartReconciler{
Client: mgr.GetClient(),
RegistryClientGenerator: registry.ClientGenerator,
Storage: storage,
Getters: getters,
EventRecorder: eventRecorder,
Metrics: metricsH,
Metrics: metrics,
ControllerName: controllerName,
Cache: c,
TTL: ttl,
Cache: helmIndexCache,
TTL: helmIndexCacheItemTTL,
CacheRecorder: cacheRecorder,
}).SetupWithManagerAndOptions(mgr, controllers.HelmChartReconcilerOptions{
MaxConcurrentReconciles: concurrent,
@ -317,10 +241,11 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", v1beta2.HelmChartKind)
os.Exit(1)
}
if err = (&controllers.BucketReconciler{
if err := (&controllers.BucketReconciler{
Client: mgr.GetClient(),
EventRecorder: eventRecorder,
Metrics: metricsH,
Metrics: metrics,
Storage: storage,
ControllerName: controllerName,
}).SetupWithManagerAndOptions(mgr, controllers.BucketReconcilerOptions{
@ -330,12 +255,13 @@ func main() {
setupLog.Error(err, "unable to create controller", "controller", "Bucket")
os.Exit(1)
}
if err = (&controllers.OCIRepositoryReconciler{
if err := (&controllers.OCIRepositoryReconciler{
Client: mgr.GetClient(),
Storage: storage,
EventRecorder: eventRecorder,
ControllerName: controllerName,
Metrics: metricsH,
Metrics: metrics,
}).SetupWithManagerAndOptions(mgr, controllers.OCIRepositoryReconcilerOptions{
MaxConcurrentReconciles: concurrent,
RateLimiter: helper.GetRateLimiter(rateLimiterOptions),
@ -351,7 +277,7 @@ func main() {
// to handle that.
<-mgr.Elected()
startFileServer(storage.BasePath, storageAddr, setupLog)
startFileServer(storage.BasePath, storageAddr)
}()
setupLog.Info("starting manager")
@ -361,37 +287,139 @@ func main() {
}
}
func startFileServer(path string, address string, l logr.Logger) {
l.Info("starting file server")
func startFileServer(path string, address string) {
setupLog.Info("starting file server")
fs := http.FileServer(http.Dir(path))
mux := http.NewServeMux()
mux.Handle("/", fs)
err := http.ListenAndServe(address, mux)
if err != nil {
l.Error(err, "file server error")
setupLog.Error(err, "file server error")
}
}
func mustInitStorage(path string, storageAdvAddr string, artifactRetentionTTL time.Duration, artifactRetentionRecords int, l logr.Logger) *controllers.Storage {
if path == "" {
p, _ := os.Getwd()
path = filepath.Join(p, "bin")
os.MkdirAll(path, 0o700)
func mustSetupEventRecorder(mgr ctrl.Manager, eventsAddr, controllerName string) record.EventRecorder {
eventRecorder, err := events.NewRecorder(mgr, ctrl.Log, eventsAddr, controllerName)
if err != nil {
setupLog.Error(err, "unable to create event recorder")
os.Exit(1)
}
return eventRecorder
}
func mustSetupManager(metricsAddr, healthAddr string, watchOpts helper.WatchOptions, clientOpts client.Options, leaderOpts leaderelection.Options) ctrl.Manager {
watchNamespace := ""
if !watchOpts.AllNamespaces {
watchNamespace = os.Getenv("RUNTIME_NAMESPACE")
}
watchSelector, err := helper.GetWatchSelector(watchOpts)
if err != nil {
setupLog.Error(err, "unable to configure watch label selector for manager")
os.Exit(1)
}
newSelectingCache := ctrlcache.BuilderWithOptions(ctrlcache.Options{
SelectorsByObject: ctrlcache.SelectorsByObject{
&v1.GitRepository{}: {Label: watchSelector},
&v1beta2.HelmRepository{}: {Label: watchSelector},
&v1beta2.HelmChart{}: {Label: watchSelector},
&v1beta2.Bucket{}: {Label: watchSelector},
&v1beta2.OCIRepository{}: {Label: watchSelector},
},
})
var disableCacheFor []ctrlclient.Object
shouldCache, err := features.Enabled(features.CacheSecretsAndConfigMaps)
if err != nil {
setupLog.Error(err, "unable to check feature gate "+features.CacheSecretsAndConfigMaps)
os.Exit(1)
}
if !shouldCache {
disableCacheFor = append(disableCacheFor, &corev1.Secret{}, &corev1.ConfigMap{})
}
leaderElectionId := fmt.Sprintf("%s-%s", controllerName, "leader-election")
if watchOpts.LabelSelector != "" {
leaderElectionId = leaderelection.GenerateID(leaderElectionId, watchOpts.LabelSelector)
}
restConfig := client.GetConfigOrDie(clientOpts)
mgr, err := ctrl.NewManager(restConfig, ctrl.Options{
Scheme: scheme,
MetricsBindAddress: metricsAddr,
HealthProbeBindAddress: healthAddr,
Port: 9443,
LeaderElection: leaderOpts.Enable,
LeaderElectionReleaseOnCancel: leaderOpts.ReleaseOnCancel,
LeaseDuration: &leaderOpts.LeaseDuration,
RenewDeadline: &leaderOpts.RenewDeadline,
RetryPeriod: &leaderOpts.RetryPeriod,
LeaderElectionID: leaderElectionId,
Namespace: watchNamespace,
Logger: ctrl.Log,
ClientDisableCacheFor: disableCacheFor,
NewCache: newSelectingCache,
})
if err != nil {
setupLog.Error(err, "unable to start manager")
os.Exit(1)
}
return mgr
}
func mustSetupHelmLimits(indexLimit, chartLimit, chartFileLimit int64) {
helm.MaxIndexSize = indexLimit
helm.MaxChartSize = chartLimit
helm.MaxChartFileSize = chartFileLimit
}
func mustInitHelmCache(maxSize int, purgeInterval, itemTTL string) (*cache.Cache, time.Duration) {
if maxSize <= 0 {
setupLog.Info("caching of Helm index files is disabled")
return nil, -1
}
interval, err := time.ParseDuration(purgeInterval)
if err != nil {
setupLog.Error(err, "unable to parse Helm index cache purge interval")
os.Exit(1)
}
ttl, err := time.ParseDuration(itemTTL)
if err != nil {
setupLog.Error(err, "unable to parse Helm index cache item TTL")
os.Exit(1)
}
return cache.New(maxSize, interval), ttl
}
func mustInitStorage(path string, storageAdvAddr string, artifactRetentionTTL time.Duration, artifactRetentionRecords int, artifactDigestAlgo string) *controllers.Storage {
if storageAdvAddr == "" {
storageAdvAddr = determineAdvStorageAddr(storageAdvAddr)
}
if artifactDigestAlgo != intdigest.Canonical.String() {
algo, err := intdigest.AlgorithmForName(artifactDigestAlgo)
if err != nil {
setupLog.Error(err, "unable to configure canonical digest algorithm")
os.Exit(1)
}
intdigest.Canonical = algo
}
storage, err := controllers.NewStorage(path, storageAdvAddr, artifactRetentionTTL, artifactRetentionRecords)
if err != nil {
l.Error(err, "unable to initialise storage")
setupLog.Error(err, "unable to initialise storage")
os.Exit(1)
}
return storage
}
func determineAdvStorageAddr(storageAddr string, l logr.Logger) string {
func determineAdvStorageAddr(storageAddr string) string {
host, port, err := net.SplitHostPort(storageAddr)
if err != nil {
l.Error(err, "unable to parse storage address")
setupLog.Error(err, "unable to parse storage address")
os.Exit(1)
}
switch host {
@ -402,7 +430,7 @@ func determineAdvStorageAddr(storageAddr string, l logr.Logger) string {
if host == "" {
hn, err := os.Hostname()
if err != nil {
l.Error(err, "0.0.0.0 specified in storage addr but hostname is invalid")
setupLog.Error(err, "0.0.0.0 specified in storage addr but hostname is invalid")
os.Exit(1)
}
host = hn