Tasks within Transactions using RabbitMQ

In a discussion of moving from Google AppEngine to EC2, a participant mentioned that the only “odd” GAE service he made use of was transactional task queues, and that there was no direct analogue of this service in other queueing systems.

Here’s one way of building a transactional task queue using RabbitMQ and your favourite SQL database.

  • a table in the database will hold pending tasks
  • messages going through RabbitMQ will inform workers of the arrival of new tasks
  • the available tasks will be load-balanced between n workers
  • a “cleaner” process makes sure all tasks are eventually run and that failed tasks are eventually detected

In this article, I’ll spell out the requirements for the system, and discuss the details of the database tables, RabbitMQ queues, and daemon processes required. I’ll then evaluate the solution presented in terms of the requirements, and conclude by discussing a few variations on the solution that might be worth exploring.

Requirements

This is what the GAE Task Queue API has to say about transactional task queueing:

You can enqueue a task as part of a datastore transaction, such that the task is only enqueued—and guaranteed to be enqueued—if the transaction is committed successfully. Tasks added within a transaction are considered to be a part of it and have the same level of isolation and consistency.

Our requirements, then, are:

  1. all successfully-enqueued tasks should eventually run
  2. successfully-enqueued tasks should be runnable shortly after transaction commit
  3. tasks where the transaction rolled back should never run
  4. the system should be robust against race conditions
  5. successfully-enqueued tasks shouldn’t be run more than once

where a “successfully-enqueued” task is one that was enqueued as part of a successfully-committed transaction.

Tables & Queues

Create a table—let’s call it tasks —in your SQL database (here imagined to be PostgreSQL):

CREATE TABLE tasks (
  id TEXT PRIMARY KEY,
  description TEXT UNIQUE,
  start_time TIMESTAMP WITH TIME ZONE
);

Each column is important:

  • the id column uniquely names the job. Use a UUID, or the MD5 or SHA-1 hash of the description, or some other suitably-unique name for the job.

  • the description column describes the task to be performed. You could use JSON, a URL, a Java class or method name, or any other means of identifying what is to be done.

  • the start_time column is used to make sure only a single worker starts performing the task, and to detect failed tasks.

Next, create a queue in your RabbitMQ instance; let’s call it task_queue.

Queue_Declare(queue = 'task_queue', durable = True)

The tasks table will serve as the “master list” of work remaining to be done. The task_queue queue will only be used to rapidly notify workers that a new task is waiting for them. If messages are lost from that queue on occasion, no harm will be done: the “cleaner” process, described below, will pick up the pieces.

Enqueueing a Task

To enqueue a task as part of a database transaction, create a row for the task as part of the transaction, and commit normally:

BEGIN;
  -- ... the main part of your transaction goes here ...
  INSERT INTO tasks
    VALUES ('ad5b4ccf3902db006405074c721a990e', 'mytask', NULL);
COMMIT;

The new row must have its start_time column set to NULL.

At any time near the end of the transaction—either before or just after the commit—send a message to the task_queue containing the id of the new task:

Basic_Publish(exchange = '',
              routing_key = 'task_queue',
              body = 'ad5b4ccf3902db006405074c721a990e')
Worker Processes

Your workers should connect to the database and to RabbitMQ as usual. Each worker process should Basic_Consume from the task_queue. Whenever a message comes in containing a task id —for example, the ID of the task enqueued just above, ad5b4ccf3902db006405074c721a990e —the worker should:

  1. BEGIN a transaction.

  2. SELECT * FROM tasks WHERE id = 'ad5b4ccf3902db006405074c721a990e' FOR UPDATE

    The FOR UPDATE is important: it prevents a race condition if two workers accidentally start looking at the same job at the same time.

  3. If no row is returned, wait for a few seconds (the “retry time”) and retry the SELECT. The message travelling through RabbitMQ could have overtaken the database commit. If after a couple of retries (the “giveup time”) the row is still absent, it’s likely the sender’s database commit didn’t complete, so move on to waiting for the next message arriving from task_queue.

  4. Check the start_time of the returned row. If it’s anything but NULL, some other worker has already started this job, so move on to the next message.

  5. UPDATE tasks SET start_time = NOW() WHERE id = 'ad5b4ccf3902db006405074c721a990e'

  6. COMMIT the transaction. At this point, this worker instance is considered to have claimed the job.

  7. Perform the task described by the description column of the row.

  8. DELETE FROM tasks WHERE id = 'ad5b4ccf3902db006405074c721a990e'

