同步并发操作
在上一章中,我们看到了各种方法去保护在线程间共享的数据。但是有时候你不只是需要保护数据,还需要在独立的线程上进行同步操作。例如,一个线程在能够完成其任务之前可能需要等待另一个线程完成任务。一般来说,希望一个线程等待特定事件的发生或是一个条件变为 true 是常见的事情。虽然通过定期检查“任务完成”的标识或是在共享数据中存储类似的东西能够做到这一点,但却不甚理想。对于像这样的线程间同步操作的需求是如此常见,以至于 C++ 标准库提供了以条件变量(condition variables)和期值(future)为形式的工具来处理它。
在本章中,我将讨论如何使用条件变量和期值来等待事件,以及如何使用它们来简化操作的同步。
1. 等待事件或其他条件
假设你正乘坐通宵列车旅行。一个可以确保你在正确的车站下车的方法就是整夜保持清醒并注意火车停靠的地方。你不会误站,但你到那儿时就会觉得很累。或者,你可以查一下时间表,了解火车会在何时到达,将闹钟定的稍微提前一点,然后去睡觉。这是可以的;你不会错过站,但是如果火车晚点了,你就会醒的太早。也有可能闹钟的电池没电了,你就会睡过头以至于错过站。理想的状态是,你只管去睡觉,让某个人或某个东西在火车站到站时叫醒你,无论何时。
这如何与线程相关呢?那么,如果一个线程正等待着第二个线程完成一项任务,它有几个选择。首先,它可以一直检查共享数据(由互斥元保护)中的标识,并且让第二个线程在完成任务时设置该标识。这有两项浪费,线程占用了宝贵的处理时间去反复检查该标识,以及当互斥元被等待的线程锁定时,就不能被任何其他线程锁定。两者都反对线程进行等待,因为它们限制了等待中的线程的可用资源,甚至阻止它在完成任务时设置标识。这类似于整夜保持清醒地与火车司机交谈,他不得不把火车开得更慢,因为你一直在干扰它,所以需要更长的时间才能到达。同样的,等待中的线程消耗了本可以被系统中其他线程使用的资源,并且最终等待的时间可能会比所需的更长。
第二个选择是使用std::this_thread::sleep_for()函数,让等待中的线程在检查之间休眠一会儿。
- bool flag = false;
- std::mutex m;
- void wait_for_flag()
- {
- std::unique_lock<std::mutex> lk(m);
- while (!flag)
- {
- lk.unlock(); // ① 解锁互斥元
- std::this_thread::sleep_for(std::chrono::milliseconds(100) ); // ② 休眠 100 毫秒
- lk.lock(); // ③重新锁定互斥元
- }
- }
在这个循环中,函数在休眠之前 ② 解锁该互斥元 ①,并在之后再次锁定之 ③,所以另一个线程有机会获取它并设置标识。
这是一个进步,因为线程在休眠时并不浪费处理时间,但得到正确的休眠时间是很难的。检查之间休眠得过短,线程仍然会浪费处理时间进行检查;休眠得过长,即使线程在等待的任务已经完成,它还会继续休眠,导致延迟。这种过度休眠很少直接影响程序的操作,但它可能意味着快节奏的游戏中丢帧,或者在实时应用程序中过度运行一个时间片。
第三个选择,同时也是首选选择,是使用 C++ 标准库提供的工具来等待事件本身。等待由另一个线程触发一个事件的最基本机制(例如前面提到的管道中存在的额外操作)是条件变量。从概念上说,条件变量与某些事件或其他条件相关,并且一个或多个线程可以等待该条件被满足。当某个线程已经确定条件得到满足,它就可以通知一个或多个正在条件变量上进行等待的线程,以便唤醒它们并让它们继续处理。
1.1 用条件变量等待条件
标准 C++ 库提供了两个条件变量的实现:std::condition_variable和std::condition_variable_any。这两个实现都在<condition_variable>库的头文件中声明。两者都需要和互斥元一起工作,以便提供恰当的同步;前者仅限于和std::mutex一起工作,而后者则可以与符合成为类似互斥元的最低标准的任何东西一起工作,因此以 _any 为后缀。因为std::condition_variable_any更加普遍,所以会有大小、性能或者操作系统资源方面的形式的额外代价的可能,因此应该首选std::condition_variable,除非需要额外的灵活性。
那么,如何使用std::condition_variable去处理引言中的例子——怎么让正在等到工作的线程休眠,直到有数据要处理?清单 1 展示了一种方法,你可以用条件变量来实现这一点。
- std::mutex mut;
- std::queue<data_chunk> data_queue;
- std::condition_variable data_cond;
- void data_preparation_thread()
- {
- while (more_data_to_prepare())
- {
- data_chunk const data = prepare_data();
- std::lock_guard<std::mutex> lk(mut);
- data_queue.push(data);
- data_cond.notify_one();
- }
- }
- void data_processing_thread()
- {
- while (true)
- {
- std::unique_lock<std::mutex> lk(mut);
- data_cond.wait(lk,
- [] {return !data_queue.empty(); });
- data_chunk data = data_queue.front();
- data_queue.pop();
- lk.unlock();
- process(data);
- if (is_last_chunk(data))
- break;
- }
- }
首先,你拥有一个用来在两个线程之间传递数据的队列。当数据就绪时,准备数据的线程使用std::lock_guard去锁定保护队列的互斥元,并且将数据压入队列中。然后,它在std::condition_variable的实例上调用notify_one()成员函数,以通知等待中的线程(如果有的话)。
在另外一侧,你还有处理线程。该线程首先锁定互斥元,但是这次使用的是std::unique_lock而不是std::lock_guard——你很快就会明白为什么。该线程接下来在std::condition_variable上调用wait(),传入锁对象以及表示正在等待的条件的 lambda 函数。lambda 函数是 C++ 中的一个新功能,它允许你编写一个匿名函数作为另一个表达式的一部分,它们非常适合于为类似于wait()这样的标准库函数指定断言。在这个例子中,简单的 lambda 函数[] {return !data_queue.empty(); }检查 data_queue 是否不为empty(),即队列中已有数据准备处理。
wait()的实现接下来检查条件(通过调用所提供的 lambda 函数),并在满足时返回(lambda 返回 true)。如果条件不满足(lambda 函数返回 false),wait()解锁互斥元,并将该线程置于阻塞或等待状态。当来自数据准备线程中对notify_one()的调用通知条件变量时,线程从睡眠状态中苏醒(解除其阻塞),重新获得互斥元上的锁,并再次检查条件,如果条件已经满足,就从wait()返回值,互斥元仍被锁定。如果条件不满足,该线程解锁互斥元,并恢复等待。这就是为什么需要std::unique_lock而不是std::lock_guard——等待中的线程在等待期间必须解锁互斥元,并在这之后重新将其锁定,而std::lock_guard没有提供这样的灵活性。如果互斥元在线程休眠期间始终被锁定,数据准备线程将无法锁定该互斥元,以便将项目添加至队列,并且等待中的线程将永远无法看到其条件得到满足。
清单 1 为等待使用了一个简单的 lambda 函数,该函数检查队列是否为非空的,但是任何函数或可调用对象都可以传入。如果你已经有一个函数来检查条件(也许因为它比这样一个简单的试验更加复杂),那么这个函数就可以直接传入,没有必要将其封装在 lambda 中。在对wait()的调用中,条件变量可能会对所提供的条件检查任意多次。然而它并非直接响应另一个线程的通知,这就是所谓的伪唤醒(spurious wake)。由于所有的这种伪唤醒的次数和频率根据定义是不确定的,所以使用对于条件检查具有副作用的函数是不可取的。如果你这样做,就必须做好多次产生副作用的准备。
解锁std::unique_lock的灵活性不仅适用于对wait()的调用;它还可用于你有待处理但仍未处理的数据。处理数据可能是一个耗时的操作,在互斥元上持有锁超过所需的时间就是个不好的情况。
清单 1 所示的使用队列在线程之间传输数据,是很常见的场景。做得好的话,同步可以被限制在队列本身,大大减少了同步问题和竞争条件大概的数量。鉴于此,现在让我们从清单 1 中提取一个泛型的线程安全队列。
1.2 使用条件变量建立一个线程安全的队列
如果你设计一个泛型队列,花几分钟考虑一下可能需要的操作是值得的,就像你之前对线程安全队列所做的那样。让我们看一看 C++ 标准库来寻找灵感,以清单 2 所示的 std::queue<> 的容器适配器的形式。
- template <class T, class Container = std::deque<T>>
- class queue{
- public:
- explicit queue(const Container&);
- explicit queue(Container&& = Container() );
- template <class Alloc> explicit queue(const Alloc&);
- template <class Alloc> explicit queue(const Container&, const Alloc&);
- template <class Alloc> explicit queue(Container&&, const Alloc&);
- template <class Alloc> explicit queue(queue&&, const Alloc&);
- void swap(queue& q);
- bool empty() const;
- size_type size() const;
- T& front();
- const T& front() const;
- T& back();
- const T& back() const;
- void push(const T& x);
- void push(T&& x);
- void pop();
- template <class... Args> void emplace(Args&&... args);
- };
如果忽略构造函数、赋值和交换操作,那么还剩下 3 组操作:查询整个队列的状态( empty() 和 size() )、查询队列的元素( front() 和 back() )以及修改队列( push() 、 pop() 和 emplace() )。这些操作与你之前对堆栈的操作是相同的,因此你也遇到相同的有关接口中固有的竞争条件的问题。所以,你需要将 front() 和 pop() 组合到单个函数调用中,就像你为了堆栈组合 top() 和 pop() 那样。清单 1 中的代码增加了新的细微差别,但是,当使用队列在线程间传递数据时,接收线程往往需要等待数据。我们为 pop() 提供了两个变体: try_pop() ,它试图从队列中弹出值,但总是立即返回(带有失败指示符),即使没有也能获取到值。以及 wait_and_pop() ,它会一直等待,直到有值要获取。如果将栈示例中的特征带到此处,则接口看起来如清单 3 所示。
- #include <memory> // 为了 std::shared_ptr
- template<typename T>
- class threadsafe_queue
- {
- public:
- threadsafe_queue();
- threadsafe_queue(const threadsafe_queue&);
- threadsafe_queue& operator=(
- const threadsafe_queue&) = delete; // 为简单起见不允许赋值
- void push(T new_value);
- bool try_pop(T& value);
- std::shared_ptr<T> try_pop();
- void wait_and_pop(T& value);
- std::shared_ptr<T> wait_and_pop();
- bool empty() const;
- };
就像你为堆栈做的那样,减少构造函数并消除赋值以简化代码。如以前一样,还提供了 try_pop() 和 wait_and_pop() 的两个版本。 try_pop() 的第一个重载将获取到的值存储在引用变量中,所以它可以将返回值用作状态;如果它获取到的值就返回 true,否则返回 false。第二个重载不能这么做,因为它直接返回获取到的值。但是如果没有值可被获取到,则返回的指针可以设置为 NULL。
那么,所有这一切如何与清单 1 关联起来呢?嗯,你可以从那里提取 push() 以及 wait_and_pop() 的代码,如清单 4 所示。
- #include <queue>
- #include <mutex>
- #include <condition_variable>
- template<typename T>
- class threadsafe_queue
- {
- private:
- std::mutex mut;
- std::queue<T> data_queue;
- std::condition_variable data_cond;
- public:
- void push(T new_value)
- {
- std::lock_guard<std::mutex> lk(mut);
- data_queue.push(new_value);
- data_cond.notify_one();
- }
- void wait_and_pop(T& value)
- {
- std::unique_lock<std::mutex> lk(mut);
- data_cond.wait(lk, [this] {return !data_queue.empty(); });
- value = data_queue.front();
- data_queue.pop();
- }
- };
- threadsafe_queue<data_chunk> data_queue;
- void data_preparation_thread()
- {
- while (more_data_to_prepare())
- {
- data_chunk const data = prepare_data();
- data_queue.push(data);
- }
- }
- void data_processing_thread()
- {
- while (true)
- {
- data_chunk data;
- data_queue.wait_and_pop(data);
- process(data);
- if (is_last_chunk(data))
- break;
- }
- }
互斥元和条件变量现在包含在 threadsafe_queue 的实例中,所以不再需要单独的变量,并且调用 push() 不再需要外部的同步。此外, wait_and_pop() 负责条件变量等待。
wait_and_pop() 的另一个重载现在很容易编写,其余的函数几乎可以一字不差地从之前的栈示例中复制过来。清单 5 展示了最终的队列实现。
- #include <queue>
- #include <mutex>
- #include <condition_variable>
- template<typename T>
- class threadsafe_queue
- {
- private:
- mutable std::mutex mut;
- std::queue<T> data_queue;
- std::condition_variable data_cond;
- public:
- threadsafe_queue()
- {}
- threadsafe_queue(threadsafe_queue& other)
- {
- std::lock_guard<std::mutex> lk(other.mut);
- data_queue = other.data_queue;
- }
- void push(T new_value)
- {
- std::lock_guard<std::mutex> lk(mut);
- data_queue.push(new_value);
- data_cond.notify_one();
- }
- void wait_and_pop(T& value)
- {
- std::unique_lock<std::mutex> lk(mut);
- data_cond.wait(lk, [this] {return !data_queue.empty(); });
- value = data_queue.front();
- data_queue.pop();
- }
- std::shared_ptr<T> wait_and_pop()
- {
- std::unique_lock<std::mutex> lk(mut);
- data_cond.wait(lk, [this] {return !data_queue.empty(); });
- std::shared_ptr<T> res(std::make_shared<T>(data_queue.front()) );
- data_queue.pop();
- return res;
- }
- bool try_pop(T& value)
- {
- std::lock_guard<std::mutex> lk(mut);
- if (data_queue.empty())
- return false;
- value = data_queue.front();
- data_queue.pop();
- return true;
- }
- std::shared_ptr<T> try_pop()
- {
- std::lock_guard<std::mutex> lk(mut);
- if (data_queue.empty())
- return std::shared_ptr<T>();
- std::shared_ptr<T> res(std::make_shared<T>(data_queue.front() ));
- data_queue.pop();
- return res;
- }
- bool empty() const
- {
- std::lock_guard<std::mutex> lk(mut);
- return data_queue.empty();
- }
- };
虽然 empty() 是一个 const 成员函数,并且拷贝构造的 other 参数是一个 const 引用,但是其他线程可能会有到该对象的非 const 引用,并调用可变的成员函数,所以我们仍然需要锁定互斥元。由于锁定互斥元是一种可变的操作,故互斥元对象必须标记为 mutable,以便其可以被锁定在 empty() 和拷贝构造函数中。
条件变量在多个线程等待同一个事件的场合也是很有用的。如果线程被用于划分工作负载,那么应该只有一个线程去响应通知,可以使用与清单 1 中所示完全相同的结构,只要运行多个数据处理线程的实例。当新的数据准备就绪, notify_one() 的调用将触发其中一个正在执行 wait() 的线程去检查其条件,然后从 wait() 返回(因为你刚向 data_queue 中增加了一项)。谁也不能保证哪个线程会被通知到,即使是某个线程正在等待通知;所有的处理线程可能仍在处理数据。
另一种可能性是,多个线程正等待着同一个事件,并且它们都需要做出响应。这可能发生在共享数据正在初始化的场合下,处理线程都可以使用同一个数据,但需要等待其初始化完毕(虽然有比这更好的机制),或者是线程需要等待共享共享数据更新的地方,比如周期性的重新初始化。在这些案例中,准备数据的线程可以在条件变量上调用 notify_all() 成员函数而不是 notify_one()。顾名思义,这将导致所有当前执行着 wait() 的线程检查其等待中的条件。
如果等待线程只打算等待依次,那么当条件为 true 时它就不会再等待这个条件变量了,条件变量未必是同步机制的最佳选择。如果所等待的条件是一个特定数据块的可用性时,这尤其正确。在这个场景中,使用期值(future)可能会更合适。
2. 使用 future 等待一次性事件
假设你要乘飞机去国外度假。在到达机场并且完成了各种值机手续后,你仍然需要等待航班准备登机的通知,也许要几个小时。当然,你可以找到一些方法来消磨时间,比如看书、上网或者在昂贵的机场咖啡厅吃东西,但是从根本上来说,你只是在等待一个事情:登机时间到了的信号。不仅如此,一个给定的航班只进行一次,你下次去度假的时候,就会等待不同的航班。
C++ 标准库使用 future 为这类一次性事件建模。如果一个线程需要等待特定一次性的事件,那么它就会获取一个 future 来代表这一事件。然后,该线程可以周期性地在这个 future 上等待一小段时间以检查事件是否发生(检查出发告示板),而在检查间隙执行其他地任务(在高价咖啡厅吃东西)。另外,它还可以去做另外一件事情,直到其所需的事情已发生才继续进行,随后就等待 future 变为就绪(ready)。future 可能会有与之相关的数据(比如你的航班在哪个登机口登机),或可能没有。一旦事情已经发生(即 future 变为就绪),future 就无法复位。
C++ 标准库中有两类 future ,是由 <future> 库的头文件中声明的两个类模板实现的:唯一future(unique futures, std::future<>)和共享future(shared futures,std::shared_future<>)。这两个类模板是参照 std::unique_ptr 和 std::shared_ptr 建立的。std::future 的实例是仅有的一个指向其关联事件的实例,而 std::shared_future 的实例则可以指向同一个事件。对后者而言,所有实例将同时变为就绪,并且它们都可以访问所有与该事件相关联的数据。这些关联的数据,就是这两种 future 成为模板的原因,像 std::unique_ptr 和 std::shared_ptr 一样,模板参数就是关联的类型。std::future<void>、std::shared_future<void> 模板特化应该用于无关数据的场合。虽然 future 被用于线程间的同步,但是 future 对象本身却并不提供同步访问。如果多个线程需要访问同一个 future 对象,它们必须通过互斥元或其他同步机制来保护访问。然而,正如 2.5 节中看到的,多个线程可以分别访问自己的std::shared_future<>副本而无需进一步的同步,即使它们都指向同一个异步结果。
最基本的一次性事件是在后台运行着的计算结果。早在之前你就看到过 std::thread 并没有提供一种简单的从这一任务返回值的方法,我保证这将在这章中通过 future 加以解决——现在是时候去看看如何做了。
2.1 从后台任务中返回值
假设你有一个长期运行的计算,预期最终将得到一个有用的结果,但是现在,你还不需要这个值,也许你已经找到一种方法来确定生命、宇宙及万物的答案,这是从 Douglas Adams 那里偷来的一个例子。你可以启动一个新线程来执行该计算,但这也意味着你必须注意将结果传回来,因为 std::thread 并没有提供直接的机制来这样做。这就是 std::async 函数模板(同样声明于 <future> 头文件中)的由来。
在不需立即得到结果的时候,你可以使用 std::async 来启动一个异步任务(asynchronous task)。 std::async 返回一个 std::future 对象,而不是给你一个 std::thread 对象让你在上面等待,std::future 对象最终持有函数的返回值。当你需要这个值时,只要在 future 上调用 get(),线程就会阻塞直到 future 就绪,然后返回该值。清单 6 展示了一个简单的例子。
- #include <future>
- #include <iostream>
- int find_the_answer_to_ltuae();
- void do_other_stuff();
- int main()
- {
- std::future<int> the_answer = std::async(find_the_answer_to_ltuae);
- do_other_stuff();
- std::cout << "The answer is " << the_answer.get() << std::endl;
- }
std::async 允许你通过将额外的参数添加到调用中,来将附加参数传递给函数,这与 std::thread 是同样的方式。如果第一个参数是指向成员函数的指针,第二个参数则提供了用来应用该成员函数的对象(直接地,或通过指针,或封装在 std::ref 中),其余的参数则作为参数传递给该成员函数。否则,第二个及后续的参数将作为参数,传递给第一个参数所指定的函数或可调用对象。和 std::thread 一样,如果参数是右值,则通过移动原来的参数来创建副本。这就允许使用可移动的类型同时作为函数对象和参数。请看清单 7。
- #include <string>
- #include <future>
- struct X
- {
- void foo(int, std::string const&);
- std::string bar(std::string const&);
- };
- X x;
- auto f1 = std::async(&X::foo, &x, 42, "hello"); // 调用 p->foo(42,"hello"),其中p是&x
- auto f2 = std::async(&X::bar, &x, "goodbye"); // 调用 tmpx.bar("goodbye"),其中tmpx是x的副本
- struct Y
- {
- double operator()(double);
- };
- Y y;
- auto f3 = std::async(Y(), 3.141); // 调用 tmpy(3.141),其中tmpy是从Y()移动构造的
- auto f4 = std::async(std::ref(y), 2.718); // 调用 y(2.718)
- X baz(X&);
- std::async(baz, std::ref(x)); // 调用 baz(x)
- class move_only
- {
- public:
- move_only();
- move_only(move_only&&);
- move_only(move_only const&) = delete;
- move_only& operator=(move_only&&);
- move_only& operator=(move_only const&) = delete;
- void operator()();
- };
- auto f5 = std::async(move_only() ); // 调用 tmp(),其中 tmp 是从 std::move(move_only()) 构造的
默认情况下, std::async 是否启动一个新线程,或者在等待 future 时任务是否同步运行都取决于具体实现方式。在大多数情况下这就是你想要的,但你可以在函数调用之前使用一个额外的参数来指定究竟使用何种方式。这个参数为 std::launch 类型,可以是 std::launch::deferred ,以表明该函数调用将会延迟,直到在 future 上调用 wait() 或 get() 为止,或者是 std::launch::async ,以表明该函数必须运行在它自己的线程上,又或者是 std::launch::deferred | std::launch::async ,以表明可以由具体实现来选择。最后一个选项是默认的。如果函数调用被延迟,它有可能永远都不会实际运行。例如,
- auto f6 = std::async(std::launch::async, Y(), 1.2); // 在新线程中运行
- auto f7 = std::async(std::launch::deferred, baz, std::ref(x)); // 在 wait() 或 get() 中运行
- auto f8 = std::async(
- std::launch::deferred | std::launch::async,
- baz, std::ref(x)); // 由具体实现来选择
- auto f9 = std::async(baz, std::ref(x)); // 由具体实现来选择
- f7.wait(); // 调用延迟的函数
正如你稍后将在本章看到,并将在后续中再次看到的那样,使用 std::async 能够轻易地将算法转化成可以并行运行的任务。然而,这并不是将 std::future 与任务相关联的唯一方式;你还可以通过将任务封装在 std::packaged_task<> 类模板的一个实例中,或者通过编写代码,用 std::promise<> 类模板显式设置值等方式来实现。std::packaged_task 是比 std::promise 更高层次的抽象,所以我将从它开始。
2.2 将任务与 future 相关联
std::packaged_task<> 将一个 future 绑定到一个函数或可调用对象上。当 std::packaged_task<> 对象被调用时,它就调用相关联的函数或可调用对象,并且让 future 就绪,将返回值作为关联数据存储。这可以被用作线程池的构件,或者其他任务管理模式,例如在每个任务自己的线程上运行,或在一个特定的后台线程上按顺序运行所有任务。如果一个大型操作可以分成许多自包含的子任务,其中每一个都可以被封装在一个 std::packaged_task<> 实例中,然后将该实例传给任务调度器或线程池。这样就抽象出了任务的详细信息,调度程序仅需处理 std::packaged_task<> 实例,而非各个函数。
std::packaged_task<> 类模板的模板参数为函数签名,比如 void() 表示无参数无返回值的函数, 或是像 int(std::string&, double*) 表示接受对 std::string 的非 const 引用和指向 double 的指针,并返回 int 的函数。当你构造 std::packaged_task 实例的时候,你必须传入一个函数或可调用对象,它可以接受指定的参数并且返回指定的返回类型。类型无需严格匹配,你可以用一个接受 int 并返回 float 的函数构造 std::packaged_task<double(double)> ,因为这些类型是可以隐式转换的。
指定的函数签名的返回类型确定了从 get_future() 成员函数返回的 std::future<> 的类型,而函数签名的参数列表用来指定封装任务的函数调用运算符的签名。例如, std::packaged_task<std::string(std::vector<char>*, int)> 的部分类定义如清单 8 所示。
- template<>
- class packaged_task<std::string(std::vector<char>*, int)>
- {
- public:
- template<typename Callable>
- explicit packaged_task(Callable&& f);
- std::future<std::string> get_future();
- void operator()(std::vector<char>*, int);
- };
该 std::packaged_task 对象是一个可调用对象,它可以被封装入一个 std::function 对象,作为线程函数传给 std::thread ,或传给需要可调用对象的另一个函数,或者干脆直接调用。当 std::packaged_task 作为函数对象被调用时,提供给函数调用运算符的参数被传给所包含的函数,并且将返回值作为异步结果,存储在由 get_future() 获取的 std::future 中。因此,你可以将任务封装在 std::packaged_task 中,并且在把 std::packaged_task 对象传到别的地方进行适当调用之前获取 future。当你需要结果时,你可以等待 future 变为就绪,清单 9 的例子实际展示了这一点。
在线程之间传递任务许多 GUI 框架要求从特定的线程来完成 GUI 的更新,所以,如果另一个线程需要更新 GUI,它必须向正确的线程来发送消息来实现这一点。std::packaged_task 提供了一种更新 GUI 的方法,该方法无需为每个与 GUI 相关的活动自定义的消息。
- #include <deque>
- #include <mutex>
- #include <future>
- #include <thread>
- #include <utility>
- std::mutex m;
- std::deque<std::packaged_task<void()>> tasks;
- bool gui_shutdown_message_received();
- void get_and_process_gui_message();
- void gui_thread()
- {
- while (!gui_shutdown_message_received())
- {
- get_and_process_gui_message();
- std::packaged_task<void()> task;
- {
- std::lock_guard<std::mutex> lk(m);
- if (tasks.empty())
- continue;
- task = std::move(tasks.front() );
- tasks.pop_front();
- }
- task();
- }
- }
- std::thread gui_bg_thread(gui_thread);
- template<typename Func>
- std::future<void> post_task_for_gui_thread(Func f)
- {
- std::packaged_task<void()> task(f);
- std::future<void> res = task.get_future();
- std::lock_guard<std::mutex> lk(m);
- tasks.push_back(std::move(task) );
- return res;
- }
hanhan笔记:这个 demo 能学到很多东西。虽然现在还不知道线程池是什么,但是感觉在这个队列的基础上已经初步具备任务调度和执行的能力了。
此代码非常简单:GUI 线程循环直到收到通知 GUI 停止的消息,反复轮询待处理的 GUI 消息,比如用户点击,以及任务队列中的任务。如果队列中没有任务,则再次循环;否则,从任务队列中提取任务,解除队列中的锁,并允许任务。当任务再次返回时,与该任务相关联的 future 被设置为就绪。
在队列上发布任务也同样简单:利用所提供的函数创建一个新的任务包,通过调用 get_future() 成员函数从任务中获取 future,同时在返回 future 到调用之前将任务至于列表之上。发出消息给 GUI 线程的代码如果需要知道任务已完成,则可以等待该 future;若无需知道,则可以丢弃该 future。
本示例中的任务使用 std::packaged_task<void()> ,它封装了一个接受零参数且返回 void 的函数或可调用对象(如果它返回了别的东西,则返回值被丢弃)。这是最简单的任务,但如你在前面所看到的,std::packaged_task 也可以用于跟复杂的情况——通过指定一个不同的函数签名作为模板,你可以改变返回类型(以及在 future 的关联状态中存储的数据类型)和函数调用运算符的参数类型。这个示例可以进行简单的扩展,让那些在 GUI 线程上运行的任务接受参数,并且返回 std::future 中的值,而不是仅一个完成指示符。
那些无法用一个简单函数调用表达的任务和那些结果可能来自不止一个地方的任务又当如何?这些情况都可以通过第三种创建 future 的方式来处理:使用 std::promise 来显示地设置值。
2.3 生成 (std::)promise
当有一个需要处理大量网络连接的应用程序时,通常倾向于在独立的线程上分别处理每个连接,因为这能使网络通信更易于理解也更易于编程。这对于较低的连接数(因而线程数也比较低)效果很好。然而,随着连接数的增加,这就变得不那么合适了;大量的线程就会消耗大量操作系统资源,并可能导致大量的上下文切换(当线程数超过了可用的硬件并发),进而影响性能。在极端的情况下,操作系统可能在其网络连接能力用尽之前,就为运行新的线程而耗尽资源。在具有超大量网络连接的应用中,通常用少量线程(可能仅有一个)来处理连接,每个线程一次处理多个连接。
考虑其中一个处理这种连接的线程。数据包将以基本上随机的顺序来自于待处理的各个连接,同样地,数据包将以随机顺序进行排队发送。在多数情况下,应用程序的其他部分将通过特定的网络连接,等待着数据被成功地发送或是新一批数据被成功地接受。
std::promise<T> 提供一种设置值(类型T)方式,它可以在这之后通过相关联的 std::future<T> 对象进行读取。一对 std::promise / std::future 为这一设施提供了一个可能的机制;等待中的线程可以阻塞 future,同时提供数据的线程可以使用配对中的 promise 项,来设置相关的值并使 future 就绪。
你可以通过调用 get_future() 成员函数来获取与给定的 std::promise 相关的 std::future 对象,比如 std::packaged_task。当设置完 promise 的值(使用 set_value() 成员函数), future 会变为就绪,并且可以用来获取所存储的数值。如果销毁 std::promise 时未设置值,则将存入一个异常。2.4 节描述了异常是如何跨线程转移的。
清单 10 展示了处理如前文所述的连接的示例代码。在这个例子中,使用一对 std::promise<bool> / std::future<bool> 对来标识一块传出数据的成功传输;与 future 关联的值就是一个简单的成功 / 失败标志。对于传入的数据包,与 future 关联的数据为数据包的负载。
- void process_connections(connection_set& connections)
- {
- while (!done(connections))
- {
- for (connection_iterator connection = connections.begin(),
- end = connections.end(); connection != end; ++connection)
- {
- if (connection->has_incoming_data() )
- {
- data_packet data = connection->incoming();
- std::promise<payload_type>& p = connection->get_promise(data.id);
- p.set_value(data.payload);
- }
- if (connection->has_outgoing_data())
- {
- outgoing_packet data =
- connection->top_of_outgoing_queue();
- connection->send(data.payload);
- data.promise.set_value(true);
- }
- }
- }
- }
函数 process_connections() 一直循环到 done() 返回 true。每次循坏中,轮流检查每个连接,在有传入数据时获取之或是发送队列中的传出数据。此处假定一个输入数据包具有 ID 和包含实际数据在内的负载。此 ID 被映射至 std::promise (可能通过在关联容器中进行查找),并且该值被设为数据包的负载。对于传出的数据包,数据包取自传出队列,并实际上通过此连接发送。一旦发送完毕,与传出数据关联的 promise 被设为 true 以表示传输成功。此映射对于实际网络协议是否完好,取决于协议本身;这种 promise/future 风格的结构可能不适用于某特定情况,尽管它确实与某些操作系统支持的异步 I/O 具有相似的结构。
迄今为止的所有代码完全忽略了异常。虽然现象一个万物都始终运转良好的世界是美好的,但却不切实际。有时候磁盘满了,有时候你要找的东西恰好不在那里,有时候网络故障,有时候数据库损坏。如果你正在需要结果的线程中执行操作,代码可能只是用异常报告了一个错误,因此仅仅因为你想用 std::packaged_task 或 std::promise ,就限制地要求所有事情都正常工作是不必要的。因此 C++ 标准库提供一个简便的方式,来处理这种场景下的异常,并允许他们作为相关结果的一部分而保存。
hanhan笔记:书上的例子没有上下文,不清楚 std::promise 具体是怎么用的,以下是网上找的例子:
- #include <future>
- void test(std::future<int> f)
- {
- int res = f.get();
- printf("res = %d\n", res);
- }
- int main()
- {
- std::promise<int> p;
- std::future<int> f = p.get_future();
- std::thread t(test, std::move(f) );
- std::this_thread::sleep_for(std::chrono::milliseconds(5000) );
- p.set_value(100);
- t.join();
- }
2.4 为 future 保存异常
考虑下面简短的代码片段。如果将 -1 传入 square_root() 函数,会引发一个异常,同时将被调用者所看到。
- double square_root(double x)
- {
- if (x < 0)
- {
- throw std::out_of_range("x < 0");
- }
- return sqrt(x);
- }
现在假设不是仅从当前线程调用 square_root():
- double y = square_root(-1);
而是以异步调用的形式运行调用:
- std::future<double> f = std::async(square_root, -1);
- double y = f.get();
两者行为完全一致自然是最理想的;与 y 得到函数调用的任意一种结果一样,如果调用 f.get() 的线程能像在单线程情况一样,能够看到里面的异常,那是极好的。
实际情况则是,如果作为 std::async 一部分的函数调用引发了异常,该异常会被存储在 future 中,代替所存储的值,future 变为就绪,并且对 get() 的调用会重新引发所存储的异常(注:重新引发的是原始异常对象抑或其副本,C++ 标准并没有指定,不同的编译器和库在此问题上作出了不同的选择)。这同样发生在将函数封装入 std::packaged_task 的时候——当任务被调用时,如果封装的函数引发异常,该异常代替结果存入 future,准备在 get() 时引发。
顺理成章, std::promise 在显式的函数调用下提供相同的功能。如果期值存储一个异常而不是一个值,则调用 set_exception() 成员函数而不是 set_value()。这通常是在引发异常作为算法的一部分时用在 catch 块中,将该异常填入 promise。
- extern std::promise<double> some_promise;
- try
- {
- some_promise.set_value(calculate_value());
- }
- catch (...)
- {
- some_promise.set_exception(std::current_exception());
- }
这里使用 std::current_exception() 来获取已发生的异常。作为替代,可以使用 std::copy_exception() 直接存储新的异常而不对其引发。
- some_promise.set_exception(std::copy_exception(std::logic_error("foo")));
在异常的类型已知时,这比使用 try/catch 块更为简洁,并且应该优先使用,这不仅仅简化了代码,也为编译器提供更多的优化代码的机会。
另一种将异常存储至 future 的方式,是销毁与 future 关联的 std::promise 或 std::packaged_task ,而无需在 promise 上调用设置函数或是调用打包任务。在任何一种情况下,如果 future 尚未就绪,std::promise 或 std::packaged_task 的析构函数会将具有 std::future_errc::broken_promise 错误代码的 std::future_error 异常存储在相关联的状态中。通过创建 future,你承诺提供一个值或异常,而通过销毁该值或异常的来源,你违背了承诺。在这种情况下如果编译器没有将任何东西存进 future,等待中的线程可能会永远等待下去。
到目前为止,所有的例子都使用了 std::future。然而,std::future 有其局限性,最起码,只有一个线程能等待结果。如果需要多于一个的线程等待同一个事件,则需要使用 std::shared_future 来代替。
2.5 等待自多个线程
尽管 std::future 能处理从一个线程向另一个线程转移数据所需的全部必要的同步,但是调用某个特定的 std::future 实例的成员函数却并没有相互同步。如果从多个线程访问单个 std::future 对象而不进行额外的同步,就会出现数据竞争和未定义的行为。这是有意为之的,std::future 模型统一了异步结果的所有权,同时 get() 的单发性质使得这样的并发访问没有意义——只有一个线程可以获取值,因为在首次调用 get() 后,就没有任何可获取的值留下了。
如果你的并发代码的绝妙设计要求多个线程能够等待同一事件,目前还无需失去信心; std::shared_future 完全能够实现这一点。鉴于 std::future 是仅可移动的,所以所有权可以在实例间转移,但是一次只有一个实例指向特定的异步结果。std::shared_future 实例是可复制的,因此可以有多个对象引用同一个相关状态。
现在,即便有了 std::shared_future ,各个对象的成员函数仍然是不同步的,所以为了避免从多个线程访问单个对象时出现数据竞争,必须使用锁来保护访问。首选的使用方式,是用一个对象的副本来代替,并且让每个线程访问自己的副本。从多个线程访问共享的异步状态,如果每个线程都是通过自己的 std::shared_future 对象去访问刚状态,那么就是安全的,见图 1。
std::shared_future 的一个潜在用处,是实现类似复杂电子表格的并行执行。每个单元都有一个单独的终值,可以被多个其他单元格的公式使用。用来计算各个单元格结果的公式可以使用 std::shared_future 来引用第一个单元。如果所有独立单元格的公式被并行执行,那么可以继续完成的任务可以进行,而那些依赖于其他单元格的公式将阻塞,直到其依赖关系准备就绪。这就使得系统能最大限度地利用可用的硬件并发。
引用了异步状态的 std::shared_future 实例可以通过引用这些状态的 std::future 实例来构造。由于 std::future 对象不和其他对象共享异步状态的所有权,因此该所有权必须通过 std::move 转移到 std::shared_future,将 std::future 留在空状态,就像它被默认构造了一样。
- std::promise<int> p;
- std::future<int> f(p.get_future());
- assert(f.valid() ); // future f 是有效的
- std::shared_future<int> sf(std::move(f) );
- assert(!f.valid() ); // f 不再有效
- assert(sf.valid() ); // sf 现在有效
此时,future f 刚开始是有效的,因为它引用了 promise p 的异步状态,但是将状态转移至 sf 后,f 不再有效,而 sf 有效。
正如其他可移动的对象那样,所有权的转移对于右值是隐式的,因此可以从 std::promise 对象的 get_future() 成员函数的返回值直接构造一个 std::shared_future,例如
- std::promise<std::string> p;
- std::shared_future<std::string> sf(p.get_future() ); // 所有权的隐式转移
此处,所有权的转移是隐式的,std::shared_future<> 根据 std::future<std::string> 类型的右值进行构造。
std::future 具有一个额外特性,即从变量的初始化自动推断变量类型的功能,使得 std::shared_future 的使用更加方便。std::future 具有一个 share() 成员函数,可以构造一个新的 std::shared_future ,并且直接将所有权转移给它。这可以节省大量的录入,也使代码更易于更改。
- std::promise<std::map<SomeIndexType, SomeDataType, SomeComparator,
- SomeAllocator>::iterator> p;
- auto sf = p.get_future().share();
这种情况下,sf 的类型被推断为相当拗口的 std::shared_future<std::map<SomeIndexType, SomeDataType, SomeComparator, SomeAllocator>::iterator> 。如果比较器改变或分配器改变了,仅需改变 promise 的类型,future 的类型将自动更新以匹配。
有些时候,你会想要限制等待事件的时长,无论是因为某段代码能够占用的时间有着硬性的限制,还是因为如果事件不会很快发生,线程就可以去做其他有用的工作。为了处理这个功能,许多等待函数具有能够指定超时的变量。
3. 有时间限制的等待
前面介绍的所有阻塞调用都会阻塞一个不确定的时间段,挂起线程直到等待的事件发生。在许多情况下这是没问题的,但在某些情况下你会希望给等待加一个限制。这就使得能够发送某种形式的“我还活着”的消息给交互用户或者另一个进程,或是在用户已经放弃等待并且按下取消键时,干脆放弃等待。
有两类可供指定的超时:一为基于时间段的超时,即等待一个指定的时间长度(例如 30ms),或是绝对超时,即等到一个指定的时间点(例如世界标准时间 2011 年 11 月 30 日 17:30:15.045987023)。大多数等待函数提供这两种形式超时的变量,处理基于时间段超时的变量具有 _for 后缀,而处理绝对超时的变量具有 _until 后缀。
例如,std::condition_variable 具有两个重载版本的 wait_for() 成员函数和两个重载版本的 wait_until() 成员函数,对应于重载版本的 wait() ——一个重载只是等待到收到信号,或超时,或发生伪唤醒;另一个重载在唤醒时检测所给的断言,并只在所给的断言为 true (以及条件变量已收到信号)或超时的情况下才返回。
在细看使用超时的函数之前,让我们从时钟开始,看一看在 C++ 中是如何指定时间的。
3.1 时钟
就 C++ 标准库所关注的而言,时钟是时间信息的来源。具体来说,时钟是提供以下四种不同部分信息的类。
■ 现在时间。
■ 用来表示从时钟获取到的时间值的类型。
■ 时钟的节拍周期。
■ 时钟是否以均匀的速率进行计时,决定其是否为匀速(steady)时钟。
时钟的当前时间可以通过调用该时钟类的静态成员函数 now() 来获取。例如,std::chrono::system_clock::now() 返回系统时钟的当前时间。对于具体某个时钟的时间点类型,是通过 time_point 成员的 typedef 来指定的,因此 some_clock::now() 的返回类型是 some_clock::time_point 。
时钟的节拍周期是由分数秒指定的,它由时钟的 period 成员 typedef 给出——每秒走 25 拍的时钟,就具有 std::ratio<1,25> 的 period,而每 2.5 秒走一拍的时钟则具有 std::ratio<2.5,1> 的 period。如果时钟的节拍周期直到运行时方可知晓,或者可能所给的应用程序运行期间可变,则 peroid 可以指定为平均的节拍周期、最小可能的节拍周期,或是编写类库的人认为合适的一个值。在所给的一次程序的执行中,无法保证观察到的节拍周期与该时间所指定的 period 相符。
如果一个时钟以均匀速率计时(无论该时速是否匹配 peroid)且不能被调整,则该时钟被称为匀速时钟。如果时钟是匀速的,则时钟类的 is_steady 静态数据成员为 true,反之为 false。通常情况下,std::chrono::system_clock 是不匀速的,因为时钟可以调整,考虑到本地时钟漂移,这种调整甚至是自动执行的。这样的调整可能会引起调用 now() 所返回的值,比之前调用 now() 所返回的值更小,这违背了均匀计时速率的要求。如你马上要看到的那样,匀速时钟对于计算超时来说非常重要,因此 C++ 标准库提供形式为 std::chrono::steady_clock 的匀速时钟。由于 C++ 标准库提供的其他时钟包括 std::chrono::system_clock ,它代表系统的“真实时间”时钟,并为时间点和 time_t 值之间的相互转化提供函数,还有 std::chrono::high_resolution_clock ,它提供所有类库时钟中最小可能的节拍周期(和可能的最高精度)。它实际上可能是其他时钟之一的 typedef。这些时钟与其他时间工具都定义在 <chrono>类库头文件中。
我们马上要看看如何表示时间点,但在此之前,先来看看时间段是如何表示的。
3.2 时间段
时间段是时间支持中的最简单部分,它们是由 std::chrono::duration<> 类模板(线程库使用的所有 C++ 时间处理工具均位于 std::chrono 的命名空间中)进行处理的。第一个模板参数为所代表类型(如 int、long 或 double);第二个参数是个分数,指定每个时间段单位表示多少秒。例如,以 short 类型存储的几分钟的数目表示为 std::chrono::duration<short, std::ratio<60,1>> ,因为 1 分钟有 60 秒。另一方面,以 double 类型存储的毫秒数则表示为 std::chrono::duration<double, std::ratio<1,1000>>, 因为 1 毫秒为 1/1000 秒。
标准库在 std::chrono 命名空间中为各种时间段提供了一组预定义的 typedef : nanoseconds、microseconds、milliseconds、seconds、minutes 和 hours 。它们均使用一个足够大的整数类型以供表示,以至于如果你希望的话,可以使用合适的单位来表示超过 500 年的时间段。还有针对所有国际单位比例的 typedef 可供指定自定义时间段时使用,从 std::atto(10-18)至 std::exa(1018)(还有更大的,若你的平台具有 128 位整数类型),例如 std::chrono::duration<double,std::centi> 是以 double 类型表示的 1/100 秒的计时。
在无需截断值的场合,时间段之间的转换是隐式的(因此将小时转换为秒是可以的,但将秒转换成小时则不然)。显式转换可以通过 std::chrono::duration_cast<> 实现:
- std::chrono::milliseconds ms(54802);
- std::chrono::seconds s =
- std::chrono::duration_cast<std::chrono::seconds>(ms);
结果是截断而非四舍五入,因此在此例中 s 值为 54。
时间段支持算术运算,因此可以加、减时间段来得到新的时间段,或者可以乘、除一个底层表示类型(第一个模板参数)的常数。因此 5*seconds(1) 和 seconds(5) 或 minutes(1) - seconds(55) 是相同的。时间段中单位数量的计数可以通过 count() 成员函数获取。因此 std::chrono::milliseconds(1234).count() 为 1234。
基于时间段的等待是通过 std::chrono::duration_cast<> 实例完成的。例如,可以等待 future 就绪最多 35 毫秒。
- std::future<int> f = std::async(some_task);
- if (f.wait_for(std::chrono::milliseconds(35)) == std::future_status::ready)
- do_something_with(f.get() );
等待函数都会返回一个状态以表示等待是否超时,或者所等待的事件是否发生。在这种情况下,你在等待一个 future,若等待超时,函数返回 std::future_status::timeout ,若 future 就绪,则返回 std::future_status::ready, 或者如果 future 任务推迟,则返回 std::future_status::deferred。基于时间段的等待使用类库内部的匀速时钟来衡量时间,因此 35 毫秒意味着 35 毫秒的逝去时间,即便系统时钟在等待期间进行了调整(向前或者向后)。当然,系统调度的多变和 OS 时钟的不同精度意味着线程之间发出调用并返回的实际时间可能远远长于 35 毫秒。
在看过时间段后,接下可以继续看看时间点。
3.3 时间点
时钟的时间点是通过 std::chrono::time_point<> 类模板的实例来表示的,它以第一个模板参数指定其参考的时钟,并且以第二个模板参数指定计量单位(std::chrono::duration_cast<> 的特化)。时间点的值是时间的长度(指定时间段的倍数),因而一个特定时间点被称为时钟的纪元(epoch)。时钟的纪元是一项基本参数,但却不能直接查询,也未被 C++ 标准指定。典型的纪元包括 1970 年 1 月 1 日 00:00,以及运行应用程序的计算机引导启动的瞬间。时钟可以共享纪元或拥有独立的纪元。如果两个时钟共享一个纪元,则在一个类中的 time_point typedef 可指定另一个类作为与 time_point 相关联的时钟类型。虽然无法找出纪元的时间所在,但可以获取给定 time_point 的 time_since_epoch()。该成员函数返回一个时间段的值,其指定了从时钟纪元到该时间点的时间长度。
例如,你可以指定一个时间点为 std::chrono::time_point<std::chrono::system_clock, std::chrono::minutes>。这将保持时间于系统时钟相关,但却以分钟而不是系统的时钟固有测量精度(通常为秒或更小)进行测量。
你可以从 std::chrono::time_point<> 的实例加上和减去时间段来产生新的时间点,因此 std::chrono::high_resolution_clock::now() + std::chrono::nanoseconds(500) 将在 future 中给你 500 纳秒的时间。这对于在知道代码块的最大时间段情况下计算绝对超时是极好的,但若其中有对等待函数的多个调用,或是在等待函数之前有非等待函数,就会占据一部分时间。
你还可以从另一个共享同一个时钟的时间点减去一个时间点。结果为指定两个时间点之间长度的时间段。这对于代码的计时非常有用,例如,
- auto start = std::chrono::high_resolution_clock::now();
- do_something();
- auto stop = std::chrono::high_resolution_clock::now();
- std::cout << "do_something() took "
- << std::chrono::duration_cast<std::chrono::seconds>(stop - start).count()
- << " seconds" << std::endl;
然而 std::chrono::time_point<> 实例的时钟参数能做的不仅仅是指定纪元。当你将时间点传给到接受绝对超时的等待函数时,时间点的时钟参数可以用来测量时间。当时钟改变时会产生一个重要的影响,因为这一等待会跟踪时钟的变化,并且在时钟的 now() 函数返回一个晚于指定超时的值之前都不会返回。如果时钟向前调整,将减少等待的总长度(按照匀速时钟计量),反之如果向后调整,就可能增加等待的总长度。
如你所料,时间点和等待函数的 _until 变种共同使用。典型的用例是在用作从程序中一个固定的某个时钟 now() 开始的偏移量,尽管与系统时钟相关联的时间点可以通过在对用户可见的时间,用 std::chrono::system_clock::to_time_point() 静态成员函数从 time_t 转换而来。例如,如果有一个最大值为 500 毫秒的时间,来等待一个与条件变量相关的事件,你可以按清单 11 所示来做。
- #include <condition_variable>
- #include <mutex>
- #include <chrono>
- std::condition_variable cv;
- bool done;
- std::mutex m;
- bool wait_loop()
- {
- auto const timeout = std::chrono::steady_clock::now() +
- std::chrono::milliseconds(500);
- std::unique_lock<std::mutex> lk(m);
- while (!done)
- {
- if (cv.wait_until(lk, timeout) == std::cv_status::timeout)
- break;
- }
- return done;
- }
如果没有向等待传递断言,那么这是在有时间限制下等待条件变量的推荐方式。这种情况下,循环的总长度有限。如 1.1 节所示,当不传入断言时,需要通过循环来使用条件变量,一遍处理伪唤醒。如果在循环中使用 wait_for(),可能在伪唤醒前,就已结束等待几乎全部时长,并且在经过下一次等待开始后再来一次。这可能会重复任意次,使得总的等待时间无穷无尽。
在看过了指定超时的基础知识后,让我们来看看能够使用超时的函数。
3.4 接受超时的函数
超时的最简单用法,是将延迟添加到特定线程的处理过程中,以便在它无所事事的时候避免占用其他线程的处理时间。在 1 节你曾见过这样的例子,在循环中轮询一个“完成”标记。处理它的两个函数是 std::this_thread::sleep_for() 和 std::this_thread::sleep_until()。它们像一个基本的闹钟一样工作:在指定的时间段(使用 sleep_for())或直至指定的时间点(使用 sleep_until()),线程进入睡眠状态。sleep_for() 对于那些类似于 1 节中的例子是有意义的,其中一些事情必须周期性地进行,并且逝去的时间是重要的。另一方面,sleep_until() 允许安排线程在特定时间点唤醒。这可以用来触发半夜里的备份,或在早上 6:00 打印工资条,或在做视频回放时暂停线程直至下一帧的刷新。
当然,睡眠并不是唯一的接受超时的工具。你已经看到了可以将超时与条件变量和 future 一起使用。如果互斥元支持的话,甚至可以试图在互斥元获得锁时使用超时。普通的 std::mutex 和 std::recursive_mutex 并不支持锁定上的超时,但是 std::timed_mutex 和 std::recursive_timed_mutex 支持。这两种类型均支持 try_lock_for() 和 try_lock_until() 成员函数,它们可以在指定时间段内或在指定时间点之前尝试获取锁。表 1 展示了 C++ 标准库中可以接受超时的函数及其参数和返回值。列作时间段的参数必须为 std::chrono::duration<> 的实例,而那些列作时间点的必须为 std::chrono::time_point<> 的实例。
类/命名空间 | 函数 | 返回值 |
---|---|---|
std::this_thread 命名空间 | sleep_for(duration) sleep_until(time_point) |
不可用 |
std::condition_variable 或 std::condition_variable_any |
wait_for(lock,duration) wait_until(lock,time_point) |
std::cv_status::timeout 或 std::cv_status::no_timeout |
wait_for(lock,duration,predicate) wait_until(lock,time_point,predicate) |
bool——当唤醒时 predicate 的返回值 | |
std::timed_mutex 或 std::recursive_timed_mutex |
try_lock_for(duration) try_lock_until(time_point) |
bool——true 如果获得了锁,否则 false |
std::unique_lock<TimedLockable> | unique_lock(lockable,duration) unique_lock(lockable,time_point) |
不可以——owns_lock() 在新构造的对象上;如果获得了锁返回 true,否则 false |
unique_lock(duration) unique_lock(time_point) |
bool——true 如果获得了锁,否则 false | |
std::future<ValueType> 或 std::shared_future<ValueType> |
wait_for(duration) wait_until(time_point) |
std::future_status::timeout 如果等待超时 std::future_status::ready 如果 future 就绪 std::future_status::deferred 如果 future 持有的延迟函数还没有开始 |
目前,我已经介绍了条件变量、future、promise 和打包任务的机制,接下来是时候看一看更广的图景,以及如何利用它们来简化线程间操作的同步。
4. 使用操作同步来简化代码
使用截至目前在本章中描述的同步工具作为构件模块,允许你着重关注需要同步的操作而非机制。一种可以简化代码的方式,是采用一种更加函数式的(functional,在函数式编程意义上)的方法来编写并发程序。并非直接在线程之间共享数据,而是每个任务都可以提供它所需要的数据,并通过使用 future 将结果传播至需要它的线程。
4.1 带有 future 的函数式编程
函数式编程(functional programming,FP)指的是一种编程风格,函数调用的结果仅单纯依赖于该函数的参数而不依赖于任何外部状态。这与函数的数学概念相关,同时也意味着如果用同一参数执行一个函数两次,结果是完全一样的。这是许多 C++ 标准库中数学函数,如 sin、cos 和 sqrt,以及基本类型简单操作如 3+3、6*9 或 1.3/4.7 的特性。纯函数也不修改任何外部状态,函数的影响完全局限在返回值上。
这使得事情变得易于思考,尤其当涉及并发时,因为第 3 章中讨论的许多与共享内存相关的问题不复存在。如果没有修改共享内存,那么就不会有竞争条件,因此也就没有必要使用互斥元来保护共享数据。这是一个如此强大的简化,使得诸如 Haskell 这样的编程语言,在默认情况下其所有函数都是纯函数,开始在编写并发系统中变得更为流行。因为大多数东西都是纯的,实际上的确修改共享状态的非纯函数就显得鹤立鸡群,因而也更易于理解它们是如何纳入应用程序整体结构的。
然而函数式编程的好处不仅仅局限在那些将其作为默认范型的语言。C++ 是一种多范型语言,它完全可以用 FP 风格编写程序。随着 lambda 函数的到来,从 Boost 到 TR1 的 std::bind 合并,和自动变量类型推断的引入,C++11 比 C++98 更为容易实现函数式编程。 future 是使得 C++ 中 FP 风格的并发切实可行的最后一块拼图。一个 future 可以在线程间来回传递,使得一个线程的计算结果依赖于另一个的结果,而无需任何对共享数据的显式访问。
1. FP 风格快速排序
为了说明在 FP 风格并发中 future 的使用,让我们来看一个简单的快速排序算法的实现。算法的基本思想很简单,给定一列值,取一个元素作为中轴,然后将列表分为两组——比中轴小的为一组,大于等于中轴的为一组。列表的已排序副本,可以通过对这两组进行排序,并按照先是比中轴小的值已排序列表,接着是中轴,再后返回大于等于中轴的值已排序列表的顺序进行返回来获取。图 2 展示了 10 个整数的列表是如何根据此步骤进行排序的。FP 风格的顺序实现在随后的代码中展示;它通过值的形式接受并返回列表,而不是像 std::sort() 那样就地排序。
- template<typename T>
- std::list<T> sequential_quick_sort(std::list<T> input)
- {
- if (input.empty())
- {
- return input;
- }
- std::list<T> result;
- result.splice(result.begin(), input, input.begin());
- T const& pivot = *result.begin();
- auto divide_point = std::partition(input.begin(), input.end(), [&](T const& t) {return t < pivot; });
- std::list<T> lower_part;
- lower_part.splice(lower_part.end(), input, input.begin(), divide_point);
- auto new_lower(
- sequential_quick_sort(std::move(lower_part)));
- auto new_higher(
- sequential_quick_sort(std::move(input)));
- result.splice(result.end(), new_higher);
- result.splice(result.begin(), new_lower);
- return result;
- }
尽管接口是 FP 风格的,如果你自始自终使用 FP 风格,就会制作很多副本,即你在内部使用了“标准”祈使风格。取出第一个元素作为中轴,方法是用 splice() 将其从列表前端切下。虽然这样可能会导致一个次优排序(考虑到比较和交换的次数),由于列表遍历的缘故,用 std::list 做任何其他事都可能增加很多的时间。你已知要在结果中得到它,因此可以直接将其拼接至将要使用的列表中。现在,你还会想要将其作比较,因此让我们对它进行引用,以避免复制。接着可以使用 std::partition 将序列划分成小于中轴的值和不小于中轴的值。指定划分依据的最简单方式是使用一个 lambda 函数;使用引用捕获以避免复制 pivot 的值。
std::partition() 就地重新排序列表,并返回一个迭代器,它标记着第一个不小于中轴值的元素。迭代器的完整类型可能相当冗长,因此可以使用 auto 类型说明符,使得编译器帮你解决。
现在,你已经选择了 FP 风格接口,因此如果打算使用递归来排序这两“半”,则需要创建两个列表。可以通过再次使用 splice() 将从 input 到 divide_point 的值移动至一个新的列表:lower_part。这使得 input 中只仅留下剩余的值。你可以接着用递归调用对这两个列表进行排序。通过使用 std::move() 传入列表,也可以在此处避免复制——但是结果也将隐式地移动出来。最后,你可以再次使用 splice() 将 result 以正确地顺序连起来。new_higher 值在中轴之后直到末尾,而 new_lower 值从开始起,直到中轴之前。
2. FP 风格并行快速排序由于已经使用了函数式风格,通过 future 将其转换成并行版本就很容易了,如清单 13 所示。这组操作与之相同,区别在于其中一部分现在并行地运行。此版本使用 future 和函数式风格实现快速排序算法。
- template<typename T>
- std::list<T> parallel_quick_sort(std::list<T> input)
- {
- if (input.empty())
- {
- return input;
- }
- std::list<T> result;
- result.splice(result.begin(), input, input.begin() );
- T const& pivot = *result.begin();
- auto divide_point = std::partition(input.begin(), input.end(),
- [&](T const& t) {return t < pivot; });
- std::list<T> lower_part;
- lower_part.splice(lower_part.end(), input, input.begin(),
- divide_point);
- std::future<std::list<T>> new_lower(
- std::async(¶llel_quick_sort<T>, std::move(lower_part) ));
- auto new_higher(
- parallel_quick_sort(std::move(input)) );
- result.splice(result.end(), new_higher);
- result.splice(result.begin(), new_lower.get());
- return result;
- }
这里最大的变化是,没有在当前线程中对较小的部分进行排序,而是在另一个线程中使用 std::async() 对其进行排序。列表的上部分跟之前一样直接递归进行排序。通过递归调用 parallel_quick_sort() ,你可以充分利用现有的硬件并发能力。如果 std::async() 每次启动一个新的线程,那么如果你向下递归三次,就会有 8 个线程在运行;如果向下递归 10 次(对大约 1000 个元素),如果硬件可以处理的话,就将有 1024 个线程在运行。如果类库认定产生了过多的任务(也许是因为任务的数量超过了可用的硬件并发能力),则可能改为同步地产生新任务。他们会在调用 get() 的线程中运行,而不是在新的线程,从而避免在不利于性能的情况下把任务传递到另一个线程的开销。值得注意的是,对于 std::async 的实现来说,只有显式指定了 std::launch::deferred 再为每一个任务开启一个新线程(甚至在面对大量的过度订阅时),或是只有显式指定了 std::launch::async 再同步运行所有任务,才是完全符合的。如果依靠类库进行自动判定,建议你查阅一下这一实现的文档,看一看它究竟表现出什么样的行为。
与其使用 std::async() ,不如自行编写 spawn_task() 函数作为 std::packaged_task 和 std::thread 的简单封装,如清单 14 所示,为函数调用结果创建了一个 std::packaged_task ,从中获取 future,在线程中运行之并返回 future。其本身并不会带来多少优点(实际上可能导致大量的过度订阅),但它为迁移到一个更复杂的实现做好准备,它通过一个工作线程池,将任务添加到一个即将运行的队列里。我们会在之后研究线程池。相比于使用 std::async ,只有在你确实知道将要做什么,并且希望想要通过线程池建立的方式进行完全掌控和执行任务的时候,才值得首选这种方法。
- template<typename F, typename A>
- std::future<std::result_of<F(A&&)>::type>
- spawn_task(F&& f, A&& a)
- {
- typedef std::result_of<F(A&&)>::type result_type;
- std::packaged_task<result_type(A&&)>
- task(std::move(f));
- std::future<result_type> res(task.get_future());
- std::thread t(std::move(task), std::move(a));
- t.detach();
- return res;
- }
总之,回到 parallel_quick_sort。因为只是使用递归获取 new_higher,你可以将其拼接到之前的位置。但是现在 new_lower 是 std::future<std::list<T>> 而不仅是列表,因此需要在调用 splice() 之前调用 get() 来获取值。这就会等待后台任务完成,并将结果移动至 splice() 调用;get() 返回一个引用了所包含结果的右值,所以它可以被移除。
即使假设 std::async() 对可用的硬件并发能力进行最优化的使用,这仍不是快速排序的理想并行实现。原因之一是,std::partition 完成了很多工作,且仍为一个连续调用,但对目前已经足够好了。如果你对最快可能的并行实现有兴趣,请查阅学术文献。
函数式编程并不是唯一的避开共享可变数据的并发编程范式;另一种范式为 CSP(Communicating Sequential Process,通信顺序处理),这种范式下线程在概念上完全独立,没有共享数据,但是具有允许消息在它们之间进行传递的通信通道。这是被编程语言 Erlang 所采用的范式,也通常被 MPI(Message Passing Interface,消息传递接口)环境用于 C 和 C++ 中的高性能计算。我可以肯定到目前为止,你不会对这在 C++ 中也可以在一些准则下得到支持而感到意外,接下来的一节将讨论这一点的一种方式。
4.2 具有消息传递的同步操作
CSP 的思想很简单:如果没有共享数据,则每一个线程可以完全独立地推理得到,只需基于它对所接收到地消息如何进行反应。因此每个线程实际上可以等效为一个状态机:当它接收到消息时,它会根据初始状态进行操作,并以某种方式更新其状态,并可能向其他线程发送一个或多个消息。编写这种线程的一种方式,是将其形式化并实现一个有限状态机模型,但这并不是唯一的方式。状态机在应用程序结构中可以是隐式的。在任一给定的情况下,究竟哪种方法更佳,取决于具体形势的行为需求和编程团队的专长。但是选择实现每一个线程,将其分割成独立的进程可能会从共享数据并发中移除很多复杂性,因而使得编程更加容易,降低了错误率。
真正的通信序列进程并不共享数据,所有的通信都通过消息队列,但由于 C++ 线程共享一个地址空间,因此不可能强制执行这一需求。这就是准则介入的地方。作为应用程序或类库的作者,我们的责任是确保不在线程间共享数据。当然,为了线程之间的通信,消息队列必须是共享的,但是其细节可以封装在类库内。
现象一下,你正在实现 ATM 的代码。此代码需要处理人试图取钱的交互和与相关银行的交互,还要控制物理机器接受此人的卡片,显式恰当的信息,处理按键,出钞并退回用户的卡片。
处理这一切事情的其中一种方法,是将代码分成三个独立的线程:一个处理物理机器、一个处理 ATM 逻辑、还有一个与银行进行通信。这些线程可以单纯地通过传递消息而非共享数据进行通信。例如,当有人在机器旁边将卡插入或按下按钮时,处理机器的线程可以发送消息至逻辑线程,同时逻辑线程将发送消息至机器线程,指示要发多少钱等等。
对 ATM 逻辑进行建模的一种方式,是将其视为一个状态机。在每种状态中,线程等待可接受的消息,然后对其进行处理。这样可能转换到一个新的状态,并且循环继续。一个简单实现中涉及的状态如图 3 所示。在这个简化了的实现中,系统等待卡片插入。一旦卡片插入,便等待用户输入其密码,每次一个数字。他们可以删除最后输入的数字。一旦输入的数字足够多,就验证密码。如果密码错误,则结束,将卡片退回给用户,并继续等待有人插入卡片。如果密码正确,则等待用户取消交易或选择取出的金额。如果用户取消,则结束,并退出银行卡。如果用户选择了一个金额,则等待银行确认后发放现金并退出其卡片,或者显示“资金不足”的消息并退出其卡片。显然,真正的 ATM 比这复杂得多,但这已经足以阐述整个想法。
有了 ATM 逻辑的状态机设计之后,就可以使用一个具有表示每一个状态的成员函数的类来实现之。每一个成员函数都可以等待指定的一组传入消息,并在它们到达后进行处理,其中可能触发切换到另一状态。每个不同的消息类型可以由一个独立的 struct 来表示。清单 15 展示了这样一个系统中 ATM 逻辑的部分简单实现,包括主循环和第一个状态的实现,即等待卡片插入。
如你所见,所有消息传递所必需的同步完全隐藏于消息传递库内部。
hanhan笔记:这部分代码用了许多模板类,看不太懂,日后再来添加。
5. 小结
线程间的同步操作,是编写一个使用并发的应用程序的重要组成部分。如果没有同步,那么线程本质上是独立的,就可以被写作独立的应用程序,由于它们之间存在相关活动而作为群组运行。在本章中,我介绍了同步操作的各种方式,从最基本条件变量,到 future、promise 以及 打包任务。我还讨论了解决同步问题的方式,函数式编程,其中每个任务产生的结果完全依赖于它的输入而不是外部环境,以及消息传递,其中线程间的通信是通过一个扮演中介角色的消息子系统发送异步消息来实现的。
我们已经讨论了许多在 C++ 中可用的高阶工具,现在是时候来看看令这一切得以运转的底层工具了: C++ 内存模型和原子操作。