1. 程式人生 > 程式設計 >c# 使用Task實現非阻塞式的I/O操作

c# 使用Task實現非阻塞式的I/O操作

  在前面的《基於任務的非同步程式設計模式(TAP)》文章中講述了.net 4.5框架下的非同步操作自我實現方式,實際上,在.net 4.5中部分類已實現了非同步封裝。如在.net 4.5中,Stream類加入了Async方法,所以基於流的通訊方式都可以實現非同步操作。

1、非同步讀取檔案資料

public static void TaskFromIOStreamAsync(string fileName)
{
  int chunkSize = 4096;
  byte[] buffer = new byte[chunkSize];

  FileStream fileStream = new FileStream(fileName,FileMode.Open,FileAccess.Read,FileShare.Read,chunkSize,true);

  Task<int> task = fileStream.ReadAsync(buffer,buffer.Length);
  task.ContinueWith((readTask) =>
  {
    int amountRead = readTask.Result;
    //必須在ContinueWith中釋放檔案流 
    fileStream.Dispose();
    Console.WriteLine($"Async(Simple) Read {amountRead} bytes");
  });
}

  上述程式碼中,非同步讀取資料只讀取了一次,完成讀取後就將執行權交還主執行緒了。但在真實場景中,需要從流中讀取多次才能獲得全部的資料(如檔案資料大於給定緩衝區大小,或處理來自網路流的資料(資料還沒全部到達機器))。因此,為了完成非同步讀取操作,需要連續從流中讀取資料,直到獲取所需全部資料。

  上述問題導致需要兩級Task來處理。外層的Task用於全部的讀取工作,供呼叫程式使用。內層的Task用於每次的讀取操作。

  第一次非同步讀取會返回一個Task。如果直接返回呼叫Wait或者ContinueWith的地方,會在第一次讀取結束後繼續向下執行。實際上是希望呼叫者在完成全部讀取操作後才執行。因此,不能把第一個Task釋出會給呼叫者,需要一個“偽Task”在完成全部讀取操作後再返回。

  上述問題需要使用到TaskCompletionSource<T>類解決,該類可以生成一個用於返回的“偽Task”。當非同步讀取操作全部完成後,呼叫其物件的TrySetResult,讓Wait或ContinueWith的呼叫者繼續執行。

public static Task<long> AsynchronousRead(string fileName)
{
  int chunkSize = 4096;
  byte[] buffer = new byte[chunkSize];
  //建立一個返回的偽Task物件
  TaskCompletionSource<long> tcs = new TaskCompletionSource<long>();

  MemoryStream fileContents = new MemoryStream();//用於儲存讀取的內容
  FileStream fileStream = new FileStream(fileName,true);
  fileContents.Capacity += chunkSize;//指定緩衝區大小。好像Capacity會自動增長,設定與否沒關係,後續寫入多少資料,就增長多少

  Task<int> task = fileStream.ReadAsync(buffer,buffer.Length);
  task.ContinueWith(readTask => ContinueRead(readTask,fileStream,fileContents,buffer,tcs));
  //在ContinueWith中迴圈讀取,讀取完成後,再返回tcs的Task
  return tcs.Task;
}

/// <summary>
/// 繼續讀取資料
/// </summary>
/// <param name="task">讀取資料的執行緒</param>
/// <param name="fileStream">檔案流</param>
/// <param name="fileContents">檔案存放位置</param>
/// <param name="buffer">讀取資料快取</param>
/// <param name="tcs">偽Task物件</param>
private static void ContinueRead(Task<int> task,FileStream fileStream,MemoryStream fileContents,byte[] buffer,TaskCompletionSource<long> tcs)
{
  if (task.IsCompleted)
  {
    int bytesRead = task.Result;
    fileContents.Write(buffer,bytesRead);//寫入記憶體區域。似乎Capacity會自動增長
    if (bytesRead > 0)
    {
      //雖然看似是一個新的任務,但是使用了ContinueWith,所以使用的是同一個執行緒。
      //沒有讀取完,開啟另一個非同步繼續讀取
      Task<int> newTask = fileStream.ReadAsync(buffer,buffer.Length);
      //此處做了一個迴圈
      newTask.ContinueWith(readTask => ContinueRead(readTask,tcs));
    }
    else
    {
      //已經全部讀取完,所以需要返回資料
      tcs.TrySetResult(fileContents.Length);
      fileStream.Dispose();
      fileContents.Dispose();//應該是在使用了資料之後才釋放資料緩衝區的資料
    }
  }
}

