首页 > 编程语言 > 详细

C#线程 并行编程

时间:2020-03-22 19:03:16      阅读:83      评论:0      收藏:0      [点我收藏+]

第五部分 并行编程 

 

在本节中,我们将介绍Framework 4.0新增的利用多核处理器的多线程API:

  1. 并行LINQ或PLINQ
  2. Parallel 类
  3. 任务并行性构造
  4. 并发集合
  5. 自旋锁和自旋等待

这些API统称为(松散地)称为PFX(并行框架)。并行类与任务并行性构造一起被称为任务并行库或TPL。

Framework 4.0还添加了许多针对传统多线程的较低级线程构造。我们之前介绍了这些内容:

  • 低延迟信令构造(SemaphoreSlim,ManualResetEventSlim,CountdownEvent和Barrier)
  • 用于合作取消的取消令牌
  • 惰性初始化类
  • ThreadLocal <T>

在继续之前,您需要熟悉1-4部分的基础知识,尤其是锁定和线程安全性。

 

并行编程部分中的所有代码清单都可以在LINQPad中作为交互式样本获得。 LINQPad是C#代码暂存器,是测试代码段的理想选择,而无需创建周围的类,项目或解决方案。要访问示例,请在左下方的LINQPad的“示例”选项卡中单击“下载更多示例”,然后在“ Nutshell:更多章节”中选择“ C#4.0”。

Why PFX?

最近,CPU时钟速度停滞不前,制造商已将重点转移到增加内核数上。作为程序员,这对我们来说是个问题,因为标准的单线程代码不会因为这些额外的内核而自动运行得更快。

对于大多数服务器应用程序而言,利用多个内核很容易,因为每个线程可以独立处理一个单独的客户端请求,但是在桌面上则比较困难-因为通常需要您使用计算密集型代码并执行以下操作:

 

  1. 将其分成小块。
  2. 通过多线程并行执行这些块。
  3. 当结果可用时,以线程安全和高效的方式整理结果。

尽管您可以使用经典的多线程结构来完成所有这些操作,但是这很尴尬-特别是分区和整理的步骤。另一个问题是,当多个线程同时处理同一数据时,通常的线程安全锁定策略会引起很多争用。

PFX库是专门为在这些情况下提供帮助而设计的。

利用多核或多个处理器进行编程称为并行编程。这是更广泛的多线程概念的子集。

PFX 概念

在线程之间划分工作有两种策略:数据并行性和任务并行性。

当必须对多个数据值执行一组任务时,我们可以通过让每个线程对值的子集执行(相同)一组任务来并行化。之所以称为数据并行性,是因为我们正在线程之间划分数据。相反,通过任务并行性,我们可以划分任务;换句话说,我们让每个线程执行不同的任务。

通常,数据并行性更容易并且可以更好地扩展到高度并行的硬件,因为它减少或消除了共享数据(从而减少了争用和线程安全问题)。此外,数据并行利用了这样一个事实,即数据值通常比离散任务多,从而增加了并行的可能性。

数据并行性还有助于结构化并行性,这意味着并行工作单元在程序中的同一位置开始和结束。相反,任务并行性往往是非结构化的,这意味着并行工作单元可能在程序中分散的位置开始和结束。结构化并行机制更简单,更不易出错,并且允许您将分区和线程协调(甚至结果归类)这一艰巨的工作移植到库中。

PFX 组件

PFX包括两层功能。较高的层由两个结构化的数据并行API组成:PLINQ和Parallel类。下层包含任务并行性类-以及一组有助于并行编程活动的其他构造。

 技术分享图片

 

 

PLINQ提供了最丰富的功能:它使并行化的所有步骤自动化-包括将工作划分为任务,在线程上执行这些任务,以及将结果整理为单个输出序列。这就是声明性的-因为您只需声明要并行化您的工作(将其结构化为LINQ查询),然后让框架来处理实现细节即可。相反,其他方法则势在必行,因为您需要显式编写代码以进行分区或整理。对于Parallel类,您必须自己整理结果。使用任务并行性构造,您还必须自己划分工作:

 

 

 

  分区工作 整理结果
PLINQ
Parallel 类
PFX的任务并行性

 

 

并发集合和旋转原语可以帮助您进行较低级别的并行编程活动。这些非常重要,因为PFX不仅可以与当今的硬件一起使用,而且还可以与具有更多内核的下一代处理器一起使用。如果您要移动一堆切碎的木材并且有32名工人来完成这项工作,那么最大的挑战就是在移动木材时不让工人互相妨碍。将算法分配到32个内核中是相同的:如果使用普通锁来保护公共资源,则产生的阻塞可能意味着这些内核中只有一小部分实际上曾经同时处于繁忙状态。并发集合专门针对高度并发访问进行了调整,重点是最小化或消除阻塞。 PLINQ和Parallel类本身依靠并发集合和自旋原语来有效地管理工作。

 

PFX和传统多线程

 

传统的多线程方案是即使在单核计算机上也可以受益的多线程方案,而没有真正的并行化发生。我们之前已经介绍了这些内容:它们包括诸如维护响应式用户界面和一次下载两个网页之类的任务。

 

我们在并行编程部分将介绍的某些构造有时在传统的多线程中也很有用。尤其是:

 

  1. 每当您要并行执行操作然后等待它们完成(结构化并行性)时,PLINQ和Parallel类就很有用。这包括非CPU密集型任务,例如调用Web服务。
  2. 当您要在池线程上运行某些操作,并希望通过连续性和父/子任务管理任务的工作流时,任务并行性构造很有用。
  3. 当您想要线程安全的队列,堆栈或字典时,并发集合有时是合适的。
  4. BlockingCollection提供了一种实现生产者/消费者结构的简便方法。

 

何时使用PFX

PFX的主要用例是并行编程:利用多核处理器来加快计算密集型代码的速度。

 

利用多核的挑战是阿姆达尔定律,该定律指出并行化带来的最大性能改进取决于必须顺序执行的代码部分。例如,如果算法的执行时间只有三分之二是可并行化的,那么即使拥有无限数量的内核,您也永远不会超过三倍的性能提升。

 

因此,在继续之前,值得验证瓶颈在可并行代码中。同样值得考虑的是,您的代码是否需要占用大量计算资源-优化通常是最简单,最有效的方法。不过,这需要权衡取舍,因为有些优化技术会使并行代码变得更加困难。

 

最简单的收获来自所谓的“尴尬的并行问题”,即可以轻松地将一项工作划分为自己有效执行的任务(结构化并行非常适合此类问题)。示例包括许多图像处理任务,光线跟踪以及数学或密码学中的蛮力方法。毫不费力地并行问题的一个示例是实现quicksort算法的优化版本-一个好的结果需要一些思考,并且可能需要非结构化并行。

PLINQ

PLINQ自动并行化本地LINQ查询。 PLINQ的优点是易于使用,因为它减轻了工作分区和结果整理的负担。

要使用PLINQ,只需在输入序列上调用AsParallel(),然后照常继续LINQ查询。以下查询计算3到100,000之间的质数-充分利用目标计算机上的所有内核:

// Calculate prime numbers using a simple (unoptimized) algorithm.
//
// NB: All code listings in this chapter are available as interactive code snippets in LINQPad.
// To activate these samples, click Download More Samples in LINQPad‘s Samples tab in the 
// bottom left, and select C# 4.0 in a Nutshell: More Chapters.
 
IEnumerable<int> numbers = Enumerable.Range (3, 100000-3);
 
var parallelQuery = 
  from n in numbers.AsParallel()
  where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)
  select n;
 
int[] primes = parallelQuery.ToArray();

 

 

AsParallel是System.Linq.ParallelEnumerable中的扩展方法。它将输入包装为基于ParallelQuery <TSource>的序列,这将导致随后调用的LINQ查询运算符绑定到在ParallelEnumerable中定义的另一组扩展方法。这些提供了每个标准查询运算符的并行实现。本质上,它们通过将输入序列划分为在不同线程上执行的块,然后将结果整理回单个输出序列以供使用来工作:

技术分享图片

 

 

 

 

调用AsSequential()会解开ParallelQuery序列,以便后续查询运算符绑定到标准查询运算符并按顺序执行。在调用有副作用或不是线程安全的方法之前,这是必需的。

 

对于接受两个输入序列(Join,GroupJoin,Concat,Union,Intersect,Except和Zip)的查询运算符,必??须将AsParallel()应用于两个输入序列(否则,将引发异常)。但是,您不需要在查询进行过程中继续对查询应用AsParallel,因为PLINQ的查询运算符会输出另一个ParallelQuery序列。实际上,再次调用AsParallel会导致效率低下,因为它会强制合并和重新划分查询:

mySequence.AsParallel()           // Wraps sequence in ParallelQuery<int>
          .Where (n => n > 100)   // Outputs another ParallelQuery<int>
          .AsParallel()           // Unnecessary - and inefficient!
          .Select (n => n * n)

 

并非所有查询运算符都可以有效地并行化。对于那些无法执行的操作,PLINQ会依次实现操作符。如果PLINQ怀疑并行化的开销实际上会减慢特定查询的速度,则它也可以顺序运行。

 

PLINQ仅适用于本地集合:它不适用于LINQ to SQL或Entity Framework,因为在这种情况下,LINQ会转换为SQL,然后在数据库服务器上执行。但是,您可以使用PLINQ对从数据库查询获得的结果集执行其他本地查询。

 

如果PLINQ查询引发异常,则将其重新引发为AggregateException,其InnerExceptions属性包含实际的异常(或多个异常)。有关详细信息,请参见使用AggregateException。

Why Isn’t AsParallel the Default?

鉴于AsParallel透明地并行化LINQ查询,出现了一个问题:“ Microsoft为什么不简单地并行化标准查询运算符并将PLINQ设置为默认值?”

 

选择加入的原因有很多。首先,为了使PLINQ有用,必须将其合理数量的计算密集型工作移植到工作线程中。大多数LINQ to Objects查询执行得非常快,不仅不需要并行化,而且分区,整理和协调额外线程的开销实际上可能会使事情变慢。

 

另外:

 

在元素排序方面,PLINQ查询的输出(默认情况下)可能与LINQ查询不同。

PLINQ将异常包装在AggregateException中(以处理引发多个异常的可能性)。

如果查询调用线程不安全的方法,PLINQ将给出不可靠的结果。

最后,PLINQ提供了许多挂钩来进行调整和调整。细微的负担使标准LINQ to Objects API负担加重。

 

 并行执行弹道 

 

像普通的LINQ查询一样,PLINQ查询也会被延迟评估。这意味着仅当您开始使用结果时才会触发执行-通常通过foreach循环(尽管也可以通过转换运算符(例如ToArray)或返回单个元素或值的运算符)触发执行。

 

但是,在枚举结果时,执行过程与普通顺序查询的执行过程有所不同。顺序查询完全由使用者以“拉”方式提供动力:输入序列中的每个元素都是在使用者需要时准确获取的。并行查询通常使用独立线程来在消费者需要时(比方说新闻阅读器的电话提示器或CD播放器中的防跳缓冲区)在需要时从输入序列中稍稍提取元素。然后,它通过查询链并行处理元素,将结果保存在较小的缓冲区中,以便按需为消费者准备就绪。如果使用者提前暂停或退出枚举,查询处理器也会暂停或停止,以免浪费CPU时间或内存。

 

 

您可以通过在AsParallel之后调用WithMergeOptions来调整PLINQ的缓冲行为。通常,AutoBuffered的默认值可以提供最佳的整体效果。 NotBuffered禁用缓冲区,如果您希望尽快看到结果,则很有用; FullyBuffered在将整个结果集呈现给使用者之前会对其进行高速缓存(OrderBy和Reverse运算符自然会以这种方式工作,元素,聚合和转换运算符也是如此)。

 

 

 PLINQ和订购 

 

并行查询操作符的副作用是,整理结果后,结果不一定与提交结果的顺序相同,如上图所示。换句话说,LINQ对序列的常规顺序保留保证不再成立。

 

