目录

线程基础知识

线程的发起

  • 线程发起顾名思义就是启动一个线程,C++11标准统一了线程操作,可以在定义一个线程变量后,该变量启动线程执行回调逻辑。如下即可发起一个线程
1
2
3
4
5
6
7

void thead_work1(std::string str) {
std::cout << "str is " << str << std::endl;
}
//1 通过()初始化并启动一个线程
std::thread t1(thead_work1, hellostr);

线程等待

  • 当我们启动一个线程后,线程可能没有立即执行,如果在局部作用域启动了一个线程,或者main函数中,很可能子线程没运行就被回收了,回收时会调用线程的析构函数,执行terminate操作。所以为了防止主线程退出或者局部作用域结束导致子线程被析构的情况,我们可以通过join,其作用是让主线程等待子线程启动运行,子线程运行结束后主线程再运行。(也就是让当前进程等待调用join的线程运行结束后在运行)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <iostream>
#include <thread>

// 一个简单的线程函数
void threadFunction() {
std::cout << "Hello from thread!\n";
}

int main() {
// 创建一个新的线程
std::thread t(threadFunction);

// 在主线程中做一些其他的事情

// 等待线程结束
t.join();

return 0;
}

线程可以调用的参数

调用函数对象

  • 调用函数对象的时候可能会遇到一种情况,如果按如下方式去调用就会报错
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#include <iostream>
#include <thread>

class func {
public:
void operator()() {
std::cout << "hello thread" << std::endl;
}
};

int main() {
std::thread t1(func());
t1.join();
return 0;
}
  • 原因:因为std::thread t1(func());这句话会被翻译成函数声明,即编译器会将t2当成一个函数对象, 返回一个std::thread类型的值, 函数的参数为一个函数指针,该函数指针返回值为background_task, 参数为void。可以理解为如下
1
"std::thread (*)(background_task (*)())"
  • 解决办法:

    1. 使用{}来替代()

      1
      2
      std::thread t1{func()};
      t1.join();
    2. 多加一层()来

1
2
std::thread t1((func()));
t1.join();

使用lambda表达式当作其参数

1
2
3
4
std::thread t3([](){
// lambda 表达式的函数体
});

使用成员函数指针来当作参数

1
2
3
4
5
6
7
8
9
10
class MyClass {
public:
void memberFunction() {
// 成员函数体
}
};

MyClass obj;
std::thread t2(&MyClass::memberFunction, &obj); // memberFunction 是一个成员函数

注意当其调用函数有参数的时候需要在后面加入对应的参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#include <iostream>
#include <thread>

// 线程函数,接受两个参数,并返回它们的和
void sum(int a, int b) {
std::cout << "Sum of " << a << " and " << b << " is: " << (a + b) << std::endl;
}

int main() {
int x = 10, y = 20;

// 创建一个新线程,并传递参数
std::thread t(sum, x, y);

// 等待线程结束
t.join();

return 0;
}

线程detach

  • detach() 函数的作用是将线程托管到后台来运行,实现主线程与线程分离,这被称为守护线程,分离线程后,父子线程就不会共享同一个内存,但是却可以通过指针和引用共同访问一个堆区资源,但是需要人为增加同步机制。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

