Implementing Resumable Transfers on HDFS: Download
阿新 · Published 2019-01-27
When a server built on HDFS exposes resumable downloads to its clients, the seek method provided by the Hadoop API can be used to honor the requested offset: the server starts reading the file stream at the byte offset where the user's download broke off and streams it back to the user.

The implementation again uses an agreed-upon offset to resume the transfer, but unlike the upload case, here the client maintains the offset parameter. To resume a download, the client submits the file information, including the offset field, to the server; the server then uses seek, guided by that field, to read the file stream from the breakpoint onward and transmit it to the user. A minimal client-side sketch of this protocol follows.
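The sketch assumes a hypothetical /download endpoint with offset and filename request parameters (neither name appears in the original project): the client derives the offset from the bytes it already holds on disk and appends whatever the server returns.

import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;

public class ResumeDownloadClient {
    public static void main(String[] args) throws Exception {
        String filename = "example.bin";                   // hypothetical file name
        File local = new File(filename);
        long offset = local.exists() ? local.length() : 0; // resume from what is already on disk

        // Hypothetical endpoint and parameter names ("offset", "filename").
        URL url = new URL("http://server:8080/download?offset=" + offset
                + "&filename=" + URLEncoder.encode(filename, "UTF-8"));
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();

        try (InputStream in = conn.getInputStream();
             FileOutputStream out = new FileOutputStream(local, true)) { // append mode
            byte[] buf = new byte[1024];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
        }
    }
}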
In HDFSHandler, the resumable download is implemented as follows:
/**
 * Resumable download of a file to the local client.
 *
 * @param response
 * @param offSet          byte offset to resume from
 * @param encryptfilename encrypted file name in HDFS
 * @throws IOException
 */
public static void downloadFromHDFSinOffset(HttpServletResponse response,
        long offSet, String encryptfilename) throws IOException {
    if (response == null || encryptfilename == null || encryptfilename.equals(""))
        return;
    response.setContentType("application/x-msdownload");
    response.addHeader("Content-Disposition", "attachment;filename=" + encryptfilename);
    ServletOutputStream sos = response.getOutputStream();
    DownloadInOffset dfb = null;
    try {
        dfb = new DownloadInOffset(encryptfilename);
        byte[] buffer = new byte[1024];
        long size = dfb.getFileSize(encryptfilename); // total file size
        System.out.println("HDFSHandler : getFileSize = " + size);
        int len;         // bytes read per call
        long length = 0; // total bytes read so far
        if (offSet == 0) {
            len = dfb.download(buffer);         // start reading at the beginning of the file
        } else {
            len = dfb.download(buffer, offSet); // seek to the offset first, then read
        }
        // Loop, copying each filled buffer to the response output stream.
        // Guarding on len > 0 fixes the original do-while, which would have
        // passed a length of -1 to write() when the very first read hit EOF.
        while (len > 0) {
            sos.write(buffer, 0, len);
            length += len;
            if (length + offSet >= size)
                break; // reached the declared file size
            len = dfb.download(buffer);
        }
        System.out.println("HDFSHandler : offset = " + offSet);
        System.out.println("HDFSHandler : length = " + length);
        System.out.println("HDFSHandler : offset + length = "
                + offSet + "+" + length + "=" + (offSet + length));
        sos.flush();
    } catch (Exception e) {
        Log.logException(e);
    } finally {
        if (dfb != null) // null check avoids an NPE when construction failed
            dfb.close();
    }
}
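For context, the handler could be wired to a servlet along the following lines. This is a hypothetical sketch: the servlet class, endpoint, and parameter names are assumptions rather than part of the original project.

import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

// Hypothetical wiring: parameter names "offset" and "filename" are assumptions.
public class DownloadServlet extends HttpServlet {
    @Override
    protected void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        long offset = Long.parseLong(request.getParameter("offset"));
        String encryptfilename = request.getParameter("filename");
        HDFSHandler.downloadFromHDFSinOffset(response, offset, encryptfilename);
    }
}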
The handler instantiates the custom DownloadInOffset class, which uses the seek method described above to position the stream at the offset. Its code is as follows:
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DownloadInOffset {
    private FileSystem hadoopFS = null;
    private Configuration conf = null;
    private FSDataInputStream fsInputStream = null;
    private FileSystem hdfs = null;

    public DownloadInOffset(String srcPath) throws IOException {
        srcPath = HDFSHandler.hdfs_path + HDFSHandler.download_path + "/" + srcPath;
        conf = HDFSHandler.conf;
        hadoopFS = FileSystem.get(conf);
        fsInputStream = hadoopFS.open(new Path(srcPath));
    }

    // Read starting at the given offset: seek first, then fill the buffer.
    public int download(byte[] ioBuffer, long offset) throws IOException {
        if (ioBuffer == null)
            throw new IOException("ioBuffer is null");
        // fsInputStream.read(offset, ioBuffer, 0, length); // positioned-read alternative
        fsInputStream.seek(offset);
        return fsInputStream.read(ioBuffer);
    }

    // Sequential read continuing from the stream's current position.
    public int download(byte[] ioBuffer) throws IOException {
        if (ioBuffer == null)
            throw new IOException("ioBuffer is null");
        return fsInputStream.read(ioBuffer);
    }

    public void close() {
        if (fsInputStream != null) {
            try {
                fsInputStream.close();
                if (hdfs != null) // hdfs is only set once getFileSize() has run
                    hdfs.close();
            } catch (IOException e) {
                e.printStackTrace();
                Log.logException(e);
            }
        }
    }

    public long getFileSize(String srcPath) throws IOException {
        srcPath = HDFSHandler.hdfs_path + HDFSHandler.download_path + "/" + srcPath;
        conf = HDFSHandler.conf;
        hdfs = FileSystem.get(URI.create(srcPath), conf);
        FileStatus fs = hdfs.getFileStatus(new Path(srcPath));
        return fs.getLen();
    }
}
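Incidentally, the commented-out read(offset, ioBuffer, 0, length) line above hints at an alternative: FSDataInputStream implements Hadoop's PositionedReadable interface, whose read(position, buffer, offset, length) reads at an absolute position without moving the stream's current pointer. A hypothetical method in that style (not part of the original class) might look like this:

// Positioned read via PositionedReadable: reads up to ioBuffer.length bytes
// at the absolute position 'offset' without disturbing the stream pointer,
// which is safer if one stream ever serves interleaved readers.
public int downloadPositioned(byte[] ioBuffer, long offset) throws IOException {
    if (ioBuffer == null)
        throw new IOException("ioBuffer is null");
    return fsInputStream.read(offset, ioBuffer, 0, ioBuffer.length);
}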
Note that seek itself carries a nontrivial cost. If the server must serve many users requesting this operation concurrently, it can be combined with server-side load balancing, or a producer-consumer scheme can park excess seek requests in a blocking queue; a sketch of the latter follows.
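Below is a minimal producer-consumer sketch of that queueing idea, using only the standard library. Every class and field name here is illustrative, and the actual seek-based read is elided.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class SeekRequestQueue {
    // A request captures what downloadFromHDFSinOffset needs.
    public static class SeekRequest {
        final String encryptfilename;
        final long offset;
        SeekRequest(String encryptfilename, long offset) {
            this.encryptfilename = encryptfilename;
            this.offset = offset;
        }
    }

    // Bounded queue: producers block when it is full, throttling seek load.
    private final BlockingQueue<SeekRequest> queue = new ArrayBlockingQueue<>(256);

    public SeekRequestQueue(int workers) {
        // A fixed pool of workers caps the number of concurrent seek-based reads.
        for (int i = 0; i < workers; i++) {
            Thread t = new Thread(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        SeekRequest r = queue.take(); // blocks until a request arrives
                        // ... perform the seek-based read for r here ...
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            t.setDaemon(true);
            t.start();
        }
    }

    // Producer side: called by the request-handling layer.
    public void submit(SeekRequest r) throws InterruptedException {
        queue.put(r);
    }
}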