Introduction

TPL Dataflow is a component of the TPL (Task Parallel Library) that lets you build complex processing pipelines. It supports asynchronous and parallel programming. For more information, see http://msdn.microsoft.com/en-us/library/hh228603(v=vs.110).aspx.

To install TPL Dataflow, run the following command in the Package Manager Console.

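PM> Install-Package Microsoft.Tpl.Dataflow

Newer releases of the library are published under the package name System.Threading.Tasks.Dataflow (PM> Install-Package System.Threading.Tasks.Dataflow).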

It’s like Lego™

TPL Dataflow works with different blocks that, once linked, form a pipeline for your data. You just need to choose the right ones!

ActionBlock<TInput>

  • Provides a block that invokes an Action<T> delegate for each element received
  • It has no output, so you cannot link it to another block
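// Namespaces used by all the examples in this article
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

private async Task InitializeAsync()
{
	ActionBlock<int> incrementActionBlock = new ActionBlock<int>(number =>
	{
		Console.WriteLine(number + 1);
	});

	for (int i = 0; i < 10; i++)
	{
		await incrementActionBlock.SendAsync(i);
	}

	// Signal that all data was sent, then wait until every message is processed
	incrementActionBlock.Complete();
	await incrementActionBlock.Completion;

	// CONSOLE OUTPUT
	// (by default an ActionBlock handles one message at a time, in order)
	//
	// 1
	// 2
	// 3
	// 4
	// 5
	// 6
	// 7
	// 8
	// 9
	// 10
}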
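To check the ordering yourself, here is a minimal sketch that records the results instead of printing them. It assumes a .NET 6 or later console project with top-level statements that references the System.Threading.Tasks.Dataflow package; `processed` and `recordActionBlock` are illustrative names.

```csharp
// Sketch: assumes a .NET 6+ top-level program referencing System.Threading.Tasks.Dataflow.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// A plain List is safe here: with the default options (MaxDegreeOfParallelism = 1)
// the delegate never runs concurrently with itself.
var processed = new List<int>();

var recordActionBlock = new ActionBlock<int>(number => processed.Add(number + 1));

for (int i = 0; i < 10; i++)
{
	await recordActionBlock.SendAsync(i);
}

// Stop accepting messages and wait until the input queue has been drained.
recordActionBlock.Complete();
await recordActionBlock.Completion;

Console.WriteLine(string.Join(", ", processed)); // 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
```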

TransformBlock<TInput, TOutput>

  • Provides a block that invokes a Func<TInput, TOutput> delegate for each element received and sends the result to its output
  • You can link the output with another block
private async Task InitializeAsync()
{
	TransformBlock<string, int> convertTransformBlock = new TransformBlock<string, int>(text =>
	{
		return int.Parse(text);
	}, new ExecutionDataflowBlockOptions
	{
		MaxDegreeOfParallelism = 2
	});

	ActionBlock<int> incrementActionBlock = new ActionBlock<int>(number =>
	{
		Console.WriteLine(number + 1);
	}, new ExecutionDataflowBlockOptions
	{
		MaxDegreeOfParallelism = 2
	});

	// Link the blocks to create a pipeline!
	convertTransformBlock.LinkTo(incrementActionBlock, new DataflowLinkOptions { PropagateCompletion = true });

	List<string> numbers = new List<string> { "0", "1", "2", "3", "4", "5", "6", "7", "8", "9" };

	// Send data to the TransformBlock
	foreach (var number in numbers)
	{
		await convertTransformBlock.SendAsync(number);
	}

	// Signal that all data was sent, then wait for the end of the pipeline
	convertTransformBlock.Complete();
	await incrementActionBlock.Completion;

	// CONSOLE OUTPUT (one possible run; the order changes between runs)
	//
	// 2
	// 1
	// 3
	// 4
	// 5
	// 6
	// 7
	// 9
	// 10
	// 8
}

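The output order varies from run to run because the ActionBlock may process up to two numbers at the same time (MaxDegreeOfParallelism = 2), so a later number can be printed before an earlier one. Awaiting SendAsync does not reorder anything, and the TransformBlock itself still emits its results in the order it received the inputs, even though it also runs two conversions in parallel.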
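The sketch below isolates the TransformBlock's ordering behaviour. It assumes a .NET 6 or later top-level program referencing the System.Threading.Tasks.Dataflow package; the artificial Task.Delay, which makes early inputs slower than later ones, is a hypothetical device to provoke out-of-order completion. Even so, the sequential ActionBlock downstream should receive the numbers in their original order.

```csharp
// Sketch: assumes a .NET 6+ top-level program referencing System.Threading.Tasks.Dataflow.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Up to four conversions run at once; early inputs are deliberately slower.
var convertTransformBlock = new TransformBlock<string, int>(async text =>
{
	int value = int.Parse(text);
	await Task.Delay((10 - value) * 10); // "0" waits 100 ms, "9" waits 10 ms
	return value;
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

// Sequential consumer (default MaxDegreeOfParallelism = 1) records what it receives.
var results = new List<int>();
var recordActionBlock = new ActionBlock<int>(number => results.Add(number + 1));

convertTransformBlock.LinkTo(recordActionBlock, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 0; i < 10; i++)
{
	await convertTransformBlock.SendAsync(i.ToString());
}

convertTransformBlock.Complete();
await recordActionBlock.Completion;

Console.WriteLine(string.Join(", ", results)); // 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
```

In recent versions of the library this behaviour is controlled by DataflowBlockOptions.EnsureOrdered, which defaults to true; setting it to false lets a block release each result as soon as it is ready.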

BufferBlock<T>

  • Provides a block that stores messages, in arrival order, until a linked block consumes them
  • You can link the output with another block
TransformBlock<string, int> convertTransformBlock = new TransformBlock<string, int>(text =>
{
	return int.Parse(text);
});

BufferBlock<int> temporaryBufferBlock = new BufferBlock<int>();

ActionBlock<int> incrementActionBlock = new ActionBlock<int>(number =>
{
	Console.WriteLine(number + 1);
});

// Link the blocks to create a pipeline!
convertTransformBlock.LinkTo(temporaryBufferBlock, new DataflowLinkOptions { PropagateCompletion = true });

List<string> numbers = new List<string> { "0", "1", "2" };

// Send data to the TransformBlock; the converted numbers wait in the buffer
foreach (var number in numbers)
{
	convertTransformBlock.Post(number);
}

// Signal to the block that all data was sent
convertTransformBlock.Complete();

// Do something else here while the results wait in the buffer... :)

// Link the buffer to the ActionBlock to finish the process
temporaryBufferBlock.LinkTo(incrementActionBlock, new DataflowLinkOptions { PropagateCompletion = true });

// Wait until every number has been printed
await incrementActionBlock.Completion;

// CONSOLE OUTPUT
//
// 1
// 2
// 3

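We pass DataflowLinkOptions with PropagateCompletion = true so that, when the source block completes, the target block is automatically told to complete as well. This is what lets the completion signal travel from the TransformBlock, through the buffer, down to the ActionBlock.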
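Here is a minimal sketch of what PropagateCompletion changes, assuming a .NET 6 or later top-level program referencing the System.Threading.Tasks.Dataflow package. The two pipelines are identical except for their link options; the pipeline names and the one-second timeout are hypothetical choices for the demonstration.

```csharp
// Sketch: assumes a .NET 6+ top-level program referencing System.Threading.Tasks.Dataflow.
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Pipeline A: the link propagates completion.
var sourceA = new TransformBlock<int, int>(n => n * 2);
var targetA = new ActionBlock<int>(n => Console.WriteLine($"A received {n}"));
sourceA.LinkTo(targetA, new DataflowLinkOptions { PropagateCompletion = true });

// Pipeline B: same blocks, default link options (no propagation).
var sourceB = new TransformBlock<int, int>(n => n * 2);
var targetB = new ActionBlock<int>(n => Console.WriteLine($"B received {n}"));
sourceB.LinkTo(targetB);

sourceA.Post(21);
sourceB.Post(21);

// Complete both sources and wait until they have handed off their output.
sourceA.Complete();
sourceB.Complete();
await Task.WhenAll(sourceA.Completion, sourceB.Completion);

// Give each target up to one second to complete on its own.
bool targetACompleted = await Task.WhenAny(targetA.Completion, Task.Delay(1000)) == targetA.Completion;
bool targetBCompleted = await Task.WhenAny(targetB.Completion, Task.Delay(1000)) == targetB.Completion;

Console.WriteLine($"Target A completed: {targetACompleted}"); // True
Console.WriteLine($"Target B completed: {targetBCompleted}"); // False: targetB.Complete() was never called
```

Without propagation, you have to call targetB.Complete() yourself once the source is done.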

JoinBlock<T1, T2>

  • Provides a block that joins several data sources: it waits for one item from each source and combines them into a Tuple
  • You can link the output with another block
JoinBlock<double, float> joinBlock = new JoinBlock<double, float>();

ActionBlock<Tuple<double, float>> sumActionBlock = new ActionBlock<Tuple<double, float>>(pair =>
{
	// Runs only once the block has received one number of each kind
	Console.WriteLine(pair.Item1 + pair.Item2);
});

// Link the blocks to create a pipeline!
joinBlock.LinkTo(sumActionBlock, new DataflowLinkOptions { PropagateCompletion = true });

List<double> doubles = new List<double> { 0, 1, 2 };
List<float> floats = new List<float> { 0, 1, 2 };

// Send data to each target of the JoinBlock
foreach (var doubleNumber in doubles)
{
	joinBlock.Target1.Post(doubleNumber);
}

foreach (var floatNumber in floats)
{
	joinBlock.Target2.Post(floatNumber);
}

// Signal to the block that all data was sent, then wait for the last sum
joinBlock.Complete();
await sumActionBlock.Completion;

// CONSOLE OUTPUT
//
// 0
// 2
// 4

The inputs of a JoinBlock do not need to share a type: here Target1 receives double values and Target2 receives float values.
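The sketch below pairs values of two unrelated types, string and int. It assumes a .NET 6 or later top-level program referencing the System.Threading.Tasks.Dataflow package, and relies on the default greedy JoinBlock, where the n-th item received by Target1 is paired with the n-th item received by Target2; the names and numbers are illustrative values only.

```csharp
// Sketch: assumes a .NET 6+ top-level program referencing System.Threading.Tasks.Dataflow.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Target1 accepts strings, Target2 accepts ints.
var joinBlock = new JoinBlock<string, int>();

// Sequential consumer, so a plain List is safe.
var lines = new List<string>();
var recordActionBlock = new ActionBlock<Tuple<string, int>>(pair =>
	lines.Add($"{pair.Item2}: {pair.Item1}"));

joinBlock.LinkTo(recordActionBlock, new DataflowLinkOptions { PropagateCompletion = true });

// Items are queued per target and joined first-in, first-out.
joinBlock.Target1.Post("Max");
joinBlock.Target1.Post("Jon");
joinBlock.Target2.Post(1);
joinBlock.Target2.Post(2);

joinBlock.Complete();
await recordActionBlock.Completion;

Console.WriteLine(string.Join("; ", lines)); // 1: Max; 2: Jon
```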

BatchBlock<T>

  • Provides a block that groups incoming data into arrays of a specific size (the last batch may be smaller)
  • You can link the output with another block
BatchBlock<int> groupBatchBlock = new BatchBlock<int>(3);

ActionBlock<int[]> sumActionBlock = new ActionBlock<int[]>(numbers =>
{
	// Sum the numbers of each batch (Sum comes from System.Linq)
	Console.WriteLine(numbers.Sum());
});

groupBatchBlock.LinkTo(sumActionBlock, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 0; i < 10; i++)
{
	groupBatchBlock.Post(i);
}

// Signal to the block that all data was sent; the leftover item is emitted as a smaller final batch
groupBatchBlock.Complete();
await sumActionBlock.Completion;

// CONSOLE OUTPUT
//
// 3   (0 + 1 + 2)
// 12  (3 + 4 + 5)
// 21  (6 + 7 + 8)
// 9   (final batch: 9)
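To make the final, smaller batch visible, this sketch records the size of each batch instead of its sum. It assumes a .NET 6 or later top-level program referencing the System.Threading.Tasks.Dataflow package; `batchSizes` and `recordActionBlock` are illustrative names.

```csharp
// Sketch: assumes a .NET 6+ top-level program referencing System.Threading.Tasks.Dataflow.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

var groupBatchBlock = new BatchBlock<int>(3);

// Sequential consumer, so a plain List is safe.
var batchSizes = new List<int>();
var recordActionBlock = new ActionBlock<int[]>(batch => batchSizes.Add(batch.Length));

groupBatchBlock.LinkTo(recordActionBlock, new DataflowLinkOptions { PropagateCompletion = true });

for (int i = 0; i < 10; i++)
{
	groupBatchBlock.Post(i);
}

// Completing the block flushes the leftover item (9) as a final batch of one.
groupBatchBlock.Complete();
await recordActionBlock.Completion;

Console.WriteLine(string.Join(", ", batchSizes)); // 3, 3, 3, 1
```

If you need a partial batch without completing the block, BatchBlock<T> also exposes TriggerBatch().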

BroadcastBlock<T>

  • Provides a buffer that stores at most one element at a time, overwriting each message with the next as it arrives
  • You can link the output with other blocks; each message is offered to every linked block
private async Task InitializeAsync()
{
	// The delegate is the block's cloning function, used to clone each message
	// when it is offered to other blocks. Here it also upper-cases the text.
	BroadcastBlock<string> broadcastBlock = new BroadcastBlock<string>(text =>
	{
		return text.ToUpper();
	}, new DataflowBlockOptions { BoundedCapacity = 1 });

	ActionBlock<string> helloActionBlock = new ActionBlock<string>(text =>
	{
		Console.WriteLine("Hello {0}", text);
	});

	ActionBlock<string> byeActionBlock = new ActionBlock<string>(text =>
	{
		Console.WriteLine("Bye {0}", text);
	});

	// Broadcast the message to the other blocks
	broadcastBlock.LinkTo(helloActionBlock, new DataflowLinkOptions { PropagateCompletion = true });
	broadcastBlock.LinkTo(byeActionBlock, new DataflowLinkOptions { PropagateCompletion = true });

	List<string> names = new List<string> { "Max", "Jon", "Michel" };

	foreach (var name in names)
	{
		await broadcastBlock.SendAsync(name);
	}

	// Signal to the block that all data was sent, then wait for both targets
	broadcastBlock.Complete();
	await Task.WhenAll(helloActionBlock.Completion, byeActionBlock.Completion);

	// CONSOLE OUTPUT (one possible run)
	//
	// Bye MAX
	// Bye JON
	// Hello MAX
	// Hello JON
	// Hello MICHEL
	// Bye MICHEL
}

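We set the BoundedCapacity of the block to 1 so that it holds only one pending message at a time; because each SendAsync is awaited, the next name is sent only once the block has room for it.
The interleaving of the "Hello" and "Bye" lines varies from run to run because the two ActionBlocks run independently of each other. Within each block, the names still appear in the order they were sent.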
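This sketch checks that each linked block receives every name, in order, even though the two blocks interleave their work. It assumes a .NET 6 or later top-level program referencing the System.Threading.Tasks.Dataflow package, and uses an identity cloning function (text => text) instead of ToUpper so the names arrive unchanged. It relies on both targets being unbounded ActionBlocks that accept every message; a target that declines a message may miss it, since a BroadcastBlock only keeps the latest value.

```csharp
// Sketch: assumes a .NET 6+ top-level program referencing System.Threading.Tasks.Dataflow.
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Identity cloning function: every target receives the same string instance.
var broadcastBlock = new BroadcastBlock<string>(text => text);

// Each list is written by a single sequential ActionBlock, so plain Lists are safe.
var hello = new List<string>();
var bye = new List<string>();
var helloActionBlock = new ActionBlock<string>(text => hello.Add("Hello " + text));
var byeActionBlock = new ActionBlock<string>(text => bye.Add("Bye " + text));

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
broadcastBlock.LinkTo(helloActionBlock, linkOptions);
broadcastBlock.LinkTo(byeActionBlock, linkOptions);

foreach (var name in new[] { "Max", "Jon", "Michel" })
{
	await broadcastBlock.SendAsync(name);
}

broadcastBlock.Complete();
await Task.WhenAll(helloActionBlock.Completion, byeActionBlock.Completion);

// Each target saw every message, in the order the messages were sent.
Console.WriteLine(string.Join(", ", hello)); // Hello Max, Hello Jon, Hello Michel
Console.WriteLine(string.Join(", ", bye));   // Bye Max, Bye Jon, Bye Michel
```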