RocketMQ簡介、環境搭建
阿新 • • 發佈:2019-01-06
vim /etc/hosts 127.0.0.1 rocketmq-nameserver1 127.0.0.1 rocketmq-master1 127.0.0.1 rocketmq-master1-slave tar -zxvf alibaba-rocketmq-3.2.6.tar.gz -C /usr/local/ mv alibaba-rocketmq alibaba-rocketmq-3.2.6 ln -s alibaba-rocketmq-3.2.6/ rocketmq mkdir /usr/local/rocketmq/store mkdir /usr/local/rocketmq/store/commitlog mkdir /usr/local/rocketmq/store/consumequeue mkdir /usr/local/rocketmq/store/index mkdir /usr/local/rocketmq/store/checkpoint cd /usr/local/rocketmq/conf/2m-noslave vim broker-a.properties #所屬叢集名字 #一:(4個節點的叢集名字要相同) brokerClusterName=rocketmq-cluster #broker名字,注意此處不同的配置檔案填寫的不一樣 #二:(如果是broker-a.properties和broker-a-s.properties檔案的話修改為broker-a,如果是broker-b.properties和broker-b-s.properties修改為broker-b) brokerName=broker-a #0 表示 Master,>0 表示 Slave #(0 表示 Master,>0 表示 Slave) brokerId=0 #nameServer地址,分號分割 #三:(有幾個節點就應該有幾個nameserver使用";"隔開) namesrvAddr=rocketmq-nameserver1:9876 #在傳送訊息時,自動建立伺服器不存在的topic,預設建立的佇列數 defaultTopicQueueNums=4 #是否允許 Broker 自動建立Topic,建議線下開啟,線上關閉 autoCreateTopicEnable=true #是否允許 Broker 自動建立訂閱組,建議線下開啟,線上關閉 autoCreateSubscriptionGroup=true #Broker 對外服務的監聽埠 listenPort=10911 #刪除檔案時間點,預設凌晨 4點 deleteWhen=04 #檔案保留時間,預設 48 小時 fileReservedTime=120 #commitLog每個檔案的大小預設1G mapedFileSizeCommitLog=1073741824 #ConsumeQueue每個檔案預設存30W條,根據業務情況調整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #檢測物理檔案磁碟空間 diskMaxUsedSpaceRatio=88 #儲存路徑 storePathRootDir=/usr/local/rocketmq/store #commitLog 儲存路徑 storePathCommitLog=/usr/local/rocketmq/store/commitlog #消費佇列儲存路徑儲存路徑 storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue #訊息索引儲存路徑 storePathIndex=/usr/local/rocketmq/store/index #checkpoint 檔案儲存路徑 storeCheckpoint=/usr/local/rocketmq/store/checkpoint #abort 檔案儲存路徑 abortFile=/usr/local/rocketmq/store/abort #限制的訊息大小 maxMessageSize=65536 #flushCommitLogLeastPages=4 #flushConsumeQueueLeastPages=2 #flushCommitLogThoroughInterval=10000 #flushConsumeQueueThoroughInterval=60000 #Broker 的角色 #- ASYNC_MASTER 非同步複製Master #- SYNC_MASTER 同步雙寫Master #- SLAVE #四:(如果是broker-a-s.properties檔案和broker-b-s.properties檔案的話角色發生變化變為SLAVE) brokerRole=ASYNC_MASTER #刷盤方式 #- ASYNC_FLUSH 非同步刷盤 #- SYNC_FLUSH 同步刷盤 flushDiskType=ASYNC_FLUSH #checkTransactionMessageEnable=false #發訊息執行緒池數量 #sendMessageThreadPoolNums=128 #拉訊息執行緒池數量 #pullMessageThreadPoolNums=128 mkdir -p /usr/local/rocketmq/logs cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml
vim /usr/local/rocketmq/bin/runserver.sh JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=320m" vim /usr/local/rocketmq/bin/runbroker.sh JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=320m" cd /usr/local/rocketmq/bin/ nohup sh mqnamesrv & jps tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-2s-noslave/broker-a.properties >/dev/null 2>&1 &
yum install wget wget http://www-us.apache.org/dist/rocketmq/4.2.0/rocketmq-all-4.2.0-bin-release.zip unzip rocketmq-all-4.2.0-bin-release.zip -d rocketmq-4.2.0 cp -r rocketmq-4.2.0 /usr/local/ cd rocketmq-4.2.0/ nohup sh bin/mqnamesrv & vim bin/runserver.sh JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" vim bin/runbroker.sh JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m" vim bin/tools.sh JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:PermSize=128m -XX:MaxPermSize=128m" nohup sh bin/mqnamesrv & tail -f ~/logs/rocketmqlogs/namesrv.log // 檢視namaserver日誌 jps nohup sh bin/mqbroker -n localhost:9876 & vim /usr/local/rocketmq-4.2.0/conf/broker.conf namesrvAddr=localhost:9876 brokerIP1=localhost nohup sh bin/mqbroker -n localhost:9876 autoCreateTopicEnable=true -c /usr/local/rocketmq-4.2.0/conf/broker.conf & tail -f ~/logs/rocketmqlogs/broker.log
sh mqshutdown broker
sh mqshutdown namesrv
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.aztech.RocketMQ</groupId>
<artifactId>RocketMQ</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.2.0</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.29</version>
</dependency>
</dependencies>
</project>
package com.aztech.test;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
//宣告並初始化一個consumer
//需要一個consumer group名字作為構造方法的引數,這裡為consumer1
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
//consumer.setVipChannelEnabled(false);
//同樣也要設定NameServer地址
consumer.setNamesrvAddr("localhost:9876");
//這裡設定的是一個consumer的消費策略
//CONSUME_FROM_LAST_OFFSET 預設策略,從該佇列最尾開始消費,即跳過歷史訊息
//CONSUME_FROM_FIRST_OFFSET 從佇列最開始開始消費,即歷史訊息(還儲存在broker的)全部消費一遍
//CONSUME_FROM_TIMESTAMP 從某個時間點開始消費,和setConsumeTimestamp()配合使用,預設是半個小時以前
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//設定consumer所訂閱的Topic和Tag,*代表全部的Tag
consumer.subscribe("TopicTest", "*");
//設定一個Listener,主要進行訊息的邏輯處理
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
System.out.println("----------------------------------------------------------------------------------");
//返回消費狀態
//CONSUME_SUCCESS 消費成功
//RECONSUME_LATER 消費失敗,需要稍後重新消費
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//呼叫start()方法啟動consumer
consumer.start();
System.out.println("Consumer Started.");
}
}
package com.aztech.test;
import java.io.UnsupportedEncodingException;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
// 宣告並初始化一個producer
// 需要一個producer group名字作為構造方法的引數,這裡為producer1
DefaultMQProducer producer = new DefaultMQProducer("producer1");
producer.setVipChannelEnabled(false);
// 設定NameServer地址,此處應改為實際NameServer地址,多個地址之間用;分隔
// NameServer的地址必須有
// producer.setClientIP("localhost");
// producer.setInstanceName("Producer");
producer.setNamesrvAddr("localhost:9876");
// 呼叫start()方法啟動一個producer例項
producer.start();
// 傳送1條訊息到Topic為TopicTest,tag為TagA,訊息內容為“Hello RocketMQ”拼接上i的值
try {
// 封裝訊息
Message msg = new Message("TopicTest",// topic
"TagA",// tag
("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)// body
);
// 呼叫producer的send()方法傳送訊息
// 這裡呼叫的是同步的方式,所以會有返回結果
SendResult sendResult = producer.send(msg);
// 列印返回結果
System.out.println(sendResult);
} catch (RemotingException e) {
e.printStackTrace();
} catch (MQBrokerException e) {
e.printStackTrace();
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
//傳送完訊息之後,呼叫shutdown()方法關閉producer
System.out.println("send success");
producer.shutdown();
}
}
tar -zxvf apache-tomcat-9.0.2.tar.gz -C /usr/local/
mkdir rocketmq-console
https://github.com/duomu/rocketmq-console
unzip rocketmq.war -d rocketmq-console/
cd /usr/local/apache-tomcat-9.0.2/webapps/rocketmq/WEB-INF/classes
vim config.properties
rocketmq.namesrv.addr=127.0.0.1:9876
bin/startup.sh
tail -f -n 500 ../logs/catalina.out
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
// 宣告並初始化一個producer
// 需要一個producer group名字作為構造方法的引數,這裡為producer1
DefaultMQProducer producer = new DefaultMQProducer("quickstart_producer");
producer.setNamesrvAddr("localhost:9876");
producer.start();
for (int i = 0; i < 5; i++) {
try {
Message msg = new Message("TopicQuickStart",// topic
"TagA",// tag
("Hello RocketMQ " + i).getBytes()// body
);
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");
consumer.setNamesrvAddr("localhost:9876");
/**
* 設定Consumer第一次啟動是從佇列頭部開始消費還是佇列尾部開始消費
* 如果非第一次啟動,那麼按照上次消費的位置繼續消費
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicQuickStart", "*");
//設定一個Listener,主要進行訊息的邏輯處理
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
try {
for (MessageExt msg : msgs) {
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(),"UTF-8");
String tags = msg.getTags();
System.out.println("收到訊息: " + " topic :" + topic + " ,tags :" + tags + " ,msg :" + msgBody);
}
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
//System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
//System.out.println("----------------------------------------------------------------------------------");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//呼叫start()方法啟動consumer
consumer.start();
System.out.println("Consumer Started.");
}
}