import { Stream } from 'stream'; // Copyright 2019-2020 The Kubeflow Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. import { Transform, PassThrough } from 'stream'; import * as tar from 'tar-stream'; import peek from 'peek-stream'; import gunzip from 'gunzip-maybe'; import { Client as MinioClient, ClientOptions as MinioClientOptions } from 'minio'; import { awsInstanceProfileCredentials } from './aws-helper'; /** MinioRequestConfig describes the info required to retrieve an artifact. */ export interface MinioRequestConfig { bucket: string; key: string; client: MinioClient; tryExtract?: boolean; } /** MinioClientOptionsWithOptionalSecrets wraps around MinioClientOptions where only endPoint is required (accesskey and secretkey are optional). */ export interface MinioClientOptionsWithOptionalSecrets extends Partial { endPoint: string; } /** * Create minio client with aws instance profile credentials if needed. * @param config minio client options where `accessKey` and `secretKey` are optional. */ export async function createMinioClient(config: MinioClientOptionsWithOptionalSecrets) { if (!config.accessKey || !config.secretKey) { try { if (await awsInstanceProfileCredentials.ok()) { const credentials = await awsInstanceProfileCredentials.getCredentials(); if (credentials) { const { AccessKeyId: accessKey, SecretAccessKey: secretKey, Token: sessionToken, } = credentials; return new MinioClient({ ...config, accessKey, secretKey, sessionToken }); } console.error('unable to get credentials from AWS metadata store.'); } } catch (err) { console.error('Unable to get aws instance profile credentials: ', err); } } return new MinioClient(config as MinioClientOptions); } /** * Checks the magic number of a buffer to see if the mime type is a uncompressed * tarball. The buffer must be of length 264 bytes or more. * * See also: https://www.gnu.org/software/tar/manual/html_node/Standard.html * * @param buf Buffer */ export function isTarball(buf: Buffer) { if (!buf || buf.length < 264) { return false; } const offset = 257; const v1 = [0x75, 0x73, 0x74, 0x61, 0x72, 0x00, 0x30, 0x30]; const v0 = [0x75, 0x73, 0x74, 0x61, 0x72, 0x20, 0x20, 0x00]; return ( v1.reduce((res, curr, i) => res && curr === buf[offset + i], true) || v0.reduce((res, curr, i) => res && curr === buf[offset + i], true as boolean) ); } /** * Returns a stream that extracts the first record of a tarball if the source * stream is a tarball, otherwise just pipe the content as is. */ export function maybeTarball(): Transform { return peek( { newline: false, maxBuffer: 264 }, (data: Buffer, swap: (error?: Error, parser?: Transform) => void) => { if (isTarball(data)) swap(undefined, extractFirstTarRecordAsStream()); else swap(undefined, new PassThrough()); }, ); } /** * Returns a transform stream where the first record inside a tarball will be * pushed - i.e. all other contents will be dropped. */ function extractFirstTarRecordAsStream() { const extract = tar.extract(); const transformStream = new Transform({ write: (chunk: any, encoding: string, callback: (error?: Error | null) => void) => { extract.write(chunk, encoding, callback); }, }); extract.once('entry', function(_header, stream, next) { stream.on('data', (buffer: any) => transformStream.push(buffer)); stream.on('end', () => { transformStream.emit('end'); next(); }); stream.resume(); // just auto drain the stream }); extract.on('error', error => transformStream.emit('error', error)); return transformStream; } /** * Returns a stream from an object in a s3 compatible object store (e.g. minio). * The actual content of the stream depends on the object. * * Any gzipped or deflated objects will be ungzipped or inflated. If the object * is a tarball, only the content of the first record in the tarball will be * returned. For any other objects, the raw content will be returned. * * @param param.bucket Bucket name to retrieve the object from. * @param param.key Key of the object to retrieve. * @param param.client Minio client. * @param param.tryExtract Whether we try to extract *.tar.gz, default to true. * */ export async function getObjectStream({ bucket, key, client, tryExtract = true, }: MinioRequestConfig): Promise { const stream = await client.getObject(bucket, key); return tryExtract ? stream.pipe(gunzip()).pipe(maybeTarball()) : stream.pipe(new PassThrough()); }