如果需要保留订单,可以通过在AsParallel()之后调用AsOrdered()来强制执行以下操作:

 

myCollection.AsParallel()。AsOrdered()...

调用AsOrdered会导致大量元素的性能下降,因为PLINQ必须跟踪每个元素的原始位置。

 

您可以稍后通过调用AsUnordered在查询中取消AsOrdered的效果:这引入了“随机混叠点”,该点使查询从该点开始可以更高效地执行。因此,如果您只想保留前两个查询运算符的输入序列顺序,则可以执行以下操作:

inputSequence.AsParallel().AsOrdered()
  .QueryOperator1()
  .QueryOperator2()
  .AsUnordered()       // From here on, ordering doesn’t matter
  .QueryOperator3()
  ...

 

AsOrdered不是默认值,因为对于大多数查询而言,原始输入顺序并不重要。换句话说,如果默认为AsOrdered,则必须对大多数并行查询应用AsUnordered才能获得最佳性能,这会很麻烦。

 

PLINQ局限性

目前,PLINQ可以并行化方面存在一些实际限制。这些限制可能在后续的Service Pack和Framework版本中放松。

 

以下查询运算符可防止查询并行化,除非源元素处于其原始索引位置:

 

  • Take,TakeWhile,Skip和SkipWhile
  • Select,SelectMany和ElementAt的索引版本

大多数查询运算符都会更改元素的索引位置(包括那些删除元素的索引位置,例如Where)。这意味着,如果要使用前面的运算符,通常需要在查询开始时使用它们。

 

以下查询运算符是可并行化的,但是使用昂贵的分区策略,该策略有时可能比顺序处理要慢:

 

  • JoinGroupByGroupJoinDistinctUnionIntersect, and Except

Aggregate运算符在其标准版本中的种子重载是不可并行化的-PLINQ提供了特殊的重载来处理此问题。

 

所有其他运算符都是可并行化的,尽管使用这些运算符并不能保证您的查询将被并行化。如果PLINQ怀疑并行化的开销会减慢该特定查询的速度,则可以按顺序运行您的查询。您可以通过在AsParallel()之后调用以下代码来覆盖此行为并强制并行化:

 

.WithExecutionMode(ParallelExecutionMode.ForceParallelism)

 示例:并行拼写检查器

假设我们想编写一个拼写检查器,以利用所有可用的内核快速运行非常大的文档。通过将我们的算法表述为LINQ查询,我们可以非常轻松地对其进行并行化。

 

第一步是将英语单词词典下载到HashSet中以进行有效查找:

if (!File.Exists ("WordLookup.txt"))    // Contains about 150,000 words
  new WebClient().DownloadFile (
    "http://www.albahari.com/ispell/allwords.txt", "WordLookup.txt");
 
var wordLookup = new HashSet<string> (
  File.ReadAllLines ("WordLookup.txt"),
  StringComparer.InvariantCultureIgnoreCase);

 

然后,我们将使用单词查找功能创建一个测试“文档”,其中包含一百万个随机单词的数组。构建数组之后,我们将引入一些拼写错误:

var random = new Random();
string[] wordList = wordLookup.ToArray();
 
string[] wordsToTest = Enumerable.Range (0, 1000000)
  .Select (i => wordList [random.Next (0, wordList.Length)])
  .ToArray();
 
wordsToTest [12345] = "woozsh";     // Introduce a couple
wordsToTest [23456] = "wubsie";     // of spelling mistakes.

 

现在,我们可以通过针对wordLookup测试wordsToTest来执行并行拼写检查。 PLINQ使这非常容易:

var query = wordsToTest
  .AsParallel()
  .Select  ((word, index) => new IndexedWord { Word=word, Index=index })
  .Where   (iword => !wordLookup.Contains (iword.Word))
  .OrderBy (iword => iword.Index);
 
query.Dump();     // Display output in LINQPad

这是在LINQPad中显示的输出:

OrderedParallelQuery<IndexedWord> (2 items)

Word

Index

woozsh

12345

wubsie

23456

 

IndexedWord是我们定义的自定义结构,如下所示:

 

structIndexedWord { publicstring Word; publicint Index; }

谓词中的wordLookup.Contains方法为查询提供了一些“肉”,使其值得并行化。

 

我们可以使用匿名类型而不是IndexedWord结构来稍微简化查询。但是,这会降低性能,因为匿名类型(是类,因此是引用类型)会导致基于堆的分配和后续垃圾收集的开销。

 

对于顺序查询而言,这种差异可能不足以解决问题,但对于并行查询,偏向基于堆栈的分配可能会非常有利。这是因为基于堆栈的分配是高度可并行化的(因为每个线程都有自己的堆栈),而所有线程必须竞争同一堆-由单个内存管理器和垃圾收集器管理。

使用 ThreadLocal<T>

让我们通过并行创建随机测试词列表本身来扩展我们的示例。我们将此结构构造为LINQ查询,因此应该很容易。这是顺序版本:

string[] wordsToTest = Enumerable.Range (0, 1000000)
  .Select (i => wordList [random.Next (0, wordList.Length)])
  .ToArray();

 

不幸的是,对random.Next的调用不是线程安全的,因此它不像在查询中插入AsParallel()那样简单。潜在的解决方案是编写一个锁定random的函数。但是,这将限制并发性。更好的选择是使用ThreadLocal <Random>为每个线程创建一个单独的Random对象。然后我们可以并行化查询,如下所示:

var localRandom = new ThreadLocal<Random>
 ( () => new Random (Guid.NewGuid().GetHashCode()) );
 
string[] wordsToTest = Enumerable.Range (0, 1000000).AsParallel()
  .Select (i => wordList [localRandom.Value.Next (0, wordList.Length)])
  .ToArray();

 

在用于实例化Random对象的工厂函数中,我们传入Guid的哈希码,以确保如果在短时间内创建了两个Random对象,它们将产生不同的随机数序列。

 

何时使用PLINQ

 

试图在现有应用程序中搜索LINQ查询并尝试并行化它们很诱人。这通常是无济于事的,因为对于LINQ显然是最佳解决方案的大多数问题,它们往往很快就会执行,因此无法从并行化中受益。更好的方法是找到占用大量CPU的瓶颈,然后考虑“可以将其表示为LINQ查询吗?” (这种重组的令人欢迎的副作用是,LINQ通常使代码更小,更易读。)

 

PLINQ非常适合解决令人尴尬的并行问题。它也可以很好地用于结构化阻止任务,例如一次调用多个Web服务(请参阅调用阻止或I / O密集功能)。

 

PLINQ对于成像而言可能不是一个好的选择,因为将数百万个像素整理到一个输出序列中会造成瓶颈。相反,最好将像素直接写入数组或非托管内存块,并使用Parallel类或任务并行性来管理多线程。 (但是,可以使用ForAll打败结果归类。如果图像处理算法自然地适合LINQ,则这样做很有意义。)

功能纯度

 

由于PLINQ在并行线程上运行查询,因此必须注意不要执行线程不安全的操作。特别是,写入变量会产生副作用,因此线程不安

// The following query multiplies each element by its position.
// Given an input of Enumerable.Range(0,999), it should output squares.
int i = 0;
var query = from n in Enumerable.Range(0,999).AsParallel() select n * i++;

 

我们可以通过使用锁或互锁来使i线程安全递增,但问题仍然存在,即我不一定与输入元素的位置相对应。并且向查询中添加AsOrdered不能解决后一个问题,因为AsOrdered仅确保按与顺序处理顺序一致的顺序输出元素-实际上不会按顺序处理它们。

 

而是,应重写此查询以使用Select的索引版本:

 

varquery = Enumerable.Range(0,999).AsParallel().Select ((n, i) => n * i); 

为了获得最佳性能,从查询运算符调用的任何方法都应该是线程安全的,因为它不会写入字段或属性(无副作用或功能纯净)。如果它们通过锁定是线程安全的,则查询的并行性潜力将受到限制-锁定的持续时间除以该函数所花费的总时间。

 

调用阻塞或I / O密集型函数

有时,查询之所以能够长期运行,并不是因为它占用大量CPU资源,而是因为它会等待某些内容,例如网页下载或某些硬件响应。如果您通过在AsParallel之后调用WithDegreeOfParallelism进行提示,PLINQ可以有效地并行化此类查询。例如,假设我们要同时对六个网站执行ping操作。无需使用笨拙的异步委托或手动拆分六个线程,我们可以通过PLINQ查询轻松完成此任务:

from site in new[]
{
  "www.albahari.com",
  "www.linqpad.net",
  "www.oreilly.com",
  "www.takeonit.com",
  "stackoverflow.com",
  "www.rebeccarey.com"  
}
.AsParallel().WithDegreeOfParallelism(6)
let p = new Ping().Send (site)
select new
{
  site,
  Result = p.Status,
  Time = p.RoundtripTime
}

 


WithDegreeOfParallelism强制PLINQ同时运行指定数量的任务。在调用诸如Ping.Send之类的阻止功能时,这是必需的,因为PLINQ否则假设查询是CPU密集型的,并相应地分配任务。例如,在两核计算机上,PLINQ可能默认为一次仅运行两个任务,这在这种情况下显然是不可取的。

 

PLINQ通常为每个任务提供一个线程,并由线程池分配。您可以通过调用ThreadPool.SetMinThreads来加速线程的初始启动。

 

再举一个例子,假设我们正在编写一个监视系统,并且想要将来自四个安全摄像机的图像重复组合成一个合成图像,以显示在CCTV上。我们将代表以下类别的相机:

class Camera
{
  public readonly int CameraID;
  public Camera (int cameraID) { CameraID = cameraID; }
 
  // Get image from camera: return a simple string rather than an image
  public string GetNextFrame()
  {
    Thread.Sleep (123);       // Simulate time taken to get snapshot
    return "Frame from camera " + CameraID;
  }
}

 

要获得合成图像,我们必须在四个相机对象的每一个上调用GetNextFrame。假设操作是受I / O限制的,那么即使在单核计算机上,我们也可以将并行化的帧速率提高四倍。 PLINQ通过最少的编程工作即可实现:

Camera[] cameras = Enumerable.Range (0, 4)    // Create 4 camera objects.
  .Select (i => new Camera (i))
  .ToArray();
 
while (true)
{
  string[] data = cameras
    .AsParallel().AsOrdered().WithDegreeOfParallelism (4)
    .Select (c => c.GetNextFrame()).ToArray();
 
  Console.WriteLine (string.Join (", ", data));   // Display data...
}

 

GetNextFrame是一种阻塞方法,因此我们使用WithDegreeOfParallelism获得所需的并发性。在我们的示例中,阻塞发生在我们调用Sleep时;在现实生活中,它会阻塞,因为从摄像机获取图像是I / O而不是CPU密集型的。

 

调用AsOrdered可确保图像以一致的顺序显示。因为序列中只有四个元素,所以对性能的影响可以忽略不计。

 

改变并行度

您只能在PLINQ查询中调用WithDegreeOfParallelism。如果需要再次调用它,则必须通过在查询中再次调用AsParallel()来强制合并和重新分区查询:

"The Quick Brown Fox"
  .AsParallel().WithDegreeOfParallelism (2)
  .Where (c => !char.IsWhiteSpace (c))
  .AsParallel().WithDegreeOfParallelism (3)   // Forces Merge + Partition
  .Select (c => char.ToUpper (c))

 

Cancellation消除

取消要在foreach循环中使用其结果的PLINQ查询很容易:只需中断foreach即可,因为隐式处理了枚举数,该查询将被自动取消。

 

对于以转换,元素或聚合运算符结尾的查询,可以通过取消标记从另一个线程中取消它。若要插入令牌,请在调用AsParallel之后传入With CancellationTokenSource对象的Token属性,然后调用WithCancellation。然后,另一个线程可以在令牌源上调用Cancel,这会在查询的使用者上引发OperationCanceledException:

IEnumerable<int> million = Enumerable.Range (3, 1000000);
 
var cancelSource = new CancellationTokenSource(); 
var primeNumberQuery = 
  from n in million.AsParallel().WithCancellation (cancelSource.Token)
  where Enumerable.Range (2, (int) Math.Sqrt (n)).All (i => n % i > 0)
  select n;
 
