1. 线程的创建

C++11中提供了thread类可以直接用来创建线程,创建步骤:

  1. 包含thread所在的头文件 #include <thread>
  2. 在需要的地方直接创建一个thread对象,构造函数中传入一个线程入口函数(或者是一个可调对象)。(普通函数,lambda表达式,重载了()的类都可以传入,普通类成员函数也可以传入)
  3. 选择使用join或者detach的方式对创建的子线程进行管理。

调用join后主线程就会等待子线程 执行完成后汇合。

#include <iostream>
#include <thread>
using namespace std;

//普通函数作为子线程的入口函数
void childThread(const char* str){
    cout << "this is child thread, " << str << endl;
}

//重载()运算符, 类对象可以作为线程入口
class classThread{
public:
    //重载()运算符,变成一个可调用对象
    void operator()(const char* str){
        cout<<"this is class thread, " << str << endl;
    }

    //普通的成员函数也可以作为线程的入口函数
    void func(const char* str){
        cout<<"this is normal class member thread, " << str << endl;
    }
};


int main() {
    thread demo1(childThread, "hello!");
    demo1.join();

    classThread classthread;
    thread demo2(classthread, "hello!");
    demo2.join();

    //lambda表达式作为线程入口函数
    auto func = [](const char* str){
        cout<<"this is lambda thread, "<< str << endl;
    };
    thread demo3(func, "hello!");
    demo3.join();
    
    //普通类成员函数作为线程的入口函数,需要特别注意传入参数!!
    thread demo4(&classThread::func, &classthread, "hello!");
    demo4.join();
    return 0;
}

2. 线程的管理

线程在创建之后,就会被系统调度,为了能够控制线程的状态,有两个函数会在线程创建之后调用。一个是join, 另一个是detach。join比较好理解,调用join之后,主线程就处于阻塞状态,等待子线程执行完成后汇合,join的位置就像是线程的汇合点。

detach相对使用的场合会少一些,相对也比较容易出问题。在调用detach之后,主线程继续执行自己的内容,子线程也去执行自己的东西,两个线程之间没有什么关联。这时候子线程的状态就是“守护线程”,子线程何时结束呢?最简单的情形就是子线程里面的内容执行完成后自动退出(前提是主线程还没有执行完成),另一种情形就是主线程执行完成了,子线程的内容还没有执行完,这时候子线程还是会退出,线程是在进程空间中的,主线程完成,意味着进程的退出,所以整个进程空间中的所有线程都将退出

下面这个例子中我们创建一个子线程,在子线程中我们不断读取系统当前时间,然后把这个时间写到文件里面。如果说主线程退出子线程还在执行的话,文件里面的内容肯定还是在写的。反之主线程退出,就说明子线程退出了。

#include <iostream>
#include <thread>
#include <fstream>
#include <ctime>
#include <cstdio>
using namespace std;

template<typename ... Args>
string string_format(const string& format, Args ... args){
    size_t size = 1 + snprintf(nullptr, 0, format.c_str(), args ...);  // Extra space for \0
    char bytes[size];
    snprintf(bytes, size, format.c_str(), args ...);
    return string(bytes);
}

void getTime(string &str){
    time_t i;
    struct tm *pTimeInfo;
    time(&i);
    pTimeInfo = localtime(&i);
    str = string_format("%02d-%02d-%02d %02d:%02d:%02d",
            pTimeInfo->tm_year + 1900, pTimeInfo->tm_mon + 1, pTimeInfo->tm_mday,
            pTimeInfo->tm_hour, pTimeInfo->tm_min, pTimeInfo->tm_sec);
}

//普通函数作为子线程的入口函数
void childThread(const char* str){
    ofstream f("child.txt");
    string strTime;
    for(;;){
        getTime(strTime);
        f << strTime << endl;
        this_thread::sleep_for(chrono::seconds(5));
    }
}

int main() {
    thread demo1(childThread, "hello!");

    //使用try catch,防止在join之前程序出现异常导致子线程没有join造成问题
    try 
    {
        for (size_t i = 0; i < 100; ++i) {
            this_thread::sleep_for(chrono::milliseconds(10));
            cout << "from main" << endl;
        }
    } catch (...) {
        demo1.detach();
        throw;
    }
    string strTime;
    getTime(strTime);
    cout << strTime << endl;
    demo1.join();
    return 0;
}

