fix(frontend): fix parsing of large workflow graphs. Fixes #4179 (#4180)

* frontend: add large pipeline example to mocked data

* add util function decodeCompressedNodes

* decode compressedNodes in workflow if nodes is empty and compressedNodes exists

* fix decodeCompressedNodes

* prettify code

* Frontend: test Utils decodeCompressedNodes

* test RunDetails page renders compressed graph

* reformat code

* update jest snapshot to add compressed node case

* fix tests

* reformat code

* update RunDetails snapshot

* remove duplicate test code

* RunDetails: remove compressedNodes after decoding it

* reformat decodeCompressedNodes + add failure case test

* decodeCompressedNodes returns undefined on error

* RunDetails decodeCompressedNodes test: snapshot same as single node graph

* do not wait for decodeCompressedNodes + debug print workflow

* fix Run load + refresh snapshot

* format code

* Fix one-node compressed workflow graph + update snapshot

* reformat code

* rename large pipeline

* fix decodeCompressedNodes to work in browser

* fix test

* fix tests

* remove some of the console.log

* clean code

* address comments

* address comments: wait for zlib instead of mocking decodeCompressedNodes

* address comments: decodeCompressedNodes reject in case of error + catch error on run load

* address comments
radcheb 2020-10-16 16:27:26 +02:00 committed by GitHub
parent 5742991c1a
commit 02b0899dd9
6 changed files with 12788 additions and 4 deletions

frontend/mock-backend/fixed-data.ts View File

@@ -22,6 +22,7 @@ import jsonRun from './json-runtime';
import coinflipRun from './mock-coinflip-runtime';
import errorRun from './mock-error-runtime';
import xgboostRun from './mock-xgboost-runtime';
import largeGraph from './large-graph-runtime';
import retryRun from './mock-retry-runtime';
function padStartTwoZeroes(str: string): string {
@@ -641,6 +642,32 @@ const runs: ApiRunDetail[] = [
status: 'Succeeded',
},
},
{
pipeline_runtime: {
workflow_manifest: JSON.stringify(largeGraph),
},
run: {
created_at: new Date('2020-07-08T10:03:37.000Z'),
description: 'large pipeline with a lot of nodes.',
finished_at: new Date('2020-07-08T10:39:43.000Z'),
id: '808ecf03-ee3b-48c6-9fa1-5f14ad11a3f8',
name: 'Very large graph',
pipeline_spec: {
workflow_manifest: JSON.stringify(largeGraph),
},
resource_references: [
{
key: {
id: 'a4d4f8c6-ce9c-4200-a92e-c48ec759b733',
type: ApiResourceType.EXPERIMENT,
},
relationship: ApiRelationship.OWNER,
},
],
scheduled_at: new Date('1970-01-01T00:00:00.000Z'),
status: 'Succeeded',
},
},
{
pipeline_runtime: {
workflow_manifest: JSON.stringify(retryRun),

frontend/mock-backend/large-graph-runtime.ts: file diff suppressed because one or more lines are too long

frontend/src/lib/Utils.test.ts View File

@@ -16,6 +16,7 @@
import { NodePhase } from './StatusUtils';
import {
decodeCompressedNodes,
enabledDisplayString,
formatDateString,
generateMinioArtifactUrl,
@@ -262,4 +263,24 @@ describe('Utils', () => {
);
});
});
describe('decodeCompressedNodes', () => {
it('decompresses base64-encoded gzipped JSON', async () => {
let compressedNodes =
'H4sIAAAAAAACE6tWystPSS1WslKIrlbKS8xNBbLAQoZKOgpKmSlArmFtbC0A+U7xAicAAAA=';
await expect(decodeCompressedNodes(compressedNodes)).resolves.toEqual({
nodes: [{ name: 'node1', id: 1 }],
});
compressedNodes = 'H4sIAAAAAAACE6tWystPSTVUslKoVspMAVJQfm0tAEBEv1kaAAAA';
await expect(decodeCompressedNodes(compressedNodes)).resolves.toEqual({ node1: { id: 'node1' } });
});
it('raises an exception if it fails to decompress data', async () => {
const compressedNodes = 'I4sIAAAAAAACE6tWystPSS1WslKIrlxNBbLAQoZKOgpKmSlArmFtbC0A+U7xAicAAAA=';
await expect(decodeCompressedNodes(compressedNodes)).rejects.toEqual(
'failed to gunzip data Error: incorrect header check',
);
});
});
});
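For reference, base64 fixtures like the ones asserted above can be regenerated with Node's zlib. A minimal sketch (not part of the commit), gzipping the JSON payload and base64-encoding it the way Argo does for compressedNodes:

import * as zlib from 'zlib';

// gzip the JSON payload, then base64-encode it; decodeCompressedNodes reverses this.
const fixture = zlib
  .gzipSync(Buffer.from(JSON.stringify({ node1: { id: 'node1' } }), 'utf8'))
  .toString('base64');
console.log(fixture); // a base64 string that decodeCompressedNodes resolves back to the object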

frontend/src/lib/Utils.ts View File

@@ -15,6 +15,7 @@
*/
import * as React from 'react';
import * as zlib from 'zlib';
import { ApiRun } from '../apis/run';
import { ApiTrigger } from '../apis/job';
import { Workflow } from '../../third_party/argo-ui/argo_template';
@@ -358,3 +359,20 @@ export function buildQuery(queriesMap: { [key: string]: string | number | undefi
}
return `?${queryContent}`;
}
export async function decodeCompressedNodes(compressedNodes: string): Promise<object> {
return new Promise<object>((resolve, reject) => {
const compressedBuffer = Buffer.from(compressedNodes, 'base64');
zlib.gunzip(compressedBuffer, (error, result: Buffer) => {
if (error) {
const gz_error_msg = `failed to gunzip data ${error}`;
logger.error(gz_error_msg);
reject(gz_error_msg);
} else {
const nodesStr = result.toString('utf8');
const nodes = JSON.parse(nodesStr);
resolve(nodes);
}
});
});
}
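A quick usage sketch of the helper above, using the same fixture string the Utils tests assert against:

// Decodes to { node1: { id: 'node1' } } per the Utils tests.
const compressedNodes = 'H4sIAAAAAAACE6tWystPSTVUslKoVspMAVJQfm0tAEBEv1kaAAAA';
decodeCompressedNodes(compressedNodes)
  .then(nodes => console.log(nodes))
  .catch(err => console.error(err)); // rejects with 'failed to gunzip data ...' on bad input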

frontend/src/pages/RunDetails.test.tsx View File

@@ -13,7 +13,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import { Api, GetArtifactTypesResponse } from '@kubeflow/frontend';
import { render } from '@testing-library/react';
import * as dagre from 'dagre';
@@ -654,6 +653,50 @@ describe('RunDetails', () => {
`);
});
it('shows a one-node compressed workflow graph', async () => {
testRun.pipeline_runtime!.workflow_manifest = JSON.stringify({
...WORKFLOW_TEMPLATE,
status: { compressedNodes: 'H4sIAAAAAAACE6tWystPSTVUslKoVspMAVJQfm0tAEBEv1kaAAAA' },
});
const { getByTestId } = render(<RunDetails {...generateProps()} />);
await getRunSpy;
await TestUtils.flushPromises();
jest.useRealTimers();
await new Promise(resolve => setTimeout(resolve, 500));
jest.useFakeTimers();
expect(getByTestId('graph')).toMatchInlineSnapshot(`
<pre
data-testid="graph"
>
Node node1
Node node1-running-placeholder
Edge node1 to node1-running-placeholder
</pre>
`);
});
it('shows an empty workflow graph if compressedNodes is corrupt', async () => {
testRun.pipeline_runtime!.workflow_manifest = JSON.stringify({
...WORKFLOW_TEMPLATE,
status: { compressedNodes: 'Y29ycnVwdF9kYXRh' },
});
const { queryAllByTestId } = render(<RunDetails {...generateProps()} />);
await getRunSpy;
await TestUtils.flushPromises();
jest.useRealTimers();
await new Promise(resolve => setTimeout(resolve, 500));
jest.useFakeTimers();
expect(queryAllByTestId('graph')).toEqual([]);
});
it('opens side panel when graph node is clicked', async () => {
testRun.pipeline_runtime!.workflow_manifest = JSON.stringify({
status: { nodes: { node1: { id: 'node1' } } },
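A note on the jest.useRealTimers()/setTimeout dance in the two compressed-graph tests above: zlib.gunzip completes via an async callback that Jest's fake timers do not drive, so the tests briefly switch to real timers to let the decode settle. A small helper could encapsulate the pattern (a sketch; flushZlibDecode is a hypothetical name, not in the commit):

// Hypothetical test helper: let pending zlib callbacks fire under real timers,
// then restore fake timers for the rest of the test.
async function flushZlibDecode(ms: number = 500): Promise<void> {
  jest.useRealTimers();
  await new Promise(resolve => setTimeout(resolve, ms));
  jest.useFakeTimers();
}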

frontend/src/pages/RunDetails.tsx View File

@@ -71,6 +71,7 @@ import {
getRunDurationFromWorkflow,
logger,
serviceErrorToString,
decodeCompressedNodes,
} from '../lib/Utils';
import WorkflowParser from '../lib/WorkflowParser';
import { ExecutionDetailsContent } from './ExecutionDetails';
@@ -669,9 +670,23 @@ class RunDetails extends Page<RunDetailsInternalProps, RunDetailsState> {
runFinished = true;
}
const workflow = JSON.parse(
runDetail.pipeline_runtime!.workflow_manifest || '{}',
) as Workflow;
const jsonWorkflow = JSON.parse(runDetail.pipeline_runtime!.workflow_manifest || '{}');
if (
jsonWorkflow.status &&
!jsonWorkflow.status.nodes &&
jsonWorkflow.status.compressedNodes
) {
try {
jsonWorkflow.status.nodes = await decodeCompressedNodes(
jsonWorkflow.status.compressedNodes,
);
delete jsonWorkflow.status.compressedNodes;
} catch (err) {
console.error(`Failed to decode compressedNodes: ${err}`);
}
}
const workflow = jsonWorkflow as Workflow;
// Show workflow errors
const workflowError = WorkflowParser.getWorkflowError(workflow);
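The guard above covers Argo's behavior of replacing status.nodes with a gzipped, base64-encoded status.compressedNodes field when the workflow status grows too large. Pulled out as a standalone helper, the logic reads roughly like this (a sketch; inflateWorkflowStatus is a hypothetical name, not in the commit):

// Decode status.compressedNodes in place, mirroring the load path above.
async function inflateWorkflowStatus(jsonWorkflow: any): Promise<void> {
  const status = jsonWorkflow.status;
  if (status && !status.nodes && status.compressedNodes) {
    try {
      status.nodes = await decodeCompressedNodes(status.compressedNodes);
      // Drop the compressed field so downstream consumers only see status.nodes.
      delete status.compressedNodes;
    } catch (err) {
      // On failure, nodes stays undefined and the page renders without a graph.
      console.error(`Failed to decode compressedNodes: ${err}`);
    }
  }
}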