關於apache-activemq傳送圖片未處理的解決方案
阿新 • • 發佈:2019-01-07
圖片可以通過 BlobMessage 物件傳送。但是在生產者訊息傳送成功後,消費者接收到訊息時,要首先呼叫 session.commit();否則該訊息會一直顯示為未處理(等待處理)的狀態。程式碼如下,消費者程式碼:
生產者:package org.aisino.mq; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.util.Map; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import org.aisino.common.Constants; import org.aisino.common.HttpClientUtil; import org.aisino.common.RequestUtils; import org.aisino.common.xutils.JsonUtil; import org.aisino.common.xutils.PropertiesLoader; import org.aisino.dlgs.cgsdzda.vo.FileVo; import org.aisino.rpcclient.HttpClient; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.BlobMessage; import org.apache.commons.codec.binary.Base64; import org.apache.commons.net.ftp.FTP; import org.apache.commons.net.ftp.FTPClient; import org.apache.commons.net.ftp.FTPFile; import org.springframework.beans.factory.InitializingBean; public class ThreadQueueConsumerN implements Runnable, InitializingBean { /** * @param args * @throws JMSException */ private static String user = ActiveMQConnection.DEFAULT_USER; private static String password = ActiveMQConnection.DEFAULT_PASSWORD; private static String url = "tcp://127.0.0.1:61616?jms.blobTransferPolicy.defaultUploadUrl=http://127.0.0.1:8161/fileserver/"; Connection connection; Session session; @Override public void afterPropertiesSet() throws Exception { new Thread(this).start(); } @Override public void run() { try { // 獲取 ConnectionFactory PropertiesLoader loader = new PropertiesLoader("global.properties"); url = loader.getProperty("mq_url_receive");// 從global.properties中取值 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); 
System.out.println("進入了內網...........................run"); // 建立 Connection connection = connectionFactory.createConnection(); connection.start(); // 建立 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 建立 Destinatione Destination destination = session.createQueue("queue1"); // 建立 Consumer MessageConsumer consumer = session.createConsumer(destination); // 註冊訊息監聽器,當訊息到達時被觸發並處理訊息 consumer.setMessageListener(new MessageListener() { // 監聽器中處理訊息 public void onMessage(Message message) { if (message instanceof BlobMessage) { try { session.commit();//注意要先commit,否則一直顯示未處理 } catch (JMSException e) { e.printStackTrace(); } BlobMessage blobMessage = (BlobMessage) message; String fileName = null; try { fileName = blobMessage .getStringProperty("FILE.NAME");// 在json中取不到,要在blobMessage物件中才可以取到 File file = new File(fileName);// 存入臨時檔案 if (!file.exists()) { file.createNewFile(); } OutputStream os = new FileOutputStream(file); System.out.println("開始接收檔案:" + fileName); InputStream inputStream = blobMessage .getInputStream(); // 寫檔案,你也可以使用其他方式 byte[] buff = new byte[256]; int len = 0; while ((len = inputStream.read(buff)) > 0) { os.write(buff, 0, len); } os.close(); String json = blobMessage.getStringProperty("json"); System.out.println("完成檔案接收:" + file.getName() + " 路徑 :" + file.getPath()); System.out.println("json:" + json); FileVo vo = uploadConfigFile(file);// 上傳FTP String f = vo.getFilename();// 重新命名後的檔名,等會兒傳入後臺 String furl = vo.getFileurl();// 重新命名後的檔名,等會兒傳入後臺 Map map = JsonUtil.JsonObjStr2Map(json); map.put(Constants.FILENAME, f);// 檔名 map.put("fileurl", furl);// //注意,傳入ftp的路徑 json = JsonUtil.map2json(map);// 從新打包放入json傳入後臺 //httpRequest(json); } catch (Exception e) { e.printStackTrace(); } } else if (message instanceof TextMessage) { try { session.commit();//注意要先commit,否則一直顯示未處理 } catch (JMSException e) { e.printStackTrace(); } try { TextMessage textMessage = (TextMessage) message; String json = textMessage.getStringProperty("json");// 
此處json為引數 String string = textMessage.getText(); //httpRequest(string); System.out.println("傳送的文字是 " + string + " 傳送的json資料是 " + json); } catch (Exception e) { e.printStackTrace(); } } } }); } catch (Exception e) { e.printStackTrace(); } } /** * 此處根據不同的引數來轉發 * * @param json * @return */ public void httpRequest(String json) { //插入業務程式碼 } /** * 檔案上傳FTP,注意傳入檔案前要將檔案重新命名,JDK1.7測試通過 * * @param uploadFile * ,檔案物件 */ private void uploadConfigFile(File uploadFile) { FTPClient ftpClient = new FTPClient(); String url = null; try { PropertiesLoader loader = new PropertiesLoader( "global.properties"); String http_picture = loader.getProperty("http_picture");// 從global.properties中取值 Map<String, Object> mapPath = RequestUtils.loadFilePath(); ftpClient.connect(mapPath.get("ftp_host").toString(), Integer.parseInt(mapPath.get("ftp_port").toString())); ftpClient.login(mapPath.get("ftp_username").toString(), mapPath .get("ftp_password").toString()); ftpClient.enterLocalPassiveMode(); ftpClient.setFileType(FTP.BINARY_FILE_TYPE); // 設定上傳目錄 ftpClient.changeWorkingDirectory(mapPath.get("ftp_filepath") .toString()); String fileName = new String( uploadFile.getName().getBytes("utf-8"), "iso-8859-1"); FTPFile[] fs = ftpClient.listFiles(); url = http_picture+ fileName; if (fs != null && fs.length > 0) { for (int i = 0; i < fs.length; i++) { if (fs[i].getName().equals(fileName)) { ftpClient.deleteFile(fs[i].getName()); break; } } } OutputStream os = ftpClient.appendFileStream(fileName); byte[] bytes = new byte[1024]; // InputStream is = uploadFile.getInputStream(); InputStream is = new FileInputStream(uploadFile); int c; // 暫未考慮中途終止的情況 while ((c = is.read(bytes)) != -1) { os.write(bytes, 0, c); } os.flush(); is.close(); os.close(); } catch (IOException e) { e.printStackTrace(); } finally { try { ftpClient.disconnect(); } catch (IOException e) { e.printStackTrace(); } } } }
package tk.mybatis.springboot.mq; import java.io.File; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.aisino.common.xutils.PropertiesLoader; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQSession; import org.apache.activemq.BlobMessage; /** * 通過 ActiveMQ 傳送檔案的程式 * * @author wb-liufei * */ public class MessageSender { /** * @param args * @throws JMSException * */ private static String user = ActiveMQConnection.DEFAULT_USER; private static String password = ActiveMQConnection.DEFAULT_PASSWORD; private static String url = "tcp://127.0.0.1:61616?jms.blobTransferPolicy.defaultUploadUrl=http://127.0.0.1:8161/fileserver/"; private static String queque = "queue1"; /*** * file和text只能傳一個,另一個必須傳入null * * @param file * 檔案物件 * @param text * 文字資訊 * @param json * json字串引數 * @param queue * :通道 * @throws JMSException */ public void sender(File file, String text, String json) throws JMSException { PropertiesLoader loader = new PropertiesLoader("global.properties"); url = loader.getProperty("mq_url_sender");// 從global.properties中取值 // 獲取 ConnectionFactory ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( user, password, url); // 建立 Connection Connection connection = connectionFactory.createConnection(); connection.start(); // 建立 Session Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 建立 Destination Queue destination = session.createQueue(queque); // 建立 Producer MessageProducer producer = session.createProducer(destination); producer.setDeliveryMode(DeliveryMode.PERSISTENT);// 設定為非永續性 // 設定永續性的話,檔案也可以先快取下來,接收端離線再連線也可以收到檔案 // 構造 BlobMessage,用來傳輸檔案 // 如果設定 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 訊息永續性的話, // 
傳送方傳檔案的時候,接收方可以不線上,檔案會暫存在 ActiveMQ 伺服器上,等到接收程式上線後仍然可以收到發過來的檔案。 TextMessage textMessage = null; BlobMessage blobMessage = null; if (text != null && file == null) { textMessage = session.createTextMessage(text); // textMessage.setStringProperty("json", json); producer.send(textMessage);// 存入文字 } if (file != null && text == null) { blobMessage = ((ActiveMQSession) session).createBlobMessage(file); blobMessage.setStringProperty("json", json); blobMessage.setStringProperty("FILE.NAME", file.getName());// 存入檔名稱 System.out.println("開始傳送檔案:" + file.getName() + ",檔案大小:" + file.length() + " 位元組"); producer.send(blobMessage);// 存入二進位制檔案,併發送 System.out.println("完成檔案傳送:" + file.getName()); } // producer.close(); session.commit(); connection.close(); // 不關閉 Connection, 程式則不退出 } }