VPA - serialization of histograms to snapshots.
This commit is contained in:
parent
1da8dc8e68
commit
39c0083256
|
|
@ -20,6 +20,9 @@ import (
|
|||
"fmt"
|
||||
"math"
|
||||
"time"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/poc.autoscaling.k8s.io/v1alpha1"
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
@ -115,6 +118,24 @@ func (h *decayingHistogram) decayFactor(timestamp time.Time) float64 {
|
|||
return math.Exp2(float64(timestamp.Sub(h.decayStart)) / float64(h.halfLife))
|
||||
}
|
||||
|
||||
func (h *decayingHistogram) SaveToChekpoint() (*vpa_types.HistogramCheckpoint, error) {
|
||||
checkpoint, err := h.histogram.SaveToChekpoint()
|
||||
if err != nil {
|
||||
return checkpoint, err
|
||||
}
|
||||
checkpoint.ReferenceTimestamp = metav1.NewTime(h.decayStart)
|
||||
return checkpoint, nil
|
||||
}
|
||||
|
||||
func (h *decayingHistogram) LoadFromCheckpoint(checkpoint *vpa_types.HistogramCheckpoint) error {
|
||||
err := h.histogram.LoadFromCheckpoint(checkpoint)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
h.decayStart = checkpoint.ReferenceTimestamp.Time
|
||||
return nil
|
||||
}
|
||||
|
||||
func round(x float64) int {
|
||||
return int(math.Floor(x + 0.5))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,6 +21,8 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/poc.autoscaling.k8s.io/v1alpha1"
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
@ -126,3 +128,43 @@ func TestDecayingHistogramMerge(t *testing.T) {
|
|||
h1.Merge(h2)
|
||||
assert.True(t, h1.Equals(expected))
|
||||
}
|
||||
|
||||
func TestDecayingHistogramSaveToCheckpoint(t *testing.T) {
|
||||
d := &decayingHistogram{
|
||||
histogram: *NewHistogram(testHistogramOptions).(*histogram),
|
||||
halfLife: time.Hour,
|
||||
decayStart: time.Time{},
|
||||
}
|
||||
d.AddSample(2, 1, startTime.Add(time.Hour*100))
|
||||
assert.NotEqual(t, d.decayStart, time.Time{})
|
||||
|
||||
checkpoint, err := d.SaveToChekpoint()
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, checkpoint.ReferenceTimestamp.Time, d.decayStart)
|
||||
// Just check that buckets are not empty, actual testing of bucketing
|
||||
// belongs to Histogram
|
||||
assert.NotEmpty(t, checkpoint.BucketWeights)
|
||||
assert.NotZero(t, checkpoint.TotalWeight)
|
||||
}
|
||||
|
||||
func TestDecayingHistogramLoadFromCheckpoint(t *testing.T) {
|
||||
location, _ := time.LoadLocation("UTC")
|
||||
timestamp := time.Date(2018, time.January, 2, 3, 4, 5, 0, location)
|
||||
|
||||
checkpoint := vpa_types.HistogramCheckpoint{
|
||||
TotalWeight: 6.0,
|
||||
BucketWeights: map[int]uint32{
|
||||
0: 1,
|
||||
},
|
||||
ReferenceTimestamp: metav1.NewTime(timestamp),
|
||||
}
|
||||
d := &decayingHistogram{
|
||||
histogram: *NewHistogram(testHistogramOptions).(*histogram),
|
||||
halfLife: time.Hour,
|
||||
decayStart: time.Time{},
|
||||
}
|
||||
d.LoadFromCheckpoint(&checkpoint)
|
||||
|
||||
assert.False(t, d.histogram.IsEmpty())
|
||||
assert.Equal(t, timestamp, d.decayStart)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,6 +20,14 @@ import (
|
|||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/poc.autoscaling.k8s.io/v1alpha1"
|
||||
)
|
||||
|
||||
const (
|
||||
// MaxCheckpointWeight is the maximum weight that can be stored in
|
||||
// HistogramCheckpoint in a single bucket
|
||||
MaxCheckpointWeight uint32 = 10000
|
||||
)
|
||||
|
||||
// Histogram represents an approximate distribution of some variable.
|
||||
|
|
@ -53,6 +61,15 @@ type Histogram interface {
|
|||
|
||||
// Returns a human-readable text description of the histogram.
|
||||
String() string
|
||||
|
||||
// SaveToChekpoint returns a representation of the histogram as a
|
||||
// HistogramCheckpoint. During conversion buckets with small weights
|
||||
// can be ommited.
|
||||
SaveToChekpoint() (*vpa_types.HistogramCheckpoint, error)
|
||||
|
||||
// LoadFromCheckpoint loads data from the checkpoint into the histogram
|
||||
// by appending samples.
|
||||
LoadFromCheckpoint(*vpa_types.HistogramCheckpoint) error
|
||||
}
|
||||
|
||||
// NewHistogram returns a new Histogram instance using given options.
|
||||
|
|
@ -199,6 +216,66 @@ func (h *histogram) updateMinAndMaxBucket() {
|
|||
}
|
||||
}
|
||||
|
||||
func (h *histogram) SaveToChekpoint() (*vpa_types.HistogramCheckpoint, error) {
|
||||
result := vpa_types.HistogramCheckpoint{
|
||||
BucketWeights: make(map[int]uint32),
|
||||
}
|
||||
result.TotalWeight = h.totalWeight
|
||||
// Find max
|
||||
max := 0.
|
||||
for bucket := h.minBucket; bucket <= h.maxBucket; bucket++ {
|
||||
if h.bucketWeight[bucket] > max {
|
||||
max = h.bucketWeight[bucket]
|
||||
}
|
||||
}
|
||||
// Compute ratio
|
||||
ratio := float64(MaxCheckpointWeight) / max
|
||||
// Convert weights and drop near-zero weights
|
||||
for bucket := h.minBucket; bucket <= h.maxBucket; bucket++ {
|
||||
newWeight := uint32(round(h.bucketWeight[bucket] * ratio))
|
||||
if newWeight > 0 {
|
||||
result.BucketWeights[bucket] = newWeight
|
||||
}
|
||||
}
|
||||
|
||||
return &result, nil
|
||||
}
|
||||
|
||||
func (h *histogram) LoadFromCheckpoint(checkpoint *vpa_types.HistogramCheckpoint) error {
|
||||
if checkpoint == nil {
|
||||
return fmt.Errorf("Cannot load from empty checkpoint")
|
||||
}
|
||||
if checkpoint.TotalWeight < 0.0 {
|
||||
return fmt.Errorf("Cannot load checkpoint with negative weight %v", checkpoint.TotalWeight)
|
||||
}
|
||||
sum := int64(0)
|
||||
for bucket, weight := range checkpoint.BucketWeights {
|
||||
sum += int64(weight)
|
||||
if bucket >= h.options.NumBuckets() {
|
||||
return fmt.Errorf("Checkpoint has bucket %v that is exceeding histogram buckets %v", bucket, h.options.NumBuckets())
|
||||
}
|
||||
if bucket < 0 {
|
||||
return fmt.Errorf("Checkpoint has a negative bucket %v", bucket)
|
||||
}
|
||||
}
|
||||
if sum == 0 {
|
||||
return nil
|
||||
}
|
||||
ratio := checkpoint.TotalWeight / float64(sum)
|
||||
for bucket, weight := range checkpoint.BucketWeights {
|
||||
if bucket < h.minBucket {
|
||||
h.minBucket = bucket
|
||||
}
|
||||
if bucket > h.maxBucket {
|
||||
h.maxBucket = bucket
|
||||
}
|
||||
h.bucketWeight[bucket] += float64(weight) * ratio
|
||||
}
|
||||
h.totalWeight += checkpoint.TotalWeight
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Multiplies all weights by a given factor. The factor must be non-negative.
|
||||
// (note: this operation does not affect the percentiles of the distribution)
|
||||
func (h *histogram) scale(factor float64) {
|
||||
|
|
|
|||
|
|
@ -17,8 +17,10 @@ limitations under the License.
|
|||
package util
|
||||
|
||||
import (
|
||||
"github.com/stretchr/testify/mock"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/mock"
|
||||
vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/poc.autoscaling.k8s.io/v1alpha1"
|
||||
)
|
||||
|
||||
// MockHistogram is a mock implementation of Histogram interface.
|
||||
|
|
@ -64,3 +66,13 @@ func (m *MockHistogram) String() string {
|
|||
args := m.Called()
|
||||
return args.String(0)
|
||||
}
|
||||
|
||||
// SaveToChekpoint is a mock implementation of Histogram.SaveToChekpoint.
|
||||
func (m *MockHistogram) SaveToChekpoint() (*vpa_types.HistogramCheckpoint, error) {
|
||||
return &vpa_types.HistogramCheckpoint{}, nil
|
||||
}
|
||||
|
||||
// LoadFromCheckpoint is a mock implementation of Histogram.LoadFromCheckpoint.
|
||||
func (m *MockHistogram) LoadFromCheckpoint(checkpoint *vpa_types.HistogramCheckpoint) error {
|
||||
return nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
vpa_types "k8s.io/autoscaler/vertical-pod-autoscaler/pkg/apis/poc.autoscaling.k8s.io/v1alpha1"
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
@ -118,3 +119,145 @@ func TestHistogramMerge(t *testing.T) {
|
|||
h1.Merge(h2)
|
||||
assert.True(t, h1.Equals(expected))
|
||||
}
|
||||
|
||||
func TestHistogramSaveToCheckpointEmpty(t *testing.T) {
|
||||
h := NewHistogram(testHistogramOptions)
|
||||
s, err := h.SaveToChekpoint()
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 0., s.TotalWeight)
|
||||
assert.Len(t, s.BucketWeights, 0)
|
||||
}
|
||||
|
||||
func TestHistogramSaveToCheckpoint(t *testing.T) {
|
||||
h := NewHistogram(testHistogramOptions)
|
||||
h.AddSample(1, 1, anyTime)
|
||||
s, err := h.SaveToChekpoint()
|
||||
assert.NoError(t, err)
|
||||
bucket := testHistogramOptions.FindBucket(1)
|
||||
assert.Equal(t, 1., s.TotalWeight)
|
||||
assert.Len(t, s.BucketWeights, 1)
|
||||
assert.Contains(t, s.BucketWeights, bucket)
|
||||
assert.Equal(t, MaxCheckpointWeight, s.BucketWeights[bucket])
|
||||
}
|
||||
|
||||
func TestHistogramSaveToCheckpointDropsRelativelySmallValues(t *testing.T) {
|
||||
h := NewHistogram(testHistogramOptions)
|
||||
|
||||
v1, w1 := 1., 1.
|
||||
v2, w2 := 2., 100000.
|
||||
|
||||
h.AddSample(v1, w1, anyTime)
|
||||
h.AddSample(v2, w2, anyTime)
|
||||
|
||||
bucket1 := testHistogramOptions.FindBucket(v1)
|
||||
bucket2 := testHistogramOptions.FindBucket(v2)
|
||||
assert.NotEqualf(t, bucket1, bucket2, "For this test %v and %v have to be stored in different buckets", v1, v2)
|
||||
assert.True(t, w1 < (w2/float64(MaxCheckpointWeight))/2, "w1 to be omitted has to be less than (0.5*w2)/MaxCheckpointWeight")
|
||||
|
||||
s, err := h.SaveToChekpoint()
|
||||
assert.NoError(t, err)
|
||||
|
||||
assert.Equal(t, 100001. /*w1+w2*/, s.TotalWeight)
|
||||
// Bucket 1 shouldn't be there
|
||||
assert.Len(t, s.BucketWeights, 1)
|
||||
assert.Contains(t, s.BucketWeights, bucket2)
|
||||
assert.Equal(t, MaxCheckpointWeight, s.BucketWeights[bucket2])
|
||||
}
|
||||
|
||||
func TestHistogramSaveToCheckpointForMultipleValues(t *testing.T) {
|
||||
h := NewHistogram(testHistogramOptions)
|
||||
|
||||
v1, w1 := 1., 1.
|
||||
v2, w2 := 2., 10000.
|
||||
v3, w3 := 3., 50.
|
||||
|
||||
h.AddSample(v1, w1, anyTime)
|
||||
h.AddSample(v2, w2, anyTime)
|
||||
h.AddSample(v3, w3, anyTime)
|
||||
|
||||
bucket1 := testHistogramOptions.FindBucket(v1)
|
||||
bucket2 := testHistogramOptions.FindBucket(v2)
|
||||
bucket3 := testHistogramOptions.FindBucket(v3)
|
||||
|
||||
assert.Truef(t, areUnique(bucket1, bucket2, bucket3), "For this test values %v have to be stored in different buckets", []float64{v1, v2, v3})
|
||||
|
||||
s, err := h.SaveToChekpoint()
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 10051. /*w1 + w2 + w3*/, s.TotalWeight)
|
||||
assert.Len(t, s.BucketWeights, 3)
|
||||
assert.Equal(t, uint32(1), s.BucketWeights[bucket1])
|
||||
assert.Equal(t, uint32(10000), s.BucketWeights[bucket2])
|
||||
assert.Equal(t, uint32(50), s.BucketWeights[bucket3])
|
||||
}
|
||||
|
||||
func TestHistogramLoadFromCheckpoint(t *testing.T) {
|
||||
checkpoint := vpa_types.HistogramCheckpoint{
|
||||
TotalWeight: 6.0,
|
||||
BucketWeights: map[int]uint32{
|
||||
0: 1,
|
||||
1: 2,
|
||||
},
|
||||
}
|
||||
h := histogram{
|
||||
options: testHistogramOptions,
|
||||
bucketWeight: make([]float64, testHistogramOptions.NumBuckets()),
|
||||
totalWeight: 0.0,
|
||||
minBucket: testHistogramOptions.NumBuckets() - 1,
|
||||
maxBucket: 0}
|
||||
err := h.LoadFromCheckpoint(&checkpoint)
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 6.0, h.totalWeight)
|
||||
assert.Equal(t, 2.0, h.bucketWeight[0])
|
||||
assert.Equal(t, 4.0, h.bucketWeight[1])
|
||||
}
|
||||
|
||||
func TestHistogramLoadFromCheckpointReturnsErrorOnNegativeBucket(t *testing.T) {
|
||||
checkpoint := vpa_types.HistogramCheckpoint{
|
||||
TotalWeight: 1.0,
|
||||
BucketWeights: map[int]uint32{
|
||||
-1: 1,
|
||||
},
|
||||
}
|
||||
h := NewHistogram(testHistogramOptions)
|
||||
err := h.LoadFromCheckpoint(&checkpoint)
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
func TestHistogramLoadFromCheckpointReturnsErrorOnInvalidBucket(t *testing.T) {
|
||||
checkpoint := vpa_types.HistogramCheckpoint{
|
||||
TotalWeight: 1.0,
|
||||
BucketWeights: map[int]uint32{
|
||||
99: 1,
|
||||
},
|
||||
}
|
||||
h := NewHistogram(testHistogramOptions)
|
||||
err := h.LoadFromCheckpoint(&checkpoint)
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
func TestHistogramLoadFromCheckpointReturnsErrorNegativeTotaWeight(t *testing.T) {
|
||||
checkpoint := vpa_types.HistogramCheckpoint{
|
||||
TotalWeight: -1.0,
|
||||
BucketWeights: map[int]uint32{},
|
||||
}
|
||||
h := NewHistogram(testHistogramOptions)
|
||||
err := h.LoadFromCheckpoint(&checkpoint)
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
func TestHistogramLoadFromCheckpointReturnsErrorOnNilInput(t *testing.T) {
|
||||
h := NewHistogram(testHistogramOptions)
|
||||
err := h.LoadFromCheckpoint(nil)
|
||||
assert.Error(t, err)
|
||||
}
|
||||
|
||||
func areUnique(values ...interface{}) bool {
|
||||
dict := make(map[interface{}]bool)
|
||||
for i, v := range values {
|
||||
dict[v] = true
|
||||
if len(dict) != i+1 {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue