C++并发编程一书的笔记,主要是关于std::thread的内容。

除了阅读英文原版书外,还参考了以下视频:

C++ 并发编程(1) 线程基础,为什么线程参数默认传参方式是值拷贝?

一.线程控制

1.启动线程与等待结束

可以直接传入函数,重载()的类(注意如果直接初始化一个类对象传入,要加括号),或lambda表达式创建线程,也可以绑定类的函数。参数也直接在后面传入。通过join等待线程运行结束。

1
2
std::thread t(func,arg1,arg2);
t.join();

示例程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class bg_task{
public:
void operator()(){
std::cout<< "bg_task running..."<<std::endl;
}
};

void func1(const std::string a){
std::cout<<a<<std::endl;
}

int main(){
std::thread t1(func1,"thread t1 call func1");
// std::thread t2(bg_task()); 这会被认为是一个函数! 参数是函数指针,返回值是thread
std::thread t2((bg_task())); //使用括号,里面就是初始化一个变量,就可以正常定义线程了
std::thread t3([](std::string s){std::cout<<s<<std::endl;}, "thread t3 running..."); //lambda表达式

problem1();

t1.join();
t2.join();
t3.join();
return 0;
}

为了保证主线程发生异常时,子线程也能回收,需要使用try catch来捕获主线程的异常。为了方便编写程序,通常使用RAII思想,封装一个类来保证子线程一定可以回收:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class thread_guard{
std::thread &t;
public:
explicit thread_guard(std::thread &_t): t(_t){}
~thread_guard(){
if(t.joinable()){ //需要确认可以join
t.join();
}
}
thread_guard(thread_guard const &) = delete;
thread_guard& operator=(thread_guard const &) = delete;
}

int func(){return 0};

void f(){
std::thread t(func);
thread_guard g(t);
//...
}

也可以不使用join回收线程,只需要设置其为detach状态:

1
t.detach();

绑定类的函数的示例如下,如果有需要,参数补在后面就可以:

1
2
3
4
5
6
7
8
9
10
11
class X{
public:
void do_lengthy_work() {
std::cout << "do_lengthy_work " << std::endl;
}
};
void bind_class_oops() {
X my_x;
std::thread t(&X::do_lengthy_work, &my_x);
t.join();
}

2.传参

和一般的函数传参一样,要注意保证传进去的变量在运行期间存在:

1
2
3
4
5
6
7
8
9
10
11
12
void func2(int &a){
for(int i=0;i<5;i++){
a = i;
}
}

void problem1(){
int x = 0;
std::thread t(func2, std::ref(x));
t.detach(); //x会被回收,func2就不应该使用x了
//解决方式有:使用智能指针;传值;join
}

在上面的代码中可以看到,thread的构造需要将引用参数使用std::ref封装,因为thread会将所有参数作为按值移动处理,再调用函数。

还有下面这种传指针的情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
void f(int i, std::string const &s);
//错误的
void oops(int arg){
char buffer[1024];
sprintf(buffer,"%i",arg);
std::thread(f,3,buffer);
}
//正确的
void oops(int arg){
char buffer[1024];
sprintf(buffer,"%i",arg);
std::thread(f,3,std::string(buffer));
}

在传递参数时,有些需要传递的参数是不可复制的,这时可以通过move将其传递(move将左值转换为右值,使用移动构造函数):

1
2
3
4
5
6
7
8
9
10
11
12
void deal_unique(std::unique_ptr<int> p) {
std::cout << "unique ptr data is " << *p << std::endl;
(*p)++;
std::cout << "after unique ptr data is " << *p << std::endl;
}
void move_oops() {
auto p = std::make_unique<int>(100);
std::thread t(deal_unique, std::move(p));
t.join();
//不能再使用p了,p已经被move废弃
// std::cout << "after unique ptr data is " << *p << std::endl;
}

3.所有权转移

线程管理变量没有拷贝构造和赋值函数,线程的所有权通过move进行转移:

1
2
3
4
5
void func1();
void func2();
//创建线程并转移所有权
std::thread t1(func1);
std::thread t2 = std::move(t1);

4.运行时决定线程数

在选定线程数时,通常希望线程数不大于硬件核数。可以使用std::thread::harware_concurrency()来确定线程数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#include<iostream>
#include<thread>

const int N = 1e5+5;
const int MIN_SIZE = 1000;

void func(int i){
std::cout<<"thread"<<i<<" running..."<<std::endl;
}

int main(){
int k = std::thread::hardware_concurrency();
int max_threads = N/MIN_SIZE;
int num_threads = std::min(k!=0?k:2, max_threads);
std::thread threads[num_threads-1];
std::cout<<"hardware_concurrency is "<<k<<" and num_threads is "<<num_threads<<std::endl;
for(int i=0;i<num_threads-1;i++){
threads[i] = std::thread(func,i);
}
std::cout<<"main thread running..."<<std::endl;
for(int i=0;i<num_threads-1;i++){
threads[i].join();
}
return 0;
}

不均匀的部分由主线程来完成,创建num_threads-1个线程。

二.共享数据

1.使用锁保护共享数据

锁的使用

C++提供了锁来在多线程时保护共享数据,并提供了lock_guard类,实现锁的RAII,还提供了一个unique_lock类来配合条件变量的使用,unique_lock可以手动解锁或用于条件变量中解锁:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include<list>
#include<mutex>

std::list<int> li;
std::mutex mtx;

void add_to_list(int val){
//lock_guard只有构造和析构函数,在构造时上锁,析构时解锁
std::lock_guard<std::mutex> guard(mtx);
li.push_back(val);
}

//std::unique_lock的例子
void wait_fn(ThreadState* thread_state) {
// A lock must be held in order to wait on a condition variable.
// This lock is atomically released before the thread goes to sleep
// when `wait()` is called. The lock is atomically re-acquired when
// the thread is woken up using `notify_all()`.
std::unique_lock<std::mutex> lk(*thread_state->mutex_);
thread_state->condition_variable_->wait(lk);
// Increment the shared counter with the lock re-acquired to inform the
// signaling thread that this waiting thread has successfully been
// woken up.
thread_state->counter_++;
printf("Lock re-acquired after wait()...\n");
lk.unlock();
}

并不是添加了锁,就可以保护共享数据,通常锁会封装到需要操作的类里,而如果函数返回指针或引用,就会导致锁无法控制对数据的修改和访问。不仅是返回指针和引用,调用函数也有风险,如果作为参数将指针或引用传入,函数就可以不需要锁也能访问共享数据。

除了以上情况,还有一种情况也不能保证对共享数据的访问不会导致竞态,这种情况是由于接口本身的设计导致的。如果多个线程对一个栈进行操作,栈只有一个元素,两个栈轮流调用empty()判断栈不空,然后都pop()出栈,就会产生错误。类似的情况是都在判空后调用top(),两个线程看到相同的值,然后各pop一次,导致一个值丢失。这是设计上的问题,考虑stack>,vector的复制是要开辟空间然后复制数据的,如果空间开辟不够,就会产生std:bad_alloc,如果pop直接获取元素,可能会在复制时失败,而元素也出栈,这个元素就丢失了。为了解决这个问题,stl设计者将操作分为top和pop,这样top没有取到值会先产生std::bad_alloc,数据不会丢失。

解决上述竞态问题有不同的方案:

Option 1:传递引用。这种方式的局限性在于需要在调用pop之前进行构造,这是消耗时间与资源的,并且对于用户定义的类可能不支持赋值操作。

1
2
std::vector<int> result;
s.pop(result);

Option2:使用会抛出异常的拷贝构造和移动构造。确定是不可能在编译时找出不抛出异常的这两种构造。

Option3:返回pop item的指针。返回指针能有效解决传值失败的问题,如果采用这种方法,最好使用shared_ptr,可以避免内存泄露。

以上的方案也可以同时采用,书中给出了线程安全的栈的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
struct empty_stack : std::exception
{
const char* what() const throw();
};
template<typename T>
class threadsafe_stack
{
private:
std::stack<T> data;
mutable std::mutex m;
public:
threadsafe_stack() {}
threadsafe_stack(const threadsafe_stack& other)
{
std::lock_guard<std::mutex> lock(other.m);
//①在构造函数的函数体(constructor body)内进行复制操作
data = other.data;
}
threadsafe_stack& operator=(const threadsafe_stack&) = delete;
void push(T new_value)
{
std::lock_guard<std::mutex> lock(m);
data.push(std::move(new_value));
}
std::shared_ptr<T> pop()
{
std::lock_guard<std::mutex> lock(m);
//②试图弹出前检查是否为空栈
if (data.empty()) throw empty_stack();
//③改动栈容器前设置返回值
std::shared_ptr<T> const res(std::make_shared<T>(data.top()));
data.pop();
return res;
}
void pop(T& value)
{
std::lock_guard<std::mutex> lock(m);
if (data.empty()) throw empty_stack();
value = data.top();
data.pop();
}
bool empty() const
{
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
};