1. 程式人生 > >java 實現jms的客戶端(傳送接收訊息)

java 實現jms的客戶端(傳送接收訊息)

本文以ActiveMQ 訊息伺服器中介軟體為例。

實現的步驟如下:

1)例項化連線 工廠ConnectionFactory,主要設定的引數為連線到訊息伺服器中介軟體的使用者名稱,密碼及url.

2)通過連線工廠ConnectionFactory獲取到訊息中介軟體的連線Connection.

3)啟動連線,並建立訊息會話Session,用於傳送或接收訊息的執行緒

4)通過訊息會話建立訊息目的地Destination

5)建立訊息生產者MessageProducer或訊息消費者MessageConsumer

6)通過訊息生產者MessageProducer傳送訊息或通過訊息消費者MessageConsumer接收訊息

7)關閉並釋放連線資料

具體的實現程式碼如下:

傳送訊息客戶端程式碼如下:



import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import com.afmobi.jms.model.User;

public class QueueSender {
	
	private ConnectionFactory connFactory;
	private Connection conn;
	private Session session;
	private MessageProducer producer;
	private boolean stop=false;

	public void execute() throws Exception {
		// 連線工廠
		// 設定使用者名稱和密碼,這個使用者名稱和密碼在conf目錄下的credentials.properties檔案中
		connFactory = new ActiveMQConnectionFactory(
				ActiveMQConnection.DEFAULT_USER,
				ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
		// 連線到JMS提供者
		conn = connFactory.createConnection();
		conn.start();

		// 事務性會話,自動確認訊息
		// 第一個引數是否使用事務:當訊息傳送者向訊息提供者(即訊息代理)傳送訊息時,訊息傳送者等待訊息代理的確認,沒有迴應則丟擲異常,訊息傳送程式負責處理這個錯誤。
		// 第二個引數訊息的確認模式:
		// AUTO_ACKNOWLEDGE :
		// 指定訊息提供者在每次收到訊息時自動傳送確認。訊息只向目標傳送一次,但傳輸過程中可能因為錯誤而丟失訊息。
		// CLIENT_ACKNOWLEDGE :
		// 由訊息接收者確認收到訊息,通過呼叫訊息的acknowledge()方法(會通知訊息提供者收到了訊息)
		// DUPS_OK_ACKNOWLEDGE : 指定訊息提供者在訊息接收者沒有確認傳送時重新發送訊息(這種確認模式不在乎接收者收到重複的訊息)
		session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);

		// 建立目標,就建立主題也可以建立佇列
		Destination destination = session.createQueue("queue.hello");

		// 訊息生產者
		producer = session.createProducer(destination);
		// 設定持久化,DeliveryMode.PERSISTENT和DeliveryMode.NON_PERSISTENT
		// 如果DeliveryMode沒有設定或者設定為NON_PERSISTENT,那麼重啟MQ之後訊息就會丟失。
		producer.setDeliveryMode(DeliveryMode.PERSISTENT);// 持久化

		// 傳送訊息
		while(!stop){
			Thread.sleep(1000);
			sendObject(session, producer);
			session.commit();// 在事務會話中,只有commit之後,訊息才會真正到達目的地
			System.out.println("已傳送訊息");
			
		} 
		

		
		producer.close();
		session.close();
		conn.close();
	}

	// 物件訊息
	public void sendObject(Session session, MessageProducer producer)
			throws JMSException {
		User user = new User();
		user.setAccount("petty");
		user.setName("happy");
		ObjectMessage objectMessage = session.createObjectMessage();
		objectMessage.setObject(user);
		producer.send(objectMessage);
	}

	// 位元組訊息
	public void sendBytes(Session session, MessageProducer producer)
			throws JMSException {
		String s = "BytesMessage位元組訊息";
		BytesMessage bytesMessage = session.createBytesMessage();
		bytesMessage.writeBytes(s.getBytes());
		producer.send(bytesMessage);
	}

	// 流訊息
	public void sendStream(Session session, MessageProducer producer)
			throws JMSException {
		StreamMessage streamMessage = session.createStreamMessage();
		streamMessage.writeString("streamMessage流訊息");
		streamMessage.writeLong(55);
		producer.send(streamMessage);
	}