2、適應Task的非同步程式設計模式

  .NET Framework中的舊版非同步方法都帶有“Begin-”和“End-”字首。這些方法仍然有效,為了介面的一致性,它們可以被封裝到Task中。

  FromAsyn方法把流的BeginRead和EndRead方法作為引數,再加上存放資料的緩衝區。BeginRead和EndRead方法會執行,並在EndRead完成後呼叫Continuation Task,把控制權交回主程式碼。上述例子會關閉流並返回轉換的資料

const int ReadSize = 256;

/// <summary>
/// 從檔案中獲取字串
/// </summary>
/// <param name="path">檔案路徑</param>
/// <returns>字串</returns>
public static Task<string> GetStringFromFile(string path)
{
  FileInfo file = new FileInfo(path);
  byte[] buffer = new byte[1024];//存放資料的緩衝區

  FileStream fileStream = new FileStream(
    path,FileShare.None,buffer.Length,FileOptions.DeleteOnClose | FileOptions.Asynchronous);

  Task<int> task = Task<int>.Factory.FromAsync(fileStream.BeginRead,fileStream.EndRead,ReadSize,null);//此引數為BeginRead需要的引數

  TaskCompletionSource<string> tcs = new TaskCompletionSource<string>();

  task.ContinueWith(taskRead => OnReadBuffer(taskRead,tcs));

  return tcs.Task;
}

/// <summary>
/// 讀取資料
/// </summary>
/// <param name="taskRead">讀取任務</param>
/// <param name="fileStream">檔案流</param>
/// <param name="buffer">讀取資料存放位置</param>
/// <param name="offset">讀取偏移量</param>
/// <param name="tcs">偽Task</param>
private static void OnReadBuffer(Task<int> taskRead,int offset,TaskCompletionSource<string> tcs)
{
  int readLength = taskRead.Result;
  if (readLength > 0)
  {
    int newOffset = offset + readLength;
    Task<int> task = Task<int>.Factory.FromAsync(fileStream.BeginRead,newOffset,Math.Min(buffer.Length - newOffset,ReadSize),null);

    task.ContinueWith(callBackTask => OnReadBuffer(callBackTask,tcs));
  }
  else
  {
    tcs.TrySetResult(System.Text.Encoding.UTF8.GetString(buffer,buffer.Length));
    fileStream.Dispose();
  }
}

3、使用async 和 await方式讀取資料

  下面的示例中,使用了async和await關鍵字實現非同步讀取一個檔案的同時進行壓縮並寫入另一個檔案。所有位於await關鍵字之前的操作都運行於呼叫者執行緒,從await開始的操作都是在Continuation Task中執行。但有無法使用這兩個關鍵字的場合:①Task的結束時機不明確時;②必須用到多級Task和TaskCompletionSource時

/// <summary>
/// 同步方法的壓縮
/// </summary>
/// <param name="lstFiles">檔案清單</param>
public static void SyncCompress(IEnumerable<string> lstFiles)
{
  byte[] buffer = new byte[16384];
  foreach(string file in lstFiles)
  {
    using (FileStream inputStream = File.OpenRead(file))
    {
      using (FileStream outputStream = File.OpenWrite(file + ".compressed"))
      {
        using (System.IO.Compression.GZipStream compressStream = new System.IO.Compression.GZipStream(outputStream,System.IO.Compression.CompressionMode.Compress))
        {
          int read = 0;
          while((read=inputStream.Read(buffer,buffer.Length))>0)
          {
            compressStream.Write(buffer,read);
          }
        }
      }
    }
  }
}

/// <summary>
/// 非同步方法的檔案壓縮
/// </summary>
/// <param name="lstFiles">需要壓縮的檔案</param>
/// <returns></returns>
public static async Task AsyncCompress(IEnumerable<string> lstFiles)
{
  byte[] buffer = new byte[16384];
  foreach(string file in lstFiles)
  {
    using (FileStream inputStream = File.OpenRead(file))
    {
      using (FileStream outputStream = File.OpenWrite(file + ".compressed"))
      {
        using (System.IO.Compression.GZipStream compressStream = new System.IO.Compression.GZipStream(outputStream,System.IO.Compression.CompressionMode.Compress))
        {
          int read = 0;
          while ((read = await inputStream.ReadAsync(buffer,buffer.Length)) > 0)
          {
            await compressStream.WriteAsync(buffer,read);
          }
        }
      }
    }
  }
}

以上就是c# 使用Task實現非阻塞式的I/O操作的詳細內容,更多關於c# 實現非阻塞式的I/O操作的資料請關注我們其它相關文章!