// Copyright 2019-2020 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import { Transform, PassThrough } from 'stream';
import * as tar from 'tar-stream';
import peek from 'peek-stream';
import gunzip from 'gunzip-maybe';
import { URL } from 'url';
import { Client as MinioClient, ClientOptions as MinioClientOptions } from 'minio';
import { isAWSS3Endpoint } from './aws-helper';
import { S3ProviderInfo } from './handlers/artifacts';
import { getK8sSecret } from './k8s-helper';
import { parseJSONString } from './utils';

const { fromNodeProviderChain } = require('@aws-sdk/credential-providers');

/** MinioRequestConfig describes the info required to retrieve an artifact. */
export interface MinioRequestConfig {
  bucket: string;
  key: string;
  client: MinioClient;
  tryExtract?: boolean;
}

/** MinioClientOptionsWithOptionalSecrets wraps around MinioClientOptions where only endPoint is required (accessKey and secretKey are optional). */
export interface MinioClientOptionsWithOptionalSecrets extends Partial<MinioClientOptions> {
  endPoint: string;
}
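
// Illustrative sketch (not part of the original module): a minimal config
// literal for this interface. The in-cluster endpoint, port, and useSSL values
// below are hypothetical placeholders; accessKey/secretKey may be omitted and
// resolved later by createMinioClient.
//
//   const exampleConfig: MinioClientOptionsWithOptionalSecrets = {
//     endPoint: 'minio-service.kubeflow.svc.cluster.local',
//     port: 9000,
//     useSSL: false,
//   };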

/**
 * Creates a minio client for an s3-compatible storage backend.
 *
 * If providerInfoString is provided, the values it specifies take precedence
 * over the defaults in `config`.
 *
 * If providerInfo is not provided, or if its credentials are sourced from the
 * environment (fromEnv), then, when using AWS S3, the minio client is created
 * with credentials resolved via the AWS credential provider chain (e.g. an
 * instance profile or IRSA).
 *
 * Otherwise, assume s3-compatible credentials have been provided via configs
 * (`config` or providerInfo) and return a minio client configured accordingly.
 *
 * @param config minio client options where `accessKey` and `secretKey` are optional.
 * @param providerType provider type ('s3' or 'minio').
 * @param providerInfoString optional JSON string containing provider info.
 * @param namespace namespace used to look up the provider's Kubernetes secret.
 */
export async function createMinioClient(
  config: MinioClientOptionsWithOptionalSecrets,
  providerType: string,
  providerInfoString?: string,
  namespace?: string,
) {
  if (providerInfoString) {
    const providerInfo = parseJSONString<S3ProviderInfo>(providerInfoString);
    if (!providerInfo) {
      throw new Error('Failed to parse provider info.');
    }
    // If fromEnv is 'true', credentials are expected to come from the environment
    // (e.g. IRSA or the default provider chain); otherwise fetch them from the
    // provider's Kubernetes secret.
    if (providerInfo.Params.fromEnv === 'false') {
      if (!namespace) {
        throw new Error('Artifact Store provider given, but no namespace provided.');
      } else {
        config = await parseS3ProviderInfo(config, providerInfo, namespace);
      }
    }
  }

  // If using s3 and sourcing credentials from the environment (currently only AWS is supported)
  if (providerType === 's3' && !(config.accessKey && config.secretKey)) {
    // AWS S3 with credentials from the provider chain
    if (isAWSS3Endpoint(config.endPoint)) {
      try {
        const credentials = fromNodeProviderChain();
        const awsCredentials = await credentials();
        if (awsCredentials) {
          const {
            accessKeyId: accessKey,
            secretAccessKey: secretKey,
            sessionToken,
          } = awsCredentials;
          return new MinioClient({ ...config, accessKey, secretKey, sessionToken });
        }
      } catch (e) {
        console.error('Unable to get aws instance profile credentials: ', e);
      }
    } else {
      console.error(
        'Encountered an S3-compatible provider with no credentials provided; environment-based credentials are only supported for AWS S3.',
      );
    }
  }

  // If using any AWS or S3 compatible store (e.g. minio, aws s3 when using manual creds, ceph, etc.)
  let mc: MinioClient;
  try {
    mc = new MinioClient(config as MinioClientOptions);
  } catch (err) {
    throw new Error(`Failed to create MinioClient: ${err}`);
  }
  return mc;
}
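
// Usage sketch (illustrative only, not part of the original module): create a
// client for AWS S3 with credentials resolved from the environment via the AWS
// credential provider chain. The endpoint value is a hypothetical placeholder.
//
//   async function exampleAwsClient(): Promise<MinioClient> {
//     // accessKey/secretKey intentionally omitted so the provider chain is used
//     return createMinioClient({ endPoint: 's3.amazonaws.com' }, 's3');
//   }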

// Parse provider info for any s3 compatible store that's not AWS S3
async function parseS3ProviderInfo(
  config: MinioClientOptionsWithOptionalSecrets,
  providerInfo: S3ProviderInfo,
  namespace: string,
): Promise<MinioClientOptionsWithOptionalSecrets> {
  if (
    !providerInfo.Params.accessKeyKey ||
    !providerInfo.Params.secretKeyKey ||
    !providerInfo.Params.secretName
  ) {
    throw new Error(
      'Provider info with fromEnv:false supplied with incomplete secret credential info.',
    );
  }

  try {
    config.accessKey = await getK8sSecret(
      providerInfo.Params.secretName,
      providerInfo.Params.accessKeyKey,
      namespace,
    );
    config.secretKey = await getK8sSecret(
      providerInfo.Params.secretName,
      providerInfo.Params.secretKeyKey,
      namespace,
    );
  } catch (e) {
    throw new Error(
      `Encountered error when trying to fetch provider secret ${providerInfo.Params.secretName}.`,
    );
  }

  if (isAWSS3Endpoint(providerInfo.Params.endpoint)) {
    if (providerInfo.Params.endpoint) {
      if (providerInfo.Params.endpoint.startsWith('https')) {
        const parseEndpoint = new URL(providerInfo.Params.endpoint);
        config.endPoint = parseEndpoint.hostname;
      } else {
        config.endPoint = providerInfo.Params.endpoint;
      }
    } else {
      throw new Error('Provider info missing endpoint parameter.');
    }

    if (providerInfo.Params.region) {
      config.region = providerInfo.Params.region;
    }

    // It's possible the user specifies these via config
    // since aws s3 and s3-compatible use the same config parameters
    // safeguard the user by ensuring these remain unset (default)
    config.port = undefined;
    config.useSSL = undefined;
  } else {
    if (providerInfo.Params.endpoint) {
      const url = providerInfo.Params.endpoint;
      // this is a bit of a hack to add support for endpoints without a protocol (required by WHATWG URL standard)
      // example: <ip>:<port> format. In general should expect most endpoints to provide a protocol as serviced
      // by the backend
      const parseEndpoint = new URL(url.startsWith('http') ? url : `https://${url}`);
      const host = parseEndpoint.hostname;
      const port = parseEndpoint.port;
      config.endPoint = host;
      // user provided port in endpoint takes precedence
      // e.g. if the user has provided <service-name>.<namespace>.svc.cluster.local:<service-port>
      config.port = port ? Number(port) : undefined;
    }

    config.region = providerInfo.Params.region ? providerInfo.Params.region : undefined;

    if (providerInfo.Params.disableSSL) {
      config.useSSL = !(providerInfo.Params.disableSSL.toLowerCase() === 'true');
    } else {
      config.useSSL = undefined;
    }
  }
  return config;
}
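
// Illustrative sketch (not part of the original module): the rough shape of a
// providerInfoString whose Params drive the fromEnv:false path above. The field
// names mirror the S3ProviderInfo params referenced in this file; the concrete
// values are hypothetical placeholders and the real type may carry more fields.
//
//   const exampleProviderInfoString = JSON.stringify({
//     Params: {
//       fromEnv: 'false',
//       secretName: 'mlpipeline-minio-artifact',
//       accessKeyKey: 'accesskey',
//       secretKeyKey: 'secretkey',
//       endpoint: 'minio-service.kubeflow:9000',
//       region: 'us-east-1',
//       disableSSL: 'true',
//     },
//   });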

/**
 * Checks the magic number of a buffer to see if the mime type is an
 * uncompressed tarball. The buffer must be of length 264 bytes or more.
 *
 * See also: https://www.gnu.org/software/tar/manual/html_node/Standard.html
 *
 * @param buf Buffer
 */
export function isTarball(buf: Buffer) {
  if (!buf || buf.length < 264) {
    return false;
  }
  const offset = 257;
  const v1 = [0x75, 0x73, 0x74, 0x61, 0x72, 0x00, 0x30, 0x30];
  const v0 = [0x75, 0x73, 0x74, 0x61, 0x72, 0x20, 0x20, 0x00];

  return (
    v1.reduce((res, curr, i) => res && curr === buf[offset + i], true) ||
    v0.reduce((res, curr, i) => res && curr === buf[offset + i], true as boolean)
  );
}
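
// Illustrative check (not part of the original module): stamp the POSIX ustar
// magic bytes (the same v1 sequence checked above) at offset 257 of a
// zero-filled 512-byte header.
//
//   const header = Buffer.alloc(512);
//   Buffer.from([0x75, 0x73, 0x74, 0x61, 0x72, 0x00, 0x30, 0x30]).copy(header, 257);
//   console.log(isTarball(header)); // true
//   console.log(isTarball(Buffer.from('too short'))); // false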

/**
 * Returns a stream that extracts the first record of a tarball if the source
 * stream is a tarball, otherwise just pipes the content through as-is.
 */
export function maybeTarball(): Transform {
  return peek(
    { newline: false, maxBuffer: 264 },
    (data: Buffer, swap: (error?: Error, parser?: Transform) => void) => {
      if (isTarball(data)) swap(undefined, extractFirstTarRecordAsStream());
      else swap(undefined, new PassThrough());
    },
  );
}
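
// Usage sketch (illustrative only): pipe a possibly-gzipped local file through
// gunzip-maybe and maybeTarball, mirroring the composition used by
// getObjectStream below. The file path is a hypothetical placeholder and the
// snippet assumes `import { createReadStream } from 'fs';`.
//
//   createReadStream('/tmp/artifact.tar.gz')
//     .pipe(gunzip())
//     .pipe(maybeTarball())
//     .on('data', chunk => process.stdout.write(chunk));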

/**
 * Returns a transform stream where the first record inside a tarball will be
 * pushed - i.e. all other contents will be dropped.
 */
function extractFirstTarRecordAsStream() {
  const extract = tar.extract();
  const transformStream = new Transform({
    write: (chunk: any, _encoding: string, callback: (error?: Error | null) => void) => {
      extract.write(chunk, callback);
    },
  });
  extract.once('entry', function(_header, stream, next) {
    stream.on('data', (buffer: any) => transformStream.push(buffer));
    stream.on('end', () => {
      transformStream.emit('end');
      next();
    });
    stream.resume(); // just auto drain the stream
  });
  extract.on('error', error => transformStream.emit('error', error));
  return transformStream;
}

/**
 * Returns a stream from an object in an s3-compatible object store (e.g. minio).
 * The actual content of the stream depends on the object.
 *
 * Any gzipped or deflated objects will be ungzipped or inflated. If the object
 * is a tarball, only the content of the first record in the tarball will be
 * returned. For any other objects, the raw content will be returned.
 *
 * @param param.bucket Bucket name to retrieve the object from.
 * @param param.key Key of the object to retrieve.
 * @param param.client Minio client.
 * @param param.tryExtract Whether to try to extract *.tar.gz, defaults to true.
 */
export async function getObjectStream({
  bucket,
  key,
  client,
  tryExtract = true,
}: MinioRequestConfig): Promise<Transform> {
  const stream = await client.getObject(bucket, key);
  return tryExtract ? stream.pipe(gunzip()).pipe(maybeTarball()) : stream.pipe(new PassThrough());
}
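
// End-to-end usage sketch (illustrative only, not part of the original module):
// fetch an artifact and collect it into a string. The endpoint, credentials,
// bucket, and key below are hypothetical placeholders.
//
//   async function exampleReadArtifact(): Promise<string> {
//     const client = await createMinioClient(
//       {
//         endPoint: 'minio-service.kubeflow.svc.cluster.local',
//         port: 9000,
//         useSSL: false,
//         accessKey: '<access-key>',
//         secretKey: '<secret-key>',
//       },
//       'minio',
//     );
//     const objStream = await getObjectStream({ bucket: 'mlpipeline', key: 'runs/artifact.tgz', client });
//     const chunks: Buffer[] = [];
//     for await (const chunk of objStream) {
//       chunks.push(chunk as Buffer);
//     }
//     return Buffer.concat(chunks).toString('utf8');
//   }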