环形无锁队列-版本1
环形无锁队列,某些情况下又叫做 ring buffer
在底层代码的消息收发过程中一般收和发 也就是读写分开处理的,即一个读线程,一个写线程,这种类似于逻辑上的读写2线程操作,情况下的线程安全可以用环形队列来处理,达到无锁目的。这种队列在多个读写线程下,并不能保证线程安全,只适用于一个读一个写。
end=write start = read
基本思想是read write index下标表示当前读写index,2个线程只做自己的事,环形队列满后吧数据缓存在std::queue里面,然后为了保证一致性,因此在push里面对std::queue里面的内容刷新到buff里,这意味着有一个以下问题:
2个线程分别操作自己的push or pop front ,out_cache是当环形队列buff满后,存储数据的地方,因此out_cache里的数据是push dump到buff中,这样就存在一个bug,就是push次数一定要大于pop次数 才能保证队列里面的数据被被动 写入buff里面,因此只要out_cache里面有数据,始终有数据是未处理状态,尤其是程序退出时。。。
这种处理手法虽然没有了mutex,效率较高,但是带来了一个新问题,就是由于没有了mutex condition等机制,导致线程很容易就空转(CPU空跑)了,比如在队列是空的情况下read线程会满载做无用功,为了尽量利用起来,可以吧一些活交给他们去做,比如encode decode等操作,或者加上Sleep,当然加了sleep在极端情况下就没什么意义了(无锁带来的好处就不是好处了)。。。
在这里,write和read相同时,即表示满也表示空(他们都表示下一个写入读取的index),在这个时候他们只想的buff是2个线程共享的,如果在这里是保证了安全,那么整个队列就是安全的,因为其他情况下read和write根本没有交集,当他们都在读写同一个index的时候,read线程先取走,再把标识置为false,write线程是先把内容写入完后再置true,这个读写和置false true的顺序一定不能乱否则就会标记位准备好了 但是数据没有准备好的BUG, 。。,在这里要反思一下为什么在这里的使用情况下这种操作是安全的,传统多线程读多线程写是不安全的,在传统下的不安全,指的是读到了旧数据,或者同时都在写入数据容易导致数据错误,或者写到一半就被读走了等,在这里的应用都是一个线程读一个线程写,通过bool的控制,避免上述同时写入读出的情况(mov对bool赋值操作是不是原子的无所谓,因为在爆满或者空情况下读写才会有先后顺序,其余情况下都可以同时读写,上述数据写入和标识bool的先后操作保证的,他们不能对bool交叉写不同的数据,read只能写false,write只能写true)。
内存安全性:如果接受无限写入,极端情况下容易导致out_cache爆满。。。。
以下代码中包含了自检代码,注释的verify,,,如果运行中自检不过,程序会退出,反之就是安全性上是没有BUG
#include <fstream> #include <iostream> #include <vector> #include <thread> #include <mutex> #include <iostream> #include <windows.h> #include <atomic> #include <memory> #include <queue> using namespace std; struct Data { int id = 3; }; using QueuePair11 = std::pair<volatile std::atomic<bool>, void*>; using QueuePair = std::pair<volatile bool, void*>; #define MAX_SIZE 1024 #define MAX_OUT_CACHE_SIZE 1024000 class Queue { public: void* front() { if (buff[read].first) { return buff[read].second; } return nullptr; } void pop() { buff[read].first = false; read = ++read %MAX_SIZE; } bool push(void* value) { bool hasDumpFull = false; hasDumpFull = dump(); if (hasDumpFull == false && buff[write].first == false) { buff[write].second = value; buff[write].first = true; write = ++write % MAX_SIZE; } else { if (out_cache.size() > MAX_OUT_CACHE_SIZE)return false; out_cache.push(value); } return true; } //return is dump full ? bool dump() { while (out_cache.empty() == false) { if (buff[write].first) {// 写满为止 return true; } auto &v = out_cache.front(); out_cache.pop(); buff[write].second = v; buff[write].first = true; write = ++write%MAX_SIZE; } return false; } QueuePair buff[MAX_SIZE]; int read = 0; int write = 0; std::queue<void*> out_cache; }; int main(int argc, char* argv[]) { Queue _queue; /*int ss = 1024; int t = 1020; cout << (t&(ss - 1)); */ std::thread t_write([&]() { for (int i = 0;; i++) { if (i > 0xffff) { i = 0; } auto v = new Data(); (v->id) = i; while (!_queue.push(v)){ Sleep(0); }; } }); t_write.detach(); std::thread t_read([&]() { int last = -1; while (true) { auto v = (Data*)_queue.front(); if (v == nullptr) { continue; } cout << v->id << endl; if (v->id >= 0xffff) { last = -1; v->id = 0xffffff; delete v; _queue.pop(); continue;; } if (*(int*)v - last != 1) {// verify exit(0); } last = v->id; v->id = 0xffffff; delete v; _queue.pop(); } }); t_read.detach(); while (true) { Sleep(1); } system("pause"); return 0; }
实测自检两个多小时程序无退出,没测出BUG。
在实现上述的时候一个BUG导致的思考,上述代码的dump换成以下这个版本会出下下图这个诡异的BUG。
bool dump33() { bool hasWrite = false; while (out_cache.empty() == false) { if (buff[write].first) { return hasWrite; } auto & v = out_cache.front(); out_cache.pop(); hasWrite = true; buff[write].second = v; buff[write].first = true; write = ++write%MAX_SIZE; } return hasWrite; }
![](https://static.oschina.net/uploads/space/2017/1007/230056_kfMZ_1391394.png)
这个BUG表象是顺序错了,但是相对顺序没错,即48170按照顺序的话到48179刚好是48179.。
原因是当返回false的时候表示队列满,无法写入,并且dump操作未写入,这个时候会触发push的第二个条件判定即 && buff[write].first == false , 由于当前write指向下一个,某些情况下会写入成功(刚好在这个时候被read走了 被置为false) 那么当前数据会被立刻写入buff,而不是队列,导致这样的BUG顺序。。因为正常情况下新push的数据 肯定是在out_cache里面所有数据之后,而在这个BUG代码处理下会被提前写入。。