1. 程式人生 > >JAVA程式碼之RocketMQ生產和消費資料

JAVA程式碼之RocketMQ生產和消費資料

一、啟動RocketMQ

[[email protected] ~]#cat /etc/hosts

# Do not remove the following line, or various programs

# that require network functionality will fail.

127.0.0.1               localhost.localdomain localhost

::1             localhost6.localdomain6 localhost6

192.168.1.106  node1

192.168.1.103  master

192.168.1.110  node2

[[email protected] ~]#cd /opt/alibaba-rocketmq/bin/

[[email protected] bin]#cat play.sh 

#!/bin/sh

#

# Name Server

#

nohup sh mqnamesrv > ns.log 2>&1 &

#

# Service Addr

#

ADDR=`hostname -i`:9876

#

# Broker

#

nohup sh mqbroker -n ${ADDR} > bk.log 2>&1 &

echo "Start Name Server and Broker Successfully, ${ADDR}"

[[email protected] bin]#sh play.sh 

Start Name Server and Broker Successfully, 192.168.1.103:9876

[[email protected] bin]#sh mqadmin topicList -n 192.168.1.103:9876

BenchmarkTest

DefaultCluster

SELF_TEST_TOPIC

%RETRY%please_rename_unique_group_name_4

TBW102

gaojingsong

master

OFFSET_MOVED_EVENT

[[email protected] bin]# cd ../

備註:此時topic不存在,但是生產資料的時候會自動建立

二、生產和消費資料

生產:下載

package cn.cn.mq.demo;

import java.util.concurrent.TimeUnit;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;

import com.alibaba.rocketmq.client.producer.SendResult;

import com.alibaba.rocketmq.common.message.Message;

public class Producer {

   public static void main(String[] args) throws MQClientException,

         InterruptedException{

      /**

       * 一個應用建立一個Producer,由應用來維護此物件,可以設定為全域性物件或者單例<br>

       * 注意:ProducerGroupName需要由應用來保證唯一<br>

       * ProducerGroup這個概念傳送普通的訊息時,作用不大,但是傳送分散式事務訊息時,比較關鍵,

       * 因為伺服器會回查這個Group下的任意一個Producer

       */

      final DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

      producer.setNamesrvAddr("192.168.1.103:9876");

      producer.setInstanceName("Producer");

      /**

       * Producer物件在使用之前必須要呼叫start初始化,初始化一次即可<br>

       * 注意:切記不可以在每次傳送訊息時,都呼叫start方法

       */

      producer.start();

      /**

       * 下面這段程式碼表明一個Producer物件可以傳送多個topic,多個tag的訊息。

       * 注意:send方法是同步呼叫,只要不拋異常就標識成功。但是傳送成功也可會有多種狀態,<br>

       * 例如訊息寫入Master成功,但是Slave不成功,這種情況訊息屬於成功,但是對於個別應用如果對訊息可靠性要求極高,<br>

       * 需要對這種情況做處理。另外,訊息可能會存在傳送失敗的情況,失敗重試由應用來處理。

       */

      for (int i = 0; i < 3; i++){

         try {

            {

                Message msg = new Message("TopicTest1",// topic

                      "TagA",// tag

                      "OrderID001",// key

                      ("我的名字是程式設計師:"+i).getBytes());// body

                SendResult sendResult = producer.send(msg);

                System.out.println(sendResult);

            }

            {

                Message msg = new Message("TopicTest1",// topic

                      "TagC",// tag

                      "OrderID001",// key

                      ("我來測試RocketMQ:"+i).getBytes());// body

                SendResult sendResult = producer.send(msg);

                System.out.println(sendResult);

            }

         }catch(Exception e) {

            e.printStackTrace();

         }

         TimeUnit.MILLISECONDS.sleep(4000);

      }

      /**

       * 應用退出時,要呼叫shutdown來清理資源,關閉網路連線,從MetaQ伺服器上登出自己

       * 注意:我們建議應用在JBOSS、Tomcat等容器的退出鉤子裡呼叫shutdown方法

       */

//    producer.shutdown();

      Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

         public void run() {

            producer.shutdown();

         }

      }));

      System.exit(0);

   }

}

消費:下載

package cn.cn.mq.demo;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;

import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;

import com.alibaba.rocketmq.client.exception.MQClientException;

import com.alibaba.rocketmq.common.message.MessageExt;

