1. 程式人生 > >關於apache-activemq傳送圖片未處理的解決方案

關於apache-activemq傳送圖片未處理的解決方案

圖片可以透過 BlobMessage 物件傳送。但要注意:由於範例使用的是事務型(transacted)Session,生產者成功送出訊息後,消費者收到訊息時必須呼叫 session.commit() 提交事務,否則該訊息在 Broker 上會一直顯示為未處理(pending)。消費者程式碼如下:

package org.aisino.mq;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.aisino.common.Constants;
import org.aisino.common.HttpClientUtil;
import org.aisino.common.RequestUtils;
import org.aisino.common.xutils.JsonUtil;
import org.aisino.common.xutils.PropertiesLoader;
import org.aisino.dlgs.cgsdzda.vo.FileVo;
import org.aisino.rpcclient.HttpClient;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.BlobMessage;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.springframework.beans.factory.InitializingBean;

/**
 * Background JMS consumer that listens on an ActiveMQ queue.
 *
 * <p>For a {@link BlobMessage} the payload is saved to a local file, uploaded
 * to an FTP server, and the resulting file name / URL are merged into the
 * message's {@code json} property. For a {@link TextMessage} the text and the
 * {@code json} property are read and printed.
 *
 * <p>The session is transacted: a message is committed only after it has been
 * handled successfully and is rolled back (so the broker can redeliver it)
 * when handling fails.
 */
public class ThreadQueueConsumerN implements Runnable, InitializingBean {
	private static final String USER = ActiveMQConnection.DEFAULT_USER;
	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
	/** Broker URL used when {@code mq_url_receive} is not configured. */
	private static final String DEFAULT_URL = "tcp://127.0.0.1:61616?jms.blobTransferPolicy.defaultUploadUrl=http://127.0.0.1:8161/fileserver/";
	private static final String QUEUE_NAME = "queue1";

	Connection connection;
	Session session;

	/**
	 * Starts the consumer on its own thread once the bean has been configured.
	 */
	@Override
	public void afterPropertiesSet() throws Exception {
		new Thread(this).start();
	}

