Add AWS EMR and Athena components (#1286)

* Add EMR and Athena components

* Update components/aws/athena/query/component.yaml

Co-Authored-By: Jeffwan <seedjeffwan@gmail.com>

* Clean up athena components
Jiaxin Shan 2019-05-06 17:11:44 -07:00 committed by Kubernetes Prow Robot
parent 892e3082b7
commit f5c464a87d
16 changed files with 803 additions and 0 deletions


@@ -0,0 +1,26 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM ubuntu:16.04
RUN apt-get update -y && apt-get install --no-install-recommends -y -q ca-certificates python-dev python-setuptools wget unzip
RUN easy_install pip
RUN pip install boto3==1.9.130 pathlib2
COPY query/src/query.py .
ENV PYTHONPATH /app
ENTRYPOINT [ "bash" ]


@@ -0,0 +1,36 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name: Athena Query
description: |
  A Kubeflow Pipelines component that submits a query to Amazon Athena
  and writes the query results to Amazon S3.
inputs:
  - {name: region, description: 'The Athena region in which to handle the request.'}
  - {name: database, description: 'The name of the database.'}
  - {name: query, description: 'The SQL query statements to be executed in Athena.'}
  - {name: output_path, description: 'The path to the Amazon S3 location where the query results are stored.'}
outputs:
  - {name: output_path, description: 'The path to the S3 bucket containing the query output in CSV format.'}
implementation:
  container:
    image: seedjeffwan/kubeflow-pipeline-aws-athena:20190501
    command: ['python', 'query.py']
    args: [
      --region, {inputValue: region},
      --database, {inputValue: database},
      --query, {inputValue: query},
      --output, {inputValue: output_path}
    ]
    fileOutputs:
      output_path: /output.txt
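
To use the component inside a pipeline, load it with the KFP SDK and call the resulting op. A minimal sketch, assuming the component file is available at the repo-relative path below and that an `aws-secret` (as described in the sample README later in this commit) exists in the cluster; the database and S3 paths are placeholders:

```python
#!/usr/bin/env python3
import kfp
from kfp import components, dsl
from kfp.aws import use_aws_secret

# path assumes the kubeflow/pipelines repo layout from this commit
athena_query_op = components.load_component_from_file(
    'components/aws/athena/query/component.yaml')

@dsl.pipeline(name='Athena query example', description='Run a single Athena query')
def athena_example(region='us-west-2',
                   database='default',                            # placeholder database
                   query='SELECT 1',
                   output_path='s3://my-bucket/athena/results/'):  # placeholder bucket
    athena_query_op(
        region=region, database=database, query=query, output_path=output_path
    ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))

if __name__ == '__main__':
    kfp.compiler.Compiler().compile(athena_example, __file__ + '.zip')
```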


@@ -0,0 +1,92 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import json
import logging
import time
import re
import boto3
def get_client(region=None):
    """Builds a client to the AWS Athena API."""
    client = boto3.client('athena', region_name=region)
    return client


def query(client, query, database, output):
    """Submits a query to Athena and polls until it finishes."""
    response = client.start_query_execution(
        QueryString=query,
        QueryExecutionContext={
            'Database': database
        },
        ResultConfiguration={
            'OutputLocation': output,
        }
    )
    execution_id = response['QueryExecutionId']
    logging.info('Execution ID: %s', execution_id)

    # Athena queries are asynchronous: poll the execution until it leaves the RUNNING state.
    state = 'RUNNING'
    max_execution = 5  # TODO: this should be an optional user parameter, or a timeout.
    filename = None
    while max_execution > 0 and state == 'RUNNING':
        max_execution = max_execution - 1
        response = client.get_query_execution(QueryExecutionId=execution_id)

        if 'QueryExecution' in response and \
                'Status' in response['QueryExecution'] and \
                'State' in response['QueryExecution']['Status']:
            state = response['QueryExecution']['Status']['State']
            if state == 'FAILED':
                raise Exception('Athena Query Failed')
            elif state == 'SUCCEEDED':
                s3_path = response['QueryExecution']['ResultConfiguration']['OutputLocation']
                # could be multiple files?
                filename = re.findall(r'.*\/(.*)', s3_path)[0]
                logging.info('S3 output file name %s', filename)
                break
        time.sleep(5)

    # TODO:(@Jeffwan) Add more details.
    result = {
        'total_bytes_processed': response['QueryExecution']['Statistics']['DataScannedInBytes'],
        'filename': filename
    }
    return result


def main():
    logging.getLogger().setLevel(logging.INFO)

    parser = argparse.ArgumentParser()
    parser.add_argument('--region', type=str, help='Athena region.')
    parser.add_argument('--database', type=str, required=True, help='The name of the database.')
    parser.add_argument('--query', type=str, required=True,
                        help='The SQL query statements to be executed in Athena.')
    parser.add_argument('--output', type=str, required=False,
                        help='The location in Amazon S3 where your query results are stored, such as s3://path/to/query/bucket/')
    args = parser.parse_args()

    client = get_client(args.region)
    results = query(client, args.query, args.database, args.output)
    results['output'] = args.output
    logging.info('Athena results: %s', results)
    with open('/output.txt', 'w+') as f:
        json.dump(results, f)


if __name__ == '__main__':
    main()
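
For a quick sanity check outside of a pipeline, the helpers above can be driven directly; a minimal sketch, assuming valid AWS credentials in the environment and that the database and bucket below (placeholders) exist:

```python
import logging

from query import get_client, query  # query.py from this component

logging.getLogger().setLevel(logging.INFO)

client = get_client('us-west-2')
result = query(client,
               'SELECT 1',
               database='default',                       # placeholder database
               output='s3://my-bucket/athena/results/')  # placeholder bucket
print(result)
```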


@@ -0,0 +1,30 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
FROM ubuntu:16.04
RUN apt-get update -y && apt-get install --no-install-recommends -y -q ca-certificates python-dev python-setuptools wget unzip
RUN easy_install pip
RUN pip install boto3==1.9.130 pathlib2
COPY create_cluster/src/create_cluster.py .
COPY delete_cluster/src/delete_cluster.py .
COPY submit_pyspark_job/src/submit_pyspark_job.py .
COPY submit_spark_job/src/submit_spark_job.py .
COPY common /app/common/
ENV PYTHONPATH /app
ENTRYPOINT [ "bash" ]


@@ -0,0 +1,147 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time

import boto3
from botocore.exceptions import ClientError


def get_client(region=None):
    """Builds a client to the AWS EMR API."""
    client = boto3.client('emr', region_name=region)
    return client


def create_cluster(client, cluster_name, log_s3_uri, release_label, instance_type, instance_count):
    """Creates an EMR cluster."""
    response = client.run_job_flow(
        Name=cluster_name,
        LogUri=log_s3_uri,
        ReleaseLabel=release_label,
        Applications=[
            {
                'Name': 'Spark'
            }
        ],
        BootstrapActions=[
            {
                'Name': 'Maximize Spark Default Config',
                'ScriptBootstrapAction': {
                    'Path': 's3://support.elasticmapreduce/spark/maximize-spark-default-config',
                }
            },
        ],
        Instances={
            'MasterInstanceType': instance_type,
            'SlaveInstanceType': instance_type,
            'InstanceCount': instance_count,
            'KeepJobFlowAliveWhenNoSteps': True,
            'TerminationProtected': False,
        },
        VisibleToAllUsers=True,
        JobFlowRole='EMR_EC2_DefaultRole',
        ServiceRole='EMR_DefaultRole'
    )
    return response


def delete_cluster(client, jobflow_id):
    """Deletes an EMR cluster. The cluster shuts down in the background."""
    client.terminate_job_flows(JobFlowIds=[jobflow_id])


def wait_for_cluster(client, jobflow_id):
    """Waits for a new cluster to be ready."""
    while True:
        response = client.describe_cluster(ClusterId=jobflow_id)
        cluster_status = response['Cluster']['Status']
        state = cluster_status['State']

        if 'Message' in cluster_status['StateChangeReason']:
            message = cluster_status['StateChangeReason']['Message']
            if state in ['TERMINATED', 'TERMINATED_WITH_ERRORS']:
                raise Exception(message)

        if state == 'WAITING':
            print('EMR cluster create completed')
            break

        print("Cluster state: {}, wait 15s for cluster to start up.".format(state))
        time.sleep(15)


# Check the following documentation to add steps for other job types. The Python SDK
# only exposes 'HadoopJarStep' here.
# https://docs.aws.amazon.com/cli/latest/reference/emr/add-steps.html
def submit_spark_job(client, jobflow_id, job_name, jar_path, main_class, extra_args):
    """Submits a single Spark job to a running cluster."""
    spark_job = {
        'Name': job_name,
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar'
        }
    }

    spark_args = ['spark-submit', '--deploy-mode', 'cluster']
    if main_class:
        spark_args.extend(['--class', main_class])
    spark_args.extend([jar_path])
    spark_args.extend(extra_args)
    spark_job['HadoopJarStep']['Args'] = spark_args

    try:
        response = client.add_job_flow_steps(
            JobFlowId=jobflow_id,
            Steps=[spark_job],
        )
    except ClientError as e:
        print(e.response['Error']['Message'])
        exit(1)

    step_id = response['StepIds'][0]
    print("Step Id {} has been submitted".format(step_id))
    return step_id


def wait_for_job(client, jobflow_id, step_id):
    """Waits for a cluster step to finish by polling it."""
    while True:
        result = client.describe_step(ClusterId=jobflow_id, StepId=step_id)
        step_status = result['Step']['Status']
        state = step_status['State']

        if state in ('CANCELLED', 'FAILED', 'INTERRUPTED'):
            err_msg = 'UNKNOWN'
            if 'FailureDetails' in step_status:
                err_msg = step_status['FailureDetails']
            raise Exception(err_msg)
        elif state == 'COMPLETED':
            print('EMR Step finishes')
            break

        print("Step state: {}, wait 10s for step status update.".format(state))
        time.sleep(10)


def submit_pyspark_job(client, jobflow_id, job_name, py_file, extra_args):
    """Submits a single PySpark job to a running cluster."""
    pyspark_args = ['spark-submit', py_file]
    pyspark_args.extend(extra_args)
    return submit_spark_job(client, jobflow_id, job_name, 'command-runner.jar', '', pyspark_args)
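
Outside of a pipeline, these helpers can be chained directly for a quick end-to-end test; a minimal sketch, assuming AWS credentials are configured, `common` is importable (e.g. via the image's `PYTHONPATH=/app`), and the S3 paths below are placeholders:

```python
import logging

from common import _utils

logging.getLogger().setLevel(logging.INFO)

client = _utils.get_client('us-west-2')

# Create a small cluster and wait until it is ready to accept steps.
response = _utils.create_cluster(client, 'emr-cluster-test',
                                 's3://my-bucket/emr/logs',      # placeholder log URI
                                 'emr-5.23.0', 'm4.xlarge', 3)
jobflow_id = response['JobFlowId']
_utils.wait_for_cluster(client, jobflow_id)

# Submit a Spark step and wait for it to complete.
step_id = _utils.submit_spark_job(
    client, jobflow_id, 'word-count',
    's3://my-bucket/jars/spark-examples.jar',                    # placeholder JAR
    'org.apache.spark.examples.JavaWordCount',
    ['s3://my-bucket/datasets/words.txt', 's3://my-bucket/output/'])
_utils.wait_for_job(client, jobflow_id, step_id)

# Tear the cluster down (it shuts down in the background).
_utils.delete_cluster(client, jobflow_id)
```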


@@ -0,0 +1,38 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name: emr_create_cluster
description: |
  Creates an Elastic MapReduce (EMR) cluster in a specific region.
inputs:
  - {name: region, description: 'The EMR region in which to handle the request.'}
  - {name: name, description: 'The EMR cluster name. Cluster names within a region must be unique. Names of deleted clusters can be reused.'}
  - {name: release_label, description: 'The EMR version.', default: 'emr-5.23.0'}
  - {name: log_s3_uri, description: 'The path to the Amazon S3 location where logs for this cluster are stored.'}
  - {name: instance_type, description: 'The EC2 instance type of the master, core and task nodes.', default: 'm4.xlarge'}
  - {name: instance_count, description: 'The number of EC2 instances in the cluster.', default: '3'}
outputs:
  - {name: cluster_name, description: 'The job flow ID of the created cluster.'}
implementation:
  container:
    image: seedjeffwan/kubeflow-pipeline-aws-emr:20190507
    command: ['python', 'create_cluster.py']
    args: [
      --region, {inputValue: region},
      --name, {inputValue: name},
      --release_label, {inputValue: release_label},
      --log_s3_uri, {inputValue: log_s3_uri},
      --instance_type, {inputValue: instance_type},
      --instance_count, {inputValue: instance_count}
    ]
    fileOutputs:
      cluster_name: /output.txt


@@ -0,0 +1,43 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import logging

from pathlib2 import Path

from common import _utils


def main(argv=None):
    parser = argparse.ArgumentParser(description='Create EMR Cluster')
    parser.add_argument('--region', type=str, help='EMR Cluster region.')
    parser.add_argument('--name', type=str, help='The name of the cluster to create.')
    parser.add_argument('--release_label', type=str, default='emr-5.23.0',
                        help='The Amazon EMR release label, which determines the version of open-source application packages installed on the cluster.')
    parser.add_argument('--log_s3_uri', type=str, help='The path to the Amazon S3 location where logs for this cluster are stored.')
    parser.add_argument('--instance_type', type=str, default='m4.xlarge',
                        help='The EC2 instance type of the master, core and task nodes.')
    parser.add_argument('--instance_count', type=int, default=3, help='The number of EC2 instances in the cluster.')
    args = parser.parse_args()

    logging.getLogger().setLevel(logging.INFO)
    client = _utils.get_client(args.region)

    logging.info('Creating cluster...')
    create_response = _utils.create_cluster(client, args.name, args.log_s3_uri, args.release_label,
                                            args.instance_type, args.instance_count)
    logging.info('Cluster creation request submitted. Waiting for completion...')
    _utils.wait_for_cluster(client, create_response['JobFlowId'])

    # The job flow ID is the output consumed by downstream steps.
    Path('/output.txt').write_text(unicode(create_response['JobFlowId']))
    logging.info('Cluster created.')


if __name__ == "__main__":
    main()


@@ -0,0 +1,28 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name: emr_delete_cluster
description: |
  Deletes an Elastic MapReduce (EMR) cluster.
inputs:
  - {name: region, description: 'The EMR region in which to handle the request.'}
  - {name: jobflow_id, description: 'The job flow ID of the cluster to delete.'}
  - {name: dependent, description: 'An output from an upstream step, used only to defer cluster termination until that step finishes. It only shapes the generated DAG.'}
implementation:
  container:
    image: seedjeffwan/kubeflow-pipeline-aws-emr:20190507
    command: ['python', 'delete_cluster.py']
    args: [
      --region, {inputValue: region},
      --jobflow_id, {inputValue: jobflow_id},
      --job_id, {inputValue: dependent}
    ]
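
The `dependent` input carries no data; passing an upstream output to it simply orders the deletion after that step, as the sample pipeline later in this commit does:

```python
# deletion runs only after the Spark job step has produced its job_id output
delete_cluster = emr_delete_cluster_op(
    region=region,
    jobflow_id=create_cluster.output,
    dependent=training_and_prediction.outputs['job_id'],
).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))
```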


@@ -0,0 +1,32 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import logging

from common import _utils


def main(argv=None):
    parser = argparse.ArgumentParser(description='Shutdown EMR cluster')
    parser.add_argument('--region', type=str, help='The region where the cluster launches.')
    parser.add_argument('--jobflow_id', type=str, help='The job flow ID of the cluster to shut down.')
    parser.add_argument('--job_id', type=str, help='The ID of a job that must finish before the cluster is terminated.')
    args = parser.parse_args()

    logging.getLogger().setLevel(logging.INFO)
    client = _utils.get_client(args.region)

    logging.info('Tearing down cluster...')
    _utils.delete_cluster(client, args.jobflow_id)
    logging.info('Cluster deletion request submitted. Cluster will be shut down in the background.')


if __name__ == "__main__":
    main()


@@ -0,0 +1,40 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name: emr_submit_pyspark_job
description: >-
  Submits an Elastic MapReduce (EMR) PySpark application.
inputs:
  - {name: region, description: 'The EMR region in which to handle the request.'}
  - {name: jobflow_id, description: 'The job flow ID of the cluster to run the job on.'}
  - {name: job_name, description: 'The name of the Spark job.'}
  - {name: py_file, description: 'A path to a PySpark script run during the step.'}
  - {name: input, description: 'File path of the dataset.'}
  - {name: output, description: 'Output path of the result files.'}
outputs:
  - {name: job_id, description: 'The ID of the created EMR step.'}
  - {name: output_location, description: 'S3 URI of the training job results.'}
implementation:
  container:
    image: seedjeffwan/kubeflow-pipeline-aws-emr:20190507
    command: ['python', 'submit_pyspark_job.py']
    args: [
      --region, {inputValue: region},
      --jobflow_id, {inputValue: jobflow_id},
      --job_name, {inputValue: job_name},
      --py_file, {inputValue: py_file},
      --input, {inputValue: input},
      --output, {inputValue: output},
      --output_file, {outputPath: output_location},
    ]
    fileOutputs:
      job_id: /output.txt
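
The sample pipeline at the end of this commit wires the JAR-based Spark component; wiring the PySpark component looks the same. A minimal sketch, assuming the repo-relative component paths below and placeholder S3 locations:

```python
#!/usr/bin/env python3
import kfp
from kfp import components, dsl
from kfp.aws import use_aws_secret

emr_create_cluster_op = components.load_component_from_file(
    'components/aws/emr/create_cluster/component.yaml')
emr_submit_pyspark_job_op = components.load_component_from_file(
    'components/aws/emr/submit_pyspark_job/component.yaml')
emr_delete_cluster_op = components.load_component_from_file(
    'components/aws/emr/delete_cluster/component.yaml')

@dsl.pipeline(name='EMR PySpark example', description='Run a PySpark step on EMR')
def pyspark_example(region='us-west-2',
                    py_file='s3://my-bucket/jars/py_workcount.py',  # placeholder script
                    input='s3://my-bucket/datasets/words.txt',      # placeholder input
                    output='s3://my-bucket/output/'):               # placeholder output
    create_cluster = emr_create_cluster_op(
        region=region, name='emr-cluster', instance_type='m4.xlarge',
        instance_count='3', log_s3_uri='s3://my-bucket/emr/logs',   # placeholder log URI
    ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))

    job = emr_submit_pyspark_job_op(
        region=region, jobflow_id=create_cluster.output, job_name='pyspark-wordcount',
        py_file=py_file, input=input, output=output,
    ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))

    emr_delete_cluster_op(
        region=region, jobflow_id=create_cluster.output,
        dependent=job.outputs['job_id'],
    ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))

if __name__ == '__main__':
    kfp.compiler.Compiler().compile(pyspark_example, __file__ + '.zip')
```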


@@ -0,0 +1,60 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# A program to submit a PySpark job to an EMR cluster.
# Usage:
# python submit_pyspark_job.py \
#   --region us-west-2 \
#   --jobflow_id j-xsdsadsadsa \
#   --job_name training_job \
#   --py_file s3://kubeflow-pipeline/jars/py_workcount.py \
#   --input s3://kubeflow-pipeline/datasets/words.txt \
#   --output s3://kubeflow-pipeline/datasets/output/

import argparse
import logging

from pathlib2 import Path

from common import _utils


def main(argv=None):
    parser = argparse.ArgumentParser(description='Submit PySpark Job')
    parser.add_argument('--region', type=str, help='The region where the cluster launches.')
    parser.add_argument('--jobflow_id', type=str, help='The job flow ID of the cluster to run the job on.')
    parser.add_argument('--job_name', type=str, help='The name of the Spark job.')
    parser.add_argument('--py_file', type=str, help='A path to a PySpark script run during the step.')
    parser.add_argument('--input', type=str, help='File path of the dataset.')
    parser.add_argument('--output', type=str, help='Output path of the result files.')
    parser.add_argument('--output_file', type=str, help='S3 URI of the training job results.')
    args = parser.parse_args()

    logging.getLogger().setLevel(logging.INFO)
    client = _utils.get_client(args.region)

    logging.info('Submitting job to %s...', args.jobflow_id)
    spark_args = [args.input, args.output]
    step_id = _utils.submit_pyspark_job(
        client, args.jobflow_id, args.job_name, args.py_file, spark_args)
    logging.info('Job request submitted. Waiting for completion...')
    _utils.wait_for_job(client, args.jobflow_id, step_id)

    # Expose the step ID and the output location to downstream pipeline steps.
    Path('/output.txt').write_text(unicode(step_id))
    Path(args.output_file).parent.mkdir(parents=True, exist_ok=True)
    Path(args.output_file).write_text(unicode(args.output))
    logging.info('Job completed.')


if __name__ == "__main__":
    main()


@@ -0,0 +1,42 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
name: emr_submit_spark_job
description: >-
  Submits an Elastic MapReduce (EMR) Spark application.
inputs:
  - {name: region, description: 'The EMR region in which to handle the request.'}
  - {name: jobflow_id, description: 'The job flow ID of the cluster to run the job on.'}
  - {name: job_name, description: 'The name of the Spark job.'}
  - {name: jar_path, description: 'A path to a JAR file run during the step.'}
  - {name: main_class, default: '', description: 'The name of the main class in the specified Java file. If not specified, the JAR file should specify a Main-Class in its manifest file.'}
  - {name: input, description: 'File path of the dataset.'}
  - {name: output, description: 'Output path of the result files.'}
outputs:
  - {name: job_id, description: 'The ID of the created EMR step.'}
  - {name: output_location, description: 'S3 URI of the training job results.'}
implementation:
  container:
    image: seedjeffwan/kubeflow-pipeline-aws-emr:20190507
    command: ['python', 'submit_spark_job.py']
    args: [
      --region, {inputValue: region},
      --jobflow_id, {inputValue: jobflow_id},
      --job_name, {inputValue: job_name},
      --jar_path, {inputValue: jar_path},
      --main_class, {inputValue: main_class},
      --input, {inputValue: input},
      --output, {inputValue: output},
      --output_file, {outputPath: output_location},
    ]
    fileOutputs:
      job_id: /output.txt


@@ -0,0 +1,62 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# A program to submit a Spark job to an EMR cluster.
# Usage:
# python submit_spark_job.py \
#   --region us-west-2 \
#   --jobflow_id j-xsdsadsadsa \
#   --job_name training_job \
#   --jar_path s3://kubeflow-pipeline/jars/spark-examples_2.11-2.4.1.jar \
#   --main_class org.apache.spark.examples.JavaWordCount \
#   --input s3://kubeflow-pipeline/datasets/words.txt \
#   --output ''

import argparse
import logging

from pathlib2 import Path

from common import _utils


def main(argv=None):
    parser = argparse.ArgumentParser(description='Submit Spark Job')
    parser.add_argument('--region', type=str, help='The region where the cluster launches.')
    parser.add_argument('--jobflow_id', type=str, help='The job flow ID of the cluster to run the job on.')
    parser.add_argument('--job_name', type=str, help='The name of the Spark job.')
    parser.add_argument('--jar_path', type=str, help='A path to a JAR file run during the step.')
    parser.add_argument('--main_class', type=str, default=None,
                        help='The name of the main class in the specified Java file. If not specified, the JAR file should specify a Main-Class in its manifest file.')
    parser.add_argument('--input', type=str, help='File path of the dataset.')
    parser.add_argument('--output', type=str, help='Output path of the result files.')
    parser.add_argument('--output_file', type=str, help='S3 URI of the training job results.')
    args = parser.parse_args()

    logging.getLogger().setLevel(logging.INFO)
    client = _utils.get_client(args.region)

    logging.info('Submitting job...')
    spark_args = [args.input, args.output]
    step_id = _utils.submit_spark_job(
        client, args.jobflow_id, args.job_name, args.jar_path, args.main_class, spark_args)
    logging.info('Job request submitted. Waiting for completion...')
    _utils.wait_for_job(client, args.jobflow_id, step_id)

    # Expose the step ID and the output location to downstream pipeline steps.
    Path('/output.txt').write_text(unicode(step_id))
    Path(args.output_file).parent.mkdir(parents=True, exist_ok=True)
    Path(args.output_file).write_text(unicode(args.output))
    logging.info('Job completed.')


if __name__ == "__main__":
    main()


@@ -0,0 +1,73 @@
The `titanic-survival-prediction.py` sample runs a Spark ML pipeline that trains a random-forest classification model on AWS Elastic MapReduce (EMR).
## The dataset
Check the Kaggle competition [Titanic: Machine Learning from Disaster](https://www.kaggle.com/c/titanic) for more details about this problem. 70% of the dataset is used to train the model and the remaining 30% for validation.
Please upload the training dataset [train.csv](https://www.kaggle.com/c/titanic/data) to your S3 bucket.
## Spark ML Job
Please check [aws-emr-titanic-ml-example](https://github.com/Jeffwan/aws-emr-titanic-ml-example) for an example Spark project.
To build the JAR file, clone that project and run:
```
sbt clean package
# copy this JAR to your S3 bucket. The main class is com.amazonaws.emr.titanic.Titanic
ls target/scala-2.11/titanic-survivors-prediction_2.11-1.0.jar
```
## EMR permission
This pipeline uses `aws-secret` to access EMR. Please make sure you have an `aws-secret` Secret in the `kubeflow` namespace and that the credentials it holds have the `AmazonElasticMapReduceFullAccess` policy attached.
```yaml
apiVersion: v1
kind: Secret
metadata:
name: aws-secret
namespace: kubeflow
type: Opaque
data:
AWS_ACCESS_KEY_ID: YOUR_BASE64_ACCESS_KEY
AWS_SECRET_ACCESS_KEY: YOUR_BASE64_SECRET_ACCESS
```
> Note: To get the base64 string, try `echo -n $AWS_ACCESS_KEY_ID | base64`
## Compiling the pipeline template
Follow the guide to [building a pipeline](https://www.kubeflow.org/docs/guides/pipelines/build-pipeline/) to install the Kubeflow Pipelines SDK, then run the following command to compile the sample Python into a workflow specification. The specification takes the form of a YAML file compressed into a `.tar.gz` file.
```bash
dsl-compile --py titanic-survival-prediction.py --output titanic-survival-prediction.tar.gz
```
## Deploying the pipeline
Open the Kubeflow Pipelines UI. Create a new pipeline, and then upload the compiled specification (`.tar.gz` file) as a new pipeline template.
Once the pipeline run finishes, go to the S3 path specified in `output` to check the prediction results. There are three columns: `PassengerId`, `prediction`, and `Survived` (the ground-truth value):
```
...
4,1,1
5,0,0
6,0,0
7,0,0
...
```
## Components source
Create Cluster:
[source code](https://github.com/kubeflow/pipelines/tree/master/components/aws/emr/create_cluster/src)
Submit Spark Job:
[source code](https://github.com/kubeflow/pipelines/tree/master/components/aws/emr/submit_spark_job/src)
Delete Cluster:
[source code](https://github.com/kubeflow/pipelines/tree/master/components/aws/emr/delete_cluster/src)


@@ -0,0 +1,54 @@
#!/usr/bin/env python3
import kfp
from kfp import components
from kfp import dsl
from kfp.aws import use_aws_secret

emr_create_cluster_op = components.load_component_from_file('../../../components/aws/emr/create_cluster/component.yaml')
emr_submit_spark_job_op = components.load_component_from_file('../../../components/aws/emr/submit_spark_job/component.yaml')
emr_delete_cluster_op = components.load_component_from_file('../../../components/aws/emr/delete_cluster/component.yaml')


@dsl.pipeline(
    name='Titanic Survival Prediction Pipeline',
    description='Predict survival on the Titanic'
)
def titanic_survival_prediction(region='us-west-2',
                                log_s3_uri="s3://kubeflow-pipeline-data/emr/titanic/logs",
                                cluster_name="emr-cluster",
                                job_name='spark-ml-trainer',
                                input='s3://kubeflow-pipeline-data/emr/titanic/train.csv',
                                output='s3://kubeflow-pipeline-data/emr/titanic/output',
                                jar_path='s3://kubeflow-pipeline-data/emr/titanic/titanic-survivors-prediction_2.11-1.0.jar',
                                main_class='com.amazonaws.emr.titanic.Titanic',
                                instance_type="m4.xlarge",
                                instance_count="3"
                                ):
    create_cluster = emr_create_cluster_op(
        region=region,
        name=cluster_name,
        instance_type=instance_type,
        instance_count=instance_count,
        log_s3_uri=log_s3_uri,
    ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))

    training_and_prediction = emr_submit_spark_job_op(
        region=region,
        jobflow_id=create_cluster.output,
        job_name=job_name,
        jar_path=jar_path,
        main_class=main_class,
        input=input,
        output=output
    ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))

    delete_cluster = emr_delete_cluster_op(
        region=region,
        jobflow_id=create_cluster.output,
        dependent=training_and_prediction.outputs['job_id']
    ).apply(use_aws_secret('aws-secret', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY'))


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(titanic_survival_prediction, __file__ + '.zip')
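
As an alternative to uploading the compiled package through the UI, newer versions of the KFP SDK can also start a run programmatically; a minimal sketch, assuming the pipelines API is reachable at the placeholder host below:

```python
import kfp

# placeholder endpoint; adjust to however your KFP installation is exposed
client = kfp.Client(host='http://localhost:8080')

client.create_run_from_pipeline_func(
    titanic_survival_prediction,
    arguments={
        'input': 's3://my-bucket/emr/titanic/train.csv',   # placeholder paths
        'output': 's3://my-bucket/emr/titanic/output',
    })
```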