pipelines/frontend/server/handlers/artifacts.ts

395 lines
13 KiB
TypeScript

// Copyright 2019-2020 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import fetch from 'node-fetch';
import { AWSConfigs, HttpConfigs, MinioConfigs, ProcessEnv } from '../configs';
import { Client as MinioClient } from 'minio';
import { PreviewStream, findFileOnPodVolume } from '../utils';
import { createMinioClient, getObjectStream } from '../minio-helper';
import * as serverInfo from '../helpers/server-info';
import { Handler, Request, Response } from 'express';
import { Storage } from '@google-cloud/storage';
import proxy from 'http-proxy-middleware';
import { HACK_FIX_HPM_PARTIAL_RESPONSE_HEADERS } from '../consts';
import * as fs from 'fs';
/**
 * ArtifactsQueryStrings describes the expected query string key/value pairs
 * in the artifact request object.
 */
interface ArtifactsQueryStrings {
/** artifact source, i.e. the storage backend the artifact is read from. */
source: 'minio' | 's3' | 'gcs' | 'http' | 'https' | 'volume';
/** bucket name (for the 'volume' source it is used as the volume mount name). */
bucket: string;
/** artifact key/path that is uri encoded. */
key: string;
/** return only the first x characters or bytes; treated as 0 when omitted. */
peek?: number;
}
/**
 * Returns an artifact handler which retrieves an artifact from the corresponding
 * backend (i.e. gcs, minio, s3, http/https, volume).
 *
 * The source, bucket and key are validated first; a missing value or an unknown
 * source is answered with a 500 and a plain-text message. Any failure while
 * creating a storage client or while running a backend handler is caught and
 * answered with a 500 as well, so that a rejected promise never leaves the
 * request without a response.
 *
 * @param artifactsConfigs configs to retrieve the artifacts from the various backend.
 * @param useParameter get bucket and key from parameter instead of query. When true, expect
 * to be used in a route like `/artifacts/:source/:bucket/*`.
 * @param tryExtract whether the handler try to extract content from *.tar.gz files
 * (only forwarded to the `minio` source).
 */
export function getArtifactsHandler({
  artifactsConfigs,
  useParameter,
  tryExtract,
}: {
  artifactsConfigs: {
    aws: AWSConfigs;
    http: HttpConfigs;
    minio: MinioConfigs;
  };
  tryExtract: boolean;
  useParameter: boolean;
}): Handler {
  const { aws, http, minio } = artifactsConfigs;
  return async (req, res) => {
    const source = useParameter ? req.params.source : req.query.source;
    const bucket = useParameter ? req.params.bucket : req.query.bucket;
    const key = useParameter ? req.params[0] : req.query.key;
    const { peek = 0 } = req.query as Partial<ArtifactsQueryStrings>;
    if (!source) {
      res.status(500).send('Storage source is missing from artifact request');
      return;
    }
    if (!bucket) {
      res.status(500).send('Storage bucket is missing from artifact request');
      return;
    }
    if (!key) {
      res.status(500).send('Storage key is missing from artifact request');
      return;
    }
    console.log(`Getting storage artifact at: ${source}: ${bucket}/${key}`);
    try {
      // Every backend handler is awaited so that a rejection surfaces in the
      // catch block below instead of becoming an unhandled promise rejection.
      switch (source) {
        case 'gcs':
          await getGCSArtifactHandler({ bucket, key }, peek)(req, res);
          break;
        case 'minio':
          await getMinioArtifactHandler(
            {
              bucket,
              client: new MinioClient(minio),
              key,
              tryExtract,
            },
            peek,
          )(req, res);
          break;
        case 's3':
          await getMinioArtifactHandler(
            {
              bucket,
              client: await createMinioClient(aws),
              key,
            },
            peek,
          )(req, res);
          break;
        case 'http':
        case 'https':
          await getHttpArtifactsHandler(
            getHttpUrl(source, http.baseUrl || '', bucket, key),
            http.auth,
            peek,
          )(req, res);
          break;
        case 'volume':
          await getVolumeArtifactsHandler(
            {
              bucket,
              key,
            },
            peek,
          )(req, res);
          break;
        default:
          res.status(500).send('Unknown storage source: ' + source);
          return;
      }
    } catch (err) {
      console.error(err);
      // A backend handler may already have started the response; only report
      // the failure when nothing has been sent yet.
      if (!res.headersSent) {
        res.status(500).send(`Failed to get artifact at ${source}: ${bucket}/${key}: ${err}`);
      }
    }
  };
}
/**
 * Returns the http/https url to retrieve a kfp artifact (of the form: `${source}://${baseUrl}${bucket}/${key}`)
 *
 * Leading and trailing `/` characters are removed from `baseUrl`; if anything is
 * left, a single `/` is appended. A base URL that is empty, or that consists of
 * slashes only, contributes nothing to the result.
 *
 * @param source "http" or "https".
 * @param baseUrl string to prefix the url.
 * @param bucket name of the bucket.
 * @param key path to the artifact.
 * @returns the assembled url string.
 */
function getHttpUrl(
  source: 'http' | 'https',
  baseUrl: string,
  bucket: string,
  key: string,
): string {
  // Trim slashes by index instead of with a regex, so that an all-slash base
  // URL collapses to an empty prefix rather than to `//`.
  let start = 0;
  let end = baseUrl.length;
  while (start < end && baseUrl.charAt(start) === '/') {
    start += 1;
  }
  while (end > start && baseUrl.charAt(end - 1) === '/') {
    end -= 1;
  }
  const trimmed = baseUrl.slice(start, end);
  const prefix = trimmed.length > 0 ? `${trimmed}/` : '';
  return `${source}://${prefix}${bucket}/${key}`;
}
/**
 * Returns a handler that fetches `url` and streams the response body to the
 * client through a PreviewStream.
 *
 * When `auth.key` is non-empty, a request header with that name is added to the
 * outgoing fetch: its value is taken from the incoming request (exact or
 * lower-cased header name) and falls back to `auth.defaultValue`.
 *
 * A failure to perform the fetch itself is answered with a 500 so the request
 * does not hang on a rejected promise.
 * NOTE(review): the status code of the fetched response is not inspected; the
 * body is streamed as-is.
 *
 * @param url fully qualified url of the artifact.
 * @param auth name of the authorization header to forward and its default value.
 * @param peek passed to PreviewStream to limit how much of the body is returned.
 */
function getHttpArtifactsHandler(
  url: string,
  auth: {
    key: string;
    defaultValue: string;
  } = { key: '', defaultValue: '' },
  peek: number = 0,
) {
  return async (req: Request, res: Response) => {
    const headers = {};
    // add authorization header to fetch request if key is non-empty
    if (auth.key.length > 0) {
      // inject original request's value if exists, otherwise default to provided default value
      headers[auth.key] =
        req.headers[auth.key] || req.headers[auth.key.toLowerCase()] || auth.defaultValue;
    }
    // Shared by the stream error listener and the fetch failure path.
    const fail = (err: unknown) =>
      res.status(500).send(`Unable to retrieve artifact at ${url}: ${err}`);
    try {
      const response = await fetch(url, { headers });
      response.body
        .on('error', fail)
        .pipe(new PreviewStream({ peek }))
        .pipe(res);
    } catch (err) {
      // fetch rejects on network-level failures (DNS, refused connection, ...).
      fail(err);
    }
  };
}
/**
 * Returns a handler that streams an object obtained via getObjectStream to the
 * client through a PreviewStream.
 *
 * Failing to obtain the stream, or an `error` event emitted by it, is answered
 * with a 500 that names the bucket and key.
 *
 * @param options bucket, key and client (plus optional tryExtract) handed to getObjectStream.
 * @param peek passed to PreviewStream to limit how much of the object is returned.
 */
