Streaming Data in and out of an SQL Server 2005 Database

Last week, I touched on a lightweight solution that we used to re-submit messages to a BizTalk solution. This allows us to retry messages when they did not succeed initially.

One of the challenges associated with such a solution is that messages need to be archived somehow, in order to be re-submitted later. In particular, the entire state associated with a message needs to be stored for later retrieval. This covers the message context, the part properties and the entire contents of the message itself.

Storing the message context and part properties is critical to the whole archiving process. This kind of meta-data absolutely needs to be kept, in order to successfully route the submitted messages back to the correct processing orchestrations.

Furthermore, we were dealing in our particular case with multi-part messages. This means that we need to store and keep together the contents of all the parts associated with a single message.

For practical reasons, I decided to archive the messages in a database, instead of in some kind of file system. Indeed, archiving messages as files is not convenient, because it is not possible to keep all the message parts in a single unit. Besides, the database gives us a transactional way of storing our messages, thus providing a more robust and reliable solution.

When processing potentially large messages within the BizTalk engine, it is critical to keep memory usage ‘flat’, at a reasonable level, regardless of message size. In order to do this, we need to stream data in and out of the SQL Server 2005 database.

I searched around for various solutions, but could not find any satisfying answers. Indeed, I was quite shocked to discover that SQL Server does not make it easy for us to store and retrieve data in a streaming manner.

The challenge associated with storing the contents of a message or its parts in an SQL Server 2005 database therefore lies in the fact that there is no easy way to adopt a streaming approach to storing and retrieving data.

But how does BizTalk Server do it, then?

Interestingly enough, BizTalk is built from the ground up around a streaming approach to manipulating messages, from the adapters to the pipeline components to the message box database.

So I set out to check how BizTalk itself does this. It turns out that the solution is really simple:

When processing large messages, BizTalk cuts the messages into chunks of a given, limited size, and stores them in associated records in a fragments table inside its BizTalkMsgBoxDb database.
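To make the idea concrete, here is a minimal sketch of cutting a stream into fragments of a bounded size. This is only an illustration of the principle, not BizTalk's actual implementation; the `Chunker` class and the `ReadChunks` method are hypothetical names introduced for this example.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

public static class Chunker
{
    // Lazily yields the contents of 'source' as fragments of at most
    // 'chunkSize' bytes, so that only one fragment is in memory at a time.
    public static IEnumerable<byte[]> ReadChunks(Stream source, int chunkSize)
    {
        if (source == null) throw new ArgumentNullException("source");
        if (chunkSize <= 0) throw new ArgumentOutOfRangeException("chunkSize");

        byte[] buffer = new byte[chunkSize];
        int count;
        while ((count = source.Read(buffer, 0, buffer.Length)) > 0)
        {
            // Copy the bytes actually read; the read buffer itself is reused.
            byte[] chunk = new byte[count];
            Array.Copy(buffer, 0, chunk, 0, count);
            yield return chunk;
        }
    }
}
```

Note that Stream.Read() may return fewer bytes than requested, so a fragment can be smaller than the chunk size even in the middle of the stream. Each fragment would then be stored as one record.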

The Solution

In order to support archiving messages to a database, I created a very simple set of database tables, whose sole purpose is to hold all the data associated with a given message.

You will notice that the database schema is nearly identical to the one used by BizTalk in the BizTalkMsgBoxDb. No need to be too creative here and reinvent the wheel!

All access to the database goes through stored procedures, used either to store or to retrieve messages as appropriate.

Storing Data in the Database

In order to store the entire contents of potentially large messages in the database, I wrote a simple stored procedure whose job is to store a given chunk of the entire data in a new record of the fragments table.

CREATE PROCEDURE [dbo].[usr_InsertFragment] 
	-- Add the parameters for the stored procedure here
	@uidPartID uniqueidentifier,
	@nFragmentSize int,
	@imgFrag image
AS
BEGIN
	-- SET NOCOUNT ON added to prevent extra result sets from
	-- interfering with SELECT statements.
	SET NOCOUNT ON;
	
	-- Insert statements for procedure here
	DECLARE @bInTransaction bit
	SET @bInTransaction = 0

	IF (@@TRANCOUNT > 0)
	BEGIN
		SAVE TRANSACTION usr_InsertFragment
	END
	ELSE
	BEGIN
		BEGIN TRANSACTION usr_InsertFragment
		SET @bInTransaction = 1
	END

	BEGIN TRY

		DECLARE @nFragmentNumber int
		DECLARE @nOffsetStart bigint

		SELECT @nFragmentNumber = ISNULL(MAX(nFragmentNumber) + 1, 0)
		  FROM [dbo].[Fragments]
		 WHERE uidPartID = @uidPartID
		 ;
		 
		SELECT @nOffsetStart = ISNULL(MAX(nOffsetEnd) + 1, 0)
		  FROM [dbo].[Fragments]
		 WHERE uidPartID = @uidPartID
		;

		INSERT INTO [dbo].[Fragments] (
			  uidPartID
			, nFragmentNumber
			, nFragmentSize
			, nOffsetStart
			, nOffsetEnd
			, imgFrag
		)
		VALUES (
			  @uidPartID
			, @nFragmentNumber
			, @nFragmentSize
			, @nOffsetStart
			, @nOffsetStart + @nFragmentSize - 1 -- inclusive end offset: the next fragment starts at nOffsetEnd + 1
			, @imgFrag
		)
		;
		
		UPDATE [dbo].[Parts]
		   SET nNumFragments = nNumFragments + 1
		     , nPartSize = nPartSize + @nFragmentSize
		 WHERE uidPartID = @uidPartID
		;

		IF (@bInTransaction = 1)
		BEGIN
			COMMIT TRANSACTION usr_InsertFragment
		END

	END TRY

	BEGIN CATCH

		DECLARE @ErrorMsg nvarchar(2048)
		SET @ErrorMsg = ERROR_MESSAGE()

		ROLLBACK TRANSACTION usr_InsertFragment

		RAISERROR (@ErrorMsg, 16, 1)

	END CATCH

END

From the client-side code, this stored procedure is called repeatedly, once per chunk, until all the data has been stored.

public static void InsertFragment(
      SqlTransaction transaction
    , IBaseMessage message
    , IBaseMessagePart part
    , byte[] buffer
    , int offset
    , int count)
{
    byte[] bytes = buffer;

    if ((offset != 0) || (offset + count) != buffer.Length)
    {
        bytes = new byte[count];
        Array.Copy(buffer, offset, bytes, 0, count); 
    }

    using (SqlCommand command = new SqlCommand())
    {
        command.Connection = transaction.Connection;
        command.Transaction = transaction;
        command.CommandText = "usr_InsertFragment";
        command.CommandType = CommandType.StoredProcedure;

        command.Parameters.Add(new SqlParameter("uidPartID", part.PartID));
        command.Parameters.Add(new SqlParameter("nFragmentSize", count));
        command.Parameters.Add(new SqlParameter("imgFrag", bytes));

        command.ExecuteNonQuery();
    }
}

public void InsertMessage(IBaseMessage message)
{
    using (SqlConnection connection = new SqlConnection(connectionString_))
    {
        connection.Open();

        using (SqlTransaction transaction = connection.BeginTransaction())
        {
            ...

            // insert part contents

            string partName;
            IBaseMessagePart part = message.GetPartByIndex(i, out partName);
            
            Stream stream = part.GetOriginalDataStream();

            int count = 0;
            byte[] buffer = new byte[IO_BUFFER_SIZE];
            while ((count = stream.Read(buffer, 0, buffer.Length)) != 0)
                InsertFragment(transaction, message, part, buffer, 0, count);

            transaction.Commit();
        }
    }
}

I could have created a writable System.IO.Stream wrapper around this, but the resulting code is so simple that it’s not necessarily worthwhile.
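For the curious, such a wrapper could look roughly like the sketch below. It is not part of the solution described here: `FragmentWriteStream` is a hypothetical class, and the delegate passed to its constructor is assumed to store one fragment, for instance by calling the InsertFragment() method shown above.

```csharp
using System;
using System.IO;

// Write-only stream that forwards every Write() call as one fragment.
public sealed class FragmentWriteStream : Stream
{
    private readonly Action<byte[], int, int> insertFragment_;
    private long length_;

    public FragmentWriteStream(Action<byte[], int, int> insertFragment)
    {
        if (insertFragment == null) throw new ArgumentNullException("insertFragment");
        insertFragment_ = insertFragment;
    }

    public override bool CanRead { get { return false; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return true; } }
    public override long Length { get { return length_; } }

    public override long Position
    {
        get { return length_; }
        set { throw new NotSupportedException(); }
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        if (count == 0) return; // nothing to store

        insertFragment_(buffer, offset, count);
        length_ += count;
    }

    public override void Flush() { }

    public override int Read(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
}
```

With this sketch, the fragment size is simply whatever size the caller writes with, so the caller would still need to write in blocks of a sensible size.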

Retrieving Data from the Database

Retrieving data from the database is more interesting.

Likewise, it starts with a simple stored procedure, whose job is to list the records in the fragments table that make up the entire contents to be retrieved.

CREATE PROCEDURE [dbo].[usr_SelectMessageFragment]
	-- Add the parameters for the stored procedure here
	@uidPartID uniqueidentifier,
	@nFragmentNumber int = NULL
AS
BEGIN
	-- SET NOCOUNT ON added to prevent extra result sets from
	-- interfering with SELECT statements.
	SET NOCOUNT ON;

    -- Insert statements for procedure here
    IF @nFragmentNumber IS NULL
    BEGIN
    
		SELECT Fragments.uidPartID
		     , Parts.nPartSize
		     , Fragments.nFragmentNumber
			 , Fragments.nFragmentSize
			 , Fragments.nOffsetStart
			 , Fragments.nOffsetEnd
			 , Fragments.imgFrag
		  FROM [dbo].[Fragments] Fragments
		INNER JOIN [dbo].[Parts] Parts ON (Parts.uidPartID = Fragments.uidPartID)
		 WHERE Fragments.uidPartID = @uidPartID
		ORDER BY Fragments.nFragmentNumber
		;
		
	END
	ELSE
	BEGIN

		SELECT Fragments.uidPartID
		     , Parts.nPartSize
		     , Fragments.nFragmentNumber
			 , Fragments.nFragmentSize
			 , Fragments.nOffsetStart
			 , Fragments.nOffsetEnd
			 , Fragments.imgFrag
		  FROM [dbo].[Fragments] Fragments
		INNER JOIN [dbo].[Parts] Parts ON (Parts.uidPartID = Fragments.uidPartID)
		 WHERE Fragments.uidPartID = @uidPartID
		   AND Fragments.nFragmentNumber = @nFragmentNumber
		;
		
	END
END

In the case of our re-submit solution, we do not want to retrieve the whole contents from the database in one fell swoop. The SqlDataReader already hands back the records one at a time; opening it with CommandBehavior.SequentialAccess additionally keeps it from buffering an entire row up front, so that only the fragment currently being read needs to be held in memory. This keeps our memory requirements reasonable.
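Sequential access also makes it possible to read a single large column in slices with GetBytes(), rather than materializing it as one byte array. The solution described here does not need this, because the fragments are already small, but the technique can be sketched as follows. The `BlobReader` class and the `CopyColumn` method are hypothetical names; the sketch is written against DbDataReader, the base class of SqlDataReader.

```csharp
using System;
using System.Data.Common;
using System.IO;

public static class BlobReader
{
    // Copies the binary column at 'ordinal' of the current row to 'destination'
    // in slices of at most 'bufferSize' bytes. Intended for a reader opened with
    // CommandBehavior.SequentialAccess. Returns the number of bytes copied.
    public static long CopyColumn(DbDataReader reader, int ordinal, Stream destination, int bufferSize)
    {
        byte[] buffer = new byte[bufferSize];
        long position = 0;
        long read;

        // GetBytes() returns the number of bytes copied, 0 once the column is exhausted.
        while ((read = reader.GetBytes(ordinal, position, buffer, 0, buffer.Length)) > 0)
        {
            destination.Write(buffer, 0, (int)read);
            position += read;
        }

        return position;
    }
}
```

With sequential access, columns must be read in the order in which they are returned, which is one reason why imgFrag comes last in the select list.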

Furthermore, in order to make it easy for the calling code to retrieve the data, I wrapped the retrieval code in a read-only System.IO.Stream wrapper. The job of this wrapper is to hand out portions of the data to the calling code and only read from the database when necessary.

public class MessagePartStream : System.IO.Stream
{
    private SqlConnection connection_;
    private SqlCommand command_;
    private SqlDataReader reader_;

    private Int64 offset_ = 0;
    private Int64 length_ = 0;

    ...
}

The stream class is initialized with a connection string because it will create and maintain a new connection to the database for as long as the cursor needs to be available to the calling code.

protected MessagePartStream()
{
}

public static MessagePartStream CreateStream(string connectionString, Guid messageID, Guid partID)
{
    MessagePartStream stream = new MessagePartStream();
    stream.Connect(connectionString, messageID, partID);

    return stream;
}

Upon initialization, various resources are created and some bookkeeping is done. We first call into the database, so as to know the length of the entire stream. We then prepare the cursor to the fragments records in sequential access as we explained earlier.

private void Connect(string connectionString, Guid messageID, Guid partID)
{
    connection_ = new SqlConnection(connectionString);
    connection_.Open();

    // retrieve part meta data

    using (SqlCommand command = new SqlCommand())
    {
        command.Connection = connection_;
        command.CommandText = "usr_SelectMessagePart";
        command.CommandType = CommandType.StoredProcedure;

        command.Parameters.Add(new SqlParameter("uidMessageID", messageID));
        command.Parameters.Add(new SqlParameter("uidPartID", partID));

        using (SqlDataReader reader = command.ExecuteReader())
            if (reader.Read())
                length_ = Convert.ToInt64(reader["nPartSize"]);
    }

    // start sequential access to part fragments

    command_ = new SqlCommand();
    command_.Connection = connection_;
    command_.CommandText = "usr_SelectMessageFragment";
    command_.CommandType = CommandType.StoredProcedure;

    command_.Parameters.Add(new SqlParameter("uidPartID", partID));

    reader_ = command_.ExecuteReader(CommandBehavior.SequentialAccess);
}

The connection, along with the command and the data reader, is reclaimed when the stream is disposed of.

protected override void Dispose(bool disposing)
{
    // only touch other managed objects when called from Dispose(),
    // not when called from a finalizer

    if (disposing)
    {
        if (reader_ != null)
        {
            reader_.Dispose();
            reader_ = null;
        }

        if (command_ != null)
        {
            command_.Dispose();
            command_ = null;
        }

        if (connection_ != null)
        {
            connection_.Dispose();
            connection_ = null;
        }
    }

    base.Dispose(disposing);
}

The most interesting part of the class is the Read() method, which hands the requested data to the calling code and reads from the database only when necessary.

private List<byte> remaining_bytes = new List<byte>();
private byte[] bytes_ = null;

public override int Read(byte[] buffer, int offset, int count)
{
    // prepend remaining bytes from previous reads

    List<byte> bytes = new List<byte>(count);
    bytes.AddRange(remaining_bytes);
    remaining_bytes.RemoveRange(0, remaining_bytes.Count);

    while (bytes.Count < count)
    {
        if (bytes_ == null)
        {
            if (reader_.Read())
                bytes_ = (byte[])reader_["imgFrag"];
            else
                break;
        }

        bytes.AddRange(bytes_);
        bytes_ = null;
    }

    // copy the temporary buffer to the requested output buffer

    int output_count = Math.Min(bytes.Count, count);

    bytes.CopyTo(0, buffer, offset, output_count);
    bytes.RemoveRange(0, output_count);
    remaining_bytes.AddRange(bytes);

    // update current stream position

    offset_ += output_count;

    // return number of bytes actually retrieved

    return output_count;
}

Each time some data is read from the stream, the class first looks up any remaining data that has been read from the database but not yet returned to the calling code.

Next, the class reads from the database as necessary, until enough data has been read so as to satisfy the length of the requested buffer.

Finally, when no more data is available from the database, the class hands out the remaining bytes until there are none left.
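The same three steps can also be written without the intermediate List<byte>, by keeping the current fragment and an offset into it. The following is a sketch of that variant, not the code used in the solution: `FragmentReadStream` is a hypothetical class, and the sequence of byte arrays passed to it stands in for the records returned by the data reader.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Read-only stream over a sequence of fragments; holds one fragment at a time.
public sealed class FragmentReadStream : Stream
{
    private readonly IEnumerator<byte[]> fragments_;
    private byte[] current_ = new byte[0];
    private int currentOffset_;
    private long position_;

    public FragmentReadStream(IEnumerable<byte[]> fragments)
    {
        if (fragments == null) throw new ArgumentNullException("fragments");
        fragments_ = fragments.GetEnumerator();
    }

    public override int Read(byte[] buffer, int offset, int count)
    {
        int total = 0;
        while (total < count)
        {
            if (currentOffset_ == current_.Length)
            {
                // current fragment exhausted: fetch the next one, or stop at the end
                if (!fragments_.MoveNext()) break;
                current_ = fragments_.Current;
                currentOffset_ = 0;
                continue;
            }

            // hand out as much of the current fragment as the caller asked for
            int n = Math.Min(count - total, current_.Length - currentOffset_);
            Buffer.BlockCopy(current_, currentOffset_, buffer, offset + total, n);
            currentOffset_ += n;
            total += n;
        }

        position_ += total;
        return total;
    }

    public override bool CanRead { get { return true; } }
    public override bool CanSeek { get { return false; } }
    public override bool CanWrite { get { return false; } }
    public override long Length { get { throw new NotSupportedException(); } }
    public override long Position { get { return position_; } set { throw new NotSupportedException(); } }
    public override void Flush() { }
    public override long Seek(long offset, SeekOrigin origin) { throw new NotSupportedException(); }
    public override void SetLength(long value) { throw new NotSupportedException(); }
    public override void Write(byte[] buffer, int offset, int count) { throw new NotSupportedException(); }

    protected override void Dispose(bool disposing)
    {
        if (disposing) fragments_.Dispose();
        base.Dispose(disposing);
    }
}
```

The trade-off is a little more bookkeeping in exchange for copying each byte only once, from the fragment straight into the caller's buffer.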

There you have a very simple, yet efficient way of retrieving potentially large amounts of data from an SQL Server 2005 database in a streaming manner.
