Modern Effective C++ 条款三十九:对于一次性事件通信考虑使用void的futures
前置:C++中条件变量的使用
std::condition_variable 的 wait 函数有多个重载版本。
基本形式,等待直到被通知:
template< class Predicate >
void wait( std::unique_lock<std::mutex>& lock, Predicate pred );
等待直到被通知或直到指定的时间点:
template< class Clock, class Duration >
cv_status wait_until( unique_lock<mutex>& lock,
const chrono::time_point<Clock, Duration>& abs_time );
template< class Clock, class Duration, class Predicate >
bool wait_until( unique_lock<mutex>& lock,
const chrono::time_point<Clock, Duration>& abs_time,
Predicate pred );
等待直到被通知或直到经过指定的时间间隔:
template< class Rep, class Period >
cv_status wait_for( unique_lock<mutex>& lock,
const chrono::duration<Rep, Period>& rel_time );
template< class Rep, class Period, class Predicate >
bool wait_for( unique_lock<mutex>& lock,
const chrono::duration<Rep, Period>& rel_time,
Predicate pred );
最常用的两个是带有谓词(predicate)的 wait 和 wait_for/wait_until,它们能够处理虚假唤醒的问题,并且可以确保线程只会在条件真正满足时才继续执行。
对于带有谓词的 wait 方法,它的模板参数 Predicate 应该是一个可调用对象(比如函数、lambda 表达式、函数对象等),它接受零个参数并返回一个可转换为 bool 的值。谓词会被 wait 函数反复评估,只有当谓词返回 true 时,wait 才会终止,线程才会继续执行。
例如,使用带有谓词的 wait 方法:
std::condition_variable cv;
std::mutex mtx;
bool ready = false;
//反应任务代码
{
std::unique_lock<std::mutex> lk(mtx);
cv.wait(lk, []{ return ready; }); // 等待直到ready为true
// 对事件作出反应...
}
cv.wait(lk, []{ return ready; }); 会一直阻塞当前线程,直到 ready 变量变为 true 或者发生虚假唤醒。如果发生虚假唤醒,wait 会再次检查谓词,确保条件真正满足之后才继续执行。
互斥锁+条件变量的不足:
互斥锁的使用
条件变量总是与互斥锁一起使用,这是因为条件变量本身不提供同步保护。当一个线程等待一个条件变量时,它会释放关联的互斥锁,并进入等待状态。当条件变量被通知时,线程会被唤醒并重新获取互斥锁,然后继续执行。
即使检测任务和反应任务之间没有共享数据的竞争,我们仍然需要互斥锁来防止所谓的“竞态条件”,即检测任务可能在反应任务检查条件之前改变了条件,但又在反应任务开始等待之前再次改变条件。这种情况下,反应任务可能会错过通知。
漏失的通知
如果检测任务在反应任务调用 wait 之前调用了 notify_one() 或 notify_all(),那么这个通知就会被漏失,因为此时还没有任何线程在等待条件变量。为了避免这种情况,通常的做法是将条件变量与一个条件谓词(predicate)结合使用。条件谓词是一个布尔表达式,用来表示线程应该等待的条件是否满足。
虚假唤醒
虚假唤醒是指线程在没有收到实际通知的情况下从 wait 中醒来。
简单的轮询方法
使用共享的布尔型 flag 进行线程间通信的方法确实避免了条件变量设计中的一些复杂性,如互斥锁的需要和虚假唤醒的问题。这种方法有其自身的缺点,与性能和资源利用相关。
简单的轮询方法
在这种方法中,一个线程(检测线程)通过设置一个 std::atomic<bool> 类型的标志位来通知另一个线程(反应线程)。反应线程则通过不断地检查这个标志位来等待事件的发生:
std::atomic<bool> flag(false); // 共享的flag;使用原子操作保证线程安全
// 检测线程代码
{
// 检测某个事件...
flag = true; // 事件发生后将flag置位
}
// 反应线程代码
while (!flag.load(std::memory_order_acquire)); // 等待事件
// 对事件作出反应...
这种方法的优点是简单直接,不需要复杂的同步机制。但是,它存在明显的缺陷:
(1)CPU 资源浪费:反应线程在while (!flag)循环中不断检查 flag 的状态,这被称为“忙等”或“自旋”。即使没有实际的工作要做,该线程也会持续占用 CPU 时间片,从而导致 CPU 资源的浪费,并可能影响其他任务的执行。
(2)能量消耗:由于 CPU 核心不会进入节能模式,这种轮询方式会增加设备的能量消耗,尤其是在移动设备或服务器环境中,这对能效是一个重要的考虑因素。
条件变量+条件谓词
为了处理这个问题,应该始终在一个循环中使用条件变量的 wait 函数,并且传递一个条件谓词给它。这样可以确保线程只有在条件真正满足时才会继续执行,而不会因为虚假唤醒或漏失的通知而错误地继续执行。
正确的代码结构
std::condition_variable cv;
std::mutex m;
bool ready = false; // 条件谓词:代表事件是否已发生
// 检测任务代码
{
std::lock_guard<std::mutex> lk(m); // 简单锁定,自动解锁
// 检测事件...
ready = true; // 改变条件谓词的状态
}
cv.notify_one(); // 或者 notify_all() 如果有多个反应任务
// 反应任务代码
{
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, []{ return ready; }); // 等待直到ready为true
// 对事件作出反应...
}
ready 是条件谓词,用来表示事件是否已经发生。
cv.wait(lk, []{ return ready; }); 这一行保证了反应任务只会在线程确实准备好了(即 ready == true)之后才继续执行,从而解决了虚假唤醒的问题。同时,由于检测任务在改变 ready 的状态之前锁定了互斥锁,所以也避免了漏失的通知。
避免虚假唤醒:由于 wait 函数使用了谓词(lambda),即使出现虚假唤醒,反应任务也会检查 flag 是否为 true,只有当 flag 确实为 true 时才会继续执行。
防止漏失通知:无论检测任务是在反应任务进入 wait 之前还是之后调用 notify_one,只要 flag 在 wait 返回时为 true,反应任务就会正确地作出响应。
无需轮询:反应任务真正地阻塞在 wait 上,不会占用 CPU 资源。
虽然这种方法有效,但确实有些不那么优雅的地方,比如检测任务需要分别设置 flag 和调用 notify_one,这可能看起来有点冗余。然而,这种设计实际上是合理的,因为它保证了顺序性和原子性。
(1)顺序性:先设置 flag 再通知,确保反应任务看到的是最新的状态。
(2)原子性:虽然 flag 的设置和通知不是原子操作,但是由于 flag 是在互斥锁保护下修改的,所以对于其他线程来说,这两个操作是原子的。
promise和future替代线程间通信
简化代码:相比于条件变量和标志位的方法,使用 std::promise 和 std::future 可以显著简化代码。检测任务只需要设置 std::promise<void> 来通知事件的发生,而反应任务只需在对应的 std::future<void> 上等待。
#include <future>
#include <iostream>
// 检测任务代码
void detecting_task(std::promise<void>& p) {
// 模拟检测某个事件...
std::cout << "事件被检测到\n";
p.set_value(); // 通知反应任务
}
// 反应任务代码
void reacting_task(std::future<void> f) {
std::cout << "准备作出反应...\n";
f.wait(); // 等待事件
std::cout << "对事件作出反应\n";
}
避免虚假唤醒:std::future::wait() 只会在 std::promise 被设置后返回,因此不存在虚假唤醒的问题。
真正的阻塞:与轮询方法不同,std::future::wait() 会真正阻塞反应任务,不会占用 CPU 资源,直到 std::promise 被设置。
不需要互斥锁:由于 std::promise 和 std::future 内部处理了同步问题,所以不再需要显式的互斥锁来保护共享数据。
int main() {
std::promise<void> p;
std::future<void> f = p.get_future();
// 启动反应任务(可以是另一个线程)
std::thread react(reacting_task, std::move(f));
// 模拟一些工作...
std::this_thread::sleep_for(std::chrono::seconds(1));
// 启动检测任务(可以在当前线程中执行)
detecting_task(p);
react.join();
}
std::promise 和 std::future 的缺点
- 动态分配开销:std::promise 和 std::future 之间的共享状态是动态分配的,每次创建它们时都会产生堆内存分配和释放的开销。
- 一次性通信:std::promise 一旦设置就不能再次设置,使得它只能用于一次性的通信。如果需要多次通信,则必须为每次通信创建新的 std::promise 和 std::future 对象,增加了管理的复杂度。
- 不适合频繁通信:由于上述原因,对于需要频繁通信的场景,基于 std::promise 和 std::future 的设计可能不是最佳选择。
尽管 std::promise 和 std::future 通常用于异步调用模式,但也可以用于任何需要从程序的一个地方传递信息到另一个地方的情况。例如,创建一个挂起的系统线程,以便在实际使用之前对其进行配置(如设置优先级或核心亲和性)。这种情况下,std::promise 和 std::future 提供了一种简单的方式来进行初始化后的线程启动信号传递。
虽然 std::promise 和 std::future 提供了一种简洁且有效的线程间通信方式,并解决了条件变量和标志位设计中的某些问题,但它们也有自己的局限性和适用范围。选择哪种方法取决于具体的应用场景和需求。对于需要频繁通信或复用通信通道的场景,条件变量可能是更好的选择;而对于简单的、一次性通信,std::promise 和 std::future 则提供了更简洁的解决方案。
std::promise<void> 和 std::future<void> 来挂起和取消挂起线程的方案
单个线程挂起与取消挂起
对于仅需要对某线程进行一次挂起(即在线程创建后但在执行线程函数前),可以使用 std::promise<void> 和 std::future<void> 来实现:
#include <iostream>
#include <thread>
#include <future>
void react() {
std::cout << "Reacting to the event...\n";
}
void detect() {
std::promise<void> p;
std::thread t([&p]() {
p.get_future().wait(); // 挂起t直到future被置位
react();
});
// 这里,t在调用react前挂起
p.set_value(); // 解除挂起t(因此调用react)
// 做其他工作...
t.join(); // 使t不可结合
}
使用 RAII 管理线程生命周期
为了确保所有离开 detect 函数的路径中线程都被正确地 join,可以使用 RAII 类来管理线程的生命周期。然而,这里有一个潜在的问题:如果在 p.set_value() 之前发生异常,lambda 中的 wait 将永远不会返回,导致线程不会结束,而 RAII 对象会在析构时尝试 join,从而造成死锁。
class ThreadRAII {
public:
ThreadRAII(std::thread&& t, DtorAction action) : th(std::move(t)), action_(action) {}
~ThreadRAII() {
if (th.joinable()) {
if (action_ == DtorAction::join)
th.join();
else if (action_ == DtorAction::detach)
th.detach();
}
}
private:
std::thread th;
DtorAction action_;
};
enum class DtorAction { join, detach };
void detect_with_raii() {
std::promise<void> p;
ThreadRAII tr(std::thread([sf = p.get_future().share()]{
sf.wait();
react();
}), ThreadRAII::DtorAction::join);
// 如果这里抛出异常,p.set_value() 不会被调用,导致tr中的线程永远等待
p.set_value(); // 解除挂起tr中的线程
}
支持多个反应线程
为了解决上述异常处理的问题并支持多个反应线程,可以使用 std::shared_future<void>。每个反应线程都拥有一个指向共享状态的 std::shared_future<void> 的副本,这允许它们共同等待同一个事件的发生。
#include <iostream>
#include <thread>
#include <future>
#include <vector>
void react() {
std::cout << "Reacting to the event...\n";
}
void detect_multiple_reactors(size_t threadsToRun) {
std::promise<void> p;
auto sf = p.get_future().share(); // 创建可共享的future
std::vector<std::thread> vt;
for (size_t i = 0; i < threadsToRun; ++i) {
vt.emplace_back([sf]() { // 按值捕获shared_future
sf.wait(); // 在局部副本上wait
react();
});
}
// 如果这里的“...”区域抛出异常,所有的线程将永久挂起!
try {
// 模拟一些工作...
p.set_value(); // 所有线程解除挂起
} catch (...) {
// 异常处理逻辑
throw; // 或者其他处理方式
}
for (auto& t : vt) {
t.join(); // 确保所有线程完成
}
}
int main() {
detect_multiple_reactors(5); // 示例:启动5个反应线程
return 0;
}
单次通信:std::promise<void> 和 std::future<void> 提供了一种简单的方法来进行一次性的线程间通信。
异常安全:通过 std::shared_future<void> 和 RAII 类,可以在异常情况下安全地管理多个线程的生命周期。
多线程支持:std::shared_future<void> 允许多个线程共同等待同一事件,简化了多线程同步的实现。
请记住:
- 对于简单的事件通信,基于条件变量的设计需要一个多余的互斥锁,对检测和反应任务的相对进度有约束,并且需要反应任务来验证事件是否已发生。
- 基于flag的设计避免的上一条的问题,但是是基于轮询,而不是阻塞。
- 条件变量和flag可以组合使用,但是产生的通信机制很不自然。
- 使用
std::promise
和future的方案避开了这些问题,但是这个方法使用了堆内存存储共享状态,同时有只能使用一次通信的限制。
原文地址:https://blog.csdn.net/m0_52043808/article/details/144358786
免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!