	/**
	 * Connects to the broker, opens a transacted session on the queue and
	 * registers the message listener. Any setup failure is printed and the
	 * connection (if one was opened) is closed.
	 */
	@Override
	public void run() {
		try {
			// Read the broker URL from global.properties; keep it local instead of
			// overwriting shared static state.
			PropertiesLoader loader = new PropertiesLoader("global.properties");
			String brokerUrl = loader.getProperty("mq_url_receive");
			if (brokerUrl == null || brokerUrl.trim().isEmpty()) {
				// NOTE(review): assumes getProperty returns null/blank for a missing key — confirm.
				brokerUrl = DEFAULT_URL;
			}
			ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
					USER, PASSWORD, brokerUrl);
			System.out.println("進入了內網...........................run");

			connection = connectionFactory.createConnection();
			// Transacted session: the acknowledge-mode argument is ignored, so the
			// dedicated constant is used to make that explicit.
			session = connection.createSession(true, Session.SESSION_TRANSACTED);
			Destination destination = session.createQueue(QUEUE_NAME);
			MessageConsumer consumer = session.createConsumer(destination);
			consumer.setMessageListener(new MessageListener() {
				@Override
				public void onMessage(Message message) {
					handleMessage(message);
				}
			});
			// Start delivery only after the listener is in place.
			connection.start();
		} catch (Exception e) {
			e.printStackTrace();
			closeConnectionQuietly();
		}
	}

	/**
	 * Dispatches one message by type, then commits the transaction. If handling
	 * throws, the transaction is rolled back instead so the message is not lost.
	 * Messages of other types are committed unprocessed so they do not stay
	 * pending.
	 */
	private void handleMessage(Message message) {
		try {
			if (message instanceof BlobMessage) {
				handleBlobMessage((BlobMessage) message);
			} else if (message instanceof TextMessage) {
				handleTextMessage((TextMessage) message);
			}
			session.commit();
		} catch (Exception e) {
			e.printStackTrace();
			try {
				session.rollback();
			} catch (JMSException rollbackFailure) {
				rollbackFailure.printStackTrace();
			}
		}
	}

	/**
	 * Saves the blob to a local file named after the {@code FILE.NAME}
	 * property, uploads it via FTP, merges the file name and URL into the
	 * {@code json} property and passes the result to {@link #httpRequest}.
	 *
	 * @throws IllegalArgumentException if the message has no usable file name
	 * @throws IOException if the blob has no content or a transfer fails
	 */
	@SuppressWarnings({ "rawtypes", "unchecked" })
	private void handleBlobMessage(BlobMessage blobMessage) throws Exception {
		// The file name travels as a message property, not inside the json payload.
		String fileName = blobMessage.getStringProperty("FILE.NAME");
		if (fileName == null || fileName.trim().isEmpty()) {
			throw new IllegalArgumentException(
					"BlobMessage is missing the FILE.NAME property");
		}
		// Keep only the last path segment so a sender cannot write outside the
		// working directory (e.g. "../../x").
		File file = new File(new File(fileName).getName());

		System.out.println("開始接收檔案:" + fileName);
		try (InputStream inputStream = blobMessage.getInputStream();
				OutputStream os = new FileOutputStream(file)) {
			if (inputStream == null) {
				throw new IOException("BlobMessage has no content: " + fileName);
			}
			byte[] buff = new byte[8192];
			int len;
			while ((len = inputStream.read(buff)) != -1) {
				os.write(buff, 0, len);
			}
		}

		String json = blobMessage.getStringProperty("json");
		System.out.println("完成檔案接收:" + file.getName()
				+ " 路徑 :" + file.getPath());
		System.out.println("json:" + json);

		String furl = uploadConfigFile(file);
		Map map = JsonUtil.JsonObjStr2Map(json);
		map.put(Constants.FILENAME, file.getName());
		map.put("fileurl", furl);
		// Re-serialise the enriched map and hand it to the forwarding hook.
		json = JsonUtil.map2json(map);
		httpRequest(json);
	}

	/**
	 * Reads the text body and the {@code json} property of a text message,
	 * passes the text to {@link #httpRequest} and prints both values.
	 */
	private void handleTextMessage(TextMessage textMessage) throws JMSException {
		String json = textMessage.getStringProperty("json");
		String string = textMessage.getText();
		httpRequest(string);
		System.out.println("傳送的文字是 " + string
				+ " 傳送的json資料是 " + json);
	}

	/** Closes the connection if one was opened, printing any close failure. */
	private void closeConnectionQuietly() {
		if (connection != null) {
			try {
				connection.close();
			} catch (JMSException e) {
				e.printStackTrace();
			}
		}
	}

	/**
	 * Forwarding hook for received payloads; business code goes here.
	 * Currently does nothing.
	 *
	 * @param json the payload to forward
	 */
	public void httpRequest(String json) {
		// insert business code here
	}

	/**
	 * Uploads a file to the FTP server described by
	 * {@code RequestUtils.loadFilePath()}, replacing any existing remote file
	 * of the same name.
	 *
	 * @param uploadFile the local file to upload
	 * @return the {@code http_picture} prefix from global.properties followed
	 *         by the file name
	 * @throws IOException if login, directory change or the transfer fails
	 */
	private String uploadConfigFile(File uploadFile) throws IOException {
		PropertiesLoader loader = new PropertiesLoader("global.properties");
		String httpPicture = loader.getProperty("http_picture");
		Map<String, Object> mapPath = RequestUtils.loadFilePath();

		FTPClient ftpClient = new FTPClient();
		try {
			ftpClient.connect(mapPath.get("ftp_host").toString(),
					Integer.parseInt(mapPath.get("ftp_port").toString()));
			if (!ftpClient.login(mapPath.get("ftp_username").toString(),
					mapPath.get("ftp_password").toString())) {
				throw new IOException("FTP login rejected: "
						+ ftpClient.getReplyString());
			}
			ftpClient.enterLocalPassiveMode();
			ftpClient.setFileType(FTP.BINARY_FILE_TYPE);

			String remoteDir = mapPath.get("ftp_filepath").toString();
			if (!ftpClient.changeWorkingDirectory(remoteDir)) {
				throw new IOException("Cannot change to FTP directory "
						+ remoteDir + ": " + ftpClient.getReplyString());
			}

			// Re-encode so the UTF-8 bytes of the name go out unchanged over the
			// ISO-8859-1 FTP control channel.
			String remoteName = new String(
					uploadFile.getName().getBytes("utf-8"), "iso-8859-1");
			// Remove a previous upload of the same name; the result is ignored
			// because the file usually does not exist.
			ftpClient.deleteFile(remoteName);
			try (InputStream is = new FileInputStream(uploadFile)) {
				// storeFile completes the transfer command itself and reports
				// failure through its return value.
				if (!ftpClient.storeFile(remoteName, is)) {
					throw new IOException("FTP upload failed for "
							+ uploadFile.getName() + ": "
							+ ftpClient.getReplyString());
				}
			}
			ftpClient.logout();
			// NOTE(review): the URL uses the plain file name, not the re-encoded
			// control-channel form — confirm this matches how files are served.
			return httpPicture + uploadFile.getName();
		} finally {
			if (ftpClient.isConnected()) {
				try {
					ftpClient.disconnect();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}
}
生產者:

package tk.mybatis.springboot.mq;

import java.io.File;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.aisino.common.xutils.PropertiesLoader;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.BlobMessage;

/**
 * Sends a file (as an ActiveMQ {@link BlobMessage}) or a text message to a
 * queue.
 *
 * @author wb-liufei
 */
public class MessageSender {
	private static final String USER = ActiveMQConnection.DEFAULT_USER;
	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
	/** Broker URL used when {@code mq_url_sender} is not configured. */
	private static final String DEFAULT_URL = "tcp://127.0.0.1:61616?jms.blobTransferPolicy.defaultUploadUrl=http://127.0.0.1:8161/fileserver/";
	private static final String QUEUE_NAME = "queue1";

	/**
	 * Sends exactly one message: either {@code file} or {@code text}. The other
	 * argument must be {@code null}.
	 *
	 * @param file
	 *            file to send as a blob message, or {@code null}
	 * @param text
	 *            text to send as a text message, or {@code null}
	 * @param json
	 *            JSON string attached as the {@code json} message property;
	 *            may be {@code null}, in which case the property is not set
	 * @throws IllegalArgumentException
	 *             if both or neither of {@code file} and {@code text} are
	 *             given
	 * @throws JMSException
	 *             if connecting, sending or committing fails
	 */
	public void sender(File file, String text, String json) throws JMSException {
		// Reject ambiguous calls up front instead of silently sending nothing.
		if ((file == null) == (text == null)) {
			throw new IllegalArgumentException(
					"Exactly one of file and text must be non-null");
		}

		// Read the broker URL from global.properties; keep it local instead of
		// overwriting shared static state on every call.
		PropertiesLoader loader = new PropertiesLoader("global.properties");
		String brokerUrl = loader.getProperty("mq_url_sender");
		if (brokerUrl == null || brokerUrl.trim().isEmpty()) {
			// NOTE(review): assumes getProperty returns null/blank for a missing key — confirm.
			brokerUrl = DEFAULT_URL;
		}
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
				USER, PASSWORD, brokerUrl);

		Connection connection = connectionFactory.createConnection();
		try {
			connection.start();
			// Transacted session: the acknowledge-mode argument is ignored, so the
			// dedicated constant is used to make that explicit.
			Session session = connection.createSession(true,
					Session.SESSION_TRANSACTED);
			Queue destination = session.createQueue(QUEUE_NAME);

			MessageProducer producer = session.createProducer(destination);
			// Persistent delivery: the message is kept by the broker, so a
			// receiver that is offline can still get it after reconnecting.
			producer.setDeliveryMode(DeliveryMode.PERSISTENT);

			if (file == null) {
				TextMessage textMessage = session.createTextMessage(text);
				// The consumer reads this property from text messages too.
				if (json != null) {
					textMessage.setStringProperty("json", json);
				}
				producer.send(textMessage);
			} else {
				BlobMessage blobMessage = ((ActiveMQSession) session)
						.createBlobMessage(file);
				if (json != null) {
					blobMessage.setStringProperty("json", json);
				}
				// The receiver uses this property to name the saved file.
				blobMessage.setStringProperty("FILE.NAME", file.getName());
				System.out.println("開始傳送檔案:" + file.getName() + ",檔案大小:"
						+ file.length() + " 位元組");
				producer.send(blobMessage);
				System.out.println("完成檔案傳送:" + file.getName());
			}
			session.commit();
		} finally {
			// Always release the connection, including when sending fails;
			// otherwise the program does not exit.
			connection.close();
		}
	}
}