public class PushConsumer {

/**

* 當前例子是PushConsumer用法,使用方式給使用者感覺是訊息從RocketMQ伺服器推到了應用客戶端。<br>

* 但是實際PushConsumer內部是使用長輪詢Pull方式從MetaQ伺服器拉訊息,然後再回呼叫戶Listener方法<br>

*/

public static void main(String[] args) throws InterruptedException,

MQClientException {

/**

* 一個應用建立一個Consumer,由應用來維護此物件,可以設定為全域性物件或者單例<br>

* 注意:ConsumerGroupName需要由應用來保證唯一

*/

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(

"ConsumerGroupName");

consumer.setNamesrvAddr("192.168.1.103:9876");

consumer.setInstanceName("Consumber");

/**

* 訂閱指定topic下tags分別等於TagA或TagC或TagD

*/

consumer.subscribe("TopicTest1", "TagA || TagC || TagD");

/**

* 訂閱指定topic下所有訊息<br>

* 注意:一個consumer物件可以訂閱多個topic

*/

//consumer.subscribe("TopicTest2", "*");

consumer.registerMessageListener(new MessageListenerConcurrently() {

public ConsumeConcurrentlyStatus consumeMessage(

List<MessageExt> msgs, ConsumeConcurrentlyContext context) {

System.out.println(Thread.currentThread().getName()

+ " Receive New Messages: " + msgs.size());

MessageExt msg = msgs.get(0);

if (msg.getTopic().equals("TopicTest1")) {

// 執行TopicTest1的消費邏輯

if (msg.getTags() != null // 執行TagA的消費

&& msg.getTags().equals("TagA")) {

System.out.println("TagA:"+new String(msg.getBody()));

} else if (msg.getTags() != null// 執行TagC的消費

&& msg.getTags().equals("TagC")) {

System.out.println("TagC:"+new String(msg.getBody()));

} else if (msg.getTags() != null// 執行TagD的消費

&& msg.getTags().equals("TagD")) {

System.out.println("TagD:"+new String(msg.getBody()));

}

}  

return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

}

});

/**

* Consumer物件在使用之前必須要呼叫start初始化,初始化一次即可<br>

*/

consumer.start();

System.out.println("ConsumerStarted.");

}

}

三、驗證消費結果 下載

[[email protected] bin]# sh mqadmin topicList -n 192.168.1.103:9876

BenchmarkTest

TopicTest1

DefaultCluster

SELF_TEST_TOPIC

%RETRY%please_rename_unique_group_name_4

%RETRY%ConsumerGroupName

TBW102

gaojingsong

master

OFFSET_MOVED_EVENT

[[email protected] bin]#sh mqadmin  topicStatus -n  192.168.1.103:9876 -t TopicTest1

#Broker Name                      #QID  #Min Offset           #Max Offset             #Last Updated

master                            0     0                     4                       2016-10-20 14:38:19,236

master                            1     0                     4                       2016-10-20 14:38:19,243

master                            2     0                     2                       2016-10-20 14:38:15,171

master                            3     0                     2                       2016-10-20 14:38:15,180

[[email protected] bin]# shutdown -h now


 

消費資料



 

四、錯誤解決方案  下載


相關推薦

JAVA程式碼RocketMQ生產消費資料

一、啟動RocketMQ [[email protected] ~]#cat /etc/hosts # Do not remove the following line, or various programs # that require network fun

Java入門 變量常量

java標識符 float clas 數據類型 interface 入門 整數 布爾型 類型轉換 1. Java中的關鍵字 2. 認識Java標識符 3. 變量是什麽 4. 如何命名Java變量 5. Java中的數據類型   5.1 基本數據類型     - 數值型:  

java基礎變量常量、類型轉換

聲明 src .cn ble .com 不能 需要 bsp 字母 一、 變量 變量是可改變的量,每賦個值便會開辟一個新內存地址。 1、首先,變量需要一個聲明,例如:int a,這個a也可以當作是一個標簽,它指向了一個內存地址,這個地址是屬於int類型的套餐,可以通過

小白的java學習路 “ 類對象”

之路 抽象 AS 類和對象 可維護 屬性和方法 style “.” 信息 一.※ 萬物皆對象 二.對象的兩個特征: 屬性:對象具有的各種特征 方法:對象執行的操作 對象:用來描述客觀事物的一個實體,由一組屬性和方法構成 三.

12. Java基礎抽象類接口

pre is-a 解決方案 自動變 有理 void 包括 重寫 跨域 接口和內部類為我們提供了一種將接口與實現分離的更加結構化的方法。 抽象類與接口是java語言中對抽象概念進行定義的兩種機制,正是由於他們的存在才賦予java強大的面向對象的能力。他們兩者之

springboot kafka整合(包括java程式碼不能傳送消費kafka訊息的採坑記錄)

