Consistent Database and Message Queue Writes

Jan 22, 2020

If you've ever needed to write to a message queue as part of an action that also involved your database, you've probably run into a problem: how to coordinate the two actions?

If you publish to your queue outside the transaction, there's a chance the publish fails and you'll have no easy way to roll back the [now committed] database changes. If you publish inside the transaction and the transaction then fails to commit, you'll have sent out a message that isn't consistent with your database.

What we need is a way for our publish to participate in the database transaction. The solution that I've been leaning on heavily of late is the transactional outbox pattern.

The gist of the pattern is to stage messages in the database as part of the transaction. We still need to get these staged messages out of the database and into the actual message queue, but that's a more manageable challenge.

Let's look at this in more detail. First, we'll add a table:

create table outbox (
  id int generated by default as identity primary key,
  route text not null,
  payload bytea not null
)

Now, given code that wrote to the queue outside our transaction:

DB.transaction(fn conn ->
  # maybe do stuff...
  DB.query!(conn, "insert into users ....")
  # ... and more stuff
end)
Queue.write("user.create", payload)

Or that wrote inside the transaction:

DB.transaction(fn conn ->
  # maybe do stuff...
  DB.query!(conn, "insert into users ....")
  # ... and more stuff
  Queue.write("user.create", payload)
end)

We'd rewrite as:

DB.transaction(fn conn ->
  # maybe do stuff...
  DB.query!(conn, "insert into users ....")
  # ... and more stuff

  DB.query!(conn, "
    insert into outbox (route, payload)
    values ($1, $2)
  ", ["user.create", payload])
end)

Again, the advantage of the above is we get an all-or-nothing across both writes. But we're [obviously] not done. We still need to, you know, actually write to our message queue.

Exactly how you trigger the next code is up to you. You could use a DB listener (which isn't without its own challenges) or poll the DB every few seconds. In the next part I'll show the solution that we're using. But, for now, setting aside how/when this code is executed, the publishing code looks like:

DB.transaction(fn conn ->
  rows = DB.query!(conn, "
    with deleted as (
      -- PostgreSQL's delete has no order by/limit; to cap the batch, add:
      -- where id in (select id from outbox order by id limit 100)
      delete from outbox
      returning *
    )
    select route, payload
    from deleted
    order by id
  ")

  for row <- rows do
    Queue.write(row.route, row.payload)
  end
end)
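As an illustration of the "poll the DB every few seconds" option, here's a minimal sketch of a process that runs a drain function on a fixed interval. `OutboxPoller`, the `drain_fun` argument and the 5 second default are names and values I'm making up for the example; `drain_fun` is assumed to be a zero-arity function wrapping the publishing transaction above. This is not the trigger promised for the next part, just the simplest thing that could work.

```elixir
defmodule OutboxPoller do
  # Hypothetical module: calls a caller-supplied drain function on a fixed interval.
  use GenServer

  # drain_fun is assumed to wrap the publishing transaction shown above.
  def start_link(drain_fun, interval_ms \\ 5_000) do
    GenServer.start_link(__MODULE__, {drain_fun, interval_ms})
  end

  @impl true
  def init({drain_fun, interval_ms}) do
    schedule(interval_ms)
    {:ok, %{drain_fun: drain_fun, interval_ms: interval_ms}}
  end

  @impl true
  def handle_info(:drain, state) do
    # If drain_fun raises, this process crashes. Assuming the drain runs in a
    # transaction, the delete is rolled back and the rows wait for a later drain.
    state.drain_fun.()
    schedule(state.interval_ms)
    {:noreply, state}
  end

  # The next tick is only scheduled once the current drain has finished,
  # so drains started by this process never overlap.
  defp schedule(interval_ms), do: Process.send_after(self(), :drain, interval_ms)
end
```

You'd normally start something like this under a supervisor so that a failed drain is retried after a restart. The obvious downside is latency: a message can sit in the outbox for up to one full interval.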

You might be thinking I haven't solved the problem, I've only shifted where it happens. If it was valid to be concerned about a queue write failure before, it's valid to be concerned about it now, right? But the original problem was either writing to the queue but not to the database (when the publish was inside the transaction) or writing to the database but not the queue (when the publish was outside the transaction). Neither of these is now possible. What is now possible is that we publish the same message to the queue multiple times. This would happen if we publish but fail to commit the delete.

The above shouldn't be a problem though, or rather, it shouldn't be a new problem. You're already dealing with (or should be dealing with) at-least-once messaging semantics. Your consumers already have to handle re-delivery (likely by being idempotent). Conversely, if you're dealing with at-most-once, then you wouldn't care about possibly losing messages in the first place (so you could just stick the publishing to the message queue after the transaction).
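To make "being idempotent" concrete, here's a minimal sketch of a consumer that skips messages it has already handled. It assumes every message carries a unique `id` (for example the outbox row's id copied into the message), which the publishing code above doesn't do as written. `IdempotentConsumer` and its `handle/3` function are hypothetical names, and the in-memory `MapSet` stands in for whatever durable store you'd really use.

```elixir
defmodule IdempotentConsumer do
  # Hypothetical consumer: `seen` is a MapSet of message ids already handled.
  # In a real system this set would live in durable storage, ideally updated
  # in the same transaction as the handler's side effects.
  def handle(seen, %{id: id} = message, handler) do
    if MapSet.member?(seen, id) do
      # Re-delivery: skip the handler and leave the set unchanged.
      {:duplicate, seen}
    else
      handler.(message)
      {:ok, MapSet.put(seen, id)}
    end
  end
end

# Usage: the second delivery of the same message is recognized and skipped.
message = %{id: 1, route: "user.create", payload: "example"}
{:ok, seen} = IdempotentConsumer.handle(MapSet.new(), message, &IO.inspect/1)
{:duplicate, _seen} = IdempotentConsumer.handle(seen, message, &IO.inspect/1)
```

Tracking ids is only one way to get there. A handler whose effect is naturally repeatable (an upsert, say) doesn't need the bookkeeping at all.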

The outbox code above is just an outline; there's a lot of flexibility in the actual implementation. You could leverage the message staging in the outbox table to do de-duplication. You could add parallelism to the code that drains the outbox table: if some types require ordering, you'd add a group column to outbox and have a drain per group. You could use triggers to populate the outbox, which is particularly useful for projects that have a lot of different places that write data, including manual manipulation.
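Here's one possible reading of the "drain per group" idea, sketched under a few assumptions: the rows have already been fetched, each row is a map with `id`, `group`, `route` and `payload` keys (the `group` column being the hypothetical addition to the outbox table), and `publish` is any 2-arity function such as `&Queue.write/2`. `GroupedDrain` is a made-up name. Groups are published concurrently while rows within a group stay in id order.

```elixir
defmodule GroupedDrain do
  # Hypothetical helper: publishes groups in parallel and rows within a
  # group sequentially, so ordering is only guaranteed per group.
  def drain(rows, publish) do
    rows
    |> Enum.sort_by(& &1.id)
    # group_by keeps the (sorted) order of the rows inside each group.
    |> Enum.group_by(& &1.group)
    |> Task.async_stream(fn {_group, group_rows} ->
      Enum.each(group_rows, fn row -> publish.(row.route, row.payload) end)
    end)
    # Force the lazy stream so every group is actually published.
    |> Stream.run()
  end
end
```

`Task.async_stream/2` defaults to one concurrent task per scheduler and a 5 second timeout per task, both of which you'd likely want to tune. A failed publish crashes the caller, which fits the at-least-once story as long as the delete hasn't been committed yet.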

In the next part I'll also show how we trigger the draining process in a way that minimizes latency without having to constantly poll. I find our approach an illustrative demonstration of the elegance of Elixir.