1. 程式人生 > >IBM MQ 學習

IBM MQ 學習

ges code 關閉 第三方 chan tostring adf log pri

  1 import java.io.IOException;  
  2 import java.util.HashMap;  
  3 import java.util.Map;  
  4   
  5 import com.ibm.mq.MQC;  
  6 import com.ibm.mq.MQEnvironment;  
  7 import com.ibm.mq.MQException;  
  8 import com.ibm.mq.MQGetMessageOptions;  
  9 import com.ibm.mq.MQMessage;  
 10 import com.ibm.mq.MQPutMessageOptions;  
11 import com.ibm.mq.MQQueue; 12 import com.ibm.mq.MQQueueManager; 13 14 public class CLIENT_MQ{ 15 //定義隊列管理器和隊列的名稱 16 private static final String qmName = "MQ_SERVICE";//MQ的隊列管理器名稱 ; 17 //private static final String qName = "MIDDLE_SEND_QUEUE"; //MQ遠程隊列的名稱 18
private static MQQueueManager qMgr;//隊列管理器 19 public static void init(){ 20 //設置環境: 21 //MQEnvironment中包含控制MQQueueManager對象中的環境的構成的靜態變量,MQEnvironment的值的設定會在MQQueueManager的構造函數加載的時候起作用, 22 //因此必須在建立MQQueueManager對象之前設定MQEnvironment中的值. 23 MQEnvironment.hostname="10.172.12.156"; //
MQ服務器的IP地址 24 MQEnvironment.channel="SERVICE_JAVA"; //通道類型:服務器連接 25 MQEnvironment.CCSID=1381;//437 //服務器MQ服務使用的編碼1381代表GBK、1208代表UTF(Coded Character Set Identifier:CCSID) 26 MQEnvironment.port=1456; //MQ端口 27 try { 28 //定義並初始化隊列管理器對象並連接 29 //MQQueueManager可以被多線程共享,但是從MQ獲取信息的時候是同步的,任何時候只有一個線程可以和MQ通信。 30 qMgr = new MQQueueManager(qmName); 31 } catch (MQException e) { 32 // TODO Auto-generated catch block 33 System.out.println("初使化MQ出錯"); 34 e.printStackTrace(); 35 } 36 } 37 /** 38 * 往MQ發送消息 39 * @param message 40 * @return 41 */ 42 public static Map<String,Object> sendMessage(Object message,String qName){ 43 Map<String,Object> map=new HashMap<String,Object>(); 44 try{ 45 //設置將要連接的隊列屬性 46 // Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface 47 //(except for completion code constants and error code constants). 48 //MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default. 49 //MQOO_OUTPUT:Open the queue to put messages. 50 /*目標為遠程隊列,所有這裏不可以用MQOO_INPUT_AS_Q_DEF屬性*/ 51 //int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 52 /*以下選項可適合遠程隊列與本地隊列*/ 53 //int openOptions = MQC.MQOO_INQUIRE | MQC.MQOO_OUTPUT; //發送時使用 54 //int qOptioin = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 接收時使用 55 int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING; 56 //連接隊列 57 //MQQueue provides inquire, set, put and get operations for WebSphere MQ queues. 58 //The inquire and set capabilities are inherited from MQManagedObject. 59 /*關閉了就重新打開*/ 60 if(qMgr==null || !qMgr.isConnected()){ 61 qMgr = new MQQueueManager(qmName); 62 } 63 MQQueue queue = qMgr.accessQueue(qName, openOptions); 64 //定義一個簡單的消息 65 MQMessage putMessage = new MQMessage(); 66 map.put("messageId",putMessage); 67 //String uuid=java.util.UUID.randomUUID().toString(); 68 //將數據放入消息緩沖區 69 putMessage.writeObject(message); 70 //設置寫入消息的屬性(默認屬性) 71 MQPutMessageOptions pmo = new MQPutMessageOptions(); 72 73 //將消息寫入隊列 74 queue.put(putMessage,pmo); 75 map.put("message",message.toString()); 76 queue.close(); 77 }catch (MQException ex) { 78 System.out.println("A WebSphere MQ error occurred : Completion code " + ex.completionCode + " Reason code " + ex.reasonCode); 79 ex.printStackTrace(); 80 }catch (IOException ex) { 81 System.out.println("An error occurred whilst writing to the message buffer: " + ex); 82 }catch(Exception ex){ 83 ex.printStackTrace(); 84 }finally{ 85 try { 86 qMgr.disconnect(); 87 } catch (MQException e) { 88 e.printStackTrace(); 89 } 90 } 91 return map; 92 } 93 94 95 96 97 98 /** 99 * 處理完消息回放到MQ隊列 100 * @param message 101 * @return 102 */ 103 public static Map<String,Object> sendReplyMessage(Object message,String qName,MQMessage mqMessage){ 104 Map<String,Object> map=new HashMap<String,Object>(); 105 try{ 106 //設置將要連接的隊列屬性 107 // Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface 108 //(except for completion code constants and error code constants). 109 //MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default. 110 //MQOO_OUTPUT:Open the queue to put messages. 111 /*目標為遠程隊列,所有這裏不可以用MQOO_INPUT_AS_Q_DEF屬性*/ 112 //int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 113 /*以下選項可適合遠程隊列與本地隊列*/ 114 //int openOptions = MQC.MQOO_INQUIRE | MQC.MQOO_OUTPUT; //發送時使用 115 //int qOptioin = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 接收時使用 116 int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING; 117 //連接隊列 118 //MQQueue provides inquire, set, put and get operations for WebSphere MQ queues. 119 //The inquire and set capabilities are inherited from MQManagedObject. 120 /*關閉了就重新打開*/ 121 if(qMgr==null || !qMgr.isConnected()){ 122 qMgr = new MQQueueManager(qmName); 123 } 124 MQQueue queue = qMgr.accessQueue(qName, openOptions); 125 //定義一個簡單的消息 126 MQMessage putMessage = new MQMessage(); 127 putMessage.messageId=mqMessage.messageId; 128 map.put("messageId",putMessage); 129 //String uuid=java.util.UUID.randomUUID().toString(); 130 //將數據放入消息緩沖區 131 putMessage.writeObject(message); 132 //設置寫入消息的屬性(默認屬性) 133 MQPutMessageOptions pmo = new MQPutMessageOptions(); 134 135 //將消息寫入隊列 136 queue.put(putMessage,pmo); 137 map.put("message",message.toString()); 138 queue.close(); 139 }catch (MQException ex) { 140 System.out.println("A WebSphere MQ error occurred : Completion code " + ex.completionCode + " Reason code " + ex.reasonCode); 141 ex.printStackTrace(); 142 }catch (IOException ex) { 143 System.out.println("An error occurred whilst writing to the message buffer: " + ex); 144 }catch(Exception ex){ 145 ex.printStackTrace(); 146 }finally{ 147 try { 148 qMgr.disconnect(); 149 } catch (MQException e) { 150 e.printStackTrace(); 151 } 152 } 153 return map; 154 } 155 156 157 158 159 /** 160 * 從隊列中去獲取消息,如果隊列中沒有消息,就會發生異常,不過沒有關系,有TRY...CATCH,如果是第三方程序調用方法,如果無返回則說明無消息 161 * 第三方可以將該方法放於一個無限循環的while(true){...}之中,不需要設置等待,因為在該方法內部在沒有消息的時候會自動等待。 162 * @return 163 */ 164 public static String getMessage(String qName,MQMessage mqMessage){ 165 String message=""; 166 try{ 167 //設置將要連接的隊列屬性 168 // Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface 169 //(except for completion code constants and error code constants). 170 //MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default. 171 //MQOO_OUTPUT:Open the queue to put messages. 172 //int qOptioin = MQC.MQOO_INQUIRE | MQC.MQOO_OUTPUT; 發送時使用 173 //int qOptioin = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 接收時使用 174 175 int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 176 MQMessage retrieve = new MQMessage(); 177 //設置取出消息的屬性(默認屬性) 178 //Set the put message options.(設置放置消息選項) 179 MQGetMessageOptions gmo = new MQGetMessageOptions(); 180 181 gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;//Get messages under sync point control(在同步點控制下獲取消息) 182 gmo.options = gmo.options + MQC.MQGMO_WAIT; // Wait if no messages on the Queue(如果在隊列上沒有消息則等待) 183 gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;// Fail if Qeue Manager Quiescing(如果隊列管理器停頓則失敗) 184 gmo.waitInterval = 3000 ; // Sets the time limit for the wait.(設置等待的毫秒時間限制) 185 /*關閉了就重新打開*/ 186 if(qMgr==null || !qMgr.isConnected()){ 187 qMgr = new MQQueueManager(qmName); 188 } 189 MQQueue queue = qMgr.accessQueue(qName, openOptions); 190 191 MQMessage retrievedMessage = new MQMessage(); 192 //從隊列中取出對應messageId的消息 193 retrieve.messageId = mqMessage.messageId; 194 // 從隊列中取出消息 195 queue.get(retrieve, gmo); 196 197 198 Object obj = retrieve.readObject(); 199 message=obj.toString();//解決中文亂碼問題 200 /* 201 202 //int size = rcvMessage.getMessageLength(); 203 //byte[] p = new byte[size]; 204 //rcvMessage.readFully(p); 205 206 int len=retrieve.getDataLength(); 207 byte[] str = new byte[len]; 208 retrieve.readFully(str,0,len); 209 message = new String(str);//readUTF(); 210 */ 211 212 queue.close(); 213 }catch (MQException ex) { 214 int reason=ex.reasonCode; 215 if(reason==2033)//no messages 216 { 217 message="nomessage"; 218 }else{ 219 System.out.println("A WebSphere MQ error occurred : Completion code " + ex.completionCode + " Reason code " + ex.reasonCode); 220 } 221 }catch (IOException ex) { 222 System.out.println("An error occurred whilst writing to the message buffer: " + ex); 223 }catch(Exception ex){ 224 ex.printStackTrace(); 225 }finally{ 226 try { 227 qMgr.disconnect(); 228 } catch (MQException e) { 229 e.printStackTrace(); 230 } 231 } 232 return message; 233 } 234 235 236 237 public static void main(String args[]) { 238 init(); 239 Map<String,Object> map = new HashMap<String,Object>(); 240 map=sendMessage("{name: test get message id 123}","SERVICE_TRANSFER_QUEUE"); 241 MQMessage mqMessage = (MQMessage)map.get("messageId"); 242 outSys("傳輸消息:",mqMessage.messageId.toString()); 243 244 outSys("接收傳輸隊列:",getMessage("SERVICE_TRANSFER_QUEUE",mqMessage)); 245 Map<String,Object> reply_map = new HashMap<String,Object>(); 246 reply_map=sendReplyMessage("{name: local queue 008}","SERVICE_RECEIVE_QUEUE",mqMessage); 247 outSys("放入正常隊列:",reply_map.get("message").toString()); 248 249 outSys("接收正常隊列:",getMessage("SERVICE_RECEIVE_QUEUE",mqMessage)); 250 251 252 } 253 254 255 256 public static void outSys(String display,String val){ 257 System.out.println(display+val); 258 } 259 260 }

IBM MQ 學習