Successors to "Enterprise Integration Patterns"?

"Enterprise Integration Patterns", by Gregor Hohpe, has been a classic go-to volume for a lot of people working with distributed systems over the years.

It was published back in 2003, though, before things like AMQP, ZeroMQ, WebSockets and Twitter.

Is there anything that could be considered an update on the book, something that covers modern integration scenarios? Perhaps something that touches not only on the newer messaging technologies but also on NoSQL, improvements to the browser environment, and so on.

What are people reading to get a common vocabulary for all this stuff and to get their heads around how the pieces fit together?

'bad_vertex' errors while developing and testing RabbitMQ plugins

Today I have been doing maintenance work on an old RabbitMQ plugin of mine. Part of this work was updating its Makefiles to work with the latest RabbitMQ build system.

The problem and symptom

After getting it to compile, and trying to run it, I started seeing errors like this:

Error: {'EXIT',
       {{badmatch,
        {error,
        {edge,{bad_vertex,mochiweb},rabbitmq_mochiweb,mochiweb}}},
    [{rabbit_plugins,dependencies,3,
         [{file,"src/rabbit_plugins.erl"},{line,100}]},
     {rabbit_plugins_main,format_plugins,4,
         [{file,"src/rabbit_plugins_main.erl"},{line,184}]},
     {rabbit_plugins_main,start,0,
         [{file,"src/rabbit_plugins_main.erl"},{line,70}]},
     {init,start_it,1,[]},
     {init,start_em,1,[]}]}}

The error came not just from rabbitmq-plugins: a similar error appeared when starting the RabbitMQ server itself.

The reason turned out to be simple: I had symbolically linked the rabbitmq-mochiweb and mochiweb-wrapper directories into my plugins directory, as per the manual. What the manual didn't say is that this works for all plugins except the -wrapper plugins and the Erlang "client" plugin (rabbitmq-erlang-client, a.k.a. amqp_client.ez).

The solution

Symlink all the plugins except the wrapper plugins and amqp_client.ez.

The wrapper plugin *.ez files and amqp_client.ez need to be present in the plugins directory itself. So instead of the instructions given, try the following steps:

$ mkdir -p rabbitmq-server/plugins
$ cd rabbitmq-server/plugins
$ ln -s ../../rabbitmq-mochiweb
$ cp rabbitmq-mochiweb/dist/mochiweb*ez .
$ cp rabbitmq-mochiweb/dist/webmachine*ez .
$ ../scripts/rabbitmq-plugins enable rabbitmq_mochiweb

A working configuration for me has the following contents of the plugins directory:

total 816
-rw-r--r--   1 tonyg  staff  260123 Sep 17 18:36 mochiweb-2.3.1-rmq0.0.0-gitd541e9a.ez
lrwxr-xr-x   1 tonyg  staff      23 Sep 17 17:59 rabbitmq-mochiweb -> ../../rabbitmq-mochiweb
-rw-r--r--   1 tonyg  staff  149142 Sep 17 18:37 webmachine-1.9.1-rmq0.0.0-git52e62bc.ez
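
A quick way to spot the mistake described above is to scan the plugins directory for symlinks that look like wrapper or Erlang-client plugins. The following is only a rough sketch: the function name misplaced_symlinks is made up for illustration, and the name-based test ("-wrapper" suffix, "amqp_client" or "rabbitmq-erlang-client" prefix) is an assumption drawn from the plugin names mentioned in this post, not something the RabbitMQ build system provides.

```python
import os


def misplaced_symlinks(plugins_dir):
    """Return names of symlinked entries that probably need to be real .ez files.

    Heuristic (an assumption, based only on the names in this post): wrapper
    plugins end in "-wrapper", and the Erlang client is called either
    "amqp_client" or "rabbitmq-erlang-client".
    """
    suspects = []
    for name in sorted(os.listdir(plugins_dir)):
        path = os.path.join(plugins_dir, name)
        if not os.path.islink(path):
            continue  # regular files and directories are fine
        base = name[:-3] if name.endswith(".ez") else name
        if base.endswith("-wrapper") or base.startswith(
            ("amqp_client", "rabbitmq-erlang-client")
        ):
            suspects.append(name)
    return suspects
```

Any name it returns is a candidate for replacing the symlink with a copy of the corresponding *.ez file from the plugin's dist directory.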

RabbitHub Status, May 2012

RabbitHub, an implementation of PubSubHubbub I built as part of my work on RabbitMQ, hasn't been maintained for a while now. It lags the current state of the art in two respects:

  1. it will need straightforward updates to work with the current versions of RabbitMQ Server, and
  2. it implements PSHB version 0.1 rather than the current 0.3.

Fixing the former would take about two days of expert work, and I don't know how much work fixing the latter would be. Both are perfectly reasonable to consider, though.

RabbitHub is unique, as far as I know, in one respect: it uses PSHB as a generic webhook-based messaging protocol, rather than the narrower Atom distribution protocol that the designers have in mind. There are other PSHB implementations, notably the reference implementation and Superfeedr, but as far as I know they are all specialized to Atom transport. A list of hub implementations can be found at http://code.google.com/p/pubsubhubbub/wiki/Hubs.

Tasks within Transactions using RabbitMQ

In a discussion of moving from Google AppEngine to EC2, a participant mentioned that the only "odd" GAE service he made use of was transactional task queues, and that there was no direct analogue of this service in other queueing systems.

Here's one way of building a transactional task queue using RabbitMQ and your favourite SQL database.

  • a table in the database will hold pending tasks
  • messages going through RabbitMQ will inform workers of the arrival of new tasks
  • the available tasks will be load-balanced between n workers
  • a "cleaner" process makes sure all tasks are eventually run and that failed tasks are eventually detected

In this article, I'll spell out the requirements for the system, and discuss the details of the database tables, RabbitMQ queues, and daemon processes required. I'll then evaluate the solution presented in terms of the requirements, and conclude by discussing a few variations on the solution that might be worth exploring.

Requirements

This is what the GAE Task Queue API has to say about transactional task queueing:

You can enqueue a task as part of a datastore transaction, such that the task is only enqueued—and guaranteed to be enqueued—if the transaction is committed successfully. Tasks added within a transaction are considered to be a part of it and have the same level of isolation and consistency.

Our requirements, then, are:

  1. all successfully-enqueued tasks should eventually run
  2. successfully-enqueued tasks should be runnable shortly after transaction commit
  3. tasks where the transaction rolled back should never run
  4. the system should be robust against race conditions
  5. successfully-enqueued tasks shouldn't be run more than once

where a "successfully-enqueued" task is one that was enqueued as part of a successfully-committed transaction.

Tables & Queues

Create a table—let's call it tasks—in your SQL database (here imagined to be PostgreSQL):

CREATE TABLE tasks (
  id TEXT PRIMARY KEY,
  description TEXT UNIQUE,
  start_time TIMESTAMP WITH TIME ZONE
);

Each column is important:

  • the id column uniquely names the job. Use a UUID, or the MD5 or SHA-1 hash of the description, or some other suitably-unique name for the job.

  • the description column describes the task to be performed. You could use JSON, a URL, a Java class or method name, or any other means of identifying what is to be done.

  • the start_time column is used to make sure only a single worker starts performing the task, and to detect failed tasks.
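
As a small illustration of the id column, here is one way the two naming options mentioned above could look in Python. The helper names are hypothetical; only hashlib and uuid from the standard library are used.

```python
import hashlib
import uuid


def task_id_from_description(description: str) -> str:
    """Derive a stable id from the task description (SHA-1, as hex)."""
    return hashlib.sha1(description.encode("utf-8")).hexdigest()


def random_task_id() -> str:
    """Alternative: a random UUID, unrelated to the description."""
    return uuid.uuid4().hex
```

A hash-derived id has the side effect that enqueueing the same description twice produces the same id, so the second INSERT fails on the primary key; a UUID leaves that check to the UNIQUE constraint on description.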

Next, create a queue in your RabbitMQ instance; let's call it task_queue.

Queue_Declare(queue = 'task_queue', durable = True)

The tasks table will serve as the "master list" of work remaining to be done. The task_queue queue will only be used to rapidly notify workers that a new task is waiting for them. If messages are lost from that queue on occasion, no harm will be done: the "cleaner" process, described below, will pick up the pieces.

Enqueueing a Task

To enqueue a task as part of a database transaction, create a row for the task as part of the transaction, and commit normally:

BEGIN;
  -- ... the main part of your transaction goes here ...
  INSERT INTO tasks
    VALUES ('ad5b4ccf3902db006405074c721a990e', 'mytask', NULL);
COMMIT;

The new row must have its start_time column set to NULL.
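
The transactional insert can be tried out without a PostgreSQL server. The sketch below uses Python's built-in sqlite3 module as a stand-in (an assumption made purely so the example is self-contained); the function name enqueue_in_transaction and the main_work callback are hypothetical. It shows the property that matters: if anything in the transaction fails, the task row disappears along with everything else.

```python
import sqlite3

# Same shape as the tasks table above; SQLite stores the timestamp as text.
SCHEMA = """
CREATE TABLE tasks (
  id TEXT PRIMARY KEY,
  description TEXT UNIQUE,
  start_time TEXT
)
"""


def enqueue_in_transaction(conn, task_id, description, main_work):
    """Run main_work(conn) and insert the task row in a single transaction.

    Returns True if the transaction committed. On any exception, everything
    is rolled back (the task row included) and False is returned.
    """
    try:
        with conn:  # commits on success, rolls back on exception
            main_work(conn)  # "the main part of your transaction"
            conn.execute(
                "INSERT INTO tasks VALUES (?, ?, NULL)",
                (task_id, description),
            )
        return True
    except Exception:
        return False
```

The notification to task_queue is deliberately left out of this sketch: it can be sent before or just after the commit, and losing it is harmless.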

At any time near the end of the transaction—either before or just after the commit—send a message to the task_queue containing the id of the new task:

Basic_Publish(exchange = '',
              routing_key = 'task_queue',
              body = 'ad5b4ccf3902db006405074c721a990e')

Worker Processes

Your workers should connect to the database and to RabbitMQ as usual. Each worker process should Basic_Consume from the task_queue. Whenever a message comes in containing a task id—for example, the ID of the task enqueued just above, ad5b4ccf3902db006405074c721a990e—the worker should:

  1. BEGIN a transaction.

  2. SELECT * FROM tasks WHERE id = 'ad5b4ccf3902db006405074c721a990e' FOR UPDATE

    The FOR UPDATE is important: it prevents a race condition if two workers accidentally start looking at the same job at the same time.

  3. If no row is returned, wait for a few seconds (the "retry time") and retry the SELECT. The message travelling through RabbitMQ could have overtaken the database commit. If after a couple of retries (the "giveup time") the row is still absent, it's likely the sender's database commit didn't complete, so move on to waiting for the next message arriving from task_queue.

  4. Check the start_time of the returned row. If it's anything but NULL, some other worker has already started this job, so move on to the next message.

  5. UPDATE tasks SET start_time = NOW() WHERE id = 'ad5b4ccf3902db006405074c721a990e'

  6. COMMIT the transaction. At this point, this worker instance is considered to have claimed the job.

  7. Perform the task described by the description column of the row.

  8. DELETE FROM tasks WHERE id = 'ad5b4ccf3902db006405074c721a990e'

At step 3, where a worker may give up on a given notification and move on to the next one, it's possible that the worker might just not have waited for long enough for the record to appear in the database. Similarly, at step 4, the worker that had claimed the task may have crashed or been terminated before it was able to DELETE the task record from the database. In both cases, the "cleaner" process will make sure the job gets done eventually.
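
Steps 1 to 6 and step 8 can be sketched as follows. This is not the exact procedure above: it runs against SQLite (Python's sqlite3 module), which has no SELECT ... FOR UPDATE, so the claim is made with a single conditional UPDATE and a check of the affected row count instead. The function names, the default retry settings and the injectable sleep parameter are all assumptions for illustration.

```python
import sqlite3
import time


def claim_task(conn, task_id, retry_time=2.0, max_retries=3, sleep=time.sleep):
    """Try to claim task_id; return its description if claimed, else None."""
    for attempt in range(max_retries + 1):
        row = conn.execute(
            "SELECT description FROM tasks WHERE id = ?", (task_id,)
        ).fetchone()
        if row is not None:
            break
        if attempt < max_retries:
            sleep(retry_time)  # the message may have overtaken the commit
    else:
        return None  # row never appeared: give up, the cleaner will cope

    # Stand-in for SELECT ... FOR UPDATE plus the start_time check: the
    # UPDATE only matches if nobody has claimed the task yet.
    with conn:
        cur = conn.execute(
            "UPDATE tasks SET start_time = ? "
            "WHERE id = ? AND start_time IS NULL",
            (time.time(), task_id),
        )
    return row[0] if cur.rowcount == 1 else None


def finish_task(conn, task_id):
    """Step 8: remove the row once the task has been performed."""
    with conn:
        conn.execute("DELETE FROM tasks WHERE id = ?", (task_id,))
```

A worker would call claim_task for each id arriving from task_queue, perform the described task if a description comes back, and then call finish_task.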

The "Cleaner" Process

The "cleaner" process should connect to the database and to RabbitMQ as usual. It should periodically—say, every minute (the "cleanup interval")—wake up and perform the following routine:

  1. SELECT * FROM tasks WHERE start_time IS NULL

  2. Each row returned indicates a possibly-overlooked task, so a repeat message to task_queue should be sent, just like the ones described above. The worker pool will pick up the message as usual and will start work on the task. The use of FOR UPDATE in each worker ensures that the job won't be performed twice.

  3. SELECT * FROM tasks WHERE start_time < (NOW() - INTERVAL '1 hour')

  4. Each row returned indicates a possible crashed or deadlocked task. Vary the interval as required! For some jobs, one hour is too long; for others, much too short. The "cleaner" should kick off task-specific recovery actions:

    • sometimes it's appropriate to simply retry the job: UPDATE the start_time to NULL, and then post a message to task_queue.

    • sometimes a compensating job should be enqueued, either as a task (in which case use the normal task enqueueing process) or as a simple function call within the "cleaner".

    • sometimes the job should be abandoned, and a "dead job" notification sent to a system operator: perhaps send an email, or send a message to an appropriate exchange using RabbitMQ.
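
One wake-up of the "cleaner" might look roughly like this. Again SQLite stands in for the real database, start_time is stored as a Unix timestamp, and notify is a hypothetical callback that would publish a task id to task_queue; none of these names come from a real library.

```python
import sqlite3
import time


def cleaner_pass(conn, notify, stale_after=3600.0, now=None):
    """Re-notify unstarted tasks; return ids of tasks that look stuck."""
    now = time.time() if now is None else now
    unstarted = conn.execute(
        "SELECT id FROM tasks WHERE start_time IS NULL ORDER BY id"
    ).fetchall()
    for (task_id,) in unstarted:
        notify(task_id)  # repeat message, same as the original notification
    stale = conn.execute(
        "SELECT id FROM tasks WHERE start_time < ? ORDER BY id",
        (now - stale_after,),
    ).fetchall()
    return [task_id for (task_id,) in stale]


def retry_task(conn, notify, task_id):
    """Simplest recovery action: mark the task unclaimed and re-notify."""
    with conn:
        conn.execute(
            "UPDATE tasks SET start_time = NULL WHERE id = ?", (task_id,)
        )
    notify(task_id)
```

What to do with the ids returned by cleaner_pass is task-specific: retry_task covers only the first of the three recovery options listed above.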

Evaluation

How did we do in satisfying our requirements?

Requirement 1 is satisfied by the action of the "cleaner": any time a row is successfully inserted into the tasks table, the combined action of the "cleaner" and the worker processes makes sure the job will be run.

Requirement 2 is satisfied by the use of the task_queue: the message sent through RabbitMQ serves to alert a worker that an item is headed their way in the database.

Requirement 3 is satisfied by the use of transactional inserts into the tasks table: if the transaction is rolled back, the row will never appear in the database. Even if a message was sent on task_queue, the worker will eventually stop waiting for the database row to arrive, and the "cleaner" will never hear of the rolled-back job to begin with.

Requirement 4 is satisfied by the use of the retry timers, which will catch most races between the database and RabbitMQ. In cases where severe delays mean that the workers give up on a job before its row appears, the "cleaner" process will notice, and will resend a job notification message to task_queue.

Requirement 5 is satisfied by the use of the start_time field in each row, in conjunction with SELECT ... FOR UPDATE to ensure that only one worker will be able to claim a job for processing.

Variations

There are plenty of variations on this basic pattern.

  1. You could make the "cleaner" process only resubmit jobs that have been waiting for more than two "cleanup intervals", to lower the (already small) probability of (harmless) double notifications. You'd only need to do this if your database was bad at performing SELECT ... FOR UPDATE queries.

  2. You could automatically scale your worker pool: instead of posting notifications directly to the task_queue, you'd send them to a task_exchange. They'd then be distributed not only to workers but to a special "scaler" process. Each worker would send a notification to the "scaler" when each task was completed, and the "scaler" would keep statistics on the arrival rate and the completion rate of tasks. New worker process instances would be started by the "scaler" in situations where the arrival rate was greatly exceeding the completion rate, and old instances would be torn down in the opposite situation.

  3. The (highly experimental) presence exchange RabbitMQ plugin could be used to detect crashed jobs more quickly.

  4. Instead of DELETEing tasks, an additional timestamp column finish_time could be added to the table, and the queries and updates could be adjusted to use it to simulate job deletion. That way, a permanent record is kept of all the tasks the system has performed.

And, of course, the "retry time", "giveup time", "cleanup interval" and crashed-task-detection-interval can all be adjusted to the needs of a given system.
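
To make variation 2 a little more concrete, here is a toy version of the decision logic a "scaler" might use. Everything in it is an assumption: the class name, the sliding window, and the ratio thresholds are invented for illustration, and actually starting or stopping worker instances is left out entirely.

```python
from collections import deque


class Scaler:
    """Compare task arrivals and completions over a sliding time window."""

    def __init__(self, window=60.0, up_ratio=2.0, down_ratio=0.5):
        self.window = window
        self.up_ratio = up_ratio
        self.down_ratio = down_ratio
        self.arrivals = deque()
        self.completions = deque()

    def task_arrived(self, when):
        self.arrivals.append(when)

    def task_completed(self, when):
        self.completions.append(when)

    def _trim(self, events, now):
        # Forget events that have fallen out of the window.
        while events and events[0] <= now - self.window:
            events.popleft()

    def decide(self, now):
        self._trim(self.arrivals, now)
        self._trim(self.completions, now)
        arrived, completed = len(self.arrivals), len(self.completions)
        if arrived > self.up_ratio * max(completed, 1):
            return "scale_up"  # arrivals greatly exceed completions
        if completed > 0 and arrived < self.down_ratio * completed:
            return "scale_down"  # the opposite situation
        return "hold"
```

The timestamps passed in would come from the notifications seen on task_exchange and from the workers' completion messages.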

Finally, in cases where you don't need transactional task queueing, systems like Celery provide an amazing wealth of general task-execution functionality, and will almost certainly serve you better than rolling your own.

AMQP 1.0's "Timestamp" definition is bogus

The AMQP 1.0.0 draft of 2011-07-04 contains the following language defining the meaning of its 64-bit "timestamp" data type:

Encodes a point in time using a 64 bit signed integer representing milliseconds since Midnight Jan 1, 1970 UTC. For the purpose of this representation, milliseconds are taken to be (1/(24*60*60*1000))th of a day.

This language is broken.[1]

The first sentence there makes it a fixed offset from TAI, which is good. The second sentence there is totally bogus and can have no purpose beyond confusing the crap out of anyone trying to work from this definition. It'd be massively improved by just removing the second sentence:

Encodes a point in time using a 64 bit signed integer representing milliseconds since Midnight Jan 1, 1970 UTC.

For clarity, you might want to add the word "elapsed" and also reconfirm that it's TAI being discussed here:

Encodes a point in time using a 64 bit signed integer representing elapsed milliseconds since Midnight Jan 1, 1970 UTC. Note that time stamps change in lockstep with TAI, not UTC nor Unix time.

Using a TAI-based metric instead of Unix time (time_t) is good because the Unix time_t definition is a bad design: see this page. Furthermore, UTC, which time_t is an encoding of, cannot be used to talk precisely about moments beyond 6 months in the future, because of the leap-second table that it needs.


TAI-based metrics are almost perfect except that they're damned awkward to use right on most computer systems out there, because most systems use Unix time. Worse, using TAI but offset so that second zero is the time_t epoch gives a number currently within 30s or so of time_t, which could lead to a lot of confusion.

Choosing a representation of an instant for use in a new protocol is hard. You get a tradeoff between:

  • time_t: ambiguous representation and lack of accuracy when referring to moments more than 6 months in the future; on the plus side, wide support and low potential for making mistakes (other than those caused by the inherent flaws in the definition).

  • UTC, but not encoded as a count of seconds since some epoch: unambiguous representation, but still inaccurate for future dates, with poor library support and the added hassle of defining some new representation.

  • TAI offset to be based at the time_t epoch: unambiguous representation, fully accurate, but looks so similar to Unix time that people will probably make mistakes. (Will the mistakes be harmful though? How harmful is being off by 30s? You might ask the Mars Climate Orbiter.)

  • Seconds since 2000-01-01T00:00:00Z, in lockstep with TAI: unambiguous representation, fully accurate, looks dissimilar enough that it won't be mistaken for Unix time, but jolly weird and still needs a leap-second table for display purposes, like any other TAI-based metric.

  • Chronological Julian Days in GMT: unambiguous representation, fully accurate, but difficult to convert back and forth to Unix time without a full calendar package.

Additionally, if you ever want to simply subtract one timestamp from another to get some idea of the interval between them, Julian Days and Unix time don't work; TAI-based metrics are the only ones that give accurate results for this use.
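
A concrete case of the subtraction problem: a leap second was inserted at the end of 2008-12-31, so that UTC day lasted 86401 seconds. Unix time does not see it. The snippet below uses only calendar.timegm from the Python standard library; the leap-second constant is typed in by hand, since nothing in the standard library knows about it.

```python
import calendar


def unix_seconds(year, month, day):
    """Unix time at 00:00:00 UTC on the given date."""
    return calendar.timegm((year, month, day, 0, 0, 0))


day_start = unix_seconds(2008, 12, 31)
day_end = unix_seconds(2009, 1, 1)

# What naive subtraction of Unix timestamps reports for that day.
unix_difference = day_end - day_start

# What actually elapsed: the day ended with 23:59:60, one extra second.
LEAP_SECONDS_ON_2008_12_31 = 1
elapsed_si_seconds = unix_difference + LEAP_SECONDS_ON_2008_12_31
```

Here unix_difference is 86400 while elapsed_si_seconds is 86401; a TAI-based count would give the latter directly.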

Ideally we want these timestamps to be able to be accurate up to and beyond 50 years in the future or so. If it were me, I'd use TAI either with its own built-in epoch, 1958-01-01T00:00:00Z, or with a suitable easy-to-remember epoch such as 2000-01-01T00:00:00Z.

(Thanks to @ciphergoth, @simonmacm, @squaremobius and @michaeljforster for an informative discussion of this issue.)


  [1] There's an open bug report in the AMQP protocol JIRA system for this issue: ISSUE-167

The 0MQ Transport Layer Specification

The 0MQ team has just released a draft specification of the 0MQ transport layer. It's a sweet little spec: go take a look! The nice thing about it is that it concentrates on building a flexible transport layer without putting any constraints on the content, routing or meaning of the datagrams it carries.[1]

Another interesting aspect of it is that messages are structured as a list of segments (frames)—and that structure is exposed to the next layer up! Such a seemingly small decision has large, and favourable, consequences: the next layer up can use the segmented structure of messages to represent many interesting patterns:

  • a split between message envelope and message contents;
  • a split between message headers and message body;
  • a list of addresses to use in routing a message, like the old-school bang paths;
  • a stream of content being delivered in chunks as it becomes available; and so on.
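
Two of these patterns are easy to show once a message is just a list of byte strings. This is only a sketch of what a layer above the transport might do, not the wire format from the spec: the helper names are made up, and the use of an empty frame to separate routing addresses from the rest of the message is an assumed convention.

```python
def split_envelope(frames):
    """Split a multi-frame message into (addresses, body) at the first empty frame."""
    frames = list(frames)
    try:
        delimiter = frames.index(b"")
    except ValueError:
        return [], frames  # no delimiter: the whole message is body
    return frames[:delimiter], frames[delimiter + 1:]


def chunked(payload, size):
    """Cut a payload into frames of at most `size` bytes, for streamed delivery."""
    return [payload[i:i + size] for i in range(0, len(payload), size)]
```

For example, split_envelope([b"hop1", b"hop2", b"", b"headers", b"body"]) separates the two-hop address list from a headers-plus-body message.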

In my opinion, the AMQP spec should have been split up into similarly small, flexible pieces as part of the work leading up to the current 1.0 drafts. In defence of the working group's current approach, a monolithic spec can make it easier to build a coherent whole; but on the minus side, it can make it so difficult to gain experience with the system being designed that coherence remains out of reach. For instance, it is not now (and probably never will be) possible to write a one-line message sender using bash for AMQP.


  [1] Well, that's not quite true: it describes parts of the lowest levels of a few messaging patterns that the transport has been used for in addition to the transport itself; my opinion is that those pieces should be split out from spec:13 and placed in specifications of their own.

Rube Goldberg contraptions with RabbitMQ

I've just finished building and deploying this website. It uses jekyll to render the content, and the content author uses git to push the changes up to the hosting machine. From there, a nice little chain of programs arranges for the site to be rebuilt on the server and made live:

  • A git post-receive hook uses curl to HTTP POST an empty message into a RabbitMQ exchange via the RabbitHub plugin.

  • D. J. Bernstein's daemontools supervises an instance of amqp-consume, which connects to a queue bound to the exchange the post-receive hook delivers into and, whenever a message is received, invokes a shell script. The command line for invoking amqp-consume is roughly

    amqp-consume \
       -s localhost \
       --username=... --password=... \
       -e exchangename \
       -A \
       /path/to/rebuild-website-script

  • The shell script invoked for every message from RabbitMQ checks out a fresh copy of the website, compiles it, and deploys the resulting static HTML into the correct location on the file system for Apache to pick up.

  • I'm also monitoring the RabbitMQ exchange using the rabbitmq-xmpp plugin talking to my desktop XMPP client, Adium, so whenever anyone does a git push, I get a message appearing in my IM client from exchangename@my.rabbitmq.hostname letting me know a new version of the site has just gone live.
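
The first link in the chain, the post-receive hook, amounts to an HTTP POST with an empty body. The real hook uses curl; the sketch below shows the same request built with Python's urllib instead. The function names are invented, and the URL is whatever endpoint the RabbitHub plugin exposes for the exchange, which is deliberately not spelled out here.

```python
import urllib.request


def build_notification(hub_url):
    """Build an empty-bodied POST request for the given endpoint URL."""
    return urllib.request.Request(hub_url, data=b"", method="POST")


def notify(hub_url, timeout=10.0):
    """Send the notification and return the HTTP status code."""
    request = build_notification(hub_url)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status
```

The message body is empty because its arrival is the whole signal: the rebuild script ignores the content and simply checks out the latest version of the site.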