JMS-訊息中介軟體
一、簡介 1、JMS(Java Message Service),即:java訊息服務應用程式介面。 2、是Java平臺面向訊息中介軟體(MOM)的API/技術規範。 3、場景:應用與兩個應用程式之間,或者分散式系統架構中分發訊息,可進行非同步/同步方式的通訊,和平臺API無關, 基本多數的MOM都提供對JMS的支援
二、訊息模型
在JMS標準中,有兩種訊息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)。
Queue與Topic 主要區別
三、JMS 體系架構
(1) ConnectionFactory
建立Connection物件的工廠,針對兩種不同的jms訊息模型,分別有QueueConnectionFactory和TopicConnectionFactory兩種。可以通過JNDI來查詢ConnectionFactory物件。
(2) Destination
Destination的意思是訊息生產者的訊息傳送目標或者說訊息消費者的訊息來源。對於訊息生產者來說,它的Destination是某個佇列(Queue)或某個主題(Topic);對於訊息消費者來說,它的Destination也是某個佇列或主題(即訊息來源)。
所以,Destination實際上就是兩種型別的物件:Queue、Topic可以通過JNDI來查詢Destination。
(3) Connection
Connection表示在客戶端和JMS系統之間建立的連結(對TCP/IP socket的包裝)。Connection可以產生一個或多個Session。跟ConnectionFactory一樣,Connection也有兩種型別:QueueConnection和TopicConnection。
(4) Session
Session是操作訊息的介面。可以通過session建立生產者、消費者、訊息等。Session提供了事務的功能。當需要使用session傳送/接收多個訊息時,可以將這些傳送/接收動作放到一個事務中。同樣,也分QueueSession和TopicSession。
(5) 訊息的生產者
訊息生產者由Session建立,並用於將訊息傳送到Destination。同樣,訊息生產者分兩種型別:QueueSender和TopicPublisher。可以呼叫訊息生產者的方法(send或publish方法)傳送訊息。
(6) 訊息消費者
訊息消費者由Session建立,用於接收被髮送到Destination的訊息。兩種型別:QueueReceiver和TopicSubscriber。可分別通過session的createReceiver(Queue)或createSubscriber(Topic)來建立。當然,也可以session的creatDurableSubscriber方法來建立持久化的訂閱者。
(7) MessageListener
訊息監聽器。如果註冊了訊息監聽器,一旦訊息到達,將自動呼叫監聽器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一種MessageListener。
四、常用訊息佇列
Active MQ,Rabbit MQ,Zero MQ,Kafka等是常見的JMS實現方式,一般實現的方法基本差不多。
功能描述:生產者將訊息傳送到佇列(佇列的名字為hello)中,消費者從佇列中獲取訊息。
1、Rabbit MQ 實現
(1)新建maven工程 ,引入依賴
<!-- RabbiteMQ Java客戶端 --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.3</version> </dependency>
<!--測試 --> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.7</version> <scope>test</scope> </dependency>
(2)編寫生產者、消費者
生產者:
package com.xdl.test;
import java.io.IOException; import java.util.concurrent.TimeoutException;
import org.junit.Test;
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
/** * 生產者 */ public class Provider {
@Test public void testBasicPublish() throws IOException, TimeoutException { // 建立連線工廠 ConnectionFactory factory = new ConnectionFactory();
// 設定相關引數,地址,埠,賬號,密碼 factory.setHost("127.0.0.1"); factory.setPort(AMQP.PROTOCOL.PORT); // 5672 factory.setUsername("guest"); factory.setPassword("guest");
// 新建一個長連線 Connection connection = factory.newConnection();
// 建立一個通道(一個輕量級的連線) Channel channel = connection.createChannel();
// 宣告一個佇列 String QUEUE_NAME = "hello";
// 1-佇列名稱 2-佇列是否持久化 3-是否是排他佇列 4-使用完之後是否刪除此佇列 5-其他屬性 channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 傳送訊息到佇列中 String message = "Hello RabbitMQ!";
// // 建立路由 1-路由名稱 2-路由型別 // channel.exchangeDeclare("myexchange", "topic"); // // 繫結路由佇列 1-佇列名稱 2-路由名稱 3-routing key // channel.queueBind("heelo", "myexchange", "shensha"); // // 傳送訊息 1-路由名稱 2-routing key3-其他資訊 4-訊息位元組陣列 // channel.basicPublish("myexchange", "shensha", null, "HelloWorld".getBytes()); // 注意:exchange如果不需要寫成空字串,routingKey和佇列名稱保持一致 channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println("Producer Send a message:" + message);
// 關閉資源 channel.close(); connection.close(); }
} 消費者:
package com.xdl.test;
import java.io.UnsupportedEncodingException; import org.junit.Test;
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope;
/** * 消費者 * */ public class Consumer {
@Test public void testBasicConsumer() throws Exception{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(AMQP.PROTOCOL.PORT); // 5672 factory.setUsername("guest"); factory.setPassword("guest");
// 新建一個長連線 Connection connection = factory.newConnection();
// 建立一個通道(一個輕量級的連線) Channel channel = connection.createChannel(); // 宣告一個佇列 String QUEUE_NAME = "hello"; channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("Consumer Wating Receive Message");
//消費訊息. 1-消費佇列 2-是否自動傳送訊息回執 3-回撥函式 com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException { String message = new String(body, "UTF-8"); System.out.println(" [C] Received '" + message + "'"); } }; // 訂閱訊息 channel.basicConsume(QUEUE_NAME, true, consumer); } } (3)安裝RabbitMQ,並啟動訪問
第一步:找到mq的安裝目錄sbin下輸入cmd
輸入:命令rabbitmq-plugins enable rabbitmq_management,訪問地址:http://127.0.0.1:15672/ 預設賬號:guest/guest
ActiveMQ :
主要實現: producer.send(message); //傳送者 consumer.receive(100000); //接受中