pipelines/frontend/server/minio-helper.ts

143 lines
5.1 KiB
TypeScript

import { Stream } from 'stream';
// Copyright 2019-2020 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import { Transform, PassThrough } from 'stream';
import * as tar from 'tar-stream';
import peek from 'peek-stream';
import gunzip from 'gunzip-maybe';
import { Client as MinioClient, ClientOptions as MinioClientOptions } from 'minio';
import { awsInstanceProfileCredentials } from './aws-helper';
/** MinioRequestConfig describes the info required to retrieve an artifact. */
export interface MinioRequestConfig {
bucket: string;
key: string;
client: MinioClient;
tryExtract?: boolean;
}
/** MinioClientOptionsWithOptionalSecrets wraps around MinioClientOptions where only endPoint is required (accesskey and secretkey are optional). */
export interface MinioClientOptionsWithOptionalSecrets extends Partial<MinioClientOptions> {
endPoint: string;
}
/**
* Create minio client with aws instance profile credentials if needed.
* @param config minio client options where `accessKey` and `secretKey` are optional.
*/
export async function createMinioClient(config: MinioClientOptionsWithOptionalSecrets) {
if (!config.accessKey || !config.secretKey) {
try {
if (await awsInstanceProfileCredentials.ok()) {
const credentials = await awsInstanceProfileCredentials.getCredentials();
if (credentials) {
const {
AccessKeyId: accessKey,
SecretAccessKey: secretKey,
Token: sessionToken,
} = credentials;
return new MinioClient({ ...config, accessKey, secretKey, sessionToken });
}
console.error('unable to get credentials from AWS metadata store.');
}
} catch (err) {
console.error('Unable to get aws instance profile credentials: ', err);
}
}
return new MinioClient(config as MinioClientOptions);
}
/**
* Checks the magic number of a buffer to see if the mime type is a uncompressed
* tarball. The buffer must be of length 264 bytes or more.
*
* See also: https://www.gnu.org/software/tar/manual/html_node/Standard.html
*
* @param buf Buffer
*/
export function isTarball(buf: Buffer) {
if (!buf || buf.length < 264) {
return false;
}
const offset = 257;
const v1 = [0x75, 0x73, 0x74, 0x61, 0x72, 0x00, 0x30, 0x30];
const v0 = [0x75, 0x73, 0x74, 0x61, 0x72, 0x20, 0x20, 0x00];
return (
v1.reduce((res, curr, i) => res && curr === buf[offset + i], true) ||
v0.reduce((res, curr, i) => res && curr === buf[offset + i], true as boolean)
);
}
/**
* Returns a stream that extracts the first record of a tarball if the source
* stream is a tarball, otherwise just pipe the content as is.
*/
export function maybeTarball(): Transform {
return peek(
{ newline: false, maxBuffer: 264 },
(data: Buffer, swap: (error?: Error, parser?: Transform) => void) => {
if (isTarball(data)) swap(undefined, extractFirstTarRecordAsStream());
else swap(undefined, new PassThrough());
},
);
}
/**
* Returns a transform stream where the first record inside a tarball will be
* pushed - i.e. all other contents will be dropped.
*/
function extractFirstTarRecordAsStream() {
const extract = tar.extract();
const transformStream = new Transform({
write: (chunk: any, encoding: string, callback: (error?: Error | null) => void) => {
extract.write(chunk, encoding, callback);
},
});
extract.once('entry', function(_header, stream, next) {
stream.on('data', (buffer: any) => transformStream.push(buffer));
stream.on('end', () => {
transformStream.emit('end');
next();
});
stream.resume(); // just auto drain the stream
});
extract.on('error', error => transformStream.emit('error', error));
return transformStream;
}
/**
* Returns a stream from an object in a s3 compatible object store (e.g. minio).
* The actual content of the stream depends on the object.
*
* Any gzipped or deflated objects will be ungzipped or inflated. If the object
* is a tarball, only the content of the first record in the tarball will be
* returned. For any other objects, the raw content will be returned.
*
* @param param.bucket Bucket name to retrieve the object from.
* @param param.key Key of the object to retrieve.
* @param param.client Minio client.
* @param param.tryExtract Whether we try to extract *.tar.gz, default to true.
*
*/
export async function getObjectStream({
bucket,
key,
client,
tryExtract = true,
}: MinioRequestConfig): Promise<Transform> {
const stream = await client.getObject(bucket, key);
return tryExtract ? stream.pipe(gunzip()).pipe(maybeTarball()) : stream.pipe(new PassThrough());
}