pipelines/frontend/server/k8s-helper.ts

377 lines
12 KiB
TypeScript

// Copyright 2018 The Kubeflow Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import {
CoreV1Api,
CustomObjectsApi,
KubeConfig,
V1Pod,
V1EventList,
V1ConfigMap,
} from '@kubernetes/client-node';
import * as crypto from 'crypto-js';
import * as fs from 'fs';
import { PartialArgoWorkflow } from './workflow-helper';
import { parseError, findFileOnPodVolume } from './utils';
// If this is running inside a k8s Pod, its namespace should be written at this
// path, this is also how we can tell whether we're running in the cluster.
const namespaceFilePath = '/var/run/secrets/kubernetes.io/serviceaccount/namespace';
let serverNamespace: string | undefined = undefined;
// Constants for creating customer resource Viewer.
const viewerGroup = 'kubeflow.org';
const viewerVersion = 'v1beta1';
const viewerPlural = 'viewers';
// Constants for argo workflow
const workflowGroup = 'argoproj.io';
const workflowVersion = 'v1alpha1';
const workflowPlural = 'workflows';
/** Default pod template spec used to create tensorboard viewer. */
export const defaultPodTemplateSpec = {
spec: {
containers: [{}],
},
};
// The file path contains pod namespace when in Kubernetes cluster.
if (fs.existsSync(namespaceFilePath)) {
serverNamespace = fs.readFileSync(namespaceFilePath, 'utf-8');
}
const kc = new KubeConfig();
// This loads kubectl config when not in cluster.
kc.loadFromDefault();
const k8sV1Client = kc.makeApiClient(CoreV1Api);
const k8sV1CustomObjectClient = kc.makeApiClient(CustomObjectsApi);
function getNameOfViewerResource(logdir: string): string {
// TODO: find some hash function with shorter resulting message.
return 'viewer-' + crypto.SHA1(logdir);
}
/**
* Parse logdir to Support volume:// url in logdir,
* otherwise, there's no need to parse logdir, we can just use them.
*/
function parseTensorboardLogDir(logdir: string, podTemplateSpec: object): string {
const urls: string[] = [];
const seriesParts = logdir.split(',');
for (const seriesPart of seriesParts) {
const strPath = seriesPart.replace(/Series\d+:/g, '');
if (!strPath.startsWith('volume://')) {
urls.push(strPath);
continue;
}
// volume storage: parse real local mount path
const pathParts = strPath.substr('volume://'.length).split('/');
const bucket = pathParts[0];
const key = pathParts.slice(1).join('/');
const [filePath, err] = findFileOnPodVolume(podTemplateSpec, {
volumeMountName: bucket,
filePathInVolume: key,
containerNames: undefined,
});
if (err) {
throw new Error(err);
}
urls.push(filePath);
}
return urls.length === 1 ? urls[0] : urls.map((c, i) => `Series${i + 1}:` + c).join(',');
}
/**
* Create Tensorboard instance via CRD with the given logdir if there is no
* existing Tensorboard instance.
*/
export async function newTensorboardInstance(
logdir: string,
namespace: string,
tfImageName: string,
tfversion: string,
podTemplateSpec: object = defaultPodTemplateSpec,
): Promise<void> {
const currentPod = await getTensorboardInstance(logdir, namespace);
if (currentPod.podAddress) {
if (tfversion === currentPod.tfVersion) {
return;
} else {
throw new Error(
`There's already an existing tensorboard instance with a different version ${currentPod.tfVersion}`,
);
}
}
const tensorboardImage = tfImageName + (tfversion ? `:${tfversion}` : '');
const body = {
apiVersion: viewerGroup + '/' + viewerVersion,
kind: 'Viewer',
metadata: {
name: getNameOfViewerResource(logdir),
namespace,
},
spec: {
podTemplateSpec,
tensorboardSpec: {
logDir: parseTensorboardLogDir(logdir, podTemplateSpec),
tensorflowImage: tensorboardImage,
},
type: 'tensorboard',
},
};
await k8sV1CustomObjectClient.createNamespacedCustomObject(
viewerGroup,
viewerVersion,
namespace,
viewerPlural,
body,
);
}
/**
* Finds a running Tensorboard instance created via CRD with the given logdir
* and returns its dns address and its version
*/
export async function getTensorboardInstance(
logdir: string,
namespace: string,
): Promise<{ podAddress: string; tfVersion: string; image: string }> {
return await k8sV1CustomObjectClient
.getNamespacedCustomObject(
viewerGroup,
viewerVersion,
namespace,
viewerPlural,
getNameOfViewerResource(logdir),
)
.then(
// Viewer CRD pod has tensorboard instance running at port 6006 while
// viewer CRD service has tensorboard instance running at port 80. Since
// we return service address here (instead of pod address), so use 80.
// remove to check viewer.body.spec.tensorboardSpec.logDir===logdir
// actually getNameOfViewerResource(logdir) may have hash collision
// but if there is a hash collision, not check logdir will return error tensorboard link
// if check logdir and then create Viewer CRD with same name will break anyway.
// TODO fix hash collision
(viewer: any) => {
if (viewer && viewer.body && viewer.body.spec.type === 'tensorboard') {
const address = `http://${viewer.body.metadata.name}-service.${namespace}.svc.cluster.local:80/tensorboard/${viewer.body.metadata.name}/`;
const image = viewer.body.spec.tensorboardSpec.tensorflowImage;
const tfImageParts = image.split(':', 2);
const tfVersion = tfImageParts.length == 2 ? tfImageParts[1] : '';
return { podAddress: address, tfVersion: tfVersion, image };
} else {
return { podAddress: '', tfVersion: '', image: '' };
}
},
// No existing custom object with the given name, i.e., no existing
// tensorboard instance.
err => {
// This is often expected, so only use debug level for logging.
console.debug(
`Failed getting viewer custom object for logdir=${logdir} in ${namespace} namespace, err: `,
err?.body || err,
);
return { podAddress: '', tfVersion: '', image: '' };
},
);
}
/**
* Find a running Tensorboard instance with the given logdir, delete the instance
* and returns the deleted podAddress
*/
export async function deleteTensorboardInstance(logdir: string, namespace: string): Promise<void> {
const currentPod = await getTensorboardInstance(logdir, namespace);
if (!currentPod.podAddress) {
return;
}
const viewerName = getNameOfViewerResource(logdir);
await k8sV1CustomObjectClient.deleteNamespacedCustomObject(
viewerGroup,
viewerVersion,
namespace,
viewerPlural,
viewerName,
);
}
/**
* Polls every second for a running Tensorboard instance with the given logdir,
* and returns the address of one if found, or rejects if a timeout expires.
*/
export function waitForTensorboardInstance(
logdir: string,
namespace: string,
timeout: number,
): Promise<string> {
const start = Date.now();
return new Promise((resolve, reject) => {
const handle = setInterval(async () => {
if (Date.now() - start > timeout) {
clearInterval(handle);
reject('Timed out waiting for tensorboard');
}
const tensorboardInstance = await getTensorboardInstance(logdir, namespace);
const tensorboardAddress = tensorboardInstance.podAddress;
if (tensorboardAddress) {
clearInterval(handle);
resolve(tensorboardAddress);
}
}, 1000);
});
}
export function getPodLogs(
podName: string,
podNamespace?: string,
containerName: string = 'main',
): Promise<string> {
podNamespace = podNamespace || serverNamespace;
if (!podNamespace) {
throw new Error(
`podNamespace is not specified and cannot get namespace from ${namespaceFilePath}.`,
);
}
return (k8sV1Client.readNamespacedPodLog(podName, podNamespace, containerName) as any).then(
(response: any) => (response && response.body ? response.body.toString() : ''),
(error: any) => {
throw new Error(JSON.stringify(error.body));
},
);
}
export interface K8sError {
message: string;
additionalInfo: any;
}
export async function getPod(
podName: string,
podNamespace: string,
): Promise<[V1Pod, undefined] | [undefined, K8sError]> {
try {
const { body } = await k8sV1Client.readNamespacedPod(podName, podNamespace);
return [body, undefined];
} catch (error) {
const { message, additionalInfo } = await parseError(error);
const userMessage = `Could not get pod ${podName} in namespace ${podNamespace}: ${message}`;
return [undefined, { message: userMessage, additionalInfo }];
}
}
/**
* Retrieves a configmap.
* @param configMapName name of the configmap
* @param configMapNamespace namespace of the configmap
*/
export async function getConfigMap(
configMapName: string,
configMapNamespace: string,
): Promise<[V1ConfigMap, undefined] | [undefined, K8sError]> {
try {
const { body } = await k8sV1Client.readNamespacedConfigMap(configMapName, configMapNamespace);
return [body, undefined];
} catch (error) {
const { message, additionalInfo } = await parseError(error);
const userMessage = `Could not get configMap ${configMapName} in namespace ${configMapNamespace}: ${message}`;
return [undefined, { message: userMessage, additionalInfo }];
}
}
// Golang style result type including an error.
export type Result<T, E = K8sError> = [T, undefined] | [undefined, E];
export async function listPodEvents(
podName: string,
podNamespace: string,
): Promise<Result<V1EventList>> {
try {
const { body } = await k8sV1Client.listNamespacedEvent(
podNamespace,
undefined,
undefined,
undefined,
// The following fieldSelector can be found when running
// `kubectl describe <pod-name> -v 8`
// (-v 8) will verbosely print network requests sent by kubectl.
`involvedObject.namespace=${podNamespace},involvedObject.name=${podName},involvedObject.kind=Pod`,
);
return [body, undefined];
} catch (error) {
const { message, additionalInfo } = await parseError(error);
const userMessage = `Error when listing pod events for pod "${podName}" in "${podNamespace}" namespace: ${message}`;
return [undefined, { message: userMessage, additionalInfo }];
}
}
/**
* Retrieves the argo workflow CRD.
* @param workflowName name of the argo workflow
*/
export async function getArgoWorkflow(workflowName: string): Promise<PartialArgoWorkflow> {
if (!serverNamespace) {
throw new Error(`Cannot get namespace from ${namespaceFilePath}`);
}
const res = await k8sV1CustomObjectClient.getNamespacedCustomObject(
workflowGroup,
workflowVersion,
serverNamespace,
workflowPlural,
workflowName,
);
if (res.response.statusCode == null) {
throw new Error(`Unable to query workflow:${workflowName}: No status code present.`);
}
if (res.response.statusCode >= 400) {
throw new Error(`Unable to query workflow:${workflowName}: Access denied.`);
}
return res.body as PartialArgoWorkflow;
}
/**
* Retrieves k8s secret by key and decode from base64.
* @param name name of the secret
* @param key key in the secret
* @param providedNamespace use this namespace when provided, otherwise default to server's namespace
*/
export async function getK8sSecret(name: string, key: string, providedNamespace?: string) {
let namespace = serverNamespace;
if (providedNamespace) {
namespace = providedNamespace;
}
if (!namespace) {
throw new Error(`Cannot get namespace from ${namespaceFilePath}`);
}
const k8sSecret = await k8sV1Client.readNamespacedSecret(name, namespace);
const secretb64 = k8sSecret.body.data?.[key] || '';
const buff = new Buffer(secretb64, 'base64');
return buff.toString('ascii');
}
export const TEST_ONLY = {
k8sV1Client,
k8sV1CustomObjectClient,
parseTensorboardLogDir,
};