多執行緒 取資料必須不重複的方案
阿新 • • 發佈:2018-11-16
最近一直在處理資料上傳和採集的問題, 因為寫在asp.net 裡面的web服務預設就是多執行緒的, 一個請求就是一個執行緒… 所以多執行緒之間為了不讀取重複的資料, 就成了問題.
資料必須嚴格不重複, 同樣的資料絕對不能處理2次…
多執行緒就更不能出現重複讀取的現象了.
自己現在也用的是另外一套非常蹩腳的方法, . 把資料取出來然後在記憶體裡面通過lock(object)的形式實現資料不重複處理的辦法. 但是這又牽扯到資料的取出和更新, 也比較麻煩. 雖然實現了, 但是後續的修改和變更邏輯極其複雜.
後來再百度上又看了一遍找到了我認為最完美的方法,其它的我感覺都不怎麼優美.
文章是下面這個.
找到了一種專門針對sqlserver的. 可以通過先更新同時通過deleted表(就像是在觸發器中使用一樣)取出的方式,來保證每條記錄只會被讀取一次。
declare @Rowid table(rowid int);
BEGIN
set rowcount 100; --一次讀取的行數
--先將要讀取的記錄狀態更新
update Sms set [status]= 1 output deleted.ID into @Rowid Where [status] = 0;
--讀取剛更新狀態的記錄
select * from Sms where ID in (select Rowid from @Rowid);
END
但是這種只是針對sqlserver的, 所以在這個的基礎上, 我設計改進了另外一種通用的方法.
同樣是發簡訊為例,
邏輯過程如下
1. 開啟事務, 保證update語句互不影響,
2. update top 100 Sms set status=@ThreadId where status = 0 ;
3. select * from Sms where status = @ThreadId;
4. 提交事務
如果不想影響status的狀態, 可以改成
1. 開啟事務, 保證update語句互不影響,
2. update top 100 Sms set processer = @ThreadId where status = 0 ;
3. select * from Sms where status = 0 and processer = @ThreadId;
4. 提交事務
這樣就可以在這個請求中, 確保取到的資料,沒有被其它執行緒取到. 因為每個執行緒的ThreadId肯定是不一樣的.
當然這個邏輯也可以升級一下把ThreadId 改成其它的某個有規則的能夠區分不同的任務的編號, 如果是分散式任務, 可以考慮前面再加個機器號.
或者把這個@ThreadId改成 orcale中的 sequence
如果是多執行緒只有一個程式在執行的話, 可以把這個數值通過單列模式在靜態變數裡面取資料.
每次任務執行前取一個 任務ID 當作@ThreadId.
我實際在用的程式碼
BBZQ表中加了3個欄位
欄位名 | 型別 | 說明 |
---|---|---|
W_JOBID | string | 任務ID,主要用它來分割不同的任務 |
W_PROCESSTIME | date | 處理時間, 主要用在處理失敗的或者未處理的, 超時10分鐘後會強行再次被獲取 |
ISUPLOAD | int | 是否已上傳,上傳成功後會更新此欄位 |
private static int _JobId = 0;
public string GetJobId()
{
lock (SynCacheObject)
{
_JobId = _JobId + 1;
return "JOB"+DateTime.Now.ToString("yyyyMMddHHmmssfff_")+_JobId;
}
}
public List<DataChangeLog> GetCHGAndWSWJobs()
{
//ReturnLogs rlog = new ReturnLogs();
var jobid = GetJobId(); //這裡通過lock 鎖定取得唯一的編號
//AND(W_JOBID is null OR W_PROCESSTIME < SYSDATE - 10 / 1440)-- 超時10分鐘處理失敗的或者未處理的的也會強行再次被獲取,因為處理的部分有防止重複執行的功能, 所以可以重複執行
//order by EXECUTEDATE asc 如果不按照事件發生時間排序,反稽核之後又稽核的的資料可能會被刪掉
//with temptable as (**) 臨時表的寫法是因為orcale 在 update 語句中 子查詢不支援orderby 所以用了臨時表.
var sql = @"update BBZQ set W_JOBID = :JobId, W_PROCESSTIME=SYSDATE where JGID=2 AND XH in (
with temptable as(
select XH
from BBZQ
where ROWNUM < 200
AND ( W_JOBID is null OR W_PROCESSTIME < SYSDATE - 10/1440 )
AND ISUPLOAD = 0
AND EXECUTEDATE > SYSDATE - 30
AND STATUS in(15, 16)
AND JGID=2
order by EXECUTEDATE asc
)
select XH from temptable
)";
var i = DB.Execute(sql, new { JobId = jobid });
if (i==0) //如果沒有更新到資料也就直接返回了, 無需再次查詢
{
return null;
}
sql = @" SELECT *
FROM BBZQ
WHERE W_JOBID = :JobId
ORDER BY EXECUTEDATE asc";
// throw new Exception(sql);
return DB.Query<DataChangeLog>(sql, new { JobId = jobid }).ToList();
}