RabbitMQ與java、Spring結合例項詳細講解
摘要:本文介紹了rabbitMq,提供瞭如何在Ubuntu下安裝RabbitMQ 服務的方法。最好以RabbitMQ與java、Spring結合的兩個例項來演示如何使用RabbitMQ。
一、rabbitMQ簡介
1.1、rabbitMQ的優點(適用範圍)
1. 基於erlang語言開發具有高可用高併發的優點,適合叢集伺服器。
2. 健壯、穩定、易用、跨平臺、支援多種語言、文件齊全。
3. 有訊息確認機制和持久化機制,可靠性高。
4. 開源
其他MQ的優勢:
1. Apache ActiveMQ曝光率最高,但是可能會丟訊息。
2. ZeroMQ延遲很低、支援靈活拓撲,但是不支援訊息持久化和崩潰恢復。
1.2、幾個概念說明
producer&Consumer
producer指的是訊息生產者,consumer訊息的消費者。
Queue
訊息佇列,提供了FIFO的處理機制,具有快取訊息的能力。rabbitmq中,佇列訊息可以設定為持久化,臨時或者自動刪除。
設定為持久化的佇列,queue中的訊息會在server本地硬碟儲存一份,防止系統crash,資料丟失
設定為臨時佇列,queue中的資料在系統重啟之後就會丟失
設定為自動刪除的佇列,當不存在使用者連線到server,佇列中的資料會被自動刪除Exchange
Exchange類似於資料通訊網路中的交換機,提供訊息路由策略。rabbitmq中,producer不是通過通道直接將訊息傳送給queue,而是先發送給Exchange。一個Exchange可以和多個Queue進行繫結,producer在傳遞訊息的時候,會傳遞一個ROUTING_KEY,Exchange會根據這個ROUTING_KEY按照特定的路由演算法,將訊息路由給指定的queue。和Queue一樣,Exchange也可設定為持久化,臨時或者自動刪除。
Exchange有4種類型:direct(預設),fanout, topic, 和headers,不同型別的Exchange轉發訊息的策略有所區別:
Direct
直接交換器,工作方式類似於單播,Exchange會將訊息傳送完全匹配ROUTING_KEY的Queue
fanout
廣播是式交換器,不管訊息的ROUTING_KEY設定為什麼,Exchange都會將訊息轉發給所有繫結的Queue。
topic
主題交換器,工作方式類似於組播,Exchange會將訊息轉發和ROUTING_KEY匹配模式相同的所有佇列,比如,ROUTING_KEY為user.stock的Message會轉發給繫結匹配模式為 * .stock,user.stock, * . * 和#.user.stock.#的佇列。( * 表是匹配一個任意片語,#表示匹配0個或多個片語)
headers
訊息體的header匹配(ignore)
Binding
所謂繫結就是將一個特定的 Exchange 和一個特定的 Queue 繫結起來。Exchange 和Queue的繫結可以是多對多的關係。
virtual host
在rabbitmq server上可以建立多個虛擬的message broker,又叫做virtual hosts (vhosts)。每一個vhost本質上是一個mini-rabbitmq server,分別管理各自的exchange,和bindings。vhost相當於物理的server,可以為不同app提供邊界隔離,使得應用安全的執行在不同的vhost例項上,相互之間不會干擾。producer和consumer連線rabbit server需要指定一個vhost。
1.3、訊息佇列的使用過程
1. 客戶端連線到訊息佇列伺服器,開啟一個channel。
2. 客戶端宣告一個exchange,並設定相關屬性。
3. 客戶端宣告一個queue,並設定相關屬性。
4. 客戶端使用routing key,在exchange和queue之間建立好繫結關係。
5. 客戶端投遞訊息到exchange。
6. exchange接收到訊息後,就根據訊息的key和已經設定的binding,進行訊息路由,將訊息投遞到一個或多個佇列裡
二、環境配置與安裝
1、Erlang環境安裝
RabbitMQ是基於Erlang的,所以首先必須配置Erlang環境。
從Erlang的官網 http://www.erlang.org/download.html 下載最新的erlang安裝包,我下載的版本是 otp_src_R14B03.tar.gz 。然後:
$ tar xvzf otp_src_R14B03.tar.gz
$ cd otp_src_R14B03
$ ./configure
編譯後的輸出 如下圖:
注:
可能會報錯 configure: error: No curses library functions found
configure: error: /bin/sh '/home/liyixiang/erlang/configure' failed for erts
原因是缺少ncurses包
解決:在ubuntu系統下apt-cache search ncurses
apt-get install libncurses5-dev
然後重新執行
./configure
提示沒有wxWidgets和fop、ssh、odbc、ssl,但是問題不大。繼續:
make
然後:
sudo make install
配置erlang環境變數
修改/etc/profile檔案,增加下面的環境變數:(vim profile i插入 編輯完畢ESC退出 wq!強制修改)
#set erlang environment
export PATH=$PATH:/usr/erlang/bin:$PATH
source profile使得檔案生效
下面是我的
2、RabbitMQ-Server安裝
安裝完Erlang,開始安裝RabbitMQ-Server。安裝方法有三種,這裡筆者三者都試過了,就只有以下這個方法成功了。
直接使用:
apt-get install rabbitmq-server
安裝完成後會自動開啟:
使用命令檢視rabbitmq執行狀態:
rabbitmqctl status
停止
rabbitmqctl stop
開啟
rabbitmq-server start
3、rabbitmq web管理頁面外掛安裝
輸入以下命令
cd /usr/lib/rabbitmq/bin/
rabbitmq-plugins enable rabbitmq_management
這裡筆者一直安裝不成功。
如果安裝成功開啟瀏覽器,輸入 http://[server-name]:15672/ 如 http://localhost:15672/ ,會要求輸入使用者名稱和密碼,用預設的guest/guest即可(guest/guest使用者只能從localhost地址登入,如果要配置遠端登入,必須另建立使用者)。
如果要從遠端登入怎麼做呢?處於安全考慮,guest這個預設的使用者只能通過http://localhost:15672來登入,其他的IP無法直接用這個guest帳號。這裡我們可以通過配置檔案來實現從遠端登入管理介面,只要編輯/etc/rabbitmq/rabbitmq.config檔案(沒有就新增),新增以下配置就可以了。
4、新增使用者
vim /etc/rabbitmq/rabbitmq.config
然後新增 [
{rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["asdf"]}]}
].
注意上面有個點號 現在添加了一個新授權使用者asdf,可以遠端使用這個使用者名稱。記得要先用命令新增這個命令才行:
cd /usr/lib/rabbitmq/bin/
#使用者名稱與密碼 sudo rabbitmqctl add_user asdf 123456
使用者設定為administrator才能遠端訪問 sudo rabbitmqctl set_user_tags asdf administrator
sudo rabbitmqctl set_permissions -p / asdf ".*" ".*" ".*"
其實也可以通過管理平臺頁面直接新增使用者和密碼等資訊。如果還不能遠端訪問或遠端登入檢查是不是5672, 15672埠沒有開放!!!!!!
5、開放埠
ufw allow 5672
三、簡單Java例項
下面來演示一個使用java的簡單例項: 1、首先是訊息生產者和提供者的基類package com.lin;
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
*
* 功能概要: EndPoint型別的佇列
*
* @author linbingwen
* @since 2016年1月11日
*/
public abstract class EndPoint{
protected Channel channel;
protected Connection connection;
protected String endPointName;
public EndPoint(String endpointName) throws IOException{
this.endPointName = endpointName;
//Create a connection factory
ConnectionFactory factory = new ConnectionFactory();
//hostname of your rabbitmq server
factory.setHost("10.75.4.25");
factory.setPort(5672);
factory.setUsername("asdf");
factory.setPassword("123456");
//getting a connection
connection = factory.newConnection();
//creating a channel
channel = connection.createChannel();
//declaring a queue for this channel. If queue does not exist,
//it will be created on the server.
channel.queueDeclare(endpointName, false, false, false, null);
}
/**
* 關閉channel和connection。並非必須,因為隱含是自動呼叫的。
* @throws IOException
*/
public void close() throws IOException{
this.channel.close();
this.connection.close();
}
}
2、訊息提供者
package com.lin.producer;
import java.io.IOException;
import java.io.Serializable;
import org.apache.commons.lang.SerializationUtils;
import com.lin.EndPoint;
/**
*
* 功能概要:訊息生產者
*
* @author linbingwen
* @since 2016年1月11日
*/
public class Producer extends EndPoint{
public Producer(String endPointName) throws IOException{
super(endPointName);
}
public void sendMessage(Serializable object) throws IOException {
channel.basicPublish("",endPointName, null, SerializationUtils.serialize(object));
}
}
3、訊息消費者
package com.lin.consumer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.SerializationUtils;
import com.lin.EndPoint;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
/**
*
* 功能概要:讀取佇列的程式端,實現了Runnable介面
*
* @author linbingwen
* @since 2016年1月11日
*/
public class QueueConsumer extends EndPoint implements Runnable, Consumer{
public QueueConsumer(String endPointName) throws IOException{
super(endPointName);
}
public void run() {
try {
//start consuming messages. Auto acknowledge messages.
channel.basicConsume(endPointName, true,this);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* Called when consumer is registered.
*/
public void handleConsumeOk(String consumerTag) {
System.out.println("Consumer "+consumerTag +" registered");
}
/**
* Called when new message is available.
*/
public void handleDelivery(String consumerTag, Envelope env,
BasicProperties props, byte[] body) throws IOException {
Map map = (HashMap)SerializationUtils.deserialize(body);
System.out.println("Message Number "+ map.get("message number") + " received.");
}
public void handleCancel(String consumerTag) {}
public void handleCancelOk(String consumerTag) {}
public void handleRecoverOk(String consumerTag) {}
public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {}
}
4、測試
package com.lin.test;
import java.io.IOException;
import java.sql.SQLException;
import java.util.HashMap;
import com.lin.consumer.QueueConsumer;
import com.lin.producer.Producer;
public class Test {
public Test() throws Exception{
QueueConsumer consumer = new QueueConsumer("queue");
Thread consumerThread = new Thread(consumer);
consumerThread.start();
Producer producer = new Producer("queue");
for (int i = 0; i < 1000000; i++) {
HashMap message = new HashMap();
message.put("message number", i);
producer.sendMessage(message);
System.out.println("Message Number "+ i +" sent.");
}
}
/**
* @param args
* @throws SQLException
* @throws IOException
*/
public static void main(String[] args) throws Exception{
new Test();
}
}
其中引入的jar包:
<!-- rabbitmq客戶端 -->
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.0.4</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.1</version>
</dependency>
</dependencies>
測試結果: 在提供訊息 在消費訊息
然後同時開啟rabbitmq的服務端,輸入如下:
rabbitmqctl list_queues
這個命令是用來檢視服務端中有多處個訊息佇列的。
可以看到有個名為queue的訊息佇列(更好的方法是安裝好web監控外掛,筆者一直安裝失敗,所以這裡就不展示了)
四、Rbbitmq與Spring結合使用
首先建立一個maven工程,整個專案的結構如下:下面將具體來講講整個過程 1、jar包的引入 pom.xml配置即可,如下:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.lin</groupId>
<artifactId>rabbit_c2</artifactId>
<version>0.0.1-SNAPSHOT</version>
<properties>
<!-- spring版本號 -->
<spring.version>3.2.8.RELEASE</spring.version>
<!-- log4j日誌檔案管理包版本 -->
<slf4j.version>1.6.6</slf4j.version>
<log4j.version>1.2.12</log4j.version>
<!-- junit版本號 -->
<junit.version>4.10</junit.version>
</properties>
<dependencies>
<!-- 新增Spring依賴 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aspects</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>${spring.version}</version>
</dependency>
<!--單元測試依賴 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<!-- 日誌檔案管理包 -->
<!-- log start -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<!-- log end -->
<!--spring單元測試依賴 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>${spring.version}</version>
<scope>test</scope>
</dependency>
<!--rabbitmq依賴 -->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>1.3.5.RELEASE</version>
</dependency>
<dependency>
<groupId>javax.validation</groupId>
<artifactId>validation-api</artifactId>
<version>1.1.0.Final</version>
</dependency>
<dependency>
<groupId>org.hibernate</groupId>
<artifactId>hibernate-validator</artifactId>
<version>5.0.1.Final</version>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/resources</directory>
<targetPath>${basedir}/target/classes</targetPath>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>true</filtering>
</resource>
<resource>
<directory>src/main/resources</directory>
<targetPath>${basedir}/target/resources</targetPath>
<includes>
<include>**/*.properties</include>
<include>**/*.xml</include>
</includes>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-war-plugin</artifactId>
<version>2.1.1</version>
<configuration>
<warSourceExcludes>${warExcludes}</warSourceExcludes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.4.3</version>
<configuration>
<testFailureIgnore>true</testFailureIgnore>
</configuration>
</plugin>
<plugin>
<inherited>true</inherited>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<executions>
<execution>
<id>attach-sources</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<configuration>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
2、訊息生產者
package com.lin.producer;
import javax.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Service;
/**
* 功能概要:訊息產生,提交到佇列中去
*
* @author linbingwen
* @since 2016年1月15日
*/
@Service
public class MessageProducer {
private Logger logger = LoggerFactory.getLogger(MessageProducer.class);
@Resource
private AmqpTemplate amqpTemplate;
public void sendMessage(Object message){
logger.info("to send message:{}",message);
amqpTemplate.convertAndSend("queueTestKey",message);
}
}
3、訊息消費者
package com.lin.consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
/**
* 功能概要:消費接收
*
* @author linbingwen
* @since 2016年1月15日
*/
public class MessageConsumer implements MessageListener {
private Logger logger = LoggerFactory.getLogger(MessageConsumer.class);
@Override
public void onMessage(Message message) {
logger.info("receive message:{}",message);
}
}
4、rabbitMq.xml配置資訊
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
<!--配置connection-factory,指定連線rabbit server引數 -->
<rabbit:connection-factory id="connectionFactory"
username="asdf" password="123456" host="10.75.4.25" port="5672" />
<!--定義rabbit template用於資料的接收和傳送 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"
exchange="exchangeTest" />
<!--通過指定下面的admin資訊,當前producer中的exchange和queue會在rabbitmq伺服器上自動生成 -->
<rabbit:admin connection-factory="connectionFactory" />
<!--定義queue -->
<rabbit:queue name="queueTest" durable="true" auto-delete="false" exclusive="false" />
<!-- 定義direct exchange,繫結queueTest -->
<rabbit:direct-exchange name="exchangeTest" durable="true" auto-delete="false">
<rabbit:bindings>
<rabbit:binding queue="queueTest" key="queueTestKey"></rabbit:binding>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 訊息接收者 -->
<bean id="messageReceiver" class="com.lin.consumer.MessageConsumer"></bean>
<!-- queue litener 觀察 監聽模式 當有訊息到達時會通知監聽在對應的佇列上的監聽物件-->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener queues="queueTest" ref="messageReceiver"/>
</rabbit:listener-container>
</beans>
5、spring整合rabbiqMq。application.xml內容如下:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd">
<import resource="classpath*:rabbitmq.xml" />
<!-- 掃描指定package下所有帶有如@controller,@services,@resource,@ods並把所註釋的註冊為Spring Beans -->
<context:component-scan base-package="com.lin.consumer,com.lin.producer" />
<!-- 啟用annotation功能 -->
<context:annotation-config />
<!-- 啟用annotation功能 -->
<context:spring-configured />
</beans>
6、最後,為了方便,列印了日誌,log4j.properties配置如下
log4j.rootLogger=DEBUG,Console,Stdout
#Console
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%d [%t] %-5p [%c] - %m%n
log4j.logger.java.sql.ResultSet=INFO
log4j.logger.org.apache=INFO
log4j.logger.java.sql.Connection=DEBUG
log4j.logger.java.sql.Statement=DEBUG
log4j.logger.java.sql.PreparedStatement=DEBUG
log4j.appender.Stdout = org.apache.log4j.DailyRollingFileAppender
log4j.appender.Stdout.File = E://logs/log.log
log4j.appender.Stdout.Append = true
log4j.appender.Stdout.Threshold = DEBUG
log4j.appender.Stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.Stdout.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss} [ %t:%r ] - [ %p ] %m%n
接著執行整個工程即可:
下面是執行的結果:
一會發一會收:因為在同一工程,所以發訊息和接訊息是交替出現的
我們出可以去rabbitMq 伺服器上看: 可以看到,我們配置的佇列已存在了:
到此,整個工程結束。