feat(frontend): Runtime DAG in RunDetailsV2. Fix #6673 (#6694)

* feat(frontend): Runtime DAG in RunDetailsV2

* remove debug log
This commit is contained in:
James Liu 2021-10-07 00:55:22 -07:00 committed by GitHub
parent ea205cfa32
commit 74c7773ca4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 946 additions and 39 deletions

View File

@ -55,6 +55,12 @@ development.
Run `npm run mock:api` to start a mock backend api server handler so it can
serve basic api calls with mock data.
If you want to port real MLMD store to be used for mock backend scenario, you can run the following command. Note that a mock MLMD store doesn't exist yet.
```
kubectl port-forward svc/metadata-envoy-service 9090:9090
```
### Proxy to a real cluster
This requires you already have a real KFP cluster, you can proxy requests to it.

View File

@ -16,14 +16,16 @@ import { ApiExperiment } from '../src/apis/experiment';
import { ApiJob } from '../src/apis/job';
import { ApiPipeline, ApiPipelineVersion } from '../src/apis/pipeline';
import { ApiRelationship, ApiResourceType, ApiRunDetail, RunMetricFormat } from '../src/apis/run';
import v2_lightweight_python_pipeline from './data/v2/pipeline/mock_lightweight_python_functions_v2_pipeline.json';
import xgboost_sample_pipeline from './data/v2/pipeline/xgboost_sample_pipeline.json';
import helloWorldRun from './hello-world-runtime';
import helloWorldWithStepsRun from './hello-world-with-steps-runtime';
import jsonRun from './json-runtime';
import largeGraph from './large-graph-runtime';
import coinflipRun from './mock-coinflip-runtime';
import errorRun from './mock-error-runtime';
import xgboostRun from './mock-xgboost-runtime';
import largeGraph from './large-graph-runtime';
import retryRun from './mock-retry-runtime';
import xgboostRun from './mock-xgboost-runtime';
function padStartTwoZeroes(str: string): string {
let padded = str || '';
@ -359,6 +361,11 @@ const jobs: ApiJob[] = [
jobs.push(...generateNJobs());
const experiments: ApiExperiment[] = [
{
description: 'This experiment includes KFP v2 runs',
id: '275ea11d-ac63-4ce3-bc33-ec81981ed56b',
name: 'KFP v2 Runs',
},
{
description: 'This experiment has no runs',
id: '7fc01714-4a13-4c05-5902-a8a72c14253b',
@ -408,6 +415,88 @@ const versions: ApiPipelineVersion[] = [
];
const runs: ApiRunDetail[] = [
{
pipeline_runtime: {
// workflow_manifest: JSON.stringify(coinflipRun),
},
run: {
created_at: new Date('2021-05-17T20:58:23.000Z'),
description: 'V2 xgboost',
finished_at: new Date('2021-05-18T21:01:23.000Z'),
id: 'e0115ac1-0479-4194-a22d-01e65e09a32b',
name: 'v2-xgboost-ilbo',
pipeline_spec: {
pipeline_id: PIPELINE_V2_XGBOOST.id,
pipeline_name: PIPELINE_V2_XGBOOST_DEFAULT.name,
workflow_manifest: JSON.stringify(xgboost_sample_pipeline),
},
resource_references: [
{
key: {
id: '275ea11d-ac63-4ce3-bc33-ec81981ed56b',
type: ApiResourceType.EXPERIMENT,
},
relationship: ApiRelationship.OWNER,
},
],
scheduled_at: new Date('2021-05-17T20:58:23.000Z'),
status: 'Succeeded',
},
},
{
pipeline_runtime: {
// workflow_manifest: JSON.stringify(coinflipRun),
},
run: {
created_at: new Date('2021-04-17T20:58:23.000Z'),
description: 'V2 two steps run from pipeline template',
finished_at: new Date('2021-04-18T21:01:23.000Z'),
id: 'c1e11ff7-e1af-4a8d-a9e4-718f32934ae0',
name: 'v2-lightweight-two-steps-i5jk',
pipeline_spec: {
pipeline_id: PIPELINE_V2_PYTHON_TWO_STEPS_DEFAULT.id,
pipeline_name: PIPELINE_V2_PYTHON_TWO_STEPS_DEFAULT.name,
workflow_manifest: JSON.stringify(v2_lightweight_python_pipeline),
},
resource_references: [
{
key: {
id: '275ea11d-ac63-4ce3-bc33-ec81981ed56b',
type: ApiResourceType.EXPERIMENT,
},
relationship: ApiRelationship.OWNER,
},
],
scheduled_at: new Date('2021-04-17T20:58:23.000Z'),
status: 'Succeeded',
},
},
{
pipeline_runtime: {
// workflow_manifest: JSON.stringify(v2_lightweight_python_pipeline),
},
run: {
created_at: new Date('2021-03-17T20:58:23.000Z'),
description: 'V2 two steps run from SDK',
finished_at: new Date('2021-03-18T21:01:23.000Z'),
id: '3308d0ec-f1b3-4488-a2d3-8ad0f91e88f8',
name: 'v2-lightweight-two-steps-jk4u',
pipeline_spec: {
workflow_manifest: JSON.stringify(v2_lightweight_python_pipeline),
},
resource_references: [
{
key: {
id: '275ea11d-ac63-4ce3-bc33-ec81981ed56b',
type: ApiResourceType.EXPERIMENT,
},
relationship: ApiRelationship.OWNER,
},
],
scheduled_at: new Date('2021-03-17T20:58:23.000Z'),
status: 'Succeeded',
},
},
{
pipeline_runtime: {
workflow_manifest: JSON.stringify(coinflipRun),

View File

@ -13,6 +13,7 @@
// limitations under the License.
import express from 'express';
import proxy from 'http-proxy-middleware';
import mockApiMiddleware from './mock-api-middleware';
const app = express();
@ -28,9 +29,44 @@ app.use((_: any, res: any, next: any) => {
next();
});
export const HACK_FIX_HPM_PARTIAL_RESPONSE_HEADERS = {
Connection: 'keep-alive',
};
// To enable porting MLMD to mock backend, run following command:
// kubectl port-forward svc/metadata-envoy-service 9090:9090
/** Proxy metadata requests to the Envoy instance which will handle routing to the metadata gRPC server */
app.all(
'/ml_metadata.*',
proxy({
changeOrigin: true,
onProxyReq: proxyReq => {
console.log('Metadata proxied request: ', (proxyReq as any).path);
},
headers: HACK_FIX_HPM_PARTIAL_RESPONSE_HEADERS,
target: getAddress({ host: 'localhost', port: '9090' }),
}),
);
mockApiMiddleware(app as any);
app.listen(port, () => {
// tslint:disable-next-line:no-console
console.log('Server listening at http://localhost:' + port);
});
export function getAddress({
host,
port,
namespace,
schema = 'http',
}: {
host: string;
port?: string | number;
namespace?: string;
schema?: string;
}) {
namespace = namespace ? `.${namespace}` : '';
port = port ? `:${port}` : '';
return `${schema}://${host}${namespace}${port}`;
}

View File

@ -14,41 +14,41 @@
* limitations under the License.
*/
import * as React from 'react';
import ArtifactList from '../pages/ArtifactList';
import ArtifactDetails from '../pages/ArtifactDetails';
import Banner, { BannerProps } from '../components/Banner';
import Button from '@material-ui/core/Button';
import Compare from '../pages/Compare';
import Dialog from '@material-ui/core/Dialog';
import DialogActions from '@material-ui/core/DialogActions';
import DialogContent from '@material-ui/core/DialogContent';
import DialogTitle from '@material-ui/core/DialogTitle';
import ExecutionList from '../pages/ExecutionList';
import ExecutionDetails from '../pages/ExecutionDetails';
import ExperimentDetails from '../pages/ExperimentDetails';
import Snackbar, { SnackbarProps } from '@material-ui/core/Snackbar';
import * as React from 'react';
import { Redirect, Route, Switch } from 'react-router-dom';
import FrontendFeatures from 'src/pages/FrontendFeatures';
import RunDetailsRouter from 'src/pages/RunDetailsRouter';
import { classes, stylesheet } from 'typestyle';
import Banner, { BannerProps } from '../components/Banner';
import { commonCss } from '../Css';
import { Deployments, KFP_FLAGS } from '../lib/Flags';
import Page404 from '../pages/404';
import AllExperimentsAndArchive, {
AllExperimentsAndArchiveTab,
} from '../pages/AllExperimentsAndArchive';
import AllRunsAndArchive, { AllRunsAndArchiveTab } from '../pages/AllRunsAndArchive';
import AllRecurringRunsList from '../pages/AllRecurringRunsList';
import AllRunsAndArchive, { AllRunsAndArchiveTab } from '../pages/AllRunsAndArchive';
import ArtifactDetails from '../pages/ArtifactDetails';
import ArtifactList from '../pages/ArtifactList';
import Compare from '../pages/Compare';
import ExecutionDetails from '../pages/ExecutionDetails';
import ExecutionList from '../pages/ExecutionList';
import ExperimentDetails from '../pages/ExperimentDetails';
import { GettingStarted } from '../pages/GettingStarted';
import NewExperiment from '../pages/NewExperiment';
import NewPipelineVersion from '../pages/NewPipelineVersion';
import NewRun from '../pages/NewRun';
import Page404 from '../pages/404';
import PipelineDetails from '../pages/PipelineDetails';
import PipelineList from '../pages/PipelineList';
import RecurringRunDetails from '../pages/RecurringRunDetails';
import RunDetails from '../pages/RunDetails';
import SideNav from './SideNav';
import Snackbar, { SnackbarProps } from '@material-ui/core/Snackbar';
import Toolbar, { ToolbarProps } from './Toolbar';
import { Route, Switch, Redirect } from 'react-router-dom';
import { classes, stylesheet } from 'typestyle';
import { commonCss } from '../Css';
import NewPipelineVersion from '../pages/NewPipelineVersion';
import { GettingStarted } from '../pages/GettingStarted';
import { KFP_FLAGS, Deployments } from '../lib/Flags';
import FrontendFeatures from 'src/pages/FrontendFeatures';
export type RouteConfig = {
path: string;
@ -193,8 +193,8 @@ const Router: React.FC<RouterProps> = ({ configs }) => {
{ path: RoutePage.RUNS, Component: AllRunsAndArchive, view: AllRunsAndArchiveTab.RUNS },
{ path: RoutePage.RECURRING_RUNS, Component: AllRecurringRunsList },
{ path: RoutePage.RECURRING_RUN_DETAILS, Component: RecurringRunDetails },
{ path: RoutePage.RUN_DETAILS, Component: RunDetails },
{ path: RoutePage.RUN_DETAILS_WITH_EXECUTION, Component: RunDetails },
{ path: RoutePage.RUN_DETAILS, Component: RunDetailsRouter },
{ path: RoutePage.RUN_DETAILS_WITH_EXECUTION, Component: RunDetailsRouter },
{ path: RoutePage.COMPARE, Component: Compare },
{ path: RoutePage.FRONTEND_FEATURES, Component: FrontendFeatures },
];

View File

@ -0,0 +1,103 @@
// Copyright 2021 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import * as TWO_STEP_PIPELINE from 'src/data/test/mock_lightweight_python_functions_v2_pipeline.json';
import { PipelineSpec } from 'src/generated/pipeline_spec';
import { ml_pipelines } from 'src/generated/pipeline_spec/pbjs_ml_pipelines';
import { Artifact, Event, Execution, Value } from 'src/third_party/mlmd';
import { TASK_NAME_KEY, updateFlowElementsState } from './DynamicFlow';
import { convertFlowElements } from './StaticFlow';
describe('DynamicFlow', () => {
it('update node status based on MLMD', () => {
// Prepare MLMD objects.
const EXECUTION_PREPROCESS = new Execution()
.setId(3)
.setLastKnownState(Execution.State.COMPLETE);
EXECUTION_PREPROCESS.getCustomPropertiesMap().set(
TASK_NAME_KEY,
new Value().setStringValue('preprocess'),
);
const EXECUTION_TRAIN = new Execution().setId(4).setLastKnownState(Execution.State.FAILED);
EXECUTION_TRAIN.getCustomPropertiesMap().set(
TASK_NAME_KEY,
new Value().setStringValue('train'),
);
const ARTIFACT_OUTPUT_DATA_ONE = new Artifact().setId(1).setState(Artifact.State.LIVE);
const ARTIFACT_OUTPUT_DATA_TWO = new Artifact().setId(2).setState(Artifact.State.PENDING);
const ARTIFACT_MODEL = new Artifact().setId(3).setState(Artifact.State.DELETED);
const EVENT_PREPROCESS_OUTPUT_DATA_ONE = new Event()
.setExecutionId(3)
.setArtifactId(1)
.setType(Event.Type.OUTPUT)
.setPath(new Event.Path().setStepsList([new Event.Path.Step().setKey('output_dataset_one')]));
const EVENT_PREPROCESS_OUTPUT_DATA_TWO = new Event()
.setExecutionId(3)
.setArtifactId(2)
.setType(Event.Type.OUTPUT)
.setPath(
new Event.Path().setStepsList([new Event.Path.Step().setKey('output_dataset_two_path')]),
);
const EVENT_OUTPUT_DATA_ONE_TRAIN = new Event().setExecutionId(4).setArtifactId(1);
const EVENT_OUTPUT_DATA_TWO_TRAIN = new Event().setExecutionId(4).setArtifactId(2);
const EVENT_TRAIN_MODEL = new Event()
.setExecutionId(4)
.setArtifactId(3)
.setType(Event.Type.OUTPUT)
.setPath(new Event.Path().setStepsList([new Event.Path.Step().setKey('model')]));
// Converts to static graph first, its type is Elements<any>.
const jsonObject = TWO_STEP_PIPELINE;
const message = ml_pipelines.PipelineSpec.fromObject(jsonObject['pipelineSpec']);
const buffer = ml_pipelines.PipelineSpec.encode(message).finish();
const pipelineSpec = PipelineSpec.deserializeBinary(buffer);
const graph = convertFlowElements(pipelineSpec);
// MLMD objects to provide node states.
const executions: Execution[] = [EXECUTION_PREPROCESS, EXECUTION_TRAIN];
const events: Event[] = [
EVENT_PREPROCESS_OUTPUT_DATA_ONE,
EVENT_PREPROCESS_OUTPUT_DATA_TWO,
EVENT_OUTPUT_DATA_ONE_TRAIN,
EVENT_OUTPUT_DATA_TWO_TRAIN,
EVENT_TRAIN_MODEL,
];
const artifacts: Artifact[] = [
ARTIFACT_OUTPUT_DATA_ONE,
ARTIFACT_OUTPUT_DATA_TWO,
ARTIFACT_MODEL,
];
updateFlowElementsState(graph, executions, events, artifacts);
for (let element of graph) {
graph
.filter(e => e.id === element.id)
.forEach(e => {
if (e.id === 'task.preprocess') {
expect(e.data.state).toEqual(EXECUTION_PREPROCESS.getLastKnownState());
} else if (e.id === 'task.train') {
expect(e.data.state).toEqual(EXECUTION_TRAIN.getLastKnownState());
} else if (e.id === 'artifact.preprocess.output_dataset_one') {
expect(e.data.state).toEqual(ARTIFACT_OUTPUT_DATA_ONE.getState());
} else if (e.id === 'artifact.preprocess.output_dataset_two_path') {
expect(e.data.state).toEqual(ARTIFACT_OUTPUT_DATA_TWO.getState());
} else if (e.id === 'artifact.train.model') {
expect(e.data.state).toEqual(ARTIFACT_MODEL.getState());
}
});
}
});
});

View File

@ -0,0 +1,176 @@
/*
* Copyright 2021 The Kubeflow Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { ArtifactFlowElementData, ExecutionFlowElementData } from 'src/components/graph/Constants';
import {
getArtifactNodeKey,
getTaskKeyFromNodeKey,
NodeTypeNames,
PipelineFlowElement,
} from 'src/lib/v2/StaticFlow';
import { getArtifactNameFromEvent } from 'src/mlmd/MlmdUtils';
import { Artifact, Event, Execution, Value } from 'src/third_party/mlmd';
export const TASK_NAME_KEY = 'task_name';
// 1. Get the Pipeline Run context using run ID (FOR subDAG, we need to wait for design)
// 2. Fetch all executions by context. Create Map for task_name => Execution
// 3. Fetch all Events by Context. Create Map for OUTPUT events: execution_id => Events
// 5. Fetch all Artifacts by Context.
// 6. Create Map for artifacts: artifact_id => Artifact
// a. For each task in the flowElements, find its execution state.
// b. For each artifact node, get its task name.
// c. get Execution from Map, then get execution_id.
// d. get Events from Map, then get artifact name from path.
// e. for the Event which matches artifact name, get artifact_id.
// f. get Artifact and update the state.
// Construct ArtifactNodeKey -> Artifact Map
// for each OUTPUT event, get execution id and artifact id
// get execution task_name from Execution map
// get artifact name from Event path
// get Artifact from Artifact map
// set ArtifactNodeKey -> Artifact.
// Elements change to Map node key => node, edge key => edge
// For each node: (DAG execution doesn't have design yet)
// If TASK:
// Find exeuction from using task_name
// Update with execution state
// If ARTIFACT:
// Get task_name and artifact_name
// Get artifact from Master Map
// Update with artifact state
// IF SUBDAG: (Not designed)
// similar to TASK, but needs to determine subDAG type.
// IMPORTANT: All the updates are in-place for PipelineFlowElement. Therefore it is no return value.
// Questions:
// How to handle DAG state?
// How to handle subDAG input artifacts and parameters?
// How to handle if-condition? and show the state
// How to handle parallel-for? and list of workers.
export function updateFlowElementsState(
elems: PipelineFlowElement[],
executions: Execution[],
events: Event[],
artifacts: Artifact[],
) {
// IMPORTANT: PipelineFlowElement update is in-place.
const taskNameToExecution = getTaskNameToExecution(executions);
const executionIdToExectuion = getExectuionIdToExecution(executions);
const artifactIdToArtifact = getArtifactIdToArtifact(artifacts);
const artifactNodeKeyToArtifact = getArtifactNodeKeyToArtifact(
events,
executionIdToExectuion,
artifactIdToArtifact,
);
for (let elem of elems) {
let updatedElem = elem;
if (NodeTypeNames.EXECUTION === elem.type) {
const taskName = getTaskKeyFromNodeKey(elem.id);
const execution = taskNameToExecution.get(taskName);
(updatedElem.data as ExecutionFlowElementData).state = execution?.getLastKnownState();
} else if (NodeTypeNames.ARTIFACT === elem.type) {
const artifact = artifactNodeKeyToArtifact.get(elem.id);
(updatedElem.data as ArtifactFlowElementData).state = artifact?.getState();
} else if (NodeTypeNames.SUB_DAG === elem.type) {
// TODO: Update sub-dag state based on future design.
} else {
// Edges don't have types yet.
// For any element that don't match the above types, copy over directly.
}
}
}
function getTaskNameToExecution(executions: Execution[]): Map<string, Execution> {
const map = new Map<string, Execution>();
for (let exec of executions) {
const taskName = getTaskName(exec);
if (!taskName) {
continue;
}
map.set(taskName.getStringValue(), exec);
}
return map;
}
function getExectuionIdToExecution(executions: Execution[]): Map<number, Execution> {
const map = new Map<number, Execution>();
for (let exec of executions) {
map.set(exec.getId(), exec);
}
return map;
}
function getArtifactIdToArtifact(artifacts: Artifact[]): Map<number, Artifact> {
const map = new Map<number, Artifact>();
for (let artifact of artifacts) {
map.set(artifact.getId(), artifact);
}
return map;
}
function getArtifactNodeKeyToArtifact(
events: Event[],
executionIdToExectuion: Map<number, Execution>,
artifactIdToArtifact: Map<number, Artifact>,
): Map<string, Artifact> {
const map = new Map<string, Artifact>();
const outputEvents = events.filter(event => event.getType() === Event.Type.OUTPUT);
for (let event of outputEvents) {
const executionId = event.getExecutionId();
const execution = executionIdToExectuion.get(executionId);
if (!execution) {
console.warn("Execution doesn't exist for ID " + executionId);
continue;
}
const taskName = getTaskName(execution);
if (!taskName) {
continue;
}
const artifactId = event.getArtifactId();
const artifact = artifactIdToArtifact.get(artifactId);
if (!artifact) {
console.warn("Artifact doesn't exist for ID " + artifactId);
continue;
}
const artifactName = getArtifactNameFromEvent(event);
if (!artifactName) {
console.warn("Artifact name doesn't exist in Event. Artifact ID " + artifactId);
continue;
}
const key = getArtifactNodeKey(taskName.getStringValue(), artifactName);
map.set(key, artifact);
}
return map;
}
function getTaskName(exec: Execution): Value | undefined {
const customProperties = exec.getCustomPropertiesMap();
if (!customProperties.has(TASK_NAME_KEY)) {
console.warn("task_name key doesn't exist for custom properties of Execution " + exec.getId());
return undefined;
}
const taskName = customProperties.get(TASK_NAME_KEY);
if (!taskName) {
console.warn(
"task_name value doesn't exist for custom properties of Execution " + exec.getId(),
);
return undefined;
}
return taskName;
}

View File

@ -398,7 +398,7 @@ export function isTaskNode(nodeKey: string) {
}
const ARTIFACT_NODE_KEY_PREFIX = 'artifact.';
function getArtifactNodeKey(taskKey: string, artifactKey: string): string {
export function getArtifactNodeKey(taskKey: string, artifactKey: string): string {
// id is in pattern artifact.producerTaskKey.outputArtifactKey
// Because task name and artifact name cannot contain dot in python.
return ARTIFACT_NODE_KEY_PREFIX + taskKey + '.' + artifactKey;

View File

@ -19,8 +19,11 @@ import {
EXECUTION_KEY_CACHED_EXECUTION_ID,
filterLinkedArtifactsByType,
getArtifactName,
getArtifactNameFromEvent,
getContextByExecution,
getRunContext,
getArtifactsFromContext,
getEventsByExecutions,
} from 'src/mlmd/MlmdUtils';
import { expectWarnings, testBestPractices } from 'src/TestUtils';
import {
@ -34,10 +37,14 @@ import {
GetContextByTypeAndNameResponse,
} from 'src/third_party/mlmd';
import {
GetArtifactsByContextRequest,
GetArtifactsByContextResponse,
GetContextsByExecutionRequest,
GetContextsByExecutionResponse,
GetContextTypeRequest,
GetContextTypeResponse,
GetEventsByExecutionIDsRequest,
GetEventsByExecutionIDsResponse,
} from 'src/third_party/mlmd/generated/ml_metadata/proto/metadata_store_service_pb';
import { Workflow, WorkflowSpec, WorkflowStatus } from 'third_party/argo-ui/argo_template';
@ -158,6 +165,39 @@ describe('MlmdUtils', () => {
});
});
describe('getArtifactNameFromEvent', () => {
it('get the first key of steps list', () => {
const path = new Event.Path();
path.getStepsList().push(new Event.Path.Step().setKey('key1'));
path.getStepsList().push(new Event.Path.Step().setKey('key2'));
const event = new Event();
event.setPath(path);
expect(getArtifactNameFromEvent(event)).toEqual('key1');
});
});
describe('getActifactsFromContext', () => {
it('returns list of artifacts', async () => {
const context = new Context();
context.setId(2);
const artifacts = [new Artifact().setId(10), new Artifact().setId(20)];
mockGetArtifactsByContext(context, artifacts);
const artifactResult = await getArtifactsFromContext(context);
expect(artifactResult).toEqual(artifacts);
});
});
describe('getEventsByExecutions', () => {
it('returns list of events', async () => {
const executions = [new Execution().setId(1), new Execution().setId(2)];
const events = [new Event().setExecutionId(1), new Event().setExecutionId(2)];
mockGetEventsByExecutions(executions, events);
const eventsResult = await getEventsByExecutions(executions);
expect(eventsResult).toEqual(events);
});
});
describe('filterLinkedArtifactsByType', () => {
it('filter input artifacts', () => {
const artifactTypeName = 'INPUT';
@ -224,3 +264,28 @@ function mockGetContextByTypeAndName(contexts: Context[]) {
return response;
});
}
function mockGetArtifactsByContext(context: Context, artifacts: Artifact[]) {
jest
.spyOn(Api.getInstance().metadataStoreService, 'getArtifactsByContext')
.mockImplementation((req: GetArtifactsByContextRequest) => {
const response = new GetArtifactsByContextResponse();
if (req.getContextId() === context.getId()) {
response.setArtifactsList(artifacts);
}
return response;
});
}
function mockGetEventsByExecutions(executions: Execution[], events: Event[]) {
jest
.spyOn(Api.getInstance().metadataStoreService, 'getEventsByExecutionIDs')
.mockImplementation((req: GetEventsByExecutionIDsRequest) => {
const response = new GetEventsByExecutionIDsResponse();
const executionIds = executions.map(e => e.getId());
if (req.getExecutionIdsList().every((val, index) => val === executionIds[index])) {
response.setEventsList(events);
}
return response;
});
}

View File

@ -43,6 +43,7 @@ import {
GetExecutionsByContextRequest,
} from 'src/third_party/mlmd';
import {
GetArtifactsByContextRequest,
GetContextsByExecutionRequest,
GetContextsByExecutionResponse,
GetContextTypeRequest,
@ -92,7 +93,7 @@ async function getKfpRunContext(argoWorkflowName: string): Promise<Context> {
return await getContext({ name: argoWorkflowName, type: 'KfpRun' });
}
async function getKfpV2RunContext(runID: string): Promise<Context> {
export async function getKfpV2RunContext(runID: string): Promise<Context> {
return await getContext({ name: runID, type: KFP_V2_RUN_CONTEXT_TYPE });
}
@ -205,7 +206,7 @@ function getStringValue(value?: string | number | Struct | null): string | undef
return value;
}
async function getEventByExecution(execution: Execution): Promise<Event[]> {
export async function getEventByExecution(execution: Execution): Promise<Event[]> {
const executionId = execution.getId();
if (!executionId) {
throw new Error('Execution must have an ID');
@ -373,8 +374,54 @@ export function filterLinkedArtifactsByType(
}
export function getArtifactName(linkedArtifact: LinkedArtifact): string | undefined {
return linkedArtifact.event
return getArtifactNameFromEvent(linkedArtifact.event);
}
export function getArtifactNameFromEvent(event: Event): string | undefined {
return event
.getPath()
?.getStepsList()[0]
.getKey();
}
export async function getArtifactsFromContext(context: Context): Promise<Artifact[]> {
const request = new GetArtifactsByContextRequest();
request.setContextId(context.getId());
try {
const res = await Api.getInstance().metadataStoreService.getArtifactsByContext(request);
const list = res.getArtifactsList();
if (list == null) {
throw new Error('response.getExecutionsList() is empty');
}
// Display name of artifact exists in getCustomPropertiesMap().get('display_name').getStringValue().
// Note that the actual artifact name is in Event which generates this artifact.
return list;
} catch (err) {
err.message =
`Cannot find executions by context ${context.getId()} with name ${context.getName()}: ` +
err.message;
throw err;
}
}
export async function getEventsByExecutions(executions: Execution[] | undefined): Promise<Event[]> {
if (!executions) {
return [];
}
const request = new GetEventsByExecutionIDsRequest();
for (let exec of executions) {
const execId = exec.getId();
if (!execId) {
throw new Error('Execution must have an ID');
}
request.addExecutionIds(execId);
}
let response: GetEventsByExecutionIDsResponse;
try {
response = await Api.getInstance().metadataStoreService.getEventsByExecutionIDs(request);
} catch (err) {
err.message = 'Failed to getEventsByExecutionIDs: ' + err.message;
throw err;
}
return response.getEventsList();
}

View File

@ -45,7 +45,7 @@ describe('PipelineDetailsV2', () => {
></PipelineDetailsV2>
</CommonTestWrapper>,
);
expect(screen.getByTestId('StaticCanvas')).not.toBeNull();
expect(screen.getByTestId('DagCanvas')).not.toBeNull();
});
it('Render summary card', async () => {
@ -91,7 +91,7 @@ describe('PipelineDetailsV2', () => {
></PipelineDetailsV2>
</CommonTestWrapper>,
);
expect(screen.getByTestId('StaticCanvas')).not.toBeNull();
expect(screen.getByTestId('DagCanvas')).not.toBeNull();
screen.getByText('flip-coin-op');
});

View File

@ -25,7 +25,7 @@ import { StaticNodeDetailsV2 } from 'src/components/tabs/StaticNodeDetailsV2';
import { isSafari } from 'src/lib/Utils';
import { PipelineFlowElement } from 'src/lib/v2/StaticFlow';
import { commonCss, padding } from '../Css';
import StaticCanvas from './v2/StaticCanvas';
import DagCanvas from './v2/DagCanvas';
const TAB_NAMES = ['Graph', 'Pipeline Spec'];
@ -83,12 +83,13 @@ function PipelineDetailsV2({
<MD2Tabs selectedTab={selectedTab} onSwitch={setSelectedTab} tabs={TAB_NAMES} />
{selectedTab === 0 && (
<div className={commonCss.page} style={{ position: 'relative', overflow: 'hidden' }}>
<StaticCanvas
<DagCanvas
layers={layers}
onLayersUpdate={layerChange}
elements={pipelineFlowElements}
onSelectionChange={onSelectionChange}
></StaticCanvas>
setFlowElements={() => {}}
></DagCanvas>
<PipelineVersionCard
apiPipeline={apiPipeline}
selectedVersion={selectedVersion}

View File

@ -0,0 +1,84 @@
/*
* Copyright 2021 The Kubeflow Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import jsyaml from 'js-yaml';
import React from 'react';
import { useQuery } from 'react-query';
import { ApiRunDetail } from 'src/apis/run';
import { RouteParams } from 'src/components/Router';
import { FeatureKey, isFeatureEnabled } from 'src/features';
import { Apis } from 'src/lib/Apis';
import * as StaticGraphParser from 'src/lib/StaticGraphParser';
import { convertFlowElements } from 'src/lib/v2/StaticFlow';
import * as WorkflowUtils from 'src/lib/v2/WorkflowUtils';
import EnhancedRunDetails, { RunDetailsProps } from 'src/pages/RunDetails';
import { RunDetailsV2 } from 'src/pages/RunDetailsV2';
// This is a router to determine whether to show V1 or V2 run detail page.
export default function RunDetailsRouter(props: RunDetailsProps) {
const runId = props.match.params[RouteParams.runId];
// Retrieves run detail.
const { isSuccess, data } = useQuery<ApiRunDetail, Error>(
['run_detail', { id: runId }],
() => Apis.runServiceApi.getRun(runId),
{ staleTime: 30000 },
);
if (
isSuccess &&
data &&
data.run &&
data.run.pipeline_spec &&
data.run.pipeline_spec.workflow_manifest
) {
// TODO(zijianjoy): We need to switch to use pipeline_manifest for new API implementation.
const isIR = isIrPipeline(data.run.pipeline_spec.workflow_manifest);
if (isIR) {
return (
<RunDetailsV2
pipeline_job={data.run.pipeline_spec.workflow_manifest}
runId={runId}
{...props}
/>
);
}
}
return <EnhancedRunDetails {...props} />;
}
// This needs to be changed to use pipeline_manifest vs workflow_manifest to distinguish V1 and V2.
function isIrPipeline(templateString: string) {
if (!templateString) {
return false;
}
try {
const template = jsyaml.safeLoad(templateString);
if (WorkflowUtils.isArgoWorkflowTemplate(template)) {
StaticGraphParser.createGraph(template!);
return false;
} else if (isFeatureEnabled(FeatureKey.V2)) {
const pipelineSpec = WorkflowUtils.convertJsonToV2PipelineSpec(templateString);
convertFlowElements(pipelineSpec);
return true;
} else {
return false;
}
} catch (err) {
return false;
}
}

View File

@ -0,0 +1,153 @@
/*
* Copyright 2021 The Kubeflow Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { render, screen, waitFor } from '@testing-library/react';
import React from 'react';
import { RouteParams } from 'src/components/Router';
import * as v2PipelineSpec from 'src/data/test/mock_lightweight_python_functions_v2_pipeline.json';
import { Api } from 'src/mlmd/Api';
import { KFP_V2_RUN_CONTEXT_TYPE } from 'src/mlmd/MlmdUtils';
import { mockResizeObserver, testBestPractices } from 'src/TestUtils';
import { CommonTestWrapper } from 'src/TestWrapper';
import {
Context,
GetContextByTypeAndNameRequest,
GetContextByTypeAndNameResponse,
GetExecutionsByContextResponse,
} from 'src/third_party/mlmd';
import {
GetArtifactsByContextResponse,
GetEventsByExecutionIDsResponse,
} from 'src/third_party/mlmd/generated/ml_metadata/proto/metadata_store_service_pb';
import { PageProps } from './Page';
import { RunDetailsInternalProps } from './RunDetails';
import { RunDetailsV2 } from './RunDetailsV2';
testBestPractices();
describe('RunDetailsV2', () => {
const RUN_ID = '1';
let updateBannerSpy: any;
let updateDialogSpy: any;
let updateSnackbarSpy: any;
let updateToolbarSpy: any;
let historyPushSpy: any;
function generateProps(): RunDetailsInternalProps & PageProps {
const pageProps: PageProps = {
history: { push: historyPushSpy } as any,
location: '' as any,
match: {
params: {
[RouteParams.runId]: RUN_ID,
},
isExact: true,
path: '',
url: '',
},
toolbarProps: { actions: {}, breadcrumbs: [], pageTitle: '' },
updateBanner: updateBannerSpy,
updateDialog: updateDialogSpy,
updateSnackbar: updateSnackbarSpy,
updateToolbar: updateToolbarSpy,
};
return Object.assign(pageProps, {
gkeMetadata: {},
});
}
beforeEach(() => {
mockResizeObserver();
updateBannerSpy = jest.fn();
});
it('Render detail page with reactflow', async () => {
render(
<CommonTestWrapper>
<RunDetailsV2
pipeline_job={JSON.stringify(v2PipelineSpec)}
runId={RUN_ID}
{...generateProps()}
></RunDetailsV2>
</CommonTestWrapper>,
);
expect(screen.getByTestId('DagCanvas')).not.toBeNull();
});
it('Shows error banner when disconnected from MLMD', async () => {
jest
.spyOn(Api.getInstance().metadataStoreService, 'getContextByTypeAndName')
.mockRejectedValue(new Error('Not connected to MLMD'));
render(
<CommonTestWrapper>
<RunDetailsV2
pipeline_job={JSON.stringify(v2PipelineSpec)}
runId={RUN_ID}
{...generateProps()}
></RunDetailsV2>
</CommonTestWrapper>,
);
await waitFor(() =>
expect(updateBannerSpy).toHaveBeenLastCalledWith(
expect.objectContaining({
additionalInfo:
'Cannot find context with {"typeName":"system.PipelineRun","contextName":"1"}: Not connected to MLMD',
message: 'Cannot get MLMD objects from Metadata store.',
mode: 'error',
}),
),
);
});
it('Shows no banner when connected from MLMD', async () => {
jest
.spyOn(Api.getInstance().metadataStoreService, 'getContextByTypeAndName')
.mockImplementation((request: GetContextByTypeAndNameRequest) => {
const response = new GetContextByTypeAndNameResponse();
if (
request.getTypeName() === KFP_V2_RUN_CONTEXT_TYPE &&
request.getContextName() === RUN_ID
) {
response.setContext(new Context());
}
return response;
});
jest
.spyOn(Api.getInstance().metadataStoreService, 'getExecutionsByContext')
.mockResolvedValue(new GetExecutionsByContextResponse());
jest
.spyOn(Api.getInstance().metadataStoreService, 'getArtifactsByContext')
.mockResolvedValue(new GetArtifactsByContextResponse());
jest
.spyOn(Api.getInstance().metadataStoreService, 'getEventsByExecutionIDs')
.mockResolvedValue(new GetEventsByExecutionIDsResponse());
render(
<CommonTestWrapper>
<RunDetailsV2
pipeline_job={JSON.stringify(v2PipelineSpec)}
runId={RUN_ID}
{...generateProps()}
></RunDetailsV2>
</CommonTestWrapper>,
);
await waitFor(() => expect(updateBannerSpy).toHaveBeenLastCalledWith({}));
});
});

View File

@ -0,0 +1,136 @@
// Copyright 2021 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import * as React from 'react';
import { useState } from 'react';
import { Elements, FlowElement } from 'react-flow-renderer';
import { useQuery } from 'react-query';
import MD2Tabs from 'src/atoms/MD2Tabs';
import { FlowElementDataBase } from 'src/components/graph/Constants';
import SidePanel from 'src/components/SidePanel';
import { commonCss, padding } from 'src/Css';
import { updateFlowElementsState } from 'src/lib/v2/DynamicFlow';
import { convertFlowElements } from 'src/lib/v2/StaticFlow';
import * as WorkflowUtils from 'src/lib/v2/WorkflowUtils';
import {
getArtifactsFromContext,
getEventsByExecutions,
getExecutionsFromContext,
getKfpV2RunContext,
} from 'src/mlmd/MlmdUtils';
import { Artifact, Event, Execution } from 'src/third_party/mlmd';
import { classes } from 'typestyle';
import { RunDetailsProps } from './RunDetails';
import DagCanvas from './v2/DagCanvas';
interface MlmdPackage {
executions: Execution[];
artifacts: Artifact[];
events: Event[];
}
interface RunDetailsV2Info {
pipeline_job: string;
runId: string;
}
export type RunDetailsV2Props = RunDetailsV2Info & RunDetailsProps;
export function RunDetailsV2(props: RunDetailsV2Props) {
const pipelineJobStr = props.pipeline_job;
const pipelineSpec = WorkflowUtils.convertJsonToV2PipelineSpec(pipelineJobStr);
const elements = convertFlowElements(pipelineSpec);
const [flowElements, setFlowElements] = useState(elements);
const [layers, setLayers] = useState(['root']);
const [selectedTab, setSelectedTab] = useState(0);
const [selectedNode, setSelectedNode] = useState<FlowElement<FlowElementDataBase> | null>(null);
// TODO(zijianjoy): Update elements and states when layers change.
const layerChange = (layers: string[]) => {
setSelectedNode(null);
setLayers(layers);
};
const onSelectionChange = (elements: Elements<FlowElementDataBase> | null) => {
if (!elements || elements?.length === 0) {
setSelectedNode(null);
return;
}
if (elements && elements.length === 1) {
setSelectedNode(elements[0]);
}
};
const getNodeName = function(element: FlowElement<FlowElementDataBase> | null): string {
if (element && element.data && element.data.label) {
return element.data.label;
}
return 'unknown';
};
// Retrieves MLMD states from the MLMD store.
const { isSuccess, data } = useQuery<MlmdPackage, Error>(
['mlmd_package', { id: props.runId }],
async () => {
const context = await getKfpV2RunContext(props.runId);
const executions = await getExecutionsFromContext(context);
const artifacts = await getArtifactsFromContext(context);
const events = await getEventsByExecutions(executions);
return { executions, artifacts, events };
},
{
staleTime: 10000,
onError: error =>
props.updateBanner({
message: 'Cannot get MLMD objects from Metadata store.',
additionalInfo: error.message,
mode: 'error',
}),
onSuccess: () => props.updateBanner({}),
},
);
if (isSuccess && data && data.executions && data.events && data.artifacts) {
updateFlowElementsState(flowElements, data.executions, data.events, data.artifacts);
}
return (
<div className={classes(commonCss.page, padding(20, 't'))}>
<MD2Tabs selectedTab={selectedTab} tabs={['Graph', 'Detail']} onSwitch={setSelectedTab} />
{selectedTab === 0 && (
<div className={commonCss.page} style={{ position: 'relative', overflow: 'hidden' }}>
<DagCanvas
layers={layers}
onLayersUpdate={layerChange}
elements={flowElements}
onSelectionChange={onSelectionChange}
setFlowElements={elems => setFlowElements(elems)}
></DagCanvas>
<div className='z-20'>
<SidePanel
isOpen={!!selectedNode}
title={getNodeName(selectedNode)}
onClose={() => onSelectionChange(null)}
defaultWidth={'50%'}
></SidePanel>
</div>
</div>
)}
</div>
);
}

View File

@ -28,19 +28,21 @@ import SubDagLayer from 'src/components/graph/SubDagLayer';
import { color } from 'src/Css';
import { getTaskKeyFromNodeKey, NodeTypeNames, NODE_TYPES } from 'src/lib/v2/StaticFlow';
export interface StaticCanvasProps {
export interface DagCanvasProps {
elements: Elements<FlowElementDataBase>;
setFlowElements: (elements: Elements<any>) => void;
onSelectionChange: (elements: Elements<any> | null) => void;
layers: string[];
onLayersUpdate: (layers: string[]) => void;
onSelectionChange: (elements: Elements<any> | null) => void;
}
const StaticCanvas = ({
export default function DagCanvas({
elements,
layers,
onLayersUpdate,
onSelectionChange,
}: StaticCanvasProps) => {
setFlowElements,
}: DagCanvasProps) {
const onLoad = (reactFlowInstance: OnLoadParams) => {
reactFlowInstance.fitView();
};
@ -63,7 +65,7 @@ const StaticCanvas = ({
return (
<>
<SubDagLayer layers={layers} onLayersUpdate={onLayersUpdate}></SubDagLayer>
<div data-testid='StaticCanvas' style={{ width: '100%', height: '100%' }}>
<div data-testid='DagCanvas' style={{ width: '100%', height: '100%' }}>
<ReactFlowProvider>
<ReactFlow
style={{ background: color.lightGrey }}
@ -73,6 +75,16 @@ const StaticCanvas = ({
nodeTypes={NODE_TYPES}
edgeTypes={{}}
onSelectionChange={onSelectionChange}
onNodeDragStop={(event, node) => {
setFlowElements(
elements.map(value => {
if (value.id === node.id) {
return node;
}
return value;
}),
);
}}
>
<MiniMap />
<Controls />
@ -82,5 +94,4 @@ const StaticCanvas = ({
</div>
</>
);
};
export default StaticCanvas;
}