在创建了线程之后,还没有调用join或者detach,中间的内容可能会出现异常而退出,从而导致没有调用join或者detach, 所以可以使用try catch把中间的内容包起来,然后处理异常情况。(其实也可以设计一个类,在类的析构函数里面调用join或者detach, 类似lock_guard类。)

//使用try catch处理异常情况。
#include <iostream>
#include <thread>
using namespace std;

//普通函数作为子线程的入口函数
void childThread(const char* str){
    cout << "this is child thread, " << str << endl;
}

int main() {
    thread demo1(childThread, "hello!");

    //使用try catch,防止在join之前程序出现异常导致子线程没有join造成问题
    try {
        for (int i = 0; i < 100; ++i) {
            cout << "form main" << endl;
        }
    } catch (...) {
        demo1.join();
        throw;
    }
    demo1.join();
    return 0;
}

3. data race和mutex

data race就是在多个线程读写同一个数据对象的时候出现一种结果随机的现象。所以造成这个现象必须是在两个或者两个以上的线程存在的情况下。

下面用一个例子来说明一下,对于同一个银行账户,假设原先里面有金额为2000, 现在有两个终端对银行账户进行操作,一个终端向里面每次存1块钱,操作1000次,同时另外一个终端不断的从里面取钱,每次取1块钱,操作1000次。所以等他们执行完成后应该里面还剩下2000就是没有问题的。

#include <iostream>
#include <thread>
using namespace std;
int g_money = 2000;

void deposit(){
    for (int i = 0; i < 1000; ++i) {
        g_money += 1;
        cout<<"deposit: "<< g_money << endl;
    }
}

void withdraw(){
    for (int i = 0; i < 1000; ++i) {
        g_money -= 1;
        cout<<"withdraw: "<< g_money << endl;
    }
}

int main() {
    thread t1(deposit);
    thread t2(withdraw);
    t1.join(); //等待线程执行完成
    t2.join();
    cout<<"finally: "<< g_money<<endl;//运行到最后的结果是随机的,因为前面存在共享数据在两个线程中进行写操作
    return 0;
}

上面的例子中,我们每次运行的结果基本上很少出现一致,因为出现了data race。类似的问题还有很多,比如火车票的售票窗口的售票。为了解决data race,C++11的thread中引入了mutex(对于Windows系统或者Unix系统,提供了更多的锁来对各种不同的情况进行更为精细的处理)。C++11中的mutex的使用相当方便。下面例子中的注释了new的就是新添加的地方。经过修改,下面的代码无论多少次执行都是得到的确定的结果。

mutex的lock和unlock中包含的就是线程间共享读写的数据对象的操作,当一个线程lock成功,其他的线程就只能在lock的位置等待lock成功的那个线程操作完成后unlock。

#include <iostream>
#include <thread>
#include <mutex>                                     //new
using namespace std;
int g_money = 2000;

mutex g_mutex;                                       //new

void deposit(){
    for (int i = 0; i < 1000; ++i) {
        g_mutex.lock();                              //new
        g_money += 1;
        g_mutex.unlock();                            //new
        cout<<"deposit: "<< g_money << endl;
    }
}

void withdraw(){
    for (int i = 0; i < 1000; ++i) {
        g_mutex.lock();                             //new
        g_money -= 1;
        g_mutex.unlock();                           //new
        cout<<"withdraw: "<< g_money << endl;
    }
}

int main() {
    thread t1(deposit);
    thread t2(withdraw);
    t1.join(); //等待线程执行完成
    t2.join();
    cout<<"finally: "<< g_money<<endl;
    return 0;
}

lock之后操作完就必须要unlock, 那么如果一个线程lock之后执行的内容出现了异常导致线程退出,那么其他的线程就永远没有办法执行了?显然,这种调用lock和unlock的方式和前面我们说创建线程后join/detach之前异常退出一样存在风险,所以我们可以使用try catch进行包裹起来,但是这个写法不够优雅。

C++11为我们提供了一个lock_guard模板类,在这个类构造的时候调用lock,析构的时候调用unlock,这样在线程函数出现异常的退出的时候在lock_guard模板类中还是调用了unlock,这样就解决了这种异常导致其他线程死锁的情况发生。

#include <iostream>
#include <thread>
#include <mutex>
using namespace std;
int g_money = 2000;

mutex g_mutex;

