輕量級java訊息中介軟體簡介
阿新 • • 發佈:2020-10-21
uncode-mq
java輕量級訊息中介軟體。https://github.com/uncodecn/uncode-mq
功能特點
- 訊息儲存速度非常快速。
- 使用簡單方便,目前只支援topic方式。
- 依賴java環境。
說明:目前只在部分專案中使用,歡迎學習交流。
模組架構
說明:叢集由多個Group組成一個佇列,每個Group由Master和Salve兩個Broker組成,整體無中心架構。
部署
1下載檔案解壓umq-*.tar.gz到任意目錄。
2 配置資訊
在conf/config.properties檔案中填寫相關資訊。
mq.host=192.168.1.43 #本機ip mq.port=9000 #埠 mq.replica.host=192.168.7.131 #本機作為備機的主機ip mq.replica.fetch.size=100 #每次備份時同步的資料條數,預設30 mq.replica.fetch.interval=2 #備份同步時間間隔,預設2秒 mq.log.dir=./data #資料儲存目錄,預設data,不建議修改 mq.data.persistence.interval=2 #資料持久化的時間間隔,預設2秒 mq.enable.zookeeper=true #是否使用zk,叢集環境下必須使用 mq.zk.connect=192.168.1.14:2181 #zk地址 mq.zk.username=admin #zk使用者名稱 mq.zk.password=password #zk密碼 mq.zk.connectiontimeout.ms=6000 #zk連線超時時間 mq.zk.sessiontimeout.ms=6000 #zk連線session過期時間 mq.zk.data.persistence.interval=6000 #zk資料同步時間,預設6秒
3 啟動執行startup.sh,停止執行shutdown.sh,檢視執行狀態執行status.sh,檢視主題資訊執行info.sh,清除zk相關資訊執行zkclear.sh。
4 目錄
umq/conf 配置
umq/data 資料儲存
umq/logs 日誌
umq/lib 依賴jar
生產者
生產者為單例,必須最少執行一次connect操作,連線成功後不會重複connect。
String cfg = "file:/gitlib/uncode-mq/conf/config.properties"; Producer.getInstance().connect(cfg); for(int i=0;i<10000;i++){ List<Topic> list = new ArrayList<Topic>(); Topic topic = new Topic(); topic.setTopic("umq"); topic.addContent("umq作者juny=>"+i); list.add(topic); Producer.getInstance().send(list); } 或 Properties config = new Properties(); config.setProperty("mq.port", "9000"); config.setProperty("mq.zk.connect", "192.168.1.14:2181"); config.setProperty("mq.enable.zookeeper", "true"); ServerConfig serverConfig = new ServerConfig(config); Producer.getInstance().connect(serverConfig); for(int i=0;i<10000;i++){ List<Topic> list = new ArrayList<Topic>(); Topic topic = new Topic(); topic.setTopic("umq"); topic.addContent("umq作者juny=>"+i); list.add(topic); Producer.getInstance().send(list); }
消費者
1 普通方式
String cfg = "file:/gitlib/uncode-mq/conf/config.properties"; Consumer.runningConsumerRunnable(cfg); Consumer.addSubscriber(new ConsumerSubscriber(){ //訂閱主題 @Override public List<String> subscribeToTopic() { List<String> tps = new ArrayList<String>(); tps.add("umq"); return tps; } //通知 @Override public void notify(Topic topic) { System.err.println("consumer subscriber:"+topic.toString()); } });
2 與spring整合
@Service public class MyConsumerSubscriber implements ConsumerSubscriber { public static final String CFG = "file:/gitlib/uncode-mq/conf/config.properties"; @Autowired LogService logServiceImpl; public ExpressRecordConsumerSubscriber() { //註冊訂閱者 try { Consumer.runningConsumerRunnable(CFG); Consumer.addSubscriber(this); } catch (ConnectException e) { e.printStackTrace(); } } //訂閱主題 @Override public List<String> subscribeToTopic() { List<String> tps = new ArrayList<String>(); tps.add("umq"); return tps; } @Override public void notify(Topic topic) { //處理邏輯 } }
轉載於:https://my.oschina.net/uncode/blog/726719