RocketMQ最佳實踐(一)4.0版本/概念介紹/安裝除錯/客戶端demo
阿新 • • 發佈:2019-02-10
為什麼選擇RocketMQ
我們來看看官方回答:
“我們研究發現,對於ActiveMQ而言,隨著越來越多的使用queues和topics,其IO成為了瓶頸。某些情況下,消費者緩慢(消費能力不足)還會拖慢生產者(造成訊息阻塞)。雖然我們做了最大努力進行優化:節流、斷路器或者回退,但是並不能進行優雅的擴充套件。因此我們開始專注於使用時下非常流行的kafka,但是仍然不能滿足我們的要求,如低延遲和高可靠性,詳情見這裡。在這樣的背景下,我們決定開發一個新的訊息中介軟體來處理一系列廣泛的使用場景,包括從傳統的釋出/訂閱場景到高容量的實時交易系統中不允許訊息丟失的場景。”
核心概念
- 生產者(Producer):
- 生產者組(Producer Group):相同角色的生產者被歸為同一組,比如通常情況下一個服務會部署多個例項,這多個例項就是一個組,生產者分組的作用只體現在訊息回查的時候,即如果一個生產者組中的一個生產者例項傳送一個事務訊息到broker後掛掉了,那麼broker會回查此例項所在組的其他例項,從而進行訊息的提交或回滾操作。
- 消費者(Consumer):訊息消費方,從brokers拉取訊息。站在使用者的角度,有以下兩種消費者。
- 主動消費者(PullConsumer):從brokers拉取訊息並消費。
- 被動消費者(PushConsumer):內部也是通過pull方式獲取訊息,只是進行了擴充套件和封裝,並給使用者預留了一個回撥介面去實現,當訊息到底的時候會執行使用者自定義的回撥介面。
- 消費者組(Consumer Group):和生產者組類似。其作用體現在實現消費者的負載均衡和容錯,有了消費者組變得異常容易。需要注意的是:同一個消費者組的每個消費者例項訂閱的主題必須相同。
- 主題(Topic):主題就是訊息傳遞的型別。一個生產者例項可以傳送訊息到多個主題,多個生產者例項也可以傳送訊息到同一個主題。同樣的,對於消費者端來說,一個消費者組可以訂閱多個主題的訊息,一個主題的訊息也可以被多個消費者組訂閱。
- 訊息(Message):訊息就像是你傳遞資訊的信封。每個訊息必須指定一個主題,就好比每個信封上都必須寫明收件人。
- 訊息佇列(Message Queues):在主題內部,邏輯劃分了多個子主題,每個子主題被稱為訊息佇列。這個概念在實現最大併發數、故障切換等功能上有巨大的作用。
- 標籤(Tag):標籤,可以被認為是子主題。通常用於區分同一個主題下的不同作用或者說不同業務的訊息。同時也是避免主題定義過多引起效能問題,通常情況下一個生產者組只向一個主題傳送訊息,其中不同業務的訊息通過標籤或者說子主題來區分。
- 訊息代理(Broker):訊息代理是RockerMQ中很重要的角色。它接收生產者傳送的訊息,進行訊息儲存,為消費者拉取訊息服務。它還儲存訊息消耗相關的元資料,包括消費群體,消費進度偏移和主題/佇列資訊。
- 命名服務(Name Server):命名服務作為路由資訊提供程式。生產者/消費者進行主題查詢、訊息代理查詢、讀取/寫入訊息都需要通過命名服務獲取路由資訊。
- 訊息順序(Message Order):當我們使用DefaultMQPushConsumer時,我們可以選擇使用“orderly”還是“concurrently”。
- orderly:消費訊息的有序化意味著訊息被生產者按照每個訊息佇列傳送的順序消費。如果您正在處理全域性順序為強制的場景,請確保您使用的主題只有一個訊息佇列。注意:如果指定了消費順序,則訊息消費的最大併發性是消費組訂閱的訊息佇列數。
- concurrently:當同時消費時,訊息消費的最大併發僅限於為每個消費客戶端指定的執行緒池。注意:此模式不再保證訊息順序。
安裝與除錯
官方要求的環境:
- 64bit OS, Linux/Unix/Mac is recommended;
- 64bit JDK 1.7+;
- Maven 3.2.x
- Git
我的環境:(我喜歡使用較新的版本)
- CentOS Linux release 7.3.1611;
- 64bit JDK 1.8.0_91;
- apache-maven-3.5.0;
- Git 1.8.3.1
安裝jdk
麻煩各位看官自行搜尋,資料多的嚇人。。。安裝maven
先去官網下載maven然後上傳到安裝目錄,解壓:sudo tar zxvf apache-maven-3.5.0-bin.tar.gz
解壓完成設定環境變數:sudo vi /etc/profile
然後使環境變數生效:source /etc/profile
最後驗證是否安裝成功:mvn -v
安裝Git(so easy)
先檢檢視看是否已經安裝過了:git --version
如果沒有就開始安裝:sudo yum install git
安裝完畢再看看:git --version
下面進行RocketMq安裝
編譯:> git clone https://github.com/apache/incubator-rocketmq.git
> cd incubator-rocketmq
> mvn clean package install -Prelease-all assembly:assembly -U
在執行mvn編譯的時候,你可能會遇到如下的問題: 這是由於沒有許可權建立目錄造成的。所以,要麼你切換到root使用者,要麼使用sudo:> cd target/apache-rocketmq-all
提示:sudo: mvn: command not found。好吧,也是醉了。我們還需要在你當前使用者的Home目錄下的一個隱藏檔案(.bashrc)中新增點東西:sudo mvn clean package install -Prelease-all assembly:assembly -U
> cd ~
新增完成後,執行:source .bashrc 使修改生效。然後再重新執行看看:> sudo vi .bashrc
時間稍微有點長,我的環境用了16分鐘,請看官耐心等待,完成後如下圖:sudo mvn clean package install -Prelease-all assembly:assembly -U
啟動RocketMQ
修改預設配置
由於RocketMQ預設配置要求很高,比如記憶體至少就要4個G,開發除錯環境根本吃不消,所以我們開始啟動前需要先修改這些引數。否則的話,我們很有會遇到記憶體分配或者不夠的問題。修改target/apache-rocketmq-all/bin/runserver.shJAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=320m"
修改target/apache-rocketmq-all/bin/runbroker.shJAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m
修改target/apache-rocketmq-all/bin/tools.shJAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=128m"
啟動NameServer
進入target/apache-rocketmq-all目錄下
> nohup sh bin/mqnamesrv &
> tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
啟動Broker
> nohup sh bin/mqbroker -n localhost:9876 &
> tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s, 172.17.0.1:10911] boot success...
開放埠
sudo vi /etc/sysconfig/iptables
然後重啟生效:sudo systemctl restart iptables
新增ROCKETMQ_HOME環境變數
sudo vi /etc/profile
source /etc/profile
java客戶端
pom.xml
<rocketmq.version>4.0.0-incubating</rocketmq.version> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>${rocketmq.version}</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-common</artifactId> <version>${rocketmq.version}</version> </dependency>
生產者
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import java.util.concurrent.TimeUnit; public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { /** * 一個應用建立一個Producer,由應用來維護此物件,可以設定為全域性物件或者單例<br> * 注意:ProducerGroupName需要由應用來保證唯一<br> * ProducerGroup這個概念傳送普通的訊息時,作用不大,但是傳送分散式事務訊息時,比較關鍵, * 因為伺服器會回查這個Group下的任意一個Producer */ DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); producer.setNamesrvAddr("192.168.56.101:9876"); producer.setInstanceName("Producer"); producer.setVipChannelEnabled(false); /** * Producer物件在使用之前必須要呼叫start初始化,初始化一次即可<br> * 注意:切記不可以在每次傳送訊息時,都呼叫start方法 */ producer.start(); /** * 下面這段程式碼表明一個Producer物件可以傳送多個topic,多個tag的訊息。 * 注意:send方法是同步呼叫,只要不拋異常就標識成功。但是傳送成功也可會有多種狀態,<br> * 例如訊息寫入Master成功,但是Slave不成功,這種情況訊息屬於成功,但是對於個別應用如果對訊息可靠性要求極高,<br> * 需要對這種情況做處理。另外,訊息可能會存在傳送失敗的情況,失敗重試由應用來處理。 */ for (int i = 0; i < 1; i++) { try { { Message msg = new Message("TopicTest1",// topic "TagA",// tag "OrderID001",// key ("Hello MetaQ").getBytes());// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); } { Message msg = new Message("TopicTest2",// topic "TagB",// tag "OrderID0034",// key ("Hello MetaQ").getBytes());// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); } { Message msg = new Message("TopicTest3",// topic "TagC",// tag "OrderID061",// key ("Hello MetaQ").getBytes());// body SendResult sendResult = producer.send(msg); System.out.println(sendResult); } } catch (Exception e) { e.printStackTrace(); } TimeUnit.MILLISECONDS.sleep(1000); } /** * 應用退出時,要呼叫shutdown來清理資源,關閉網路連線,從MetaQ伺服器上登出自己 * 注意:我們建議應用在JBOSS、Tomcat等容器的退出鉤子裡呼叫shutdown方法 */ producer.shutdown(); } }
消費者
下一篇,我們將進行RocketMQ的叢集部署。 參考: http://rocketmq.incubator.apache.org/docs/motivation/import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; public class PushConsumer { /** * 當前例子是PushConsumer用法,使用方式給使用者感覺是訊息從RocketMQ伺服器推到了應用客戶端。<br> * 但是實際PushConsumer內部是使用長輪詢Pull方式從MetaQ伺服器拉訊息,然後再回呼叫戶Listener方法<br> */ public static void main(String[] args) throws InterruptedException, MQClientException { /** * 一個應用建立一個Consumer,由應用來維護此物件,可以設定為全域性物件或者單例<br> * 注意:ConsumerGroupName需要由應用來保證唯一 */ DefaultMQPushConsumer consumer = new DefaultMQPushConsumer( "ConsumerGroupName"); consumer.setNamesrvAddr("192.168.56.101:9876"); consumer.setInstanceName("Consumber"); /** * 訂閱指定topic下tags分別等於TagA或TagC或TagD */ consumer.subscribe("TopicTest1", "TagA || TagC || TagD"); /** * 訂閱指定topic下所有訊息<br> * 注意:一個consumer物件可以訂閱多個topic */ consumer.subscribe("TopicTest2", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { /** * 預設msgs裡只有一條訊息,可以通過設定consumeMessageBatchMaxSize引數來批量接收訊息 */ @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size()); MessageExt msg = msgs.get(0); if (msg.getTopic().equals("TopicTest1")) { // 執行TopicTest1的消費邏輯 if (msg.getTags() != null && msg.getTags().equals("TagA")) { // 執行TagA的消費 System.out.println(new String(msg.getBody())); } else if (msg.getTags() != null && msg.getTags().equals("TagC")) { // 執行TagC的消費 } else if (msg.getTags() != null && msg.getTags().equals("TagD")) { // 執行TagD的消費 } } else if (msg.getTopic().equals("TopicTest2")) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); /** * Consumer物件在使用之前必須要呼叫start初始化,初始化一次即可<br> */ consumer.start(); System.out.println("Consumer Started."); } }
https://my.oschina.net/jayronwang/blog/861396