Async CTP: Task Completion in TPL Dataflow

   Posted by: Kyle Libbert

In our previous posts we introduced the concept of TPL Dataflow and even showed how you can take advantage of ActionBlock<T> to process incoming data in parallel. We also showed how you can control the degree of parallelism applied in a parallel dataflow through the use of the ExecutionDataflowBlockOptions MaxDegreeOfParallelism property. In this post we'll introduce the concept of task completion in a parallel dataflow.

Often when dealing with these types of data loads in your applications, you may want to post a large amount of data for asynchronous processing and then be notified when that processing has completed. We can do this easily in TPL Dataflow because each dataflow block has a lifetime, and each block type exposes mechanisms that let us monitor and control that lifetime: we can wait for a block to finish all of its processing, or be notified when it has finished. Both are done through the Completion property exposed by ActionBlock<T>. Completion is declared on IDataflowBlock, one of the core interfaces implemented by all TPL Dataflow blocks, so every block, including ActionBlock<T>, exposes it. The property returns a Task, so by calling IDataflowBlock.Completion.Wait() we can wait for processing to complete after we've submitted all of our data.

Let's illustrate this with an example that uses IDataflowBlock.Completion.Wait to wait for processing to complete. One interesting thing to note is that, on its own, the Wait call would wait forever, because ActionBlock<T> has no way of knowing that it is no longer going to receive data. To get around this, we tell ActionBlock<T> that we've finished posting data by calling the Complete method declared on its base interface, IDataflowBlock. This notifies ActionBlock<T> that we will never give it more data, so once it has finished processing its queue it can shut itself down, which in turn releases the Completion.Wait() call that our main thread is blocking on.

 
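using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

class Program
{
    static void Main(string[] args)
    {
        // Process up to 4 items concurrently; each item takes about half a second.
        var action = new ActionBlock<int>(i =>
        {
            Thread.Sleep(500);
            Console.WriteLine(i);
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        for (int i = 0; i < 10; i++) action.Post(i);

        // Tell the block that no more data is coming so it can shut down
        // once its queue is drained.
        action.Complete();
        Console.WriteLine("Posting Complete!");

        // Block the main thread until every posted item has been processed.
        action.Completion.Wait();
    }
}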

[Image: console output of the example above]
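The example above blocks on Completion.Wait(). Since Completion is a Task, the "be notified" option mentioned earlier can be sketched with a continuation instead. This is only a minimal sketch: the class name CompletionNotificationSketch and the method ProcessAsync are made up for illustration, and the shorter sleep is just there to keep the run quick.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class CompletionNotificationSketch
{
    // Hypothetical helper: posts `count` integers, signals completion, and
    // returns a task that yields the number of processed items once the
    // block has finished.
    public static Task<int> ProcessAsync(int count)
    {
        int processed = 0;
        var action = new ActionBlock<int>(i =>
        {
            Thread.Sleep(50);
            Interlocked.Increment(ref processed);
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        for (int i = 0; i < count; i++) action.Post(i);
        action.Complete();

        // The continuation runs when the block's Completion task finishes,
        // so nothing has to block while the items are being processed.
        return action.Completion.ContinueWith(t => processed);
    }

    public static void Main()
    {
        Task<int> done = ProcessAsync(10);
        Console.WriteLine("Posting Complete!");

        // Reading Result blocks here only to keep the console app alive.
        Console.WriteLine("Processed {0} items", done.Result);
    }
}
```

In a UI application you would typically attach the continuation (or await Completion) and return to the caller rather than read Result.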

Besides Complete and Completion, IDataflowBlock also provides a Fault mechanism. Say, for instance, we are preprocessing and validating the incoming data before we queue it to our ActionBlock<T> in the example above. If we encounter invalid data, we may want to call IDataflowBlock.Fault(), passing an exception that describes the problem, to tell the ActionBlock<T> that we've encountered an invalid state and that it should discontinue processing.
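As a rough sketch of that validation scenario, the code below treats negative numbers as the "invalid data" (an assumption made purely for illustration) and faults the block when it sees one. FaultSketch and Run are hypothetical names. Note that ActionBlock<T> exposes Fault only through the IDataflowBlock interface, so the block is accessed through an interface-typed variable.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

public static class FaultSketch
{
    // Posts values until a negative (assumed invalid) one is seen, then
    // faults the block. Returns the message of the exception observed on
    // Completion, or null if the block completed normally.
    public static string Run(int[] values)
    {
        var action = new ActionBlock<int>(i => Console.WriteLine(i));
        IDataflowBlock block = action; // Fault is reached via the interface

        bool faulted = false;
        foreach (int value in values)
        {
            if (value < 0)
            {
                block.Fault(new InvalidOperationException("Invalid value: " + value));
                faulted = true;
                break;
            }
            action.Post(value);
        }
        if (!faulted) action.Complete();

        try
        {
            action.Completion.Wait();
            return null;
        }
        catch (AggregateException ex)
        {
            // Waiting on a faulted Completion task rethrows the fault.
            return ex.Flatten().InnerException.Message;
        }
    }

    public static void Main()
    {
        Console.WriteLine(Run(new[] { 1, 2, -1, 3 }) ?? "completed normally");
    }
}
```

A faulted block stops accepting data and may discard items that are still queued, so some of the values posted before the fault might never be printed.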

In our next post we will pick up where we left off in our previous post and describe additional dataflow blocks available in TPL Dataflow.

Note: If you want to try these examples out, make sure to add a reference to the new System.Threading.Tasks.Dataflow library in your project. Keep in mind that, by default, the Visual Studio Async CTP bits are installed in your My Documents library under Microsoft Visual Studio Async CTP, so browse to the Samples folder under that location to add a reference to the System.Threading.Tasks.Dataflow assembly.

Async CTP: TPL Dataflow - ActionBlock<T>

   Posted by: Kyle Libbert

In our previous post, Async CTP - TPL DataFlow, we introduced TPL Dataflow, a new library that was released for preview as part of the Microsoft Visual Studio Async CTP. In that post we described the capabilities the library provides for building data parallelism into your .NET applications. In this post and the next several posts, we'll expand further on the TPL Dataflow capabilities and provide some examples for using the TPL Dataflow blocks in your applications.

There are several different types of dataflow blocks in the TPL Dataflow library that let you solve a variety of processing scenarios (Join, Transform, etc). One such type is ActionBlock<T>. If you recall from our previous post (Async CTP - TPL DataFlow) where we introduced TPL Dataflow, ActionBlock<T> is an ITargetBlock that accepts and buffers incoming data and performs some action on that data.

You provide ActionBlock<T> with a delegate that will act on the data being posted to it. That delegate executes on every piece of data, in order, that is provided to ActionBlock<T>. The data is posted to ActionBlock<T> via a Post method. The Post method gives ActionBlock<T> data, which it buffers while it internally starts spinning up tasks to process that data as fast as it can. We can best illustrate this with an example.

The example below illustrates configuring an ActionBlock<T> with a delegate that simply writes integers out to the console in the order received. Outside ActionBlock<T> is a simple loop that posts integers to it for processing. To make it a little bit easier to see what's going on if you try this yourself, we'll introduce a Thread.Sleep so that we can see when and in what order things complete.

 
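using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

class Program
{
    static void Main(string[] args)
    {
        // The delegate runs once for every integer posted to the block.
        var action = new ActionBlock<int>(i =>
            {
                Thread.Sleep(500);
                Console.WriteLine(i);
            });

        // Post returns right away; the block buffers the data and processes it in the background.
        for (int i = 0; i < 10; i++) action.Post(i);
        Console.WriteLine("Posting Complete!");

        // Keep the process alive while the block works through its queue.
        Console.ReadLine();
    }
}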
 
[Image: console output of the example above]

There are a couple of interesting things to note from the output. First, you can see that the 10 integers were posted to ActionBlock<T> almost immediately, denoted by the "Posting Complete!" message displayed to the console. The second and more important thing is that the data is being processed in order. This is a key feature of TPL Dataflow: by default, ordering is maintained. While the data is processed asynchronously with respect to the posting thread, it is processed sequentially, one item at a time; in other words, with a maximum degree of parallelism of 1. So in this scenario the default behavior is not to process the input in parallel but to process it in order, since processing in parallel means processing concurrently, which most likely means that things get processed out of order.

That said, there are certain processing scenarios where we are willing to give up some of these ordering guarantees. ActionBlock<T> is configurable via a type known as ExecutionDataflowBlockOptions, which is essentially an options bag for controlling how ActionBlock<T> will act. One of the options available on ExecutionDataflowBlockOptions is MaxDegreeOfParallelism, which allows you to specify the maximum number of parallel tasks that may execute concurrently on the incoming data. By default this option is set to 1, thus guaranteeing the order of processing.

In this next example we illustrate this by increasing the MaxDegreeOfParallelism to 4. Here the task scheduler will create up to 4 parallel tasks to concurrently process the incoming data.

 
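using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

class Program
{
    static void Main(string[] args)
    {
        // Allow up to 4 items to be processed at the same time.
        var action = new ActionBlock<int>(i =>
            {
                Thread.Sleep(500);
                Console.WriteLine(i);
            }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        for (int i = 0; i < 10; i++) action.Post(i);
        Console.WriteLine("Posting Complete!");

        // Keep the process alive while the block works through its queue.
        Console.ReadLine();
    }
}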
[Image: console output of the example above]

From the output we can see that the data was processed out of order because ActionBlock<T> scheduled multiple tasks concurrently to process the incoming data. It is important to note that the MaxDegreeOfParallelism option does not specify the actual number of tasks used to process the data; it sets a limit on that number. The actual number of tasks used is controlled by the task scheduler, which incidentally can be changed via ExecutionDataflowBlockOptions. There are a variety of options available on ExecutionDataflowBlockOptions for controlling how the TPL Dataflow blocks will work. We'll try to get to those in future posts.

ActionBlock<T> is just one example of the dataflow blocks that exist in TPL Dataflow. There are others that further complement the ability to easily perform parallel processing in your applications. Look for future postings that further describe these blocks as well as the other parallel computing capabilities of the TPL Dataflow library. In our next post we'll diverge slightly and talk about task completion and how we can control it using mechanisms provided by IDataflowBlock.
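To see that MaxDegreeOfParallelism is a ceiling rather than an exact count, one option is to record how many delegate invocations are running at once. The sketch below does that with a simple counter; ParallelismLimitSketch and MeasurePeak are hypothetical names, and the peak you observe depends on the scheduler and the machine.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

public static class ParallelismLimitSketch
{
    // Returns the highest number of delegate invocations observed running
    // at the same time for the given MaxDegreeOfParallelism.
    public static int MeasurePeak(int maxDegree, int itemCount)
    {
        int running = 0, peak = 0;
        object gate = new object();

        var action = new ActionBlock<int>(i =>
        {
            lock (gate)
            {
                running++;
                if (running > peak) peak = running;
            }
            Thread.Sleep(50); // simulated work
            lock (gate) { running--; }
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegree });

        for (int i = 0; i < itemCount; i++) action.Post(i);
        action.Complete();
        action.Completion.Wait();
        return peak;
    }

    public static void Main()
    {
        // The observed peak should never exceed the configured limit.
        Console.WriteLine("Peak with limit 1: {0}", MeasurePeak(1, 8));
        Console.WriteLine("Peak with limit 4: {0}", MeasurePeak(4, 8));
    }
}
```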

Async CTP: Intro to TPL DataFlow

   Posted by: Kyle Libbert

Many of you are probably already familiar with the parallel processing capabilities offered by the Task Parallel Library (TPL) that was first introduced in the .NET Framework 4. In .NET 4, TPL provided some core building blocks and algorithms for doing parallel computing in your .NET applications. Now that the development for the next version of the .NET platform is well under way, it is no surprise that Microsoft is looking for new ways to improve upon a good thing. One such update which should make the final cut is yet another way for developers to build highly responsive code using the new Async capabilities of .NET (see Andrew Troelsen's Async CTP Anyone?), currently available as a community technology preview known as the Async CTP. One component of the Async CTP is TPL Dataflow, a library for doing agent/actor based data processing using parallel techniques.

The TPL Dataflow library was first released for preview as part of the Microsoft Visual Studio Async CTP, and it is also available in a separate TPL Dataflow CTP, both of which you can download from http://msdn.microsoft.com/vstudio/async. Both the Async CTP and TPL Dataflow CTP bits can be installed and used with Visual Studio 2010 SP1.

Typically when you did parallel programming with .NET 4 you were being proactive. It was usually the case that you had some data and you wanted to perform some computation on that data. For example, you may have had a range of data that you wanted to perform some computation on, or you might have needed to filter your data so that it was useful to your application. To solve these problems in .NET 4 with the Task Parallel Library you might have used Parallel.For or Parallel.ForEach, or possibly even a PLINQ query, to iterate through the data and take advantage of the parallel constructs provided by TPL. In all these cases it was typically data first, then computations on that data, using the primitives for task and data parallelism provided by the Task Parallel Library. What was missing was the ability to be reactive: to set up your computation framework first, and then react to the data as it comes in.

This reactive method of processing data is commonly referred to as dataflow parallelism. Essentially what you're doing is creating computational networks through which data can flow. Agent-based and actor-based message passing patterns like the producer/consumer pattern follow this reactive model. With the introduction of TPL Dataflow you will be able to use these parallel programming paradigms in your own .NET applications to build reactive dataflow networks. This is not to say that you couldn't have solved these problems in .NET 4 using the Task Parallel Library; however, it would have involved quite a bit of code to manage buffering data, scheduling tasks and dealing with the communication between tasks required to perform the work. The TPL Dataflow library gives you a lot of the primitives that you need to solve these problems out of the box without having to worry about the minutiae required to implement such architectures.

So what is TPL Dataflow? Essentially it is a parallel processing library that exposes primitives for doing in-process message passing and processing. TPL Dataflow provides a set of agents, commonly referred to as dataflow blocks, or simply blocks, that contain the infrastructure required to buffer and process your data in an asynchronous and parallel manner. TPL Dataflow provides the infrastructure for being able to build data parallelism into your applications.

At the core of the TPL Dataflow library is its interface hierarchy, pictured below. These interfaces describe the behavior of a dataflow block. At the very top of the hierarchy is IDataflowBlock, defining the contract for dealing with the lifetime of a dataflow block. Below IDataflowBlock are three sub-interfaces that define contracts for blocks that can be a source of data, blocks that can be a target for data, and blocks that can be both a source and target for data. ISourceBlock represents a source of data, defining a contract for buffering data and passing it on to targets. ITargetBlock represents a target for incoming data, defining a contract for receiving and buffering data. Finally, IPropagatorBlock represents a block that can be both a source and a target for data, defining a contract for receiving data from sources, possibly transforming that data and propagating the result on to other targets.

[Image: TPL Dataflow interface hierarchy]
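One quick way to explore the hierarchy is to ask a few built-in blocks which of these interfaces they implement. The sketch below is an illustration only; InterfaceHierarchySketch and Describe are hypothetical names, and the generic interface forms (for example ITargetBlock<int>) are those of the released library.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

public static class InterfaceHierarchySketch
{
    // Classifies a block that carries int data by the interfaces it implements.
    public static string Describe(IDataflowBlock block)
    {
        if (block is IPropagatorBlock<int, int>) return "source and target";
        if (block is ITargetBlock<int>) return "target only";
        if (block is ISourceBlock<int>) return "source only";
        return "neither";
    }

    public static void Main()
    {
        // ActionBlock only consumes data, so it is a target.
        Console.WriteLine("ActionBlock: " + Describe(new ActionBlock<int>(i => { })));

        // BufferBlock and TransformBlock both receive and offer data.
        Console.WriteLine("BufferBlock: " + Describe(new BufferBlock<int>()));
        Console.WriteLine("TransformBlock: " + Describe(new TransformBlock<int, int>(i => i * 2)));
    }
}
```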

With these interfaces you can develop your own blocks to build your own custom dataflow networks, but there are also several built in blocks for the most common scenarios like buffering and propagation of data, acting on data, and even blocks for joining data from multiple sources then buffering and presenting the result to a target.

For buffering and propagation of data TPL Dataflow provides the following blocks:

BufferBlock<T> - buffers incoming data and delivers that data to a target in an asynchronous and parallel fashion.

WriteOnceBlock<T> - accepts one piece of data and delivers that piece of data in a broadcast fashion to all target blocks interested in that piece of data.

BroadcastBlock<T> - accepts multiple pieces of data, buffering them as they arrive and broadcasts each piece of data to all interested target blocks.
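The differences between these three blocks can be sketched by posting a few values and reading them back. This assumes the Post and Receive extension methods of the released library; BufferingBlocksSketch and its method names are made up for the example.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

public static class BufferingBlocksSketch
{
    // BufferBlock hands items back in the order they were posted.
    public static int[] BufferOrder()
    {
        var buffer = new BufferBlock<int>();
        buffer.Post(1);
        buffer.Post(2);
        return new[] { buffer.Receive(), buffer.Receive() };
    }

    // WriteOnceBlock keeps the first value it is offered and declines the rest.
    public static int WriteOnceValue()
    {
        var once = new WriteOnceBlock<int>(x => x); // identity cloning function
        once.Post(10);
        once.Post(20); // declined, Post returns false
        return once.Receive();
    }

    // BroadcastBlock offers a copy of its current value to every reader.
    public static double[] BroadcastTwice()
    {
        var broadcast = new BroadcastBlock<double>(x => x);
        broadcast.Post(Math.PI);
        return new[] { broadcast.Receive(), broadcast.Receive() };
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", BufferOrder()));
        Console.WriteLine(WriteOnceValue());
        Console.WriteLine(string.Join(", ", BroadcastTwice()));
    }
}
```

Receiving from a BroadcastBlock does not remove the value, which is why the same value can be read twice.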

For acting on incoming data TPL Dataflow provides the following dataflow blocks:

ActionBlock<Tin> - an ITargetBlock that accepts and buffers incoming data and performs some action on that data.

TransformBlock<Tin, Tout> - an IPropagatorBlock that accepts and buffers incoming data, executes some function on that data and then buffers the output to be consumed by a target.

TransformManyBlock<Tin, Tout> - very similar to TransformBlock, however the relationship between input and output is one-to-many. It buffers incoming data, transforms each piece of data into zero or more outputs, then buffers those outputs and makes them available to a target.
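Put together, these three blocks can form a small pipeline. The following is a sketch under the assumption that you are using the released System.Threading.Tasks.Dataflow library, where LinkTo accepts a DataflowLinkOptions with PropagateCompletion (the CTP build may differ). The class and method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static class TransformPipelineSketch
{
    // Splits each line into words, upper-cases every word, and collects the
    // results in the order they reach the final ActionBlock.
    public static List<string> Run(IEnumerable<string> lines)
    {
        var results = new List<string>();

        // One input line produces many output words.
        var split = new TransformManyBlock<string, string>(line => line.Split(' '));
        // One input word produces exactly one output word.
        var upper = new TransformBlock<string, string>(word => word.ToUpperInvariant());
        // Default parallelism is 1, so the list is only touched by one task at a time.
        var collect = new ActionBlock<string>(word => results.Add(word));

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        split.LinkTo(upper, link);
        upper.LinkTo(collect, link);

        foreach (string line in lines) split.Post(line);
        split.Complete();          // completion flows down the linked blocks
        collect.Completion.Wait();
        return results;
    }

    public static void Main()
    {
        foreach (string word in Run(new[] { "hello tpl", "dataflow blocks" }))
            Console.WriteLine(word);
    }
}
```

Because every block here runs with the default degree of parallelism, the words should arrive at the end of the pipeline in their original order.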

For joining and grouping data TPL Dataflow provides the following dataflow blocks:

BatchBlock<T> - An IPropagatorBlock that accepts and buffers incoming data and groups that data into batches (arrays).

JoinBlock<T1, T2> - Accepts and buffers data from two different source blocks, combines data from each source into tuples, then buffers and makes those tuples available to a target.

BatchedJoinBlock<T1, T2> - a combination of BatchBlock and JoinBlock that groups pairs of collections of incoming data into tuples of those collections, buffering and presenting that data as output to a target.
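To make batching and joining a bit more concrete, here is a minimal sketch of BatchBlock and JoinBlock used on their own. GroupingBlocksSketch, Batches and JoinPair are illustrative names, and the code assumes the Post and Receive extension methods of the released library.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

public static class GroupingBlocksSketch
{
    // Groups the numbers 1..count into arrays of at most batchSize items.
    public static List<int[]> Batches(int batchSize, int count)
    {
        var batch = new BatchBlock<int>(batchSize);
        for (int i = 1; i <= count; i++) batch.Post(i);
        batch.Complete(); // flushes any remaining items as a smaller final batch

        int expected = (count + batchSize - 1) / batchSize;
        var result = new List<int[]>();
        for (int b = 0; b < expected; b++) result.Add(batch.Receive());
        return result;
    }

    // Pairs one item from each of the two targets into a tuple.
    public static Tuple<int, string> JoinPair(int number, string name)
    {
        var join = new JoinBlock<int, string>();
        join.Target1.Post(number);
        join.Target2.Post(name);
        return join.Receive();
    }

    public static void Main()
    {
        foreach (int[] group in Batches(3, 7))
            Console.WriteLine(string.Join(", ", group));

        Tuple<int, string> pair = JoinPair(1, "one");
        Console.WriteLine("{0} = {1}", pair.Item1, pair.Item2);
    }
}
```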

As you can see, TPL Dataflow provides you with some significant capabilities for building parallel dataflow into your own applications without having to reinvent the infrastructure to do so. This was a quick look at TPL Dataflow and the interface hierarchy upon which it is built, as well as some of the built-in capabilities for building applications that incorporate data parallelism. Watch for future posts where we'll look at some of these built-in mechanisms along with examples demonstrating how you can get started using TPL Dataflow.

If you are interested in looking at the current state of this new technology, here are some helpful links: 
