锐单电子商城 , 一站式电子元器件采购平台!
  • 电话:400-990-0325

《C++ Concurrencyin Action》第8章--并发代码设计

时间:2023-02-06 16:00:00 16zs矩形连接器sunx电涡流位移传感器j6d系列矩形电连接器j6wq24j6a连接器

本章的主要内容

· 线程划分数据的技术

· 影响并发代码性能的因素

· 性能因素是如何影响数据结构设计的

· 多线程代码中的异常安全

· 可扩展性

· 实现并行算法

前一章重点介绍使用C 并发代码写在11中的新工具上。在第六章和第七章中,我们了解到如何使用这些工具来设计可并发访问的基本数据结构。这就像一个木匠,他不仅知道如何制作铰链、组合柜或桌子;并发代码的使用比基本数据结构的使用/设计要频繁得多。为了开阔视野,需要建立更大的结构,高效工作。我将使用多线程C 以标准库算法为例,但这里的原则也适用于扩展其他应用程序。

认真思考如何进行并发化设计,对于每个编程项目来说都很重要。不过,写多线程代码的时候,需要考虑的因素比写序列化代码多得多。不仅包括一般性因素,例如:封装,耦合和聚合(这些在很多软件设计书籍中有很详细的介绍),还要考虑哪些数据需要共享,如何同步访问数据,哪些线程需要等待哪些线程,等等。

本章将关注这些问题,考虑如何使用线程,哪些代码应该在哪些线程上执行,以及如何影响代码的清晰度,并了解如何构建共享数据来优化性能。

先来看看如何在线程间划分工作。

8.1线程间划分工作的技术****

想象一下,你被要求建房子。为了完成任务,需要挖基础,砌墙,加水暖,接电线等。理论上,如果你擅长建房,这些事情都可以由你来完成,但这需要很长时间,需要不断切换任务。或者,你可以雇人帮你建房子。现在你需要决定雇佣多少人,雇员有什么技能。比如你可以雇几个人,这些人什么都能做。现在你必须不断地切换任务,但是因为你雇佣了很多人,你会比以前快得多。

或者,你可以雇佣一个由瓦工、木匠、电工和水管工组成的承包团队(专家组)。你的承包商只擅长,所以当没有水暖任务时,水管工会坐在那里休息,喝茶或咖啡。因为人多,比以前快多了。水管工打扫卫生时,电工可以把电线连接到厨房,但当没有自己的任务时,有人会休息。即使有人在休息,你也可能会觉得包工队比雇佣一群什么都能做的人要快。承包商不需要更换工具,每个人的任务都比有能力的人快。取决于具体情况——需要尝试和观察。

即使雇佣承包商,你仍然可以选择不同数量的团队(也许在一个团队中,瓦工的数量超过了电工)。同样,这将是一种补充,当建造不止一栋房子时,它将改变整体效率。即使水管工没有太多的任务,你仍然可以让他在建房后总是忙碌。当承包商无事可做时,你不会给他们钱;即使每次只有少数人工作,你也需要承担整个团队的费用。

建造例子足以解释问题;这与线程所做的事情有什么关系?在线程中也会出现这些问题。你需要决定使用多少线程,这些线程应该做什么。还需要决定是用全能线程完成所有任务,还是用专业线程只完成一件事,还是混合两种方法。并发使用时,需要做出许多选择来驱动并发,这将决定代码的性能和清晰度。因此,这里的选择非常重要,所以在设计应用程序的结构时,做出适当的决定。在本节中,将看到很多划分任务的技术,就先从线程间划分数据开始吧!

8.1.1** 在线程处理前划分数据**

最简单的并行算法是并行化std::for_each,它将对一个数据集中的每个元素进行相同的操作。为了并行化算法,可以为数据集中的每个元素分配一个处理线程。如何划分最佳性能在很大程度上取决于数据结构实现的细节,并在以后的性能问题章节中提到。

最简单的分配方法:第一组N元素分配一个线程,下一组N元素分配一个线程,等等.1.无论数据如何划分,每个线程都会操作分配给它的元素,但在处理完成之前不会与其他线程沟通。

[外链图片存储失败,源站可能有防盗链机制,建议保存图片直接上传(img-s9j6jLrd-1657616056592)(file:///C:/Users/sunx/AppData/Local/Temp/msohtmlclip1/01/clip_image002.gif)]

图8.1 连续数据块向线程分发

使用过MPI(Message PassingInterface)[1]和OpenMP[2]人们必须熟悉这个结构:一个任务分为多个任务,并将其放入并行任务集中,并独立执行这些任务。因此,它将在主线程中合并。这种方式在2.4节中的accumulate在这个例子中使用过;在这个例子中,所有务和主线程任务都是累积的。对于for_each主线程将无事可做,因为这个计算不需要最终处理。

最后一步对并行程序非常重要;如清单2.8中原始实现的最后一步是串行。但这一步也可以并行化;accumulate其实是递减操作,所以清单2.当线程数量大于一个线程上的最小处理项时,8中accumulate递归调用。或者,工作线程就像做一个完整的任务来减少步骤,而不是每次都产生新的线程。

虽然这项技术非常强大,但它并不适用于任何地方。有时不能像以前那样整齐地划分任务,因为只有在处理数据后才能明确划分。递归算法特别适用于快速排序;让我们来看看这种特殊的方法。

8.1.2** 递归划分**

快速排序有两个基本步骤:将数据划分为中枢元素之前或之后,然后快速排序中枢元素之前和之后的两半数组。这里不能通过简单的数据划分来平行,因为只有在一次排序结束后,我们才能知道哪些项目在中枢元素之前和之后。当这种算法并行化时,自然会想到使用递归。各级递归将多次调用quick_sort函数,因为在中枢元素之前和之后需要知道哪些元素。递归调用是完全独立的,因为它访问不同的数据集,并且每次迭代都可以并发执行。图8.展示了这样的递归划分。

[外链图片存储失败,源站可能有防盗链机制,建议保存图片直接上传(img-sYI5N1Zs-1657616056593)(file:///C:/Users/sunx/AppData/Local/Temp/msohtmlclip1/01/clip_image004.gif)]

图 8.2 对数据进行划分

这种实现已经在第四章中看到了。与大于和小于的数据块递归调用函数相比,使用std::async()可生成每个级别小于数据块的异步任务。std::async()时,C 线程库可以决定何时执行新的线程和同步执行任务。

重要的是:在对大数据集进行排序时,当每层递归产生新的线程时,最终会产生大量的线程。如果线程太多,你会看到它对性能的影响,那么你的应用程序就会运行得很慢。如果数据集太大,线程就会耗尽。那么在递归的基础上划分任务是个好主意;你只需要打包一定数量的数据,然后交给线程。std::async()这种简单的情况可以出去,但这不是唯一的选择。

另一种选择是使用std::thread::hardware_concurrency()函数确定线程的数量,就像列表2一样.8中的并行版accumulate()一样。然后,您可以将排序数据推到线程安全栈(如第6章和第7章中提到的栈)。当线程无所事事时,要么完成对自己数据块的梳理,要么等待一组排序数据的生成;线程可以从栈中获取这组数据并对其进行排序。

使用上述方法实现以下代码。

清单8.1 使用栈并行快速排序算法-等待数据块排序

template

struct sorter // 1

{

struct chunk_to_sort

{

std::list data; 

std::promise promise;

};

thread_safe_stack chunks; // 2

std::vectorstd::threadthreads; // 3

unsigned const max_thread_count;

std::atomicend_of_data;

sorter():max_thread_count(std::thread::hardware_concurrency()-1),end_of_data(false){}

~sorter() //** 4**

{

end_of_data=true;  //** 5**  for(unsignedi=0;i

}

void try_sort_chunk()

{

boost::shared_ptr chunk=chunks.pop(); // 7

if(chnk)

{

 sort_chunk(chunk);  // **8**

}

}

std::listdo_sort(std::list& chunk_data) // 9

{

if(chunk_data.empty())

{

  return chunk_data;

}



std::list result;

result.splice(result.begin(),chunk_data,chunk_data.begin());

T const& partition_val=*result.begin();

//** 10**

typenamestd::list::iterator divide_point= 

  std::partition(chunk_data.begin(),chunk_data.end(), [&](T const&val){return val

std::futurenew_lower=new_lower_chunk.promise.get_future();

chunks.push(std::move(new_lower_chunk)); // 11

if(threads.size()::sort_thread,this));

}



std::listnew_higher(do_sort(chunk_data));

result.splice(result.end(),new_higher);

while(new_lower.wait_for(std::chrono::seconds(0)) !=std::future_status::ready) //** 13**

{

  try_sort_chunk();  // **14**

}

result.splice(result.begin(),new_lower.get());

return result;

}

voidsort_chunk(boost::shared_ptr const& chunk)

{

chunk->promise.set_value(do_sort(chunk->data)); //** 15**

}

void sort_thread()

{

while(!end_of_data)  // **16**

{

  try_sort_chunk();  // **17**

 std::this_thread::yield();  // **18**

}

}

};

template

std::list parallel_quick_sort(std::list input) // 19

{

if(input.empty())

{

return input;

}

sorter s;

return s.do_sort(input); // 20

}

这里,parallel_quick_sort函数⑲代表了sorter类①的功能,其支持在栈上简单的存储无序数据块②,并且对线程进行设置③。do_sort成员函数⑨主要做的就是对数据进行划分⑩。相较于对每一个数据块产生一个新的线程,这次会将这些数据块推到栈上⑪;并在有备用处理器⑫的时候,产生新线程。因为小于部分的数据块可能由其他线程进行处理,那么就得等待这个线程完成⑬。为了让所有事情顺利进行(只有一个线程和其他所有线程都忙碌时),当线程处于等待状态时⑭,就让当前线程尝试处理栈上的数据。try_sort_chunk只是从栈上弹出一个数据块⑦,并且对其进行排序⑧,将结果存在promise中,让线程对已经存在于栈上的数据块进行提取⑮。

当end_of_data没有被设置时⑯,新生成的线程还在尝试从栈上获取需要排序的数据块⑰。在循环检查中,也要给其他线程机会⑱,可以从栈上取下数据块进行更多的操作。这里的实现依赖于sorter类④对线程的清理。当所有数据都已经排序完成,do_sort将会返回(即使还有工作线程在运行),所以主线程将会从parallel_quick_sort⑳中返回,在这之后会销毁sorter对象。析构函数会设置end_of_data标志⑤,以及等待所有线程完成工作⑥。标志的设置将终止线程函数内部的循环⑯。

在这个方案中,不用为spawn_task产生的无数线程所困扰,并且也不用再依赖C++线程库,为你选择执行线程的数量(就像std::async()那样)。该方案制约线程数量的值就是std::thread::hardware_concurrency()的值,这样就能避免任务过于频繁的切换。不过,这里还有两个问题:线程管理和线程通讯。要解决这两个问题就要增加代码的复杂程度。虽然,线程对数据项是分开处理的,不过所有对栈的访问,都可以向栈添加新的数据块,并且移出数据块以作处理。这里重度的竞争会降低性能(即使使用无(无阻塞)栈),原因将会在后面提到。

这个方案使用到了一个特殊的线程池——所有线程的任务都来源于一个等待链表,然后线程会去完成任务,完成任务后会再来链表提取任务。这个线程池很有问题(包括对工作链表的竞争),这个问题的解决方案将在第9章提到。关于多处理器的问题,将会在本章后面的章节中做出更为详细的介绍(详见8.2.1)。

几种划分方法:1,处理前划分;2,递归划分(都需要事先知道数据的长度固定),还有上面的那种划分方式。事情并非总是这样好解决;当数据是动态生成,或是通过外部输入,那么这里的办法就不适用了。在这种情况下,基于任务类型的划分方式,就要好于基于数据的划分方式。

8.1.3** 通过任务类型划分工作**

虽然为每个线程分配不同的数据块,但工作的划分(无论是之前就划分好,还是使用递归的方式划分)仍然在理论阶段,因为这里每个线程对每个数据块的操作是相同的。而另一种选择是让线程做专门的工作,也就是每个线程做不同的工作,就像水管工和电工在建造一所屋子的时候所做的不同工作那样。线程可能会对同一段数据进行操作,但它们对数据进行不同的操作。

对分工的排序,也就是从并发分离关注结果;每个线程都有不同的任务,这就意味着真正意义上的线程独立。其他线程偶尔会向特定线程交付数据,或是通过触发事件的方式来进行处理;不过总体而言,每个线程只需要关注自己所要做的事情即可。其本身就是基本良好的设计,每一段代码只对自己的部分负责。

分离关注

当有多个任务需要持续运行一段时间,或需要及时进行处理的事件(比如,按键事件或传入网络数据),且还有其他任务正在运行时,单线程应用采用的是单职责原则处理冲突。单线程的世界中,代码会执行任务A(部分)后,再去执行任务B(部分),再检查按钮事件,再检查传入的网络包,然后在循环回去,执行任务A。这将会使得任务A复杂化,因为需要存储完成状态,以及定期从主循环中返回。如果在循环中添加了很多任务,那么程序将运行的很慢;并且用户会发现,在他/她按下按键后,很久之后才会有反应。我确定你已经在一些程序中见过这种情况:你给程序分配一项任务后,发现接口会封锁,直到这项任务完成。

当使用独立线程执行任务时,操作系统会帮你处理接口问题。在执行任务A时,线程可以专注于执行任务,而不用为保存状态从主循环中返回。操作系统会自动保存状态,当需要的时候,将线程切换到任务B或任务C。如果目标系统是带有多核或多个处理器,任务A和任务B可很可能真正的并发执行。这样处理按键时间或网络包的代码,就能及时执行了。所有事情都完成的很好,用户得到了及时的响应;当然,作为开发者只需要写具体操作的代码即可,不用再将控制分支和使用用户交互混在一起了。

听起来不错,玫瑰色的愿景呀。事实真像上面所说的那样简单?一切取决于细节。如果每件事都是独立的,那么线程间就不需要交互,这样的话一切都很简单了。不幸的是,现实没那么美好。后台那些优雅的任务,经常会被用户要求做一些事情,并且它们需要通过更新用户接口的方式,来让用户知道它们完成了任务。或者,用户可能想要取消任务,这就需要用户向接口发送一条消息,告知后台任务停止运行。这两种情况都需要认真考虑,设计,以及适当的同步,不过这里担心的部分还是分离的。用户接口线程只能处理用户接口,当其他线程告诉该线程要做什么时,用户接口线程会进行更新。同样,后台线程只运行它们所关注的任务;只是,有时会发生“允许任务被其他线程所停止”的情况。在这两种情况下,后台线程需要照顾来自其他线程的请求,线程本身只知道它们请求与自己的任务有所关联。

多线程下有两个危险需要分离关注。第一个是对错误担忧的分离,主要表现为线程间共享着很多的数据,或者不同的线程要相互等待;这两种情况都是因为线程间很密切的交互。当这种情况发生,就需要看一下为什么需要这么多交互。当所有交互都有关于同样的问题,就应该使用单线程来解决,并将引用同一原因的线程提取出来。或者,当有两个线程需要频繁的交流,且没有其他线程时,那么就可以将这两个线程合为一个线程。

当通过任务类型对线程间的任务进行划分时,不应该让线程处于完全隔离的状态。当多个输入数据集需要使用同样的操作序列,可以将序列中的操作分成多个阶段,来让每个线程执行。

划分任务序列

当任务会应用到相同操作序列,去处理独立的数据项时,就可以使用流水线(pipeline)系统进行并发。这好比一个物理管道:数据流从管道一端进入,在进行一系列操作后,从管道另一端出去。

使用这种方式划分工作,可以为流水线中的每一阶段操作创建一个独立线程。当一个操作完成,数据元素会放在队列中,以供下一阶段的线程提取使用。这就允许第一个线程在完成对于第一个数据块的操作,并要对第二个数据块进行操作时,第二个线程可以对第一个数据块执行管线中的第二个操作。

这就是在线程间划分数据的一种替代方案(如8.1.1描述);这种方式适合于操作开始前,且对输入数据处长度不清楚的情况。例如,数据来源可能是从网络,或者可能是通过扫描文件系统来确定要处理的文件。

流水线对于队列中耗时的操作处理的也很合理;通过对线程间任务的划分,就能对应用的性能所有改善。假设有20个数据项,需要在四核的机器上处理,并且每一个数据项需要四个步骤来完成操作,每一步都需要3秒来完成。如果你将数据分给了四个线程,那么每个线程上就有5个数据项要处理。假设在处理的时候,没有其他线程对处理过程进行影响,在12秒后4个数据项处理完成,24秒后8个数据项处理完成,以此类推。当20个数据项都完成操作,就需要1分钟的时间。在管线中就会完全不同。四步可以交给四个内核。那么现在,第一个数据项可以被每一个核进行处理,所以其还是会消耗12秒。的确,在12秒后你就能得到一个处理过的数据项,这相较于数据划分并没有好多少。不过,当流水线流动起来,事情就会不一样了;在第一个核处理第一个数据项后,数据项就会交给下一个内核,所以第一个核在处理完第一个数据项后,其还可以对第二个数据项进行处理。那么在12秒后,每3秒将会得到一个已处理的数据项,这就要好于每隔12秒完成4个数据项。

为什么整批处理的时间要长于流水线呢?因为你需要在最终核开始处理第一个元素前等待9秒。更平滑的操作,能在某些情况下获益更多。考虑如下情况:当一个系统用来播放高清数字视频。为了让视频能够播放,你至少要保证25帧每秒的解码速度。同样的,这些图像需要有均匀的间隔,才会给观众留有连续播放的感觉;一个应用可以在1秒解码100帧,不过在解完就需要暂停1s的时候,这个应用就没有意义了。另一方面,观众能接受在视频开始播放的时候有一定的延迟。这种情况,并行使用流水线就能得到稳定的解码率。

看了这么多线程间划分工作的技术,接下来让我们来看一下在多线程系统中有哪些因素会影响性能,并且这些因素是如何影响你选择划分方案的。

8.2影响并发代码性能的因素****

在多处理系统中,使用并发的方式来提高代码的效率时,你需要了解一下有哪些因素会影响并发的效率。即使已经使用多线程对关注进行分离,还需要确定是否会对性能造成负面影响。因为,在崭新16核机器上应用的速度与单核机器相当时,用户是不会打死你的。

之后你会看到,在多线程代码中有很多因素会影响性能——对线程处理的数据做一些简单的改动(其他不变),都可能对性能产生戏剧性的效果。所以,多言无益,让我们来看一下这些因素吧,从明显的开始:目标系统有多少个处理器?

8.2.1** 有多少个处理器?**

处理器个数是影响多线程应用的首要因素。在某些情况下,你对目标硬件会很熟悉,并且针对硬件进行设计,并在目标系统或副本上进行测量。如果是这样,那你很幸运;不过,要知道这些都是很奢侈的。你可能在一个类似的平台上进行开发,不过你所使用的平台与目标平台的差异很大。例如,你可能会在一个双芯或四芯的系统上做开发,不过你的用户系统可能就只有一个处理器(可能有很多芯),或多个单芯处理器,亦或是多核多芯的处理器。在不同的平台上,并发程序的行为和性能特点就可能完全不同,所以你需要仔细考虑那些地方会被影响到,如果会被影响,就需要在不同平台上进行测试。

一个单核16芯的处理器和四核双芯或十六核单芯的处理器相同:在任何系统上,都能运行16个并发线程。当线程数量少于16个时,会有处理器处于空闲状态(除非系统同时需要运行其他应用,不过我们暂时忽略这种可能性)。另一方面,当多于16个线程在运行的时候(都没有阻塞或等待),应用将会浪费处理器的运算时间在线程间进行切换,如第1章所述。这种情况发生时,我们称其为超额认购(oversubscription)。

为了扩展应用线程的数量,与硬件所支持的并发线程数量一致,C++标准线程库提供了std::thread::hardware_concurrency()。使用这个函数就能知道在给定硬件上可以扩展的线程数量了。

需要谨慎使用std::thread::hardware_concurrency(),因为代码不会考虑有其他运行在系统上的线程(除非已经将系统信息进行共享)。最坏的情况就是,多线程同时调用std::thread::hardware_concurrency()函数来对线程数量进行扩展,这样将导致庞大的超额认购。std::async()就能避免这个问题,因为标准库会对所有的调用进行适当的安排。同样,谨慎的使用线程池也可以避免这个问题。

不过,即使你已经考虑到所有在应用中运行的线程,程序还要被同时运行的其他程序所影响。虽然,在单用户系统中,使用多个CPU密集型应用程序很罕见,但在某些领域,这种情况就很常见了。虽然系统能提供选择线程数量的机制,但这种机制已经超出C++标准的范围。这里的一种选择是使用与std::async()类似的工具,来为所有执行异步任务的线程的数量做考虑;另一种选择就是,限制每个应用使用的处理芯个数。我倒是希望,这种限制能反映到std::thread::hardware_concurrency()上面(不能保证)。如果你需要处理这种情况,可以看一下你所使用的系统说明,了解一下是否有相关选项可供使用。

理想算法可能会取决于问题规模与处理单元的比值。大规模并行系统中有很多的处理单元,算法可能就会同时执行很多操作,让应用更快的结束;这就要快于执行较少操作的平台,因为该平台上的每一个处理器只能执行很少的操作。

随着处理器数量的增加,另一个问题就会来影响性能:多个处理器尝试访问同一个数据。

8.2.2** 数据争用与乒乓缓存**

当两个线程并发的在不同处理器上执行,并且对同一数据进行读取,通常不会出现问题;因为数据将会拷贝到每个线程的缓存中,并且可以让两个处理器同时进行处理。不过,当有线程对数据进行修改的时候,这个修改需要更新到其他核芯的缓存中去,就要耗费一定的时间。根据线程的操作性质,以及使用到的内存序,这样的修改可能会让第二个处理器停下来,等待硬件内存更新缓存中的数据。即便是精确的时间取决于硬件的物理结构,不过根据CPU指令,这是一个特别特别慢的操作,相当于执行成百上千个独立指令。

思考下面简短的代码段:

std::atomic counter(0);

void processing_loop()

{

while(counter.fetch_add(1,std::memory_order_relaxed)<100000000)

{

do_something();

}

}

counter变量是全局的,所以任何线程都能调用processing_loop()去修改同一个变量。因此,当新增加的处理器时,counter变量必须要在缓存内做一份拷贝,再改变自己的值,或其他线程以发布的方式对缓存中的拷贝副本进行更新。即使用std::memory_order_relaxed,编译器不会为任何数据做同步操作,fetch_add是一个“读-改-写”操作,因此就要对最新的值进行检索。如果另一个线程在另一个处理器上执行同样的代码,counter的数据需要在两个处理器之间进行传递,那么这两个处理器的缓存中间就存有counter的最新值(当counter的值增加时)。如果do_something()足够短,或有很多处理器来对这段代码进行处理时,处理器将会互相等待;一个处理器准备更新这个值,另一个处理器正在修改这个值,所以该处理器就不得不等待第二个处理器更新完成,并且完成更新传递时,才能执行更新。这种情况被称为高竞争(high contention)。如果处理器很少需要互相等待,那么这种情况就是低竞争(low contention)。

在这个循环中,counter的数据将在每个缓存中传递若干次。这就叫做乒乓缓存(cache ping-pong),这种情况会对应用的性能有着重大的影响。当一个处理器因为等待缓存转移而停止运行时,这个处理器就不能做任何事情,所以对于整个应用来说,这就是一个坏消息。

你可能会想,这种情况不会发生在你身上;因为,你没有使用任何循环。你确定吗?那么互斥锁呢?如果你需要在循环中放置一个互斥量,那么你的代码就和之前从数据访问的差不多了。为了锁住互斥量,另一个线程必须将数据进行转移,就能弥补处理器的互斥性,并且对数据进行修改。当这个过程完成时,将会再次对互斥量进行修改,并对线程进行解锁,之后互斥数据将会传递到下一个需要互斥量的线程上去。转移时间,就是第二个线程等待第一个线程释放互斥量的时间:

std::mutex m;

my_data data;

void processing_loop_with_mutex()

{

while(true)

{

std::lock_guardstd::mutex lk(m);

if(done_processing(data)) break;

}

}

接下来看看最糟糕的部分:数据和互斥量已经准备好让多个线程进访问之后,当系统中的核心数和处理器数量增加时,很可能看到高竞争,以及一个处理器等待其他处理器的情况。如果在多线程情况下,能更快的对同样级别的数据进行处理,线程就会对数据和互斥量进行竞争。这里有很多这样的情况,很多线程会同时尝试对互斥量进行获取,或者同时访问变量,等等。

互斥量的竞争通常不同于原子操作的竞争,最简单的原因是,互斥量通常使用操作系统级别的序列化线程,而非处理器级别的。如果有足够的线程去执行任务,当有线程在等待互斥量时,操作系统会安排其他线程来执行任务,而处理器只会在其他线程运行在目标处理器上时,让该处理器停止工作。不过,对互斥量的竞争,将会影响这些线程的性能;毕竟,只能让一个线程在同一时间运行。

回顾第3章,一个很少更新的数据结构可以被一个“单作者,多读者”互斥量(详见3.3.2)。乒乓缓存效应可以抵消互斥所带来的收益(工作量不利时),因为所有线程访问数据(即使是读者线程)都会对互斥量进行修改。随着处理器对数据的访问次数增加,对于互斥量的竞争就会增加,并且持有互斥量的缓存行将会在核芯中进行转移,因此会增加不良的锁获取和释放次数。有一些方法可以改善这个问题,其本质就是让互斥量对多行缓存进行保护,不过这样的互斥量需要自己去实现。

如果乒乓缓存是一个糟糕的现象,那么该怎么避免它呢?在本章后面,答案会与提高并发潜能的指导意见相结合:减少两个线程对同一个内存位置的竞争。

虽然,要实现起来并不简单。即使给定内存位置被一个线程所访问,可能还是会有乒乓缓存的存在,是因为另一种叫做伪共享(false sharing)的效应。

8.2.3** 伪共享**

处理器缓存通常不会用来处理在单个存储位置,但其会用来处理称为缓存行(cache lines)的内存块。内存块通常大小为32或64字节,实际大小需要由正在使用着的处理器模型来决定。因为硬件缓存进处理缓存行大小的内存块,较小的数据项就在同一内存行的相邻内存位置上。有时,这样的设定还是挺不错:当线程访问的一组数据是在同一数据行中,对于应用的性能来说就要好于向多个缓存行进行传播。不过,当在同一缓存行存储的是无关数据,且需要被不同线程访问,这就会造成性能问题。

假设你有一个int类型的数组,并且有一组线程可以访问数组中的元素,且对数组的访问很频繁(包括更新)。通常int类型的大小要小于一个缓存行,同一个缓存行中可以存储多个数据项。因此,即使每个线程都能对数据中的成员进行访问,硬件缓存还是会产生乒乓缓存。每当线程访问0号数据项,并对其值进行更新时,缓存行的所有权就需要转移给执行该线程的处理器,这仅是为了让更新1号数据项的线程获取1号线程的所有权。缓存行是共享的(即使没有数据存在),因此使用伪共享来称呼这种方式。这个问题的解决办法就是对数据进行构造,让同一线程访问的数据项存在临近的内存中(就像是放在同一缓存行中),这样那些能被独立线程访问的数据将分布在相距很远的地方,并且可能是存储在不同的缓存行中。在本章接下来的内容中看到,这种思路对代码和数据设计的影响。

如果多线程访问同一内存行是一种糟糕的情况,那么在单线程下的内存布局将会如何带来哪些影响呢?

8.2.4** 如何让数据紧凑?**

伪共享发生的原因:某个线程所要访问的数据过于接近另一线程的数据,另一个是与数据布局相关的陷阱会直接影响单线程的性能。问题在于数据过于接近:当数据能被单线程访问时,那么数据就已经在内存中展开,就像是分布在不同的缓存行上。另一方面,当内存中有能被单线程访问紧凑的数据时,就如同数据分布在同一缓存行上。因此,当数据已传播,那么将会有更多的缓存行将会从处理器的缓存上加载数据,这会增加访问内存的延迟,以及降低数据的系能(与紧凑的数据存储地址相比较)。

同样的,如果数据已传播,在给定缓存行上就即包含于当前线程有关和无关的数据。在极端情况下,当有更多的数据存在于缓存中,你会对数据投以更多的关注,而非这些数据去做了什么。这就会浪费宝贵的缓存空间,增加处理器缓存缺失的情况,即使这个数据项曾经在缓存中存在过,还需要从主存中添加对应数据项到缓存中,因为在缓存中其位置已经被其他数据所占有。

现在,对于单线程代码来说就很关键了,何至于此呢?原因就是任务切换(task switching)。如果系统中的线程数量要比核芯多,每个核上都要运行多个线程。这就会增加缓存的压力,为了避免伪共享,努力让不同线程访问不同缓存行。因此,当处理器切换线程的时候,就要对不同内存行上的数据进行重新加载(当不同线程使用的数据跨越了多个缓存行时),而非对缓存中的数据保持原样(当线程中的数据都在同一缓存行时)。

如果线程数量多于内核或处理器数量,操作系统可能也会选择将一个线程安排给这个核芯一段时间,之后再安排给另一个核芯一段时间。因此就需要将缓存行从一个内核上,转移到另一个内核上;这样的话,就需要转移很多缓存行,也就意味着要耗费很多时间。虽然,操作系统通常避免这样的情况发生,不过当其发生的时候,对性能就会有很大的影响。

当有超级多的线程准备运行时(非等待状态),任务切换问题就会频繁发生。这个问题我们之前也接触过:超额认购(oversubscription)。

8.2.5** 超额认购和频繁的任务切换**

多线程系统中,通常线程的数量要多于处理的数量。不过,线程经常会花费时间来等待外部I/O完成,或被互斥量阻塞,或等待条件变量,等等;所以等待不是问题。应用使用额外的线程来完成有用的工作,而非让线程在处理器处以闲置状态时继续等待。

这也并非长久之计,如果有很多额外线程,就会有很多线程准备执行,而且数量远远大于可用处理器的数量,不过操作系统就会忙于在任务间切换,以确保每个任务都有时间运行。如第1章所见,这将增加切换任务的时间开销,和缓存问题造成同一结果。当无限制的产生新线程,超额认购就会加剧,如第4章的递归快速排序那样;或者在通过任务类型对任务进行划分的时候,线程数量大于处理器数量,这里对性能影响的主要来源是CPU的能力,而非I/O。

如果只是简单的通过数据划分生成多个线程,那可以限定工作线程的数量,如8.1.2节中那样。如果超额认购是对工作的天然划分而产生,那么不同的划分方式对这种问题就没有太多益处了。之前的情况是,需要选择一个合适的划分方案,可能需要对目标平台有着更加详细的了解,不过这也只限于性能已经无法接受,或是某种划分方式已经无法提高性能的时候。

其他因素也会影响多线程代码的性能。即使CPU类型和时钟周期相同,乒乓缓存的开销可以让程序在两个单核处理器和在一个双核处理器上,产生巨大的性能差,不过这只是那些对性能影响可见的因素。接下来,让我们看一下这些因素如何影响代码与数据结构的设计。

8.3为多线程性能设计数据结构****

8.1节中,我们看到了各种划分方法;并且在8.2节,了解了对性能影响的各种因素。如何在设计数据结构的时候,使用这些信息提高多线程代码的性能?这里的问题与第6、7章中的问题不同,之前是关于如何设计能够安全、并发访问的数据结构。在8.2节中,单线程中使用的数据布局就会对性能产生巨大冲击(即使数据并未与其他线程进行共享)。

关键的是,当为多线程性能而设计数据结构的时候,需要考虑竞争(contention),伪共享(false sharing)和数据距离(data proximity)。这三个因素对于性能都有着重大的影响,并且你通常可以改善的是数据布局,或者将赋予其他线程的数据元素进行修改。首先,让我们来看一个轻松方案:线程间划分数组元素。

8.3.1** 为复杂操作划分数组元素**

假设你有一些偏数学计算任务,比如,需要将两个很大的矩阵进行相乘。对于矩阵相乘来说,将第一个矩阵中的首行每个元素和第二个矩阵中首列每个元素相乘后,再相加,从而产生新矩阵中左上角的第一个元素。然后,第二行和第一列,产生新矩阵第一列上的第二个结果,第二行和第二列,产生新矩阵中第二列的第一个结果,以此类推。如图8.3所示,高亮展示的就是在新矩阵中第二行-第三列中的元素产生的过程。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-sUiOUnXT-1657616056594)(file:///C:/Users/sunx/AppData/Local/Temp/msohtmlclip1/01/clip_image006.gif)]

图8.3 矩阵相乘

现在,让我们假设两个矩阵都有上千行和上千列,为了使用多线程来优化矩阵乘法。通常,非稀疏矩阵可以用一个大数组来代表,也就是第二行的元素紧随着第一行的,以此类推。为了完成矩阵乘法,这里就需要三个大数组。为了优化性能,你需要仔细考虑数据访问的模式,特别是向第三个数组中写入的方式。

线程间划分工作是有很多种方式的。假设矩阵的行或列数量大于处理器的数量,可以让每个线程计算出结果矩阵列上的元素,或是行上的元素,亦或计算一个子矩阵。

回顾一下8.2.3和8.2.4节,对于一个数组来说,访问连续的元素是最好的方式,因为这将会减少缓存的使用,并且降低伪共享的概率。如果要让每个线程处理几行,线程需要读取第一个矩阵中的每一个元素,并且读取第二个矩阵上的相关行上的数据,不过这里只需要对列的值进行写入。给定的两个矩阵是以行连续的方式存储,这就意味着当你访问第一个矩阵的第一行的前N个元素,然后是第二行的前N个元素,以此类推(N是列的数量)。其他线程会访问每行的的其他元素;很明显的,应该访问相邻的列,所以从行上读取的N个元素也是连续的,这将最大程度的降低伪共享的几率。当然,如果空间已经被N个元素所占有,且N个元素也就是每个缓存行上具体的存储元素数量,就会让伪共享的情况消失,因为线程将会对独立缓存行上的数据进行操作。

另一方面,当每个线程处理一组行,就需要读取第二个矩阵上的每一个数据,还要读取第一个矩阵中的相关行上的值,不过这里只需要对行上的值进行写入。因为矩阵是以行连续的方式存储,那么现在可以以N行的方式访问所有的元素。如果再次选择相邻行,这就意味着线程现在只能写入N行,这里就有不能被其他线程所访问的连续内存块。那么让线程对每组列进行处理就是一个改进,因为伪共享只可能有在一个内存块的最后几个元素和下一个元素的开始几个上发生,不过具体的时间还要根据目标架构来决定。

第三个选择——将矩阵分成小矩阵块?这可以看作先对列进行划分,再对行进行划分。因此,划分列的时候,同样有伪共享的问题存在。如果你可以选择内存块所拥有行的数量,就可以有效的避免伪共享;将大矩阵划分为小块,对于读取来说是有好处的:就不再需要读取整个源矩阵了。这里,只需要读取目标矩形里面相关行列的值就可以了。具体的来看,考虑1,000行和1,000列的两个矩阵相乘。就会有1百万个元素。如果有100个处理器,这样就可以每次处理10行的数据,也就是10,000个元素。不过,为了计算着10,000个元素,就需要对第二个矩阵中的全部内容进行访问(1百万个元素),再加上10,000个相关行(第一个矩阵)上的元素,大概就要访问1,010,000个元素。另外,硬件能处理100x100的数据块(总共10,000个元素),这就需要对第一个矩阵中的100行进行访问(100x1,000=100,000个元素),还有第二个矩阵中的100列(另外100,000个)。这才只有200,000个元素,就需要五轮读取才能完成。如果这里读取的元素少一些,缓存缺失的情况就会少一些,对于性能来说就好一些。

因此,将矩阵分成小块或正方形的块,要比使用单线程来处理少量的列好的多。当然,可以根据源矩阵的大小和处理器的数量,在运行时对块的大小进行调整。和之前一样,当性能是很重要的指标,就需要对目标架构上的各项指标进行测量。

如果不做矩阵乘法,该如何对上面提到的方案进行应用呢?同样的原理可以应用于任何情况,这种情况就是有很大的数据块需要在线程间进行划分;仔细观察所有数据访问的各个方面,以及确定性能问题产生的原因。各种领域中,出现问题的情况都很相似:改变划分方式就能够提高性能,而不需要对基本算法进行任何修改。

OK,我们已经了解了访问数组是如何对性能产生影响的。那么其他类型的数据结构呢?

8.3.2** 其他数据结构中的数据访问模式**

根本上讲,同样的考虑适用于想要优化数据结构的数据访问模式,就像优化对数组的访问:

· 尝试调整数据在线程间的分布,就能让同一线程中的数据紧密联系在一起。

· 尝试减少线程上所需的数据量。

· 尝试让不同线程访问不同的存储位置,以避免伪共享。

当然,应用于其他数据结构上会比较麻烦。例如,对二叉树划分就要比其他结构困难,有用与没用要取决于树的平衡性,以及需要划分的节点数量。同样,树的的属性决定了其节点会动态的进行分配,并且在不同的地方进行释放。

现在,节点在不同的地方释放倒不是一个严重的问题,不过这就意味着处理器需要在缓存中存储很多东西,这实际上是有好处的。当多线程需要旋转树的时候,就需要对树中的所有节点进行访问,不过当树中的节点只包括指向实际值的指针时,处理器只能从主存中对数据进行加载。如果数据正在被访问线程所修改,这就能避免节点数据,以及树数据结构间的伪共享。

这里就和用一个互斥量来保护数据类似了。假设你有一个简单的类,包含一些数据项和一个用于保护数据的互斥量(在多线程环境下)。如果互斥量和数据项在内存中很接近,对与一个需要获取互斥量的线程来说是很理想的情况;需要的数据可能早已存入处理器的缓存中了,因为在之前为了对互斥量进行修改,已经加载了需要的数据。不过,这还有一个缺点:当其他线程尝试锁住互斥量时(第一个线程还没有是释放),线程就能对对应的数据项进行访问。互斥锁是当做一个“读-改-写”原子操作实现的,对于相同位置的操作都需要先获取互斥量,如果互斥量已锁,那就会调用系统内核。这种“读-改-写”操作,可能会让数据存储在缓存中,让线程获取的互斥量变得毫无作用。从目前互斥量的发展来看,这并不是个问题;线程不会直到互斥量解锁,才接触互斥量。不过,当互斥量共享同一缓存行时,其中存储的是线程已使用的数据,这时拥有互斥量的线程将会遭受到性能打击,因为其他线程也在尝试锁住互斥量。

一种测试伪共享问题的方法是:对大量的数据块填充数据,让不同线程并发的进行访问。比如,你可以使用:

struct protected_data

{

std::mutex m;

char padding[65536]; // 65536字节已经超过一个缓存行的数量级

my_data data_to_protect;

};

用来测试互斥量竞争或

struct my_data

{

data_item1 d1;

data_item2 d2;

char padding[65536];

};

my_data some_array[256];

用来测试数组数据中的伪共享。如果这样能够提高性能,你就能知道伪共享在这里的确存在。

当然,在设计并发的时候有更多的数据访问模式需要考虑,现在让我们一起来看一些附加的注意事项。

8.4设计并发代码的注意事项****

目前为止,在本章中我们已经看到了很多线程间划分工作的方法,影响性能的因素,以及这些因素是如何影响你选择数据访问模式和数据结构的。虽然,已经有了很多设计并发代码的内容。你还需要考虑很多事情,比如异常安全和可扩展性。随着系统中核数的增加,性能越来越高(无论是在减少执行时间,还是增加吞吐率),这样的代码称为“可扩展”代码。理想状态下,性能随着核数的增加线性增长,也就是当系统有100个处理器时,其性能是系统只有1核时的100倍。

虽然,非扩展性代码依旧可以正常工作——单线程应用就无法扩展——例如,异常安全是一个正确性问题。如果你的代码不是异常安全的,最终会破坏不变量,或是造成条件竞争,亦或是你的应用意外终止,因为某个操作会抛出异常。有了这个想法,我们就率先来看一下异常安全的问题。

8.4.1** 并行算法中的异常安全**

异常安全是衡量C++代码一个很重要的指标,并发代码也不例外。实际上,相较于串行算法,并行算法常会格外要求注意异常问题。当一个操作在串行算法中抛出一个异常,算法只需要考虑对其本身进行处理,以避免资源泄露和损坏不变量;这里可以允许异常传递给调用者,由调用者对异常进行处理。通过对比,在并行算法中很多操作要运行在独立的线程上。在这种情况下,异常就不再允许被传播,因为这将会使调用堆栈出现问题。如果一个函数在创建一个新线程后带着异常退出,那么这个应用将会终止。

作为一个具体的例子,让我们回顾一下清单2.8中的parallel_accumulate函数:

清单8.2 std::accumulate的原始并行版本(源于清单2.8)

template

struct accumulate_block

{

void operator()(Iteratorfirst,Iterator last,T& result)

{

result=std::accumulate(first,last,result); // 1

}

};

template

T parallel_accumulate(Iterator first,Iterator last,T init)

{

unsigned long constlength=std::distance(first,last); // 2

if(!length)

return init;

unsigned long constmin_per_thread=25;

unsigned long constmax_threads=(length+min_per_thread-1)/min_per_thread;

unsigned long consthardware_threads=std::thread::hardware_concurrency();

unsigned long constnum_threads=std::min(hardware_threads!=0?hardware_threads:2,max_threads);

unsigned long constblock_size=length/num_threads;

std::vectorresults(num_threads); //** 3**

std::vectorstd::threadthreads(num_threads-1); // 4

Iterator block_start=first; // 5

for(unsigned longi=0;i<(num_threads-1);++i)

{

Iteratorblock_end=block_start;  // **6**

std::advance(block_end,block_size);

threads[i]=std::thread(accumulate_block(),block_start,block_end,std::ref(results[i]));//**7**

block_start=block_end;  // **8**

}

accumulate_block()(block_start,last,results[num_threads-1]); // 9

std::for_each(threads.begin(),threads.end(),std::mem_fn(&std::thread::join));

returnstd::accumulate(results.begin(),results.end(),init); // 10

}

现在让我们来看一下异常要在哪抛出:基本上就是在调用函数的地方抛出异常,或在用户定义类型上执行某个操作时可能抛出异常。

首先,需要调用distance②,其会对用户定义的迭代器类型进行操作。因为,这时还没有做任何事情,所以对于调用线程来说,所有事情都没问题。接下来,就需要分配results③和threads④。再后,调用线程依旧没有做任何事情,或产生新的线程,所以到这里也是没有问题的。当然,如果在构造threads抛出异常,那么对已经分配的results将会被清理,析构函数会帮你打理好一切。

跳过block_start⑤的初始化(因为也是安全的),来到了产生新线程的循环⑥⑦⑧。当在⑦处创建了第一个线程,如果再抛出异常,就会出问题的;对于新的std::thread对象将会销毁,程序将调用std::terminate来中断程序的运行。使用std::terminate的地方,可不是什么好地方。

accumulate_block⑨的调用就可能抛出异常,就会产生和上面类似的结果;线程对象将会被销毁,并且调用std::terminate。另一方面,最终调用std::accumulate⑩可能会抛出异常,不过处理起来没什么难度,因为所有的线程在这里已经汇聚回主线程了。

上面只是对于主线程来说的,不过还有很多地方会抛出异常:对于调用accumulate_block的新线程来说就会抛出异常①。没有任何catch块,所以这个异常不会被处理,并且当异常发生的时候会调用std::terminater()来终止应用的运行。

也许这里的异常问题并不明显,不过这段代码是非异常安全的。

添加异常安全

好吧,我们已经确定所有抛出异常的地方了,并且知道异常所带来的恶性后果。能为其做些什么呢?就让我们来解决一下在新线程上的异常问题。

在第4章时已经使用过工具来做这件事。如果你仔细的了解过新线程用来完成什么样的工作,要返回一个计算的结果的同时,允许代码产生异常。这可以将std::packaged_task和std::future相结合,来解决这个问题。如果使用std::packaged_task重新构造代码,代码可能会是如下模样。

清单8.3 使用std::packaged_task的并行std::accumulate

template

struct accumulate_block

{

T operator()(Iteratorfirst,Iterator last) // 1

{

returnstd::accumulate(first,last,T());  // **2**

}

};

template

T parallel_accumulate(Iterator first,Iterator last,T init)

{

unsigned long constlength=std::distance(first,last);

if(!length)

return init;

unsigned long constmin_per_thread=25;

unsigned long constmax_threads=(length+min_per_thread-1)/min_per_thread;

unsigned long consthardware_threads=std::thread::hardware_concurrency();

unsigned long constnum_threads=std::min(hardware_threads!=0?hardware_threads:2,max_threads);

unsigned long constblock_size=length/num_threads;

std::vector futures(num_threads-1); // 3

std::vectorstd::threadthreads(num_threads-1);

Iterator block_start=first;

for(unsigned longi=0;i<(num_threads-1);++i)

{

Iterator block_end=block_start;

std::advance(block_end,block_size);

std::packaged_task task(accumulate_block());//4

futures[i]=task.get_future(); // 5

threads[i]=std::thread(std::move(task),block_start,block_end); //** 6**

block_start=block_end;

}

Tlast_result=accumulate_block()(block_start,last); //** 7**

std::for_each(threads.begin(),threads.end(),std::mem_fn(&std::thread::join));

T result=init; // 8

for(unsigned longi=0;i<(num_threads-1);++i)

{

result+=futures[i].get();  //** 9**

}

result += last_result; // 10

return result;

}

第一个修改就是调用accumulate_block的操作现在就是直接将结果返回,而非使用引用将结果存储在某个地方①。使用std::packaged_task和std::future是线程安全的,所以你可以使用它们来对结果进行转移。当调用std::accumulate②时,需要你显示传入T的默认构造函数,而非复用result的值,不过这只是一个小改动。

下一个改动就是,不用向量来存储结果,而使用futures向量为每个新生线程存储std::future③。在新线程生成循环中,首先要为accumulate_block创建一个任务④。std::packaged_task声明,需要操作的两个Iterators和一个想要获取的T。然后,从任务中获取future⑤,再将需要处理的数据块的开始和结束信息传入⑥,让新线程去执行这个任务。当任务执行时,future将会获取对应的结果,以及任何抛出的异常。

使用future,就不能获得到一组结果数组,所以需要将最终数据块的结果赋给一个变量进行保存⑦,而非对一个数组进行填槽。同样,因为需要从future中获取结果,使用简单的for循环,就要比使用std::accumulate好的多;循环从提供的初始值开始⑧,并且将每个future上的值进行累加⑨。如果相关任务抛出一个异常,那么异常就会被future捕捉到,并且使用get()的时候获取数据时,这个异常会再次抛出。最后,在返回结果给调用者之前,将最后一个数据块上的结果添加入结果中⑩。

这样,一个问题就已经解决:在工作线程上抛出的异常,可以在主线程上抛出。如果不止一个工作线程抛出异常,那么只有一个能在主线程中抛出,不过这不会有产生太大的问题。如果这个问题很重要,你可以使用类似std::nested_exception来对所有抛出的异常进行捕捉。

剩下的问题就是,当生成第一个新线程和当所有线程都汇入主线程时,抛出异常;这样会让线程产生泄露。最简单的方法就是捕获所有抛出的线程,汇入的线程依旧是joinable()的,并且会再次抛出异常:

try

{

for(unsigned longi=0;i<(num_threads-1);++i)

{

// ... as before

}

Tlast_result=accumulate_block()(block_start,last);

std::for_each(threads.begin(),threads.end(),

std::mem_fn(&std::thread::join));

}

catch(…)

{

for(unsigned longi=0;i<(num_thread-1);++i)

{

if(threads[i].joinable())

thread[i].join();

}

throw;

}

现在好了,无论线程如何离开这段代码,所有线程都可以被汇入。不过,try-catch很不美观,并且这里有重复代码。可以将“正常”控制流上的线程在catch块上执行的线程进行汇入。重复代码是没有必要的,因为这就意味着更多的地方需要改变。不过,现在让我们来提取一个对象的析构函数;毕竟,析构函数是C++中处理资源的惯用方式。看一下你的类:

class join_threads

{

std::vectorstd::thread& threads;

public:

explicitjoin_threads(std::vectorstd::thread& threads_):threads(threads_){}

~join_threads()

{

for(unsigned longi=0;i

}

};

这个类和在清单2.3中看到的thread_guard类很相似,除了使用向量的方式来扩展线程量。用这个类简化后的代码如下所示:

清单8.4 异常安全版std::accumulate

template

T parallel_accumulate(Iterator first,Iterator last,T init)

{

unsigned long constlength=std::distance(first,last);

if(!length)

return init;

unsigned long constmin_per_thread=25;

unsigned long constmax_threads=(length+min_per_thread-1)/min_per_thread;

unsigned long consthardware_threads=std::thread::hardware_concurrency();

unsigned long constnum_threads=std::min(hardware_threads!=0?hardware_threads:2,max_threads);

unsigned long constblock_size=length/num_threads;

std::vector futures(num_threads-1);

std::vectorstd::threadthreads(num_threads-1);

join_threads joiner(threads); //** 1**

Iterator block_start=first;

for(unsigned longi=0;i<(num_threads-1);++i)

{

Iterator block_end=block_start;

std::advance(block_end,block_size);

std::packaged_tasktask(accumulate_block());

futures[i]=task.get_future();

threads[i]=std::thread(std::move(task),block_start,block_end);

block_start=block_end;

}

T last_result=accumulate_block()(block_start,last);

T result=init;

for(unsigned longi=0;i<(num_threads-1);++i)

{

result+=futures[i].get();  //** 2**

}

result += last_result;

return result;

}

当创建了线程容器,就对新类型创建了一个实例①,可让退出线程进行汇入。然后,可以再显式的汇入循环中将线程删除,在原理上来说是安全的:因为线程,无论怎么样退出,都需要汇入主线程。注意这里对futures[i].get()②的调用,将会阻塞线程,直到结果准备就绪,所以这里不需要显式的将线程进行汇入。和清单8.2中的原始代码不同:原始代码中,你需要将线程汇入,以确保results向量被正确填充。不仅需要异常安全的代码,还需要较短的函数实现,因为这里已经将汇入部分的代码放到新(可复用)类型中去了。

**std::async()**的异常安全

现在,你已经了解了,当需要显式管理线程的时候,需要代码是异常安全的。那现在让我们来看一下使用std::async()是怎么样完成异常安全的。在本例中,标准库对线程进行了较好的管理,并且当“期望”处以就绪状态的时候,就能生成一个新的线程。对于异常安全,还需要注意一件事,如果在没有等待的情况下对“期望”实例进行销毁,析构函数会等待对应线程执行完毕后才执行。这就能桥面的必过线程泄露的问题,因为线程还在执行,且持有数据的引用。下面的代码将展示使用std::async()完成异常安全的实现。

清单8.5 异常安全并行版std::accumulate——使用std::async()

template

T parallel_accumulate(Iterator first,Iterator last,T init)

{

unsigned long constlength=std::distance(first,last); // 1

unsigned long constmax_chunk_size=25;

if(length<=max_chunk_size)

{

returnstd::accumulate(first,last,init);  //** 2**

}

else

{

Iterator mid_point=first;

std::advance(mid_point,length

锐单商城拥有海量元器件数据手册IC替代型号,打造电子元器件IC百科大全!

相关文章