From 641864dd528b4220a106581b2159ffc502ab2ae5 Mon Sep 17 00:00:00 2001 From: Matt Moore Date: Wed, 22 Jul 2020 09:10:57 -0700 Subject: [PATCH] [master] Auto-update dependencies (#291) Produced via: `./hack/update-deps.sh --upgrade && ./hack/update-codegen.sh` /assign n3wscott vagababov /cc n3wscott vagababov --- go.mod | 6 +- go.sum | 17 +++- vendor/golang.org/x/sys/unix/syscall_linux.go | 12 +++ .../golang.org/x/sys/unix/zsyscall_linux.go | 46 ++++++++++ .../knative.dev/pkg/controller/controller.go | 84 ++++++++++++------- .../pkg/controller/two_lane_queue.go | 6 +- vendor/knative.dev/pkg/hash/bucketer.go | 35 ++++---- .../knative.dev/pkg/leaderelection/context.go | 67 ++++++--------- vendor/modules.txt | 12 +-- 9 files changed, 179 insertions(+), 106 deletions(-) diff --git a/go.mod b/go.mod index 4b4253a7..0f9a58c6 100644 --- a/go.mod +++ b/go.mod @@ -12,12 +12,12 @@ require ( github.com/tsenart/go-tsz v0.0.0-20180814235614-0bd30b3df1c3 // indirect github.com/tsenart/vegeta v12.7.1-0.20190725001342-b5f4fca92137+incompatible k8s.io/api v0.17.6 - k8s.io/apimachinery v0.18.5 + k8s.io/apimachinery v0.18.6 k8s.io/client-go v11.0.1-0.20190805182717-6502b5e7b1b5+incompatible k8s.io/code-generator v0.18.0 k8s.io/kube-openapi v0.0.0-20200410145947-bcb3869e6f29 - knative.dev/pkg v0.0.0-20200721091635-3b7ca76a63e4 - knative.dev/test-infra v0.0.0-20200720224135-d2706240545c + knative.dev/pkg v0.0.0-20200722085354-ba0f3cb7cf84 + knative.dev/test-infra v0.0.0-20200721175154-c98db9bd4d5d ) replace ( diff --git a/go.sum b/go.sum index 007d3042..96f76cee 100644 --- a/go.sum +++ b/go.sum @@ -761,6 +761,7 @@ github.com/mattn/go-shellwords v1.0.10/go.mod h1:EZzvwXDESEeg03EKmM+RmDnNOPKG4lL github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/mattn/go-sqlite3 v2.0.1+incompatible/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/mattn/go-zglob v0.0.1/go.mod h1:9fxibJccNxU2cnpIKLRRFA7zX7qhkJIQWBb449FYHOo= +github.com/mattn/go-zglob v0.0.2/go.mod h1:9fxibJccNxU2cnpIKLRRFA7zX7qhkJIQWBb449FYHOo= github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw= github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= @@ -1324,6 +1325,8 @@ golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200610111108-226ff32320da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae h1:Ih9Yo4hSPImZOpfGuA4bR/ORKTAbhZo2AbWNRCnevdo= golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200720211630-cb9d2d5c5666 h1:gVCS+QOncANNPlmlO1AhlU3oxs4V9z+gTtPwIk3p2N8= +golang.org/x/sys v0.0.0-20200720211630-cb9d2d5c5666/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.0.0-20160726164857-2910a502d2bf/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -1431,6 +1434,8 @@ golang.org/x/tools v0.0.0-20200709181711-e327e1019dfe/go.mod h1:njjCfa9FT2d7l9Bc golang.org/x/tools v0.0.0-20200713011307-fd294ab11aed/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200714190737-9048b464a08d h1:hYhnolbefSSt3WZp66sgmgnEOFv5PD6a5PIcnKJ8jdU= golang.org/x/tools v0.0.0-20200714190737-9048b464a08d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= +golang.org/x/tools v0.0.0-20200721032237-77f530d86f9a h1:kVMPw4f6EVqYdfGQTedjrpw1dbE2PEMfw4jwXsNdn9s= +golang.org/x/tools v0.0.0-20200721032237-77f530d86f9a/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4= @@ -1526,6 +1531,8 @@ google.golang.org/genproto v0.0.0-20200709005830-7a2ca40e9dc3/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20200711021454-869866162049/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20200715011427-11fb19a81f2c h1:6DWnZZ6EY/59QRRQttZKiktVL23UuQYs7uy75MhhLRM= google.golang.org/genproto v0.0.0-20200715011427-11fb19a81f2c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= +google.golang.org/genproto v0.0.0-20200721032028-5044d0edf986 h1:10ohwcLf82I55O/aQxYqmWKoOdNbQTYYComeP1KDOS4= +google.golang.org/genproto v0.0.0-20200721032028-5044d0edf986/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/grpc v0.0.0-20160317175043-d3ddb4469d5a/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= @@ -1664,18 +1671,19 @@ k8s.io/metrics v0.17.2/go.mod h1:3TkNHET4ROd+NfzNxkjoVfQ0Ob4iZnaHmSEA4vYpwLw= k8s.io/test-infra v0.0.0-20200514184223-ba32c8aae783/go.mod h1:bW6thaPZfL2hW7ecjx2WYwlP9KQLM47/xIJyttkVk5s= k8s.io/test-infra v0.0.0-20200617221206-ea73eaeab7ff/go.mod h1:L3+cRvwftUq8IW1TrHji5m3msnc4uck/7LsE/GR/aZk= k8s.io/test-infra v0.0.0-20200715094037-cc150f5ae724/go.mod h1:D2jUSuQFYy6McY2qbknsLUE9stqN0yIuJ+rjdUAxSCs= +k8s.io/test-infra v0.0.0-20200721115715-1af01ef6b4c8/go.mod h1:4cRZlOy5Ka3Ym/orCmNWL2dsE39pN0xHFT0WFrZe2HQ= k8s.io/utils v0.0.0-20191114184206-e782cd3c129f/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew= k8s.io/utils v0.0.0-20200124190032-861946025e34 h1:HjlUD6M0K3P8nRXmr2B9o4F9dUy9TCj/aEpReeyi6+k= k8s.io/utils v0.0.0-20200124190032-861946025e34/go.mod h1:sZAwmy6armz5eXlNoLmJcl4F1QuKu7sr+mFQ0byX7Ew= knative.dev/caching v0.0.0-20200116200605-67bca2c83dfa/go.mod h1:dHXFU6CGlLlbzaWc32g80cR92iuBSpsslDNBWI8C7eg= knative.dev/eventing-contrib v0.11.2/go.mod h1:SnXZgSGgMSMLNFTwTnpaOH7hXDzTFtw0J8OmHflNx3g= knative.dev/pkg v0.0.0-20200207155214-fef852970f43/go.mod h1:pgODObA1dTyhNoFxPZTTjNWfx6F0aKsKzn+vaT9XO/Q= -knative.dev/pkg v0.0.0-20200721091635-3b7ca76a63e4 h1:Xwz4UudITaD3T8KXv/ReRMtDGeh5vX+YOhkRF3Hlb4o= -knative.dev/pkg v0.0.0-20200721091635-3b7ca76a63e4/go.mod h1:uiDEJrNciKWl5fTSURoGXekCN5Wz9yo9ambLR80ISL0= +knative.dev/pkg v0.0.0-20200722085354-ba0f3cb7cf84 h1:VQKWwqV0RYZi9pCCVykFY9WxQotZXr79zkGlVGi6KjE= +knative.dev/pkg v0.0.0-20200722085354-ba0f3cb7cf84/go.mod h1:uiDEJrNciKWl5fTSURoGXekCN5Wz9yo9ambLR80ISL0= knative.dev/test-infra v0.0.0-20200719034534-5adf654f5ed5 h1:ZspjtLBz7pzkB9PvPxSgDNHVNftTU1mmjtYh+j5LZJE= knative.dev/test-infra v0.0.0-20200719034534-5adf654f5ed5/go.mod h1:mAsPDmFmlsTJjRWplWBz8xtEiarSgvGiiOjkGj4Or1g= -knative.dev/test-infra v0.0.0-20200720224135-d2706240545c h1:hUjic+ENVkzG7Kp4GhXp1+WkUsCzbGFaBrgeAMet4v8= -knative.dev/test-infra v0.0.0-20200720224135-d2706240545c/go.mod h1:mAsPDmFmlsTJjRWplWBz8xtEiarSgvGiiOjkGj4Or1g= +knative.dev/test-infra v0.0.0-20200721175154-c98db9bd4d5d h1:GFzUmKNa8SVL3uMwa3R+NOjaSY1oUzxNdNWW3j5HrZ4= +knative.dev/test-infra v0.0.0-20200721175154-c98db9bd4d5d/go.mod h1:kzRhTm5L08eDQFRl8NKSAN93lz6IZWQMs+2TjTCN+VA= modernc.org/cc v1.0.0/go.mod h1:1Sk4//wdnYJiUIxnW8ddKpaOJCF37yAdqYnkxUpaYxw= modernc.org/golex v1.0.0/go.mod h1:b/QX9oBD/LhixY6NDh+IdGv17hgB+51fET1i2kPSmvk= modernc.org/mathutil v1.0.0/go.mod h1:wU0vUrJsVWBZ4P6e7xtFJEhFSNsfRLJ8H458uRjg03k= @@ -1694,6 +1702,7 @@ rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= rsc.io/sampler v1.3.0/go.mod h1:T1hPZKmBbMNahiBKFy5HrXp6adAjACjK9JXDnKaTXpA= sigs.k8s.io/boskos v0.0.0-20200526191642-45fc818e2d00/go.mod h1:L1ubP7d1CCMSQSjKiZv6dGbh7b4kfoG+dFPj8cfYDnI= sigs.k8s.io/boskos v0.0.0-20200710214748-f5935686c7fc/go.mod h1:ZO5RV+VxJS9mb6DvZ1yAjywoyq/wQ8b0vDoZxcIA5kE= +sigs.k8s.io/boskos v0.0.0-20200717180850-7299d535c033/go.mod h1:ZO5RV+VxJS9mb6DvZ1yAjywoyq/wQ8b0vDoZxcIA5kE= sigs.k8s.io/controller-runtime v0.5.0/go.mod h1:REiJzC7Y00U+2YkMbT8wxgrsX5USpXKGhb2sCtAXiT8= sigs.k8s.io/controller-runtime v0.5.4/go.mod h1:JZUwSMVbxDupo0lTJSSFP5pimEyxGynROImSsqIOx1A= sigs.k8s.io/kustomize v2.0.3+incompatible/go.mod h1:MkjgH3RdOWrievjo6c9T245dYlB5QeXV4WCbnt/PEpU= diff --git a/vendor/golang.org/x/sys/unix/syscall_linux.go b/vendor/golang.org/x/sys/unix/syscall_linux.go index e50e4cb2..fad483bb 100644 --- a/vendor/golang.org/x/sys/unix/syscall_linux.go +++ b/vendor/golang.org/x/sys/unix/syscall_linux.go @@ -2122,6 +2122,18 @@ func Klogset(typ int, arg int) (err error) { return nil } +// RemoteIovec is Iovec with the pointer replaced with an integer. +// It is used for ProcessVMReadv and ProcessVMWritev, where the pointer +// refers to a location in a different process' address space, which +// would confuse the Go garbage collector. +type RemoteIovec struct { + Base uintptr + Len int +} + +//sys ProcessVMReadv(pid int, localIov []Iovec, remoteIov []RemoteIovec, flags uint) (n int, err error) = SYS_PROCESS_VM_READV +//sys ProcessVMWritev(pid int, localIov []Iovec, remoteIov []RemoteIovec, flags uint) (n int, err error) = SYS_PROCESS_VM_WRITEV + /* * Unimplemented */ diff --git a/vendor/golang.org/x/sys/unix/zsyscall_linux.go b/vendor/golang.org/x/sys/unix/zsyscall_linux.go index df217825..f6603de4 100644 --- a/vendor/golang.org/x/sys/unix/zsyscall_linux.go +++ b/vendor/golang.org/x/sys/unix/zsyscall_linux.go @@ -1847,6 +1847,52 @@ func openByHandleAt(mountFD int, fh *fileHandle, flags int) (fd int, err error) // THIS FILE IS GENERATED BY THE COMMAND AT THE TOP; DO NOT EDIT +func ProcessVMReadv(pid int, localIov []Iovec, remoteIov []RemoteIovec, flags uint) (n int, err error) { + var _p0 unsafe.Pointer + if len(localIov) > 0 { + _p0 = unsafe.Pointer(&localIov[0]) + } else { + _p0 = unsafe.Pointer(&_zero) + } + var _p1 unsafe.Pointer + if len(remoteIov) > 0 { + _p1 = unsafe.Pointer(&remoteIov[0]) + } else { + _p1 = unsafe.Pointer(&_zero) + } + r0, _, e1 := Syscall6(SYS_PROCESS_VM_READV, uintptr(pid), uintptr(_p0), uintptr(len(localIov)), uintptr(_p1), uintptr(len(remoteIov)), uintptr(flags)) + n = int(r0) + if e1 != 0 { + err = errnoErr(e1) + } + return +} + +// THIS FILE IS GENERATED BY THE COMMAND AT THE TOP; DO NOT EDIT + +func ProcessVMWritev(pid int, localIov []Iovec, remoteIov []RemoteIovec, flags uint) (n int, err error) { + var _p0 unsafe.Pointer + if len(localIov) > 0 { + _p0 = unsafe.Pointer(&localIov[0]) + } else { + _p0 = unsafe.Pointer(&_zero) + } + var _p1 unsafe.Pointer + if len(remoteIov) > 0 { + _p1 = unsafe.Pointer(&remoteIov[0]) + } else { + _p1 = unsafe.Pointer(&_zero) + } + r0, _, e1 := Syscall6(SYS_PROCESS_VM_WRITEV, uintptr(pid), uintptr(_p0), uintptr(len(localIov)), uintptr(_p1), uintptr(len(remoteIov)), uintptr(flags)) + n = int(r0) + if e1 != 0 { + err = errnoErr(e1) + } + return +} + +// THIS FILE IS GENERATED BY THE COMMAND AT THE TOP; DO NOT EDIT + func pipe2(p *[2]_C_int, flags int) (err error) { _, _, e1 := RawSyscall(SYS_PIPE2, uintptr(unsafe.Pointer(p)), uintptr(flags), 0) if e1 != 0 { diff --git a/vendor/knative.dev/pkg/controller/controller.go b/vendor/knative.dev/pkg/controller/controller.go index 966a1fff..55108834 100644 --- a/vendor/knative.dev/pkg/controller/controller.go +++ b/vendor/knative.dev/pkg/controller/controller.go @@ -30,7 +30,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" @@ -186,12 +185,14 @@ type Impl struct { // from the workqueue to process. Public for testing. Reconciler Reconciler - // WorkQueue is a rate limited work queue. This is used to queue work to be - // processed instead of performing it as soon as a change happens. This - // means we can ensure we only process a fixed amount of resources at a - // time, and makes it easy to ensure we are never processing the same item - // simultaneously in two different workers. - WorkQueue workqueue.RateLimitingInterface + // workQueue is a rate-limited two-lane work queue. + // This is used to queue work to be processed instead of performing it as + // soon as a change happens. This means we can ensure we only process a + // fixed amount of resources at a time, and makes it easy to ensure we are + // never processing the same item simultaneously in two different workers. + // The slow queue is used for global resync and other background processes + // which are not required to complete at the highest priority. + workQueue *twoLaneQueue // Sugared logger is easier to use but is not as performant as the // raw logger. In performance critical paths, call logger.Desugar() @@ -215,12 +216,17 @@ func NewImplWithStats(r Reconciler, logger *zap.SugaredLogger, workQueueName str return &Impl{ Name: workQueueName, Reconciler: r, - WorkQueue: newTwoLaneWorkQueue(workQueueName), + workQueue: newTwoLaneWorkQueue(workQueueName), logger: logger, statsReporter: reporter, } } +// WorkQueue permits direct access to the work queue. +func (c *Impl) WorkQueue() workqueue.RateLimitingInterface { + return c.workQueue +} + // EnqueueAfter takes a resource, converts it into a namespace/name string, // and passes it to EnqueueKey. func (c *Impl) EnqueueAfter(obj interface{}, after time.Duration) { @@ -232,6 +238,25 @@ func (c *Impl) EnqueueAfter(obj interface{}, after time.Duration) { c.EnqueueKeyAfter(types.NamespacedName{Namespace: object.GetNamespace(), Name: object.GetName()}, after) } +// EnqueueSlowKey takes a resource, converts it into a namespace/name string, +// and enqueues that key in the slow lane. +func (c *Impl) EnqueueSlowKey(key types.NamespacedName) { + c.workQueue.SlowLane().Add(key) + c.logger.Debugf("Adding to the slow queue %s (depth(total/slow): %d/%d)", safeKey(key), c.workQueue.Len(), c.workQueue.SlowLane().Len()) +} + +// EnqueueSlow extracts namesspeced name from the object and enqueues it on the slow +// work queue. +func (c *Impl) EnqueueSlow(obj interface{}) { + object, err := kmeta.DeletionHandlingAccessor(obj) + if err != nil { + c.logger.Errorw("EnqueueSlow", zap.Error(err)) + return + } + key := types.NamespacedName{Namespace: object.GetNamespace(), Name: object.GetName()} + c.EnqueueSlowKey(key) +} + // Enqueue takes a resource, converts it into a namespace/name string, // and passes it to EnqueueKey. func (c *Impl) Enqueue(obj interface{}) { @@ -342,23 +367,23 @@ func (c *Impl) EnqueueNamespaceOf(obj interface{}) { // EnqueueKey takes a namespace/name string and puts it onto the work queue. func (c *Impl) EnqueueKey(key types.NamespacedName) { - c.WorkQueue.Add(key) - c.logger.Debugf("Adding to queue %s (depth: %d)", safeKey(key), c.WorkQueue.Len()) + c.workQueue.Add(key) + c.logger.Debugf("Adding to queue %s (depth: %d)", safeKey(key), c.workQueue.Len()) } -// MaybeEnqueueBucketKey takes a Bucket and namespace/name string and puts it onto the work queue. +// MaybeEnqueueBucketKey takes a Bucket and namespace/name string and puts it onto +// the slow work queue. func (c *Impl) MaybeEnqueueBucketKey(bkt reconciler.Bucket, key types.NamespacedName) { if bkt.Has(key) { - c.WorkQueue.Add(key) - c.logger.Debugf("Adding to queue %s (depth: %d)", safeKey(key), c.WorkQueue.Len()) + c.EnqueueSlowKey(key) } } // EnqueueKeyAfter takes a namespace/name string and schedules its execution in // the work queue after given delay. func (c *Impl) EnqueueKeyAfter(key types.NamespacedName, delay time.Duration) { - c.WorkQueue.AddAfter(key, delay) - c.logger.Debugf("Adding to queue %s (delay: %v, depth: %d)", safeKey(key), delay, c.WorkQueue.Len()) + c.workQueue.AddAfter(key, delay) + c.logger.Debugf("Adding to queue %s (delay: %v, depth: %d)", safeKey(key), delay, c.workQueue.Len()) } // RunContext starts the controller's worker threads, the number of which is threadiness. @@ -372,8 +397,8 @@ func (c *Impl) RunContext(ctx context.Context, threadiness int) error { sg := sync.WaitGroup{} defer sg.Wait() defer func() { - c.WorkQueue.ShutDown() - for c.WorkQueue.Len() > 0 { + c.workQueue.ShutDown() + for c.workQueue.Len() > 0 { time.Sleep(time.Millisecond * 100) } }() @@ -423,25 +448,25 @@ func (c *Impl) Run(threadiness int, stopCh <-chan struct{}) error { // processNextWorkItem will read a single work item off the workqueue and // attempt to process it, by calling Reconcile on our Reconciler. func (c *Impl) processNextWorkItem() bool { - obj, shutdown := c.WorkQueue.Get() + obj, shutdown := c.workQueue.Get() if shutdown { return false } key := obj.(types.NamespacedName) keyStr := safeKey(key) - c.logger.Debugf("Processing from queue %s (depth: %d)", safeKey(key), c.WorkQueue.Len()) + c.logger.Debugf("Processing from queue %s (depth: %d)", safeKey(key), c.workQueue.Len()) startTime := time.Now() // Send the metrics for the current queue depth - c.statsReporter.ReportQueueDepth(int64(c.WorkQueue.Len())) + c.statsReporter.ReportQueueDepth(int64(c.workQueue.Len())) // We call Done here so the workqueue knows we have finished // processing this item. We also must remember to call Forget if // reconcile succeeds. If a transient error occurs, we do not call // Forget and put the item back to the queue with an increased // delay. - defer c.WorkQueue.Done(key) + defer c.workQueue.Done(key) var err error defer func() { @@ -467,7 +492,7 @@ func (c *Impl) processNextWorkItem() bool { // Finally, if no error occurs we Forget this item so it does not // have any delay when another change happens. - c.WorkQueue.Forget(key) + c.workQueue.Forget(key) logger.Info("Reconcile succeeded. Time taken: ", time.Since(startTime)) return true @@ -480,16 +505,16 @@ func (c *Impl) handleErr(err error, key types.NamespacedName) { // We want to check that the queue is shutting down here // since controller Run might have exited by now (since while this item was // being processed, queue.Len==0). - if !IsPermanentError(err) && !c.WorkQueue.ShuttingDown() { - c.WorkQueue.AddRateLimited(key) - c.logger.Debugf("Requeuing key %s due to non-permanent error (depth: %d)", safeKey(key), c.WorkQueue.Len()) + if !IsPermanentError(err) && !c.workQueue.ShuttingDown() { + c.workQueue.AddRateLimited(key) + c.logger.Debugf("Requeuing key %s due to non-permanent error (depth: %d)", safeKey(key), c.workQueue.Len()) return } - c.WorkQueue.Forget(key) + c.workQueue.Forget(key) } -// GlobalResync enqueues (with a delay) all objects from the passed SharedInformer +// GlobalResync enqueues into the slow lane all objects from the passed SharedInformer func (c *Impl) GlobalResync(si cache.SharedInformer) { alwaysTrue := func(interface{}) bool { return true } c.FilteredGlobalResync(alwaysTrue, si) @@ -498,14 +523,13 @@ func (c *Impl) GlobalResync(si cache.SharedInformer) { // FilteredGlobalResync enqueues (with a delay) all objects from the // SharedInformer that pass the filter function func (c *Impl) FilteredGlobalResync(f func(interface{}) bool, si cache.SharedInformer) { - if c.WorkQueue.ShuttingDown() { + if c.workQueue.ShuttingDown() { return } list := si.GetStore().List() - count := float64(len(list)) for _, obj := range list { if f(obj) { - c.EnqueueAfter(obj, wait.Jitter(time.Second, count)) + c.EnqueueSlow(obj) } } } diff --git a/vendor/knative.dev/pkg/controller/two_lane_queue.go b/vendor/knative.dev/pkg/controller/two_lane_queue.go index 165eb2df..ebd53df5 100644 --- a/vendor/knative.dev/pkg/controller/two_lane_queue.go +++ b/vendor/knative.dev/pkg/controller/two_lane_queue.go @@ -16,9 +16,7 @@ limitations under the License. package controller -import ( - "k8s.io/client-go/util/workqueue" -) +import "k8s.io/client-go/util/workqueue" // twoLaneQueue is a rate limited queue that wraps around two queues // -- fast queue (anonymously aliased), whose contents are processed with priority. @@ -72,8 +70,8 @@ func process(q workqueue.Interface, ch chan interface{}) { if d { break } - ch <- i q.Done(i) + ch <- i } } diff --git a/vendor/knative.dev/pkg/hash/bucketer.go b/vendor/knative.dev/pkg/hash/bucketer.go index 37ad8398..d0c1460a 100644 --- a/vendor/knative.dev/pkg/hash/bucketer.go +++ b/vendor/knative.dev/pkg/hash/bucketer.go @@ -19,7 +19,6 @@ limitations under the License. package hash import ( - "errors" "sync" lru "github.com/hashicorp/golang-lru" @@ -67,15 +66,6 @@ func NewBucketSet(bucketList sets.String) *BucketSet { } } -// NewBucket creates a new bucket. Caller MUST make sure that -// the given `name` is in the given `bl.buckets`. -func NewBucket(name string, bl *BucketSet) *Bucket { - return &Bucket{ - name: name, - buckets: bl, - } -} - // Name implements Bucket. func (b *Bucket) Name() string { return b.name @@ -87,17 +77,24 @@ func (b *Bucket) Has(nn types.NamespacedName) bool { return b.buckets.Owner(nn.String()) == b.name } -// NewBucket creates a new Bucket with given name based on this bucketset. -func (bs *BucketSet) NewBucket(name string) (*Bucket, error) { +// Buckets creates a new list of all possible Bucket based on this bucketset +// ordered by bucket name. +func (bs *BucketSet) Buckets() []reconciler.Bucket { + bkts := make([]reconciler.Bucket, len(bs.buckets)) + for i, n := range bs.sortedBucketNames() { + bkts[i] = &Bucket{ + name: n, + buckets: bs, + } + } + return bkts +} + +func (bs *BucketSet) sortedBucketNames() []string { bs.mu.RLock() defer bs.mu.RUnlock() - if !bs.buckets.Has(name) { - return nil, errors.New(name + " is not a valid bucket in the bucketset") - } - return &Bucket{ - name: name, - buckets: bs, - }, nil + + return bs.buckets.List() } // Owner returns the owner of the key. diff --git a/vendor/knative.dev/pkg/leaderelection/context.go b/vendor/knative.dev/pkg/leaderelection/context.go index eaae67a7..5e897bcc 100644 --- a/vendor/knative.dev/pkg/leaderelection/context.go +++ b/vendor/knative.dev/pkg/leaderelection/context.go @@ -19,7 +19,6 @@ package leaderelection import ( "context" "fmt" - "hash/fnv" "strings" "sync" @@ -82,11 +81,11 @@ type Elector interface { // BuildElector builds a leaderelection.LeaderElector for the named LeaderAware // reconciler using a builder added to the context via WithStandardLeaderElectorBuilder. -func BuildElector(ctx context.Context, la reconciler.LeaderAware, name string, enq func(reconciler.Bucket, types.NamespacedName)) (Elector, error) { +func BuildElector(ctx context.Context, la reconciler.LeaderAware, queueName string, enq func(reconciler.Bucket, types.NamespacedName)) (Elector, error) { if val := ctx.Value(builderKey{}); val != nil { switch builder := val.(type) { case *standardBuilder: - return builder.buildElector(ctx, la, name, enq) + return builder.buildElector(ctx, la, queueName, enq) case *statefulSetBuilder: return builder.buildElector(ctx, la, enq) } @@ -107,7 +106,7 @@ type standardBuilder struct { } func (b *standardBuilder) buildElector(ctx context.Context, la reconciler.LeaderAware, - name string, enq func(reconciler.Bucket, types.NamespacedName)) (Elector, error) { + queueName string, enq func(reconciler.Bucket, types.NamespacedName)) (Elector, error) { logger := logging.FromContext(ctx) id, err := UniqueID() @@ -115,16 +114,12 @@ func (b *standardBuilder) buildElector(ctx context.Context, la reconciler.Leader return nil, err } - buckets := make([]Elector, 0, b.lec.Buckets) - for i := uint32(0); i < b.lec.Buckets; i++ { - bkt := &bucket{ - // The resource name is the lowercase: - // {component}.{workqueue}.{index}-of-{total} - name: strings.ToLower(fmt.Sprintf("%s.%s.%02d-of-%02d", b.lec.Component, name, i, b.lec.Buckets)), - index: i, - total: b.lec.Buckets, - } - + bkts := newStandardBuckets(queueName, b.lec) + electors := make([]Elector, 0, b.lec.Buckets) + for _, bkt := range bkts { + // Use a local var which won't change across the for loop since it is + // used in a callback asynchronously. + bkt := bkt rl, err := resourcelock.New(KnativeResourceLock, system.Namespace(), // use namespace we are running in bkt.Name(), @@ -168,9 +163,22 @@ func (b *standardBuilder) buildElector(ctx context.Context, la reconciler.Leader // if lec.WatchDog != nil { // lec.WatchDog.SetLeaderElection(le) // } - buckets = append(buckets, &runUntilCancelled{Elector: le}) + electors = append(electors, &runUntilCancelled{Elector: le}) } - return &runAll{les: buckets}, nil + return &runAll{les: electors}, nil +} + +func newStandardBuckets(queueName string, cc ComponentConfig) []reconciler.Bucket { + names := make(sets.String, cc.Buckets) + for i := uint32(0); i < cc.Buckets; i++ { + names.Insert(standardBucketName(i, queueName, cc)) + } + + return hash.NewBucketSet(names).Buckets() +} + +func standardBucketName(ordinal uint32, queueName string, cc ComponentConfig) string { + return strings.ToLower(fmt.Sprintf("%s.%s.%02d-of-%02d", cc.Component, queueName, ordinal, cc.Buckets)) } type statefulSetBuilder struct { @@ -210,7 +218,9 @@ func NewStatefulSetBucketAndSet(buckets int) (reconciler.Bucket, *hash.BucketSet } bs := hash.NewBucketSet(names) - return hash.NewBucket(statefulSetPodDNS(ssc.StatefulSetID.ordinal, ssc), bs), bs, nil + // Buckets is sorted in order of names so we can use ordinal to + // get the correct Bucket for this binary. + return bs.Buckets()[ssc.StatefulSetID.ordinal], bs, nil } func statefulSetPodDNS(ordinal int, ssc *statefulSetConfig) string { @@ -269,26 +279,3 @@ func (ruc *runUntilCancelled) Run(ctx context.Context) { } } } - -type bucket struct { - name string - - // We are bucket {index} of {total} - index uint32 - total uint32 -} - -var _ reconciler.Bucket = (*bucket)(nil) - -// Name implements reconciler.Bucket -func (b *bucket) Name() string { - return b.name -} - -// Has implements reconciler.Bucket -func (b *bucket) Has(nn types.NamespacedName) bool { - h := fnv.New32a() - h.Write([]byte(nn.Namespace + "." + nn.Name)) - ii := h.Sum32() % b.total - return b.index == ii -} diff --git a/vendor/modules.txt b/vendor/modules.txt index 73bc6742..81743481 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -251,7 +251,7 @@ golang.org/x/oauth2/jws golang.org/x/oauth2/jwt # golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208 golang.org/x/sync/semaphore -# golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae +# golang.org/x/sys v0.0.0-20200720211630-cb9d2d5c5666 golang.org/x/sys/internal/unsafeheader golang.org/x/sys/unix golang.org/x/sys/windows @@ -265,7 +265,7 @@ golang.org/x/text/unicode/norm golang.org/x/text/width # golang.org/x/time v0.0.0-20200416051211-89c76fbcd5d1 golang.org/x/time/rate -# golang.org/x/tools v0.0.0-20200714190737-9048b464a08d +# golang.org/x/tools v0.0.0-20200721032237-77f530d86f9a golang.org/x/tools/go/ast/astutil golang.org/x/tools/imports golang.org/x/tools/internal/event @@ -323,7 +323,7 @@ google.golang.org/appengine/internal/socket google.golang.org/appengine/internal/urlfetch google.golang.org/appengine/socket google.golang.org/appengine/urlfetch -# google.golang.org/genproto v0.0.0-20200715011427-11fb19a81f2c +# google.golang.org/genproto v0.0.0-20200721032028-5044d0edf986 google.golang.org/genproto/googleapis/api google.golang.org/genproto/googleapis/api/annotations google.golang.org/genproto/googleapis/api/distribution @@ -475,7 +475,7 @@ k8s.io/api/settings/v1alpha1 k8s.io/api/storage/v1 k8s.io/api/storage/v1alpha1 k8s.io/api/storage/v1beta1 -# k8s.io/apimachinery v0.18.5 => k8s.io/apimachinery v0.17.6 +# k8s.io/apimachinery v0.18.6 => k8s.io/apimachinery v0.17.6 ## explicit k8s.io/apimachinery/pkg/api/equality k8s.io/apimachinery/pkg/api/errors @@ -747,7 +747,7 @@ k8s.io/kube-openapi/pkg/util/sets k8s.io/utils/buffer k8s.io/utils/integer k8s.io/utils/trace -# knative.dev/pkg v0.0.0-20200721091635-3b7ca76a63e4 +# knative.dev/pkg v0.0.0-20200722085354-ba0f3cb7cf84 ## explicit knative.dev/pkg/apis knative.dev/pkg/apis/duck/ducktypes @@ -771,7 +771,7 @@ knative.dev/pkg/metrics/metricskey knative.dev/pkg/network knative.dev/pkg/reconciler knative.dev/pkg/system -# knative.dev/test-infra v0.0.0-20200720224135-d2706240545c +# knative.dev/test-infra v0.0.0-20200721175154-c98db9bd4d5d ## explicit knative.dev/test-infra/scripts knative.dev/test-infra/tools/dep-collector