批量插入或更新數據
/// <summary>
/// 大數據單表批量插入,帶事務
/// </summary>
/// <param name="keepID"></param>
/// <param name="tableName"></param>
/// <param name="dt"></param>
/// <param name="connectionString"></param>
/// <returns></returns>
public static bool MsSqlBulkCopyInsert(bool keepID, string tableName, DataTable dt, ref string msg, string connectionString = null)
{
if (dt == null || dt.Rows.Count <= 0)
{
msg = "找不到數據";
return true;
}
using (SqlConnection conn = string.IsNullOrWhiteSpace(connectionString) ? new SqlConnection(MSSQLHelper.connectionString) : new SqlConnection(connectionString))
{
conn.Open();
using (SqlTransaction trans = conn.BeginTransaction())
{
try
{
using (SqlBulkCopy sbc = new SqlBulkCopy(conn, (keepID ? SqlBulkCopyOptions.KeepIdentity : SqlBulkCopyOptions.Default) | SqlBulkCopyOptions.FireTriggers, trans))
{
sbc.BatchSize = dt.Rows.Count; //每一批次中的行數
//Bulkcopy 無法判斷數據是否已經存在,如果在bulkcopy時發生數據庫返回錯誤,此次的bulkcopy操作會rollback。建議在數據庫中添加臨時表,bulkcopy的destination設置為臨時表,bulkcopy完成後運行從臨時表到真正表的插入操作,篩選duplicate數據並刪除
sbc.DestinationTableName = string.Concat("[", tableName + "]");
sbc.BulkCopyTimeout = 300;//超時之前操作完成所允許的秒數,大批量數量需要的時長5分鐘。 報錯:“超時時間已到。在操作完成之前超時時間已過或服務器未響應
foreach (DataColumn column in dt.Columns)
{
sbc.ColumnMappings.Add(column.ColumnName, column.ColumnName);
}
sbc.WriteToServer(dt);
trans.Commit();
return true;
}
}
catch (Exception err)
{
trans.Rollback();//回滾事務
msg = err.Message;
}
}
}
return false;
}
/// <summary>
/// 大數據多表批量插入,帶事務
/// </summary>
/// <param name="keepID"></param>
/// <param name="tableName"></param>
/// <param name="dt"></param>
/// <param name="connectionString"></param>
/// <returns></returns>
public static bool MsSqlBulkCopyManyInsert(bool keepID, Dictionary<string, DataTable> dic, ref string msg, string connectionString = null)
{
if (dic == null || dic.Count <= 0)
{
msg = "找不到數據";
return true;
}
using (SqlConnection conn = string.IsNullOrWhiteSpace(connectionString) ? new SqlConnection(MSSQLHelper.connectionString) : new SqlConnection(connectionString))
{
conn.Open();
using (SqlTransaction trans = conn.BeginTransaction())
{
try
{
using (SqlBulkCopy sbc = new SqlBulkCopy(conn, (keepID ? SqlBulkCopyOptions.KeepIdentity : SqlBulkCopyOptions.Default) | SqlBulkCopyOptions.FireTriggers, trans))
{
foreach (var item in dic)
{
string tableName = item.Key;
DataTable dt = item.Value;
if (dt == null || dt.Rows.Count <= 0)
{
continue;
}
sbc.BatchSize = dt.Rows.Count; //每一批次中的行數
//Bulkcopy 無法判斷數據是否已經存在,如果在bulkcopy時發生數據庫返回錯誤,此次的bulkcopy操作會rollback。建議在數據庫中添加臨時表,bulkcopy的destination設置為臨時表,bulkcopy完成後運行從臨時表到真正表的插入操作,篩選duplicate數據並刪除
sbc.DestinationTableName = string.Concat("[", tableName + "]");
sbc.BulkCopyTimeout = 300;//超時之前操作完成所允許的秒數,大批量數量需要的時長5分鐘。 報錯:“超時時間已到。在操作完成之前超時時間已過或服務器未響應
sbc.ColumnMappings.Clear();
foreach (DataColumn column in dt.Columns)
{
sbc.ColumnMappings.Add(column.ColumnName, column.ColumnName);
}
sbc.WriteToServer(dt);
}
trans.Commit();
return true;
}
}
catch (Exception err)
{
trans.Rollback();//回滾事務
msg = err.Message;
return false;
}
}
}
}
/// <summary>
/// 大數據批量更新
/// </summary>
/// <param name="table"></param>
/// <param name="tableName"></param>
/// <param name="limitWhere"></param>
/// <param name="onceUpdateNumber"></param>
/// <param name="connectionString"></param>
/// <returns></returns>
public static int MsSqlUpdate(DataTable table, string tableName, string limitWhere, int onceUpdateNumber = 5000, string connectionString = null)
{
int result = -1;
SqlConnection conn = new SqlConnection(string.IsNullOrWhiteSpace(connectionString) ? MSSQLHelper.connectionString :connectionString);
SqlCommand comm = conn.CreateCommand();
comm.CommandType = CommandType.Text;
SqlDataAdapter adapter = new SqlDataAdapter(comm);
SqlCommandBuilder commandBulider = new SqlCommandBuilder(adapter);
commandBulider.ConflictOption = ConflictOption.OverwriteChanges;
try
{
conn.Open();
//設置批量更新的每次處理條數
adapter.UpdateBatchSize = table.Rows.Count;
adapter.SelectCommand.Transaction = conn.BeginTransaction();
adapter.SelectCommand.CommandText = string.Format("select * from {0} where {1}", string.IsNullOrWhiteSpace(tableName) ? table.TableName : tableName, limitWhere);
result = adapter.Update(table);
adapter.SelectCommand.Transaction.Commit();/////提交事務
}
catch (Exception ex)
{
if (adapter.SelectCommand != null && adapter.SelectCommand.Transaction != null)
{
adapter.SelectCommand.Transaction.Rollback();
}
throw ex;
}
finally
{
conn.Close();
conn.Dispose();
}
return result;
}
批量插入或更新數據