讀寫(Read-Write)鎖實現
大部分情況下,使用一個數據結構時並不會對其進行修改。而是隻需要一個區段的讀取許可權來完成工作。如果有多個執行緒需要讀取某一個數據,沒有理由不讓它們併發的進行讀取。Spinlock 鎖無法區分只有讀以及讀寫混合的場景,因為 spinlock 鎖無法滿足這種潛在的並行操作。為了實現該並行操作,我們需要讀寫鎖。
typedef struct dumbrwlock dumbrwlock;
struct dumbrwlock
{
spinlock lock;
unsigned readers;
};
static void dumb_wrlock(dumbrwlock *l)
{
/* 獲取寫鎖,獲得後新的讀請求等待 */
spin_lock(&l->lock);
/* 等待之前的讀請求完成 */
while (l->readers) cpu_relax();
}
static void dumb_wrunlock(dumbrwlock *l)
{
spin_unlock(&l->lock);
}
static int dumb_wrtrylock(dumbrwlock *l)
{
/* Want no readers */
if (l->readers) return EBUSY;
/* Try to get write lock */
if (spin_trylock(&l->lock)) return EBUSY;
if (l->readers)
{
/* Oops, a reader started */
spin_unlock(&l->lock);
return EBUSY;
}
/* Success! */
return 0;
}
static void dumb_rdlock(dumbrwlock *l)
{
while (1)
{
/* 增一獲取讀鎖 */
atomic_inc(&l->readers);
/* Success? */
if (!l->lock) return;
/* 有寫鎖加鎖,解鎖讀鎖 */
atomic_dec(&l->readers);
while (l->lock) cpu_relax();
}
}
static void dumb_rdunlock(dumbrwlock *l)
{
atomic_dec(&l->readers);
}
static int dumb_rdtrylock(dumbrwlock *l)
{
/* Speculatively take read lock */
atomic_inc(&l->readers);
/* Success? */
if (!l->lock) return 0;
/* Failure - undo */
atomic_dec(&l->readers);
return EBUSY;
}
static int dumb_rdupgradelock(dumbrwlock *l)
{
/* 升級為寫鎖 */
if (spin_trylock(&l->lock)) return EBUSY;
/* I'm no longer a reader */
atomic_dec(&l->readers);
/* Wait for all other readers to finish */
while (l->readers) cpu_relax();
return 0;
}
作為評價以上程式碼的標準,比起 spinlock 我們需要一些額外的資訊。讀請求的比例是一個很重要的因素。讀請求越多,我們應該能夠併發更多的執行緒,程式碼的速度也應該更快。讀請求與寫請求的隨機分佈也很重要,像真實的讀寫場景一樣。因此我們使用了一個併發隨機數生成器,通過在一個位元組(256位)中隨機的選擇 1,25,128或者250個位,我們能夠模擬從大部分請求為讀請求到大部分請求為寫請求的場景。最後,我們需要觀測對於競爭的處理效果。通常情況下,在競爭激烈的場景寫鎖更容易被用到,所以我們只檢視執行緒數等於處理器核數的場景。
以上 dumb 鎖演算法在沒有競爭的場景表現的非常差,如果只使用一個執行緒我們有結果:
Writers per 256 | 1 | 25 | 128 | 250 |
---|---|---|---|---|
Time(s) | 3.7 | 3.8 | 4.6 | 5.4 |
如同預期,當寫請求比例上升時,讀寫鎖的表現趨近與 spinlock 鎖的表現。儘管有競爭,dumb 鎖演算法實際上表現的非常好,在 4 個執行緒的情況下:
Writers per 256 | 1 | 25 | 128 | 250 |
---|---|---|---|---|
Time(s) | 1.1 | 1.9 | 4.4 | 5.7 |
一個很明顯的改進方法是用 ticketlock 演算法來替換很慢的 spinlock 鎖。我們有:
typedef struct dumbtrwlock dumbtrwlock;
struct dumbtrwlock
{
ticketlock lock;
unsigned readers;
};
static void dumbt_wrlock(dumbtrwlock *l)
{
/* Get lock */
ticket_lock(&l->lock);
/* Wait for readers to finish */
while (l->readers) cpu_relax();
}
static void dumbt_wrunlock(dumbtrwlock *l)
{
ticket_unlock(&l->lock);
}
static int dumbt_wrtrylock(dumbtrwlock *l)
{
/* Want no readers */
if (l->readers) return EBUSY;
/* Try to get write lock */
if (ticket_trylock(&l->lock)) return EBUSY;
if (l->readers)
{
/* Oops, a reader started */
ticket_unlock(&l->lock);
return EBUSY;
}
/* Success! */
return 0;
}
static void dumbt_rdlock(dumbtrwlock *l)
{
while (1)
{
/* Success? */
if (ticket_lockable(&l->lock))
{
/* Speculatively take read lock */
atomic_inc(&l->readers);
/* Success? */
if (ticket_lockable(&l->lock)) return;
/* Failure - undo, and wait until we can try again */
atomic_dec(&l->readers);
}
while (!ticket_lockable(&l->lock)) cpu_relax();
}
}
static void dumbt_rdunlock(dumbtrwlock *l)
{
atomic_dec(&l->readers);
}
static int dumbt_rdtrylock(dumbtrwlock *l)
{
/* Speculatively take read lock */
atomic_inc(&l->readers);
/* Success? */
if (ticket_lockable(&l->lock)) return 0;
/* Failure - undo */
atomic_dec(&l->readers);
return EBUSY;
}
static int dumbt_rdupgradelock(dumbtrwlock *l)
{
/* Try to convert into a write lock */
if (ticket_trylock(&l->lock)) return EBUSY;
/* I'm no longer a reader */
atomic_dec(&l->readers);
/* Wait for all other readers to finish */
while (l->readers) cpu_relax();
return 0;
}
這個演算法在競爭激烈的場景表現的更好,在全部為寫請求時花費了 3.7s。然而在在非擁塞場景確沒有優勢:
Writers per 256 | 1 | 25 | 128 | 250 |
---|---|---|---|---|
Time(s) | 2.0 | 2.5 | 3.7 | 4.5 |
在低寫請求比例的場景該演算法更慢,在高寫請求比例場景更快。而我們使用讀寫鎖的大部分場景寫的比例都是低的,這是對該演算法不理的地方。其將會比它的競爭者慢兩倍。
為了減少衝突,來獲取速度。我們來探究下一個很複雜的演算法實現,在 Reactos 中用來模擬 Microsoft Window 的 slim read-write (SRW)鎖。其使用一個等待佇列,和一個位鎖來控制其等待佇列的處理。它設計讓等待者在不同的記憶體區段空轉來處理更多執行緒的場景。
/* Have a wait block */
#define SRWLOCK_WAIT 1
/* Users are readers */
#define SRWLOCK_SHARED 2
/* Bit-lock for editing the wait block */
#define SRWLOCK_LOCK 4
#define SRWLOCK_LOCK_BIT 2
/* Mask for the above bits */
#define SRWLOCK_MASK 7
/* Number of current users * 8 */
#define SRWLOCK_USERS 8
typedef struct srwlock srwlock;
struct srwlock
{
uintptr_t p;
};
typedef struct srw_sw srw_sw;
struct srw_sw
{
uintptr_t spin;
srw_sw *next;
};
typedef struct srw_wb srw_wb;
struct srw_wb
{
/* s_count is the number of shared acquirers * SRWLOCK_USERS. */
uintptr_t s_count;
/* Last points to the last wait block in the chain. The value
is only valid when read from the first wait block. */
srw_wb *last;
/* Next points to the next wait block in the chain. */
srw_wb *next;
/* The wake chain is only valid for shared wait blocks */
srw_sw *wake;
srw_sw *last_shared;
int ex;
};
/* Wait for control of wait block */
static srw_wb *lock_wb(srwlock *l)
{
uintptr_t p;
/* Spin on the wait block bit lock */
while (atomic_bitsetandtest(&l->p, SRWLOCK_LOCK_BIT)) cpu_relax();
p = l->p;
barrier();
if (!(p & SRWLOCK_WAIT))
{
/* Oops, looks like the wait block was removed. */
atomic_clear_bit(&l->p, SRWLOCK_LOCK_BIT);
return NULL;
}
return (srw_wb *)(p & ~SRWLOCK_MASK);
}
static void srwlock_init(srwlock *l)
{
l->p = 0;
}
static void srwlock_rdlock(srwlock *l)
{
srw_wb swblock;
srw_sw sw;
uintptr_t p;
srw_wb *wb, *shared;
while (1)
{
barrier();
p = l->p;
cpu_relax();
if (!p)
{
/* This is a fast path, we can simply try to set the shared count to 1 */
if (!cmpxchg(&l->p, 0, SRWLOCK_USERS | SRWLOCK_SHARED)) return;
continue;
}
/* Don't interfere with locking */
if (p & SRWLOCK_LOCK) continue;
if (p & SRWLOCK_SHARED)
{
if (!(p & SRWLOCK_WAIT))
{
/* This is a fast path, just increment the number of current shared locks */
if (cmpxchg(&l->p, p, p + SRWLOCK_USERS) == p) return;
}
else
{
/* There's other waiters already, lock the wait blocks and increment the shared count */
wb = lock_wb(l);
if (wb) break;
}
continue;
}
/* Initialize wait block */
swblock.ex = FALSE;
swblock.next = NULL;
swblock.last = &swblock;
swblock.wake = &sw;
sw.next = NULL;
sw.spin = 0;
if (!(p & SRWLOCK_WAIT))
{
/*
* We need to setup the first wait block.
* Currently an exclusive lock is held, change the lock to contended mode.
*/
swblock.s_count = SRWLOCK_USERS;
swblock.last_shared = &sw;
if (cmpxchg(&l->p, p, (uintptr_t)&swblock | SRWLOCK_WAIT) == p)
{
while (!sw.spin) cpu_relax();
return;
}
continue;
}
/* Handle the contended but not shared case */
/*
* There's other waiters already, lock the wait blocks and increment the shared count.
* If the last block in the chain is an exclusive lock, add another block.
*/
swblock.s_count = 0;
wb = lock_wb(l);
if (!wb) continue;
shared = wb->last;
if (shared->ex)
{
shared->next = &swblock;
wb->last = &swblock;
shared = &swblock;
}
else
{
shared->last_shared->next = &sw;
}
shared->s_count += SRWLOCK_USERS;
shared->last_shared = &sw;
/* Unlock */
barrier();
l->p &= ~SRWLOCK_LOCK;
/* Wait to be woken */
while (!sw.spin) cpu_relax();
return;
}
/* The contended and shared case */
sw.next = NULL;
sw.spin = 0;
if (wb->ex)
{
/*
* We need to setup a new wait block.
* Although we're currently in a shared lock and we're acquiring
* a shared lock, there are exclusive locks queued in between.
* We need to wait until those are released.
*/
shared = wb->last;
if (shared->ex)
{
swblock.ex = FALSE;
swblock.s_count = SRWLOCK_USERS;
swblock.next = NULL;
swblock.last = &swblock;
swblock.wake = &sw;
swblock.last_shared = &sw;
shared->next = &swblock;
wb->last = &swblock;
}
else
{
shared->last_shared->next = &sw;
shared->s_count += SRWLOCK_USERS;
shared->last_shared = &sw;
}
}
else
{
wb->last_shared->next = &sw;
wb->s_count += SRWLOCK_USERS;
wb->last_shared = &sw;
}
/* Unlock */
barrier();
l->p &= ~SRWLOCK_LOCK;
/* Wait to be woken */
while (!sw.spin) cpu_relax();
}
static void srwlock_rdunlock(srwlock *l)
{
uintptr_t p, np;
srw_wb *wb;
srw_wb *next;
while (1)
{
barrier();
p = l->p;
cpu_relax();
if (p & SRWLOCK_WAIT)
{
/*
* There's a wait block, we need to wake a pending exclusive acquirer,
* if this is the last shared release.
*/
wb = lock_wb(l);
if (wb) break;
continue;
}
/* Don't interfere with locking */
if (p & SRWLOCK_LOCK) continue;
/*
* This is a fast path, we can simply decrement the shared
* count and store the pointer
*/
np = p - SRWLOCK_USERS;
/* If we are the last reader, then the lock is unused */
if (np == SRWLOCK_SHARED) np = 0;
/* Try to release the lock */
if (cmpxchg(&l->p, p, np) == p) return;
}
wb->s_count -= SRWLOCK_USERS;
if (wb->s_count)
{
/* Unlock */
barrier();
l->p &= ~SRWLOCK_LOCK;
return;
}
next = wb->next;
if (next)
{
/*
* There's more blocks chained, we need to update the pointers
* in the next wait block and update the wait block pointer.
*/
np = (uintptr_t)next | SRWLOCK_WAIT;
next->last = wb->last;
}
else
{
/* Convert the lock to a simple exclusive lock. */
np = SRWLOCK_USERS;
}
barrier();
/* This also unlocks wb lock bit */
l->p = np;
barrier();
wb->wake = (void *) 1;
barrier();
/* We released the lock */
}
static int srwlock_rdtrylock(srwlock *s)
{
uintptr_t p = s->p;
barrier();
/* This is a fast path, we can simply try to set the shared count to 1 */
if (!p && (cmpxchg(&s->p, 0, SRWLOCK_USERS | SRWLOCK_SHARED) == 0)) return 0;
if ((p & (SRWLOCK_SHARED | SRWLOCK_WAIT)) == SRWLOCK_SHARED)
{
/* This is a fast path, just increment the number of current shared locks */
if (cmpxchg(&s->p, p, p + SRWLOCK_USERS) == p) return 0;
}
return EBUSY;
}
static void srwlock_wrlock(srwlock *l)
{
srw_wb swblock;
uintptr_t p, np;
/* Fastpath - no other readers or writers */
if (!l->p && (!cmpxchg(&l->p, 0, SRWLOCK_USERS))) return;
/* Initialize wait block */
swblock.ex = TRUE;
swblock.next = NULL;
swblock.last = &swblock;
swblock.wake = NULL;
while (1)
{
barrier();
p = l->p;
cpu_relax();
if (p & SRWLOCK_WAIT)
{
srw_wb *wb = lock_wb(l);
if (!wb) continue;
/* Complete Initialization of block */
swblock.s_count = 0;
wb->last->next = &swblock;
wb->last = &swblock;
/* Unlock */
barrier();
l->p &= ~SRWLOCK_LOCK;
/* Has our wait block became the first one in the chain? */
while (!swblock.wake) cpu_relax();
return;
}
/* Fastpath - no other readers or writers */
if (!p)
{
if (!cmpxchg(&l->p, 0, SRWLOCK_USERS)) return;
continue;
}
/* Don't interfere with locking */
if (p & SRWLOCK_LOCK) continue;
/* There are no wait blocks so far, we need to add ourselves as the first wait block. */
if (p & SRWLOCK_SHARED)
{
swblock.s_count = p & ~SRWLOCK_MASK;
np = (uintptr_t)&swblock | SRWLOCK_SHARED | SRWLOCK_WAIT;
}
else
{
swblock.s_count = 0;
np = (uintptr_t)&swblock | SRWLOCK_WAIT;
}
/* Try to make change */
if (cmpxchg(&l->p, p, np) == p) break;
}
/* Has our wait block became the first one in the chain? */
while (!swblock.wake) cpu_relax();
}
static void srwlock_wrunlock(srwlock *l)
{
uintptr_t p, np;
srw_wb *wb;
srw_wb *next;
srw_sw *wake, *wake_next;
while (1)
{
barrier();
p = l->p;
cpu_relax();
if (p == SRWLOCK_USERS)
{
/*
* This is the fast path, we can simply clear the SRWLOCK_USERS bit.
* All other bits should be 0 now because this is a simple exclusive lock,
* and no one else is waiting.
*/
if (cmpxchg(&l->p, SRWLOCK_USERS, 0) == SRWLOCK_USERS) return;
continue;
}
/* There's a wait block, we need to wake the next pending acquirer */
wb = lock_wb(l);
if (wb) break;
}
next = wb->next;
if (next)
{
/*
* There's more blocks chained, we need to update the pointers
* in the next wait block and update the wait block pointer.
*/
np = (uintptr_t)next | SRWLOCK_WAIT;
if (!wb->ex)
{
/* Save the shared count */
next->s_count = wb->s_count;
np |= SRWLOCK_SHARED;
}
next->last = wb->last;
}
else
{
/* Convert the lock to a simple lock. */
if (wb->ex)
{
np = SRWLOCK_USERS;
}
else
{
np = wb->s_count | SRWLOCK_SHARED;
}
}
barrier();
/* Also unlocks lock bit */
l->p = np;
barrier();
if (wb->ex)
{
barrier();
/* Notify the next waiter */
wb->wake = (void *) 1;
barrier();
return;
}
/* We now need to wake all others required. */
for (wake = wb->wake; wake; wake = wake_next)
{
barrier();
wake_next = wake->next;
barrier();
wake->spin = 1;
barrier();
}
}
static int srwlock_wrtrylock(srwlock *s)
{
/* No other readers or writers? */
if (!s->p && (cmpxchg(&s->p, 0, SRWLOCK_USERS) == 0)) return 0;
return EBUSY;
}
這並不是在 Reactos 中的真正實現,對其進行了一些簡化和清理。一個位標誌被移除了,那麼它的表現如何呢,在非競爭場景,其和基於 ticket 的讀寫鎖差不多。在 4 個執行緒競爭的場景表現為:
Writers per 256 | 1 | 25 | 128 | 250 |
---|---|---|---|---|
Time(s) | 2.2 | 3.2 | 5.7 | 6.4 |
這表現的很差,在競爭場景比 dumb 演算法更慢。其獲得的效能改進並不值得這樣複雜的實現。
另外一個可能是用一些位結合讀者數目來描述寫者的狀態。一個類似的技巧在 Linux 核心中被使用來實現其讀寫鎖。是的寫者處於一個飢餓的狀態,我們有了以下實現:
#define RW_WAIT_BIT 0
#define RW_WRITE_BIT 1
#define RW_READ_BIT 2
#define RW_WAIT 1
#define RW_WRITE 2
#define RW_READ 4
typedef unsigned rwlock;
static void wrlock(rwlock *l)
{
while (1)
{
unsigned state = *l;
/* 沒有讀者和寫者 */
if (state < RW_WRITE)
{
/* 設定狀態,加寫鎖 */
if (cmpxchg(l, state, RW_WRITE) == state) return;
/* 有人併發的加了鎖 */
state = *l;
}
/* 設定有寫者在等待 */
if (!(state & RW_WAIT)) atomic_set_bit(l, RW_WAIT_BIT);
/* 等待鎖被釋放 */
while (*l > RW_WAIT) cpu_relax();
}
}
static void wrunlock(rwlock *l)
{ /* 釋放寫鎖 */
atomic_add(l, -RW_WRITE);
}
static int wrtrylock(rwlock *l)
{
unsigned state = *l;
if ((state < RW_WRITE) && (cmpxchg(l, state, state + RW_WRITE) == state)) return 0;
return EBUSY;
}
static void rdlock(rwlock *l)
{
while (1)
{
/* 是否有寫鎖或寫者在等待 */
while (*l & (RW_WAIT | RW_WRITE)) cpu_relax();
/* 獲取讀鎖 */
if (!(atomic_xadd(l, RW_READ) & (RW_WAIT | RW_WRITE))) return;
/* 獲取讀鎖失敗,解讀鎖 */
atomic_add(l, -RW_READ);
}
}
static void rdunlock(rwlock *l)
{
atomic_add(l, -RW_READ);
}
static int rdtrylock(rwlock *l)
{
/* Try to get read lock */
unsigned state = atomic_xadd(l, RW_READ);
if (!(state & (RW_WAIT | RW_WRITE))) return 0;
/* Undo */
atomic_add(l, -RW_READ);
return EBUSY;
}
/* Get a read lock, even if a writer is waiting */
static int rdforcelock(rwlock *l)
{
/* Try to get read lock */
unsigned state = atomic_xadd(l, RW_READ);
/* 即使有寫者在等待也可以強制加讀鎖 */
if (!(state & RW_WRITE)) return 0;
/* Undo */
atomic_add(l, -RW_READ);
return EBUSY;
}
/* Try to upgrade from a read to a write lock atomically */
static int rdtryupgradelock(rwlock *l)
{
/* Someone else is trying (and will succeed) to upgrade to a write lock? */
if (atomic_bitsetandtest(l, RW_WRITE_BIT)) return EBUSY;
/* Don't count myself any more */
atomic_add(l, -RW_READ);
/* Wait until there are no more readers */
while (*l > (RW_WAIT | RW_WRITE)) cpu_relax();
return 0;
}
該鎖實現,和使用 ticket 鎖作為 spinlock 鎖的 dumb 鎖演算法表現差不多。
Writers per 256 | 1 | 25 | 128 | 250 |
---|---|---|---|---|
Time(s) | 2.0 | 3.4 | 3.9 | 4.6 |
在 Linux 核心中實現的版本是用匯編語言寫的,或許會快一些。它使用了一個事實是原子增操作可以用來設定零標誌。也就意味著很慢的 add-and-test 方法是不需要的,可以用一個快的兩條指令替代。
使用半移動的 C 程式碼,我們能做的更好。存在一種為讀寫鎖設計的 ticket 鎖。RedHat 的 David Howells 在 2002 年給 Linux 核心提供一個實現。其大幅優化了 IBM 的 Joseph Seigh 在90年代初提出的版本。一種類似的演算法被 Mellor-Crummey 和 Michael Scott 在他們里程碑式的論文 “Scalable Read-Writer Synchronization for Shared-Memory Multiprocessors”。將其轉化為 C 程式碼實現如下:
typedef union rwticket rwticket;
union rwticket
{
unsigned u;
unsigned short us;
__extension__ struct
{
unsigned char write;
unsigned char read;
unsigned char users;
} s;
};
static void rwticket_wrlock(rwticket *l)
{
unsigned me = atomic_xadd(&l->u, (1<<16));
unsigned char val = me >> 16;
while (val != l->s.write) cpu_relax(); /* ticket 鎖在val為特定值時加鎖成功 */
}
static void rwticket_wrunlock(rwticket *l)
{
rwticket t = *l;
barrier();
t.s.write++;
t.s.read++;
*(unsigned short *) l = t.us;
}
static int rwticket_wrtrylock(rwticket *l)
{
unsigned me = l->s.users;
unsigned char menew = me + 1;
unsigned read = l->s.read << 8;
unsigned cmp = (me << 16) + read + me;
unsigned cmpnew = (menew << 16) + read + me;
if (cmpxchg(&l->u, cmp, cmpnew) == cmp) return 0;
return EBUSY;
}
static void rwticket_rdlock(rwticket *l)
{
unsigned me = atomic_xadd(&l->u, (1<<16));
unsigned char val = me >> 16;
while (val != l->s.read) cpu_relax();
l->s.read++;
}
static void rwticket_rdunlock(rwticket *l)
{
atomic_inc(&l->s.write);
}
static int rwticket_rdtrylock(rwticket *l)
{
unsigned me = l->s.users;
unsigned write = l->s.write;
unsigned char menew = me + 1;
unsigned cmp = (me << 16) + (me << 8) + write;
unsigned cmpnew = ((unsigned) menew << 16) + (menew << 8) + write;
if (cmpxchg(&l->u, cmp, cmpnew) == cmp) return 0;
return EBUSY;
}
以上讀寫鎖表現的很好,在低寫請求比例時比 dumb spinlock 讀寫鎖一樣快,在高寫請求比例時和 dumb ticketlock 讀寫鎖也幾乎一樣快。當沒有競爭時其也沒有效能的下降,在所有的例子花費了 3.7s。
Writers per 256 | 1 | 25 | 128 | 250 |
---|---|---|---|---|
Time(s) | 1.1 | 1.8 | 3.9 | 4.7 |
在主要為讀請求的場景,該演算法比簡單的 spinlock 鎖快 5 倍,它的唯一缺點在於不能自動的將讀鎖升級為寫鎖(可以做,但是 rwticket_wrunlock 需要使用一個原子操作,會導致其變慢一些)。這個缺點也正是Linux核心沒有采用該演算法的原因。另一部分原因在於,如果你擁有一個讀鎖,那麼遞迴的獲取讀鎖總是會成功。然而,如果這個需求不需要的話,那麼這個演算法是一個很好的選擇。
最後需要注意的是該讀寫 ticket 鎖演算法並不是最優的,當讀者和寫者交替的再等待隊列出現時,寫者(執行),讀者1,寫者,讀者2。這兩個讀執行緒可能會被弄混,導致它們能夠併發的執行。比如,第二個讀者可能並不需要等待第二個寫者完成。幸運的是,當執行緒數少時很少出現該場景。對於4個執行緒,當讀者和寫著一樣多時其發生概率為 1/16,其餘場景則會更小。不幸的是,當執行緒數增加時,其速度下降的速率比起最優順序的執行將會是兩倍。
修復這個問題一個明顯需要做的事是,檢測讀者需要在等待連結串列中排隊。然而,因為在4個執行緒併發的場景效果是如此不顯著。我們很難以一個很低的代價來做該檢查。因此當多核機器變得很常見時,該使用哪種演算法將會是一個值得考慮的問題。
相關推薦
讀寫(Read-Write)鎖實現
大部分情況下,使用一個數據結構時並不會對其進行修改。而是隻需要一個區段的讀取許可權來完成工作。如果有多個執行緒需要讀取某一個數據,沒有理由不讓它們併發的進行讀取。Spinlock 鎖無法區分只有讀以及讀寫混合的場景,因為 spinlock 鎖無法滿足這種潛在
如何將NTFS格式的行動硬碟掛接到Mac OS上進行讀寫(Read/Write)操作
現在硬碟便宜,很多同學都有行動硬碟,如果你同時使用Windows與Mac OS的話,行動硬碟最好不要使用NTFS檔案系統,否則在Mac OS上,你只能讀你的行動硬碟,不能寫。 但是實際上的情況是,行動硬碟上有很多東西了,且最初是格式化為了NTFS格式,這時候重新格式化是很麻煩的,要做資料移動。
Java實現CSV格式檔案的讀寫(操作API)
首先引入maven <dependency> <groupId>net.sourceforge.javacsv</groupId> <
windows下測試磁盤讀寫(HD Tune)
hd tune hd tune測試磁盤讀寫速度 3個SATA磁盤組成RAID 5,4個SAS硬盤組成另外一個RAID 5。測試結果如下:用HD Tune測試讀速度:用HD Tune測試寫速度(需要刪除分區再測試):windows下測試磁盤讀寫(HD Tune)
IO讀寫(復制)視頻練習
IOpackage com.chen.io1; import java.io.BufferedInputStream;import java.io.BufferedOutputStream;import java.io.FileInputStream;import java.io.FileNotFoundEx
Hadoop_08_客戶端向HDFS讀寫(上傳)數據流程
pack 查詢 文件路徑 hdfs 校驗 blocks 管理 con 讀取數據 1.HDFS的工作機制: HDFS集群分為兩大角色:NameNode、DataNode (Secondary Namenode) NameNode負責管理整個文件系統的元數據 DataNode
C 檔案讀寫(二進位制檔案)
我們將介紹 C 程式設計師如何建立、開啟、關閉文字檔案或二進位制檔案。 一個檔案,無論它是文字檔案還是二進位制檔案,都是代表了一系列的位元組。C 語言不僅提供了訪問頂層的函式,也提供了底層(OS)呼叫來處理儲存裝置上的檔案。本章將講解檔案管理的重要呼叫。 開啟檔案 您可
C\C++對大檔案的快速讀寫(記憶體對映)
1、 建立檔案(CreateFile),如下: HANDLE CreateFile( LPCTSTR lpFileName, DWORD dwDesiredAccess, DWORD dwShareMode, LPSECURITY_ATTRIBU
linux下使用C++程式操作檔案的讀寫(複製/拷貝)
注意:在linux下使用eclipse+CDT執行下面的程式時,請在root狀態下!!!在終端中,進入root狀態,然後在root狀態下,開啟eclipse,再執行下面的程式。(因為涉及到檔案讀寫許可權,普通狀態可能無法完成操作,root狀態下會更好一些。)一.關於檔案操作的
LINUX C語言檔案的讀寫(非二進位制)
#include<stdio.h> #include<stdlib.h> #include<unistd.h> #include<fcntl.h> #include<string.h> int main() { i
四種讀寫檔案的方式:系統呼叫(open/read/write),C語言(fopen,fgets, fputs),C++(ifstream, ofstream getline,)泛型演算法
第一種方法是系統呼叫 (1)open系統呼叫 原型: #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h>
bug解決-核心C庫防寫(FORTIFY: write: prevented read past end of buffer)
備註:展訊平臺 1、問題描述 昨天同事問我一個問題,報的是一個native crash問題,問題log如下所示: 01-05 00:01:12.600 2794 6237 F libc : Fatal signal 6 (SIGABRT), code -6 in
Golang 入門系列(十六)鎖的使用場景主要涉及到哪些?讀寫鎖為什麼會比普通鎖快
前面已經講過很多Golang系列知識,感興趣的可以看看以前的文章,https://www.cnblogs.com/zhangweizhong/category/1275863.html, 接下來要說的是golang的鎖的使用場景主要涉及到哪些?讀寫鎖為什麼會比普通鎖快。 一、什麼場景下需要用到鎖
大數據【二】HDFS部署及文件讀寫(包含eclipse hadoop配置)
throw 大數據 我的電腦 ssh 生效 manager 方法 slave .sh 一 原理闡述 1‘ DFS 分布式文件系統(即DFS,Distributed File System),指文件系統管理的物理存儲資源不一定直接連接在本地節點上,而是通過計算機網
[轉]C#對文本文件的讀寫(轉)
eve nextline reader 操作系統 為我 reat ini toe http 原網頁:http://www.cnblogs.com/infly123/archive/2013/05/18/3085872.html 計算機在最初只支持ASCII編碼,但是後來為了
C#文件讀寫(txt 簡單方式)
換行 line string txt 不換行 返回 true text empty 1.文件寫入 // 路徑,寫入內容 System.IO.File.WriteAllText(@".\File.txt", string.Empty); 可更換相應的方法 2.文件讀入 /
windows下測試磁盤讀寫(Iometer)
測試磁盤讀寫速度 iometer測試磁盤讀寫速度 參考鏈接:http://hll142475.blog.163.com/blog/static/62138201151113835216/http://blog.csdn.net/yuesichiu/article/details/8499787http
Light libraries是一組通用的C基礎庫,目標是為減少重復造輪子而寫(全部用POSIX C實現)
six clas 原子操作 roi 實現 class 動態庫 readme tps Light libraries是一組通用的C基礎庫,目標是為減少重復造輪子而寫實現了日誌、原子操作、哈希字典、紅黑樹、動態庫加載、線程、鎖操作、配置文件、os適配層、事件驅動、工作隊列、RP
python之文件的讀寫(2)
import 文件讀寫 哈哈 進入 imp std 技術 都是 繼續 小R昨天因為在研究weblogic的漏洞就沒來得及學習python(好吧,這都是借口,懶了,大家可不能像我這樣。要堅持每天都學)。 這個進度是有點慢呀。哎呀,沒事沒事,我還年輕,才20歲。 哈哈,玩
基於zk的分布幸運28源碼下載式鎖(leader選舉)的實現
rest logger 接口 ets abstract 問題 .get single created 做了幸運28源碼下載論壇:haozbbs.com Q1446595067 兩版實現,先是直接用zk的接口做的,後來又用curator做了個。主要是用來在集群環境中確定一個主