kfp UI node server support preview and handles gzip, tarball, and raw artifacts in a consistent manner. (#2992)

This commit is contained in:
Eterna2 2020-03-24 13:30:47 +08:00 committed by GitHub
parent 4bee00ecc1
commit a7606a12de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 960 additions and 545 deletions

View File

@ -1,4 +1,4 @@
// Copyright 2019 Google LLC // Copyright 2019-2020 Google LLC
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -186,10 +186,13 @@ describe('UIServer apis', () => {
it('responds with a minio artifact if source=minio', done => { it('responds with a minio artifact if source=minio', done => {
const artifactContent = 'hello world'; const artifactContent = 'hello world';
const mockedMinioClient: jest.Mock = MinioClient as any; const mockedMinioClient: jest.Mock = MinioClient as any;
const mockedGetTarObjectAsString: jest.Mock = minioHelper.getTarObjectAsString as any; const mockedGetObjectStream: jest.Mock = minioHelper.getObjectStream as any;
mockedGetTarObjectAsString.mockImplementationOnce(opt => const objStream = new PassThrough();
objStream.end(artifactContent);
mockedGetObjectStream.mockImplementationOnce(opt =>
opt.bucket === 'ml-pipeline' && opt.key === 'hello/world.txt' opt.bucket === 'ml-pipeline' && opt.key === 'hello/world.txt'
? Promise.resolve(artifactContent) ? Promise.resolve(objStream)
: Promise.reject('Unable to retrieve minio artifact.'), : Promise.reject('Unable to retrieve minio artifact.'),
); );
const configs = loadConfigs(argv, { const configs = loadConfigs(argv, {
@ -249,7 +252,7 @@ describe('UIServer apis', () => {
}); });
}); });
it('responds with a s3 artifact if source=s3', done => { it('responds with partial s3 artifact if peek=5 flag is set', done => {
const artifactContent = 'hello world'; const artifactContent = 'hello world';
const mockedMinioClient: jest.Mock = minioHelper.createMinioClient as any; const mockedMinioClient: jest.Mock = minioHelper.createMinioClient as any;
const mockedGetObjectStream: jest.Mock = minioHelper.getObjectStream as any; const mockedGetObjectStream: jest.Mock = minioHelper.getObjectStream as any;
@ -270,8 +273,8 @@ describe('UIServer apis', () => {
const request = requests(app.start()); const request = requests(app.start());
request request
.get('/artifacts/get?source=s3&bucket=ml-pipeline&key=hello%2Fworld.txt') .get('/artifacts/get?source=s3&bucket=ml-pipeline&key=hello%2Fworld.txt&peek=5')
.expect(200, artifactContent, err => { .expect(200, artifactContent.slice(0, 5), err => {
expect(mockedMinioClient).toBeCalledWith({ expect(mockedMinioClient).toBeCalledWith({
accessKey: 'aws123', accessKey: 'aws123',
endPoint: 's3.amazonaws.com', endPoint: 's3.amazonaws.com',
@ -285,7 +288,10 @@ describe('UIServer apis', () => {
const artifactContent = 'hello world'; const artifactContent = 'hello world';
mockedFetch.mockImplementationOnce((url: string, opts: any) => mockedFetch.mockImplementationOnce((url: string, opts: any) =>
url === 'http://foo.bar/ml-pipeline/hello/world.txt' url === 'http://foo.bar/ml-pipeline/hello/world.txt'
? Promise.resolve({ buffer: () => Promise.resolve(artifactContent) }) ? Promise.resolve({
buffer: () => Promise.resolve(artifactContent),
body: new PassThrough().end(artifactContent),
})
: Promise.reject('Unable to retrieve http artifact.'), : Promise.reject('Unable to retrieve http artifact.'),
); );
const configs = loadConfigs(argv, { const configs = loadConfigs(argv, {
@ -304,12 +310,42 @@ describe('UIServer apis', () => {
}); });
}); });
it('responds with partial http artifact if peek=5 flag is set', done => {
const artifactContent = 'hello world';
const mockedFetch: jest.Mock = fetch as any;
mockedFetch.mockImplementationOnce((url: string, opts: any) =>
url === 'http://foo.bar/ml-pipeline/hello/world.txt'
? Promise.resolve({
buffer: () => Promise.resolve(artifactContent),
body: new PassThrough().end(artifactContent),
})
: Promise.reject('Unable to retrieve http artifact.'),
);
const configs = loadConfigs(argv, {
HTTP_BASE_URL: 'foo.bar/',
});
app = new UIServer(configs);
const request = requests(app.start());
request
.get('/artifacts/get?source=http&bucket=ml-pipeline&key=hello%2Fworld.txt&peek=5')
.expect(200, artifactContent.slice(0, 5), err => {
expect(mockedFetch).toBeCalledWith('http://foo.bar/ml-pipeline/hello/world.txt', {
headers: {},
});
done(err);
});
});
it('responds with a https artifact if source=https', done => { it('responds with a https artifact if source=https', done => {
const artifactContent = 'hello world'; const artifactContent = 'hello world';
mockedFetch.mockImplementationOnce((url: string, opts: any) => mockedFetch.mockImplementationOnce((url: string, opts: any) =>
url === 'https://foo.bar/ml-pipeline/hello/world.txt' && url === 'https://foo.bar/ml-pipeline/hello/world.txt' &&
opts.headers.Authorization === 'someToken' opts.headers.Authorization === 'someToken'
? Promise.resolve({ buffer: () => Promise.resolve(artifactContent) }) ? Promise.resolve({
buffer: () => Promise.resolve(artifactContent),
body: new PassThrough().end(artifactContent),
})
: Promise.reject('Unable to retrieve http artifact.'), : Promise.reject('Unable to retrieve http artifact.'),
); );
const configs = loadConfigs(argv, { const configs = loadConfigs(argv, {
@ -336,7 +372,10 @@ describe('UIServer apis', () => {
const artifactContent = 'hello world'; const artifactContent = 'hello world';
mockedFetch.mockImplementationOnce((url: string, _opts: any) => mockedFetch.mockImplementationOnce((url: string, _opts: any) =>
url === 'https://foo.bar/ml-pipeline/hello/world.txt' url === 'https://foo.bar/ml-pipeline/hello/world.txt'
? Promise.resolve({ buffer: () => Promise.resolve(artifactContent) }) ? Promise.resolve({
buffer: () => Promise.resolve(artifactContent),
body: new PassThrough().end(artifactContent),
})
: Promise.reject('Unable to retrieve http artifact.'), : Promise.reject('Unable to retrieve http artifact.'),
); );
const configs = loadConfigs(argv, { const configs = loadConfigs(argv, {
@ -379,6 +418,26 @@ describe('UIServer apis', () => {
.get('/artifacts/get?source=gcs&bucket=ml-pipeline&key=hello%2Fworld.txt') .get('/artifacts/get?source=gcs&bucket=ml-pipeline&key=hello%2Fworld.txt')
.expect(200, artifactContent + '\n', done); .expect(200, artifactContent + '\n', done);
}); });
it('responds with a partial gcs artifact if peek=5 is set', done => {
const artifactContent = 'hello world';
const mockedGcsStorage: jest.Mock = GCSStorage as any;
const stream = new PassThrough();
stream.end(artifactContent);
mockedGcsStorage.mockImplementationOnce(() => ({
bucket: () => ({
getFiles: () =>
Promise.resolve([[{ name: 'hello/world.txt', createReadStream: () => stream }]]),
}),
}));
const configs = loadConfigs(argv, {});
app = new UIServer(configs);
const request = requests(app.start());
request
.get('/artifacts/get?source=gcs&bucket=ml-pipeline&key=hello%2Fworld.txt&peek=5')
.expect(200, artifactContent.slice(0, 5), done);
});
}); });
describe('/system', () => { describe('/system', () => {
@ -418,6 +477,7 @@ describe('UIServer apis', () => {
.expect(500, 'GKE metadata endpoints are disabled.', done); .expect(500, 'GKE metadata endpoints are disabled.', done);
}); });
}); });
describe('/project-id', () => { describe('/project-id', () => {
it('responds with project id data from gke metadata', done => { it('responds with project id data from gke metadata', done => {
mockedFetch.mockImplementationOnce((url: string, _opts: any) => mockedFetch.mockImplementationOnce((url: string, _opts: any) =>

View File

@ -1,4 +1,4 @@
// Copyright 2019 Google LLC // Copyright 2019-2020 Google LLC
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -11,14 +11,14 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
import { Handler, Request, Response } from 'express';
import fetch from 'node-fetch'; import fetch from 'node-fetch';
import { AWSConfigs, HttpConfigs, MinioConfigs } from '../configs';
import { Client as MinioClient } from 'minio'; import { Client as MinioClient } from 'minio';
import { PreviewStream } from '../utils';
import { createMinioClient, getObjectStream } from '../minio-helper';
import { Handler, Request, Response } from 'express';
import { Storage } from '@google-cloud/storage'; import { Storage } from '@google-cloud/storage';
import { getTarObjectAsString, getObjectStream, createMinioClient } from '../minio-helper';
import { HttpConfigs, AWSConfigs, MinioConfigs } from '../configs';
/** /**
* ArtifactsQueryStrings describes the expected query strings key value pairs * ArtifactsQueryStrings describes the expected query strings key value pairs
* in the artifact request object. * in the artifact request object.
@ -30,6 +30,8 @@ interface ArtifactsQueryStrings {
bucket: string; bucket: string;
/** artifact key/path that is uri encoded. */ /** artifact key/path that is uri encoded. */
key: string; key: string;
/** return only the first x characters or bytes. */
peek?: number;
} }
/** /**
@ -44,7 +46,9 @@ export function getArtifactsHandler(artifactsConfigs: {
}): Handler { }): Handler {
const { aws, http, minio } = artifactsConfigs; const { aws, http, minio } = artifactsConfigs;
return async (req, res) => { return async (req, res) => {
const { source, bucket, key: encodedKey } = req.query as Partial<ArtifactsQueryStrings>; const { source, bucket, key: encodedKey, peek = 0 } = req.query as Partial<
ArtifactsQueryStrings
>;
if (!source) { if (!source) {
res.status(500).send('Storage source is missing from artifact request'); res.status(500).send('Storage source is missing from artifact request');
return; return;
@ -61,31 +65,38 @@ export function getArtifactsHandler(artifactsConfigs: {
console.log(`Getting storage artifact at: ${source}: ${bucket}/${key}`); console.log(`Getting storage artifact at: ${source}: ${bucket}/${key}`);
switch (source) { switch (source) {
case 'gcs': case 'gcs':
getGCSArtifactHandler({ bucket, key })(req, res); getGCSArtifactHandler({ bucket, key }, peek)(req, res);
break; break;
case 'minio': case 'minio':
getMinioArtifactHandler({ getMinioArtifactHandler(
bucket, {
client: new MinioClient(minio), bucket,
key, client: new MinioClient(minio),
})(req, res); key,
},
peek,
)(req, res);
break; break;
case 's3': case 's3':
getS3ArtifactHandler({ getMinioArtifactHandler(
bucket, {
client: await createMinioClient(aws), bucket,
key, client: await createMinioClient(aws),
})(req, res); key,
},
peek,
)(req, res);
break; break;
case 'http': case 'http':
case 'https': case 'https':
getHttpArtifactsHandler(getHttpUrl(source, http.baseUrl || '', bucket, key), http.auth)( getHttpArtifactsHandler(
req, getHttpUrl(source, http.baseUrl || '', bucket, key),
res, http.auth,
); peek,
)(req, res);
break; break;
default: default:
@ -114,6 +125,7 @@ function getHttpArtifactsHandler(
key: string; key: string;
defaultValue: string; defaultValue: string;
} = { key: '', defaultValue: '' }, } = { key: '', defaultValue: '' },
peek: number = 0,
) { ) {
return async (req: Request, res: Response) => { return async (req: Request, res: Response) => {
const headers = {}; const headers = {};
@ -125,32 +137,30 @@ function getHttpArtifactsHandler(
req.headers[auth.key] || req.headers[auth.key.toLowerCase()] || auth.defaultValue; req.headers[auth.key] || req.headers[auth.key.toLowerCase()] || auth.defaultValue;
} }
const response = await fetch(url, { headers }); const response = await fetch(url, { headers });
const content = await response.buffer(); response.body
res.send(content); .on('error', err => res.status(500).send(`Unable to retrieve artifact at ${url}: ${err}`))
.pipe(new PreviewStream({ peek }))
.pipe(res);
}; };
} }
function getS3ArtifactHandler(options: { bucket: string; key: string; client: MinioClient }) { function getMinioArtifactHandler(
options: { bucket: string; key: string; client: MinioClient },
peek: number = 0,
) {
return async (_: Request, res: Response) => { return async (_: Request, res: Response) => {
try { try {
const stream = await getObjectStream(options); const stream = await getObjectStream(options);
stream.on('end', () => res.end()); stream
stream.on('error', err => .on('error', err =>
res res
.status(500) .status(500)
.send(`Failed to get object in bucket ${options.bucket} at path ${options.key}: ${err}`), .send(
); `Failed to get object in bucket ${options.bucket} at path ${options.key}: ${err}`,
stream.pipe(res); ),
} catch (err) { )
res.send(`Failed to get object in bucket ${options.bucket} at path ${options.key}: ${err}`); .pipe(new PreviewStream({ peek }))
} .pipe(res);
};
}
function getMinioArtifactHandler(options: { bucket: string; key: string; client: MinioClient }) {
return async (_: Request, res: Response) => {
try {
res.send(await getTarObjectAsString(options));
} catch (err) { } catch (err) {
res res
.status(500) .status(500)
@ -159,7 +169,7 @@ function getMinioArtifactHandler(options: { bucket: string; key: string; client:
}; };
} }
function getGCSArtifactHandler(options: { key: string; bucket: string }) { function getGCSArtifactHandler(options: { key: string; bucket: string }, peek: number = 0) {
const { key, bucket } = options; const { key, bucket } = options;
return async (_: Request, res: Response) => { return async (_: Request, res: Response) => {
try { try {
@ -197,6 +207,16 @@ function getGCSArtifactHandler(options: { key: string; bucket: string }) {
matchingFiles.map(file => file.name).join(','), matchingFiles.map(file => file.name).join(','),
); );
let contents = ''; let contents = '';
// TODO: support peek for concatenated matching files
if (peek) {
matchingFiles[0]
.createReadStream()
.pipe(new PreviewStream({ peek }))
.pipe(res);
return;
}
// if not peeking, iterate and append all the files
matchingFiles.forEach((f, i) => { matchingFiles.forEach((f, i) => {
const buffer: Buffer[] = []; const buffer: Buffer[] = [];
f.createReadStream() f.createReadStream()

View File

@ -1,4 +1,4 @@
// Copyright 2019 Google LLC // Copyright 2019-2020 Google LLC
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -11,10 +11,11 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
import * as zlib from 'zlib';
import { PassThrough } from 'stream'; import { PassThrough } from 'stream';
import { Client as MinioClient } from 'minio'; import { Client as MinioClient } from 'minio';
import { awsInstanceProfileCredentials } from './aws-helper'; import { awsInstanceProfileCredentials } from './aws-helper';
import { createMinioClient, getTarObjectAsString } from './minio-helper'; import { createMinioClient, isTarball, maybeTarball, getObjectStream } from './minio-helper';
jest.mock('minio'); jest.mock('minio');
jest.mock('./aws-helper'); jest.mock('./aws-helper');
@ -81,31 +82,129 @@ describe('minio-helper', () => {
}); });
}); });
describe('getTarObjectAsString', () => { describe('isTarball', () => {
it('checks magic number in buffer is a tarball.', () => {
const tarGzBase64 =
'H4sIAFa7DV4AA+3PSwrCMBRG4Y5dxV1BuSGPridgwcItkTZSl++johNBJ0WE803OIHfwZ87j0fq2nmuzGVVNIcitXYqPpntXLojzSb33MToVdTG5rhHdbtLLaa55uk5ZBrMhj23ty9u7T+/rT+TZP3HozYosZbL97tdbAAAAAAAAAAAAAAAAAADfuwAyiYcHACgAAA==';
const tarGzBuffer = Buffer.from(tarGzBase64, 'base64');
const tarBuffer = zlib.gunzipSync(tarGzBuffer);
expect(isTarball(tarBuffer)).toBe(true);
});
it('checks magic number in buffer is not a tarball.', () => {
expect(
isTarball(
Buffer.from(
'some-random-string-more-random-string-even-more-random-string-even-even-more-random',
),
),
).toBe(false);
});
});
describe('maybeTarball', () => {
// hello world // hello world
const tarGzBase64 = const tarGzBase64 =
'H4sIAFa7DV4AA+3PSwrCMBRG4Y5dxV1BuSGPridgwcItkTZSl++johNBJ0WE803OIHfwZ87j0fq2nmuzGVVNIcitXYqPpntXLojzSb33MToVdTG5rhHdbtLLaa55uk5ZBrMhj23ty9u7T+/rT+TZP3HozYosZbL97tdbAAAAAAAAAAAAAAAAAADfuwAyiYcHACgAAA=='; 'H4sIAFa7DV4AA+3PSwrCMBRG4Y5dxV1BuSGPridgwcItkTZSl++johNBJ0WE803OIHfwZ87j0fq2nmuzGVVNIcitXYqPpntXLojzSb33MToVdTG5rhHdbtLLaa55uk5ZBrMhj23ty9u7T+/rT+TZP3HozYosZbL97tdbAAAAAAAAAAAAAAAAAADfuwAyiYcHACgAAA==';
const tarGzBuffer = Buffer.from(tarGzBase64, 'base64'); const tarGzBuffer = Buffer.from(tarGzBase64, 'base64');
const tarBuffer = zlib.gunzipSync(tarGzBuffer);
it('unpacks the tar gz and return the string "hello world".', async () => { it('return the content for the 1st file inside a tarball', done => {
const client = new MinioClient({
accessKey: 'minio',
endPoint: 'minio-service.kubeflow',
secretKey: 'minio123',
useSSL: false,
});
const mockedGetObject: jest.Mock = client.getObject as any;
const stream = new PassThrough(); const stream = new PassThrough();
stream.end(tarGzBuffer); const maybeTar = stream.pipe(maybeTarball());
mockedGetObject.mockResolvedValue(stream); stream.end(tarBuffer);
stream.on('end', () => {
const content = await getTarObjectAsString({ expect(maybeTar.read().toString()).toBe('hello world\n');
bucket: 'bucket', done();
client,
key: 'key',
}); });
});
expect(content).toBe('hello world\n'); it('return the content normal if is not a tarball', done => {
const stream = new PassThrough();
const maybeTar = stream.pipe(maybeTarball());
stream.end('hello world');
stream.on('end', () => {
expect(maybeTar.read().toString()).toBe('hello world');
done();
});
});
});
describe('getObjectStream', () => {
// hello world
const tarGzBase64 =
'H4sIAFa7DV4AA+3PSwrCMBRG4Y5dxV1BuSGPridgwcItkTZSl++johNBJ0WE803OIHfwZ87j0fq2nmuzGVVNIcitXYqPpntXLojzSb33MToVdTG5rhHdbtLLaa55uk5ZBrMhj23ty9u7T+/rT+TZP3HozYosZbL97tdbAAAAAAAAAAAAAAAAAADfuwAyiYcHACgAAA==';
const tarGzBuffer = Buffer.from(tarGzBase64, 'base64');
const tarBuffer = zlib.gunzipSync(tarGzBuffer);
let minioClient: MinioClient;
let mockedMinioGetObject: jest.Mock;
beforeEach(() => {
minioClient = new MinioClient({
endPoint: 's3.amazonaws.com',
accessKey: '',
secretKey: '',
});
mockedMinioGetObject = minioClient.getObject as any;
});
afterEach(() => {
mockedMinioGetObject.mockReset();
});
it('unpacks a gzipped tarball', async done => {
const objStream = new PassThrough();
objStream.end(tarGzBuffer);
mockedMinioGetObject.mockResolvedValueOnce(Promise.resolve(objStream));
const stream = await getObjectStream({ bucket: 'bucket', key: 'key', client: minioClient });
expect(mockedMinioGetObject).toBeCalledWith('bucket', 'key');
stream.on('finish', () => {
expect(
stream
.read()
.toString()
.trim(),
).toBe('hello world');
done();
});
});
it('unpacks a uncompressed tarball', async done => {
const objStream = new PassThrough();
objStream.end(tarBuffer);
mockedMinioGetObject.mockResolvedValueOnce(Promise.resolve(objStream));
const stream = await getObjectStream({ bucket: 'bucket', key: 'key', client: minioClient });
expect(mockedMinioGetObject).toBeCalledWith('bucket', 'key');
stream.on('finish', () => {
expect(
stream
.read()
.toString()
.trim(),
).toBe('hello world');
done();
});
});
it('returns the content as a stream', async done => {
const objStream = new PassThrough();
objStream.end('hello world');
mockedMinioGetObject.mockResolvedValueOnce(Promise.resolve(objStream));
const stream = await getObjectStream({ bucket: 'bucket', key: 'key', client: minioClient });
expect(mockedMinioGetObject).toBeCalledWith('bucket', 'key');
stream.on('finish', () => {
expect(
stream
.read()
.toString()
.trim(),
).toBe('hello world');
done();
});
}); });
}); });
}); });

View File

@ -1,4 +1,4 @@
// Copyright 2019 Google LLC // Copyright 2019-2020 Google LLC
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -11,8 +11,10 @@
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
import { Stream } from 'stream'; import { Transform, PassThrough } from 'stream';
import * as tar from 'tar'; import * as tar from 'tar-stream';
import * as peek from 'peek-stream';
import * as gunzip from 'gunzip-maybe';
import { Client as MinioClient, ClientOptions as MinioClientOptions } from 'minio'; import { Client as MinioClient, ClientOptions as MinioClientOptions } from 'minio';
import { awsInstanceProfileCredentials } from './aws-helper'; import { awsInstanceProfileCredentials } from './aws-helper';
@ -54,24 +56,83 @@ export async function createMinioClient(config: MinioClientOptionsWithOptionalSe
return new MinioClient(config as MinioClientOptions); return new MinioClient(config as MinioClientOptions);
} }
export function getTarObjectAsString({ bucket, key, client }: MinioRequestConfig) { /**
return new Promise<string>(async (resolve, reject) => { * Checks the magic number of a buffer to see if the mime type is a uncompressed
try { * tarball. The buffer must be of length 264 bytes or more.
const stream = await getObjectStream({ bucket, key, client }); *
let contents = ''; * See also: https://www.gnu.org/software/tar/manual/html_node/Standard.html
// TODO: fix tar.Parse typing problem *
stream.pipe(new (tar.Parse as any)()).on('entry', (entry: Stream) => { * @param buf Buffer
entry.on('data', buffer => (contents += buffer.toString())); */
}); export function isTarball(buf: Buffer) {
stream.on('end', () => { if (!buf || buf.length < 264) {
resolve(contents); return false;
}); }
} catch (err) { const offset = 257;
reject(err); const v1 = [0x75, 0x73, 0x74, 0x61, 0x72, 0x00, 0x30, 0x30];
} const v0 = [0x75, 0x73, 0x74, 0x61, 0x72, 0x20, 0x20, 0x00];
});
return (
v1.reduce((res, curr, i) => res && curr === buf[offset + i], true) ||
v0.reduce((res, curr, i) => res && curr === buf[offset + i], true)
);
} }
export function getObjectStream({ bucket, key, client }: MinioRequestConfig) { /**
return client.getObject(bucket, key); * Returns a stream that extracts the first record of a tarball if the source
* stream is a tarball, otherwise just pipe the content as is.
*/
export function maybeTarball(): Transform {
return peek(
{ newline: false, maxBuffer: 264 },
(data: Buffer, swap: (error?: Error, parser?: Transform) => void) => {
if (isTarball(data)) swap(undefined, extractFirstTarRecordAsStream());
else swap(undefined, new PassThrough());
},
);
}
/**
* Returns a transform stream where the first record inside a tarball will be
* pushed - i.e. all other contents will be dropped.
*/
function extractFirstTarRecordAsStream() {
const extract = tar.extract();
const transformStream = new Transform({
write: (chunk: any, encoding: string, callback: (error?: Error) => void) => {
extract.write(chunk, encoding, callback);
},
});
extract.once('entry', function(_header, stream, next) {
stream.on('data', (buffer: any) => transformStream.push(buffer));
stream.on('end', () => {
transformStream.emit('end');
next();
});
stream.resume(); // just auto drain the stream
});
extract.on('error', error => transformStream.emit('error', error));
return transformStream;
}
/**
* Returns a stream from an object in a s3 compatible object store (e.g. minio).
* The actual content of the stream depends on the object.
*
* Any gzipped or deflated objects will be ungzipped or inflated. If the object
* is a tarball, only the content of the first record in the tarball will be
* returned. For any other objects, the raw content will be returned.
*
* @param param.bucket Bucket name to retrieve the object from.
* @param param.key Key of the object to retrieve.
* @param param.client Minio client.
*
*/
export async function getObjectStream({
bucket,
key,
client,
}: MinioRequestConfig): Promise<Transform> {
const stream = await client.getObject(bucket, key);
return stream.pipe(gunzip()).pipe(maybeTarball());
} }

File diff suppressed because it is too large Load Diff

View File

@ -7,11 +7,13 @@
"axios": ">=0.19.0", "axios": ">=0.19.0",
"crypto-js": "^3.1.8", "crypto-js": "^3.1.8",
"express": "^4.16.3", "express": "^4.16.3",
"gunzip-maybe": "^1.4.1",
"http-proxy-middleware": "^0.18.0", "http-proxy-middleware": "^0.18.0",
"lodash": ">=4.17.13", "lodash": ">=4.17.13",
"minio": "^7.0.0", "minio": "^7.0.0",
"node-fetch": "^2.1.2", "node-fetch": "^2.1.2",
"tar": "^4.4.13" "peek-stream": "^1.1.3",
"tar-stream": "^2.1.0"
}, },
"devDependencies": { "devDependencies": {
"@types/crypto-js": "^3.1.43", "@types/crypto-js": "^3.1.43",
@ -22,6 +24,7 @@
"@types/node-fetch": "^2.1.2", "@types/node-fetch": "^2.1.2",
"@types/supertest": "^2.0.8", "@types/supertest": "^2.0.8",
"@types/tar": "^4.0.3", "@types/tar": "^4.0.3",
"@types/tar-stream": "^1.6.1",
"jest": "^25.1.0", "jest": "^25.1.0",
"supertest": "^4.0.2", "supertest": "^4.0.2",
"ts-jest": "^25.2.1", "ts-jest": "^25.2.1",

View File

@ -0,0 +1,39 @@
// Copyright 2020 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import { PassThrough } from 'stream';
import { PreviewStream } from './utils';
describe('utils', () => {
describe('PreviewStream', () => {
it('should stream first 5 bytes', done => {
const peek = 5;
const input = 'some string that will be truncated.';
const source = new PassThrough();
const preview = new PreviewStream({ peek });
const dst = source.pipe(preview).on('end', done);
source.end(input);
dst.once('readable', () => expect(dst.read().toString()).toBe(input.slice(0, peek)));
});
it('should stream everything if peek==0', done => {
const peek = 0;
const input = 'some string that will be truncated.';
const source = new PassThrough();
const preview = new PreviewStream({ peek });
const dst = source.pipe(preview).on('end', done);
source.end(input);
dst.once('readable', () => expect(dst.read().toString()).toBe(input));
});
});
});

View File

@ -1,4 +1,4 @@
// Copyright 2018 Google LLC // Copyright 2018-2020 Google LLC
// //
// Licensed under the Apache License, Version 2.0 (the "License"); // Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License. // you may not use this file except in compliance with the License.
@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
import { readFileSync } from 'fs'; import { readFileSync } from 'fs';
import { Transform, TransformOptions } from 'stream';
/** get the server address from host, port, and schema (defaults to 'http'). */ /** get the server address from host, port, and schema (defaults to 'http'). */
export function getAddress({ export function getAddress({
@ -63,3 +64,35 @@ export function loadJSON<T>(filepath?: string, defaultValue?: T): T | undefined
return defaultValue; return defaultValue;
} }
} }
export interface PreviewStreamOptions extends TransformOptions {
peek: number;
}
/**
* Transform stream that only stream the first X number of bytes.
*/
export class PreviewStream extends Transform {
_peek: number;
constructor({ peek, ...opts }: PreviewStreamOptions) {
// acts like passthrough
let transform: TransformOptions['transform'] = (chunk, _encoding, callback) =>
callback(undefined, chunk);
// implements preview - peek must be positive number
if (peek && peek > 0) {
let size = 0;
transform = (chunk, _encoding, callback) => {
const delta = peek - size;
size += chunk.length;
if (size >= peek) {
callback(undefined, chunk.slice(0, delta));
this.resume(); // do not handle any subsequent data
return;
}
callback(undefined, chunk);
};
}
super({ ...opts, transform });
}
}

View File

@ -91,6 +91,8 @@ describe('workflow-helper', () => {
describe('toGetPodLogsStream', () => { describe('toGetPodLogsStream', () => {
it('wraps a getMinioRequestConfig function to return the corresponding object stream.', async () => { it('wraps a getMinioRequestConfig function to return the corresponding object stream.', async () => {
const objStream = new PassThrough(); const objStream = new PassThrough();
objStream.end('some fake logs.');
const client = new MinioClient(minioConfig); const client = new MinioClient(minioConfig);
const mockedClientGetObject: jest.Mock = client.getObject as any; const mockedClientGetObject: jest.Mock = client.getObject as any;
mockedClientGetObject.mockResolvedValueOnce(objStream); mockedClientGetObject.mockResolvedValueOnce(objStream);
@ -104,7 +106,6 @@ describe('workflow-helper', () => {
); );
const stream = await toGetPodLogsStream(createRequest)('podName', 'namespace'); const stream = await toGetPodLogsStream(createRequest)('podName', 'namespace');
expect(mockedClientGetObject).toBeCalledWith('bucket', 'folder/key'); expect(mockedClientGetObject).toBeCalledWith('bucket', 'folder/key');
expect(stream).toBe(objStream);
}); });
}); });
@ -171,6 +172,7 @@ describe('workflow-helper', () => {
const mockedClient: jest.Mock = MinioClient as any; const mockedClient: jest.Mock = MinioClient as any;
const mockedClientGetObject: jest.Mock = MinioClient.prototype.getObject as any; const mockedClientGetObject: jest.Mock = MinioClient.prototype.getObject as any;
mockedClientGetObject.mockResolvedValueOnce(objStream); mockedClientGetObject.mockResolvedValueOnce(objStream);
objStream.end('some fake logs.');
const stream = await getPodLogsStreamFromWorkflow('workflow-name-abc'); const stream = await getPodLogsStreamFromWorkflow('workflow-name-abc');
@ -193,7 +195,6 @@ describe('workflow-helper', () => {
'bucket', 'bucket',
'prefix/workflow-name/workflow-name-abc/main.log', 'prefix/workflow-name/workflow-name-abc/main.log',
); );
expect(stream).toBe(objStream);
}); });
}); });
}); });