1. 程式人生 > >簡單無鎖佇列的實現和使用

簡單無鎖佇列的實現和使用

無鎖佇列越來越流行,在特定的場合使用不同的無鎖佇列,可以起到節省鎖開銷,提高程式效率。

Linux核心中有無鎖佇列的實現,可謂簡潔而不簡單。核心判斷部分利用了整數溢位機制,這個有很多文章專門介紹,我們就不詳細講了。

裡面註釋很詳細,直接來kfifo的原始碼,大家看原始碼註釋應該就可以理解了。原始碼是linux上實現的,為了跨平臺,增加了其他平臺下的實現。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define __u32 unsigned long
#define __u64 unsigned long long

#if defined(__GNUC__)
#define min(x,y) ({ 		\
        typeof(x) _x = (x); \
        typeof(y) _y = (y);	\
        (void) (&_x == &_y);\
        _x < _y ? _x : _y; })

#define max(x,y) ({ 		\
        typeof(x) _x = (x);	\
        typeof(y) _y = (y);	\
        (void) (&_x == &_y);\
        _x > _y ? _x : _y; })

#else
#define max(a,b)            (((a) > (b)) ? (a) : (b))
#define min(a,b)            (((a) < (b)) ? (a) : (b))
#endif

#define MAX_KFIFO_SIZE 0x1000

struct kfifo { 
    unsigned char *buffer; /* the buffer holding the data */ 
    unsigned int size; /* the size of the allocated buffer */ 
    unsigned int in; /* data is added at offset (in % size) */ 
    unsigned int out; /* data is extracted from off. (out % size) */ 
};

/**
 * fls - find last bit set
 * @x: the word to search
 *
 * This is defined the same way as ffs:
 * - return 32..1 to indicate bit 31..0 most significant bit set
 * - return 0 to indicate no bits set
 */
#if defined(__GNUC__)
static inline int fls(int x)
{
    int r;

    __asm__("bsrl %1,%0\n\t"
            "jnz 1f\n\t"
            "movl $-1,%0\n"
            "1:" : "=r" (r) : "rm" (x));
    return r+1;
}
#else
static inline int fls(int x)
{
	int position;
	int i;
	if(0 != x)
	{
		for (i = (x >> 1), position = 0; i != 0; ++position)
			i >>= 1;
	}
	else
	{
		position = -1;
	}		
	return position+1;
}
#endif
/**
 * fls64 - find last bit set in a 64-bit value
 * @n: the value to search
 *
 * This is defined the same way as ffs:
 * - return 64..1 to indicate bit 63..0 most significant bit set
 * - return 0 to indicate no bits set
 */
static inline int fls64(__u64 x)
{
    __u32 h = x >> 32;
    if (h)
        return fls(h) + 32;
    return fls(x);
}

static inline unsigned fls_long(unsigned long l)
{
    if (sizeof(l) == 4)
        return fls(l);
    return fls64(l);
}

static inline unsigned long roundup_pow_of_two(unsigned long x)
{
    return 1UL << fls_long(x - 1);
}

/**
 * * kfifo_alloc - allocates a new FIFO and its internal buffer
 * * @size: the size of the internal buffer to be allocated.
 * * @gfp_mask: get_free_pages mask, passed to kmalloc()
 * * @lock: the lock to be used to protect the fifo buffer
 * *
 * * The size will be rounded-up to a power of 2.
 * */
struct kfifo *kfifo_alloc(unsigned int size)   
{   
    unsigned char *buffer;   
    struct kfifo *fifo;   
  
    /*  
     *       * round up to the next power of 2, since our 'let the indices  
     *            * wrap' tachnique works only in this case.  
     *                 */   
    if (size & (size - 1)) {   
            if(size > 0x80000000);
				return NULL;
            size = roundup_pow_of_two(size);            
        }
  
    buffer = (unsigned char *)malloc(size);   
    if (!buffer)   
        return NULL;   
  
    fifo = (struct kfifo*)malloc(sizeof(struct kfifo));   
  
    if (!fifo)   
    {
         free(buffer);
         return NULL;
    }

    fifo->buffer = buffer;
    fifo->size = size;
    fifo->in = fifo->out = 0;
  
    return fifo;   
} 