kafka採坑記錄:     1、kafka服務端server.properties中的broker.id叢集內需要唯一。     2、kafka config檔案中listeners和advertised.listeners需要配置本機ip:9092

深入學習Java Scipt作用域閉包

引擎與作用域及編譯器 在傳統的編譯語言的流程中,程式的一段原始碼主要分成三步,統稱為“編譯”   分詞/詞法分析     它的主要作用是將字元組成的字串分解成有意義的程式碼塊,例如:var a=2;者會被分解成“var”,“a”,“=”,

Java程式碼實現順序棧鏈式棧

Java程式碼實現順序棧和鏈式棧 棧(stack)又名堆疊,它是一種運算受限的線性表。其限制是僅允許在表的一端進行插入或者刪除運算。後進先出(Last In First Out)。 棧中的資料操作主要有push(壓入)和pop(彈出)操作。 實際上,棧就可以用陣列來實現,也可

初學Java IO使用FileInputStreamFileReader讀取檔案 四十一

import java.io.*; public class FileInputStreamTest { public static void main(String[] args) throws IOException { //建立位元組輸入流 FileInputStream fis =

初學Java IO使用FileInputStreamFileReader讀取文件 四十一

ava sys 字符串 imp ont new ati args fileread import java.io.*; public class FileInputStreamTest { public static void main(String[] args) t

java圖片圖片擷取圖片壓縮

/** * 圖片壓縮 * @param filePath 原檔案路徑 * @param w 壓縮後寬度 * @param h 壓縮後高度 * @return */ public static String imgCompress(String filePath, int w,

JAVA基礎設計模式列舉

列舉 列舉是JDK1.5版本新增的特性(泛型、For-each等如今被廣泛應用的特性也是由JDK1.5時所新增的),另外到了JDK1.6後switch語句支援列舉型別; 列舉的使用情況: 有的時候一個類的物件是有限且固定的,這種情況下我們使用列舉類就比較方便; 列舉就是將

死磕Java系列GUI 元件事件監聽

當我們搭建好圖形介面窗體的時候,需要向窗體中加入各種元件,便於使用者操作,使用者在窗體中都會有哪些操作呢?窗體是程式與使用者可互動的介面,比如說登入介面,使用者可以輸入姓名和密碼,點選登入按鈕,登入到另一個介面,在下一個介面有更多的操作,比如說有多個選擇組成的選單,需要輸入文字的文字框,用來

Java程式碼JDBC實現資料庫之間定時的表格傳輸(由一個庫讀取到另一個庫)例項,親測有效

package com.openup.system.service.imp;import java.sql.Connection;import java.sql.DriverManager;import java.sql.PreparedStatement;import ja

通過java程式碼獲取jvm資訊系統資訊

轉載自LOC_Thomas的部落格 前言 隨著微服務的概念逐漸流行,監控成了必不可少的模組,本篇文章主要介紹一下如何通過java程式碼獲得一些核心的資料,方便從各個方面對應用進行監控 獲取jvm資料 jvm資料是監控應用很重要的一系列引數,一般本地開發的時候可以通過jcons

資料結構java《棧佇列》

1、棧。(Android的Activity載入是基礎棧結構的)底層使用陣列實現package ch4; /** * 棧 * @author Howard * 特點: * 1、通常情況作為程式設計

高效能Java程式碼 記憶體管理

http://soft.chinabyte.com/database/8/11981008.shtml     更甚者你寫的程式碼,GC根本就回收不了,直接系統掛掉。GC是一段程式,不是智慧,他只回收他認為的垃圾,而不是回收你認為的垃圾。    GC垃圾回收:    Gr

Kafka 使用Java實現資料的生產消費demo

前言 在上一篇中講述如何搭建kafka叢集,本篇則講述如何簡單的使用 kafka 。不過在使用kafka的時候,還是應該簡單的瞭解下kafka。 Kafka的介紹 Kafka是一種高吞吐量的分散式釋出訂閱訊息系統,它可以處理消費者規模的網站中的所有動作流資

Java程式碼呼叫儲存過程儲存方法

準備一個oracle 的JDBC jar 包:ojdbc14_11g.jar 首先找到你的 oracle 安裝位置,例如: 1.建立一個JDBC資料庫連線工具類: package com.test

Java併發AQS用法原始碼分析

概念 AQS:佇列同步器AbstractQueuedSynchronizer(以下簡稱同步器),是用來構建鎖或者其他同步元件的基礎框架,許多同步器可以通過AQS很容易的並且高效的構建出來。不僅RenntrantLock和Semaphore是基於AQS構建的,還包括CountDownLat