struct func {
int& _i;
func(int & i): _i(i){}
void operator()() {
for (int i = 0; i < 3; i++) {
_i = i;
std::cout << "_i is " << _i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
};
void oops() {
int some_local_state = 0;
func myfunc(some_local_state);
std::thread functhread(myfunc);
//隐患,访问局部变量,局部变量可能会随着}结束而回收或随着主线程退出而回收
functhread.detach();
}
// detach 注意事项
oops();
//防止主线程退出过快,需要停顿一下,让子线程跑起来detach
std::this_thread::sleep_for(std::chrono::seconds(1));

  • 上面的例子存在隐患,因为some_local_state是局部变量, 当oops调用结束后局部变量some_local_state就可能被释放了,而线程还在detach后台运行,容易出现崩溃。

解决办法

  • 使用智能指针shared_ptr,使用引用计数来保证局部变量不被释放,是伪闭包策略
  • 可以将引用传递改为按值传递,这样会需要拷贝构造,增加空间消耗,减少效率
  • 可以让主线程等待子线程完毕后再执行,但这样往往会影响程序的逻辑(因为这样就不是后台运行了)
1
2
3
4
5
6
7
8
9
10

void use_join() {
int some_local_state = 0;
func myfunc(some_local_state);
std::thread functhread(myfunc);
functhread.join();
}
// join 用法
use_join();

异常处理

  • 当我们启动一个线程后,如果主线程产生崩溃,会导致子线程也会异常退出,就是调用terminate,如果子线程在进行一些重要的操作比如将充值信息入库等,丢失这些信息是很危险的。所以常用的做法是捕获异常,并且在异常情况下保证子线程稳定运行结束后,主线程抛出异常结束运行。如下面的逻辑:
1
2
3
4
5
6
7
8
9
10
11
12
13
void catch_exception() {
int some_local_state = 0;
func myfunc(some_local_state);
std::thread functhread{ myfunc };
try {
//本线程做一些事情,可能引发崩溃
std::this_thread::sleep_for(std::chrono::seconds(1));
}catch (std::exception& e) {
functhread.join();
throw;
}
functhread.join();
}
  • 但是这种写法不完美,你需要自行添加 还得自行判断哪个必须要执行完才行,所以有什么方法可以自动判定而不需要人为选择呢? 那就是采用RAII思想
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38

struct func {
int& _i;
func(int & i): _i(i){}
void operator()() {
for (int i = 0; i < 3; i++) {
_i = i;
std::cout << "_i is " << _i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
};

class thread_guard {
private:
std::thread& _t;
public:
explicit thread_guard(std::thread& t):_t(t){}
~thread_guard() {
//join只能调用一次
if (_t.joinable()) {
_t.join();
}
}
thread_guard(thread_guard const&) = delete;
thread_guard& operator=(thread_guard const&) = delete;
};

void auto_guard() {
int some_local_state = 0;
func my_func(some_local_state);
std::thread t(my_func);
thread_guard g(t);
//本线程做一些事情
std::cout << "auto guard finished " << std::endl;
}
auto_guard();

慎用隐式转换

  • C++中会有一些隐式转换,比如char* 转换为string等。这些隐式转换在线程的调用上可能会造成崩溃问题
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void print_str(int & i, const std::string & buf){
std::cout << "i is" << i << "buf is" << buf << std::endl;
}

void danger_oops(int som_param) {
char buffer[1024];
sprintf(buffer, "%i", som_param);
//在线程内部将char const* 转化为std::string
//指针常量 char * const 指针本身不能变
//常量指针 const char * 指向的内容不能变
std::thread t(print_str, 3, buffer);// 这里buffer 类型为char * 被隐式转换为 string
t.detach();
std::cout << "danger oops finished " << std::endl;
}

  • 当我们定义一个线程变量thread t时,传递给这个线程的参数buffer会被保存到thread的成员变量中。而在线程对象t内部启动并运行线程时,参数才会被传递给调用函数print_str。而此时buffer可能随着’}’运行结束而释放了。
  • 改进的方式很简单,我们将参数传递给thread时显示转换为string就可以了,这样thread内部保存的是string类型。
1
2
3
4
5
6
void safe_oops(int some_param) {
char buffer[1024];
sprintf(buffer, "%i", some_param);
std::thread t(print_str, 3, std::string(buffer));
t.detach();
}

引用参数

  • 当线程要调用的回调函数参数为引用类型时,需要将参数显示转化为引用对象传递给线程的构造函数,如果采用如下调用会编译失败
1
2
3
4
5
6
7
8
9
10
11
void change_param(int& param) {
param++;
}
void ref_oops(int some_param) {
std::cout << "before change , param is " << some_param << std::endl;
//需使用引用显示转换
std::thread t2(change_param, some_param);
t2.join();
std::cout << "after change , param is " << some_param << std::endl;
}

  • 即使函数change_param的参数为int&类型,我们传递给t2的构造函数为some_param,也不会达到在change_param函数内部修改关联到外部some_param的效果。因为some_param在传递给thread的构造函数后(底层会转换为右值)会转变为右值保存,右值传递给一个左值引用会出问题,所以编译出了问题。
  • 改为如下调用就可以了:
1
2
3
4
5
6
7
8
9

void ref_oops(int some_param) {
std::cout << "before change , param is " << some_param << std::endl;
//需使用引用显示转换
std::thread t2(change_param, std::ref(some_param));
t2.join();
std::cout << "after change , param is " << some_param << std::endl;
}

什么是std::ref?

  • std::ref的作用是将一个值包装为reference_Wrapper,这个对象在bind 和 thread时会被识别为引用,这样就解决了bind与thread 无法传递引用的问题(因为原本会被拷贝为右值)

  • 大致可以这么理解:在底层 ref函数会把 一个值的地址和类型封装成reference_wrapper,当我们调用的时候触发了仿函数(),取得了该地址下的值,使其表现为左值引用

1
2
3
4
5
6
 _CONSTEXPR20 operator _Ty&() const noexcept {
return *_Ptr;
}
_NODISCARD _CONSTEXPR20 _Ty& get() const noexcept {
return *_Ptr;
}
std::ref 和不同引用的区别
  • std::ref只是尝试模拟引用传递,并不能真正变成引用,在非模板情况下,std::ref根本没法实现引用传递,只有模板自动推导类型或类型隐式转换时,std::ref能用包装类型reference_wrapper来代替原本会被识别的值类型,而reference_wrapper能隐式转换为被引用的值的引用类型。
总结
  • 我来给总结下,首先我们讲解了std::ref的一些用法,然后我们讲解std::ref是通过std::reference_wrapper实现,然后我们借助了cppreference上的实现来给大家剖析了他本质就是存放了对象的地址(类似指针的用法😁),还讲解了noexcept等语法,最后我们讲解了下std::bind为什么要使用到reference_wrapper。
  • std::bind使用的是参数的拷贝而不是引用,当可调用对象期待入参为引用时,必须显示利用std::ref来进行引用绑定。
  • 多线程std::thread的可调用对象期望入参为引用时,也必须显式通过std::ref来绑定引用进行传参。

使用move操作

  • 有时候传递给线程的参数是独占的,所谓独占就是不支持拷贝赋值和构造,但是我们可以通过std::move的方式将参数的所有权转移给线程,如下
1
2
3
4
5
6
7
8
9
10
11
12
void deal_unique(std::unique_ptr<int> p) {
std::cout << "unique ptr data is " << *p << std::endl;
(*p)++;
std::cout << "after unique ptr data is " << *p << std::endl;
}
void move_oops() {
auto p = std::make_unique<int>(100);
std::thread t(deal_unique, std::move(p));
t.join();
//不能再使用p了,p已经被move废弃
// std::cout << "after unique ptr data is " << *p << std::endl;
}

线程底层做的处理

  • 传递给线程的参数首先会去掉引用,并保存副本在tuple里,无论传递的是左值还是右值,最后都会被转换成右值引用给invoke调用你

C++线程管控

线程归属权

1
2
3
4
5
6
void some_function() {
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
std::thread t1(some_function);
  • t1是一个线程变量,管理一个线程,该线程执行some_function(),对于std::thread C++ 不允许其执行拷贝构造和拷贝赋值, 所以只能通过移动和局部变量返回的方式将线程变量管理的线程转移给其他变量管理。
  • C++ 中类似的类型还有std::mutex, std::ifstream, std::unique_ptr
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void some_function() {
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
void some_other_function() {
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
//t1 绑定some_function
std::thread t1(some_function);
//2 转移t1管理的线程给t2,转移后t1无效,因为t1move后是右值,调用的是移动构造函数
std::thread t2 = std::move(t1);
//3 t1 可继续绑定其他线程,执行some_other_function
t1 = std::thread(some_other_function);
//4 创建一个线程变量t3
std::thread t3;
//5 转移t2管理的线程给t3
t3 = std::move(t2);
//6 转移t3管理的线程给t1
t1 = std::move(t3);
std::this_thread::sleep_for(std::chrono::seconds(2000));

  • 上面的代码会引发崩溃,是因为步骤6造成的崩溃。
  • 让主函数睡眠2000秒,是为了告诉规避主函数退出引发崩溃的问题,因为我们在之前给大家演示过,如果线程不detach或者join,主线程退出时会引发崩溃,而我们这些线程没有join和detach,为了给大家演示是因为步骤6引发的崩溃,所以让主线程睡眠2000秒暂时不退出,但是程序仍然会崩溃,说明是步骤6导致的崩溃。
  • 上面代码将t2管理的线程交给t3,之后将t3管理的线程交给t1,此时t1管理线程运行着 some_function,步骤6导致崩溃的原因就是将t3管理的线程交给t1,而此时t1正在管理线程运行some_other_function。
  • 所以我们可以得出一个结论,就是不要将一个线程的管理权交给一个已经绑定线程的变量,否则会触发线程的terminate函数引发崩溃。

  • 和std::unique_ptr一样,我们可以在函数内部返回一个局部的std::thread变量。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 采用了RVO机制,编译器优化了返回值,执行了移动构造函数
std::thread f() {
return std::thread(some_function);
}
void param_function(int a) {
while (true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
std::thread g() {
//这是NRVO机制,具名的返回值优化
std::thread t(param_function, 43);
return t;
}
  • 因为C++ 在返回局部变量时,会优先寻找这个类的拷贝构造函数,如果没有就会使用这个类的移动构造函数。(RVO与NRVO机制)

joining_thread

  • 曾经有一份C++17标准的备选提案,主张引入新的类joining_thread,它与std::thread类似,但只要其执行析构函数,线程即能自动汇合,这点与scoped_thread非常像。可惜C++标准委员会未能达成共识,结果C++17标准没有引入这个类,后来它改名为std::jthread,依然进入了C++20标准的议程(现已被正式纳入C++20标准)。除去这些,实际上joining_thread类的代码相对容易编写
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class joining_thread {
std::thread _t;
public:
joining_thread() noexcept = default;
template<typename Callable, typename ... Args>
explicit joining_thread(Callable&& func, Args&& ...args):
_t(std::forward<Callable>(func), std::forward<Args>(args)...){}
explicit joining_thread(std::thread t) noexcept: _t(std::move(t)){}
joining_thread(joining_thread&& other) noexcept: _t(std::move(other._t)){}
joining_thread& operator=(joining_thread&& other) noexcept
{
//如果当前线程可汇合,则汇合等待线程完成再赋值
if (joinable()) {
join();
}
_t = std::move(other._t);
return *this;
}
joining_thread& operator=(joining_thread other) noexcept
{
//如果当前线程可汇合,则汇合等待线程完成再赋值
if (joinable()) {
join();
}
_t = std::move(other._t);
return *this;
}
~joining_thread() noexcept {
if (joinable()) {
join();
}
}
void swap(joining_thread& other) noexcept {
_t.swap(other._t);
}
std::thread::id get_id() const noexcept {
return _t.get_id();
}
bool joinable() const noexcept {
return _t.joinable();
}
void join() {
_t.join();
}
void detach() {
_t.detach();
}
std::thread& as_thread() noexcept {
return _t;
}
const std::thread& as_thread() const noexcept {
return _t;
}
};
  • 用起来就比较简单了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void use_jointhread() {
//1 根据线程构造函数构造joiningthread
joining_thread j1([](int maxindex) {
for (int i = 0; i < maxindex; i++) {
std::cout << "in thread id " << std::this_thread::get_id()
<< " cur index is " << i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}, 10);
//2 根据thread构造joiningthread
joining_thread j2(std::thread([](int maxindex) {
for (int i = 0; i < maxindex; i++) {
std::cout << "in thread id " << std::this_thread::get_id()
<< " cur index is " << i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}, 10));
//3 根据thread构造j3
joining_thread j3(std::thread([](int maxindex) {
for (int i = 0; i < maxindex; i++) {
std::cout << "in thread id " << std::this_thread::get_id()
<< " cur index is " << i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}, 10));
//4 把j3赋值给j1,joining_thread内部会等待j1汇合结束后
//再将j3赋值给j1
j1 = std::move(j3);
}

容器存储

  • 容器存储线程时,比使用emplace_back(),直接创造匿名对象可以节约一次构造函数
1
2
3
4
5
6
7
8
9
void use_vector() {
std::vector<std::thread> threads;
for (unsigned i = 0; i < 10; ++i) {
threads.emplace_back(param_function, i);
}
for (auto& entry : threads) {
entry.join();
}
}

选择运行数量

  • 借用C++标准库的std::thread::hardware_concurrency()函数,它的返回值是一个指标,表示程序在各次运行中可真正并发的线程数量.
    我们可以模拟实现一个并行计算的功能,计算容器内所有元素的和
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init)
{
unsigned long const length = std::distance(first, last);
if (!length)
return init; //⇽-- - ①
unsigned long const min_per_thread = 25;
unsigned long const max_threads =
(length + min_per_thread - 1) / min_per_thread; //⇽-- - ②
unsigned long const hardware_threads =
std::thread::hardware_concurrency();
unsigned long const num_threads =
std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads); //⇽-- - ③
unsigned long const block_size = length / num_threads; //⇽-- - ④
std::vector<T> results(num_threads);
std::vector<std::thread> threads(num_threads - 1); // ⇽-- - ⑤
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1); ++i)
{
Iterator block_end = block_start;
std::advance(block_end, block_size); //⇽-- - ⑥
threads[i] = std::thread(//⇽-- - ⑦
accumulate_block<Iterator, T>(),
block_start, block_end, std::ref(results[i]));
block_start = block_end; //⇽-- - ⑧
}
accumulate_block<Iterator, T>()(
block_start, last, results[num_threads - 1]); //⇽-- - ⑨
for (auto& entry : threads)
entry.join(); //⇽-- - ⑩
return std::accumulate(results.begin(), results.end(), init); //⇽-- - ⑪
}
void use_parallel_acc() {
std::vector <int> vec(10000);
for (int i = 0; i < 10000; i++) {
vec.push_back(i);
}
int sum = 0;
sum = parallel_accumulate<std::vector<int>::iterator, int>(vec.begin(),
vec.end(), sum);
std::cout << "sum is " << sum << std::endl;
}

上面的代码1处判断要计算的容器内元素为0个则返回。

2处计算最大开辟的线程数,我们预估每个线程计算25个数据长度。

但是我们可以通过std::thread::hardware_concurrency返回cpu的核数,我们期待的是开辟的线

程数小于等于cpu核数,这样才不会造成线程过多时间片切换开销。

所以3处计算了适合开辟线程数的最小值。

4处计算了步长,根据步长移动迭代器然后开辟线程计算。

5处初始化了线程数-1个大小的vector,因为主线程也参与计算,所以这里-1.

6处移动步长,7处开辟线程,8处更新起始位置。

9处为主线程计算。

10 处让所有线程join

11 处最后将所有计算结果再次调用std的accumulate算出结果。

识别线程

  • 所谓识别线程就是获取线程id,可以根据线程id是否相同判断是否同一个线程。
  • 比如我们启动了一个线程,我们可以通过线程变量的get_id()获取线程id
1
2
3
4
std::thread t([](){
std::cout << "thread start" << std::endl;
});
t.get_id();
  • 但是如果我们想在线程的运行函数中区分线程,或者判断哪些是主线程或者子线程,可以通过这种方式
1
2
3
4
5
std::thread t([](){
std::cout << "in thread id " <<
std::this_thread::get_id() << std::endl;
std::cout << "thread start" << std::endl;
});

互斥和死锁

锁的使用

  • 我们可以通过mutex对共享数据进行加锁,防止多线程访问共享区造成数据不一致问题。如下,我们初始化一个共享变量shared_data,然后定义了一个互斥量std::mutex,接下来启动了两个线程,分别执行use_lock增加数据,和一个lambda表达式减少数据。
    结果可以看到两个线程对于共享数据的访问是独占的,单位时间片只有一个线程访问并输出日志。

  • 一个线程不能再次加一个自己已经锁定的锁: 如果一个线程已经持有某个锁(比如 std::mutex),则再次尝试在同一个线程中对该锁进行加锁会导致死锁。因为锁是互斥的,同一线程不能重复持有同一个互斥量。

  • 其他线程尝试加已经锁定的锁会被阻塞: 如果一个线程已经持有某个锁,并且另一个线程尝试在此时对同一个锁进行加锁,则后者线程会被阻塞,直到锁被释放。这是互斥量的基本行为,它确保了在任意时刻只有一个线程可以访问被保护的临界区,从而避免了数据竞争和不确定的行为。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// lock mutex.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。
//

#include <iostream>
#include <mutex>
std::mutex mtx1;
int share_data = 100;

void use_lock() {
while (true) {
mtx1.lock();
share_data++;
std::cout << "current thread is" << std::this_thread::get_id() << std::endl;
std::cout << "share_data is " << share_data << std::endl;
mtx1.unlock();
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
}

void test_lock() {
std::thread t1(use_lock);
std::thread t2([]() {
while (true) {
mtx1.lock();
share_data--;
std::cout << "current thread is" << std::this_thread::get_id() << std::endl;
std::cout << "share_data is " << share_data << std::endl;
mtx1.unlock();
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
});
t1.join();
t2.join();
}

int main()
{
test_lock();
std::cout << "Hello World!\n";
return 0;
}

lock_guard的使用

  • 当然我们可以用lock_guard自动加锁和解锁,比如上面的函数可以等价简化为
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

void test_lock() {
std::thread t1(use_lock);
std::thread t2([]() {
while (true) {
{
std::lock_guard<std::mutex> lk_lock(mtx1);
share_data--;
std::cout << "current thread is" << std::this_thread::get_id() << std::endl;
std::cout << "share_data is " << share_data << std::endl;
}
std::this_thread::sleep_for(std::chrono::microseconds(10));
}
});
t1.join();
t2.join();
}

  • 注意:使用lock_guard的时候,它是在创造的时候加锁,在被析构的时候解锁,如果你不注意它的作用域,就会导致一些资源浪费问题,甚至其他线程无法工作的情况,所以加入 { } 是必要的,没有加入的话t2线程将会一直占用共享数据,这么做的一个好处是简化了一些特殊情况从函数中返回的写法,比如异常或者条件不满足时,函数内部直接return,锁也会自动解开

如何保证数据安全

  • 有时候我们可以将对共享数据的访问和修改聚合到一个函数,在函数内加锁保证数据的安全性。但是对于读取类型的操作,即使读取函数是线程安全的,但是返回值抛给外边使用,存在不安全性。比如一个栈对象,我们要保证其在多线程访问的时候是安全的,可以在判断栈是否为空,判断操作内部我们可以加锁,但是判断结束后返回值就不在加锁了,就会存在线程安全问题。

  • 比如我定义了如下栈, 对于多线程访问时判断栈是否为空,此后两个线程同时出栈,可能会造成崩溃。如下所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
// lock mutex.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。
//

#include <iostream>
#include <mutex>
#include <stack>

template<typename T>
class threadsafe_stack1 {
private:
std::stack<T> stk;
mutable std::mutex m;
public:
threadsafe_stack1(){}
threadsafe_stack1(const threadsafe_stack1& other) {
std::lock_guard<std::mutex> lock(other.m);
stk = other.stk;
}

threadsafe_stack1& operator = (const threadsafe_stack1&) = delete;
void push(T new_value) {
std::lock_guard<std::mutex> lock(m);
stk.push(std::move(new_value));
}

//问题代码
T pop() {
std::lock_guard<std::mutex> lock(m);
auto element = stk.top();
stk.pop();
return element;
}

//危险
bool empty() const{
std::lock_guard<std::mutex> lock(m);
return stk.empty();
}

};

void test_threadsafe_stack1() {
threadsafe_stack1<int> safe_stack;
safe_stack.push(1);

std::thread t1([&safe_stack]() {
if (!safe_stack.empty()) {
std::this_thread::sleep_for(std::chrono::seconds(1));
safe_stack.pop();
}
});

std::thread t2([&safe_stack]() {
if (!safe_stack.empty()) {
std::this_thread::sleep_for(std::chrono::seconds(1));
safe_stack.pop();
}
});

t1.join();
t2.join();
}

int main()
{
//test_lock();
test_threadsafe_stack1();
//std::cout << "Hello World!\n";
return 0;
}

  • 在这段代码会在出栈的时候报错,因为可能栈里就一个元素但是我们却出栈了两次

解决的办法

  • 定义一个空栈异常,然后实现我们的出栈函数,如下所示
1
2
3
4
5
6
7
8
9
10
11
12
13
14
struct empty_stack : std::exception
{
const char* what() const throw();
};

T pop()
{
std::lock_guard<std::mutex> lock(m);
if (data.empty()) throw empty_stack();
auto element = data.top();
data.pop();
return element;
}

  • 这么做就需要在外层使用的时候捕获异常。这是C++ 并发编程中提及的建议。但是我觉得可以在函数pop内部再次判断栈是否为空,若为空则返回一个非法数据,这样比抛出异常好一些
  • 但是如果T是一个复杂类型,我们很难定义一个非法值给外界知晓,这一点可以通过智能指针进行优化。之后我们再介绍更优化的方案,因为现在这个pop函数仍存在问题,比如T是一个vector类型,那么在pop函数内部element就是vector类型,开始element存储了一些int值,程序没问题,函数执行pop操作, 假设此时程序内存暴增,导致当程序使用的内存足够大时,可用的有效空间不够, 函数返回element时,就会就会存在vector做拷贝赋值时造成失败。即使我们捕获异常,释放部分空间但也会导致栈元素已经出栈,数据丢失了。这其实是内存管理不当造成的,但是C++ 并发编程一书中给出了优化方案。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
struct empty_stack : std::exception
{
const char* what() const throw();
};
template<typename T>
class threadsafe_stack
{
private:
std::stack<T> data;
mutable std::mutex m;
public:
threadsafe_stack() {}
threadsafe_stack(const threadsafe_stack& other)
{
std::lock_guard<std::mutex> lock(other.m);
//①在构造函数的函数体(constructor body)内进行复制操作
data = other.data;
}
threadsafe_stack& operator=(const threadsafe_stack&) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(std::move(new_value));
}

//方法二
std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
//②试图弹出前检查是否为空栈
if (data.empty()) throw empty_stack();
//③改动栈容器前设置返回值
std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
data.pop();
return res;
}

//方法一
void pop(T& value)
{
std::lock_guard<std::mutex> lock(m);
if (data.empty()) throw empty_stack();
value = data.top();
data.pop();
}
bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}

};
  • 我们提供了两个版本的pop操作,一个是带引用类型的参数的,一个是直接pop出智能指针类型,这样在pop函数内部减少了数据的拷贝,防止内存溢出,其实这两种做法确实是相比之前直接pop固定类型的值更节省内存,运行效率也好很多。我们也完全可以基于之前的思想,在pop时如果队列为空则返回空指针,这样比抛出异常更有好一些
1
2
3
4
5
6
7
8
9
10
std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
//②试图弹出前检查是否为空栈
if (data.empty()) return nullptr;
//③改动栈容器前设置返回值
std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
data.pop();
return res;
}

死锁是怎么造成的

  • 死锁一般是由于调运顺序不一致导致的,比如两个线程循环调用。当线程1先加锁A,再加锁B,而线程2先加锁B,再加锁A。那么在某一时刻就可能造成死锁。比如线程1对A已经加锁,线程2对B已经加锁,那么他们都希望彼此占有对方的锁,又不释放自己占有的锁导致了死锁。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
std::mutex t_lock1;
std::mutex t_lock2;

int m_1 = 0;
int m_2 = 1;

void dead_lock1() {
while (true) {
std::cout << "dead_lock1 begin " << std::endl;
t_lock1.lock();
m_1 = 1024;
t_lock2.lock();
m_2 = 2048;
t_lock2.unlock();
t_lock1.unlock();
std::cout << "dead_lock1 end " << std::endl;
}
}
void dead_lock2() {
while (true) {
std::cout << "dead_lock2 begin " << std::endl;
t_lock2.lock();
m_2 = 2048;
t_lock1.lock();
m_1 = 1024;
t_lock1.unlock();
t_lock2.unlock();
std::cout << "dead_lock2 end " << std::endl;
}
}

void test_dead_lock() {
std::thread t1(dead_lock1);
std::thread t2(dead_lock2);
t1.join();
t2.join();
}

int main()
{
//test_lock();
//test_threadsafe_stack1();
test_dead_lock();
return 0;
}
  • 这样运行之后在某一个时刻一定会导致死锁

死锁解决办法

  • 实际工作中避免死锁的一个方式就是将加锁和解锁的功能封装为独立的函数,这样能保证在独立的函数里执行完操作后就解锁,不会导致一个函数里使用多个锁的情况,即加锁和解锁作为原子操作解耦合,各自只管理自己的功能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//加锁和解锁作为原子操作解耦合,各自只管理自己的功能
void atomic_lock1() {
std::cout << "lock1 begin lock" << std::endl;
t_lock1.lock();
m_1 = 1024;
t_lock1.unlock();
std::cout << "lock1 end lock" << std::endl;
}
void atomic_lock2() {
std::cout << "lock2 begin lock" << std::endl;
t_lock2.lock();
m_2 = 2048;
t_lock2.unlock();
std::cout << "lock2 end lock" << std::endl;
}
void safe_lock1() {
while (true) {
atomic_lock1();
atomic_lock2();
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
}
void safe_lock2() {
while (true) {
atomic_lock2();
atomic_lock1();
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
}
void test_safe_lock() {
std::thread t1(safe_lock1);
std::thread t2(safe_lock2);
t1.join();
t2.join();
}

同时加锁

  • 当我们无法避免在一个函数内部使用两个互斥量,并且都要解锁的情况,那我们可以采取同时加锁的方式。我们先定义一个类,假设这个类不推荐拷贝构造,但我们也提供了这个类的拷贝构造和移动构造
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class som_big_object {
public:
som_big_object(int data) :_data(data) {}
//拷贝构造
som_big_object(const som_big_object& b2) :_data(b2._data) {
_data = b2._data;
}
//移动构造
som_big_object(som_big_object&& b2) :_data(std::move(b2._data)) {
}
//重载输出运算符
friend std::ostream& operator << (std::ostream& os, const som_big_object& big_obj) {
os << big_obj._data;
return os;
}
//重载赋值运算符
som_big_object& operator = (const som_big_object& b2) {
// _data = std::move(b2._data);
if (this == &b2) {
return *this;
}
_data = b2._data;
return *this;
}
//交换数据
friend void swap(som_big_object& b1, som_big_object& b2) {
som_big_object temp = std::move(b1);
b1 = std::move(b2);
b2 = std::move(temp);
}
private:
int _data;
};

  • 以上代码的一个小知识:当我们提供了一个 移动构造函数的时候,就不会提供默认的拷贝和移动赋值函数了

  • 接下来我们定义一个类对上面的类做管理,为防止多线程情况下数据混乱, 包含了一个互斥量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class big_object_mgr {
public:
big_object_mgr(int data = 0) :_obj(data) {}
void printinfo() {
std::cout << "current obj data is " << _obj << std::endl;
}
friend void danger_swap(big_object_mgr& objm1, big_object_mgr& objm2);
friend void safe_swap(big_object_mgr& objm1, big_object_mgr& objm2);
friend void safe_swap_scope(big_object_mgr& objm1, big_object_mgr& objm2);
private:
std::mutex _mtx;
som_big_object _obj;
};

  • 接下来定义了三个交换函数看看,三个函数哪些是危险的以及为什么
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
void danger_swap(big_object_mgr& objm1, big_object_mgr& objm2) {
std::cout << "thread [ " << std::this_thread::get_id() << " ] begin" << std::endl;
if (&objm1 == &objm2) {
return;
}
std::lock_guard <std::mutex> gurad1(objm1._mtx);
//此处为了故意制造死锁,我们让线程小睡一会
std::this_thread::sleep_for(std::chrono::seconds(1));
std::lock_guard<std::mutex> guard2(objm2._mtx);
swap(objm1._obj, objm2._obj);
std::cout << "thread [ " << std::this_thread::get_id() << " ] end" << std::endl;
}

void safe_swap(big_object_mgr& objm1, big_object_mgr& objm2) {
std::cout << "thread [ " << std::this_thread::get_id() << " ] begin" << std::endl;
if (&objm1 == &objm2) {
return;
}

// 同时加多个锁
std::lock(objm1._mtx, objm2._mtx);

//领养锁管理它自动释放,std::adopt_lock有了这个操作后gurad1就只会负责解锁而不负责加锁
std::lock_guard <std::mutex> gurad1(objm1._mtx, std::adopt_lock);
//此处为了故意制造死锁,我们让线程小睡一会
std::this_thread::sleep_for(std::chrono::seconds(1));

std::lock_guard <std::mutex> gurad2(objm2._mtx, std::adopt_lock);

swap(objm1._obj, objm2._obj);
std::cout << "thread [ " << std::this_thread::get_id() << " ] end" << std::endl;
}

// 当然上面加锁的方式可以简化,C++17 scope_lock可以对多个互斥量同时加锁,并且自动释放
void safe_swap_scope(big_object_mgr& objm1, big_object_mgr& objm2) {
std::cout << "thread [ " << std::this_thread::get_id() << " ] begin" << std::endl;
if (&objm1 == &objm2) {
return;
}
std::scoped_lock guard(objm1._mtx, objm2._mtx);
//等价于
//std::scoped_lock<std::mutex, std::mutex> guard(objm1._mtx, objm2._mtx);
swap(objm1._obj, objm2._obj);
std::cout << "thread [ " << std::this_thread::get_id() << " ] end" << std::endl;
}

void test_danger_swap() {
big_object_mgr objm1(5);
big_object_mgr objm2(100);
std::thread t1(danger_swap, std::ref(objm1), std::ref(objm2));
std::thread t2(danger_swap, std::ref(objm2), std::ref(objm1));
t1.join();
t2.join();
objm1.printinfo();
objm2.printinfo();
}

void test_safe_swap() {
big_object_mgr objm1(5);
big_object_mgr objm2(100);
std::thread t1(safe_swap, std::ref(objm1), std::ref(objm2));
std::thread t2(safe_swap, std::ref(objm2), std::ref(objm1));
t1.join();
t2.join();
objm1.printinfo();
objm2.printinfo();
}

  • C++17的scoped_lock,用于多个互斥锁的免死锁 RAII 封装器,是一种更加灵活和安全的互斥量管理方式。是一种独占互斥锁,它可以同时锁定多个互斥锁,并保证以原子方式获得所有互斥锁,从而有效避免死锁。

  • 它可以接受多个互斥锁作为参数,并在构造函数中自动锁定这些互斥锁。

  • 当std::scoped_lock对象出作用域时,它会析构并自动解锁所有已经锁定的互斥锁,确保互斥访问的安全性和正确性。

thread_local

  • 这个修饰符的意思是创建一个线程变量,它只会在线程创建时初始化一次,在同一个线程内部的所有对象是共享的,但是不同线程的对象之间不是共享的,也就意味着这个是线程安全的

层级加锁

  • 现实开发中常常很难规避同一个函数内部加多个锁的情况,我们要尽可能避免循环加锁,所以可以自定义一个层级锁,保证实际项目中对多个互斥量加锁时是有序的,这可以用于检测你所加的锁是否会引起死锁
  • 大概原理就是:首先每个线程默认都会有一个层级(优先级)(不同线程不共享),这个优先级是ulong的最大值,还有一个变量是用来计算当前允许加锁的层级(不同线程共享),还有一个变量是记录的上一次线程加锁的层值,当线程的层级大于锁的层级时才可以加锁,否则不能,析构的时候就把层级还原(赋给thread_local的层级变量)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
//层级锁
class hierarchical_mutex {
public:
explicit hierarchical_mutex(unsigned long value) :_hierarchy_value(value),
_previous_hierarchy_value(0) {}


hierarchical_mutex(const hierarchical_mutex&) = delete;
hierarchical_mutex& operator=(const hierarchical_mutex&) = delete;

//加锁
void lock() {
check_for_hierarchy_violation();
_internal_mutex.lock();
update_hierarchy_value();
}

//解锁
void unlock() {
if (_this_thread_hierarchy_value != _hierarchy_value) {
throw std::logic_error("mutex hierarchy violated");
}
_this_thread_hierarchy_value = _previous_hierarchy_value;
_internal_mutex.unlock();
}

//尝试加锁
bool try_lock() {
check_for_hierarchy_violation();
if (!_internal_mutex.try_lock()) {
return false;
}
update_hierarchy_value();
return true;
}
private:

//内在的锁
std::mutex _internal_mutex;
//当前层级值(优先级),只有大于等于这个值的锁才可以加锁
unsigned long const _hierarchy_value;
//上一次层级值
unsigned long _previous_hierarchy_value;
//本线程记录的层级值
static thread_local unsigned long _this_thread_hierarchy_value;

// 检查是否能加锁,当线程当前层级大于允许加锁层级是才可加锁,否则不行
void check_for_hierarchy_violation() {
if (_this_thread_hierarchy_value <= _hierarchy_value) {
throw std::logic_error("mutex hierarchy violated");
}
}

//更新层级
void update_hierarchy_value() {
_previous_hierarchy_value = _this_thread_hierarchy_value;
_this_thread_hierarchy_value = _hierarchy_value;
}
};

// 定义静态成员,thread_local 表示线程本地,是属于一个线程作用域,不是共用的,也就是说t1与t2不共享这个
thread_local unsigned long hierarchical_mutex::_this_thread_hierarchy_value(ULONG_MAX);

void test_hierarchy_lock() {
hierarchical_mutex hmtx1(1000);
hierarchical_mutex hmtx2(500);
std::thread t1([&hmtx1, &hmtx2]() {
hmtx1.lock();
hmtx2.lock();
hmtx2.unlock();
hmtx1.unlock();
});
std::thread t2([&hmtx1, &hmtx2]() {
hmtx2.lock();
hmtx1.lock();
hmtx1.unlock();
hmtx2.unlock();
});
t1.join();
t2.join();
}
  • 层级锁能保证我们每个线程加锁时,一定是先加权重高的锁。并且释放时也保证了顺序。主要原理就是将当前锁的权重保存在线程变量中,这样该线程再次加锁时判断线程变量的权重和锁的权重是否大于,如果满足条件则继续加锁。

C++unique_lock,共享锁和递归锁

unique_lock

  • unique_lock和lock_guard基本用法相同,构造时默认加锁,析构时默认解锁,但unique_lock有个好处就是可以手动解锁。这一点尤为重要,方便我们控制锁住区域的粒度(加锁的范围大小),也能支持和条件变量配套使用,至于条件变量我们之后再介绍,本文主要介绍锁的相关操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void use_unique(){
std::unique_lock<std::mutex> lock(mtx1);
std::cout << "lock success" << std::endl;
share_data++;
//可以手动解锁
lock.unlock();
}

int main()
{
/*som_big_object obj1(100);
som_big_object obj2(100);
obj2 = std::move(obj1);*/
//test_lock();
//test_threadsafe_stack1();
//test_dead_lock();
//test_hierarchy_lock();
return 0;
}

unique_lock的owns_lock判断是否持有锁

  • 我们可以通过unique_lock的owns_lock判断是否持有锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//可判断是否占有锁
void owns_lock() {
//lock可自动解锁,也可手动解锁
std::unique_lock<std::mutex> lock(mtx);
shared_data++;
if (lock.owns_lock()) {
std::cout << "owns lock" << std::endl;
}
else {
std::cout << "doesn't own lock" << std::endl;
}
lock.unlock();
if (lock.owns_lock()) {
std::cout << "owns lock" << std::endl;
}
else {
std::cout << "doesn't own lock" << std::endl;
}
}

unique_lock的延迟加锁defer_lock

  • 允许延迟加锁, std::defer_lock 表示延迟加锁的意思
1
2
3
4
5
6
7
8
9
//可以延迟加锁
void defer_lock() {
//延迟加锁,锁正常初始化,但并没有自动加锁
std::unique_lock<std::mutex> lock(mtx, std::defer_lock);
//可以加锁
lock.lock();
//可以自动析构解锁,也可以手动解锁
lock.unlock();
}

unique_lock的领养加锁

  • 和lock_guard一样,unique_lock也支持领养锁,尽管是领养的,但是打印还是会出现owns lock,因为不管如何锁被加上,就会输出owns lock。
  • 注意:领养一个锁前一定要加锁,否则就会报错,因为你领养前如果不加锁,后面他会自动解锁,而你本来没有加锁就会让一个没加锁的锁解锁就会报错
1
2
3
4
5
6
7
8
9
10
11
void use_own_adopt() {
mtx1.lock();
std::unique_lock<std::mutex> lock(mtx1, std::adopt_lock);
if (lock.owns_lock()) {
std::cout << "owns lock" << std::endl;
}
else {
std::cout << "dose not have the lock" << std::endl;
}
lock.unlock();
}
  • 既然unique_lock支持领养操作也支持延迟加锁,那么可以用两种方式实现前文lock_guard实现的swap操作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//之前的交换代码可以可以用如下方式等价实现
int a = 10;
int b = 99;
std::mutex mtx1;
std::mutex mtx2;
void safe_swap() {
std::lock(mtx1, mtx2);
std::unique_lock<std::mutex> lock1(mtx1, std::adopt_lock);
std::unique_lock<std::mutex> lock2(mtx2, std::adopt_lock);
std::swap(a, b);
// 错误用法,因为此时mtx1已经没有所有权了
//mtx1.unlock();
//mtx2.unlock();
}
void safe_swap2() {
std::unique_lock<std::mutex> lock1(mtx1, std::defer_lock);
std::unique_lock<std::mutex> lock2(mtx2, std::defer_lock);
//需用lock1,lock2加锁
std::lock(lock1, lock2);
//错误用法
//std::lock(mtx1, mtx2);
std::swap(a, b);
}
  • 大家注意一旦mutex被unique_lock管理,加锁和释放的操作就交给unique_lock,不能调用mutex加锁和解锁,因为锁的使用权已经交给unique_lock了
  • unique_lock 在底层实现了移动赋值函数以及移动构造函数,禁止了拷贝构造和拷贝复制函数

  • 我们知道mutex是不支持移动和拷贝的,但是unique_lock支持移动,当一个mutex被转移给unique_lock后,可以通过unique_ptr转移其归属权.

1
2
3
4
5
6
7
8
9
10
11
12
//转移互斥量所有权
//互斥量本身不支持move操作,但是unique_lock支持
std::unique_lock <std::mutex> get_lock() {
std::unique_lock<std::mutex> lock(mtx);
shared_data++;
return lock;
}
void use_return() {
std::unique_lock<std::mutex> lock(get_lock());
shared_data++;
}

锁的粒度

  • 锁的粒度表示加锁(范围)的精细程度,一个锁的粒度足够大,保证可以锁住要访问的共享数据
  • 同时一个锁的粒度足够小,保证非共享数据不被锁住影响效率而unique_lock则很好的支持手动解锁
1
2
3
4
5
6
7
8
9
void precision_lock() {
std::unique_lock<std::mutex> lock(mtx);
shared_data++;
lock.unlock();
//不设计共享数据的耗时操作不要放在锁内执行
std::this_thread::sleep_for(std::chrono::seconds(1));
lock.lock();
shared_data++;
}

共享锁shared_lock以及shared_mutex

  • 试想这样一个场景,对于一个DNS服务,我们可以根据域名查询服务对应的ip地址,它很久才更新一次,比如新增记录,删除记录或者更新记录等。平时大部分时间都是提供给外部查询,对于查询操作,即使多个线程并发查询不加锁也不会有问题,但是当有线程修改DNS服务的ip记录或者增减记录时,其他线程不能查询,需等待修改完再查询。或者等待查询完,线程才能修改。也就是说读操作并不是互斥的,同一时间可以有多个线程同时读,但是写和读是互斥的,写与写是互斥的,简而言之,写操作需要独占锁。而读操作需要共享锁。
  • 要想使用共享锁,需使用共享互斥量std::shared_mutex,std::shared_mutex是C++17标准提出的。
  • C++14标准可以使用std::shared_time_mutex
  • C++11的话可以使用boost
  • std::shared_mutex 和 std::shared_timed_mutex 都是用于实现多线程并发访问共享数据的互斥锁,但它们之间存在一些区别:
    1. std::shared_mutex:
1
2
3
* 提供了 `lock()`, `try_lock()`, 和 `try_lock_for()` 以及 `try_lock_until()` 函数,这些函数都可以用于获取互斥锁。
* 提供了 `try_lock_shared()` 和 `lock_shared()` 函数,这些函数可以用于获取共享锁。
* 当 `std::shared_mutex` 被锁定后,其他尝试获取该锁的线程将会被阻塞,直到该锁被解锁。
  1. std::shared_timed_mutex:
1
2
3
* 与 `std::shared_mutex` 类似,也提供了 `lock()`, `try_lock()`, 和 `try_lock_for()` 以及  `try_lock_until()` 函数用于获取互斥锁。
* 与 `std::shared_mutex` 不同的是,它还提供了 `try_lock_shared()` 和 `lock_shared()` 函数用于获取共享锁,这些函数在尝试获取共享锁时具有超时机制。
* 当 `std::shared_timed_mutex` 被锁定后,其他尝试获取该锁的线程将会被阻塞,直到该锁被解锁,这与 `std::shared_mutex` 相同。然而,当尝试获取共享锁时,如果不能立即获得锁,`std::shared_timed_mutex` 会设置一个超时,超时过后如果仍然没有获取到锁,则操作将返回失败。
  • 因此,std::shared_timed_mutex 提供了额外的超时机制,这使得它在某些情况下更适合于需要处理超时的并发控制。然而,如果不需要超时机制,可以使用更简单的 std::shared_mutex。

  • 下面我们定义了一个DNS类,查询使用共享锁,写数据使用独占锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class DNSserver {
public:
DNSserver(){}
std::string QueryDNS(const std::string& dnsname) {
std::shared_lock<std::shared_mutex> shared_locks(_shared_mtx);
auto iter = _dns_info.find(dnsname);
if (iter != _dns_info.end()) {
return iter->second;
}
return"";
}
void AddDNSInfo(const std::string& dnsname,const std::string& dnsentry) {
std::lock_guard<std::shared_mutex> lock_guards(_shared_mtx);
auto iter = _dns_info.find(dnsname);
if (iter != _dns_info.end()) {
_dns_info[dnsname] = dnsentry;
}
else _dns_info.insert(std::make_pair(dnsname, dnsentry));
}
private:
mutable std::shared_mutex _shared_mtx;
std::map<std::string, std::string> _dns_info;
};

递归锁

  • 有时候我们在实现接口的时候内部加锁,接口内部调用完结束自动解锁。会出现一个接口调用另一个接口的情况,如果用普通的std::mutex就会出现卡死,因为嵌套加锁导致卡死。但是我们可以使用递归锁。

  • 但我个人并不推荐递归锁,可以从设计源头规避嵌套加锁的情况,我们可以将接口相同的功能抽象出来,统一加锁。下面的设计演示了如何使用递归锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class RecursiveDemo {
public:
RecursiveDemo() {}
bool QueryStudent(std::string name) {
std::lock_guard<std::recursive_mutex> recursive_lock(_recursive_mtx);
auto iter_find = _students_info.find(name);
if (iter_find == _students_info.end()) {
return false;
}
return true;
}
void AddScore(std::string name, int score) {
std::lock_guard<std::recursive_mutex> recursive_lock(_recursive_mtx);
if (!QueryStudent(name)) {
_students_info.insert(std::make_pair(name, score));
return;
}
_students_info[name] = _students_info[name] + score;
}
//不推荐采用递归锁,使用递归锁说明设计思路并不理想,需优化设计
//推荐拆分逻辑,将共有逻辑拆分为统一接口
void AddScoreAtomic(std::string name, int score) {
std::lock_guard<std::recursive_mutex> recursive_lock(_recursive_mtx);
auto iter_find = _students_info.find(name);
if (iter_find == _students_info.end()) {
_students_info.insert(std::make_pair(name, score));
return;
}
_students_info[name] = _students_info[name] + score;
return;
}
private:
std::map<std::string, int> _students_info;
std::recursive_mutex _recursive_mtx;
};

  • 我们可以看到AddScore函数内部调用了QueryStudent, 所以采用了递归锁。
  • 但是我们同样可以改变设计,将两者公有的部分抽离出来生成一个新的接口AddScoreAtomic.
  • AddScoreAtomic可以不适用递归锁,照样能完成线程安全操作的目的。

C++线程安全单例模式的演变

局部静态变量

  • C++11之后可以使用静态内部方法来实现,我们知道当一个函数中定义一个局部静态变量,那么这个局部静态变量只会初始化一次,就是在这个函数第一次调用的时候,以后无论调用几次这个函数,函数内的局部静态变量都不再初始化。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
class Single2 {
public:
static Single2& GetInstance() {
static Single2 single;
return single;
}
private:
Single2() {

}
Single2(const Single2&) = delete;
Single2& operator=(const Single2&) = delete;
};

饿汉与懒汉式,详情看面试笔记

智能指针

  • 懒汉与饿汉式存在一个缺陷,存在多重释放或者不知道哪个指针释放的问题。
  • 所以我们想到可以用智能指针来自动释放
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
//可以利用智能指针完成自动回收
class SingleAuto
{
private:
SingleAuto()
{
}
SingleAuto(const SingleAuto&) = delete;
SingleAuto& operator=(const SingleAuto&) = delete;
public:
~SingleAuto()
{
std::cout << "single auto delete success " << std::endl;
}
static std::shared_ptr<SingleAuto> GetInst()
{
if (single != nullptr)
{
return single;
}
s_mutex.lock();
if (single != nullptr)
{
s_mutex.unlock();
return single;
}
single = std::shared_ptr<SingleAuto>(new SingleAuto);
s_mutex.unlock();
return single;
}
private:
static std::shared_ptr<SingleAuto> single;
static std::mutex s_mutex;
};

std::shared_ptr<SingleAuto> SingleAuto::single = nullptr;
std::mutex SingleAuto::s_mutex;
void test_singleauto()
{
auto sp1 = SingleAuto::GetInst();
auto sp2 = SingleAuto::GetInst();
std::cout << "sp1 is " << sp1 << std::endl;
std::cout << "sp2 is " << sp2 << std::endl;
//此时存在隐患,可以手动删除裸指针,造成崩溃
// delete sp1.get();
}
  • 这样开辟的资源交给智能指针管理免去了回收资源的麻烦。但是有些人觉得虽然智能指针能自动回收内存,如果有开发人员手动delete指针怎么办?
  • 可以直接把析构函数设为私有吗? 不行,因为析构的时候会调用智能指针的析构函数,在智能指针的析构函数里会调用单例模式的析构,而单例模式的析构是私有无法析构
  • 所以有人提出了利用辅助类帮助智能指针释放资源,将智能指针的析构设置为私有

改进方案

  • 辅助类帮助智能指针释放资源,将智能指针的析构设置为私有
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
//为了规避用户手动释放内存,可以提供一个辅助类帮忙回收内存
//并将单例类的析构函数写为私有
class SingleAutoSafe;
class SafeDeletor
{
public:
void operator()(SingleAutoSafe* sf)
{
std::cout << "this is safe deleter operator()" << std::endl;
delete sf;
}
};
class SingleAutoSafe
{
private:
SingleAutoSafe() {}
~SingleAutoSafe()
{
std::cout << "this is single auto safe deletor" << std::endl;
}
SingleAutoSafe(const SingleAutoSafe&) = delete;
SingleAutoSafe& operator=(const SingleAutoSafe&) = delete;
//定义友元类,通过友元类调用该类析构函数
friend class SafeDeletor;
public:
static std::shared_ptr<SingleAutoSafe> GetInst()
{
//1处
if (single != nullptr)
{
return single;
}
s_mutex.lock();
//2处
if (single != nullptr)
{
s_mutex.unlock();
return single;
}
//额外指定删除器
//3 处
single = std::shared_ptr<SingleAutoSafe>(new SingleAutoSafe, SafeDeletor());
//也可以指定删除函数
// single = std::shared_ptr<SingleAutoSafe>(new SingleAutoSafe, SafeDelFunc);
s_mutex.unlock();
return single;
}
private:
static std::shared_ptr<SingleAutoSafe> single;
static std::mutex s_mutex;
};
  • SafeDeletor就是删除的辅助类,实现了仿函数。构造智能指针时指定了SafeDeletor对象,这样就能帮助智能指针释放了。

  • 但是上面的代码存在危险,比如懒汉式的使用方式,当多个线程调用单例时,有一个线程加锁进入3处的逻辑。

  • 其他的线程有的在1处,判断指针非空则跳过初始化直接使用单例的内存会存在问题。
    主要原因在于SingleAutoSafe * temp = new SingleAutoSafe() 这个操作是由三部分组成的

    1. 调用allocate开辟内存
    2. 调用construct执行SingleAutoSafe的构造函数
    3. 调用赋值操作将地址赋值给temp
  • 而现实中2和3的步骤可能颠倒,所以有可能在一些编译器中通过优化是1,3,2的调用顺序,
    其他线程取到的指针就是非空,还没来的及调用构造函数就交给外部使用造成不可预知错误。
    为解决这个问题,C++11 推出了std::call_once函数保证多个线程只执行一次

call_once

  • C++11 提出了call_once函数,我们可以配合一个局部的静态变量once_flag实现线程安全的初始化。
  • 多线程调用call_once函数时,会判断once_flag是否被初始化,如没被初始化则进入初始化流程,调用我们提供的初始化函数。但是同一时刻只有一个线程能进入这个初始化函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class SingletonOnce {
private:
SingletonOnce() = default;
SingletonOnce(const SingletonOnce&) = delete;
SingletonOnce& operator = (const SingletonOnce& st) = delete;
static std::shared_ptr<SingletonOnce> _instance;
public :
static std::shared_ptr<SingletonOnce> GetInstance() {
static std::once_flag s_flag;
std::call_once(s_flag, [&]() {
_instance = std::shared_ptr<SingletonOnce>(new SingletonOnce);
});
return _instance;
}
void PrintAddress() {
std::cout << _instance.get() << std::endl;
}
~SingletonOnce() {
std::cout << "this is singleton destruct" << std::endl;
}
};
std::shared_ptr<SingletonOnce> SingletonOnce::_instance = nullptr;
void TestSingle() {
std::thread t1([]() {
std::this_thread::sleep_for(std::chrono::seconds(1));
SingletonOnce::GetInstance()->PrintAddress();
});
std::thread t2([]() {
std::this_thread::sleep_for(std::chrono::seconds(1));
SingletonOnce::GetInstance()->PrintAddress();
});
t1.join();
t2.join();
}
  • 为了使用单例类更通用,比如项目中使用多个单例类,可以通过继承实现多个单例类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//为了让单例更加通用,可以做成模板类
template <typename T>
class Singleton {
protected:
Singleton() = default;
Singleton(const Singleton<T>&) = delete;
Singleton& operator=(const Singleton<T>& st) = delete;
static std::shared_ptr<T> _instance;
public:
static std::shared_ptr<T> GetInstance() {
static std::once_flag s_flag;
std::call_once(s_flag, [&]() {
_instance = std::shared_ptr<T>(new T);
});
return _instance;
}
void PrintAddress() {
std::cout << _instance.get() << std::endl;
}
~Singleton() {
std::cout << "this is singleton destruct" << std::endl;
}
};
template <typename T>
std::shared_ptr<T> Singleton<T>::_instance = nullptr;

//想使用单例类,可以继承上面的模板,我们在网络编程中逻辑单例类用的就是这种方式
class LogicSystem :public Singleton<LogicSystem>
{
friend class Singleton<LogicSystem>;
public:
~LogicSystem(){}
private:
LogicSystem(){}
};

总结1

  • 如果你只是实现一个简单的单例类推荐使用返回局部静态变量的方式
  • 如果想大规模实现多个单例类可以用call_once实现的模板类。

C++11多线程同步

  • 在多线程编程中,有许多情况需要线程之间的同步,比如你得先登录才能执行其他操作等情况,这可以通过std::futurestd::promise来实现

使用future与promise

  • std::future是一个对象,可以从某个对象或函数获取值,并在不同线程之间提供恰当的同步访问。
  • std::promise是可以存储类型T的值的对象,该值可以被另一线程的std::future对象获取,并提供了同步机制。

  • 简单来说就是,promise存储了一个在未来会改变其值(类型为T)的对象,并且在未来会改变该值,而future则存储一个已知其值在将来会改变的值,并且在该值改变后可以线程安全地读取。

  • 这两一般配合使用,以下为线程间共享一个需要另一个线程设置变量时的代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
void init(std::promise<int> * pro) {
std::cout << "Thread B" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(3));
pro->set_value(23);
}

void test_future() {
std::promise<int> promObj;
// 将promise<T>转成future<T>
std::future<int> futObj = promObj.get_future();
std::thread t1(init, &promObj);
// 只有在promise被设置值的时候这个才不会被阻塞,否则就会一直阻塞直到设置值为止
std::cout << futObj.get() << std::endl;
t1.join();
}
  • future与promise的关系:promise就相当于生产者生成的一个东西,消费者通过把它转换为future来读取这个东西,也就是说生产者线程可以使用 std::promise 设置异步任务的结果,而消费者线程可以通过 std::future 等待异步任务的完成并获取结果

  • 实现线程间的同步操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
void loginSrv(std::promise<bool> * promObj)
{
std::cout << "Thread B" << std::endl;
// 模拟登录操作,假设登录成功
std::this_thread::sleep_for(std::chrono::seconds(3));
promObj->set_value(true); // 设置登录结果为 true
}

int main()
{
std::promise<bool> promiseObj;
std::future<bool> futureObj = promiseObj.get_future();

std::thread th(loginSrv, &promiseObj); // 启动线程B

// 等待登录结果,最多等待10秒
auto status = futureObj.wait_for(std::chrono::seconds(10));
if (status == std::future_status::ready) {
// 异步操作已完成
if (futureObj.get()) {
// 登录成功
std::cout << "Login succeeded!" << std::endl;
// 进行登录成功后的操作
} else {
// 登录失败
std::cout << "Login failed!" << std::endl;
// 进行登录失败后的操作
}
} else if (status == std::future_status::timeout) {
// 超时未登录成功
std::cout << "Timeout! Login failed." << std::endl;
// 进行超时后的操作
}

th.join();

return 0;
}
  • future对象的wait_for函数阻塞等待结果变得可用,可用的标志为以下两种情况之一:

  • 设置的时间超时

  • 共享对象的状态变为ready

  • 原型如下:

1
2
3
template< class Rep, class Period >
std::future_status wait_for( const std::chrono::duration<Rep,Period>& timeout_duration ) const;

  • 返回值标识了结果的状态,为:

    1. future_status::deferred :计算结果的函数未启动
    2. future_status::ready:结果ready
    3. future_status::timeout:超时

使用条件变量来实现同步

  • 当某个线程的执行需要另一个线程完成后才能进行,可以使用条件变量(condition_variable)。
  • 也可以把它理解为信号通知机制,一个线程负责发送信号,其他线程等待该信号的触发,condition_variable 存在一些问题,如虚假唤醒

  • 通知方

  • 获取 std::mutex, 通常是 std::lock_guard

  • 修改共享变量(即使共享变量是原子变量,也需要在互斥对象内进行修改,以保证正确地将修改发布到等待线程)
  • 在 condition_variable 上执行 notify_one/notify_all 通知条件变量(该操作不需要锁)

  • 等待方

  • 获取相同的std::mutex,使用std::unique_lock

  • 执行wait,wait_for或wait_until(该操作会自动释放锁并阻塞)
  • 接收到条件变量通知、超时或者发生虚假唤醒时,线程被唤醒,并自动获取锁。唤醒的线程负责检查共享变量,如果是虚假唤醒,则应继续等待

  • 注意:

    1. std :: condition_variable仅适用于 std::unique_lock, 此限制允许在某些平台上获得最大效率。
    2. std :: condition_variable_any提供可与任何BasicLockable对象一起使用的条件变量,例如std :: shared_lock。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
std::mutex mtx_num;
std::condition_variable cvA;
std::condition_variable cvB;
int num = 1;
void ResonableImplemention() {
std::thread t1([]() {
for (; ;) {
std::unique_lock<std::mutex> lock(mtx_num);
//方法一
cvA.wait(lock, []() {
return num == 1;
});
num++;
std::cout << "thread A point 1......" << std::endl;
cvB.notify_one();
}
});
std::thread t2([]() {
for (;;) {
std::unique_lock<std::mutex> lock(mtx_num);
//方法二
while (num != 2) {
cvB.wait(lock);
}
num--;
std::cout << "thread B point 2......." << std::endl;
cvA.notify_one();
}
});
t1.join();
t2.join();
}

利用条件变量实现线程安全队列

  • 如何让线程按一定顺序执行,也就是说如何控制并发的同步操作?可以用一个变量num 当num为1,执行线程A,当num为2执行线程B, 如果当num为1时碰到了线程B 就直接让线程b睡一会,实现大概如下:

不良实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
int num = 1;
void PoorImpleman() {
std::thread t1([]() {
for (;;) {
{
std::lock_guard<std::mutex> lock(mtx_num);
if (num == 1) {
std::cout << "thread A print 1....." << std::endl;
num++;
continue;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
});
std::thread t2([]() {
for (;;) {
{
std::lock_guard<std::mutex> lock(mtx_num);
if (num == 2) {
std::cout << "thread B print 2....." << std::endl;
num--;
continue;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
});
t1.join();
t2.join();
}
  • PoorImpleman虽然能实现我们交替打印的功能,会造成消息处理的不及时,因为线程A要循环检测num值,如果num不为1,则线程A就睡眠了,在线程A睡眠这段时间很可能B已经处理完打印了,此时A还在睡眠,是对资源的浪费,也错过了最佳的处理时机。

条件变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
std::mutex mtx_num;
std::condition_variable cvA;
std::condition_variable cvB;
int num = 1;
void ResonableImplemention() {
std::thread t1([]() {
for (; ;) {
std::unique_lock<std::mutex> lock(mtx_num);
//方法一
cvA.wait(lock, []() {
return num == 1;
});
num++;
std::cout << "thread A point 1......" << std::endl;
cvB.notify_one();
}
});
std::thread t2([]() {
for (;;) {
std::unique_lock<std::mutex> lock(mtx_num);
//方法二
while (num != 2) {
cvB.wait(lock);
}
num--;
std::cout << "thread B point 2......." << std::endl;
cvA.notify_one();
}
});
t1.join();
t2.join();
}
  • 当条件不满足时(num 不等于1 时)cvA.wait就会挂起,等待线程B通知通知线程A唤醒,线程B采用的是cvA.notifyone。
  • 这么做的好处就是线程交替处理非常及时。比起sleep的方式,我们可以从控制台上看出差异效果,sleep的方式看出日志基本是每隔1秒才打印一次,效率不高。

实现安全队列

  • 之前我们实现过线程安全的栈,对于pop操作,我们如果在线程中调用empty判断是否为空,如果不为空,则pop,因为empty和pop内部分别加锁,是两个原子操作,导致pop时可能会因为其他线程提前pop导致队列为空,从而引发崩溃。我们当时的处理方式是实现了两个版本的pop,一种是返回智能指针类型,一种通过参数为引用的方式返回。对于智能指针版本我们发现队列为空则返回空指针,对于引用版本,发现队列为空则抛出异常,这么做并不是很友好,所以我们可以通过条件变量完善之前的程序,不过这次我们重新实现一个线程安全队列。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
template<typename T>
class threadsafe_queue {
public:
threadsafe_queue() = default;
threadsafe_queue(const threadsafe_queue& q) {
//防止拷贝构造的时候被拷贝对象执行了其他操作
std::lock_guard<std::mutex> lock(q.mtx);
tq = q.tq;
}
threadsafe_queue(const threadsafe_queue&& q) {
std::lock_guard<std::mutex> lock(q.mtx);
tq = std::move(q.tq);
}
threadsafe_queue<T>& operator = (const threadsafe_queue & q){
if (this == &q) return *this;
std::lock_guard<std::mutex> lock(q.mtx);
tq = q.tq;
return *this;
}
threadsafe_queue<T>& operator = (const threadsafe_queue&& q) noexcept {
if (this == &q) return *this;
std::lock_guard<std::mutex> lock(q.mtx);
tq = std::move(q.tq);
return *this;
}
void push(const T value) {
std::lock_guard<std::mutex> lock(mtx);
tq.push(value);
//加入这条语句是为了让其他因为队列为空而挂起的线程继续执行
cvA.notify_one();
}
void wait_and_pop(T& value) {
std::unique_lock<std::mutex> lock(mtx);
//[this] 就是捕获这个类 使得lambda表达式可以使用该类的所有函数
cvA.wait(lock, [this]() {return !tq.empty(); });
// 只有一次拷贝
value = tq.front();
tq.pop();
}
std::shared_ptr<T> wait_and_pop() {
std::unique_lock<std::mutex> lock(mtx);
cvA.wait(lock, [this]() {return !tq.empty(); });
std::shared_ptr<T> res = std::make_shared<T>(tq.front());
tq.pop();
return res;
}
bool try_pop(T& value) {
std::lock_guard<std::mutex> lock(mtx);
if (tq.empty()) return false;
value = tq.front();
tq.pop();
return true;
}
std::shared_ptr<T> try_pop() {
std::lock_guard<std::mutex> lock(mtx);
if (tq.empty()) return std::shared_ptr<T>();
std::shared_ptr<T> res = std::make_shared<T>(tq.front());
tq.pop();
return res;
}
bool empty()const {
std::lock_guard<std::mutex> lock(mtx);
return tq.empty();
}

private:
mutable std::mutex mtx;
std::condition_variable cvA;
std::queue<T> tq;
};

void test_safe_que() {
threadsafe_queue<int> safe_que;
std::mutex mtx_print;
std::thread producer(
[&]() {
for (int i = 0; ; i++) {
safe_que.push(i);
{
// 为了打印完整一行
std::lock_guard<std::mutex> printlk(mtx_print);
std::cout << "producer push data is " << i << std::endl;
}
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}
);
std::thread consumer1(
[&]() {
for (;;) {
auto data = safe_que.wait_and_pop();
{
std::lock_guard<std::mutex> printlk(mtx_print);
std::cout << "consumer1 wait and pop data is " << *data << std::endl;
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
);
std::thread consumer2(
[&]() {
for (;;) {
auto data = safe_que.try_pop();
if (data != nullptr) {
{
std::lock_guard<std::mutex> printlk(mtx_print);
std::cout << "consumer2 try_pop data is " << *data << std::endl;
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
}
);
producer.join();
consumer1.join();
consumer2.join();
}
  • 我们可以启动三个线程,一个producer线程用来向队列中放入数据。一个consumer1线程用来阻塞等待pop队列中的元素。

  • 另一个consumer2尝试从队列中pop元素,如果队列为空则直接返回,如果非空则pop元素。

  • 打印时为了保证线程输出在屏幕上不会乱掉,所以加了锁保证互斥输出

并发三剑客async,promise,future

async用法

  • std::async 是一个用于异步执行函数的模板函数,它返回一个 std::future 对象,该对象用于获取函数的返回值。
  • 以下是一个使用 std::async 的示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
#include <iostream>
#include <future>
#include <chrono>
// 定义一个异步任务
std::string fetchDataFromDB(std::string query) {
// 模拟一个异步任务,比如从数据库中获取数据
std::this_thread::sleep_for(std::chrono::seconds(5));
return "Data: " + query;
}
int main() {
// 使用 std::async 异步调用 fetchDataFromDB
std::future<std::string> resultFromDB = std::async(std::launch::async, fetchDataFromDB, "Data");
// 在主线程中做其他事情
std::cout << "Doing something else..." << std::endl;
// 从 future 对象中获取数据
std::string dbData = resultFromDB.get();
std::cout << dbData << std::endl;
return 0;
}
  • 在这个示例中,std::async 创建了一个新的线程(或从内部线程池中挑选一个线程)并自动与一个 std::promise对象相关联。std::promise 对象被传递给 fetchDataFromDB 函数,函数的返回值被存储在 std::future 对象中。在主线程中,我们可以使用 std::future::get 方法从 std::future 对象中获取数据。注意,在使用 std::async 的情况下,我们必须使用 std::launch::async 标志来明确表明我们希望函数异步执行

async的启动策略

  • std::async函数可以接受几个不同的启动策略,这些策略在std::launch枚举中定义。除了std::launch::async之外,还有以下启动策略:

    1. std::launch::deferred:这种策略意味着任务将在调用std::future::get()或std::future::wait()函数时延迟执行。换句话说,任务将在需要结果时同步执行。
    2. std::launch::async | std::launch::deferred:这种策略是上面两个策略的组合。任务可以在一个单独的线程上异步执行,也可以延迟执行,具体取决于实现。
  • 默认情况下,std::async使用std::launch::async | std::launch::deferred策略。这意味着任务可能异步执行,也可能延迟执行,具体取决于实现。需要注意的是,不同的编译器和操作系统可能会有不同的默认行为。

future的wait和get

  • std::future::get() 和 std::future::wait() 是 C++ 中用于处理异步任务的两个方法,它们的功能和用法有一些重要的区别。
  1. std::future::get():
    • std::future::get() 是一个阻塞调用,用于获取 std::future 对象表示的值或异常。如果异步任务还没有完成,get() 会阻塞当前线程,直到任务完成。如果任务已经完成,get() 会立即返回任务的结果。重要的是,get() 只能调用一次,因为它会移动或消耗掉 std::future 对象的状态。一旦 get() 被调用,std::future 对象就不能再被用来获取结果。
  2. std::future::wait():
    • std::future::wait() 也是一个阻塞调用,但它与 get() 的主要区别在于 wait() 不会返回任务的结果。它只是等待异步任务完成。如果任务已经完成,wait() 会立即返回。如果任务还没有完成,wait() 会阻塞当前线程,直到任务完成。与 get() 不同,wait() 可以被多次调用,它不会消耗掉 std::future 对象的状态。
  • 总结下这两个方法的主要区别:
    1. std::future::get() 用于获取并返回任务的结果,而 std::future::wait() 只是等待任务完成
    2. get() 只能调用一次,而 wait() 可以被多次调用。
    3. 如果任务还没有完成,get() 和 wait() 都会阻塞当前线程,但 get() 会一直阻塞直到任务完成并返回结果,而 wait() 只是在等待任务完成。
  • 可以使用std::future的wait_for()或wait_until()方法来检查异步操作是否已完成。这些方法返回一个表示操作状态的std::future_status值。
1
2
3
4
5
if(fut.wait_for(std::chrono::seconds(2)) == std::future_status::ready){
//操作已完成
}else{
//操作失败
}

将任务和future关联(paskaged_task)

  • std::packaged_task和std::future是C++11中引入的两个类,它们用于处理异步任务的结果。
  • std::packaged_task是一个可调用目标,它包装了一个任务,该任务可以在另一个线程上运行。它可以捕获任务的返回值或异常,并将其存储在std::future对象中,以便以后使用。
  • std::future代表一个异步操作的结果。它可以用于从异步任务中获取返回值或异常。
  • 以下是使用std::packaged_task和std::future的基本步骤:
    1. 创建一个std::packaged_task对象,该对象包装了要执行的任务。
    2. 调用std::packaged_task对象的get_future()方法,该方法返回一个与任务关联的std::future对象。
    3. 在另一个线程上调用std::packaged_task对象的operator(),以执行任务。
    4. 在需要任务结果的地方,调用与任务关联的std::future对象的get()方法,以获取任务的返回值或异常
  • 以下是一个简单的示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int my_task() {
std::this_thread::sleep_for(std::chrono::seconds(5));
std::cout << "my task run 5 s" << std::endl;
return 42;
}
void use_package() {
// 创建一个包装了任务的 std::packaged_task 对象
std::packaged_task<int()> task(my_task);
// 获取与任务关联的 std::future 对象
std::future<int> result = task.get_future();
// 在另一个线程上执行任务
std::thread t(std::move(task));
t.detach(); // 将线程与主线程分离,以便主线程可以等待任务完成
// 等待任务完成并获取结果
int value = result.get();
std::cout << "The result is: " << value << std::endl;
}
  • 在上面的示例中,我们创建了一个包装了任务的std::packaged_task对象,并获取了与任务关联的std::future对象。然后,我们在另一个线程上执行任务,并等待任务完成并获取结果。最后,我们输出结果。

  • 我们可以使用 std::function 和 std::package_task 来包装带参数的函数。std::package_task 是一个模板类,它包装了一个可调用对象,并允许我们将其作为异步任务传递。

  • 注意在传入package_task对象给线程的时候,一定要使用std::move函数

promise的用法

  • C++11引入了std::promise和std::future两个类,用于实现异步编程。std::promise用于在某一线程中设置某个值或异常,而std::future则用于在另一线程中获取这个值或异常。
  • 下面是std::promise的基本用法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
void set_value(std::promise<int> prom) {
std::this_thread::sleep_for(std::chrono::seconds(3));
// 设置 promise 的值
prom.set_value(10);
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "value set success" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(5));
std::cout << "子线程执行完毕" << std::endl;
}

void use_promise() {
// 创建一个 promise 对象
std::promise<int> prom;
// 获取与 promise 相关联的 future 对象
std::future<int> fut = prom.get_future();
// 在新线程中设置 promise 的值,传入参数的时候一定要使用move
std::thread t(set_value, std::move(prom));
//在主线程中获取future的值
std::cout << "Waiting for the thread to set the value...\n";
std::cout << "Value set by the thread: " << fut.get() << '\n';
t.join();
}
  • 程序输出
1
2
3
4
Waiting for the thread to set the value...
Value set by the thread: 10
value set success
子线程执行完毕
  • 在上面的代码中,我们首先创建了一个std::promise对象,然后通过调用get_future()方法获取与之相关联的std::future对象。然后,我们在新线程中通过调用set_value()方法设置promise的值,并在主线程中通过调用fut.get()方法获取这个值。注意,在调用fut.get()方法时,如果promise的值还没有被设置,则该方法会阻塞当前线程,直到值被设置为止。
  • 注意promise的值被设置后就可以被get,而不需要等待子线程执行完成才能get,这个就是和前面package_task的区别之一

  • 除了set_value()方法外,std::promise还有一个set_exception()方法,用于设置异常。该方法接受一个std::exception_ptr参数,该参数可以通过调用std::current_exception()方法获取。

  • 下面是一个例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void set_exception(std::promise<void> prom) {
try {
//抛出一个异常
throw std::runtime_error("An error occurred!");
}
catch (...) {
//设置 promise的异常
prom.set_exception(std::current_exception());
}
}

void use_promise_exception() {
std::promise<void> prom;
std::future<void> fut = prom.get_future();
std::thread t(set_exception, std::move(prom));
// 在主线程中获取 future 的异常
try {
std::cout << "Waiting for the thread to set the exception...\n";
fut.get();
}
catch (const std::exception& e) {
std::cout << "Exception set by the thread: " << e.what() << '\n';
}
std::cout << "hello world!" << std::endl;
t.join();
}
  • 子线程抛出了异常一定要在主线程里也捕获异常

  • 当然我们使用promise时要注意一点,如果promise被释放了,而其他的线程还未使用与promise关联的future,当其使用这个future时会报错。

共享类型的future

  • 当我们需要多个线程等待同一个执行结果时,需要使用std::shared_future
  • 以下是一个适合使用std::shared_future的场景,多个线程等待同一个异步操作的结果:
  • 假设你有一个异步任务,需要多个线程等待其完成,然后这些线程需要访问任务的结果。在这种情况下,你可以使用std::shared_future来共享异步任务的结果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
void myFunction(std::promise<int>&& promise) {
// 模拟一些工作
std::this_thread::sleep_for(std::chrono::seconds(1));
promise.set_value(42); // 设置 promise 的值
}
void threadFunction(std::shared_future<int> future) {
try {
int result = future.get();
std::cout << "Result: " << result << std::endl;
}
catch (const std::future_error& e) {
std::cout << "Future error: " << e.what() << std::endl;
}
}
void use_shared_future() {
std::promise<int> promise;
std::shared_future<int> future = promise.get_future();
std::thread myThread1(myFunction, std::move(promise)); // 将 promise 移动到线程中
// 使用 share() 方法获取新的 shared_future 对象
std::thread myThread2(threadFunction, future);
std::thread myThread3(threadFunction, future);
myThread1.join();
myThread2.join();
myThread3.join();
}
  • 在这个示例中,我们创建了一个std::promise对象promise和一个与之关联的std::shared_future对象future。然后,我们将promise对象移动到另一个线程myThread1中,该线程将执行myFunction函数,并在完成后设置promise的值。我们还创建了两个线程myThread2和myThread3,它们将等待future对象的结果。如果myThread1成功地设置了promise的值,那么future.get()将返回该值。这些线程可以同时访问和等待future对象的结果,而不会相互干扰。
  • 但是大家要注意,如果一个future被移动给两个shared_future是错误的。
1
2
3
4
5
6
7
8
9
10
void use_shared_future_error() {
std::promise<int> promise;
std::shared_future<int> future = promise.get_future();
std::thread myThread1(myFunction, std::move(promise)); // 将 promise 移动到线程中
std::thread myThread2(threadFunction, std::move(future));
std::thread myThread3(threadFunction, std::move(future));
myThread1.join();
myThread2.join();
myThread3.join();
}
  • 这种用法是错误的,一个future通过隐式构造传递给shared_future之后,这个shared_future被移动传递给两个线程是不合理的,因为第一次移动后shared_future的生命周期被转移了,接下来myThread3构造时用的std::move(future)future已经失效了,会报错,一般都是no state 之类的错误。

异常处理1

  • std::future 是C++的一个模板类,它用于表示一个可能还没有准备好的异步操作的结果。你可以通过调用 std::future::get 方法来获取这个结果。如果在获取结果时发生了异常,那么 std::future::get 会重新抛出这个异常。

以下是一个例子,演示了如何在 std::future 中获取异常:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void may_throw()
{
// 这里我们抛出一个异常。在实际的程序中,这可能在任何地方发生。
throw std::runtime_error("Oops, something went wrong!");
}
int main()
{
// 创建一个异步任务
std::future<void> result(std::async(std::launch::async, may_throw));
try
{
// 获取结果(如果在获取结果时发生了异常,那么会重新抛出这个异常)
result.get();
}
catch (const std::exception &e)
{
// 捕获并打印异常
std::cerr << "Caught exception: " << e.what() << std::endl;
}
return 0;
}
  • 在这个例子中,我们创建了一个异步任务 may_throw,这个任务会抛出一个异常。然后,我们创建一个 std::future 对象 result 来表示这个任务的结果。在 main 函数中,我们调用 result.get() 来获取任务的结果。如果在获取结果时发生了异常,那么 result.get() 会重新抛出这个异常,然后我们在 catch 块中捕获并打印这个异常。

线程池

  • 我们可以利用上面提到的std::packaged_task和std::promise构建线程池,提高程序的并发能力。
    先了解什么是线程池:

  • 线程池是一种多线程处理形式,它处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

-线程池可以避免在处理短时间任务时创建与销毁线程的代价,它维护着多个线程,等待着监督管理者分配可并发执行的任务,从而提高了整体性能。

  • 线程池注意事项
  • 保证并发,且执行时无序的(可以用线程池)
  • 互斥性很大,强关联(不能用线程池)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#include <iostream>
#include <thread>
#include <condition_variable>
#include <atomic>
#include <future>
#include <queue>
#include <vector>
#include <mutex>

class ThreadPool {
public:
using Task = std::packaged_task<void()>;
~ThreadPool() {
stop();
}
static ThreadPool& getInstance() {
static ThreadPool ins;
return ins;
}
// 返回一个future,将一个可调用函数封装成无参数无返回值类型
template <class F, class... Args>
auto commit(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
using RetType = decltype(f(args...));
if (stop_.load()) return std::future<RetType>{};
// 打包一个无参数的可调用函数
auto task = std::make_shared<std::packaged_task<RetType()>>(
//重新绑定可调用函数,让有参形式变得无参
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
/*auto task = std::make_shared<std::packaged_task<RetType()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));*/
std::future<RetType> ret = task->get_future();
{
//由于队列被多线程共享所以需要互斥添加
std::lock_guard<std::mutex> cv_mt(cv_mt_);
//调用packaged_task的operator()让有返回值函数变为无返回值
tasks_.emplace([task] {(*task)(); });
}
//通知被阻塞的线程
cv_lock_.notify_one();
return ret;
}

int idleThreadCount() {
return thread_nums_;
}
private:
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
ThreadPool(unsigned int num = 5): stop_(false) {
if (num < 1) thread_nums_ = 1;
else thread_nums_ = num;
start();
}
//创建num个线程
void start() {
for (int i = 0; i < thread_nums_; i++) {
pool_.emplace_back([this]() {
while (!stop_.load()) {
Task task;
{
std::unique_lock<std::mutex> cv_mt(cv_mt_);
this->cv_lock_.wait(cv_mt, [this] {return this->stop_.load() || !this->tasks_.empty(); });
if (this->tasks_.empty()) return;
//因为package_task里需要右值
task = std::move(this->tasks_.front());
//首先++,--操作是原子操作其次它是在构造函数里进行的,不存在创建线程的时候thread_nums_变化
this->tasks_.pop();
}
this->thread_nums_--;
task();
this->thread_nums_++;
}
});
}
}
void stop() {
stop_.store(true);
cv_lock_.notify_all();
// 当线程还没执行完任务时让他先执行完
for (auto& td : pool_) {
if (td.joinable()) {
std::cout << "join thread " << td.get_id() << std::endl;
td.join();
}
}
}
private:
std::mutex cv_mt_;
std::condition_variable cv_lock_;
//表示线程池的启动状态,false表示启动
std::atomic_bool stop_;
//表示线程池的线程数量
std::atomic_int thread_nums_;
//任务队列
std::queue<Task> tasks_;
//存放预先创建好的线程
std::vector<std::thread> pool_;
};

int main() {
auto& tp = ThreadPool::getInstance();
int m = 0;
tp.commit([](int& m) {
m = 1024;
std::cout << "inner set m is " << m << std::endl;
std::cout << "m address is " << &m << std::endl;
}, m);
tp.commit([](int& m) {
m = 1024;
std::cout << "inner set m is " << m << std::endl;
std::cout << "m address is " << &m << std::endl;
}, std::ref(m));
return 0;
}

两种并发设计模式

Actor设计模式

  • 简单点说,actor通过消息传递的方式与外界通信。消息传递是异步的。每个actor都有一个邮箱,该邮箱接收并缓存其他actor发过来的消息,actor一次只能同步处理一个消息,处理消息过程中,除了可以接收消息,不能做任何其他操作。
    每一个类独立在一个线程里称作Actor,Actor之间通过队列通信,比如Actor1 发消息给Actor2, Actor2 发消息给Actor1都是投递到对方的队列中。好像给对方发邮件,对方从邮箱中取出一样。如下图:

示例图片

  • Actor模型的另一个好处就是可以消除共享状态,因为它每次只能处理一条消息,所以actor内部可以安全的处理状态,而不用考虑锁机制。

示例图片

  • 我们在网络编程中对于逻辑层的处理就采用了将要处理的逻辑消息封装成包投递给逻辑队列,逻辑类从队列中消费的思想,其实就是一种Actor设计模式。Erlang是天然支持Actor的语言。

  • 应用场景
    一个游戏有很多地图,每个地图都由一个独立的线程管理,在每个地图上都能购物,如果只有一个购物类给其他线程提供购物接口,其他线程调用找个接口,为了考虑线程安全性就得考虑频繁加锁,并且如果购物类崩了,就会导致用到这个接口的所有线程都会崩,所以可以采用Actor设计模式,将一个购物类安排在一个线程里,并且它有一个线程安全队列来接受购物请求,其他线程需要购物的时往这个队列加入请求,购物类消费请求,这样就可以做到解耦,并且如果购物类崩了很容易排查

CSP模式

  • SP 是 Communicating Sequential Process 的简称,中文可以叫做通信顺序进程,是一种并发编程模型,是一个很强大的并发数据模型,是上个世纪七十年代提出的,用于描述两个独立的并发实体通过共享的通讯 channel(管道)进行通信的并发模型。相对于Actor模型,CSP中channel是第一类对象,它不关注发送消息的实体,而关注与发送消息时使用的channel。go是天然支持csp模式的语言。

  • CSP和Actor类似,只不过CSP将消息投递给channel,至于谁从channel中取数据,发送的一方是不关注的。简单的说Actor在发送消息前是直到接收方是谁,而接受方收到消息后也知道发送方是谁,更像是邮件的通信模式。而csp是完全解耦合的。

示例图片

  • C++ 风格CSP
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
template<class T>
class channel {
public:
channel(size_t capacity = 0):close_(false), capacity_(capacity){}
bool send(T value) {
std::unique_lock<std::mutex> lock(mtx_);
cv_producer_.wait(lock, [this]() {
return (capacity_ == 0 && queue_.empty()) || close_ || queue_.size() < capacity_;
});
if (close_) return false;
queue_.push(value);
cv_consumer_.notify_one();
return true;
}
bool receive(T& value) {
std::unique_lock<std::mutex> lock(mtx_);
cv_consumer_.wait(lock, [this]() {
return !queue_.empty() || close_;
});
if (close_ && queue_.empty()) return false;
value = queue_.front();
queue_.pop();
cv_producer_.notify_one();
return true;
}
void close() {
std::unique_lock<std::mutex> lock(mtx_);
close_ = true;
cv_consumer_.notify_all();
cv_producer_.notify_all();
}
private:
std::queue<int> queue_;
std::mutex mtx_;
std::condition_variable cv_producer_;
std::condition_variable cv_consumer_;
size_t capacity_;
bool close_;
};
void use_channel() {
channel<int> ch(10);
std::thread producer([&]() {
for (int i = 0; i < 10; i++) {
ch.send(i);
std::cout << "Sent: " << i << std::endl;
}
ch.close();
});
std::thread consumer([&]() {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
int val;
while (ch.receive(val)) {
std::cout << "Received: " << val << std::endl;
}
});
producer.join();
consumer.join();
}
  • 消息队列:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#pragma once

#include <mutex>
#include <condition_variable>
#include <queue>
#include <memory>
namespace messaging
{
//①消息基类。队列中存储的项目
struct message_base
{
virtual ~message_base()
{}
};

//②每种消息都具有特化类型
template<typename Msg>
struct wrapped_message :
message_base
{
Msg contents;
explicit wrapped_message(Msg const& contents_) :
contents(contents_)
{}
};

//③消息队列
class queue
{
std::mutex m;
std::condition_variable c;
//④以内部队列存储message_base型共享指针
std::queue<std::shared_ptr<message_base> > q;
public:
template<typename T>
void push(T const& msg)
{
std::lock_guard<std::mutex> lk(m);
//⑤包装发布的消息,并存储相关的指针
q.push(std::make_shared<wrapped_message<T> >(msg));
c.notify_all();
}
std::shared_ptr<message_base> wait_and_pop()
{
std::unique_lock<std::mutex> lk(m);
//⑥如果队列为空,就发生阻塞
c.wait(lk,[&] {return !q.empty(); });
auto res = q.front();
q.pop();
return res;
}
};
}

thread 源码解读及一些常见问题

默认移动构造

  • 关于局部变量返回值的问题我曾在视频中说会通过构造函数返回一个局部变量给调用者,编译器会先执行拷贝构造,如果没有拷贝构造再寻找移动构造。这么说是有问题的。
    有热心的粉丝查询了chatgpt,当函数返回一个类类型的局部变量时会先调用移动构造,如果没有移动构造再调用拷贝构造。
  • 所以对于一些没有拷贝构造但是实现了移动构造的类类型也支持通过函数返回局部变量。
  • 在 C++11 之后,编译器会默认使用移动语义(move semantics)来提高性能
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class TestCopy {
public:
TestCopy(){}
TestCopy(const TestCopy& tp) {
std::cout << "Test Copy Copy " << std::endl;
}
TestCopy(TestCopy&& cp) {
std::cout << "Test Copy Move " << std::endl;
}
};
TestCopy TestCp() {
TestCopy tp;
return tp;
}

int main(){
TestCp();
return 0;
}
  • 发现打印的是”Test Copy Move” .这说明优先调用的是移动构造,这也提醒我们,如果我们自定义的类实现了拷贝构造和移动构造,而这个类的移动给构造和拷贝构造实现方式不同时,要注意通过函数内部局部变量返回该类时调用移动构造是否会存在一些逻辑或安全的问题。

  • 优先按照移动构造的方式返回局部的类对象,有一个好处就是可以返回一些只支持移动构造的类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
std::unique_ptr<int> ReturnUniquePtr() {
std::unique_ptr<int> uq_ptr = std::make_unique<int>(100);
return uq_ptr;
}
std::thread ReturnThread() {
std::thread t([]() {
int i = 0;
while (true) {
std::cout << "i is " << i << std::endl;
i++;
if (i == 5) {
break;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
});
return t;
}

int main(){
auto rt_ptr = ReturnUniquePtr();
std::cout << "rt_ptr value is " << *rt_ptr << std::endl;
std::thread rt_thread = ReturnThread();
rt_thread.join();
return 0;
}

线程归属权问题

  • 我们不能将一个线程的归属权转移给一个已经绑定线程的变量。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
void ThreadOp() {
std::thread t1([]() {
int i = 0;
while (i < 5) {
std::this_thread::sleep_for(std::chrono::seconds(1));
i++;
}
});
std::thread t2([]() {
int i = 0;
while (i < 10) {
std::this_thread::sleep_for(std::chrono::seconds(1));
i++;
}
});
//不能将一个线程归属权绑定给一个已经绑定线程的变量,否则会触发terminate导致崩溃
t1 = std::move(t2);
t1.join();
t2.join();
}
  • 这样就会发生崩溃

示例图片

  • t1已经绑定了一个线程执行循环操作直到i<5。如果在t1没运行完的情况下将t2的归属权给t1,则会引发terminate崩溃错误。

  • 所以综上所述,std::thread向回调函数传递值是以副本的方式,回调函数参数是引用类型,可以将传递的实参用std::ref包装达到修改的效果。
    因为std::ref其实是构造了reference_wrapper类对象,这个类实现了仿函数

1
2
3
4
5


_CONSTEXPR20 operator _Ty&() const noexcept {
return *_Ptr;
}
  • 所以当线程接收std::ref包裹的参数时会调用仿函数通过指针解引用的方式获取外部实参,以_Ty&返回,从而达到修改的效果。

future 析构的细节

  • future在析构之前会等待任务结束才会正确析构,否则会阻塞,它内部会有一个引用计数,只有当引用计数为0的时候future才会析构
1
2
3
4
5
6
7
8
9
10
11

void BlockAsync() {
std::cout << "begin block async" << std::endl;
{
std::async(std::launch::async, []() {
std::this_thread::sleep_for(std::chrono::seconds(3));
std::cout << "std::async called " << std::endl;
});
}
std::cout << "end block async" << std::endl;
}
  • 输出如下:
1
2
3
begin block async
std::async called
end block async
  • 我们发现并没有并行运行,因为会在async局部作用域的时候阻塞,等待任务执行完主线程才会继续执行

  • 需求

  • 你的需求是func1 中要异步执行asyncFunc函数。
  • func2中先收集asyncFunc函数运行的结果,只有结果正确才执行
  • func1启动异步任务后继续执行,执行完直接退出不用等到asyncFunc运行完
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
int asyncFunc() {
std::this_thread::sleep_for(std::chrono::seconds(3));
std::cout << "this is asyncFunc" << std::endl;
return 0;
}
void func1(std::future<int>& future_ref) {
std::cout << "this is func1" << std::endl;
future_ref = std::async(std::launch::async, asyncFunc);
}
void func2(std::future<int>& future_ref) {
std::cout << "this is func2" << std::endl;
auto future_res = future_ref.get();
if (future_res == 0) {
std::cout << "get asyncFunc result success !" << std::endl;
}
else {
std::cout << "get asyncFunc result failed !" << std::endl;
return;
}
}
//提供多种思路,这是第一种
void first_method() {
std::future<int> future_tmp;
func1(future_tmp);
func2(future_tmp);
}
  • 上面的例子我们保证在func1和func2使用的是future的引用即可。这样func1内不会因为启动async而阻塞,因为future_ref不是shared state最后持有者。

  • 纯异步代码

1
2
3
4
5
6
7
8
9
10
11
template<typename Func, typename... Args  >
auto ParallenExe(Func&& func, Args && ... args) -> std::future<decltype(func(args...))> {
typedef decltype(func(args...)) RetType;
std::function<RetType()> bind_func = std::bind(std::forward<Func>(func), std::forward<Args>(args)...);
std::packaged_task<RetType()> task(bind_func);
auto rt_future = task.get_future();
std::thread t(std::move(task));
t.detach();
return rt_future;
}

原子操作和内存模型

改动序列

  • 在一个C++程序中,每个对象都具有一个改动序列,它由所有线程在对象上的全部写操作构成,其中第一个写操作即为对象的初始化。
  • 大部分情况下,这个序列会随程序的多次运行而发生变化,但是在程序的任意一次运行过程中,所含的全部线程都必须形成相同的改动序列。

改动序列基本要求如下:

  1. 只要某线程看到过某个对象,则该线程的后续读操作必须获得相对新近的值,并且,该线程就同一对象的后续写操作,必然出现在改动序列后方。(也就是说只有在修改后才能继续修改或读取)
  2. 如果某线程先向一个对象写数据,过后再读取它,那么必须读取前面写的值。
  3. 若在改动序列中,上述读写操作之间还有别的写操作,则必须读取最后写的值。
  4. 在程序内部,对于同一个对象,全部线程都必须就其形成相同的改动序列,并且在所有对象上都要求如此.(多个线程作用在一个对象上需要井然有序)
  5. 多个对象上的改动序列只是相对关系,线程之间不必达成一致

原子类型

  • 标准原子类型的定义位于头文件内。我们可以通过atomic<>定义一些原子类型的变量,如atomic,atomic 这些类型的操作全是原子化的。

  • 从C++17开始,所有的原子类型都包含一个静态常量表达式成员变量,std::atomic::is_always_lock_free(判断是否天生原子性)。这个成员变量的值表示在任意给定的目标硬件上,原子类型X是否始终以无锁结构形式实现。如果在所有支持该程序运行的硬件上,原子类型X都以无锁结构形式实现,那么这个成员变量的值就为true;否则为false。

  • 只有一个原子类型不提供is_lock_free()成员函数:std::atomic_flag 。类型std::atomic_flag的对象在初始化时清零,随后即可通过成员函数test_and_set()查值并设置成立,或者由clear()清零。整个过程只有这两个操作。其他的atomic<>的原子类型都可以基于其实现。

  • std::atomic_flag的test_and_set成员函数是一个原子操作,他会先检查std::atomic_flag当前的状态是否被设置过,

test_ans_set的作用:

  1. 如果没被设置过(比如初始状态或者清除后),将std::atomic_flag当前的状态设置为true,并返回false。

  2. 如果被设置过则直接返回ture。

对于std::atomic类型的原子变量,还支持load()(读操作)和store()(写操作)、exchange()、compare_exchange_weak()(比较期望值,不相等交换)和compare_exchange_strong()等操作。

内存次序

  • 对于原子类型上的每一种操作,我们都可以提供额外的参数,从枚举类std::memory_order取值,用于设定所需的内存次序语义(memory-ordering semantics)。
  • 枚举类std::memory_order具有6个可能的值,分别是:std::memory_order_relaxed、std::memory_order_consume、std::memory_order_acquire、std::memory_order_release、std::memory_order_acq_rel或std::memory_order_seq_cst。
  1. 包括std::memory_order_relaxed、std:: memory_order_acquire、std::memory_order_consume、

std::memory_order_acq_rel、std::memory_order_release和 std::memory_order_seq_cst。

  1. 存储(store)操作,可选用的内存次序有std::memory_order_relaxed、std::memory_order_release或std::memory_order_seq_cst(全局一致性)。

  2. 载入(load)操作,可选用的内存次序有std::memory_order_relaxed、std::memory_order_consume、std::memory_order_acquire或std::memory_order_seq_cst。

  3. “读-改-写”(read-modify-write)操作,可选用的内存次序有std::memory_order_relaxed、std::memory_order_consume、std::memory_order_acquire、std::memory_order_release、std::memory_order_acq_rel或std::memory_order_seq_cst。

  4. 原子操作默认使用的是std::memory_order_seq_cst次序。

这六种内存顺序相互组合可以实现三种顺序模型 (ordering model)

  1. Sequencial consistent ordering. 实现同步, 且保证全局顺序一致 (single total order) 的模型. 是一致性最强的模型, 也是默认的顺序模型.
  2. Acquire-release ordering. 实现同步, 但不保证保证全局顺序一致的模型.
  3. Relaxed ordering. 不能实现同步, 只保证原子性的模型.

实现自旋锁

  • 自旋锁是一种在多线程环境下保护共享资源的同步机制。它的基本思想是,当一个线程尝试获取锁时,如果锁已经被其他线程持有,那么该线程就会不断地循环检查锁的状态,直到成功获取到锁为止。
  • 我们可以用std::atomic_flag来实现自旋锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class SpinLock {
public:
void lock() {
//自旋等待直到可获取锁
while (flag.test_and_set(std::memory_order_acquire));
}
void unlock() {
//释放锁
flag.clear(std::memory_order_release);
}
private:
//初始化为0
std::atomic_flag flag = ATOMIC_FLAG_INIT;
};

void TestSpinLock() {
SpinLock spinlock;
std::thread t1([&spinlock]() {
spinlock.lock();
for (int i = 0; i < 3; i++) {
std::cout << "*";
}
std::cout << std::endl;
spinlock.unlock();
});
std::thread t2([&spinlock]() {
spinlock.lock();
for (int i = 0; i < 3; i++) {
std::cout << "?";
}
std::cout << std::endl;
spinlock.unlock();
});
t1.join();
t2.join();
}
  • 在多线程调用时,仅有一个线程在同一时刻进入test_and_set,因为atomic_flag初始状态为false,所以test_and_set将atomic_flag设置为true,并且返回false。

  • 比如线程A调用了test_and_set返回false,这样lock函数返回,线程A继续执行加锁区域的逻辑。此时线程B调用test_and_set,test_and_set会返回true,导致线程B在while循环中循环等待,达到自旋检测标记的效果。当线程A直行至2处调用clear操作后,atomic_flag被设置为清空状态,线程B调用test_and_set会将状态设为成立并返回false,B线程执行加锁区域的逻辑。

宽松内存序

  • 为了给大家介绍不同的字节序,我们先从最简单的字节序std::memory_order_relaxed(宽松字节序)介绍。
  • 因为字节序是为了实现改动序列的,所以为了理解字节序还要结合改动序列讲起。
  • 我们先看一个CPU和内存结构图:

示例图片

  • 其中StoreBuffer就是一级Cache, Catche是二级Cache,Memory是三级Cache。

  • 每个标识CPU的块就是core,上图画的就是4核结构。每两个core构成一个bank,共享一个cache。四个core共享memory。

  • 每个CPU所作的store均会写到store buffer中,每个CPU会在任何时刻将store buffer中结果写入到cache或者memory中。

  • 那该如何保证数据一致性?这就要提及MESI一致性协议

  • MESI 协议,是一种叫作写失效(Write Invalidate)的协议。在写失效协议里,只有一个 CPU 核心负责写入数据,其他的核心,只是同步读取到这个写入。在这个 CPU 核心写入 cache 之后,它会去广播一个“失效”请求告诉所有其他的 CPU 核心
  • MESI 协议对应的四个不同的标记,分别是:
  1. M:代表已修改(Modified):“已修改”用来告诉其他cpu已经修改完成,其他cpu可以向cache中写入数据

  2. I:代表已失效(Invalidated)

  3. E:代表独占(Exclusive):“独占”表示数据只是加载到当前 CPU核 的store buffer中,其他的 CPU 核,并没有加载对应的数据到自己的 store buffer 里。,这个时候,如果要向独占的 store buffer 写入数据,我们可以自由地写入数据,而不需要告知其他 CPU 核。

  4. S:代表共享(Shared):共享状态就是在多核中同时加载了同一份数据。所以在共享状态下想要修改数据要先向所有的其他 CPU 核心广播一个请求,要求先把其他 CPU 核心里面的 cache ,都变成无效的状态,然后再更新当前 cache 里面的数据,我们可以这么理解,如果变量a此刻在各个cpu的StoreBuffer中,那么CPU1核修改这个a的值,放入cache时通知其他CPU核写失效,因为同一时刻仅有一个CPU核可以写数据,但是其他CPU核是可以读数据的,那么其他核读到的数据可能是CPU1核修改之前的。这就涉及我们提到的改动序列了。

这里给大家简单介绍两个改动序列的术语

  1. “synchronizes-with“ : 同步, “A synchronizes-with B” 的意思就是 A和B同步,简单来说如果多线程环境下,有一个线程先修改了变量m,我们将这个操作叫做A,之后有另一个线程读取变量m,我们将这个操作叫做B,如果A先修改了,那么B一定读取A修改m之后的最新值。也可以称作 A “happens-before“ B,即A操作的结果对B操作可见。

  2. “happens-before“ : 先行,”A happens-before B” 的意思是如果A操作先于B操作,那么A操作的结果对B操作可见。”happens-before“包含很多种境况,不仅仅是我们上面提到的”synchronizes-with“,之后给大家一个脑图详细说明”happens-before“的几种情况。

  • 我的理解是先行关系多数情况下描述对同一变量的先后操作,只有变量之间存在依赖关系才会有对不同变量的先后操作存在先行关系

  • 我们接下来谈谈std::memory_order_relaxed

  • 关于std::memory_order_relaxed具备如下几个功能:

    1. 作用于原子变量(写的时候是完整的,不会发生写一半而被终止)
    2. 不具有synchronizes-with关系(同步)(也就是说在多线程里不能保证读操作能读到写操作后的值
    3. 对于同一个原子变量,在同一个线程中具有happens-before关系, 在同一线程中不同的原子变量不具有happens-before关系,可以乱序执行。
    4. 多线程情况下不具有happens-before关系。
  • 由上述可知,如果采用最松散的内存顺序模型,在一个线程中,如果某个表达式已经看到原子变量的某个值a,则该表达式的后续表达式只能看到a或者比a更新的值。

  • 我们看下面的代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
std::atomic<bool> x, y;
std::atomic<int> z;
void write_x_then_y() {
x.store(true, std::memory_order_relaxed); // 1
y.store(true, std::memory_order_relaxed); // 2
}
void read_y_then_x() {
while (!y.load(std::memory_order_relaxed)) { // 3
std::cout << "y load false" << std::endl;
}
if (x.load(std::memory_order_relaxed)) { //4
++z;
}
}
void TestOrderRelaxed() {
std::thread t1(write_x_then_y);
std::thread t2(read_y_then_x);
t1.join();
t2.join();
assert(z.load() != 0); // 5
}
  • 我们从两个角度分析
  1. 从cpu架构分析

假设线程t1运行在CPU1上,t2运行在CPU3上,那么t1对x和y的操作,t2是看不到的。

比如当线程t1运行至1处将x设置为true,t1运行至2处将y设置为true。这些操作仅在CPU1的store buffer中,还未放入cache和memory中,CPU2自然不知道。

如果CPU1先将y放入memory,那么CPU2就会读取y的值为true。那么t2就会运行至3处从while循环退出,进而运行至4处,此时CPU1还未将x的值写入memory,

t2读取的x值为false,进而线程t2运行结束,然后CPU1将x写入true, t1结束运行,最后主线程运行至5处,因为z为0,所以触发断言。

  1. 从宽松内存序分析

因为memory_order_relaxed是宽松的内存序列,它只保证操作的原子性,并不能保证多个变量之间的顺序性,也不能保证同一个变量在不同线程之间的可见顺序。

比如t1可能先运行2处代码再运行1处代码,因为我们的代码会被编排成指令执行,编译器在不破坏语义的情况下(2处和1处代码无耦合,可调整顺序),2可能先于1执行。如果这样,t2运行至3处退出while循环,继续运行4处,此时t1还未执行1初代码,则t2运行4处条件不成立不会对z做增加,t2结束。这样也会导致z为0引发断言。

如图:
示例图片

  • 我们在看一个例子
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
void TestOderRelaxed2() {
std::atomic<int> a{ 0 };
std::vector<int> v3, v4;
std::thread t1([&a]() {
for (int i = 0; i < 10; i += 2) {
a.store(i, std::memory_order_relaxed);
}
});
std::thread t2([&a]() {
for (int i = 1; i < 10; i += 2)
a.store(i, std::memory_order_relaxed);
});
std::thread t3([&v3, &a]() {
for (int i = 0; i < 10; ++i)
v3.push_back(a.load(std::memory_order_relaxed));
});
std::thread t4([&v4, &a]() {
for (int i = 0; i < 10; ++i)
v4.push_back(a.load(std::memory_order_relaxed));
});
t1.join();
t2.join();
t3.join();
t4.join();
for (int i : v3) {
std::cout << i << " ";
}
std::cout << std::endl;
for (int i : v4) {
std::cout << i << " ";
}
std::cout << std::endl;
}
  • 可能的输出
1
2
3
4
5
6
7
0 9 9 9 9 9 9 9 9 9
9 9 9 9 9 9 9 9 9 9
````

```cpp
0 8 8 8 8 8 8 8 8 8
9 9 9 9 9 9 9 9 9 9
  • 由此可见,std::memory_order_relaxed对于多个线程它是乱序执行的,因为多个线程仅操作了a变量,通过memory_order_relaxed的方式仅能保证对a的操作是原子的(同一时刻仅有一个线程写a的值,但是可能多个线程读取a的值)。

  • 但是多个线程之间操作不具备同步关系,也就是线程t1将a改为7,那么线程t3不知道a改动的最新值为7,它读到a的值为1。只是要过一阵子可能会读到7或者a变为7之后又改动的其他值。

  • 但是t3,t4两个线程读取a的次序是一致的,比如t3和t4都读取了7和9,t3读到7在9之前,那么t4也读取到7在9之前。

  • 因为我们memory_order_relaxed保证了多线程对同一个变量的原子操作的安全性,只是可见性会有延迟罢了。

先行

  • Happens-before 是一个非常重要的概念. 如前文我们提及:
  • 如果操作 a “happens-before” 操作 b, 则操作 a 的结果对于操作 b 可见. happens-before 的关系可以建立在用一个线程的两个操作之间, 也可以建立在不同的线程的两个操作之间。

顺序先行

  • 单线程情况下前面的语句先执行,后面的语句后执行。操作a先于操作b,那么操作b可以看到操作a的结果。我们称操作a顺序先行于操作b。也就是”a sequenced-before b”。

这种情况下”a happens before b

  • sequencde-before”具备传递性,比如操作 a “sequenced-before” 操作 b, 且操作 b “sequenced-before” 操作 m, 则操作 a “sequenced-before” 操作 m.

示例图片

线程间先行

  • 线程间先行又叫做”inter-thread-happens-before”,这是多线程情况的”happens-before”.

  • 我们前面提到的”synchronizes-with” 可以构成 “happens-before”。

  • 如果线程 1 中的操作 a “synchronizes-with” 线程 2 中的操作 b, 则操作 a “inter-thread happens-before” 操作 b.

示例图片

  • 此外 synchronizes-with 还可以 “后接” 一个 sequenced-before 关系组合成 inter-thread happens-before 的关系:

  • 比如操作 a “synchronizes-with” 操作 b, 且操作 b “sequenced-before” 操作 m, 则操作 a “inter-thread happens-before” 操作 m.

示例图片

  • 那同样的道理, Inter-thread happens-before 关系则可以 “前接” 一个 sequenced-before 关系以延伸它的范围; 而且 inter-thread happens-before 关系具有传递性:
  1. 如果操作 a “sequenced-before” 操作 k, 且操作 k “inter-thread happens-before” 操作 b, 则操作 a “inter-thread happens-before” 操作 b.

示例图片

  1. 如果操作 a “inter-thread happens-before” 操作 k, 且操作 k “inter-thread happens-before” 操作 b, 则操作 a “inter-thread happens-before” 操作 b.

示例图片

依赖关系

  • 依赖关系有 carries dependency 和 dependency-ordered before.

  • 单线程情况下a “sequenced-before” b, 且 b 依赖 a 的数据, 则 a “carries a dependency into” b. 称作 a 将依赖关系带给 b, 也理解为b依赖于a。

1
2
3
4
5
6
7
8
void TestDependency() {
// 1 处
std::string str = "hello world!";
// 2 处
int i = 3;
// 3 处
std::cout << str[i] << std::endl;
}
  • 函数TestDependency内部打印str[i]的值。3处代码需要依赖1处和2处两个变量的值,所以达成依赖关系。

  • 我们看单线程情况下按顺序执行1,2,3处代码,1 “sequenced-before” 3,且3 依赖 1的数据,则 1 “carries a dependency into” 3

  • 同样的道理 2 “sequenced-before” 3, 且3依赖2 的数据,则2 “carries a dependency into” 3.

“carries a dependency into” 也被归为”happens-before”。

示例图片

  • 多线程情况下
  • 线程1执行操作A(比如对i自增),线程2执行操作B(比如根据i访问字符串下表的元素), 如果线程1先于线程2执行,且操作A的结果对操作B可见,我们将这种叫做
    A “dependency-ordered before” B. 有人会说这不是前面说到的A “synchronizes with “ B吗?你可以这么理解。就当作他们达到的效果是一致的,只不过A “dependency-ordered before” B 更细化一点,表述了一种依赖,比如操作A仅仅对i增加,而没有对字符串修改。而操作B需要通过i访问字符串数据。那操作B实际上是依赖于A的。

Happens-before不代表指令执行顺序

  • Happens-before不代表指令实际执行顺序,C++编译器可以对不相关的指令任意编排达到优化效果,Happens-before仅是C++语义层面的描述,表示 a “Happens-before” b仅能说明a操作的结果对b操作可见。
1
2
3
4
5
6
7
8
9
int  Add() {
int a = 0, b = 0;
//1 处
a++;
// 2 处
b++;
// 3 处
return a + b;
}
  • 单线程执行上述代码,操作1一定是happens-before 操作2 的(a “sequenced-before” b),就是我们理解的 a++ 先于 b++。

  • 但是计算机的指令可能不是这样,一条C++语句对于多条计算机指令。

  • 有可能是先将b值放入寄存器eax做加1,再将a的值放入寄存器edx做加1,然后再将eax寄存器的值写回a,将edx写回b。

  • 因为对于计算机来说1处操作和2处操作的顺序对于3处来说并无影响。只要3处返回a+b之前能保证a和b的值是增加过的即可。

  • 那我们语义上的”Happens-before”有意义吗? 是有意义的,因为如果 a “sequenced-before” b, 那么无论指令如何编排,最终写入内存的顺序一定是a先于b。

  • 只不过C++编译器不断优化尽可能不造成指令编排和语义理解的差异,上面C++的代码转换为汇编指令如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

int a = 0, b = 0;
00A1C8F5 mov dword ptr [a],0
00A1C8FC mov dword ptr [b],0
//1 处
a++;
00A1C903 mov eax,dword ptr [a]
00A1C906 add eax,1
00A1C909 mov dword ptr [a],eax
// 2 处
b++;
00A1C90C mov eax,dword ptr [b]
00A1C90F add eax,1
00A1C912 mov dword ptr [b],eax
return a + b;
00A1C915 mov eax,dword ptr [a]
00A1C918 add eax,dword ptr [b]

脑图

  • 我们将”happens-before” 的几种情况做成脑图,方便理解

示例图片

  • 我们画一个框将”happens-before” 的几种情况框起来

示例图片

用内存顺序实现内存模型

memory_order_seq_cst

  • memory_order_seq_cst代表全局一致性顺序,可以用于 store, load 和 read-modify-write 操作, 实现 sequencial consistent 的顺序模型. 在这个模型下, 所有线程看到的所有操作都有一个一致的顺序, 即使这些操作可能针对不同的变量, 运行在不同的线程.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
void write_x_then_y() {
x.store(true, std::memory_order_seq_cst); // 1
y.store(true, std::memory_order_seq_cst); // 2
}
void read_y_then_x() {
while (!y.load(std::memory_order_seq_cst)) { // 3
std::cout << "y load false" << std::endl;
}
if (x.load(std::memory_order_seq_cst)) { //4
++z;
}
}
void TestOrderSeqCst() {
std::thread t1(write_x_then_y);
std::thread t2(read_y_then_x);
t1.join();
t2.join();
assert(z.load() != 0); // 5
}
  • 上面的代码x和y采用的是memory_order_seq_cst, 所以当线程t2执行到3处并退出循环时我们可以断定y为true,因为是全局一致性顺序,所以线程t1已经执行完2处将y设置为true,那么线程t1也一定执行完1处代码并对t2可见,所以当t2执行至4处时x为true,那么会执行z++保证z不为零,从而不会触发断言。

  • 实现 sequencial consistent 模型有一定的开销. 现代 CPU 通常有多核, 每个核心还有自己的缓存. 为了做到全局顺序一致, 每次写入操作都必须同步给其他核心. 为了减少性能开销, 如果不需要全局顺序一致, 我们应该考虑使用更加宽松的顺序模型.

  • 为了保持全局一致性,当CPU执行写操作的时候不仅会给别的cpu发送写失效,还会发送不让读的指令,这样就保住了全局一致性

memory_order_relaxed

  • memory_order_relaxed 可以用于 store, load 和 read-modify-write 操作, 实现 relaxed 的顺序模型.
    前文我们介绍过这种模型下, 只能保证操作的原子性和修改顺序 (modification order) 一致性, 无法实现 synchronizes-with 的关系。

  • 详见上一段笔记

Acquire-Release

  • 在 acquire-release 模型中, 会使用 memory_order_acquire, memory_order_release 和 memory_order_acq_rel 这三种内存顺序. 它们的用法具体是这样的:

    1. 对原子变量的 load 可以使用 memory_order_acquire 内存顺序. 这称为 acquire 操作
    2. 对原子变量的 store 可以使用 memory_order_release 内存顺序. 这称为 release 操作.(发布操作)
    3. read-modify-write 操作即读 (load) 又写 (store), 它可以使用 memory_order_acquire, memory_order_release 和 memory_order_acq_rel:
      1. 如果使用 memory_order_acquire, 则作为 acquire 操作;
      2. 如果使用 memory_order_release, 则作为 release 操作;
      3. 如果使用 memory_order_acq_rel, 则同时为两者.
  • Acquire-release 可以实现 synchronizes-with 的关系. 如果一个 acquire 操作在同一个原子变量上读取到了一个 release 操作写入的值, 则这个 release 操作 “synchronizes-with” 这个 acquire 操作.(就是说对于一个原子变量,如果store relase先发生,则会保证 load acuire读取到的是操作后的值)

  • 不要想太复杂,release给变量store时加上了一个标记,另一个线程acquire读到该变量时看到该标记就知道要先进行同步,就是同步更新了各自缓冲区的数据(比如线程1告诉线程2我的x更新了,线程2刷新缓冲区),保证数据的一致性,而不是说加上了release标记的就必须得先执行。
  • 我们可以通过Acquire-release 修正 TestOrderRelaxed函数以达到同步的效果
1
2
3
4
5
6
7
8
9
10
11
12
13
void TestReleaseAcquire() {
std::atomic<bool> rx, ry;
std::thread t1([&]() {
rx.store(true, std::memory_order_relaxed); // 1
ry.store(true, std::memory_order_release); // 2
});
std::thread t2([&]() {
while (!ry.load(std::memory_order_acquire)); //3
assert(rx.load(std::memory_order_relaxed)); //4
});
t1.join();
t2.join();
}
  • 上面的例子中我们看到ry.store使用的是std::memory_order_release, ry.load使用的是std::memory_order_relaxed.

  • 对于ry.store(true, std::memory_order_release);,它确保在存储操作之前的所有操作都发生在存储操作之前,并且不会被重排序到存储操作之后。对于ry.load(std::memory_order_acquire);,它确保在加载操作之后的所有操作都发生在加载操作之后,并且不会被重排序到加载操作之前。”,然后又问了synchronizes with相关的:“具体来说,如果一个原子操作A与另一个原子操作B具有“synchronizes-with”关系,那么:

    1. 所有在A之前的操作都在B之前发生。
    2. 所有在B之后的操作都在A之后发生。
  • 可以断定4 不会触发断言。

1
2
3
4
5
6
7
8
9
10
11
12
13
void TestReleaseAcquire() {
std::atomic<bool> rx, ry;
std::thread t1([&]() {
ry.store(true, std::memory_order_release); // 1
rx.store(true, std::memory_order_relaxed); // 2
});
std::thread t2([&]() {
while (!ry.load(std::memory_order_acquire)); //3
assert(rx.load(std::memory_order_relaxed)); //4
});
t1.join();
t2.join();
}
  • 这就又可能发生断言,因为rx.store(true, std::memory_order_relaxed);在1之后操作不会保证同步性

  • 我们从cpu结构图理解这一情景

示例图片

  • 到此大家一定要记住仅 Acquire-release能配合达到 synchronizes-with效果,再就是memory_order_seq_cst可以保证全局顺序唯一,其他情况的内存顺序都能保证顺序,使用时需注意。

  • Acquire-release 的开销比 sequencial consistent 小. 在 x86 架构下, memory_order_acquire 和 memory_order_release 的操作不会产生任何其他的指令, 只会影响编译器的优化: 任何指令都不能重排到 acquire 操作的前面, 且不能重排到 release 操作的后面; 否则会违反 acquire-release 的语义. 因此很多需要实现 synchronizes-with 关系的场景都会使用 acquire-release.

Release sequences

  • 我们再考虑一种情况,多个线程对同一个变量release操作,另一个线程对这个变量acquire,那么只有一个线程的release操作和这个acquire线程构成同步关系。

  • 以下为危险示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void ReleasAcquireDanger2() {
std::atomic<int> xd{0}, yd{ 0 };
std::atomic<int> zd;
std::thread t1([&]() {
xd.store(1, std::memory_order_release); // (1)
yd.store(1, std::memory_order_release); // (2)
});
std::thread t2([&]() {
yd.store(2, std::memory_order_release); // (3)
});
std::thread t3([&]() {
while (!yd.load(std::memory_order_acquire)); //(4)
assert(xd.load(std::memory_order_acquire) == 1); // (5)
});
t1.join();
t2.join();
t3.join();
}
  • 当3被当作4的同步操作的时候,可能会发生断言(xd != 1)
  • 当2被当作4的同步操作的时候,不会发生断言

  • 并不是只有在 acquire 操作读取到 release 操作写入的值时才能构成 synchronizes-with 关系. 为了说这种情况, 我们需要引入 release sequence 这个概念.

  • 针对一个原子变量 M 的 release 操作 A 完成后, 接下来 M 上可能还会有一连串的其他操作. 如果这一连串操作是由

    1. 同一线程上的写操作
    2. 任意线程上的 read-modify-write 操作,这两种构成的, 则称这一连串的操作为以 release 操作 A 为首的 release sequence. 这里的写操作和 read-modify-write 操作可以使用任意内存顺序.
  • 如果一个 acquire 操作在同一个原子变量上读到了一个 release 操作写入的值, 或者读到了以这个 release 操作为首的 release sequence 写入的值, 那么这个 release 操作 “synchronizes-with” 这个 acquire 操作.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void ReleaseSequence() {
std::vector<int> data;
std::atomic<int> flag{ 0 };
std::thread t1([&]() {
data.push_back(42); //(1)
flag.store(1, std::memory_order_release); //(2)
});
std::thread t2([&]() {
int expected = 1;
while (!flag.compare_exchange_strong(expected, 2, std::memory_order_relaxed)) // (3)
expected = 1;
});
std::thread t3([&]() {
while (flag.load(std::memory_order_acquire) < 2); // (4)
assert(data.at(0) == 42); // (5)
});
t1.join();
t2.join();
t3.join();
}

  • 我们考虑t3要想退出首先flag要等于2,那么就要等到t2将flag设置为2,而flag设置为2又要等到t1将flag设置为1. 所以我们捋一下顺序 2->3->4

  • t1中操作2是release操作,以2为开始,其他线程(t2)的读改写在release操作之后,我们称之为release sequence, t3要读取release sequence写入的值,所以我们称t1的release操作 “synchronizes with “ t3的 acquire 操作。

memory_order_consume

  • memory_order_consume 其实是 acquire-release 模型的一部分, 但是它比较特殊, 它涉及到数据间相互依赖的关系. 就是前文我们提及的 carries dependency和 dependency-ordered before.
1
2
3
p++;   // (1)
i++; // (2)
p[i] // (3)
  • (1) “sequenced-before” (2), (2) “sequenced-before” (3), 而(1)和(2)的值作为(3)的下表运算符[]的操作数。

  • 我们可以称(1) “carries a dependency into “ (3), (2) “carries a dependency into “ (3), 但是(1)和(2)不是依赖关系。

  • memory_order_consume 可以用于 load 操作. 使用 memory_order_consume 的 load 称为 consume 操作. 如果一个 consume 操作在同一个原子变量上读到了一个 release 操作写入的值, 或以其为首的 release sequence 写入的值, 则这个 release 操作 “dependency-ordered before” 这个 consume 操作.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void ConsumeDependency() {
std::atomic<std::string*> ptr;
int data;
std::thread t1([&]() {
std::string* p = new std::string("Hello World"); // (1)
data = 42; // (2)
ptr.store(p, std::memory_order_release); // (3)
});
std::thread t2([&]() {
std::string* p2;
while (!(p2 = ptr.load(std::memory_order_consume))); // (4)
assert(*p2 == "Hello World"); // (5)
assert(data == 42); // (6)
});
t1.join();
t2.join();
}
  • t2执行到(4)处时,需要等到ptr非空才能退出循环,这就依赖t1执行完(3)操作。

-因此(3) “dependency-ordered before” (4), 根据前文我们介绍了dependency等同于synchronizes ,所以(3) “inter-thread happens-before”. (4)

-因为(1) “sequenced before” (3), 所以(1) “happens-before “ (4)

-因为(4) “sequenced before” (5), 所以(1) “happens-before “ (5)

-所以(5)处断言也不会触发。

-因为(2) 和(3)不构成先行关系,所以(6)处断言可能触发。

  • release只能保证其他线程可见,并不能保证其他线程能够正确读到,正因为如此才引入acquire。 而consumer只能保证相关原子能够读到

单例模型改良

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
//利用智能指针解决释放问题
class SingleMemoryModel
{
private:
SingleMemoryModel()
{
}
SingleMemoryModel(const SingleMemoryModel&) = delete;
SingleMemoryModel& operator=(const SingleMemoryModel&) = delete;
public:
~SingleMemoryModel()
{
std::cout << "single auto delete success " << std::endl;
}
static std::shared_ptr<SingleMemoryModel> GetInst()
{
// 1 处
if (_b_init.load(std::memory_order_acquire))
{
return single;
}
// 2 处
s_mutex.lock();
// 3 处
if (_b_init.load(std::memory_order_relaxed))
{
s_mutex.unlock();
return single;
}
// 4处
single = std::shared_ptr<SingleMemoryModel>(new SingleMemoryModel);
_b_init.store(true, std::memory_order_release);
s_mutex.unlock();
return single;
}
private:
static std::shared_ptr<SingleMemoryModel> single;
static std::mutex s_mutex;
static std::atomic<bool> _b_init ;
};
std::shared_ptr<SingleMemoryModel> SingleMemoryModel::single = nullptr;
std::mutex SingleMemoryModel::s_mutex;
std::atomic<bool> SingleMemoryModel::_b_init = false;
  • 锁要求全局一致性,所以在3处可以用relaxed

用内存顺序和原子变量实现无锁队列

环形队列

  • 我们要实现无锁并发,经常会用到一种结构无锁队列,而无锁队列和我们经常使用的队列颇有不同,它采用的是环状的队列结构,为什么成环呢?主要有两个好处,一个是成环的队列大小是固定的,另外一个我们通过移动头和尾就能实现数据的插入和取出

示例图片

  • 图1表示队列为空的时候,头节点和尾节点交会在一起,指向同一个扇区。

  • 图2表示当我们你插入一个数字1后,队列大小为1,此时tail指针移动到下一个扇区,head指向头部,1被存储在头部了。

  • 图3表示当我们将数字1出队后,head指针向后移动一个扇区,此时head和tail指向同一个扇区,表示队列又为空了。那有人会问队列中数字1为什么不清空呢?其实不用清空,因为当我们插入新数据时就可以覆盖掉1这个无效的数据。

比如我们继续3图,连续插入几个数字,将队列填满。

  • 当tail + 1 == head的时候队列就满了,所以如果你想存储10个数你得开辟11个空间最后一个空间用来判断满

用锁实现环形队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
#include <iostream>
#include <mutex>
#include <memory>
template<typename T, size_t Cap>
class CircularQueLk :private std::allocator<T> {
public:
CircularQueLk() :_max_size(Cap + 1),_data(std::allocator<T>::allocate(_max_size)), _head(0), _tail(0) {}
CircularQueLk(const CircularQueLk&) = delete;
CircularQueLk& operator = (const CircularQueLk&) volatile = delete;
CircularQueLk& operator = (const CircularQueLk&) = delete;
~CircularQueLk() {
//循环销毁
std::lock_guard<std::mutex> lock(_mtx);
//调用内部元素的析构函数
while (_head != _tail) {
std::allocator<T>::destroy(_data + _head);
_head = (_head+1)%_max_size;
}
//调用回收操作
std::allocator<T>::deallocate(_data, _max_size);
}
//先实现一个可变参数列表版本的插入函数最为基准函数
template <typename ...Args>
bool emplace(Args && ... args) {
std::lock_guard<std::mutex> lock(_mtx);
//判断队列是否满了
if ((_tail + 1) % _max_size == _head) {
std::cout << "circular que full ! " << std::endl;
return false;
}
//在尾部位置构造一个T类型的对象,构造参数为args...
std::allocator<T>::construct(_data + _tail, std::forward<Args>(args)...);
//更新尾部元素位置
_tail = (_tail + 1) % _max_size;
return true;
}
//push 实现两个版本,一个接受左值引用,一个接受右值引用
//接受左值引用版本
bool push(const T& val) {
std::cout << "called push const T& version" << std::endl;
return emplace(val);
}
//接受右值引用版本,当然也可以接受左值引用,T&&为万能引用
// 但是因为我们实现了const T&
bool push(T&& val) {
std::cout << "called push T&& version" << std::endl;
return emplace(std::move(val));
}
//出队函数
bool pop(T& val) {
std::lock_guard<std::mutex> lock(_mtx);
//判断头部和尾部指针是否重合,如果重合则队列为空
if (_head == _tail) {
std::cout << "circular que empty ! " << std::endl;
return false;
}
//取出头部指针指向的数据
val = std::move(_data[_head]);
//更新头部指针
_head = (_head + 1) % _max_size;
return true;
}
private:
size_t _max_size;
T* _data;
std::mutex _mtx;
size_t _head = 0;
size_t _tail = 0;
};

void TestCircularQue() {
//最大容量为10
CircularQueLk<MyClass, 5> cq_lk;
MyClass mc1(1);
MyClass mc2(2);
cq_lk.push(mc1);
cq_lk.push(std::move(mc2));
for (int i = 3; i <= 5; i++) {
MyClass mc(i);
auto res = cq_lk.push(mc);
if (res == false) {
break;
}
}
cq_lk.push(mc2);
for (int i = 0; i < 5; i++) {
MyClass mc1;
auto res = cq_lk.pop(mc1);
if (!res) {
break;
}
std::cout << "pop success, " << mc1 << std::endl;
}
auto res = cq_lk.pop(mc1);
}

无锁队列

1
2
bool std::atomic<T>::compare_exchange_weak(T &expected, T desired);
bool std::atomic<T>::compare_exchange_strong(T &expected, T desired);
  • compare_exchange_strong会比较原子变量atomic的值和expected的值是否相等,如果相等则执行交换操作,将atomic的值换为desired并且返回true,否则将expected的值修改为T变量的值,并且返回false.
  • 其伪代码可以这么理解
1
2
3
4
5
6
7
8
template <typename T>
bool atomic<T>::compare_exchange_strong(T &expected, T desired) {
std::lock_guard<std::mutex> guard(m_lock);
if (m_val == expected)
return m_val = desired, true;
else
return expected = m_val, false;
}
  • compare_exchange_weak功能比compare_exchange_strong弱一些,他不能保证atomic的值和expected的值相等时也会做交换,很可能原子变量和预期值相等也会返回false,所以使用要多次循环使用。
  • 我们们定义一个类CircularQueSeq, 其内容和之前我们定义的类CircularQueLk差不多,只不过将类的成员变量mutex换成atomic类型的原子变量, 我们可以利用自旋锁的思路将锁替换为原子变量循环检测的方式,进而达到锁住互斥逻辑的效果。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
template<typename T, size_t Cap>
class CircularQueSeq :private std::allocator<T> {
public:
CircularQueSeq() :_max_size(Cap + 1), _data(std::allocator<T>::allocate(_max_size)), _atomic_using(false),_head(0), _tail(0) {}
CircularQueSeq(const CircularQueSeq&) = delete;
CircularQueSeq& operator = (const CircularQueSeq&) volatile = delete;
CircularQueSeq& operator = (const CircularQueSeq&) = delete;
~CircularQueSeq() {
//循环销毁
bool use_expected = false;
bool use_desired = true;
do
{
use_expected = false;
use_desired = true;
}
while (!_atomic_using.compare_exchange_strong(use_expected, use_desired));
//调用内部元素的析构函数
while (_head != _tail) {
std::allocator<T>::destroy(_data + _head);
_head = (_head+1)% _max_size;
}
//调用回收操作
std::allocator<T>::deallocate(_data, _max_size);
do
{
use_expected = true;
use_desired = false;
}
while (!_atomic_using.compare_exchange_strong(use_expected, use_desired));
}
//先实现一个可变参数列表版本的插入函数最为基准函数
template <typename ...Args>
bool emplace(Args && ... args) {
bool use_expected = false;
bool use_desired = true;
do
{
use_expected = false;
use_desired = true;
}
while (!_atomic_using.compare_exchange_strong(use_expected, use_desired));
//判断队列是否满了
if ((_tail + 1) % _max_size == _head) {
std::cout << "circular que full ! " << std::endl;
do
{
use_expected = true;
use_desired = false;
}
while (!_atomic_using.compare_exchange_strong(use_expected, use_desired));
return false;
}
//在尾部位置构造一个T类型的对象,构造参数为args...
std::allocator<T>::construct(_data + _tail, std::forward<Args>(args)...);
//更新尾部元素位置
_tail = (_tail + 1) % _max_size;
do
{
use_expected = true;
use_desired = false;
}
while (!_atomic_using.compare_exchange_strong(use_expected, use_desired));
return true;
}
//push 实现两个版本,一个接受左值引用,一个接受右值引用
//接受左值引用版本
bool push(const T& val) {
std::cout << "called push const T& version" << std::endl;
return emplace(val);
}
//接受右值引用版本,当然也可以接受左值引用,T&&为万能引用
// 但是因为我们实现了const T&
bool push(T&& val) {
std::cout << "called push T&& version" << std::endl;
return emplace(std::move(val));
}
//出队函数
bool pop(T& val) {
bool use_expected = false;
bool use_desired = true;
do
{
use_desired = true;
use_expected = false;
} while (!_atomic_using.compare_exchange_strong(use_expected, use_desired));
//判断头部和尾部指针是否重合,如果重合则队列为空
if (_head == _tail) {
std::cout << "circular que empty ! " << std::endl;
do
{
use_expected = true;
use_desired = false;
}
while (!_atomic_using.compare_exchange_strong(use_expected, use_desired));
return false;
}
//取出头部指针指向的数据
val = std::move(_data[_head]);
//更新头部指针
_head = (_head + 1) % _max_size;
do
{
use_expected = true;
use_desired = false;
}while (!_atomic_using.compare_exchange_strong(use_expected, use_desired));
return true;
}
private:
size_t _max_size;
T* _data;
std::atomic<bool> _atomic_using;
size_t _head = 0;
size_t _tail = 0;
};
  • 原子变量为false 才能进函数执行逻辑
  • 多线程情况下也能保证安全是因为原子变量循环检测保证有且只有一个线程修改成功。读取也是这样。

单一原子变量的弊端

  • 我们考虑上述单一原子变量的弊端

  • 多个线程push和pop操作耦合读太高,同一时刻仅有一个线程pop或者push,而且互斥逻辑的精度不够。影响效率。

  • 我们需要考虑将pop和push操作解耦,我们采用的是环形队列,将tail和head作为原子变量可以实现精细控制。

  • 比如我们做push操作的时候,一个线程更新完tail标签和数据后,其他线程就可以pop或者push了,精细控制的好处就是效率提升。

  • 我们定义一个新的类CircularQueLight,类的基本数据结构和CircularQueSeq差不多。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
template<typename T, size_t Cap>
class CircularQueLight: private std::allocator<T>
{
public:
CircularQueLight():_max_size(Cap + 1),
_data(std::allocator<T>::allocate(_max_size))
, _head(0), _tail(0) {}
CircularQueLight(const CircularQueLight&) = delete;
CircularQueLight& operator = (const CircularQueLight&) volatile = delete;
CircularQueLight& operator = (const CircularQueLight&) = delete;
bool pop(T& val) {
size_t h;
do
{
h = _head.load(); //1 处
//判断头部和尾部指针是否重合,如果重合则队列为空
if(h == _tail.load())
{
return false;
}
val = _data[h]; // 2处
} while (!_head.compare_exchange_strong(h,
(h+1)% _max_size)); //3 处
return true;
}
private:
size_t _max_size;
T* _data;
std::atomic<size_t> _head;
std::atomic<size_t> _tail;
};
  • 在pop逻辑里我们在1处load获取头部head的值,在2处采用了复制的方式将头部元素取出赋值给val,而不是通过std::move,因为多个线程同时pop最后只有一个线程成功执行3处代码退出,而失败的则需要继续循环,从更新后的head处pop元素。所以不能用std::move,否则会破坏原有的队列数据。

  • 接下来看看push的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
bool push(T& val)
{
size_t t;
do
{
t = _tail.load(); //1
//判断队列是否满
if( (t+1)%_max_size == _head.load())
{
return false;
}
_data[t] = val; //2
} while (!_tail.compare_exchange_strong(t,
(t + 1) % _max_size)); //3
return true;
}
  • 这样会存在危险,push函数的逻辑乍一看和pop一样,但是我们会发现多线程push的情况存在线程安全问题。

  • 比如我们线程1 push(1) 而线程2 push(2). 很有可能的顺序是

  • 1.1 -> 1.2 -> 2.1 -> 2.2 -> 1.3

  • 这样我们看到的效果就是_data[t]被存储为2了,而实际情况应该是被存储为1,因为线程1的原子变量生效,而线程2的原子变量不满足需继续循环。所以_data[t]必须修改为1.

  • 那我们改进一下push的函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
bool push(T& val)
{
size_t t;
do
{
t = _tail.load(); //1
//判断队列是否满
if( (t+1)%_max_size == _head.load())
{
return false;
}
} while (!_tail.compare_exchange_strong(t,
(t + 1) % _max_size)); //3
_data[t] = val; //2
return true;
}
  • 这样可能还会有安全问题,因为你可能一个线程执行到2处,还没来得及更新,另一个线程就开始执行pop操作了,会使得pop出一个未更新的值
  • 为了解决这个问题,我们可以增加另一个原子变量_tail_update来标记尾部数据是否修改完成,如果尾部数据没有修改完成,此时其他线程pop时获取的数据就是不安全的,所以pop要返回false。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
bool push(const T& val)
{
size_t t;
do
{
t = _tail.load(); //1
//判断队列是否满
if( (t+1)%_max_size == _head.load())
{
return false;
}
} while (!_tail.compare_exchange_strong(t,
(t + 1) % _max_size)); //3
_data[t] = val; //2
size_t tailup;
do
{
tailup = t;
} while (_tail_update.compare_exchange_strong(tailup,
(tailup + 1) % _max_size));
return true;
}

bool pop(T& val) {
size_t h;
do
{
h = _head.load(); //1 处
//判断头部和尾部指针是否重合,如果重合则队列为空
if(h == _tail.load())
{
return false;
}
//判断如果此时要读取的数据和tail_update是否一致,如果一致说明尾部数据未更新完
if(h == _tail_update.load())
{
return false;
}
val = _data[h]; // 2处
} while (!_head.compare_exchange_strong(h,
(h+1)% _max_size)); //3 处
return true;
}
  • pop版本也是,先判断队列是否为空,再判断h是否和_tail_update的值相等,如果相等说明有写数据的没更新完,直接返回false或者循环等待也行,为了方便我们直接返回false即可。

  • 因为我们知道原子操作默认采用的是memory_order_seq_cst内存顺序,性能上不是最优的,我们可以用acquire和release的内存顺序实现同步的效果。

  • **这样的改动就相当于使用了3个指针,一个指针指向头,一个指向尾,另一个为修改数据后的下一个位置,(也就是说在这个指针之前的位置都是修改好的,这个指针指向的位置是正在被修改)如果一个线程想要读取一个未被修改的值,会被卡住

优化性能

  • 我们用acquire和release模型优化上述代码,实现同步。
  • 最简单的方式就是将load的地方变为memory_order_relaxed,compare_exchange_strong的地方变为memory_order_release

  • 我们先看pop操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
bool pop(T& val) {
size_t h;
do
{
h = _head.load(std::memory_order_relaxed); //1 处
//判断头部和尾部指针是否重合,如果重合则队列为空
if (h == _tail.load(std::memory_order_acquire)) //2处
{
std::cout << "circular que empty ! " << std::endl;
return false;
}
//判断如果此时要读取的数据和tail_update是否一致,如果一致说明尾部数据未更新完
if (h == _tail_update.load(std::memory_order_acquire)) //3处
{
return false;
}
val = _data[h]; // 2处
} while (!_head.compare_exchange_strong(h,
(h + 1) % _max_size, std::memory_order_release, std::memory_order_relaxed)); //4 处
std::cout << "pop data success, data is " << val << std::endl;
return true;
}
  • 1 处为memory_order_relaxed是因为即使多个线程pop,每个线程获取的head可能不及时,这个没关系,因为我们有4处的while来重试。

  • 2 compare_exchange_strong操作,在期望的条件匹配时采用memory_order_release, 期望的条件不匹配时memory_order_relaxed可以提升效率,毕竟还是要重试的。

  • 再看push操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
bool push(const T& val)
{
size_t t;
do
{
t = _tail.load(std::memory_order_relaxed); //5
//判断队列是否满
if ((t + 1) % _max_size == _head.load(std::memory_order_acquire))
{
std::cout << "circular que full ! " << std::endl;
return false;
}
} while (!_tail.compare_exchange_strong(t,
(t + 1) % _max_size, std::memory_order_release, std::memory_order_relaxed)); //6
_data[t] = val;
size_t tailup;
do
{
tailup = t;
} while (_tail_update.compare_exchange_strong(tailup,
(tailup + 1) % _max_size, std::memory_order_release, std::memory_order_relaxed)); //7
std::cout << "called push data success " << val << std::endl;
return true;
}
  • 两个线程协同工作,一个线程先push,另一个线程后pop,那么对于tail部分和_tail_update,我们要保证push的结果_data[t] = val;先于pop的结果val = _data[h];

  • 所以push线程中对于_tail_update的compare_exchange_strong操作采用memory_order_release方式。

  • pop线程对于_tail_update的load操作采用memory_order_acquire。

  • 如果一个线程先pop,另一个线程先push,那么对于head部分,我们要保证pop的结果val = _data[h];先于pop的结果_data[t] = val;。

思考

  • 优势

  • 无锁高并发. 虽然存在循环重试, 但是这只会在相同操作并发的时候出现. push 不会因为与 pop 并发而重试, 反之亦然.

  • 缺陷

  • 这样队列只应该存储标量, 如果存储类对象时,多个push线程只有一个线程push成功,而拷贝复制的开销很大,其他线程会循环重试,每次重试都会有开销。

利用栅栏实现同步模型

线程可见顺序

  • 我们提到过除了memory_order_seq_cst顺序,其他的顺序都不能保证原子变量修改的值在其他多线程中看到的顺序是一致的。

  • 但是可以通过同步机制保证一个线程对原子变量的修改对另一个原子变量可见。通过“Syncronizes With” 的方式达到先行的效果。

  • 但是我们说的先行是指 “A Syncronizes With B ”, 如果A 的结果被B读取,则A 先行于B。

  • 有时候我们线程1对A的store操作采用release内存序,而线程2对B的load采用acquire内存序,并不能保证A 一定比 B先执行。因为两个线程并行执行无法确定先后顺序,我们指的先行不过是说如果B读取了A操作的结果,则称A先行于B

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <iostream>
#include <atomic>
#include <thread>
#include <cassert>
std::atomic<bool> x, y;
std::atomic<int> z;

void write_x()
{
x.store(true, std::memory_order_release); //1
}
void write_y()
{
y.store(true, std::memory_order_release); //2
}
void read_x_then_y()
{
while (!x.load(std::memory_order_acquire));
if (y.load(std::memory_order_acquire)) //3
++z;
}
void read_y_then_x()
{
while (!y.load(std::memory_order_acquire));
if (x.load(std::memory_order_acquire)) //4
++z;
}
  • 我们写一个函数测试,函数TestAR中初始化x和y为false, 启动4个线程a,b,c,d,分别执行write_x, write_y, read_x_then_y, read_y_then_x.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void TestAR()
{
x = false;
y = false;
z = 0;
std::thread a(write_x);
std::thread b(write_y);
std::thread c(read_x_then_y);
std::thread d(read_y_then_x);
a.join();
b.join();
c.join();
d.join();
assert(z.load() != 0); //5
std::cout << "z value is " << z.load() << std::endl;
}
  • 有的读者可能会觉5处的断言不会被触发,他们认为c和d肯定会有一个线程对z执行++操作。他们的思路是这样的。 1 如果c线程执行read_x_then_y没有对z执行加加操作,那么说明c线程读取的x值为true, y值为false。 2 之后d线程读取时,如果保证执行到4处说明y为true,等d线程执行4处代码时x必然为true。 3 他们的理解是如果x先被store为true,y后被store为true,c线程看到y为false时x已经为true了,那么d线程y为true时x也早就为true了,所以z一定会执行加加操作。

  • 上述理解是不正确的,我们提到过即便是releas和acquire顺序也不能保证多个线程看到的一个变量的值是一致的,更不能保证看到的多个变量的值是一致的。

  • 变量x和y的载入操作3和4有可能都读取false值(与宽松次序的情况一样),因此有可能令断言触发错误。变量x和y分别由不同线程写出,所以两个释放操作都不会影响到对方线程。

  • 看下图

示例图片

  • 无论x和y的store顺序谁先谁后,线程c和线程d读取的x和y顺序都不一定一致。

  • 从CPU的角度我们可以这么理解

示例图片

  • 在一个4核CPU结构的主机上,a,b,c,d分别运行在不同的CPU内核上。

  • a执行x.store(true)先被线程c读取,而此时线程b对y的store还没有被c读取到新的值,所以此时c读取的x为true,y为false。

  • 同样的道理,d可以读取b修改y的最新值,但是没来的及读取x的最新值,那么读取到y为true,x为false。

  • 即使我们采用release和acquire方式也不能保证全局顺序一致。如果一个线程对变量执行release内存序的store操作,另一个线程不一定会马上读取到。这个大家要理解。

栅栏

  • 有时候我们可以通过栅栏保证指令编排顺序。

  • 看下面一段代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <atomic>
#include <thread>
#include <assert.h>
std::atomic<bool> x,y;
std::atomic<int> z;
void write_x_then_y()
{
x.store(true,std::memory_order_relaxed); // 1
y.store(true,std::memory_order_relaxed); // 2
}
void read_y_then_x()
{
while(!y.load(std::memory_order_relaxed)); // 3
if(x.load(std::memory_order_relaxed)) // 4
++z;
}
int main()
{
x=false;
y=false;
z=0;
std::thread a(write_x_then_y);
std::thread b(read_y_then_x);
a.join();
b.join();
assert(z.load()!=0); //5
}
  • 上面的代码我们都采用的是memory_order_relaxed, 所以无法保证a线程将x,y修改后b线程看到的也是先修改x,再修改y的值。b线程可能先看到y被修改为true,x后被修改为true,那么b线程执行到4处时x可能为false导致z不会加加,5处断言会被触发。

  • 那我们之前做法可以解决这个问题

1
2
3
4
5
6
7
8
9
10
11
12
void write_x_then_y3()
{
x.store(true, std::memory_order_relaxed); // 1
y.store(true, std::memory_order_release); // 2
}

void read_y_then_x3()
{
while (!y.load(std::memory_order_acquire)); // 3
if (x.load(std::memory_order_relaxed)) // 4
++z;
}
  • 可以通过std::memory_order_release和std::memory_order_acquire形成同步关系。

  • 线程a执行write_x_then_y3,线程b执行read_y_then_x3,如果线程b执行到4处,说明y已经被线程a设置为true。

  • 线程a执行到2,也必然执行了1,因为是memory_order_release的内存顺序,所以线程a能2操作之前的指令在2之前被写入内存。

  • 同样的道理,线程b在3处执行的是memory_order_acquire的内存顺序,所以能保证4不会先于3写入内存,这样我们能知道1一定先行于4.

  • 进而推断出z会加加,所以不会触发assert(z.load() != 0);的断言。

  • 其实我们可以通过栅栏机制保证指令的写入顺序。栅栏的机制和memory_order_release类似。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void write_x_then_y_fence()
{
x.store(true, std::memory_order_relaxed); //1
std::atomic_thread_fence(std::memory_order_release); //2
y.store(true, std::memory_order_relaxed); //3
}

void read_y_then_x_fence()
{
while (!y.load(std::memory_order_relaxed)); //4
std::atomic_thread_fence(std::memory_order_acquire); //5
if (x.load(std::memory_order_relaxed)) //6
++z;
}
  • 测试
1
2
3
4
5
6
7
8
9
10
11
void TestFence()
{
x = false;
y = false;
z = 0;
std::thread a(write_x_then_y_fence);
std::thread b(read_y_then_x_fence);
a.join();
b.join();
assert(z.load() != 0); //7
}
  • 处的断言也不会触发。我们可以分析一下,

  • 线程a运行write_x_then_y_fence,线程b运行read_y_then_x_fence.

  • 当线程b执行到5处时说明4已经结束,此时线程a看到y为true,那么线程a必然已经执行完3.

  • 尽管4和3我们采用的是std::memory_order_relaxed顺序,但是通过逻辑关系保证了3的结果同步给4,进而”3 happens-before 4”

  • 因为我们采用了栅栏std::atomic_fence所以,5处能保证6不会先于5写入内存,(memory_order_acquire保证其后的指令不会先于其写入内存)

  • 2处能保证1处的指令先于2写入内存,进而”1 happens-before 6”, 1的结果会同步给 6

  • 所以”atomic_thread_fence”其实和”release-acquire”相似,都是保证memory_order_release之前的指令不会排到其后,memory_order_acquire之后的指令不会排到其之前。

基于锁是新鲜线程安全队列与栈

线程安全的栈

  • 实现一个线程安全的栈,我们能想到的是基于锁控制push和pop操作,比如下面的逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#include <exception>
#include <mutex>
#include <stack>
#include <condition_variable>

struct empty_stack : std::exception
{
const char* what() const throw();
};
template<typename T>
class threadsafe_stack
{
private:
std::stack<T> data;
mutable std::mutex m;
public:
threadsafe_stack() {}

threadsafe_stack(const threadsafe_stack& other)
{
std::lock_guard<std::mutex> lock(other.m);
data = other.data;
}

threadsafe_stack& operator=(const threadsafe_stack&) = delete;

void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(std::move(new_value)); // ⇽-- - 1
}

std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
if (data.empty()) throw empty_stack(); // ⇽-- - 2
std::shared_ptr<T> const res(
std::make_shared<T>(std::move(data.top()))); // ⇽-- - 3
data.pop(); // ⇽-- - 4
return res;
}

void pop(T& value)
{
std::lock_guard<std::mutex> lock(m);
if (data.empty()) throw empty_stack();
value = std::move(data.top()); // ⇽-- - 5
data.pop(); // ⇽-- - 6
}

bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};
  • 我们实现了push操作和pop操作

  • push操作里加锁,然后将数据通过std::move的方式移动放入stack中。我们思考如果1处因为机器内存不足push导致异常,此种情况并不会对栈已有的数据产生危险。
    但是vector容器大家要考虑,因为vector存在内存不足时将数据拷贝转移到新空间的过程。那么对于vector这种动态扩容的容器该如何保证容器内数据在移动过程中出现了异常仍能不丢失呢?

  • 我想到的一个方式就是管理vector的capacity,每次push的时候要判断一下vector的size和capacity是否相等,如果相等则手动扩容并将数据转移到新的vector,再释放旧有的vector。

  • 但是同样会存在一个问题就是会造成内存溢出,因为vector的capacity会随着数据增加而增加,当vector中没有数据的时候capacity仍然很大。这种方式也可以通过swap的方式将当前大容量的vector和一个空的vector做交换,快速清空内存。这些操作和思路需结合实际开发情况而定。

  • pop提供了两个版本,一个是返回智能指针一个是返回bool类型,这两种我们分析,比如3处和4处也很可能因为内存不足导致构造智能指针失败,或者5处赋值失败,这种情况下抛出异常并不会影响栈内数据,因为程序没走到4和6处就抛出异常了。

  • pop函数内部判断栈是否空,如果为空则抛出异常,这种情况我们不能接受,异常是用来处理和预判突发情况的,对于一个栈为空这种常见现象,仅需根据返回之后判断为空再做尝试或放弃出栈即可。

  • 为了解决栈为空就抛出异常的问题,我们可以做如下优化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
template<typename  T>
class threadsafe_stack_waitable
{
private:
std::stack<T> data;
mutable std::mutex m;
std::condition_variable cv;
public:
threadsafe_stack_waitable() {}

threadsafe_stack_waitable(const threadsafe_stack_waitable& other)
{
std::lock_guard<std::mutex> lock(other.m);
data = other.data;
}

threadsafe_stack_waitable& operator=(const threadsafe_stack_waitable&) = delete;

void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(std::move(new_value)); // ⇽-- - 1
cv.notify_one();
}

std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> lock(m);
cv.wait(lock, [this]()
{
if(data.empty())
{
return false;
}
return true;
}); // ⇽-- - 2


std::shared_ptr<T> const res(
std::make_shared<T>(std::move(data.top()))); // ⇽-- - 3
data.pop(); // ⇽-- - 4
return res;
}

void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lock(m);
cv.wait(lock, [this]()
{
if (data.empty())
{
return false;
}
return true;
});

value = std::move(data.top()); // ⇽-- - 5
data.pop(); // ⇽-- - 6
}

bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}

bool try_pop(T& value)
{
std::lock_guard<std::mutex> lock(m);
if(data.empty())
{
return false;
}

value = std::move(data.top());
data.pop();
return true;
}

std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lock(m);
if(data.empty())
{
return std::shared_ptr<T>();
}

std::shared_ptr<T> res(std::make_shared<T>(std::move(data.top())));
data.pop();
return res;
}

};
  • 我们将pop优化为四个版本,四个版本又可以分为两个大类,两个大类分别为try_pop版本和wait_and_pop版本。

  • try_pop版本不阻塞等待队列有数据才返回,而是直接返回,try_pop又有两个版本,分别返回bool值和指针值。如果队列为空返回false或者空指针。

  • wait_and_pop版本阻塞等待队列有数据才返回,同样有两个版本,分别返回bool值和指针值。

  • 但是上面的代码我们分析,假设此时栈为空,有一个线程A从队列中消费数据,调用wait_and_pop挂起, 此时另一个线程B向栈中放入数据调用push操作,notify一个线程消费队列中的数据。

  • 此时A从wait_and_pop唤醒,但是在执行3或者5处时,因为内存不足引发了异常,我们之前分析过,即使引发异常也不会影响到栈内数据,所以对于栈的数据来说是安全的,但是线程A异常后,其他线程无法从队列中消费数据,除非线程B再执行一次push。因为我们采用的是notify_one的方式,所以仅有一个线程被激活,如果被激活的线程异常了,就不能保证该数据被其他线程消费了,解决这个问题,可以采用几个方案。

    1. wai_and_pop失败的线程修复后再次取一次数据。
    2. 将notify_one改为notify_all,这样能保证通知所有线程。但是notify_all将导致所有线程竞争,并不可取。
    3. 我们可以通过栈存储智能指针的方式进行,因为智能指针在赋值的时候不会引发异常。
  • 稍后我们提供的线程安全队列的版本使用的就是第三种优化。

线程安全队列

  • 队列和栈最大的不同就是队列为先入先出,有了线程安全的栈的开发思路,我们很快实现一个支持线程安全的队列
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
#include <mutex>
#include <queue>

template<typename T>
class threadsafe_queue
{
private:

mutable std::mutex mut;
std::queue<T> data_queue;
std::condition_variable data_cond;

public:
threadsafe_queue()
{}

void push(T new_value)
{
std::lock_guard<std::mutex> lk(mut);
data_queue.push(std::move(new_value));
data_cond.notify_one(); //⇽-- - ①
}

void wait_and_pop(T& value) //⇽-- - ②
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this] {return !data_queue.empty(); });
value = std::move(data_queue.front());
data_queue.pop();
}

std::shared_ptr<T> wait_and_pop() // ⇽-- - ③
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this] {return !data_queue.empty(); }); // ⇽-- - ④
std::shared_ptr<T> res(
std::make_shared<T>(std::move(data_queue.front())));
data_queue.pop();
return res;
}

bool try_pop(T& value)
{
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return false;
value = std::move(data_queue.front());
data_queue.pop();
return true;
}

std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return std::shared_ptr<T>(); //⇽-- - ⑤
std::shared_ptr<T> res(
std::make_shared<T>(std::move(data_queue.front())));
data_queue.pop();
return res;
}

bool empty() const
{
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};
  • 关于异常情况的分析和栈一样,上面的队列版本不存在异常导致数据丢失的问题。但是同样面临线程执行wait_and_pop时如果出现了异常,导致数据被滞留在队列中,其他线程也无法被唤醒的情况。

  • 为了解决这种情况,我们前文提到了采用智能指针的方式,重新实现一下线程安全队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
template<typename T>
class threadsafe_queue_ptr
{
private:
mutable std::mutex mut;
std::queue<std::shared_ptr<T>> data_queue;
std::condition_variable data_cond;
public:
threadsafe_queue_ptr()
{}
void wait_and_pop(T& value)
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this] {return !data_queue.empty(); });
value = std::move(*data_queue.front()); //⇽-- - 1
data_queue.pop();
}
bool try_pop(T& value)
{
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return false;
value = std::move(*data_queue.front()); // ⇽-- - 2
data_queue.pop();
return true;
}
std::shared_ptr<T> wait_and_pop()
{
std::unique_lock<std::mutex> lk(mut);
data_cond.wait(lk, [this] {return !data_queue.empty(); });
std::shared_ptr<T> res = data_queue.front(); // ⇽-- - 3
data_queue.pop();
return res;
}
std::shared_ptr<T> try_pop()
{
std::lock_guard<std::mutex> lk(mut);
if (data_queue.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res = data_queue.front(); // ⇽-- - 4
data_queue.pop();
return res;
}
void push(T new_value)
{
std::shared_ptr<T> data(
std::make_shared<T>(std::move(new_value))); // ⇽-- - 5
std::lock_guard<std::mutex> lk(mut);
data_queue.push(data);
data_cond.notify_one();
}
bool empty() const
{
std::lock_guard<std::mutex> lk(mut);
return data_queue.empty();
}
};
  • 在5处,我们push数据时需要先构造智能指针,如果构造的过程失败了也就不会push到队列中,不会污染队列中的数据。

  • 2,3处和4,5处我们仅仅时将智能指针取出来赋值给一个新的智能指针并返回。关于智能指针的赋值不会引发异常这一点在C++并发编程中提及,这一点我觉得有些存疑,我觉得书中表述的意思应该是指针在64位机器占用8个字节,所有智能指针共享引用计数所以在复制时仅为8字节开销,降低了内存消耗。

  • 所以推荐大家存储数据放入容器中时尽量用智能指针,这样能保证复制和移动过程中开销较小,也可以实现一定意义的数据共享。

  • 但是我们分析上面的代码,队列push和pop时采用的是一个mutex,导致push和pop等操作串行化,我们要考虑的是优化锁的精度,提高并发,那有什么办法吗?

  • 我们分析,队列和栈最本质的区别是队列是首尾操作。我们可以考虑将push和pop操作分化为分别对尾和对首部的操作。对首和尾分别用不同的互斥量管理就可以实现真正意义的并发了。

  • 我们引入虚位节点的概念,表示一个空的节点,没有数据,是一个无效的节点,初始情况下,队列为空,head和tail节点都指向这个虚位节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
template<typename T>
class threadsafe_queue_ht
{
private:
struct node
{
std::shared_ptr<T> data;
std::unique_ptr<node> next;
};
std::mutex head_mutex;
std::unique_ptr<node> head;
std::mutex tail_mutex;
node* tail;
std::condition_variable data_cond;

node* get_tail()
{
std::lock_guard<std::mutex> tail_lock(tail_mutex);
return tail;
}
std::unique_ptr<node> pop_head()
{
std::unique_ptr<node> old_head = std::move(head);
head = std::move(old_head->next);
return old_head;
}
std::unique_lock<std::mutex> wait_for_data()
{
std::unique_lock<std::mutex> head_lock(head_mutex);
data_cond.wait(head_lock,[&] {return head.get() != get_tail(); }); //5
return std::move(head_lock);
}
std::unique_ptr<node> wait_pop_head()
{
std::unique_lock<std::mutex> head_lock(wait_for_data());
return pop_head();
}
std::unique_ptr<node> wait_pop_head(T& value)
{
std::unique_lock<std::mutex> head_lock(wait_for_data());
value = std::move(*head->data);
return pop_head();
}


std::unique_ptr<node> try_pop_head()
{
std::lock_guard<std::mutex> head_lock(head_mutex);
if (head.get() == get_tail())
{
return std::unique_ptr<node>();
}
return pop_head();
}
std::unique_ptr<node> try_pop_head(T& value)
{
std::lock_guard<std::mutex> head_lock(head_mutex);
if (head.get() == get_tail())
{
return std::unique_ptr<node>();
}
value = std::move(*head->data);
return pop_head();
}
public:

threadsafe_queue_ht() : // ⇽-- - 1
head(new node), tail(head.get())
{}

threadsafe_queue_ht(const threadsafe_queue_ht& other) = delete;
threadsafe_queue_ht& operator=(const threadsafe_queue_ht& other) = delete;

std::shared_ptr<T> wait_and_pop() // <------3
{
std::unique_ptr<node> const old_head = wait_pop_head();
return old_head->data;
}

void wait_and_pop(T& value) // <------4
{
std::unique_ptr<node> const old_head = wait_pop_head(value);
}


std::shared_ptr<T> try_pop()
{
std::unique_ptr<node> old_head = try_pop_head();
return old_head ? old_head->data : std::shared_ptr<T>();
}
bool try_pop(T& value)
{
std::unique_ptr<node> const old_head = try_pop_head(value);
return old_head;
}
bool empty()
{
std::lock_guard<std::mutex> head_lock(head_mutex);
return (head.get() == get_tail());
}

void push(T new_value) //<------2
{
std::shared_ptr<T> new_data(
std::make_shared<T>(std::move(new_value)));
std::unique_ptr<node> p(new node);
node* const new_tail = p.get();
std::lock_guard<std::mutex> tail_lock(tail_mutex);
tail->data = new_data;
tail->next = std::move(p);
tail = new_tail;
}
};
  • node为节点类型,包含data和next两个成员。 data为智能指针类型存储T类型的数据。next为指向下一个节点的智能指针,以此形成链表。

  • 上述代码我们的head是一个node类型的智能指针。而tail为node类型的普通指针,读者也可以用智能指针。

  • 在1处构造函数那里,我们将head和tail初始指向的位置设置为虚位节点。

  • 在2 处我们push数据的时候先构造T类型的智能指针存储数据new_data,然后我们构造了一个新的智能指针p, p取出裸指针就是新的虚位节点new_tail,我们将new_data赋值给现在的尾节点,并且让尾节点的next指针指向p, 然后将tail更新为我们新的虚位节点。

  • 3,4处都是wait_and_pop的不同版本,内部调用了wait_pop_head,wait_pop_head内部先调用wait_for_data判断队列是否为空,这里判断是否为空主要是判断head是否指向虚位节点。如果不为空则返回unique_lock,我们显示的调用了move操作,返回unique_lock仍保留对互斥量的锁住状态。

  • 回到wait_pop_head中,接下来执行pop_head将数据pop出来。

  • 值得注意的是get_tail()返回tail节点,那么我们思考如果此时有多个线程push数据,tail节点已经变化了,我们此时在5处的判断可能是基于push之前的tail信息,但是不影响逻辑,因为如果head和tail相等则线程挂起,等待通知,如果不等则继续执行,push操作只会将tail向后移动不会导致逻辑问题。

  • pop_head中我们将head节点移动给一个old_head变量,然后将old_head的next节点更新为新的head。这里我觉得可以简化写为head=head->next.

实现线程安全的查找表

散列表

散列表(Hash table,也叫哈希表),是根据键(Key)而直接访问在存储器存储位置的数据结构。 也就是说,它通过计算出一个键值的函数,将所需查询的数据映射到表中一个位置来让人访问,这加快了查找速度。 这个映射函数称做散列函数,存放记录的数组称做散列表。

设计思路

  • 我们要实现上述逻辑,可以考虑将11,12,13等hash值放入一个vector中。多线程根据key计算得出hash值的过程并不需要加锁,可以实现并行计算。

  • 但是对于链表的增删改查需要加锁。

  • 所以我们考虑将链表封装为一个类bucket_type,支持数据的增删改查。

  • 我们将整体的查找表封装为threadsafe_lookup_table类,实现散列规则和调度bucket_type类。

代码实现

  • 我们先实现内部的bucket_type类. 为了threadsafe_lookup_table可以访问他,所以将threadsafe_lookup_table设置为其友元类
  • 接下来我们设计threadsafe_lookup_table类。我们用一个vector存储上面的bucket_type类型。 因为我们要计算hash值,key可能是多种类型string, int等,所以我们采用std的hash算法作为散列函数即可.
  • get_bucket函数不需要加锁,各个线程可以并行计算哈希值,取出key对应的桶。如果多线程调用同一个bucket的增删改查,就通过bucket内部的互斥解决线程安全问题。 接下来我们完善threadsafe_lookup_table的对外接口

  • 查找表头文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
#pragma once
#include <iostream>
#include <algorithm>
#include <thread>
#include <map>
#include <mutex>
#include <memory>
#include <list>
#include <shared_mutex>
#include <vector>

template<typename Key, typename Value, typename Hash = std::hash<Key>>
class threadsafe_lockup_table {
private:
class bucket_type {
friend class threadsafe_lockup_table;
private:
typedef std::pair<Key, Value> bucket_value;
typedef std::list<bucket_value> bucket_data;
// 在模板中指定类型的时候要加typename
typedef typename bucket_data::iterator bucket_iterator;
bucket_data data;
mutable std::shared_mutex mutex;

bucket_iterator find_entry_for(const Key& key) {
return std::find_if(data.begin(), data.end(), [&](const bucket_value& item) {
return item.first == key;
});
}
public:
//查找key值,找到返回对应的value,没有找到返回提供的默认值
Value value_for(const Key& key, const Value& default_value) {
std::shared_lock<std::shared_mutex> lock(mutex);
bucket_iterator const found_entry = find_entry_for(key);
return (found_entry == data.end()) ? default_value: found_entry->second;
}

//添加key和value,找到更新,没找到就添加
void add_or_update_mapping(const Key& key, const Value& value) {
auto found_entry = find_entry_for(key);
if (found_entry == data.end()) {
data.push_back(bucket_value(key, value));
}
else {
found_entry->second = value;
}
}

//删除对应的key
void remove_mapping(Key const& key)
{
std::unique_lock<std::shared_mutex> lock(mutex);
bucket_iterator const found_entry = find_entry_for(key);
if (found_entry != data.end())
{
data.erase(found_entry);
}
}
};
private:
//用vector存储桶类型
std::vector<std::unique_ptr<bucket_type>> buckets;
//hash<Key> 哈希表 用来根据key生成哈希值
Hash hasher;
public:
//根据key生成数字,并对桶的大小取余得到下标,根据下标返回对应的桶智能指针
bucket_type& get_bucket(Key const& key) const
{
std::size_t const bucket_index = hasher(key) % buckets.size();
return *buckets[bucket_index];
}

threadsafe_lockup_table(unsigned num_buckets = 19, Hash const& hasher_ = Hash()) :
buckets(num_buckets), hasher(hasher_)
{
for (unsigned i = 0; i < num_buckets; ++i)
{
//让智能指针转绑定
buckets[i].reset(new bucket_type);
}
}

threadsafe_lockup_table(threadsafe_lockup_table const& other) = delete;
threadsafe_lockup_table& operator=(threadsafe_lockup_table const& other) = delete;

// Value() 就是用构造函数构造一个默认值,这是查找算法
Value value_for(Key const& key,Value const& default_value = Value())
{
return get_bucket(key).value_for(key, default_value);
}

void add_or_update_mapping(Key const& key, Value const& value)
{
get_bucket(key).add_or_update_mapping(key, value);
}

void remove_mapping(Key const& key)
{
get_bucket(key).remove_mapping(key);
}

std::map<Key, Value> get_map()
{
//对所有的桶都加锁
std::vector<std::unique_lock<std::shared_mutex>> locks;
for (unsigned i = 0; i < buckets.size(); ++i)
{
locks.push_back(std::unique_lock<std::shared_mutex>(buckets[i]->mutex));
}
std::map<Key, Value> res;
for (unsigned i = 0; i < buckets.size(); ++i)
{
//需用typename告诉编译器bucket_type::bucket_iterator是一个类型,以后再实例化
//当然此处可简写成auto it = buckets[i]->data.begin();
typename bucket_type::bucket_iterator it = buckets[i]->data.begin();
for (; it != buckets[i]->data.end(); ++it)
{
res.insert(*it);
}
}
return res;
}
};
  • 自定义类型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
class MyClass
{
public:
MyClass(int i):_data(i){}

friend std::ostream& operator << (std::ostream& os, const MyClass& mc){
os << mc._data;
return os;
}


private:
int _data;
};
  • 测试函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
#include <iostream>
#include <set>
#include <thread>
#include "MyClass.h"
#include "ThreadSafeLockupTable.h"

void TestThreadSafeHash() {
std::set<int> removeSet;
threadsafe_lockup_table<int, std::shared_ptr<MyClass>> table;
std::thread t1([&]() {
for (int i = 0; i < 100; i++)
{
auto class_ptr = std::make_shared<MyClass>(i);
table.add_or_update_mapping(i, class_ptr);
}
});

std::thread t2([&]() {
for (int i = 0; i < 100; )
{
auto find_res = table.value_for(i, nullptr);
if (find_res)
{
table.remove_mapping(i);
removeSet.insert(i);
i++;
}

std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
});

std::thread t3([&]() {
for (int i = 100; i < 200; i++)
{
auto class_ptr = std::make_shared<MyClass>(i);
table.add_or_update_mapping(i, class_ptr);
}
});


t1.join();
t2.join();
t3.join();

for (auto& i : removeSet)
{
std::cout << "remove data is " << i << std::endl;
}

auto copy_map = table.get_map();
for (auto& i : copy_map)
{
std::cout << "copy data is " << *(i.second) << std::endl;
}
}
int main()
{
TestThreadSafeHash();
return 0;
}

实现线程安全链表

  • 如果做一个支持多线程并发访问的链表,我们首先想到的是用一个互斥量控制整个链表,达到多线程访问时串行的效果。但是这么做精度不够,需要分化互斥量的功能。我们想到的一个办法就是每个节点都维护一个互斥量,这样能保证多个线程操作不同节点时加不同的锁,减少耦合性。

  • 另外我们将head独立为一个虚节点,所谓虚节点就是不存储数据,只做头部标记。我们每次从头部插入只需要修将新的节点的next指针指向原来head的next指向的节点,再将head的next指针指向新的节点

线程间切分任务的方法

按数量切分

  • 对于大量处理的数据,可以按照任务数量区分,简单来说如果我们要处理n个任务,总计有m个线程,那么我们可以简单的规划每个线程处理n/m个任务。

线程池原理和实现