new Thread (() => {
                    Thread.Sleep (100);      // Cancel query after
                    cancelSource.Cancel();   // 100 milliseconds.
                  }
           ).Start();
try 
{
  // Start query running:
  int[] primes = primeNumberQuery.ToArray();
  // We‘ll never get here because the other thread will cancel us.
}
catch (OperationCanceledException)
{
  Console.WriteLine ("Query canceled");
}

 

PLINQ不会抢先中止线程,因为这样做有危险。取而代之的是,取消操作会等待每个工作线程以其当前元素结束,然后再结束查询。这意味着查询调用的任何外部方法都将运行完成。

 

PLINQ优化 

输出端优化

PLINQ的优点之一是可以方便地将并行工作的结果整理到单个输出序列中。不过,有时,您最终使用该序列所做的所有工作都会在每个元素上运行一次功能:

 

foreach (int n in parallelQuery)
  DoSomething (n);

 

如果是这种情况,并且您不关心元素的处理顺序,则可以使用PLINQ的ForAll方法提高效率。

 

ForAll方法在ParallelQuery的每个输出元素上运行委托。它直接链接到PLINQ的内部,而无需进行整理和枚举结果的步骤。举个简单的例子:

"abcdef".AsParallel().Select (c => char.ToUpper(c)).ForAll (Console.Write);

技术分享图片

 

 

 

 

整理和枚举结果并不是一项昂贵的操作,因此,当有大量快速执行的输入元素时,ForAll优化可带来最大的收益。

 

输入端优化

PLINQ具有三种用于将输入元素分配给线程的分区策略:

策略

元素分配

相对性能

块分区

Dynamic

普通

范围分区

Static

较好

哈希分区

Static

对于需要比较元素(GroupBy,Join,GroupJoin,Intersect,Except,Union和Distinct)的查询运算符,您别无选择:PLINQ始终使用哈希分区。哈希分区的效率相对较低,因为它必须预先计算每个元素的哈希码(以便具有相同哈希码的元素可以在同一线程上进行处理)。如果发现此速度太慢,则唯一的选择是调用AsSequential以禁用并行化。

 

对于所有其他查询运算符,您可以选择使用范围分区还是块分区。默认情况下:

 

  • 如果输入序列是可索引的(如果它是数组或实现IList <T>),则PLINQ选择范围分区。
  • 否则,PLINQ选择块分区。

简而言之,范围较长的序列可以更快地进行分区,对于每个序列,每个元素都需要花费相似的CPU时间来处理。否则,块分区通常会更快。

 

强制范围划分:

 

  • 如果查询以Enumerable.Range开头,则将后者替换为ParallelEnumerable.Range。
  • 否则,只需在输入序列上调用ToList或ToArray(显然,这本身会导致性能损失,您应该考虑在内)。

ParallelEnumerable.Range不仅仅是调用Enumerable.Range(...)。AsParallel()的快捷方式。它通过激活范围分区来更改查询的性能。

 

要强制进行块分区,请将输入序列包装在对Partitioner.Create(在System.Collection.Concurrent中)的调用中,如下所示:

int[] numbers = { 3, 4, 5, 6, 7, 8, 9 };
var parallelQuery =
  Partitioner.Create (numbers, true).AsParallel()
  .Where (...)

 

Partitioner.Create的第二个参数指示您要对查询进行负载平衡,这是您希望对块进行分区的另一种方式。

 

块分区的工作原理是让每个工作线程定期从输入序列中抓取元素的小“块”以进行处理。 PLINQ首先分配非常小的块(一次分配一个或两个元素),然后随着查询的进行增加块的大小:这可确保小序列有效地并行化,大序列不会引起过多的往返。如果一个工人碰巧(容易处理)获得“简单”元素,那么最终将得到更多的块。该系统使每个线程保持同等繁忙(内核处于“平衡”状态);唯一的缺点是从共享输入序列中获取元素需要同步(通常是互斥锁),这可能会导致一些开销和争用。

 

 技术分享图片

 

 

 

范围分区绕过常规的输入端枚举,并为每个工作程序预分配了相等数量的元素,从而避免了对输入序列的争用。但是,如果某些线程碰巧获得了轻松的元素并尽早完成,那么它们将处于空闲状态,而其余线程则继续工作。我们较早的素数计算器在范围划分上可能会表现不佳。范围分区最适合的示例是计算前一千万个整数的平方根之和:

ParallelEnumerable.Range (1, 10000000).Sum (i => Math.Sqrt (i))

 

ParallelEnumerable.Range返回ParallelQuery <T>,因此您无需随后调用AsParallel。

 

范围分区并不一定要在连续的块中分配元素范围,而是可以选择“分段”策略。例如,如果有两个工作程序,则一个工作程序可能处理奇数元素,而另一个工作程序则处理偶数元素。 TakeWhile运算符几乎可以肯定会触发条带化策略,以避免在序列的后面不必要地处理元素

并行自定义聚合 (Parallelizing Custom Aggregations)

PLINQ无需额外干预即可有效地并行化Sum,Average,Min和Max运算符。但是,聚合运算符对PLINQ提出了特殊的挑战。

 

如果您不熟悉此运算符,则可以将Aggregate视为Sum,Average,Min和Max的通用版本-换句话说,该运算符可让您插入自定义的累积算法来实现异常聚合。以下内容演示了聚合如何完成Sum的工作

int[] numbers = { 2, 3, 4 };
int sum = numbers.Aggregate (0, (total, n) => total + n);   // 9

 

他对聚合的第一个争论是种子,从种子开始积累。第二个参数是一个表达式,用于在给定新元素的情况下更新累计值。您可以选择提供第三个参数,以根据累加值投影最终结果值。

 

使用foreach循环和更熟悉的语法可以轻松解决针对Aggregate设计的大多数问题。聚合的优点恰恰是大型或复杂的聚合可以使用PLINQ声明性地并行化。

 

非种子聚合

您可以在调用Aggregate时忽略种子值,在这种情况下,第一个元素成为隐式种子,而聚合从第二个元素开始。这是前面的示例,没有种子:

int[] numbers = { 1, 2, 3 };
int sum = numbers.Aggregate ((total, n) => total + n);   // 6

 

这样得出的结果与以前相同,但是实际上我们在做不同的计算。之前,我们计算的是0 + 1 + 2 + 3;现在我们计算1 + 2 + 3。我们可以通过乘而不是加来更好地说明差异:

int[] numbers = { 1, 2, 3 };
int x = numbers.Aggregate (0, (prod, n) => prod * n);   // 0*1*2*3 = 0
int y = numbers.Aggregate (   (prod, n) => prod * n);   //   1*2*3 = 6

 

稍后我们将看到,无种子聚合具有可并行化的优点,而无需使用特殊的重载。但是,存在一个带有非种子聚合的陷阱:非种子聚合方法旨在用于可交换和关联的委托。如果以其他方式使用,结果要么是不直观的(对于普通查询),要么是不确定的(在将查询与PLINQ并行化的情况下)。例如,考虑以下功能:

 

(total, n) => total + n * n

这既不是可交换的也不是关联的。 (例如1 + 2 * 2!= 2 + 1 * 1)。让我们看看使用它求和数字2、3和4的平方时会发生什么:

 

int[] numbers = { 2,3,4 };
int sum = numbers.Aggregate ((total, n) => total + n * n);    // 27

代替计算:

 

2*2 + 3*3 + 4*4    // 29

它计算:

 

2 + 3*3 + 4*4      // 27

我们可以通过多种方式解决此问题。首先,我们可以将0作为第一个元素:

 

int[] numbers = { 0,2,3,4 };

这不仅不雅致,而且如果进行并行化,它仍然会给出不正确的结果-因为PLINQ通过选择多个元素作为种子来利用该函数假定的关联性。为了说明,如果我们将聚合函数表示如下:

 

f(total, n) => total + n * n

然后LINQ to Objects会计算得出:

 

f(f(f(0,2),3),4)

而PLINQ可以这样做:

 

f(f(0,2),f(3,4))

结果如下:

 

第一个分区:a = 0 + 2 * 2(= 4)

第二分区:b = 3 + 4 * 4(= 19)

最终结果:a + b * b(= 365)

甚至偶数:b + a * a(= 35)

有两个好的解决方案。首先是将其转换为种子聚合-种子为零。唯一的麻烦是,使用PLINQ,我们需要使用特殊的重载,才能使查询不按顺序执行(我们将很快看到)。

 

第二种解决方案是重构查询,以使聚合函数可交换和关联:

 

int sum = numbers.Select (n => n * n).Aggregate ((total, n) => total + n);

当然,在这种简单的情况下,您可以(并且应该)使用Sum运算符而不是Aggregate:

 

int sum = numbers.Sum (n => n * n);

实际上,仅求和与平均值就可以走得很远。例如,您可以使用“平均值”来计算均方根:

 

Math.Sqrt (numbers.Average (n => n * n))

甚至标准差:

 

double mean = numbers.Average();
double sdev = Math.Sqrt (numbers.Average (n =>
              {
                double dif = n - mean;
                return dif * dif;
              }));

 

两者都是安全,高效且完全可并行化的。

并行聚集

我们刚刚看到,对于非种子聚合,提供的委托必须是关联的和可交换的。如果违反此规则,PLINQ将给出错误的结果,因为它会从输入序列中提取多个种子,以便同时聚合该序列的多个分区。

 

对于PLINQ,显式种子聚合似乎是一个安全的选择,但不幸的是,由于依赖于单个种子,因此这些种子通常按顺序执行。为了减轻这种情况,PLINQ提供了另一个Aggregate重载,可让您指定多个种子-或更确切地说,是种子工厂函数。对于每个线程,它执行此功能以生成单独的种子,该种子成为线程本地的累加器,在其中将元素本地聚合到其中。

 

您还必须提供一个函数来指示如何组合本地和主要累加器。最后,此Aggregate重载(某种程度上是免费的)期望委托对结果执行任何最终转换(您可以通过在结果上自己运行一些函数来轻松实现此最终转换)。因此,这是四个代表,按照传递的顺序排列:

 

seedFactory

 

返回一个新的本地累加器

 

updateAccumulatorFunc

 

将元素聚合到本地累加器中

 

CombineAccumulatorFunc

 

将本地累加器与主累加器组合

 

resultSelector

 

将任何最终变换应用于最终结果

 

在简单的情况下,您可以指定种子值而不是种子工厂。当种子是您希望突变的引用类型时,此策略将失败,因为每个线程将共享同一实例。

 

为了给出一个非常简单的示例,以下内容将数字数组中的值相加:

numbers.AsParallel().Aggregate (
  () => 0,                                     // seedFactory
  (localTotal, n) => localTotal + n,           // updateAccumulatorFunc
  (mainTot, localTot) => mainTot + localTot,   // combineAccumulatorFunc
  finalResult => finalResult)                  // resultSelector

 

This example is contrived in that we could get the same answer just as efficiently using simpler approaches (such as an unseeded aggregate, or better, the Sum operator). To give a more realistic example, suppose we wanted to calculate the frequency of each letter in the English alphabet in a given string. A simple sequential solution might look like this:

string text = "Let’s suppose this is a really long string";
var letterFrequencies = new int[26];
foreach (char c in text)
{
  int index = char.ToUpper (c) - ‘A‘;
  if (index >= 0 && index <= 26) letterFrequencies [index]++;
};

An example of when the input text might be very long is in gene sequencing. The “alphabet” would then consist of the letters acg, and t.

To parallelize this, we could replace the foreach statement with a call to Parallel.ForEach (as we’ll cover in the following section), but this will leave us to deal with concurrency issues on the shared array. And locking around accessing that array would all but kill the potential for parallelization.

Aggregate offers a tidy solution. The accumulator, in this case, is an array just like the letterFrequencies array in our preceding example. Here’s a sequential version using Aggregate:

