1. 程式人生 > >RocketMQ部署

RocketMQ部署

off -xms ase public ice 開始 exc exce 如果

1.解壓後用maven編譯

unzip rocketmq-all-4.3.0-source-release.zip
cd rocketmq-all-4.3.0/
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/apache-rocketmq
# 若尚未進入 rocketmq-all-4.3.0/ 目錄,可改用完整路徑(與上一條 cd 擇一執行):cd rocketmq-all-4.3.0/distribution/target/apache-rocketmq

2.修改內存

vim bin/runbroker.sh
vim bin/runserver.sh
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m"

3.啟動名稱服務器

nohup sh bin/mqnamesrv -n 192.168.121.130:9876 autoCreateTopicEnable=true&
tail -f ~/logs/rocketmqlogs/namesrv.log

4.啟動 Broker

nohup sh bin/mqbroker -n 192.168.121.130:9876 autoCreateTopicEnable=true&
tail -f ~/logs/rocketmqlogs/broker.log

5.發送接收測試

export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK]
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

6.關閉服務器

sh bin/mqshutdown broker
sh bin/mqshutdown namesrv

7.java測試

<dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.1.0-incubating</version>
</dependency>


package rocketmq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.junit.Test;

import java.util.List;

/**
 * Manual smoke tests that send messages to, and consume messages from, a RocketMQ broker.
 *
 * <p>Both tests talk to the NameServer at {@link #NAMESRV_ADDR}; change that constant to the
 * address of your own deployment before running them.
 */
public class RocketMQTest {

    /** NameServer address shared by the producer and the consumer; change to your own. */
    private static final String NAMESRV_ADDR = "192.168.121.130:9876";

    /** Topic the producer publishes to and the consumer subscribes to. */
    private static final String TOPIC = "TopicTest";

    /** Number of messages {@link #mqSendTest()} sends. */
    private static final int MESSAGE_COUNT = 997892;

    /** Pause after a failed send before moving on to the next message. */
    private static final long SEND_RETRY_DELAY_MILLIS = 1000L;

    /** How long {@link #mqConsumer()} keeps the test thread alive so messages can arrive. */
    private static final long CONSUME_WAIT_MILLIS = 60 * 1000L;

    /**
     * Sends {@link #MESSAGE_COUNT} messages synchronously to {@link #TOPIC} and prints each
     * send result.
     *
     * @throws InterruptedException if the thread is interrupted while sending or while pausing
     *     after a failed send
     * @throws MQClientException if the producer cannot be started
     */
    @Test
    public void mqSendTest() throws InterruptedException, MQClientException {
        DefaultMQProducer producer = new DefaultMQProducer("producer_demo");
        producer.setNamesrvAddr(NAMESRV_ADDR);

        // A producer must be started before use, and only once: never call start()
        // for every message that is sent.
        producer.start();
        try {
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                try {
                    Message msg = new Message(
                            TOPIC,
                            "TagA",
                            ("測試RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    // Synchronous send: blocks until a result is returned.
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                } catch (InterruptedException e) {
                    // Interruption is a request to stop; do not treat it as a retryable failure.
                    throw e;
                } catch (Exception e) {
                    e.printStackTrace();
                    Thread.sleep(SEND_RETRY_DELAY_MILLIS);
                }
            }
        } finally {
            // Release the producer even when the loop is aborted by an exception.
            producer.shutdown();
        }
    }

    /**
     * Subscribes to every tag of {@link #TOPIC}, prints the body of each received message for
     * {@link #CONSUME_WAIT_MILLIS} milliseconds, then shuts the consumer down.
     *
     * @throws MQClientException if the consumer cannot subscribe or be started
     * @throws InterruptedException if the thread is interrupted while waiting for messages
     */
    @Test
    public void mqConsumer() throws MQClientException, InterruptedException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_demo");
        // Multiple NameServer addresses may be given, separated by ';'.
        consumer.setNamesrvAddr(NAMESRV_ADDR);

        // On the very first start of this consumer group, begin at the head of the queue;
        // on later starts consumption resumes from the last consumed position.
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe(TOPIC, "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(
                    List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    for (MessageExt msg : msgs) {
                        // Decode with the same charset the producer used to encode.
                        String msgbody = new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET);
                        System.out.println(" MessageBody: " + msgbody);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    // Ask the broker to redeliver this batch later.
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
        try {
            System.out.printf("Consumer Started.%n");
            // start() returns immediately; without this wait the test method would end
            // before the listener has had a chance to receive anything.
            Thread.sleep(CONSUME_WAIT_MILLIS);
        } finally {
            consumer.shutdown();
        }
    }
}

中文名:惠凡

博客名:淹死的魚o0

轉載時請說明出處:http://www.cnblogs.com/huifan/

RocketMQ部署