訊息佇列系列之ActiveMQ(JMS、叢集配置)
1、ActiveMQ的下載與啟動
到http://activemq.apache.org/activemq-5152-release.html下載ActiveMQ
windows版本的啟動:
執行bin資料夾中的win32(32位系統)/win64(64位系統)下的:
activemq.bat(直接啟動,不能關閉命令列視窗,否則會關閉)
InstallService.bat(以服務方式啟動,可以在windows的服務中找到並設定開機自啟動)
啟動成功後,在瀏覽器中訪問localhost:8161可以進入如下介面,點選進入管理介面,輸入預設使用者名稱admin密碼admin進入
今後常用的Queue(訊息佇列)和Topics(話題)將在此處檢視。
linux版本的啟動
進入安裝目錄的bin目錄下,使用./activemq start 啟動,啟動成功後可以遠端訪問activemq管理頁面。例如,此linux機器的IP為192.169.1.102,則訪問http://192.168.1.102:8161即可
2、訊息中介軟體的相關概念
ActiveMQ兩種基本模式
訊息佇列模式中,訊息生產者生產的訊息存在訊息佇列中,訊息消費者從訊息佇列中消費訊息,一條訊息被一個消費者消費後,其餘消費者將消費不到該訊息;
釋出訂閱模式中,訊息生產者生產的訊息存在話題中,訊息消費者從話題中訂閱訊息,一條訊息可以被多個消費者消費。
使用訊息中介軟體的好處:通過訊息中介軟體解耦服務呼叫
3、JMS規範
4、普通Java程式使用JMS整合ActiveMQ
記得新增jar包:
如果使用maven記得在pom.xml中新增相應依賴:
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.15.2</version>
</dependency>
4.1訊息佇列模式
生產者
package cn.edu.shu.ces.chenjie.jms.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
public class AppProducer {
private static final String url = "tcp://192.168.1.102:61616";
private static final String queueName = "queue-test";
private static Logger Log = LoggerFactory.getLogger(AppProducer.class);
public static void main(String[] args) throws JMSException {
//1、建立ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
//2、建立Connection
Connection connection = connectionFactory.createConnection();
//3、啟動連線
connection.start();
//4、建立會話
//b:使用事物 應答模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、建立一個目標
Destination destination = session.createQueue(queueName);
//6、建立一個生產者
MessageProducer producer = session.createProducer(destination);
for(int i = 0; i < 100; i++){
//7、建立訊息
TextMessage textMessage = session.createTextMessage("test :" + i);
//8、傳送訊息
producer.send(textMessage);
Log.info("傳送訊息" + textMessage + "");
}
//9、關閉連線
connection.close();
}
}
消費者
package cn.edu.shu.ces.chenjie.jms.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
public class AppConsumer {
private static final String url = "tcp://192.168.1.102:61616";
private static final String queueName = "queue-test";
private static Logger Log = LoggerFactory.getLogger(AppProducer.class);
public static void main(String[] args) throws JMSException {
//1、建立ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
//2、建立Connection
Connection connection = connectionFactory.createConnection();
//3、啟動連線
connection.start();
//4、建立會話
//b:使用事物 應答模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、建立一個目標
Destination destination = session.createQueue(queueName);
//6、建立一個生產者
MessageConsumer consumer = session.createConsumer(destination);
//7、建立一個監聽器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
Log.info("接收到訊息:" + textMessage);
}
});
//8、關閉連線
//connection.close();
}
}
啟動生產者
啟動消費者
4.2 話題-訂閱模式
生產者
package cn.edu.shu.ces.chenjie.jms.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
public class AppProducer {
private static final String url = "tcp://192.168.1.102:61616";
private static final String topicName = "topic-test";
private static Logger Log = LoggerFactory.getLogger(AppProducer.class);
public static void main(String[] args) throws JMSException {
//1、建立ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
//2、建立Connection
Connection connection = connectionFactory.createConnection();
//3、啟動連線
connection.start();
//4、建立會話
//b:使用事物 應答模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、建立一個目標
Destination destination = session.createTopic(topicName);
//6、建立一個生產者
MessageProducer producer = session.createProducer(destination);
for(int i = 0; i < 100; i++){
//7、建立訊息
TextMessage textMessage = session.createTextMessage("test :" + i);
//8、傳送訊息
producer.send(textMessage);
Log.info("傳送訊息" + textMessage + "");
}
//9、關閉連線
connection.close();
}
}
消費者
package cn.edu.shu.ces.chenjie.jms.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.*;
public class AppConsumer {
private static final String url = "tcp://192.168.1.102:61616";
private static final String topicName = "topic-test";
private static Logger Log = LoggerFactory.getLogger(AppProducer.class);
public static void main(String[] args) throws JMSException {
//1、建立ConnectionFactory
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
//2、建立Connection
Connection connection = connectionFactory.createConnection();
//3、啟動連線
connection.start();
//4、建立會話
//b:使用事物 應答模式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5、建立一個目標
Destination destination = session.createTopic(topicName);
//6、建立一個生產者
MessageConsumer consumer = session.createConsumer(destination);
//7、建立一個監聽器
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
Log.info("接收到訊息:" + textMessage);
}
});
//8、關閉連線
//connection.close();
}
}
【先】啟動消費者訂閱話題,【再】啟動生產者產生訊息
5、JavaEE(SpringJMS)中整合ActiveMQ
依賴:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.edu.shu.ces.chenjie</groupId>
<artifactId>ActiveMQ-Test</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<spring.version>4.2.5.RELEASE</spring.version>
</properties>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.15.2</version>
<exclusions>
<exclusion>
<artifactId>spring-context</artifactId>
<groupId>org.springframework</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>
</dependencies>
</project>
專案結構:
在resources中新增三個配置檔案:
common.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<!-- ActiveMQ 提供的ConnectionFactory-->
<bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://192.168.1.102:61616" />
<!--最新版的ActiveMQ想要傳遞物件訊息,需要指定可以序列話的類所在的包-->
<property name="trustedPackages">
<list>
<value>java.lang</value>
<value>javax.security</value>
<value>java.util</value>
<value>org.apache.activemq</value>
<value>cn.edu.shu.ces.chenjie</value>
</list>
</property>
</bean>
<!--Spring JMS 提供的ConnectionFactory-->
<bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
<property name="targetConnectionFactory" ref="targetConnectionFactory">
</property>
</bean>
<!--一個佇列目的地,點對點的-->
<bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="spring-queue"/>
</bean>
<!--一個主題目的地,釋出訂閱模式-->
<bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="spring-topic"/>
</bean>
</beans>
producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
<context:annotation-config/>
<import resource="common.xml"/>
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
<bean id="producerService" class="cn.edu.shu.ces.chenjie.jms.spring.ProducerServiceImpl"/>
</beans>
consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
<!--匯入公共配置-->
<import resource="common.xml"/>
<!--配置訊息監聽器-->
<bean id="consumerMessageListener" class="cn.edu.shu.ces.chenjie.jms.spring.ConsumerMessageListener"/>
<!--配置訊息容器-->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<!--<property name="destination" ref="queueDestination"/>-->
<property name="destination" ref="topicDestination"/>
<property name="messageListener" ref="consumerMessageListener"/>
</bean>
</beans>
ProducerService
package cn.edu.shu.ces.chenjie.jms.spring;
public interface ProducerService {
void sendMessage(String message);
void sendLoginMessage(User user);
}
ProducerServiceImpl
package cn.edu.shu.ces.chenjie.jms.spring;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import javax.annotation.Resource;
import javax.jms.*;
public class ProducerServiceImpl implements ProducerService{
@Autowired
JmsTemplate jmsTemplate;
@Resource(name="topicDestination")//queueDestination
Destination destination;
public void sendMessage(final String message) {
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage(message);
return textMessage;
}
});
System.out.println("傳送訊息:" + message);
}
public void sendLoginMessage(final User user) {
jmsTemplate.send(destination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
Message loginMessage = session.createObjectMessage(user);
return loginMessage;
}
});
System.out.println("傳送使用者登入訊息:" + user);
}
}
ConsumerMessageListener
package cn.edu.shu.ces.chenjie.jms.spring;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
public class ConsumerMessageListener implements MessageListener{
public void onMessage(Message message) {
System.out.println("received:" + message);
if(message instanceof ObjectMessage){
try {
User user = (User) ((ObjectMessage) message).getObject();
System.out.println("received:" + user);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
User
package cn.edu.shu.ces.chenjie.jms.spring;
import java.io.Serializable;
public class User implements Serializable {
private String username;
private String password;
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
@Override
public String toString() {
return "User{" +
"username='" + username + '\'' +
", password='" + password + '\'' +
'}';
}
}
AppProducer
package cn.edu.shu.ces.chenjie.jms.spring;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
public class AppProducer {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("producer.xml");
ProducerService service = context.getBean(ProducerService.class);
for(int i = 0; i < 100; i ++){
service.sendMessage("test " + i);
}
User user = new User();
user.setUsername("chenjie");
user.setPassword("chenjie123");
service.sendLoginMessage(user);
context.close();
}
}
AppConsumer
package cn.edu.shu.ces.chenjie.jms.spring;
import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
public class AppConsumer {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("consumer.xml");
}
}
啟動生產者、消費者
JMS規範規定的和ActiveMQ支援的訊息型別有很多,例子中使用到了文字訊息和物件訊息。
兩種訊息模式需要更改相應的配置檔案(配置檔案中的註釋)
6、ActiveMQ叢集的搭建與測試
叢集結構:
NodeA不儲存訊息,NodeB和NodeC互斥共享訊息儲存,當一方佔有訊息儲存時成為master,另一方成為slave。成為master的一方跟A同步訊息,做到了負載均衡。當B/C中的master宕機時,釋放訊息儲存,此時的slave得到訊息儲存成為master,做到了安全。
配置安排:
由於是示例,因此在用一臺機器上的不同埠部署3個結點充當3臺機器,並使用本地磁碟資料夾充當共享資料夾。
實際應該是3臺機器,使用分散式檔案系統當共享資料夾。
將activemq的資料夾拷貝3份,
分別取名為a b c,同時新建一個share資料夾用於訊息儲存共享資料夾
依次編輯各個資料夾中conf下的actvmq.xml 和jetty.xml
[email protected]:~/activemq/activemq-a/conf$ vim activemq.xml
註釋掉了transportConnectors標籤下的後面幾個配置,新增了一個networkConnectors配置,使A連線到B和C
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <!-- <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/> --> </transportConnectors> <networkConnectors> <networkConnector name="local_network" uri="static:(tcp://127.0.0.1:61617,tcp://127.0.0.1:61618)" /> </networkConnectors>
[email protected]:~/activemq/activemq-b/conf$ vim activemq.xml
修改持久化目錄
<persistenceAdapter> <kahaDB directory="/home/chenjie/activemq/share"/> </persistenceAdapter>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<!-- <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
-->
</transportConnectors>
<networkConnectors>
<networkConnector name="network_a" uri="static:(tcp://127.0.0.1:61616)" />
</networkConnectors>
註釋掉transportConnector 的後面幾個配置,並增加一個networkConnectors,建立跟a的靜態連線
c的activemq.xml配置與b一樣
[email protected]:~/activemq/activemq-a/conf$ vim jetty.xml,在此修改埠號,將a b c的埠修改為8061 8062 8062
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8161"/>
</bean>
依次啟動a b c
依次訪問:http://192.168.1.102:8161/admin http://192.168.1.102:8162/admin http://192.168.1.102:8163/admin
可以看到6183對應的c無法訪問,這是因為6182對應的b搶佔了共享訊息儲存,所以c沒有提供服務。
將b殺掉:
再次訪問c對應的http://192.168.1.102:8163/admin
因為b已經殺掉了所以b對應的8062顯然不能顯示了。這也證實了b和c能夠起到保障可用性的作用。
使用叢集模式時程式碼中需修改的地方:
生產者:
private static final String url = "failover:(tcp://192.168.1.102:61617,tcp://192.168.1.102:61618)?randomize=true";
消費者:
private static final String url = "failover:(tcp://192.168.1.61616,tcp://192.168.1.102:61617,tcp://192.168.1.102:61618)?randomize=true";