Worker thread using parallel tasks

The worker thread is a well-known pattern: there is work to do, it has to be done asynchronously, and we want to collect all the results once the work is finished. This post shows an alternative to the common implementations, one that takes advantage of the new Task Parallel Library (TPL).
To formalize the requirements:

  • The worker queues items to process
  • The items are processed asynchronously
  • Only one item can be processed at a time
  • The items are processed in the order they were queued
  • The worker will store the processed results in the order they were processed

The worker class

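using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class Worker<TItem, TResult>
{
    private readonly IItemsProcessor<TItem, TResult> itemsProcessor;
    private Task lastTask; // tail of the continuation chain

    public IList<TResult> ProcessedItems { get; private set; }

    public Worker(IItemsProcessor<TItem, TResult> itemsProcessor)
    {
        this.itemsProcessor = itemsProcessor;
        ProcessedItems = new List<TResult>();
        InitializeNullTask();
    }

    // Starts a no-op task that serves as the head of the chain.
    private void InitializeNullTask()
    {
        lastTask = new Task<TResult>(() => default(TResult));
        lastTask.Start();
    }

    // Queues the item to run after everything queued before it.
    public void ProcessItem(TItem item)
    {
        var nextTask = lastTask
            .ContinueWith(task =>
            {
                var processedItem = itemsProcessor.ProcessItem(item);
                ProcessedItems.Add(processedItem);
            });
        lastTask = nextTask;
    }

    // Blocks the caller until every queued item has been processed.
    public void WaitForPendingItems()
    {
        using (var sync = new ManualResetEvent(false))
        {
            lastTask.ContinueWith(task => sync.Set());
            sync.WaitOne();
        }
    }
}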

The worker creates a task for each item that needs to be processed. Each task runs on the thread pool. The ContinueWith call is what guarantees the order: a continuation starts only after the task it is attached to has completed, so the items are processed one at a time, in the order they were queued.
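To see the ordering guarantee in isolation, here is a minimal sketch that chains a few continuations and records the order in which they run. The class name ContinuationOrderDemo and the method RunChain are made up for this illustration and are not part of the worker.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the article's Worker.
public static class ContinuationOrderDemo
{
    // Chains `count` continuations and returns the order in which they ran.
    public static List<int> RunChain(int count)
    {
        var order = new List<int>();
        Task last = Task.Factory.StartNew(() => { }); // head of the chain

        for (int i = 0; i < count; i++)
        {
            int captured = i; // copy the loop variable for the closure
            last = last.ContinueWith(_ =>
            {
                Thread.Sleep(10);    // simulate some work
                order.Add(captured); // safe: only one link runs at a time
            });
        }

        last.Wait(); // block until the tail of the chain has finished
        return order;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", RunChain(5)));
    }
}
```

Because every continuation is attached to the previous tail, no two links run concurrently, which is why the plain List needs no locking here.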

InitializeNullTask creates a task that does nothing except act as the head of the task queue. With it in place, ProcessItem does not have to check whether it is handling the first item. The head task is started with a call to Start, while all the others are scheduled with ContinueWith.

WaitForPendingItems also enqueues a task. This task does no processing of its own. Because it is a continuation of the last queued task, it runs only after all the other items have been processed. When it runs, it signals the event and releases the thread that called WaitForPendingItems.
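The waiting step can also be tried on its own. The sketch below builds a small chain, waits for it with the event-based approach described above, and then, for comparison, with a direct call to Task.Wait on the tail task. The names WaitForChainDemo and Run are hypothetical. One difference worth knowing: Task.Wait rethrows a failure of the awaited task wrapped in an AggregateException, whereas the event-based wait returns without reporting it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the article's Worker.
public static class WaitForChainDemo
{
    // Runs three chained work items and returns how many completed
    // by the time the wait returned.
    public static int Run()
    {
        int processed = 0;
        Task last = Task.Factory.StartNew(() => { }); // head of the chain

        for (int i = 0; i < 3; i++)
        {
            last = last.ContinueWith(_ =>
            {
                Thread.Sleep(20); // simulate some work
                processed++;      // links run one at a time, so no race
            });
        }

        // Event-based wait: a trailing continuation signals the caller.
        using (var sync = new ManualResetEvent(false))
        {
            last.ContinueWith(_ => sync.Set());
            sync.WaitOne();
        }

        // Shorter alternative: wait on the tail task directly.
        last.Wait();

        return processed;
    }

    public static void Main()
    {
        Console.WriteLine("Processed items: {0}", Run());
    }
}
```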

Usage example

In this example we’ll download a list of web pages and print their sizes. The downloader implements the IItemsProcessor interface that the worker expects.

public class WebUrlsDownloader : IItemsProcessor<string, byte[]>
{
    // Downloads the page at the given URL and returns its raw bytes.
    public byte[] ProcessItem(string url)
    {
        using (var webClient = new WebClient())
        {
            return webClient.DownloadData(url);
        }
    }
}
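The IItemsProcessor interface itself is not listed in this post. Judging only from how WebUrlsDownloader implements it, it presumably looks something like the sketch below; treat the exact declaration as an assumption. StringLengthProcessor is a made-up second implementation, included only to show that any item and result types can be plugged in.

```csharp
using System;

// Assumed shape of the interface, inferred from WebUrlsDownloader.
public interface IItemsProcessor<TItem, TResult>
{
    // Turns a single queued item into its processed result.
    TResult ProcessItem(TItem item);
}

// Hypothetical processor used only for illustration.
public class StringLengthProcessor : IItemsProcessor<string, int>
{
    public int ProcessItem(string item)
    {
        return item.Length;
    }
}

public static class ProcessorDemo
{
    public static void Main()
    {
        IItemsProcessor<string, int> processor = new StringLengthProcessor();
        Console.WriteLine(processor.ProcessItem("worker")); // prints 6
    }
}
```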

And the actual usage:

public void DownloadFiles()
{
    var worker = new Worker<string, byte[]>(new WebUrlsDownloader());
    worker.ProcessItem(
        @"http://msdn.microsoft.com/en-us/library/dd537608.aspx");
    worker.ProcessItem(
        @"http://msdn.microsoft.com/en-us/library/dd537609.aspx");
    worker.ProcessItem(
        @"http://msdn.microsoft.com/en-us/library/dd997405.aspx");
    worker.WaitForPendingItems();
    Console.WriteLine("Finished downloading files:");
    foreach (var processedItem in worker.ProcessedItems)
    {
        Console.WriteLine("Downloaded file with size: {0}", processedItem.Length);
    }
}