訊息服務框架使用案例之--大檔案上傳(斷點續傳)功能
訊息服務框架使用案例之--大檔案上傳(斷點續傳)功能
一、分塊上傳和斷點續傳原理
在我們的一個產品應用中,客戶需要上傳大量的檔案到伺服器,其中不乏很大的視訊檔案。雖然可以使用FTP這樣成熟穩定的工具,但客戶表示不會使用FTP工具,並且我們產品也覺得客戶從我們軟體在切換到FTP使用者體驗不好,如果做成後臺指令碼呼叫FTP上傳那麼進度資訊很難呈現到我們軟體上。最終,決定我們自己做檔案上傳功能。
大檔案上傳受限於伺服器每次處理資料的能力,不能一次傳輸完成,所以分塊上傳是必然的了,由於上傳時間可能較長,中途可能因為網路或者人為原因終止上傳,所以還需要斷點上傳功能。
分塊上傳實際上是在客戶端分塊讀取檔案,然後在伺服器分塊寫入檔案,每次讀寫記錄下讀寫的起始位置,也就是檔案的偏移量,和要讀寫的資料長度。在上傳過程中,每完成一個檔案資料塊的寫入,就向客戶端返回一次資訊,客戶端據此進行下一檔案資料塊的讀取。 斷點續傳功能也比較好實現,就是上傳過程中將檔案在伺服器寫為臨時檔案,等全部寫完了(檔案上傳完),將此臨時檔案重新命名為正式檔案即可,如果中途上傳中斷過,下次上傳的時候根據當前臨時檔案大小,作為在客戶端讀取檔案的偏移量,從此位置繼續讀取檔案資料塊,上傳到伺服器從此偏移量繼續寫入檔案即可。
二、訊息服務框架實現檔案上傳
假設我們將每一個檔案資料塊看做一份“訊息”,那麼檔案上傳本質上就是客戶端和伺服器兩端頻繁的訊息互動而已。訊息服務框架(MSF)是一個集成了服務容器和訊息訪問的框架,正好可以用來做檔案上傳應用。具體做法就是在服務端,編寫一個“檔案上傳服務”,在客戶端,編寫一個呼叫上傳服務的回撥方法即可。
2.1,檔案上傳服務端
新建一個MSF服務類:
public class FilesService : ServiceBase
{
}
然後新增一個處理上傳檔案的方法:
/// <summary> /// 批量上傳檔案(通過回撥客戶端的方式,支援斷點續傳) /// </summary> /// <param name="list">檔案列表</param> /// <returns></returns> public UploadResult UploadFiles(List<UploadFileInfos> list) { int uploadCount = 0; foreach (var uploadInfo in list) { string pathfile = string.Empty; try { pathfile = this.MapServerPath(uploadInfo.FilePath); if (!Directory.Exists(Path.GetDirectoryName(pathfile))) { Directory.CreateDirectory(Path.GetDirectoryName(pathfile)); } if (File.Exists(pathfile)) { FileInfo fi = new FileInfo(pathfile); if (fi.Length == uploadInfo.Size && fi.LastWriteTime == uploadInfo.FileModifyTime) { Console.WriteLine("檔案 {0} {1}", pathfile, "已上傳,跳過"); continue;//檔案已上傳,跳過 } else { fi.Delete(); } } //"斷點"上傳的檔案 long offset = 0; //上傳的分部檔名稱增加一個檔案長度數字,避免下次客戶端上傳的時候,修改了內容。 //如果檔案上傳了一部分,的確修改了內容,那麼原來上傳的部分檔案就丟棄了。 string partFile = pathfile + uploadInfo.Size + ".part"; if (File.Exists(partFile)) { FileInfo fi = new FileInfo(partFile); offset = fi.Length; } while (offset < uploadInfo.Size) { uploadInfo.Offset = offset; uploadInfo.Length = MaxReadSize; if (uploadInfo.Offset + uploadInfo.Length > uploadInfo.Size) uploadInfo.Length = (int)(uploadInfo.Size - uploadInfo.Offset); //回撥客戶端,通知上傳檔案塊 var data = GetUploadFileData(uploadInfo); if (data.Length == 0) { //如果有長度為零的檔案表示客戶讀取檔案失敗,終止上傳操作 throw new Exception("讀取客戶端檔案失敗(Length=0),終止上傳操作"); } if (data.Length != uploadInfo.Length) throw new Exception("網路異常:上傳的檔案流資料塊大小與預期的不一致"); //等待上次寫完 resetEvent.WaitOne(); //非同步寫檔案 System.Threading.Thread t = new System.Threading.Thread(new System.Threading.ParameterizedThreadStart(obj => { WriteFileInfo wfi = (WriteFileInfo)obj; CurWriteFile(wfi.FileName, wfi.WriteData, wfi.Offset); })); t.Start(new WriteFileInfo() { FileName = partFile, WriteData = data, Offset = offset }); offset += uploadInfo.Length; } resetEvent.WaitOne(); //重新命名到正常檔名 File.Move(partFile, pathfile); System.IO.File.SetLastWriteTime(pathfile, uploadInfo.FileModifyTime); uploadCount++; resetEvent.Set(); } catch (Exception ex) { resetEvent.Set(); return new UploadResult() { Success = false, FilesCount = 0, Message = ex.Message }; }//end try } //end for return new UploadResult() { Success = true, FilesCount = list.Count }; }
在這個方法中,有一個重要方法, //回撥客戶端,通知上傳檔案塊 var data = GetUploadFileData(uploadInfo);
它呼叫了MSF框架服務上下文的回撥函式CallBackFunction,來讀取客戶端檔案資料的,程式碼如下:
private byte[] GetUploadFileData(UploadFileInfos fileinfo)
{
return base.CurrentContext.CallBackFunction<UploadFileInfos, byte[]>(fileinfo);
}
另外,服務端寫檔案的方法CurWriteFile 實現如下:
/// <summary>
/// 將伺服器端獲取到的位元組流寫入檔案
/// </summary>
/// <param name="pReadByte">流</param>
/// <param name="fileName">檔名</param>
/// <param name="offset">要寫入檔案的位置</param>
public void CurWriteFile(string fileName, byte[] pReadByte, long offset)
{
FileStream pFileStream = null;
try
{
pFileStream = new FileStream(fileName, FileMode.OpenOrCreate);
pFileStream.Seek(offset, SeekOrigin.Begin);
pFileStream.Write(pReadByte, 0, pReadByte.Length);
}
catch(Exception ex)
{
throw new Exception("寫檔案塊失敗,寫入位置:"+offset+",檔名:"+fileName+",錯誤原因:"+ex.Message);
}
finally
{
if (pFileStream != null)
pFileStream.Close();
resetEvent.Set();
}
}
2.2,檔案上傳客戶端
現在看檔案上傳客戶端程式碼,如何提供服務端需要的檔案讀取回調函式:
ServiceRequest request = new ServiceRequest();
request.ServiceName = "FilesService";
request.MethodName = "UploadFiles";
request.Parameters = new object[] { infos };
Proxy srvProxy = new Proxy();
srvProxy.ServiceBaseUri = string.Format("net.tcp://{0}", serverHost);
srvProxy.ErrorMessage += srvProxy_ErrorMessage;
Task<UploadResult> result= srvProxy.RequestServiceAsync<UploadResult, UploadFileInfos, byte[]>(request,
uploadingInfo =>
{
//action委託方法顯示進度給客戶端
action(new UploadStateArg()
{
State = uploadingInfo.Offset + uploadingInfo.Length >= uploadingInfo.Size
? UploadState.Success: UploadState.Uploading,
ProgressFile = uploadingInfo.FilePath,
ProcessValue = Convert.ToInt32(uploadingInfo.Offset * 100 / uploadingInfo.Size),
TotalProcessValue = Convert.ToInt32((uploadingInfo.UploadIndex +1) * 100 / index)
});
Console.WriteLine(">>>Debug:Path:{0},FilePath:{1}",folder, uploadingInfo.FilePath);
var fullName = Path.IsPathRooted(folder)? folder + uploadingInfo.FilePath : uploadingInfo.FilePath;
Console.WriteLine(">>>伺服器讀取客戶端檔案:{0},偏移量:{1} 長度:{2}",
fullName, uploadingInfo.Offset, uploadingInfo.Length);
return ReadFileData(fullName, uploadingInfo.Offset, uploadingInfo.Length);
}
);
在上面的方法中, srvProxy.RequestServiceAsync泛型方法需要3個引數,第一個引數是服務的結果型別,第二個引數是提供給服務端回撥方法(前面的base.CurrentContext.CallBackFunction方法)的引數,第三個引數是服務回撥方法的結果。srvProxy.RequestServiceAsync 的回撥方法的引數 uploadingInfo 是伺服器推送過來的訊息,裡面包含了需要讀取的檔案資訊,包括檔名,偏移量,讀取長度等資訊。
其中,客戶端讀取檔案的方法 ReadFileData 實現如下:
/// <summary>
/// 讀取檔案返回位元組流
/// </summary>
/// <param name="fileName">檔案路徑</param>
/// <param name="offset">要讀取的檔案流的位置</param>
/// <param name="length">要讀取的檔案塊大小</param>
/// <returns></returns>
private byte[] ReadFileData(string fileName, long offset, int length)
{
FileStream pFileStream = null;
byte[] pReadByte = new byte[0];
try
{
pFileStream = new FileStream(fileName, FileMode.Open, FileAccess.Read);
BinaryReader r = new BinaryReader(pFileStream);
r.BaseStream.Seek(offset, SeekOrigin.Begin);
pReadByte = r.ReadBytes(length);
return pReadByte;
}
catch
{
return pReadByte;
}
finally
{
if (pFileStream != null)
pFileStream.Close();
}
}
這樣,在一次檔案上傳的“請求-響應”過程中,MSF的服務端進行了多次回撥客戶端的操作,客戶端根據服務端推送過來的引數資訊來精確的讀取服務端需要的檔案資料。一個支援斷點續傳的大檔案上傳服務,使用MSF框架就做好了。
三、其它
本文使用到的其它相關服務端物件的程式碼定義如下:
/// <summary>
/// 上傳狀態列舉
/// </summary>
public enum UploadState
{
/// <summary>
/// 上傳成功
/// </summary>
Success,
/// <summary>
/// 上傳中
/// </summary>
Uploading,
/// <summary>
/// 錯誤
/// </summary>
Error
}
/// <summary>
/// 上傳狀態引數
/// </summary>
public class UploadStateArg
{
/// <summary>
/// 上傳狀態
/// </summary>
public UploadState State { get; set; }
/// <summary>
/// 上傳的檔名
/// </summary>
public string ProgressFile { get; set; }
/// <summary>
/// 處理的訊息,如果出錯,這裡是錯誤訊息
/// </summary>
public string Message { get; set; }
/// <summary>
/// 處理進度(百分比)
/// </summary>
public int ProcessValue { get; set; }
/// <summary>
/// 總體處理進度(百分比)
/// </summary>
public int TotalProcessValue { get; set; }
}
如果你不清楚如何使用MSF來實現本文的功能,請先閱讀下面的文章:
“一切都是訊息”--MSF(訊息服務框架)入門簡介
建議你讀完相關的其它兩篇文章:
“一切都是訊息”--MSF(訊息服務框架)之【請求-響應】模式
“一切都是訊息”--MSF(訊息服務框架)之【釋出-訂閱】模式
讀完後,建議你再讀讀MSF的理論總結:
分散式系統的訊息&服務模式簡單總結
有關訊息服務框架(MSF)更多的討論,請加我們QQ群討論,群號:18215717 ,加群口令:訊息服務框架