180 lines
6.5 KiB
Python
180 lines
6.5 KiB
Python
# Copyright 2021 The Kubeflow Authors
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import os
|
|
import json
|
|
from kfp.deprecated.onprem import use_k8s_secret
|
|
from kfp.deprecated import dsl, components
|
|
from kfp.deprecated.components import OutputPath, create_component_from_func
|
|
|
|
|
|
# The prepare_tensorboard component definition is fetched from a URL pinned to
# a specific kubeflow/pipelines commit, so the loaded definition is reproducible.
_PREPARE_TENSORBOARD_COMPONENT_URL = (
    'https://raw.githubusercontent.com/kubeflow/pipelines/'
    '1b107eb4bb2510ecb99fd5f4fb438cbf7c96a87a/components/contrib/'
    'tensorflow/tensorboard/prepare_tensorboard/component.yaml'
)
prepare_tensorboard = components.load_component_from_url(
    _PREPARE_TENSORBOARD_COMPONENT_URL)
|
|
|
|
|
|
def train(minio_endpoint: str, log_bucket: str, log_dir: str):
    """Train a small MNIST classifier and upload its TensorBoard logs to MinIO.

    This function is turned into a standalone KFP component by
    ``create_component_from_func``, so every import it needs is done inside
    the function body.

    Args:
        minio_endpoint: MinIO ``host:port`` to upload to; the client connects
            without TLS (``secure=False``).
        log_bucket: Name of the bucket the log files are uploaded into.
        log_dir: Object-key prefix under which the log files are stored.

    Raises:
        RuntimeError: if ``MINIO_ACCESS_KEY`` or ``MINIO_SECRET_KEY`` is unset
            or empty. The check runs before training, so a misconfigured
            secret fails fast instead of after the whole training run.
    """
    import os
    import posixpath
    from pathlib import Path

    import tensorflow as tf
    from minio import Minio

    # Validate MinIO credentials up front: discovering a missing secret only
    # after several epochs of training wastes the whole run.
    minio_access_key = os.getenv('MINIO_ACCESS_KEY')
    minio_secret_key = os.getenv('MINIO_SECRET_KEY')
    if not minio_access_key or not minio_secret_key:
        raise RuntimeError('MINIO_ACCESS_KEY or MINIO_SECRET_KEY env is not set')

    # Reference: https://www.tensorflow.org/tensorboard/get_started
    mnist = tf.keras.datasets.mnist
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    # Normalize pixel values to the [0, 1] range.
    x_train, x_test = x_train / 255.0, x_test / 255.0

    def create_model():
        return tf.keras.models.Sequential([
            tf.keras.layers.Flatten(input_shape=(28, 28)),
            tf.keras.layers.Dense(512, activation='relu'),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(10, activation='softmax'),
        ])

    model = create_model()
    model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )

    # TensorBoard event files are written under this local directory.
    log_dir_local = 'logs/fit'
    tensorboard_callback = tf.keras.callbacks.TensorBoard(
        log_dir=log_dir_local, histogram_freq=1
    )

    model.fit(
        x=x_train,
        y=y_train,
        epochs=5,
        validation_data=(x_test, y_test),
        callbacks=[tensorboard_callback],
    )

    # Copy the local logs folder to MinIO.
    #
    # TODO: we may write a filesystem watch process that continuously copies
    # the logs dir to MinIO, so that live training logs can be watched via
    # TensorBoard.
    #
    # Note: although TensorFlow can write to MinIO directly via the s3://
    # protocol, we upload local files here to demonstrate an approach that
    # also works with frameworks that only support local paths.
    client = Minio(
        minio_endpoint,
        access_key=minio_access_key,
        secret_key=minio_secret_key,
        secure=False,
    )

    count = 0
    # Walk exactly the directory the callback wrote to, so every relative path
    # below stays inside log_dir_local (no '../' object keys).
    for path in sorted(Path(log_dir_local).rglob('*')):
        if path.is_dir():
            continue
        # Object keys always use '/', independent of the local OS separator.
        relative_key = path.relative_to(log_dir_local).as_posix()
        object_name = posixpath.join(log_dir, relative_key)
        client.fput_object(
            bucket_name=log_bucket,
            object_name=object_name,
            file_path=str(path),
        )
        count += 1
        print(f'{path} uploaded to minio://{log_bucket}/{object_name}')
    print(f'{count} log files uploaded to minio://{log_bucket}/{log_dir}')
