網際網路架構(6):併發程式設計--Disruptor併發框架
6 Disruptor併發框架簡介
Martin Fowler在自己網站上寫了一篇LMAX架構的文章,在文章中他介紹了LMAX是一種新型零售金融交易平臺,它能夠以很低的延遲產生大量交易。這個系統是建立在JVM平臺上,其核心是一個業務邏輯處理器,它能夠在一個執行緒裡每秒處理6百萬訂單。業務邏輯處理器完全是執行在記憶體中,使用事件源驅動方式。業務邏輯處理器的核心是Disruptor。
Disruptor它是一個開源的併發框架,並獲得2011 Duke’s 程式框架創新獎,能夠在無鎖的情況下實現網路的Queue併發操作。
Disruptor是一個高效能的非同步處理框架,或者可以認為是最快的訊息框架(輕量的JMS),也可以認為是一個觀察者模式的實現,或者事件監聽模式的實現。
目前我們使用disruptor已經更新到了3.x版本,比之前的2.x版本效能更加的優秀,提供更多的API使用方式。
下載disruptor-3.3.2.jar引入我們的專案既可以開始disruptor之旅。
在使用之前,首先說明disruptor主要功能加以說明,你可以理解為他是一種高效的”生產者-消費者”模型。也就效能遠遠高於傳統的BlockingQueue容器。
(1)使用Disruptor
- 第一:建立一個Event類,用來承載資料,因為Disruptor是一個事件驅動的,所以再Disruptor中是以事件繫結資料進行傳遞的
- 第二:建立一個工廠Event類,用於建立Event類例項物件
- 第三:需要有一個監聽事件類,用於處理資料(Event類)
- 第四:我們需要進行測試程式碼編寫。例項化Disruptor例項,配置一系列引數。然後我們對Disruptor例項繫結監聽事件類,接受並處理資料。
第五:在Disruptor中,真正儲存資料的核心叫做RingBuffer,我們通過Disruptor例項拿到它,然後把資料生產出來,把資料加入到RingBuffer的例項物件中即可。
例項化一個Disruptor物件: //建立Disruptor //1 eventFactory 為 //2 ringBufferSize為RingBuffer緩衝區大小,最好是2的指數倍 //3 執行緒池,進行Disruptor內部的資料接收處理呼叫 //4 第四個引數ProducerType.SINGLE和ProducerType.MULTI,用來指定資料生成者有一個還是多個 //5 第五個引數是一種策略:WaitStrategy /** * 建立Disruptor * @param eventFactory 工廠類物件,用於建立一個個的LongEvent, LongEvent是實際的消費資料,初始化啟動Disruptor的時候,Disruptor會呼叫該工廠方法建立一個個的消費資料例項存放到RingBuffer緩衝區裡面去,建立的物件個數為ringBufferSize指定的 * @param ringBufferSize RingBuffer緩衝區大小 * @param executor 執行緒池,Disruptor內部的對資料進行接收處理時呼叫 * @param producerType 用來指定資料生成者有一個還是多個,有兩個可選值ProducerType.SINGLE和ProducerType.MULTI * @param waitStrategy 一種策略,用來均衡資料生產者和消費者之間的處理效率,預設提供了3個實現類 */ com.lmax.disruptor.dsl.Disruptor.Disruptor<V>(EventFactory<V> eventFactory, int ringBufferSize, Executor executor, ProducerType producerType, WaitStrategy waitStrategy) //BlockingWaitStrategy 是最低效的策略,但其對CPU的消耗最小並且在各種不同部署環境中能提供更加一致的效能表現 WaitStrategy BLOCKING_WAIT = new BlockingWaitStrategy(); //SleepingWaitStrategy 的效能表現跟BlockingWaitStrategy差不多,對CPU的消耗也類似,但其對生產者執行緒的影響最小,適合用於非同步日誌類似的場景 WaitStrategy SLEEPING_WAIT = new SleepingWaitStrategy(); //YieldingWaitStrategy是可以被用在低延遲系統中的兩個策略之一,這種策略在減低系統延遲的同時也會增加CPU運算量。YieldingWaitStrategy策略會迴圈等待sequence增加到合適的值。迴圈中呼叫Thread.yield()允許其他準備好的執行緒執行。如果需要高效能而且事件消費者執行緒比邏輯核心少的時候,推薦使用YieldingWaitStrategy策略。例如:在開啟超執行緒的時候。 WaitStrategy YIELDING_WAIT = new YieldingWaitStrategy(); //BusySpinWaitStrategy是效能最高的等待策略,同時也是對部署環境要求最高的策略。這個效能最好用在事件處理執行緒比物理核心數目還要小的時候。例如:在禁用超執行緒技術的時候。 WaitStrategy BusySpin_WAIT = new BusySpinWaitStrategy(); //連線消費事件方法,其中EventHandler的是為消費者消費訊息的實現類 disruptor.handleEventsWith(? extends EventHandler<V>); //通過例項化的Disruptor物件獲取到RingBuffer緩衝區,然後往緩衝區裡面新增資料並且釋出,消費者就可以消費這個資料了 RingBuffer<V> ringBuffer = disruptor.getRingBuffer();//獲取資料緩衝區 long sequence = ringBuffer.next();//從資料緩衝區中獲取下一個可用事件槽的Id V event = ringBuffer.get(sequence); //從事件槽中獲取一個數據物件(初始化的時候,槽就會生成對應的物件V放到RingBuffer裡面,就是eventFactory返回的物件) event.setValue(bbf.getLong(0));//呼叫Event的方法,設定資料,注意Event完全由使用者實現 ringBuffer.publish(sequence);//釋出事件,釋出的是RingBuffer的事件槽的Id,消費者也是根據這個Id去RingBuffer中獲取對應的事件資料的,另外ringBuffer.publish 方法必須包含在 finally 中以確保必須得到呼叫;如果某個請求的 sequence 未被提交,將會堵塞後續的釋出操作或者其它的 producer。
(2)Disruptor術語
RingBuffer:被看做Disruptor最主要的元件,然而從3.0開始RingBuffer僅僅負責儲存和更新再Disruptor中流通的資料。對一些特殊的使用場景能夠被使用者(使用其他資料結構)完全替代。
Sequence:Disruptor使用Sequence來表示一個特殊元件處理的序號。和Disruptor一樣,每一個消費者(EventProcessor)都維持著一個Sequence。大部分的併發程式碼依賴這些Sequence值得運轉,因此Sequence支援多種當前為AtomicLong類的特性。
Sequencer:這是Disruptor真正的核心。實現了這個介面的兩種生產者(單生產者和多生產者)均實現了所有的併發演算法,為了在生產者和消費者之間進行準確快速的資料傳遞。
SequenceBarrier:由Sequencer生成,並且包含了已經發布的Sequence的引用,這些Sequence源於Sequencer和一些獨立的消費者的Sequence。它包含了決定是否有供消費者消費的Event的邏輯。用來權衡當消費者無法從RingBuffer裡面獲取事件時的處理策略。(例如:當生產者太慢,消費者太快,會導致消費者獲取不到新的事件會根據該策略進行處理,預設會堵塞)
WaitStrategy:決定一個消費者將如何等待生產者將Event置入Disruptor的策略。用來權衡當生產者無法將新的事件放進RingBuffer時的處理策略。(例如:當生產者太快,消費者太慢,會導致生成者獲取不到新的事件槽來插入新事件,則會根據該策略進行處理,預設會堵塞)
Event:從生產者到消費者過程中所處理的資料單元。Disruptor中沒有程式碼表示Event,因為它完全是由使用者定義的。
EventProcessor:主要事件迴圈,處理Disruptor中的Event,並且擁有消費者的Sequence。它有一個實現類是BatchEventProcessor,包含了event loop有效的實現,並且將回調到一個EventHandler介面的實現物件。
EventHandler:由使用者實現並且代表了Disruptor中的一個消費者的介面。
Producer:由使用者實現,它呼叫RingBuffer來插入事件(Event),在Disruptor中沒有相應的實現程式碼,由使用者實現。
WorkProcessor:確保每個sequence只被一個processor消費,在同一個WorkPool中的處理多個WorkProcessor不會消費同樣的sequence。
WorkerPool:一個WorkProcessor池,其中WorkProcessor將消費Sequence,所以任務可以在實現WorkHandler介面的worker之間移交
LifecycleAware:當BatchEventProcessor啟動和停止時,於實現這個介面用於接收通知。
(3)理解RingBuffer
ringbuffer到底是什麼?
答:嗯,正如名字所說的一樣,它是一個環(首尾相接的環),你可以把它用做在不同上下文(執行緒)間傳遞資料的buffer。基本來說,ringbuffer擁有一個序號,這個序號指向陣列中下一個可用元素。
Disruptor說的是生產者和消費者的故事. 有一個數組.生產者往裡面扔芝麻.消費者從裡面撿芝麻. 但是扔芝麻和撿芝麻也要考慮速度的問題. 1 消費者撿的比扔的快 那麼消費者要停下來.生產者扔了新的芝麻,然後消費者繼續. 2 陣列的長度是有限的,生產者到末尾的時候會再從陣列的開始位置繼續.這個時候可能會追上消費者,消費者還沒從那個地方撿走芝麻,這個時候生產者要等待消費者撿走芝麻,然後繼續.
隨著你不停地填充這個buffer(可能也會有相應的讀取),這個序號會一直增長,直到繞過這個環。
要找到陣列中當前序號指向的元素,可以通過mod操作:
Sequence mod Array.length = index in Array
(取模操作)假如當前的Sequence為12,RingBuffer的長度為10,那麼下一個事件槽的ID就為(java的mod語法):12 % 10 = 2
。很簡單吧。由於是取模操作,所以如果槽的個數是2的N次方那麼將更有利於基於二進位制的計算機進行計算。
(4)RingBuffer的特點
- 如果你看了維基百科裡面的關於環形buffer的詞條,你就會發現,我們的實現方式,與其最大的區別在於:沒有尾指標。我們只維護了一個指向下一個可用位置的序號。這種實現是經過深思熟慮的—我們選擇用環形buffer的最初原因就是想要提供可靠的訊息傳遞。
- 我們實現的ring buffer和大家常用的佇列之間的區別是,我們不刪除buffer中的資料,也就是說這些資料一直存放在buffer中,直到新的資料覆蓋他們。這就是和維基百科版本相比,我們不需要尾指標的原因。ringbuffer本身並不控制是否需要重疊。
- 因為它是陣列,所以要比連結串列快,而且有一個容易預測的訪問模式。
- 這是對CPU快取友好的,也就是說在硬體級別,陣列中的元素是會被預載入的,因此在ringbuffer當中,cpu無需時不時去主存載入陣列中的下一個元素。
- 其次,你可以為陣列預先分配記憶體,使得陣列物件一直存在(除非程式終止)。這就意味著不需要花大量的時間用於垃圾回收。此外,不像連結串列那樣,需要為每一個新增到其上面的物件創造節點物件—對應的,當刪除節點時,需要執行相應的記憶體清理操作。
(5)Disruptor應用
Disruptor實際上是對RingBuffer的封裝,所以我們也可以直接使用RingBuffer類
API提供的生產者介面
EventTranslator<V>與EventTranslatorOneArg<V v, Object data>
,前者不能動態傳參,後者可以動態傳遞一個引數data,V為需要建立的資料物件,data為實際資料,實現translateTo(V v, long sequeue, Object data)
方法,其中v就是下一個可用事件槽裡面的物件,data為傳進來的真實資料,呼叫ringBuffer.publishEvent(EventTranslatorOneArg translator, Object data);
來發布資料到RingBuffer中import java.nio.ByteBuffer; import com.lmax.disruptor.EventTranslatorOneArg; import com.lmax.disruptor.RingBuffer; /** * Disruptor 3.0提供了lambda式的API。這樣可以把一些複雜的操作放在Ring Buffer, * 所以在Disruptor3.0以後的版本最好使用Event Publisher或者Event Translator來發布事件 */ public class LongEventProducerWithTranslator { //一個translator可以看做一個事件初始化器,publicEvent方法會呼叫它 //填充Event private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() { @Override public void translateTo(LongEvent event, long sequeue, ByteBuffer buffer) { event.setValue(buffer.getLong(0)); } }; private final RingBuffer<LongEvent> ringBuffer; public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) { this.ringBuffer = ringBuffer; } public void onData(ByteBuffer buffer){ ringBuffer.publishEvent(TRANSLATOR, buffer); } }
API提供的消費者介面
WorkerPool :
WorkerPool<Order>(RingBuffer<V> ringBuffer, SequenceBarrier sequenceBarrier, ExceptionHandler<? super V> exceptionHandler, WorkHandler<? super V>... workHandlers)
其中RingBuffer
為資料緩衝區,sequenceBarrier
是消費者與生產者之間的協調策略,API預設提供了一個實現類ProcessingSequenceBarrier,可以通過RingBuffer.newBarrier(Sequence... sequencesToTrack);
來獲取,exceptionHandler
為異常處理函式,當handler發生異常時回撥該函式,workHandlers
為實現了EventHandler介面的訊息業務處理類,可以有多個。
WorkerPool啟動的方法是WorkerPool.start(Executor executor)
BatchEventProcessor :
BatchEventProcessor<V>(RingBuffer extends DataProvider, SequenceBarrier sequenceBarrier, EventHandler<? super V> eventHandler)
其中RingBuffer
為資料緩衝區,sequenceBarrier
是消費者與生產者之間的協調策略,API預設提供了一個實現類ProcessingSequenceBarrier,可以通過RingBuffer.newBarrier(Sequence... sequencesToTrack);
來獲取,eventHandler
為實現了EventHandler介面的訊息業務處理類。
BatchEventProcessor啟動的方法是Executor.submit(BatchEventProcessor batchEventProcessor)
**注意**SequenceBarrier是用來協調消費者和生成者之間效率的策略類,所以要想Barrier生效,必須要將消費者消費的Seuence傳遞給RingBuffer,然後由RingBuffer進行協調:ringBuffer.addGatingSequences(BatchEventProcessor.getSequence()); 多消費者時使用BatchEventProcessor.getWorkerSequences()
(這兩個方法WorkerPool同樣適用)。這是在直接使用RingBuffer時需要進行的處理,如果通過Disruptor去進行呼叫,在呼叫handleEventsWith註冊消費者方法時會自動新增該處理。
-Trade.java
import java.util.concurrent.atomic.AtomicInteger;
public class Trade {
private String id;//ID
private String name;
private double price;//金額
private AtomicInteger count = new AtomicInteger(0);
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
public AtomicInteger getCount() {
return count;
}
public void setCount(AtomicInteger count) {
this.count = count;
}
}
TradeHandler.java
import java.util.UUID; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WorkHandler; /** * 實現EventHandler是為了作為BatchEventProcessor的事件處理器, * 實現WorkHandler是為了作為WorkerPool的事件處理器 * @author jliu10 * */ public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { this.onEvent(event); } @Override public void onEvent(Trade event) throws Exception { //這裡做具體的消費邏輯 event.setId(UUID.randomUUID().toString());//簡單生成下ID System.out.println(event.getId()); } }
Main1.java
import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import com.lmax.disruptor.BatchEventProcessor; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SequenceBarrier; import com.lmax.disruptor.YieldingWaitStrategy; public class Main1 { public static void main(String[] args) throws Exception { int BUFFER_SIZE=1024; int THREAD_NUMBERS=4; /* * createSingleProducer建立一個單生產者的RingBuffer, * 第一個引數叫EventFactory,從名字上理解就是"事件工廠",其實它的職責就是產生資料填充RingBuffer的區塊。 * 第二個引數是RingBuffer的大小,它必須是2的指數倍 目的是為了將求模運算轉為&運算提高效率 * 第三個引數是RingBuffer的生產都在沒有可用區塊的時候(可能是消費者(或者說是事件處理器) 太慢了)的等待策略 */ final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() { @Override public Trade newInstance() { return new Trade(); } }, BUFFER_SIZE, new YieldingWaitStrategy()); //建立執行緒池 ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS); //建立SequenceBarrier 用來權衡消費者是否可以從ringbuffer裡面獲取事件 SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); //建立訊息處理器 BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>( ringBuffer, sequenceBarrier, new TradeHandler()); //這一步的目的就是把消費者的位置資訊引用注入到生產者 如果只有一個消費者的情況可以省略 ringBuffer.addGatingSequences(transProcessor.getSequence()); //把訊息處理器提交到執行緒池 executors.submit(transProcessor); //如果存在多個消費者 那重複執行上面3行程式碼 把TradeHandler換成其它消費者類 Future<?> future= executors.submit(new Callable<Void>() { @Override public Void call() throws Exception { long seq; for(int i=0;i<10;i++){ seq = ringBuffer.next();//佔個坑 --ringBuffer一個可用區塊 ringBuffer.get(seq).setPrice(Math.random()*9999);//給這個區塊放入 資料 ringBuffer.publish(seq);//釋出這個區塊的資料使handler(consumer)可見 } return null; } }); future.get();//等待生產者結束 Thread.sleep(1000);//等上1秒,等消費都處理完成 transProcessor.halt();//通知事件(或者說訊息)處理器 可以結束了(並不是馬上結束!!!) executors.shutdown();//終止執行緒 } }
Main2.java
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.IgnoreExceptionHandler; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.SequenceBarrier; import com.lmax.disruptor.WorkHandler; import com.lmax.disruptor.WorkerPool; public class Main2 { public static void main(String[] args) throws InterruptedException { int BUFFER_SIZE=1024; int THREAD_NUMBERS=4; EventFactory<Trade> eventFactory = new EventFactory<Trade>() { public Trade newInstance() { return new Trade(); } }; RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(eventFactory, BUFFER_SIZE); SequenceBarrier sequenceBarrier = ringBuffer.newBarrier(); ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUMBERS); WorkHandler<Trade> handler = new TradeHandler(); WorkerPool<Trade> workerPool = new WorkerPool<Trade>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), handler); //這一步的目的就是把消費者的位置資訊引用注入到生產者 如果只有一個消費者的情況可以省略 ringBuffer.addGatingSequences(workerPool.getWorkerSequences()); workerPool.start(executor); //下面這個生產8個數據 for(int i=0;i<8;i++){ long seq=ringBuffer.next(); ringBuffer.get(seq).setPrice(Math.random()*9999); ringBuffer.publish(seq); } Thread.sleep(1000); workerPool.halt(); executor.shutdown(); } }
Disruptor註冊消費者的方法是:
Disruptor.handleEventsWith(final EventHandler<? super T>... handlers)
Disruptor提供了一些複雜的並行執行方式。1、生產者A生成的資料同時被B,C兩個消費者消費,兩者都消費完成之後再由消費者D對兩者同時消費。(注意ABC以及下面提到的訊息處理類必須要實現EventHandler介面)
EventHandlerGroup<Trade> handlerGroup = disruptor.handleEventsWith(A, B); //宣告在C1,C2完事之後執行JMS訊息傳送操作 也就是流程走到C3 handlerGroup.then(C);
2、生產者A生成的資料同時被B1,C2兩個消費者消費,而B消耗完畢之後由B2處理,C1處理完成之後由C2處理,B2與C2兩者都消費完成之後再由消費者D對兩者同時消費。其中B1與B2,C1與C2是並行執行的。
disruptor.handleEventsWith(B1, C1); disruptor.after(B1).handleEventsWith(B2); disruptor.after(C1).handleEventsWith(C2); disruptor.after(B2, C2).handleEventsWith(h3);
3、生產者A生成的資料依次被A,B,C三個消費者消費
disruptor.handleEventsWith(A).handleEventsWith(B).handleEventsWith(C);
Main.java
import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.lmax.disruptor.BusySpinWaitStrategy; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.dsl.Disruptor; import com.lmax.disruptor.dsl.EventHandlerGroup; import com.lmax.disruptor.dsl.ProducerType; import com.test.sync13.generate1.Trade; public class Main { public static void main(String[] args) throws InterruptedException { long beginTime=System.currentTimeMillis(); int bufferSize=8; ExecutorService executor=Executors.newFixedThreadPool(8); Disruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() { @Override public Trade newInstance() { return new Trade(); } }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy()); //菱形操作 /** //使用disruptor建立消費者組C1,C2 EventHandlerGroup<Trade> handlerGroup = disruptor.handleEventsWith(new Handler1(), new Handler2()); //宣告在C1,C2完事之後執行JMS訊息傳送操作 也就是流程走到C3 handlerGroup.then(new Handler3()); */ //順序操作 /** */ disruptor.handleEventsWith(new Handler1()). handleEventsWith(new Handler2()). handleEventsWith(new Handler3()); //六邊形操作. /** Handler1 h1 = new Handler1(); Handler2 h2 = new Handler2(); Handler3 h3 = new Handler3(); Handler4 h4 = new Handler4(); Handler5 h5 = new Handler5(); disruptor.handleEventsWith(h1, h2); disruptor.after(h1).handleEventsWith(h4); disruptor.after(h2).handleEventsWith(h5); disruptor.after(h4, h5).handleEventsWith(h3); */ disruptor.start();//啟動 CountDownLatch latch=new CountDownLatch(1); //生產者準備 executor.submit(new TradePublisher(latch, disruptor)); latch.await();//等待生產者完事. disruptor.shutdown(); executor.shutdown(); System.out.println("總耗時:"+(System.currentTimeMillis()-beginTime)); } }
Handler*.java
import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.WorkHandler; import com.test.sync13.generate1.Trade; public class Handler1 implements EventHandler<Trade>,WorkHandler<Trade> { @Override public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception { this.onEvent(event); } @Override public void onEvent(Trade event) throws Exception { System.out.println("handler1: set name"); event.setName("h1"); Thread.sleep(500); } }
TradePublisher.java
import java.util.Random; import java.util.concurrent.CountDownLatch; import com.lmax.disruptor.EventTranslator; import com.lmax.disruptor.dsl.Disruptor; import com.test.sync13.generate1.Trade; public class TradePublisher implements Runnable { Disruptor<Trade> disruptor; private CountDownLatch latch; private static int LOOP=10;//模擬百萬次交易的發生 public TradePublisher(CountDownLatch latch,Disruptor<Trade> disruptor) { this.disruptor=disruptor; this.latch=latch; } @Override public void run() { TradeEventTranslator tradeTransloator = new TradeEventTranslator(); for(int i=0;i<LOOP;i++){ disruptor.publishEvent(tradeTransloator); } //採用CountDownLatch來保證10個執行緒能夠同時啟動 latch.countDown(); } } class TradeEventTranslator implements EventTranslator<Trade>{ private Random random=new Random(); @Override public void translateTo(Trade event, long sequence) { this.generateTrade(event); } private Trade generateTrade(Trade trade){ trade.setPrice(random.nextDouble()*9999); return trade; } }
Trade.java
import java.util.concurrent.atomic.AtomicInteger; public class Trade { private String id;//ID private String name; private double price;//金額 private AtomicInteger count = new AtomicInteger(0); public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public double getPrice() { return price; } public void setPrice(double price) { this.price = price; } public AtomicInteger getCount() { return count; } public void setCount(AtomicInteger count) { this.count = count; } }
相關推薦
網際網路架構(6):併發程式設計--Disruptor併發框架
6 Disruptor併發框架簡介 Martin Fowler在自己網站上寫了一篇LMAX架構的文章,在文章中他介紹了LMAX是一種新型零售金融交易平臺,它能夠以很低的延遲產生大量交易。這個系統是建立在JVM平臺上,其核心是一個業務邏輯處理器,它能夠在一個執行
網際網路架構(3):併發程式設計--執行緒池
3 Executor框架 為了更好的控制多執行緒,JDK提供了一套執行緒框架Executor,幫助開發人員有效的進行執行緒控制。它們都在java.util.concurrent包中,是JDK併發包的核心。其中有一個比較重要的類:Executors,它扮演著執行
網際網路架構(8):Socket網路通訊程式設計--Netty
三、Socket網路通訊程式設計–Netty Netty是一個提供非同步事件驅動的網路應用框架,用以快速開發高效能、高可靠性的網路伺服器和客戶端程式。 換句話說,Netty是一個NIO框架,使用它可以簡單快速地開發網路應用程式,比如客戶端和服務端的協議。Ne
老司機帶你玩轉面試(6):分散式鎖、併發競爭、雙寫一致
![](https://cdn.geekdigging.com/Interview/mianshi_header_1.jpg) ## 前文回顧 建議前面文章沒看過的同學先看下前面的文章: [「老司機帶你玩轉面試(1):快取中介軟體 Redis 基礎知識以及資料持久化」](https://www.geek
python快速學習系列(6):面向物件程式設計(OOP)
一、面向物件程式設計: 1.比設計模式更重要的是設計原則: 1)面向物件設計的目標: ·可擴充套件:新特性很容易新增到現有系統中,基本不影響系統原有功能 ·可修改:當修改某一部分程式碼時,不會影響到其他不相關的部分 ·可替代:用具有相同介面的程式碼去替換系統中某一部分程式碼時,系統不受影
Python併發程式設計(三):網路程式設計之粘包現象
目錄 一、什麼是粘包 須知:只有TCP有粘包現象,UDP永遠不會粘包 粘包不一定會發生 如果發生了:1.可能是在客戶端已經粘了 2.客戶端沒有粘,可能是在服務端粘了 首先需要掌握一個socket收發訊息的原理 應用
linux命令學習(6):ps命令
bytes 釋放 ice cti width kthread hellip 名稱 pts Linux中的ps命令是Process Status的縮寫。ps命令用來列出系統中當前運行的那些進程。ps命令列出的是當前那些進程的快照,就是執行ps命令的那個時刻的那些進程,如果想要
C++傳智筆記(6):socket客戶端發送報文接受報文的api接口
內存泄露 rcp 分配內存 strcpy light cpp tac 第三方 _file__ #define _CRT_SECURE_NO_WARNINGS #include "stdio.h" #include "stdlib.h" #include "string.
Windows Phone開發(6):處理屏幕方向的改變
cati sources mon stack mar ber XML break pac 俺們都知道,智能手機可以通過旋轉手機來改變屏幕的顯示方向,更多的時候,對於屏幕方向的改變,我們要做出相應的處理,例如,當手機屏幕方向從縱向變為橫向時,可能要重新排列頁面上的控件以適應顯
設計模式六大原則(6):開閉原則
思考 外部 編程人員 恰恰 單一職責 何事 適應 擴展 分享 開閉原則 定義:一個軟件實體如類、模塊和函數應該對擴展開放,對修改關閉。 問題由來:在軟件的生命周期內,因為變化、升級和維護等原因需要對軟件原有代碼進行修改時,可能會給舊代碼中引入錯誤,也可能會使我們不得不對
springBoot(6):web開發-模板引擎jsp
spring boot 一、新建工程 註意新建的工程下沒有webapp目錄eclipse下會自動創建webapp目錄這裏我們需要自動創建一個webapp目錄並創建WEB-INF。 對ServletInitializer.java進行說明 1、這個類相當於我們以前的web.xml 2、只有3.0以上才
學習用Node.js和Elasticsearch構建搜索引擎(6):實際項目中常用命令使用記錄
nds 黃色 ati cat htm action last shard open 1、檢測集群是否健康。 curl -XGET ‘localhost:9200/_cat/health?v‘#後面加一個v表示讓輸出內容表格顯示表頭 綠色表示一切正常,黃色表示所有
高性能服務器架構(一):緩沖策略
lin 特點 領域 思路 不能 查表 edi 操作 帶寬 原文鏈接:https://mp.weixin.qq.com/s?__biz=MzA5ODExMTkwMA==&mid=402675187&idx=1&sn=d240f6d1430b86bc00
EasyPR源碼剖析(6):車牌判斷之LBP特征
extend 順序 位置 feature tput ray bpf range str 一、LBP特征 LBP指局部二值模式,英文全稱:Local Binary Pattern,是一種用來描述圖像局部特征的算子,LBP特征具有灰度不變性和旋轉不變性等顯著優點。 原始的LBP
python函數(6):內置函數和匿名函數
a20 *args -s 執行 code str 思維導圖 inpu 其他 我們學了這麽多關於函數的知識基本都是自己定義自己使用,那麽我們之前用的一些函數並不是我們自己定義的比如說print(),len(),type()等等,它們是哪來的呢? 一、內置函數 由pytho
Linux自學筆記(6):Linux文件系統及文件類型
linux文件類型 linux文件系統 Linux的文件系統:跟文件系統(rootfs)root filesystem LSB,FHS:linux發行標準1 bin boot dev etc home lib lib64 media mnt opt proc root run
《Linux學習並不難》Linux常用操作命令(6):uname命令顯示計算機和系統相關信息
Linux8.6 《Linux學習並不難》Linux常用操作命令(6):uname命令顯示計算機和系統相關信息 使用uname命令可以顯示計算機以及操作系統的相關信息,比如計算機硬件架構、內核發行號、操作系統名稱、計算機主機名等。 命令語法: uname [選項] 命令中各選項的
《Linux學習並不難》用戶管理(6):刪除Linux用戶賬戶
Linux 用戶 userdel 9.6 《Linux學習並不難》用戶管理(6):刪除Linux用戶賬戶 使用userdel命令可以在Linux系統中刪除用戶賬戶,甚至連用戶的主目錄也一起刪除。命令語法:userdel [選項] [用戶名]命令中各選項的含義如表所示。選項 選項含
《Linux學習並不難》文件/目錄管理(6):mkdir命令創建目錄
Linux mkdir 目錄 7.6 《Linux學習並不難》文件/目錄管理(6):mkdir命令創建目錄使用mkdir命令可以在Linux系統中創建目錄。 命令語法:mkdir [選項] [目錄]命令中各選項的含義如表所示。選項 選項含義 -m <權限模式>對新創建的目錄設置權限
Exchange 2016與國內版O365混合部署(6):混合後的操作和驗證
分享圖片 In 報告 mailbox inf 通訊錄 png 完成後 動手 雲端和本地統一的通訊錄: AAD Connect 目錄同步後,登錄O365管理員界面查看用戶,可以看到本地的兩個用戶已經同步上來了,同步類型顯示已與AD同步。 登錄一個雲端用戶的郵箱,在收件人處輸