1. 程式人生 > >AMQ的一些簡單實戰

AMQ的一些簡單實戰

目錄

三、釋出

一、建立客戶端

public class Sender {

	public static void main(String[] args) throws Exception{
		
		//第一步:建立connectionFactory工廠物件【需填入使用者名稱、密碼、要連線的地址】
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz","bhz","tcp://localhost:61616");
		
		//第二步:通過ConnectionFactory工廠物件建立一個Connection連線,並且呼叫Connection的start方法開啟連線【connection預設是關閉的】
		Connection connection = connectionFactory.createConnection();
		connection.start();
		
		//第三步:通過connection建立session會話(上下文環境物件),用於接收訊息,引數配置1為是否啟用事務,引數配置2為簽收模式【一般我們設定為自動簽收】
	//	Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

		//使用事務的方式進行訊息的傳送
	//	Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
		
		//使用CLIENT端簽收的方式
		Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
		
		//第四步:通過session建立Destination物件,指的是一個客戶端用來指定生產訊息目標和消費訊息來源的物件,在PTP模式中,Destination被稱作Queue即佇列;在Pub/Sub模式,Destination被稱作Topic即主題.在程式中可以使用多個Queue和Topic.
		Destination destination = session.createQueue("queue1");
		
		//第五步:通過session物件建立訊息的傳送和接收物件(生產者和消費者)MessageProducer/MessageConsumer
		MessageProducer messageProducer = session.createProducer(null);
		
		//第六步:可以使用MessageProducer的setDeliveryMode方法為其設定持久化特性和非持久化特性(DeliveryMode)
		messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
		
		//第七步:使用JMS規範的TextMessage形式建立資料(通過session物件),並用MessageProducer的send方法傳送資料
		for (int i = 1; i <= 5; i++) {
			TextMessage textMessage = session.createTextMessage("helloworld"+i);
			//第一個引數: 目的地
			//第二個引數: 訊息
			//第三個引數: 是否持久化
			//第四個引數: 優先順序【0-9  0-4表示普通    5-9表示加急  預設4】
			//第五個引數: 訊息在mq上的存放有效期【單位毫秒】
			messageProducer.send(destination, textMessage, DeliveryMode.NON_PERSISTENT, i, 1000*60*2);
			System.out.println("生產者:"+textMessage.getText());
		}
		
		//使用事務提交
		//session.commit();
		
		if (connection!=null) {
			connection.close();
		}
	}
}

 二、建立接收端

public class Receiver {

public static void main(String[] args) throws Exception{
		
		//第一步:建立connectionFactory工廠物件【需填入使用者名稱、密碼、要連線的地址】
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz","bhz","tcp://localhost:61616");
		
		//第二步:通過ConnectionFactory工廠物件建立一個Connection連線,並且呼叫Connection的start方法開啟連線【connection預設是關閉的】
		Connection connection = connectionFactory.createConnection();
		connection.start();
		
		//第三步:通過connection建立session會話(上下文環境物件),用於接收訊息,引數配置1為是否啟用事務,引數配置2為簽收模式【一般我們設定為自動簽收】
		Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
		
		//第四步:通過session建立Destination物件,指的是一個客戶端用來指定生產訊息目標和消費訊息來源的物件,在PTP模式中,Destination被稱作Queue即佇列;在Pub/Sub模式,Destination被稱作Topic即主題.在程式中可以使用多個Queue和Topic.
		Destination destination = session.createQueue("queue1");
		
		//第五步:通過session物件建立訊息的傳送和接收物件(生產者和消費者)MessageProducer/MessageConsumer
		MessageConsumer messageConsumer = session.createConsumer(destination);
			
		//第七步:使用JMS規範的TextMessage形式建立資料(通過session物件),並用MessageProducer的send方法傳送資料
		while (true) {
			TextMessage msg = (TextMessage) messageConsumer.receive();
			//手動去簽收訊息,另起一個執行緒(TCP)去通知我們的MQ服務,確認簽收
			Thread.sleep(1000);
			msg.acknowledge();
			if(msg == null) break;
			System.out.println(msg.getText());
		}

		if (connection!=null) {
			connection.close();
		}
	}
}

三、釋出

public class Publish {

	private ConnectionFactory factory;
	private Connection connection;
	private Session session;
	private MessageProducer producer;
	public Publish(){
		try {
			factory = new ActiveMQConnectionFactory(
					"bhz",//ActiveMQConnection.DEFAULT_USER, 
					"bhz",//ActiveMQConnection.DEFAULT_PASSWORD,
					"tcp://localhost:61616");
		    connection = factory.createConnection();
		    connection.start();
		    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		    producer = session.createProducer(null);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public void sendMessage() throws Exception{
		Destination destination = session.createTopic("topic1");
		TextMessage textMessage = session.createTextMessage("我是內容");
		producer.send(destination, textMessage);
	}
	
	public static void main(String[] args) throws Exception {
		Publish p = new Publish();
		p.sendMessage();
	}
}

四、使用者訂閱

public class Consumer1 {

	private ConnectionFactory factory;
	private Connection connection;
	private Session session;
	private MessageConsumer consumer;
	public Consumer1(){
		try {
			factory = new ActiveMQConnectionFactory(
					"bhz",//ActiveMQConnection.DEFAULT_USER, 
					"bhz",//ActiveMQConnection.DEFAULT_PASSWORD,
					"tcp://localhost:61616");
		    connection = factory.createConnection();
		    connection.start();
		    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
	public void receive() throws Exception{
		Destination destination = session.createTopic("topic1");
		consumer = session.createConsumer(destination);
		consumer.setMessageListener(new Listener());
	}
	
	class Listener implements MessageListener{

		public void onMessage(Message message) {
			try {
				System.out.println("consumer1收到訊息:"+((TextMessage)message).getText());
			} catch (JMSException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
		
	}
	
	public static void main(String[] args) throws Exception {
		Consumer1 p = new Consumer1();
		p.receive();
	}
}

五、配置安全認證

1、 activemq.xml中

在<shutdownHooks>             <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />         </shutdownHooks>下面新增            <plugins>              <simpleAuthenticationPlugin>                  <users>                       <authenticationUser username="bhz" password="bhz" groups="users,admins"/>                  </users>              </simpleAuthenticationPlugin>   </plugins>  2、對應的java程式碼要改成

//第一步:建立connectionFactory工廠物件【需填入使用者名稱、密碼、要連線的地址】         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("bhz","bhz","tcp://localhost:61616");

 3、頁面登入:http://127.0.0.1:8161/ 還是按照jconf/jetty-real.properties中的配置來        # Defines users that can access the web (console, demo, etc.)    # username: password [,rolename ...]    admin: admin, admin