1. 程式人生 > >Disruptor併發框架-2

Disruptor併發框架-2


import java.util.concurrent.atomic.AtomicInteger;

public class Trade {  
	
	private String id;//ID  
	private String name;
	private double price;//金額  
	private AtomicInteger count = new AtomicInteger(0);
	
	public String getId() {
		return id;
	}
	public void setId(String id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	public double getPrice() {
		return price;
	}
	public void setPrice(double price) {
		this.price = price;
	}
	public AtomicInteger getCount() {
		return count;
	}
	public void setCount(AtomicInteger count) {
		this.count = count;
	} 
	  
	  
}  

import java.util.UUID;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

public class TradeHandler implements EventHandler<Trade>, WorkHandler<Trade> {  
	  
    @Override  
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
        this.onEvent(event);  
    }  
  
    @Override  
    public void onEvent(Trade event) throws Exception {  
        //這裡做具體的消費邏輯  
        event.setId(UUID.randomUUID().toString());//簡單生成下ID  
        System.out.println(event.getId());  
    }  
}  

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.lmax.disruptor.BatchEventProcessor;
import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.YieldingWaitStrategy;

public class Main1 {  
   
	public static void main(String[] args) throws Exception {  
        int BUFFER_SIZE=1024;  
        int THREAD_NUMBERS=4;  
        /* 
         * createSingleProducer建立一個單生產者的RingBuffer, 
         * 第一個引數叫EventFactory,從名字上理解就是"事件工廠",其實它的職責就是產生資料填充RingBuffer的區塊。 
         * 第二個引數是RingBuffer的大小,它必須是2的指數倍 目的是為了將求模運算轉為&運算提高效率 
         * 第三個引數是RingBuffer的生產都在沒有可用區塊的時候(可能是消費者(或者說是事件處理器) 太慢了)的等待策略 
         */  
        final RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(new EventFactory<Trade>() {  
            @Override  
            public Trade newInstance() {  
                return new Trade();  
            }  
        }, BUFFER_SIZE, new YieldingWaitStrategy());  
        
        //建立執行緒池  
        ExecutorService executors = Executors.newFixedThreadPool(THREAD_NUMBERS);  
        
        //建立SequenceBarrier  
        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
          
        //建立訊息處理器  
        BatchEventProcessor<Trade> transProcessor = new BatchEventProcessor<Trade>(  
                ringBuffer, sequenceBarrier, new TradeHandler());  
          
        //這一步的目的就是把消費者的位置資訊引用注入到生產者    如果只有一個消費者的情況可以省略 
        ringBuffer.addGatingSequences(transProcessor.getSequence());  
          
        //把訊息處理器提交到執行緒池  
        executors.submit(transProcessor);  
        
        //如果存在多個消費者 那重複執行上面3行程式碼 把TradeHandler換成其它消費者類  
          
        Future<?> future= executors.submit(new Callable<Void>() {  
            @Override  
            public Void call() throws Exception {  
                long seq;  
                for(int i=0;i<10;i++){  
                    seq = ringBuffer.next();//佔個坑 --ringBuffer一個可用區塊  
                    ringBuffer.get(seq).setPrice(Math.random()*9999);//給這個區塊放入 資料 
                    ringBuffer.publish(seq);//釋出這個區塊的資料使handler(consumer)可見  
                }  
                return null;  
            }  
        }); 
        
        future.get();//等待生產者結束  
        Thread.sleep(1000);//等上1秒,等消費都處理完成  
        transProcessor.halt();//通知事件(或者說訊息)處理器 可以結束了(並不是馬上結束!!!)  
        executors.shutdown();//終止執行緒  
    }  
}  

 


import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.IgnoreExceptionHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.WorkHandler;
import com.lmax.disruptor.WorkerPool;

public class Main2 {  
    public static void main(String[] args) throws InterruptedException {  
        int BUFFER_SIZE=1024;  
        int THREAD_NUMBERS=4;  
        
        EventFactory<Trade> eventFactory = new EventFactory<Trade>() {  
            public Trade newInstance() {  
                return new Trade();  
            }  
        };  
        
        RingBuffer<Trade> ringBuffer = RingBuffer.createSingleProducer(eventFactory, BUFFER_SIZE);  
          
        SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();  
          
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_NUMBERS);  
          
        WorkHandler<Trade> handler = new TradeHandler();  

        WorkerPool<Trade> workerPool = new WorkerPool<Trade>(ringBuffer, sequenceBarrier, new IgnoreExceptionHandler(), handler);  
          
        workerPool.start(executor);  
          
        //下面這個生產8個數據
        for(int i=0;i<8;i++){  
            long seq=ringBuffer.next();  
            ringBuffer.get(seq).setPrice(Math.random()*9999);  
            ringBuffer.publish(seq);  
        }  
          
        Thread.sleep(1000);  
        workerPool.halt();  
        executor.shutdown();  
    }  
}