Java訊息佇列--ActiveMq筆記
阿新 • • 發佈:2018-11-09
1、下載安裝ActiveMQ
ActiveMQ官網下載地址:http://activemq.apache.org/download.html
ActiveMQ 提供了Windows 和Linux、Unix 等幾個版本,樓主這裡選擇了Linux 版本下進行開發。
下載完安裝包,解壓之後的目錄:
從它的目錄來說,還是很簡單的:
-
- bin存放的是指令碼檔案
- conf存放的是基本配置檔案
- data存放的是日誌檔案
- docs存放的是說明文件
- examples存放的是簡單的例項
- lib存放的是activemq所需jar包
- webapps用於存放專案的目錄
2、啟動ActiveMQ
進入到ActiveMQ 安裝目錄的Bin 目錄,linux 下輸入 ./activemq start 啟動activeMQ 服務。
輸入命令之後,會提示我們建立了一個程序IP 號,這時候說明服務已經成功啟動了。
ActiveMQ預設啟動時,啟動了內建的jetty伺服器,提供一個用於監控ActiveMQ的admin應用。
admin:http://127.0.0.1:8161/admin/
我們在瀏覽器開啟連結之後輸入賬號密碼(這裡和tomcat 伺服器類似)
預設賬號:admin
密碼:admin
到這裡為止,ActiveMQ 服務端就啟動完畢了。
ActiveMQ 在linux 下的終止命令是 ./activemq stop
3、建立一個ActiveMQ工程
專案目錄結構:
上述在官網下載ActiveMq 的時候,我們可以在目錄下看到一個jar包:
這個jar 包就是我們需要在專案中進行開發中使用到的相關依賴。
3.1 建立生產者
public class Producter { //ActiveMq 的預設使用者名稱 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //ActiveMq 的預設登入密碼 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //ActiveMQ 的連結地址 private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL; AtomicInteger count = new AtomicInteger(0); //連結工廠 ConnectionFactory connectionFactory; //連結物件 Connection connection; //事務管理 Session session; ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>(); public void init(){ try { //建立一個連結工廠 connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL); //從工廠中建立一個連結 connection = connectionFactory.createConnection(); //開啟連結 connection.start(); //建立一個事務(這裡通過引數可以設定事務的級別) session = connection.createSession(true,Session.SESSION_TRANSACTED); } catch (JMSException e) { e.printStackTrace(); } } public void sendMessage(String disname){ try { //建立一個訊息佇列 Queue queue = session.createQueue(disname); //訊息生產者 MessageProducer messageProducer = null; if(threadLocal.get()!=null){ messageProducer = threadLocal.get(); }else{ messageProducer = session.createProducer(queue); threadLocal.set(messageProducer); } while(true){ Thread.sleep(1000); int num = count.getAndIncrement(); //建立一條訊息 TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+ "productor:我是大帥哥,我現在正在生產東西!,count:"+num); System.out.println(Thread.currentThread().getName()+ "productor:我是大帥哥,我現在正在生產東西!,count:"+num); //傳送訊息 messageProducer.send(msg); //提交事務 session.commit(); } } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
3.2 建立消費者
public class Comsumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL; ConnectionFactory connectionFactory; Connection connection; Session session; ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>(); AtomicInteger count = new AtomicInteger(); public void init(){ try { connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); } catch (JMSException e) { e.printStackTrace(); } } public void getMessage(String disname){ try { Queue queue = session.createQueue(disname); MessageConsumer consumer = null; if(threadLocal.get()!=null){ consumer = threadLocal.get(); }else{ consumer = session.createConsumer(queue); threadLocal.set(consumer); } while(true){ Thread.sleep(1000); TextMessage msg = (TextMessage) consumer.receive(); if(msg!=null) { msg.acknowledge(); System.out.println(Thread.currentThread().getName()+": Consumer:我是消費者,我正在消費Msg"+msg.getText()+"--->"+count.getAndIncrement()); }else { break; } } } catch (JMSException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
4、執行ActiveMQ專案
4.1 生產者開始生產訊息
public class TestMq { public static void main(String[] args){ Producter producter = new Producter(); producter.init(); TestMq testMq = new TestMq(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //Thread 1 new Thread(testMq.new ProductorMq(producter)).start(); //Thread 2 new Thread(testMq.new ProductorMq(producter)).start(); //Thread 3 new Thread(testMq.new ProductorMq(producter)).start(); //Thread 4 new Thread(testMq.new ProductorMq(producter)).start(); //Thread 5 new Thread(testMq.new ProductorMq(producter)).start(); } private class ProductorMq implements Runnable{ Producter producter; public ProductorMq(Producter producter){ this.producter = producter; } @Override public void run() { while(true){ try { producter.sendMessage("Jaycekon-MQ"); Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
執行結果:
INFO | Successfully connected to tcp://localhost:61616 Thread-6productor:我是大帥哥,我現在正在生產東西!,count:0 Thread-4productor:我是大帥哥,我現在正在生產東西!,count:1 Thread-2productor:我是大帥哥,我現在正在生產東西!,count:3 Thread-5productor:我是大帥哥,我現在正在生產東西!,count:2 Thread-3productor:我是大帥哥,我現在正在生產東西!,count:4 Thread-6productor:我是大帥哥,我現在正在生產東西!,count:5 Thread-3productor:我是大帥哥,我現在正在生產東西!,count:6 Thread-5productor:我是大帥哥,我現在正在生產東西!,count:7 Thread-2productor:我是大帥哥,我現在正在生產東西!,count:8 Thread-4productor:我是大帥哥,我現在正在生產東西!,count:9 Thread-6productor:我是大帥哥,我現在正在生產東西!,count:10 Thread-3productor:我是大帥哥,我現在正在生產東西!,count:11 Thread-5productor:我是大帥哥,我現在正在生產東西!,count:12 Thread-2productor:我是大帥哥,我現在正在生產東西!,count:13 Thread-4productor:我是大帥哥,我現在正在生產東西!,count:14 Thread-6productor:我是大帥哥,我現在正在生產東西!,count:15 Thread-3productor:我是大帥哥,我現在正在生產東西!,count:16 Thread-5productor:我是大帥哥,我現在正在生產東西!,count:17 Thread-2productor:我是大帥哥,我現在正在生產東西!,count:18 Thread-4productor:我是大帥哥,我現在正在生產東西!,count:19
4.2 消費者開始消費訊息
public class TestConsumer { public static void main(String[] args){ Comsumer comsumer = new Comsumer(); comsumer.init(); TestConsumer testConsumer = new TestConsumer(); new Thread(testConsumer.new ConsumerMq(comsumer)).start(); new Thread(testConsumer.new ConsumerMq(comsumer)).start(); new Thread(testConsumer.new ConsumerMq(comsumer)).start(); new Thread(testConsumer.new ConsumerMq(comsumer)).start(); } private class ConsumerMq implements Runnable{ Comsumer comsumer; public ConsumerMq(Comsumer comsumer){ this.comsumer = comsumer; } @Override public void run() { while(true){ try { comsumer.getMessage("Jaycekon-MQ"); Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } } } }
執行結果:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
INFO | Successfully connected to tcp:
//localhost:61616
Thread-2: Consumer:我是消費者,我正在消費MsgThread-5productor:我是大帥哥,我現在正在生產東西!,count:4--->0
Thread-3: Consumer:我是消費者,我正在消費MsgThread-4productor:我是大帥哥,我現在正在生產東西!,count:36--->1
Thread-4: Consumer:我是消費者,我正在消費MsgThread-3productor:我是大帥哥,我現在正在生產東西!,count:38--->2
Thread-5: Consumer:我是消費者,我正在消費MsgThread-6productor:我是大帥哥,我現在正在生產東西!,count:37--->3
Thread-2: Consumer:我是消費者,我正在消費MsgThread-6productor:我是大帥哥,我現在正在生產東西!,count:2--->4
Thread-3: Consumer:我是消費者,我正在消費MsgThread-5productor:我是大帥哥,我現在正在生產東西!,count:40--->5
Thread-4: Consumer:我是消費者,我正在消費MsgThread-6productor:我是大帥哥,我現在正在生產東西!,count:42--->6
Thread-5: Consumer:我是消費者,我正在消費MsgThread-4productor:我是大帥哥,我現在正在生產東西!,count:41--->7
Thread-2: Consumer:我是消費者,我正在消費MsgThread-3productor:我是大帥哥,我現在正在生產東西!,count:1--->8
Thread-3: Consumer:我是消費者,我正在消費MsgThread-2productor:我是大帥哥,我現在正在生產東西!,count:44--->9
Thread-4: Consumer:我是消費者,我正在消費MsgThread-4productor:我是大帥哥,我現在正在生產東西!,count:46--->10
Thread-5: Consumer:我是消費者,我正在消費MsgThread-5productor:我是大帥哥,我現在正在生產東西!,count:45--->11
Thread-2: Consumer:我是消費者,我正在消費MsgThread-2productor:我是大帥哥,我現在正在生產東西!,count:3--->12
Thread-3: Consumer:我是消費者,我正在消費MsgThread-3productor:我是大帥哥,我現在正在生產東西!,count:48--->13
Thread-4: Consumer:我是消費者,我正在消費MsgThread-5productor:我是大帥哥,我現在正在生產東西!,count:50--->14
Thread-5: Consumer:我是消費者,我正在消費MsgThread-2productor:我是大帥哥,我現在正在生產東西!,count:49--->15
Thread-4: Consumer:我是消費者,我正在消費MsgThread-2productor:我是大帥哥,我現在正在生產東西!,count:54--->16
Thread-2: Consumer:我是消費者,我正在消費MsgThread-5productor:我是大帥哥,我現在正在生產東西!,count:6--->17
Thread-3: Consumer:我是消費者,我正在消費MsgThread-6productor:我是大帥哥,我現在正在生產東西!,count:52--->18
Thread-5: Consumer:我是消費者,我正在消費MsgThread-3productor:我是大帥哥,我現在正在生產東西!,count:53--->19
Thread-4: Consumer:我是消費者,我正在消費MsgThread-3productor:我是大帥哥,我現在正在生產東西!,count:58--->20
|
檢視執行結果,我們可以做ActiveMQ 服務端:http://127.0.0.1:8161/admin/ 裡面的Queues 中檢視我們生產的訊息。
5、ActiveMQ的特性
5.1 ActiveMq 的特性
- 多種語言和協議編寫客戶端。語言: Java, C, C++, C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
- 完全支援JMS1.1和J2EE 1.4規範 (持久化,XA訊息,事務)
- 對Spring的支援,ActiveMQ可以很容易內嵌到使用Spring的系統裡面去,而且也支援Spring2.0的特性
- 通過了常見J2EE伺服器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何相容J2EE 1.4 商業伺服器上
- 支援多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
- 支援通過JDBC和journal提供高速的訊息持久化
- 從設計上保證了高效能的叢集,客戶端-伺服器,點對點
- 支援Ajax
- 支援與Axis的整合
- 可以很容易得呼叫內嵌JMS provider,進行測試
5.2 什麼情況下使用ActiveMQ?
- 多個專案之間整合
(1) 跨平臺
(2) 多語言
(3) 多專案 - 降低系統間模組的耦合度,解耦
(1) 軟體擴充套件性 - 系統前後端隔離
(1) 前後端隔離,遮蔽高安全區