int[] result =
  text.Aggregate (
    new int[26],                // Create the "accumulator"
    (letterFrequencies, c) =>   // Aggregate a letter into the accumulator
    {
      int index = char.ToUpper (c) - ‘A‘;
      if (index >= 0 && index <= 26) letterFrequencies [index]++;
      return letterFrequencies;
    });

And now the parallel version, using PLINQ’s special overload:

int[] result =
  text.AsParallel().Aggregate (
    () => new int[26],             // Create a new local accumulator
 
    (localFrequencies, c) =>       // Aggregate into the local accumulator
    {
      int index = char.ToUpper (c) - ‘A‘;
      if (index >= 0 && index <= 26) localFrequencies [index]++;
      return localFrequencies;
    },
                                   // Aggregate local->main accumulator
    (mainFreq, localFreq) =>
      mainFreq.Zip (localFreq, (f1, f2) => f1 + f2).ToArray(),
 
    finalResult => finalResult     // Perform any final transformation
  );                               // on the end result.

Notice that the local accumulation function mutates the localFrequencies array. This ability to perform this optimization is important — and is legitimate because localFrequencies is local to each thread.

The Parallel Class

PFX provides a basic form of structured parallelism via three static methods in the Parallel class:

Parallel.Invoke

Executes an array of delegates in parallel

Parallel.For

Performs the parallel equivalent of a C# for loop

Parallel.ForEach

Performs the parallel equivalent of a C# foreach loop

All three methods block until all work is complete. As with PLINQ, after an unhandled exception, remaining workers are stopped after their current iteration and the exception (or exceptions) are thrown back to the caller — wrapped in an AggregateException.

Parallel.Invoke

Parallel.Invoke executes an array of Action delegates in parallel, and then waits for them to complete. The simplest version of the method is defined as follows:

public static void Invoke (params Action[] actions);

Here’s how we can use Parallel.Invoke to download two web pages at once:

Parallel.Invoke (
 () => new WebClient().DownloadFile ("http://www.linqpad.net", "lp.html"),
 () => new WebClient().DownloadFile ("http://www.jaoo.dk", "jaoo.html"));

On the surface, this seems like a convenient shortcut for creating and waiting on two Task objects (or asynchronous delegates). But there’s an important difference: Parallel.Invoke still works efficiently if you pass in an array of a million delegates. This is because it partitions large numbers of elements into batches which it assigns to a handful of underlying Tasks — rather than creating a separate Task for each delegate.

As with all of Parallel’s methods, you’re on your own when it comes to collating the results. This means you need to keep thread safety in mind. The following, for instance, is thread-unsafe:

var data = new List<string>();
Parallel.Invoke (
 () => data.Add (new WebClient().DownloadString ("http://www.foo.com")),
 () => data.Add (new WebClient().DownloadString ("http://www.far.com")));

Locking around adding to the list would resolve this, although locking would create a bottleneck if you had a much larger array of quickly executing delegates. A better solution is to use a thread-safe collection such as ConcurrentBag would be ideal in this case.

Parallel.Invoke is also overloaded to accept a ParallelOptions object:

public static void Invoke (ParallelOptions options,
                           params Action[] actions);

With ParallelOptions, you can insert a cancellation token, limit the maximum concurrency, and specify a custom task scheduler. A cancellation token is relevant when you’re executing (roughly) more tasks than you have cores: upon cancellation, any unstarted delegates will be abandoned. Any already-executing delegates will, however, continue to completion. See Cancellation for an example of how to use cancellation tokens.

Parallel.For and Parallel.ForEach

Parallel.For and Parallel.ForEach perform the equivalent of a C# for and foreach loop, but with each iteration executing in parallel instead of sequentially. Here are their (simplest) signatures:

public static ParallelLoopResult For (
  int fromInclusive, int toExclusive, Action<int> body)
 
public static ParallelLoopResult ForEach<TSource> (
  IEnumerable<TSource> source, Action<TSource> body)

The following sequential for loop:

for (int i = 0; i < 100; i++)
  Foo (i);

is parallelized like this:

Parallel.For (0, 100, i => Foo (i));

or more simply:

Parallel.For (0, 100, Foo);

And the following sequential foreach:

foreach (char c in "Hello, world")
  Foo (c);

is parallelized like this:

Parallel.ForEach ("Hello, world", Foo);

To give a practical example, if we import the System.Security.Cryptography namespace, we can generate six public/private key-pair strings in parallel as follows:

var keyPairs = new string[6];
 
Parallel.For (0, keyPairs.Length,
              i => keyPairs[i] = RSA.Create().ToXmlString (true));

As with Parallel.Invoke, we can feed Parallel.For and Parallel.ForEach a large number of work items and they’ll be efficiently partitioned onto a few tasks.

The latter query could also be done with PLINQ:

string[] keyPairs =
  ParallelEnumerable.Range (0, 6)
  .Select (i => RSA.Create().ToXmlString (true))
  .ToArray();

Outer versus inner loops

Parallel.For and Parallel.ForEach usually work best on outer rather than inner loops. This is because with the former, you’re offering larger chunks of work to parallelize, diluting the management overhead. Parallelizing both inner and outer loops is usually unnecessary. In the following example, we’d typically need more than 100 cores to benefit from the inner parallelization:

Parallel.For (0, 100, i =>
{
  Parallel.For (0, 50, j => Foo (i, j));   // Sequential would be better
});                                        // for the inner loop.

Indexed Parallel.ForEach

Sometimes it’s useful to know the loop iteration index. With a sequential foreach, it’s easy:

int i = 0;
foreach (char c in "Hello, world")
  Console.WriteLine (c.ToString() + i++);

Incrementing a shared variable, however, is not thread-safe in a parallel context. You must instead use the following version of ForEach:

public static ParallelLoopResult ForEach<TSource> (
  IEnumerable<TSource> source, Action<TSource,ParallelLoopState,long> body)

We’ll ignore ParallelLoopState (which we’ll cover in the following section). For now, we’re interested in Action’s third type parameter of type long, which indicates the loop index:

Parallel.ForEach ("Hello, world", (c, state, i) =>
{
   Console.WriteLine (c.ToString() + i);
});

To put this into a practical context, we’ll revisit the spellchecker that we wrote with PLINQ. The following code loads up a dictionary along with an array of a million words to test:

if (!File.Exists ("WordLookup.txt"))    // Contains about 150,000 words
  new WebClient().DownloadFile (
    "http://www.albahari.com/ispell/allwords.txt", "WordLookup.txt");
 
var wordLookup = new HashSet<string> (
  File.ReadAllLines ("WordLookup.txt"),
  StringComparer.InvariantCultureIgnoreCase);
 
var random = new Random();
string[] wordList = wordLookup.ToArray();
 
string[] wordsToTest = Enumerable.Range (0, 1000000)
  .Select (i => wordList [random.Next (0, wordList.Length)])
  .ToArray();
 
wordsToTest [12345] = "woozsh";     // Introduce a couple
wordsToTest [23456] = "wubsie";     // of spelling mistakes.

We can perform the spellcheck on our wordsToTest array using the indexed version of Parallel.ForEach as follows:

var misspellings = new ConcurrentBag<Tuple<int,string>>();
 
Parallel.ForEach (wordsToTest, (word, state, i) =>
{
  if (!wordLookup.Contains (word))
    misspellings.Add (Tuple.Create ((int) i, word));
});

Notice that we had to collate the results into a thread-safe collection: having to do this is the disadvantage when compared to using PLINQ. The advantage over PLINQ is that we avoid the cost of applying an indexed Select query operator — which is less efficient than an indexed ForEach.

ParallelLoopState: Breaking early out of loops

Because the loop body in a parallel For or ForEach is a delegate, you can’t exit the loop early with a break statement. Instead, you must call Break or Stop on a ParallelLoopState object:

public class ParallelLoopState
{
  public void Break();
  public void Stop();
 
  public bool IsExceptional { get; }
  public bool IsStopped { get; }
  public long? LowestBreakIteration { get; }
  public bool ShouldExitCurrentIteration { get; }
}

Obtaining a ParallelLoopState is easy: all versions of For and ForEach are overloaded to accept loop bodies of type Action<TSource,ParallelLoopState>. So, to parallelize this:

foreach (char c in "Hello, world")
  if (c == ‘,‘)
    break;
  else
    Console.Write (c);

do this:

Parallel.ForEach ("Hello, world", (c, loopState) =>
{
  if (c == ‘,‘)
    loopState.Break();
  else
    Console.Write (c);
});
Hlloe

You can see from the output that loop bodies may complete in a random order. Aside from this difference, calling Break yields at least the same elements as executing the loop sequentially: this example will always output at least the letters Hell, and o in some order. In contrast, calling Stop instead of Break forces all threads to finish right after their current iteration. In our example, calling Stop could give us a subset of the letters Hell, and o if another thread was lagging behind. Calling Stop is useful when you’ve found something that you’re looking for — or when something has gone wrong and you won’t be looking at the results.

The Parallel.For and Parallel.ForEach methods return a ParallelLoopResult object that exposes properties called IsCompleted and LowestBreakIteration. These tell you whether the loop ran to completion, and if not, at what cycle the loop was broken.

If LowestBreakIteration returns null, it means that you called Stop (rather than Break) on the loop.

If your loop body is long, you might want other threads to break partway through the method body in case of an early Break or Stop. You can do this by polling the ShouldExitCurrentIteration property at various places in your code; this property becomes true immediately after a Stop — or soon after a Break.

ShouldExitCurrentIteration also becomes true after a cancellation request — or if an exception is thrown in the loop.

IsExceptional lets you know whether an exception has occurred on another thread. Any unhandled exception will cause the loop to stop after each thread’s current iteration: to avoid this, you must explicitly handle exceptions in your code.

Optimization with local values

Parallel.For and Parallel.ForEach each offer a set of overloads that feature a generic type argument called TLocal. These overloads are designed to help you optimize the collation of data with iteration-intensive loops. The simplest is this:

public static ParallelLoopResult For <TLocal> (
  int fromInclusive,
  int toExclusive,
  Func <TLocal> localInit,  Func <int, ParallelLoopState, TLocal, TLocal> body,
  Action <TLocal> localFinally);

These methods are rarely needed in practice because their target scenarios are covered mostly by PLINQ (which is fortunate because these overloads are somewhat intimidating!).

Essentially, the problem is this: suppose we want to sum the square roots of the numbers 1 through 10,000,000. Calculating 10 million square roots is easily parallelizable, but summing their values is troublesome because we must lock around updating the total:

object locker = new object();
double total = 0;
Parallel.For (1, 10000000,
              i => { lock (locker) total += Math.Sqrt (i); });

The gain from parallelization is more than offset by the cost of obtaining 10 million locks — plus the resultant blocking.

The reality, though, is that we don’t actually need 10 million locks. Imagine a team of volunteers picking up a large volume of litter. If all workers shared a single trash can, the travel and contention would make the process extremely inefficient. The obvious solution is for each worker to have a private or “local” trash can, which is occasionally emptied into the main bin.

The TLocal versions of For and ForEach work in exactly this way. The volunteers are internal worker threads, and the local value represents a local trash can. In order for Parallel to do this job, you must feed it two additional delegates that indicate:

  1. How to initialize a new local value
  2. How to combine a local aggregation with the master value

Additionally, instead of the body delegate returning void, it should return the new aggregate for the local value. Here’s our example refactored:

object locker = new object();
double grandTotal = 0;
 
Parallel.For (1, 10000000,
 
  () => 0.0,                        // Initialize the local value.
 
  (i, state, localTotal) =>         // Body delegate. Notice that it
     localTotal + Math.Sqrt (i),    // returns the new local total.
 
  localTotal =>                                    // Add the local value
    { lock (locker) grandTotal += localTotal; }    // to the master value.
);

We must still lock, but only around aggregating the local value to the grand total. This makes the process dramatically more efficient.

As stated earlier, PLINQ is often a good fit in these scenarios. Our example could be parallelized with PLINQ simply like this:

ParallelEnumerable.Range(1, 10000000)
                  .Sum (i => Math.Sqrt (i))

(Notice that we used ParallelEnumerable to force range partitioning: this improves performance in this case because all numbers will take equally long to process.)

In more complex scenarios, you might use LINQ’s Aggregate operator instead of Sum. If you supplied a local seed factory, the situation would be somewhat analogous to providing a local value function with Parallel.For.

Task Parallelism

Task parallelism is the lowest-level approach to parallelization with PFX. The classes for working at this level are defined in the System.Threading.Tasks namespace and comprise the following:

Class

Purpose

Task

For managing a unit for work

Task<TResult>

For managing a unit for work with a return value

TaskFactory

For creating tasks

TaskFactory<TResult>

For creating tasks and continuations with the same return type

TaskScheduler

For managing the scheduling of tasks

TaskCompletionSource

For manually controlling a task’s workflow

Essentially, a task is a lightweight object for managing a parallelizable unit of work. A task avoids the overhead of starting a dedicated thread by using the CLR’s thread pool: this is the same thread pool used by ThreadPool.QueueUserWorkItem, tweaked in CLR 4.0 to work more efficiently with Tasks (and more efficiently in general).

Tasks can be used whenever you want to execute something in parallel. However, they’re tuned for leveraging multicores: in fact, the Parallel class and PLINQ are internally built on the task parallelism constructs.

Tasks do more than just provide an easy and efficient way into the thread pool. They also provide some powerful features for managing units of work, including the ability to:

Tasks also implement local work queues, an optimization that allows you to efficiently create many quickly executing child tasks without incurring the contention overhead that would otherwise arise with a single work queue.

The Task Parallel Library lets you create hundreds (or even thousands) of tasks with minimal overhead. But if you want to create millions of tasks, you’ll need to partition those tasks into larger work units to maintain efficiency. The Parallel class and PLINQ do this automatically.

Visual Studio 2010 provides a new window for monitoring tasks (Debug | Window | Parallel Tasks). This is equivalent to the Threads window, but for tasks. The Parallel Stacks window also has a special mode for tasks.

Creating and Starting Tasks

As we described in Part 1 in our discussion of thread pooling, you can create and start a Task by calling Task.Factory.StartNew, passing in an Action delegate:

Task.Factory.StartNew (() => Console.WriteLine ("Hello from a task!"));

The generic version, Task<TResult> (a subclass of Task), lets you get data back from a task upon completion:

Task<string> task = Task.Factory.StartNew<string> (() =>    // Begin task
{
  using (var wc = new System.Net.WebClient())
    return wc.DownloadString ("http://www.linqpad.net");
});
 
RunSomeOtherMethod();         // We can do other work in parallel...
 
string result = task.Result;  // Wait for task to finish and fetch result.

Task.Factory.StartNew creates and starts a task in one step. You can decouple these operations by first instantiating a Task object, and then calling Start:

var task = new Task (() => Console.Write ("Hello"));
...
task.Start();

A task that you create in this manner can also be run synchronously (on the same thread) by calling RunSynchronously instead of Start.

You can track a task’s execution status via its Status property.

Specifying a state object

When instantiating a task or calling Task.Factory.StartNew, you can specify a state object, which is passed to the target method. This is useful should you want to call a method directly rather than using a lambda expression:

static void Main()
{
  var task = Task.Factory.StartNew (Greet, "Hello");
  task.Wait();  // Wait for task to complete.
}
 
static void Greet (object state) { Console.Write (state); }   // Hello

Given that we have lambda expressions in C#, we can put the state object to better use, which is to assign a meaningful name to the task. We can then use the AsyncState property to query its name:

static void Main()
{
  var task = Task.Factory.StartNew (state => Greet ("Hello"), "Greeting");
  Console.WriteLine (task.AsyncState);   // Greeting
  task.Wait();
}
 
static void Greet (string message) { Console.Write (message); }

Visual Studio displays each task’s AsyncState in the Parallel Tasks window, so having a meaningful name here can ease debugging considerably.

TaskCreationOptions

You can tune a task’s execution by specifying a TaskCreationOptions enum when calling StartNew (or instantiating a Task). TaskCreationOptions is a flags enum with the following (combinable) values:

LongRunning
PreferFairness
AttachedToParent

LongRunning suggests to the scheduler to dedicate a thread to the task. This is beneficial for long-running tasks because they might otherwise “hog” the queue, and force short-running tasks to wait an unreasonable amount of time before being scheduled. LongRunning is also good for blocking tasks.

The task queuing problem arises because the task scheduler ordinarily tries to keep just enough tasks active on threads at once to keep each CPU core busy. Not oversubscribing the CPU with too many active threads avoids the degradation in performance that would occur if the operating system was forced to perform a lot of expensive time slicing and context switching.

PreferFairness tells the scheduler to try to ensure that tasks are scheduled in the order they were started. It may ordinarily do otherwise, because it internally optimizes the scheduling of tasks using local work-stealing queues. This optimization is of practical benefit with very small (fine-grained) tasks.

AttachedToParent is for creating child tasks.

Child tasks

When one task starts another, you can optionally establish a parent-child relationship by specifying TaskCreationOptions.AttachedToParent:

Task parent = Task.Factory.StartNew (() =>
{
  Console.WriteLine ("I am a parent");
 
  Task.Factory.StartNew (() =>        // Detached task
  {
    Console.WriteLine ("I am detached");
  });
 
  Task.Factory.StartNew (() =>        // Child task
  {
    Console.WriteLine ("I am a child");
  }, TaskCreationOptions.AttachedToParent);
});

A child task is special in that when you wait for the parent task to complete, it waits for any children as well. This can be particularly useful when a child task is a continuation, as we’ll see shortly.

Waiting on Tasks

You can explicitly wait for a task to complete in two ways:

  • Calling its Wait method (optionally with a timeout)
  • Accessing its Result property (in the case of Task<TResult>)

You can also wait on multiple tasks at once — via the static methods Task.WaitAll (waits for all the specified tasks to finish) and Task.WaitAny (waits for just one task to finish).

WaitAll is similar to waiting out each task in turn, but is more efficient in that it requires (at most) just one context switch. Also, if one or more of the tasks throw an unhandled exception, WaitAll still waits out every task — and then rethrows a single AggregateException that accumulates the exceptions from each faulted task. It’s equivalent to doing this:

// Assume t1, t2 and t3 are tasks:
var exceptions = new List<Exception>();
try { t1.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); }
try { t2.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); }
try { t3.Wait(); } catch (AggregateException ex) { exceptions.Add (ex); }
if (exceptions.Count > 0) throw new AggregateException (exceptions);

Calling WaitAny is equivalent to waiting on a ManualResetEventSlim that’s signaled by each task as it finishes.

As well as a timeout, you can also pass in a cancellation token to the Wait methods: this lets you cancel the wait — not the task itself.

Exception-Handling Tasks

When you wait for a task to complete (by calling its Wait method or accessing its Result property), any unhandled exceptions are conveniently rethrown to the caller, wrapped in an AggregateException object. This usually avoids the need to write code within task blocks to handle unexpected exceptions; instead we can do this:

int x = 0;
Task<int> calc = Task.Factory.StartNew (() => 7 / x);
try
{
  Console.WriteLine (calc.Result);
}
catch (AggregateException aex)
{
  Console.Write (aex.InnerException.Message);  // Attempted to divide by 0
}

You still need to exception-handle detached autonomous tasks (unparented tasks that are not waited upon) in order to prevent an unhandled exception taking down the application when the task drops out of scope and is garbage-collected (subject to the following note). The same applies for tasks waited upon with a timeout, because any exception thrown after the timeout interval will otherwise be “unhandled.”

The static TaskScheduler.UnobservedTaskException event provides a final last resort for dealing with unhandled task exceptions. By handling this event, you can intercept task exceptions that would otherwise end the application — and provide your own logic for dealing with them.

For parented tasks, waiting on the parent implicitly waits on the children — and any child exceptions then bubble up:

TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
var parent = Task.Factory.StartNew (() => 
{
  Task.Factory.StartNew (() =>   // Child
  {
    Task.Factory.StartNew (() => { throw null; }, atp);   // Grandchild
  }, atp);
});
 
// The following call throws a NullReferenceException (wrapped
// in nested AggregateExceptions):
parent.Wait();

Interestingly, if you check a task’s Exception property after it has thrown an exception, the act of reading that property will prevent the exception from subsequently taking down your application. The rationale is that PFX’s designers don’t want you ignoring exceptions — as long as you acknowledge them in some way, they won’t punish you by terminating your program.

An unhandled exception on a task doesn’t cause immediate application termination: instead, it’s delayed until the garbage collector catches up with the task and calls its finalizer. Termination is delayed because it can’t be known for certain that you don’t plan to call Wait or check its Result or Exception property until the task is garbage-collected. This delay can sometimes mislead you as to the original source of the error (although Visual Studio’s debugger can assist if you enable breaking on first-chance exceptions).

As we’ll see soon, an alternative strategy for dealing with exceptions is with continuations.

Canceling Tasks

You can optionally pass in a cancellation token when starting a task. This lets you cancel tasks via the cooperative cancellation pattern described previously:

var cancelSource = new CancellationTokenSource();
CancellationToken token = cancelSource.Token;
 
Task task = Task.Factory.StartNew (() => 
{
  // Do some stuff...
  token.ThrowIfCancellationRequested();  // Check for cancellation request
  // Do some stuff...
}, token);
...
cancelSource.Cancel();

To detect a canceled task, catch an AggregateException and check the inner exception as follows:

try 
{
  task.Wait();
}
catch (AggregateException ex)
{
  if (ex.InnerException is OperationCanceledException)
    Console.Write ("Task canceled!");
}

If you want to explicitly throw an OperationCanceledException (rather than calling token.ThrowIfCancellationRequested), you must pass the cancellation token into OperationCanceledException’s constructor. If you fail to do this, the task won’t end up with a TaskStatus.Canceled status and won’t trigger OnlyOnCanceled continuations.

If the task is canceled before it has started, it won’t get scheduled — an OperationCanceledException will instead be thrown on the task immediately.

Because cancellation tokens are recognized by other APIs, you can pass them into other constructs and cancellations will propagate seamlessly:

var cancelSource = new CancellationTokenSource();
CancellationToken token = cancelSource.Token;
 
Task task = Task.Factory.StartNew (() =>
{
  // Pass our cancellation token into a PLINQ query:
  var query = someSequence.AsParallel().WithCancellation (token)...
  ... enumeratequery ...
});

Calling Cancel on cancelSource in this example will cancel the PLINQ query, which will throw an OperationCanceledException on the task body, which will then cancel the task.

The cancellation tokens that you can pass into methods such as Wait and CancelAndWait allow you to cancel the wait operation and not the task itself.

Continuations

Sometimes it’s useful to start a task right after another one completes (or fails). The ContinueWith method on the Task class does exactly this:

Task task1 = Task.Factory.StartNew (() => Console.Write ("antecedant.."));
Task task2 = task1.ContinueWith (ant => Console.Write ("..continuation"));

As soon as task1 (the antecedent) finishes, fails, or is canceled, task2 (the continuation) automatically starts. (If task1 had completed before the second line of code ran, task2 would be scheduled to execute right away.) The ant argument passed to the continuation’s lambda expression is a reference to the antecedent task.

Our example demonstrated the simplest kind of continuation, and is functionally similar to the following:

Task task = Task.Factory.StartNew (() =>
{
  Console.Write ("antecedent..");
  Console.Write ("..continuation");
});

The continuation-based approach, however, is more flexible in that you could first wait on task1, and then later wait on task2. This is particularly useful if task1 returns data.

Another (subtler) difference is that by default, antecedent and continuation tasks may execute on different threads. You can force them to execute on the same thread by specifying TaskContinuationOptions.ExecuteSynchronously when calling ContinueWith: this can improve performance in very fine-grained continuations by lessening indirection.

Continuations and Task<TResult>

Just like ordinary tasks, continuations can be of type Task<TResult> and return data. In the following example, we calculate Math.Sqrt(8*2) using a series of chained tasks and then write out the result:

Task.Factory.StartNew<int> (() => 8)
  .ContinueWith (ant => ant.Result * 2)
  .ContinueWith (ant => Math.Sqrt (ant.Result))
  .ContinueWith (ant => Console.WriteLine (ant.Result));   // 4

Our example is somewhat contrived for simplicity; in real life, these lambda expressions would call computationally intensive functions.

Continuations and exceptions

A continuation can find out if an exception was thrown by the antecedent via the antecedent task’s Exception property. The following writes the details of a NullReferenceException to the console:

Task task1 = Task.Factory.StartNew (() => { throw null; });
Task task2 = task1.ContinueWith (ant => Console.Write (ant.Exception));

If an antecedent throws and the continuation fails to query the antecedent’s Exception property (and the antecedent isn’t otherwise waited upon), the exception is considered unhandled and the application dies (unless handled by TaskScheduler.UnobservedTaskException).

A safe pattern is to rethrow antecedent exceptions. As long as the continuation is Waited upon, the exception will be propagated and rethrown to the Waiter:

Task continuation = Task.Factory.StartNew     (()  => { throw null; })
                                .ContinueWith (ant =>
  {
    if (ant.Exception != null) throw ant.Exception;    // Continue processing...
  });
 
continuation.Wait();    // Exception is now thrown back to caller.

Another way to deal with exceptions is to specify different continuations for exceptional versus nonexceptional outcomes. This is done with TaskContinuationOptions:

Task task1 = Task.Factory.StartNew (() => { throw null; });
 
Task error = task1.ContinueWith (ant => Console.Write (ant.Exception),
                                 TaskContinuationOptions.OnlyOnFaulted);
 
Task ok = task1.ContinueWith (ant => Console.Write ("Success!"),
                              TaskContinuationOptions.NotOnFaulted);

This pattern is particularly useful in conjunction with child tasks, as we’ll see very soon.

The following extension method “swallows” a task’s unhandled exceptions:

public static void IgnoreExceptions (this Task task)
{
  task.ContinueWith (t => { var ignore = t.Exception; },
    TaskContinuationOptions.OnlyOnFaulted);
} 

(This could be improved by adding code to log the exception.) Here’s how it would be used:

Task.Factory.StartNew (() => { throw null; }).IgnoreExceptions();

Continuations and child tasks

A powerful feature of continuations is that they kick off only when all child tasks have completed. At that point, any exceptions thrown by the children are marshaled to the continuation.

In the following example, we start three child tasks, each throwing a NullReferenceException. We then catch all of them in one fell swoop via a continuation on the parent:

TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
Task.Factory.StartNew (() =>
{
  Task.Factory.StartNew (() => { throw null; }, atp);
  Task.Factory.StartNew (() => { throw null; }, atp);
  Task.Factory.StartNew (() => { throw null; }, atp);
})
.ContinueWith (p => Console.WriteLine (p.Exception),
                    TaskContinuationOptions.OnlyOnFaulted);

 

Conditional continuations

By default, a continuation is scheduled unconditionally — whether the antecedent completes, throws an exception, or is canceled. You can alter this behavior via a set of (combinable) flags included within the TaskContinuationOptions enum. The three core flags that control conditional continuation are:

NotOnRanToCompletion = 0x10000,
NotOnFaulted = 0x20000,
NotOnCanceled = 0x40000,

These flags are subtractive in the sense that the more you apply, the less likely the continuation is to execute. For convenience, there are also the following precombined values:

OnlyOnRanToCompletion = NotOnFaulted | NotOnCanceled,
OnlyOnFaulted = NotOnRanToCompletion | NotOnCanceled,
OnlyOnCanceled = NotOnRanToCompletion | NotOnFaulted

(Combining all the Not* flags [NotOnRanToCompletionNotOnFaultedNotOnCanceled] is nonsensical, as it would result in the continuation always being canceled.)

“RanToCompletion” means the antecedent succeeded — without cancellation or unhandled exceptions.

“Faulted” means an unhandled exception was thrown on the antecedent.

“Canceled” means one of two things:

  • The antecedent was canceled via its cancellation token. In other words, an OperationCanceledException was thrown on the antecedent — whose CancellationToken property matched that passed to the antecedent when it was started.
  • The antecedent was implicitly canceled because it didn’t satisfy a conditional continuation predicate.

It’s essential to grasp that when a continuation doesn’t execute by virtue of these flags, the continuation is not forgotten or abandoned — it’s canceled. This means that any continuations on the continuation itself will then run — unless you predicate them with NotOnCanceled. For example, consider this:

Task t1 = Task.Factory.StartNew (...);
 
Task fault = t1.ContinueWith (ant => Console.WriteLine ("fault"),
                              TaskContinuationOptions.OnlyOnFaulted);
 
Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"));

As it stands, t3 will always get scheduled — even if t1 doesn’t throw an exception. This is because if t1 succeeds, the fault task will be canceled, and with no continuation restrictions placed on t3t3 will then execute unconditionally.

 

If we want t3 to execute only if fault actually runs, we must instead do this:

Task t3 = fault.ContinueWith (ant => Console.WriteLine ("t3"),
                              TaskContinuationOptions.NotOnCanceled);

(Alternatively, we could specify OnlyOnRanToCompletion; the difference is that t3 would not then execute if an exception was thrown within fault.)

Continuations with multiple antecedents

Another useful feature of continuations is that you can schedule them to execute based on the completion of multiple antecedents. ContinueWhenAll schedules execution when all antecedents have completed; ContinueWhenAny schedules execution when one antecedent completes. Both methods are defined in the TaskFactory class:

var task1 = Task.Factory.StartNew (() => Console.Write ("X"));
var task2 = Task.Factory.StartNew (() => Console.Write ("Y"));
 
var continuation = Task.Factory.ContinueWhenAll (
  new[] { task1, task2 }, tasks => Console.WriteLine ("Done"));

This writes “Done” after writing “XY” or “YX”. The tasks argument in the lambda expression gives you access to the array of completed tasks, which is useful when the antecedents return data. The following example adds together numbers returned from two antecedent tasks:

// task1 and task2 would call complex functions in real life:
Task<int> task1 = Task.Factory.StartNew (() => 123);
Task<int> task2 = Task.Factory.StartNew (() => 456);
 
Task<int> task3 = Task<int>.Factory.ContinueWhenAll (
  new[] { task1, task2 }, tasks => tasks.Sum (t => t.Result));
 
Console.WriteLine (task3.Result);           // 579

We’ve included the <int> type argument in our call to Task.Factory in this example to clarify that we’re obtaining a generic task factory. The type argument is unnecessary, though, as it will be inferred by the compiler.

Multiple continuations on a single antecedent

Calling ContinueWith more than once on the same task creates multiple continuations on a single antecedent. When the antecedent finishes, all continuations will start together (unless you specify TaskContinuationOptions.ExecuteSynchronously, in which case the continuations will execute sequentially).

The following waits for one second, and then writes either “XY” or “YX”:

var t = Task.Factory.StartNew (() => Thread.Sleep (1000));
t.ContinueWith (ant => Console.Write ("X"));
t.ContinueWith (ant => Console.Write ("Y"));

Task Schedulers and UIs

task scheduler allocates tasks to threads. All tasks are associated with a task scheduler, which is represented by the abstract TaskScheduler class. The Framework provides two concrete implementations: the default scheduler that works in tandem with the CLR thread pool, and the synchronization context scheduler. The latter is designed (primarily) to help you with the threading model of WPF and Windows Forms, which requires that UI elements and controls are accessed only from the thread that created them. For example, suppose we wanted to fetch some data from a web service in the background, and then update a WPF label called lblResult with its result. We can divide this into two tasks:

  1. Call a method to get data from the web service (antecedent task).
  2. Update lblResult with the results (continuation task).

If, for a continuation task, we specify the synchronization context scheduler obtained when the window was constructed, we can safely update lblResult:

public partial class MyWindow : Window
{
  TaskScheduler _uiScheduler;   // Declare this as a field so we can use
                                // it throughout our class.
  public MyWindow()
  {    
    InitializeComponent();
 
    // Get the UI scheduler for the thread that created the form:
    _uiScheduler = TaskScheduler.FromCurrentSynchronizationContext();
 
    Task.Factory.StartNew<string> (SomeComplexWebService)
      .ContinueWith (ant => lblResult.Content = ant.Result, _uiScheduler);
  }
 
  string SomeComplexWebService() { ... }
}

It’s also possible to write our own task scheduler (by subclassing TaskScheduler), although this is something you’d do only in very specialized scenarios. For custom scheduling, you’d more commonly use TaskCompletionSource, which we’ll cover soon.

TaskFactory

When you call Task.Factory, you’re calling a static property on Task that returns a default TaskFactory object. The purpose of a task factory is to create tasks — specifically, three kinds of tasks:

  • “Ordinary” tasks (via StartNew)
  • Continuations with multiple antecedents (via ContinueWhenAll and ContinueWhenAny)
  • Tasks that wrap methods that follow the asynchronous programming model (via FromAsync)

Interestingly, TaskFactory is the only way to achieve the latter two goals. In the case of StartNewTaskFactory is purely a convenience and technically redundant in that you can simply instantiate Task objects and call Start on them.

Creating your own task factories

TaskFactory is not an abstract factory: you can actually instantiate the class, and this is useful when you want to repeatedly create tasks using the same (nonstandard) values for TaskCreationOptionsTaskContinuationOptions, or TaskScheduler. For example, if we wanted to repeatedly create long-running parented tasks, we could create a custom factory as follows:

var factory = new TaskFactory (
  TaskCreationOptions.LongRunning | TaskCreationOptions.AttachedToParent,
  TaskContinuationOptions.None);

Creating tasks is then simply a matter of calling StartNew on the factory:

Task task1 = factory.StartNew (Method1);
Task task2 = factory.StartNew (Method2);
...

The custom continuation options are applied when calling ContinueWhenAll and ContinueWhenAny.

TaskCompletionSource

The Task class achieves two distinct things:

  • It schedules a delegate to run on a pooled thread.
  • It offers a rich set of features for managing work items (continuations, child tasks, exception marshaling, etc.).

Interestingly, these two things are not joined at the hip: you can leverage a task’s features for managing work items without scheduling anything to run on the thread pool. The class that enables this pattern of use is called TaskCompletionSource.

To use TaskCompletionSource you simply instantiate the class. It exposes a Task property that returns a task upon which you can wait and attach continuations—just like any other task. The task, however, is entirely controlled by the TaskCompletionSource object via the following methods:

public class TaskCompletionSource<TResult>
{
  public void SetResult (TResult result);
  public void SetException (Exception exception);
  public void SetCanceled();
 
  public bool TrySetResult (TResult result);
  public bool TrySetException (Exception exception);
  public bool TrySetCanceled();
  ...
}

If called more than once, SetResultSetException, or SetCanceled throws an exception; the Try* methods instead return false.

TResult corresponds to the task’s result type, so TaskCompletionSource<int> gives you a Task<int>. If you want a task with no result, create a TaskCompletionSource of object and pass in null when calling SetResult. You can then cast the Task<object> to Task.

The following example prints 123 after waiting for five seconds:

var source = new TaskCompletionSource<int>();
 
new Thread (() => { Thread.Sleep (5000); source.SetResult (123); })
  .Start();
 
Task<int> task = source.Task;      // Our "slave" task.
Console.WriteLine (task.Result);   // 123 

Later on, we‘ll show how BlockingCollection can be used to write a producer/consumer queue. We then demonstrate how TaskCompletionSource improves the solution by allowing queued work items to be waited upon and canceled.

Working with AggregateException

As we’ve seen, PLINQ, the Parallel class, and Tasks automatically marshal exceptions to the consumer. To see why this is essential, consider the following LINQ query, which throws a DivideByZeroException on the first iteration:

try
{
  var query = from i in Enumerable.Range (0, 1000000)
              select 100 / i;
  ...
}
catch (DivideByZeroException)
{
  ...
}

If we asked PLINQ to parallelize this query and it ignored the handling of exceptions, a DivideByZeroException would probably be thrown on a separate thread, bypassing our catch block and causing the application to die.

