Remove unused files from user-guide/jobs. (#7427)
This commit is contained in:
parent
7438b73e91
commit
01b83e4551
|
@ -1,18 +0,0 @@
|
|||
# Job template for the "parallel processing with expansion" example:
# $ITEM is substituted by a templating step (e.g. sed) to produce one
# Job manifest per work item.
apiVersion: batch/v1
kind: Job
metadata:
  name: process-item-$ITEM
  labels:
    # Common label so all generated Jobs can be listed/deleted as a group.
    jobgroup: jobexample
spec:
  template:
    metadata:
      name: jobexample
      labels:
        jobgroup: jobexample
    spec:
      containers:
      - name: c
        image: busybox
        command: ["sh", "-c", "echo Processing item $ITEM && sleep 5"]
      # Never restart: each Job processes exactly one item and exits.
      restartPolicy: Never
|
|
@ -1,10 +0,0 @@
|
|||
# Worker image for the work-queue-1 (AMQP/RabbitMQ) example.
# Specify BROKER_URL and QUEUE when running
FROM ubuntu:14.04

# amqp-tools provides amqp-consume; python runs worker.py for each message.
RUN apt-get update && \
    apt-get install -y curl ca-certificates amqp-tools python \
       --no-install-recommends \
    && rm -rf /var/lib/apt/lists/*
COPY ./worker.py /worker.py

# Consume one message (-c 1) from $QUEUE and pipe its body to worker.py on stdin.
CMD /usr/bin/amqp-consume --url=$BROKER_URL -q $QUEUE -c 1 /worker.py
|
|
@ -1,20 +0,0 @@
|
|||
# Job for the work-queue-1 example: eight work items consumed from an
# AMQP (RabbitMQ) queue, at most two pods running at a time.
apiVersion: batch/v1
kind: Job
metadata:
  name: job-wq-1
spec:
  # One successful completion per work item.
  completions: 8
  parallelism: 2
  template:
    metadata:
      name: job-wq-1
    spec:
      containers:
      - name: c
        # Replace <project> with your container registry project.
        image: gcr.io/<project>/job-wq-1
        env:
        - name: BROKER_URL
          value: amqp://guest:guest@rabbitmq-service:5672
        - name: QUEUE
          value: job1
      restartPolicy: OnFailure
|
|
@ -1,7 +0,0 @@
|
|||
#!/usr/bin/env python

# Worker for the work-queue-1 example: amqp-consume pipes one message body
# to this script on stdin; print it and simulate work with a 10 second sleep.
import sys
import time

# Fix: sys.stdin has no lines() method (the original raised AttributeError);
# read() returns the whole message body piped in by amqp-consume.
print("Processing " + sys.stdin.read())
time.sleep(10)
|
|
@ -1,6 +0,0 @@
|
|||
# Worker image for the work-queue-2 (Redis) example.
FROM python
# redis-py client used by rediswq.py.
RUN pip install redis
COPY ./worker.py /worker.py
COPY ./rediswq.py /rediswq.py

# Loop pulling items from the Redis work queue until it is empty.
CMD python worker.py
|
|
@ -1,14 +0,0 @@
|
|||
# Job for the work-queue-2 example: no "completions" is set because the
# workers themselves detect when the Redis queue is empty and exit.
apiVersion: batch/v1
kind: Job
metadata:
  name: job-wq-2
spec:
  parallelism: 2
  template:
    metadata:
      name: job-wq-2
    spec:
      containers:
      - name: c
        # Replace myproject with your container registry project.
        image: gcr.io/myproject/job-wq-2
      restartPolicy: OnFailure
|
|
@ -1,15 +0,0 @@
|
|||
# Single Redis pod backing the work queue for the work-queue-2 example.
apiVersion: v1
kind: Pod
metadata:
  name: redis-master
  labels:
    # Matched by the "redis" Service selector.
    app: redis
spec:
  containers:
  - name: master
    image: redis
    env:
    - name: MASTER
      value: "true"
    ports:
    - containerPort: 6379
|
|
@ -1,10 +0,0 @@
|
|||
# Service exposing the redis-master pod under the DNS name "redis",
# which worker.py uses as its default host.
apiVersion: v1
kind: Service
metadata:
  name: redis
spec:
  ports:
  - port: 6379
    targetPort: 6379
  selector:
    app: redis
|
|
@ -1,130 +0,0 @@
|
|||
#!/usr/bin/env python

# Based on http://peter-hoffmann.com/2012/python-simple-queue-redis-queue.html
# and the suggestion in the redis documentation for RPOPLPUSH, at
# http://redis.io/commands/rpoplpush, which suggests how to implement a work-queue.


import redis
import uuid
import hashlib


class RedisWQ(object):
    """Simple Finite Work Queue with Redis Backend

    This work queue is finite: as long as no more work is added
    after workers start, the workers can detect when the queue
    is completely empty.

    The items in the work queue are assumed to have unique values.

    This object is not intended to be used by multiple threads
    concurrently.
    """
    def __init__(self, name, **redis_kwargs):
        """The default connection parameters are: host='localhost', port=6379, db=0

        The work queue is identified by "name".  The library may create other
        keys with "name" as a prefix.
        """
        self._db = redis.StrictRedis(**redis_kwargs)
        # The session ID will uniquely identify this "worker".
        self._session = str(uuid.uuid4())
        # Work queue is implemented as two queues: main, and processing.
        # Work is initially in main, and moved to processing when a client picks it up.
        self._main_q_key = name
        self._processing_q_key = name + ":processing"
        self._lease_key_prefix = name + ":leased_by_session:"

    def sessionID(self):
        """Return the ID for this session."""
        return self._session

    def _main_qsize(self):
        """Return the size of the main queue."""
        return self._db.llen(self._main_q_key)

    def _processing_qsize(self):
        """Return the size of the processing queue."""
        # Fix: docstring previously (incorrectly) said "main queue".
        return self._db.llen(self._processing_q_key)

    def empty(self):
        """Return True if the queue is empty, including work being done, False otherwise.

        False does not necessarily mean that there is work available to work on
        right now.
        """
        return self._main_qsize() == 0 and self._processing_qsize() == 0

    # TODO: implement this
    # def check_expired_leases(self):
    #     """Return expired leased items to the work queue."""
    #     # Processing list should not be _too_ long since it is approximately as long
    #     # as the number of active and recently active workers.
    #     processing = self._db.lrange(self._processing_q_key, 0, -1)
    #     for item in processing:
    #         # If the lease key is not present for an item (it expired or was
    #         # never created because the client crashed before creating it)
    #         # then move the item back to the main queue so others can work on it.
    #         if not self._lease_exists(item):
    #             TODO: transactionally move the key from processing queue to
    #             to main queue, while detecting if a new lease is created
    #             or if either queue is modified.

    def _itemkey(self, item):
        """Returns a string that uniquely identifies an item (bytes)."""
        return hashlib.sha224(item).hexdigest()

    def _lease_exists(self, item):
        """True if a lease on 'item' exists."""
        return self._db.exists(self._lease_key_prefix + self._itemkey(item))

    def lease(self, lease_secs=60, block=True, timeout=None):
        """Begin working on an item the work queue.

        Lease the item for lease_secs.  After that time, other
        workers may consider this client to have crashed or stalled
        and pick up the item instead.

        If optional args block is true and timeout is None (the default), block
        if necessary until an item is available."""
        if block:
            item = self._db.brpoplpush(self._main_q_key, self._processing_q_key, timeout=timeout)
        else:
            item = self._db.rpoplpush(self._main_q_key, self._processing_q_key)
        if item:
            # Record that we (this session id) are working on a key.  Expire that
            # note after the lease timeout.
            # Note: if we crash at this line of the program, then GC will see no lease
            # for this item and later return it to the main queue.
            itemkey = self._itemkey(item)
            self._db.setex(self._lease_key_prefix + itemkey, lease_secs, self._session)
        return item

    def complete(self, value):
        """Complete working on the item with 'value'.

        If the lease expired, the item may not have completed, and some
        other worker may have picked it up.  There is no indication
        of what happened.
        """
        self._db.lrem(self._processing_q_key, 0, value)
        # If we crash here, then the GC code will try to move the value, but it will
        # not be here, which is fine.  So this does not need to be a transaction.
        itemkey = self._itemkey(value)
        # Fix: the original also passed self._session as a second argument, which
        # redis DEL treats as another key name to delete (the session UUID is not
        # a key, so that was a spurious no-op argument).  Only the lease key is
        # meant to be removed here.
        self._db.delete(self._lease_key_prefix + itemkey)

# TODO: add functions to clean up all keys associated with "name" when
# processing is complete.

# TODO: add a function to add an item to the queue.  Atomically
# check if the queue is empty and if so fail to add the item
# since other workers might think work is done and be in the process
# of exiting.

# TODO(etune): move to my own github for hosting, e.g. github.com/erictune/rediswq-py and
# make it so it can be pip installed by anyone (see
# http://stackoverflow.com/questions/8247605/configuring-so-that-pip-install-can-work-from-github)

# TODO(etune): finish code to GC expired leases, and call periodically
# e.g. each time lease times out.
|
||||
|
|
@ -1,23 +0,0 @@
|
|||
#!/usr/bin/env python

# Worker for the work-queue-2 example: lease items from the Redis work
# queue, "process" each one (simulated by a sleep), and mark it complete,
# exiting when the queue (including in-flight work) is empty.
import time
import rediswq

host = "redis"
# Uncomment next two lines if you do not have Kube-DNS working.
# import os
# host = os.getenv("REDIS_SERVICE_HOST")

# Fix: use the host variable (the original hard-coded host="redis" again,
# which silently ignored the Kube-DNS override above).
q = rediswq.RedisWQ(name="job2", host=host)
print("Worker with sessionID: " + q.sessionID())
print("Initial queue state: empty=" + str(q.empty()))
while not q.empty():
    # Short lease: if this worker dies, another can pick the item up soon.
    item = q.lease(lease_secs=10, block=True, timeout=2)
    if item is not None:
        # Fix: the original used the invalid codec name "utf=8", which raised
        # LookupError on the first item processed.
        itemstr = item.decode("utf-8")
        print("Working on " + itemstr)
        time.sleep(10)  # Put your actual work here instead of sleep.
        q.complete(item)
    else:
        print("Waiting for work")
print("Queue empty, exiting")
|
|
@ -540,14 +540,6 @@ func TestExampleObjectSchemas(t *testing.T) {
|
|||
"mount-file-pod": {&api.Pod{}},
|
||||
"volume-pod": {&api.Pod{}},
|
||||
},
|
||||
"../docs/user-guide/jobs/work-queue-1": {
|
||||
"job": {&batch.Job{}},
|
||||
},
|
||||
"../docs/user-guide/jobs/work-queue-2": {
|
||||
"job": {&batch.Job{}},
|
||||
"redis-pod": {&api.Pod{}},
|
||||
"redis-service": {&api.Service{}},
|
||||
},
|
||||
"../docs/user-guide/liveness": {
|
||||
"exec-liveness": {&api.Pod{}},
|
||||
"http-liveness": {&api.Pod{}},
|
||||
|
|
Loading…
Reference in New Issue