Introduce key/value store interface + cm backed version of it. (#1173)

* simple configstore for saving state in configmaps

* introduce interface, tests

* address pr feedback

* return interface -> add Init to interface
This commit is contained in:
Ville Aikas 2020-03-24 16:06:09 -07:00 committed by GitHub
parent 0840da9555
commit 9f9f7bea94
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 354 additions and 0 deletions

35
kvstore/kvstore.go Normal file
View File

@ -0,0 +1,35 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kvstore
import (
"context"
)
type Interface interface {
// Init loads the configstore from the backing store if it exists, or
// if it does not, will create an empty one.
Init(ctx context.Context) error
// Load loads the configstore from the backing store
Load(ctx context.Context) error
// Save saves the configstore to the backing store
Save(ctx context.Context) error
// Get gets the key from the KVStore into the provided value
Get(ctx context.Context, key string, value interface{}) error
// Set sets the key into the KVStore from the provided value
Set(ctx context.Context, key string, value interface{}) error
}

123
kvstore/kvstore_cm.go Normal file
View File

@ -0,0 +1,123 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Simple abstraction for storing state on a k8s ConfigMap. Very very simple
// and uses a single entry in the ConfigMap.data for storing serialized
// JSON of the generic data that Load/Save uses. Handy for things like sources
// that need to persist some state (checkpointing for example).
package kvstore
import (
"context"
"encoding/json"
"fmt"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"knative.dev/pkg/logging"
)
type configMapKVStore struct {
cmClient v1.ConfigMapInterface
name string
namespace string
data map[string]string
}
var (
_ Interface = (*configMapKVStore)(nil)
)
func NewConfigMapKVStore(ctx context.Context, name string, namespace string, clientset v1.CoreV1Interface) Interface {
return &configMapKVStore{name: name, namespace: namespace, cmClient: clientset.ConfigMaps(namespace)}
}
// Init initializes configMapKVStore either by loading or creating an empty one.
func (cs *configMapKVStore) Init(ctx context.Context) error {
l := logging.FromContext(ctx)
l.Info("Initializing configMapKVStore...")
err := cs.Load(ctx)
if apierrors.IsNotFound(err) {
l.Info("No config found, creating empty")
return cs.createConfigMap()
}
return err
}
// Load fetches the ConfigMap from k8s and unmarshals the data found
// in the configdatakey type as specified by value.
func (cs *configMapKVStore) Load(ctx context.Context) error {
cm, err := cs.cmClient.Get(cs.name, metav1.GetOptions{})
if err != nil {
return err
}
cs.data = cm.Data
return nil
}
// Save takes the value given in, and marshals it into a string
// and saves it into the k8s ConfigMap under the configdatakey.
func (cs *configMapKVStore) Save(ctx context.Context) error {
cm, err := cs.cmClient.Get(cs.name, metav1.GetOptions{})
if err != nil {
return err
}
cm.Data = cs.data
_, err = cs.cmClient.Update(cm)
return err
}
// Get retrieves and unmarshals the value from the map.
func (cs *configMapKVStore) Get(ctx context.Context, key string, value interface{}) error {
v, ok := cs.data[key]
if !ok {
return fmt.Errorf("key %s does not exist", key)
}
err := json.Unmarshal([]byte(v), value)
if err != nil {
return fmt.Errorf("Failed to Unmarshal %q: %v", v, err)
}
return nil
}
// Set marshals and sets the value given under specified key.
func (cs *configMapKVStore) Set(ctx context.Context, key string, value interface{}) error {
bytes, err := json.Marshal(value)
if err != nil {
return fmt.Errorf("Failed to Marshal: %v", err)
}
cs.data[key] = string(bytes)
return nil
}
func (cs *configMapKVStore) createConfigMap() error {
cm := &corev1.ConfigMap{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "ConfigMap",
},
ObjectMeta: metav1.ObjectMeta{
Name: cs.name,
Namespace: cs.namespace,
},
}
_, err := cs.cmClient.Create(cm)
return err
}

196
kvstore/kvstore_cm_test.go Normal file
View File

