1. 程式人生 > >RocketMQ-單機版安裝及遠端連線測試

RocketMQ-單機版安裝及遠端連線測試

安裝需要

jdk 1.8
centos 7
rocketmq 4.2.0

因為我是在阿里雲ECS上安裝的,所以centos就不用說了,jdk的安裝,如果沒有安裝可以看這篇文章:CentOS配置JAVA_HOME,下面就開始正式的安裝過程了。

安裝步驟

1. 下載rocketmq 4.2.0

通過wget命令下載,首先安裝wget

yum install wget

然後下載rocketmq,官網下載地址,可以選擇需要的版本,我下載的是rocketmq4.2.0

wget http://www-us.apache.org/dist/rocketmq/4.2.0/rocketmq-all-4.2
.0-bin-release.zip

解壓

yum install zip unzip  //安裝unzip命令

unzip rocketmq-all-4.2.0-bin-release.zip -d rocketmq-4.2.0 //解壓

解壓好後:
這裡寫圖片描述

將rocketmq-4.2.0這個目錄copy到/usr/local路徑下(個人習慣,可以忽略)

cp -r rocketmq-4.2.0 /usr/local

這樣準備工作就已經完成了,無需安裝就可以直接使用了,Apache的很多東西都是這樣。

2. 啟動rocketmq

進入rocketmq目錄

cd /usr/local/rocketmq-4.2
.0
  • 啟動Name Server
  > nohup sh bin/mqnamesrv & // 啟動
  > tail -f ~/logs/rocketmqlogs/namesrv.log // 檢視namaserver日誌
  The Name Server boot success...

注意:啟動過程中可能報錯顯示記憶體不足,報錯資訊如下:

# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to
map 33554432 bytes for committing reserved memory.

修改bin目錄下的runserver.sh檔案:

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=320m"

同理修改runbroker.sh檔案:

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m

同理修改tools.sh檔案:

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=128m"

啟動成功:
這裡寫圖片描述

  • 啟動Broker
  > nohup sh bin/mqbroker -n localhost:9876 & // 啟動
  > tail -f ~/logs/rocketmqlogs/broker.log // 檢視broker日誌
  The broker[%s, 172.30.30.233:10911] boot success...

注意:官網的這個啟動命令特別坑,啟動時broker會通過私有ip啟動,會導致客戶端無法遠端連線,所以啟動之前我們需要修改一下配置檔案,修改如下:

vim /usr/local/rocketmq-4.2.0/conf/broker.conf

這裡寫圖片描述

然後通過以下命令啟動:

nohup sh bin/mqbroker -n xxxx:9876 autoCreateTopicEnable=true -c /usr/local/rocketmq-4.2.0/conf/broker.conf & // 啟動broker,xxxx為你的公有ip,或者是localhost也可以

tail -f ~/logs/rocketmqlogs/broker.log // 檢視broker日誌

啟動成功:
這裡寫圖片描述

3. 客戶端遠端連線程式碼

執行以下程式碼之前需要在阿里雲安全組中開放以下埠:
這裡寫圖片描述

需要的maven依賴

 <dependencies>
    <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>4.2.0</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.29</version>
    </dependency>
  </dependencies>

Producer 程式碼

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        // 宣告並初始化一個producer
        // 需要一個producer group名字作為構造方法的引數,這裡為producer1
        DefaultMQProducer producer = new DefaultMQProducer("producer1");
        producer.setVipChannelEnabled(false);
        // 設定NameServer地址,此處應改為實際NameServer地址,多個地址之間用;分隔
        // NameServer的地址必須有
        // producer.setClientIP("119.23.211.22");
        // producer.setInstanceName("Producer");
        producer.setNamesrvAddr("119.23.211.22:9876");

        // 呼叫start()方法啟動一個producer例項
        producer.start();

        // 傳送1條訊息到Topic為TopicTest,tag為TagA,訊息內容為“Hello RocketMQ”拼接上i的值
        try {
            // 封裝訊息
            Message msg = new Message("TopicTest",// topic
                    "TagA",// tag
                    ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)// body
            );
            // 呼叫producer的send()方法傳送訊息
            // 這裡呼叫的是同步的方式,所以會有返回結果
            SendResult sendResult = producer.send(msg);
            // 列印返回結果
            System.out.println(sendResult);
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        //傳送完訊息之後,呼叫shutdown()方法關閉producer
        System.out.println("send success");
        producer.shutdown();
    }
}

Consumer 程式碼

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        //宣告並初始化一個consumer
        //需要一個consumer group名字作為構造方法的引數,這裡為consumer1
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
        //consumer.setVipChannelEnabled(false);
        //同樣也要設定NameServer地址
        consumer.setNamesrvAddr("119.23.211.22:9876");

        //這裡設定的是一個consumer的消費策略
        //CONSUME_FROM_LAST_OFFSET 預設策略,從該佇列最尾開始消費,即跳過歷史訊息
        //CONSUME_FROM_FIRST_OFFSET 從佇列最開始開始消費,即歷史訊息(還儲存在broker的)全部消費一遍
        //CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,預設是半個小時以前
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //設定consumer所訂閱的Topic和Tag,*代表全部的Tag
        consumer.subscribe("TopicTest", "*");

        //設定一個Listener,主要進行訊息的邏輯處理
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {

                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                System.out.println("----------------------------------------------------------------------------------");
                //返回消費狀態
                //CONSUME_SUCCESS 消費成功
                //RECONSUME_LATER 消費失敗,需要稍後重新消費
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //呼叫start()方法啟動consumer
        consumer.start();
        System.out.println("Consumer Started.");
    }
}

測試結果:先啟動Consumer,接受到Producer傳送的訊息會將詳細打印出來

Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=3, storeSize=176, queueOffset=1352, sysFlag=0, bornTimestamp=1525920532015, bornHost=/115.236.50.15:59835, storeTimestamp=1525920532023, storeHost=/119.23.211.22:10911, msgId=7717D31600002A9F00000000000EDF4C, commitLogOffset=974668, bodyCRC=1774740973, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1353, CONSUME_START_TIME=1525920532107, UNIQ_KEY=0A15211928C018B4AAC230AB4A2E0000, WAIT=true, TAGS=TagA}, body=14]]]
----------------------------------------------------------------------------------

SendResult [sendStatus=SEND_OK, msgId=0A152119388C18B4AAC230AE98E10000, offsetMsgId=7717D31600002A9F00000000000EE15C, messageQueue=MessageQueue [topic=TopicTest, brokerName=broker-a, queueId=1], queueOffset=1353]
send success

4. 遇到的錯誤:

  • broker啟動ip錯誤
    這裡寫圖片描述
    這裡如果你之間安裝過docker環境,broker啟動時,如果沒有設定brokerIP1的IP地址,也有可能會通過docker的虛擬ip啟動,導致遠端無法連線。

本文參考文章地址如下:

參考地址