	// 鍵值對訊息
	public void sendMap(Session session, MessageProducer producer)
			throws JMSException {
		MapMessage mapMessage = session.createMapMessage();
		mapMessage.setLong("age", 25);
		mapMessage.setDouble("sarray", new Double(6555.5));
		mapMessage.setString("username", "鍵值對訊息");
		producer.send(mapMessage);
	}

	// 文字訊息
	public void sendText(Session session, MessageProducer producer)
			throws JMSException {
		TextMessage textMessage = session.createTextMessage("文字訊息");
		producer.send(textMessage);
	}


}
接收訊息客戶端程式碼如下:


import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import com.afmobi.jms.model.User;

public class QueueReceiver implements MessageListener{
	
	private ConnectionFactory connFactory;
	private Connection conn;
	private Session session;
	private boolean stop=false;
	
	public void execute()throws Exception{
		//連線工廠
		// 設定使用者名稱和密碼,這個使用者名稱和密碼在conf目錄下的credentials.properties檔案中
		connFactory=new ActiveMQConnectionFactory(
				ActiveMQConnection.DEFAULT_USER,
				ActiveMQConnection.DEFAULT_PASSWORD,
				"tcp://localhost:61616");
		
		//連線到JMS提供者
		conn=connFactory.createConnection();
		conn.start();
		
		//事務性會話,自動確認訊息
		// 第一個引數是否使用事務:當訊息傳送者向訊息提供者(即訊息代理)傳送訊息時,訊息傳送者等待訊息代理的確認,沒有迴應則丟擲異常,訊息傳送程式負責處理這個錯誤。
        // 第二個引數訊息的確認模式:
        // AUTO_ACKNOWLEDGE : 指定訊息提供者在每次收到訊息時自動傳送確認。訊息只向目標傳送一次,但傳輸過程中可能因為錯誤而丟失訊息。
        // CLIENT_ACKNOWLEDGE : 由訊息接收者確認收到訊息,通過呼叫訊息的acknowledge()方法(會通知訊息提供者收到了訊息)
        // DUPS_OK_ACKNOWLEDGE : 指定訊息提供者在訊息接收者沒有確認傳送時重新發送訊息(這種確認模式不在乎接收者收到重複的訊息)
		session=conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
		
		// 建立目標,就建立主題也可以建立佇列
		Destination destination=session.createQueue("queue.hello");
		
		//訊息的消費者
		MessageConsumer consumer=session.createConsumer(destination);
		consumer.setMessageListener(this);
		
		//等待接收訊息
		while(!stop){
			Thread.sleep(5000);
		}
				
		consumer.close();
		session.close();
		conn.close();
	}

	public void onMessage(Message m) {
		try{
			if(m instanceof TextMessage){//接收檔案訊息
				TextMessage message=(TextMessage)m;
				System.out.println(message.getText());
			}else if(m instanceof MapMessage){//接收鍵值訊息
				MapMessage message=(MapMessage)m;
				System.out.println(message.getLong("age"));
				System.out.println(message.getDouble("sarray"));
				System.out.println(message.getString("username"));
			}else if(m instanceof StreamMessage){//接收流訊息
				StreamMessage message=(StreamMessage)m;
				System.out.println(message.readString());
				System.out.println(message.readLong());
			}else if(m instanceof BytesMessage){
				byte[] b=new byte[1024];
				int len=-1;
				BytesMessage message=(BytesMessage)m;
				while((len=message.readBytes(b))!=-1){
					System.out.println(new String(b,0,len));
				}
			}else if(m instanceof ObjectMessage){
				ObjectMessage message=(ObjectMessage)m;
				User user=(User)message.getObject();
				System.out.println("name:"+user.getAccount()+";info:"+user.getName());
			}else{
				System.out.println(m);
			}
			session.commit();
		}catch(JMSException e){
			e.printStackTrace();
		}
		
		
	}

}


以上程式碼是PTP即點對點訊息模式的示例,如果採用Sub/Pub即釋出/訂閱者訊息模式,基本程式碼的實現過程都一樣,只需把

建立訊息目的地的程式碼
Destination destination = session.createQueue("queue.hello");

修改為

Destination destination=session.createTopic("topic.hello");

即可。