344 lines
12 KiB
Markdown
344 lines
12 KiB
Markdown
---
|
|
title: Coarse Parallel Processing Using a Work Queue
|
|
content_type: task
|
|
weight: 20
|
|
---
|
|
|
|
|
|
<!-- overview -->
|
|
|
|
In this example, you will run a Kubernetes Job with multiple parallel
|
|
worker processes.
|
|
|
|
In this example, as each pod is created, it picks up one unit of work
|
|
from a task queue, completes it, deletes it from the queue, and exits.
|
|
|
|
Here is an overview of the steps in this example:
|
|
|
|
1. **Start a message queue service.** In this example, you use RabbitMQ, but you could use another
|
|
one. In practice you would set up a message queue service once and reuse it for many jobs.
|
|
1. **Create a queue, and fill it with messages.** Each message represents one task to be done. In
|
|
this example, a message is an integer that we will do a lengthy computation on.
|
|
1. **Start a Job that works on tasks from the queue**. The Job starts several pods. Each pod takes
|
|
one task from the message queue, processes it, and exits.
|
|
|
|
## {{% heading "prerequisites" %}}
|
|
|
|
|
|
You should already be familiar with the basic,
|
|
non-parallel, use of [Job](/docs/concepts/workloads/controllers/job/).
|
|
|
|
{{< include "task-tutorial-prereqs.md" >}}
|
|
|
|
You will need a container image registry where you can upload images to run in your cluster.
|
|
|
|
This task example also assumes that you have Docker installed locally.
|
|
|
|
|
|
<!-- steps -->
|
|
|
|
## Starting a message queue service
|
|
|
|
This example uses RabbitMQ, however, you can adapt the example to use another AMQP-type message service.
|
|
|
|
In practice you could set up a message queue service once in a
|
|
cluster and reuse it for many jobs, as well as for long-running services.
|
|
|
|
Start RabbitMQ as follows:
|
|
|
|
```shell
|
|
# make a Service for the StatefulSet to use
|
|
kubectl create -f https://kubernetes.io/examples/application/job/rabbitmq/rabbitmq-service.yaml
|
|
```
|
|
```
|
|
service "rabbitmq-service" created
|
|
```
|
|
|
|
```shell
|
|
kubectl create -f https://kubernetes.io/examples/application/job/rabbitmq/rabbitmq-statefulset.yaml
|
|
```
|
|
```
|
|
statefulset "rabbitmq" created
|
|
```
|
|
|
|
## Testing the message queue service
|
|
|
|
Now, we can experiment with accessing the message queue. We will
|
|
create a temporary interactive pod, install some tools on it,
|
|
and experiment with queues.
|
|
|
|
First create a temporary interactive Pod.
|
|
|
|
```shell
|
|
# Create a temporary interactive container
|
|
kubectl run -i --tty temp --image ubuntu:22.04
|
|
```
|
|
```
|
|
Waiting for pod default/temp-loe07 to be running, status is Pending, pod ready: false
|
|
... [ previous line repeats several times .. hit return when it stops ] ...
|
|
```
|
|
|
|
Note that your pod name and command prompt will be different.
|
|
|
|
Next install the `amqp-tools` so you can work with message queues.
|
|
The next commands show what you need to run inside the interactive shell in that Pod:
|
|
|
|
```shell
|
|
apt-get update && apt-get install -y curl ca-certificates amqp-tools python3 dnsutils
|
|
```
|
|
|
|
Later, you will make a container image that includes these packages.
|
|
|
|
Next, you will check that you can discover the Service for RabbitMQ:
|
|
|
|
```
|
|
# Run these commands inside the Pod
|
|
# Note the rabbitmq-service has a DNS name, provided by Kubernetes:
|
|
nslookup rabbitmq-service
|
|
```
|
|
```
|
|
Server: 10.0.0.10
|
|
Address: 10.0.0.10#53
|
|
|
|
Name: rabbitmq-service.default.svc.cluster.local
|
|
Address: 10.0.147.152
|
|
```
|
|
(the IP addresses will vary)
|
|
|
|
If the kube-dns addon is not set up correctly, the previous step may not work for you.
|
|
You can also find the IP address for that Service in an environment variable:
|
|
|
|
```shell
|
|
# run this check inside the Pod
|
|
env | grep RABBITMQ_SERVICE | grep HOST
|
|
```
|
|
```
|
|
RABBITMQ_SERVICE_SERVICE_HOST=10.0.147.152
|
|
```
|
|
(the IP address will vary)
|
|
|
|
Next you will verify that you can create a queue, and publish and consume messages.
|
|
|
|
```shell
|
|
# Run these commands inside the Pod
|
|
# In the next line, rabbitmq-service is the hostname where the rabbitmq-service
|
|
# can be reached. 5672 is the standard port for rabbitmq.
|
|
export BROKER_URL=amqp://guest:guest@rabbitmq-service:5672
|
|
# If you could not resolve "rabbitmq-service" in the previous step,
|
|
# then use this command instead:
|
|
BROKER_URL=amqp://guest:guest@$RABBITMQ_SERVICE_SERVICE_HOST:5672
|
|
|
|
# Now create a queue:
|
|
|
|
/usr/bin/amqp-declare-queue --url=$BROKER_URL -q foo -d
|
|
```
|
|
```
|
|
foo
|
|
```
|
|
|
|
Publish one message to the queue:
|
|
```shell
|
|
/usr/bin/amqp-publish --url=$BROKER_URL -r foo -p -b Hello
|
|
|
|
# And get it back.
|
|
|
|
/usr/bin/amqp-consume --url=$BROKER_URL -q foo -c 1 cat && echo 1>&2
|
|
```
|
|
```
|
|
Hello
|
|
```
|
|
|
|
In the last command, the `amqp-consume` tool took one message (`-c 1`)
|
|
from the queue, and passes that message to the standard input of an arbitrary command.
|
|
In this case, the program `cat` prints out the characters read from standard input, and
|
|
the echo adds a carriage return so the example is readable.
|
|
|
|
## Fill the queue with tasks
|
|
|
|
Now, fill the queue with some simulated tasks. In this example, the tasks are strings to be
|
|
printed.
|
|
|
|
In a practice, the content of the messages might be:
|
|
|
|
- names of files to that need to be processed
|
|
- extra flags to the program
|
|
- ranges of keys in a database table
|
|
- configuration parameters to a simulation
|
|
- frame numbers of a scene to be rendered
|
|
|
|
If there is large data that is needed in a read-only mode by all pods
|
|
of the Job, you typically put that in a shared file system like NFS and mount
|
|
that readonly on all the pods, or write the program in the pod so that it can natively read
|
|
data from a cluster file system (for example: HDFS).
|
|
|
|
For this example, you will create the queue and fill it using the AMQP command line tools.
|
|
In practice, you might write a program to fill the queue using an AMQP client library.
|
|
|
|
```shell
|
|
# Run this on your computer, not in the Pod
|
|
/usr/bin/amqp-declare-queue --url=$BROKER_URL -q job1 -d
|
|
```
|
|
```
|
|
job1
|
|
```
|
|
Add items to the queue:
|
|
```shell
|
|
for f in apple banana cherry date fig grape lemon melon
|
|
do
|
|
/usr/bin/amqp-publish --url=$BROKER_URL -r job1 -p -b $f
|
|
done
|
|
```
|
|
|
|
You added 8 messages to the queue.
|
|
|
|
## Create a container image
|
|
|
|
Now you are ready to create an image that you will run as a Job.
|
|
|
|
The job will use the `amqp-consume` utility to read the message
|
|
from the queue and run the actual work. Here is a very simple
|
|
example program:
|
|
|
|
{{% code_sample language="python" file="application/job/rabbitmq/worker.py" %}}
|
|
|
|
Give the script execution permission:
|
|
|
|
```shell
|
|
chmod +x worker.py
|
|
```
|
|
|
|
Now, build an image. Make a temporary directory, change to it,
|
|
download the [Dockerfile](/examples/application/job/rabbitmq/Dockerfile),
|
|
and [worker.py](/examples/application/job/rabbitmq/worker.py). In either case,
|
|
build the image with this command:
|
|
|
|
```shell
|
|
docker build -t job-wq-1 .
|
|
```
|
|
|
|
For the [Docker Hub](https://hub.docker.com/), tag your app image with
|
|
your username and push to the Hub with the below commands. Replace
|
|
`<username>` with your Hub username.
|
|
|
|
```shell
|
|
docker tag job-wq-1 <username>/job-wq-1
|
|
docker push <username>/job-wq-1
|
|
```
|
|
|
|
If you are using an alternative container image registry, tag the
|
|
image and push it there instead.
|
|
|
|
## Defining a Job
|
|
|
|
Here is a manifest for a Job. You'll need to make a copy of the Job manifest
|
|
(call it `./job.yaml`),
|
|
and edit the name of the container image to match the name you used.
|
|
|
|
{{% code_sample file="application/job/rabbitmq/job.yaml" %}}
|
|
|
|
In this example, each pod works on one item from the queue and then exits.
|
|
So, the completion count of the Job corresponds to the number of work items
|
|
done. That is why the example manifest has `.spec.completions` set to `8`.
|
|
|
|
## Running the Job
|
|
|
|
Now, run the Job:
|
|
|
|
```shell
|
|
# this assumes you downloaded and then edited the manifest already
|
|
kubectl apply -f ./job.yaml
|
|
```
|
|
|
|
You can wait for the Job to succeed, with a timeout:
|
|
```shell
|
|
# The check for condition name is case insensitive
|
|
kubectl wait --for=condition=complete --timeout=300s job/job-wq-1
|
|
```
|
|
|
|
Next, check on the Job:
|
|
|
|
```shell
|
|
kubectl describe jobs/job-wq-1
|
|
```
|
|
```
|
|
Name: job-wq-1
|
|
Namespace: default
|
|
Selector: controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
|
|
Labels: controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
|
|
job-name=job-wq-1
|
|
Annotations: <none>
|
|
Parallelism: 2
|
|
Completions: 8
|
|
Start Time: Wed, 06 Sep 2022 16:42:02 +0000
|
|
Pods Statuses: 0 Running / 8 Succeeded / 0 Failed
|
|
Pod Template:
|
|
Labels: controller-uid=41d75705-92df-11e7-b85e-fa163ee3c11f
|
|
job-name=job-wq-1
|
|
Containers:
|
|
c:
|
|
Image: container-registry.example/causal-jigsaw-637/job-wq-1
|
|
Port:
|
|
Environment:
|
|
BROKER_URL: amqp://guest:guest@rabbitmq-service:5672
|
|
QUEUE: job1
|
|
Mounts: <none>
|
|
Volumes: <none>
|
|
Events:
|
|
FirstSeen LastSeen Count From SubobjectPath Type Reason Message
|
|
───────── ──────── ───── ──── ───────────── ────── ────── ───────
|
|
27s 27s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-hcobb
|
|
27s 27s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-weytj
|
|
27s 27s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-qaam5
|
|
27s 27s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-b67sr
|
|
26s 26s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-xe5hj
|
|
15s 15s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-w2zqe
|
|
14s 14s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-d6ppa
|
|
14s 14s 1 {job } Normal SuccessfulCreate Created pod: job-wq-1-p17e0
|
|
```
|
|
|
|
|
|
|
|
All the pods for that Job succeeded! You're done.
|
|
|
|
|
|
<!-- discussion -->
|
|
|
|
## Alternatives
|
|
|
|
This approach has the advantage that you do not need to modify your "worker" program to be
|
|
aware that there is a work queue. You can include the worker program unmodified in your container
|
|
image.
|
|
|
|
Using this approach does require that you run a message queue service.
|
|
If running a queue service is inconvenient, you may
|
|
want to consider one of the other [job patterns](/docs/concepts/workloads/controllers/job/#job-patterns).
|
|
|
|
This approach creates a pod for every work item. If your work items only take a few seconds,
|
|
though, creating a Pod for every work item may add a lot of overhead. Consider another
|
|
design, such as in the [fine parallel work queue example](/docs/tasks/job/fine-parallel-processing-work-queue/),
|
|
that executes multiple work items per Pod.
|
|
|
|
In this example, you used the `amqp-consume` utility to read the message
|
|
from the queue and run the actual program. This has the advantage that you
|
|
do not need to modify your program to be aware of the queue.
|
|
The [fine parallel work queue example](/docs/tasks/job/fine-parallel-processing-work-queue/)
|
|
shows how to communicate with the work queue using a client library.
|
|
|
|
## Caveats
|
|
|
|
If the number of completions is set to less than the number of items in the queue, then
|
|
not all items will be processed.
|
|
|
|
If the number of completions is set to more than the number of items in the queue,
|
|
then the Job will not appear to be completed, even though all items in the queue
|
|
have been processed. It will start additional pods which will block waiting
|
|
for a message.
|
|
You would need to make your own mechanism to spot when there is work
|
|
to do and measure the size of the queue, setting the number of completions to match.
|
|
|
|
There is an unlikely race with this pattern. If the container is killed in between the time
|
|
that the message is acknowledged by the `amqp-consume` command and the time that the container
|
|
exits with success, or if the node crashes before the kubelet is able to post the success of the pod
|
|
back to the API server, then the Job will not appear to be complete, even though all items
|
|
in the queue have been processed.
|