feat(cache): Explicitly specifying which attributes affect the cache key (#4076)
* Backend - Cache - Explicitly specifying which attributes affect the cache key Fixes https://github.com/kubeflow/pipelines/issues/4038 Fixes https://github.com/kubeflow/pipelines/issues/3972 * Fixed the test * Added comments to intersectStructureWithSkeleton * Fixed the tests incorrectly modifying a global variable * Added the test verifying the template cleanup
This commit is contained in:
parent
bb21597d43
commit
f35462fdb3
|
|
@ -47,7 +47,6 @@ const (
|
|||
SpecContainersPath string = "/spec/containers"
|
||||
SpecInitContainersPath string = "/spec/initContainers"
|
||||
TFXPodSuffix string = "tfx/orchestration/kubeflow/container_entrypoint.py"
|
||||
ArchiveLocationKey string = "archiveLocation"
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
@ -170,6 +169,22 @@ func MutatePodIfCached(req *v1beta1.AdmissionRequest, clientMgr ClientManagerInt
|
|||
return patches, nil
|
||||
}
|
||||
|
||||
// intersectStructureWithSkeleton recursively intersects two maps
|
||||
// nil values in the skeleton map mean that the whole value (which can also be a map) should be kept.
|
||||
func intersectStructureWithSkeleton(src map[string]interface{}, skeleton map[string]interface{}) map[string]interface{} {
|
||||
result := make(map[string]interface{})
|
||||
for key, skeletonValue := range skeleton {
|
||||
if value, ok := src[key]; ok {
|
||||
if skeletonValue == nil {
|
||||
result[key] = value
|
||||
} else {
|
||||
result[key] = intersectStructureWithSkeleton(value.(map[string]interface{}), skeletonValue.(map[string]interface{}))
|
||||
}
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func generateCacheKeyFromTemplate(template string) (string, error) {
|
||||
var templateMap map[string]interface{}
|
||||
b := []byte(template)
|
||||
|
|
@ -178,14 +193,23 @@ func generateCacheKeyFromTemplate(template string) (string, error) {
|
|||
return "", err
|
||||
}
|
||||
|
||||
// template[archiveLocation] needs to be removed when calculating cache key.
|
||||
// Because archiveLocation.key is different in every single run.
|
||||
_, exists := templateMap[ArchiveLocationKey]
|
||||
if exists {
|
||||
log.Println("ArchiveLocation exists in template.")
|
||||
delete(templateMap, ArchiveLocationKey)
|
||||
// Selectively copying parts of the template that should affect the cache
|
||||
templateSkeleton := map[string]interface{}{
|
||||
"container": map[string]interface{}{
|
||||
"image": nil,
|
||||
"command": nil,
|
||||
"args": nil,
|
||||
"env": nil,
|
||||
"volumeMounts": nil,
|
||||
},
|
||||
"inputs": nil,
|
||||
"volumes": nil,
|
||||
"initContainers": nil,
|
||||
"sidecars": nil,
|
||||
}
|
||||
b, err = json.Marshal(templateMap)
|
||||
cacheKeyMap := intersectStructureWithSkeleton(templateMap, templateSkeleton)
|
||||
|
||||
b, err = json.Marshal(cacheKeyMap)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ var (
|
|||
ObjectMeta: metav1.ObjectMeta{
|
||||
Annotations: map[string]string{
|
||||
ArgoWorkflowNodeName: "test_node",
|
||||
ArgoWorkflowTemplate: `{"name": "test_template"}`,
|
||||
ArgoWorkflowTemplate: `{"name": "Does not matter","container":{"command":["echo", "Hello"],"image":"python:3.7"}}`,
|
||||
},
|
||||
Labels: map[string]string{
|
||||
ArgoCompleteLabelKey: "true",
|
||||
|
|
@ -108,7 +108,7 @@ func TestMutatePodIfCachedWithDecodeError(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestMutatePodIfCachedWithCacheDisabledPod(t *testing.T) {
|
||||
cacheDisabledPod := *fakePod
|
||||
cacheDisabledPod := *fakePod.DeepCopy()
|
||||
cacheDisabledPod.ObjectMeta.Labels[KFPCacheEnabledLabelKey] = "false"
|
||||
patchOperation, err := MutatePodIfCached(GetFakeRequestFromPod(&cacheDisabledPod), fakeClientManager)
|
||||
assert.Nil(t, patchOperation)
|
||||
|
|
@ -116,7 +116,7 @@ func TestMutatePodIfCachedWithCacheDisabledPod(t *testing.T) {
|
|||
}
|
||||
|
||||
func TestMutatePodIfCachedWithTFXPod(t *testing.T) {
|
||||
tfxPod := *fakePod
|
||||
tfxPod := *fakePod.DeepCopy()
|
||||
mainContainerCommand := append(tfxPod.Spec.Containers[0].Command, "/tfx-src/"+TFXPodSuffix)
|
||||
tfxPod.Spec.Containers[0].Command = mainContainerCommand
|
||||
patchOperation, err := MutatePodIfCached(GetFakeRequestFromPod(&tfxPod), fakeClientManager)
|
||||
|
|
@ -135,9 +135,9 @@ func TestMutatePodIfCached(t *testing.T) {
|
|||
|
||||
func TestMutatePodIfCachedWithCacheEntryExist(t *testing.T) {
|
||||
executionCache := &model.ExecutionCache{
|
||||
ExecutionCacheKey: "f98b62e4625b9f96bac478ac72d88181a37e4f1d6bfd3bd5f53e29286b2ca034",
|
||||
ExecutionCacheKey: "f5fe913be7a4516ebfe1b5de29bcb35edd12ecc776b2f33f10ca19709ea3b2f0",
|
||||
ExecutionOutput: "testOutput",
|
||||
ExecutionTemplate: `{"name": "test_template"}`,
|
||||
ExecutionTemplate: `{"container":{"command":["echo", "Hello"],"image":"python:3.7"}}`,
|
||||
MaxCacheStaleness: -1,
|
||||
}
|
||||
fakeClientManager.CacheStore().CreateExecutionCache(executionCache)
|
||||
|
|
@ -150,3 +150,34 @@ func TestMutatePodIfCachedWithCacheEntryExist(t *testing.T) {
|
|||
require.Equal(t, patchOperation[1].Op, OperationTypeAdd)
|
||||
require.Equal(t, patchOperation[2].Op, OperationTypeAdd)
|
||||
}
|
||||
|
||||
func TestMutatePodIfCachedWithTeamplateCleanup(t *testing.T) {
|
||||
executionCache := &model.ExecutionCache{
|
||||
ExecutionCacheKey: "f5fe913be7a4516ebfe1b5de29bcb35edd12ecc776b2f33f10ca19709ea3b2f0",
|
||||
ExecutionOutput: "testOutput",
|
||||
ExecutionTemplate: `Cache key was calculated from this: {"container":{"command":["echo", "Hello"],"image":"python:3.7"}}`,
|
||||
MaxCacheStaleness: -1,
|
||||
}
|
||||
fakeClientManager.CacheStore().CreateExecutionCache(executionCache)
|
||||
|
||||
pod := *fakePod.DeepCopy()
|
||||
pod.ObjectMeta.Annotations[ArgoWorkflowTemplate] = `{
|
||||
"name": "Does not matter",
|
||||
"metadata": "anything",
|
||||
"container": {
|
||||
"image": "python:3.7",
|
||||
"command": ["echo", "Hello"]
|
||||
},
|
||||
"outputs": "anything",
|
||||
"foo": "bar"
|
||||
}`
|
||||
request := GetFakeRequestFromPod(&pod)
|
||||
|
||||
patchOperation, err := MutatePodIfCached(request, fakeClientManager)
|
||||
assert.Nil(t, err)
|
||||
require.NotNil(t, patchOperation)
|
||||
require.Equal(t, 3, len(patchOperation))
|
||||
require.Equal(t, patchOperation[0].Op, OperationTypeReplace)
|
||||
require.Equal(t, patchOperation[1].Op, OperationTypeAdd)
|
||||
require.Equal(t, patchOperation[2].Op, OperationTypeAdd)
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue