同步队列
参考链接:https://github.com/cameron314/concurrentqueue
在项目引入concurrentqueue.h和blockingconcurrentqueue.h即可使用同步队列
moodycamel::BlockingConcurrentQueue<int> q;
一个简化版的同步队列
//
// Copyright (c) 2013 Juan Palacios [email protected]
// Subject to the BSD 2-Clause License
// - see < http://opensource.org/licenses/BSD-2-Clause>
//
#ifndef CONCURRENT_QUEUE_
#define CONCURRENT_QUEUE_
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
/// Minimal unbounded FIFO queue that can be shared between threads.
///
/// Every operation takes the internal mutex. The two pop() overloads block
/// until an element is available; push() never blocks on capacity.
/// Elements are moved out of the queue on pop and can be moved in on push,
/// so move-only types (for example std::unique_ptr) are supported.
///
/// @tparam T element type; must be move-constructible and move-assignable.
template <typename T>
class Queue
{
public:
    Queue() = default;
    Queue(const Queue&) = delete;            // disable copying
    Queue& operator=(const Queue&) = delete; // disable assignment

    /// Blocks until the queue is non-empty, then removes and returns the
    /// oldest element.
    T pop()
    {
        std::unique_lock<std::mutex> mlock(mutex_);
        // The predicate form re-checks emptiness after every wakeup, which
        // covers spurious wakeups as well as another consumer winning the race.
        cond_.wait(mlock, [this] { return !queue_.empty(); });
        T val = std::move(queue_.front());
        queue_.pop();
        return val;
    }

    /// Blocks until the queue is non-empty, then moves the oldest element
    /// into @p item.
    void pop(T& item)
    {
        std::unique_lock<std::mutex> mlock(mutex_);
        cond_.wait(mlock, [this] { return !queue_.empty(); });
        item = std::move(queue_.front());
        queue_.pop();
    }

    /// Appends a copy of @p item and wakes one waiting consumer.
    void push(const T& item)
    {
        {
            std::lock_guard<std::mutex> mlock(mutex_);
            queue_.push(item);
        }
        // Notify after releasing the mutex so the woken consumer does not
        // immediately block on it again.
        cond_.notify_one();
    }

    /// Appends @p item by moving it into the queue and wakes one waiting
    /// consumer. Selected for rvalues; required for move-only element types.
    void push(T&& item)
    {
        {
            std::lock_guard<std::mutex> mlock(mutex_);
            queue_.push(std::move(item));
        }
        cond_.notify_one();
    }

private:
    std::queue<T> queue_;          // guarded by mutex_
    std::mutex mutex_;
    std::condition_variable cond_; // signalled once per push
};
#endif
同步MAP(下面的实现基于std::map,是有序映射而不是哈希表)
#ifndef CONCURRENT_MAP
#define CONCURRENT_MAP
#include <boost/thread.hpp>
#include <map>
#include <memory>

/// Ordered map that can be shared between threads.
///
/// Two levels of reader/writer locking are used:
///  - schemaAccess guards the *structure* of the map (which keys exist).
///    Lookups hold it shared; insert/erase hold it exclusively.
///  - valueAccess holds one reader/writer lock per key, guarding that key's
///    value. This lets set() overwrite an existing value while only holding
///    the schema lock in shared mode, so writers to different keys do not
///    serialise on each other.
///
/// @tparam Key   key type, ordered with operator<.
/// @tparam Value mapped type; must be copyable.
template<typename Key, typename Value>
class ConcurrentMap
{
    typedef boost::shared_mutex shared_mutex;
    typedef boost::shared_lock<shared_mutex> shared_lock;
    typedef boost::unique_lock<shared_mutex> unique_lock;
public:
    ConcurrentMap() = default;

    /// Returns true when @p k is present in the map.
    bool has(Key k) const
    {
        shared_lock schemaLock(schemaAccess);
        return m.find(k) != m.end();
    }

    /// Removes @p k and its per-key lock; does nothing if @p k is absent.
    void erase(Key k)
    {
        unique_lock schemaLock(schemaAccess);
        valueAccess.erase(k);
        m.erase(k);
    }

    /// Stores @p v under @p k, overwriting an existing value or inserting a
    /// new entry.
    void set(Key k, Value v)
    {
        {
            // Fast path: the key exists, so only its own value lock has to be
            // taken exclusively.
            shared_lock schemaLock(schemaAccess);
            typename std::map<Key, Value>::iterator it = m.find(k);
            if (it != m.end()) {
                unique_lock valueLock(*valueAccess.at(k));
                it->second = v;
                return;
            }
        }
        // Slow path: the shared lock is fully released before the exclusive
        // one is requested. Asking for exclusive access while still holding
        // shared access deadlocks as soon as two threads do it at once.
        unique_lock schemaLock(schemaAccess);
        typename std::map<Key, Value>::iterator it = m.find(k);
        if (it != m.end()) {
            // Another thread inserted the key in the gap between the locks.
            it->second = v;
        } else {
            insertLocked(k, v);
        }
    }

