feat(frontend): support tensorboard viewer and other visualize Results using volume mount path. Part of #4208 (#4236)

* support local file storage type for local volume mount path, refer: https://github.com/kubeflow/pipelines/issues/4208

* add a TODO comment about supporting directories and file paths that include wildcards '*'; for details, refer to: https://github.com/kubeflow/pipelines/issues/4208

* revert old code indent

* run 'npm run format' to format code

* support tensorboard viewer and other visualize Results using volume mount path, modify 'file' schema to 'volume':

1. source schema: volume://volume-name/relative/path/from/volume/xxx.csv
2. for tensorboard(also support Series1:volume://volume-name/path_to_model_dir_1,Series2:volume://volume-name/path_to_model_dir_2):
* check that volume-name is specified in podTemplateSpec (which is injected via the VIEWER_TENSORBOARD_POD_TEMPLATE_SPEC_PATH env)
* check that the /relative/path/from/volume/xxx file path is covered by a volume mount in podTemplateSpec (the mount's subPath, if any, must be a prefix of the path)
3. for others:
* check volume-name was specified at ml-pipeline-ui pod
* check /relative/path/from/volume/xxx.csv file path exist

* fix test and add more tests

* change error message not found to not exist.

* fix tensorboard create test

* combining volume mount path and key as artifacts path

* extract complex code into a function and add more tests

* use ml-pipeline-ui container name to find server container instead of use containers[0]

* fix review suggestion: https://github.com/kubeflow/pipelines/pull/4236

* format code

* extract how to find file path on a pod volume to a common function, and optimize error message

* fix k8s-helper.test error

* add more documentation and fix mistake: volumeMountPath to filePathInVolume

* fix test error

* Update k8s-helper.test.ts

* format error message

Co-authored-by: Yuan (Bob) Gong <gongyuan94@gmail.com>
This commit is contained in:
haibingzhao 2020-08-06 16:06:55 +08:00 committed by GitHub
parent 3f9ce57a81
commit 7f77e8bc7f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 1183 additions and 12 deletions

View File

@ -23,8 +23,12 @@ import { UIServer } from './app';
import { loadConfigs } from './configs';
import * as minioHelper from './minio-helper';
import { TEST_ONLY as K8S_TEST_EXPORT } from './k8s-helper';
import * as serverInfo from './helpers/server-info';
import { Server } from 'http';
import { commonSetup } from './integration-tests/test-helper';
import * as fs from 'fs';
import * as path from 'path';
import * as os from 'os';
jest.mock('minio');
jest.mock('node-fetch');
@ -414,6 +418,230 @@ describe('UIServer apis', () => {
.get('/artifacts/get?source=gcs&bucket=ml-pipeline&key=hello%2Fworld.txt&peek=5')
.expect(200, artifactContent.slice(0, 5), done);
});
// A file written under the ml-pipeline-ui container's mount path is served for
// source=volume. The mount declares subPath 'subartifact', so the request key
// 'subartifact/content' maps to '<mountPath>/content'.
it('responds with a volume artifact if source=volume', done => {
  const artifactContent = 'hello world';
  // fs.mkdtempSync appends six random characters directly to the given prefix,
  // so the prefix has to be a path inside os.tmpdir(). Passing os.tmpdir()
  // itself would create a sibling directory such as "/tmpAbC123".
  const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'kfp-ui-test-'));
  const tempPath = path.join(tempDir, 'content');
  fs.writeFileSync(tempPath, artifactContent);
  jest.spyOn(serverInfo, 'getHostPod').mockImplementation(() =>
    Promise.resolve([
      {
        spec: {
          containers: [
            {
              volumeMounts: [
                {
                  name: 'artifact',
                  mountPath: path.dirname(tempPath),
                  subPath: 'subartifact',
                },
              ],
              name: 'ml-pipeline-ui',
            },
          ],
          volumes: [
            {
              name: 'artifact',
              persistentVolumeClaim: {
                claimName: 'artifact_pvc',
              },
            },
          ],
        },
      } as any,
      undefined,
    ]),
  );
  const configs = loadConfigs(argv, {});
  app = new UIServer(configs);
  const request = requests(app.start());
  request
    .get('/artifacts/get?source=volume&bucket=artifact&key=subartifact/content')
    .expect(200, artifactContent, done);
});
// With peek=5 only the first five characters of the volume artifact are returned.
it('responds with a partial volume artifact if peek=5 is set', done => {
  const artifactContent = 'hello world';
  // fs.mkdtempSync appends six random characters directly to the given prefix,
  // so the prefix has to be a path inside os.tmpdir(). Passing os.tmpdir()
  // itself would create a sibling directory such as "/tmpAbC123".
  const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'kfp-ui-test-'));
  const tempPath = path.join(tempDir, 'content');
  fs.writeFileSync(tempPath, artifactContent);
  jest.spyOn(serverInfo, 'getHostPod').mockImplementation(() =>
    Promise.resolve([
      {
        spec: {
          containers: [
            {
              volumeMounts: [
                {
                  name: 'artifact',
                  mountPath: path.dirname(tempPath),
                },
              ],
              name: 'ml-pipeline-ui',
            },
          ],
          volumes: [
            {
              name: 'artifact',
              persistentVolumeClaim: {
                claimName: 'artifact_pvc',
              },
            },
          ],
        },
      } as any,
      undefined,
    ]),
  );
  const configs = loadConfigs(argv, {});
  app = new UIServer(configs);
  const request = requests(app.start());
  request
    .get(`/artifacts/get?source=volume&bucket=artifact&key=content&peek=5`)
    .expect(200, artifactContent.slice(0, 5), done);
});
// A bucket name that matches none of the host pod's volumes is answered with 404.
it('responds error with a not exist volume', done => {
  const hostPod: any = {
    metadata: { name: 'ml-pipeline-ui' },
    spec: {
      containers: [
        {
          volumeMounts: [{ name: 'artifact', mountPath: '/foo/bar/path' }],
          name: 'ml-pipeline-ui',
        },
      ],
      volumes: [{ name: 'artifact', persistentVolumeClaim: { claimName: 'artifact_pvc' } }],
    },
  };
  jest.spyOn(serverInfo, 'getHostPod').mockImplementation(async () => [hostPod, undefined]);

  app = new UIServer(loadConfigs(argv, {}));

  const expectedBody =
    'Failed to open volume://notexist/content, Cannot find file "volume://notexist/content" in pod "ml-pipeline-ui": volume "notexist" not configured';
  requests(app.start())
    .get('/artifacts/get?source=volume&bucket=notexist&key=content')
    .expect(404, expectedBody, done);
});
// The volume exists, but its only mount has subPath 'subartifact', which is not
// a prefix of the requested key, so the file cannot be located (404).
it('responds error with a not exist volume mount path if source=volume', done => {
  const hostPod: any = {
    metadata: { name: 'ml-pipeline-ui' },
    spec: {
      containers: [
        {
          volumeMounts: [
            { name: 'artifact', mountPath: '/foo/bar/path', subPath: 'subartifact' },
          ],
          name: 'ml-pipeline-ui',
        },
      ],
      volumes: [{ name: 'artifact', persistentVolumeClaim: { claimName: 'artifact_pvc' } }],
    },
  };
  jest.spyOn(serverInfo, 'getHostPod').mockImplementation(async () => [hostPod, undefined]);

  app = new UIServer(loadConfigs(argv, {}));

  const expectedBody =
    'Failed to open volume://artifact/notexist/config, Cannot find file "volume://artifact/notexist/config" in pod "ml-pipeline-ui": volume "artifact" not mounted or volume "artifact" with subPath (which is prefix of notexist/config) not mounted';
  requests(app.start())
    .get('/artifacts/get?source=volume&bucket=artifact&key=notexist/config')
    .expect(404, expectedBody, done);
});
// Volume and mount both resolve, but the resulting local file does not exist,
// so the stat call fails and the handler answers with 500.
it('responds error with a not exist volume mount artifact if source=volume', done => {
  const hostPod: any = {
    spec: {
      containers: [
        {
          volumeMounts: [{ name: 'artifact', mountPath: '/foo/bar', subPath: 'subartifact' }],
          name: 'ml-pipeline-ui',
        },
      ],
      volumes: [{ name: 'artifact', persistentVolumeClaim: { claimName: 'artifact_pvc' } }],
    },
  };
  jest.spyOn(serverInfo, 'getHostPod').mockImplementation(async () => [hostPod, undefined]);

  app = new UIServer(loadConfigs(argv, {}));

  const expectedBody =
    "Failed to open volume://artifact/subartifact/notxist.csv: Error: ENOENT: no such file or directory, stat '/foo/bar/notxist.csv'";
  requests(app.start())
    .get('/artifacts/get?source=volume&bucket=artifact&key=subartifact/notxist.csv')
    .expect(500, expectedBody, done);
});
});
describe('/system', () => {
@ -601,6 +829,40 @@ describe('UIServer apis', () => {
});
describe('/apps/tensorboard', () => {
// Pod template spec fixture shared by the tensorboard tests below; the tests
// write it to a temp file as JSON and pass that file's path through
// VIEWER_TENSORBOARD_POD_TEMPLATE_SPEC_PATH.
const POD_TEMPLATE_SPEC = {
  spec: {
    containers: [
      {
        volumeMounts: [
          // Volume "tensorboard" is mounted as a whole at /logs.
          {
            name: 'tensorboard',
            mountPath: '/logs',
          },
          // Only the "tensorboard" sub-directory of volume "data" is mounted at /data.
          {
            name: 'data',
            subPath: 'tensorboard',
            mountPath: '/data',
          },
        ],
      },
    ],
    volumes: [
      {
        name: 'tensorboard',
        persistentVolumeClaim: {
          claimName: 'logs',
        },
      },
      {
        name: 'data',
        persistentVolumeClaim: {
          claimName: 'data',
        },
      },
    ],
  },
};
let k8sGetCustomObjectSpy: jest.SpyInstance;
let k8sDeleteCustomObjectSpy: jest.SpyInstance;
let k8sCreateCustomObjectSpy: jest.SpyInstance;
@ -933,6 +1195,298 @@ describe('UIServer apis', () => {
);
});
// volume://tensorboard/... log dirs are rewritten to the /logs mount path of
// the pod template spec before the Viewer custom object is created.
it('creates tensorboard viewer with exist volume', done => {
  let getRequestCount = 0;
  k8sGetCustomObjectSpy.mockImplementation(() => {
    ++getRequestCount;
    switch (getRequestCount) {
      case 1:
        // First lookup: no viewer exists yet.
        return Promise.reject('Not found');
      case 2:
        // Second lookup: the viewer created by this request.
        return Promise.resolve(
          newGetTensorboardResponse({
            name: 'viewer-abcdefg',
            logDir: 'Series1:/logs/log-dir-1,Series2:/logs/log-dir-2',
            tensorflowImage: 'tensorflow:2.0.0',
          }),
        );
      default:
        throw new Error('only expected to be called twice in this test');
    }
  });
  k8sCreateCustomObjectSpy.mockImplementation(() => Promise.resolve());
  // fs.mkdtempSync appends six random characters directly to the given prefix,
  // so the prefix has to be a path inside os.tmpdir(). Passing os.tmpdir()
  // itself would create a sibling directory such as "/tmpAbC123".
  const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'kfp-ui-test-'));
  const tempPath = path.join(tempDir, 'config.json');
  fs.writeFileSync(tempPath, JSON.stringify(POD_TEMPLATE_SPEC));
  app = new UIServer(
    loadConfigs(argv, { VIEWER_TENSORBOARD_POD_TEMPLATE_SPEC_PATH: tempPath }),
  );

  requests(app.start())
    .post(
      `/apps/tensorboard?logdir=${encodeURIComponent(
        'Series1:volume://tensorboard/log-dir-1,Series2:volume://tensorboard/log-dir-2',
      )}&namespace=test-ns&tfversion=2.0.0`,
    )
    .expect(
      200,
      'http://viewer-abcdefg-service.test-ns.svc.cluster.local:80/tensorboard/viewer-abcdefg/',
      err => {
        expect(k8sGetCustomObjectSpy.mock.calls[0]).toMatchInlineSnapshot(`
          Array [
            "kubeflow.org",
            "v1beta1",
            "test-ns",
            "viewers",
            "viewer-a800f945f0934d978f9cce9959b82ff44dac8493",
          ]
        `);
        expect(k8sCreateCustomObjectSpy.mock.calls[0]).toMatchInlineSnapshot(`
          Array [
            "kubeflow.org",
            "v1beta1",
            "test-ns",
            "viewers",
            Object {
              "apiVersion": "kubeflow.org/v1beta1",
              "kind": "Viewer",
              "metadata": Object {
                "name": "viewer-a800f945f0934d978f9cce9959b82ff44dac8493",
                "namespace": "test-ns",
              },
              "spec": Object {
                "podTemplateSpec": Object {
                  "spec": Object {
                    "containers": Array [
                      Object {
                        "volumeMounts": Array [
                          Object {
                            "mountPath": "/logs",
                            "name": "tensorboard",
                          },
                          Object {
                            "mountPath": "/data",
                            "name": "data",
                            "subPath": "tensorboard",
                          },
                        ],
                      },
                    ],
                    "volumes": Array [
                      Object {
                        "name": "tensorboard",
                        "persistentVolumeClaim": Object {
                          "claimName": "logs",
                        },
                      },
                      Object {
                        "name": "data",
                        "persistentVolumeClaim": Object {
                          "claimName": "data",
                        },
                      },
                    ],
                  },
                },
                "tensorboardSpec": Object {
                  "logDir": "Series1:/logs/log-dir-1,Series2:/logs/log-dir-2",
                  "tensorflowImage": "tensorflow/tensorflow:2.0.0",
                },
                "type": "tensorboard",
              },
            },
          ]
        `);
        expect(k8sGetCustomObjectSpy.mock.calls[1]).toMatchInlineSnapshot(`
          Array [
            "kubeflow.org",
            "v1beta1",
            "test-ns",
            "viewers",
            "viewer-a800f945f0934d978f9cce9959b82ff44dac8493",
          ]
        `);
        done(err);
      },
    );
});
// volume://data/tensorboard/... log dirs match the "data" mount whose subPath
// is 'tensorboard', so they are rewritten to paths under /data.
it('creates tensorboard viewer with exist subPath volume', done => {
  let getRequestCount = 0;
  k8sGetCustomObjectSpy.mockImplementation(() => {
    ++getRequestCount;
    switch (getRequestCount) {
      case 1:
        // First lookup: no viewer exists yet.
        return Promise.reject('Not found');
      case 2:
        // Second lookup: the viewer created by this request.
        return Promise.resolve(
          newGetTensorboardResponse({
            name: 'viewer-abcdefg',
            logDir: 'Series1:/data/log-dir-1,Series2:/data/log-dir-2',
            tensorflowImage: 'tensorflow:2.0.0',
          }),
        );
      default:
        throw new Error('only expected to be called twice in this test');
    }
  });
  k8sCreateCustomObjectSpy.mockImplementation(() => Promise.resolve());
  // fs.mkdtempSync appends six random characters directly to the given prefix,
  // so the prefix has to be a path inside os.tmpdir(). Passing os.tmpdir()
  // itself would create a sibling directory such as "/tmpAbC123".
  const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'kfp-ui-test-'));
  const tempPath = path.join(tempDir, 'config.json');
  fs.writeFileSync(tempPath, JSON.stringify(POD_TEMPLATE_SPEC));
  app = new UIServer(
    loadConfigs(argv, { VIEWER_TENSORBOARD_POD_TEMPLATE_SPEC_PATH: tempPath }),
  );

  requests(app.start())
    .post(
      `/apps/tensorboard?logdir=${encodeURIComponent(
        'Series1:volume://data/tensorboard/log-dir-1,Series2:volume://data/tensorboard/log-dir-2',
      )}&namespace=test-ns&tfversion=2.0.0`,
    )
    .expect(
      200,
      'http://viewer-abcdefg-service.test-ns.svc.cluster.local:80/tensorboard/viewer-abcdefg/',
      err => {
        expect(k8sGetCustomObjectSpy.mock.calls[0]).toMatchInlineSnapshot(`
          Array [
            "kubeflow.org",
            "v1beta1",
            "test-ns",
            "viewers",
            "viewer-82d7d06a6ecb1e4dcba66d06b884d6445a88e4ca",
          ]
        `);
        expect(k8sCreateCustomObjectSpy.mock.calls[0]).toMatchInlineSnapshot(`
          Array [
            "kubeflow.org",
            "v1beta1",
            "test-ns",
            "viewers",
            Object {
              "apiVersion": "kubeflow.org/v1beta1",
              "kind": "Viewer",
              "metadata": Object {
                "name": "viewer-82d7d06a6ecb1e4dcba66d06b884d6445a88e4ca",
                "namespace": "test-ns",
              },
              "spec": Object {
                "podTemplateSpec": Object {
                  "spec": Object {
                    "containers": Array [
                      Object {
                        "volumeMounts": Array [
                          Object {
                            "mountPath": "/logs",
                            "name": "tensorboard",
                          },
                          Object {
                            "mountPath": "/data",
                            "name": "data",
                            "subPath": "tensorboard",
                          },
                        ],
                      },
                    ],
                    "volumes": Array [
                      Object {
                        "name": "tensorboard",
                        "persistentVolumeClaim": Object {
                          "claimName": "logs",
                        },
                      },
                      Object {
                        "name": "data",
                        "persistentVolumeClaim": Object {
                          "claimName": "data",
                        },
                      },
                    ],
                  },
                },
                "tensorboardSpec": Object {
                  "logDir": "Series1:/data/log-dir-1,Series2:/data/log-dir-2",
                  "tensorflowImage": "tensorflow/tensorflow:2.0.0",
                },
                "type": "tensorboard",
              },
            },
          ]
        `);
        expect(k8sGetCustomObjectSpy.mock.calls[1]).toMatchInlineSnapshot(`
          Array [
            "kubeflow.org",
            "v1beta1",
            "test-ns",
            "viewers",
            "viewer-82d7d06a6ecb1e4dcba66d06b884d6445a88e4ca",
          ]
        `);
        done(err);
      },
    );
});
// A logdir pointing at a volume that the pod template spec does not declare
// fails with 500, and the failure is logged exactly once via console.error.
it('creates tensorboard viewer with not exist volume and return error', done => {
  const errorSpy = jest.spyOn(console, 'error');
  errorSpy.mockImplementation();
  k8sGetCustomObjectSpy.mockImplementation(() => {
    return Promise.reject('Not found');
  });
  // fs.mkdtempSync appends six random characters directly to the given prefix,
  // so the prefix has to be a path inside os.tmpdir(). Passing os.tmpdir()
  // itself would create a sibling directory such as "/tmpAbC123".
  const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'kfp-ui-test-'));
  const tempPath = path.join(tempDir, 'config.json');
  fs.writeFileSync(tempPath, JSON.stringify(POD_TEMPLATE_SPEC));
  app = new UIServer(
    loadConfigs(argv, { VIEWER_TENSORBOARD_POD_TEMPLATE_SPEC_PATH: tempPath }),
  );

  requests(app.start())
    .post(
      `/apps/tensorboard?logdir=${encodeURIComponent(
        'volume://notexistvolume/logs/log-dir-1',
      )}&namespace=test-ns&tfversion=2.0.0`,
    )
    .expect(
      500,
      `Failed to start Tensorboard app: Cannot find file "volume://notexistvolume/logs/log-dir-1" in pod "unknown": volume "notexistvolume" not configured`,
      err => {
        expect(errorSpy).toHaveBeenCalledTimes(1);
        done(err);
      },
    );
});
// Volume "data" is declared, but its only mount has subPath 'tensorboard',
// which is not a prefix of the requested path, so the request fails with 500.
it('creates tensorboard viewer with not exist subPath volume mount and return error', done => {
  const errorSpy = jest.spyOn(console, 'error');
  errorSpy.mockImplementation();
  k8sGetCustomObjectSpy.mockImplementation(() => {
    return Promise.reject('Not found');
  });
  // fs.mkdtempSync appends six random characters directly to the given prefix,
  // so the prefix has to be a path inside os.tmpdir(). Passing os.tmpdir()
  // itself would create a sibling directory such as "/tmpAbC123".
  const tempDir = fs.mkdtempSync(path.join(os.tmpdir(), 'kfp-ui-test-'));
  const tempPath = path.join(tempDir, 'config.json');
  fs.writeFileSync(tempPath, JSON.stringify(POD_TEMPLATE_SPEC));
  app = new UIServer(
    loadConfigs(argv, { VIEWER_TENSORBOARD_POD_TEMPLATE_SPEC_PATH: tempPath }),
  );

  requests(app.start())
    .post(
      `/apps/tensorboard?logdir=${encodeURIComponent(
        'volume://data/notexit/mountnotexist/log-dir-1',
      )}&namespace=test-ns&tfversion=2.0.0`,
    )
    .expect(
      500,
      `Failed to start Tensorboard app: Cannot find file "volume://data/notexit/mountnotexist/log-dir-1" in pod "unknown": volume "data" not mounted or volume "data" with subPath (which is prefix of notexit/mountnotexist/log-dir-1) not mounted`,
      err => {
        expect(errorSpy).toHaveBeenCalledTimes(1);
        done(err);
      },
    );
});
it('returns error when there is an existing tensorboard with different version', done => {
const errorSpy = jest.spyOn(console, 'error');
errorSpy.mockImplementation();

View File

@ -14,20 +14,24 @@
import fetch from 'node-fetch';
import { AWSConfigs, HttpConfigs, MinioConfigs, ProcessEnv } from '../configs';
import { Client as MinioClient } from 'minio';
import { PreviewStream } from '../utils';
import { PreviewStream, findFileOnPodVolume } from '../utils';
import { createMinioClient, getObjectStream } from '../minio-helper';
import * as serverInfo from '../helpers/server-info';
import { Handler, Request, Response } from 'express';
import { Storage } from '@google-cloud/storage';
import proxy from 'http-proxy-middleware';
import { HACK_FIX_HPM_PARTIAL_RESPONSE_HEADERS } from '../consts';
import * as fs from 'fs';
import { V1Container } from '@kubernetes/client-node/dist/api';
/**
* ArtifactsQueryStrings describes the expected query strings key value pairs
* in the artifact request object.
*/
interface ArtifactsQueryStrings {
/** artifact source. */
source: 'minio' | 's3' | 'gcs' | 'http' | 'https';
source: 'minio' | 's3' | 'gcs' | 'http' | 'https' | 'volume';
/** bucket name. */
bucket: string;
/** artifact key/path that is uri encoded. */
@ -101,6 +105,16 @@ export function getArtifactsHandler(artifactsConfigs: {
)(req, res);
break;
case 'volume':
await getVolumeArtifactsHandler(
{
bucket,
key,
},
peek,
)(req, res);
break;
default:
res.status(500).send('Unknown storage source: ' + source);
return;
@ -240,6 +254,53 @@ function getGCSArtifactHandler(options: { key: string; bucket: string }, peek: n
};
}
/**
 * Creates a handler that serves an artifact stored on a volume mounted into
 * the UI server's own pod.
 *
 * @param options.bucket name of the pod volume that holds the artifact.
 * @param options.key path of the artifact inside that volume.
 * @param peek forwarded to PreviewStream as its `peek` option.
 * @returns an express handler. It answers 404 when the volume path cannot be
 *   resolved from the pod spec, 400 when the key contains a '..' segment or
 *   the resolved path is a directory, and 500 for any other failure.
 */
function getVolumeArtifactsHandler(options: { bucket: string; key: string }, peek: number = 0) {
  const { key, bucket } = options;
  return async (req: Request, res: Response) => {
    try {
      // The key originates from the request: refuse '..' segments so that the
      // resolved path cannot point outside of the volume mount.
      // NOTE(review): assumes findFileOnPodVolume joins the key onto the mount
      // path without normalizing it; confirm against its implementation.
      if (typeof key === 'string' && key.split(/[\\/]/).includes('..')) {
        res
          .status(400)
          .send(`Failed to open volume://${bucket}/${key}, ".." is not allowed in the file path`);
        return;
      }

      const [pod, err] = await serverInfo.getHostPod();
      if (err) {
        res.status(500).send(err);
        return;
      }

      if (!pod) {
        res.status(500).send('Could not get server pod');
        return;
      }

      // ml-pipeline-ui server container name also be called 'ml-pipeline-ui-artifact' in KFP multi user mode.
      // https://github.com/kubeflow/manifests/blob/master/pipeline/installs/multi-user/pipelines-profile-controller/sync.py#L212
      const [filePath, parseError] = findFileOnPodVolume(pod, {
        containerNames: ['ml-pipeline-ui', 'ml-pipeline-ui-artifact'],
        volumeMountName: bucket,
        filePathInVolume: key,
      });
      if (parseError) {
        res.status(404).send(`Failed to open volume://${bucket}/${key}, ${parseError}`);
        return;
      }

      // TODO: support directory and support filePath include wildcards '*'
      const stat = await fs.promises.stat(filePath);
      if (stat.isDirectory()) {
        res
          .status(400)
          .send(
            `Failed to open volume://${bucket}/${key}, file ${filePath} is directory, does not support now`,
          );
        return;
      }

      const fileStream = fs.createReadStream(filePath);
      // Stream errors are emitted asynchronously and are not caught by the
      // surrounding try/catch; without a listener they would surface as an
      // unhandled 'error' event.
      fileStream.on('error', streamErr => {
        if (res.headersSent) {
          // Part of the body is already on the wire; all we can do is stop.
          res.end();
          return;
        }
        res.status(500).send(`Failed to open volume://${bucket}/${key}: ${streamErr}`);
      });
      fileStream.pipe(new PreviewStream({ peek })).pipe(res);
    } catch (err) {
      res.status(500).send(`Failed to open volume://${bucket}/${key}: ${err}`);
    }
  };
}
const ARTIFACTS_PROXY_DEFAULTS = {
serviceName: 'ml-pipeline-ui-artifact',
servicePort: '80',

View File

@ -0,0 +1,55 @@
// Copyright 2019-2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import * as fs from 'fs';
import { V1Pod } from '@kubernetes/client-node';
import { getPod } from '../k8s-helper';
// Location of the service account namespace file; it is present when the
// server runs inside a Kubernetes cluster and contains the pod namespace.
const namespaceFilePath = '/var/run/secrets/kubernetes.io/serviceaccount/namespace';
// Namespace read once at module load, or undefined when the file is absent.
let serverNamespace: string | undefined = fs.existsSync(namespaceFilePath)
  ? fs.readFileSync(namespaceFilePath, 'utf-8')
  : undefined;
// Cache slot for the host pod.
let hostPod: V1Pod | undefined;
/**
 * Looks up the pod this ml-pipeline-ui server is running in.
 *
 * The pod name is taken from the HOSTNAME environment variable and the
 * namespace from the module-level `serverNamespace`. A successful lookup is
 * cached in `hostPod` and reused by later calls.
 *
 * @returns `[pod, undefined]` on success, or `[undefined, message]` when the
 *   namespace or pod name is unavailable or the lookup fails.
 */
export async function getHostPod(): Promise<[V1Pod | undefined, undefined] | [undefined, string]> {
  // Serve from cache when a previous lookup succeeded.
  if (hostPod) return [hostPod, undefined];

  if (!serverNamespace) return [undefined, "server namespace can't be obtained"];

  const podName = process.env.HOSTNAME;
  if (!podName) return [undefined, "server pod name can't be obtained"];

  const [pod, lookupError] = await getPod(podName, serverNamespace);
  if (lookupError) {
    console.error(lookupError.message, lookupError.additionalInfo);
    return [undefined, `Failed to get host pod: ${lookupError.message}`];
  }

  hostPod = pod;
  return [hostPod, undefined];
}

View File

@ -0,0 +1,167 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import { TEST_ONLY as K8S_TEST_EXPORT } from './k8s-helper';
describe('k8s-helper', () => {
  describe('parseTensorboardLogDir', () => {
    // Shared fixture: volume "output" is mounted as a whole at /data, while
    // volume "artifact" is mounted twice through subPaths pipeline1/pipeline2.
    const podTemplateSpec = {
      spec: {
        containers: [
          {
            volumeMounts: [
              {
                name: 'output',
                mountPath: '/data',
              },
              {
                name: 'artifact',
                subPath: 'pipeline1',
                mountPath: '/data1',
              },
              {
                name: 'artifact',
                subPath: 'pipeline2',
                mountPath: '/data2',
              },
            ],
          },
        ],
        volumes: [
          {
            name: 'output',
            hostPath: {
              path: '/data/output',
              type: 'Directory',
            },
          },
          {
            name: 'artifact',
            persistentVolumeClaim: {
              claimName: 'artifact_pvc',
            },
          },
        ],
      },
    };

    it('handles not volume storage', () => {
      const logdir = 'gs://testbucket/test/key/path';
      const url = K8S_TEST_EXPORT.parseTensorboardLogDir(logdir, podTemplateSpec);
      expect(url).toEqual(logdir);
    });

    it('handles not volume storage with Series', () => {
      const logdir =
        'Series1:gs://testbucket/test/key/path1,Series2:gs://testbucket/test/key/path2';
      const url = K8S_TEST_EXPORT.parseTensorboardLogDir(logdir, podTemplateSpec);
      expect(url).toEqual(logdir);
    });

    it('handles volume storage without subPath', () => {
      const logdir = 'volume://output';
      const url = K8S_TEST_EXPORT.parseTensorboardLogDir(logdir, podTemplateSpec);
      expect(url).toEqual('/data');
    });

    it('handles volume storage without subPath with Series', () => {
      const logdir = 'Series1:volume://output/volume/path1,Series2:volume://output/volume/path2';
      const url = K8S_TEST_EXPORT.parseTensorboardLogDir(logdir, podTemplateSpec);
      expect(url).toEqual('Series1:/data/volume/path1,Series2:/data/volume/path2');
    });

    it('handles volume storage with subPath', () => {
      const logdir = 'volume://artifact/pipeline1';
      const url = K8S_TEST_EXPORT.parseTensorboardLogDir(logdir, podTemplateSpec);
      expect(url).toEqual('/data1');
    });

    it('handles volume storage with subPath with Series', () => {
      const logdir =
        'Series1:volume://artifact/pipeline1/path1,Series2:volume://artifact/pipeline2/path2';
      const url = K8S_TEST_EXPORT.parseTensorboardLogDir(logdir, podTemplateSpec);
      expect(url).toEqual('Series1:/data1/path1,Series2:/data2/path2');
    });

    it('handles volume storage without subPath throw volume not configured error', () => {
      const logdir = 'volume://other/path';
      expect(() => K8S_TEST_EXPORT.parseTensorboardLogDir(logdir, podTemplateSpec)).toThrowError(
        'Cannot find file "volume://other/path" in pod "unknown": volume "other" not configured',
      );
    });

    it('handles volume storage without subPath throw volume not configured error with Series', () => {
      const logdir = 'Series1:volume://output/volume/path1,Series2:volume://other/volume/path2';
      expect(() => K8S_TEST_EXPORT.parseTensorboardLogDir(logdir, podTemplateSpec)).toThrowError(
        'Cannot find file "volume://other/volume/path2" in pod "unknown": volume "other" not configured',
      );
    });

    it('handles volume storage without subPath throw volume not mounted', () => {
      // No containers at all: the lookup fails before volume mounts are inspected.
      const noMountPodTemplateSpec = {
        spec: {
          volumes: [
            {
              name: 'artifact',
              persistentVolumeClaim: {
                claimName: 'artifact_pvc',
              },
            },
          ],
        },
      };
      const logdir = 'volume://artifact/path1';
      expect(() =>
        K8S_TEST_EXPORT.parseTensorboardLogDir(logdir, noMountPodTemplateSpec),
      ).toThrowError(
        'Cannot find file "volume://artifact/path1" in pod "unknown": container "" not found',
      );
    });

    it('handles volume storage without volumeMounts throw volume not mounted', () => {
      // Volume "artifact" is declared, but the container only mounts "other".
      const noMountPodTemplateSpec = {
        spec: {
          containers: [
            {
              volumeMounts: [
                {
                  name: 'other',
                  mountPath: '/data',
                },
              ],
            },
          ],
          volumes: [
            {
              name: 'artifact',
              persistentVolumeClaim: {
                claimName: 'artifact_pvc',
              },
            },
          ],
        },
      };
      const logdir = 'volume://artifact/path';
      // Use the fixture defined above; the shared podTemplateSpec mounts
      // "artifact" and would exercise the subPath-mismatch case instead.
      // toThrowError(string) matches by substring, so the expectation holds
      // for any message starting with '... volume "artifact" not mounted'.
      expect(() =>
        K8S_TEST_EXPORT.parseTensorboardLogDir(logdir, noMountPodTemplateSpec),
      ).toThrowError(
        'Cannot find file "volume://artifact/path" in pod "unknown": volume "artifact" not mounted',
      );
    });

    it('handles volume storage with subPath throw volume mount not found', () => {
      const logdir = 'volume://artifact/other';
      expect(() => K8S_TEST_EXPORT.parseTensorboardLogDir(logdir, podTemplateSpec)).toThrowError(
        'Cannot find file "volume://artifact/other" in pod "unknown": volume "artifact" not mounted or volume "artifact" with subPath (which is prefix of other) not mounted',
      );
    });
  });
});

View File

@ -23,7 +23,7 @@ import {
import * as crypto from 'crypto-js';
import * as fs from 'fs';
import { PartialArgoWorkflow } from './workflow-helper';
import { parseError } from './utils';
import { parseError, findFileOnPodVolume } from './utils';
// If this is running inside a k8s Pod, its namespace should be written at this
// path, this is also how we can tell whether we're running in the cluster.
@ -62,6 +62,37 @@ function getNameOfViewerResource(logdir: string): string {
return 'viewer-' + crypto.SHA1(logdir);
}
/**
 * Rewrites every `volume://<volume-name>/<path>` entry of a tensorboard logdir
 * to the local path at which that file is mounted according to
 * `podTemplateSpec`. Entries using any other scheme are passed through.
 *
 * The logdir may be a single path or a comma separated list of
 * `SeriesN:<path>` entries. A leading `SeriesN:` label is dropped while
 * parsing; when the result has more than one entry the labels are regenerated
 * as `Series1:`, `Series2:`, ... in input order.
 *
 * @param logdir logdir as received from the client.
 * @param podTemplateSpec pod template spec describing volumes and volume mounts.
 * @returns the logdir with volume:// entries replaced by mount paths.
 * @throws Error when a volume:// entry cannot be resolved to a mounted path.
 */
function parseTensorboardLogDir(logdir: string, podTemplateSpec: object): string {
  const volumeScheme = 'volume://';
  const urls: string[] = [];
  const seriesParts = logdir.split(',');
  for (const seriesPart of seriesParts) {
    // Only strip a label at the very start of the entry: an unanchored match
    // would also eat a "SeriesN:" sequence that is part of the path itself.
    const strPath = seriesPart.trim().replace(/^Series\d+:/, '');
    if (!strPath.startsWith(volumeScheme)) {
      urls.push(strPath);
      continue;
    }
    // volume storage: parse real local mount path
    const pathParts = strPath.slice(volumeScheme.length).split('/');
    const bucket = pathParts[0];
    const key = pathParts.slice(1).join('/');
    const [filePath, err] = findFileOnPodVolume(podTemplateSpec, {
      volumeMountName: bucket,
      filePathInVolume: key,
      containerNames: undefined,
    });
    if (err) {
      throw new Error(err);
    }
    urls.push(filePath);
  }
  return urls.length === 1 ? urls[0] : urls.map((c, i) => `Series${i + 1}:` + c).join(',');
}
/**
* Create Tensorboard instance via CRD with the given logdir if there is no
* existing Tensorboard instance.
@ -83,7 +114,6 @@ export async function newTensorboardInstance(
);
}
}
const body = {
apiVersion: viewerGroup + '/' + viewerVersion,
kind: 'Viewer',
@ -94,7 +124,7 @@ export async function newTensorboardInstance(
spec: {
podTemplateSpec,
tensorboardSpec: {
logDir: logdir,
logDir: parseTensorboardLogDir(logdir, podTemplateSpec),
tensorflowImage: tfImageName + ':' + tfversion,
},
type: 'tensorboard',
@ -129,13 +159,14 @@ export async function getTensorboardInstance(
// Viewer CRD pod has tensorboard instance running at port 6006 while
// viewer CRD service has tensorboard instance running at port 80. Since
// we return service address here (instead of pod address), so use 80.
// The `viewer.body.spec.tensorboardSpec.logDir === logdir` check was removed.
// getNameOfViewerResource(logdir) can, in principle, produce a hash collision:
// without the logdir check, a collision returns the link of the wrong tensorboard;
// with the check, creating a Viewer CRD with the same name would fail anyway.
// TODO: fix hash collision.
(viewer: any) => {
if (
viewer &&
viewer.body &&
viewer.body.spec.tensorboardSpec.logDir === logdir &&
viewer.body.spec.type === 'tensorboard'
) {
if (viewer && viewer.body && viewer.body.spec.type === 'tensorboard') {
const address = `http://${viewer.body.metadata.name}-service.${namespace}.svc.cluster.local:80/tensorboard/${viewer.body.metadata.name}/`;
const tfImageParts = viewer.body.spec.tensorboardSpec.tensorflowImage.split(':', 2);
const tfVersion = tfImageParts.length == 2 ? tfImageParts[1] : '';
@ -311,4 +342,5 @@ export async function getK8sSecret(name: string, key: string) {
export const TEST_ONLY = {
k8sV1Client,
k8sV1CustomObjectClient,
parseTensorboardLogDir,
};

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
import { PassThrough } from 'stream';
import { PreviewStream } from './utils';
import { PreviewStream, findFileOnPodVolume, resolveFilePathOnVolume } from './utils';
describe('utils', () => {
describe('PreviewStream', () => {
@ -36,4 +36,173 @@ describe('utils', () => {
dst.once('readable', () => expect(dst.read().toString()).toBe(input));
});
});
describe('findFileOnPodVolume', () => {
  // Builds the three mounts every container in the fixture has: the whole
  // "output" volume at `base`, and the pipeline1/pipeline2 sub-directories of
  // the "artifact" volume at `${base}1` and `${base}2`.
  const mountsAt = (base: string) => [
    { name: 'output', mountPath: base },
    { name: 'artifact', subPath: 'pipeline1', mountPath: `${base}1` },
    { name: 'artifact', subPath: 'pipeline2', mountPath: `${base}2` },
  ];

  // Two containers with distinct mount paths, so the resolved path reveals
  // which container was selected.
  const pod = {
    spec: {
      containers: [
        { volumeMounts: mountsAt('/main'), name: 'main' },
        { volumeMounts: mountsAt('/data'), name: 'ml-pipeline-ui' },
      ],
      volumes: [
        { name: 'output', hostPath: { path: '/data/output', type: 'Directory' } },
        { name: 'artifact', persistentVolumeClaim: { claimName: 'artifact_pvc' } },
      ],
    },
  };

  const uiContainerNames = ['ml-pipeline-ui', 'ml-pipeline-ui-artifact'];

  it('parse file path with containerNames', () => {
    const [resolvedPath, error] = findFileOnPodVolume(pod, {
      containerNames: uiContainerNames,
      volumeMountName: 'output',
      filePathInVolume: 'a/b/c',
    });
    expect(error).toEqual(undefined);
    expect(resolvedPath).toEqual('/data/a/b/c');
  });

  it('parse file path with containerNames and subPath', () => {
    const [resolvedPath, error] = findFileOnPodVolume(pod, {
      containerNames: uiContainerNames,
      volumeMountName: 'artifact',
      filePathInVolume: 'pipeline1/a/b/c',
    });
    expect(error).toEqual(undefined);
    expect(resolvedPath).toEqual('/data1/a/b/c');
  });

  it('parse file path without containerNames', () => {
    const [resolvedPath, error] = findFileOnPodVolume(pod, {
      containerNames: undefined,
      volumeMountName: 'output',
      filePathInVolume: 'a/b/c',
    });
    expect(error).toEqual(undefined);
    expect(resolvedPath).toEqual('/main/a/b/c');
  });

  it('parse file path error with not exist volume', () => {
    const [resolvedPath, error] = findFileOnPodVolume(pod, {
      containerNames: undefined,
      volumeMountName: 'other',
      filePathInVolume: 'a/b/c',
    });
    expect(error).toEqual(
      'Cannot find file "volume://other/a/b/c" in pod "unknown": volume "other" not configured',
    );
    expect(resolvedPath).toEqual('');
  });

  it('parse file path error with not exist container', () => {
    const [resolvedPath, error] = findFileOnPodVolume(pod, {
      containerNames: ['other1', 'other2'],
      volumeMountName: 'output',
      filePathInVolume: 'a/b/c',
    });
    expect(error).toEqual(
      'Cannot find file "volume://output/a/b/c" in pod "unknown": container "other1" or "other2" not found',
    );
    expect(resolvedPath).toEqual('');
  });

  it('parse file path error with volume not mount error', () => {
    const [resolvedPath, error] = findFileOnPodVolume(pod, {
      containerNames: undefined,
      volumeMountName: 'artifact',
      filePathInVolume: 'a/b/c',
    });
    expect(error).toEqual(
      'Cannot find file "volume://artifact/a/b/c" in pod "unknown": volume "artifact" not mounted or volume "artifact" with subPath (which is prefix of a/b/c) not mounted',
    );
    expect(resolvedPath).toEqual('');
  });
});
describe('resolveFilePathOnVolume', () => {
  // Every case resolves the same file against the same mount path; only the
  // mount's subPath varies.
  const volumeMountPath = '/data';
  const filePathInVolume = 'a/b/c';

  it('undefined volumeMountSubPath', () => {
    const result = resolveFilePathOnVolume({
      filePathInVolume,
      volumeMountPath,
      volumeMountSubPath: undefined,
    });
    expect(result).toEqual(['/data/a/b/c', undefined]);
  });

  it('with volumeMountSubPath', () => {
    const result = resolveFilePathOnVolume({
      volumeMountPath,
      filePathInVolume,
      volumeMountSubPath: 'a',
    });
    expect(result).toEqual(['/data/b/c', undefined]);
  });

  it('with multiple layer volumeMountSubPath', () => {
    const result = resolveFilePathOnVolume({
      volumeMountPath,
      filePathInVolume,
      volumeMountSubPath: 'a/b',
    });
    expect(result).toEqual(['/data/c', undefined]);
  });

  it('with not exist volumeMountSubPath', () => {
    const result = resolveFilePathOnVolume({
      volumeMountPath,
      filePathInVolume,
      volumeMountSubPath: 'other',
    });
    const expectedError =
      'File a/b/c not mounted, expecting the file to be inside volume mount subpath other';
    expect(result).toEqual(['', expectedError]);
  });
});
});

View File

@ -13,6 +13,7 @@
// limitations under the License.
import { readFileSync } from 'fs';
import { Transform, TransformOptions } from 'stream';
import path from 'path';
/** get the server address from host, port, and schema (defaults to 'http'). */
export function getAddress({
@ -65,6 +66,114 @@ export function loadJSON<T>(filepath?: string, defaultValue?: T): T | undefined
}
}
/**
 * Find the final path, inside a pod's container, of a file stored on one of the pod's volumes:
 * 1. check the volume is declared in pod.spec.volumes
 * 2. pick the container: the first one whose name is listed in containerNames, or containers[0]
 *    when containerNames is not provided
 * 3. find a volume mount for the volume in that container; a mount configured with subPath only
 *    qualifies when filePathInVolume is the subPath itself or lies below it (matched on whole
 *    path segments, so subPath "a" does not match "ab/c")
 * 4. resolve the final path with resolveFilePathOnVolume (mount path + file path pruned of subPath)
 * @param pod contains volumes and volume mounts info
 * @param options
 *  - containerNames optional, will match to find container or container[0] in pod will be used
 *  - volumeMountName container volume mount name
 *  - filePathInVolume file path in volume
 * @return [final file path, undefined] on success, or ['', error message] if any check failed
 */
export function findFileOnPodVolume(
  pod: any,
  options: {
    containerNames: string[] | undefined;
    volumeMountName: string;
    filePathInVolume: string;
  },
): [string, string | undefined] {
  const { containerNames, volumeMountName, filePathInVolume } = options;
  const volumes = pod?.spec?.volumes;
  const prefixErrorMessage = `Cannot find file "volume://${volumeMountName}/${filePathInVolume}" in pod "${pod
    ?.metadata?.name || 'unknown'}":`;
  // volumes not specified or volume named ${volumeMountName} not specified
  if (!Array.isArray(volumes) || !volumes.find(v => v?.name === volumeMountName)) {
    return ['', `${prefixErrorMessage} volume "${volumeMountName}" not configured`];
  }

  // get pod main container (pod.spec is known to exist here because volumes is an array)
  let container;
  if (Array.isArray(pod.spec.containers)) {
    if (containerNames) {
      // find main container by container name match containerNames; null/unnamed entries are skipped
      container = pod.spec.containers.find(
        (c: { name?: string } | null | undefined) =>
          typeof c?.name === 'string' && containerNames.includes(c.name),
      );
    } else {
      // use containers[0] as pod main container
      container = pod.spec.containers[0];
    }
  }
  if (!container) {
    const containerNamesMessage = containerNames ? containerNames.join('" or "') : '';
    return ['', `${prefixErrorMessage} container "${containerNamesMessage}" not found`];
  }

  const volumeMounts = container.volumeMounts;
  if (!Array.isArray(volumeMounts)) {
    return ['', `${prefixErrorMessage} volume "${volumeMountName}" not mounted`];
  }

  // find volumes mount
  const volumeMount = volumeMounts.find(v => {
    // volume name must be same
    if (v?.name !== volumeMountName) {
      return false;
    }
    // a mount without subPath exposes the whole volume
    if (!v.subPath) {
      return true;
    }
    // if volume subPath set, it must be a whole-segment prefix of the file path:
    // a plain startsWith would let subPath "a" wrongly match "ab/c"
    const subPath = String(v.subPath);
    const subPathPrefix = subPath.endsWith('/') ? subPath : `${subPath}/`;
    return filePathInVolume === subPath || filePathInVolume.startsWith(subPathPrefix);
  });
  if (!volumeMount) {
    return [
      '',
      `${prefixErrorMessage} volume "${volumeMountName}" not mounted or volume "${volumeMountName}" with subPath (which is prefix of ${filePathInVolume}) not mounted`,
    ];
  }

  // resolve file path
  const [filePath, err] = resolveFilePathOnVolume({
    filePathInVolume,
    volumeMountPath: volumeMount.mountPath,
    volumeMountSubPath: volumeMount.subPath,
  });
  if (err) {
    // surface the actual reason instead of the literal text "err"
    return ['', `${prefixErrorMessage} ${err}`];
  }
  return [filePath, undefined];
}
/**
 * Translate a file path inside a volume into the path where that file is visible
 * in a container mounting the volume.
 *
 * - Without a subPath, the whole volume is mounted: the result is
 *   volumeMountPath joined with filePathInVolume.
 * - With a subPath, only that sub-directory of the volume is mounted: the file must be
 *   the subPath itself or lie below it, and the subPath prefix is pruned before joining.
 *   The prefix is matched on whole path segments, so subPath "a" does not match "ab/c".
 *
 * @param volume
 *  - filePathInVolume file path relative to the volume root
 *  - volumeMountPath path at which the volume (or its subPath) is mounted
 *  - volumeMountSubPath optional sub-directory of the volume that is mounted
 * @return [resolved file path, undefined] on success, or ['', error message] when the file
 *  is not inside the mounted subPath
 */
export function resolveFilePathOnVolume(volume: {
  filePathInVolume: string;
  volumeMountPath: string;
  volumeMountSubPath: string | undefined;
}): [string, string | undefined] {
  const { filePathInVolume, volumeMountPath, volumeMountSubPath } = volume;
  if (!volumeMountSubPath) {
    return [path.join(volumeMountPath, filePathInVolume), undefined];
  }
  // Compare against the subPath followed by a separator so that only complete
  // path segments count as a prefix.
  const subPathPrefix = volumeMountSubPath.endsWith('/')
    ? volumeMountSubPath
    : `${volumeMountSubPath}/`;
  if (filePathInVolume === volumeMountSubPath || filePathInVolume.startsWith(subPathPrefix)) {
    return [
      path.join(volumeMountPath, filePathInVolume.substring(volumeMountSubPath.length)),
      undefined,
    ];
  }
  return [
    '',
    `File ${filePathInVolume} not mounted, expecting the file to be inside volume mount subpath ${volumeMountSubPath}`,
  ];
}
export interface PreviewStreamOptions extends TransformOptions {
peek: number;
}

View File

@ -1108,6 +1108,22 @@ describe('WorkflowParser', () => {
source: StorageService.HTTPS,
});
});
it('handles volume file without path', () => {
  // A bare "volume://<name>" parses to that name as bucket and an empty key.
  const parsed = WorkflowParser.parseStoragePath('volume://output');
  const expected = {
    source: StorageService.VOLUME,
    bucket: 'output',
    key: '',
  };
  expect(parsed).toEqual(expected);
});
it('handles volume file with path', () => {
  // The first segment after "volume://" is the bucket; the rest is the key.
  const parsed = WorkflowParser.parseStoragePath('volume://output/path/foo/bar');
  const expected = {
    source: StorageService.VOLUME,
    bucket: 'output',
    key: 'path/foo/bar',
  };
  expect(parsed).toEqual(expected);
});
});
describe('getOutboundNodes', () => {

View File

@ -37,6 +37,7 @@ export enum StorageService {
HTTPS = 'https',
MINIO = 'minio',
S3 = 's3',
VOLUME = 'volume',
}
export interface StoragePath {
@ -361,6 +362,13 @@ export default class WorkflowParser {
key: pathParts.slice(1).join('/'),
source: StorageService.HTTPS,
};
} else if (strPath.startsWith('volume://')) {
const pathParts = strPath.substr('volume://'.length).split('/');
return {
bucket: pathParts[0],
key: pathParts.slice(1).join('/'),
source: StorageService.VOLUME,
};
} else {
throw new Error('Unsupported storage path: ' + strPath);
}