feat(backend): add support for "finished_at" and others in filter. Fixes #8654 (#8662)

* add support for "finished_at" and other in filter

* update job map

* add UTs

* add integration test for filter

* update integration test

* fix integration test

* add more info

* fixed integration test

* fix small bug

* fix flakiness
This commit is contained in:
Lingqing Gan 2023-01-15 14:00:58 -05:00 committed by GitHub
parent 80c0dc50db
commit 6ee767769d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 137 additions and 12 deletions

View File

@ -5,6 +5,8 @@ import (
"google.golang.org/protobuf/testing/protocmp"
"testing"
"github.com/kubeflow/pipelines/backend/src/apiserver/model"
"github.com/Masterminds/squirrel"
"github.com/golang/protobuf/proto"
"github.com/google/go-cmp/cmp"
@ -352,3 +354,33 @@ func TestUnmarshalJSON(t *testing.T) {
t.Errorf("json.Unmarshal(%+v):\nGot: %v, Error: %v\nWant:\n%+v, Error: nil\nDiff:%s\n", in, got, err, want, cmp.Diff(want, got, cmp.AllowUnexported(Filter{})))
}
}
func TestNewWithKeyMap(t *testing.T) {
filterProto := &api.Filter{
Predicates: []*api.Predicate{
&api.Predicate{
Key: "finished_at",
Op: api.Predicate_GREATER_THAN,
Value: &api.Predicate_StringValue{StringValue: "SomeTime"},
},
},
}
want := &Filter{
filterProto: &api.Filter{
Predicates: []*api.Predicate{
&api.Predicate{
Key: "runs.FinishedAtInSec", Op: api.Predicate_GREATER_THAN,
Value: &api.Predicate_StringValue{StringValue: "SomeTime"},
},
},
},
gt: map[string][]interface{}{"runs.FinishedAtInSec": {"SomeTime"}},
}
got, err := NewWithKeyMap(filterProto, (&model.Run{}).APIToModelFieldMap(), "runs")
if err != nil || !cmp.Equal(got, want, cmpopts.EquateEmpty(), protocmp.Transform(), cmp.AllowUnexported(Filter{})) {
t.Errorf("NewWithKeyMap(%+v):\nGot: %+v, Error: %v\nWant:\n%+v, Error: nil\n", filterProto, got, err, want)
}
}

View File