Hence, exceptions are automatically caught and rethrown to the caller. But unfortunately, it’s not quite as simple as catching a DivideByZeroException. Because these libraries leverage many threads, it’s actually possible for two or more exceptions to be thrown simultaneously. To ensure that all exceptions are reported, exceptions are therefore wrapped in an AggregateException container, which exposes an InnerExceptions property containing each of the caught exception(s):

try
{
  var query = from i in ParallelEnumerable.Range (0, 1000000)
              select 100 / i;
  // Enumerate query
  ...
}
catch (AggregateException aex)
{
  foreach (Exception ex in aex.InnerExceptions)
    Console.WriteLine (ex.Message);
}

Both PLINQ and the Parallel class end the query or loop execution upon encountering the first exception — by not processing any further elements or loop bodies. More exceptions might be thrown, however, before the current cycle is complete. The first exception in AggregateException is visible in the InnerException property.

Flatten and Handle

The AggregateException class provides a couple of methods to simplify exception handling: Flatten and Handle.

Flatten

AggregateExceptions will quite often contain other AggregateExceptions. An example of when this might happen is if a child task throws an exception. You can eliminate any level of nesting to simplify handling by calling Flatten. This method returns a new AggregateException with a simple flat list of inner exceptions:

catch (AggregateException aex)
{
  foreach (Exception ex in aex.Flatten().InnerExceptions)
    myLogWriter.LogException (ex);
}

Handle

Sometimes it’s useful to catch only specific exception types, and have other types rethrown. The Handle method on AggregateException provides a shortcut for doing this. It accepts an exception predicate which it runs over every inner exception:

public void Handle (Func<Exception, bool> predicate)

If the predicate returns true, it considers that exception “handled.” After the delegate has run over every exception, the following happens:

  • If all exceptions were “handled” (the delegate returned true), the exception is not rethrown.
  • If there were any exceptions for which the delegate returned false (“unhandled”), a new AggregateException is built up containing those exceptions, and is rethrown.

For instance, the following ends up rethrowing another AggregateException that contains a single NullReferenceException:

var parent = Task.Factory.StartNew (() => 
{
  // We’ll throw 3 exceptions at once using 3 child tasks:
 
  int[] numbers = { 0 };
 
  var childFactory = new TaskFactory
   (TaskCreationOptions.AttachedToParent, TaskContinuationOptions.None);
 
  childFactory.StartNew (() => 5 / numbers[0]);   // Division by zero
  childFactory.StartNew (() => numbers [1]);      // Index out of range
  childFactory.StartNew (() => { throw null; });  // Null reference
});
 
try { parent.Wait(); }
catch (AggregateException aex)
{
  aex.Flatten().Handle (ex =>   // Note that we still need to call Flatten
  {
    if (ex is DivideByZeroException)
    {
      Console.WriteLine ("Divide by zero");
      return true;                           // This exception is "handled"
    }
    if (ex is IndexOutOfRangeException)
    {
      Console.WriteLine ("Index out of range");
      return true;                           // This exception is "handled"   
    }
    return false;    // All other exceptions will get rethrown
  });
}

Concurrent Collections

Framework 4.0 provides a set of new collections in the System.Collections.Concurrent namespace. All of these are fully thread-safe:

Concurrent collection

Nonconcurrent equivalent

ConcurrentStack<T>

Stack<T>

ConcurrentQueue<T>

Queue<T>

ConcurrentBag<T>

(none)

BlockingCollection<T>

(none)

ConcurrentDictionary<TKey,TValue>

Dictionary<TKey,TValue>

The concurrent collections can sometimes be useful in general multithreading when you need a thread-safe collection. However, there are some caveats:

  • The concurrent collections are tuned for parallel programming. The conventional collections outperform them in all but highly concurrent scenarios.
  • A thread-safe collection doesn’t guarantee that the code using it will be thread-safe.
  • If you enumerate over a concurrent collection while another thread is modifying it, no exception is thrown. Instead, you get a mixture of old and new content.
  • There’s no concurrent version of List<T>.
  • The concurrent stack, queue, and bag classes are implemented internally with linked lists. This makes them less memory-efficient than the nonconcurrent Stack and Queue classes, but better for concurrent access because linked lists are conducive to lock-free or low-lock implementations. (This is because inserting a node into a linked list requires updating just a couple of references, while inserting an element into a List<T>-like structure may require moving thousands of existing elements.)

In other words, these collections don’t merely provide shortcuts for using an ordinary collection with a lock. To demonstrate, if we execute the following code on a single thread:

var d = new ConcurrentDictionary<int,int>();
for (int i = 0; i < 1000000; i++) d[i] = 123;

it runs three times more slowly than this:

var d = new Dictionary<int,int>();
for (int i = 0; i < 1000000; i++) lock (d) d[i] = 123;

(Reading from a ConcurrentDictionary, however, is fast because reads are lock-free.)

The concurrent collections also differ from conventional collections in that they expose special methods to perform atomic test-and-act operations, such as TryPop. Most of these methods are unified via the IProducerConsumerCollection<T> interface.

IProducerConsumerCollection<T>

A producer/consumer collection is one for which the two primary use cases are:

  • Adding an element (“producing”)
  • Retrieving an element while removing it (“consuming”)

The classic examples are stacks and queues. Producer/consumer collections are significant in parallel programming because they’re conducive to efficient lock-free implementations.

The IProducerConsumerCollection<T> interface represents a thread-safe producer/consumer collection. The following classes implement this interface:

ConcurrentStack<T>
ConcurrentQueue<T>
ConcurrentBag<T>

IProducerConsumerCollection<T> extends ICollection, adding the following methods:

void CopyTo (T[] array, int index);
T[] ToArray();
bool TryAdd (T item);
bool TryTake (out T item);

The TryAdd and TryTake methods test whether an add/remove operation can be performed, and if so, they perform the add/remove. The testing and acting are performed atomically, eliminating the need to lock as you would around a conventional collection:

int result;
lock (myStack) if (myStack.Count > 0) result = myStack.Pop();

TryTake returns false if the collection is empty. TryAdd always succeeds and returns true in the three implementations provided. If you wrote your own concurrent collection that prohibited duplicates, however, you’d make TryAdd return false if the element already existed (an example would be if you wrote a concurrent set).

The particular element that TryTake removes is defined by the subclass:

  • With a stack, TryTake removes the most recently added element.
  • With a queue, TryTake removes the least recently added element.
  • With a bag, TryTake removes whatever element it can remove most efficiently.

The three concrete classes mostly implement the TryTake and TryAdd methods explicitly, exposing the same functionality through more specifically named public methods such as TryDequeue and TryPop.

ConcurrentBag<T>

ConcurrentBag<T> stores an unordered collection of objects (with duplicates permitted). ConcurrentBag<T> is suitable in situations when you don’t care which element you get when calling Take or TryTake.

The benefit of ConcurrentBag<T> over a concurrent queue or stack is that a bag’s Add method suffers almost no contention when called by many threads at once. In contrast, calling Add in parallel on a queue or stack incurs some contention (although a lot less than locking around a nonconcurrent collection). Calling Take on a concurrent bag is also very efficient — as long as each thread doesn’t take more elements than it Added.

Inside a concurrent bag, each thread gets it own private linked list. Elements are added to the private list that belongs to the thread calling Add, eliminating contention. When you enumerate over the bag, the enumerator travels through each thread’s private list, yielding each of its elements in turn.

When you call Take, the bag first looks at the current thread’s private list. If there’s at least one element, it can complete the task easily and (in most cases) without contention. But if the list is empty, it must “steal” an element from another thread’s private list and incur the potential for contention.

So, to be precise, calling Take gives you the element added most recently on that thread; if there are no elements on that thread, it gives you the element added most recently on another thread, chosen at random.

Concurrent bags are ideal when the parallel operation on your collection mostly comprises Adding elements — or when the Adds and Takes are balanced on a thread. We saw an example of the former previously, when using Parallel.ForEach to implement a parallel spellchecker:

var misspellings = new ConcurrentBag<Tuple<int,string>>();
 
Parallel.ForEach (wordsToTest, (word, state, i) =>
{
  if (!wordLookup.Contains (word))
    misspellings.Add (Tuple.Create ((int) i, word));
});

A concurrent bag would be a poor choice for a producer/consumer queue, because elements are added and removed by different threads.

BlockingCollection<T>

If you call TryTake on any of the producer/consumer collections we discussed previously:

ConcurrentStack<T>
ConcurrentQueue<T>
ConcurrentBag<T>

and the collection is empty, the method returns false. Sometimes it would be more useful in this scenario to wait until an element is available.

Rather than overloading the TryTake methods with this functionality (which would have caused a blowout of members after allowing for cancellation tokens and timeouts), PFX’s designers encapsulated this functionality into a wrapper class called BlockingCollection<T>. A blocking collection wraps any collection that implements IProducerConsumerCollection<T> and lets you Take an element from the wrapped collection — blocking if no element is available.

A blocking collection also lets you limit the total size of the collection, blocking the producer if that size is exceeded. A collection limited in this manner is called a bounded blocking collection.

To use BlockingCollection<T>:

  1. Instantiate the class, optionally specifying the IProducerConsumerCollection<T> to wrap and the maximum size (bound) of the collection.
  2. Call Add or TryAdd to add elements to the underlying collection.
  3. 3.Call Take or TryTake to remove (consume) elements from the underlying collection.

If you call the constructor without passing in a collection, the class will automatically instantiate a ConcurrentQueue<T>. The producing and consuming methods let you specify cancellation tokens and timeouts. Add and TryAdd may block if the collection size is bounded; Take and TryTake block while the collection is empty.

Another way to consume elements is to call GetConsumingEnumerable. This returns a (potentially) infinite sequence that yields elements as they become available. You can force the sequence to end by calling CompleteAdding: this method also prevents further elements from being enqueued.

Previously, we wrote a producer/consumer queue using Wait and Pulse. Here’s the same class refactored to use BlockingCollection<T> (exception handling aside):

public class PCQueue : IDisposable
{
  BlockingCollection<Action> _taskQ = new BlockingCollection<Action>(); 
  public PCQueue (int workerCount)
  {
    // Create and start a separate Task for each consumer:
    for (int i = 0; i < workerCount; i++)
      Task.Factory.StartNew (Consume);
  }
 
  public void Dispose() { _taskQ.CompleteAdding(); }
 
  public void EnqueueTask (Action action) { _taskQ.Add (action); }
 
  void Consume()
  {
    // This sequence that we’re enumerating will block when no elements
    // are available and will end when CompleteAdding is called. 
    foreach (Action action in _taskQ.GetConsumingEnumerable())
      action();     // Perform task.
  }
}

Because we didn’t pass anything into BlockingCollection’s constructor, it instantiated a concurrent queue automatically. Had we passed in a ConcurrentStack, we’d have ended up with a producer/consumer stack.

BlockingCollection also provides static methods called AddToAny and TakeFromAny, which let you add or take an element while specifying several blocking collections. The action is then honored by the first collection able to service the request.

Leveraging TaskCompletionSource

The producer/consumer that we just wrote is inflexible in that we can’t track work items after they’ve been enqueued. It would be nice if we could:

  • Know when a work item has completed.
  • Cancel an unstarted work item.
  • Deal elegantly with any exceptions thrown by a work item.

An ideal solution would be to have the EnqueueTask method return some object giving us the functionality just described. The good news is that a class already exists to do exactly this — the Task class. All we need to do is to hijack control of the task via TaskCompletionSource:

public class PCQueue : IDisposable
{
  class WorkItem
  {
    public readonly TaskCompletionSource<object> TaskSource;
    public readonly Action Action;
    public readonly CancellationToken? CancelToken;
 
    public WorkItem (
      TaskCompletionSource<object> taskSource,
      Action action,
      CancellationToken? cancelToken)
    {
      TaskSource = taskSource;
      Action = action;
      CancelToken = cancelToken;
    }
  }
 
  BlockingCollection<WorkItem> _taskQ = new BlockingCollection<WorkItem>();
 
