diff --git a/docs/user-guide/jobs/expansions/job.yaml.txt b/docs/user-guide/jobs/expansions/job.yaml.txt deleted file mode 100644 index 790025b38b..0000000000 --- a/docs/user-guide/jobs/expansions/job.yaml.txt +++ /dev/null @@ -1,18 +0,0 @@ -apiVersion: batch/v1 -kind: Job -metadata: - name: process-item-$ITEM - labels: - jobgroup: jobexample -spec: - template: - metadata: - name: jobexample - labels: - jobgroup: jobexample - spec: - containers: - - name: c - image: busybox - command: ["sh", "-c", "echo Processing item $ITEM && sleep 5"] - restartPolicy: Never diff --git a/docs/user-guide/jobs/work-queue-1/Dockerfile b/docs/user-guide/jobs/work-queue-1/Dockerfile deleted file mode 100644 index cbd36bb620..0000000000 --- a/docs/user-guide/jobs/work-queue-1/Dockerfile +++ /dev/null @@ -1,10 +0,0 @@ -# Specify BROKER_URL and QUEUE when running -FROM ubuntu:14.04 - -RUN apt-get update && \ - apt-get install -y curl ca-certificates amqp-tools python \ - --no-install-recommends \ - && rm -rf /var/lib/apt/lists/* -COPY ./worker.py /worker.py - -CMD /usr/bin/amqp-consume --url=$BROKER_URL -q $QUEUE -c 1 /worker.py diff --git a/docs/user-guide/jobs/work-queue-1/job.yaml b/docs/user-guide/jobs/work-queue-1/job.yaml deleted file mode 100644 index 4e1a61892b..0000000000 --- a/docs/user-guide/jobs/work-queue-1/job.yaml +++ /dev/null @@ -1,20 +0,0 @@ -apiVersion: batch/v1 -kind: Job -metadata: - name: job-wq-1 -spec: - completions: 8 - parallelism: 2 - template: - metadata: - name: job-wq-1 - spec: - containers: - - name: c - image: gcr.io//job-wq-1 - env: - - name: BROKER_URL - value: amqp://guest:guest@rabbitmq-service:5672 - - name: QUEUE - value: job1 - restartPolicy: OnFailure diff --git a/docs/user-guide/jobs/work-queue-1/worker.py b/docs/user-guide/jobs/work-queue-1/worker.py deleted file mode 100755 index a20884515d..0000000000 --- a/docs/user-guide/jobs/work-queue-1/worker.py +++ /dev/null @@ -1,7 +0,0 @@ -#!/usr/bin/env python - -# Just prints standard out and sleeps for 10 seconds. -import sys -import time -print("Processing " + sys.stdin.lines()) -time.sleep(10) diff --git a/docs/user-guide/jobs/work-queue-2/Dockerfile b/docs/user-guide/jobs/work-queue-2/Dockerfile deleted file mode 100644 index 2de23b3c98..0000000000 --- a/docs/user-guide/jobs/work-queue-2/Dockerfile +++ /dev/null @@ -1,6 +0,0 @@ -FROM python -RUN pip install redis -COPY ./worker.py /worker.py -COPY ./rediswq.py /rediswq.py - -CMD python worker.py diff --git a/docs/user-guide/jobs/work-queue-2/job.yaml b/docs/user-guide/jobs/work-queue-2/job.yaml deleted file mode 100644 index ee7a06c732..0000000000 --- a/docs/user-guide/jobs/work-queue-2/job.yaml +++ /dev/null @@ -1,14 +0,0 @@ -apiVersion: batch/v1 -kind: Job -metadata: - name: job-wq-2 -spec: - parallelism: 2 - template: - metadata: - name: job-wq-2 - spec: - containers: - - name: c - image: gcr.io/myproject/job-wq-2 - restartPolicy: OnFailure diff --git a/docs/user-guide/jobs/work-queue-2/redis-pod.yaml b/docs/user-guide/jobs/work-queue-2/redis-pod.yaml deleted file mode 100644 index ae0c43a793..0000000000 --- a/docs/user-guide/jobs/work-queue-2/redis-pod.yaml +++ /dev/null @@ -1,15 +0,0 @@ -apiVersion: v1 -kind: Pod -metadata: - name: redis-master - labels: - app: redis -spec: - containers: - - name: master - image: redis - env: - - name: MASTER - value: "true" - ports: - - containerPort: 6379 diff --git a/docs/user-guide/jobs/work-queue-2/redis-service.yaml b/docs/user-guide/jobs/work-queue-2/redis-service.yaml deleted file mode 100644 index 519ea60fb9..0000000000 --- a/docs/user-guide/jobs/work-queue-2/redis-service.yaml +++ /dev/null @@ -1,10 +0,0 @@ -apiVersion: v1 -kind: Service -metadata: - name: redis -spec: - ports: - - port: 6379 - targetPort: 6379 - selector: - app: redis diff --git a/docs/user-guide/jobs/work-queue-2/rediswq.py b/docs/user-guide/jobs/work-queue-2/rediswq.py deleted file mode 100644 index ceda8bd1e3..0000000000 --- a/docs/user-guide/jobs/work-queue-2/rediswq.py +++ /dev/null @@ -1,130 +0,0 @@ -#!/usr/bin/env python - -# Based on http://peter-hoffmann.com/2012/python-simple-queue-redis-queue.html -# and the suggestion in the redis documentation for RPOPLPUSH, at -# http://redis.io/commands/rpoplpush, which suggests how to implement a work-queue. - - -import redis -import uuid -import hashlib - -class RedisWQ(object): - """Simple Finite Work Queue with Redis Backend - - This work queue is finite: as long as no more work is added - after workers start, the workers can detect when the queue - is completely empty. - - The items in the work queue are assumed to have unique values. - - This object is not intended to be used by multiple threads - concurrently. - """ - def __init__(self, name, **redis_kwargs): - """The default connection parameters are: host='localhost', port=6379, db=0 - - The work queue is identified by "name". The library may create other - keys with "name" as a prefix. - """ - self._db = redis.StrictRedis(**redis_kwargs) - # The session ID will uniquely identify this "worker". - self._session = str(uuid.uuid4()) - # Work queue is implemented as two queues: main, and processing. - # Work is initially in main, and moved to processing when a client picks it up. - self._main_q_key = name - self._processing_q_key = name + ":processing" - self._lease_key_prefix = name + ":leased_by_session:" - - def sessionID(self): - """Return the ID for this session.""" - return self._session - - def _main_qsize(self): - """Return the size of the main queue.""" - return self._db.llen(self._main_q_key) - - def _processing_qsize(self): - """Return the size of the main queue.""" - return self._db.llen(self._processing_q_key) - - def empty(self): - """Return True if the queue is empty, including work being done, False otherwise. - - False does not necessarily mean that there is work available to work on right now, - """ - return self._main_qsize() == 0 and self._processing_qsize() == 0 - -# TODO: implement this -# def check_expired_leases(self): -# """Return to the work queueReturn True if the queue is empty, False otherwise.""" -# # Processing list should not be _too_ long since it is approximately as long -# # as the number of active and recently active workers. -# processing = self._db.lrange(self._processing_q_key, 0, -1) -# for item in processing: -# # If the lease key is not present for an item (it expired or was -# # never created because the client crashed before creating it) -# # then move the item back to the main queue so others can work on it. -# if not self._lease_exists(item): -# TODO: transactionally move the key from processing queue to -# to main queue, while detecting if a new lease is created -# or if either queue is modified. - - def _itemkey(self, item): - """Returns a string that uniquely identifies an item (bytes).""" - return hashlib.sha224(item).hexdigest() - - def _lease_exists(self, item): - """True if a lease on 'item' exists.""" - return self._db.exists(self._lease_key_prefix + self._itemkey(item)) - - def lease(self, lease_secs=60, block=True, timeout=None): - """Begin working on an item the work queue. - - Lease the item for lease_secs. After that time, other - workers may consider this client to have crashed or stalled - and pick up the item instead. - - If optional args block is true and timeout is None (the default), block - if necessary until an item is available.""" - if block: - item = self._db.brpoplpush(self._main_q_key, self._processing_q_key, timeout=timeout) - else: - item = self._db.rpoplpush(self._main_q_key, self._processing_q_key) - if item: - # Record that we (this session id) are working on a key. Expire that - # note after the lease timeout. - # Note: if we crash at this line of the program, then GC will see no lease - # for this item a later return it to the main queue. - itemkey = self._itemkey(item) - self._db.setex(self._lease_key_prefix + itemkey, lease_secs, self._session) - return item - - def complete(self, value): - """Complete working on the item with 'value'. - - If the lease expired, the item may not have completed, and some - other worker may have picked it up. There is no indication - of what happened. - """ - self._db.lrem(self._processing_q_key, 0, value) - # If we crash here, then the GC code will try to move the value, but it will - # not be here, which is fine. So this does not need to be a transaction. - itemkey = self._itemkey(value) - self._db.delete(self._lease_key_prefix + itemkey, self._session) - -# TODO: add functions to clean up all keys associated with "name" when -# processing is complete. - -# TODO: add a function to add an item to the queue. Atomically -# check if the queue is empty and if so fail to add the item -# since other workers might think work is done and be in the process -# of exiting. - -# TODO(etune): move to my own github for hosting, e.g. github.com/erictune/rediswq-py and -# make it so it can be pip installed by anyone (see -# http://stackoverflow.com/questions/8247605/configuring-so-that-pip-install-can-work-from-github) - -# TODO(etune): finish code to GC expired leases, and call periodically -# e.g. each time lease times out. - diff --git a/docs/user-guide/jobs/work-queue-2/worker.py b/docs/user-guide/jobs/work-queue-2/worker.py deleted file mode 100755 index 49e5dae798..0000000000 --- a/docs/user-guide/jobs/work-queue-2/worker.py +++ /dev/null @@ -1,23 +0,0 @@ -#!/usr/bin/env python - -import time -import rediswq - -host="redis" -# Uncomment next two lines if you do not have Kube-DNS working. -# import os -# host = os.getenv("REDIS_SERVICE_HOST") - -q = rediswq.RedisWQ(name="job2", host="redis") -print("Worker with sessionID: " + q.sessionID()) -print("Initial queue state: empty=" + str(q.empty())) -while not q.empty(): - item = q.lease(lease_secs=10, block=True, timeout=2) - if item is not None: - itemstr = item.decode("utf=8") - print("Working on " + itemstr) - time.sleep(10) # Put your actual work here instead of sleep. - q.complete(item) - else: - print("Waiting for work") -print("Queue empty, exiting") diff --git a/test/examples_test.go b/test/examples_test.go index deb0091715..2bc6ba8084 100644 --- a/test/examples_test.go +++ b/test/examples_test.go @@ -540,14 +540,6 @@ func TestExampleObjectSchemas(t *testing.T) { "mount-file-pod": {&api.Pod{}}, "volume-pod": {&api.Pod{}}, }, - "../docs/user-guide/jobs/work-queue-1": { - "job": {&batch.Job{}}, - }, - "../docs/user-guide/jobs/work-queue-2": { - "job": {&batch.Job{}}, - "redis-pod": {&api.Pod{}}, - "redis-service": {&api.Service{}}, - }, "../docs/user-guide/liveness": { "exec-liveness": {&api.Pod{}}, "http-liveness": {&api.Pod{}},