activemq訊息佇列
1.下載http://activemq.apache.org/activemq-5157-release.html包,解壓後直接雙擊C:\Users\Mac\Desktop\apache-activemq-5.15.7-bin\apache-activemq-5.15.7\bin\win64下的activemq.bat即可執行activemq服務,瀏覽器執行http://localhost:8161/admin/topics.jsp即可檢視訊息佇列資訊。(包括訊息生產者,訊息消費者,訊息佇列queue、訂閱topic)
2.預設是使用者密碼和使用者賬號是:system、manager,在config檔案下的
3.可手動修改配置:
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" useJmx="true"> /** useJmx="true"
<managementContext createConnector="true"/> // createConnector="true"
4.queue普通佇列程式碼,一個訊息生產者和一個訊息消費者,
生產者:
/**
* 訊息生產者
* @author Mac
*
*/
public class QueueProducer {
public static void main(String[] args) {
//連線資訊設定
String username="system";
String password="manager";
String brokerUrl="failover://tcp://localhost:61616";
//連線工廠
ConnectionFactory cf=null;
//連線
Connection c=null;
//會話 接受或傳送訊息訊息的執行緒
Session session=null;
//訊息目的地,訊息佇列
Destination des=null;
//訊息生產者
MessageProducer mp=null;
//傳送訊息
TextMessage tm=null;
//例項化連線工廠
cf=new ActiveMQConnectionFactory(username,password,brokerUrl);
try {
c=cf.createConnection();//建立連線
c.start();
session=c.createSession(true,Session.AUTO_ACKNOWLEDGE);//建立會話,auto_acknowledge自動應答
des=session.createQueue("QueueTest");//建立名為queueTest名稱的訊息佇列
mp=session.createProducer(des);//訊息生產者
tm=session.createTextMessage("queue測試新訊息666888");//建立訊息
mp.send(tm);
session.commit();
} catch (Exception e) {
e.printStackTrace();
}finally{
if(c!=null){
try {
c.close();
} catch (Exception
e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
消費者:
/**
* 訊息消費者
* @author Mac
*
*/
public class QueueConsumer {
public static void main(String[] args) {
String username="system";
String password="manager";
String url="failover://tcp://localhost:61616";
ConnectionFactory cf=null;//連線工廠
Connection c=null;//連線例項
Session session=null;//會話
Destination des=null;//訊息佇列
MessageConsumer mc=null;//訊息消費者
cf=new ActiveMQConnectionFactory(username,password,url);
try {
c=cf.createConnection();
c.start();
session=c.createSession(true, Session.AUTO_ACKNOWLEDGE);
des=session.createQueue("QueueTest");//連線到名為QueueTest的訊息佇列
mc=session.createConsumer(des);//建立訊息消費者
while(true){
TextMessage tm=(TextMessage) mc.receive(100000);
if(tm!=null){
String text=tm.getText();
System.out.println("成功消費到訊息佇列中的訊息:"+text);
}else{
break;
}
}
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
try {
c.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
5.topic訂閱模式程式碼:
生產者:
/**
* 主題生產者
* @author Mac
*
*/
public class TopicProducer {
public static void main(String[] args) {
String username="system";
String password="manager";
String url="failover://tcp://localhost:61616";
ConnectionFactory cf=null;
Connection c=null;
Session s=null;
TextMessage tm=null;
Topic top=null;
MessageProducer mp=null;
try {
cf=new ActiveMQConnectionFactory(username,password,url);//建立連線
c=cf.createConnection();
c.start();//開始連線
s=c.createSession(true, Session.AUTO_ACKNOWLEDGE);//建立會話
top=s.createTopic("TopicTest");//建立topic訂閱佇列
mp=s.createProducer(top);//建立訂閱訊息生產者
mp.setDeliveryMode(DeliveryMode.NON_PERSISTENT);//資料不持久化
for (int i=0; i<10; i++) {
//建立要傳送的文字資訊
tm = s.createTextMessage("Topic主題測試" +(i+1));
//通過主題生產者發出訊息
mp.send(tm);
System.out.println("傳送成功:" + tm.getText());
}
s.commit();
} catch (Exception e) {
e.printStackTrace();
}finally{
if(null!=c){
try {
//c.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}
消費者:
/*
* 主題消費者
*/
public class TopicConsumer {
public static void main(String[] args) {
String username="system";
String password="manager";
String url="failover://tcp://localhost:61616";
ConnectionFactory connectionFactory=null;
Connection connection=null;
Session session=null;
Topic topic=null;
MessageConsumer messageConsumer=null;
connectionFactory=new ActiveMQConnectionFactory(username,password,url);
try {
connection=connectionFactory.createConnection();
connection.start();
session=connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
topic=session.createTopic("TopicTest");
messageConsumer=session.createConsumer(topic);
System.out.println("1");
messageConsumer.setMessageListener(new myMessageListener());
System.out.println("2");
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally{
if(null!=connection){
try {
//connection.close();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
class myMessageListener implements MessageListener{
@Override
public void onMessage(Message mes) {
System.out.println("xx");
TextMessage tm=(TextMessage) mes;
try {
System.out.println("訊息:"+tm.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}