C++生产者消费者模型以及多种线程安全实现方法示例(互斥锁、读写锁、条件变量..)

C++生产者消费者模型以及多种线程安全实现方法示例(互斥锁、读写锁、条件变量..),第1张

Contents
  • 生产者消费者模型
  • 多线程加锁
    • 多线程化单线程加锁方式
    • 读写锁解决线程安全
    • 条件变量解决线程安全
    • 信号量解决线程安全

生产者消费者模型
  1. 生产者负责生产对象 object,加入容器;
  2. 消费者负责从容器中拿到 object,消费;

一个简单单线程生产者消费者模型(C++):

#include 
#include 
using namespace std;

// Define
class Object{
private:
    uint32_t ID_;
public:
    void consume() {
        cout << "ID:" << ID_ << " consume." << endl;
    }
    Object(uint32_t ID) : ID_(ID) {}
    uint32_t getID() {return ID_;}
};

class ComsumerAndProductor{

private:
    vector<Object*> pool_;
    mutex mtx_;

public:
    // product 
    void add(Object* obj) {
        pool_.push_back(obj);
        cout << "product object, ID:" << obj->getID() << endl;
    }
    // consume
    void pop() {
        if (!pool_.empty()) {
            Object* obj = pool_.back();
            pool_.pop_back();
            obj->consume();
            delete obj;
        }
    }
};

ComsumerAndProductor cap;

void productThreadFunc() {
    uint32_t i = 0;
    while (true) {
        uint32_t ID = i++; 
        Object* obj = new Object(ID);
        cap.add(obj);
    }
}

void consumeThreadFunc() {
    while (true) {
        cap.pop();
    }
}

通过调用ComsumerAndProductoradd方法生产,pop方法消费。

显然,在多线程前提下,这是线程不安全的模型,当多个线程消费的同时生产者也在添加,会造成严重的错误。

多线程加锁 多线程化单线程加锁方式

首先我们尝试如下加锁方式:

#include 
#include 
#include 
#include 
using namespace std;

// Define
class Object{
private:
    uint32_t ID_;
public:
    void consume() {
        cout << "ID:" << ID_ << " consume." << endl;
    }
    Object(uint32_t ID) : ID_(ID) {}
    uint32_t getID() {return ID_;}
};

class ComsumerAndProductor{

private:
    vector<Object*> pool_;
    mutex mtx_;

public:
    // product 
    void add(Object* obj) {
        lock_guard<mutex> lk(mtx_);
        pool_.push_back(obj);
        cout << "product object, ID:" << obj->getID() << endl;
    }
    // consume
    void pop() {
        lock_guard<mutex> lk(mtx_);
        if (!pool_.empty()) {
            Object* obj = pool_.back();
            pool_.pop_back();
            obj->consume();
            delete obj;
        }
    }
};

ComsumerAndProductor cap;

void productThreadFunc() {
    uint32_t i = 0;
    while (true) {
        uint32_t ID = i++; 
        Object* obj = new Object(ID);
        cap.add(obj);
    }
}

void consumeThreadFunc() {
    while (true) {
        cap.pop();
    }
}

int main(int argc, char** argv) {
    vector<thread> consumeThreads;
    vector<thread> productThreads;
    for (int i = 0; i < 3; ++i) {
        consumeThreads.push_back(thread(consumeThreadFunc));
    }
    for (int i = 0; i < 1; ++i) {
        productThreads.push_back(thread(productThreadFunc));
    }
    for (auto &t : consumeThreads) {
        t.join();
    }
    for (auto &t : productThreads) {
        t.join();
    }
    return 0;
}

popadd的开头处增加lock_guard lk(mtx_);,这样的确可以让程序正常运行,但是,当某个线程生产或者消费时,其余所有线程处于等待拿锁的状态,这实际上让多线程再次转化为了单线程,因此这种加锁方式在消费时没有体现多线程的作用。但生产者-消费者模型的初衷只是解决线程安全问题,因此这种写法也是合理的。

相比于mutex功能,lock_guard具有创建时加锁,析构时解锁的功能,类似于智能指针,为了防止在线程使用mutex加锁后异常退出导致死锁的问题,建议使用lock_guard代替mutex。

读写锁解决线程安全

C++17引入了shared_mutex,在读线程使用shared_lock lk(smutex_);,所有读线程共享这把锁,都能往下继续执行。在写线程,使用std::unique_lock lk(smutex_);独占,写线程只有一个能往下执行,且读线程此时阻塞。可能我们很容易写出如下代码,但值得注意的是,这份代码依旧不安全:

#include 
#include 
#include 
#include 
#include 
using namespace std;