void deposit(){
    for (int i = 0; i < 1000; ++i) {
        lock_guard<mutex> lg(g_mutex);
        g_money += 1;
        cout<<"deposit: "<< g_money << endl;
    }
}

void withdraw(){
    for (int i = 0; i < 1000; ++i) {
        lock_guard<mutex> lg(g_mutex);
        g_money -= 1;
        cout<<"withdraw: "<< g_money << endl;
    }
}

int main() {
    thread t1(deposit);
    thread t2(withdraw);
    t1.join(); //等待线程执行完成
    t2.join();
    cout<<"finally: "<< g_money<<endl;
    return 0;
}

4. 死锁

死锁是多个线程之前,使用多个锁的时候,线程之间分别获取了一些锁而等待其他线程释放某些锁,而其他线程又在等待当前线程获取的这些锁。这样就造成了线程之间的相互阻塞,这种现象就叫做死锁。

下面的例子是演示了死锁的出现。线程deposit中g_mutex1.lock()成功,如果这个时候withdraw正好g_mutex2.lock()成功,那么这两个线程就进入了相互等待的死锁状态了。哲学家就餐问题就是一个典型的死锁问题。

#include <iostream>
#include <thread>
#include <mutex>
using namespace std;
int g_money = 2000;

mutex g_mutex1, g_mutex2;

void deposit(){
    for (int i = 0; i < 1000; ++i) {
        g_mutex1.lock();
        g_mutex2.lock();
        g_money += 1;
        g_mutex2.unlock();
        g_mutex1.unlock();
        cout<<"deposit: "<< g_money << endl;
    }
}

void withdraw(){
    for (int i = 0; i < 1000; ++i) {
        g_mutex2.lock();
        g_mutex1.lock();
        g_money -= 1;
        g_mutex1.unlock();
        g_mutex2.unlock();
        cout<<"withdraw: "<< g_money << endl;
    }
}

int main() {
    thread t1(deposit);
    thread t2(withdraw);
    t1.join(); //等待线程执行完成
    t2.join();
    cout<<"finally: "<< g_money<<endl;
    return 0;
}

那么该如何解决上面出现的问题呢?

  1. 一个办法就是我们可以把这些锁的lock的顺序改成一致的,也就是说mutex1写在lock区间的最外层,然后才是mutex2的lock,这样就不会出现相互等待的情形了。
  2. 在实际的使用当中,我们lock的范围一定要最小化,对一个数据进行操作就不会出现两把锁的情况,自然也就不会出现死锁了。
  3. 使用C++11提供的lock函数,lock函数一次可以lock住多个锁,如果有一些lock失败,就会全部unlock。
#include <iostream>
#include <thread>
#include <mutex>
using namespace std;
int g_money = 2000;

mutex g_mutex1, g_mutex2;

void deposit(){
    for (int i = 0; i < 1000; ++i) {
        lock(g_mutex1, g_mutex2);
        g_money += 1;
        g_mutex2.unlock();
        g_mutex1.unlock();
        cout<<"deposit: "<< g_money << endl;
    }
}

void withdraw(){
    for (int i = 0; i < 1000; ++i) {
        lock(g_mutex1, g_mutex2);
        g_money -= 1;
        g_mutex1.unlock();
        g_mutex2.unlock();
        cout<<"withdraw: "<< g_money << endl;
    }
}

int main() {
    thread t1(deposit);
    thread t2(withdraw);
    t1.join(); //等待线程执行完成
    t2.join();
    cout<<"finally: "<< g_money<<endl;
    return 0;
}

上述的代码中我们又使用了lock, unlock。前面我们分析的使用lock_guard解决线程异常退出没有unlock的问题,这里我们还能使用lock_guard吗?好在lock_guard提供了一个adopt_lock参数,填入了这个参数之后在lock_guard的构造函数里面就不再去进行lock了,这个lock_guard就只用负责在析构的时候unlock了。那么下列的例子就是这样:

#include <iostream>
#include <thread>
#include <mutex>
using namespace std;
int g_money = 2000;

mutex g_mutex1, g_mutex2;

void deposit(){
    for (int i = 0; i < 1000; ++i) {
        lock(g_mutex1, g_mutex2);
        lock_guard<mutex> g1(g_mutex1, adopt_lock), g2(g_mutex2, adopt_lock);
        g_money += 1;
        cout<<"deposit: "<< g_money << endl;
    }
}

