
- 生产者消费者模型
- 多线程加锁
- 多线程化单线程加锁方式
- 读写锁解决线程安全
- 条件变量解决线程安全
- 信号量解决线程安全
- 生产者负责生产对象
object,加入容器; - 消费者负责从容器中拿到
object,消费;
一个简单单线程生产者消费者模型(C++):
#include
#include
using namespace std;
// Define
class Object{
private:
uint32_t ID_;
public:
void consume() {
cout << "ID:" << ID_ << " consume." << endl;
}
Object(uint32_t ID) : ID_(ID) {}
uint32_t getID() {return ID_;}
};
class ComsumerAndProductor{
private:
vector<Object*> pool_;
mutex mtx_;
public:
// product
void add(Object* obj) {
pool_.push_back(obj);
cout << "product object, ID:" << obj->getID() << endl;
}
// consume
void pop() {
if (!pool_.empty()) {
Object* obj = pool_.back();
pool_.pop_back();
obj->consume();
delete obj;
}
}
};
ComsumerAndProductor cap;
void productThreadFunc() {
uint32_t i = 0;
while (true) {
uint32_t ID = i++;
Object* obj = new Object(ID);
cap.add(obj);
}
}
void consumeThreadFunc() {
while (true) {
cap.pop();
}
}
通过调用ComsumerAndProductor的add方法生产,pop方法消费。
显然,在多线程前提下,这是线程不安全的模型,当多个线程消费的同时生产者也在添加,会造成严重的错误。
多线程加锁 多线程化单线程加锁方式首先我们尝试如下加锁方式:
#include
#include
#include
#include
using namespace std;
// Define
class Object{
private:
uint32_t ID_;
public:
void consume() {
cout << "ID:" << ID_ << " consume." << endl;
}
Object(uint32_t ID) : ID_(ID) {}
uint32_t getID() {return ID_;}
};
class ComsumerAndProductor{
private:
vector<Object*> pool_;
mutex mtx_;
public:
// product
void add(Object* obj) {
lock_guard<mutex> lk(mtx_);
pool_.push_back(obj);
cout << "product object, ID:" << obj->getID() << endl;
}
// consume
void pop() {
lock_guard<mutex> lk(mtx_);
if (!pool_.empty()) {
Object* obj = pool_.back();
pool_.pop_back();
obj->consume();
delete obj;
}
}
};
ComsumerAndProductor cap;
void productThreadFunc() {
uint32_t i = 0;
while (true) {
uint32_t ID = i++;
Object* obj = new Object(ID);
cap.add(obj);
}
}
void consumeThreadFunc() {
while (true) {
cap.pop();
}
}
int main(int argc, char** argv) {
vector<thread> consumeThreads;
vector<thread> productThreads;
for (int i = 0; i < 3; ++i) {
consumeThreads.push_back(thread(consumeThreadFunc));
}
for (int i = 0; i < 1; ++i) {
productThreads.push_back(thread(productThreadFunc));
}
for (auto &t : consumeThreads) {
t.join();
}
for (auto &t : productThreads) {
t.join();
}
return 0;
}
在pop和add的开头处增加lock_guard,这样的确可以让程序正常运行,但是,当某个线程生产或者消费时,其余所有线程处于等待拿锁的状态,这实际上让多线程再次转化为了单线程,因此这种加锁方式在消费时没有体现多线程的作用。但生产者-消费者模型的初衷只是解决线程安全问题,因此这种写法也是合理的。
读写锁解决线程安全相比于mutex功能,lock_guard具有创建时加锁,析构时解锁的功能,类似于智能指针,为了防止在线程使用mutex加锁后异常退出导致死锁的问题,建议使用lock_guard代替mutex。
C++17引入了shared_mutex,在读线程使用shared_lock,所有读线程共享这把锁,都能往下继续执行。在写线程,使用std::unique_lock独占,写线程只有一个能往下执行,且读线程此时阻塞。可能我们很容易写出如下代码,但值得注意的是,这份代码依旧不安全:
#include
#include
#include
#include
#include
using namespace std;
// Define
class Object{
private:
uint32_t ID_;
public:
void consume() {
cout << "ID:" << ID_ << " consume." << endl;
}
Object(uint32_t ID) : ID_(ID) {}
uint32_t getID() {return ID_;}
};
class ComsumerAndProductor{
private:
vector<Object*> pool_;
shared_mutex smutex_;
public:
// product
void add(Object* obj) {
std::unique_lock<shared_mutex> lk(smutex_);
pool_.push_back(obj);
cout << "product object, ID:" << obj->getID() << endl;
}
// consume
void pop() {
shared_lock<shared_mutex> lk(smutex_);
if (!pool_.empty()) {
obj = pool_.back();
pool_.pop_back();
obj->consume();
}
delete obj;
}
};
ComsumerAndProductor cap;
void productThreadFunc() {
uint32_t i = 0;
while (true) {
uint32_t ID = i++;
Object* obj = new Object(ID);
cap.add(obj);
}
}
void consumeThreadFunc() {
while (true) {
cap.pop();
}
}
int main(int argc, char** argv) {
vector<thread> consumeThreads;
vector<thread> productThreads;
for (int i = 0; i < 3; ++i) {
consumeThreads.push_back(thread(consumeThreadFunc));
}
for (int i = 0; i < 1; ++i) {
productThreads.push_back(thread(productThreadFunc));
}
for (auto &t : consumeThreads) {
t.join();
}
for (auto &t : productThreads) {
t.join();
}
return 0;
}
在pop中,由于共享锁机制,可能多个消费线程进入该函数,假设当前消费池有2个待消费object,此时三个线程均检查出消费池不为空,取出消费池一个object,当第三个线程准备取出时,此时消费池已经为空,则出现错误。一种解决方案是在取出object的时候加独占锁(消费在独占锁外):
#include
#include
#include
#include
#include
using namespace std;
// Define
class Object{
private:
uint32_t ID_;
public:
void consume() {
cout << "ID:" << ID_ << " consume." << endl;
}
Object(uint32_t ID) : ID_(ID) {}
uint32_t getID() {return ID_;}
};
class ComsumerAndProductor{
private:
vector<Object*> pool_;
shared_mutex smutex_;
mutex emutex_;
public:
// product
void add(Object* obj) {
std::unique_lock<shared_mutex> lk(smutex_);
pool_.push_back(obj);
cout << "product object, ID:" << obj->getID() << endl;
}
// consume
void pop() {
shared_lock<shared_mutex> lk(smutex_);
Object* obj = nullptr;
{
lock_guard<mutex> lg(emutex_); // 独占锁保护取object的过程
if (!pool_.empty()) {
obj = pool_.back();
pool_.pop_back();
}
}
if (obj) obj->consume();
delete obj;
}
};
ComsumerAndProductor cap;
void productThreadFunc() {
uint32_t i = 0;
while (true) {
uint32_t ID = i++;
Object* obj = new Object(ID);
cap.add(obj);
}
}
void consumeThreadFunc() {
while (true) {
cap.pop();
}
}
int main(int argc, char** argv) {
vector<thread> consumeThreads;
vector<thread> productThreads;
for (int i = 0; i < 3; ++i) {
consumeThreads.push_back(thread(consumeThreadFunc));
}
for (int i = 0; i < 1; ++i) {
productThreads.push_back(thread(productThreadFunc));
}
for (auto &t : consumeThreads) {
t.join();
}
for (auto &t : productThreads) {
t.join();
}
return 0;
}
这份代码是线程安全的。同时值得注意的是,这份代码支持多个线程同时消费object,而不是某一时刻只有一个线程消费。提升了部分效率。
条件变量解决线程安全unique_lock相较于lock_guard更加灵活,其基本功能相同,但是unique_lock可以手动unlock,而lock_guard只能在析构的时候unlock。
用条件变量还是属于单线程消费,使用条件变量有许多值得注意的地方,其中消费端一定要把wait()放在while条件循环中,这可以避免虚假唤醒。
#include
#include
#include
#include
#include
using namespace std;
// Define
class Object{
private:
uint32_t ID_;
public:
void consume() {
cout << "ID:" << ID_ << " consume." << endl;
}
Object(uint32_t ID) : ID_(ID) {}
uint32_t getID() {return ID_;}
};
class ComsumerAndProductor{
private:
vector<Object*> pool_;
condition_variable condConsumer_;
mutex mtx_;
public:
// case 1: condition variable
// product
void add(Object* obj) {
unique_lock<mutex> lk(mtx_);
pool_.push_back(obj);
cout << "product object, ID:" << obj->getID() << endl;
lk.unlock();
condConsumer_.notify_one();
}
// consume
void pop() {
unique_lock<mutex> lk(mtx_);
while (pool_.empty()) {
condConsumer_.wait(lk);
}
Object* obj = pool_.back();
pool_.pop_back();
obj->consume();
delete obj;
}
};
ComsumerAndProductor cap;
void productThreadFunc() {
uint32_t i = 0;
while (true) {
uint32_t ID = i++;
Object* obj = new Object(ID);
cap.add(obj);
}
}
void consumeThreadFunc() {
while (true) {
cap.pop();
}
}
int main(int argc, char** argv) {
vector<thread> consumeThreads;
vector<thread> productThreads;
for (int i = 0; i < 3; ++i) {
consumeThreads.push_back(thread(consumeThreadFunc));
}
for (int i = 0; i < 2; ++i) {
productThreads.push_back(thread(productThreadFunc));
}
for (auto &t : consumeThreads) {
t.join();
}
for (auto &t : productThreads) {
t.join();
}
return 0;
}
条件变量的使用注意事项:
wait端:
lock(mutex);
while (queue.empty()) {
cond.wait();
}
x = queue.pop();
unlock(mutex);
signal/broadcast端:
lock(mutex);
queue.push_back(x);
unlock(mutex);
cond.notify();
虚假唤醒分析:
在本文这个生产者-消费者例子中,如果把pop中的while换成if,线程A B C一开始wait住,此时线程D生产一个对象,唤醒B(假设B竞争成功),B拿到锁开始消费逻辑,消费完毕释放锁,此时线程D再次生产一个对象,唤醒A(假设A C竞争A成功),A需要加锁,在还没加上锁时,B再次拿到锁进入if判断,发现此时消费队列不为空,直接取出对象消费,然后释放锁,之后A拿到锁,往下执行,但是消费队列原来给它消费的对象被线程B“窃取”了,再次取对象时出现错误。
如果把if换成while,在A拿到锁的时候会再次判断while中队列是否为空,此时由于被窃取对象,队列为空,因此又会执行wait(),再次进入等待。这样就解决了虚假唤醒的现象。
一种简单得用信号量替换互斥锁的方案:
#include
#include
#include
#include
using namespace std;
// Define
class Object{
private:
uint32_t ID_;
public:
void consume() {
cout << "ID:" << ID_ << " consume." << endl;
}
Object(uint32_t ID) : ID_(ID) {}
uint32_t getID() {return ID_;}
};
class ComsumerAndProductor{
private:
vector<Object*> pool_;
sem_t sem_;
public:
ComsumerAndProductor(){
sem_init(&sem_, 0, 1);
}
// case 1: semaphore
// product
void add(Object* obj) {
sem_wait(&sem_);
pool_.push_back(obj);
cout << "product object, ID:" << obj->getID() << endl;
sem_post(&sem_);
}
// consume
void pop() {
sem_wait(&sem_);
if (!pool_.empty()){
Object* obj = pool_.back();
pool_.pop_back();
obj->consume();
delete obj;
}
sem_post(&sem_);
}
};
ComsumerAndProductor cap;
void productThreadFunc() {
uint32_t i = 0;
while (true) {
uint32_t ID = i++;
Object* obj = new Object(ID);
cap.add(obj);
}
}
void consumeThreadFunc() {
while (true) {
cap.pop();
}
}
int main(int argc, char** argv) {
vector<thread> consumeThreads;
vector<thread> productThreads;
for (int i = 0; i < 3; ++i) {
consumeThreads.push_back(thread(consumeThreadFunc));
}
for (int i = 0; i < 2; ++i) {
productThreads.push_back(thread(productThreadFunc));
}
for (auto &t : consumeThreads) {
t.join();
}
for (auto &t : productThreads) {
t.join();
}
return 0;
}
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)