Move the hashing from the serving (#1469)

* Move the hashing from the serving

* comment

* docs

* change api to accept set
This commit is contained in:
Victor Agababov 2020-07-08 10:14:47 -07:00 committed by GitHub
parent 0a8314b444
commit 5358179e74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 363 additions and 0 deletions

7
hash/OWNERS Normal file
View File

@ -0,0 +1,7 @@
# The OWNERS file is used by prow to automatically merge approved PRs.
approvers:
- controller-approvers
reviewers:
- controller-reviewers

22
hash/doc.go Normal file
View File

@ -0,0 +1,22 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package hash contains various Knative specific hashing utilities.
//
// - ChooseSubset is a consistent hashing/mapping function providing
// a consistent selection of N keys from M (N<=M) keys for a given
// target.
package hash

166
hash/hash.go Normal file
View File

@ -0,0 +1,166 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hash
// This file contains the implementation of the subsetting algorithm for
// choosing a subset of input values in a consistent manner.
import (
"bytes"
"hash"
"hash/fnv"
"sort"
"strconv"
"k8s.io/apimachinery/pkg/util/sets"
)
// Salts and angle space used by the subsetting algorithm.
const (
	// startSalt is appended to the target before hashing to derive the
	// start angle.
	startSalt = "start-angle-salt"
	// stepSalt is appended to the target before hashing to derive the
	// step angle.
	stepSalt = "step-angle-salt"

	// universe is the exclusive upper bound of the angle range
	// [0, universe). It is chosen so that it divides the total hash range
	// evenly, which reduces bias when hashes are reduced modulo universe.
	universe = 1 << 11
)
// computeHash resets h, feeds it the bytes of n and returns the resulting
// 64-bit digest for use by the consistent selection algorithm.
//
// The full uint64 is returned on purpose: callers reduce it modulo universe
// before converting to int, because int may be only 32 bits wide on 32-bit
// platforms and converting first would truncate the hash.
func computeHash(n []byte, h hash.Hash64) uint64 {
	// Start from a clean state so that the hasher can be reused across calls.
	h.Reset()
	// The hash.Hash contract states that Write never returns an error.
	_, _ = h.Write(n)
	digest := h.Sum64()
	return digest
}
// hashData is the prepared state that buildHashes produces for a single
// (input set, target) pair and that ChooseSubset consumes.
type hashData struct {
	// nameLookup maps every assigned hash back to the name it was computed
	// for; it also serves as the fast "is this hash taken" lookup.
	nameLookup map[int]string
	// hashPool holds all assigned hashes in ascending order, so that the
	// selection algorithm can binary-search it.
	hashPool []int
	// start is the angle the selection begins at and step is the amount the
	// angle advances by after every pick.
	start, step int
}
// fromIndexSet translates a set of hashPool indices into the set of names
// whose hashes are stored at those indices.
func (hd *hashData) fromIndexSet(s sets.Int) sets.String {
	names := sets.NewString()
	for idx := range s {
		// Index -> hash -> name.
		names.Insert(hd.nameLookup[hd.hashPool[idx]])
	}
	return names
}
// nameForHIndex returns the name whose hash is stored at position hi of
// hashPool.
func (hd *hashData) nameForHIndex(hi int) string {
	hv := hd.hashPool[hi]
	return hd.nameLookup[hv]
}
// buildHashes prepares the data the selection algorithm needs for choosing
// names from `in` on behalf of `target`: a start angle and a step angle
// derived from the target, and one unique hash in [0, universe) per name.
//
// The collision loop below only terminates while a free slot is left, so
// `in` must hold at most `universe` names.
func buildHashes(in sets.String, target string) *hashData {
	// Anyone changing this function must execute
	// `go test -run=TestOverlay -count=200`
	// to ensure there is no regression in the selection algorithm.

	// List returns the names sorted, which keeps results consistent from
	// one call to the next.
	names := in.List()

	// The target is written once and each salt is appended after it, so no
	// temporary string holding the concatenation of both is allocated.
	buf := bytes.NewBufferString(target)
	buf.WriteString(startSalt)
	hasher := fnv.New64a()
	start := int(computeHash(buf.Bytes(), hasher) % universe)

	buf.Truncate(len(target)) // Drop the start salt, keep the target.
	buf.WriteString(stepSalt)
	step := int(computeHash(buf.Bytes(), hasher) % universe)

	hd := &hashData{
		nameLookup: make(map[int]string, len(names)),
		hashPool:   make([]int, len(names)),
		start:      start,
		step:       step,
	}

	for i, name := range names {
		buf.Reset() // Reset retains the underlying storage.
		// Mixing the target in gives every target its own set of hashes.
		buf.WriteString(name)
		buf.WriteString(target)
		raw := computeHash(buf.Bytes(), hasher)
		slot := int(raw % universe)
		// Two names may be slotted into the same bucket; on average this
		// should happen with 1/universe probability. Keep re-hashing, with
		// the previous hash fed in as extra salt, until the slot is free.
		for {
			if _, taken := hd.nameLookup[slot]; !taken {
				break
			}
			// Hex keeps the appended salt short.
			buf.WriteString(strconv.FormatUint(raw, 16))
			raw = computeHash(buf.Bytes(), hasher)
			slot = int(raw % universe)
		}
		hd.hashPool[i] = slot
		hd.nameLookup[slot] = name
	}

	// Sort for consistent mapping later.
	sort.Ints(hd.hashPool)
	return hd
}
// ChooseSubset consistently chooses n items from `from`, using
// `target` as a seed value.
// When n is not smaller than the size of `from`, `from` itself is returned.
// ChooseSubset is an internal function and presumes sanitized inputs.
// TODO(vagababov): once initial impl is ready, think about how to cache
// the prepared data.
func ChooseSubset(from sets.String, n int, target string) sets.String {
	if len(from) <= n {
		return from
	}

	hd := buildHashes(from, target)
	pool := hd.hashPool
	size := len(pool)

	// Selection walks a circle of `universe` angles:
	//  - begin at the start angle;
	//  - find the first pool entry at or past the current angle, wrapping
	//    around to the beginning of the pool when there is none;
	//  - if that entry was already picked, move on to the next free one;
	//  - advance the angle by `step` and repeat until n entries are picked.
	picked := sets.NewInt()
	for angle := hd.start; len(picked) < n; angle = (angle + hd.step) % universe {
		// SearchInts yields `size` when every entry is below the angle;
		// the modulo turns that into index 0, i.e. the wrap around.
		idx := sort.SearchInts(pool, angle) % size
		for picked.Has(idx) {
			idx = (idx + 1) % size
		}
		picked.Insert(idx)
	}
	return hd.fromIndexSet(picked)
}

168
hash/hash_test.go Normal file
View File

@ -0,0 +1,168 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hash
import (
"fmt"
"hash/fnv"
"math"
"sort"
"testing"
"github.com/davecgh/go-spew/spew"
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"k8s.io/apimachinery/pkg/util/sets"
)
// TestBuildHashes verifies that buildHashes returns identical data when
// called twice with the same input and target, and that the resulting hash
// pool is sorted.
func TestBuildHashes(t *testing.T) {
	const target = "a target to remember"
	set := sets.NewString("a", "b", "c", "e", "f")

	hd1 := buildHashes(set, target)
	hd2 := buildHashes(set, target)
	t.Log("HashData = ", spew.Sprintf("%+v", hd1))

	// Diff is empty if and only if the two values are equal, so a single
	// comparison covers both the check and the report.
	if diff := cmp.Diff(hd1, hd2, cmp.AllowUnexported(hashData{})); diff != "" {
		t.Errorf("buildHashes is not consistent: diff(-want,+got):\n%s", diff)
	}
	if !sort.IntsAreSorted(hd1.hashPool) {
		t.Errorf("From list is not sorted: %v", hd1.hashPool)
	}
}
// TestChooseSubset pins the subsets ChooseSubset picks for a few fixed
// combinations of pool, target and requested size.
func TestChooseSubset(t *testing.T) {
	// planets returns a fresh copy of the pool shared by all the cases.
	planets := func() sets.String {
		return sets.NewString("sun", "moon", "mars", "mercury")
	}

	tests := []struct {
		name    string
		from    sets.String
		target  string
		wantNum int
		want    sets.String
	}{
		{
			name:    "return all",
			from:    planets(),
			target:  "a target!",
			wantNum: 4,
			want:    planets(),
		},
		{
			name:    "subset 1",
			from:    planets(),
			target:  "a target!",
			wantNum: 2,
			want:    sets.NewString("mercury", "moon"),
		},
		{
			name:    "subset 2",
			from:    planets(),
			target:  "something else entirely",
			wantNum: 2,
			want:    sets.NewString("mercury", "mars"),
		},
		{
			name:    "select 3",
			from:    planets(),
			target:  "something else entirely",
			wantNum: 3,
			want:    sets.NewString("mars", "mercury", "sun"),
		},
	}

	for i := range tests {
		tt := tests[i]
		t.Run(tt.name, func(t *testing.T) {
			chosen := ChooseSubset(tt.from, tt.wantNum, tt.target)
			if chosen.Equal(tt.want) {
				return
			}
			t.Errorf("Chose = %v, want = %v, diff(-want,+got):\n%s", chosen, tt.want, cmp.Diff(tt.want, chosen))
		})
	}
}
// TestCollisionHandling verifies that buildHashes assigns distinct hashes to
// two keys whose initial hashes fall into the same slot for the given target.
func TestCollisionHandling(t *testing.T) {
	const (
		key1   = "b08006d4-81f9-42ee-808b-ea18a39cbd83"
		key2   = "c9dc8df4-8c8d-4077-8750-6d2c2113a23b"
		target = "e68a64e1-19d8-4855-9ffa-04f49223a059"
	)

	// Verify the baseline: both keys must land in the same slot.
	// computeHash resets the hasher itself, so it can be reused as is.
	hasher := fnv.New64a()
	h1 := computeHash([]byte(key1+target), hasher) % universe
	h2 := computeHash([]byte(key2+target), hasher) % universe
	if h1 != h2 {
		t.Fatalf("Baseline incorrect: keys don't collide, %d != %d", h1, h2)
	}

	// nameLookup is keyed by hash, so an unresolved collision would leave
	// fewer entries than keys.
	hd := buildHashes(sets.NewString(key1, key2), target)
	if got, want := len(hd.nameLookup), 2; got != want {
		t.Errorf("Did not resolve collision: got %d keys in the map, want %d", got, want)
	}
}
// TestOverlay checks that, over many random targets, every source is chosen
// roughly equally often: the number of times a source is selected must stay
// within 20% of the expected count.
func TestOverlay(t *testing.T) {
	// Execute
	// `go test -run=TestOverlay -count=200`
	// To ensure assignments are still not skewed.
	const (
		sources   = 50
		samples   = 100000
		selection = 10
		want      = samples * selection / sources
		threshold = want / 5 // 20%
	)

	from := sets.NewString()
	for i := 0; i < sources; i++ {
		from.Insert(uuid.New().String())
	}

	freqs := make(map[string]int, sources)
	for i := 0; i < samples; i++ {
		target := uuid.New().String()
		for k := range ChooseSubset(from, selection, target) {
			freqs[k]++
		}
	}

	totalDiff := 0.
	// Walk the sources rather than freqs: a source that was never chosen has
	// no entry in freqs and must be checked as a zero count, not skipped.
	for src := range from {
		count := freqs[src]
		diff := float64(count - want)
		adiff := math.Abs(diff)
		totalDiff += adiff
		if adiff > threshold {
			t.Errorf("Diff for %q (chosen %d times) is %v, larger than threshold: %d", src, count, diff, threshold)
		}
	}
	// Average absolute deviation per source.
	t.Log(totalDiff / float64(len(from)))
}
// BenchmarkSelection measures ChooseSubset for a range of pool sizes and
// requested subset sizes.
func BenchmarkSelection(b *testing.B) {
	const maxSet = 200
	from := make([]string, maxSet)
	for i := range from {
		from[i] = uuid.New().String()
	}

	for _, v := range []int{5, 10, 25, 50, 100, 150, maxSet} {
		for _, ss := range []int{1, 5, 10, 15, 20, 25} {
			// Combinations with ss >= v exercise the early return path of
			// ChooseSubset, which hands back the input set.
			b.Run(fmt.Sprintf("pool-%d-subset-%d", v, ss), func(b *testing.B) {
				target := uuid.New().String()
				in := sets.NewString(from[:v]...)
				// Keep the set-up above out of the measurement.
				b.ResetTimer()
				for i := 0; i < b.N; i++ {
					// Request the subset size the benchmark is named after.
					ChooseSubset(in, ss, target)
				}
			})
		}
	}
}