Creating labels for GitHub issue system

I liked the ideas in Managing your backlog with GitHub Issues and the type of labels used, but creating them was harder than it should have been because of the Unicode characters and custom colors. Using them consistently on multiple projects would mean repeating the same work each time (unless there is a ‘copy labels’ button that I haven’t noticed!).

So, I decided to write a little script to automate the process. It creates a slightly different set of labels as shown below but could be easily adapted to your own needs:

You need to add curl to your path or put it in the same folder as the script. When run, the script prompts you for your GitHub profile, password and repository name.

The actual script (save as a cmd file):

@echo off
SETLOCAL
echo This script creates issue labels for a GitHub repository
echo.
echo Please specify the GitHub Profile containing the Repository, e.g.:
echo https://github.com/MyProfile/MyCoolProject
echo                    ~~~~~~~~~
set /P username= "  Enter Profile   : "
echo.
echo Please specify the GitHub password for that profile:
set /P password= "  Enter Password  : "
echo.
echo Please specify the GitHub Repository, e.g.:
echo https://github.com/MyProfile/MyCoolProject
echo                              ~~~~~~~~~~~~~
echo.
set /P repository= "  Enter Repository: "
echo.
echo Creating labels ...
curl -k -u "%username%:%password%" -d "{\"name\":\"Feature\",\"color\":\"2d9e11\"}" https://api.github.com/repos/%username%/%repository%/labels
curl -k -u "%username%:%password%" -d "{\"name\":\"Bug\",\"color\":\"e10c02\"}" https://api.github.com/repos/%username%/%repository%/labels
curl -k -u "%username%:%password%" -d "{\"name\":\"Rejected\",\"color\":\"000000\"}" https://api.github.com/repos/%username%/%repository%/labels
curl -k -u "%username%:%password%" -d "{\"name\":\"Idea\",\"color\":\"e102d8\"}" https://api.github.com/repos/%username%/%repository%/labels
curl -k -u "%username%:%password%" -d "{\"name\":\"Task\",\"color\":\"0b02e1\"}" https://api.github.com/repos/%username%/%repository%/labels
curl -k -u "%username%:%password%" -d "{\"name\":\"\u2605\",\"color\":\"fffdd6\"}" https://api.github.com/repos/%username%/%repository%/labels
curl -k -u "%username%:%password%" -d "{\"name\":\"\u2605\u2605\",\"color\":\"fff875\"}" https://api.github.com/repos/%username%/%repository%/labels
curl -k -u "%username%:%password%" -d "{\"name\":\"\u2605\u2605\u2605\",\"color\":\"fff200\"}" https://api.github.com/repos/%username%/%repository%/labels
echo.
echo Current labels ...
curl -k -u "%username%:%password%" https://api.github.com/repos/%username%/%repository%/labels
ENDLOCAL
pause

Feb 2013: Script updated to work with latest Curl for Windows that I tried on Windows 8

Introducing Embed.ly client for .NET

First of all if you haven’t heard of Embed.ly you really should check it out:

At its core, embedly is an oEmbed provider. oEmbed is a format for allowing an embedded representation of a URL on third party sites. The simple API allows a website to display embedded content (such as photos or videos) when a user posts a link to that resource, without having to parse the resource directly.

If you’ve ever posted a link on Facebook and been impressed that it automatically added a title, some descriptive text and one or more preview images to select from, or included a playable video, and you want to build something like that into your own site, then this is for you.

There were already client libraries for several other languages but none for .NET so I developed this. I’ve been using it on a forum app to automatically detect links to videos and images to build up a gallery of both and to make them playable within the post. You can also use it on the client via a jQuery plugin but then you lose the ability to build up the gallery and index the additional content. If someone has posted a link to a Bob Dylan video then I’d like that post to be returned if someone searches for ‘Dylan’.

The response from embedly can also include a flag to indicate if the URL is considered ‘safe’ (based on Google’s safe-browsing API).

Example

Here is an example of the original content posted showing how the link is converted into a video and the additional information retrieved.

Original Post

The user makes a post and just copies and pastes a regular link

[Image: original]

Embedly Enhanced

The HTML is parsed and sanitized (using HtmlAgilityPack and a custom Html cleaning library) and the discovered URL checked with Embedly. We told embedly we wanted a preview of 640px maximum width so the html snippet returned fits perfectly and shows a playable preview:

[Image: with-embedly]

Thumbnail Gallery

Embedly also returns static thumbnail images which are perfect to add to a gallery of content:

[Image: video-library]

Additional Content

As well as the html preview and thumbnail, embedly returns the title, description and other information, which can enhance the page hosting the content or make it more searchable on our site:

[Image: video-preview]

Embedly provides a much richer experience to the end user.

So what does the .NET client do?

Basically, it provides an easy way to make requests to embedly and get strongly-typed results back. It automatically handles the request to the embedly service to get the details of the services they support fully and has a high-performance regex-less way of matching URLs against them to see if they are supported (doing 500+ regex lookups against each URL is too slow when batch processing).
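To give a rough feel for how a regex-less lookup can work, here is a minimal sketch that indexes providers by domain so that each URL costs a couple of dictionary lookups rather than hundreds of regex evaluations. This is only an illustration of the general idea and not the library’s actual implementation: the ProviderIndex class, its members and the sample domains are all hypothetical.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: index providers by domain instead of testing every regex.
public sealed class ProviderIndex
{
    // domain (e.g. "youtube.com") -> provider name
    private readonly Dictionary<string, string> _byDomain =
        new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);

    public void Add(string domain, string providerName)
    {
        _byDomain[domain] = providerName;
    }

    // Returns the provider name for the URL's host, or null if none matches.
    // Tries the full host first, then strips leading labels so that
    // "www.youtube.com" falls back to "youtube.com".
    public string Find(Uri url)
    {
        var host = url.Host;
        while (true)
        {
            string name;
            if (_byDomain.TryGetValue(host, out name))
                return name;

            var dot = host.IndexOf('.');
            if (dot < 0)
                return null;
            host = host.Substring(dot + 1);
        }
    }
}

public static class ProviderIndexDemo
{
    public static void Main()
    {
        var index = new ProviderIndex();
        index.Add("youtube.com", "youtube");   // sample data, assumed for the demo
        index.Add("flickr.com", "flickr");

        var url = new Uri("http://www.youtube.com/watch?v=YwSZvHqf9qM");
        Console.WriteLine(index.Find(url) ?? "(not supported)");
    }
}
```

A domain match alone is coarser than the provider regexes (it ignores the path), so a real implementation would presumably still need a cheap check on the rest of the URL once the candidate provider has been found.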

Requests to embedly can be filtered based on the provider information making it easy to limit requests to YouTube videos or Amazon products or perhaps any video or photo provider.

When requesting more than one URL the client will automatically batch them into a single HTTP request to embedly (which supports up to 20 URLs per request) and uses async downloading to handle the response without blocking or using valuable CPU time.

Finally, a caching mechanism helps avoid re-requesting URLs that you have recently checked. It operates at the URL level: the individual URL results are cached, not the entire embedly response (which could cover 20 URLs). So if you requested 60 URLs and 40 had already been cached, it would only send a single HTTP request to embedly, whatever sequence they were requested in.
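The arithmetic behind that 60 / 40 example can be sketched as follows. This is a simplified, synchronous illustration under my own assumptions rather than the client’s real pipeline (which uses async downloading): BatchPlanner and PlanRequests are hypothetical names, and the cache is reduced to a plain set of URLs.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class BatchPlanner
{
    // Taken from the text: embedly supports up to 20 URLs per request.
    public const int MaxUrlsPerRequest = 20;

    // Drops URLs that are already cached, then groups the remainder into
    // batches where each batch represents one HTTP request.
    public static List<List<Uri>> PlanRequests(IEnumerable<Uri> urls, ISet<Uri> cached)
    {
        var uncached = urls.Where(u => !cached.Contains(u)).Distinct().ToList();

        var batches = new List<List<Uri>>();
        for (var i = 0; i < uncached.Count; i += MaxUrlsPerRequest)
        {
            var size = Math.Min(MaxUrlsPerRequest, uncached.Count - i);
            batches.Add(uncached.GetRange(i, size));
        }
        return batches;
    }
}

public static class BatchPlannerDemo
{
    public static void Main()
    {
        // 60 URLs, of which 40 (scattered through the list) are already cached
        var urls = Enumerable.Range(0, 60)
            .Select(i => new Uri("http://example.com/" + i))
            .ToList();
        var cached = new HashSet<Uri>(urls.Where((u, i) => i % 3 != 0));

        var batches = BatchPlanner.PlanRequests(urls, cached);
        Console.WriteLine("HTTP requests needed: {0}", batches.Count);
    }
}
```

Because the cache check happens per URL before batching, the position of the cached URLs in the list makes no difference to the number of requests.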

The caching can be disabled completely if required and there is also an InMemory cache provided as well as examples of an ADO / SQL Client cache and a MongoDB cache (which is the one I’m using myself).

What doesn’t it do?

At the moment it works for the base oEmbed endpoint only but I plan on adding support for the Preview and Objectify endpoints in the future.

Where do I get it?

You can download the source from GitHub: https://github.com/CaptainCodeman/embedly-dotnet or get a binary version as a NuGet package:

http://nuget.org/List/Packages/embedly

NOTE: I’ll probably be splitting the NuGet version into a core / base package and separate cache providers to avoid bloating the dependencies.

How do I use it?

The source includes a sample project showing some of the ways you can use it but I’ll give a brief summary here.

Create a client

All requests go through the client which, at a minimum, needs an embedly account key. You can store the key however you want (the sample shows it stored in a .config file using the standard .NET ConfigurationManager). You can sign up for a free account at http://embed.ly/pricing to get a key.

var key = ConfigurationManager.AppSettings["embedly.key"]; 
var client = new Client(key);

Use a Cache

If you want to use a cache then this should be passed into the client constructor. Here’s an example using the MongoDB cache:

var key = ConfigurationManager.AppSettings["embedly.key"]; 
var database = ConfigurationManager.ConnectionStrings["embedly.cache"]; 
var cache = new MongoResponseCache(database.ConnectionString); 
var client = new Client(key, cache);

The final optional parameter when creating a client is the embedly request timeout. If the HTTP request to embedly takes longer than this then it is aborted and an exception returned instead of the embedly result. The default timeout for requests is 30 seconds.

List Embed.ly Providers

Once you have a client then you can see the list of providers that embedly supports:

foreach (var provider in client.Providers) 
{ 
    Console.WriteLine("{0} {1}", provider.Type, provider.Name); 
}

Check if a URL is supported:

Embed.ly supports over 200 different providers (all the big names like YouTube) although they will return results for the non-provider backed requests too.

var url = new Uri(@"http://www.youtube.com/watch?v=YwSZvHqf9qM");
var supported = client.IsUrlSupported(url);

Get provider information for a URL:

You can get the provider for a URL (this does not make any additional requests to embedly beyond the initial retrieval of the provider list itself).

var url = new Uri(@"http://www.youtube.com/watch?v=YwSZvHqf9qM");
var supported = client.IsUrlSupported(url); 
Console.WriteLine("Supported      : {0}", supported); 
Console.WriteLine();

var provider = client.GetProvider(url); 
Console.WriteLine("PROVIDER"); 
Console.WriteLine("About          : {0}", provider.About); 
Console.WriteLine("DisplayName    : {0}", provider.DisplayName); 
Console.WriteLine("Domain         : {0}", provider.Domain); 
Console.WriteLine("Favicon        : {0}", provider.Favicon); 
Console.WriteLine("Name           : {0}", provider.Name); 
Console.WriteLine("Regexs         : {0}", string.Join(", ", provider.Regexs)); 
Console.WriteLine("Subdomains     : {0}", string.Join(", ", provider.Subdomains)); 
Console.WriteLine("Types          : {0}", provider.Type);

Get the oEmbed information for a single URL:

The API supports single URL requests.

var url = new Uri(@"http://www.youtube.com/watch?v=YwSZvHqf9qM");
var result = client.GetOEmbed(url, new RequestOptions { MaxWidth = 320 }); 

// basic response information 
var response = result.Response; 
Console.WriteLine("Type           : {0}", response.Type); 
Console.WriteLine("Version        : {0}", response.Version);

// link details 
var link = result.Response.AsLink; 
Console.WriteLine("Author         : {0}", link.Author); 
Console.WriteLine("AuthorUrl      : {0}", link.AuthorUrl); 
Console.WriteLine("CacheAge       : {0}", link.CacheAge); 
Console.WriteLine("Description    : {0}", link.Description); 
Console.WriteLine("Provider       : {0}", link.Provider); 
Console.WriteLine("ProviderUrl    : {0}", link.ProviderUrl); 
Console.WriteLine("ThumbnailHeight: {0}", link.ThumbnailHeight); 
Console.WriteLine("ThumbnailUrl   : {0}", link.ThumbnailUrl); 
Console.WriteLine("ThumbnailWidth : {0}", link.ThumbnailWidth); 
Console.WriteLine("Title          : {0}", link.Title); 
Console.WriteLine("Url            : {0}", link.Url);

// video specific details 
var video = result.Response.AsVideo; 
Console.WriteLine("Width          : {0}", video.Width); 
Console.WriteLine("Height         : {0}", video.Height); 
Console.WriteLine("Html           : {0}", video.Html);

Get oEmbed information for a list of URLs:

Any IEnumerable<Uri> list of URLs can be processed as a batch. The .NET client will return results as they arrive.

var results = client.GetOEmbeds(urls, new RequestOptions { MaxWidth = 320 });

Limit the URLs to request to supported providers only:

Embedly can return results for ‘unsupported’ providers, but the supported ones typically have richer content.

var results = client.GetOEmbeds(urls, provider => provider.IsSupported);

Limit the URLs to request to a single provider:

A lambda expression enables the request to be filtered on any property of the provider identified for a URL.

var results = client.GetOEmbeds(urls, provider => provider.Name == "youtube");

Limit the URLs to request based on the type of provider:

Each provider has a Type to indicate the content they return so if you are only interested in video links you can filter on that type.

var results = client.GetOEmbeds(urls, provider => provider.Type == ProviderType.Video);

NOTE: ‘urls’ is an IEnumerable<Uri> in the above.

NOTE: RequestOptions enables a number of additional request arguments to be specified, see: http://embed.ly/docs/endpoints/arguments

The Result returned contains the original request (URL and any matching provider) and either an Exception (if the HTTP request failed) or a Response, which could be an embedly Error (used to indicate that the URL being inspected doesn’t exist, for instance) or one of the specific response types (Link, Photo, Rich and Video).

Extension methods enable the results to be filtered as a convenience for:

result.Success()

Returns results that were successful only

result.Failed()

Returns results that failed (HTTP error during request to embedly)

result.Errors()

Returns results that embedly responded to with an error code, i.e. the request to embedly was successful but maybe the URL doesn’t exist

result.Link()

Returns results that are of type Link

result.Photos()

Returns results that are of type Photo

result.Richs()

Returns results that are of type Rich

result.Videos()

Returns results that are of type Video

If you are iterating over multiple results and want to handle them correctly then the first step is to check each result’s Exception property. If there was an exception during the HTTP request to embedly then this will be set. If it is null then the request to embedly was successful in that embedly returned a response, but that response may be an Error, a Link, a Photo, a Rich or a Video. The Response.Type will indicate the type of response, and the As[type] property is a convenient way to get the Response as that particular type.

foreach (var result in results.Successful()) 
{ 
    if (result.Exception == null) 
    { 
        Console.WriteLine("{0} found for {1} ({2})", result.Response.Type, result.Request.Url, result.Request.Provider.Name); 
        switch (result.Response.Type) 
        { 
            case ResourceType.Error: 
                var error = result.Response.AsError; 
                Console.WriteLine("  code:{0} message:{1}", error.ErrorCode, error.ErrorMessage); 
                break; 
            case ResourceType.Link: 
                var link = result.Response.AsLink; 
                Console.WriteLine("  title:{0}", link.Title); 
                Console.WriteLine("  url:{0}", link.Url); 
                break; 
            case ResourceType.Photo: 
                var photo = result.Response.AsPhoto; 
                Console.WriteLine("  title:{0} ({1}x{2})", photo.Title, photo.Width, photo.Height); 
                Console.WriteLine("  url:{0}", photo.Url); 
                break; 
            case ResourceType.Rich: 
                var rich = result.Response.AsRich; 
                Console.WriteLine("  title:{0} ({1}x{2})", rich.Title, rich.Width, rich.Height); 
                Console.WriteLine("  url:{0}", rich.Url); 
                break; 
            case ResourceType.Video: 
                var video = result.Response.AsVideo; 
                Console.WriteLine("  title:{0} ({1}x{2})", video.Title, video.Width, video.Height); 
                Console.WriteLine("  url:{0}", video.Url); 
                break; 
        } 
    } 
    else 
    { 
        Console.WriteLine("Exception requesting {0} : {1}", result.Request.Url, result.Exception);                
    } 
}

Logging

The library uses the Common.Logging 2 library so you can plug it in to whatever your preferred logging framework is. The log output isn’t very rich right now but I’ll be expanding that in future so you can peek into what is happening.

Reactive Extensions

The other dependency is the Reactive Extensions which I’m new to but it really made the caching of individual LINQ responses much easier than it would otherwise be. The Push vs Pull model allows the pipeline to be split with cached items going to the return pipeline immediately and non-cached requests going through the full download pipeline. I’ll try and make a further post describing how this works.

Roadmap

I’d like to add support for the other embedly endpoints (Preview and Objectify) although I’m not using them myself at the moment – let me know if you’d find these useful.

Some custom Windows Performance Counters would probably be good to track how many requests are going through the library and what the cache-hit ratio is.

The current caching system is very simple and doesn’t have much support for expiring items which should be added.

Feedback

If you find the library useful or have any comments or suggestions to improve things I’d welcome any feedback.

Event-sourcing benefit: migrating between storage types

Here’s an example of one benefit of Event Sourcing …

How much work would typically result from “hey, we need to change platform and store our data in a different database …“? Even using NHibernate and going from one relational database to another, you’d potentially be looking at a significant piece of effort.

Now imagine instead trying to migrate between fundamentally different storage engines such as from SQL Server to Amazon S3 or from Oracle to MongoDB?!

Well, EventSourcing not only makes it possible, it also makes it trivially simple. Here’s an actual EventStore migrator implementation that took all of two minutes to write:

	var source = GetSourceEventStore();
	var destination = GetDestinationEventStore();

	foreach (var commit in source.GetFrom(StartDate))
		destination.Commit(commit);

Oh look, it’s done !! Ok, before you accuse me of being slow there is the event-store wire-up which is where one of the minutes went (I could add some XML DI configuration to make it really generic but it’s probably not worth the effort):

	private static IPersistStreams GetSourceEventStore()
	{
		return Wireup.Init()
			.UsingMongoPersistence("source", new DocumentObjectSerializer())
			.UsingSynchronousDispatcher()
				.PublishTo(new NullDispatcher())
			.Build()
			.Advanced;
	}

	private static IPersistStreams GetDestinationEventStore()
	{
		return Wireup.Init()
			.UsingSqlPersistence("destination")
				.WithDialect(new MsSqlDialect())
				.InitializeStorageEngine()
				.UsingBinarySerialization()
				.Compress()
			.UsingSynchronousDispatcher()
				.PublishTo(new NullDispatcher())
			.Build()
			.Advanced;
	}

BTW: The “source” and “destination” strings are the names of connection strings in the <connectionStrings> section of the config file.

Version 2 of the “EventStore Migrator Enterprise Edition™” may add the ability to store the last commit DateTime migrated and then restart from there to enable incremental replication. A real-life version would also need a little exception handling and that would probably make use of the last DateTime to continue from the right point after any failure.
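As a rough sketch of what that incremental version might look like, the example below copies only commits newer than a stored checkpoint and returns the new checkpoint. To keep it self-contained it uses in-memory lists and hypothetical types (DemoCommit, IncrementalMigrator) rather than the real EventStore API, and where the checkpoint gets persisted is left as an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a commit; the real EventStore commit type differs.
public sealed class DemoCommit
{
    public DemoCommit(DateTime stamp, string payload)
    {
        Stamp = stamp;
        Payload = payload;
    }

    public DateTime Stamp { get; private set; }
    public string Payload { get; private set; }
}

public static class IncrementalMigrator
{
    // Copies commits stamped after 'checkpoint' in timestamp order and
    // returns the stamp of the last commit copied (the new checkpoint).
    public static DateTime Migrate(
        IEnumerable<DemoCommit> source,
        ICollection<DemoCommit> destination,
        DateTime checkpoint)
    {
        var last = checkpoint;
        foreach (var commit in source.Where(c => c.Stamp > checkpoint).OrderBy(c => c.Stamp))
        {
            destination.Add(commit);
            last = commit.Stamp; // a real version would persist this so a restart can resume
        }
        return last;
    }
}

public static class IncrementalMigratorDemo
{
    public static void Main()
    {
        var t0 = new DateTime(2011, 1, 1, 0, 0, 0, DateTimeKind.Utc);
        var source = new List<DemoCommit>
        {
            new DemoCommit(t0.AddMinutes(1), "a"),
            new DemoCommit(t0.AddMinutes(2), "b")
        };
        var destination = new List<DemoCommit>();

        // first run copies everything
        var checkpoint = IncrementalMigrator.Migrate(source, destination, DateTime.MinValue);

        // a new commit arrives; the second run copies only that one
        source.Add(new DemoCommit(t0.AddMinutes(3), "c"));
        checkpoint = IncrementalMigrator.Migrate(source, destination, checkpoint);

        Console.WriteLine("{0} commits migrated, checkpoint {1:o}", destination.Count, checkpoint);
    }
}
```

One caveat with a timestamp checkpoint: several commits can share the same DateTime, so a strict “newer than” comparison could skip some after a failure, while an inclusive one would re-send some. A real version would need the destination to tolerate duplicates or a finer-grained checkpoint.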

So, with a couple of minutes work you can easily migrate between different storage engines and also have a mechanism to incrementally keep them in sync – maybe for disaster recovery or backup where you use a lower cost / higher latency event store in an emergency (and still have a way to migrate back).

Event sourcing and immutable events make all this possible.

Simple Service Bus / Message Queue with MongoDB

A service bus or message queue allows producers and subscribers to communicate asynchronously, so that a system can handle disconnects and processes being stopped and started, and can absorb peaks of demand beyond what the subscriber can immediately cope with. The queue acts as a buffer that the producer writes to and the subscriber reads from.

There are lots of implementations such as NServiceBus, MassTransit, Rhino Service Bus and the cloud-provided services such as Amazon’s Simple Queue Service and Windows Azure’s AppFabric Service Bus. Some take a little time to get started with and the cloud ones can also rack up charges pretty quickly if you are doing too much polling.

Often, all that is needed is something fairly simple to buffer messages between processes and persist them. I’ve been making good use of MongoDB recently in conjunction with Jonathan Oliver’s EventStore library for a CQRS-based project so it seemed the obvious place to start – why not use MongoDB to store the queue?!

Now, I did have a look round first to see if anyone else had created something already and the closest I got was the post here: Why (and How) I Replaced Amazon SQS with MongoDB. However, from reading the MongoDB website I’d seen that it had Tailable Cursors which, together with the Capped Collections feature, seemed like the ideal tools to build a queue on and possibly more efficient – in fact, MongoDB uses these very features internally for its replication.

Why are these features important?

We don’t want the queue to just grow and grow and grow but would like to put a cap on the size. Once a capped collection in MongoDB is full it wraps round and starts overwriting the oldest records. Capped collections are actually pre-allocated which helps with performance too. All we need is a collection that will be big enough to cope with any downtime from the subscriber so that messages are not lost.

Capped collections also support natural sort order, where records are read in the order they were written. That means we don’t need an index, so both reads and writes are faster because MongoDB has less extra work to do.
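If capped collections are new to you, the behaviour is essentially that of a ring buffer. The in-memory sketch below is only an analogy and not how MongoDB implements it: CappedBuffer is a hypothetical class, and it caps by item count whereas MongoDB caps the collection by size in bytes.

```csharp
using System;
using System.Collections.Generic;

// In-memory analogy for a capped collection: fixed capacity, space allocated
// up front, and the oldest entries overwritten once the buffer is full.
public sealed class CappedBuffer<T>
{
    private readonly T[] _items;
    private long _written; // total number of items ever added

    public CappedBuffer(int capacity)
    {
        if (capacity <= 0) throw new ArgumentOutOfRangeException("capacity");
        _items = new T[capacity];
    }

    public void Add(T item)
    {
        // wrap round and overwrite the oldest slot when full
        _items[_written % _items.Length] = item;
        _written++;
    }

    // Returns the surviving items in the order they were written
    // ("natural" order), with no index required.
    public IEnumerable<T> InNaturalOrder()
    {
        var start = Math.Max(0, _written - _items.Length);
        for (var i = start; i < _written; i++)
            yield return _items[i % _items.Length];
    }
}

public static class CappedBufferDemo
{
    public static void Main()
    {
        var buffer = new CappedBuffer<int>(3);
        for (var n = 1; n <= 5; n++)
            buffer.Add(n);

        // 1 and 2 have been overwritten; the remaining items come back oldest first
        Console.WriteLine(string.Join(", ", buffer.InNaturalOrder()));
    }
}
```

The demo also shows why the cap has to be big enough to cover subscriber downtime: anything not read before the buffer wraps is gone.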

Tailable cursors block at the server, so we don’t have to keep polling or give up latency. If a cursor is opened and there is no data to return, it just sits there waiting and fires off the next record to you as soon as it comes in. (Actually, it doesn’t wait indefinitely, only somewhere around 4 seconds, but the result is the same: we only ‘poll’ every 4 seconds yet get immediate notification of a new message.)
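The same ‘wait in a window, but return immediately when data arrives’ pattern can be shown in-process with a BlockingCollection. This is purely an analogy for the cursor behaviour described above and doesn’t touch MongoDB at all: AwaitDataAnalogy is a hypothetical name and the 4 second window simply mirrors the figure mentioned in the text.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class AwaitDataAnalogy
{
    // Waits for the next message, blocking in windows rather than spinning.
    // 'window' plays the role of the server-side wait on a tailable cursor.
    public static string Receive(BlockingCollection<string> queue, TimeSpan window)
    {
        while (true)
        {
            string message;
            if (queue.TryTake(out message, window))
                return message; // handed over as soon as it is available

            // the window elapsed with no data: go round and wait again
        }
    }

    public static void Main()
    {
        var queue = new BlockingCollection<string>();

        // a producer that publishes after a short delay
        Task.Run(() =>
        {
            Thread.Sleep(200);
            queue.Add("hello");
        });

        // returns roughly 200ms later, not after the full 4 second window
        Console.WriteLine(AwaitDataAnalogy.Receive(queue, TimeSpan.FromSeconds(4)));
    }
}
```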

So, with the newly released Official C# MongoDB Driver in hand I set out to build my queue …

Before the details though, you can take a look at the finished result from this Jing screen-cast:

[Video: screen-cast of the finished queue]

We’ll try and keep things really simple for this example so welcome to the simplest queue interfaces ever conceived! We just have an interface for adding things to the queue and another for reading from it:

public interface IPublish<in T> where T : class
{
    void Send(T message);
}
public interface ISubscribe<out T> where T : class
{
    T Receive();
}

And of course we need something to actually send – again, we’ll keep things simple for the demo and have a very simple message with a couple of properties:

public class ExampleMessage
{
    public int Number { get; private set; }
    public string Name { get; private set; }

    public ExampleMessage(int number, string name)
    {
        Number = number;
        Name = name;
    }

    public override string ToString()
    {
        return string.Format("ExampleMessage Number:{0} Name:{1}", Number, Name);
    }
}

The ExampleMessage will be the generic <T> parameter to the queue interfaces but we’re going to want to store a bit more information in MongoDB than the message itself so we’ll also use a MongoMessage class to add the extra properties and act as a container / wrapper for the message itself. Nothing outside of the queue will ever see this though:

public class MongoMessage<T> where T : class
{
    public ObjectId Id { get; private set; }
    public DateTime Enqueued { get; private set; }
    public T Message { get; private set; }

    public MongoMessage(T message)
    {
        Enqueued = DateTime.UtcNow;
        Message = message;
    }
}

This will give each message that we send an Id and also record the date / time that it was enqueued (which would enable us to work out the latency of the queue). The Id is an ObjectId, which is the default Document ID type that MongoDB uses. All of the messages that we write to our queue will be assigned an Id and these should be sortable, which we can use to pick up our position when reading from the queue should we need to re-start.

Here is what the messages look like inside of MongoDB (via the excellent MongoVUE GUI tool):

With the interfaces and commands in place we can add a couple of projects to show how each side will be used. First the producer which will just write commands to our queue:

class Program
{
    private static readonly ManualResetEvent Reset = new ManualResetEvent(false);
    private static long lastWrite;
    private static long writeCount;
    private static Timer timer;
    private static readonly object _sync = new object();

    static void Main(string[] args)
    {
        Console.WriteLine("Publisher");
        Console.WriteLine("Press 'R' to Run, 'P' to Pause, 'X' to Exit ...");

        timer = new Timer(TickTock, null, 1000, 1000);

        var t = new Thread(Run);
        t.Start();

        var running = true;
        while (running)
        {
            if (!Console.KeyAvailable) continue;

            var keypress = Console.ReadKey(true);
            switch (keypress.Key)
            {
                case ConsoleKey.X:
                    Reset.Reset();
                    running = false;
                    break;
                case ConsoleKey.P:
                    Reset.Reset();
                    Console.WriteLine("Paused ...");
                    break;
                case ConsoleKey.R:
                    Reset.Set();
                    Console.WriteLine("Running ...");
                    break;
            }
        }

        t.Abort();
    }

    public static void Run()
    {
        IPublish<ExampleMessage> queue = Configuration.GetQueue<ExampleMessage>();

        var i = 0;

        while (true)
        {
            Reset.WaitOne();
            i++;

            var message = new ExampleMessage(i, "I am number " + i);
            queue.Send(message);
            Interlocked.Increment(ref writeCount);

            if (i == int.MaxValue)
                i = 0;
        }
    }

    public static void TickTock(object state)
    {
        lock (_sync)
        {
            Console.WriteLine("Sent {0} {1}", writeCount, writeCount - lastWrite);
            lastWrite = writeCount;
        }
    }
}

… and the consumer which will read from the queue:

class Program
{
    private static readonly ManualResetEvent Reset = new ManualResetEvent(false);
    private static long lastRead;
    private static long readCount;
    private static Timer timer;
    private static readonly object _sync = new object();

    static void Main(string[] args)
    {
        Console.WriteLine("Subscriber");
        Console.WriteLine("Press 'R' to Run, 'P' to Pause, 'X' to Exit ...");

        timer = new Timer(TickTock, null, 1000, 1000);

        var t = new Thread(Run);
        t.Start();

        var running = true;
        while (running)
        {
            if (!Console.KeyAvailable) continue;

            var keypress = Console.ReadKey(true);
            switch (keypress.Key)
            {
                case ConsoleKey.X:
                    Reset.Reset();
                    running = false;
                    break;
                case ConsoleKey.P:
                    Reset.Reset();
                    Console.WriteLine("Paused ...");
                    break;
                case ConsoleKey.R:
                    Reset.Set();
                    Console.WriteLine("Running ...");
                    break;
            }
        }

        t.Abort();
    }

    public static void Run()
    {
        ISubscribe<ExampleMessage> queue = Configuration.GetQueue<ExampleMessage>();

        while (true)
        {
            Reset.WaitOne();
            var message = queue.Receive();
            Interlocked.Increment(ref readCount);
        }
    }

    public static void TickTock(object state)
    {
        lock (_sync)
        {
            Console.WriteLine("Received {0} {1}", readCount, readCount - lastRead);
            lastRead = readCount;
        }
    }
}

Both show the total number of messages sent or received and also the number in the last second.

Finally, the MongoQueue implementation. It could be a little simpler but I wanted to make sure things were as simple as possible for the consumers, and it should be easy enough to follow.

public class MongoQueue<T> : IPublish<T>, ISubscribe<T> where T : class
{
    private readonly MongoDatabase _database;
    private readonly MongoCollection<MongoMessage<T>> _queue;	// the collection for the messages
    private readonly MongoCollection<BsonDocument> _position;	// used to record the current position
    private readonly QueryComplete _positionQuery;

    private ObjectId _lastId = ObjectId.Empty;					// the last _id read from the queue

    private MongoCursorEnumerator<MongoMessage<T>> _enumerator;	// our cursor enumerator
    private bool _startedReading = false;						// initial query on an empty collection is a special case

    public MongoQueue(string connectionString, long queueSize)
    {
        // our queue name will be the same as the message class
        var queueName = typeof(T).Name;
        _database = MongoDatabase.Create(connectionString);

        if (!_database.CollectionExists(queueName))
        {
            try
            {
                Console.WriteLine("Creating queue '{0}' size {1}", queueName, queueSize);

                var options = CollectionOptions
                    // use a capped collection so space is pre-allocated and re-used
                    .SetCapped(true)
                    // we don't need the default _id index that MongoDB normally creates automatically
                    .SetAutoIndexId(false)
                    // limit the size of the collection and pre-allocate the space to this number of bytes
                    .SetMaxSize(queueSize);

                _database.CreateCollection(queueName, options);
            }
            catch
            {
                // assume that any exceptions are because the collection already exists ...
            }
        }

        // get the queue collection for our messages
        _queue = _database.GetCollection<MongoMessage<T>>(queueName);

        // check if we already have a 'last read' position to start from
        _position = _database.GetCollection("_queueIndex");
        var last = _position.FindOneById(queueName);
        if (last != null)
            _lastId = last["last"].AsObjectId;

        _positionQuery = Query.EQ("_id", queueName);
    }

    public void Send(T message)
    {
        // sending a message is easy - we just insert it into the collection
        // it will be given a new sequential Id and also be written to the end (of the capped collection)
        _queue.Insert(new MongoMessage<T>(message));
    }

    public T Receive()
    {
        // for reading, we give the impression to the client that we provide a single message at a time
        // which means we maintain a cursor and enumerator in the background and hide it from the caller

        if (_enumerator == null)
            _enumerator = InitializeCursor();

        // there is no end when you need to sit and wait for messages to arrive
        while (true)
        {
            try
            {
                // do we have a message waiting?
                // this may block on the server for a few seconds but will return as soon as something is available
                if (_enumerator.MoveNext())
                {
                    // yes - record the current position and return it to the client
                    _startedReading = true;
                    _lastId = _enumerator.Current.Id;
                    _position.Update(_positionQuery, Update.Set("last", _lastId), UpdateFlags.Upsert, SafeMode.False);
                    return _enumerator.Current.Message;
                }

                if (!_startedReading)
                {
                    // for an empty collection, we'll need to re-query to be notified of new records
                    Thread.Sleep(500);
                    _enumerator.Dispose();
                    _enumerator = InitializeCursor();
                }
                else
                {
                    // if the cursor is dead then we need to re-query, otherwise we just go back to iterating over it
                    if (_enumerator.IsDead)
                    {
                        _enumerator.Dispose();
                        _enumerator = InitializeCursor();
                    }
                }
            }
            catch (IOException)
            {
                _enumerator.Dispose();
                _enumerator = InitializeCursor();
            }
            catch (SocketException)
            {
                _enumerator.Dispose();
                _enumerator = InitializeCursor();
            }
        }
    }

    private MongoCursorEnumerator<MongoMessage<T>> InitializeCursor()
    {
        var cursor = _queue
            .Find(Query.GT("_id", _lastId))
            .SetFlags(
                QueryFlags.AwaitData |
                QueryFlags.NoCursorTimeout |
                QueryFlags.TailableCursor
            )
            .SetSortOrder(SortBy.Ascending("$natural"));

        return (MongoCursorEnumerator<MongoMessage<T>>)cursor.GetEnumerator();
    }
}

After opening a cursor we get an enumerator and try to read records. The call to MoveNext() will block for a few seconds if we’re already at the end of the cursor and may then time out without returning anything. In this case we need to dispose of the enumerator and get another from the cursor but we don’t need to re-run the query – it’s still connected and available and we just need to ‘get more’ on it.

The reason for the _startedReading flag is that the initial query against an empty collection will result in an invalid cursor and we need to re-query in this case. However, we don’t want to re-query after that as it’s more efficient to let the cursor wait for additional results (unless the cursor is dead when we do need to re-query).
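The branching described above can be summarised as a small decision function. This is a minimal Python sketch rather than the C# driver code; the name next_action and the returned strings are hypothetical, chosen only for illustration.

```python
def next_action(started_reading: bool, cursor_dead: bool) -> str:
    """Decide what the receive loop does after MoveNext() returned no record."""
    if not started_reading:
        # Nothing has ever been read: the initial query against an empty
        # collection gave us an unusable cursor, so pause and query again.
        return "sleep-and-requery"
    if cursor_dead:
        # The server has closed the cursor, so a fresh query is required.
        return "requery"
    # The cursor is still alive: keep waiting on it for new data.
    return "keep-waiting"
```

Only the last case is the cheap one, which is why the flag matters: once a first record has been read, the loop normally stays on the same cursor.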

Occasionally, the connection will be broken which will cause an exception, so we need to catch that and set up the cursor and enumerator again.

Assuming we got a record back, we return it to the client and pick up the next item on the following call. We also store the position of the last item read in the queue so that when we re-start we can skip any existing entries.
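To illustrate the 'last read' bookkeeping on its own, here is a toy in-memory model in Python. It is a sketch under simplifying assumptions: InMemoryQueue is a hypothetical stand-in for both the capped collection and the _queueIndex collection, ids are plain integers, and receive returns None instead of blocking.

```python
class InMemoryQueue:
    """Toy stand-in for the capped collection plus the '_queueIndex' collection."""

    def __init__(self):
        self.messages = []    # (id, payload) pairs in insertion order
        self.positions = {}   # queue name -> id of the last message read
        self._next_id = 1

    def send(self, payload):
        # Ids only ever increase, mirroring the sequential ordering of new Ids.
        self.messages.append((self._next_id, payload))
        self._next_id += 1

    def receive(self, queue_name):
        """Return the next unread payload, or None if nothing new is waiting."""
        last_id = self.positions.get(queue_name, 0)
        for msg_id, payload in self.messages:
            if msg_id > last_id:
                # Record the position before handing the message back.
                self.positions[queue_name] = msg_id
                return payload
        return None


queue = InMemoryQueue()
queue.send("first")
queue.send("second")
print(queue.receive("jobs"))  # first
print(queue.receive("jobs"))  # second
```

Because the position is stored separately from the messages, a reader that starts again with the same queue name carries on after the last message it was given.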

Here is an explanation of the query flags.

Query Flags:

AwaitData

If we get to the end of the cursor and there is no data we’d like the server to wait for a while until some more arrives. The default appears to be around 2-4 seconds.

TailableCursor

Indicates that we want a tailable cursor where it will wait for new data to arrive.

NoCursorTimeout

We don’t want our cursor to time out.
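The three flags are independent bits that get OR-ed together into one value, which is what the | operators in InitializeCursor are doing. A small Python sketch of that idea follows; the bit values are the ones I understand the MongoDB OP_QUERY wire message to use, and the Python class here is illustrative rather than part of any driver.

```python
from enum import IntFlag


class QueryFlags(IntFlag):
    # Bit values as used in the flags field of the OP_QUERY wire message
    # (an assumption worth checking against the protocol documentation).
    TAILABLE_CURSOR = 2
    NO_CURSOR_TIMEOUT = 16
    AWAIT_DATA = 32


flags = (
    QueryFlags.AWAIT_DATA
    | QueryFlags.NO_CURSOR_TIMEOUT
    | QueryFlags.TAILABLE_CURSOR
)
print(int(flags))                          # 50
print(QueryFlags.AWAIT_DATA in flags)      # True
```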

So there it is – a simple, easy-to-use message queue or service bus that hopefully makes splitting an app into multiple processes with re-startability and fast asynchronous communication a little less challenging. I’ve found the performance of MongoDB to be outstanding, and the ease of setting this up beats the ‘proper’ message queue solutions. When it comes to the cloud, the small amount of blocking that the cursor does at the server saves us having to do a lot of polling while still giving us the fast low-latency response we want.

Please let me know what you think of the article and if you run into any issues or have any ideas for improvement for this approach.

UPDATED: Source code now on GitHub (https://github.com/CaptainCodeman/mongo-queue)

Elmah Error Logging with MongoDB Official Driver

This basically takes the work that Pablo M. Cibraro did to use Elmah with the Samus CSharp Driver and converts it to work with the Official 10Gen CSharp Driver instead plus a few additional minor changes:

  1. A capped collection is still used but the maximum size (in bytes) and the document limit can now be set using the ‘maxSize’ and ‘maxDocuments’ parameters in the configuration. By default the limit is based on size only with 100 MB allocated.
  2. The paged-results for the Elmah reporting page are sorted in descending order so the latest errors are shown first. This uses the $natural sort order of the capped collection.
  3. I’ve used the native MongoDB ObjectId for the error id which should be slightly faster than using a Guid and sorts better (also, if you were interested in saving a few bytes this stores the date and time too so could avoid saving it separately).
  4. Finally, I’ve used the convention of calling the collection ‘Elmah’ when there is no ApplicationName set and ‘Elmah-ApplicationName’ when it is.
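Points 3 and 4 are easy to show in a few lines. The following Python sketch is not the Elmah.MongoDB code: object_id_timestamp and collection_name are hypothetical helper names, and the first relies on the ObjectId layout where the leading 4 bytes hold the creation time in seconds since the Unix epoch.

```python
from datetime import datetime, timezone


def object_id_timestamp(object_id_hex: str) -> datetime:
    """Return the creation time embedded in a 24-character ObjectId hex string."""
    if len(object_id_hex) != 24:
        raise ValueError("an ObjectId is 12 bytes, i.e. 24 hex characters")
    # The first 4 bytes (8 hex characters) are big-endian seconds since the epoch.
    seconds = int(object_id_hex[:8], 16)
    return datetime.fromtimestamp(seconds, tz=timezone.utc)


def collection_name(application_name=None) -> str:
    """Apply the naming convention: 'Elmah' or 'Elmah-<ApplicationName>'."""
    return f"Elmah-{application_name}" if application_name else "Elmah"


print(object_id_timestamp("4dd4e9ba0000000000000000"))
print(collection_name(), collection_name("MyApp"))
```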

Configuration is very similar (but uses the 10Gen driver connection string format):

<elmah>
    <errorLog type="Elmah.MongoErrorLog, Elmah.MongoDB" connectionStringName="ELMAH.MongoDB" maxSize="10485760" maxDocuments="10000" />
</elmah>
<connectionStrings>
    <add name="ELMAH.MongoDB" connectionString="server=localhost;database=elmah;"/>
</connectionStrings>
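As a rough illustration of how the two limits could be read with the default applied, here is a Python sketch. It assumes the 100 MB default means 100 * 1024 * 1024 bytes and that a missing maxDocuments means no document limit; read_limits is a hypothetical helper, not part of Elmah.

```python
import xml.etree.ElementTree as ET

# Assumption: the 100 MB default is counted in binary megabytes.
DEFAULT_MAX_SIZE = 100 * 1024 * 1024


def read_limits(error_log_xml: str):
    """Return (max_size_bytes, max_documents) from an <errorLog> element."""
    element = ET.fromstring(error_log_xml)
    max_size = int(element.get("maxSize", DEFAULT_MAX_SIZE))
    raw_docs = element.get("maxDocuments")
    max_documents = int(raw_docs) if raw_docs is not None else None
    return max_size, max_documents


sample = (
    '<errorLog type="Elmah.MongoErrorLog, Elmah.MongoDB" '
    'connectionStringName="ELMAH.MongoDB" '
    'maxSize="10485760" maxDocuments="10000" />'
)
print(read_limits(sample))  # (10485760, 10000)
```

The maxSize of 10485760 in the sample configuration works out to 10 MB.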

The source code and compiled binaries are attached. I’ll look at submitting this to the Elmah codebase.

Thanks again to Pablo for doing the hard work !

Running ElasticSearch as a Service on Windows 2008 x64

I think I first started using Apache Lucene for full-text indexing as part of NHibernate Search. At some point I decided I needed more control and did my own indexing using Lucene directly. Now, it seems the easiest approach is to make use of a packaged up search service and so I’ve been looking at ElasticSearch. So far, I’m very happy with it – it’s doing everything it says on the box and lets me offload all the full-text indexing and search functionality.

The only issue I’ve come across is trying to run it as a service on 64-bit Windows 7 or Windows 2008. While there is a service-wrapper available it just wasn’t working for me and I think the x64 platform may be part of that as there was only an elasticsearch-windows-x86-32.exe included, no elasticsearch-windows-x86-64.exe. This service wrapper seems to be based off a product that doesn’t appear to have a free community edition for 64-bit Windows.

So, I had a hunt around for ‘how to run a Java app as a Windows Service’ and came across the Apache Commons Daemon or ‘procrun’. This worked so I thought I’d share it here in case anyone else is trying to do the same thing.

First of all, there are the pre-requisites: it’s a Java app so you need to have the Sun Java SDK installed and JAVA_HOME environment variable set.

Download ElasticSearch and extract it to a folder. I’m using 0.16.0 and put it into D:\elasticsearch (because Program Files and UAC caused too many issues for me).

Before trying to set it to run as a service it’s best to make sure it runs as a regular app first. To start ElasticSearch on Windows there is a “bin\elasticsearch.bat” file to launch it which should show it running. As an extra check, there is a handy little web-based admin tool you can get called elasticsearch-head which will show the running status and provides a neat little browser / search interface. I extracted this to D:\elasticsearch\tools. When you open the index.html file it lets you connect to your elasticsearch instance and shows its status:

[Screenshot: elasticsearch-head showing the instance status]

Downloading the Apache Commons Daemon or procrun is a little harder because it isn’t in the links on the download page. Instead you need to follow the ‘browse native binaries download area…’ link, then look in the windows folder for the zip file. The file I used was: commons-daemon-1.0.5-bin-windows.zip

Extract this to D:\elasticsearch\service and then copy amd64\prunsrv.exe to the D:\elasticsearch\service folder to replace the x86 version (or skip this step if you are actually running on a 32-bit OS).

Although we can set everything up with the exe files as they are, we’re going to rename them because it makes it clearer what is running on Windows Task Manager if you have other processes using this service runner. The convention is to use the service name and append a ‘w’ to the GUI manager exe so they become:

prunsrv.exe => ElasticSearch.exe
prunmgr.exe => ElasticSearchw.exe

Because we’ll be running things as a service it will be running under a different account than the regular process does when we run it interactively. I used the ‘NETWORK SERVICE’ account which is able to handle network traffic and gave this account full permissions to the D:\elasticsearch folder so it will also be able to create data and log files.

Figuring out the command line to actually run the service is what took the longest. With a bit of trial and error and looking at the output from the batch file to launch elasticsearch I ended up with this which ‘works on my machine’. If it doesn’t work on yours try enabling the echo output from the batch file and checking the parameters are the same.

It’s best to put this into a create.cmd file, which makes it easier to edit and run:

ElasticSearch.exe //IS//ElasticSearch --DisplayName="ElasticSearch" --Description="Distributed RESTful Full-Text Search Engine based on Lucene (http://www.elasticsearch.org/)" --Install=D:\elasticsearch\service\ElasticSearch.exe --Classpath="D:\elasticsearch\lib\elasticsearch-0.16.0.jar;D:\elasticsearch\lib\*;D:\elasticsearch\lib\sigar\*" --Jvm="C:\Program Files\Java\jre6\bin\server\jvm.dll" --JvmMx=512 --JvmOptions="-Xms256m;-Xmx1g;-XX:+UseCompressedOops;-Xss128k;-XX:+UseParNewGC;-XX:+UseConcMarkSweepGC;-XX:+CMSParallelRemarkEnabled;-XX:SurvivorRatio=8;-XX:MaxTenuringThreshold=1;-XX:CMSInitiatingOccupancyFraction=75;-XX:+UseCMSInitiatingOccupancyOnly;-XX:+HeapDumpOnOutOfMemoryError;-Djline.enabled=false;-Delasticsearch;-Des-foreground=yes;-Des.path.home=D:\elasticsearch" --StartMode=jvm --StartClass=org.elasticsearch.bootstrap.Bootstrap --StartMethod=main --StartParams="" --StopMode=jvm --StopClass=org.elasticsearch.bootstrap.Bootstrap --StopMethod=main --StdOutput=auto --StdError=auto --LogLevel=Debug --LogPath="D:\elasticsearch\logs" --LogPrefix=service --ServiceUser="NT AUTHORITY\NetworkService" --Startup=auto

Phew !
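Since the command is long and easy to break with a single missing quote, one option is to generate it from a table of options instead of editing it by hand. The following is a minimal Python sketch of that idea: build_install_command is a hypothetical helper, and it uses only a handful of the procrun options from the command above.

```python
def build_install_command(exe, service_name, options):
    """Join procrun install options into a single command line."""
    parts = [exe, f"//IS//{service_name}"]
    for name, value in options.items():
        # Lists become the semicolon-separated form used by --JvmOptions.
        if isinstance(value, (list, tuple)):
            text = ";".join(value)
        else:
            text = str(value)
        # Quote empty values and values with spaces so they stay one argument.
        if text == "" or " " in text:
            text = f'"{text}"'
        parts.append(f"--{name}={text}")
    return " ".join(parts)


options = {
    "DisplayName": "ElasticSearch",
    "Install": r"D:\elasticsearch\service\ElasticSearch.exe",
    "JvmOptions": ["-Xms256m", "-Xmx1g", r"-Des.path.home=D:\elasticsearch"],
    "StartMode": "jvm",
    "StartClass": "org.elasticsearch.bootstrap.Bootstrap",
    "StartParams": "",
    "ServiceUser": r"NT AUTHORITY\NetworkService",
}
command = build_install_command("ElasticSearch.exe", "ElasticSearch", options)
print(command)
```

The printed line can then be pasted into create.cmd, with the remaining options added to the dictionary in the same way.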

Running that should create the service, and running ElasticSearchw.exe should now pop up a GUI that lets us view and edit all the settings. The various tabs are shown below and should correspond to the settings defined above:

[Screenshots of the six tabs: 1 General, 2 Log On, 3 Logging, 4 Java, 5 Startup, 6 Shutdown]

You can also have the GUI run as a task-tray icon, which gives you a handy way to start and stop the service while you’re developing. To do this, create a monitor.cmd file with the following command:

start ElasticSearchw.exe //MS

You should be able to right-click on the new tray icon and start the service:

[Screenshot: starting the service from the tray icon]

This isn’t mandatory though – the service should appear in the normal Windows Service Manager where it can be stopped and started as usual:

[Screenshot: the service listed in the Windows Service Manager]

Whether everything starts or not, you should get some useful information written to the log files. Here’s how mine looked after the service was started successfully.

service.2011-05-19.log:

[2011-05-19 10:21:30] [debug] ( prunsrv.c:1494) Commons Daemon procrun log initialized
[2011-05-19 10:21:30] [info]  (          :0   ) Commons Daemon procrun (1.0.5.0 64-bit) started
[2011-05-19 10:21:30] [info]  (          :0   ) Running 'ElasticSearch' Service...
[2011-05-19 10:21:30] [debug] ( prunsrv.c:1246) Inside ServiceMain...
[2011-05-19 10:21:30] [info]  (          :0   ) Starting service...
[2011-05-19 10:21:30] [debug] ( javajni.c:206 ) loading jvm 'C:\Program Files\Java\jre6\bin\server\jvm.dll'
[2011-05-19 10:21:30] [debug] ( javajni.c:660 ) Jvm Option[0] -Xms256m
[2011-05-19 10:21:30] [debug] ( javajni.c:660 ) Jvm Option[1] -Xmx1g
[2011-05-19 10:21:30] [debug] ( javajni.c:660 ) Jvm Option[2] -XX:+UseCompressedOops
[2011-05-19 10:21:30] [debug] ( javajni.c:660 ) Jvm Option[3] -Xss128k
[2011-05-19 10:21:30] [debug] ( javajni.c:660 ) Jvm Option[4] -XX:+UseParNewGC
[2011-05-19 10:21:30] [debug] ( javajni.c:660 ) Jvm Option[5] -XX:+UseConcMarkSweepGC
[2011-05-19 10:21:30] [debug] ( javajni.c:660 ) Jvm Option[6] -XX:+CMSParallelRemarkEnabled
[2011-05-19 10:21:30] [debug] ( javajni.c:660 ) Jvm Option[7] -XX:SurvivorRatio=8
[2011-05-19 10:21:30] [debug] ( javajni.c:660 ) Jvm Option[8] -XX:MaxTenuringThreshold=1
[2011-05-19 10:21:30] [debug] ( javajni.c:660 ) Jvm Option[9] -XX:CMSInitiatingOccupancyFraction=75
[2011-05-19 10:21:30] [debug] ( javajni.c:660 ) Jvm Option[10] -XX:+UseCMSInitiatingOccupancyOnly
[2011-05-19 10:21:30] [debug] ( javajni.c:660 ) Jvm Option[11] -XX:+HeapDumpOnOutOfMemoryError
[2011-05-19 10:21:30] [debug] ( javajni.c:660 ) Jvm Option[12] -Djline.enabled=false
[2011-05-19 10:21:30] [debug] ( javajni.c:660 ) Jvm Option[13] -Delasticsearch
[2011-05-19 10:21:30] [debug] ( javajni.c:660 ) Jvm Option[14] -Des-foreground=yes
[2011-05-19 10:21:30] [debug] ( javajni.c:660 ) Jvm Option[15] -Des.path.home=D:\elasticsearch
[2011-05-19 10:21:30] [debug] ( javajni.c:660 ) Jvm Option[16] -Djava.class.path=C:\Program Files (x86)\Java\jre6\lib\ext\QTJava.zip;D:\elasticsearch\lib\elasticsearch-0.16.1.jar;D:\elasticsearch\lib\elasticsearch-0.16.1.jar;D:\elasticsearch\lib\jline-0.9.94.jar;D:\elasticsearch\lib\jna-3.2.7.jar;D:\elasticsearch\lib\log4j-1.2.15.jar;D:\elasticsearch\lib\lucene-analyzers-3.1.0.jar;D:\elasticsearch\lib\lucene-core-3.1.0.jar;D:\elasticsearch\lib\lucene-highlighter-3.1.0.jar;D:\elasticsearch\lib\lucene-memory-3.1.0.jar;D:\elasticsearch\lib\lucene-queries-3.1.0.jar;D:\elasticsearch\lib\sigar\sigar-1.6.4.jar
[2011-05-19 10:21:30] [debug] ( javajni.c:660 ) Jvm Option[17] -Xmx512m
[2011-05-19 10:21:31] [debug] ( javajni.c:891 ) Java Worker thread started org/elasticsearch/bootstrap/Bootstrap:main
[2011-05-19 10:21:32] [debug] ( prunsrv.c:1058) Java started org/elasticsearch/bootstrap/Bootstrap
[2011-05-19 10:21:32] [info]  (          :0   ) Service started in 2066 ms.
[2011-05-19 10:21:32] [debug] ( prunsrv.c:1369) Waiting for worker to finish...
[2011-05-19 10:21:39] [debug] ( javajni.c:907 ) Java Worker thread finished org/elasticsearch/bootstrap/Bootstrap:main with status=0
[2011-05-19 10:21:39] [debug] ( prunsrv.c:1374) Worker finished.
[2011-05-19 10:21:39] [debug] ( prunsrv.c:1397) Waiting for all threads to exit

elasticsearch.log:

[2011-05-19 10:21:32,709][INFO ][node                     ] [Hack] {elasticsearch/0.16.1}[2344]: initializing ...
[2011-05-19 10:21:32,711][INFO ][plugins                  ] [Hack] loaded []
[2011-05-19 10:21:36,149][INFO ][node                     ] [Hack] {elasticsearch/0.16.1}[2344]: initialized
[2011-05-19 10:21:36,150][INFO ][node                     ] [Hack] {elasticsearch/0.16.1}[2344]: starting ...
[2011-05-19 10:21:36,268][INFO ][transport                ] [Hack] bound_address {inet[/0.0.0.0:9300]}, publish_address {inet[/10.0.1.8:9300]}
[2011-05-19 10:21:39,311][INFO ][cluster.service          ] [Hack] new_master [Hack][Gkn9PLFTR0KdX2X__ybpIQ][inet[/10.0.1.8:9300]], reason: zen-disco-join (elected_as_master)
[2011-05-19 10:21:39,337][INFO ][discovery                ] [Hack] elasticsearch/Gkn9PLFTR0KdX2X__ybpIQ
[2011-05-19 10:21:39,351][INFO ][gateway                  ] [Hack] recovered [0] indices into cluster_state
[2011-05-19 10:21:39,366][INFO ][http                     ] [Hack] bound_address {inet[/0.0.0.0:9200]}, publish_address {inet[/10.0.1.8:9200]}
[2011-05-19 10:21:39,366][INFO ][node                     ] [Hack] {elasticsearch/0.16.1}[2344]: started
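If you want to check the result without reading the log by eye, a few lines of script can look for the final 'started' message from the node. This is a Python sketch that assumes the log format stays exactly as shown above; node_started and the regular expression are my own, not something ElasticSearch provides.

```python
import re

# [timestamp][LEVEL ][source            ] message
LINE = re.compile(
    r"^\[(?P<time>[^\]]+)\]\[(?P<level>\w+)\s*\]\[(?P<source>[\w.]+)\s*\] (?P<message>.*)$"
)


def node_started(log_lines):
    """Return True if any line is the node's final 'started' message."""
    for line in log_lines:
        match = LINE.match(line.strip())
        if match and match["source"] == "node" and match["message"].endswith(": started"):
            return True
    return False


sample_lines = [
    "[2011-05-19 10:21:36,150][INFO ][node                     ] [Hack] {elasticsearch/0.16.1}[2344]: starting ...",
    "[2011-05-19 10:21:39,366][INFO ][node                     ] [Hack] {elasticsearch/0.16.1}[2344]: started",
]
print(node_started(sample_lines))  # True
```

To use it against the real file, pass the result of open(path).readlines() for your elasticsearch.log instead of sample_lines.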

Hopefully, this helps you get ElasticSearch up and running as a service on Windows x64. It’s a great app and really worth looking at. I’m hoping to make good use of it on a couple of projects, particularly the faceted search feature.