void withdraw(){
    for (int i = 0; i < 1000; ++i) {
        lock(g_mutex1, g_mutex2);
        lock_guard<mutex> g1(g_mutex1, adopt_lock), g2(g_mutex2, adopt_lock);
        g_money -= 1;
        cout<<"withdraw: "<< g_money << endl;
    }
}

int main() {
    thread t1(deposit);
    thread t2(withdraw);
    t1.join(); //等待线程执行完成
    t2.join();
    cout<<"finally: "<< g_money<<endl;
    return 0;
}

5. unique_lock和lazy_initialization

在解决data race和死锁的问题上,前面我们使用的是lock_guard。lock_guard使用起来非常简单,就是因为太过于简单所以给用户的灵活度非常有限。

互斥锁保证了线程间的同步,但是却将并行操作变成了串行操作,这对性能有很大的影响,所以我们要尽可能的减小锁定的区域,也就是使用细粒度锁

这一点lock_guard做的不好,不够灵活,lock_guard只能保证在析构的时候执行解锁操作,lock_guard本身并没有提供加锁和解锁的接口,但是有些时候会有这种需求。看下面的例子。

class LogFile {
    std::mutex _mu;
    ofstream f;
public:
    LogFile() {
        f.open("log.txt");
    }
    ~LogFile() {
        f.close();
    }
    void shared_print(string msg, int id) {
        {
            std::lock_guard<std::mutex> guard(_mu);
            //do something 1
        }
        //do something 2
        {
            std::lock_guard<std::mutex> guard(_mu);
            // do something 3
            f << msg << id << endl;
            cout << msg << id << endl;
        }
    }
 
};

上面的代码中,一个函数内部有两段代码需要进行保护,这个时候使用lock_guard就需要创建两个局部对象来管理同一个互斥锁(其实也可以只创建一个,但是锁的力度太大,效率不行),修改方法是使用unique_lock。它提供了lock()和unlock()接口,能记录现在处于上锁还是没上锁状态,在析构的时候,会根据当前状态来决定是否要进行解锁(lock_guard就一定会解锁)。上面的代码修改如下:

class LogFile {
    std::mutex _mu;
    ofstream f;
public:
    LogFile() {
        f.open("log.txt");
    }
    ~LogFile() {
        f.close();
    }
    void shared_print(string msg, int id) {
        std::unique_lock<std::mutex> guard(_mu);
        //do something 1
        guard.unlock(); //临时解锁
        //do something 2
        guard.lock(); //继续上锁
        // do something 3
        f << msg << id << endl;
        cout << msg << id << endl;
        // 结束时析构guard会临时解锁
        // 这句话可要可不要,不写,析构的时候也会自动执行
        // guard.ulock();
    }
 
};

上面的代码可以看到,在无需加锁的操作时,可以先临时释放锁,然后需要继续保护的时候,可以继续上锁,这样就无需重复的实例化lock_guard对象,还能减少锁的区域。同样,可以使用std::defer_lock设置初始化的时候不进行默认的上锁操作:

void shared_print(string msg, int id) {
    std::unique_lock<std::mutex> guard(_mu, std::defer_lock);
    //do something 1
 
    guard.lock();
    // do something protected
    guard.unlock(); //临时解锁
 
    //do something 2
 
    guard.lock(); //继续上锁
    // do something 3
    f << msg << id << endl;
    cout << msg << id << endl;
    // 结束时析构guard会临时解锁
}

这样使用起来就比lock_guard更加灵活!然后这也是有代价的,因为它内部需要维护锁的状态,所以效率要比lock_guard低一点,在lock_guard能解决问题的时候,就是用lock_guard,反之,使用unique_lock。

后面在学习条件变量的时候,还会有unique_lock的用武之地。

另外,请注意,unique_lock和lock_guard都不能复制,lock_guard不能移动,但是unique_lock可以!

// unique_lock 可以移动,不能复制
std::unique_lock<std::mutex> guard1(_mu);
std::unique_lock<std::mutex> guard2 = guard1;  // error
std::unique_lock<std::mutex> guard2 = std::move(guard1); // ok
 
// lock_guard 不能移动,不能复制
std::lock_guard<std::mutex> guard1(_mu);
std::lock_guard<std::mutex> guard2 = guard1;  // error
std::lock_guard<std::mutex> guard2 = std::move(guard1); // error

