關於阻塞佇列／生產者消費者模式用於資訊上傳的案例
public class UploadInfoBiz extends BaseBiz {
private final static Logger log = Logger.getLogger(UploadInfoBiz.class);
private static UploadInfoBiz instance = null;
private static boolean isStarted = false;
private final static int MAX_SIZE = 50000; //佇列容量
private static LinkedBlockingQueue<UploadInfo> uploadList = new LinkedBlockingQueue<UploadInfo>(MAX_SIZE);//阻塞執行緒
Thread uploadService = null;
public static synchronized UploadInfoBiz getInstance() {
if (instance == null) {
instance = new UploadInfoBiz();
}
return instance;
}
public void start() {
isStarted = true;
if (uploadService == null) {
uploadService = new Thread(new NewUploadTask());
uploadService.start();
}
}
public void stop() {
isStarted = false;
}
/**
* 接受處理資料並放入佇列
* @param jsonReq json
* @return
* @throws InvalidKeyException
* @throws NoSuchAlgorithmException
* @throws NoSuchPaddingException
* @throws IllegalBlockSizeException
* @throws BadPaddingException
* @throws UnsupportedEncodingException
* @throws DBAccessException
*/
public JsonObject uploadList(UploadInfo uploadInfo) throws InvalidKeyException, NoSuchAlgorithmException, NoSuchPaddingException, IllegalBlockSizeException, BadPaddingException, UnsupportedEncodingException, DBAccessException {
JsonObject jsonObject = new JsonObject();
putUploadMessage(uploadInfo);
jsonObject.setState(0);
jsonObject.setStateText("上傳成功");
return jsonObject;
}
public void putUploadMessage(UploadInfo uploadInfo) {
try {
uploadList.put(uploadInfo);
} catch (InterruptedException e) {
log.error("資料插入佇列失敗", e);
}
}
/**
* 資料從佇列取出的方法
*/
public class NewUploadTask implements Runnable {
public void run() {
while (isStarted) {
try {
int num = 500;
if (uploadList.size() < 500) {
num = uploadList.size();
}
dealWithInfo(num); //從佇列裡拿出資料
UploadInfo uploadInfo = uploadList.take(); //阻塞佇列
uploadList.put(uploadInfo);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
}
}
/**
* 資料從佇列拿出並處理
*/
@Transactional
public void dealWithInfo(int num) {
try {
for (int i=0;i < num;i++) {
UploadInfo uploadInfo = uploadList.take();
int result = uploadData(uploadInfo);
if (result != 0 ) {
log.error("上傳失敗" };
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
} catch (RuntimeException e) {
log.error(e.getMessage(), e);
} catch (SQLException e) {
e.printStackTrace();
} catch (Exception e) {
log.error(e.getMessage(), e);
}
}
/**
* 資料儲存到資料庫
*/
public int uploadData(UploadInfo uploadInfo) throws SQLException,Exception {
result = TextBiz.getInstance().syncData(uploadInfo);
return result;
}
}