管理线程
1. 基本线程管理
每个C++程序都拥有至少一个线程,它是由C++在运行时启动的,该进程运行着main()函数。你的程序可以继续启动具有其他函数作为入口的线程。然后,这些线程连同初始线程一起,并发运行。正如程序会在main()函数返回时退出那样,当指定的入口函数返回时,该线程就会退出。如你所见,如果你有线程的std::thread对象,你就可以等待它完成。但首先你得启动它,所以让我们来看一看如何启动线程。
1.1 启动线程
线程是通过构造std::thread对象来开始的,该对象指定了线程上要运行的任务。在最简单的情况下,该任务仅仅是一个普通的返回void且不接受参数的函数。这个函数在自己的线程上运行,直到返回,然后线程停止。但从另一个极端看,该任务可能是一个接受额外参数的函数对象,并且需要执行一系列相互独立的操作,运行这些独立的操作是通过某种消息机制所指定的;同样的,当线程需要停止,线程也是通过某种消息机制接收到信号时才会停止。无论线程将要做什么或是从哪里启动,使用C++线程库来开始一个线程总归是要构造一个std::thread对象:
- void do_some_work();
- std::thread my_thread(do_some_work);
就是这么简单。当然,你必须确保引入了<thread>头文件,从而编译器可以找到std::thread类的定义。与许多C++标准库相似,std::thread可以与任何可调用(callable)类型一同工作,所以你可以将一个带有函数调用操作符的类的实例传递给std::thread的构造函数来进行代替。
- class background_task
- {
- public:
- void operator()() const
- {
- std::cout << "Hello Concurrent World\n";
- }
- };
- int main()
- {
- background_task f;
- std::thread my_thread(f);
- my_thread.join();
- }
在这种情况下,所提供的函数对象被复制(copied)到属于新创建的执行线程的存储器中,并从那里调用。因此重要的是,副本与原版有着等效的行为,否则结果可能与预期不符。
当给线程构造函数传递一个函数对象时要考虑的一件事是避免所谓的“C++的最棘手的解析”。如果你传递一个临时的且未命名的变量,那么其语法可能与函数声明一样,在这种情况下,编译器会将其解释成如下这样,而非对象定义。例如,
- std::thread my_thread(background_task());
hanhan笔记:在自己的当前编译环境下提示警告——未调用原型函数(是否是有意用变量定义的?)。
声明了函数my_thread,它接受单个参数(参数类型是指向不接受参数同时返回background_task对象的函数的指针),并返回std::thread对象,而不是启动一个新线程。你可以像前面说的那样通过命名函数对象来避免这种情况,通过使用一组额外的括号,或使用新的统一初始化语法,例如:
- std::thread my_thread((background_task()));
- std::thread my_thread{ background_task() };
在第一个例子中,额外的括号避免其解释成函数声明,从而让my_thread被声明为std::thread类型的变量。第二个例子使用新的统一初始化语法,用大括号而不是括号,同样也是声明一个变量。
有一种避免了此问题的可调用对象类型,就是lambda表达式(lambda expression)。这是C++11中的一项新功能,其基本功能是允许你编写一个局部函数,并可能捕捉一些局部变量,同时避免传递额外参数的需求。前面的例子可以用lambda表达式编写如下。
- std::thread my_thread([] {
- std::cout << "Hello Concurrent World\n";
- });
一旦开始了线程,你需要显式地决定是要等待它完成(通过结合它),还是让它自行运行(通过分离它)。如果你在std::thread对象被销毁前未作决定,那么你的程序会被终止(std::thread的析构函数调用std::terminate())。因此,即便在异常存在的情况下,确保线程正确地结合或是分离都是你的当务之急。需要注意的是,你只需要在std::thread对象被销毁之前做出这个决定即可——线程本身可能在你结合或分离它之前早就已经结束了,而且如果你分离它,那么该线程可能在std::thread对象被销毁后很久都还在运行。
如果你不等待线程完成,那么你需要确保通过该线程访问的数据是有效的,直到该线程完成为止。这并不是新的问题——即便是在单线程代码中,在对象被销毁后还访问它也是未定义的行为——但线程的使用提供了遇到这种生命周期的额外机会。
你可能遇到这样问题的一种情况是,当线程函数持有局部变量的指针或引用,且当函数退出的时候线程尚未完成时,下面代码清单展示的就是这样一个场景的例子:
- struct func
- {
- int& i;
- func(int& i_) :i(i_) {};
- void operator()()
- {
- for (unsigned int j = 0; j < 1000000; j++)
- {
- std::cout << i << std::endl;
- }
- }
- };
- void oops()
- {
- int some_local_state = 0;
- func my_func(some_local_state);
- std::thread my_thread(my_func);
- my_thread.detach();
- }
hanhan笔记:以下代码是自己实验的触发访问悬空引用。可以发现打印的i值都是错误的。将上述代码中的detach()改为join()则打印正确。
- /*
- * 打乱一下栈值
- */
- void oops1()
- {
- int a = 1000;
- }
- int main()
- {
- oops();
- oops1(); // 打乱一下栈值
- while (1);
- }
在这种情况下,当oops退出时与my_thread相关联的新线程可能仍在运行,因为通过调用detach()你可以显式地决定不等待它。如果线程仍在运行,则在下次使用变量i进行相关操作时就会访问一个已被销毁的变量。这就像普通的单线程代码那样——允许对局部变量的指针或引用持续到函数退出之后绝不是一个好主意——但是对于多线程代码更容易犯这样的错误,因为当它发生的时候,并不一定是显而易见的。
一个常见的处理这种情况的方式是使线程函数自包含、并且把数据复制到该线程中而不是共享数据。如果你为线程函数使用了一个可调用对象,该对象本身被复制到该线程中,那么原始对象就可以立即被销毁。但是你仍然需要警惕包含有指针或者引用的对象,就像上面的代码那样。特别地,在一个访问局部变量的函数中创建线程是个糟糕的注意,除非能保证线程在函数退出前完成。
另外,通过结合(joining)线程,你可以确保在函数退出前,该线程执行完毕。
1.2 等待线程完成
如果你需要等待线程完成,你可以通过在相关联的std::thread实例上调用join()来实现。在上述代码清单的情况下,把函数体右括号前对my_thread.detach()的调用替换为调用my_thread.join(),就将足以确保函数退出之前,即局部变量被销毁之前,该线程就已结束。在这种情况下,就意味着在独立的线程上运行上述这个函数是没什么意义的,因为第一个线程在此期间将做不了任何有用的事情,但在实际的代码当中,初始化线程可能要么有自己的工作要做,要不是在等待所有线程完成之前就要启动多个线程来做有用的事情。
join()很简单也很暴力——你要么等待一个线程完成要么就不等。如果你需要对等待线程进行更细粒度的控制,比如检查线程是否完成,或只是在一段特定的时间内进行等待,那么就必须使用替代机制,例如条件变量和future。调用join()的行为也会清理所有与该线程相关联的存储器,这样std::thread对象不再与现已完成的线程相关联,它也不与任何线程相关联。这就意味着,你只能对一个给定的线程调用一次join(),一旦你调用了join(),此std::thread对象不再是可连接的,并且joinable()将返回false。
1.3 在异常环境下的等待
如前所示,你要确保在std::thread对象被销毁前已调用join()或detach()函数。如果要分离线程,通常在线程启动之后就可以立即调用detach(),所以这不是个问题。但是如果打算等待该线程,就需要仔细地选择在代码的哪个位置调用join()。这意味着,如果在线程开始之后但又是在调用join()之前引发了异常,对join()的调用就容易被跳过。
为了避免应用程序在引发异常的时候被终止,你需要在这种情况决定要做什么。一般来说,如果你打算在非异常的情况下调用join(),你还需要在存在异常时调用join(),以避免意外的生命周期问题。下列清单展示了这样的简单代码。
- struct func;
- void f()
- {
- int some_local_state = 0;
- func my_func(some_local_state);
- std::thread t(my_func);
- try
- {
- do_something_in_current_thread();
- }
- catch (...)
- {
- t.join();
- throw;
- }
- t.join();
- }
上述清单的代码使用了try/catch块,以确保访问局部状态的线程在函数退出之前结束,无论函数是正常退出还是异常中断。使用try/catch块很啰嗦,而且容易将作用域弄乱,所以并不是一个理想的方案。如果确保线程必须在函数退出之前完成是很重要的——无论是因为它具有对其他局部变量的引用还是任何其他原因——那么确保这是所有可能的退出路径的情况是很重要的,无论正常还是异常,并且希望提高一个这样做的简单明了的机制。
这样做的方法之一是使用标准的资源获取即初始化(RAII)惯用语法,并提供一个类,在它的析构函数中进行join(),正如下清单的代码。看看它是如何简化函数f()的。
- class thread_guard
- {
- std::thread& t;
- public:
- explicit thread_guard(std::thread& t_) :
- t(t_) {};
- ~thread_guard()
- {
- if (t.joinable())
- {
- t.join();
- }
- }
- thread_guard(thread_guard const&)=delete;
- thread_guard& operator=(thread_guard const&)=delete;
- };
- struct func;
- void f()
- {
- int some_local_state = 0;
- func my_func(some_local_state);
- std::thread t(my_func);
- thread_guard g(t);
- do_something_in_current_thread();
- }
hanhan笔记:无论正常或者异常,都会调用局部变量的析构函数?好像标准库里也有一个thread_guard,应该原理类似。
在当前线程的执行到达f末尾时,局部对象会按照构造函数的逆序被销毁。因此,thread_guard对象g首先被销毁,并且析构函数中线程被结合。即便是当函数因do_something_in_current_thread()引发异常而退出的情况下也会发生。
在上述代码清单中,在调用join()之前,thread_guard的析构函数首先测试std::thread对象是否是joinable()的。这很重要,因为对于一个给定的执行线程join()只能被调用一次,所以如果线程已经被结合,这样做就是错误的。
拷贝构造函数和拷贝赋值运算符被标记为=delete,以确保它们不会由编译器自动提供。复制或赋值这样一个对象可能是危险的,因为它可能比它要结合的线程的作用域存在得更久。通过将它们得声明为已删除,任何复制thread_guard对象的企图都将产生编译错误。
如果无需等待线程完成,可以通过分离(detaching)它来避免这种异常安全错误。这打破了线程与thread_guard对象的联系并确保当std::thread对象被销毁时std::terminate()不会被调用,即使线程仍在后台运行。
1.4 在后台运行线程
在std::thread对象上调用detach()会把线程丢到后台运行,也没有直接的方法与之通信。也不再可能等待该线程完成;如果一个线程成为分离的,获取一个引用它的std::thread对象也是不可能的,所以它也不再能够被结合。分离的线程确实是在后台运行;所有权和控制权被转交给C++运行时库,以确保与线程相关联的资源在线程退出后能够被正确地回收。
参照UNIX的守护进程(daemon process)概念,被分离的线程通常被称为守护线程(daemon threads),它们无需任何显式的用户界面,而运行在后台。这样的线程通常是长时间运行的,它们可能在应用程序的几乎整个生命周期中都在运行,执行后台任务,例如监控文件系统、清除对象缓冲中的未使用项或是优化数据结构。在另一个极端,有另一种鉴别线程何时完成的机制,或者线程被用作“即用即忘”任务,在这里使用分离线程也是有意义的。
通过调用std::thread对象的detach()的成员函数来分离线程。在调用完成后,std::thread对象不再与执行的实际线程相关联,同时也不能够被结合。
- std::thread t(do_background_work);
- t.detach();
- assert(!t.joinable());
为了从一个std::thread对象中分离线程,必须有一个线程供分离:你不能在一个没有与执行线程相关联的std::thread对象上调用detach()。这对于join()也是同样的要求,你可以用完全相同的方法进行检查——你只能在t.joinable()返回true的时候,为一个std::thread对象t调用t.detach()。
考虑一个类似于字处理器的应用程序,它可以一次编辑多个文档。有许多种方法在UI级别和内部来处理这个问题。有一种现在看起来越来越普遍的方式,是具有多个相互独立的顶层窗口,与正在编辑的文档一一对应。尽管这些窗口看起来完全独立,各自拥有自己的菜单等,但它们是在同一个应用程序的实例上运行的。一种在内部处理这个问题的方式是在其自己的线程中运行各自的文档编辑窗口;每个线程都运行相同的代码,但拥有与被编辑文档相关的不同的数据以及相应的窗口属性。打开一个新的文档就需要启动一个新的线程。处理请求的线程并不在乎等待其他的线程完成,因为它在一个不相关的文件上工作,所以运行分离的线程就成为了首选。
以下清单展示了这种方法的简单的代码大纲。
- void edit_document(std::string const& filename)
- {
- open_document_and_display_gui(filename);
- while (!done_edit())
- {
- user_command cmd = get_user_input();
- if (cmd.type == open_new_document)
- {
- std::string const new_name = get_filename_from_user();
- std::thread t(edit_document, new_name);
- t.detach();
- }
- else
- {
- process_user_input(cmd);
- }
- }
- }
如果用户选择打开一个新的文档,它会提示其有文档要打开,启动新线程来打开该文档,然后分离它。因为新的线程与当前线程做着同样的操作,只是文件不同,你可以用新选定的文件名作为参数,重用同一个函数(edit_document)。
这个例子还展示了一个案例,它有助于传递参数给用来启动线程的函数:并非仅仅将函数名传递给std::thread构造函数,你可以传递文件名参数。虽然也有其他机制能够做到这一点,例如使用具有成员数据的函数对象取代普通的带有参数的函数,但线程库提供了一个简单方法来实现它。
2. 传递参数给线程函数
如上述代码清单所示,传递参数给可调用对象或函数,基本上就是简单地将额外的参数传递给std::thread构造函数。但重要的是,参数会以默认的方式被复制(copied)到内部存储空间,在那里新创建的执行线程可以访问它们,即便函数中的相应参数期待着引用。这里有一个简单的例子。
- void f(int i, std::string const& s);
- std::thread t(f, 3, "hello");
这里创建一个新的与t相关联的执行线程,称为f(3,"hello")。注意即使f接受std::string作为第二参数,字符串字面值仅在新线程的上下文中才会作为char const*传送,并转化为std::string。尤其重要的是当提供的参数是一个自动变量的指针时。如下所示:
- void f(int i, std::string const& s);
- void oops(int some_param)
- {
- char buffer[1024];
- sprintf(buffer, "%i", some_param);
- std::thread t(f, 3, buffer);
- t.detach();
- }
hanhan笔记:自己做了以下实验,oops()函数后紧跟着oops1()函数就会出错,如果之间添加多条打印语句拖延栈内容变更的时间,打印的内容就是正确的。
结合文中的内容,std::thread的构造函数应该是“泛型”的,指定的参数会传入新的线程上下文。新的线程启动后应该有个初始化操作,初始化完成之后才会调用传入的指定的函数。由于传入的参数是指针,所以可能在初始化时、调用线程执行函数之前,指向的内容就悬空了,所以发生了错误。
而把传入的参数改为std::string,则传入新的线程上下文时,会发生一次“深拷贝”操作,这样就和原来的内容没有关系了,所以不会发生错误。
- int main()
- {
- oops(1);
- printf("hello");
- // printf("hello");
- // printf("hello");
- oops1(); /* 更改栈内容 */
- while (1);
- }
在这种情况下,正是局部变量buffer的指针被传递到新线程,还有一个重要的时机,即函数oops会在buffer指针在新线程上被转化为std::string之前就退出了,从而导致未定义的行为。解决之道是在将缓冲传递给std::thread的构造函数之前转换为std::string。
- void f(int i, std::string const& s);
- void oops(int some_param)
- {
- char buffer[1024];
- sprintf(buffer, "%i", some_param);
- std::thread t(f, 3, std::string(buffer) );
- t.detach();
- }
在这种情况下,问题就出在你依赖从缓冲区的指针到函数所期待的std::string对象的隐式转化,因为std::thread构造函数原样复制了所提供的值,并未转换为期待的参数类型。
也有可能得到相反的情况,对象被复制,而你想要的是引用。这可能发生在当线程正在更新一个通过引用传递来的数据结构时,例如,
- void update_data_for_widget(widget_id w, widget_data& data);
- void oops_again(widget_id w)
- {
- widget_data data;
- std::thread t(update_data_for_widget, w, data);
- display_status();
- t.join();
- process_widget_data(data);
- }
hanhan笔记:以上代码传递data参数的方式,在目前自己本地的编译器上已经不能通过了。
尽管update_data_for_widget希望通过引用传递第二个参数,std::thread的构造函数却并不知道;它无视函数所期待的类型,并且盲目地复制了所提供的值。当它调用update_data_for_widget时,它最后将传递data在内部的副本的引用而非对data自身的引用。于是,当线程完成时,随着所提供参数的内部副本的销毁,这些改动都将舍弃,将会传递一个未改变的data,而非正确更新的版本给update_data_for_widget。对于熟悉std::bind的人来说,解决方案也是显而易见的,你需要用std::ref来包装确实需要被引用的参数。在这种情况下,如果你将对线程的调用改为
- std::thread t(update_data_for_widget, w, std::ref(data) );
那么update_data_for_widget将被正确地传入data的引用,而非data副本(copy)的引用。
如果你熟悉std::bind,那么参数传递语义就不足为奇,因为std::thread构造函数和std::bind的操作都是依据相同的机制定义的。这意味着,例如,你可以传递一个成员函数的指针作为函数,前提是提供一个合适的对象指针作为第一个参数。
- class X
- {
- public:
- void do_lengthy_work();
- };
- X my_x;
- std::thread t(&X::do_lengthy_work, &my_x);
hanhan笔记:这种方式也比较好理解,内部成员函数的实现,第一个参数就是this指针。
这段代码将在新线程上调用my_x.do_lengthy_work(),因为my_x的地址是作为对象指针提供的。你也可以提供参数给这样的成员函数调用:std::thread构造函数的第三个参数将作为成员函数的第一个参数等等。
提供参数的另一个有趣的场景是,这里的参数不能被复制但只能被移动(moved):一个对象内保存的数据被转移到另一个对象,使原来的对象变成“空壳”。这种类型的一个例子是std::unique_ptr,它提供了动态分配对象的自动内存管理。只有一个std::unique_ptr实例可以在某一时刻指向一个给定的对象,当改实例被销毁时,其指向的对象将被删除。移动构造函数(move constructor)和移动赋值运算符(move assignment operator)允许一个对象的所有权在std::unique_ptr实例之间进行转移。这种转移给源对象留下一个NULL指针。这种值的移动使得该类型的对象可以用作函数的参数被接受,也可以用作函数的返回值被返回。如果源对象是临时变量,这种移动是自动的,但如果源对象是一个命名值,转移必须通过调用std::move()来请求。下面的示例展示了运用std::move()将动态对象的所有权转移到一个线程中。
- void process_big_object(std::unique_ptr<big_object>);
- std::unique_ptr<big_object> p(new big_object);
- p->prepare_data(42);
- std::thread t(process_big_object, std::move(p) );
通过在std::thread构造函数中指定std::move(p),big_object的所有权先被转移进新创建的线程的内部存储中,然后进入process_big_object。
标准线程库中的一些类表现出与std::unique_ptr相同的所有权语义,std::thread就是其中之一。虽然std::thread实例并不拥有与std::unique_ptr同样方式的动态对象,但他们却拥有资源,每一个实例负责管理一个执行线程。这种所有权可以在实例之间进行转移,因为std::thread的实例是可移动的(moveable),即使它们不是可复制的(copyable)。这确保了在允许程序员选择在对象之间转换所有权的时候,在任意时刻只有一个对象与某个特定的执行线程相关联。
3. 转移线程的所有权
假设你想要编写一个函数,它创建一个在后台运行的线程,但是向调用函数回传新线程的所有权,而非等待其完成,又或者你想要反过来做,创建一个线程,并将所有权传递给要等待它完成的函数。在任意一种情况下,你都需要将所有权从一个地方转移到另一个地方。
这就是std::thread支持移动的由来。正如在上一节所描述的,在C++标准库里许多拥有资源的类型,如std::ifstream和std::unique_ptr是可移动的,而非可复制的,并且std::thread就是其中之一。这意味着一个特定执行线程的所有权在std::thread实例之间移动,如同接下来的例子。该示例展示了创建两个执行线程,以及在三个std::thread示例t1、t2、t3之间对那些线程的所有权进行转移。
- void some_fumction();
- void some_other_function();
- std::thread t1(some_fumction);
- std::thread t2 = std::move(t1);
- t1 = std::thread(some_other_function);
- std::thread t3;
- t3 = std::move(t2);
- t1 = std::move(t3);
首先,启动一个新线程并与t1相关联。然后当t2构建完成时所有权被转移给t2,通过调用std::move()来显式地转移所有权。此刻,t1不再拥有相关联的执行线程,运行some_fumction的线程现在与t2相关联。
然后,启动一个新的线程并与临时的std::thread对象相关联。接下来将所有权转移到t1中,是不需要调用std::move()来显式移动所有权的,因为此处所有者是一个临时对象——从临时对象中进行移动是自动和隐式的。
t3是默认构造的,这意味着它的创建没有任何相关联的执行线程。当前与t2相关联的线程的所有权转移到t3,再次通过显式调用std::move(),因为t2是一个命名对象。在所有这些移动之后,t1与运行some_other_function的线程相关联,t2没有相关联的线程,t3与运行some_fumction的线程相关联。
最后一次移动将运行some_fumction的线程的所有权转回给t1。但是在这种情况下已经有了一个相关联的线程(运行着some_other_fumction),所以会调用std::terminate()来终止程序。这样做是为了与std::thread的析构函数保持一致。你必须在析构前显式地等待线程完成或是分离,这同样适用于赋值:你不能仅仅通过向管理一个线程的std::thread对象赋值一个新的值来“舍弃”一个线程。
std::thread支持移动意味着所有权可以很容易地从一个函数中被转移出,如下代码清单所示。
- std::thread f()
- {
- void some_fumction();
- return std::thread(some_fumction);
- }
- std::thread g()
- {
- void some_other_function(int);
- std::thread t(some_other_function, 42);
- return t;
- }
同样的,如果要把所有权移到函数中,它只能以值的形式接受std::thread的实例作为其中一个参数,如下所示。
- void f(std::thread t);
- void g()
- {
- void some_fumction();
- f(std::thread(some_fumction) );
- std::thread t(some_fumction);
- f(std::move(t) );
- }
std::thread支持移动的好处之一,就是你可以在thread_guard类的基础上,同时使它实际上获得线程的所有权。这可以避免thread_guard对象在引用它的线程结束后继续存在所造成的不良影响,同时也意味着一旦所有权转移到了该对象,那么其他对象都不可以结合或分离该线程。因为这只要是为了确保在退出一个作用域之前线程都已完成,我把这个类称为scoped_thread。其实现如下代码清单所示,同时附带一个简单的示例。
- class scoped_thread
- {
- std::thread t;
- public:
- explicit scoped_thread(std::thread t_) :
- t(std::move(t_))
- {
- if (!t.joinable())
- throw std::logic_error("No thread");
- }
- ~scoped_thread()
- {
- t.join();
- }
- scoped_thread(scoped_thread const&)=delete;
- scoped_thread& operator=(scoped_thread const&)=delete;
- };
- void func(int);
- void f()
- {
- int some_local_state;
- scoped_thread t(std::thread(func, some_local_state) );
- }
这个例子和之前类似,但是新线程被直接传递到scoped_thread,而不是为它创建一个单独的命名空间。当初始线程到达f的结尾时,scoped_thread对象被销毁,然后结合提供给构造函数的线程。使用之前的thread_guard类,析构函数必须检查线程是不是仍然可结合,你可以在构造函数中来做,如果不是则引发异常。
std::thread对移动的支持同样考虑了std::thread对象的容器,如果那些容器是移动感知的(如更新后的std::vector<>。这意味着你可以编写像如下清单中的代码,生成一批线程,然后等待它们完成。
- void do_work(unsigned int id);
- void f()
- {
- std::vector<std::thread> threads;
- for (int i = 0; i < 20; i++)
- {
- threads.push_back(std::thread(do_work, i) );
- }
- std::for_each(threads.begin(), threads.end(),
- std::mem_fn(&std::thread::join) );
- }
如果线程是被用来细分某种算法的工作,这往往正是所需的。在返回调用者之前,所有线程必须全都完成。当然,以上简单结构意味着由线程所做的工作是自包含的,同时它们的操作的结果纯粹是共享数据的副作用。如果f()向调用者返回一个依赖于这些线程的操作结果的值,那么正如所写的这样,该返回值就得通过检查线程终止后得共享数据来决定。在线程间转移操作结果的替代方案将在第4章中讨论。
将std::thread对象放在std::vector中是线程迈向自动管理的一步。与其为那些线程创建独立的变量并直接与之结合,不如将它们视为群组。你可以进一步创建在运行时确定的动态数量的线程,更进一步地利用这一步,而不是如上面清单中的那样创建固定的数量。
4. 在运行时选择线程数量
C++标准库中对此有所帮助的特性是std::thread::hardware_currency()。这个函数返回一个对于给定程序执行时能够真正并发运行的线程数量的指示。例如,在多核系统上它可能是CPU核心的数量。它仅仅是一个提示,如果该信息不可用则函数可能会返回0,但它对于在线程间分割任务是一个有用的指南。
以下清单展示了std::accumulate的一个简单的并行版本实现。它在线程之间划分所做的工作,使得每个线程具有最小数目的元素以避免过多线程的开销。请注意,该实现假定所有的操作都不会引发异常,即便异常可能发生。例如,std::thread构造函数如果不能启动一个新的执行线程那么它将引发异常。在这样的算法中处理异常超出了这个简单示例的范围。
- template<typename Iterator, typename T>
- struct accumulate_block
- {
- void operator() (Iterator first, Iterator last, T& result)
- {
- result = std::accumulate(first, last, result);
- }
- };
- template<typename Iterator, typename T>
- T parallel_accumulate(Iterator first, Iterator last, T init)
- {
- unsigned long const length = std::distance(first, last);
- if (!length)
- return init;
- unsigned long const min_per_thread = 25;
- unsigned long const max_threads =
- (length + min_per_thread - 1) / min_per_thread;
- unsigned long const hardware_threads =
- std::thread::hardware_concurrency();
- unsigned long const num_threads =
- std::min(hardware_threads ? hardware_threads : 2, max_threads);
- unsigned long const block_size = length / num_threads;
- std::vector<T> results(num_threads);
- std::vector<std::thread> threads(num_threads - 1);
- Iterator block_start = first;
- for (unsigned long i = 0; i < (num_threads - 1); i++)
- {
- Iterator block_end = block_start;
- std::advance(block_end, block_size);
- threads[i] = std::thread(
- accumulate_block<Iterator,T>(),
- block_start, block_end, std::ref(results[i])
- );
- block_start = block_end;
- }
- accumulate_block<Iterator, T>() (
- block_start, last, results[num_threads - 1]
- );
- std::for_each(threads.begin(), threads.end(),
- std::mem_fn(&std::thread::join) );
- return std::accumulate(results.begin(), results.end(), init);
- }
虽然这是一个相当长的函数,但它实际上是很直观的。如果输入范围为空,只返回初始值init。否则,此范围内至少有一个元素,于是你将要处理的元素除以最小块的块大小,以获取线程的最大数量。这是为了避免当范围中只有五个值时,在一个32核的机器上创建32个线程。
要运行的线程数是你计算出的最大值和硬件线程数量的较小值。你不会想要运行比硬件所能支持的更多线程(超额订阅,oversubscription),因为上下文切换将意味着更多的线程会降低性能。如果对std::thread::hardware_concurrency()的调用返回0,你只需简单地替换上你所选择的数量,在这个例子中我选择了2。你不会想要运行过多的线程,因为在单核的机器上这会使事情变慢,但同样地你也不希望运行的过少,因为那样的话,你就错过可用的并发。
每个待处理的线程的条目数量是范围的长度除以线程的数量。如果你担心数量不能整除,没必要——稍后再来处理。
既然你知道有多少个线程,你可以为中间结果创建一个std::vector<T>,同时为线程创建一个std::vector<std::thread>。请注意,你需要启动比num_threads少一个的线程,因为已经有一个了。
启动线程是个简单的循环:递进block_end迭代器到当前块的结尾,并启动一个新线程来累计此块的结果。下一个块的开始是这一个的结束。
当你启动了所有的线程后,这个线程就可以处理最后的块。这就是你处理所有未被整除的地方。你知道最后一块的结尾只可能是last,无论在那个块里有多少元素。一旦累计出最后一块的结果,你可以等待所有使用std::for_each生成的线程,接着通过最后调用std::accumulate将结果累加起来。
在你离开这个例子前,值得指出的是在类型T的加法运算符不满足结合律的地方(如float和double),这个parallel_accumulate的结果可能会跟std::accumulate的有所出入,这是将范围分组成块导致的。此外,对迭代器的需求要更严格一点,它们必须至少是向前迭代器(forward iterators),然而std::accumulate可以和单通输入迭代器(input iterators)一起工作,同时T必须是可默认构造的(default constructible)以使得你能够创建result向量。这些需求的各种变化是并行算法很常见的;就其本质而言,它们以某种方式的不同是为了使其并行,并且在结果和需求上产生影响。另外值得一提的是,因为你不能直接从一个线程中返回值,所以你必须将相关项的引用传入results向量中。从线程中返回结果的替代方法,会在之后介绍通过使用future来实现。
在这种情况,每个线程所需的所有信息在线程开始时传入,包括存储其计算结果的位置。实际情况并非总是如此。有时,作为进程的一部分有必要能够以某种方式标识线程。你可以传入一个标识数,如同之前清单中的i的值,但是如果需要此标识符的函数在调用栈中深达数个层次,并且可能从任意线程中被调用,那样做就很不方便。当我们设计C++线程库时就遇见了这方面的需求,所以每个线程都有一个唯一的标识符。
5. 标识线程
线程标识符是std::thread::id类型的,并且有两种获取方式。其一,线程的标识符可以通过与之相关联的std::thread对象中通过调用get_id()成员函数来获得。如果std::thread对象没有相关联的执行线程,对get_id()的调用返回一个默认构造的std::thread::id,表示“没有线程”。另外,当前线程的标识符,可以通过调用std::this_thread::get_id()d获得,这也是定义在
线程标识符是std::thread::id类型的,并且有两种获取方式。其一,线程的标识符可以通过与之相关联的std::thread对象中通过调用get_id()成员函数来获得。如果std::thread对象没有相关联的执行线程,对get_id()的调用返回一个默认构造的std::thread::id,表示“没有线程”。另外,当前线程的标识符,可以通过调用std::this_thread::get_id()获得,这也是定义在<thread>头文件中的。
std::thread::id类型的对象可以自由地复制和比较;否则,它们作为标识符就没有大用处。如果两个std::thread::id类型的对象相等,则它们代表着同一线程,或两者都具有“没有线程”的值。如果两个对象不相等,则代表着不同的线程,或其中一个代表着线程,而另一个具有“没有线程”的值。
线程库不限制你检查线程的标识符是否相同,std::thread::id类型的对象提供了一套完整的比较运算符,提供了所有不同值的总排序。这就允许它们在关系型容器中被用作主键,或是被排序,或者任何作为程序员的你认为合适的方式进行比较。比较运算符为std::thread::id所有不相等的值提供了一个总的排序,所以它们表现为你直觉上期待的那样:如果a<b且b<c,那么a<c,等等。标准库还提供了std::hash<std::thread::id>,使得std::thread::id类型的值可以在新的无序关系型容器中作为主键来用。
std::thread::id的实例常被用来检查一个线程是否需要执行某些操作。例如,如果线程像在之前代码里那样被用来分配工作,启动了其他线程的初始线程在需要做的工作可能会在算法上略有不同。在这种情况下,它可以启动其他线程之前存储std::this_thread::get_id()的结果,然后算法的核心部分(这对所有线程都是公共的)可以对照所存储的值来检查自己的线程ID。
- std::thread::id master_thread;
- void some_core_part_of_algorithm()
- {
- if (std::this_thread::get_id() == master_thread)
- {
- do_master_thread_work();
- }
- do_common_work();
- }
另外,当前线程的std::thread::id可以作为操作的一部分而存储在数据结构中。以后在相同数据结构上的操作可以对照执行此操作的线程ID来检查所存储的ID,来确定哪些操作是允许的/需要的。
类似地,线程ID可以指定的数据需要与一个线程进行关联,并且诸如线程局部存储这样的替代机制不适用的地方,用作关系型容器的主键。例如这样的一个容器,它可以被控制线程用来存储关于在它控制下的每个线程的信息,或是在线程之间传递信息。
这种想法就是,在大多数情况下,std::thread::id足以作为线程的通用标识符。只有当标识符具有与其相关联的语义(比如作为数组的索引)时,才有必要用替代方案。你甚至可以将一个std::thread::id实例写入诸如std::cout这样的输出流中。
- std::count << std::this_thread::get_id();
你得到的确切的输出,严格取决于实现;标准给定的唯一保证是,比较结果相等的线程ID应该产生相同的输出,而哪些比较结果不相等的应该给出不同的输出。因此,这主要是对调试和日志有用,但数值是没有语义的,所以也没有更多可说的了。
思考和总结
这一章讲的内容非常详尽,单就std::thread这个类,介绍了它的方方面面。并且每一节衔接的很好,一节的末尾都会引出下一节应用的场景。印象最深的就是std::thread构造函数当中的参数传递,需要考虑隐式转化和线程上下文环境,解决了之前的疑惑。同时书中还给出了一个加法运算的并行版本,虽说小巧但是感觉很有借鉴意义,是自己接触的第一个实际多线程的完整例子。同时书中也说明提及了目前自己的疑惑会在后续章节解答:如何获得线程的返回值?如何在线程中同步数据?需要接下来继续学习。