1. 程式人生 > >無鎖佇列--基於linuxkfifo實現

無鎖佇列--基於linuxkfifo實現

一直想寫個無鎖的佇列,來提高專案後臺的效率。

偶然看到linux核心的kfifo.h 實現原理。於是自己仿照了這個實現,目前linux應該是可以對外提供介面了。

#ifndef _NO_LOCK_QUEUE_H_
#define _NO_LOCK_QUEUE_H_

#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <string>
#include <pthread.h>
#include <iostream>
using namespace  std;

#ifndef max
#define max(x, y) ({				\
	typeof(x) _max1 = (x);			\
	typeof(y) _max2 = (y);			\
	(void) (&_max1 == &_max2);		\
	_max1 > _max2 ? _max1 : _max2; })
#endif

#ifndef min
#define min(x, y) ({				\
	typeof(x) _min1 = (x);			\
	typeof(y) _min2 = (y);			\
	(void) (&_min1 == &_min2);		\
	_min1 < _min2 ? _min1 : _min2; })
#endif

class Kfifo 
{
public:
	Kfifo(unsigned int  isize);
	~Kfifo();
	unsigned int get(unsigned char *buffer, unsigned int len);
	unsigned int put(const unsigned char *buffer, unsigned int len);
	static unsigned long roundup_power_of_two(unsigned long val);
private:
	inline bool is_power_of_2(unsigned long n)
	{
		return (n != 0 && ((n & (n - 1)) == 0)); 
	};
	inline unsigned int unused()
	{
		return (mask + 1) - (in - out);
	}
	
private:
	unsigned int size;
	unsigned int in; 
	unsigned int out;
	unsigned int	mask;
	unsigned char* buffer;
};

struct proto
{
	unsigned int msgid;
	unsigned int cmd;
	unsigned int info;
	proto():msgid(0),cmd(0),info(0){}
};

#endif

實現檔案
#include "MKfifo.h"

Kfifo::~Kfifo()
{
	if (buffer) free(buffer);	
	size = in = out=0;
}
unsigned long Kfifo::roundup_power_of_two(unsigned long val)
{
	if (val & (val-1) == 0)
	{
		return val;
	}
	unsigned long maxulong = (unsigned long )((unsigned long ) ~0);
	unsigned long andv = ~(maxulong&(maxulong>>1));
	while((andv & val) == 0)
	andv = andv>>1;
	return andv<<1;
}
Kfifo::Kfifo(unsigned int isize):size(isize),in(0),out(0),mask(size - 1)
{
	if (!is_power_of_2(isize))
	{
		size = roundup_power_of_two(isize);
	}
	buffer =(unsigned char*) malloc(isize);
}

unsigned int Kfifo::get(unsigned char *_buffer, unsigned int len)
{
	unsigned int l;
	len = min(len, in - out);
	__sync_synchronize();

	l = min(len,size -(out&(size-1)));
	memcpy(_buffer,buffer + (out& (size-1)),l);
	memcpy(_buffer + l,buffer,len - l);

	 __sync_synchronize();

	 out +=len;
	 return len;

}
unsigned int Kfifo::put(const unsigned char *_buffer, unsigned int len)
{
	unsigned int l;
	len = min(len, size - in + out);
	 __sync_synchronize();
	 l = min(len, size - (in & (size - 1)));
	 memcpy(buffer + (in & (size - 1)), _buffer, l);
	 memcpy(buffer, _buffer + l, len - l);
	  __sync_synchronize();
	   in += len;
	   return len;

}

void * consumer(void * arg)
{
	printf("consumer\n");
	Kfifo* fifo = (Kfifo*) arg;
	if (!fifo)
	{
		return NULL;
	}

	for (;;)
	{
		proto p;
		unsigned int len = fifo->get((unsigned char*)&p,sizeof(p));
		if (len>0)
		{
			cout << "~~~~~~~~~~~~~~~~~~~~"<<endl;
			cout << "consumer proto msg id :"<<p.msgid<<endl;
			cout << "consumer proto msg cmd :"<<p.cmd<<endl;
			cout << "consumer proto msg info :"<<p.info<<endl;
			cout << "~~~~~~~~~~~~~~~~~~~~"<<endl;
		}
	}

	
	return (void *)fifo;
}
void* producer(void* args)
{
	Kfifo* fifo = (Kfifo*) args;
	
	if (!fifo)
	{
		return NULL;
	}
	unsigned int i=0;
	for (;;)
	{
		proto p;
		p.msgid = i++;
		p.cmd =  333;
		p.info = 44444;
		fifo->put((const unsigned char*)&p,sizeof(p));
		cout<<"producer put msgid :"<<p.msgid<<endl;
	}
	return (void*)fifo;
}
int main()
{
	Kfifo *fifo = new Kfifo(1024);
	pthread_t consumeid,producerid;
	pthread_create(&producerid,NULL,producer,(void*)fifo);
	pthread_create(&consumeid,NULL,consumer,(void*)fifo);

	printf("info!!\n");
	pthread_join(consumeid,NULL);
	pthread_join(producerid,NULL);
	return 0;
}

經過測試是可用的,我將在該佇列的基礎上,豐富其使用。


相關推薦

佇列--基於linuxkfifo實現

一直想寫個無鎖的佇列,來提高專案後臺的效率。 偶然看到linux核心的kfifo.h 實現原理。於是自己仿照了這個實現,目前linux應該是可以對外提供介面了。 #ifndef _NO_LOCK_QUEUE_H_ #define _NO_LOCK_QUEUE_H_ #i

Go語言佇列元件的實現 (chan/interface/select)

1. 背景 go程式碼中要實現非同步很簡單,go funcName()。 但是程序需要控制協程數量在合理範圍內,對應大批量任務可以使用“協程池 + 無鎖佇列”實現。 2. golang無鎖佇列實現思路 Channel是Go中的一個核心型別,你可以把它看成一個管道,通過它併發核心單元就可以傳送或者接

基於陣列的佇列(譯)

原文 1 引言 最近對於注重效能的應用程式,我們有了一種能顯著提高程式效能的選擇:多執行緒.執行緒的概念實際上已經存在了很長時間.在過去,多數計算機只有一個處理器,執行緒主要用於將一個大的任務拆分成一系列更小的執行單元.以使得當其中某些執行單元因為等待資源而被阻塞的時候剩餘的執行單元能繼續執行。舉個示例,一個

lockFreeQueue 佇列實現與總結

無鎖佇列 介紹   在工程上,為了解決兩個處理器互動速度不一致的問題,我們使用佇列作為快取,生產者將資料放入佇列,消費者從佇列中取出資料。這個時候就會出現四種情況,單生產者單消費者,多生產者單消費者,單生成者多消費者,多生產者多消費者。我們知道,多執行緒往往會帶來資料不一致的情況,一般需要靠加鎖解決問題。

多執行緒佇列實現

一、什麼是多執行緒無鎖佇列? 多執行緒無鎖佇列還是有鎖的,只不過是用了cpu層面的CAS原子操作,用到這個操作,只需要在取佇列元素和新增佇列元素的時候利用CAS原子操作,就可以保證多個執行緒對佇列元素的有序存取; 二、什麼是CAS操作? CAS = Compare &am

一個用 C++ 實現的快速佇列

在程序間傳遞資料很煩人,真心煩人。一步做錯,資料就會損壞。(相較於其他讀寫方式)即使資料排列正確,也更易出錯。 一如既往的,有兩種方式處理這個問題:簡單的方式、麻煩的方式。 簡單的方式 使用你使用平臺提供的鎖(互斥、臨界區域,或等效)。這從概念上不難理解,使用上更簡單。你無需擔心排列問題,庫

佇列的原理與實現

最近幾天在思考無鎖佇列,看了相關文章,也讀了一些部落格,最後寫了一份程式碼,程式碼實現對在多執行緒環境下對佇列的讀和寫是不需要加鎖的。程式碼如下所示: #include <windows.h> #pragma comment(lib, "Kernel32.li

一讀一寫佇列c++實現

限制一個執行緒讀,一個執行緒寫,不加鎖的佇列,使用單鏈表實現,測試環境:centos 5.9  [[email protected] test]# cat  test.cpp     #include <iostream> #include <

單生產者,單消費者佇列實現(c)

根據上面連結所說的原理實現的單生產者,單消費者無鎖佇列 bool __sync_bool_compare_and_swap (type *ptr, type oldval,type newval, ...) 函式提供原子的比較和交換,如果*ptr == oldval