    /// Returns a copy of the value stored under @p k.
    /// @throws std::out_of_range if @p k is not present.
    Value get(Key k) const
    {
        shared_lock schemaLock(schemaAccess);
        const Value& stored = m.at(k);
        // Hold the per-key lock while copying so a concurrent set() on the
        // same key cannot modify the value mid-copy.
        shared_lock valueLock(*valueAccess.at(k));
        return stored;
    }

    /// Inserts @p v under @p k. Like std::map::insert, an existing entry is
    /// left untouched.
    void insert(Key k, Value v)
    {
        unique_lock schemaLock(schemaAccess);
        insertLocked(k, v);
    }

private:
    /// Adds a new entry plus its per-key lock. The caller must hold
    /// schemaAccess exclusively.
    void insertLocked(const Key& k, const Value& v)
    {
        if (m.find(k) != m.end()) {
            return;
        }
        // The lock entry is created first: if copying the value throws, the
        // only leftover is an unused lock, which erase() or a later insert
        // of the same key replaces.
        valueAccess[k] = std::make_shared<shared_mutex>();
        m.emplace(k, v);
    }

    std::map<Key, Value> m;
    std::map<Key, std::shared_ptr<shared_mutex> > valueAccess;
    mutable shared_mutex schemaAccess; // mutable: locked from const readers
};
#endif // CONCURRENT_MAP
锁
plain lock
// Plain lock: the mutex is locked and unlocked by explicit calls.
std::mutex m;
// Acquire the lock
m.lock();
// Release the lock
m.unlock();
lock_guard
std::mutex m;
// Acquire the lock; std::lock_guard releases it when `l` goes out of scope
std::lock_guard<std::mutex> l(m);
unique_lock
std::mutex m;
// std::unique_lock locks m on construction and unlocks it on destruction
std::unique_lock<std::mutex> l(m);
shared_mutex又称读写锁:读写锁可以同时被多个读者持有,但同一时刻只能被一个写者持有
boost::shared_mutex wr_mutex_;
// Reader side: the lock template takes the mutex TYPE as its argument and the
// mutex OBJECT as its constructor argument. Several readers may hold it at once.
boost::shared_lock<boost::shared_mutex> read_lock_(wr_mutex_);  // read (shared) lock
// Writer side: exclusive ownership. The two locks are shown together only for
// comparison; a thread that already holds the read lock must release it
// before taking the write lock.
boost::unique_lock<boost::shared_mutex> write_lock_(wr_mutex_); // write (exclusive) lock
scoped_lock(boost::mutex::scoped_lock)实际上是boost::unique_lock<boost::mutex>的typedef
条件变量
std::mutex m;
std::condition_variable cv;
//等待信号方
unique_lock<std::mutex> l(m);
cv.wait(l);
//信号发送方
unique_lock<std::mutex> l(m);
cv.notify_one();
cv.notify_all();
生产者消费者例子
#include <iostream>
#include <thread>
#include "blockingconcurrentqueue.h"
using namespace std;
/// Unit of work passed from Producer to Dispatcher through the queue.
/// Remains an aggregate, so `Task{ "zhang", i }` keeps working (C++14 and later).
class Task
{
public:
    std::string name;
    int data = 0; // initialised so a default-constructed Task never holds an indeterminate value
};
class Dispatcher
{
public:
Dispatcher(shared_ptr<moodycamel::BlockingConcurrentQueue<Task>> q)
{
_q = q;
}
~Dispatcher()
{
t->join();
}
void start()
{
auto f = bind(&Dispatcher::run, this);
t = std::make_unique<thread>(f);
}
void run()
{
while(true)
{
Task task;
_q->wait_dequeue(task);
cout << task.data << endl;
if(task.data==9)
{
break;
}
}
cout << "Hello World" << endl;
std::this_thread::sleep_for(chrono::seconds(2));
}
void join()
{
t->join();
}
private:
shared_ptr<moodycamel::BlockingConcurrentQueue<Task>> _q;
shared_ptr<thread> t;
};
class Producer
{
public:
Producer(shared_ptr<moodycamel::BlockingConcurrentQueue<Task>> q):_q(q){}
~Producer()
{
t->join();
}
void start()
{
auto f = bind(&Producer::run, this);
t = make_shared<thread>(f);
}
void run()
{
for(int i=0; i<10; ++i)
{
_q->enqueue(Task{ "zhang", i });
}
}
void join()
{
t->join();
}
private:
shared_ptr<thread> t;
shared_ptr<moodycamel::BlockingConcurrentQueue<Task>> _q;
};
// Entry point of the producer/consumer example: one queue is shared by a
// Dispatcher (consumer) and a Producer, each running on its own thread.
int main(int argc, char* argv[])
{
// The queue is held through a shared_ptr so both workers can keep a reference to it.
auto q = make_shared<moodycamel::BlockingConcurrentQueue<Task>>();
// d is declared before p, so p is destroyed first when main returns;
// both destructors join their worker thread.
Dispatcher d(q);
Producer p(q);
// The producer is started before the consumer.
p.start();
d.start();
// Runs the shell command "pause" (a Windows console command; presumably used
// to keep the console window open until a key is pressed).
system("pause");
return 0;
}