@ -0,0 +1,196 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kvstore
import (
"context"
"encoding/json"
"reflect"
"testing"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
clientgotesting "k8s.io/client-go/testing"
)
const (
namespace = "mynamespace"
name = "mycm"
)
type testStruct struct {
LastThingProcessed string
Stuff []string
}
type testClient struct {
created *corev1.ConfigMap
updated *corev1.ConfigMap
clientset v1.CoreV1Interface
}
func NewTestClient(objects ...runtime.Object) *testClient {
tc := testClient{}
cs := fake.NewSimpleClientset(objects...)
cs.PrependReactor("create", "*", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) {
createAction := action.(clientgotesting.CreateAction)
tc.created = createAction.GetObject().(*corev1.ConfigMap)
return true, tc.created, nil
})
cs.PrependReactor("update", "*", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) {
updateAction := action.(clientgotesting.UpdateAction)
tc.updated = updateAction.GetObject().(*corev1.ConfigMap)
return true, tc.updated, nil
})
tc.clientset = cs.CoreV1()
return &tc
}
func TestInitCreates(t *testing.T) {
tc := NewTestClient()
cs := NewConfigMapKVStore(context.Background(), name, namespace, tc.clientset)
err := cs.Init(context.Background())
if err != nil {
t.Errorf("Failed to Init ConfigStore: %v", err)
}
if tc.created == nil {
t.Errorf("ConfigMap not created")
}
if len(tc.created.Data) != 0 {
t.Errorf("ConfigMap data is not empty")
}
if tc.updated != nil {
t.Errorf("ConfigMap updated")
}
}
func TestLoadNonexisting(t *testing.T) {
tc := NewTestClient()
if NewConfigMapKVStore(context.Background(), name, namespace, tc.clientset).Load(context.Background()) == nil {
t.Error("non-existent store load didn't fail")
}
}
func TestInitLoads(t *testing.T) {
tc := NewTestClient([]runtime.Object{configMap(map[string]string{"foo": marshal(t, "bar")})}...)
cs := NewConfigMapKVStore(context.Background(), name, namespace, tc.clientset)
err := cs.Init(context.Background())
if err != nil {
t.Errorf("Failed to Init ConfigStore: %v", err)
}
if tc.created != nil {
t.Errorf("ConfigMap created")
}
if tc.updated != nil {
t.Errorf("ConfigMap updated")
}
var ret string
err = cs.Get(context.Background(), "foo", &ret)
if err != nil {
t.Errorf("failed to return string: %v", err)
}
if ret != "bar" {
t.Errorf("got back unexpected value, wanted %q got %q", "bar", ret)
}
if cs.Get(context.Background(), "not there", &ret) == nil {
t.Error("non-existent key didn't error")
}
}
func TestLoadSaveUpdate(t *testing.T) {
tc := NewTestClient([]runtime.Object{configMap(map[string]string{"foo": marshal(t, "bar")})}...)
cs := NewConfigMapKVStore(context.Background(), name, namespace, tc.clientset)
err := cs.Init(context.Background())
if err != nil {
t.Errorf("Failed to Init ConfigStore: %v", err)
}
cs.Set(context.Background(), "jimmy", "otherbar")
cs.Save(context.Background())
if tc.updated == nil {
t.Errorf("ConfigMap Not updated")
}
var ret string
err = cs.Get(context.Background(), "jimmy", &ret)
if err != nil {
t.Errorf("failed to return string: %v", err)
}
if err != nil {
t.Errorf("failed to return string: %v", err)
}
if ret != "otherbar" {
t.Errorf("got back unexpected value, wanted %q got %q", "bar", ret)
}
}
func TestLoadSaveUpdateComplex(t *testing.T) {
ts := testStruct{
LastThingProcessed: "somethingie",
Stuff: []string{"first", "second", "third"},
}
tc := NewTestClient([]runtime.Object{configMap(map[string]string{"foo": marshal(t, &ts)})}...)
cs := NewConfigMapKVStore(context.Background(), name, namespace, tc.clientset)
err := cs.Init(context.Background())
if err != nil {
t.Errorf("Failed to Init ConfigStore: %v", err)
}
ts2 := testStruct{
LastThingProcessed: "otherthingie",
Stuff: []string{"fourth", "fifth", "sixth"},
}
cs.Set(context.Background(), "jimmy", &ts2)
cs.Save(context.Background())
if tc.updated == nil {
t.Errorf("ConfigMap Not updated")
}
var ret testStruct
err = cs.Get(context.Background(), "jimmy", &ret)
if err != nil {
t.Errorf("failed to return string: %v", err)
}
if err != nil {
t.Errorf("failed to return string: %v", err)
}
if !reflect.DeepEqual(ret, ts2) {
t.Errorf("got back unexpected value, wanted %+v got %+v", ts2, ret)
}
}
func marshal(t *testing.T, value interface{}) string {
bytes, err := json.Marshal(value)
if err != nil {
t.Fatalf("Failed to Marshal %q: %v", value, err)
}
return string(bytes)
}
func configMap(data map[string]string) *corev1.ConfigMap {
return &corev1.ConfigMap{
TypeMeta: metav1.TypeMeta{
APIVersion: "v1",
Kind: "ConfigMap",
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
},
Data: data,
}
}