ActionMQ入門經典的生產者消費者
阿新 • • 發佈:2018-12-10
什麼是ActiveMQ ActiveMQ 是Apache出品,最流行的,能力強勁的開源訊息匯流排。ActiveMQ 是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位
1.多種語言和協議編寫客戶端。語言: Java,C,C++,C#,Ruby,Perl,Python,PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP 2.對Spring的支援,ActiveMQ可以很容易內嵌到使用Spring的系統裡面去 3.支援多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA 4.支援Ajax 5.可以很容易的進行測試和使用ActiveMQ的訊息形式
JMS定義了五種不同的訊息正文格式,以及呼叫的訊息型別,允許你傳送並接收以一些不同形式的資料,提供現有訊息格式的一些級別的相容性。 · StreamMessage -- Java原始值的資料流 · MapMessage--一套名稱-值對 · TextMessage--一個字串物件 · ObjectMessage--一個序列化的 Java物件 · BytesMessage--一個位元組的資料流
ActiveMQ的安裝
在maven專案下需要匯入的jar包。
<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.4.3</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> </dependency> </dependencies>
最後是兩個例項生產者和消費者。
生產者:
@Test
public void product() throws Exception{
//第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
//第二步:使用ConnectionFactory物件建立一個Connection物件。
Connection con= connectionFactory.createConnection();
//第三步:開啟連線,呼叫Connection物件的start方法。
con.start();
//第四步:使用Connection物件建立一個Session物件。(引數(“是否開啟事務”,“應答模式”))
Session session=con.createSession(false,Session.AUTO_ACKNOWLEDGE);
//第五步:使用Session物件建立一個Queue物件。(引數要和在頁面申請的佇列名保持一致)
Queue queue=session.createQueue("FirstQueue");
//第六步:使用Session物件建立一個Producer物件。
MessageProducer producer=session.createProducer(queue);
//第七步:建立一個Message物件,建立一個TextMessage物件。
TextMessage textMessage=session.createTextMessage("傳送的訊息");
//第八步:使用Producer物件傳送訊息。
producer.send(textMessage);
//第九步:關閉資源。
producer.close();
session.close();
con.close();
}
消費者:
@Test
public void consumer() throws Exception{
//第一步:建立一個ConnectionFactory物件。
ConnectionFactory factory= new ActiveMQConnectionFactory("tcp://localhost:61616");
//第二步:從ConnectionFactory物件中獲得一個Connection物件。
Connection connection=factory.createConnection();
//第三步:開啟連線。呼叫Connection物件的start方法。
connection.start();
//第四步:使用Connection物件建立一個Session物件。
Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//第五步:使用Session物件建立一個佇列Queue。和傳送端保持一致queue,並且佇列的名稱一致。
Queue queue=session.createQueue("FirstQueue");
//第六步:使用Session物件建立一個Consumer物件。
MessageConsumer consumer=session.createConsumer(queue);
//第七步:接收訊息。
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage=(TextMessage) message;
try {
String text=textMessage.getText();
//第八步:列印訊息。
System.out.println(text);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
});
//一定要加上這句話,程式比監聽器執行快
System.in.read();
//第九步:關閉資源
consumer.close();
session.close();
connection.close();
}