  public PCQueue (int workerCount)
  {
    // Create and start a separate Task for each consumer:
    for (int i = 0; i < workerCount; i++)
      Task.Factory.StartNew (Consume);
  }
 
  public void Dispose() { _taskQ.CompleteAdding(); }
 
  public Task EnqueueTask (Action action) 
  {
    return EnqueueTask (action, null);
  }
 
  public Task EnqueueTask (Action action, CancellationToken? cancelToken)
  {
    var tcs = new TaskCompletionSource<object>();
    _taskQ.Add (new WorkItem (tcs, action, cancelToken));
    return tcs.Task;
  }
 
  void Consume()
  {
    foreach (WorkItem workItem in _taskQ.GetConsumingEnumerable())
      if (workItem.CancelToken.HasValue && 
          workItem.CancelToken.Value.IsCancellationRequested)
      {
        workItem.TaskSource.SetCanceled();
      }
      else
        try
        {
          workItem.Action();
          workItem.TaskSource.SetResult (null);   // Indicate completion
        }
        catch (OperationCanceledException ex)
        {
          if (ex.CancellationToken == workItem.CancelToken)
            workItem.TaskSource.SetCanceled();
          else
            workItem.TaskSource.SetException (ex);
        }
        catch (Exception ex)
        {
          workItem.TaskSource.SetException (ex);
        }
  }
}

In EnqueueTask, we enqueue a work item that encapsulates the target delegate and a task completion source — which lets us later control the task that we return to the consumer.

In Consume, we first check whether a task has been canceled after dequeuing the work item. If not, we run the delegate and then call SetResult on the task completion source to indicate its completion.

Here’s how we can use this class:

var pcQ = new PCQueue (1);
Task task = pcQ.EnqueueTask (() => Console.WriteLine ("Easy!"));
...

We can now wait on task, perform continuations on it, have exceptions propagate to continuations on parent tasks, and so on. In other words, we’ve got the richness of the task model while, in effect, implementing our own scheduler.

SpinLock and SpinWait

In parallel programming, a brief episode of spinning is often preferable to blocking, as it avoids the cost of context switching and kernel transitions. SpinLock and SpinWait are designed to help in such cases. Their main use is in writing custom synchronization constructs.

SpinLock and SpinWait are structs and not classes! This design decision was an extreme optimization technique to avoid the cost of indirection and garbage collection. It means that you must be careful not to unintentionally copy instances — by passing them to another method without the ref modifier, for instance, or declaring them as readonly fields. This is particularly important in the case of SpinLock.

SpinLock

The SpinLock struct lets you lock without incurring the cost of a context switch, at the expense of keeping a thread spinning (uselessly busy). This approach is valid in high-contention scenarios when locking will be very brief (e.g., in writing a thread-safe linked list from scratch).

If you leave a spinlock contended for too long (we’re talking milliseconds at most), it will yield its time slice, causing a context switch just like an ordinary lock. When rescheduled, it will yield again — in a continual cycle of “spin yielding.” This consumes far fewer CPU resources than outright spinning — but more than blocking.

On a single-core machine, a spinlock will start “spin yielding” immediately if contended.

Using a SpinLock is like using an ordinary lock, except:

  • Spinlocks are structs (as previously mentioned).
  • Spinlocks are not reentrant, meaning that you cannot call Enter on the same SpinLock twice in a row on the same thread. If you violate this rule, it will either throw an exception (if owner tracking is enabled) or deadlock (if owner tracking is disabled). You can specify whether to enable owner tracking when constructing the spinlock. Owner tracking incurs a performance hit.
  • SpinLock lets you query whether the lock is taken, via the properties IsHeld and, if owner tracking is enabled, IsHeldByCurrentThread.
  • There’s no equivalent to C#‘s lock statement to provide SpinLock syntactic sugar.

Another difference is that when you call Enter, you must follow the robust pattern of providing a lockTaken argument (which is nearly always done within a try/finally block).

Here’s an example:

var spinLock = new SpinLock (true);   // Enable owner tracking
bool lockTaken = false;
try
{
  spinLock.Enter (ref lockTaken);
  // Do stuff...
}
finally
{
  if (lockTaken) spinLock.Exit();
}

As with an ordinary lock, lockTaken will be false after calling Enter if (and only if) the Enter method throws an exception and the lock was not taken. This happens in very rare scenarios (such as Abort being called on the thread, or an OutOfMemoryException being thrown) and lets you reliably know whether to subsequently call Exit.

SpinLock also provides a TryEnter method which accepts a timeout.

Given SpinLock’s ungainly value-type semantics and lack of language support, it’s almost as if they want you to suffer every time you use it! Think carefully before dismissing an ordinary lock.

SpinLock makes the most sense when writing your own reusable synchronization constructs. Even then, a spinlock is not as useful as it sounds. It still limits concurrency. And it wastes CPU time doing nothing useful. Often, a better choice is to spend some of that time doing something speculative — with the help of SpinWait.

SpinWait

SpinWait helps you write lock-free code that spins rather than blocks. It works by implementing safeguards to avoid the dangers of resource starvation and priority inversion that might otherwise arise with spinning.

Lock-free programming with SpinWait is as hardcore as multithreading gets and is intended for when none of the higher-level constructs will do. A prerequisite is to understand Nonblocking Synchronization.

Why we need SpinWait

Suppose we wrote a spin-based signaling system based purely on a simple flag:

bool _proceed;
void Test()
{
  // Spin until another thread sets _proceed to true:
  while (!_proceed) Thread.MemoryBarrier();
  ...
}

This would be highly efficient if Test ran when _proceed was already true — or if _proceed became true within a few cycles. But now suppose that _proceed remained false for several seconds — and that four threads called Test at once. The spinning would then fully consume a quad-core CPU! This would cause other threads to run slowly (resource starvation) — including the very thread that might eventually set _proceed to true (priority inversion). The situation is exacerbated on single-core machines, where spinning will nearly always cause priority inversion. (And although single-core machines are rare nowadays, single-core virtual machines are not.)

SpinWait addresses these problems in two ways. First, it limits CPU-intensive spinning to a set number of iterations, after which it yields its time slice on every spin (by calling Thread.Yield and Thread.Sleep), lowering its resource consumption. Second, it detects whether it’s running on a single-core machine, and if so, it yields on every cycle.

How to use SpinWait

There are two ways to use SpinWait. The first is to call its static method, SpinUntil. This method accepts a predicate (and optionally, a timeout):

bool _proceed;
void Test()
{
  SpinWait.SpinUntil (() => { Thread.MemoryBarrier(); return _proceed; });
  ...
}

The other (more flexible) way to use SpinWait is to instantiate the struct and then to call SpinOnce in a loop:

bool _proceed;
void Test()
{
  var spinWait = new SpinWait();
  while (!_proceed) { Thread.MemoryBarrier(); spinWait.SpinOnce(); }
  ...
}

The former is a shortcut for the latter.

How SpinWait works

In its current implementation, SpinWait performs CPU-intensive spinning for 10 iterations before yielding. However, it doesn’t return to the caller immediately after each of those cycles: instead, it calls Thread.SpinWait to spin via the CLR (and ultimately the operating system) for a set time period. This time period is initially a few tens of nanoseconds, but doubles with each iteration until the 10 iterations are up. This ensures some predictability in the total time spent in the CPU-intensive spinning phase, which the CLR and operating system can tune according to conditions. Typically, it’s in the few-tens-of-microseconds region — small, but more than the cost of a context switch.

On a single-core machine, SpinWait yields on every iteration. You can test whether SpinWait will yield on the next spin via the property NextSpinWillYield.

If a SpinWait remains in “spin-yielding” mode for long enough (maybe 20 cycles) it will periodically sleep for a few milliseconds to further save resources and help other threads progress.

Lock-free updates with SpinWait and Interlocked.CompareExchange

SpinWait in conjunction with Interlocked.CompareExchange can atomically update fields with a value calculated from the original (read-modify-write). For example, suppose we want to multiply field x by 10. Simply doing the following is not thread-safe:

x = x * 10;

for the same reason that incrementing a field is not thread-safe, as we saw in Nonblocking Synchronization.

The correct way to do this without locks is as follows:

  1. Take a “snapshot” of x into a local variable.
  2. Calculate the new value (in this case by multiplying the snapshot by 10).
  3. Write the calculated value back if the snapshot is still up-to-date (this step must be done atomically by calling Interlocked.CompareExchange).
  4. If the snapshot was stale, spin and return to step 1.

For example:

int x;
 
void MultiplyXBy (int factor)
{
  var spinWait = new SpinWait();
  while (true)
  {
    int snapshot1 = x;
    Thread.MemoryBarrier();
    int calc = snapshot1 * factor;
    int snapshot2 = Interlocked.CompareExchange (ref x, calc, snapshot1);
    if (snapshot1 == snapshot2) return;   // No one preempted us.
    spinWait.SpinOnce();
  }
}

We can improve performance (slightly) by doing away with the call to Thread.MemoryBarrier. We can get away with this because CompareExchange generates a memory barrier anyway — so the worst that can happen is an extra spin if snapshot1 happens to read a stale value in its first iteration.

Interlocked.CompareExchange updates a field with a specified value if the field’s current value matches the third argument. It then returns the field’s old value, so you can test whether it succeeded by comparing that against the original snapshot. If the values differ, it means that another thread preempted you, in which case you spin and try again.

CompareExchange is overloaded to work with the object type too. We can leverage this overload by writing a lock-free update method that works with all reference types:

static void LockFreeUpdate<T> (ref T field, Func <T, T> updateFunction)
  where T : class
{
  var spinWait = new SpinWait();
  while (true)
  {
    T snapshot1 = field;
    T calc = updateFunction (snapshot1);
    T snapshot2 = Interlocked.CompareExchange (ref field, calc, snapshot1);
    if (snapshot1 == snapshot2) return;
    spinWait.SpinOnce();
  }
}

Here’s how we can use this method to write a thread-safe event without locks (this is, in fact, what the C# 4.0 compiler now does by default with events):

EventHandler _someDelegate;
public event EventHandler SomeEvent
{
  add    { LockFreeUpdate (ref _someDelegate, d => d + value); }
  remove { LockFreeUpdate (ref _someDelegate, d => d - value); }
}

SpinWait Versus SpinLock

We could solve these problems instead by wrapping access to the shared field around a SpinLock. The problem with spin locking, though, is that it allows only one thread to proceed at a time — even though the spinlock (usually) eliminates the context-switching overhead. With SpinWait, we can proceed speculativelyand assume no contention. If we do get preempted, we simply try again. Spending CPU time doing something that might work is better than wasting CPU time in a spinlock!

Finally, consider the following class:

class Test
{
  ProgressStatus _status = new ProgressStatus (0, "Starting");
 
  class ProgressStatus    // Immutable class
  {
    public readonly int PercentComplete;
    public readonly string StatusMessage;
 
    public ProgressStatus (int percentComplete, string statusMessage)
    {
      PercentComplete = percentComplete;
      StatusMessage = statusMessage;
    }
  }
}

We can use our LockFreeUpdate method to “increment” the PercentComplete field in _status as follows:

LockFreeUpdate (ref _status,
  s => new ProgressStatus (s.PercentComplete + 1, s.StatusMessage));

Notice that we’re creating a new ProgressStatus object based on existing values. Thanks to the LockFreeUpdate method, the act of reading the existing PercentComplete value, incrementing it, and writing it back can’t get unsafely preempted: any preemption is reliably detected, triggering a spin and retry.

 

 

 

AsParallel是System.Linq.ParallelEnumerable中的扩展方法。它将输入包装为基于ParallelQuery <TSource>的序列,这将导致随后调用的LINQ查询运算符绑定到在ParallelEnumerable中定义的另一组扩展方法。这些提供了每个标准查询运算符的并行实现。本质上,它们通过将输入序列划分为在不同线程上执行的块,然后将结果整理回单个输出序列以供使用来工作:

技术分享图片

C#线程 并行编程

原文:https://www.cnblogs.com/wxs121/p/12547399.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!