Expose step id and step name (#1191)

* add step id and step name

* add step id and step name

* update python sdk version

* fix samples

* fix typo

* add env step id

* pass the name to next step

* change to workflow name
This commit is contained in:
cheyang 2019-04-26 05:56:28 +08:00 committed by Kubernetes Prow Robot
parent b795a9a899
commit 8c8e5052df
8 changed files with 105 additions and 98 deletions

View File

@ -114,8 +114,6 @@ def _collect_metrics(name, job_type, metric_name):
return metric
def _get_job_status(name, job_type):
get_cmd = "arena get %s --type %s | grep -i STATUS:|awk -F: '{print $NF}'" % (name, job_type)
status = ""
@ -139,16 +137,12 @@ def _get_tensorboard_url(name, job_type):
return url
#
# Generate standalone job
def generate_job_command(args):
name = args.name
# Generate common options
def generate_options(args):
gpus = args.gpus
cpu = args.cpu
memory = args.memory
tensorboard = args.tensorboard
image = args.image
output_data = args.output_data
data = args.data
env = args.env
@ -157,45 +151,64 @@ def generate_job_command(args):
log_dir = args.log_dir
sync_source = args.sync_source
options = []
if gpus > 0:
options.extend(['--gpus', str(gpus)])
if cpu > 0:
options.extend(['--cpu', str(cpu)])
if memory >0:
options.extend(['--memory', str(memory)])
if tensorboard_image != "tensorflow/tensorflow:1.12.0":
options.extend(['--tensorboardImage', tensorboard_image])
if tensorboard:
options.append("--tensorboard")
if os.path.isdir(args.log_dir):
options.extend(['--logdir', args.log_dir])
else:
logging.info("skip log dir :{0}".format(args.log_dir))
if len(data) > 0:
for d in data:
options.append("--data={0}".format(d))
if len(env) > 0:
for e in env:
options.append("--env={0}".format(e))
if len(args.workflow_name) > 0:
options.append("--env=WORKFLOW_NAME={0}".format(args.workflow_name))
if len(args.step_name) > 0:
options.append("--env=STEP_NAME={0}".format(args.step_name))
if len(sync_source) > 0:
if not sync_source.endswith(".git"):
raise ValueError("sync_source must be an http git url")
options.extend(['--sync-mode','git'])
options.extend(['--sync-source',sync_source])
return options
# Generate standalone job
def generate_job_command(args):
name = args.name
image = args.image
commandArray = [
'arena', 'submit', 'tfjob',
'--name={0}'.format(name),
'--image={0}'.format(image),
]
if gpus > 0:
commandArray.extend(['--gpus', str(gpus)])
if cpu > 0:
commandArray.extend(['--cpu', str(cpu)])
if memory >0:
commandArray.extend(['--memory', str(memory)])
if tensorboard_image != "tensorflow/tensorflow:1.12.0":
commandArray.extend(['--tensorboardImage', tensorboard_image])
if tensorboard:
commandArray.append("--tensorboard")
if os.path.isdir(args.log_dir):
commandArray.append(['--logdir', args.log_dir])
else:
logging.info("skip log dir :{0}".format(args.log_dir))
if len(data) > 0:
for d in data:
commandArray.append("--data={0}".format(d))
if len(env) > 0:
for e in env:
commandArray.append("--env={0}".format(e))
if len(sync_source) > 0:
if not sync_source.endswith(".git"):
raise ValueError("sync_source must be an http git url")
commandArray.extend(['--sync-mode','git'])
commandArray.extend(['--sync-source',sync_source])
commandArray.extend(generate_options(args))
return commandArray, "tfjob"
@ -203,19 +216,7 @@ def generate_job_command(args):
def generate_mpjob_command(args):
name = args.name
workers = args.workers
gpus = args.gpus
cpu = args.cpu
memory = args.memory
tensorboard = args.tensorboard
image = args.image
output_data = args.output_data
data = args.data
env = args.env
tensorboard_image = args.tensorboard_image
tensorboard = str2bool(args.tensorboard)
rdma = str2bool(args.rdma)
log_dir = args.log_dir
sync_source = args.sync_source
commandArray = [
'arena', 'submit', 'mpijob',
@ -224,42 +225,10 @@ def generate_mpjob_command(args):
'--image={0}'.format(image),
]
if gpus > 0:
commandArray.extend(['--gpus', str(gpus)])
if cpu > 0:
commandArray.extend(['--cpu', str(cpu)])
if memory >0:
commandArray.extend(['--memory', str(memory)])
if tensorboard_image != "tensorflow/tensorflow:1.12.0":
commandArray.extend(['--tensorboardImage', tensorboard_image])
if tensorboard:
commandArray.append("--tensorboard")
if rdma:
commandArray.append("--rdma")
commandArray.append("--rdma")
if os.path.isdir(args.log_dir):
commandArray.append(['--logdir', args.log_dir])
else:
logging.info("skip log dir :{0}".format(args.log_dir))
if len(data) > 0:
for d in data:
commandArray.append("--data={0}".format(d))
if len(env) > 0:
for e in env:
commandArray.append("--env={0}".format(e))
if len(sync_source) > 0:
if not sync_source.endswith(".git"):
raise ValueError("sync_source must be an http git url")
commandArray.extend(['--sync-mode','git'])
commandArray.extend(['--sync-source',sync_source])
commandArray.extend(generate_options(args))
return commandArray, "mpijob"
@ -297,6 +266,9 @@ def main(argv=None):
parser.add_argument('--metric', action='append', type=str, default=[])
parser.add_argument('--sync-source', type=str, default='')
parser.add_argument('--workflow-name', type=str, default='')
parser.add_argument('--step-name', type=str, default='')
subparsers = parser.add_subparsers(help='arena sub-command help')
#create the parser for the 'mpijob' command
@ -407,6 +379,14 @@ def main(argv=None):
with open('/output.txt', 'w') as f:
f.write(output)
with open('/workflow-name.txt', 'w') as f:
f.write(args.workflow_name)
with open('/step-name.txt', 'w') as f:
f.write(args.step_name)
with open('/name.txt', 'w') as f:
f.write(args.name)
if __name__== "__main__":
main()

View File

@ -62,7 +62,7 @@ def parameter_servers_op(name, image, command, env, data, sync_source, annotatio
tensorboard,
worker_port, ps_port,
metrics=['Train-accuracy:PERCENTAGE'],
arena_image='cheyang/arena_launcher:v0.3',
arena_image='cheyang/arena_launcher:v0.4',
timeout_hours=240):
"""This function submits Distributed TFJob in Parameter Servers mode.
@ -123,8 +123,13 @@ def distributed_tf_op(name, image, command, env=[], data=[], sync_source=None,
"--timeout-hours", timeout_hours,
"--metric-name", metric_name,
"--metric-unit", metric_unit,
"--step-name", '{{pod.name}}',
"--workflow-name", '{{workflow.name}}',
"tfjob",
"--workers", workers,
"--", command],
file_outputs={'train': '/output.txt'}
file_outputs={'train': '/output.txt',
'workflow':'/workflow-name.txt',
'step':'/step-name.txt',
'name':'/name.txt'}
)

View File

@ -23,7 +23,7 @@ def mpi_job_op(name, image, command, workers=1, gpus=0, cpu=0, memory=0, env=[],
rdma=False,
tensorboard=False, tensorboard_image=None,
metrics=['Train-accuracy:PERCENTAGE'],
arenaImage='cheyang/arena_launcher:v0.3',
arenaImage='cheyang/arena_launcher:v0.4',
timeout_hours=240):
"""This function submits MPI Job, it can run Allreduce-style Distributed Training.
@ -74,13 +74,18 @@ def mpi_job_op(name, image, command, workers=1, gpus=0, cpu=0, memory=0, env=[],
"--gpus", str(gpus),
"--cpu", str(cpu),
"--memory", str(memory),
"--step-name", '{{pod.name}}',
"--workflow-name", '{{workflow.name}}',
"--workers", str(workers),
"--timeout-hours", str(timeout_hours),
] + options +
[
"mpijob",
"--", str(command)],
file_outputs={'train': '/output.txt'}
file_outputs={'train': '/output.txt',
'workflow':'/workflow-name.txt',
'step':'/step-name.txt',
'name':'/name.txt'}
)
op.set_image_pull_policy('Always')
return op

