1. 程式人生 > 實用技巧 >輕量級java訊息中介軟體簡介

輕量級java訊息中介軟體簡介

uncode-mq

java輕量級訊息中介軟體。https://github.com/uncodecn/uncode-mq


功能特點

  1. 訊息儲存速度非常快速。
  2. 使用簡單方便,目前只支援topic方式。
  3. 依賴java環境。

說明:目前只在部分專案中使用,歡迎學習交流。


模組架構

umq

說明:叢集由多個Group組成一個佇列,每個Group由Master和Salve兩個Broker組成,整體無中心架構。


部署

1下載檔案解壓umq-*.tar.gz到任意目錄。

2 配置資訊

在conf/config.properties檔案中填寫相關資訊。

mq.host=192.168.1.43 #本機ip
mq.port=9000 #埠
mq.replica.host=192.168.7.131 #本機作為備機的主機ip
mq.replica.fetch.size=100 #每次備份時同步的資料條數,預設30
mq.replica.fetch.interval=2 #備份同步時間間隔,預設2秒
mq.log.dir=./data #資料儲存目錄,預設data,不建議修改
mq.data.persistence.interval=2 #資料持久化的時間間隔,預設2秒
mq.enable.zookeeper=true #是否使用zk,叢集環境下必須使用
mq.zk.connect=192.168.1.14:2181 #zk地址
mq.zk.username=admin #zk使用者名稱
mq.zk.password=password #zk密碼
mq.zk.connectiontimeout.ms=6000 #zk連線超時時間
mq.zk.sessiontimeout.ms=6000 #zk連線session過期時間
mq.zk.data.persistence.interval=6000 #zk資料同步時間,預設6秒

3 啟動執行startup.sh,停止執行shutdown.sh,檢視執行狀態執行status.sh,檢視主題資訊執行info.sh,清除zk相關資訊執行zkclear.sh。

4 目錄

umq/conf 配置
umq/data 資料儲存
umq/logs 日誌
umq/lib 依賴jar


生產者

生產者為單例,必須最少執行一次connect操作,連線成功後不會重複connect。

String cfg = "file:/gitlib/uncode-mq/conf/config.properties";
Producer.getInstance().connect(cfg);
for(int i=0;i<10000;i++){
    List<Topic> list = new ArrayList<Topic>();
    Topic topic = new Topic();
    topic.setTopic("umq");
    topic.addContent("umq作者juny=>"+i);
    list.add(topic);
    Producer.getInstance().send(list);
}

或

Properties config = new Properties();
config.setProperty("mq.port", "9000");
config.setProperty("mq.zk.connect", "192.168.1.14:2181");
config.setProperty("mq.enable.zookeeper", "true");
ServerConfig serverConfig = new ServerConfig(config);
Producer.getInstance().connect(serverConfig);
for(int i=0;i<10000;i++){
    List<Topic> list = new ArrayList<Topic>();
    Topic topic = new Topic();
    topic.setTopic("umq");
    topic.addContent("umq作者juny=>"+i);
    list.add(topic);
    Producer.getInstance().send(list);
}

消費者

1 普通方式

String cfg = "file:/gitlib/uncode-mq/conf/config.properties";
Consumer.runningConsumerRunnable(cfg);
Consumer.addSubscriber(new ConsumerSubscriber(){

    //訂閱主題
    @Override
    public List<String> subscribeToTopic() {
        List<String> tps = new ArrayList<String>();
        tps.add("umq");
        return tps;
    }

    //通知
    @Override
    public void notify(Topic topic) {
        System.err.println("consumer subscriber:"+topic.toString());
    }

});

2 與spring整合

@Service
public class MyConsumerSubscriber implements ConsumerSubscriber {

    public static final String CFG = "file:/gitlib/uncode-mq/conf/config.properties";

    @Autowired
    LogService logServiceImpl;

    public ExpressRecordConsumerSubscriber() {
        //註冊訂閱者
        try {
            Consumer.runningConsumerRunnable(CFG);
            Consumer.addSubscriber(this);
        } catch (ConnectException e) {
            e.printStackTrace();
        }
    }

    //訂閱主題
    @Override
    public List<String> subscribeToTopic() {
        List<String> tps = new ArrayList<String>();
        tps.add("umq");
        return tps;
    }

    @Override
    public void notify(Topic topic) {
        //處理邏輯
    }

}

轉載於:https://my.oschina.net/uncode/blog/726719