1. 程式人生 > 資料庫 >SQL Server 批量插入資料的完美解決方案

SQL Server 批量插入資料的完美解決方案

一、Sql Server插入方案介紹

關於 SqlServer 批量插入的方式,有三種比較常用的插入方式,InsertBatchInsertSqlBulkCopy,下面我們對比以下三種方案的速度

1.普通的Insert插入方法

public static void Insert(IEnumerable<Person> persons)
{
  using (var con = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;"))
  {
    con.Open();
    foreach (var person in persons)
    {
      using (var com = new SqlCommand(
        "INSERT INTO dbo.Person(Id,Name,Age,CreateTime,Sex)VALUES(@Id,@Name,@Age,@CreateTime,@Sex)",con))
      {
        com.Parameters.AddRange(new[]
        {
          new SqlParameter("@Id",SqlDbType.BigInt) {Value = person.Id},new SqlParameter("@Name",SqlDbType.VarChar,64) {Value = person.Name},new SqlParameter("@Age",SqlDbType.Int) {Value = person.Age},new SqlParameter("@CreateTime",SqlDbType.DateTime)
            {Value = person.CreateTime ?? (object) DBNull.Value},new SqlParameter("@Sex",SqlDbType.Int) {Value = (int)person.Sex},});
        com.ExecuteNonQuery();
      }
    }
  }
}

2.拼接BatchInsert插入語句

public static void BatchInsert(Person[] persons)
{
  using (var con = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;"))
  {
    con.Open();
    var pageCount = (persons.Length - 1) / 1000 + 1;
    for (int i = 0; i < pageCount; i++)
    {
      var personList = persons.Skip(i * 1000).Take(1000).ToArray();
      var values = personList.Select(p =>
        $"({p.Id},'{p.Name}',{p.Age},{(p.CreateTime.HasValue ? $"'{p.CreateTime:yyyy-MM-dd HH:mm:ss}'" : "NULL")},{(int) p.Sex})");
      var insertSql =
        $"INSERT INTO dbo.Person(Id,Sex)VALUES{string.Join(",",values)}";
      using (var com = new SqlCommand(insertSql,con))
      {
        com.ExecuteNonQuery();
      }
    }
  }
}

3.SqlBulkCopy插入方案

public static void BulkCopy(IEnumerable<Person> persons)
{
  using (var con = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;"))
  {
    con.Open();
    var table = new DataTable();
    table.Columns.AddRange(new []
    {
      new DataColumn("Id",typeof(long)),new DataColumn("Name",typeof(string)),new DataColumn("Age",typeof(int)),new DataColumn("CreateTime",typeof(DateTime)),new DataColumn("Sex",});
    foreach (var p in persons)
    {
      table.Rows.Add(new object[] {p.Id,p.Name,p.Age,p.CreateTime,(int) p.Sex});
    }

    using (var copy = new SqlBulkCopy(con))
    {
      copy.DestinationTableName = "Person";
      copy.WriteToServer(table);
    }
  }
}

3.三種方案速度對比

方案 數量 時間
Insert 1千條 145.4351ms
BatchInsert 1千條 103.9061ms
SqlBulkCopy 1千條 7.021ms
Insert 1萬條 1501.326ms
BatchInsert 1萬條 850.6274ms
SqlBulkCopy 1萬條 30.5129ms
Insert 10萬條 13875.4934ms
BatchInsert 10萬條 8278.9056ms
SqlBulkCopy 10萬條 314.8402ms

兩者插入效率對比,Insert明顯比SqlBulkCopy要慢太多,大概20~40倍效能差距,下面我們將SqlBulkCopy封裝一下,讓批量插入更加方便

二、SqlBulkCopy封裝程式碼

1.方法介紹

批量插入擴充套件方法簽名

方法 方法引數 介紹
BulkCopy 同步的批量插入方法
SqlConnection connection sql server 連線物件
IEnumerable<T> source 需要批量插入的資料來源
string tableName = null 插入表名稱【為NULL預設為實體名稱】
int bulkCopyTimeout = 30 批量插入超時時間
int batchSize = 0 寫入資料庫一批數量【如果為0代表全部一次性插入】最合適數量【這取決於您的環境,尤其是行數和網路延遲。就個人而言,我將從BatchSize屬性設定為1000行開始,然後看看其效能如何。如果可行,那麼我將使行數加倍(例如增加到2000、4000等),直到效能下降或超時。否則,如果超時發生在1000,那麼我將行數減少一半(例如500),直到它起作用為止。】
SqlBulkCopyOptions options = SqlBulkCopyOptions.Default 批量複製引數
SqlTransaction externalTransaction = null 執行的事務物件
BulkCopyAsync 非同步的批量插入方法
SqlConnection connection sql server 連線物件
IEnumerable<T> source 需要批量插入的資料來源
string tableName = null 插入表名稱【為NULL預設為實體名稱】
int bulkCopyTimeout = 30 批量插入超時時間
int batchSize = 0 寫入資料庫一批數量【如果為0代表全部一次性插入】最合適數量【這取決於您的環境,尤其是行數和網路延遲。就個人而言,我將從BatchSize屬性設定為1000行開始,然後看看其效能如何。如果可行,那麼我將使行數加倍(例如增加到2000、4000等),直到效能下降或超時。否則,如果超時發生在1000,那麼我將行數減少一半(例如500),直到它起作用為止。】
SqlBulkCopyOptions options = SqlBulkCopyOptions.Default 批量複製引數
SqlTransaction externalTransaction = null 執行的事務物件

這個方法主要解決了兩個問題:

  • 免去了手動構建DataTable或者IDataReader介面實現類,手動構建的轉換比較難以維護,如果修改欄位就得把這些地方都進行修改,特別是還需要將列舉型別特殊處理,轉換成他的基礎型別(預設int
  • 不用親自建立SqlBulkCopy物件,和配置資料庫列的對映,和一些屬性的配置

此方案也是在我公司中使用,以滿足公司的批量插入資料的需求,例如第三方的對賬資料此方法使用的是Expression動態生成資料轉換函式,其效率和手寫的原生程式碼差不多,和原生手寫程式碼相比,多餘的轉換損失很小【最大的效能損失都是在值型別拆裝箱上】

此方案和其他網上的方案有些不同的是:不是將List先轉換成DataTable,然後寫入SqlBulkCopy的,而是使用一個實現IDataReader的讀取器包裝List,每往SqlBulkCopy插入一行資料才會轉換一行資料

IDataReader方案和DataTable方案相比優點

效率高:DataTable方案需要先完全轉換後,才能交由SqlBulkCopy寫入資料庫,而IDataReader方案可以邊轉換邊交給SqlBulkCopy寫入資料庫(例如:10萬資料插入速度可提升30%)

佔用記憶體少:DataTable方案需要先完全轉換後,才能交由SqlBulkCopy寫入資料庫,需要佔用大量記憶體,而IDataReader方案可以邊轉換邊交給SqlBulkCopy寫入資料庫,無須佔用過多記憶體

強大:因為是邊寫入邊轉換,而且EnumerableReader傳入的是一個迭代器,可以實現持續插入資料的效果

2.實現原理

① 實體Model與表對映

資料庫表程式碼

CREATE TABLE [dbo].[Person](
	[Id] [BIGINT] NOT NULL,[Name] [VARCHAR](64) NOT NULL,[Age] [INT] NOT NULL,[CreateTime] [DATETIME] NULL,[Sex] [INT] NOT NULL,PRIMARY KEY CLUSTERED 
(
	[Id] ASC
)WITH (PAD_INDEX = OFF,STATISTICS_NORECOMPUTE = OFF,IGNORE_DUP_KEY = OFF,ALLOW_ROW_LOCKS = ON,ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]

實體類程式碼

public class Person
{
  public long Id { get; set; }
  public string Name { get; set; }
  public int Age { get; set; }
  public DateTime? CreateTime { get; set; }
  public Gender Sex { get; set; }
}

public enum Gender
{
  Man = 0,Woman = 1
}

  • 建立欄位對映【如果沒有此欄位對映會導致資料填錯位置,如果型別不對還會導致報錯】【因為:沒有此欄位對映預設是按照列序號對應插入的】
  • 建立對映使用的SqlBulkCopy型別的ColumnMappings屬性來完成,資料列與資料庫中列的對映
//建立批量插入物件
using (var copy = new SqlBulkCopy(connection,options,externalTransaction))
{
  foreach (var column in ModelToDataTable<TModel>.Columns)
  {
    //建立欄位對映
    copy.ColumnMappings.Add(column.ColumnName,column.ColumnName);
  }
}

② 實體轉換成資料行

將資料轉換成資料行採用的是:反射+Expression來完成

其中反射是用於獲取編寫Expression所需程式類,屬性等資訊

其中Expression是用於生成高效轉換函式其中ModelToDataTable<TModel>型別利用了靜態泛型類特性,實現泛型引數的快取效果

ModelToDataTable<TModel>的靜態建構函式中,生成轉換函式,獲取需要轉換的屬性資訊,並存入靜態只讀欄位中,完成快取

③ 使用IDataReader插入資料的過載

EnumerableReader是實現了IDataReader介面的讀取類,用於將模型物件,在迭代器中讀取出來,並轉換成資料行,可供SqlBulkCopy讀取

SqlBulkCopy只會呼叫三個方法:GetOrdinalReadGetValue

  • 其中GetOrdinal只會在首行讀取每個列所代表序號【需要填寫:SqlBulkCopy型別的ColumnMappings屬性】
  • 其中Read方法是迭代到下一行,並呼叫ModelToDataTable<TModel>.ToRowData.Invoke()來將模型物件轉換成資料行object[]
  • 其中GetValue方法是獲取當前行指定下標位置的值

3.完整程式碼

擴充套件方法類

 public static class SqlConnectionExtension
  {
    /// <summary>
    /// 批量複製
    /// </summary>
    /// <typeparam name="TModel">插入的模型物件</typeparam>
    /// <param name="source">需要批量插入的資料來源</param>
    /// <param name="connection">資料庫連線物件</param>
    /// <param name="tableName">插入表名稱【為NULL預設為實體名稱】</param>
    /// <param name="bulkCopyTimeout">插入超時時間</param>
    /// <param name="batchSize">寫入資料庫一批數量【如果為0代表全部一次性插入】最合適數量【這取決於您的環境,尤其是行數和網路延遲。就個人而言,我將從BatchSize屬性設定為1000行開始,然後看看其效能如何。如果可行,那麼我將使行數加倍(例如增加到2000、4000等),直到效能下降或超時。否則,如果超時發生在1000,那麼我將行數減少一半(例如500),直到它起作用為止。】</param>
    /// <param name="options">批量複製引數</param>
    /// <param name="externalTransaction">執行的事務物件</param>
    /// <returns>插入數量</returns>
    public static int BulkCopy<TModel>(this SqlConnection connection,IEnumerable<TModel> source,string tableName = null,int bulkCopyTimeout = 30,int batchSize = 0,SqlBulkCopyOptions options = SqlBulkCopyOptions.Default,SqlTransaction externalTransaction = null)
    {
      //建立讀取器
      using (var reader = new EnumerableReader<TModel>(source))
      {
        //建立批量插入物件
        using (var copy = new SqlBulkCopy(connection,externalTransaction))
        {
          //插入的表
          copy.DestinationTableName = tableName ?? typeof(TModel).Name;
          //寫入資料庫一批數量
          copy.BatchSize = batchSize;
          //超時時間
          copy.BulkCopyTimeout = bulkCopyTimeout;
          //建立欄位對映【如果沒有此欄位對映會導致資料填錯位置,如果型別不對還會導致報錯】【因為:沒有此欄位對映預設是按照列序號對應插入的】
          foreach (var column in ModelToDataTable<TModel>.Columns)
          {
            //建立欄位對映
            copy.ColumnMappings.Add(column.ColumnName,column.ColumnName);
          }
          //將資料批量寫入資料庫
          copy.WriteToServer(reader);
          //返回插入資料數量
          return reader.Depth;
        }
      }
    }

    /// <summary>
    /// 批量複製-非同步
    /// </summary>
    /// <typeparam name="TModel">插入的模型物件</typeparam>
    /// <param name="source">需要批量插入的資料來源</param>
    /// <param name="connection">資料庫連線物件</param>
    /// <param name="tableName">插入表名稱【為NULL預設為實體名稱】</param>
    /// <param name="bulkCopyTimeout">插入超時時間</param>
    /// <param name="batchSize">寫入資料庫一批數量【如果為0代表全部一次性插入】最合適數量【這取決於您的環境,尤其是行數和網路延遲。就個人而言,我將從BatchSize屬性設定為1000行開始,然後看看其效能如何。如果可行,那麼我將使行數加倍(例如增加到2000、4000等),直到效能下降或超時。否則,如果超時發生在1000,那麼我將行數減少一半(例如500),直到它起作用為止。】</param>
    /// <param name="options">批量複製引數</param>
    /// <param name="externalTransaction">執行的事務物件</param>
    /// <returns>插入數量</returns>
    public static async Task<int> BulkCopyAsync<TModel>(this SqlConnection connection,column.ColumnName);
          }
          //將資料批量寫入資料庫
          await copy.WriteToServerAsync(reader);
          //返回插入資料數量
          return reader.Depth;
        }
      }
    }
  }

封裝的迭代器資料讀取器

 /// <summary>
  /// 迭代器資料讀取器
  /// </summary>
  /// <typeparam name="TModel">模型型別</typeparam>
  public class EnumerableReader<TModel> : IDataReader
  {
    /// <summary>
    /// 例項化迭代器讀取物件
    /// </summary>
    /// <param name="source">模型源</param>
    public EnumerableReader(IEnumerable<TModel> source)
    {
      _source = source ?? throw new ArgumentNullException(nameof(source));
      _enumerable = source.GetEnumerator();
    }

    private readonly IEnumerable<TModel> _source;
    private readonly IEnumerator<TModel> _enumerable;
    private object[] _currentDataRow = Array.Empty<object>();
    private int _depth;
    private bool _release;

    public void Dispose()
    {
      _release = true;
      _enumerable.Dispose();
    }

    public int GetValues(object[] values)
    {
      if (values == null) throw new ArgumentNullException(nameof(values));
      var length = Math.Min(_currentDataRow.Length,values.Length);
      Array.Copy(_currentDataRow,values,length);
      return length;
    }

    public int GetOrdinal(string name)
    {
      for (int i = 0; i < ModelToDataTable<TModel>.Columns.Count; i++)
      {
        if (ModelToDataTable<TModel>.Columns[i].ColumnName == name) return i;
      }

      return -1;
    }

    public long GetBytes(int ordinal,long dataIndex,byte[] buffer,int bufferIndex,int length)
    {
      if (dataIndex < 0) throw new Exception($"起始下標不能小於0!");
      if (bufferIndex < 0) throw new Exception("目標緩衝區起始下標不能小於0!");
      if (length < 0) throw new Exception("讀取長度不能小於0!");
      var numArray = (byte[])GetValue(ordinal);
      if (buffer == null) return numArray.Length;
      if (buffer.Length <= bufferIndex) throw new Exception("目標緩衝區起始下標不能大於目標緩衝區範圍!");
      var freeLength = Math.Min(numArray.Length - bufferIndex,length);
      if (freeLength <= 0) return 0;
      Array.Copy(numArray,dataIndex,buffer,bufferIndex,length);
      return freeLength;
    }

    public long GetChars(int ordinal,char[] buffer,int length)
    {
      if (dataIndex < 0) throw new Exception($"起始下標不能小於0!");
      if (bufferIndex < 0) throw new Exception("目標緩衝區起始下標不能小於0!");
      if (length < 0) throw new Exception("讀取長度不能小於0!");
      var numArray = (char[])GetValue(ordinal);
      if (buffer == null) return numArray.Length;
      if (buffer.Length <= bufferIndex) throw new Exception("目標緩衝區起始下標不能大於目標緩衝區範圍!");
      var freeLength = Math.Min(numArray.Length - bufferIndex,length);
      return freeLength;
    }

    public bool IsDBNull(int i)
    {
      var value = GetValue(i);
      return value == null || value is DBNull;
    }
    public bool NextResult()
    {
      //移動到下一個元素
      if (!_enumerable.MoveNext()) return false;
      //行層+1
      Interlocked.Increment(ref _depth);
      //得到資料行
      _currentDataRow = ModelToDataTable<TModel>.ToRowData.Invoke(_enumerable.Current);
      return true;
    }

    public byte GetByte(int i) => (byte)GetValue(i);
    public string GetName(int i) => ModelToDataTable<TModel>.Columns[i].ColumnName;
    public string GetDataTypeName(int i) => ModelToDataTable<TModel>.Columns[i].DataType.Name;
    public Type GetFieldType(int i) => ModelToDataTable<TModel>.Columns[i].DataType;
    public object GetValue(int i) => _currentDataRow[i];
    public bool GetBoolean(int i) => (bool)GetValue(i);
    public char GetChar(int i) => (char)GetValue(i);
    public Guid GetGuid(int i) => (Guid)GetValue(i);
    public short GetInt16(int i) => (short)GetValue(i);
    public int GetInt32(int i) => (int)GetValue(i);
    public long GetInt64(int i) => (long)GetValue(i);
    public float GetFloat(int i) => (float)GetValue(i);
    public double GetDouble(int i) => (double)GetValue(i);
    public string GetString(int i) => (string)GetValue(i);
    public decimal GetDecimal(int i) => (decimal)GetValue(i);
    public DateTime GetDateTime(int i) => (DateTime)GetValue(i);
    public IDataReader GetData(int i) => throw new NotSupportedException();
    public int FieldCount => ModelToDataTable<TModel>.Columns.Count;
    public object this[int i] => GetValue(i);
    public object this[string name] => GetValue(GetOrdinal(name));
    public void Close() => Dispose();
    public DataTable GetSchemaTable() => ModelToDataTable<TModel>.ToDataTable(_source);
    public bool Read() => NextResult();
    public int Depth => _depth;
    public bool IsClosed => _release;
    public int RecordsAffected => 0;
  }

模型物件轉資料行工具類

/// <summary>
  /// 物件轉換成DataTable轉換類
  /// </summary>
  /// <typeparam name="TModel">泛型型別</typeparam>
  public static class ModelToDataTable<TModel>
  {
    static ModelToDataTable()
    {
      //如果需要剔除某些列可以修改這段程式碼
      var propertyList = typeof(TModel).GetProperties().Where(w => w.CanRead).ToArray();
      Columns = new ReadOnlyCollection<DataColumn>(propertyList
        .Select(pr => new DataColumn(pr.Name,GetDataType(pr.PropertyType))).ToArray());
      //生成物件轉資料行委託
      ToRowData = BuildToRowDataDelegation(typeof(TModel),propertyList);
    }

    /// <summary>
    /// 構建轉換成資料行委託
    /// </summary>
    /// <param name="type">傳入型別</param>
    /// <param name="propertyList">轉換的屬性</param>
    /// <returns>轉換資料行委託</returns>
    private static Func<TModel,object[]> BuildToRowDataDelegation(Type type,PropertyInfo[] propertyList)
    {
      var source = Expression.Parameter(type);
      var items = propertyList.Select(property => ConvertBindPropertyToData(source,property));
      var array = Expression.NewArrayInit(typeof(object),items);
      var lambda = Expression.Lambda<Func<TModel,object[]>>(array,source);
      return lambda.Compile();
    }

    /// <summary>
    /// 將屬性轉換成資料
    /// </summary>
    /// <param name="source">源變數</param>
    /// <param name="property">屬性資訊</param>
    /// <returns>獲取屬性資料表示式</returns>
    private static Expression ConvertBindPropertyToData(ParameterExpression source,PropertyInfo property)
    {
      var propertyType = property.PropertyType;
      var expression = (Expression)Expression.Property(source,property);
      if (propertyType.IsEnum)
        expression = Expression.Convert(expression,propertyType.GetEnumUnderlyingType());
      return Expression.Convert(expression,typeof(object));
    }

    /// <summary>
    /// 獲取資料型別
    /// </summary>
    /// <param name="type">屬性型別</param>
    /// <returns>資料型別</returns>
    private static Type GetDataType(Type type)
    {
      //列舉預設轉換成對應的值型別
      if (type.IsEnum)
        return type.GetEnumUnderlyingType();
      //可空型別
      if (type.IsGenericType && type.GetGenericTypeDefinition() == typeof(Nullable<>))
        return GetDataType(type.GetGenericArguments().First());
      return type;
    }

    /// <summary>
    /// 列集合
    /// </summary>
    public static IReadOnlyList<DataColumn> Columns { get; }

    /// <summary>
    /// 物件轉資料行委託
    /// </summary>
    public static Func<TModel,object[]> ToRowData { get; }

    /// <summary>
    /// 集合轉換成DataTable
    /// </summary>
    /// <param name="source">集合</param>
    /// <param name="tableName">表名稱</param>
    /// <returns>轉換完成的DataTable</returns>
    public static DataTable ToDataTable(IEnumerable<TModel> source,string tableName = "TempTable")
    {
      //建立表物件
      var table = new DataTable(tableName);
      //設定列
      foreach (var dataColumn in Columns)
      {
        table.Columns.Add(new DataColumn(dataColumn.ColumnName,dataColumn.DataType));
      }

      //迴圈轉換每一行資料
      foreach (var item in source)
      {
        table.Rows.Add(ToRowData.Invoke(item));
      }

      //返回表物件
      return table;
    }
  }

三、測試封裝程式碼

1.測試程式碼

創表程式碼

CREATE TABLE [dbo].[Person](
	[Id] [BIGINT] NOT NULL,ALLOW_PAGE_LOCKS = ON) ON [PRIMARY]
) ON [PRIMARY]

實體類程式碼

定義的實體的屬性名稱需要和SqlServer列名稱型別對應

public class Person
{
  public long Id { get; set; }
  public string Name { get; set; }
  public int Age { get; set; }
  public DateTime? CreateTime { get; set; }
  public Gender Sex { get; set; }
}

public enum Gender
{
  Man = 0,Woman = 1
}

測試方法

//生成10萬條資料
var persons = new Person[100000];
var random = new Random();
for (int i = 0; i < persons.Length; i++)
{
  persons[i] = new Person
  {
    Id = i + 1,Name = "張三" + i,Age = random.Next(1,128),Sex = (Gender)random.Next(2),CreateTime = random.Next(2) == 0 ? null : (DateTime?) DateTime.Now.AddSeconds(i)
  };
}

//建立資料庫連線
using (var conn = new SqlConnection("Server=.;Database=DemoDataBase;User ID=sa;Password=8888;"))
{
  conn.Open();
  var sw = Stopwatch.StartNew();
  //批量插入資料
  var qty = conn.BulkCopy(persons);
  sw.Stop();
  Console.WriteLine(sw.Elapsed.TotalMilliseconds + "ms");
}

執行批量插入結果

226.4767ms
請按任意鍵繼續. . .

SQL Server 批量插入資料的完美解決方案

四、程式碼下載

GitHub程式碼地址:https://github.com/liu-zhen-liang/PackagingComponentsSet/tree/main/SqlBulkCopyComponents