tomcat+JNDI+ActiveMQ簡單例項
阿新 • • 發佈:2019-02-20
實現釋出/訂閱訊息:直接上程式碼
1、下載activeMq:
執行activeMq: 我的是64位系統 所以進入bin/64/資料夾下執行
執行後輸入:http://localhost:8161/admin 進入如下頁面。表示執行成功
2、tomcat配置
修改conf/context.xml檔案加入如下程式碼<Resource auth="Container" brokerName="localhost" brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100&maxReconnectAttempts=5" description="JMS Connection Factory" factory="org.apache.activemq.jndi.JNDIReferenceFactory" name="jms/FailoverConnectionFactory" type="org.apache.activemq.ActiveMQConnectionFactory" useEmbeddedBroker="false"/> <Resource auth="Container" brokerName="localhost" brokerURL="tcp://localhost:61616" description="JMS Connection Factory" factory="org.apache.activemq.jndi.JNDIReferenceFactory" name="jms/NormalConnectionFactory" type="org.apache.activemq.ActiveMQConnectionFactory" useEmbeddedBroker="false"/> <Resource auth="Container" factory="org.apache.activemq.jndi.JNDIReferenceFactory" name="jms/topic/MyTopic" physicalName="MY.TEST.FOO" type="org.apache.activemq.command.ActiveMQTopic"/> <Resource auth="Container" factory="org.apache.activemq.jndi.JNDIReferenceFactory" name="jms/queue/MyQueue" physicalName="MY.TEST.FOO.QUEUE" type="org.apache.activemq.command.ActiveMQQueue"/>
然後activeMq包下的 jar包放入在tomcat/lib包下
3、java程式碼
新建JMSListener類新建MyPublish類package com.flvcd.servlet; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.Topic; import javax.jms.TopicSubscriber; import javax.naming.Context; import javax.naming.InitialContext; import javax.servlet.ServletConfig; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import org.apache.activemq.ActiveMQConnectionFactory; /** * * * @version V1.0 */ public class JMSListener extends HttpServlet implements MessageListener{ public void init(ServletConfig config) throws ServletException{ try { InitialContext initCtx = new InitialContext();// Context envContext = (Context) initCtx.lookup("java:comp/env"); ConnectionFactory connectionFactory = (ConnectionFactory) envContext.lookup("jms/FailoverConnectionFactory"); Connection connection = connectionFactory.createConnection(); connection.setClientID("MyClient"); Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 普通訊息訂閱者,無法接收持久訊息 //MessageConsumer consumer = // jmsSession.createConsumer((Destination) // envContext.lookup("jms/topic/MyTopic")); // //基於Topic建立持久的訊息訂閱者,前提:Connection必須指定一個唯一的clientId,當前為MyClient TopicSubscriber consumer = jmsSession.createDurableSubscriber((Topic)envContext.lookup("jms/topic/MyTopic"), "MySub"); consumer.setMessageListener(this); connection.start(); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } /** * 接收訊息,做對應處理 */ @Override public void onMessage(Message message) { // TODO Auto-generated method stub if(checkText(message,"RefreshArticleId")!=null){ String articleId = checkText(message,"RefreshArticleId"); System.out.println("接收重新整理文章訊息,開始重新整理文章ID="+articleId); }else if(checkText(message,"RefreshThreadId")!=null){ String threadId = checkText(message,"RefreshThreadId"); System.out.println("接收重新整理論壇帖子訊息,開始重新整理帖子ID="+threadId); }else{ System.out.println("接收普通訊息,不做任何處理!"); } } /** * * @Title: checkText * @return String * @author zyj * @throws */ private static String checkText(Message m, String s){ try { return m.getStringProperty(s); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); return null; } } }
web.xml加入以下配置package com.flvcd.servlet; import java.io.IOException; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.naming.Context; import javax.naming.InitialContext; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; public class MyPublish extends HttpServlet implements MessageListener{ private InitialContext initCtx; private Context envContext; private ConnectionFactory connectionFactory; private Connection connection; private Session session; private MessageProducer producer; @Override public void onMessage(Message arg0) { // TODO Auto-generated method stub } public MyPublish() { super(); } public void destroy() { super.destroy(); // Just puts "destroy" string in log // Put your code here } public void init() throws ServletException { try { initCtx = new InitialContext(); envContext = (Context) initCtx.lookup("java:comp/env"); connectionFactory = (ConnectionFactory) envContext.lookup("jms/NormalConnectionFactory"); connection = connectionFactory.createConnection(); connection.createSession(false, Session.AUTO_ACKNOWLEDGE); session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); producer = session.createProducer((Destination) envContext.lookup("jms/topic/MyTopic")); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } public void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { String content = request.getParameter("content"); try { producer.setDeliveryMode(DeliveryMode.PERSISTENT); Message testMessage = session.createMessage(); // 釋出重新整理文章訊息 testMessage.setStringProperty("RefreshArticleId",content); producer.send(testMessage); // 釋出重新整理帖子訊息 testMessage.clearProperties(); testMessage.setStringProperty("RefreshThreadId", content); producer.send(testMessage); } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } }
</welcome-file-list>
<servlet>
<servlet-name>sssss</servlet-name>
<servlet-class>com.flvcd.servlet.MyPublish</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>sssss</servlet-name>
<url-pattern>/myPublish.do</url-pattern>
</servlet-mapping>
<servlet>
<servlet-name>jms-listener</servlet-name>
<servlet-class>
com.flvcd.servlet.JMSListener
</servlet-class>
<load-on-startup>1</load-on-startup>
</servlet>
index.html
<form action="myPublish.do" method="post">
<input type="text" name="content"/>
<input type="submit" value="提交"/>
</form>
4、執行後列印日誌
接收重新整理文章訊息,開始重新整理文章ID=asd接收重新整理論壇帖子訊息,開始重新整理帖子ID=asd
5、activieMq對比
Number Of Consumers 消費者 這個是消費者端的消費者數量
Number Of Pending Messages 等待消費的訊息 這個是當前未出佇列的數量。可以理解為總接收數-總出佇列數
Messages Enqueued 進入佇列的訊息 進入佇列的總數量,包括出佇列的。 這個數量只增不減
Messages Dequeued 出了佇列的訊息 可以理解為是消費這消費掉的數量
這個要分兩種情況理解
在queues裡它和進入佇列的總數量相等(因為一個訊息只會被成功消費一次),如果暫時不等是因為消費者還沒來得及消費。
在 topics裡 它因為多消費者從而導致數量會比入佇列數高。
簡單的理解上面的意思就是
當有一個訊息進入這個佇列時,等待消費的訊息是1,進入佇列的訊息是1。
當訊息消費後,等待消費的訊息是0,進入佇列的訊息是1,出佇列的訊息是1.
在來一條訊息時,等待消費的訊息是1,進入佇列的訊息就是2.
沒有消費者時 Pending Messages 和 入佇列數量一樣
有消費者消費的時候 Pedding會減少 出佇列會增加
到最後 就是 入佇列和出佇列的數量一樣多
以此類推,進入佇列的訊息和出佇列的訊息是池子,等待消費的訊息是水流。