上面的例子中,Client 端总是在初始化的时候构造一个LogFile对象,在构造函数里面需要进行open file操作。试想这样的情况,如果在程序运行过程中一直都不调用shared_print那么我们开始的进行的open file的这个操作岂不是很多余吗?特别是在有大量的初始化构造的对象的时候,这对性能有不小的影响。所以就出现了lazy_initialization的概念,lazy_initialization就是在使用到它的时候才去创建它。就像下面的代码那样:

class LogFile {
    std::mutex _mu;
    ofstream f;
public:
    LogFile() {}
    ~LogFile() {
        f.close();
    }
    void shared_print(string msg, int id) {
        if(!f.isopen()){
            f.open("log.txt");
        }
        std::lock_guard<std::mutex> guard(_mu);
        f << msg << id << endl;
        cout << msg << id << endl;
    }
};

上述的代码很明显暴漏了一个问题,在多线程的情形下,f.isopen()可能会调用多次。使用The Double-Checked Locking Pattern 来解决这个问题:

class LogFile {
    std::mutex _mu, _mu2;
    ofstream f;
public:
    LogFile() {}
    ~LogFile() {
        f.close();
    }
    void shared_print(string msg, int id) {
        if(!f.isopen()){
        	std::lock_guard<std::mutex> guard1(_mu2);
        	if(!f.isopen()){
        		f.open("log.txt");
        	}
        }
        std::lock_guard<std::mutex> guard(_mu);
        f << msg << id << endl;
        cout << msg << id << endl;
    }
};

但是即使是这样也是存在有风险的,论文C++ and the Perils of Double-Checked Locking就这个问题进行了分析。

然而在C++11中引入了std::call_once对这个情况进行了更为优雅的处理方式:

class LogFile {
    std::mutex _mu;
    std::once_flag m_fopenOnce;
    ofstream f;
public:
    LogFile() {}
    ~LogFile() {
        f.close();
    }
    void shared_print(string msg, int id) {
        std::call_once(m_fopenOnce,[&](){m_file.open("log.txt");});
        std::lock_guard<std::mutex> guard(_mu);
        f << msg << id << endl;
    }
};

6. 条件变量

首先还是从下面的一个小程序开始看起:

#include <iostream>
#include <thread>
#include <mutex>
#include <deque>
using namespace std;

mutex g_mutex;
deque<int> q;

void function1(){
    int count = 10;
    while(count > 0){
        unique_lock<mutex> locker(g_mutex);
        q.push_front(count);
        locker.unlock();
        std::this_thread::sleep_for(chrono::second(1));
        count--;
    }
}

void function2(){
    int data = 0;
    while(data != 1){
        unique_lock<mutex> locker(g_mutex);
        if(!q.empty()){
            data = q.back();
            q.pop_back();
            locker.unlock();

            count<<"t2 get data;"<< data <<endl;
        }
        else{
            locker.unlock();
        }

    }
}

int main() {
    thread t1(function1);
    thread t2(function2);
    t1.join(); //等待线程执行完成
    t2.join();
    return 0;
}

上面的程序中有两个线程,一个向队列中添加数据,另一个则从队列中取出数据。添加数据的线程中有一个sleep的操作,这导致了取数据的线程一直在循环检查队列中是否有数据。这种空轮循的方式是不太好的,占用cpu cycle但是也没有做实际的工作。一个小小的trick就是在读取数据线程函数中如果没有获取到数据就进行短暂的sleep, 但这个sleep的时间是比较难控制的,时间太短还是和没有sleep强不到哪里去,如果sleep的时间太长的话那么队列中的数据又会积压的太多。这时候条件变量就派上用场了。

#include <iostream>
#include <thread>
#include <mutex>
#include <deque>
using namespace std;

mutex g_mutex;
condition_variable cond;
deque<int> q;

void function1(){
    int count = 10;
    while(count > 0){
        unique_lock<mutex> locker(g_mutex);
        q.push_front(count);
        locker.unlock();
        cond.notify_one();
        std::this_thread::sleep_for(chrono::second(1));
        count--;
    }
}

void function2(){
    int data = 0;
    while(data != 1){
        unique_lock<mutex> locker(g_mutex);
        cond.wait(locker, [](){return !q.empty();}); //spurious wake 
        data = q.back();
        q.pop_back();
        locker.unlock();
    }
}

int main() {
    thread t1(function1);
    thread t2(function2);
    t1.join(); //等待线程执行完成
    t2.join();
    return 0;
}

7. Future & Promise

8.