
A Rough Introduction to Disruptor


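I. Overview

1. Disruptor

Disruptor is a high-performance asynchronous processing framework built around the producer-consumer model.

2. RingBuffer

RingBuffer is a ring-shaped (circular) data structure. It keeps a sequence number that points to the next available slot, and it is used to pass data between threads.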
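To make the "sequence number pointing to the next slot" idea concrete, here is a minimal single-threaded sketch. It is not the Disruptor's actual RingBuffer: the class name SimpleRing and its methods are hypothetical, and all of the real class's concurrency handling is left out. It assumes a power-of-two size (which the Disruptor also requires for its buffer size) so that a sequence can be mapped to a slot with a bit mask.

```java
// Hypothetical, single-threaded illustration; not the Disruptor's actual RingBuffer.
final class SimpleRing {
    private final long[] slots;     // pre-allocated storage, reused as the sequence wraps
    private final int mask;         // size - 1, valid because size is a power of two
    private long nextSequence = 0;  // sequence number of the next slot to be written

    SimpleRing(int size) {
        if (size < 1 || Integer.bitCount(size) != 1) {
            throw new IllegalArgumentException("size must be a power of two");
        }
        slots = new long[size];
        mask = size - 1;
    }

    /** Maps an ever-increasing sequence number onto a slot index. */
    int indexOf(long sequence) {
        return (int) (sequence & mask);
    }

    /** Writes a value into the next slot and returns the sequence it was assigned. */
    long publish(long value) {
        long sequence = nextSequence++;
        slots[indexOf(sequence)] = value;
        return sequence;
    }

    /** Reads the value currently stored in the slot for the given sequence. */
    long get(long sequence) {
        return slots[indexOf(sequence)];
    }

    public static void main(String[] args) {
        SimpleRing ring = new SimpleRing(4);
        for (long v = 0; v < 6; v++) {
            ring.publish(v * 10);
        }
        // Sequences 4 and 5 have wrapped around and reused slots 0 and 1.
        System.out.println("slot for sequence 5: " + ring.indexOf(5));
        System.out.println("value at sequence 5: " + ring.get(5));
    }
}
```

The sequence only ever grows, while the slot index wraps around, which is why the buffer can be allocated once and reused.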

3. Event

In the Disruptor framework, the data produced by a producer is called an Event.

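II. Basic Components of the Disruptor Framework

1. MyEvent: a custom object that serves as the data in the producer-consumer model.
2. MyEventFactory: implements the EventFactory interface and creates the event objects.
3. MyEventProducerWithTranslator: stores data in the custom object and publishes it.
4. MyEventHandler: a custom consumer.

In the demo below, these roles are played by LongEvent, LongEventFactory, LongEventProducerWithTranslator, and LongEventHandler.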
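How the four roles fit together can be sketched without the library. The following is a simplified, single-threaded illustration only: RolesSketch, preallocate, and run are hypothetical names, a plain Supplier stands in for the event factory, and none of this is the Disruptor API. It shows events being created once by the factory, filled in by the producer, and then read by the handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical illustration of the four roles; not the Disruptor API.
final class RolesSketch {
    /** Role 1: the event, a mutable carrier for the data. */
    static final class MyEvent {
        long value;
    }

    /** Role 2: the factory is called once per slot to pre-allocate the events. */
    static MyEvent[] preallocate(int size, Supplier<MyEvent> factory) {
        MyEvent[] slots = new MyEvent[size];
        for (int i = 0; i < size; i++) {
            slots[i] = factory.get();
        }
        return slots;
    }

    /** Produces and consumes each input in turn and returns what the handler saw. */
    static List<Long> run(long[] inputs, int size) {
        MyEvent[] slots = preallocate(size, MyEvent::new);
        List<Long> consumed = new ArrayList<>();
        for (int seq = 0; seq < inputs.length; seq++) {
            MyEvent event = slots[seq % size]; // reuse the pre-allocated slot
            event.value = inputs[seq];         // role 3: producer copies data into the event
            consumed.add(event.value);         // role 4: handler consumes the event
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(run(new long[] {1, 2, 3, 4, 5}, 4));
    }
}
```

In the real framework the producer and the handler run on different threads and are coordinated through sequence numbers. Here they simply alternate in one loop.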

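III. Demo

This is my first contact with Disruptor, and my understanding is still superficial, fragmentary, and vague. I am recording a simple example here as a starting point for deeper study later.

1. Custom data class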

package com.disruptor.basic;

public class LongEvent {
    private long value;

    public long getValue() {
        return value;
    }

    public void setValue(long value) {
        this.value = value;
    }

}

2. Data factory (creates the data class objects)

package com.disruptor.basic;

import com.lmax.disruptor.EventFactory;

public class LongEventFactory implements EventFactory<LongEvent> {

    @Override
    public LongEvent newInstance() {
        return new LongEvent();
    }

}

3. Data source (populates the event object and publishes it)

package com.disruptor.basic;

import java.nio.ByteBuffer;

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;

public class LongEventProducerWithTranslator {

    private final RingBuffer<LongEvent> ringBuffer;

    public LongEventProducerWithTranslator(RingBuffer<LongEvent> ringBuffer) {
        this.ringBuffer = ringBuffer;
    }

    private final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
        /**
         * event: the pre-allocated object that will carry the data to the consumer;
         * sequence: the RingBuffer sequence number assigned to that object;
         * bb: the container holding the data to be stored in the event
         */
        @Override
        public void translateTo(LongEvent event, long sequence, ByteBuffer bb) {
            event.setValue(bb.getLong(0)); // store the data in the event
        }
    };

    public void onData(ByteBuffer bb) {
        ringBuffer.publishEvent(TRANSLATOR, bb); // publish, making the event available to the consumer
    }

}
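The translator reads with bb.getLong(0), and the test class in step 5 writes with bb.putLong(0, a). Both are absolute-index calls on java.nio.ByteBuffer, so they do not move the buffer's position. This is why a single 8-byte buffer can be reused for every value. The following standalone sketch demonstrates this (AbsoluteBufferSketch and roundTrip are hypothetical names used only for illustration):

```java
import java.nio.ByteBuffer;

// Hypothetical helper showing absolute put/get on a reused ByteBuffer.
final class AbsoluteBufferSketch {
    /** Writes value at offset 0 and reads it back, as the test loop and translator do. */
    static long roundTrip(ByteBuffer bb, long value) {
        bb.putLong(0, value);  // absolute put: the position is not advanced
        return bb.getLong(0);  // absolute get: the position is not advanced
    }

    public static void main(String[] args) {
        ByteBuffer bb = ByteBuffer.allocate(8);
        for (long a = 0; a < 3; a++) {
            long read = roundTrip(bb, a);
            System.out.println("read=" + read + " position=" + bb.position());
        }
    }
}
```

With the relative variants putLong(value) and getLong(), the position would advance by 8 bytes on each call, and the buffer would have to be flipped or cleared between iterations.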

4. Consumer

package com.disruptor.basic;

import com.lmax.disruptor.EventHandler;

public class LongEventHandler implements EventHandler<LongEvent> {

    @Override
    public void onEvent(LongEvent event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println("Consumed value=" + event.getValue());
    }

}

5. Test class

package com.disruptor.basic;

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.junit.Test;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.YieldingWaitStrategy;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;

public class LongEventTest {

    @SuppressWarnings({ "unchecked", "deprecation" })
    @Test
    public void test01() throws InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
        EventFactory<LongEvent> factory = new LongEventFactory();
        int bufferSize = 1024;
        Disruptor<LongEvent> disruptor = new Disruptor<LongEvent>(factory, bufferSize, executor, ProducerType.SINGLE,
                new YieldingWaitStrategy());
        disruptor.handleEventsWith(new LongEventHandler());
        disruptor.start();

        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        // LongEventProducer producer = new
        // LongEventProducer(ringBuffer);
        LongEventProducerWithTranslator producer = new LongEventProducerWithTranslator(ringBuffer);
        ByteBuffer bb = ByteBuffer.allocate(8);
        // long startTime = System.currentTimeMillis();
        for (long a = 0; a < 100; a++) {
            bb.putLong(0, a);
            producer.onData(bb);
            /*if (a == 99) {
                long endTime = System.currentTimeMillis();
                System.out.println("useTime=" + (endTime - startTime));
            }*/
            Thread.sleep(100);
        }
        /*long endTime = System.currentTimeMillis();
        System.out.println("useTime=" + (endTime - startTime));*/
        disruptor.shutdown();
        executor.shutdown();
    }

    /*@Test
    public void test02() {
        long startTime = System.currentTimeMillis();
        for (long a = 0; a < 100; a++) {
            System.out.println(a);
        }
        long endTime = System.currentTimeMillis();
        System.out.println("useTime=" + (endTime - startTime));
    }*/

}