// Define
class Object{
private:
    uint32_t ID_;
public:
    void consume() {
        cout << "ID:" << ID_ << " consume." << endl;
    }
    Object(uint32_t ID) : ID_(ID) {}
    uint32_t getID() {return ID_;}
};

class ComsumerAndProductor{

private:
    vector<Object*> pool_;
    shared_mutex smutex_;
public:
    // product 
    void add(Object* obj) {
        std::unique_lock<shared_mutex> lk(smutex_);
        pool_.push_back(obj);
        cout << "product object, ID:" << obj->getID() << endl;
    }
    // consume
    void pop() {
        shared_lock<shared_mutex> lk(smutex_);
        if (!pool_.empty()) {   
            obj = pool_.back();
            pool_.pop_back();
            obj->consume();
        }
        delete obj;
    }
};

ComsumerAndProductor cap;

void productThreadFunc() {
    uint32_t i = 0;
    while (true) {
        uint32_t ID = i++; 
        Object* obj = new Object(ID);
        cap.add(obj);
    }
}

void consumeThreadFunc() {
    while (true) {
        cap.pop();
    }
}

int main(int argc, char** argv) {
    vector<thread> consumeThreads;
    vector<thread> productThreads;
    for (int i = 0; i < 3; ++i) {
        consumeThreads.push_back(thread(consumeThreadFunc));
    }
    for (int i = 0; i < 1; ++i) {
        productThreads.push_back(thread(productThreadFunc));
    }
    for (auto &t : consumeThreads) {
        t.join();
    }
    for (auto &t : productThreads) {
        t.join();
    }
    return 0;
}

pop中,由于共享锁机制,可能多个消费线程进入该函数,假设当前消费池有2个待消费object,此时三个线程均检查出消费池不为空,取出消费池一个object,当第三个线程准备取出时,此时消费池已经为空,则出现错误。一种解决方案是在取出object的时候加独占锁(消费在独占锁外):

#include 
#include 
#include 
#include 
#include 
using namespace std;

// Define
class Object{
private:
    uint32_t ID_;
public:
    void consume() {
        cout << "ID:" << ID_ << " consume." << endl;
    }
    Object(uint32_t ID) : ID_(ID) {}
    uint32_t getID() {return ID_;}
};

class ComsumerAndProductor{

private:
    vector<Object*> pool_;
    shared_mutex smutex_;
    mutex emutex_;
public:
    // product 
    void add(Object* obj) {
        std::unique_lock<shared_mutex> lk(smutex_);
        pool_.push_back(obj);
        cout << "product object, ID:" << obj->getID() << endl;
    }
    // consume
    void pop() {
        shared_lock<shared_mutex> lk(smutex_);
        Object* obj = nullptr;
        {
            lock_guard<mutex> lg(emutex_); // 独占锁保护取object的过程
            if (!pool_.empty()) {   
                obj = pool_.back();
                pool_.pop_back();
            }
        }
        if (obj) obj->consume();
        delete obj;
    }
};

ComsumerAndProductor cap;

void productThreadFunc() {
    uint32_t i = 0;
    while (true) {
        uint32_t ID = i++; 
        Object* obj = new Object(ID);
        cap.add(obj);
    }
}

void consumeThreadFunc() {
    while (true) {
        cap.pop();
    }
}

int main(int argc, char** argv) {
    vector<thread> consumeThreads;
    vector<thread> productThreads;
    for (int i = 0; i < 3; ++i) {
        consumeThreads.push_back(thread(consumeThreadFunc));
    }
    for (int i = 0; i < 1; ++i) {
        productThreads.push_back(thread(productThreadFunc));
    }
    for (auto &t : consumeThreads) {
        t.join();
    }
    for (auto &t : productThreads) {
        t.join();
    }
    return 0;
}

这份代码是线程安全的。同时值得注意的是,这份代码支持多个线程同时消费object,而不是某一时刻只有一个线程消费。提升了部分效率。

unique_lock相较于lock_guard更加灵活,其基本功能相同,但是unique_lock可以手动unlock,而lock_guard只能在析构的时候unlock。

条件变量解决线程安全

用条件变量还是属于单线程消费,使用条件变量有许多值得注意的地方,其中消费端一定要把wait()放在while条件循环中,这可以避免虚假唤醒。

#include 
#include 
#include 
#include 
#include 
using namespace std;

// Define
class Object{
private:
    uint32_t ID_;
public:
    void consume() {
        cout << "ID:" << ID_ << " consume." << endl;
    }
    Object(uint32_t ID) : ID_(ID) {}
    uint32_t getID() {return ID_;}
};