At step 3, where a worker may give up on a given notification and move on to the next one, it’s possible that the worker might just not have waited for long enough for the record to appear in the database. Similarly, at step 4, the worker that had claimed the task may have crashed or been terminated before it was able to DELETE the task record from the database. In both cases, the “cleaner” process will make sure the job gets done eventually.

The “Cleaner” Process

The “cleaner” process should connect to the database and to RabbitMQ as usual. It should periodically—say, every minute (the “cleanup interval”) —wake up and perform the following routine:

  1. SELECT * FROM tasks WHERE start_time IS NULL

  2. Each row returned indicates a possibly-overlooked task, so a repeat message to task_queue should be sent, just like the ones described above. The worker pool will pick up the message as usual and will start work on the task. The use of FOR UPDATE in each worker ensures that the job won’t be performed twice.

  3. SELECT * FROM tasks WHERE start_time < (NOW() - INTERVAL '1 hour')

  4. Each row returned indicates a possible crashed or deadlocked task. Vary the interval as required! For some jobs, one hour is too long; for others, much too short. The “cleaner” should kick off task-specific recovery actions:

    • sometimes it’s appropriate to simply retry the job: UPDATE the start_time to NULL, and then post a message to task_queue.

    • sometimes a compensating job should be enqueued, either as a task (in which case use the normal task enqueueing process) or as a simple function call within the “cleaner”.

    • sometimes the job should be abandoned, and a “dead job” notification sent to a system operator: perhaps send an email, or send a message to an appropriate exchange using RabbitMQ.

Evaluation

How did we do in satisfying our requirements?

Requirement 1 is satisfied by the action of the “cleaner”: any time a row is successfully inserted into the tasks table, the combined action of the “cleaner” and the worker processes make sure the job will be run.

Requirement 2 is satisfied by the use of the task_queue: the message sent through RabbitMQ serves to alert a worker that an item is headed their way in the database.

Requirement 3 is satisfied by the use of transactional inserts into the tasks table: if the commit is rolled back, the row will never appear in the database, and even if a message was sent on task_queue, the worker will eventually stop waiting for the database row to arrive, and the “cleaner” will never hear of the rolled-back job to begin with.

Requirement 4 is satisfied by the use of the retry timers, which will catch most races between the database and RabbitMQ. In cases where severe delays mean that the workers give up on a job before its row appears, the “cleaner” process will notice, and will resend a job notification message to task_queue.

Requirement 5 is satisfied by the use of the start_time field in each row, in conjunction with SELECT ... FOR UPDATE to ensure that only one worker will be able to claim a job for processing.

Variations

There are plenty of variations on this basic pattern.

  1. You could make the “cleaner” process only resubmit jobs that have been waiting for more than two “cleanup intervals”, to lower the (already small) probability of (harmless) double notifications. You’d only need to do this if your database was bad at performing SELECT ... FOR UPDATE queries.

  2. You could automatically scale your worker pool: instead of posting notifications directly to the task_queue, you’d send them to a task_exchange. They’d then be distributed not only to workers but to a special “scaler” process. Each worker would send a notification to the “scaler” when each task was completed, and the “scaler” would keep statistics on the arrival rate and the completion rate of tasks. New worker process instances would be started by the “scaler” in situations where the arrival rate was greatly exceeding the completion rate, and old instances would be torn down in the opposite situation.

  3. The (highly experimental) presence exchange RabbitMQ plugin could be used to detect crashed jobs more quickly.

  4. Instead of DELETEing tasks, an additional timestamp column finish_time could be added to the table, and the queries and updates could be adjusted to use it to simulate job deletion. That way, a permanent record is kept of all the tasks the system has performed.

And, of course, the “retry time”, “giveup time”, “cleanup interval” and crashed-task-detection-interval can all be adjusted to the needs of a given system.

Finally, in cases where you don’t need transactional task queueing, systems like Celery provide an amazing wealth of general task-execution functionality, and will almost certainly serve you better than rolling your own.