c++11提供了线程及其相关操作的库,可以很方便的支持并发编程。

线程的使用

std::threadc++11提供的线程类,不可拷贝,只支持移动语义,相比于pthread_t使用起来非常方便。
支持的构造函数:

thread() noexcept;
thread( thread&& other ) noexcept;
template< class Function, class... Args >
explicit thread( Function&& f, Args&&... args );
thread( const thread& ) = delete;

简单的用例:

#include <iostream>
#include <thread>

void print() {
std::cout << "hello world" << std::endl;
}

int main() {

std::thread t(print);
t.get_id(); // 获取线程id
std::this_thread::sleep_for(std::chrono::seconds(1)); // 线程休眠
t.join();

return 0;
}

当线程创建以后,线程就开始运行,但工作还没有结束,我们需要对线程执行joindetach操作,否则程序会崩溃。

  • join() : 等待线程执行完毕。
  • detach() : 将该线程从父线程的管理中分离出去。

c++20中提供的jthread线程类则不需要手动调用这两种操作,在jthread类释放时会主动调用join操作。

获取线程id

std::this_thread::get_id()

线程休眠:

std::this_thread::sleep_for(std::chrono::seconds(1))

线程中共享数据

std::mutex

考虑两个线程同时操作一个共享变量的情况:

#include <thread>

int i = 0;

void add() {
for(int i = 0; i < 5; ++i) {
i++;
}
}

int main() {
std::thread t1(&add);
std::thread t2(&add);
t1.join();
t2.join();
return 0;
}

由于i++操作不是原子类型的,会存在线程不安全的情况。为了避免这种情况,我们需要对共享数据进行加锁操作,确保每次只有一个线程访问共享变量。c++11提供了互斥锁std::mutex,支持获取锁和释放锁。

lock()
unlock()

现在利用std::mutex来保障上述示例的线程安全:

#include <thread>

std::mutex test_mutex;
int i = 0;

void add() {
for(int i = 0; i < 5; ++i) {
test_mutex.lock();
i++;
test_mutex.unlock();
}
}

int main() {
std::thread t1(&add);
std::thread t2(&add);
t1.join();
t2.join();
return 0;
}

然而每次主动的获取锁和释放锁是一个很麻烦的过程,c++11中的std::lock_guardstd::unique_lock能很方便的帮我们管理std::mutexunique_locklock_guard的升级版本。当std::lock_guard创建时获取锁,析构时释放锁。上述的例子继续改造成:

#include <thread>

std::mutex test_mutex;
int i = 0;

void add() {
for(int i = 0; i < 5; ++i) {
std::lock_guard<std::mutex> lock(test_mutex); // 获取锁
i++;
}
}

int main() {
std::thread t1(&add);
std::thread t2(&add);
t1.join();
t2.join();
return 0;
}

std::shared_mutex

c++17提供了共享锁std::shared_mutex,能够提供两种访问等级:

  • 共享:多个线程能获取同一个锁
  • 独占:只有一个线程能获取锁

lock() : 独占的方式获取锁
lock_shared() :共享的方式获取锁
std::shared_lock< std::shared_mutex> lock(mutex); // 共享的方式获取锁
std::unique_lock< std::shared_mutex> lock(mutex); // 独占的方式获取锁

一个线程获取独占锁,其他线程无法获取该锁;如果一个线程获取共享锁,其他线程能够获取共享锁,但不能获取独占锁。
我们可以用它来实现一个读写锁,读操作的时候能多个线程共享,写操作的时候只能一个线程独占。

一个简单的样例:

#include <iostream>
#include <mutex>
#include <shared_mutex>
#include <thread>


std::shared_mutex read_write_mutex;
std::mutex mutex;
int count = 0;

