Simple Service Bus / Message Queue with MongoDB

A service bus or message queue allows producers and subscribers to communicate asynchronously so that a system can handle disconnects, processes being stopped and started, or peaks of demand beyond what the subscriber can immediately cope with. The queue acts as a buffer that the producer writes to and the subscriber reads from.

There are lots of implementations such as NServiceBus, MassTransit, Rhino Service Bus and the cloud-provided services such as Amazon’s Simple Queue Service and Windows Azure’s AppFabric Service Bus. Some take a little time to get started with, and the cloud ones can also rack up charges pretty quickly if you are doing too much polling.

Often, all that is needed is something fairly simple to buffer messages between processes and persist them. I’ve been making good use of MongoDB recently in conjunction with Jonathan Oliver’s EventStore library for a CQRS-based project so it seemed the obvious place to start – why not use MongoDB to store the queue?!

Now, I did have a look round first to see if anyone else had created something already and the closest I got was the post here: Why (and How) I Replaced Amazon SQS with MongoDB. However, from reading the MongoDB website I’d seen that it had Tailable Cursors which, together with the Capped Collections feature, seemed like the ideal tools to build a queue on, and possibly a more efficient one – in fact, MongoDB uses these very features internally for its replication.

Why are these features important?

We don’t want the queue to just grow and grow and grow but would like to put a cap on the size. Once a capped collection in MongoDB is full it wraps round and starts overwriting the oldest records. Capped collections are actually pre-allocated which helps with performance too. All we need is a collection that will be big enough to cope with any downtime from the subscriber so that messages are not lost.

Capped collections also support natural sort order, so records can be read back in the order they were written without needing an index, which makes both reads and writes much faster because MongoDB has less extra work to do.

Tailable cursors block at the server so we don’t have to keep polling or sacrifice latency. If a cursor is opened and there is no data to return it just sits there waiting, but will fire off the next record to you as soon as it comes in (actually, it doesn’t wait indefinitely – somewhere around 4 seconds – but the result is the same: we only ‘re-poll’ every few seconds yet get immediate notification of a new message).

So, with the newly released Official C# MongoDB Driver in hand I set out to build my queue …

Before the details though, you can take a look at the finished result in this Jinq screencast:

[video]

We’ll try and keep things really simple for this example so welcome to the simplest queue interfaces ever conceived! We just have an interface for adding things to the queue and another for reading from it:

public interface IPublish<in T> where T : class
{
    void Send(T message);
}
public interface ISubscribe<out T> where T : class
{
    T Receive();
}

And of course we need something to actually send – again, we’ll keep things simple for the demo and have a very simple message with a couple of properties:

public class ExampleMessage
{
    public int Number { get; private set; }
    public string Name { get; private set; }

    public ExampleMessage(int number, string name)
    {
        Number = number;
        Name = name;
    }

    public override string ToString()
    {
        return string.Format("ExampleMessage Number:{0} Name:{1}", Number, Name);
    }
}

The ExampleMessage will be the generic <T> parameter to the queue interfaces but we’re going to want to store a bit more information in MongoDB than the message itself so we’ll also use a MongoMessage class to add the extra properties and act as a container / wrapper for the message itself. Nothing outside of the queue will ever see this though:

public class MongoMessage<T> where T : class
{
    public ObjectId Id { get; private set; }        // document id, assigned by the driver when the message is inserted
    public DateTime Enqueued { get; private set; }  // UTC time the message was added to the queue
    public T Message { get; private set; }          // the wrapped message itself

    public MongoMessage(T message)
    {
        Enqueued = DateTime.UtcNow;
        Message = message;
    }
}

This will give each message that we send an Id and also record the date/time that it was enqueued (which would enable us to work out the latency of the queue). The Id is an ObjectId, the default document id type that MongoDB uses. All of the messages that we write to our queue will be assigned an Id and these are sortable, which lets us pick up our position when reading from the queue should we need to re-start.
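As an aside, the Enqueued timestamp makes that latency easy to measure. A minimal sketch (the wrapper variable here is hypothetical, standing for a MongoMessage&lt;ExampleMessage&gt; read back from the collection):

// rough latency for a message we have just read from the queue
TimeSpan latency = DateTime.UtcNow - wrapper.Enqueued;
Console.WriteLine("Queue latency: {0:N0} ms", latency.TotalMilliseconds);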

Here is what the messages look like inside of MongoDB (via the excellent MongoVUE GUI tool):

With the interfaces and message class in place we can add a couple of projects to show how each side will be used. First the producer, which will just write messages to our queue:

class Program
{
    private static readonly ManualResetEvent Reset = new ManualResetEvent(false);
    private static long lastWrite;
    private static long writeCount;
    private static Timer timer;
    private static readonly object _sync = new object();

    static void Main(string[] args)
    {
        Console.WriteLine("Publisher");
        Console.WriteLine("Press 'R' to Run, 'P' to Pause, 'X' to Exit ...");

        timer = new Timer(TickTock, null, 1000, 1000);

        var t = new Thread(Run);
        t.Start();

        var running = true;
        while (running)
        {
            if (!Console.KeyAvailable)
            {
                Thread.Sleep(100);  // don't spin the CPU while waiting for a keypress
                continue;
            }

            var keypress = Console.ReadKey(true);
            switch (keypress.Key)
            {
                case ConsoleKey.X:
                    Reset.Reset();
                    running = false;
                    break;
                case ConsoleKey.P:
                    Reset.Reset();
                    Console.WriteLine("Paused ...");
                    break;
                case ConsoleKey.R:
                    Reset.Set();
                    Console.WriteLine("Running ...");
                    break;
            }
        }

        t.Abort();
    }

    public static void Run()
    {
        IPublish<ExampleMessage> queue = Configuration.GetQueue<ExampleMessage>();

        var i = 0;

        while (true)
        {
            Reset.WaitOne();
            i++;

            var message = new ExampleMessage(i, "I am number " + i);
            queue.Send(message);
            Interlocked.Increment(ref writeCount);

            if (i == int.MaxValue)
                i = 0;
        }
    }

    public static void TickTock(object state)
    {
        lock (_sync)
        {
            Console.WriteLine("Sent {0} {1}", writeCount, writeCount - lastWrite);
            lastWrite = writeCount;
        }
    }
}

… and the consumer which will read from the queue:

class Program
{
    private static readonly ManualResetEvent Reset = new ManualResetEvent(false);
    private static long lastRead;
    private static long readCount;
    private static Timer timer;
    private static readonly object _sync = new object();

    static void Main(string[] args)
    {
        Console.WriteLine("Subscriber");
        Console.WriteLine("Press 'R' to Run, 'P' to Pause, 'X' to Exit ...");

        timer = new Timer(TickTock, null, 1000, 1000);

        var t = new Thread(Run);
        t.Start();

        var running = true;
        while (running)
        {
            if (!Console.KeyAvailable)
            {
                Thread.Sleep(100);  // don't spin the CPU while waiting for a keypress
                continue;
            }

            var keypress = Console.ReadKey(true);
            switch (keypress.Key)
            {
                case ConsoleKey.X:
                    Reset.Reset();
                    running = false;
                    break;
                case ConsoleKey.P:
                    Reset.Reset();
                    Console.WriteLine("Paused ...");
                    break;
                case ConsoleKey.R:
                    Reset.Set();
                    Console.WriteLine("Running ...");
                    break;
            }
        }

        t.Abort();
    }

    public static void Run()
    {
        ISubscribe<ExampleMessage> queue = Configuration.GetQueue<ExampleMessage>();

        while (true)
        {
            Reset.WaitOne();
            var message = queue.Receive();
            Interlocked.Increment(ref readCount);
        }
    }

    public static void TickTock(object state)
    {
        lock (_sync)
        {
            Console.WriteLine("Received {0} {1}", readCount, readCount - lastRead);
            lastRead = readCount;
        }
    }
}

Both show the total number of messages sent or received and also the number in the last second.
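The Configuration.GetQueue&lt;T&gt;() helper used by both programs isn’t shown above; it just wires up the MongoQueue described below. A rough sketch of what it might look like (the connection string and 1MB queue size are example values, not requirements):

public static class Configuration
{
    public static MongoQueue<T> GetQueue<T>() where T : class
    {
        // example values only - point this at your own server and size the
        // capped collection to cover the longest expected subscriber downtime
        return new MongoQueue<T>("mongodb://localhost/SimpleQueue", 1024 * 1024);
    }
}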

Finally, the MongoQueue implementation. It could probably be a little simpler internally, but I wanted to keep things as simple as possible for the consumers; it should still be easy enough to follow.

public class MongoQueue<T> : IPublish<T>, ISubscribe<T> where T : class
{
    private readonly MongoDatabase _database;
    private readonly MongoCollection<MongoMessage<T>> _queue;	// the collection for the messages
    private readonly MongoCollection<BsonDocument> _position;	// used to record the current position
    private readonly QueryComplete _positionQuery;

    private ObjectId _lastId = ObjectId.Empty;					// the last _id read from the queue

    private MongoCursorEnumerator<MongoMessage<T>> _enumerator;	// our cursor enumerator
    private bool _startedReading = false;						// initial query on an empty collection is a special case

    public MongoQueue(string connectionString, long queueSize)
    {
        // our queue name will be the same as the message class
        var queueName = typeof(T).Name;
        _database = MongoDatabase.Create(connectionString);

        if (!_database.CollectionExists(queueName))
        {
            try
            {
                Console.WriteLine("Creating queue '{0}' size {1}", queueName, queueSize);

                var options = CollectionOptions
                    // use a capped collection so space is pre-allocated and re-used
                    .SetCapped(true)
                    // we don't need the default _id index that MongoDB normally creates automatically
                    .SetAutoIndexId(false)
                    // limit the size of the collection and pre-allocate the space to this number of bytes
                    .SetMaxSize(queueSize);

                _database.CreateCollection(queueName, options);
            }
            catch
            {
                // assume that any exceptions are because the collection already exists ...
            }
        }

        // get the queue collection for our messages
        _queue = _database.GetCollection<MongoMessage<T>>(queueName);

        // check if we already have a 'last read' position to start from
        _position = _database.GetCollection("_queueIndex");
        var last = _position.FindOneById(queueName);
        if (last != null)
            _lastId = last["last"].AsObjectId;

        _positionQuery = Query.EQ("_id", queueName);
    }

    public void Send(T message)
    {
        // sending a message is easy - we just insert it into the collection
        // it will be given a new sequential Id and also be written to the end (of the capped collection)
        _queue.Insert(new MongoMessage<T>(message));
    }

    public T Receive()
    {
        // for reading, we give the impression to the client that we provide a single message at a time
        // which means we maintain a cursor and enumerator in the background and hide it from the caller

        if (_enumerator == null)
            _enumerator = InitializeCursor();

        // there is no end when you need to sit and wait for messages to arrive
        while (true)
        {
            try
            {
                // do we have a message waiting?
                // this may block on the server for a few seconds but will return as soon as something is available
                if (_enumerator.MoveNext())
                {
                    // yes - record the current position and return it to the client
                    _startedReading = true;
                    _lastId = _enumerator.Current.Id;
                    _position.Update(_positionQuery, Update.Set("last", _lastId), UpdateFlags.Upsert, SafeMode.False);
                    return _enumerator.Current.Message;
                }

                if (!_startedReading)
                {
                    // for an empty collection, we'll need to re-query to be notified of new records
                    Thread.Sleep(500);
                    _enumerator.Dispose();
                    _enumerator = InitializeCursor();
                }
                else
                {
                    // if the cursor is dead then we need to re-query, otherwise we just go back to iterating over it
                    if (_enumerator.IsDead)
                    {
                        _enumerator.Dispose();
                        _enumerator = InitializeCursor();
                    }
                }
            }
            catch (IOException)
            {
                _enumerator.Dispose();
                _enumerator = InitializeCursor();
            }
            catch (SocketException)
            {
                _enumerator.Dispose();
                _enumerator = InitializeCursor();
            }
        }
    }

    private MongoCursorEnumerator<MongoMessage<T>> InitializeCursor()
    {
        var cursor = _queue
            .Find(Query.GT("_id", _lastId))
            .SetFlags(
                QueryFlags.AwaitData |
                QueryFlags.NoCursorTimeout |
                QueryFlags.TailableCursor
            )
            .SetSortOrder(SortBy.Ascending("$natural"));

        return (MongoCursorEnumerator<MongoMessage<T>>)cursor.GetEnumerator();
    }
}

After opening a cursor we get an enumerator and try to read records. The call to MoveNext() will block for a few seconds if we’re already at the end of the cursor and may then time out without returning anything. In this case we don’t need to re-run the query – the cursor is still alive on the server, so we simply call MoveNext() again, which just asks it to ‘get more’.

The reason for the _startedReading flag is that the initial query against an empty collection results in an invalid cursor and we need to re-query in that case. However, we don’t want to re-query after that as it’s more efficient to let the cursor wait for additional results (unless the cursor is dead, in which case we do need to re-query).

Occasionally the connection will be broken, which causes an exception, so we need to catch that and set up the cursor and enumerator again.

Assuming we got a record back, we record its position and return the message to the client; the next call to Receive() carries on from where we left off. Storing the position of the last item read means that when we re-start we can skip any entries already processed.

Here is an explanation of the query flags.

Query Flags:

AwaitData

If we get to the end of the cursor and there is no data we’d like the server to wait for a while until some more arrives. The default appears to be around 2-4 seconds.

TailableCursor

Indicates that we want a tailable cursor where it will wait for new data to arrive.

NoCursorTimeout

We don’t want our cursor to timeout.

So there it is – a simple but easy to use message queue or service bus that hopefully makes splitting an app into multiple processes with re-startability and fast asynchronous communication a little less challenging. I’ve found the performance of MongoDB to be outstanding, and the ease of setting this up beats the ‘proper’ message queue solutions. When it comes to the cloud, the small amount of blocking that the cursor does at the server saves us having to do a lot of polling while still giving us the fast, low-latency response we want.

Please let me know what you think of the article and if you run into any issues or have any ideas for improvement for this approach.

UPDATED: Source code now on GitHub (https://github.com/CaptainCodeman/mongo-queue)


23 thoughts on “Simple Service Bus / Message Queue with MongoDB”

  1. I forgot to mention … this relies on a recent update to the MongoDB driver to support tailable cursors so you need to use the latest from github.

    • Thanks. I haven’t tried multiple subscribers with the queue but the MongoDB replication works very fast for other things so I would imagine the speed would be good (the capped collection / tailable cursor is what MongoDB uses itself for replication of course). The problem would be having multiple consumers – this works OK as a simple queue but it would probably need some flag to indicate which messages had been processed (which looks like what boxedice may be doing from that link). The other approach is to use FindAndModify or FindAndRemove but the capped-collection just seems ‘right’ for a queue.

  2. Using capped collections for performance is good. But if the capped collection gets filled up then Mongo will start overwriting old messages, which means losing a message. It looks to me like, in your case, you are OK to lose those older messages. But I would think that any queuing system that's built should not lose a message. So why use capped collections at all?

    • Hi Aayush. This actually applies to all messaging queues – none can really ever offer infinite capacity and will typically have a configurable limit. For example MSMQ has a default of 8Gb (I think).

      The exact capacity required really needs to be enough to store enough messages to cover any downtime or disconnect so if you have a lot of messages or very large messages and need to cover longer periods of disconnect then the queue would need more space available.

      An important part of any distributed messaging-based system is monitoring and management – being able to see how long messages are taking to go through the queue or how many messages are in the queue can be the trigger that tells you some admin intervention is required or you need more processes to handle the messages.

      This is usually a better approach than having a system fill its disk, which is what could happen without a limit, and is why I think having a capped collection makes sense (the preallocated space is faster and also avoids the need to clean up).

      Hope that explains the thinking behind it. There is an article here that goes into more detail: http://msdn.microsoft.com/en-us/magazine/cc663023.aspx#id0090052

      • OK, that makes sense.
        Another thing: is your design based on a single subscriber or multiple subscribers? Because for multiple subscribers some form of locking has to be done so that they don't pick up the same message.

      • It’s really just a proof-of-concept but for true pub-sub (with multiple subscribers) all that would be needed is for each one to keep their own pointer to where they were in the queue so that would work OK (similar to having multiple slaves using the same MongoDB oplog to keep in sync).

        If instead you were after more of a ‘competing consumer’ pattern where you wanted each message to only be picked up by a single process (with the capability to add processes) then you’d need to have a flag on each message to indicate whether it was picked up or not. MongoDB has a FindAndModify option that could be used for this and can atomically get the next entry and flag it. The capped collection may be less appropriate in this case (from a need to use a tailable cursor) but would still work (because the flag update wouldn’t be changing the size of the entry).
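        Roughly, a competing-consumer receive could look something like this (just a sketch, assuming the messages are written with a Processed flag already set to false so the in-place update doesn't change the document size, and that queueCollection is the MongoCollection<MongoMessage<T>> used above):

        // atomically claim the next unclaimed message
        var result = queueCollection.FindAndModify(
            Query.EQ("Processed", false),      // only messages nobody has picked up yet
            SortBy.Ascending("$natural"),      // oldest first
            Update.Set("Processed", true),     // claim it as part of the same atomic operation
            true);                             // return the document as modified

        var next = result.GetModifiedDocumentAs<MongoMessage<T>>();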

        Hope that makes sense. Thanks for the interesting discussion!

  3. I’m trying to accomplish essentially the same thing. Thanks for sharing this code — it really helped me get everything working together.

    Have you had any problems with out of order messages getting lost? I ask because the docs say that ObjectIds should be in “mostly increasing” order, but does not guarantee it. In my application, I have many rows being inserted per second, and I’ve noticed that while the order is generally increasing, it is definitely not consistent. Therefore, every time my process restarts, and the query runs again, I lose some rows and re-process others (since some rows that were previously before the last id are now after and rows that were previously after are not greater than the last id, so they are excluded from the find).

    A capped collection with a tailable cursor seems to be the perfect fit for processing lots of rows as they come in, but I can’t seem to come up with a way to do it losslessly, since tailable cursors only support natural order. It would be nice if there were a “resume” operation that would let me set up a tailable cursor directly from an id.

    My options, as I see them are:

    1) Tailable cursor. Advantages: keeps collection at a fixed size, easy/fast to process rows as they come in. Disadvantages: Ids are not necessarily in increasing order so I lose some every time the query is run again (using > last_id as a query param). Cannot order by id since tailable cursor forces $natural sort only.

    2) Tailable cursor with no find parameters. Advantages: stable. Disadvantages: Have to scan through whole table to get to last processed id every time query is run. Skip() doesn’t help as it scans the whole table anyway, plus I don’t know how many rows to skip.

    3) Regular cursor, periodic batch queries, order by increasing id. Advantages: stable. Disadvantages: requires index for select to be fast, which slows down inserts (I need these as fast as possible).

    4) Use a flag on new rows, modifying it after its processed. Advantages: stable. Disadvantages: Now have update calls in addition to inserts and finds. This definitely needs an index.

    Anybody have any thoughts or suggestions? Thanks!

    • I think the ObjectIds created by a single client (or server) are in order but if you have multiple producers with slightly different clocks then you could have some overlap because the time is the most significant part of the Id.

      Normally, this doesn’t affect the tailable cursor used by the server oplog as it’s the only ObjectId producer and for our own queue it doesn’t matter when we’re reading through in natural order. Where it potentially hits is, as you say, when the cursor needs to be re-started and for this the last ObjectId read is used.

      What you could do to make sure you don’t lose any messages is create a new ObjectId based on the last one read but set a few seconds previously (whatever the longest time that you think the clocks could be out by). This slight-rewind should ensure that nothing was missed at the expense of re-reading a few messages when the cursor needs to be restarted. If the number of messages was so high that this was an issue then you should probably be looking at other queuing systems anyway (ZeroMQ if you have a really high rate).
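      For illustration, the rewind might look something like this when re-creating the cursor (a sketch; it assumes the driver's ObjectId(DateTime, int, short, int) constructor and uses an arbitrary 10 second allowance for clock skew):

      // re-start the cursor slightly before the last id we read, to allow for clock skew
      var restartFrom = new ObjectId(_lastId.CreationTime.AddSeconds(-10), 0, 0, 0);

      var cursor = _queue
          .Find(Query.GT("_id", restartFrom))
          .SetFlags(QueryFlags.AwaitData | QueryFlags.NoCursorTimeout | QueryFlags.TailableCursor)
          .SetSortOrder(SortBy.Ascending("$natural"));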

      The other problem is common with nearly all queuing systems where you *may* get repeated messages. This is caused by the problem above where the system is trying to guarantee that nothing is missed and is why most messaging systems actually guarantee “at least once delivery” – i.e. you may get some messages more than once and you have to handle it. Because of this, it’s good to make the message handling idempotent when possible or have some recent-cache on the receiver to avoid re-processing duplicates.

      So, for your options:

      1). Use the last id to rewind slightly when creating the new cursor. This is quite efficient and once done you are going through in $natural order so it’s quick and you get all the tailable cursor advantages (blocking on the server with immediate response for new messages etc…)

      2). I don’t think this is a viable option other than for very small queues. #1 should be more efficient. It is how the example starts when nothing has been read but you always have to handle the cursor dying and needing to be restarted so you’ll be doing #1 anyway (I don’t think .Skip() works because a capped collection wraps round so keeping track of how much to skip may be impossible).

      3). You could use ‘findAndModify’ or ‘findAndRemove’ for this but they were slower than using a capped collection when I tried them.

      4). You can actually still have a flag with the capped collection / tailable cursor as long as you are doing an in-place update (i.e. you can’t add a ‘processed’ flag but you could switch an existing processed flag to true). This gives you more certainty in what has and hasn’t been processed in order to re-start the cursor at the expense of having to do an update for every read (which, to be fair, the example above is doing to maintain the last-read id). I don’t think you need an index for this either.

      Hope that helps, thanks for the interesting discussion!

    • I thought of another option …

      If you build a sparse index on a datetime field (e.g. the timestamp when the item was added) and then $unset this field when the item is processed then sorting the collection will only return the items that haven’t been processed yet and also return them in the correct order.

      The only items included in the index will be those in the queue so the index is kept smaller than if you were just using a regular index on a collection which should help performance (by keeping it in RAM).
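      In C# (1.x driver) the idea might be sketched like this, assuming a regular (non-capped) collection, since removing a field changes the document size; collection and message are hypothetical names for the collection and the item just processed:

      // the sparse index only contains documents that still have the Enqueued field
      collection.EnsureIndex(IndexKeys.Ascending("Enqueued"), IndexOptions.SetSparse(true));

      // once an item has been handled, $unset the field to drop it out of the index
      collection.Update(Query.EQ("_id", message.Id), Update.Unset("Enqueued"));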

  4. Pingback: Possible for PHP app built on top of codeigniter to connect to a MySQL AND a mongoDB database at the same time? | Gravity Layouts

  5. Pingback: Possible for PHP app built on top of codeigniter to connect to a MySQL AND a mongoDB database at the same time? | SeekPHP.com

  6. For multiple consumers, you can avoid having them consume the same data by using a single broker with a tail connection that splits the data out to the consumers. If the consumer tells the broker they completed the job within the time limit, then the broker unsets the flag (datetime) field (as per your last comment). Otherwise log an error and hand the job to a different consumer. If the broker fails and is restarted it will query according to the flag to find its starting position, and the consumers will need to reconnect and tell it about any jobs finished in the meantime or they will need to mark the jobs finished themselves.

  7. Hello Captain,
    I was trying to download the source code but the link seems to be broken.
    I am very impressed with your approach, so please share the source code for this article as soon as possible.
    Regards
    Reena

  8. Hi Codeman,
    have you tried how many messages it can process per second? Just curious how it works if you give it 200K/sec or something in that order?

    thanks

    • YMMV but on my PC I think it stabilized at around 20-25k per second.

      You’d just need to make sure that the capped collection size was big enough to buffer message peaks (i.e. aim for average throughput)

  9. Hi! Everything works great, except that on the latest MongoDB C# driver MongoCursorEnumerator<MongoMessage<T>> InitializeCursor() throws a casting exception on the Subscriber's Receive:

    Unable to cast object of type ‘d__0[TestProject.Common.Entities.MongoMessage`1[TestProject.Common.Entities.ExampleMessage]]’ to type ‘MongoDB.Driver.MongoCursorEnumerator`1[TestProject.Common.Entities.MongoMessage`1[TestProject.Common.Entities.ExampleMessage]].

    Have you guys been able to make this work on the latest C# MongoDB driver version (1.8.3.9)? What's the workaround for it?

    Thank you!