class ComsumerAndProductor{

private:
    vector<Object*> pool_;
    condition_variable condConsumer_;
    mutex mtx_;
    
public:
    // case 1: condition variable 
    // product 
    void add(Object* obj) {
        unique_lock<mutex> lk(mtx_);
        pool_.push_back(obj);
        cout << "product object, ID:" << obj->getID() << endl;
        lk.unlock();
        condConsumer_.notify_one();
    }
    // consume
    void pop() {
        unique_lock<mutex> lk(mtx_);
        while (pool_.empty()) {
            condConsumer_.wait(lk);
        }
        Object* obj = pool_.back();
        pool_.pop_back();   
        obj->consume();
        delete obj;
    }
};

ComsumerAndProductor cap;

void productThreadFunc() {
    uint32_t i = 0;
    while (true) {
        uint32_t ID = i++; 
        Object* obj = new Object(ID);
        cap.add(obj);
    }
}

void consumeThreadFunc() {
    while (true) {
        cap.pop();
    }
}

int main(int argc, char** argv) {
    vector<thread> consumeThreads;
    vector<thread> productThreads;
    for (int i = 0; i < 3; ++i) {
        consumeThreads.push_back(thread(consumeThreadFunc));
    }
    for (int i = 0; i < 2; ++i) {
        productThreads.push_back(thread(productThreadFunc));
    }
    for (auto &t : consumeThreads) {
        t.join();
    }
    for (auto &t : productThreads) {
        t.join();
    }
    return 0;
}

条件变量的使用注意事项:

wait端:

lock(mutex);
while (queue.empty()) {
	cond.wait();
}
x = queue.pop();
unlock(mutex);

signal/broadcast端:

lock(mutex);
queue.push_back(x);
unlock(mutex);
cond.notify();

虚假唤醒分析:

在本文这个生产者-消费者例子中,如果把pop中的while换成if,线程A B C一开始wait住,此时线程D生产一个对象,唤醒B(假设B竞争成功),B拿到锁开始消费逻辑,消费完毕释放锁,此时线程D再次生产一个对象,唤醒A(假设A C竞争A成功),A需要加锁,在还没加上锁时,B再次拿到锁进入if判断,发现此时消费队列不为空,直接取出对象消费,然后释放锁,之后A拿到锁,往下执行,但是消费队列原来给它消费的对象被线程B“窃取”了,再次取对象时出现错误。

如果把if换成while,在A拿到锁的时候会再次判断while中队列是否为空,此时由于被窃取对象,队列为空,因此又会执行wait(),再次进入等待。这样就解决了虚假唤醒的现象。

信号量解决线程安全

一种简单得用信号量替换互斥锁的方案:

#include 
#include 
#include 
#include 
using namespace std;

// Define
class Object{
private:
    uint32_t ID_;
public:
    void consume() {
        cout << "ID:" << ID_ << " consume." << endl;
    }
    Object(uint32_t ID) : ID_(ID) {}
    uint32_t getID() {return ID_;}
};

class ComsumerAndProductor{

private:
    vector<Object*> pool_;
    sem_t sem_;
    
public:
    ComsumerAndProductor(){
        sem_init(&sem_, 0, 1);
    }
    // case 1: semaphore
    // product 
    void add(Object* obj) {
        sem_wait(&sem_);
        pool_.push_back(obj);
        cout << "product object, ID:" << obj->getID() << endl;
        sem_post(&sem_);
    }
    // consume
    void pop() {
        sem_wait(&sem_);
        if (!pool_.empty()){
            Object* obj = pool_.back();
            pool_.pop_back();   
            obj->consume();
            delete obj;
        }
        sem_post(&sem_);
    }
};

ComsumerAndProductor cap;

void productThreadFunc() {
    uint32_t i = 0;
    while (true) {
        uint32_t ID = i++; 
        Object* obj = new Object(ID);
        cap.add(obj);
    }
}

void consumeThreadFunc() {
    while (true) {
        cap.pop();
    }
}

int main(int argc, char** argv) {
    vector<thread> consumeThreads;
    vector<thread> productThreads;
    for (int i = 0; i < 3; ++i) {
        consumeThreads.push_back(thread(consumeThreadFunc));
    }
    for (int i = 0; i < 2; ++i) {
        productThreads.push_back(thread(productThreadFunc));
    }
    for (auto &t : consumeThreads) {
        t.join();
    }
    for (auto &t : productThreads) {
        t.join();
    }
    return 0;
}

欢迎分享,转载请注明来源:内存溢出

原文地址:https://www.54852.com/langs/3002163.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2022-09-27
下一篇2022-09-27

发表评论

登录后才能评论

评论列表(0条)

    保存