feat[frontend]: implement artifact-repositories configmap support (#11354)
* feat[frontend]: implement configmap parsing Signed-off-by: droctothorpe <mythicalsunlight@gmail.com> Co-authored-by: quinnovator <jack@jq.codes> * Improve error handling Signed-off-by: droctothorpe <mythicalsunlight@gmail.com> --------- Signed-off-by: droctothorpe <mythicalsunlight@gmail.com> Co-authored-by: quinnovator <jack@jq.codes>
This commit is contained in:
parent
533eddc942
commit
467f30cf61
|
@ -96,6 +96,13 @@ export function loadConfigs(argv: string[], env: ProcessEnv): UIConfigs {
|
||||||
* https://github.com/kubeflow/pipelines/blob/7b7918ebf8c30e6ceec99283ef20dbc02fdf6a42/manifests/kustomize/third-party/argo/base/workflow-controller-configmap-patch.yaml#L28
|
* https://github.com/kubeflow/pipelines/blob/7b7918ebf8c30e6ceec99283ef20dbc02fdf6a42/manifests/kustomize/third-party/argo/base/workflow-controller-configmap-patch.yaml#L28
|
||||||
*/
|
*/
|
||||||
ARGO_KEYFORMAT = 'artifacts/{{workflow.name}}/{{workflow.creationTimestamp.Y}}/{{workflow.creationTimestamp.m}}/{{workflow.creationTimestamp.d}}/{{pod.name}}',
|
ARGO_KEYFORMAT = 'artifacts/{{workflow.name}}/{{workflow.creationTimestamp.Y}}/{{workflow.creationTimestamp.m}}/{{workflow.creationTimestamp.d}}/{{pod.name}}',
|
||||||
|
/** Argo Workflows lets you specify a unique artifact repository for each
|
||||||
|
* namespace by adding an appropriately formatted configmap to the namespace
|
||||||
|
* as documented here:
|
||||||
|
* https://argo-workflows.readthedocs.io/en/latest/artifact-repository-ref/.
|
||||||
|
* Use this field to enable this lookup. It defaults to false.
|
||||||
|
*/
|
||||||
|
ARGO_ARTIFACT_REPOSITORIES_LOOKUP = 'false',
|
||||||
/** Should use server API for log streaming? */
|
/** Should use server API for log streaming? */
|
||||||
STREAM_LOGS_FROM_SERVER_API = 'false',
|
STREAM_LOGS_FROM_SERVER_API = 'false',
|
||||||
/** The main container name of a pod where logs are retrieved */
|
/** The main container name of a pod where logs are retrieved */
|
||||||
|
@ -132,6 +139,7 @@ export function loadConfigs(argv: string[], env: ProcessEnv): UIConfigs {
|
||||||
archiveBucketName: ARGO_ARCHIVE_BUCKETNAME,
|
archiveBucketName: ARGO_ARCHIVE_BUCKETNAME,
|
||||||
archiveLogs: asBool(ARGO_ARCHIVE_LOGS),
|
archiveLogs: asBool(ARGO_ARCHIVE_LOGS),
|
||||||
keyFormat: ARGO_KEYFORMAT,
|
keyFormat: ARGO_KEYFORMAT,
|
||||||
|
artifactRepositoriesLookup: asBool(ARGO_ARTIFACT_REPOSITORIES_LOOKUP),
|
||||||
},
|
},
|
||||||
pod: {
|
pod: {
|
||||||
logContainerName: POD_LOG_CONTAINER_NAME,
|
logContainerName: POD_LOG_CONTAINER_NAME,
|
||||||
|
@ -259,6 +267,7 @@ export interface ArgoConfigs {
|
||||||
archiveArtifactory: string;
|
archiveArtifactory: string;
|
||||||
archiveBucketName: string;
|
archiveBucketName: string;
|
||||||
keyFormat: string;
|
keyFormat: string;
|
||||||
|
artifactRepositoriesLookup: boolean;
|
||||||
}
|
}
|
||||||
export interface ServerConfigs {
|
export interface ServerConfigs {
|
||||||
basePath: string;
|
basePath: string;
|
||||||
|
|
|
@ -39,7 +39,13 @@ export function getPodLogsHandler(
|
||||||
},
|
},
|
||||||
podLogContainerName: string,
|
podLogContainerName: string,
|
||||||
): Handler {
|
): Handler {
|
||||||
const { archiveLogs, archiveArtifactory, archiveBucketName, keyFormat } = argoOptions;
|
const {
|
||||||
|
archiveLogs,
|
||||||
|
archiveArtifactory,
|
||||||
|
archiveBucketName,
|
||||||
|
keyFormat,
|
||||||
|
artifactRepositoriesLookup,
|
||||||
|
} = argoOptions;
|
||||||
|
|
||||||
// get pod log from the provided bucket and keyFormat.
|
// get pod log from the provided bucket and keyFormat.
|
||||||
const getPodLogsStreamFromArchive = toGetPodLogsStream(
|
const getPodLogsStreamFromArchive = toGetPodLogsStream(
|
||||||
|
@ -47,6 +53,7 @@ export function getPodLogsHandler(
|
||||||
archiveArtifactory === 'minio' ? artifactsOptions.minio : artifactsOptions.aws,
|
archiveArtifactory === 'minio' ? artifactsOptions.minio : artifactsOptions.aws,
|
||||||
archiveBucketName,
|
archiveBucketName,
|
||||||
keyFormat,
|
keyFormat,
|
||||||
|
artifactRepositoriesLookup,
|
||||||
),
|
),
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,7 @@ import {
|
||||||
V1DeleteOptions,
|
V1DeleteOptions,
|
||||||
V1Pod,
|
V1Pod,
|
||||||
V1EventList,
|
V1EventList,
|
||||||
|
V1ConfigMap,
|
||||||
} from '@kubernetes/client-node';
|
} from '@kubernetes/client-node';
|
||||||
import * as crypto from 'crypto-js';
|
import * as crypto from 'crypto-js';
|
||||||
import * as fs from 'fs';
|
import * as fs from 'fs';
|
||||||
|
@ -277,6 +278,25 @@ export async function getPod(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Retrieves a configmap.
|
||||||
|
* @param configMapName name of the configmap
|
||||||
|
* @param configMapNamespace namespace of the configmap
|
||||||
|
*/
|
||||||
|
export async function getConfigMap(
|
||||||
|
configMapName: string,
|
||||||
|
configMapNamespace: string,
|
||||||
|
): Promise<[V1ConfigMap, undefined] | [undefined, K8sError]> {
|
||||||
|
try {
|
||||||
|
const { body } = await k8sV1Client.readNamespacedConfigMap(configMapName, configMapNamespace);
|
||||||
|
return [body, undefined];
|
||||||
|
} catch (error) {
|
||||||
|
const { message, additionalInfo } = await parseError(error);
|
||||||
|
const userMessage = `Could not get configMap ${configMapName} in namespace ${configMapNamespace}: ${message}`;
|
||||||
|
return [undefined, { message: userMessage, additionalInfo }];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Golang style result type including an error.
|
// Golang style result type including an error.
|
||||||
export type Result<T, E = K8sError> = [T, undefined] | [undefined, E];
|
export type Result<T, E = K8sError> = [T, undefined] | [undefined, E];
|
||||||
export async function listPodEvents(
|
export async function listPodEvents(
|
||||||
|
|
|
@ -19,8 +19,10 @@ import {
|
||||||
getPodLogsStreamFromK8s,
|
getPodLogsStreamFromK8s,
|
||||||
getPodLogsStreamFromWorkflow,
|
getPodLogsStreamFromWorkflow,
|
||||||
toGetPodLogsStream,
|
toGetPodLogsStream,
|
||||||
|
getKeyFormatFromArtifactRepositories,
|
||||||
} from './workflow-helper';
|
} from './workflow-helper';
|
||||||
import { getK8sSecret, getArgoWorkflow, getPodLogs } from './k8s-helper';
|
import { getK8sSecret, getArgoWorkflow, getPodLogs, getConfigMap } from './k8s-helper';
|
||||||
|
import { V1ConfigMap, V1ObjectMeta } from '@kubernetes/client-node';
|
||||||
|
|
||||||
jest.mock('minio');
|
jest.mock('minio');
|
||||||
jest.mock('./k8s-helper');
|
jest.mock('./k8s-helper');
|
||||||
|
@ -118,6 +120,40 @@ describe('workflow-helper', () => {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
describe('getKeyFormatFromArtifactRepositories', () => {
|
||||||
|
it('returns a keyFormat string from the artifact-repositories configmap.', async () => {
|
||||||
|
const artifactRepositories = {
|
||||||
|
'artifact-repositories':
|
||||||
|
'archiveLogs: true\n' +
|
||||||
|
's3:\n' +
|
||||||
|
' accessKeySecret:\n' +
|
||||||
|
' key: accesskey\n' +
|
||||||
|
' name: mlpipeline-minio-artifact\n' +
|
||||||
|
' bucket: mlpipeline\n' +
|
||||||
|
' endpoint: minio-service.kubeflow:9000\n' +
|
||||||
|
' insecure: true\n' +
|
||||||
|
' keyFormat: foo\n' +
|
||||||
|
' secretKeySecret:\n' +
|
||||||
|
' key: secretkey\n' +
|
||||||
|
' name: mlpipeline-minio-artifact',
|
||||||
|
};
|
||||||
|
|
||||||
|
const mockedConfigMap: V1ConfigMap = {
|
||||||
|
apiVersion: 'v1',
|
||||||
|
kind: 'ConfigMap',
|
||||||
|
metadata: new V1ObjectMeta(),
|
||||||
|
data: artifactRepositories,
|
||||||
|
binaryData: {},
|
||||||
|
};
|
||||||
|
|
||||||
|
const mockedGetConfigMap: jest.Mock = getConfigMap as any;
|
||||||
|
mockedGetConfigMap.mockResolvedValueOnce([mockedConfigMap, undefined]);
|
||||||
|
const res = await getKeyFormatFromArtifactRepositories('');
|
||||||
|
expect(mockedGetConfigMap).toBeCalledTimes(1);
|
||||||
|
expect(res).toEqual('foo');
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
describe('createPodLogsMinioRequestConfig', () => {
|
describe('createPodLogsMinioRequestConfig', () => {
|
||||||
it('returns a MinioRequestConfig factory with the provided minioClientOptions, bucket, and prefix.', async () => {
|
it('returns a MinioRequestConfig factory with the provided minioClientOptions, bucket, and prefix.', async () => {
|
||||||
const mockedClient: jest.Mock = MinioClient as any;
|
const mockedClient: jest.Mock = MinioClient as any;
|
||||||
|
@ -125,6 +161,7 @@ describe('workflow-helper', () => {
|
||||||
minioConfig,
|
minioConfig,
|
||||||
'bucket',
|
'bucket',
|
||||||
'artifacts/{{workflow.name}}/{{workflow.creationTimestamp.Y}}/{{workflow.creationTimestamp.m}}/{{workflow.creationTimestamp.d}}/{{pod.name}}',
|
'artifacts/{{workflow.name}}/{{workflow.creationTimestamp.Y}}/{{workflow.creationTimestamp.m}}/{{workflow.creationTimestamp.d}}/{{pod.name}}',
|
||||||
|
true,
|
||||||
);
|
);
|
||||||
const request = await requestFunc(
|
const request = await requestFunc(
|
||||||
'workflow-name-system-container-impl-foo',
|
'workflow-name-system-container-impl-foo',
|
||||||
|
|
|
@ -13,8 +13,9 @@
|
||||||
// limitations under the License.
|
// limitations under the License.
|
||||||
import { PassThrough, Stream } from 'stream';
|
import { PassThrough, Stream } from 'stream';
|
||||||
import { ClientOptions as MinioClientOptions } from 'minio';
|
import { ClientOptions as MinioClientOptions } from 'minio';
|
||||||
import { getK8sSecret, getArgoWorkflow, getPodLogs } from './k8s-helper';
|
import { getK8sSecret, getArgoWorkflow, getPodLogs, getConfigMap } from './k8s-helper';
|
||||||
import { createMinioClient, MinioRequestConfig, getObjectStream } from './minio-helper';
|
import { createMinioClient, MinioRequestConfig, getObjectStream } from './minio-helper';
|
||||||
|
import * as JsYaml from 'js-yaml';
|
||||||
|
|
||||||
export interface PartialArgoWorkflow {
|
export interface PartialArgoWorkflow {
|
||||||
status: {
|
status: {
|
||||||
|
@ -142,18 +143,76 @@ export function toGetPodLogsStream(
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** PartialArtifactRepositoriesValue is used to deserialize the contents of the
|
||||||
|
* artifact-repositories configmap.
|
||||||
|
*/
|
||||||
|
interface PartialArtifactRepositoriesValue {
|
||||||
|
s3?: {
|
||||||
|
keyFormat: string;
|
||||||
|
};
|
||||||
|
gcs?: {
|
||||||
|
keyFormat: string;
|
||||||
|
};
|
||||||
|
oss?: {
|
||||||
|
keyFormat: string;
|
||||||
|
};
|
||||||
|
artifactory?: {
|
||||||
|
keyFormat: string;
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a MinioRequestConfig with the provided minio options (a MinioRequestConfig
|
* getKeyFormatFromArtifactRepositories attempts to retrieve an
|
||||||
* object contains the artifact bucket and keys, with the corresponding minio
|
* artifact-repositories configmap from a specified namespace. It then parses
|
||||||
* client).
|
* the configmap and returns a keyFormat value in its data field.
|
||||||
|
* @param namespace namespace of the configmap
|
||||||
|
*/
|
||||||
|
export async function getKeyFormatFromArtifactRepositories(
|
||||||
|
namespace: string,
|
||||||
|
): Promise<string | undefined> {
|
||||||
|
try {
|
||||||
|
const [configMap, k8sError] = await getConfigMap('artifact-repositories', namespace);
|
||||||
|
if (configMap === undefined) {
|
||||||
|
throw k8sError;
|
||||||
|
}
|
||||||
|
const artifactRepositories = configMap?.data['artifact-repositories'];
|
||||||
|
const artifactRepositoriesValue = JsYaml.safeLoad(
|
||||||
|
artifactRepositories,
|
||||||
|
) as PartialArtifactRepositoriesValue;
|
||||||
|
if ('s3' in artifactRepositoriesValue) {
|
||||||
|
return artifactRepositoriesValue.s3?.keyFormat;
|
||||||
|
} else if ('gcs' in artifactRepositoriesValue) {
|
||||||
|
return artifactRepositoriesValue.gcs?.keyFormat;
|
||||||
|
} else if ('oss' in artifactRepositoriesValue) {
|
||||||
|
return artifactRepositoriesValue.oss?.keyFormat;
|
||||||
|
} else if ('artifactory' in artifactRepositoriesValue) {
|
||||||
|
return artifactRepositoriesValue.artifactory?.keyFormat;
|
||||||
|
} else {
|
||||||
|
throw new Error(
|
||||||
|
'artifact-repositories configmap missing one of [s3|gcs|oss|artifactory] fields.',
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
console.log(error);
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a MinioRequestConfig with the provided minio options (a
|
||||||
|
* MinioRequestConfig object contains the artifact bucket and keys, with the
|
||||||
|
* corresponding minio client).
|
||||||
* @param minioOptions Minio options to create a minio client.
|
* @param minioOptions Minio options to create a minio client.
|
||||||
* @param bucket bucket containing the pod logs artifacts.
|
* @param bucket bucket containing the pod logs artifacts.
|
||||||
* @param keyFormat the keyFormat for pod logs artifacts stored in the bucket.
|
* @param keyFormatDefault the default keyFormat for pod logs artifacts stored
|
||||||
|
* in the bucket. This is overriden if there's an "artifact-repositories"
|
||||||
|
* configmap in the target namespace with a keyFormat field.
|
||||||
*/
|
*/
|
||||||
export function createPodLogsMinioRequestConfig(
|
export function createPodLogsMinioRequestConfig(
|
||||||
minioOptions: MinioClientOptions,
|
minioOptions: MinioClientOptions,
|
||||||
bucket: string,
|
bucket: string,
|
||||||
keyFormat: string,
|
keyFormatDefault: string,
|
||||||
|
artifactRepositoriesLookup: boolean,
|
||||||
) {
|
) {
|
||||||
return async (
|
return async (
|
||||||
podName: string,
|
podName: string,
|
||||||
|
@ -164,7 +223,21 @@ export function createPodLogsMinioRequestConfig(
|
||||||
const client = await createMinioClient(minioOptions, 's3');
|
const client = await createMinioClient(minioOptions, 's3');
|
||||||
const createdAtArray = createdAt.split('-');
|
const createdAtArray = createdAt.split('-');
|
||||||
|
|
||||||
let key: string = keyFormat
|
// If artifactRepositoriesLookup is enabled, try to extract they keyformat
|
||||||
|
// from the configmap. Otherwise, just used the default keyFormat specified
|
||||||
|
// in configs.ts.
|
||||||
|
let keyFormatFromConfigMap = undefined;
|
||||||
|
if (artifactRepositoriesLookup) {
|
||||||
|
keyFormatFromConfigMap = await getKeyFormatFromArtifactRepositories(namespace);
|
||||||
|
}
|
||||||
|
let key: string;
|
||||||
|
if (keyFormatFromConfigMap !== undefined) {
|
||||||
|
key = keyFormatFromConfigMap;
|
||||||
|
} else {
|
||||||
|
key = keyFormatDefault;
|
||||||
|
}
|
||||||
|
|
||||||
|
key = key
|
||||||
.replace(/\s+/g, '') // Remove all whitespace.
|
.replace(/\s+/g, '') // Remove all whitespace.
|
||||||
.replace('{{workflow.name}}', podName.replace(/-system-container-impl-.*/, ''))
|
.replace('{{workflow.name}}', podName.replace(/-system-container-impl-.*/, ''))
|
||||||
.replace('{{workflow.creationTimestamp.Y}}', createdAtArray[0])
|
.replace('{{workflow.creationTimestamp.Y}}', createdAtArray[0])
|
||||||
|
|
|
@ -22,6 +22,7 @@ rules:
|
||||||
- ""
|
- ""
|
||||||
resources:
|
resources:
|
||||||
- secrets
|
- secrets
|
||||||
|
- configmaps
|
||||||
verbs:
|
verbs:
|
||||||
- get
|
- get
|
||||||
- list
|
- list
|
||||||
|
|
Loading…
Reference in New Issue