
Implementing Resumable Transfers on HDFS: Download

When a server backed by HDFS offers resumable downloads to its clients, it can use the seek method provided by the Hadoop API to position the read pointer at the offset requested by the user, then read the file stream from that breakpoint onward and transmit it to the client.

The implementation again uses an agreed-upon offset to mark the breakpoint, but unlike the upload case, this time the client maintains the offset parameter: when a download needs to be resumed, the client submits the file information, including the offset field, to the server, and the server passes that offset to seek, reads the file stream from the breakpoint, and transmits it to the client.
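
The original does not specify the wire format for submitting the offset. As a minimal sketch, assuming a hypothetical HTTP endpoint /download that accepts filename and offset query parameters (the host, path, and parameter names are illustrative), a client could derive the offset from its partial local file and append the resumed bytes:

import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;

public class ResumeDownloadClient {
	public static void resume(String filename, File localFile) throws Exception {
		// The bytes already on disk tell us where to resume from.
		long offset = localFile.exists() ? localFile.length() : 0;
		URL url = new URL("http://server:8080/download?filename="
				+ URLEncoder.encode(filename, "UTF-8") + "&offset=" + offset);
		HttpURLConnection conn = (HttpURLConnection) url.openConnection();
		try (InputStream in = conn.getInputStream();
				// Open in append mode so the resumed bytes continue the partial file.
				FileOutputStream out = new FileOutputStream(localFile, true)) {
			byte[] buffer = new byte[1024];
			int len;
			while ((len = in.read(buffer)) != -1) {
				out.write(buffer, 0, len);
			}
		} finally {
			conn.disconnect();
		}
	}
}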

In HDFSHandler, the resumable download is implemented as follows:

/**
	 * Resume a download from a breakpoint and stream the file to the client.
	 * 
	 * @param response
	 * @param offSet
	 *            offset in bytes (the breakpoint)
	 * @param encryptfilename
	 *            name of the encrypted file in HDFS
	 * @throws IOException
	 */
	public static void downloadFromHDFSinOffset(HttpServletResponse response, long offSet, String encryptfilename)
			throws IOException {
		if (response == null || encryptfilename == null || encryptfilename.equals(""))
			return;

		response.setContentType("application/x-msdownload");
		response.addHeader("Content-Disposition", "attachment;filename=" + encryptfilename);
		ServletOutputStream sos = response.getOutputStream();

		DownloadInOffset dfb = null;
		try {
			dfb = new DownloadInOffset(encryptfilename);
			byte[] buffer = new byte[1024];
			long size = dfb.getFileSize(encryptfilename); // total file size
			System.out.println("HDFSHandler : getFileSize = " + size);
			int len = 0; // bytes read per call
			long length = 0; // total bytes read so far
			if (offSet == 0) {
				len = dfb.download(buffer); // read from the start of the file
			} else {
				len = dfb.download(buffer, offSet); // seek to the offset first, then read
			}
			// Copy buffer-fulls to the response stream until EOF or until the
			// requested range has been served. The len > 0 guard also prevents
			// writing when the very first read already returned -1 (EOF).
			while (len > 0) {
				sos.write(buffer, 0, len);
				length += len;
				if (offSet + length >= size) {
					break; // everything from the breakpoint to EOF has been sent
				}
				len = dfb.download(buffer);
			}

			System.out.println("HDFSHandler : offset = " + offSet);
			System.out.println("HDFSHandler : length = " + length);
			System.out.println("HDFSHandler : offset + length = " + offSet + "+" + length + "=" + (offSet + length));

			sos.flush();
		} catch (Exception e) {
			Log.logException(e);
		} finally {
			if (dfb != null) { // the constructor may have thrown before dfb was assigned
				dfb.close();
			}
		}
	}
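
As a usage sketch, a servlet could extract the offset and file name from the request and delegate to this handler. The URL pattern and parameter names below are assumptions for illustration, not part of the original code:

import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

// Hypothetical servlet wiring for downloadFromHDFSinOffset.
@WebServlet("/download")
public class DownloadServlet extends HttpServlet {
	@Override
	protected void doGet(HttpServletRequest req, HttpServletResponse resp)
			throws ServletException, IOException {
		String encryptfilename = req.getParameter("filename");
		long offSet = 0;
		String offsetParam = req.getParameter("offset");
		if (offsetParam != null) {
			offSet = Long.parseLong(offsetParam); // breakpoint supplied by the client
		}
		HDFSHandler.downloadFromHDFSinOffset(resp, offSet, encryptfilename);
	}
}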


This method instantiates the custom DownloadInOffset class, which uses the seek method described above to position the read pointer at the offset:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DownloadInOffset {
	private FileSystem hadoopFS = null;
	private Configuration conf = null;
	private FSDataInputStream fsInputStream = null;

	public DownloadInOffset(String srcPath) throws IOException {
		srcPath = HDFSHandler.hdfs_path + HDFSHandler.download_path + "/" + srcPath;
		conf = HDFSHandler.conf;
		hadoopFS = FileSystem.get(conf);
		fsInputStream = hadoopFS.open(new Path(srcPath));
	}

	/** Seek to the given offset, then fill the buffer from there. */
	public int download(byte[] ioBuffer, long offset) throws IOException {
		if (ioBuffer == null) {
			throw new IOException("ioBuffer is null");
		}
		fsInputStream.seek(offset);
		return fsInputStream.read(ioBuffer);
	}

	/** Continue reading from the current stream position. */
	public int download(byte[] ioBuffer) throws IOException {
		if (ioBuffer == null) {
			throw new IOException("ioBuffer is null");
		}
		return fsInputStream.read(ioBuffer);
	}

	public void close() {
		if (fsInputStream != null) {
			try {
				fsInputStream.close();
			} catch (IOException e) {
				Log.logException(e);
			}
		}
		// Note: FileSystem.get() returns a cached, shared instance by default,
		// so it is deliberately not closed here; closing it would also close
		// it for every other user of the same FileSystem object.
	}

	public long getFileSize(String srcPath) throws IOException {
		srcPath = HDFSHandler.hdfs_path + HDFSHandler.download_path + "/" + srcPath;
		// Reuse the FileSystem opened in the constructor rather than
		// creating a second instance for the same cluster.
		FileStatus fs = hadoopFS.getFileStatus(new Path(srcPath));
		return fs.getLen();
	}
}

Note that seek itself is relatively expensive. If the server must handle many users requesting this operation concurrently, it can be paired with server-side load balancing, or a producer-consumer scheme can be used to place excess seek requests into a blocking queue, as sketched below.
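
A minimal sketch of that producer-consumer idea, with each download's work wrapped in a Runnable: a bounded BlockingQueue lets a fixed pool of worker threads drain seek-heavy requests, while producers block once the queue is full.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class SeekRequestQueue {
	// Bounded queue: put() blocks producers once it is full,
	// throttling the number of pending seek-heavy downloads.
	private final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(64);

	public SeekRequestQueue(int workers) {
		for (int i = 0; i < workers; i++) {
			Thread worker = new Thread(() -> {
				try {
					while (true) {
						// take() blocks until a request is available.
						queue.take().run();
					}
				} catch (InterruptedException e) {
					Thread.currentThread().interrupt();
				}
			});
			worker.setDaemon(true);
			worker.start();
		}
	}

	/** Producer side: request threads enqueue download work here. */
	public void submit(Runnable downloadTask) throws InterruptedException {
		queue.put(downloadTask);
	}
}

In practice, a fixed-size ThreadPoolExecutor over a bounded work queue achieves the same throttling with less hand-rolled code.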