AMQ的一些簡單實戰
目錄
一、建立客戶端
public class Sender { public static void main(String[] args) throws Exception{ //第一步:建立connectionFactory工廠物件【需填入使用者名稱、密碼、要連線的地址】 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz","bhz","tcp://localhost:61616"); //第二步:通過ConnectionFactory工廠物件建立一個Connection連線,並且呼叫Connection的start方法開啟連線【connection預設是關閉的】 Connection connection = connectionFactory.createConnection(); connection.start(); //第三步:通過connection建立session會話(上下文環境物件),用於接收訊息,引數配置1為是否啟用事務,引數配置2為簽收模式【一般我們設定為自動簽收】 // Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //使用事務的方式進行訊息的傳送 // Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //使用CLIENT端簽收的方式 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); //第四步:通過session建立Destination物件,指的是一個客戶端用來指定生產訊息目標和消費訊息來源的物件,在PTP模式中,Destination被稱作Queue即佇列;在Pub/Sub模式,Destination被稱作Topic即主題.在程式中可以使用多個Queue和Topic. Destination destination = session.createQueue("queue1"); //第五步:通過session物件建立訊息的傳送和接收物件(生產者和消費者)MessageProducer/MessageConsumer MessageProducer messageProducer = session.createProducer(null); //第六步:可以使用MessageProducer的setDeliveryMode方法為其設定持久化特性和非持久化特性(DeliveryMode) messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); //第七步:使用JMS規範的TextMessage形式建立資料(通過session物件),並用MessageProducer的send方法傳送資料 for (int i = 1; i <= 5; i++) { TextMessage textMessage = session.createTextMessage("helloworld"+i); //第一個引數: 目的地 //第二個引數: 訊息 //第三個引數: 是否持久化 //第四個引數: 優先順序【0-9 0-4表示普通 5-9表示加急 預設4】 //第五個引數: 訊息在mq上的存放有效期【單位毫秒】 messageProducer.send(destination, textMessage, DeliveryMode.NON_PERSISTENT, i, 1000*60*2); System.out.println("生產者:"+textMessage.getText()); } //使用事務提交 //session.commit(); if (connection!=null) { connection.close(); } } }
二、建立接收端
public class Receiver { public static void main(String[] args) throws Exception{ //第一步:建立connectionFactory工廠物件【需填入使用者名稱、密碼、要連線的地址】 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz","bhz","tcp://localhost:61616"); //第二步:通過ConnectionFactory工廠物件建立一個Connection連線,並且呼叫Connection的start方法開啟連線【connection預設是關閉的】 Connection connection = connectionFactory.createConnection(); connection.start(); //第三步:通過connection建立session會話(上下文環境物件),用於接收訊息,引數配置1為是否啟用事務,引數配置2為簽收模式【一般我們設定為自動簽收】 Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); //第四步:通過session建立Destination物件,指的是一個客戶端用來指定生產訊息目標和消費訊息來源的物件,在PTP模式中,Destination被稱作Queue即佇列;在Pub/Sub模式,Destination被稱作Topic即主題.在程式中可以使用多個Queue和Topic. Destination destination = session.createQueue("queue1"); //第五步:通過session物件建立訊息的傳送和接收物件(生產者和消費者)MessageProducer/MessageConsumer MessageConsumer messageConsumer = session.createConsumer(destination); //第七步:使用JMS規範的TextMessage形式建立資料(通過session物件),並用MessageProducer的send方法傳送資料 while (true) { TextMessage msg = (TextMessage) messageConsumer.receive(); //手動去簽收訊息,另起一個執行緒(TCP)去通知我們的MQ服務,確認簽收 Thread.sleep(1000); msg.acknowledge(); if(msg == null) break; System.out.println(msg.getText()); } if (connection!=null) { connection.close(); } } }
三、釋出
public class Publish { private ConnectionFactory factory; private Connection connection; private Session session; private MessageProducer producer; public Publish(){ try { factory = new ActiveMQConnectionFactory( "bhz",//ActiveMQConnection.DEFAULT_USER, "bhz",//ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); connection = factory.createConnection(); connection.start(); session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); producer = session.createProducer(null); } catch (Exception e) { e.printStackTrace(); } } public void sendMessage() throws Exception{ Destination destination = session.createTopic("topic1"); TextMessage textMessage = session.createTextMessage("我是內容"); producer.send(destination, textMessage); } public static void main(String[] args) throws Exception { Publish p = new Publish(); p.sendMessage(); } }
四、使用者訂閱
public class Consumer1 {
private ConnectionFactory factory;
private Connection connection;
private Session session;
private MessageConsumer consumer;
public Consumer1(){
try {
factory = new ActiveMQConnectionFactory(
"bhz",//ActiveMQConnection.DEFAULT_USER,
"bhz",//ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
} catch (Exception e) {
e.printStackTrace();
}
}
public void receive() throws Exception{
Destination destination = session.createTopic("topic1");
consumer = session.createConsumer(destination);
consumer.setMessageListener(new Listener());
}
class Listener implements MessageListener{
public void onMessage(Message message) {
try {
System.out.println("consumer1收到訊息:"+((TextMessage)message).getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static void main(String[] args) throws Exception {
Consumer1 p = new Consumer1();
p.receive();
}
}
五、配置安全認證
1、 activemq.xml中
在<shutdownHooks> <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" /> </shutdownHooks>下面新增 <plugins> <simpleAuthenticationPlugin> <users> <authenticationUser username="bhz" password="bhz" groups="users,admins"/> </users> </simpleAuthenticationPlugin> </plugins> 2、對應的java程式碼要改成
//第一步:建立connectionFactory工廠物件【需填入使用者名稱、密碼、要連線的地址】 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz","bhz","tcp://localhost:61616");
3、頁面登入:http://127.0.0.1:8161/ 還是按照jconf/jetty-real.properties中的配置來 # Defines users that can access the web (console, demo, etc.) # username: password [,rolename ...] admin: admin, admin