88 lines
3.0 KiB
Python
88 lines
3.0 KiB
Python
# Copyright 2018 The Kubeflow Authors
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import os
|
|
import re
|
|
import subprocess
|
|
|
|
from minio import Minio
|
|
from junit_xml import TestSuite, TestCase
|
|
|
|
# Parse the workflow json to obtain the artifacts for a particular step.
|
|
# Note: step_name may be just a keyword; it is matched as a substring of each node name.
|
|
def get_artifact_in_minio(workflow_json, step_name, output_path, artifact_name='mlpipeline-ui-metadata'):
    """Download one output artifact of a workflow step from Minio to a file.

    The nodes under ``workflow_json['status']['nodes']`` are scanned; a node
    matches when ``step_name`` is a substring of its ``name``. Among the
    matching nodes' output artifacts, the one whose ``name`` equals
    ``artifact_name`` supplies the ``s3`` location (``endpoint``, ``bucket``,
    ``key``). If several artifacts match, the last one found is used.

    This is best effort: any exception (malformed workflow json, artifact not
    found, Minio or file errors) is caught and reported on stdout together
    with the workflow json; nothing is raised to the caller.

    :param workflow_json: parsed workflow object (dict) to search.
    :param step_name: keyword matched as a substring of each node name.
    :param output_path: local file name the artifact bytes are written to.
    :param artifact_name: name of the artifact to fetch.
    """
    s3_data = {}
    minio_access_key = 'minio'
    minio_secret_key = 'minio123'
    try:
        for node in workflow_json['status']['nodes'].values():
            if step_name in node['name']:
                for artifact in node['outputs']['artifacts']:
                    if artifact['name'] == artifact_name:
                        s3_data = artifact['s3']
        # When nothing matched, s3_data is still empty and the lookups below
        # raise KeyError, which is reported by the handler at the bottom.
        minio_client = Minio(s3_data['endpoint'], access_key=minio_access_key,
                             secret_key=minio_secret_key, secure=False)
        data = minio_client.get_object(s3_data['bucket'], s3_data['key'])
        try:
            with open(output_path, 'wb') as out_file:
                for chunk in data.stream(32 * 1024):
                    out_file.write(chunk)
        finally:
            # Give the HTTP connection back to the pool even if the copy
            # fails part-way through.
            data.close()
            data.release_conn()
    except Exception as e:
        # Interpolate explicitly: passing e as a second print() argument
        # would leave a literal '%s' in the message.
        print('error in get_artifact_in_minio: %s' % (e,))
        print(workflow_json)
|
|
|
|
# Junit xml utilities
|
|
def add_junit_test(test_cases, testname, succ, message='default message', elapsed_sec=0):
    """Record the outcome of one test as a JUnit test case.

    :param test_cases: list the newly created ``TestCase`` is appended to.
    :param testname: name given to the test case.
    :param succ: truthy when the test passed; when falsy, ``message`` is
      attached to the test case as failure info.
    :param message: failure text, used only when ``succ`` is falsy.
    :param elapsed_sec: passed through as the test case's ``elapsed_sec``.
    """
    result = TestCase(testname, elapsed_sec=elapsed_sec)
    failed = not succ
    if failed:
        result.add_failure_info(message)
    test_cases.append(result)
|
|
|
|
def write_junit_xml(testname, filename, test_cases):
    """Write the given test cases to ``filename`` as one JUnit XML test suite.

    :param testname: name given to the ``TestSuite``.
    :param filename: path of the report file; opened in text write mode.
    :param test_cases: test cases placed in the suite.
    """
    with open(filename, 'w') as report:
        suites = [TestSuite(testname, test_cases)]
        TestSuite.to_file(report, suites, prettyprint=False)
|
|
|
|
# Bash utilities
|
|
def run_bash_command(cmd):
    """Run a command line (without a shell) and return its decoded output.

    :param cmd: the command as a single string. It is split into arguments
      with shell-like quoting rules, so a quoted argument containing spaces
      stays one argument.
    :return: tuple ``(output_string, error_string)``, both decoded as UTF-8.
      Only stdout is piped; stderr is inherited from this process, so
      ``error_string`` is always the empty string.
    """
    # Local import keeps this block self-contained; shlex is standard library.
    import shlex

    # A plain str.split() would tear quoted arguments apart and keep the
    # quote characters as part of the arguments.
    process = subprocess.Popen(shlex.split(cmd), stdout=subprocess.PIPE)
    output_bytes, error_bytes = process.communicate()
    output_string = ''
    error_string = ''
    if output_bytes is not None:
        output_string = output_bytes.decode('utf-8')
    if error_bytes is not None:
        error_string = error_bytes.decode('utf-8')
    return output_string, error_string
|
|
|
|
|
|
def file_injection(file_in, tmp_file_out, subs):
    """Utility function that substitutes several regexes within a file by
    their corresponding strings, rewriting the file in place.

    Each line of ``file_in`` is passed through every substitution in the
    iteration order of ``subs``; each substitution operates on the result of
    the previous one. The output is written to ``tmp_file_out``, which then
    replaces ``file_in``.

    :param file_in: input file name; overwritten with the substituted text.
    :param tmp_file_out: tmp output file name; it is moved onto ``file_in``
      and no longer exists under this name afterwards.
    :param subs: dict, key is the regex expr, value is the substituting string.
    """
    with open(file_in, 'rt') as fin, open(tmp_file_out, 'wt') as fout:
        # Compile each pattern once instead of once per line.
        replacements = [(re.compile(old), new) for old, new in subs.items()]
        for line in fin:
            for regex, new in replacements:
                line = regex.sub(new, line)
            fout.write(line)

    # os.replace overwrites an existing destination on every platform;
    # os.rename raises on Windows when the destination already exists.
    os.replace(tmp_file_out, file_in)
|