pipelines/test/sample-test/utils.py

88 lines
3.0 KiB
Python

# Copyright 2018 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License
import os
import re
import subprocess
from minio import Minio
from junit_xml import TestSuite, TestCase
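# Parse the workflow json to obtain the artifacts for a particular step.
# Note: step_name is matched as a substring of each node's name, so a
# distinguishing keyword is sufficient; the full node name is not required.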
def get_artifact_in_minio(workflow_json, step_name, output_path, artifact_name='mlpipeline-ui-metadata'):
    """Download one output artifact of a workflow step from Minio to a file.

    Scans ``workflow_json['status']['nodes']`` for nodes whose name contains
    ``step_name`` and, within those, for an output artifact called
    ``artifact_name``. When several artifacts match, the last one found is
    used. The artifact's ``s3`` entry supplies the endpoint, bucket and key.

    This is best-effort: any failure is printed together with the workflow
    JSON instead of being raised to the caller.

    :param workflow_json: parsed workflow dict.
    :param step_name: substring to look for in each node name.
    :param output_path: local file path the artifact bytes are written to.
    :param artifact_name: name of the artifact to fetch.
    """
    s3_data = {}
    minio_access_key = 'minio'
    minio_secret_key = 'minio123'
    try:
        for node in workflow_json['status']['nodes'].values():
            if step_name not in node['name']:
                continue
            # A matching node may carry no outputs/artifacts at all; skip it
            # instead of aborting the whole search.
            artifacts = (node.get('outputs') or {}).get('artifacts') or []
            for artifact in artifacts:
                if artifact['name'] == artifact_name:
                    s3_data = artifact['s3']

        if not s3_data:
            raise LookupError(
                'artifact %r not found for step %r' % (artifact_name, step_name))

        minio_client = Minio(
            s3_data['endpoint'],
            access_key=minio_access_key,
            secret_key=minio_secret_key,
            secure=False)
        response = minio_client.get_object(s3_data['bucket'], s3_data['key'])
        try:
            with open(output_path, 'wb') as out_file:
                for chunk in response.stream(32 * 1024):
                    out_file.write(chunk)
        finally:
            # Give the HTTP connection back to the pool even if writing fails.
            response.close()
            response.release_conn()
    except Exception as e:
        # print() does not interpolate %-style arguments, so format explicitly.
        print('error in get_artifact_in_minio: %s' % (e,))
        print(workflow_json)
# Junit xml utilities
def add_junit_test(test_cases, testname, succ, message='default message', elapsed_sec=0):
    """Append a JUnit test case describing one test result to ``test_cases``.

    :param test_cases: list that the new test case is appended to.
    :param testname: name given to the test case.
    :param succ: truthy when the test passed; otherwise a failure is recorded.
    :param message: failure message attached when ``succ`` is falsy.
    :param elapsed_sec: elapsed time passed through to the test case.
    """
    junit_case = TestCase(testname, elapsed_sec=elapsed_sec)
    failed = not succ
    if failed:
        junit_case.add_failure_info(message)
    test_cases.append(junit_case)
def write_junit_xml(testname, filename, test_cases):
    """Write ``test_cases`` to ``filename`` as a single JUnit XML test suite.

    :param testname: name of the test suite.
    :param filename: path of the XML file to (over)write.
    :param test_cases: list of test cases that make up the suite.
    """
    with open(filename, 'w') as xml_file:
        # The suite is built after the file is opened, and written without
        # pretty-printing.
        suites = [TestSuite(testname, test_cases)]
        TestSuite.to_file(xml_file, suites, prettyprint=False)
# Bash utilities
def run_bash_command(cmd):
    """Run a command and return its output decoded as UTF-8.

    The command string is split on whitespace and executed directly, without
    a shell, so quoting, pipes and redirections inside ``cmd`` are not
    interpreted.

    :param cmd: command line as a single string.
    :return: tuple ``(output_string, error_string)``. Only stdout is piped,
      so ``communicate()`` reports no stderr data and ``error_string`` is
      the empty string; the child's stderr goes wherever the parent's does.
    """
    process = subprocess.Popen(cmd.split(), stdout=subprocess.PIPE)
    output_bytes, error_bytes = process.communicate()
    # communicate() yields None for every stream that was not piped.
    output_string = output_bytes.decode('utf-8') if output_bytes is not None else ''
    error_string = error_bytes.decode('utf-8') if error_bytes is not None else ''
    return output_string, error_string
def file_injection(file_in, tmp_file_out, subs):
    """Utility function that substitutes several regexes within a file with
    their corresponding strings.

    Every line of ``file_in`` is rewritten into ``tmp_file_out`` with all
    substitutions applied in the iteration order of ``subs``; each
    substitution sees the result of the previous one. ``tmp_file_out`` then
    replaces ``file_in``.

    :param file_in: input file name; overwritten with the substituted content.
    :param tmp_file_out: tmp output file name; no longer exists on return.
    :param subs: dict, key is the regex expr, value is the substituting string.
    """
    # Compile every pattern once up front rather than once per line.
    compiled_subs = [(re.compile(old), new) for old, new in subs.items()]

    with open(file_in, 'rt') as fin, open(tmp_file_out, 'wt') as fout:
        for line in fin:
            for regex, new in compiled_subs:
                line = regex.sub(new, line)
            fout.write(line)

    # os.replace overwrites an existing destination on every platform,
    # whereas os.rename fails on Windows when file_in already exists.
    os.replace(tmp_file_out, file_in)