# pipelines/samples/core/visualization/tensorboard_minio.py

# 180 lines
# 6.5 KiB
# Python

# Copyright 2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import json
from kfp.deprecated.onprem import use_k8s_secret
from kfp.deprecated import dsl, components
from kfp.deprecated.components import OutputPath, create_component_from_func
# Component factory for the "prepare tensorboard" step, loaded from a
# component.yaml pinned to a specific kubeflow/pipelines commit.
prepare_tensorboard = components.load_component_from_url(
    url='https://raw.githubusercontent.com/kubeflow/pipelines/1b107eb4bb2510ecb99fd5f4fb438cbf7c96a87a/components/contrib/tensorflow/tensorboard/prepare_tensorboard/component.yaml',
)
def train(minio_endpoint: str, log_bucket: str, log_dir: str):
    """Train a small MNIST classifier and upload its TensorBoard logs to MinIO.

    This function is turned into a pipeline component with
    ``create_component_from_func``, so it must stay self-contained: every
    import and helper it needs is defined inside the function body.

    Args:
        minio_endpoint: Address of the MinIO server handed to the MinIO
            client. The connection is made without TLS (``secure=False``).
        log_bucket: Name of the bucket the log files are uploaded to.
        log_dir: Object-name prefix inside ``log_bucket`` under which the
            log files are stored.

    Raises:
        RuntimeError: If the ``MINIO_ACCESS_KEY`` or ``MINIO_SECRET_KEY``
            environment variable is missing or empty.
    """
    import os
    import posixpath
    from pathlib import Path

    import tensorflow as tf
    from minio import Minio

    # Validate credentials before training so a misconfigured secret fails
    # immediately instead of after the whole training run.
    minio_access_key = os.getenv('MINIO_ACCESS_KEY')
    minio_secret_key = os.getenv('MINIO_SECRET_KEY')
    if not minio_access_key or not minio_secret_key:
        raise RuntimeError('MINIO_ACCESS_KEY or MINIO_SECRET_KEY env is not set')

    # Reference: https://www.tensorflow.org/tensorboard/get_started
    mnist = tf.keras.datasets.mnist
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    # Scale pixel values from [0, 255] to [0, 1].
    x_train, x_test = x_train / 255.0, x_test / 255.0

    def create_model():
        return tf.keras.models.Sequential([
            tf.keras.layers.Flatten(input_shape=(28, 28)),
            tf.keras.layers.Dense(512, activation='relu'),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(10, activation='softmax'),
        ])

    model = create_model()
    model.compile(
        optimizer='adam',
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )

    log_dir_local = "logs/fit"
    tensorboard_callback = tf.keras.callbacks.TensorBoard(
        log_dir=log_dir_local, histogram_freq=1
    )
    model.fit(
        x=x_train,
        y=y_train,
        epochs=5,
        validation_data=(x_test, y_test),
        callbacks=[tensorboard_callback],
    )

    # Copy the local logs folder to MinIO.
    #
    # TODO: we may write a filesystem watch process that continuously copies
    # the logs dir to MinIO, so that live training logs can be watched via
    # TensorBoard.
    #
    # Note: although TensorFlow can write to MinIO directly via the s3://
    # protocol, this sample uploads with the MinIO client to demonstrate an
    # approach that also works for frameworks that only support local paths.
    client = Minio(
        minio_endpoint,
        access_key=minio_access_key,
        secret_key=minio_secret_key,
        secure=False,
    )

    # Walk exactly the directory the TensorBoard callback wrote to, so every
    # relative path (and therefore every object name) stays under `log_dir`.
    local_root = Path(log_dir_local)
    count = 0
    for path in local_root.rglob("*"):
        if path.is_dir():
            continue
        # Object names always use "/" separators, independent of the host OS.
        object_name = posixpath.join(
            log_dir, path.relative_to(local_root).as_posix()
        )
        client.fput_object(
            bucket_name=log_bucket,
            object_name=object_name,
            file_path=str(path),
        )
        count += 1
        print(f'{path} uploaded to minio://{log_bucket}/{object_name}')
    print(f'{count} log files uploaded to minio://{log_bucket}/{log_dir}')
# Component factory wrapping `train`.
# The base image comes from gcr.io because tensorflow/tensorflow:2.4 may fail
# with image pull backoff due to dockerhub rate limiting.
train_op = create_component_from_func(
    func=train,
    base_image='gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest',
    # TODO: pin minio version
    packages_to_install=['minio'],
)
@dsl.pipeline(name='pipeline-tensorboard-minio')
def my_pipeline(
    minio_endpoint: str = 'minio-service:9000',
    log_bucket: str = 'mlpipeline',
    log_dir: str = f'tensorboard/logs/{dsl.RUN_ID_PLACEHOLDER}',
    # Pin to tensorflow 2.3, because in 2.4+ tensorboard cannot load in KFP:
    # refer to https://github.com/kubeflow/pipelines/issues/5521.
    tf_image: str = 'gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest'
):
    # Pipeline: prepare a TensorBoard viewer that reads logs from MinIO over
    # the s3 protocol, then run the training step that uploads those logs.
    #
    # NOTE(review): documented with comments rather than a docstring, since
    # KFP may surface the function docstring as the pipeline description in
    # the compiled spec -- confirm before converting.
    minio_secret_name = 'mlpipeline-minio-artifact'

    def secret_env(env_name, secret_key):
        # Env var entry whose value is read from the MinIO k8s secret.
        return {
            'name': env_name,
            'valueFrom': {
                'secretKeyRef': {
                    'name': minio_secret_name,
                    'key': secret_key
                }
            }
        }

    # These env vars make tensorboard access KFP in-cluster minio
    # using s3 protocol.
    # Reference: https://blog.min.io/hyper-scale-machine-learning-with-minio-and-tensorflow/
    tensorboard_env = [
        secret_env('AWS_ACCESS_KEY_ID', 'accesskey'),
        secret_env('AWS_SECRET_ACCESS_KEY', 'secretkey'),
        {'name': 'AWS_REGION', 'value': 'minio'},
        {'name': 'S3_ENDPOINT', 'value': f'{minio_endpoint}'},
        {'name': 'S3_USE_HTTPS', 'value': '0'},
        {'name': 'S3_VERIFY_SSL', 'value': '0'},
    ]
    pod_template = {
        'spec': {
            'containers': [{'env': tensorboard_env}],
        },
    }

    # tensorboard uses s3 protocol to access minio
    prepare_tb_task = prepare_tensorboard(
        log_dir_uri=f's3://{log_bucket}/{log_dir}',
        image=tf_image,
        pod_template_spec=json.dumps(pod_template),
    )

    train_task = train_op(
        minio_endpoint=minio_endpoint,
        log_bucket=log_bucket,
        log_dir=log_dir,
    )
    train_task.set_memory_request('2Gi').set_memory_limit('2Gi')

    # Expose the MinIO credentials to the training container as the env vars
    # that `train` reads.
    minio_credentials = use_k8s_secret(
        secret_name=minio_secret_name,
        k8s_secret_key_to_env={
            'secretkey': 'MINIO_SECRET_KEY',
            'accesskey': 'MINIO_ACCESS_KEY'
        },
    )
    train_task.apply(minio_credentials)

    # optional, let training task use the same tensorflow image as specified tensorboard
    # this does not work in v2 compatible mode
    # train_task.container.image = tf_image
    train_task.after(prepare_tb_task)