
Building A Queue - Part 4

Mar 18, 2015

So far, we've looked at the core pieces needed to write messages to our queue. For reading, the approach is similar to much of the code we've already covered. In fact, the state data, which contains a segment reference id and offset position, can be used as-is. A reader is represented as a channel. A channel has a name and a state entry. For a topic that tracks new users, you might have a channel named "send welcome" and another named "update stats". They each have their own state data and are thus independent of each other.
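To make that concrete, here's a rough sketch of what the channel and its state might look like. The field names line up with the code shown below, but the exact types (and the State struct itself) are my guesses, not necessarily what the real code uses:

type State struct {
  segmentId uint64 // which segment the channel is reading from (type is a guess)
  offset    uint32 // position within that segment
}

type Channel struct {
  sync.Mutex
  name    string     // e.g. "send welcome" or "update stats"
  state   State      // the same state structure the writer uses
  topic   *Topic     // the topic this channel reads from
  handler Handler    // consumer-supplied callback
  cond    *sync.Cond // used below to wait for new messages
  waiting int        // messages we've been notified about but haven't read yet
}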

One of the interesting things about the channel is that its position doesn't need to advance until after we've confirmed that the message was properly handled. This is part of the at-least-once guarantee. The code looks like:

func (c *Channel) handle(message []byte) bool {
  // handler is a callback provided by the consumer
  if err := c.handler(message); err != nil {
    return false
  }
  // we only move forward if the handler didn't return an error
  c.Lock()
  c.state.offset += uint32(len(message) + MESSAGE_OVERHEAD)
  c.Unlock()
  return true
}

Because the entire system relies on pointers to the data, you should be able to see how various other operations, such as peeking (the above is essentially a peek followed by a pop) and batch-gets, are doable.
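As an illustration (these helpers aren't from the actual project), a peek is just a read that never advances the state, and a batch-get is the same read-and-advance loop applied up to n times:

// peek returns the next message without advancing the channel's position,
// so a subsequent read will return the same message again
func (c *Channel) peek() []byte {
  return c.topic.read(c)
}

// batch reads up to n messages, advancing the position after each one
func (c *Channel) batch(n int) [][]byte {
  messages := make([][]byte, 0, n)
  for len(messages) < n {
    message := c.topic.read(c)
    if message == nil {
      break
    }
    messages = append(messages, message)
    c.Lock()
    c.state.offset += uint32(len(message) + MESSAGE_OVERHEAD)
    c.Unlock()
  }
  return messages
}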

What I really wanted to talk about today was the way the topic notifies channels that new data is available. It might be tempting to rely on Go channels for this, but that would be a mistake. The problem with Go channels is that they block. Oh, you can create a buffered channel, but at some point, it'll block (or you'll run out of memory).

A common way to handle a block is to drop messages. This isn't an option for us. Another one is to apply back-pressure to the producer. Again, this isn't a particularly good solution for a queue - we want it to be possible for a channel to be disconnected for hours, or days, while messages keep coming.
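To see why, here's what the drop-on-full approach looks like with a plain buffered Go channel. This is the pattern we're rejecting, not code from the queue:

// tryNotify does a non-blocking send. Once the buffer fills up, the
// default case runs and the message is simply lost - fine for some
// systems, unacceptable for a queue.
func tryNotify(notifications chan []byte, message []byte) bool {
  select {
  case notifications <- message:
    return true // there was room in the buffer
  default:
    return false // buffer full; we'd have to drop or block
  }
}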

Our solution is to use a condition variable, which I've talked about before. When a message is written, the topic will call the notify method of each active channel (channels that aren't currently connected don't need to be notified of new messages; when they start up, they'll simply read all the messages until they've caught up to the tail):

func (c *Channel) notify() {
  c.Lock()
  c.waiting += 1
  c.Unlock()
  c.cond.Signal()
}
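On the topic side, the write path then looks roughly like this. This is a sketch: the real write also has to append the message to the current segment (covered in the earlier parts), and I'm assuming a channels field that holds the active channels:

func (t *Topic) write(message []byte) {
  t.Lock()
  // ... append the message to the current segment ...
  channels := t.channels // only the currently-connected channels
  t.Unlock()

  // tell every active channel that one more message is available;
  // notify doesn't block, so a slow consumer can't hold up the writer
  for _, c := range channels {
    c.notify()
  }
}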

Not only should this avoid blocking the publisher (as long as no other code holds on to our lock for an unnecessary amount of time), but it also doesn't take up an ever-increasing amount of memory as messages pile up.

Let's dump the consuming code, then explain it:

func (c *Channel) Consume(handler Handler) {
  // remember the consumer's callback so handle can invoke it
  c.handler = handler
  c.Lock()
  for {
    // block until notify signals that at least one message is waiting
    for c.waiting == 0 {
      c.cond.Wait()
    }
    c.Unlock()
    processed := 0
    for {
      message := c.topic.read(c)
      if message == nil {
        // we've drained the topic; settle the waiting count and go
        // back to waiting for the next signal
        c.Lock()
        c.waiting -= processed
        break
      }
      if c.handle(message) {
        processed++
      }
    }
  }
}

The above is a never-ending for loop which blocks (c.cond.Wait()) until it gets notified (via the Signal call we already saw). Remember, Wait automatically unlocks our mutex when it's called and locks it back up before returning. Once we get a signal, we read messages until we've drained the queue (when message == nil), at which point we block again. The code is written in such a way that it'll handle new messages that come in even while processing existing ones (that's why we keep track of how many messages we processed and only block when we know for sure that there are 0 waiting messages).
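Putting it together, a consumer ends up looking something like this. NewTopic and Channel are placeholders for whatever the real API exposes; the important parts are the handler signature and the fact that Consume blocks forever:

// returning an error tells the channel not to advance, so the
// message will be retried (the at-least-once guarantee)
func sendWelcome(message []byte) error {
  // parse the message, send the email, ...
  return nil
}

func main() {
  topic := NewTopic("new-users")           // placeholder constructor
  channel := topic.Channel("send welcome") // placeholder lookup/create

  // Consume never returns, so run it in its own goroutine
  go channel.Consume(sendWelcome)

  // ... the rest of the program writes to the topic as users sign up
  select {}
}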

That's the hard part of the channel. Everything else is straightforward and similar to the publisher side. You can look at it all in more detail at https://github.com/karlseguin/sq.