Applying Per-Instance Pipeline Configuration at Runtime

As you know, I’m a big fan of generic pipelines.

As I wrote in a previous article, I’m all in favor of having few dedicated pipelines and taking advantage of per-instance configuration, rather than duplicating pipelines that contain an identical combination of components configured slightly differently.

In some cases, however, this scheme falls short. When you want to call a pipeline from inside an orchestration, there seems to be no way to specify runtime configuration.

For instance, in many orchestrations, I need to validate the contents of a message against an XSD schema. For this purpose, one needs to use a pipeline containing either the Xml Disassembler or the Xml Validator pipeline component, and specify its DocumentSpecNames or DocSpecNames property respectively.

It is not possible to use the built-in XMLReceive pipeline for this purpose: because the built-in pipeline has not been explicitly configured, it cannot be used for XSD validation out of the box.

Ideally, I would like to call the standard XMLReceive pipeline, or any custom generic pipeline as appropriate, and still be able to apply dynamic per-instance configuration.

In this post, I will demonstrate how to solve this problem.

Applying Per-Instance Pipeline Configuration at Runtime

The first part of our solution involves instantiating the specified pipeline and applying per-instance configuration at runtime.

Instantiating the pipeline is straightforward, given its runtime type. However, there is no easy way to apply dynamic configuration to a pipeline object. Inspecting the type shows that a pipeline is a class that inherits from the Microsoft.BizTalk.PipelineOM.Pipeline class. This class does have a LoadPerInstanceConfig method that looks promising, but it is unfortunately declared internal.

Therefore, the following code makes use of reflection to call into the internal method in order to specify per-instance configuration:

// Namespaces required by the code below.
using System;
using System.IO;
using System.Reflection;
using System.Text;
using Microsoft.BizTalk.PipelineOM;

public sealed class PipelineInstance
{
    // The wrapped pipeline object; internal so that PipelineManager can reach it.
    internal Microsoft.BizTalk.PipelineOM.Pipeline pipeline = null;

    // Instantiates the pipeline from its runtime type.
    public static PipelineInstance CreateInstance(Type type)
    {
        PipelineInstance instance = new PipelineInstance();
        instance.pipeline = (Pipeline)Activator.CreateInstance(type);
        return instance;
    }

    // Applies the per-instance configuration, supplied as an XML string.
    public void ApplyInstanceConfig(String config)
    {
        try
        {
            using (Stream stream = new MemoryStream(Encoding.Unicode.GetBytes(config)))
                LoadPerInstanceConfig(pipeline, stream);
        }
        catch (Exception e)
        {
            System.Diagnostics.Debug.WriteLine(String.Format("{0}", e));
            throw;
        }
    }

    #region Implementation

    // Calls the internal Pipeline.LoadPerInstanceConfig method through reflection.
    private static void LoadPerInstanceConfig(Pipeline pipeline, Stream stream)
    {
        try
        {
            MethodInfo method =
                typeof(Pipeline).GetMethod(
                    "LoadPerInstanceConfig"
                    , BindingFlags.NonPublic | BindingFlags.Instance);
            method.Invoke(pipeline, new object[] { stream });
        }
        catch (System.Exception e)
        {
            System.Diagnostics.Debug.WriteLine(e);
            throw;
        }
    }

    #endregion
}

The ApplyInstanceConfig method expects a string that contains the XML representation of the per-instance pipeline configuration, which it passes to the internal LoadPerInstanceConfig method as a stream. You can find the correct configuration for an existing, similar pipeline by looking at an XML bindings file.
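As an illustration, a per-instance configuration that turns on validation in the Xml Disassembler component might look like the fragment below. This is a sketch rather than something copied from a real bindings file: the stage CategoryId, the component name, the vt type codes, and the schema and assembly names (MyProject.Schemas.Order) are assumptions, so take the actual values from your own exported bindings.

```xml
<Root xmlns:xsd="http://www.w3.org/2001/XMLSchema"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <Stages>
    <!-- Assumed CategoryId of the Disassemble stage; verify against your bindings file. -->
    <Stage CategoryId="9d0e4105-4cce-4536-83fa-4a5040674ad6">
      <Components>
        <!-- Assumed component name for the Xml Disassembler. -->
        <Component Name="Microsoft.BizTalk.Component.XmlDasmComp">
          <Properties>
            <!-- vt="11" is assumed to denote a boolean, vt="8" a string. -->
            <ValidateDocument vt="11">true</ValidateDocument>
            <!-- Hypothetical schema type name and assembly. -->
            <DocumentSpecNames vt="8">MyProject.Schemas.Order, MyProject.Schemas, Version=1.0.0.0, Culture=neutral, PublicKeyToken=0123456789abcdef</DocumentSpecNames>
          </Properties>
        </Component>
      </Components>
    </Stage>
  </Stages>
</Root>
```

In a bindings file, this XML typically appears in escaped form inside the pipeline data element of the receive location, so it has to be unescaped before being passed to ApplyInstanceConfig.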

Executing the Pipeline

Now that the pipeline is correctly configured, we need to "execute" it. The native XLANGPipelineManager contains an internal method that is designed to perform this exact operation, expecting an instance of a Microsoft.BizTalk.PipelineOM.Pipeline object. This method is the one used internally by the other familiar methods exposed from the class.

Again, with a bit of help from Reflection, the following code will do in the case of a Receive pipeline:

// Namespaces required by the code below.
using System;
using System.Reflection;
using Microsoft.BizTalk.PipelineOM;
using Microsoft.XLANGs.BaseTypes;
using Microsoft.XLANGs.Pipeline;

public static class PipelineManager
{
    // Executes the pipeline without any per-instance configuration.
    public static ReceivePipelineOutputMessages ExecuteReceivePipeline(Type type, XLANGMessage message)
    {
        string EmptyConfig =
                    @"<Root
                        xmlns:xsd='http://www.w3.org/2001/XMLSchema'
                        xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance'>
                        <Stages />
                    </Root>";

        return ExecuteReceivePipeline(type, EmptyConfig, message);
    }

    // Instantiates the pipeline, applies the given configuration, then executes it.
    public static ReceivePipelineOutputMessages ExecuteReceivePipeline(Type type, String config, XLANGMessage message)
    {
        try
        {
            PipelineInstance pipeline = PipelineInstance.CreateInstance(type);
            pipeline.ApplyInstanceConfig(config);
            return ExecuteReceivePipeline(pipeline, message);
        }
        catch (TargetInvocationException e)
        {
            System.Diagnostics.Debug.WriteLine(e);
            ThrowPipelineException(e);
            throw; // never reached; ThrowPipelineException always throws
        }
        catch (XLANGPipelineManagerException)
        {
            // Already the exception type callers expect; let it through unchanged.
            throw;
        }
        catch (Exception e)
        {
            System.Diagnostics.Debug.Assert(false, String.Format("Unexpected exception: {0}", e));
            System.Diagnostics.Debug.WriteLine(e);
            throw;
        }
    }

    // Executes an already configured pipeline instance.
    public static ReceivePipelineOutputMessages ExecuteReceivePipeline(PipelineInstance pipeline, XLANGMessage message)
    {
        Microsoft.BizTalk.PipelineOM.Pipeline pipelineOM = pipeline.pipeline;

        try
        {
            // Look up the internal static XLANGPipelineManager.ExecutePipeline overload.
            MethodInfo method =
                typeof(XLANGPipelineManager).GetMethod(
                    "ExecutePipeline"
                    , BindingFlags.NonPublic | BindingFlags.Static
                    , null, new Type[] { typeof(ReceivePipeline), typeof(XLANGMessage) }
                    , null);
            return (ReceivePipelineOutputMessages)method.Invoke(null, new object[] { pipelineOM, message });
        }
        catch (TargetInvocationException e)
        {
            System.Diagnostics.Debug.WriteLine(e);
            ThrowPipelineException(e);
            throw; // never reached; ThrowPipelineException always throws
        }
    }

    #region Implementation

    // Wraps the exception in an XLANGPipelineManagerException, whose constructor is not public.
    private static void ThrowPipelineException(Exception innerException)
    {
        ConstructorInfo constructor =
            typeof(XLANGPipelineManagerException).GetConstructor(
                BindingFlags.NonPublic | BindingFlags.Instance
                , null
                , new Type[] { typeof(String), typeof(Exception) }
                , null);
        throw (XLANGPipelineManagerException)constructor.Invoke(
                new object[] {
                    innerException != null ? innerException.Message : String.Empty
                    , innerException
                });
    }

    #endregion
}

Notice that the signature of the method is very similar to that of the native XLANGPipelineManager class. This makes it possible to write similar code in an orchestration and avoids confusing future maintainers.

A similar method can be designed to call Transmit pipelines. This is left as an exercise for the reader.

Wrapping Up

Calling a pipeline from inside an orchestration using the code shown above is done within an Atomic scope.

Calling the helper is almost as simple as calling the original XLANGPipelineManager.ExecuteReceivePipeline method.
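As a rough sketch, the call from an orchestration could look like the following. The fragment uses XLANG/s expression syntax and is not compilable on its own; the variable and message names (pipelineOutput, pipelineConfig, InputMessage, ValidatedMessage) are hypothetical, and pipelineOutput is assumed to be a ReceivePipelineOutputMessages variable declared in the Atomic scope.

```csharp
// Expression shape, inside the Atomic scope.
// pipelineConfig is a string variable holding the per-instance configuration XML.
pipelineOutput = PipelineManager.ExecuteReceivePipeline(
    typeof(Microsoft.BizTalk.DefaultPipelines.XMLReceive),
    pipelineConfig,
    InputMessage);

// Message Assignment shape, inside a Construct Message shape for ValidatedMessage.
// Moves to the first output message and copies it into the orchestration message.
ValidatedMessage = null;
pipelineOutput.MoveNext();
pipelineOutput.GetCurrent(ValidatedMessage);
```

Apart from the extra configuration argument, this follows the same pattern as a call to the native XLANGPipelineManager.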

Of course, it would be better not to hardcode the XML fragment corresponding to the per-instance pipeline configuration directly inside the Expression shape. As a best practice, you could retrieve it from your configuration file or from another external storage location.
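One possible way to do this, sketched here under the assumption that the configuration XML is stored as an appSettings entry in the host's configuration file, is a small helper that reads it by key. The PipelineConfigStore class and the key name are hypothetical and not part of the code above.

```csharp
using System;
using System.Configuration;

// Hypothetical helper: reads a per-instance pipeline configuration
// from the appSettings section of the application configuration file.
public static class PipelineConfigStore
{
    public static string GetConfig(string key)
    {
        // AppSettings returns null when the key is absent.
        string config = ConfigurationManager.AppSettings[key];
        if (String.IsNullOrEmpty(config))
        {
            throw new ConfigurationErrorsException(
                String.Format("No pipeline configuration found for key '{0}'.", key));
        }
        return config;
    }
}
```

The orchestration would then call something like PipelineConfigStore.GetConfig("OrderValidationPipelineConfig") and pass the result to ExecuteReceivePipeline. Because the value is XML stored inside an XML attribute, it needs to be escaped in the configuration file.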

Conclusion

In this post, I have shown how to configure the built-in XMLReceive pipeline at runtime so as to validate a document against an XML schema. However, the code shown in this article can be used in more general cases. It allows you to dynamically apply per-instance configuration to any generic pipeline you want to call from inside an orchestration.

The code shown in this article uses Reflection to call internal methods. Although it is unlikely, be aware that this code may need to be updated for future versions of BizTalk Server.
