In a discussion of moving from Google App Engine to EC2, a
participant mentioned that the only "odd" GAE service he made use of
was transactional task queues, and that there was no direct analogue
of this service in other queueing systems.
Here's one way of building a transactional task queue using RabbitMQ
and your favourite SQL database.
- a table in the database will hold pending tasks
- messages going through RabbitMQ will inform workers of the arrival of new tasks
- the available tasks will be load-balanced between n workers
- a "cleaner" process makes sure all tasks are eventually run and
that failed tasks are eventually detected
In this article, I'll spell out the requirements for the system, and
discuss the details of the database tables, RabbitMQ queues, and
daemon processes required. I'll then evaluate the solution presented
in terms of the requirements, and conclude by discussing a few
variations on the solution that might be worth exploring.
Requirements
This is what the GAE Task Queue API has to say about
transactional task queueing:
You can enqueue a task as part of a datastore transaction, such that
the task is only enqueued—and guaranteed to be enqueued—if the
transaction is committed successfully. Tasks added within a
transaction are considered to be a part of it and have the same
level of isolation and consistency.
Our requirements, then, are:
1. all successfully-enqueued tasks should eventually run
2. successfully-enqueued tasks should be runnable shortly after transaction commit
3. tasks where the transaction rolled back should never run
4. the system should be robust against race conditions
5. successfully-enqueued tasks shouldn't be run more than once
where a "successfully-enqueued" task is one that was enqueued as part
of a successfully-committed transaction.
Tables & Queues
Create a table—let's call it tasks—in your SQL database (here
imagined to be PostgreSQL):
CREATE TABLE tasks (
    id          TEXT PRIMARY KEY,
    description TEXT UNIQUE,
    start_time  TIMESTAMP WITH TIME ZONE
);
Each column is important:
- the id column uniquely names the job. Use a UUID, or the MD5 or
  SHA-1 hash of the description, or some other suitably-unique name
  for the job.
- the description column describes the task to be performed. You
  could use JSON, a URL, a Java class or method name, or any other
  means of identifying what is to be done.
- the start_time column is used to make sure only a single worker
  starts performing the task, and to detect failed tasks.
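As a rough illustration of the two id strategies just mentioned, here
is a minimal Python sketch. The function names are my own and purely
hypothetical; either one yields a string suitable for the id column.

```python
import hashlib
import uuid


def random_task_id():
    """Return a fresh random id: a version-4 UUID as 32 hex digits."""
    return uuid.uuid4().hex


def content_task_id(description):
    """Derive the id from the description (SHA-1, 40 hex digits).

    Equal descriptions always map to the same id, which fits the
    UNIQUE constraint on the description column.
    """
    return hashlib.sha1(description.encode("utf-8")).hexdigest()
```

A random id is the simplest choice; a content-derived id is handy
when the sender wants to recompute the id later without storing it.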
Next, create a queue in your RabbitMQ instance; let's call it
task_queue.
Queue_Declare(queue = 'task_queue', durable = True)
The tasks table will serve as the "master list" of work remaining to
be done. The task_queue queue will only be used to rapidly notify
workers that a new task is waiting for them. If messages are lost from
that queue on occasion, no harm will be done: the "cleaner" process,
described below, will pick up the pieces.
Enqueueing a Task
To enqueue a task as part of a database transaction, create a row for
the task as part of the transaction, and commit normally:
BEGIN;
-- ... the main part of your transaction goes here ...
INSERT INTO tasks
VALUES ('ad5b4ccf3902db006405074c721a990e', 'mytask', NULL);
COMMIT;
The new row must have its start_time column set to NULL.
At any time near the end of the transaction—either before or just
after the commit—send a message to the task_queue containing the
id of the new task:
Basic_Publish(exchange = '',
routing_key = 'task_queue',
body = 'ad5b4ccf3902db006405074c721a990e')
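Putting the two halves together, an enqueue helper might look
something like the sketch below. It rests on several assumptions:
sqlite3 stands in for your SQL database so that the example runs
anywhere (with PostgreSQL you would use your usual driver and its
placeholder style), the connection is assumed to have been opened
with isolation_level=None so that BEGIN and COMMIT are explicit, and
channel is assumed to be any object offering a pika-style
basic_publish method. The name enqueue_task is hypothetical.

```python
import sqlite3
import uuid


def enqueue_task(conn, channel, description):
    """Insert the task row inside a transaction, then notify workers.

    conn:    sqlite3 connection opened with isolation_level=None
             (an assumption; a stand-in for your real database).
    channel: object with basic_publish(exchange, routing_key, body).
    """
    task_id = uuid.uuid4().hex
    conn.execute("BEGIN")
    try:
        # ... the main part of your transaction goes here ...
        conn.execute(
            "INSERT INTO tasks (id, description, start_time) "
            "VALUES (?, ?, NULL)",
            (task_id, description),
        )
        conn.execute("COMMIT")
    except Exception:
        # Nothing was committed, so no notification is sent either.
        conn.execute("ROLLBACK")
        raise
    # This sketch publishes just after the commit. If the message is
    # lost, the "cleaner" will re-announce the task later.
    channel.basic_publish(exchange="", routing_key="task_queue", body=task_id)
    return task_id
```

Publishing after the commit is only one of the two orderings allowed
above; publishing just before it works too, at the cost of workers
occasionally having to wait for the row to appear.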
Worker Processes
Your workers should connect to the database and to RabbitMQ as
usual. Each worker process should Basic_Consume from the task_queue.
Whenever a message comes in containing a task id (for example, the ID
of the task enqueued just above, ad5b4ccf3902db006405074c721a990e),
the worker should:
1. BEGIN a transaction.
2. SELECT * FROM tasks WHERE id = 'ad5b4ccf3902db006405074c721a990e' FOR UPDATE
   The FOR UPDATE is important: it prevents a race condition if two
   workers accidentally start looking at the same job at the same
   time.
3. If no row is returned, wait for a few seconds (the "retry time")
   and retry the SELECT. The message travelling through RabbitMQ
   could have overtaken the database commit. If after a couple of
   retries (the "giveup time") the row is still absent, it's likely
   the sender's database commit didn't complete, so move on to
   waiting for the next message arriving from task_queue.
4. Check the start_time of the returned row. If it's anything but
   NULL, some other worker has already started this job, so move on
   to the next message.
5. UPDATE tasks SET start_time = NOW() WHERE id = 'ad5b4ccf3902db006405074c721a990e'
6. COMMIT the transaction. At this point, this worker instance is
   considered to have claimed the job.
7. Perform the task described by the description column of the row.
8. DELETE FROM tasks WHERE id = 'ad5b4ccf3902db006405074c721a990e'
At step 3, where a worker may give up on a given notification and move
on to the next one, it's possible that the worker simply didn't wait
long enough for the record to appear in the database. Similarly, at
step 7, the worker that had claimed the task may have crashed or been
terminated before it was able to DELETE the task record from the
database. In both cases, the "cleaner" process will make sure the job
gets done eventually.
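The worker steps can be sketched in Python roughly as follows. This
is a sketch under stated assumptions, not a definitive
implementation: sqlite3 stands in for PostgreSQL so that the example
is runnable, and because sqlite has no SELECT ... FOR UPDATE, the
sketch swaps in BEGIN IMMEDIATE, which takes the write lock at the
start of the transaction and so serialises competing workers in a
similar way. With PostgreSQL you would keep FOR UPDATE and NOW() as
written in the steps. The names handle_notification and perform, the
numeric start_time, and the default timings are all hypothetical.

```python
import sqlite3
import time


def handle_notification(conn, task_id, perform, retry_time=2.0, max_retries=3):
    """Process one task_queue message; return True if we ran the task.

    conn is assumed to be opened with isolation_level=None so that
    transactions are controlled explicitly.
    """
    for attempt in range(max_retries + 1):
        # Stand-in for SELECT ... FOR UPDATE (see the note above).
        conn.execute("BEGIN IMMEDIATE")
        row = conn.execute(
            "SELECT description, start_time FROM tasks WHERE id = ?",
            (task_id,),
        ).fetchone()
        if row is not None:
            break
        conn.execute("ROLLBACK")
        if attempt < max_retries:
            # The message may have overtaken the database commit.
            time.sleep(retry_time)
    else:
        # Give up; if the row does exist, the "cleaner" will resend.
        return False

    description, start_time = row
    if start_time is not None:
        # Some other worker has already claimed this job.
        conn.execute("ROLLBACK")
        return False

    conn.execute(
        "UPDATE tasks SET start_time = ? WHERE id = ?",
        (time.time(), task_id),
    )
    conn.execute("COMMIT")  # the job is now claimed by this worker

    perform(description)
    conn.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
    return True
```

Note that the sketch rolls back before moving on to the next message,
so that no lock is held while the worker waits.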
The "Cleaner" Process
The "cleaner" process should connect to the database and to RabbitMQ
as usual. It should periodically—say, every minute (the "cleanup
interval")—wake up and perform the following routine:
1. SELECT * FROM tasks WHERE start_time IS NULL
   Each row returned indicates a possibly-overlooked task, so a
   repeat message to task_queue should be sent, just like the ones
   described above. The worker pool will pick up the message as usual
   and will start work on the task. The use of FOR UPDATE in each
   worker ensures that the job won't be performed twice.
2. SELECT * FROM tasks WHERE start_time < (NOW() - INTERVAL '1 hour')
   Each row returned indicates a possibly crashed or deadlocked
   task. Vary the interval as required! For some jobs, one hour is
   too long; for others, much too short. The "cleaner" should kick
   off task-specific recovery actions:
   - sometimes it's appropriate to simply retry the job: UPDATE
     the start_time to NULL, and then post a message to
     task_queue.
   - sometimes a compensating job should be enqueued, either as a
     task (in which case use the normal task enqueueing process) or
     as a simple function call within the "cleaner".
   - sometimes the job should be abandoned, and a "dead job"
     notification sent to a system operator: perhaps send an email,
     or send a message to an appropriate exchange using RabbitMQ.
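One pass of the "cleaner" routine might be sketched like this. The
assumptions are much as before: sqlite3 (opened with
isolation_level=None) stands in for the real database, start_time is
stored as a numeric timestamp rather than a TIMESTAMP WITH TIME ZONE,
and channel is any object with a pika-style basic_publish method. The
name cleaner_pass is hypothetical, and the sketch only implements the
simplest recovery action, the plain retry.

```python
import sqlite3
import time


def cleaner_pass(conn, channel, stale_after=3600.0, now=None):
    """Run one cleanup pass; return (reannounced_ids, stale_ids)."""
    now = time.time() if now is None else now

    # 1. Unclaimed tasks: resend the notification, as at enqueue time.
    unclaimed = [
        row[0]
        for row in conn.execute("SELECT id FROM tasks WHERE start_time IS NULL")
    ]
    for task_id in unclaimed:
        channel.basic_publish(exchange="", routing_key="task_queue", body=task_id)

    # 2. Tasks claimed too long ago: possibly crashed or deadlocked.
    stale = [
        row[0]
        for row in conn.execute(
            "SELECT id FROM tasks WHERE start_time < ?", (now - stale_after,)
        )
    ]
    for task_id in stale:
        # Recovery policy assumed here: simple retry. Clear the claim,
        # then announce the task again.
        conn.execute("UPDATE tasks SET start_time = NULL WHERE id = ?", (task_id,))
        channel.basic_publish(exchange="", routing_key="task_queue", body=task_id)

    return unclaimed, stale
```

Calling cleaner_pass once per "cleanup interval" from a simple loop
would be enough; the other recovery actions would replace the body of
the second loop on a per-task basis.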
Evaluation
How did we do in satisfying our requirements?
Requirement 1 is satisfied by the action of the "cleaner": any time a
row is successfully inserted into the tasks table, the combined
action of the "cleaner" and the worker processes makes sure the job
will be run.
Requirement 2 is satisfied by the use of the task_queue: the message
sent through RabbitMQ serves to alert a worker that an item is headed
their way in the database.
Requirement 3 is satisfied by the use of transactional inserts into
the tasks table: if the transaction is rolled back, the row will
never appear in the database. Even if a message was sent on
task_queue, the worker will eventually stop waiting for the database
row to arrive, and the "cleaner" will never hear of the rolled-back
job to begin with.
Requirement 4 is satisfied by the use of the retry timers, which will
catch most races between the database and RabbitMQ. In cases where
severe delays mean that the workers give up on a job before its row
appears, the "cleaner" process will notice, and will resend a job
notification message to task_queue.
Requirement 5 is satisfied by the use of the start_time field in
each row, in conjunction with SELECT ... FOR UPDATE to ensure that
only one worker will be able to claim a job for processing.
Variations
There are plenty of variations on this basic pattern.
- You could make the "cleaner" process only resubmit jobs that have
  been waiting for more than two "cleanup intervals", to lower the
  (already small) probability of (harmless) double
  notifications. You'd only need to do this if your database was bad
  at performing SELECT ... FOR UPDATE queries.
- You could automatically scale your worker pool: instead of posting
  notifications directly to the task_queue, you'd send them to a
  task_exchange. They'd then be distributed not only to workers
  but to a special "scaler" process. Each worker would send a
  notification to the "scaler" when each task was completed, and the
  "scaler" would keep statistics on the arrival rate and the
  completion rate of tasks. New worker process instances would be
  started by the "scaler" in situations where the arrival rate was
  greatly exceeding the completion rate, and old instances would be
  torn down in the opposite situation.
- The (highly experimental) presence exchange RabbitMQ plugin could
  be used to detect crashed jobs more quickly.
- Instead of DELETEing tasks, an additional timestamp column
  finish_time could be added to the table, and the queries and
  updates could be adjusted to use it to simulate job
  deletion. That way, a permanent record is kept of all the tasks
  the system has performed.
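To make the finish_time variation a little more concrete, here is a
minimal sketch of the adjusted schema and two of the adjusted
statements. As before, sqlite3 with numeric timestamps is only a
stand-in for the real database, and the names SCHEMA, mark_finished
and stale_task_ids are hypothetical.

```python
import sqlite3
import time

# The tasks table with the extra column (sqlite flavour, assumed).
SCHEMA = """
CREATE TABLE tasks (
    id          TEXT PRIMARY KEY,
    description TEXT UNIQUE,
    start_time  REAL,
    finish_time REAL
)
"""


def mark_finished(conn, task_id, now=None):
    """Replaces the worker's final DELETE: stamp the row, keep it."""
    now = time.time() if now is None else now
    conn.execute(
        "UPDATE tasks SET finish_time = ? WHERE id = ?", (now, task_id)
    )


def stale_task_ids(conn, cutoff):
    """Crashed-task query for the "cleaner", skipping finished rows."""
    rows = conn.execute(
        "SELECT id FROM tasks "
        "WHERE start_time < ? AND finish_time IS NULL ORDER BY id",
        (cutoff,),
    )
    return [row[0] for row in rows]
```

One thing to watch with this variation: as long as description is
declared UNIQUE, a finished row that stays in the table will block a
later task with the same description, so that constraint would
probably need rethinking.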
And, of course, the "retry time", "giveup time", "cleanup interval"
and crashed-task-detection-interval can all be adjusted to the needs
of a given system.
Finally, in cases where you don't need transactional task queueing,
systems like Celery provide an amazing wealth of general
task-execution functionality, and will almost certainly serve you
better than rolling your own.