佇列實現

耗子叔曾經寫過一篇同名的部落格,主要參考了John D. Valois 1994年10月在拉斯維加斯的並行和分散式系統國際大會上的一篇論文——《Implementing Lock-Free Queues》。但是從目前現狀來看,這篇論文中提到的演算法是有問題的,並

簡單佇列實現和使用

無鎖佇列越來越流行,在特定的場合使用不同的無鎖佇列,可以起到節省鎖開銷,提高程式效率。 Linux核心中有無鎖佇列的實現,可謂簡潔而不簡單。核心判斷部分利用了整數溢位機制,這個有很多文章專門介紹,我們就不詳細講了。 裡面註釋很詳細,直接來kfifo的原始碼,大家看原始碼註釋

【DPDK】【ring】從DPDK的ring來看x86佇列實現

【前言】   佇列是眾多資料結構中最常見的一種之一。曾經有人和我說過這麼一句話,叫做“程式等於資料結構+演算法”。因此在設計模組、寫程式碼時,佇列常常作為一個很常見的結構出現在模組設計中。DPDK不僅是一個加速網路IO的框架,其內部還提供眾多的功能元件,rte_ring就是DPDK內部提供的一種無鎖佇列,本篇

隊列的實現

fun del isempty data 測試 ++ 末尾 class pop 鎖是高性能程序的殺手,但是為了保證數據的一致性,在多線程的應用環境下又不得不加鎖。但是在某些特殊的場景下, 是可以通過優化數據結構來達到無鎖的目的。那麽我們就來看一下如何實現一個無鎖隊列。 隊

Disruptor佇列淺析

    近期在看作業系統相關資料的時候,閱讀到“訊號量與PV操作”,主要分三塊:互斥控制,同步控制,生產者與消費者問題。因為我日常與伺服器及訊息佇列打交道較多,對生產者與消費者問題比較感興趣,正好之前曾經研究過“Disruptor無鎖佇列”的實現原理,正好再結合PV操作重新

Boost佇列

在開發接收轉發agent時,採用了多執行緒的生產者-消費者模式,用了加互斥鎖的方式來實現執行緒同步。互斥鎖會阻塞執行緒,所以壓測時,效率並不高。所以想起用無鎖佇列來實現,效能確實提升了。 首先介紹下lock-free和wait-free的區別: 阻塞演算法可

深入理解dpdk rte_ring佇列

同樣用面向物件的思想來理解無鎖佇列ring。dpdk的無鎖佇列ring是借鑑了linux核心kfifo無鎖佇列。ring的實質是FIFO的環形佇列。 ring的特點: 無鎖出入隊(除了cas(compare and swap)操作)多消費/生產者同時出入隊 使用方法: 1.建立一個ring物件。 介面:s

c語言資料結構應用-陣列佇列佇列)在多執行緒中的使用

一、背景 上篇文章《c語言資料結構實現-陣列佇列/環形佇列》講述了陣列佇列的原理與實現,本文編寫一個雙執行緒進行速度測試 二、相關知識 多執行緒程式設計介面: 1) 建立執行緒 pthread_create 函式 SYNOPSIS #include <

evpp效能測試(3): 對佇列boost::lockfree::queue和moodycamel::ConcurrentQueue做一個性能對比測試

Brief 我們使用https://github.com/Qihoo360/evpp專案中的EventLoop::QueueInLoop(...)函式來做這個效能測試。我們通過該函式能夠將一個仿函式執行體從一個執行緒排程到另一個執行緒中執行。這是一個典

併發佇列學習(單生產者單消費者模型)

1、引言 本文介紹單生產者單消費者模型的佇列。根據寫入佇列的內容是定長還是變長,分為單生產者單消費者定長佇列和單生產者單消費者變長佇列兩種。單生產者單消費者模型的佇列操作過程是不需要進行加鎖的。生產者通過寫索引控制入隊操作,消費者通過讀索引控制出佇列操作。二者

C++程式設計資料,佇列

1. Lamport's Lock-Free Ring Buffer        [Lamport, Comm. of ACM, 1977]      也就常說的單生產者-單消費者 的ringbuffer, 限制就是隻能一個讀執行緒(消費者),一個寫程序(生產者)。