View File

@ -23,7 +23,7 @@ def standalone_job_op(name, image, command, gpus=0, cpu=0, memory=0, env=[],
tensorboard=False, tensorboard_image=None,
data=[], sync_source=None, annotations=[],
metrics=['Train-accuracy:PERCENTAGE'],
arena_image='cheyang/arena_launcher:v0.3',
arena_image='cheyang/arena_launcher:v0.4',
timeout_hours=240):
"""This function submits a standalone training Job
@ -73,13 +73,18 @@ def standalone_job_op(name, image, command, gpus=0, cpu=0, memory=0, env=[],
"--image", str(image),
"--gpus", str(gpus),
"--cpu", str(cpu),
"--step-name", '{{pod.name}}',
"--workflow-name", '{{workflow.name}}',
"--memory", str(memory),
"--timeout-hours", str(timeout_hours),
] + options +
[
"job",
"--", str(command)],
file_outputs={'train': '/output.txt'}
file_outputs={'train': '/output.txt',
'workflow':'/workflow-name.txt',
'step':'/step-name.txt',
'name':'/name.txt'}
)
op.set_image_pull_policy('Always')
return op

View File

@ -5,7 +5,7 @@ get_abs_filename() {
echo "$(cd "$(dirname "$1")" && pwd)/$(basename "$1")"
}
target_archive_file=${1:-kfp-arena-0.3.tar.gz}
target_archive_file=${1:-kfp-arena-0.4.tar.gz}
target_archive_file=$(get_abs_filename "$target_archive_file")
DIR=$(mktemp -d)

View File

@ -2,7 +2,7 @@ from setuptools import setup
NAME = 'kfp-arena'
VERSION = '0.3'
VERSION = '0.4'
REQUIRES = ['kfp >= 0.1']

View File

@ -73,7 +73,7 @@ spec:
First, install the necessary Python Packages
```shell
pip3 install http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp/0.1.14/kfp.tar.gz --upgrade
pip3 install http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp-arena/kfp-arena-0.3.tar.gz --upgrade
pip3 install http://kubeflow.oss-cn-beijing.aliyuncs.com/kfp-arena/kfp-arena-0.4.tar.gz --upgrade
```
Then run [standalone_pipeline.py](standalone_pipeline.py) with different parameters.

View File

@ -37,7 +37,15 @@ def sample_pipeline(learning_rate='0.01',
env=["GIT_SYNC_REV=%s" % (commit)],
gpus=gpus,
data=data,
command="echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/main.py --max_steps 500 --data_dir /training/dataset/mnist --log_dir /training/output/mnist --learning_rate %s --dropout %s" % (prepare_data.output, learning_rate, dropout),
command='''echo prepare_step_name=%s and prepare_wf_name=%s && \
python code/tensorflow-sample-code/tfjob/docker/mnist/main.py --max_steps 500 \
--data_dir /training/dataset/mnist \
--log_dir /training/output/mnist \
--learning_rate %s --dropout %s''' % (
prepare_data.outputs['step'],
prepare_data.outputs['workflow'],
learning_rate,
dropout),
metrics=["Train-accuracy:PERCENTAGE"])
# 3. export the model
export_model = arena.standalone_job_op(
@ -46,7 +54,11 @@ def sample_pipeline(learning_rate='0.01',
sync_source="https://code.aliyun.com/xiaozhou/tensorflow-sample-code.git",
env=["GIT_SYNC_REV=%s" % (commit)],
data=data,
command="echo %s;python code/tensorflow-sample-code/tfjob/docker/mnist/export_model.py --model_version=%s --checkpoint_path=/training/output/mnist /training/output/models" % (train.output, model_version))
command="echo train_step_name=%s and train_wf_name=%s && \
python code/tensorflow-sample-code/tfjob/docker/mnist/export_model.py \
--model_version=%s \
--checkpoint_path=/training/output/mnist \
/training/output/models" % (train.outputs['step'], train.outputs['workflow'], model_version))
if __name__ == '__main__':
parser = argparse.ArgumentParser()