RocketMQ-單機版安裝及遠端連線測試
阿新 • • 發佈:2019-01-08
安裝需要
jdk 1.8
centos 7
rocketmq 4.2.0
因為我是在阿里雲ECS上安裝的,所以centos就不用說了,jdk的安裝,如果沒有安裝可以看這篇文章:CentOS配置JAVA_HOME,下面就開始正式的安裝過程了。
安裝步驟
1. 下載rocketmq 4.2.0
通過wget命令下載,首先安裝wget
yum install wget
然後下載rocketmq,官網下載地址,可以選擇需要的版本,我下載的是rocketmq4.2.0
wget http://www-us.apache.org/dist/rocketmq/4.2.0/rocketmq-all-4.2 .0-bin-release.zip
解壓
yum install zip unzip //安裝unzip命令
unzip rocketmq-all-4.2.0-bin-release.zip -d rocketmq-4.2.0 //解壓
解壓好後:
將rocketmq-4.2.0這個目錄copy到/usr/local路徑下(個人習慣,可以忽略)
cp -r rocketmq-4.2.0 /usr/local
這樣準備工作就已經完成了,無需安裝就可以直接使用了,Apache的很多東西都是這樣。
2. 啟動rocketmq
進入rocketmq目錄
cd /usr/local/rocketmq-4.2 .0
- 啟動Name Server
> nohup sh bin/mqnamesrv & // 啟動
> tail -f ~/logs/rocketmqlogs/namesrv.log // 檢視namaserver日誌
The Name Server boot success...
注意:啟動過程中可能報錯顯示記憶體不足,報錯資訊如下:
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 33554432 bytes for committing reserved memory.
修改bin目錄下的runserver.sh檔案:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=320m"
同理修改runbroker.sh檔案:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m
同理修改tools.sh檔案:
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=128m"
啟動成功:
- 啟動Broker
> nohup sh bin/mqbroker -n localhost:9876 & // 啟動
> tail -f ~/logs/rocketmqlogs/broker.log // 檢視broker日誌
The broker[%s, 172.30.30.233:10911] boot success...
注意:官網的這個啟動命令特別坑,啟動時broker會通過私有ip啟動,會導致客戶端無法遠端連線,所以啟動之前我們需要修改一下配置檔案,修改如下:
vim /usr/local/rocketmq-4.2.0/conf/broker.conf
然後通過以下命令啟動:
nohup sh bin/mqbroker -n xxxx:9876 autoCreateTopicEnable=true -c /usr/local/rocketmq-4.2.0/conf/broker.conf & // 啟動broker,xxxx為你的公有ip,或者是localhost也可以
tail -f ~/logs/rocketmqlogs/broker.log // 檢視broker日誌
啟動成功:
3. 客戶端遠端連線程式碼
執行以下程式碼之前需要在阿里雲安全組中開放以下埠:
需要的maven依賴
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.29</version>
</dependency>
</dependencies>
Producer 程式碼
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
// 宣告並初始化一個producer
// 需要一個producer group名字作為構造方法的引數,這裡為producer1
DefaultMQProducer producer = new DefaultMQProducer("producer1");
producer.setVipChannelEnabled(false);
// 設定NameServer地址,此處應改為實際NameServer地址,多個地址之間用;分隔
// NameServer的地址必須有
// producer.setClientIP("119.23.211.22");
// producer.setInstanceName("Producer");
producer.setNamesrvAddr("119.23.211.22:9876");
// 呼叫start()方法啟動一個producer例項
producer.start();
// 傳送1條訊息到Topic為TopicTest,tag為TagA,訊息內容為“Hello RocketMQ”拼接上i的值
try {
// 封裝訊息
Message msg = new Message("TopicTest",// topic
"TagA",// tag
("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)// body
);
// 呼叫producer的send()方法傳送訊息
// 這裡呼叫的是同步的方式,所以會有返回結果
SendResult sendResult = producer.send(msg);
// 列印返回結果
System.out.println(sendResult);
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
//傳送完訊息之後,呼叫shutdown()方法關閉producer
System.out.println("send success");
producer.shutdown();
}
}
Consumer 程式碼
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
//宣告並初始化一個consumer
//需要一個consumer group名字作為構造方法的引數,這裡為consumer1
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
//consumer.setVipChannelEnabled(false);
//同樣也要設定NameServer地址
consumer.setNamesrvAddr("119.23.211.22:9876");
//這裡設定的是一個consumer的消費策略
//CONSUME_FROM_LAST_OFFSET 預設策略,從該佇列最尾開始消費,即跳過歷史訊息
//CONSUME_FROM_FIRST_OFFSET 從佇列最開始開始消費,即歷史訊息(還儲存在broker的)全部消費一遍
//CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,預設是半個小時以前
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//設定consumer所訂閱的Topic和Tag,*代表全部的Tag
consumer.subscribe("TopicTest", "*");
//設定一個Listener,主要進行訊息的邏輯處理
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
System.out.println("----------------------------------------------------------------------------------");
//返回消費狀態
//CONSUME_SUCCESS 消費成功
//RECONSUME_LATER 消費失敗,需要稍後重新消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//呼叫start()方法啟動consumer
consumer.start();
System.out.println("Consumer Started.");
}
}
測試結果:先啟動Consumer,接受到Producer傳送的訊息會將詳細打印出來
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=3, storeSize=176, queueOffset=1352, sysFlag=0, bornTimestamp=1525920532015, bornHost=/115.236.50.15:59835, storeTimestamp=1525920532023, storeHost=/119.23.211.22:10911, msgId=7717D31600002A9F00000000000EDF4C, commitLogOffset=974668, bodyCRC=1774740973, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1353, CONSUME_START_TIME=1525920532107, UNIQ_KEY=0A15211928C018B4AAC230AB4A2E0000, WAIT=true, TAGS=TagA}, body=14]]]
----------------------------------------------------------------------------------
SendResult [sendStatus=SEND_OK, msgId=0A152119388C18B4AAC230AE98E10000, offsetMsgId=7717D31600002A9F00000000000EE15C, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=1353]
send success
4. 遇到的錯誤:
- broker啟動ip錯誤
這裡如果你之間安裝過docker環境,broker啟動時,如果沒有設定brokerIP1的IP地址,也有可能會通過docker的虛擬ip啟動,導致遠端無法連線。
本文參考文章地址如下: