1. 程式人生 > >tomcat+JNDI+ActiveMQ簡單例項

tomcat+JNDI+ActiveMQ簡單例項

實現釋出/訂閱訊息:直接上程式碼

1、下載activeMq:

執行activeMq: 我的是64位系統 所以進入bin/64/資料夾下執行

執行後輸入:http://localhost:8161/admin 進入如下頁面。表示執行成功


2、tomcat配置

修改conf/context.xml檔案加入如下程式碼
<Resource auth="Container" brokerName="localhost" brokerURL="failover:(tcp://localhost:61616)?initialReconnectDelay=100&maxReconnectAttempts=5" description="JMS Connection Factory" factory="org.apache.activemq.jndi.JNDIReferenceFactory" name="jms/FailoverConnectionFactory" type="org.apache.activemq.ActiveMQConnectionFactory" useEmbeddedBroker="false"/> 
    <Resource auth="Container" brokerName="localhost" brokerURL="tcp://localhost:61616" description="JMS Connection Factory" factory="org.apache.activemq.jndi.JNDIReferenceFactory" name="jms/NormalConnectionFactory" type="org.apache.activemq.ActiveMQConnectionFactory" useEmbeddedBroker="false"/> 
    <Resource auth="Container" factory="org.apache.activemq.jndi.JNDIReferenceFactory" name="jms/topic/MyTopic" physicalName="MY.TEST.FOO" type="org.apache.activemq.command.ActiveMQTopic"/> 
    <Resource auth="Container" factory="org.apache.activemq.jndi.JNDIReferenceFactory" name="jms/queue/MyQueue" physicalName="MY.TEST.FOO.QUEUE" type="org.apache.activemq.command.ActiveMQQueue"/> 

然後activeMq包下的 jar包放入在tomcat/lib包下

3、java程式碼

新建JMSListener類
package com.flvcd.servlet;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * 
 * 
 * @version V1.0
 */
public class JMSListener extends HttpServlet implements MessageListener{

	public void init(ServletConfig config) throws ServletException{
		try {
			InitialContext initCtx = new InitialContext();//
			Context envContext = (Context) initCtx.lookup("java:comp/env");
			ConnectionFactory connectionFactory = (ConnectionFactory) envContext.lookup("jms/FailoverConnectionFactory");
			Connection connection = connectionFactory.createConnection();
			connection.setClientID("MyClient");
			Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			// 普通訊息訂閱者,無法接收持久訊息 //MessageConsumer consumer =
            // jmsSession.createConsumer((Destination)
            // envContext.lookup("jms/topic/MyTopic"));
            // //基於Topic建立持久的訊息訂閱者,前提:Connection必須指定一個唯一的clientId,當前為MyClient
			TopicSubscriber consumer = jmsSession.createDurableSubscriber((Topic)envContext.lookup("jms/topic/MyTopic"), "MySub");
			consumer.setMessageListener(this);
			connection.start();
		} catch (Exception e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}
	
	/**
	 * 接收訊息,做對應處理
	 */
	@Override
	public void onMessage(Message message) {
		// TODO Auto-generated method stub
		if(checkText(message,"RefreshArticleId")!=null){
			String articleId = checkText(message,"RefreshArticleId");
			System.out.println("接收重新整理文章訊息,開始重新整理文章ID="+articleId);
		}else if(checkText(message,"RefreshThreadId")!=null){
			String threadId = checkText(message,"RefreshThreadId");
			System.out.println("接收重新整理論壇帖子訊息,開始重新整理帖子ID="+threadId);
		}else{
			System.out.println("接收普通訊息,不做任何處理!");
		}
	}
	
	/**
	 * 
	 * @Title: checkText 
	 * @return String
	 * @author zyj
	 * @throws
	 */
	private static String checkText(Message m, String s){
		try {
			return m.getStringProperty(s);
		} catch (JMSException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
			return null;
		}
	}

}
新建MyPublish類
package com.flvcd.servlet;


import java.io.IOException;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public class MyPublish extends HttpServlet implements MessageListener{

	private InitialContext initCtx;
	private Context envContext;
	private ConnectionFactory connectionFactory;
	private Connection connection;
	private Session session;
	private MessageProducer producer;
	
	@Override
	public void onMessage(Message arg0) {
		// TODO Auto-generated method stub
	}
	
	public MyPublish() { 
        super(); 
    } 
	
	public void destroy() { 
        super.destroy(); // Just puts "destroy" string in log 
        // Put your code here 
    } 
	
	public void init() throws ServletException { 
		try {
			initCtx = new InitialContext();
			envContext = (Context) initCtx.lookup("java:comp/env");
			connectionFactory = (ConnectionFactory) envContext.lookup("jms/NormalConnectionFactory");
			connection = connectionFactory.createConnection();
			connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
			producer = session.createProducer((Destination) envContext.lookup("jms/topic/MyTopic"));
		} catch (Exception e) {
			// TODO: handle exception
			e.printStackTrace();
		}
	}
	
	public void doPost(HttpServletRequest request, HttpServletResponse response) 
            throws ServletException, IOException { 
		String content = request.getParameter("content");
		
		try {
			producer.setDeliveryMode(DeliveryMode.PERSISTENT);
			Message testMessage = session.createMessage();
			// 釋出重新整理文章訊息
			testMessage.setStringProperty("RefreshArticleId",content);
			producer.send(testMessage);
			// 釋出重新整理帖子訊息
			testMessage.clearProperties();
			testMessage.setStringProperty("RefreshThreadId", content); 
	        producer.send(testMessage); 
		} catch (Exception e) {
			// TODO: handle exception
			e.printStackTrace();
		}
		
	}

}
web.xml加入以下配置
  </welcome-file-list>
  <servlet>
  	<servlet-name>sssss</servlet-name>
  	<servlet-class>com.flvcd.servlet.MyPublish</servlet-class>
  </servlet>
  <servlet-mapping>
  	<servlet-name>sssss</servlet-name>
  	<url-pattern>/myPublish.do</url-pattern>
  </servlet-mapping>
  
  <servlet>
    <servlet-name>jms-listener</servlet-name>
    <servlet-class>
       com.flvcd.servlet.JMSListener
    </servlet-class>
    <load-on-startup>1</load-on-startup>
  </servlet>
  

index.html
	<form action="myPublish.do" method="post">
		<input type="text" name="content"/>
		<input type="submit" value="提交"/>
	</form>

4、執行後列印日誌

接收重新整理文章訊息,開始重新整理文章ID=asd
接收重新整理論壇帖子訊息,開始重新整理帖子ID=asd

5、activieMq對比



Number Of Consumers  消費者 這個是消費者端的消費者數量 

Number Of Pending Messages 等待消費的訊息 這個是當前未出佇列的數量。可以理解為總接收數-總出佇列數 
Messages Enqueued 進入佇列的訊息  進入佇列的總數量,包括出佇列的。 這個數量只增不減 
Messages Dequeued 出了佇列的訊息  可以理解為是消費這消費掉的數量 
這個要分兩種情況理解 
在queues裡它和進入佇列的總數量相等(因為一個訊息只會被成功消費一次),如果暫時不等是因為消費者還沒來得及消費。 
在 topics裡 它因為多消費者從而導致數量會比入佇列數高。 
簡單的理解上面的意思就是 
當有一個訊息進入這個佇列時,等待消費的訊息是1,進入佇列的訊息是1。 
當訊息消費後,等待消費的訊息是0,進入佇列的訊息是1,出佇列的訊息是1. 
在來一條訊息時,等待消費的訊息是1,進入佇列的訊息就是2. 


沒有消費者時  Pending Messages   和 入佇列數量一樣 
有消費者消費的時候 Pedding會減少 出佇列會增加 
到最後 就是 入佇列和出佇列的數量一樣多 
以此類推,進入佇列的訊息和出佇列的訊息是池子,等待消費的訊息是水流。