Building a Simple FILE to FILE Integration with Crosscut

One of the first things one tries when evaluating an integration platform is a simple file to file integration. This Managed File Transfer is the moral equivalent of a “Hello World” program for integration and middleware platforms.

In this post, I will walk through how to implement a simple FILE to FILE scenario with Crosscut. In a follow-up post, I will outline the technical challenges and cover the Azure components used under the hood to build such a scenario.

In the following weeks, I will use this dual-post approach to explain our technical journey in building Crosscut. One post will outline how the scenario appears to customers and integration partners, while the next will delve into the technical details.

So, let’s see how our integration partners use Crosscut to build a simple integration.

Building Publication and Subscription Ports

Like many integration platforms, Crosscut relies on the publish/subscribe integration pattern to transport messages from a source system to a target system. Of course, it also relies on the Message Endpoint pattern, known in Crosscut as Connectors, to acquire data from a source system and transmit data to a target system.

So the first thing to do is to build a couple of ports: one that will detect files on the source system and another that will transmit those files to a target system.

So first, connect to Crosscut:

Jump to the “Developer” mode in the portal. That’s where a developer designs data integrations. The other two modes are “Monitoring”, where a user can gain insights into how the integrations behave – such as Message Interchanges, Event Logs, etc. – and “Management”, where an administrator creates users and grants permissions to various parts of the Platform.

Select the “Publication Ports” menu on the left, and click on the “New” button to create a new publication port.

Give a meaningful name to the port and click on the “Select a Connector” box.

In this case, select the FILE inbound connector.

By default, only the required properties are shown when configuring a connector. Type in the location of the folder from which files will be taken on the source system, and click “Save”.

Back on the publication port configuration screen, notice that a “Request Pipeline” box is shown. This indicates that the publication port is a One-Way port and that an optional sequence of processing can be applied to the message before being submitted to Crosscut.

In this case, leave the box empty, and click “Save”.

In order to configure the subscription port, select the “Subscription Ports” menu on the left and follow the same steps. This time, select the outbound FILE connector and choose an appropriate location for the folder into which files will be delivered on the target system.

Creating and Activating a Data Integration Flow

In order to build the data integration, one must link the publication port to the subscription port. Select the “Flows” menu on the left, and click the “New Flow” button to create a new flow.

Select the publication and subscription ports created in the previous steps, and use drag and drop to create a link from the publication port to the subscription port. This instructs Crosscut that whenever a message is published by that particular publication port, a copy should be delivered to the specified subscription port.

It is also possible to click on the link to set Subscription Filters, in order to select only a subset of the messages published by the publication port. For this post, however, the subscription filter is a catch-all filter.

A flow must be activated before messages can be delivered to Crosscut.
Click the hamburger menu from the flow tile and select “Activate”. This operation is asynchronous, because it creates all the necessary infrastructure on Azure to allow for message delivery.

Once the flow is activated, the publication and subscription ports must be hosted in some sort of executable process, in order for the data integration to effectively take place.

Creating a Group for Hosting Publication and Subscription Ports

The executable process in which publication and subscription ports are hosted is part of a logical Group that is created and managed centrally in Crosscut. Each group can contain one or more instances of an executable, each of which hosts the publication and subscription ports that are associated with it.

Crosscut comes with a built-in Group, named Cloud Connect, to host publication and subscription ports that connect to online systems, such as Salesforce.com or an FTP server, for instance. In order to host publication and subscription ports that connect to a LOB system, one must create one or more dedicated groups.

From the left side, select the “Group” menu and click “New” to create a new Group.

Once a group is created, one or more ports must be associated with it. In a typical scenario, the publication and subscription ports would be associated with different groups, each mapping to an instance of a process that runs on a different machine. In this post, I chose to have a single group for demonstration purposes.

The ports associated with a group can be Started or Stopped. Obviously, for messages to flow from one port to another, the ports must be started…

Running the Data Integration

All is now configured to run the data integration. The remaining steps involve downloading and running an instance of the executable process that hosts the publication and subscription ports associated with the group configured in the previous section of this post.

This executable process is named OnPrem-Connect because it is typically deployed on a machine running on-premises. Click the cloud icon on the right-hand side of the group caption to download a setup program that installs a pre-configured instance of the executable process.

Complete the installation wizard and wait for the service to start and register itself with the Crosscut portal.

Once registered, an instance of the OnPrem-Connect executable is identified by a green indicator on the group caption bar. There can be other instances of the same group, running on other machines for redundancy and performance purposes.

That’s it.

To complete this post, when a file is dropped in the publication folder…

… it is picked up by the OnPrem-Connect instance that hosts the publication port and published to Crosscut as a message.

Once subscribed to, the message is then transferred to the OnPrem-Connect instance that hosts the subscription port, which delivers its payload to the target folder.

That was a quick overview of a simple FILE to FILE integration scenario with Crosscut.


Introducing Crosscut® by Moskitos™, an Exciting New IPaaS Offering

For the past two and a half years, Jérémie and I have contributed to creating and growing Moskitos, an independent Cloud Service Provider where we are building exciting new solutions. In particular, as Software Architect and CTO at Moskitos, I have been busy designing and building a new Integration Platform as a Service (IPaaS).

Our platform is called Crosscut.

Crosscut Integration Platform as a Service Overview

Crosscut is a middleware solution that runs in the cloud and manages data and process integration. It is based upon many of the same principles found in BizTalk Server, but we have implemented our simplified solution from scratch using many of the services provided by Microsoft Azure. Currently, our platform is used internally by our consultants, who deliver integration projects for our clients. It is also targeted at select partners and cloud service providers that want to push their own offerings but must overcome the fact that some of their customers may not yet realize they should invest in an integration platform.

In the following weeks, I will post on this blog to highlight our journey building Crosscut.

The focus of this blog did not change. I will still document the technical challenges that we faced, the choices that we made and hopefully post some tips and tricks that may apply to a wide audience.

Crosscut Portal Screenshots


Integrating NLog with Azure Cloud Service Diagnostics

NLog is a popular open-source logging framework for .NET applications. In my opinion, it is very flexible and more feature-rich than, say, the built-in .NET tracing framework. For instance, NLog supports asynchronous logging. It also supports flexible trace formatting and has an auto-archiving mechanism that I find very useful.

Several articles already discuss how to use NLog from an Azure Cloud Service. Most of them instruct you to make NLog log to the built-in .NET trace sources; from there, the default support for Azure diagnostics kicks in and persists the logs to a preconfigured Storage account.

However, this approach has the drawback that any tracing done in your application also ends up in the persisted logs.

In the project I’m working on, I could not use this approach because I wanted to have multiple distinct trace files. This project implements a kind of process runner, and I wanted to capture and persist each process’s STDOUT and STDERR streams in a dedicated trace log.
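
For illustration, here is a minimal sketch of what that per-process capture might look like, assuming one dedicated NLog logger per launched process (the ProcessRunner class, its parameters and logger names are hypothetical, not the actual project code):

namespace Samples
{
    using System.Diagnostics;
    using System.IO;
    using NLog;

    public static class ProcessRunner
    {
        public static void Run(string fileName, string arguments)
        {
            // one dedicated logger per process; an NLog rule routes it to its own file
            var logger = LogManager.GetLogger("Process." + Path.GetFileNameWithoutExtension(fileName));

            var info = new ProcessStartInfo(fileName, arguments)
            {
                UseShellExecute = false,
                RedirectStandardOutput = true,
                RedirectStandardError = true,
            };

            using (var process = new Process { StartInfo = info })
            {
                // forward STDOUT and STDERR to the dedicated logger
                process.OutputDataReceived += (s, e) => { if (e.Data != null) logger.Info(e.Data); };
                process.ErrorDataReceived += (s, e) => { if (e.Data != null) logger.Error(e.Data); };

                process.Start();
                process.BeginOutputReadLine();
                process.BeginErrorReadLine();
                process.WaitForExit();
            }
        }
    }
}

An NLog rule can then route these loggers to dedicated file targets, for instance by including ${logger} in the target’s fileName.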

So the strategy I ended up following consists of the following steps:

  1. Write NLog messages to a dedicated file. One file per logger.
  2. Use NLog auto-archiving feature to periodically archive traces to a particular folder.
  3. Configure Azure diagnostics to persist archived traces to a Storage account.

In practice, this works really well.

The remainder of this post will walk you through how to configure your Cloud Service to persist NLog file traces to an Azure Storage account.

Writing NLog traces to local storage

First, you need to configure local storage for your Cloud Service, so that NLog can write traces to the filesystem.

For reference, your Cloud Service’s ServiceDefinition.csdef file is modified like so:

<?xml version="1.0" encoding="utf-8"?>
<ServiceDefinition name="Host" xmlns="http://schemas.microsoft.com/ServiceHosting/2008/10/ServiceDefinition" schemaVersion="2014-06.2.4">
  <WorkerRole name="WorkerRole" vmsize="Small">
    <LocalResources>
      <LocalStorage name="logs" cleanOnRoleRecycle="true" />
    </LocalResources>
  </WorkerRole>
</ServiceDefinition>

Next, NLog needs to know where to write the log files. The easiest way I found was to create an environment variable that refers to the local storage location and to use that variable in NLog’s configuration. In order to define an environment variable, you need to manually edit your Cloud Service’s ServiceDefinition.csdef file like so:

<?xml version="1.0" encoding="utf-8"?>
<ServiceDefinition name="Host" xmlns="http://schemas.microsoft.com/ServiceHosting/2008/10/ServiceDefinition" schemaVersion="2014-06.2.4">
  <WorkerRole name="WorkerRole" vmsize="Small">
    <Runtime>
      <Environment>
        <Variable name="NLOGDIR">
          <RoleInstanceValue xpath="/RoleEnvironment/CurrentInstance/LocalResources/LocalResource[@name='logs']/@path" />
        </Variable>
      </Environment>
    </Runtime>
  </WorkerRole>
</ServiceDefinition>

This modification creates an environment variable named NLOGDIR that points to the location of each instance’s local storage resource directory. Having this done in the Service Definition ensures that the environment variable is created before the role starts, so that NLog can take advantage of it during its configuration.
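
For completeness, the same variable could also be set programmatically; the following is only a sketch from a role’s OnStart method, assuming the local storage resource is named “logs” as above. The Service Definition approach remains preferable, though, because it guarantees the variable exists before any role code touches NLog.

namespace Host
{
    using System;
    using Microsoft.WindowsAzure.ServiceRuntime;

    public class WorkerRole : RoleEntryPoint
    {
        public override bool OnStart()
        {
            // resolve the local storage resource declared in ServiceDefinition.csdef
            // and expose its path to NLog through the NLOGDIR environment variable
            var logs = RoleEnvironment.GetLocalResource("logs");
            Environment.SetEnvironmentVariable("NLOGDIR", logs.RootPath);

            return base.OnStart();
        }
    }
}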

In NLog’s configuration, use the environment variable as the location of the log files.

<target name="App" xsi:type="File"
  lineEnding="Default" autoFlush="true" keepFileOpen="false" concurrentWrites="true" 
  fileName="${environment:variable=NLOGDIR}\ApplicationLog.txt"
  archiveFileName="${environment:variable=NLOGDIR}\archive\ApplicationLog_${date:format=yyyyMMddHH}_{#####}.log"
  archiveEvery="Minute" archiveNumbering="sequence" maxArchiveFiles="720"
  >
...

Persisting NLog traces to an Azure Storage account

In order to persist trace files to an Azure Storage account, you must configure Azure diagnostics to periodically transfer the contents of the trace files from the local storage to a preconfigured Azure Storage account.

First, the Azure Diagnostics Module must be enabled for your Cloud Service. This is usually done by default when creating the Cloud Service project from Visual Studio.

Make sure to configure an appropriate Connection String pointing to an Azure Storage account where trace files will be persisted. During development, of course, the development storage will do.

In theory, you can configure Azure diagnostics declaratively, using the Diagnostics.wadcfg file, stored under your role node in Solution Explorer.

In order for the configuration to take effect, this file must be included in, or better linked to, the role project. Make sure to set the “Copy to Output Directory” property of this file to “Copy always” or “Copy if newer”.

The contents of the Diagnostics.wadcfg file must be updated so that the Azure Diagnostics monitor can transfer the contents of the local storage directory to a specified blob container in the configured Storage Account.

<DiagnosticMonitorConfiguration configurationChangePollInterval="PT1M" overallQuotaInMB="4096" xmlns="http://schemas.microsoft.com/ServiceHosting/2010/10/DiagnosticsConfiguration">
  <DiagnosticInfrastructureLogs />
  <Directories scheduledTransferPeriod="PT1M" >
    ...
    <DataSources>
      <DirectoryConfiguration container="wad-custom-container" directoryQuotaInMB="1024">
        <LocalResource name="logs" relativePath=".\archive" />
      </DirectoryConfiguration>
    </DataSources>
  </Directories>
</DiagnosticMonitorConfiguration>

Make sure not to exceed the overall quota; otherwise, the Diagnostics Monitor will crash on startup. Thanks to Jérémie for the time spent troubleshooting this!


Microsoft® MVP Integration 2015!

For the fifth consecutive year, I have received the Microsoft Most Valuable Professional award. I’m proud to be part of the Microsoft Integration community for another year.

Congratulations to all the 2015 Microsoft MVPs!

This year, I have the great pleasure of attending the MVP Summit, so I’m glad I will have the opportunity to meet some of you in the community again.

I’m looking forward to some exciting news in the domain of Micro Services in particular, and in the interactions between the Integration space and Cloud Computing in general.


New Article on Enterprise Cloud Adoption in Programmez! Magazine

The Cloud Computing market has already won over a great many companies. Yet several independent studies show that the use of cloud computing within enterprises is far less widespread than one might think.

That is how the article I co-wrote with Jérémie begins.

It appeared in the November 2014 issue of Programmez! magazine.
I invite you to read it!


PowerShell Provider updated for BizTalk Server 2013 and BizTalk Server 2013 R2

It’s been a while since I’ve contributed to the PowerShell Provider for BizTalk Server.

Well, I’ve recently published an updated version of the provider on CodePlex in order to fix a serious issue that prevented the previous version from launching. I have also updated the code to add the following CmdLets:

  • Export-GroupSettings: for making it easier to export settings of a BizTalk group to file.
  • Import-GroupSettings: for importing a settings file to an existing BizTalk group.
  • Export-Vocabulary: to export Business Rules Engine vocabularies to file.

I have also made it possible to work with Business Rules policies, even if they are not associated with a specific BizTalk Application.
You can download an updated Getting Started Guide from the project home page.

Working with Group Settings

In a previous version, I added support for exploring performance, throttling and latency-related settings of a BizTalk Group via the PowerShell Provider for BizTalk Server.

In this release, I have added a couple of CmdLets to make this process easier.

PS BizTalk:\> Export-GroupSettings -Path BizTalk:\ -Destination C:\Temp\group_settings.xml
PS BizTalk:\>
PS BizTalk:\> Import-GroupSettings -Path BizTalk:\ -Source C:\Temp\group_settings.xml
PS BizTalk:\>

Like most CmdLets, these also accept pipeline input, like so:

PS BizTalk:\> (Get-Item BizTalk:\) | Export-GroupSettings -Destination C:\Temp\group_settings.xml
PS BizTalk:\>

Working with Policies and Vocabularies

Support for working with Business Rules Engine policies has been included in the PowerShell Provider for a long time now.

This release adds the ability to work with Business Rules Engine policies and vocabularies, even for policies not explicitly associated with any particular BizTalk Application. New to this release is the possibility to export vocabularies themselves as standalone artifacts.

In order to make this possible, I have added a virtual folder under the root of the Provider:

PS BizTalk:\> Get-ChildItem

    Path: Biztalk:


Name
----
All Artifacts
Applications
Parties
Platform Settings
Health and Activity
Rule Engine

PS BizTalk:\>

The Export-Policy CmdLet has been updated to make it possible to export a Business Rules Engine policy even though it may not be associated with any particular BizTalk application.

PS BizTalk:\> Export-Policy -Path 'BizTalk:\Rule Engine\Policies\MyPolicy' -Version 1.1 -Destination C:\Temp\MyPolicy.1.1.xml
PS BizTalk:\>

The new Export-Vocabulary CmdLet has been specifically included to make it possible to export Business Rules Engine vocabularies as standalone export files.

PS BizTalk:\> Export-Vocabulary -Path 'BizTalk:\Rule Engine\Vocabularies\MyVocabulary' -Version 1.1 -Destination C:\Temp\MyVocabulary.1.1.xml -AddReferencedVocabularies
PS BizTalk:\>

I hope you’ll enjoy this release.


Capturing REST API Calls with an OWIN Middleware

Update: I have set up a GitHub repository hosting an updated and more complete version of this code.

In the cloud-first, mobile-first world we live in, REST APIs are becoming more important every day. In one of our projects, we wanted to capture details of every access to our APIs for security and audit purposes. Ideally, we wanted to build some kind of interception mechanism, so that the developer of the API does not have to deal with the intricacies and details of the capturing process.

This is a form of Aspect Oriented Programming (AOP), where the concern of capturing API calls is orthogonal to the concern of performing the work the API is designed for. There are many ways to achieve this, and many frameworks to complete this task.

Fortunately, we are exposing our REST APIs using Katana, Microsoft’s open-source implementation of the OWIN specification. OWIN has the concept of Middleware, which are layered on top of one another in order to perform some processing. One of these Middleware components handles processing and routing requests to ASP.NET Web API controllers:

namespace App
{
    using System;
    using System.Web.Http;
    using Microsoft.Owin.Hosting;
    using Owin;
    
    class Program
    {
        static void Main(string[] args)
        {
            const string address = "http://localhost:12345";

            using (WebApp.Start(address))
            {
                Console.WriteLine("Listening requests on {0}.", address);
                Console.WriteLine("Please, press ENTER to shutdown.");
                Console.ReadLine();
            }
        }
    }

    public sealed class Startup
    {
        private readonly HttpConfiguration configuration_ = new HttpConfiguration();

        public void Configuration(IAppBuilder application)
        {
            application.UseWebApi(configuration_);

            configuration_.MapHttpAttributeRoutes();
        }
    }
}

Creating an OWIN Middleware

In order to capture REST API calls, we will create a custom OWIN Middleware. Middleware components in OWIN form a chain and are included in the processing pipeline. Each Middleware holds a reference to the next Middleware in the chain and is responsible for calling it as part of its processing.

The absolute minimal do-nothing OWIN Middleware looks like so:

namespace Middleware
{
    using Microsoft.Owin;

    public sealed class TrackingMiddleware : OwinMiddleware
    {
        public TrackingMiddleware(OwinMiddleware next)
            : base(next)
        {
        }

        public override async System.Threading.Tasks.Task Invoke(IOwinContext context)
        {
            await Next.Invoke(context);
        }
    }
}

Our Tracking Middleware is layered on top of the chain, so that it is called first when an HTTP request comes in and last when the corresponding HTTP response comes back from the Web API. Thus, our Middleware has a chance to track details about both the request and the response.

    public sealed class Startup
    {
        ...
        public void Configuration(IAppBuilder application)
        {
            application.Use<Middleware.TrackingMiddleware>();
            application.UseWebApi(configuration_);

            configuration_.MapHttpAttributeRoutes();
        }
    }

But first, some book-keeping

In order to make this post more meaningful, we need some boilerplate code in place first. So, for illustration purposes, let’s create a very simple Web API controller that will be used in the rest of this article.

namespace WebApi
{
    using System;
    using System.Threading.Tasks;
    using System.Web.Http;

    [RoutePrefix("api/custom")]
    public sealed class CustomController : ApiController
    {
        [HttpGet]
        [Route("resources")]
        public IHttpActionResult GetAllResources()
        {
            var collection = new[]
            {
                "one",
                "two",
                "three",
            };

            return Ok(collection);
        }

        [HttpPost]
        [Route("post")]
        public async Task<IHttpActionResult> PostStream()
        {
            // consume the stream, to simulate processing
            using (var stream = await Request.Content.ReadAsStreamAsync())
            {
                var count = 0;
                var buffer = new byte[4096];
                while ((count = await stream.ReadAsync(buffer, 0, buffer.Length)) != 0)
                    /* do nothing */;
            }

            return Ok();
        }
    }
}

Tracked API calls need to be persisted somewhere. In our project, we decided to store the details of each call in a Microsoft Azure Storage Table. For the purposes of unit-testing, we have abstracted this behind the interface and definitions shown below. Implementing this interface is outside the scope of this post and left as an exercise to the reader :-)

namespace Interop
{
    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;

    public interface ITableStorage<T> where T : class, new()
    {
        Task<IEnumerable<T>>  EnumRecordsAsync(string query = "");

        Task InsertRecordAsync(T record);
        Task UpdateRecordAsync(T record);
        Task DeleteRecordAsync(T record);
    }
}
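
For unit tests, a trivial in-memory implementation is enough. The sketch below (the InMemoryTableStorage name is mine, not part of the project) simply keeps records in a list; it is not the Azure Table Storage-backed implementation we use in production:

namespace Interop
{
    using System.Collections.Generic;
    using System.Threading.Tasks;

    public sealed class InMemoryTableStorage<T> : ITableStorage<T> where T : class, new()
    {
        private readonly object sync_ = new object();
        private readonly List<T> records_ = new List<T>();

        public Task<IEnumerable<T>> EnumRecordsAsync(string query = "")
        {
            // the query string is ignored in this simplistic implementation
            lock (sync_)
                return Task.FromResult<IEnumerable<T>>(records_.ToArray());
        }

        public Task InsertRecordAsync(T record)
        {
            lock (sync_) records_.Add(record);
            return Task.FromResult(0);
        }

        public Task UpdateRecordAsync(T record)
        {
            // records are reference types; in-memory instances are already up to date
            return Task.FromResult(0);
        }

        public Task DeleteRecordAsync(T record)
        {
            lock (sync_) records_.Remove(record);
            return Task.FromResult(0);
        }
    }
}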

Details of a captured REST API call are encapsulated in the following ApiEntry class. It includes important things such as the caller identity and IP address for security purposes, as well as the call duration in order to assess the performance of our API over time.

namespace Storage
{
    using System;
    using System.Collections.Generic;

    using Interop;

    public sealed class ApiEntry
    {
        public ApiEntry()
        {
            TrackingId = Guid.NewGuid();
            CallDateTime = DateTime.UtcNow;
        }
        public Guid TrackingId { get; set; }

        public string CallerIdentity { get; set; }
        public DateTime CallDateTime { get; set; }
        public string CallerAddress { get; set; }

        public string Verb { get; set; }
        public Uri RequestUri { get; set; }
        public IDictionary<String, String[]> RequestHeaders { get; set; }
        public string Request { get; set; }

        public int StatusCode { get; set; }
        public string ReasonPhrase { get; set; }
        public IDictionary<String, String[]> ResponseHeaders { get; set; }
        public string Response { get; set; }

        public TimeSpan CallDuration { get; set; }
    }

    public sealed class ApiTableStorage : ITableStorage<ApiEntry>
    {
       ...
    }
}

Ok, let’s now go back to implementing our Middleware.

A Tracking Owin Middleware

private readonly ITableStorage<ApiEntry> storage_;

public override async Task Invoke(IOwinContext context)
{
    // time the call so that its overall duration can be recorded
    var stopwatch = System.Diagnostics.Stopwatch.StartNew();

    var request = context.Request;
    var response = context.Response;

    // capture details about the caller identity and address

    var identity =
        request.User != null && request.User.Identity.IsAuthenticated ?
            request.User.Identity.Name :
            "(anonymous)"
            ;

    var record = new ApiEntry
    {
        CallerIdentity = identity,
        CallerAddress = request.RemoteIpAddress,
    };

    // replace the request stream in order to intercept downstream reads

    var requestBuffer = new MemoryStream();
    var requestStream = new ContentStream(requestBuffer, request.Body);
    request.Body = requestStream;

    // replace the response stream in order to intercept downstream writes

    var responseBuffer = new MemoryStream();
    var responseStream = new ContentStream(responseBuffer, response.Body);
    response.Body = responseStream;

    // add the "Http-Tracking-Id" response header

    context.Response.OnSendingHeaders(state =>
    {
        var ctx = state as IOwinContext;
        var resp = ctx.Response;

        // adding the tracking id response header so that the user
        // of the API can correlate the call back to this entry

        resp.Headers.Add("Http-Tracking-Id", new[] { record.TrackingId.ToString("d"), });

    }, context)
    ;

    // invoke the next middleware in the pipeline

    await Next.Invoke(context);

    // rewind the request and response buffers
    // and record their content

    WriteRequestHeaders(request, record);
    record.Request = await WriteContentAsync(requestStream, record.RequestHeaders);

    WriteResponseHeaders(response, record);
    record.Response = await WriteContentAsync(responseStream, record.ResponseHeaders);

    // persist details of the call, including its duration, to durable storage

    record.CallDuration = stopwatch.Elapsed;

    await storage_.InsertRecordAsync(record);
}

The Middleware is very simple, and I will walk you through the code.

Remember that this Middleware must be the first in the pipeline, so that it has a chance to intercept the request first and then intercept the response before it returns to the caller.

So, the Middleware first creates a new instance of the ApiEntry class, which initializes itself with a brand new Tracking Identifier for this request.

Next, the Middleware replaces both the request and response streams taken from the context with custom classes that wrap them in order to intercept reads and writes. This gives the Middleware a chance to buffer what is read from the request, as well as intercept what is written to the response by downstream Middleware components in the pipeline.

When something is first written to the response stream, the OnSendingHeaders callback is triggered. The Middleware uses this opportunity to insert the Tracking Identifier into the response headers, so that the client can correlate this invocation with a specific identifier.

The idea is that, should something go wrong with the call, the client can contact customer support, who have all the information for this call recorded in durable storage, including the reason for the error and, possibly, the underlying exception and its call stack as well.
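
On the client side, picking up that identifier is straightforward. Here is a quick sketch using HttpClient against the sample controller shown earlier (the namespace and method name are purely illustrative):

namespace Client
{
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Net.Http;
    using System.Threading.Tasks;

    public static class TrackingIdSample
    {
        public static async Task CallAsync()
        {
            using (var client = new HttpClient())
            {
                var response = await client.GetAsync("http://localhost:12345/api/custom/resources");

                // the middleware adds this header before the response body is written
                IEnumerable<string> values;
                if (response.Headers.TryGetValues("Http-Tracking-Id", out values))
                    Console.WriteLine("Tracking id: {0}", values.First());
            }
        }
    }
}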

Then, the Middleware invokes the next Middleware component in the pipeline, thus triggering the sequence of calls down to the ultimate REST API.

Finally, the Middleware rewinds the local Stream buffers and records all the details of this call to durable storage.

The following helper methods are used in the Middleware implementation, shown here for completeness:

private static void WriteRequestHeaders(IOwinRequest request, ApiEntry record)
{
    record.Verb = request.Method;
    record.RequestUri = request.Uri;
    record.RequestHeaders = request.Headers;
}

private static void WriteResponseHeaders(IOwinResponse response, ApiEntry record)
{
    record.StatusCode = response.StatusCode;
    record.ReasonPhrase = response.ReasonPhrase;
    record.ResponseHeaders = response.Headers;
}

private static async Task<string> WriteContentAsync(ContentStream stream, IDictionary<string, string[]> headers)
{
    const string ContentType = "Content-Type";

    // arbitrary cap on the amount of content recorded for a single message
    const long MaxRecordedLength = 64 * 1024;

    var contentType = 
        headers.ContainsKey(ContentType) ?
        headers[ContentType][0] :
        null
        ;

    return await stream.ReadContentAsync(contentType, MaxRecordedLength);
}

We’re using a custom Stream class that helps us intercept the reads and writes, as well as provides some helper code to read the streams as text. If the HTTP Content-Type header denotes “string” content, for instance application/json or text/plain, the Stream class attempts to identify the encoding and read the stream as text. Otherwise, the content might be binary, so no attempt is made to read it.

using System;
using System.IO;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading.Tasks;

public class ContentStream : Stream
{
    protected readonly Stream buffer_;
    protected readonly Stream stream_;

    private long contentLength_ = 0L;
        
    public ContentStream(Stream buffer, Stream stream)
    {
        buffer_ = buffer;
        stream_ = stream;
    }

    /// <summary>
    /// Returns the recorded length of the underlying stream.
    /// </summary>
    public virtual long ContentLength
    {
        get { return contentLength_; }
    }

    public async Task<String> ReadContentAsync(string contentType, long maxCount)
    {
        if (!IsTextContentType(contentType))
        {
            contentType = String.IsNullOrEmpty(contentType) ? "N/A" : contentType;
            return String.Format("{0} [{1} bytes]", contentType, ContentLength);
        }

        buffer_.Seek(0, SeekOrigin.Begin);

        var length = Math.Min(ContentLength, maxCount);

        var buffer = new byte[length];
        var count = await buffer_.ReadAsync(buffer, 0, buffer.Length);

        return
            GetEncoding(contentType)
            .GetString(buffer, 0, count)
            ;
    }

    protected void WriteContent(byte[] buffer, int offset, int count)
    {
        buffer_.Write(buffer, offset, count);
    }

    #region Implementation

    private static bool IsTextContentType(string contentType)
    {
        if (contentType == null)
            return false;

        var isTextContentType =
            contentType.StartsWith("application/json") ||
            contentType.StartsWith("application/xml") ||
            contentType.StartsWith("text/")
            ;
        return isTextContentType;
    }

    private static Encoding GetEncoding(string contentType)
    {
        var charset = "utf-8";
        var regex = new Regex(@";\s*charset=(?<charset>[^\s;]+)");
        var match = regex.Match(contentType);
        if (match.Success)
            charset = match.Groups["charset"].Value;

        try
        {
            return Encoding.GetEncoding(charset);
        }
        catch (ArgumentException)
        {
            return Encoding.UTF8;
        }
    }

    #endregion

    #region System.IO.Stream Overrides

    ...

    public override int Read(byte[] buffer, int offset, int count)
    {
        // read content from the request stream

        count = stream_.Read(buffer, offset, count);
        contentLength_ += count;

        // record the read content into our temporary buffer

        if (count != 0)
            WriteContent(buffer, offset, count);

        return count;
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        // store the bytes into our local stream

        WriteContent(buffer, offset, count);

        // write the bytes to the response stream
        // and record the actual number of bytes written

        stream_.Write(buffer, offset, count);
        contentLength_ += count;
    }

    #endregion
       
    #region IDisposable Implementation

    protected override void Dispose(bool disposing)
    {
        if (disposing)
            buffer_.Dispose();

        // not disposing the stream_ member
        // which is owned by the Owin infrastructure

        base.Dispose(disposing);
    }

    #endregion
}

That’s it. As you can see, it’s very easy to implement a custom Middleware in order to intercept the request and/or the response. Along the way, this post has shown how to replace the request and response streams, how to insert your own headers in the response, and so on. A further point of note is that, by using a custom Stream-based class, the processing happens at the same time as the downstream Middleware components read from the request and write to the response streams.

In real production code, I would not use instances of the MemoryStream class for my temporary buffers, because I’m concerned about having the whole content of both the request and the response live in memory for the duration of the call. In my project, I’m using an instance of the VirtualStream class, available as a sample from BizTalk Server, in order to make sure that the streams do not take up too much memory.
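
If you do not want to take a dependency on that sample, one simple alternative is to spill the temporary buffer to a self-deleting file on disk. The following is only a sketch of that idea, not the VirtualStream implementation:

using System.IO;

public static class BufferFactory
{
    // creates a temporary, self-deleting file stream to use in place of MemoryStream
    // when the request or response payloads may be large
    public static Stream CreateSpillBuffer()
    {
        return new FileStream(
            Path.GetTempFileName(),
            FileMode.Open,
            FileAccess.ReadWrite,
            FileShare.None,
            bufferSize: 4096,
            options: FileOptions.DeleteOnClose
            );
    }
}

An instance returned by this helper could then be used in place of the MemoryStream instances created at the top of the Invoke method above.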

Code available on GitHub

I have set up a GitHub repository hosting an updated and more complete version of this code.

Please, feel free to have a look.