/**
 * * kfifo_free - frees the FIFO
 * * @fifo: the fifo to be freed.
 * */
void kfifo_free(struct kfifo *fifo)
{
    free(fifo->buffer);
    free(fifo);
}

/**
* __kfifo_put - puts some data into the FIFO, no locking version
* @fifo: the fifo to be used.
* @buffer: the data to be added.
* @len: the length of the data to be added.
*
* This function copies at most @len bytes from the @buffer into
* the FIFO depending on the free space, and returns the number of
* bytes copied.
*
* Note that with only one concurrent reader and one concurrent
* writer, you don't need extra locking to use these functions.
*/
unsigned int __kfifo_put(struct kfifo *fifo,
                        const unsigned char *buffer, unsigned int len)
{
        unsigned int l;

        len = min(len, fifo->size - fifo->in + fifo->out);

        /* first put the data starting from fifo->in to buffer end */
        l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
        memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);

        /* then put the rest (if any) at the beginning of the buffer */
        memcpy(fifo->buffer, buffer + l, len - l);

        fifo->in += len;

        return len;
}

/**
* __kfifo_get - gets some data from the FIFO, no locking version
* @fifo: the fifo to be used.
* @buffer: where the data must be copied.
* @len: the size of the destination buffer.
*
* This function copies at most @len bytes from the FIFO into the
* @buffer and returns the number of copied bytes.
*
* Note that with only one concurrent reader and one concurrent
* writer, you don't need extra locking to use these functions.
*/
unsigned int __kfifo_get(struct kfifo *fifo,
                         unsigned char *buffer, unsigned int len)
{
        unsigned int l;

        len = min(len, fifo->in - fifo->out);

        /* first get the data from fifo->out until the end of the buffer */
        l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
        memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);

        /* then get the rest (if any) from the beginning of the buffer */
        memcpy(buffer + l, fifo->buffer, len - l);

        fifo->out += len;

        return len;
}

/**
* __kfifo_reset - removes the entire FIFO contents, no locking version
* @fifo: the fifo to be emptied.
*/
static inline void __kfifo_reset(struct kfifo *fifo)
{
        fifo->in = fifo->out = 0;
}

/**
* __kfifo_len - returns the number of bytes available in the FIFO, no locking version
* @fifo: the fifo to be used.
*/
static inline unsigned int __kfifo_len(struct kfifo *fifo)
{
        return fifo->in - fifo->out;
}
使用的部分寫了一個類,採用了模板封裝,提供了模板型別存取的使用方法。
template <typename T>
class zFifo
{
    private:
        kfifo* _kfifo;
    public:
        zFifo()
        {
            _kfifo = kfifo_alloc(MAX_KFIFO_SIZE);
        }
        ~zFifo()
        {
            if(NULL != _kfifo)
              kfifo_free(_kfifo);
        }
        bool push(T data);
        T get();
};

template <typename T>
bool zFifo<T>::push(T data)
{
    int len = 0;
    len = __kfifo_put(_kfifo, (const unsigned char *)&data, sizeof(T));
    if(len > 0)
        return true;
    else
        return false;
}

template <typename T>
T zFifo<T>::get()
{
    T data;
    int len = __kfifo_get(_kfifo, (unsigned char *)&data, sizeof(T));
    if(len > 0)
        return data;
    else
        return NULL;
}

這種庫一般都用在需要高效處理的地方,為了減少記憶體拷貝,一般都使用指標的形式操作。一個簡單的使用例子:

int main()
{
    zFifo<int*> zf;
    int a = 1;
    zf.push(&a);
    printf("a=%d\n", &a);
    int* b = NULL;
    b = zf.get();
    printf("b=%d\n", b);

    return 0;
}
需要注意的地方:

1.只有一個執行緒負責讀,另一個執行緒負責寫的時候,資料是執行緒安全的。上面的實現是基於這個原理實現的,當有多個執行緒讀或者多個執行緒寫的時候,不保證資料的正確性。
所以使用的時候,一個執行緒寫,一個執行緒讀。網路應用中比較常用,就是開一個執行緒介面資料,然後把資料寫入佇列。然後開一個排程執行緒讀取網路資料,然後分發到處理執行緒。

2.資料長度預設巨集定義了一個長度,超過這個長度的時候,後續的資料會寫入失敗。