1. 程式人生 > >關於阻塞佇列\生產者消費者模式用於資訊上傳的案例

關於阻塞佇列\生產者消費者模式用於資訊上傳的案例

public class UploadInfoBiz extends BaseBiz {
    
    private final static Logger log = Logger.getLogger(UploadInfoBiz.class);
    private static UploadInfoBiz instance = null;
    private static boolean isStarted = false;
    private final static int MAX_SIZE = 50000;   //佇列容量
    private static LinkedBlockingQueue<UploadInfo> uploadList = new LinkedBlockingQueue<UploadInfo>(MAX_SIZE);//阻塞執行緒
    Thread uploadService = null;
    public static synchronized UploadInfoBiz getInstance() {
        if (instance == null) {
            instance = new UploadInfoBiz();
        }
        return instance;
    }
     
    public void start() {
        isStarted = true;
        if (uploadService == null) {
            uploadService = new Thread(new NewUploadTask()); 
            uploadService.start();
        }
    }

    public void stop() {
        isStarted = false;
    }
    
    /**
     *  接受處理資料並放入佇列
     * @param jsonReq  json
     * @return
     * @throws InvalidKeyException
     * @throws NoSuchAlgorithmException
     * @throws NoSuchPaddingException
     * @throws IllegalBlockSizeException
     * @throws BadPaddingException
     * @throws UnsupportedEncodingException
     * @throws DBAccessException
     */
    public JsonObject uploadList(UploadInfo uploadInfo) throws InvalidKeyException, NoSuchAlgorithmException, NoSuchPaddingException, IllegalBlockSizeException, BadPaddingException, UnsupportedEncodingException, DBAccessException {
        JsonObject jsonObject = new JsonObject();
        putUploadMessage(uploadInfo);
        jsonObject.setState(0);
         jsonObject.setStateText("上傳成功");
        return jsonObject;
    }
    
    public void putUploadMessage(UploadInfo uploadInfo) {
        try {
            uploadList.put(uploadInfo);
        } catch (InterruptedException e) {
            log.error("資料插入佇列失敗", e);
        }
     }
      
     /**
      * 資料從佇列取出的方法
      */
     public class NewUploadTask implements Runnable {
        public void run() {
            while (isStarted) {
                try {
                    int num = 500;
                    if (uploadList.size() < 500) {
                         num = uploadList.size();
                    }
                    dealWithInfo(num);  //從佇列裡拿出資料
                    UploadInfo uploadInfo = uploadList.take(); //阻塞佇列
                    uploadList.put(uploadInfo);
                } catch (Exception e) {
                    log.error(e.getMessage(), e);
                }    
            }    
        }
    }    
     
    /**
     *  資料從佇列拿出並處理
     */
     @Transactional
    public void dealWithInfo(int num) {
        try {
            for (int i=0;i < num;i++) {
                UploadInfo uploadInfo = uploadList.take();
                int result = uploadData(uploadInfo);
                if (result != 0 ) {
                    log.error("上傳失敗"  };
        } catch (InterruptedException e) {
            log.error(e.getMessage(), e);
        } catch (RuntimeException e) {
            log.error(e.getMessage(), e);
        } catch (SQLException e) {
            e.printStackTrace();
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }    
    }
        
    /**
     * 資料儲存到資料庫
     */
    public int uploadData(UploadInfo uploadInfo) throws SQLException,Exception  {
     
             result = TextBiz.getInstance().syncData(uploadInfo);
   
        return result;
    } 
        
}