From 9f9f7bea94e15840f36f08b90a2c03203e886f91 Mon Sep 17 00:00:00 2001 From: Ville Aikas <11279988+vaikas@users.noreply.github.com> Date: Tue, 24 Mar 2020 16:06:09 -0700 Subject: [PATCH] Introduce key/value store interface + cm backed version of it. (#1173) * simple configstore for saving state in configmaps * introduce interface, tests * address pr feedback * return interface -> add Init to interface --- kvstore/kvstore.go | 35 +++++++ kvstore/kvstore_cm.go | 123 +++++++++++++++++++++++ kvstore/kvstore_cm_test.go | 196 +++++++++++++++++++++++++++++++++++++ 3 files changed, 354 insertions(+) create mode 100644 kvstore/kvstore.go create mode 100644 kvstore/kvstore_cm.go create mode 100644 kvstore/kvstore_cm_test.go diff --git a/kvstore/kvstore.go b/kvstore/kvstore.go new file mode 100644 index 000000000..be8ed2b66 --- /dev/null +++ b/kvstore/kvstore.go @@ -0,0 +1,35 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kvstore + +import ( + "context" +) + +type Interface interface { + // Init loads the configstore from the backing store if it exists, or + // if it does not, will create an empty one. + Init(ctx context.Context) error + // Load loads the configstore from the backing store + Load(ctx context.Context) error + // Save saves the configstore to the backing store + Save(ctx context.Context) error + // Get gets the key from the KVStore into the provided value + Get(ctx context.Context, key string, value interface{}) error + // Set sets the key into the KVStore from the provided value + Set(ctx context.Context, key string, value interface{}) error +} diff --git a/kvstore/kvstore_cm.go b/kvstore/kvstore_cm.go new file mode 100644 index 000000000..c4e1a6e86 --- /dev/null +++ b/kvstore/kvstore_cm.go @@ -0,0 +1,123 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Simple abstraction for storing state on a k8s ConfigMap. Very very simple +// and uses a single entry in the ConfigMap.data for storing serialized +// JSON of the generic data that Load/Save uses. Handy for things like sources +// that need to persist some state (checkpointing for example). +package kvstore + +import ( + "context" + "encoding/json" + "fmt" + + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/client-go/kubernetes/typed/core/v1" + "knative.dev/pkg/logging" +) + +type configMapKVStore struct { + cmClient v1.ConfigMapInterface + name string + namespace string + data map[string]string +} + +var ( + _ Interface = (*configMapKVStore)(nil) +) + +func NewConfigMapKVStore(ctx context.Context, name string, namespace string, clientset v1.CoreV1Interface) Interface { + + return &configMapKVStore{name: name, namespace: namespace, cmClient: clientset.ConfigMaps(namespace)} +} + +// Init initializes configMapKVStore either by loading or creating an empty one. +func (cs *configMapKVStore) Init(ctx context.Context) error { + l := logging.FromContext(ctx) + l.Info("Initializing configMapKVStore...") + + err := cs.Load(ctx) + if apierrors.IsNotFound(err) { + l.Info("No config found, creating empty") + return cs.createConfigMap() + } + return err +} + +// Load fetches the ConfigMap from k8s and unmarshals the data found +// in the configdatakey type as specified by value. +func (cs *configMapKVStore) Load(ctx context.Context) error { + cm, err := cs.cmClient.Get(cs.name, metav1.GetOptions{}) + if err != nil { + return err + } + cs.data = cm.Data + return nil +} + +// Save takes the value given in, and marshals it into a string +// and saves it into the k8s ConfigMap under the configdatakey. +func (cs *configMapKVStore) Save(ctx context.Context) error { + cm, err := cs.cmClient.Get(cs.name, metav1.GetOptions{}) + if err != nil { + return err + } + cm.Data = cs.data + _, err = cs.cmClient.Update(cm) + return err +} + +// Get retrieves and unmarshals the value from the map. +func (cs *configMapKVStore) Get(ctx context.Context, key string, value interface{}) error { + v, ok := cs.data[key] + if !ok { + return fmt.Errorf("key %s does not exist", key) + } + err := json.Unmarshal([]byte(v), value) + if err != nil { + return fmt.Errorf("Failed to Unmarshal %q: %v", v, err) + } + return nil +} + +// Set marshals and sets the value given under specified key. +func (cs *configMapKVStore) Set(ctx context.Context, key string, value interface{}) error { + bytes, err := json.Marshal(value) + if err != nil { + return fmt.Errorf("Failed to Marshal: %v", err) + } + cs.data[key] = string(bytes) + return nil +} + +func (cs *configMapKVStore) createConfigMap() error { + cm := &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "ConfigMap", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: cs.name, + Namespace: cs.namespace, + }, + } + _, err := cs.cmClient.Create(cm) + return err +} diff --git a/kvstore/kvstore_cm_test.go b/kvstore/kvstore_cm_test.go new file mode 100644 index 000000000..e06b6df03 --- /dev/null +++ b/kvstore/kvstore_cm_test.go @@ -0,0 +1,196 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kvstore + +import ( + "context" + "encoding/json" + "reflect" + "testing" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/kubernetes/fake" + v1 "k8s.io/client-go/kubernetes/typed/core/v1" + clientgotesting "k8s.io/client-go/testing" +) + +const ( + namespace = "mynamespace" + name = "mycm" +) + +type testStruct struct { + LastThingProcessed string + Stuff []string +} + +type testClient struct { + created *corev1.ConfigMap + updated *corev1.ConfigMap + clientset v1.CoreV1Interface +} + +func NewTestClient(objects ...runtime.Object) *testClient { + tc := testClient{} + cs := fake.NewSimpleClientset(objects...) + cs.PrependReactor("create", "*", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) { + createAction := action.(clientgotesting.CreateAction) + tc.created = createAction.GetObject().(*corev1.ConfigMap) + return true, tc.created, nil + }) + cs.PrependReactor("update", "*", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) { + updateAction := action.(clientgotesting.UpdateAction) + tc.updated = updateAction.GetObject().(*corev1.ConfigMap) + return true, tc.updated, nil + }) + tc.clientset = cs.CoreV1() + return &tc +} + +func TestInitCreates(t *testing.T) { + tc := NewTestClient() + cs := NewConfigMapKVStore(context.Background(), name, namespace, tc.clientset) + err := cs.Init(context.Background()) + if err != nil { + t.Errorf("Failed to Init ConfigStore: %v", err) + } + if tc.created == nil { + t.Errorf("ConfigMap not created") + } + if len(tc.created.Data) != 0 { + t.Errorf("ConfigMap data is not empty") + } + if tc.updated != nil { + t.Errorf("ConfigMap updated") + } +} + +func TestLoadNonexisting(t *testing.T) { + tc := NewTestClient() + if NewConfigMapKVStore(context.Background(), name, namespace, tc.clientset).Load(context.Background()) == nil { + t.Error("non-existent store load didn't fail") + } +} + +func TestInitLoads(t *testing.T) { + tc := NewTestClient([]runtime.Object{configMap(map[string]string{"foo": marshal(t, "bar")})}...) + cs := NewConfigMapKVStore(context.Background(), name, namespace, tc.clientset) + err := cs.Init(context.Background()) + if err != nil { + t.Errorf("Failed to Init ConfigStore: %v", err) + } + if tc.created != nil { + t.Errorf("ConfigMap created") + } + if tc.updated != nil { + t.Errorf("ConfigMap updated") + } + var ret string + err = cs.Get(context.Background(), "foo", &ret) + if err != nil { + t.Errorf("failed to return string: %v", err) + } + if ret != "bar" { + t.Errorf("got back unexpected value, wanted %q got %q", "bar", ret) + } + if cs.Get(context.Background(), "not there", &ret) == nil { + t.Error("non-existent key didn't error") + } +} + +func TestLoadSaveUpdate(t *testing.T) { + tc := NewTestClient([]runtime.Object{configMap(map[string]string{"foo": marshal(t, "bar")})}...) + cs := NewConfigMapKVStore(context.Background(), name, namespace, tc.clientset) + err := cs.Init(context.Background()) + if err != nil { + t.Errorf("Failed to Init ConfigStore: %v", err) + } + cs.Set(context.Background(), "jimmy", "otherbar") + cs.Save(context.Background()) + if tc.updated == nil { + t.Errorf("ConfigMap Not updated") + } + var ret string + err = cs.Get(context.Background(), "jimmy", &ret) + if err != nil { + t.Errorf("failed to return string: %v", err) + } + if err != nil { + t.Errorf("failed to return string: %v", err) + } + if ret != "otherbar" { + t.Errorf("got back unexpected value, wanted %q got %q", "bar", ret) + } +} + +func TestLoadSaveUpdateComplex(t *testing.T) { + ts := testStruct{ + LastThingProcessed: "somethingie", + Stuff: []string{"first", "second", "third"}, + } + + tc := NewTestClient([]runtime.Object{configMap(map[string]string{"foo": marshal(t, &ts)})}...) + cs := NewConfigMapKVStore(context.Background(), name, namespace, tc.clientset) + err := cs.Init(context.Background()) + if err != nil { + t.Errorf("Failed to Init ConfigStore: %v", err) + } + ts2 := testStruct{ + LastThingProcessed: "otherthingie", + Stuff: []string{"fourth", "fifth", "sixth"}, + } + cs.Set(context.Background(), "jimmy", &ts2) + cs.Save(context.Background()) + if tc.updated == nil { + t.Errorf("ConfigMap Not updated") + } + var ret testStruct + err = cs.Get(context.Background(), "jimmy", &ret) + if err != nil { + t.Errorf("failed to return string: %v", err) + } + if err != nil { + t.Errorf("failed to return string: %v", err) + } + if !reflect.DeepEqual(ret, ts2) { + t.Errorf("got back unexpected value, wanted %+v got %+v", ts2, ret) + } +} + +func marshal(t *testing.T, value interface{}) string { + bytes, err := json.Marshal(value) + if err != nil { + t.Fatalf("Failed to Marshal %q: %v", value, err) + } + return string(bytes) +} + +func configMap(data map[string]string) *corev1.ConfigMap { + return &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "ConfigMap", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + }, + Data: data, + } +}