function getMinioArtifactHandler(
  options: { bucket: string; key: string; client: MinioClient; tryExtract?: boolean },
  peek: number = 0,
) {
  // Builds the failure text lazily so it reflects `options` at the time of the error.
  const describeFailure = (reason: unknown) =>
    `Failed to get object in bucket ${options.bucket} at path ${options.key}: ${reason}`;

  return async (_: Request, res: Response) => {
    try {
      const objectStream = await getObjectStream(options);
      const guarded = objectStream.on('error', reason =>
        res.status(500).send(describeFailure(reason)),
      );
      const preview = guarded.pipe(new PreviewStream({ peek }));
      preview.pipe(res);
    } catch (reason) {
      console.error(reason);
      res.status(500).send(describeFailure(reason));
    }
  };
}
/**
 * Returns a handler that serves one or more Google Cloud Storage objects whose
 * names match `key`, where `key` may contain `*` wildcards.
 *
 * - No matching object: an empty response is sent.
 * - `peek` set: only the first matching object is streamed, through a PreviewStream.
 * - Otherwise: every matching object is read, trimmed, terminated with a
 *   newline and the results are concatenated in listing order.
 *
 * All objects are read in parallel, but the response is only sent once every
 * read has settled, so the output is complete and ordered regardless of which
 * stream finishes first, and at most one response is sent.
 *
 * @param options bucket name and key pattern of the artifact(s).
 * @param peek passed to PreviewStream to limit how much of the first match is returned.
 */
function getGCSArtifactHandler(options: { key: string; bucket: string }, peek: number = 0) {
  const { key, bucket } = options;
  return async (_: Request, res: Response) => {
    // Single place that reports an unreadable file.
    const failRead = (name: string) => res.status(500).send('Failed to read file: ' + name);
    try {
      // Read all files that match the key pattern, which can include wildcards '*'.
      // The way this works is we list all paths whose prefix is the substring
      // of the pattern until the first wildcard, then we create a regular
      // expression out of the pattern, escaping all non-wildcard characters,
      // and we use it to match all enumerated paths.
      const storage = new Storage();
      const wildcardIndex = key.indexOf('*');
      const prefix = wildcardIndex > -1 ? key.substring(0, wildcardIndex) : key;
      const files = await storage.bucket(bucket).getFiles({ prefix });

      // Escape regex characters
      const escapeRegexChars = (s: string) => s.replace(/[|\\{}()[\]^$+*?.]/g, '\\$&');
      // Build, once, a RegExp object that only recognizes asterisks ('*'), and
      // escapes everything else.
      const keyPattern = new RegExp(
        '^' +
          key
            .split(/\*+/)
            .map(escapeRegexChars)
            .join('.*') +
          '$',
      );
      const matchingFiles = files[0].filter(f => keyPattern.test(f.name));

      if (!matchingFiles.length) {
        console.log('No matching files found.');
        res.send();
        return;
      }
      console.log(
        `Found ${matchingFiles.length} matching files: `,
        matchingFiles.map(file => file.name).join(','),
      );

      // TODO: support peek for concatenated matching files
      if (peek) {
        matchingFiles[0]
          .createReadStream()
          .on('error', () => failRead(matchingFiles[0].name))
          .pipe(new PreviewStream({ peek }))
          .pipe(res);
        return;
      }

      // if not peeking, read every file; each entry resolves to the file's
      // text, or to undefined when its stream reported an error.
      const texts = await Promise.all(
        matchingFiles.map(
          f =>
            new Promise<string | undefined>(resolve => {
              const chunks: Buffer[] = [];
              f.createReadStream()
                .on('data', data => chunks.push(Buffer.from(data)))
                .on('end', () =>
                  resolve(
                    Buffer.concat(chunks)
                      .toString()
                      .trim() + '\n',
                  ),
                )
                .on('error', () => resolve(undefined));
            }),
        ),
      );
      const failedIndex = texts.findIndex(text => text === undefined);
      if (failedIndex !== -1) {
        failRead(matchingFiles[failedIndex].name);
        return;
      }
      res.send(texts.join(''));
    } catch (err) {
      res.status(500).send('Failed to download GCS file(s). Error: ' + err);
    }
  };
}
/**
 * Returns a handler that serves a file located on a volume mounted into the
 * server pod, streamed through a PreviewStream.
 *
 * `bucket` is used as the volume mount name and `key` as the file path inside
 * that volume. Responses:
 * - 500 when the host pod cannot be obtained or an unexpected error is thrown,
 * - 404 when the file location cannot be resolved from the pod spec,
 * - 400 when the resolved path is a directory.
 *
 * NOTE(review): no `error` listener is attached to the file read stream here.
 *
 * @param options volume mount name (`bucket`) and path within the volume (`key`).
 * @param peek passed to PreviewStream to limit how much of the file is returned.
 */