@ -5,18 +5,17 @@ import (
"strings"
"testing"
"google.golang.org/protobuf/testing/protocmp"
api "github.com/kubeflow/pipelines/backend/api/v1beta1/go_client"
"github.com/kubeflow/pipelines/backend/src/apiserver/common"
"github.com/kubeflow/pipelines/backend/src/apiserver/filter"
"github.com/kubeflow/pipelines/backend/src/apiserver/model"
"github.com/kubeflow/pipelines/backend/src/common/util"
"github.com/stretchr/testify/assert"
sq "github.com/Masterminds/squirrel"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
api "github.com/kubeflow/pipelines/backend/api/v1beta1/go_client"
"github.com/stretchr/testify/assert"
"google.golang.org/protobuf/testing/protocmp"
)
type fakeMetric struct {
@ -497,6 +496,55 @@ func TestNewOptions_InvalidFilter(t *testing.T) {
}
}
func TestNewOptions_ModelFilter(t *testing.T) {
protoFilter := &api.Filter{
Predicates: []*api.Predicate{
&api.Predicate{
Key: "finished_at",
Op: api.Predicate_GREATER_THAN,
Value: &api.Predicate_StringValue{StringValue: "SomeTime"},
},
},
}
protoFilterWithRightKeyNames := &api.Filter{
Predicates: []*api.Predicate{
&api.Predicate{
Key: "FinishedAtInSec",
Op: api.Predicate_GREATER_THAN,
Value: &api.Predicate_StringValue{StringValue: "SomeTime"},
},
},
}
f, err := filter.New(protoFilterWithRightKeyNames)
if err != nil {
t.Fatalf("failed to parse filter proto %+v: %v", protoFilter, err)
}
got, err := NewOptions(&model.Run{}, 10, "name", protoFilter)
want := &Options{
PageSize: 10,
token: &token{
KeyFieldName: "UUID",
SortByFieldName: "DisplayName",
IsDesc: false,
Filter: f,
},
}
opts := []cmp.Option{
cmpopts.EquateEmpty(), protocmp.Transform(),
cmp.AllowUnexported(Options{}),
cmp.AllowUnexported(filter.Filter{}),
}
if !cmp.Equal(got, want, opts...) || err != nil {
t.Errorf("NewOptions(protoFilter=%+v) =\nGot: %+v, %v\nWant: %+v, nil\nDiff:\n%s",
protoFilter, got, err, want, cmp.Diff(got, want, opts...))
}
}
func TestAddPaginationAndFilterToSelect(t *testing.T) {
protoFilter := &api.Filter{
Predicates: []*api.Predicate{

View File

@ -89,10 +89,11 @@ func (j *Job) DefaultSortField() string {
}
var jobAPIToModelFieldMap = map[string]string{
"id": "UUID",
"name": "DisplayName",
"created_at": "CreatedAtInSec",
"package_id": "PipelineId",
"id": "UUID",
"name": "DisplayName",
"created_at": "CreatedAtInSec",
"updated_at": "UpdatedAtInSec",
"description": "Description",
}
// APIToModelFieldMap returns a map from API names to field names for model Job.

View File

@ -86,6 +86,7 @@ var runAPIToModelFieldMap = map[string]string{
"scheduled_at": "ScheduledAtInSec",
"storage_state": "StorageState",
"status": "Conditions",
"finished_at": "FinishedAtInSec",
}
// APIToModelFieldMap returns a map from API names to field names for model Run.

View File

@ -288,6 +288,26 @@ func (s *JobApiTestSuite) TestJobApis() {
assert.Equal(t, 1, totalSize)
assert.Equal(t, "hello world", jobs[0].Name)
/* ---------- List the jobs, filtered by created_at, only return the previous two jobs ---------- */
time.Sleep(5 * time.Second) // Sleep for 5 seconds to make sure the previous jobs are created at a different timestamp
filterTime := time.Now().Unix()
time.Sleep(5 * time.Second)
_, err = s.jobClient.Create(createJobRequest)
// Check total number of jobs to be 3
jobs, totalSize, _, err = test.ListAllJobs(s.jobClient, s.resourceNamespace)
assert.Nil(t, err)
assert.Equal(t, 3, totalSize)
assert.Equal(t, 3, len(jobs))
// Check number of filtered jobs finished before filterTime to be 2
jobs, totalSize, _, err = test.ListJobs(
s.jobClient,
&jobparams.ListJobsParams{
Filter: util.StringPointer(`{"predicates": [{"key": "created_at", "op": 6, "string_value": "` + fmt.Sprint(filterTime) + `"}]}`)},
s.resourceNamespace)
assert.Nil(t, err)
assert.Equal(t, 2, len(jobs))
assert.Equal(t, 2, totalSize)
// The scheduledWorkflow CRD would create the run and it synced to the DB by persistent agent.
// This could take a few seconds to finish.
@ -319,11 +339,11 @@ func (s *JobApiTestSuite) TestJobApis() {
if err != nil {
return err
}
if len(runs) != 1 {
return fmt.Errorf("expected runs to be length 1, got: %v", len(runs))
if len(runs) != 2 {
return fmt.Errorf("expected runs to be length 2, got: %v", len(runs))
}
if totalSize != 1 {
return fmt.Errorf("expected total size 1, got: %v", totalSize)
if totalSize != 2 {
return fmt.Errorf("expected total size 2, got: %v", totalSize)
}
argParamsRun := runs[0]
return s.checkArgParamsRun(argParamsRun, argParamsExperiment.ID, argParamsExperiment.Name, argParamsJob.ID, argParamsJob.Name)

View File

@ -1,6 +1,7 @@
package integration
import (
"fmt"
"io/ioutil"
"sort"
"testing"
@ -250,6 +251,28 @@ func (s *RunApiTestSuite) TestRunApis() {
assert.Equal(t, 1, totalSize)
assert.Equal(t, "hello world", runs[0].Name)
/* ---------- List the runs, filtered by created_at, only return the previous two runs ---------- */
time.Sleep(5 * time.Second) // Sleep for 5 seconds to make sure the previous runs are created at a different timestamp
filterTime := time.Now().Unix()
time.Sleep(5 * time.Second)
// Create a new run
_, _, err = s.runClient.Create(createRunRequest)
assert.Nil(t, err)
// Check total number of runs is 3
runs, totalSize, _, err = test.ListAllRuns(s.runClient, s.resourceNamespace)
assert.Nil(t, err)
assert.Equal(t, 3, len(runs))
assert.Equal(t, 3, totalSize)
// Check number of filtered runs created before filterTime to be 2
runs, totalSize, _, err = test.ListRuns(
s.runClient,
&runparams.ListRunsV1Params{
Filter: util.StringPointer(`{"predicates": [{"key": "created_at", "op": 6, "string_value": "` + fmt.Sprint(filterTime) + `"}]}`)},
s.resourceNamespace)
assert.Nil(t, err)
assert.Equal(t, 2, len(runs))
assert.Equal(t, 2, totalSize)
/* ---------- Archive a run ------------*/
err = s.runClient.Archive(&runparams.ArchiveRunV1Params{
ID: helloWorldRunDetail.Run.ID,