1. 程式人生 > >ActionMQ入門經典的生產者消費者

ActionMQ入門經典的生產者消費者

什麼是ActiveMQ ActiveMQ 是Apache出品,最流行的,能力強勁的開源訊息匯流排。ActiveMQ 是一個完全支援JMS1.1和J2EE 1.4規範的 JMS Provider實現,儘管JMS規範出臺已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演著特殊的地位

1.多種語言和協議編寫客戶端。語言: Java,C,C++,C#,Ruby,Perl,Python,PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP,AMQP 2.對Spring的支援,ActiveMQ可以很容易內嵌到使用Spring的系統裡面去 3.支援多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA 4.支援Ajax 5.可以很容易的進行測試和使用ActiveMQ的訊息形式

對於訊息的傳遞有兩種型別: 一種是點對點的,即一個生產者和一個消費者一一對應; 另一種是釋出/訂閱模式,即一個生產者產生訊息並進行傳送後,可以由多個消費者進行接收。

JMS定義了五種不同的訊息正文格式,以及呼叫的訊息型別,允許你傳送並接收以一些不同形式的資料,提供現有訊息格式的一些級別的相容性。   · StreamMessage -- Java原始值的資料流   · MapMessage--一套名稱-值對   · TextMessage--一個字串物件   · ObjectMessage--一個序列化的 Java物件   · BytesMessage--一個位元組的資料流

ActiveMQ的安裝

1.下載ActivityMQ http://activemq.apache.org/download-archives.html 2.啟動ActivityMQ 解壓apache-activemq-5.4.3-bin.zip到指定目錄雙擊apache-activemq-5.4.3\bin\activemq.bat  啟動ActiveMQ以後,登陸:http://localhost:8161/admin/,建立一個Queue,命名為FirstQueue,此佇列在程式碼中會使用到

在maven專案下需要匯入的jar包。

<dependencies>
  <dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-all</artifactId>
  <version>5.4.3</version>
  </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
  </dependencies>

最後是兩個例項生產者和消費者。

生產者:

@Test
	public void product() throws Exception{
		//第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		//第二步:使用ConnectionFactory物件建立一個Connection物件。
		Connection con= connectionFactory.createConnection();
		//第三步:開啟連線,呼叫Connection物件的start方法。
		con.start();
		//第四步:使用Connection物件建立一個Session物件。(引數(“是否開啟事務”,“應答模式”))
		Session session=con.createSession(false,Session.AUTO_ACKNOWLEDGE);
		//第五步:使用Session物件建立一個Queue物件。(引數要和在頁面申請的佇列名保持一致)
		Queue queue=session.createQueue("FirstQueue");
		//第六步:使用Session物件建立一個Producer物件。
		MessageProducer producer=session.createProducer(queue);
		//第七步:建立一個Message物件,建立一個TextMessage物件。
		TextMessage textMessage=session.createTextMessage("傳送的訊息");
		//第八步:使用Producer物件傳送訊息。
		producer.send(textMessage);
		//第九步:關閉資源。
		producer.close();
		session.close();
		con.close();
	}
	

消費者:

@Test
	public void consumer() throws Exception{
		//第一步:建立一個ConnectionFactory物件。
		 ConnectionFactory factory= new ActiveMQConnectionFactory("tcp://localhost:61616");
		//第二步:從ConnectionFactory物件中獲得一個Connection物件。
		 Connection connection=factory.createConnection();
		//第三步:開啟連線。呼叫Connection物件的start方法。
		 connection.start();
		//第四步:使用Connection物件建立一個Session物件。
		 Session session=connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//第五步:使用Session物件建立一個佇列Queue。和傳送端保持一致queue,並且佇列的名稱一致。
		Queue queue=session.createQueue("FirstQueue");
		//第六步:使用Session物件建立一個Consumer物件。
		MessageConsumer consumer=session.createConsumer(queue);
		//第七步:接收訊息。
		consumer.setMessageListener(new MessageListener() {
			
			@Override
			public void onMessage(Message message) {
				TextMessage textMessage=(TextMessage) message;
				try {
					String text=textMessage.getText();
					//第八步:列印訊息。
					System.out.println(text);
				} catch (JMSException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
			}
		});
		//一定要加上這句話,程式比監聽器執行快
		System.in.read();
		//第九步:關閉資源
		consumer.close();
		session.close();
		connection.close();

	}