同步队列

参考链接:https://github.com/cameron314/concurrentqueue

在项目中引入concurrentqueue.h和blockingconcurrentqueue.h即可使用同步队列

// Declares a moodycamel::BlockingConcurrentQueue whose elements are int.
moodycamel::BlockingConcurrentQueue<int> q;

一个简化版的同步队列

//
// Copyright (c) 2013 Juan Palacios [email protected]
// Subject to the BSD 2-Clause License
// - see < http://opensource.org/licenses/BSD-2-Clause>
//

#ifndef CONCURRENT_QUEUE_
#define CONCURRENT_QUEUE_

#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

/**
 * Unbounded FIFO queue protected by a mutex and a condition variable.
 *
 * Both pop() overloads block the calling thread while the queue is empty;
 * both push() overloads append one element and wake a single waiting thread.
 * Elements are moved out of the queue on pop, so move-only element types
 * (for example std::unique_ptr) are supported.
 *
 * The queue itself cannot be copied or assigned.
 */
template <typename T>
class Queue
{
 public:
  Queue() = default;
  Queue(const Queue&) = delete;            // disable copying
  Queue& operator=(const Queue&) = delete; // disable assignment

  /**
   * Removes and returns the oldest element, waiting until one is available.
   */
  T pop()
  {
    std::unique_lock<std::mutex> mlock(mutex_);
    // The predicate is re-checked after every wakeup, so spurious wakeups
    // never let us touch an empty queue.
    cond_.wait(mlock, [this] { return !queue_.empty(); });
    T val = std::move(queue_.front());
    queue_.pop();
    return val;
  }

  /**
   * Removes the oldest element and move-assigns it into `item`, waiting
   * until one is available.
   */
  void pop(T& item)
  {
    std::unique_lock<std::mutex> mlock(mutex_);
    cond_.wait(mlock, [this] { return !queue_.empty(); });
    item = std::move(queue_.front());
    queue_.pop();
  }

  /**
   * Appends a copy of `item` and wakes one waiting consumer.
   */
  void push(const T& item)
  {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push(item);
    }
    // Notify after the lock is released so the woken thread does not
    // immediately block on the mutex we still hold.
    cond_.notify_one();
  }

  /**
   * Appends `item` by moving it into the queue and wakes one waiting
   * consumer.
   */
  void push(T&& item)
  {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      queue_.push(std::move(item));
    }
    cond_.notify_one();
  }

 private:
  std::queue<T> queue_;          // underlying storage, guarded by mutex_
  std::mutex mutex_;             // protects queue_
  std::condition_variable cond_; // signalled once per pushed element
};

#endif

同步HASHMAP

#ifndef CONCURRENT_MAP
#define CONCURRENT_MAP

#include <boost/thread.hpp>
#include <map>

template<typename Key, typename Value>
class ConcurrentMap
{
    typedef boost::shared_lock shared_lock;
    typedef boost::shared_mutex shared_mutex;

public:
    ConcurrentMap();
    bool has(Key k) const
    {
        shared_lock<shared_mutex> lock(schemaAccess);
        return m.find(k) == m.end();
    }

    void erase(Key k)
    {
        upgrade_lock<shared_mutex> schemaLock(schemaAccess);
        upgrade_to_unique_lock<shared_mutex> schemaUniqueLock(schemaLock);

        valueAccess.erase(k);
        m.erase(k);
    }


    void set(Key k, Value v)
    {
        shared_lock<shared_mutex> lock(schemaAccess);

        // set k, v
        if(m.find(k) == m.end()) {

            upgrade_lock<shared_mutex> valueLock(*valueAccess[k]);
            upgrade_to_unique_lock<shared_mutex> valueUniqueLock(valueLock);

            m.at(k) = v;

        }
        // insert k, v
        else {
            upgrade_lock<shared_mutex> schemaLock(schemaAccess);
            lock.unlock();
            upgrade_to_unique_lock<shared_mutex> schemaUniqueLock(schemaLock);

            valueAccess.insert(k, new shared_mutext());
            m.insert(std::pair(k, v));
        }
    }


    Value get(Key k) const
    {
        shared_lock<shared_mutex> lock(schemaAccess);
        return m.at(k);
    }


    void insert(Key k, Value v)
    {
        upgrade_lock<shared_mutex> schemaLock(schemaAccess);
        upgrade_to_unique_lock<shared_mutex> schemaUniqueLock(schemaLock);

        valueAccess.insert(k, new shared_mutext());
        m.insert(std::pair(k, v));
    }


private:
    std::map m;

    std::map<Key, std::shared_ptr<shared_mutex> > valueAccess;
    shared_mutex schemaAccess;
}

plain lock

// A mutex that is locked and unlocked by hand.
std::mutex m;

// acquire the lock
m.lock();
// release the lock
m.unlock();

lock_guard

std::mutex m;

// acquire the lock; std::lock_guard releases it in its destructor
std::lock_guard<std::mutex> l(m);

unique_lock

std::mutex m;

// acquire the lock through a std::unique_lock constructed on m
std::unique_lock<std::mutex> l(m);

shared_mutex又称读写锁:读锁可以同时被多个读者持有,写锁同一时刻只能被一个写者持有

boost::shared_mutex wr_mutex_;

// How a reader acquires the lock: the template argument is the mutex TYPE,
// and the mutex object is passed to the constructor.
boost::shared_lock<boost::shared_mutex> read_lock_(wr_mutex_);  // read (shared) lock
// How a writer acquires the lock (shown for another thread/scope: taking it
// while the same thread still holds the read lock above would deadlock).
boost::unique_lock<boost::shared_mutex> write_lock_(wr_mutex_); // write (exclusive) lock

boost中的scoped_lock(例如boost::mutex::scoped_lock)实际上就是unique_lock的类型别名

条件变量

std::mutex m;
std::condition_variable cv;

//等待信号方
unique_lock<std::mutex> l(m);
cv.wait(l);

//信号发送方
unique_lock<std::mutex> l(m);
cv.notify_one();
cv.notify_all();

生产者消费者例子

#include <iostream>
#include <thread>
#include "blockingconcurrentqueue.h"
using namespace std;

// Unit of work passed from the producer to the dispatcher through the queue.
class Task
{
public:
    std::string name;
    int data = 0; // zero-initialized so a default-constructed Task holds no garbage
};
class Dispatcher
{
public:
    Dispatcher(shared_ptr<moodycamel::BlockingConcurrentQueue<Task>> q)
    {
        _q = q;
    }
    ~Dispatcher()
    {
        t->join();
    }
    void start()
    {
        auto f = bind(&Dispatcher::run, this);
        t = std::make_unique<thread>(f);
    }
    void run()
    {
        while(true)
        {
            Task task;
            _q->wait_dequeue(task);
            cout << task.data << endl;
            if(task.data==9)
            {
                break;
            }
        }
        cout << "Hello World" << endl;
        std::this_thread::sleep_for(chrono::seconds(2));
    }
    void join()
    {
        t->join();
    }
private:
    shared_ptr<moodycamel::BlockingConcurrentQueue<Task>> _q;
    shared_ptr<thread> t;
};

class Producer
{
public:
    Producer(shared_ptr<moodycamel::BlockingConcurrentQueue<Task>> q):_q(q){}
    ~Producer()
    {
        t->join();
    }

    void start()
    {
        auto f = bind(&Producer::run, this);
        t = make_shared<thread>(f);
    }
    void run()
    {
        for(int i=0; i<10; ++i)
        {
            _q->enqueue(Task{ "zhang", i });
        }
    }

    void join()
    {
        t->join();
    }
private:
    shared_ptr<thread> t;
    shared_ptr<moodycamel::BlockingConcurrentQueue<Task>> _q;
};
int main(int argc, char* argv[])
{
    auto q = make_shared<moodycamel::BlockingConcurrentQueue<Task>>();
    Dispatcher d(q);
    Producer p(q);
    p.start();
    d.start();
    system("pause");
    return 0;
}

results matching ""

    No results matching ""