1. 程式人生 > >使用RocketMQ 和 Netty WebSocket 搭建一個訊息伺服器 (一)

使用RocketMQ 和 Netty WebSocket 搭建一個訊息伺服器 (一)

      專案要做一個訊息伺服器,最近瘋狂百度查詢資料終於搭建完事兒了. 剛開始一臉懵逼,學習了這麼久總算是入門兒奮鬥

因為只是一個訊息伺服器,業務不算太複雜,所以我使用spring boot 來做(省事!)

spring boot 的建立在這裡就不發了,我想寫的是搭建的思路,因為程式碼網上鋪天蓋地的多的是.

首先我們需要一個訊息生產者(Producer)--生產者將訊息傳送到消費者(netty websocket),--然後再由websocket去

分發接受到的訊息.

注意: 安裝rocketmq 是要先配置環境變數的.(配置網上查);

        首先要開啟rocketmq 的兩個服務nameserver(老版本叫這個,新的叫mqnamesrv.cmd) 

在dos窗口裡到安裝的bin目錄下輸入start mqnamesrv.cmd 啟動 , 

然後在輸入命令: start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true 啟動broker

生產者:

1:  DefaultMQProducer producer = new DefaultMQProducer("隨便起");  //生產者組名

2:  producer.setNamesrvAddr("localhost:9876");   //指定nameServer的ip和埠;

3:   producer.start();    //Producer物件在使用之前必須要呼叫start初始化,初始化一次即可

4:    Message message = new Message("topic","tag","keys","訊息內容".getbytes());    //建立訊息物件

        topic: 一個組裡面可以有多個,(可以理解為一個主題);

        tag: 一個topic又分為多個tag.(一個主題又有多個單元);

        keys: 

5:    producer.send(message);    //傳送訊息

6:    producer.shutdown();    清理髮送過的訊息

消費方:

1:   DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("組名");    //監聽組名得是生產者的組名

2:   consumer .setNamesrvAddr("localhost:9876");   //指定nameServer的ip和埠;

3:   consumer.subscribe("topic", "tag");    //訂閱topic下為tag的訊息   如果第二個引數為 "*",即topic下的所有訊息.

4:    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);    //設定第一次啟動消費的方式.  (見名知意,從第一次的訊息開始消費,還可以選擇從最後一個訊息消費)

5:    ConsumerListener consumerListener = new ConsumerListener();    //建立監聽器

        consumer.registerMessageListener(consumerListener);    

        consumer.start();    //只需要start一次就夠了

        注意:   該監聽器是由我們自己寫的實現MessageListenerOrderly介面的實現類.(因為如果寫成內部類的話在專案中啟動它只能監聽一次,就會失效(這是rocketmq的一個坑!!),所以必須分開來寫.).

下面這個是實現MessageListenerOrderly介面的實現類:

public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) {
    try {
      // 設定自動提交
      context.setAutoCommit(true);
      for (MessageExt msg : list) {

        System.err.println("接受內容:" + new String(msg.getBody()));

        //在這裡我們可以對接受到的訊息進行各種操作.

      }
    } catch (Exception e) {
      e.printStackTrace();
    }
    return ConsumeOrderlyStatus.SUCCESS;

  }

注意:  如果我們要把生產者訊息放入到spring 容器或者其他快取中,在消費方去注入取出的話是會報空指標的,因為監聽器是比spring容器要先載入的(又是個大坑!). 所以想要快取生產者訊息的時候要注意怎麼設計程式碼能把訊息傳遞給消費方這個得看自己的業務需求來做.總之不能在消費方去注入比如redis不然取不到資料...

簡單的rocketmq 生產者  消費者就完成了. 

///////////////////////下面是關於rocketmq的一些詳細介紹///////////////////////

rocketmq會列印生產者--消費者產生的訊息日誌和程式工作日誌(預設是在本機賬戶的logs資料夾下),我們可以通過對D:\NewWorker\RocketMQ\conf下的4個xml檔案中將所有${user.logs}修改成自定義日誌的路徑.(我的安裝路徑是D:\new...).這裡的是關於生產者的日誌配置資訊.消費者的比較麻煩需要到我們的maven本地倉庫中找到rocketmq-client-3.2.6.jar 這個jar開啟會看到有log4j_rocketmq_client.xml---logback_rocketmq_client.xml兩個配置檔案,修改裡面的${user.logs}改成自定義的日誌路徑替換掉兩個xml就可以了.(如果日誌所在的磁碟空間低於20%專案啟動時會報錯,磁碟無線大的可以忽略).

rocketmq有兩種消費模式: 

叢集消費(常用):

        一個ConsumerGroup中的Consumer例項平均分攤消費訊息。例如某個Topic有9條訊息,其中一個ConsumerGroup有3個例項(可能是3個程序,或者3臺機器),那麼每個例項只消費其中部分,消費完的訊息不能被其他例項消費。

廣播消費:

        一條訊息被多個consumer消費,即使這些consumer屬於同一個ConsumerGroup,訊息也會被ConsumerGroup中的每個Consumer都消費一次,廣播消費中ConsumerGroup概念可以認為在訊息劃分方面無意義。

DefaultMQPushConsumer模式:    訊息傳送者將訊息傳送到Broker,然後Broker主動推送給訂閱了該訊息的消費者。

DefaultMQPullConsumer模式:    訊息傳送者將訊息傳送到Broker上,然後由訊息消費者自發的向Broker拉取訊息。

consumer裡面可以設定很多資訊點選DefaultMQPushConsumer或者DefaultMQPullConsumer進入原始碼可以發現:

比如執行緒數量 , 如果是多使用者推送訊息它會自動的開啟執行緒來執行分發訊息.訊息一旦消費成功會返回ConsumeOrderlyStatus.SUCCESS標誌,之前消費的資訊會被清空掉,否則會一直在佇列中等待.

就先寫這麼多吧,第一次純手工打造的,寫的不好但希望能幫助到更多有需要的人!還有好多內容暫時沒想出來就先到這吧,基本的流程是能跑通的.還得趕到Netty websocket場呢. 再見