function getVolumeArtifactsHandler(options: { bucket: string; key: string }, peek: number = 0) {
  const { key, bucket } = options;
  return async (req: Request, res: Response) => {
    const reply = (code: number, body: unknown): void => {
      res.status(code).send(body);
    };
    try {
      const [hostPod, podError] = await serverInfo.getHostPod();
      if (podError) {
        reply(500, podError);
        return;
      }
      if (!hostPod) {
        reply(500, 'Could not get server pod');
        return;
      }

      // ml-pipeline-ui server container name also be called 'ml-pipeline-ui-artifact' in KFP multi user mode.
      // https://github.com/kubeflow/manifests/blob/master/pipeline/installs/multi-user/pipelines-profile-controller/sync.py#L212
      const candidateContainers = ['ml-pipeline-ui', 'ml-pipeline-ui-artifact'];
      const [resolvedPath, resolveError] = findFileOnPodVolume(hostPod, {
        containerNames: candidateContainers,
        volumeMountName: bucket,
        filePathInVolume: key,
      });
      if (resolveError) {
        reply(404, `Failed to open volume://${bucket}/${key}, ${resolveError}`);
        return;
      }

      // TODO: support directory and support filePath include wildcards '*'
      const isDirectory = (await fs.promises.stat(resolvedPath)).isDirectory();
      if (isDirectory) {
        reply(
          400,
          `Failed to open volume://${bucket}/${key}, file ${resolvedPath} is directory, does not support now`,
        );
        return;
      }

      const fileStream = fs.createReadStream(resolvedPath);
      const preview = fileStream.pipe(new PreviewStream({ peek }));
      preview.pipe(res);
    } catch (err) {
      reply(500, `Failed to open volume://${bucket}/${key}: ${err}`);
    }
  };
}
/**
 * Fallback values used by loadArtifactsProxyConfig when the corresponding
 * environment variables are not set. The port is kept as a string because it
 * stands in for an environment variable value and is parsed with parseInt.
 */
const ARTIFACTS_PROXY_DEFAULTS = {
serviceName: 'ml-pipeline-ui-artifact',
servicePort: '80',
};
/**
 * Maps a namespace to the address of the service that requests for that
 * namespace are routed to (its result is returned from the proxy `router` option).
 */
export type NamespacedServiceGetter = (namespace: string) => string;
/** Settings of the per-namespace artifacts proxy, as produced by loadArtifactsProxyConfig. */
export interface ArtifactsProxyConfig {
/** name of the artifact service; used as the first label of the target host name. */
serviceName: string;
/** port of the artifact service. */
servicePort: number;
/** whether the artifacts proxy is turned on. */
enabled: boolean;
}
/**
 * Builds the artifacts proxy configuration from environment variables.
 *
 * - `ARTIFACTS_SERVICE_PROXY_NAME`: service name, defaults to ARTIFACTS_PROXY_DEFAULTS.serviceName.
 * - `ARTIFACTS_SERVICE_PROXY_PORT`: parsed as a base-10 integer, defaults to
 *   ARTIFACTS_PROXY_DEFAULTS.servicePort.
 * - `ARTIFACTS_SERVICE_PROXY_ENABLED`: enabled only when it equals `true`
 *   (case-insensitive), defaults to `false`.
 *
 * @param env environment variables to read from.
 * @returns the resolved proxy configuration.
 */
export function loadArtifactsProxyConfig(env: ProcessEnv): ArtifactsProxyConfig {
  const rawName = env.ARTIFACTS_SERVICE_PROXY_NAME;
  const rawPort = env.ARTIFACTS_SERVICE_PROXY_PORT;
  const rawEnabled = env.ARTIFACTS_SERVICE_PROXY_ENABLED;

  // Defaults apply only to variables that are undefined, not to empty strings.
  const serviceName = rawName === undefined ? ARTIFACTS_PROXY_DEFAULTS.serviceName : rawName;
  const portText = rawPort === undefined ? ARTIFACTS_PROXY_DEFAULTS.servicePort : rawPort;
  const enabledText = rawEnabled === undefined ? 'false' : rawEnabled;

  const servicePort = parseInt(portText, 10);
  const enabled = enabledText.toLowerCase() === 'true';
  return { serviceName, servicePort, enabled };
}
/**
 * Names of the query string parameters the artifacts proxy deals with.
 * `NAMESPACE` is the parameter removed from the path before a request is proxied.
 */
const QUERIES = {
NAMESPACE: 'namespace',
};
/**
 * Returns a middleware that proxies artifact requests carrying a `namespace`
 * query parameter to the service address produced by `namespacedServiceGetter`.
 *
 * When `enabled` is false the middleware simply calls `next()`. Otherwise:
 * - only requests whose url has a non-empty `namespace` query parameter are proxied,
 * - the `namespace` parameter is removed from the proxied path,
 * - the proxy target is resolved per request from the namespace.
 *
 * @param enabled whether proxying is turned on.
 * @param namespacedServiceGetter maps a namespace to the service address to proxy to.
 */
export function getArtifactsProxyHandler({
  enabled,
  namespacedServiceGetter,
}: {
  enabled: boolean;
  namespacedServiceGetter: NamespacedServiceGetter;
}): Handler {
  if (enabled) {
    return proxy(
      // only proxy requests with namespace query parameter
      (_pathname, req) => Boolean(getNamespaceFromUrl(req.url || '')),
      {
        target: '/artifacts',
        changeOrigin: true,
        headers: HACK_FIX_HPM_PARTIAL_RESPONSE_HEADERS,
        router: req => {
          const requestedNamespace = getNamespaceFromUrl(req.url || '');
          if (requestedNamespace) {
            return namespacedServiceGetter(requestedNamespace);
          }
          throw new Error(`namespace query param expected in ${req.url}.`);
        },
        pathRewrite: originalPath => {
          // Drop the namespace parameter; the target service does not expect it.
          const rewritten = new URL(originalPath || '', DUMMY_BASE_PATH);
          rewritten.searchParams.delete(QUERIES.NAMESPACE);
          return rewritten.pathname + rewritten.search;
        },
        onProxyReq: outgoing => {
          console.log('Proxied artifact request: ', outgoing.path);
        },
      },
    );
  }
  // Proxying is disabled: hand every request to the next handler.
  return (_req, _res, next) => next();
}
/**
 * Gets the namespace from the query parameter "namespace" of a request path.
 *
 * @param path request path, optionally with a query string (e.g. `/artifacts/get?namespace=foo`).
 * @returns the namespace, or undefined when the parameter is absent or empty,
 * or when `path` cannot be parsed as a URL (e.g. `//`).
 */
function getNamespaceFromUrl(path: string): string | undefined {
  try {
    const params = new URL(path, DUMMY_BASE_PATH).searchParams;
    return params.get('namespace') || undefined;
  } catch {
    // The URL constructor throws on input it cannot parse; such a request
    // carries no usable namespace, so report it as absent instead of throwing.
    return undefined;
  }
}
// `new URL('/path')` throws, because the URL constructor only accepts a full URL with scheme
// and hostname. DUMMY_BASE_PATH is passed as the base, as in `new URL('/path', DUMMY_BASE_PATH)`,
// so that request paths can be parsed; only the path and query of the result are used.
const DUMMY_BASE_PATH = 'http://dummy-base-path';
/**
 * Creates a function that maps a namespace to the address of the artifact
 * service in that namespace, of the form `http://<serviceName>.<namespace>:<servicePort>`.
 *
 * @param serviceName name of the artifact service.
 * @param servicePort port of the artifact service.
 * @returns a getter taking a namespace and returning the service address.
 */
export function getArtifactServiceGetter({ serviceName, servicePort }: ArtifactsProxyConfig) {
  // The parts that do not depend on the namespace are assembled once.
  const hostPrefix = `http://${serviceName}.`;
  const portSuffix = `:${servicePort}`;
  return function serviceAddressFor(namespace: string): string {
    return `${hostPrefix}${namespace}${portSuffix}`;
  };
}