1. 程式人生 > >訊息佇列activemq入門教程

訊息佇列activemq入門教程

關於web系統中佇列的使用參看文章http://jinnianshilongnian.iteye.com/blog/2321715

前言:

訊息佇列僅僅是佇列中的一個分支應用,使用場景:各個微服務之間的通訊,包括資料非同步同步等等場合,比如下訂單業務,可能涉及到使用者中心服務,訂單服務,財務服務,產品服務,倉庫服務等微服務;傳統做法是寫大量的介面,各個服務間呼叫介面來進行建立訂單、付款、支付、扣減庫存等等操作,一方面大量介面後期難以維護,可能改一個介面會導致整個訂單流程掛掉;另一方面介面之間會出現呼叫失敗、相應超時等問題。在這種情況下,引入訊息佇列,可以解決這些問題。通過訊息佇列可實現非同步處理、系統解耦。

1.下載ActiveMQ

http://activemq.apache.org/

通過url可以看出來,apache出品~

2.安裝使用

解壓下載的zip檔案,執行bin目錄的activemq.bat。啟動後,登陸:http://localhost:8161/admin/,使用者名稱和密碼都是admin

3.例項

程式需要的jar包如下:

activemq-client-5.9.0.jar
geronimo-j2ee-management_1.1_spec-1.0.1.jar
geronimo-jms_1.1_spec-1.1.1.jar
log4j-1.2.17.jar
slf4j-api-1.7.5.jar
slf4j-log4j12-1.7.5.jar

我這裡模擬生產者/消費者,或者可以叫生產者/接受者。

先寫傳送模組

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 測試activeMQ訊息
 * 
 * @author ***
 * */
public class Sender {

	private static final int SEND_NUMBER = 5;

	public void sendMessage() {
		// 連線工廠,JMS 用它建立連線
		ConnectionFactory connectionFactory;
		// JMS 客戶端到JMS Provider 的連線
		Connection connection = null;
		// 一個傳送或接收訊息的執行緒
		Session session;
		// 訊息的目的地
		Destination destination;
		// 訊息傳送者
		MessageProducer producer;
		// TextMessage message;
		// 構造ConnectionFactory例項物件,此處採用ActiveMq的實現jar
		connectionFactory = new ActiveMQConnectionFactory(
				ActiveMQConnection.DEFAULT_USER,
				ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");

		try {
			// 構造從工廠得到連線物件
			connection = connectionFactory.createConnection();
			// 啟動
			connection.start();
			// 獲取操作連線
			session = connection.createSession(Boolean.TRUE,
					Session.AUTO_ACKNOWLEDGE);
			// 獲取session,注意引數值,接收要同樣的Queue
			destination = session.createQueue("FirstQueue");
			// 得到訊息生成者
			producer = session.createProducer(destination);
			// 設定不持久化,此處學習,實際根據專案決定
			producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
			// 構造訊息,此處寫死,專案就是引數,或者方法獲取
			sendMessage(session, producer);
			session.commit();
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				if (null != connection)
					connection.close();
			} catch (Throwable ignore) {
			}
		}
	}

	private void sendMessage(Session session, MessageProducer producer)
			throws Exception {
		for (int i = 1; i <= SEND_NUMBER; i++) {
			TextMessage message = session.createTextMessage("ActiveMq 傳送的訊息"
					+ i);
			producer.send(message);
		}
	}

}
接收模組
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

public class Receiver {

	public static void main(String[] args) {
		// 連線工廠,JMS 用它建立連線
		ConnectionFactory connectionFactory;
		// JMS 客戶端到JMS Provider 的連線
		Connection connection = null;
		// 一個傳送或接收訊息的執行緒
		Session session;
		// 訊息的目的地
		Destination destination;
		// 訊息接收者
		MessageConsumer consumer;
		connectionFactory = new ActiveMQConnectionFactory(
				ActiveMQConnection.DEFAULT_USER,
				ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
		try {
			// 構造從工廠得到連線物件
			connection = connectionFactory.createConnection();
			// 啟動
			connection.start();
			// 獲取操作連線
			session = connection.createSession(Boolean.FALSE,
					Session.AUTO_ACKNOWLEDGE);
			// 獲取session
			destination = session.createQueue("FirstQueue");
			consumer = session.createConsumer(destination);
			while (true) {
				// 設定接收者接收訊息的時間
				// 0意思是程式會一直執行
				TextMessage message = (TextMessage) consumer.receive(0);
				if (null != message) {
					System.out.println("收到訊息" + message.getText());
				} else {
					break;
				}
			}
		} catch (Exception e) {
			e.printStackTrace();
		} finally {
			try {
				if (null != connection)
					connection.close();
			} catch (Throwable ignore) {
			}
		}
	}

}
測試的時候,可以在Sender.java中增加一個main方法,先呼叫sendMessage(),然後啟動Receiver.java,可以在控制檯看到打印出了接受的值。

也可以登入http://localhost:8161/admin/檢視傳送接收的訊息。

我自己覺得activemq就是一個轉發伺服器,具體技術細節待後續研究。