java 實現jms的客戶端(傳送接收訊息)
阿新 • • 發佈:2019-01-22
本文以ActiveMQ 訊息伺服器中介軟體為例。
實現的步驟如下:
1)例項化連線 工廠ConnectionFactory,主要設定的引數為連線到訊息伺服器中介軟體的使用者名稱,密碼及url.
2)通過連線工廠ConnectionFactory獲取到訊息中介軟體的連線Connection.
3)啟動連線,並建立訊息會話Session,用於傳送或接收訊息的執行緒
4)通過訊息會話建立訊息目的地Destination
5)建立訊息生產者MessageProducer或訊息消費者MessageConsumer
6)通過訊息生產者MessageProducer傳送訊息或通過訊息消費者MessageConsumer接收訊息
7)關閉並釋放連線資料
具體的實現程式碼如下:
傳送訊息客戶端程式碼如下:
接收訊息客戶端程式碼如下:import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.MessageProducer; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.StreamMessage; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import com.afmobi.jms.model.User; public class QueueSender { private ConnectionFactory connFactory; private Connection conn; private Session session; private MessageProducer producer; private boolean stop=false; public void execute() throws Exception { // 連線工廠 // 設定使用者名稱和密碼,這個使用者名稱和密碼在conf目錄下的credentials.properties檔案中 connFactory = new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); // 連線到JMS提供者 conn = connFactory.createConnection(); conn.start(); // 事務性會話,自動確認訊息 // 第一個引數是否使用事務:當訊息傳送者向訊息提供者(即訊息代理)傳送訊息時,訊息傳送者等待訊息代理的確認,沒有迴應則丟擲異常,訊息傳送程式負責處理這個錯誤。 // 第二個引數訊息的確認模式: // AUTO_ACKNOWLEDGE : // 指定訊息提供者在每次收到訊息時自動傳送確認。訊息只向目標傳送一次,但傳輸過程中可能因為錯誤而丟失訊息。 // CLIENT_ACKNOWLEDGE : // 由訊息接收者確認收到訊息,通過呼叫訊息的acknowledge()方法(會通知訊息提供者收到了訊息) // DUPS_OK_ACKNOWLEDGE : 指定訊息提供者在訊息接收者沒有確認傳送時重新發送訊息(這種確認模式不在乎接收者收到重複的訊息) session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE); // 建立目標,就建立主題也可以建立佇列 Destination destination = session.createQueue("queue.hello"); // 訊息生產者 producer = session.createProducer(destination); // 設定持久化,DeliveryMode.PERSISTENT和DeliveryMode.NON_PERSISTENT // 如果DeliveryMode沒有設定或者設定為NON_PERSISTENT,那麼重啟MQ之後訊息就會丟失。 producer.setDeliveryMode(DeliveryMode.PERSISTENT);// 持久化 // 傳送訊息 while(!stop){ Thread.sleep(1000); sendObject(session, producer); session.commit();// 在事務會話中,只有commit之後,訊息才會真正到達目的地 System.out.println("已傳送訊息"); } producer.close(); session.close(); conn.close(); } // 物件訊息 public void sendObject(Session session, MessageProducer producer) throws JMSException { User user = new User(); user.setAccount("petty"); user.setName("happy"); ObjectMessage objectMessage = session.createObjectMessage(); objectMessage.setObject(user); producer.send(objectMessage); } // 位元組訊息 public void sendBytes(Session session, MessageProducer producer) throws JMSException { String s = "BytesMessage位元組訊息"; BytesMessage bytesMessage = session.createBytesMessage(); bytesMessage.writeBytes(s.getBytes()); producer.send(bytesMessage); } // 流訊息 public void sendStream(Session session, MessageProducer producer) throws JMSException { StreamMessage streamMessage = session.createStreamMessage(); streamMessage.writeString("streamMessage流訊息"); streamMessage.writeLong(55); producer.send(streamMessage); } // 鍵值對訊息 public void sendMap(Session session, MessageProducer producer) throws JMSException { MapMessage mapMessage = session.createMapMessage(); mapMessage.setLong("age", 25); mapMessage.setDouble("sarray", new Double(6555.5)); mapMessage.setString("username", "鍵值對訊息"); producer.send(mapMessage); } // 文字訊息 public void sendText(Session session, MessageProducer producer) throws JMSException { TextMessage textMessage = session.createTextMessage("文字訊息"); producer.send(textMessage); } }
import javax.jms.BytesMessage; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import javax.jms.Session; import javax.jms.StreamMessage; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import com.afmobi.jms.model.User; public class QueueReceiver implements MessageListener{ private ConnectionFactory connFactory; private Connection conn; private Session session; private boolean stop=false; public void execute()throws Exception{ //連線工廠 // 設定使用者名稱和密碼,這個使用者名稱和密碼在conf目錄下的credentials.properties檔案中 connFactory=new ActiveMQConnectionFactory( ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); //連線到JMS提供者 conn=connFactory.createConnection(); conn.start(); //事務性會話,自動確認訊息 // 第一個引數是否使用事務:當訊息傳送者向訊息提供者(即訊息代理)傳送訊息時,訊息傳送者等待訊息代理的確認,沒有迴應則丟擲異常,訊息傳送程式負責處理這個錯誤。 // 第二個引數訊息的確認模式: // AUTO_ACKNOWLEDGE : 指定訊息提供者在每次收到訊息時自動傳送確認。訊息只向目標傳送一次,但傳輸過程中可能因為錯誤而丟失訊息。 // CLIENT_ACKNOWLEDGE : 由訊息接收者確認收到訊息,通過呼叫訊息的acknowledge()方法(會通知訊息提供者收到了訊息) // DUPS_OK_ACKNOWLEDGE : 指定訊息提供者在訊息接收者沒有確認傳送時重新發送訊息(這種確認模式不在乎接收者收到重複的訊息) session=conn.createSession(true, Session.AUTO_ACKNOWLEDGE); // 建立目標,就建立主題也可以建立佇列 Destination destination=session.createQueue("queue.hello"); //訊息的消費者 MessageConsumer consumer=session.createConsumer(destination); consumer.setMessageListener(this); //等待接收訊息 while(!stop){ Thread.sleep(5000); } consumer.close(); session.close(); conn.close(); } public void onMessage(Message m) { try{ if(m instanceof TextMessage){//接收檔案訊息 TextMessage message=(TextMessage)m; System.out.println(message.getText()); }else if(m instanceof MapMessage){//接收鍵值訊息 MapMessage message=(MapMessage)m; System.out.println(message.getLong("age")); System.out.println(message.getDouble("sarray")); System.out.println(message.getString("username")); }else if(m instanceof StreamMessage){//接收流訊息 StreamMessage message=(StreamMessage)m; System.out.println(message.readString()); System.out.println(message.readLong()); }else if(m instanceof BytesMessage){ byte[] b=new byte[1024]; int len=-1; BytesMessage message=(BytesMessage)m; while((len=message.readBytes(b))!=-1){ System.out.println(new String(b,0,len)); } }else if(m instanceof ObjectMessage){ ObjectMessage message=(ObjectMessage)m; User user=(User)message.getObject(); System.out.println("name:"+user.getAccount()+";info:"+user.getName()); }else{ System.out.println(m); } session.commit(); }catch(JMSException e){ e.printStackTrace(); } } }
以上程式碼是PTP即點對點訊息模式的示例,如果採用Sub/Pub即釋出/訂閱者訊息模式,基本程式碼的實現過程都一樣,只需把
建立訊息目的地的程式碼
Destination destination = session.createQueue("queue.hello");
修改為
Destination destination=session.createTopic("topic.hello");
即可。