|
|
|
|
|
|
# A GCR-hosted TensorFlow image is used for the training component because
# tensorflow/tensorflow:2.4 may fail with image pull backoff due to Docker Hub
# rate limiting.
_TRAIN_BASE_IMAGE = 'gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest'

train_op = create_component_from_func(
    train,
    base_image=_TRAIN_BASE_IMAGE,
    # TODO: pin the minio client version for reproducible component builds.
    packages_to_install=['minio'],
)
|
|
|
|
|
|
@dsl.pipeline(name='pipeline-tensorboard-minio')
def my_pipeline(
    minio_endpoint: str = 'minio-service:9000',
    log_bucket: str = 'mlpipeline',
    log_dir: str = f'tensorboard/logs/{dsl.RUN_ID_PLACEHOLDER}',
    # Pin to tensorflow 2.3, because in 2.4+ tensorboard cannot load in KFP:
    # refer to https://github.com/kubeflow/pipelines/issues/5521.
    tf_image: str = 'gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest'
):
    """Prepare a TensorBoard for an s3:// log dir, then train and upload logs.

    The TensorBoard task reads ``s3://{log_bucket}/{log_dir}`` using MinIO
    credentials from the ``mlpipeline-minio-artifact`` secret. The training
    task receives the same credentials as ``MINIO_*`` env vars and runs after
    the TensorBoard task.
    """
    minio_secret = 'mlpipeline-minio-artifact'

    def env_from_secret(env_name, secret_key):
        # Env var whose value is read from a key of the MinIO secret.
        return {
            'name': env_name,
            'valueFrom': {
                'secretKeyRef': {'name': minio_secret, 'key': secret_key},
            },
        }

    def env_literal(env_name, value):
        # Env var with a literal value.
        return {'name': env_name, 'value': value}

    # These env vars make TensorBoard access the KFP in-cluster MinIO using
    # the s3 protocol.
    # Reference: https://blog.min.io/hyper-scale-machine-learning-with-minio-and-tensorflow/
    tensorboard_env = [
        env_from_secret('AWS_ACCESS_KEY_ID', 'accesskey'),
        env_from_secret('AWS_SECRET_ACCESS_KEY', 'secretkey'),
        env_literal('AWS_REGION', 'minio'),
        env_literal('S3_ENDPOINT', f'{minio_endpoint}'),
        env_literal('S3_USE_HTTPS', '0'),
        env_literal('S3_VERIFY_SSL', '0'),
    ]
    tensorboard_pod_spec = {'spec': {'containers': [{'env': tensorboard_env}]}}

    # TensorBoard uses the s3 protocol to access MinIO.
    prepare_tb_task = prepare_tensorboard(
        log_dir_uri=f's3://{log_bucket}/{log_dir}',
        image=tf_image,
        pod_template_spec=json.dumps(tensorboard_pod_spec),
    )

    train_task = train_op(
        minio_endpoint=minio_endpoint,
        log_bucket=log_bucket,
        log_dir=log_dir,
    )
    train_task.set_memory_request('2Gi')
    train_task.set_memory_limit('2Gi')

    # Expose the MinIO credentials to the training step as MINIO_* env vars.
    minio_credentials = use_k8s_secret(
        secret_name=minio_secret,
        k8s_secret_key_to_env={
            'secretkey': 'MINIO_SECRET_KEY',
            'accesskey': 'MINIO_ACCESS_KEY',
        },
    )
    train_task.apply(minio_credentials)

    # Optionally, the training task could use the same TensorFlow image as
    # TensorBoard (train_task.container.image = tf_image), but that does not
    # work in v2 compatible mode.
    train_task.after(prepare_tb_task)
|