143 lines
5.1 KiB
TypeScript
143 lines
5.1 KiB
TypeScript
import { Stream } from 'stream';
|
|
// Copyright 2019-2020 The Kubeflow Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
import { Transform, PassThrough } from 'stream';
|
|
import * as tar from 'tar-stream';
|
|
import peek from 'peek-stream';
|
|
import gunzip from 'gunzip-maybe';
|
|
import { Client as MinioClient, ClientOptions as MinioClientOptions } from 'minio';
|
|
import { awsInstanceProfileCredentials } from './aws-helper';
|
|
|
|
/** MinioRequestConfig describes the info required to retrieve an artifact. */
|
|
export interface MinioRequestConfig {
|
|
bucket: string;
|
|
key: string;
|
|
client: MinioClient;
|
|
tryExtract?: boolean;
|
|
}
|
|
|
|
/** MinioClientOptionsWithOptionalSecrets wraps around MinioClientOptions where only endPoint is required (accesskey and secretkey are optional). */
|
|
export interface MinioClientOptionsWithOptionalSecrets extends Partial<MinioClientOptions> {
|
|
endPoint: string;
|
|
}
|
|
|
|
/**
|
|
* Create minio client with aws instance profile credentials if needed.
|
|
* @param config minio client options where `accessKey` and `secretKey` are optional.
|
|
*/
|
|
export async function createMinioClient(config: MinioClientOptionsWithOptionalSecrets) {
|
|
if (!config.accessKey || !config.secretKey) {
|
|
try {
|
|
if (await awsInstanceProfileCredentials.ok()) {
|
|
const credentials = await awsInstanceProfileCredentials.getCredentials();
|
|
if (credentials) {
|
|
const {
|
|
AccessKeyId: accessKey,
|
|
SecretAccessKey: secretKey,
|
|
Token: sessionToken,
|
|
} = credentials;
|
|
return new MinioClient({ ...config, accessKey, secretKey, sessionToken });
|
|
}
|
|
console.error('unable to get credentials from AWS metadata store.');
|
|
}
|
|
} catch (err) {
|
|
console.error('Unable to get aws instance profile credentials: ', err);
|
|
}
|
|
}
|
|
return new MinioClient(config as MinioClientOptions);
|
|
}
|
|
|
|
/**
|
|
* Checks the magic number of a buffer to see if the mime type is a uncompressed
|
|
* tarball. The buffer must be of length 264 bytes or more.
|
|
*
|
|
* See also: https://www.gnu.org/software/tar/manual/html_node/Standard.html
|
|
*
|
|
* @param buf Buffer
|
|
*/
|
|
export function isTarball(buf: Buffer) {
|
|
if (!buf || buf.length < 264) {
|
|
return false;
|
|
}
|
|
const offset = 257;
|
|
const v1 = [0x75, 0x73, 0x74, 0x61, 0x72, 0x00, 0x30, 0x30];
|
|
const v0 = [0x75, 0x73, 0x74, 0x61, 0x72, 0x20, 0x20, 0x00];
|
|
|
|
return (
|
|
v1.reduce((res, curr, i) => res && curr === buf[offset + i], true) ||
|
|
v0.reduce((res, curr, i) => res && curr === buf[offset + i], true as boolean)
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Returns a stream that extracts the first record of a tarball if the source
|
|
* stream is a tarball, otherwise just pipe the content as is.
|
|
*/
|
|
export function maybeTarball(): Transform {
|
|
return peek(
|
|
{ newline: false, maxBuffer: 264 },
|
|
(data: Buffer, swap: (error?: Error, parser?: Transform) => void) => {
|
|
if (isTarball(data)) swap(undefined, extractFirstTarRecordAsStream());
|
|
else swap(undefined, new PassThrough());
|
|
},
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Returns a transform stream where the first record inside a tarball will be
|
|
* pushed - i.e. all other contents will be dropped.
|
|
*/
|
|
function extractFirstTarRecordAsStream() {
|
|
const extract = tar.extract();
|
|
const transformStream = new Transform({
|
|
write: (chunk: any, encoding: string, callback: (error?: Error | null) => void) => {
|
|
extract.write(chunk, encoding, callback);
|
|
},
|
|
});
|
|
extract.once('entry', function(_header, stream, next) {
|
|
stream.on('data', (buffer: any) => transformStream.push(buffer));
|
|
stream.on('end', () => {
|
|
transformStream.emit('end');
|
|
next();
|
|
});
|
|
stream.resume(); // just auto drain the stream
|
|
});
|
|
extract.on('error', error => transformStream.emit('error', error));
|
|
return transformStream;
|
|
}
|
|
|
|
/**
|
|
* Returns a stream from an object in a s3 compatible object store (e.g. minio).
|
|
* The actual content of the stream depends on the object.
|
|
*
|
|
* Any gzipped or deflated objects will be ungzipped or inflated. If the object
|
|
* is a tarball, only the content of the first record in the tarball will be
|
|
* returned. For any other objects, the raw content will be returned.
|
|
*
|
|
* @param param.bucket Bucket name to retrieve the object from.
|
|
* @param param.key Key of the object to retrieve.
|
|
* @param param.client Minio client.
|
|
* @param param.tryExtract Whether we try to extract *.tar.gz, default to true.
|
|
*
|
|
*/
|
|
export async function getObjectStream({
|
|
bucket,
|
|
key,
|
|
client,
|
|
tryExtract = true,
|
|
}: MinioRequestConfig): Promise<Transform> {
|
|
const stream = await client.getObject(bucket, key);
|
|
return tryExtract ? stream.pipe(gunzip()).pipe(maybeTarball()) : stream.pipe(new PassThrough());
|
|
}
|