karmada/pkg/search/proxy/store/util.go

420 lines
10 KiB
Go

/*
Copyright 2022 The Karmada Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package store
import (
"encoding/base64"
"encoding/json"
"sort"
"sync"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/watch"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
)
type multiClusterResourceVersion struct {
rvs map[string]string
isZero bool
}
func newMultiClusterResourceVersionWithCapacity(capacity int) *multiClusterResourceVersion {
return &multiClusterResourceVersion{
rvs: make(map[string]string, capacity),
}
}
func newMultiClusterResourceVersionFromString(s string) *multiClusterResourceVersion {
m := &multiClusterResourceVersion{
rvs: map[string]string{},
}
if s == "" {
return m
}
if s == "0" {
m.isZero = true
return m
}
decoded, err := base64.RawURLEncoding.DecodeString(s)
if err != nil {
// if invalid, ignore the version
return m
}
// if invalid, ignore the version
_ = json.Unmarshal(decoded, &m.rvs)
return m
}
func (m *multiClusterResourceVersion) set(cluster, rv string) {
m.rvs[cluster] = rv
if rv != "0" {
m.isZero = false
}
}
func (m *multiClusterResourceVersion) get(cluster string) string {
if m.isZero {
return "0"
}
return m.rvs[cluster]
}
func (m *multiClusterResourceVersion) clone() *multiClusterResourceVersion {
ret := &multiClusterResourceVersion{
isZero: m.isZero,
rvs: make(map[string]string, len(m.rvs)),
}
for k, v := range m.rvs {
ret.rvs[k] = v
}
return ret
}
func (m *multiClusterResourceVersion) String() string {
if m.isZero {
return "0"
}
// todo consider to separate this two scenarios:
// 1. client do not send ResourceVersion
// 2. client send ResourceVersion with empty cluster.
if len(m.rvs) == 0 {
return ""
}
buf := marshalRvs(m.rvs)
return base64.RawURLEncoding.EncodeToString(buf)
}
func marshalRvs(rvs map[string]string) []byte {
// We must make sure the returned ResourceVersion string is stable, because client might use this string
// for equality comparison. But hashmap's key iteration order is not determined.
// So we can't use json encoding library directly.
// Instead, we convert the map to a slice, sort the slice by `Cluster` field, then manually build a string.
// The result string resembles json encoding result to keep compatible with older version of proxy.
if len(rvs) == 0 {
// need to keep sync with `func (m *multiClusterResourceVersion) String() string`
return nil
}
type onWireRvs struct {
Cluster string
ResourceVersion string
}
slice := make([]onWireRvs, 0, len(rvs))
for clusterName, version := range rvs {
obj := onWireRvs{clusterName, version}
slice = append(slice, obj)
}
sort.Slice(slice, func(i, j int) bool {
return slice[i].Cluster < slice[j].Cluster
})
// Q: Why preallocate []byte with this capacity?
// A: Consider this json `{"cluster1":"1","cluster2":"2"}`
// For begin and end, need "{" and "}", this took 2 bytes.
// For `"cluster1":"1",`, there's 6 bytes. And we add 14bytes for longer cluster name and resource version.
var encoded = make([]byte, 0, (len(slice[0].Cluster)+len(slice[0].ResourceVersion)+20)*len(slice)+2)
encoded = append(encoded, '{')
for i, n := 0, len(slice); i < n; i++ {
encoded = append(encoded, '"')
encoded = append(encoded, slice[i].Cluster...)
encoded = append(encoded, `":"`...)
encoded = append(encoded, slice[i].ResourceVersion...)
encoded = append(encoded, '"')
if i != n-1 {
encoded = append(encoded, ',')
}
}
encoded = append(encoded, '}')
return encoded
}
type multiClusterContinue struct {
RV string `json:"rv"`
Cluster string `json:"cluster,omitempty"`
Continue string `json:"continue,omitempty"`
}
func newMultiClusterContinueFromString(s string) multiClusterContinue {
var m multiClusterContinue
if s == "" {
return m
}
decoded, err := base64.RawURLEncoding.DecodeString(s)
if err != nil {
return m
}
// if invalid, ignore continue
_ = json.Unmarshal(decoded, &m)
return m
}
func (c *multiClusterContinue) String() string {
if c.Cluster == "" {
return ""
}
buf, _ := json.Marshal(c)
return base64.RawURLEncoding.EncodeToString(buf)
}
type decoratedWatcher struct {
watcher watch.Interface
decorator func(watch.Event)
}
type watchMux struct {
lock sync.RWMutex
sources []decoratedWatcher
result chan watch.Event
done chan struct{}
}
func newWatchMux() *watchMux {
return &watchMux{
result: make(chan watch.Event),
done: make(chan struct{}),
}
}
// AddSource shall be called before Start
func (w *watchMux) AddSource(watcher watch.Interface, decorator func(watch.Event)) {
w.sources = append(w.sources, decoratedWatcher{
watcher: watcher,
decorator: decorator,
})
}
// Start run the watcher
func (w *watchMux) Start() {
wg := sync.WaitGroup{}
for i := range w.sources {
source := w.sources[i]
wg.Add(1)
go func() {
defer wg.Done()
w.startWatchSource(source.watcher, source.decorator)
}()
}
go func() {
// close result chan after all goroutines exit, avoiding data race.
defer close(w.result)
wg.Wait()
}()
}
// ResultChan implements watch.Interface
func (w *watchMux) ResultChan() <-chan watch.Event {
return w.result
}
// Stop implements watch.Interface
func (w *watchMux) Stop() {
select {
case <-w.done:
return
default:
}
w.lock.Lock()
defer w.lock.Unlock()
select {
case <-w.done:
default:
close(w.done)
}
}
func (w *watchMux) startWatchSource(source watch.Interface, decorator func(watch.Event)) {
defer source.Stop()
defer w.Stop()
for {
var copyEvent watch.Event
select {
case sourceEvent, ok := <-source.ResultChan():
if !ok {
return
}
// sourceEvent object is cacheObject,all watcher use the same point,must deepcopy.
copyEvent = *sourceEvent.DeepCopy()
if decorator != nil {
decorator(copyEvent)
}
case <-w.done:
return
}
select {
case <-w.done:
return
case w.result <- copyEvent:
}
}
}
// MultiNamespace contains multiple namespaces.
type MultiNamespace struct {
allNamespaces bool
namespaces sets.Set[string]
}
// Merge merges multiNS into n.
func (n *MultiNamespace) Merge(multiNS *MultiNamespace) *MultiNamespace {
if n.allNamespaces || multiNS.allNamespaces {
n.allNamespaces = true
n.namespaces = nil
return n
}
if n.Equal(multiNS) {
return n
}
n.namespaces.Insert(multiNS.namespaces.Clone().UnsortedList()...)
return n
}
// NewMultiNamespace return a new empty MultiNamespace.
func NewMultiNamespace() *MultiNamespace {
return &MultiNamespace{
namespaces: sets.New[string](),
}
}
// Add adds ns.
func (n *MultiNamespace) Add(ns string) {
if n.allNamespaces || n.namespaces.Has(ns) {
return
}
if ns == metav1.NamespaceAll {
n.allNamespaces = true
n.namespaces = nil
return
}
n.namespaces.Insert(ns)
}
// Contains returns if ns is covered by MultiNamespace. NamespaceAll covers all.
func (n *MultiNamespace) Contains(ns string) bool {
return n.allNamespaces || n.namespaces.Has(ns)
}
// Single returns ns name and true when only one namespace in MultiNamespace. NamespaceAll returns false.
func (n *MultiNamespace) Single() (string, bool) {
if n.allNamespaces || n.namespaces.Len() != 1 {
return "", false
}
// reach here means there is exactly one namespace, so we can safely get it.
ns := sets.List(n.namespaces)[0]
return ns, true
}
// Equal tell whether two MultiNamespace has the same namespaces.
func (n *MultiNamespace) Equal(another *MultiNamespace) bool {
if n.allNamespaces != another.allNamespaces {
return false
}
if n.allNamespaces {
return true
}
return n.namespaces.Equal(another.namespaces)
}
func addCacheSourceAnnotation(obj runtime.Object, clusterName string) {
accessor, err := meta.Accessor(obj)
if err != nil {
// Object has no meta, do nothing
return
}
annotations := accessor.GetAnnotations()
if annotations == nil {
annotations = make(map[string]string)
}
annotations[clusterv1alpha1.CacheSourceAnnotationKey] = clusterName
accessor.SetAnnotations(annotations)
}
// RemoveCacheSourceAnnotation delete CacheSourceAnnotationKey annotation in object. If obj is updated, return true.
func RemoveCacheSourceAnnotation(obj runtime.Object) bool {
accessor, err := meta.Accessor(obj)
if err != nil {
// Object has no meta, do nothing
return false
}
annotations := accessor.GetAnnotations()
_, exist := annotations[clusterv1alpha1.CacheSourceAnnotationKey]
if exist {
delete(annotations, clusterv1alpha1.CacheSourceAnnotationKey)
accessor.SetAnnotations(annotations)
return true
}
return false
}
// RecoverClusterResourceVersion convert global resource version to single cluster resource version. If obj is updated, return true.
func RecoverClusterResourceVersion(obj runtime.Object, cluster string) bool {
accessor, err := meta.Accessor(obj)
if err != nil {
// Object has no meta, do nothing
return false
}
rv := accessor.GetResourceVersion()
if rv == "" || rv == "0" {
return false
}
decoded, err := base64.RawURLEncoding.DecodeString(rv)
if err != nil {
// it's not global rv, do nothing
return false
}
m := make(map[string]string)
err = json.Unmarshal(decoded, &m)
if err != nil {
// it's not global rv, do nothing
return false
}
crv := m[cluster]
accessor.SetResourceVersion(crv)
return true
}
// BuildMultiClusterResourceVersion build multi cluster resource version.
func BuildMultiClusterResourceVersion(clusterResourceMap map[string]string) string {
m := newMultiClusterResourceVersionWithCapacity(len(clusterResourceMap))
for cluster, rv := range clusterResourceMap {
m.set(cluster, rv)
}
return m.String()
}