53 lines
1.9 KiB
Python
53 lines
1.9 KiB
Python
import utils
|
|
import os
|
|
|
|
from minio import Minio
|
|
|
|
|
|
def get_artifact_in_minio(
    workflow_json,
    step_name,
    artifact_name,
    output_dir,
    *,
    minio_host="localhost",
    minio_access_key="minio",
    minio_secret_key="minio123",
):
    """Download one output artifact of a pipeline step from Minio.

    Minio is the S3-style object storage server for K8s. This function parses
    a pipeline run's workflow JSON to find the Minio location of the output
    artifact ``artifact_name`` produced by step ``step_name``, then downloads
    it to ``<output_dir>/<artifact_name>.tgz``.

    There are two types of nodes in the workflow JSON: DAG and pod. A DAG node
    corresponds to the whole pipeline and a pod node corresponds to a step in
    the DAG. The ``node["type"] != "DAG"`` check handles the case where the
    step name is also part of the pipeline name. Steps are matched by
    substring (``step_name in node["name"]``); if several matching nodes carry
    the artifact, the last one iterated wins.

    Args:
        workflow_json: Parsed workflow object. Reads ``status.nodes`` and
            ``status.artifactRepositoryRef.artifactRepository.s3.bucket``.
        step_name: Substring identifying the step's node name.
        artifact_name: Exact name of the output artifact to fetch.
        output_dir: Directory the ``.tgz`` file is written into.
        minio_host: Host the Minio service is reachable on.
        minio_access_key: Minio access key.
        minio_secret_key: Minio secret key.

    Returns:
        The local path of the downloaded file.

    Raises:
        KeyError: If no non-DAG node matching ``step_name`` has an output
            artifact named ``artifact_name``.
    """
    s3_data = None
    for node in workflow_json["status"]["nodes"].values():
        if step_name not in node["name"] or node["type"] == "DAG":
            continue
        # Nodes that produced no outputs (or no artifacts) omit these keys or
        # set them to null; skip them instead of failing with an unrelated
        # KeyError.
        outputs = node.get("outputs") or {}
        for artifact in outputs.get("artifacts") or []:
            if artifact["name"] == artifact_name:
                s3_data = artifact["s3"]

    # Fail fast with a descriptive error before contacting Minio.
    if s3_data is None:
        raise KeyError(
            "No output artifact {!r} found for step {!r} in workflow".format(
                artifact_name, step_name
            )
        )

    s3_bucket = workflow_json["status"]["artifactRepositoryRef"]["artifactRepository"][
        "s3"
    ]["bucket"]

    minio_port = utils.get_minio_service_port()
    minio_client = Minio(
        "{}:{}".format(minio_host, minio_port),
        access_key=minio_access_key,
        secret_key=minio_secret_key,
        secure=False,
    )

    output_file = os.path.join(output_dir, artifact_name + ".tgz")
    # https://docs.min.io/docs/python-client-api-reference.html#fget_object
    minio_client.fget_object(s3_bucket, s3_data["key"], output_file)

    return output_file
|
|
|
|
|
|
def artifact_download_iterator(workflow_json, outputs_dict, output_dir):
    """Download every requested artifact for every listed pipeline step.

    Args:
        workflow_json: Parsed workflow object, forwarded unchanged to
            ``get_artifact_in_minio``.
        outputs_dict: Mapping of step name to an iterable of artifact
            suffixes. Each artifact is looked up in the workflow under the
            name ``"<step_name>-<artifact>"``.
        output_dir: Directory the downloaded files are written into.

    Returns:
        Nested dict ``{step_name: {artifact: local_file_path}}``.
    """
    return {
        step: {
            suffix: get_artifact_in_minio(
                workflow_json, step, step + "-" + suffix, output_dir
            )
            for suffix in suffixes
        }
        for step, suffixes in outputs_dict.items()
    }
|