Introducing Crosscut® by Moskitos™, an Exciting New IPaaS Offering

For the past two and a half years, Jérémie and I have contributed to creating and growing Moskitos, an independent Cloud Service Provider where we are building exciting new solutions. In particular, as Software Architect and CTO at Moskitos, I have been busy designing and building a new Integration Platform as a Service (IPaaS).

Our platform is called Crosscut.

Crosscut Integration Platform as a Service Overview

Crosscut is a middleware solution that runs in the cloud and manages data and process integration. It is based upon many of the same principles found in BizTalk Server, but we have implemented our simplified solution from scratch using many of the services provided by Microsoft Azure. Currently, our platform is used internally by our consultants, who deliver integration projects for our clients. It is also targeted at select partners and cloud service providers that want to push their own offering but have to overcome the fact that some of their customers may not realize they should invest in an integration platform.

In the following weeks, I will post on this blog to highlight our journey building Crosscut.

The focus of this blog has not changed. I will still document the technical challenges that we faced and the choices that we made, and hopefully post some tips and tricks that may apply to a wide audience.

Crosscut Portal Screenshots


Integrating NLog with Azure Cloud Service Diagnostics

NLog is a popular open-source logging framework for .NET applications. In my opinion, it is very flexible and more feature-rich than, say, the built-in .NET tracing framework. For instance, NLog supports asynchronous logging. It also supports flexible trace formatting and has an auto-archiving mechanism that I find very useful.

Some articles have already discussed how to use NLog from an Azure Cloud Service. Most of these articles instruct you to make NLog log to the built-in .NET trace sources; from there, the default support for Azure diagnostics kicks in and persists the logs to a preconfigured Storage account.

However, this approach has the drawback that any Tracing done in your application will also end up in the persisted logs.

In the project I’m working on, I could not use this approach because I wanted to have multiple distinct trace files. This project implements a kind of process runner, and I wanted to capture and persist each process’s STDOUT and STDERR streams in a dedicated trace log.

So the strategy I ended up following consists of the following steps (a sketch of step 1 follows the list):

  1. Write NLog messages to a dedicated file. One file per logger.
  2. Use NLog auto-archiving feature to periodically archive traces to a particular folder.
  3. Configure Azure diagnostics to persist archived traces to a Storage account.
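
To make step 1 more concrete, here is a hedged sketch of what the per-process logging could look like. The ProcessRunner class and its Run method are hypothetical names of mine, and the sketch assumes an NLog rule that routes loggers named "Process.*" to per-logger file targets (for instance by using ${logger} in the target’s fileName layout).

using System.Diagnostics;
using System.IO;
using NLog;

public static class ProcessRunner
{
    // Hypothetical helper: spawns a process and captures its STDOUT and STDERR
    // into a dedicated NLog logger (and, via configuration, a dedicated file).
    public static void Run(string fileName, string arguments)
    {
        // one logger per process; the NLog configuration maps it to its own file
        var logger = LogManager.GetLogger("Process." + Path.GetFileNameWithoutExtension(fileName));

        var info = new ProcessStartInfo(fileName, arguments)
        {
            UseShellExecute = false,
            RedirectStandardOutput = true,
            RedirectStandardError = true,
        };

        using (var process = Process.Start(info))
        {
            process.OutputDataReceived += (sender, e) => { if (e.Data != null) logger.Info(e.Data); };
            process.ErrorDataReceived += (sender, e) => { if (e.Data != null) logger.Error(e.Data); };

            process.BeginOutputReadLine();
            process.BeginErrorReadLine();
            process.WaitForExit();
        }
    }
}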

In practice, this works really well.

The remainder of this post will walk you through how to configure your Cloud Service in order to persist NLog file traces to an Azure Storage account.

Writing NLog traces to local storage

First, you need to configure local storage for your Cloud Service, so that NLog can write traces to the filesystem.

For reference, your Cloud Service’s ServiceDefinition.csdef file is modified like so:

<?xml version="1.0" encoding="utf-8"?>
<ServiceDefinition name="Host" xmlns="http://schemas.microsoft.com/ServiceHosting/2008/10/ServiceDefinition" schemaVersion="2014-06.2.4">
  <WorkerRole name="WorkerRole" vmsize="Small">
    <LocalResources>
      <LocalStorage name="logs" cleanOnRoleRecycle="true" />
    </LocalResources>
  </WorkerRole>
</ServiceDefinition>

Next, NLog needs to know where to write the log files. The easiest way I found was to create an environment variable that refers to the local storage location and use that variable in NLog’s configuration. In order to define an environment variable, you need to manually edit your Cloud Service’s ServiceDefinition.csdef file like so:

<?xml version="1.0" encoding="utf-8"?>
<ServiceDefinition name="Host" xmlns="http://schemas.microsoft.com/ServiceHosting/2008/10/ServiceDefinition" schemaVersion="2014-06.2.4">
  <WorkerRole name="WorkerRole" vmsize="Small">
    <Runtime>
      <Environment>
        <Variable name="NLOGDIR">
          <RoleInstanceValue xpath="/RoleEnvironment/CurrentInstance/LocalResources/LocalResource[@name='logs']/@path" />
        </Variable>
      </Environment>
    </Runtime>
  </WorkerRole>
</ServiceDefinition>

This modification creates an environment variable named NLOGDIR that points to the location of each instance’s local storage resource directory. Having this done in the Service Definition ensures that the environment variable is created before the role starts, so that NLog can take advantage of it during its configuration.

In NLog’s configuration, use the environment variable as the location of the log files.

<target name="App" xsi:type="File"
  lineEnding="Default" autoFlush="true" keepFileOpen="false" concurrentWrites="true" 
  fileName="${environment:variable=NLOGDIR}\ApplicationLog.txt"
  archiveFileName="${environment:variable=NLOGDIR}\archive\ApplicationLog_${date:format=yyyyMMddHH}_{#####}.log"
  archiveEvery="Minute" archiveNumbering="sequence" maxArchiveFiles="720"
  >
...
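
If you prefer configuring NLog programmatically rather than through XML, here is a rough equivalent of the target above, as a hedged sketch: the LoggingBootstrap class and its Configure method are my own names, and the single catch-all logging rule is only there for illustration.

using System;
using System.IO;
using NLog;
using NLog.Config;
using NLog.Targets;

public static class LoggingBootstrap
{
    // Hypothetical helper: call this early, e.g. from the role's OnStart.
    public static void Configure()
    {
        // NLOGDIR is the environment variable defined in ServiceDefinition.csdef
        var logDir = Environment.GetEnvironmentVariable("NLOGDIR") ?? ".";

        var target = new FileTarget
        {
            FileName = Path.Combine(logDir, "ApplicationLog.txt"),
            ArchiveFileName = Path.Combine(logDir, @"archive\ApplicationLog_${date:format=yyyyMMddHH}_{#####}.log"),
            ArchiveEvery = FileArchivePeriod.Minute,
            ArchiveNumbering = ArchiveNumberingMode.Sequence,
            MaxArchiveFiles = 720,
            KeepFileOpen = false,
            ConcurrentWrites = true,
            AutoFlush = true,
        };

        var config = new LoggingConfiguration();
        config.AddTarget("App", target);
        config.LoggingRules.Add(new LoggingRule("*", LogLevel.Debug, target));

        LogManager.Configuration = config;
    }
}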

Persisting NLog traces to an Azure Storage account

In order to persist trace files to an Azure Storage account, you must configure Azure diagnostics to periodically transfer the contents of the trace files from the local storage to a preconfigured Azure Storage account.

First, the Azure Diagnostics Module must be enabled for your Cloud Service. This is usually done by default when creating the Cloud Service project from Visual Studio.

Make sure to configure an appropriate Connection String pointing to an Azure Storage account where trace files will be persisted. During development, of course, the development storage will do.

In theory, you can configure Azure diagnostics declaratively, using the Diagnostics.wadcfg file, stored under your role node in Solution Explorer.

In order for the configuration to take effect, this file must be included in, or better, linked to, the role project. Make sure to set the “Copy to Output Directory” property of this file to “Copy always” or “Copy if newer”.

The contents of the Diagnostics.wadcfg file must be updated so that the Azure Diagnostics monitor can transfer the contents of the local storage directory to a specified blob container in the configured Storage Account.

<DiagnosticMonitorConfiguration configurationChangePollInterval="PT1M" overallQuotaInMB="4096" xmlns="http://schemas.microsoft.com/ServiceHosting/2010/10/DiagnosticsConfiguration">
  <DiagnosticInfrastructureLogs />
  <Directories scheduledTransferPeriod="PT1M" >
    ...
    <DataSources>
      <DirectoryConfiguration container="wad-custom-container" directoryQuotaInMB="1024">
        <LocalResource name="logs" relativePath=".\archive" />
      </DirectoryConfiguration>
    </DataSources>
  </Directories>
</DiagnosticMonitorConfiguration>

Make sure not to exceed the overall quota; otherwise, the Diagnostics Monitor will crash upon startup. Thanks, Jérémie, for the time taken to troubleshoot this!


Microsoft® MVP Integration 2015!

For the fifth consecutive year, I have received the Microsoft Most Valuable Professional award. I’m proud to be part of the Microsoft Integration community for another year.

Congratulations to all the 2015 Microsoft MVPs!

This year, I had the great pleasure of attending the MVP Summit, so I’m glad I will have the opportunity to meet some of you in the community again.

I’m looking forward to some exciting news in the domain of microservices in particular, and in the interactions between the integration space and cloud computing in general.


New Article on Enterprise Cloud Adoption in Programmez! Magazine

The Cloud Computing market has already won over a great many companies. Yet several independent studies show that the use of cloud computing in enterprises is far less widespread than one might think.

Thus begins the article that I co-wrote with Jérémie.

It was published in the November 2014 issue of Programmez! magazine.
I invite you to read it!


PowerShell Provider updated for BizTalk Server 2013 and BizTalk Server 2013 R2

It’s been a while since I’ve contributed to the PowerShell Provider for BizTalk Server.

Well, I’ve recently published an updated version of the provider on CodePlex in order to fix a serious issue that prevented the previous version from launching. I have also updated the code to add the following CmdLets:

  • Export-GroupSettings: for making it easier to export settings of a BizTalk group to file.
  • Import-GroupSettings: for importing a settings file to an existing BizTalk group.
  • Export-Vocabulary: to export Business Rules Engine vocabularies to file.

I have also made it possible to work with Business Rules policies, even if they are not associated with a specific BizTalk Application.
You can download an updated Getting Started Guide from the project home page.

Working with Group Settings

In a previous version, I added support for exploring performance, throttling and latency-related settings of a BizTalk Group via the PowerShell Provider for BizTalk Server.

In this release, I have added a couple of CmdLets to make this process easier.

PS BizTalk:\> Export-GroupSettings -Path BizTalk:\ -Destination C:\Temp\group_settings.xml
PS BizTalk:\>
PS BizTalk:\> Import-GroupSettings -Path BizTalk:\ -Source C:\Temp\group_settings.xml
PS BizTalk:\>

Like most CmdLets, these also accept pipeline input, like so:

PS BizTalk:\> (Get-Item BizTalk:\) | Export-GroupSettings -Destination C:\Temp\group_settings.xml
PS BizTalk:\>

Working with Policies and Vocabularies

Support for working with Business Rules Engine policies has been included in the PowerShell Provider for a long time now.

This release adds the ability to work with Business Rules Engine policies and vocabularies, even for policies not explicitly associated with any particular BizTalk Application. New to this release is the possibility to export vocabularies themselves as standalone artifacts.

In order to make this possible, I have added a virtual folder under the root of the Provider:

PS BizTalk:\> Get-ChildItem

    Path: Biztalk:


Name
----
All Artifacts
Applications
Parties
Platform Settings
Health and Activity
Rule Engine

PS BizTalk:\>

The Export-Policy CmdLet has been updated to make it possible to export Business Rules Engine policies even when they are not associated with any particular BizTalk application.

PS BizTalk:\> Export-Policy -Path 'BizTalk:\Rule Engine\Policies\MyPolicy' -Version 1.1 -Destination C:\Temp\MyPolicy.1.1.xml
PS BizTalk:\>

The new Export-Vocabulary CmdLet has been specifically included to make it possible to export Business Rules Engine vocabularies as standalone export files.

PS BizTalk:\> Export-Vocabulary -Path 'BizTalk:\Rule Engine\Vocabularies\MyVocabulary' -Version 1.1 -Destination C:\Temp\MyVocabulary.1.1.xml -AddReferencedVocabularies
PS BizTalk:\>

I hope you’ll enjoy this release.


Capturing REST API Calls with an OWIN Middleware

Update: I have set up a GitHub repository hosting an updated and more complete version of this code.

In the cloud-first, mobile-first world we live in, REST APIs are becoming more and more important every day. In one of our projects, we wanted to capture details of every access to our APIs for security and audit purposes. Ideally, we wanted to build some kind of interception mechanism, so that the developer of the API does not have to deal with the intricacies and details of the capturing process.

This is a form of Aspect Oriented Programming (AOP), where the concern of capturing API calls is orthogonal to the concern of performing the work the API is designed for. There are many ways to achieve this, and many frameworks to complete this task.

Fortunately, we are exposing our REST APIs using Katana, which is Microsoft’s open-source implementation of the OWIN specification. OWIN has the concept of Middleware components, which are layered on top of one another in order to perform some processing. One of these middleware components handles processing and routing requests to ASP.NET Web API controllers:

namespace App
{
    using System;
    using System.Web.Http;
    using Microsoft.Owin.Hosting;
    using Owin;
    
    class Program
    {
        static void Main(string[] args)
        {
            const string address = "http://localhost:12345";

            // be explicit about which Startup class configures the pipeline
            using (WebApp.Start<Startup>(address))
            {
                Console.WriteLine("Listening requests on {0}.", address);
                Console.WriteLine("Please, press ENTER to shutdown.");
                Console.ReadLine();
            }
        }
    }

    public sealed class Startup
    {
        private readonly HttpConfiguration configuration_ = new HttpConfiguration();

        public void Configuration(IAppBuilder application)
        {
            application.UseWebApi(configuration_);

            configuration_.MapHttpAttributeRoutes();
        }
    }
}

Creating an OWIN Middleware

In order to capture REST API calls, we will be creating a custom OWIN Middleware. Middleware in OWIN form a chain and are included in the processing pipeline. Each Middleware has a reference to and is responsible for calling the next Middleware in the chain as part of its processing.

The absolute minimal do-nothing OWIN Middleware looks like so:

namespace Middleware
{
    using System.Threading.Tasks;

    using Microsoft.Owin;

    public sealed class TrackingMiddleware : OwinMiddleware
    {
        public TrackingMiddleware(OwinMiddleware next)
            : base(next)
        {
        }

        public override async Task Invoke(IOwinContext context)
        {
            await Next.Invoke(context);
        }
    }
}

Our Tracking Middleware is layered on top of the chain, so that it is called first when an HTTP request comes in and called last when the corresponding HTTP response comes back from the Web API. Thus, our Middleware has a chance to track details about both the request and the response.

    public sealed class Startup
    {
        ...
        public void Configuration(IAppBuilder application)
        {
            application.Use<Middleware.TrackingMiddleware>();
            application.UseWebApi(configuration_);

            configuration_.MapHttpAttributeRoutes();
        }
    }

But first, some book-keeping

In order to make this post more meaningful, we need to have some boilerplate code in place first. So, for illustration purposes, let’s create a very simple WebApi controller that will be used in the rest of this article.

namespace WebApi
{
    using System;
    using System.Threading.Tasks;
    using System.Web.Http;

    [RoutePrefix("api/custom")]
    public sealed class CustomController : ApiController
    {
        [HttpGet]
        [Route("resources")]
        public IHttpActionResult GetAllResources()
        {
            var collection = new[]
            {
                "one",
                "two",
                "three",
            };

            return Ok(collection);
        }

        [HttpPost]
        [Route("post")]
        public async Task<IHttpActionResult> PostStream()
        {
            // consume the stream, to simulate processing
            using (var stream = await Request.Content.ReadAsStreamAsync())
            {
                var count = 0;
                var buffer = new byte[4096];
                while ((count = await stream.ReadAsync(buffer, 0, buffer.Length)) != 0)
                    /* do nothing */;
            }

            return Ok();
        }
    }
}

Tracked API calls need to be persisted somewhere. In our project, we decided to store the details of each call in a Microsoft Azure Storage Table. For the purposes of unit testing, we have abstracted this behind the interface and definitions shown below. Implementing this interface against Azure Table Storage is outside the scope of this post and left as an exercise to the reader :-) A trivial in-memory implementation, useful for unit tests, is sketched right after the interface.

namespace Interop
{
    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;

    public interface ITableStorage<T> where T : class, new()
    {
        Task<IEnumerable<T>>  EnumRecordsAsync(string query = "");

        Task InsertRecordAsync(T record);
        Task UpdateRecordAsync(T record);
        Task DeleteRecordAsync(T record);
    }
}
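
For reference, here is the kind of trivial in-memory fake I have in mind for unit tests. It is a sketch of mine, not part of the project: it ignores the storage-specific query string and makes no attempt at thread safety.

namespace Interop
{
    using System.Collections.Generic;
    using System.Threading.Tasks;

    public sealed class InMemoryTableStorage<T> : ITableStorage<T> where T : class, new()
    {
        private readonly List<T> records_ = new List<T>();

        public Task<IEnumerable<T>> EnumRecordsAsync(string query = "")
        {
            // the query string is ignored in this fake
            return Task.FromResult<IEnumerable<T>>(records_.ToArray());
        }

        public Task InsertRecordAsync(T record)
        {
            records_.Add(record);
            return Task.FromResult(0);
        }

        public Task UpdateRecordAsync(T record)
        {
            // records are held by reference, so there is nothing to copy
            return Task.FromResult(0);
        }

        public Task DeleteRecordAsync(T record)
        {
            records_.Remove(record);
            return Task.FromResult(0);
        }
    }
}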

Details of a captured REST API call are encapsulated in the following ApiEntry class. It includes important things such as the caller identity and IP address for security purposes, as well as the call duration in order to assess the performance of our API over time.

namespace Storage
{
    using System;
    using System.Collections.Generic;

    using Interop;

    public sealed class ApiEntry
    {
        public ApiEntry()
        {
            TrackingId = Guid.NewGuid();
            CallDateTime = DateTime.UtcNow;
        }
        public Guid TrackingId { get; set; }

        public string CallerIdentity { get; set; }
        public DateTime CallDateTime { get; set; }
        public string CallerAddress { get; set; }

        public string Verb { get; set; }
        public Uri RequestUri { get; set; }
        public IDictionary<String, String[]> RequestHeaders { get; set; }
        public string Request { get; set; }

        public int StatusCode { get; set; }
        public string ReasonPhrase { get; set; }
        public IDictionary<String, String[]> ResponseHeaders { get; set; }
        public string Response { get; set; }

        public TimeSpan CallDuration { get; set; }
    }

    public sealed class ApiTableStorage : ITableStorage<ApiEntry>
    {
       ...
    }
}

Ok, let’s now go back to implementing our Middleware.

A Tracking Owin Middleware

private readonly ITableStorage<ApiEntry> storage_;

public override async Task Invoke(IOwinContext context)
{
    // measure the duration of the call for the ApiEntry record
    var stopwatch = System.Diagnostics.Stopwatch.StartNew();

    var request = context.Request;
    var response = context.Response;

    // capture details about the caller identity

    var identity =
        request.User != null && request.User.Identity.IsAuthenticated ?
            request.User.Identity.Name :
            "(anonymous)"
            ;

    var record = new ApiEntry
    {
        CallerIdentity = identity,
    };

    // replace the request stream in order to intercept downstream reads

    var requestBuffer = new MemoryStream();
    var requestStream = new ContentStream(requestBuffer, request.Body);
    request.Body = requestStream;

    // replace the response stream in order to intercept downstream writes

    var responseBuffer = new MemoryStream();
    var responseStream = new ContentStream(responseBuffer, response.Body);
    response.Body = responseStream;

    // add the "Http-Tracking-Id" response header

    context.Response.OnSendingHeaders(state =>
    {
        var ctx = state as IOwinContext;
        var resp = ctx.Response;

        // adding the tracking id response header so that the user
        // of the API can correlate the call back to this entry

        resp.Headers.Add("Http-Tracking-Id", new[] { record.TrackingId.ToString("d"), });

    }, context)
    ;

    // invoke the next middleware in the pipeline

    await Next.Invoke(context);

    // record how long the call took, including downstream processing
    record.CallDuration = stopwatch.Elapsed;

    // rewind the request and response buffers
    // and record their content

    WriteRequestHeaders(request, record);
    record.Request = await WriteContentAsync(requestStream, record.RequestHeaders);

    WriteResponseHeaders(response, record);
    record.Response = await WriteContentAsync(responseStream, record.ResponseHeaders);

    // persist details of the call to durable storage

    await storage_.InsertRecordAsync(record);
}

The Middleware is very simple, and I will walk you through the code.

Remember that this Middleware must be the first in the pipeline, so that it has a chance to intercept the request first and then intercept the response before it returns back to the caller.

So, the Middleware first creates a new instance of the ApiEntry class, which initializes itself with a brand new Tracking Identifier for this request.

Next, the Middleware replaces both the request and response streams taken from the context by custom classes that wrap the streams in order to intercept reads and writes. It gives the Middleware a chance to buffer what’s being read from the request, as well as intercept what’s being written to the response by downstream Middleware components in the pipeline.

When something is first written to the response stream, the OnSendingHeaders callback is triggered. The Middleware uses this opportunity to insert the Tracking Identifier in the response headers, so that the client of the call can correlate this invocation with a specific identifier.

The idea is that, should something go wrong with the call, the client will be able to contact customer support, who will have all the information from this call recorded in durable storage, including the reason for the error and, possibly, the underlying exception and its call stack as well.
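
As an aside, here is a hedged sketch of how a client might capture that identifier; the TrackingClient class is a hypothetical name of mine and the URL matches the self-host example from the beginning of this post.

using System.Collections.Generic;
using System.Linq;
using System.Net.Http;
using System.Threading.Tasks;

public static class TrackingClient
{
    public static async Task<string> GetTrackingIdAsync()
    {
        using (var client = new HttpClient())
        {
            var response = await client.GetAsync("http://localhost:12345/api/custom/resources");

            // the "Http-Tracking-Id" header is the one added by the middleware above
            IEnumerable<string> values;
            return response.Headers.TryGetValues("Http-Tracking-Id", out values)
                ? values.First()
                : null;
        }
    }
}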

Then, the Middleware invokes the next Middleware component in the pipeline, thus triggering the sequence of calls down to the actual REST API.

Finally, the Middleware rewinds the local Stream buffers and records all the details of this call to durable storage.

The following helper methods are used in the Middleware implementation, shown here for completeness:

private static void WriteRequestHeaders(IOwinRequest request, ApiEntry record)
{
    record.Verb = request.Method;
    record.RequestUri = request.Uri;
    record.RequestHeaders = request.Headers;
}

private static void WriteResponseHeaders(IOwinResponse response, ApiEntry record)
{
    record.StatusCode = response.StatusCode;
    record.ReasonPhrase = response.ReasonPhrase;
    record.ResponseHeaders = response.Headers;
}

private static async Task<string> WriteContentAsync(ContentStream stream, IDictionary<string, string[]> headers)
{
    const string ContentType = "Content-Type";

    var contentType = 
        headers.ContainsKey(ContentType) ?
        headers[ContentType][0] :
        null
        ;

    return await stream.ReadContentAsync(contentType);
}

We’re using a custom Stream class that helps us intercept the reads and writes, as well as provide some helper code to read the streams as text. If the HTTP Content-Type header denotes “string” content, for instance application/json or text/plain, the class attempts to identify the encoding and read the Stream as text. Otherwise, the content might be binary, so no attempt to read the Stream is made.

using System;
using System.IO;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading.Tasks;

public class ContentStream : Stream
{
    protected readonly Stream buffer_;
    protected readonly Stream stream_;

    private long contentLength_ = 0L;
        
    public ContentStream(Stream buffer, Stream stream)
    {
        buffer_ = buffer;
        stream_ = stream;
    }

    /// <summary>
    /// Returns the recorded length of the underlying stream.
    /// </summary>
    public virtual long ContentLength
    {
        get { return contentLength_; }
    }

    // maxCount caps how much content is captured (the default here is arbitrary)
    public async Task<String> ReadContentAsync(string contentType, long maxCount = 64 * 1024)
    {
        if (!IsTextContentType(contentType))
        {
            contentType = String.IsNullOrEmpty(contentType) ? "N/A" : contentType;
            return String.Format("{0} [{1} bytes]", contentType, ContentLength);
        }

        buffer_.Seek(0, SeekOrigin.Begin);

        var length = Math.Min(ContentLength, maxCount);

        var buffer = new byte[length];
        var count = await buffer_.ReadAsync(buffer, 0, buffer.Length);

        return
            GetEncoding(contentType)
            .GetString(buffer, 0, count)
            ;
    }

    protected void WriteContent(byte[] buffer, int offset, int count)
    {
        buffer_.Write(buffer, offset, count);
    }

    #region Implementation

    private static bool IsTextContentType(string contentType)
    {
        if (contentType == null)
            return false;

        var isTextContentType =
            contentType.StartsWith("application/json") ||
            contentType.StartsWith("application/xml") ||
            contentType.StartsWith("text/")
            ;
        return isTextContentType;
    }

    private static Encoding GetEncoding(string contentType)
    {
        var charset = "utf-8";
        var regex = new Regex(@";\s*charset=(?<charset>[^\s;]+)");
        var match = regex.Match(contentType);
        if (match.Success)
            charset = match.Groups["charset"].Value;

        try
        {
            return Encoding.GetEncoding(charset);
        }
        catch (ArgumentException)
        {
            // unknown or unsupported charset: fall back to UTF-8
            return Encoding.UTF8;
        }
    }

    #endregion

    #region System.IO.Stream Overrides

    ...

    public override int Read(byte[] buffer, int offset, int count)
    {
        // read content from the request stream

        count = stream_.Read(buffer, offset, count);
        contentLength_ += count;

        // record the read content into our temporary buffer

        if (count != 0)
            WriteContent(buffer, offset, count);

        return count;
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        // store the bytes into our local stream

        WriteContent(buffer, offset, count);

        // write the bytes to the response stream
        // and record the actual number of bytes written

        stream_.Write(buffer, offset, count);
        contentLength_ += count;
    }

    #endregion
       
    #region IDisposable Implementation

    protected override void Dispose(bool disposing)
    {
        if (disposing)
            buffer_.Dispose();

        // not disposing the stream_ member
        // which is owned by the Owin infrastructure

        base.Dispose(disposing);
    }

    #endregion
}

That’s it. As you can see, it’s very easy to implement a custom Middleware in order to intercept the request and/or the response. Along the way, this post has shown you how to replace the response stream, how to insert your own headers in the response, and so on. A further point of note is that, by using a custom Stream-based class, the processing happens at the same time as the downstream Middleware components read from the request and write to the response streams.

In real production code, I would not use instances of the MemoryStream class for my temporary buffers, because I’m concerned about having the whole content of both the request and the response live in memory for the duration of the call. In my project, I’m using an instance of the VirtualStream class, available as a sample from BizTalk Server, in order to make sure that not too much memory is consumed by the streams.
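
If you do not have the VirtualStream sample at hand, another option, as a hedged sketch of mine, is to back the temporary buffers with auto-deleted temporary files instead of memory; the request and response buffers in Invoke would then be created with TempBuffer.Create() instead of new MemoryStream().

using System.IO;

public static class TempBuffer
{
    // Hypothetical helper: a read/write stream backed by a temporary file
    // that is deleted automatically when the stream is disposed.
    public static Stream Create()
    {
        return new FileStream(
            Path.GetTempFileName(),
            FileMode.Create,
            FileAccess.ReadWrite,
            FileShare.None,
            4096,
            FileOptions.DeleteOnClose);
    }
}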

Code available on GitHub

I have set up a GitHub repository hosting an updated and more complete version of this code.

Please, feel free to have a look.


Performing Parallel Processing in Azure Web Jobs

Azure Web Jobs are a new feature available on Azure Web Sites. They allow you to perform long-running, continuous background tasks alongside your web sites. For instance, you might have a blog and would like to resize the images that contributors upload to the site. Until now, you had to create a separate thread in your web site, or even a separate Worker Role, to tackle this kind of resource-intensive task.

Thankfully, Azure Web Jobs solve this problem quite neatly.

By the way, if you did not know about Azure Web Jobs or if you haven’t already had the chance to fiddle with this new feature, I suggest you read the excellent introductory post from Scott Hanselman on this subject. Bear in mind that Azure Web Jobs are still in preview and there is hardly any documentation available yet.

In what follows, I’m assuming that you already know about and have implemented Azure Web Jobs.

One of the neat use cases for using Web Jobs is to trigger custom processing upon receiving a message from a queue. This is easily done by creating a simple function with a parameter decorated with the QueueInput attribute. In that case, the Web Job host infrastructure will automatically invoke your method with the contents of the queued message as a parameter.

One of the benefits of Azure Web Jobs is that they scale by default with your web site. This means that there will be as many instances of your web job as there are instances of your web site, thus, allowing for some degree of parallelism in your background processing.

If, however, you do not want some of your Azure Web Jobs to scale, you can optionally opt for them to run as singletons. This means that whatever the number of instances your web site runs, only a single instance of the web job will run. This is great, because it provides a kind of automatic failover, should the singleton instance of your web job crash unexpectedly.

Create Scalable Azure Web Jobs

There is one kind of scenario, however, that is not directly supported by Azure Web Jobs: performing parallel processing within a single instance. It is important to note that the method that performs the bulk of the processing in your Web Job is not invoked reentrantly. Indeed, each instance of a Web Job will only process messages sequentially as it receives them from, say, an Azure Queue.

In one of our projects, we are using Web Jobs in order to frequently process several thousand items from a queue. Each individual piece of processing is somewhat lightweight and involves calling a third-party REST API. But the fact that items from the queue are processed sequentially considerably increases the total execution time.

For illustration purposes, consider the following code:

private static void Main(string[] args)
{
    new JobHost().RunAndBlock();
}

public static void Work([QueueInput("webjobq")] string item)
{
    Console.WriteLine("This is a web job invocation: Process Id: {0}, Thread Id: {1}.", System.Diagnostics.Process.GetCurrentProcess().Id, Thread.CurrentThread.ManagedThreadId);
    Console.WriteLine("&gt;&gt; Yawn ...");
    Thread.Sleep(2500);
    Console.WriteLine(">> ... I think I fell asleep for awhile.");
    Console.WriteLine(">>; Done.");
}

When running, the Azure Web Job will process each incoming queue item sequentially. Here is the result of the method invocations:

You can see that each item takes roughly 25 seconds to complete, essentially taken up by the Thread.Sleep() call in the code above. Therefore, the total execution time of sequentially processing input from an Azure Queue is proportional to the number of items to process. In order to reduce the total processing time, we need to somehow perform some parallel processing.

Performing Parallel Processing in Azure Web Jobs

The obvious and simplest solution is to run multiple instances of the web job. For instance, you could deploy, say, five instances of this web job alongside your web site. This gives you a number of Web Job instances equal to five times the number of instances of your web site.

However this solution is not easily maintainable, unless proper automation is put in place.

Another attempt was to simply make the Web Job method asynchronous and return a Task. However, this does not yield the expected result and is not supported, as it seems the Web Job infrastructure awaits the returned task anyway.
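
For clarity, this is roughly the shape of that attempt, shown only as a sketch of what not to expect parallelism from:

public static async Task Work([QueueInput("webjobq")] string item)
{
    // the Web Job host awaits the returned task,
    // so messages are still processed one at a time
    await Task.Delay(25000);
}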

Another solution would be to offload the processing to a number of separate threads within a single instance of a Web Job. This achieves the same result, but without having to deploy multiple identical instances of a single Web Job alongside your web site.

By using a simple semaphore, it is possible to limit the number of concurrent threads allowed to process items from the queue to a fixed or configurable quantity. Here is the updated code:

public static void Work([QueueInput("webjobq")] string item)
{
    LoggerFactory.GetCurrentClassLogger().Debug("Performing work...");

    try
    {
        // wait for a slot to become available
        // then spawn a new thread

        semaphore_.Wait();
        new Thread(ThreadProc).Start();
    }
    catch (Exception e)
    {
        Console.Error.WriteLine(e);
    }
}

private const int MaxNumberOfThreads = 3;
private static readonly SemaphoreSlim semaphore_ = new SemaphoreSlim(MaxNumberOfThreads, MaxNumberOfThreads);

public static void ThreadProc()
{
    try
    {
        Work();
    }
    catch (Exception e)
    {
        Console.Error.WriteLine(">> Error: {0}", e);
    }
    finally
    {
        // release a slot for another thread
        semaphore_.Release();
    }
}

public static void Work()
{
    Console.WriteLine("This is a web job invocation: Process Id: {0}, Thread Id: {1}.", System.Diagnostics.Process.GetCurrentProcess().Id, Thread.CurrentThread.ManagedThreadId);
    Console.WriteLine(">> Yawn ...");
    Thread.Sleep(25000);
    Console.WriteLine(">> ... I think I fell asleep for awhile.");
    Console.WriteLine(">> Done.");
}

In this case, each time an item is received from the Azure Queue, a new processing thread is created as soon as a slot becomes available. Here is the result of the invocations on many queue items:

You can see that several threads are created in quick succession (within a few hundred milliseconds at most) and then, once the maximum number of concurrent threads is reached, the next one waits for a previously created thread to complete.

There you have it: a simple way to perform parallel processing from within a Web Job. Of course, this technique is not specific to Web Jobs, but I think it allows you to work with Azure Web Jobs in a more flexible way. In our project, we used a slightly modified version of this code, taking advantage of a CancellationToken to gracefully stop the web job from either the Azure infrastructure or the command line.
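
For reference, here is a hedged sketch of the kind of cooperative shutdown I mean. The Shutdown class and its members are my own names; the sketch relies on the WEBJOBS_SHUTDOWN_FILE convention, whereby Azure creates a file at the indicated path when the web job is being stopped. Worker threads can then periodically check (or register against) Shutdown.Token and exit cleanly.

using System;
using System.IO;
using System.Threading;

public static class Shutdown
{
    private static readonly CancellationTokenSource cts_ = new CancellationTokenSource();

    public static CancellationToken Token
    {
        get { return cts_.Token; }
    }

    public static void Watch()
    {
        // Azure populates this variable with the path of a file that is
        // created when the web job is about to be stopped
        var shutdownFile = Environment.GetEnvironmentVariable("WEBJOBS_SHUTDOWN_FILE");
        if (String.IsNullOrEmpty(shutdownFile))
            return;

        var watcher = new FileSystemWatcher(Path.GetDirectoryName(shutdownFile));
        watcher.Created += (sender, e) =>
        {
            if (String.Equals(e.FullPath, shutdownFile, StringComparison.OrdinalIgnoreCase))
                cts_.Cancel();
        };
        watcher.EnableRaisingEvents = true;
    }
}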