void tread() {
std::shared_lock<std::shared_mutex> read_lock(read_write_mutex);
for(int i = 0; i < 5; ++i){
{
std::unique_lock<std::mutex> lock(mutex);
std::cout << "count:" << count << " id:" << std::this_thread::get_id() << std::endl;
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}

void tadd() {
std::unique_lock<std::shared_mutex> write_lock(read_write_mutex);
count++;
std::cout << "add count: " << count << std::endl;
}

int main() {

std::thread t1(tread);
std::thread t2(tread);
std::thread t3(tadd);
std::thread t4(tread);

t1.join();
t2.join();
t3.join();
t4.join();

return 0;
}

线程的同步操作

std::condition_variable

有时候线程中某个操作的执行需要依赖于另一个线程中的操作完成,例如下面的样例中,线程A想要从while循环跳出继续执行,需要满足flag = true,而flag = true的执行在线程B中:

flag = false;
线程A中操作
while(!flag) ;

线程B中的操作
flag = true;

c++11中提供了条件变量std::condition_variable来协助我们在保证线程安全的情况下完成上述的操作。std::condition_variable提供了两个重要的操作waitnotify

  • wait() : 使当前线程阻塞,直到接收到notify信息。
  • notify_one()/notify_all() : notify_one唤醒一个正在等待的线程;notify_all()唤醒所有正在等待的线程。

wait()函数提供了两个重载实现:

void wait( std::unique_lock<std::mutex>& lock );
template< class Predicate >
void wait( std::unique_lock<std::mutex>& lock, Predicate stop_waiting );

两个wait实现都是需要传入一个锁,即先获取这个锁,当阻塞该线程的时候,条件变量会释放这个锁,当接收到notify信号后,又会重新尝试获取这个锁。对于第一个wait实现,会立即阻塞当前线程,然后释放锁,当接收到notify信号后,尝试获取锁,获取成功后,继续执行后续的逻辑。对于第二个wait的实现,会先判断bool(stop_waiting()) == true,如果是则直接执行后续的逻辑,如果不是则阻塞该线程,然后释放锁,当接收到notify信号后,会先尝试获取锁,获取成功后,会再次判断bool(stop_waiting()) == true。等价于:

while(!stop_waiting()){
wait(lock);
}

用一个简单的样例来进行说明:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

bool ready{false};
std::mutex mu;
std::condition_variable cond;

void run() {
std::unique_lock<std::mutex> lock(mu);
cond.wait(lock, []{ // #2
std::cout << "check" << std::endl;
return ready;
});
std::cout << "run end" << std::endl;
}

int main() {

std::thread t(&run); // #1
std::this_thread::sleep_for(std::chrono::seconds(1));
{
std::unique_lock<std::mutex> lock(mu);
ready = true; // #3
}
cond.notify_one(); // #4
std::cout << "ok" << std::endl;
t1.join();
return 0;
}
  • #1创建一个线程,线程开始执行。
  • #2条件变量cond获取锁lock,然后判断ready == true,条件为假,条件变量cond释放lock,然后阻塞该线程,等待notify信号唤醒。
  • #3修改变量ready的值为true
  • #4条件变量cond发出notify信号,唤醒被阻塞的线程。
  • #2让我们再回到第二步,条件变量cond尝试获取lock,获取成功后,判断ready == true,条件成真,继续执行该线程中的后续逻辑。

std::call_once

当使用多线程时,如果只想在一个线程中执行某个操作,那么可以使用std::call_once,声明如下:

#include <mutex>

template< class Callable, class... Args >
void call_once( std::once_flag& flag, Callable&& f, Args&&... args );

需要提供std::once_flag变量和可调用的对象。call_once的并发调用都保证能观察到主动调用的变化,而无需额外同步。,即并发调用call_once,会阻塞在那。简单样例:

#include <iostream>
#include <thread>
#include <mutex>

std::once_flag test_flag;

void print() {
std::call_once(test_flag, []{
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "just print once" << std::endl;
});
std::cout << "id:" << std::this_thread::get_id() << " end" << std::endl;
}

int main() {

std::thread t1(print);
std::thread t2(print);
std::thread t3(print);

t1.join();
t2.join();
t3.join();

return 0;
}

std::future

std::future提供了一种访问异步操作结果的机制:

std::future<T> f;
T t = f.get();

可以和std::promise一起使用,用来获取线程的返回值,需要将promise对象传入线程中。简单的样例:

#include <iostream>
#include <thread>
#include <future>

std::promise<int> promise;
std::future<int> future = promise.get_future();

void test(std::promise<int> promise) {
promise.set_value(2);
}

int main() {

std::thread t(test, std::move(promise));
std::cout << "value: " << future.get() << std::endl;
t.join();

return 0;
}

也能和std::packaged_task搭配使用实现同样的效果,同样需要将packaged_task传入到线程中。样例如下:

#include <iostream>
#include <thread>
#include <future>

int test2() {
return 1;
}

int main() {

std::packaged_task<int()> task(test2);
std::future<int> f = task.get_future();
std::thread t2(std::move(task));
std::cout << "value: " << f.get() << std::endl;
t2.join();

return 0;
}

std::async

如果想获取线程中的返回值,那么也可以使用std::asyncc++17中的声明如下:

#include <future>

template< class Function, class... Args >
std::future<std::invoke_result_t<std::decay_t<Function>,
std::decay_t<Args>...>>
async( std::launch policy, Function&& f, Args&&... args );

std::future中的类型就是可调用对象的返回类型。参数std::launch policy提供两个执行选项:

  • std::launch::deferred:指定当前线程延后执行任务,等待std::future对象调用waitget,才创建线程开始执行任务。
  • std::launch::async:创建一个新的线程立马执行任务。

简单用例:

#include <iostream>
#include <future>
#include <thread>

int test1() {
std::cout << "test1, pid: " << std::this_thread::get_id() << std::endl;
return 2;
}

void test2() {
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "test2, pid: " << std::this_thread::get_id() << std::endl;
}

int main() {
// std::future<int> the_answer = std::async(std::launch::deferred, test1);
std::future<int> the_answer = std::async(std::launch::async, test1);
test2();
std::cout << "the answer is: " << the_answer.get() << std::endl;

return 0;
}

原子类型

c++11提供了一些更细粒度的“锁”,我们称之为原子类型,每次只有一个线程能操作该类型的变量。原子类型的声明如下:

template< class T >
struct atomic;

原子类型提供了读取操作(load)和写入操作(store),两个操作默认使用std::memory_order_seq_cst内存序。

void store( T desired, std::memory_order order = std::memory_order_seq_cst ) noexcept;

T load( std::memory_order order = std::memory_order_seq_cst ) const noexcept;

原子类型提供了常用的运算操作:加法运算fetch_add、减法运算fetch_sub、与运算fetch_and、或运算fetch_or、异或运算fetch_xor。这里介绍其中加法运算符的使用,其他都是一样的。该操作会将当前值与value和arg的算术加法结果原子性地替换。该操作是读取-修改-写入操作。

T fetch_add( T arg, std::memory_order order = std::memory_order_seq_cst ) noexcept;

原子类型还提供了一些对操作符的重载。

operator++
operator--
operator+=
operator-=
operator&=
operator|=
operator^=

当原子类型调用相关函数的时候,都需要传入一个std::memory_order类型的值,这个就是内存序。内存序指定围绕原子操作的内存访问(包括常规的非原子内存访问)应该如何进行排序。当多个线程同时读取和写入几个变量时,一个线程可以观测到这些值以与另一个线程写入它们的顺序不同的顺序更改,原子类型中一共提供了6种内存序。

  • memory_order_relaxd
  • memory_order_consume
  • memory_order_acquire
  • memory_order_release
  • memory_order_acq_rel
  • memory_order_seq_cst

我们最常用的是memory_order_relaxdmemory_order_acquirememory_order_releasememory_order_seq_cst这四种内存序。内存序中“锁”特性越强,则性能越弱。memory_order_relaxd是最宽松的内存序,对其他读取或写入操作没有同步约束,只能保证自身的原子性;memory_order_seq_cst是最严格的内存序,提供了同步的语意;memory_order_acquire用于读操作,memory_order_release用于写操作。两个进行配对的时候,release写操作前面的所有写操作不会放到release操作后面执行,acquire读操作后面的所有都操作不会放到acquire操作前面执行,这就构成了一种同步关系。