程式設計師過關斬將--自定義執行緒池來實現文件轉碼
阿新 • • 發佈:2020-01-11
背景
我司在很久之前,一位很久之前的同事寫過一個文件轉圖片的服務,具體業務如下:
- 使用者在客戶端上傳文件,可以是ppt,word,pdf 等格式,使用者上傳完成可以在客戶端預覽上傳的文件,預覽的時候採用的是圖片形式(不要和我說用別的方式預覽,現在已經來不及了)
- 當用戶把文件上傳到雲端之後(阿里雲),把文件相關的資訊記錄在資料庫,然後等待轉碼完成
- 伺服器有一個轉碼服務(其實就是一個windows service)不停的在輪訓待轉碼的資料,如果有待轉碼的資料,則從資料庫取出來,然後根據文件的網路地址下載到本地進行轉碼(轉成多張圖片)
- 當文件轉碼完畢,把轉碼出來的圖片上傳到雲端,並把雲端圖片的資訊記錄到資料庫
- 客戶端有預覽需求的時候,根據資料庫來判斷有沒有轉碼成功,如果成功,則獲取資料來顯示。
文件預覽的整體過程如以上所說,老的轉碼服務現在什麼問題呢?
- 由於一個文件同時只能被一個執行緒進行轉碼操作,所以老的服務採用了把待轉碼資料劃分管道的思想,一共有六個管道,對映到資料庫大體就是 Id=》管道ID 這個樣子。
- 一個控制檯程式,根據配置檔案資訊,讀取某一個管道待轉碼的文件,然後單執行緒進行轉碼操作
- 一共有六個管道,所以伺服器上起了六個cmd的黑視窗……
- 有的時候個別文件由於格式問題或者其他問題 轉碼過程中會卡住,具體的表現為:停止了轉碼操作。
- 如果程式卡住了,需要運維人員重新啟動轉碼cmd視窗(這種維護比較蛋疼)
後來機緣巧合,這個程式的維護落到的菜菜頭上,維護了一週左右,大約重啟了10多次,終於忍受不了了,重新搞一個吧。仔細分析過後,刨除實際文件轉碼的核心操作之外,整個轉碼流程其實還有很多注意點
- 需要保證轉碼服務不被卡住,如果和以前一樣就沒有必要重新設計了
- 儘量避免開多個程序的方式,其實在這個業務場景下,多個程序和多個執行緒作用是一致的。
- 每個文件只能被轉碼一次,如果一個文件被轉碼多次,不僅浪費了伺服器資源,而且還有可能會有資料不一致的情況發生
- 轉碼失敗的文件需要有一定次數的重試,因為一次失敗不代表第二次失敗,所以一定要給失敗的文件再次被操作的機會
- 因為程式不停的把文件轉碼成本地圖片,所以需要保證這些檔案在轉碼完成在伺服器上刪除,不然的話,時間長了會生成很多無用的檔案
說了這麼多,其實需要注意的點還是很多的。以整個的轉碼流程來說,本質上是一個任務池的生產和消費問題,任務池中的任務就是待轉碼的文件,生產者不停的把待轉碼文件丟進任務池,消費者不停的把任務池中文件轉碼完成。
執行緒池
這很顯然和執行緒池很類似,菜菜之前就寫過一個執行緒池的文章,有興趣的同學可以去翻翻歷史。今天我們就以這個執行緒池來解決這個轉碼問題。執行緒池的本質是初始化一定數目的執行緒,不停的執行任務。
//執行緒池定義
public class LXThreadPool:IDisposable
{
bool PoolEnable = true; //執行緒池是否可用
List<Thread> ThreadContainer = null; //執行緒的容器
ConcurrentQueue<ActionData> JobContainer = null; //任務的容器
int _maxJobNumber; //執行緒池最大job容量
ConcurrentDictionary<string, DateTime> JobIdList = new ConcurrentDictionary<string, DateTime>(); //job的副本,用於排除某個job 是否在執行中
public LXThreadPool(int threadNumber,int maxJobNumber=1000)
{
if(threadNumber<=0 || maxJobNumber <= 0)
{
throw new Exception("執行緒池初始化失敗");
}
_maxJobNumber = maxJobNumber;
ThreadContainer = new List<Thread>(threadNumber);
JobContainer = new ConcurrentQueue<ActionData>();
for (int i = 0; i < threadNumber; i++)
{
var t = new Thread(RunJob);
t.Name = $"轉碼執行緒{i}";
ThreadContainer.Add(t);
t.Start();
}
//清除超時任務的執行緒
var tTimeOutJob = new Thread(CheckTimeOutJob);
tTimeOutJob.Name = $"清理超時任務執行緒";
tTimeOutJob.Start();
}
//往執行緒池新增一個執行緒,返回執行緒池的新執行緒數
public int AddThread(int number=1)
{
if(!PoolEnable || ThreadContainer==null || !ThreadContainer.Any() || JobContainer==null|| !JobContainer.Any())
{
return 0;
}
while (number <= 0)
{
var t = new Thread(RunJob);
ThreadContainer.Add(t);
t.Start();
number -= number;
}
return ThreadContainer?.Count ?? 0;
}
//向執行緒池新增一個任務,返回0:新增任務失敗 1:成功
public int AddTask(Action<object> job, object obj,string actionId, Action<Exception> errorCallBack = null)
{
if (JobContainer != null)
{
if(JobContainer.Count>= _maxJobNumber)
{
return 0;
}
//首先排除10分鐘還沒轉完的
var timeoOutJobList = JobIdList.Where(s => s.Value.AddMinutes(10) < DateTime.Now);
if(timeoOutJobList!=null&& timeoOutJobList.Any())
{
foreach (var timeoutJob in timeoOutJobList)
{
JobIdList.TryRemove(timeoutJob.Key,out DateTime v);
}
}
if (!JobIdList.Any(s => s.Key == actionId))
{
if(JobIdList.TryAdd(actionId, DateTime.Now))
{
JobContainer.Enqueue(new ActionData { Job = job, Data = obj, ActionId = actionId, ErrorCallBack = errorCallBack });
return 1;
}
else
{
return 101;
}
}
else
{
return 100;
}
}
return 0;
}
private void RunJob()
{
while (JobContainer != null && PoolEnable)
{
//任務列表取任務
ActionData job = null;
JobContainer?.TryDequeue(out job);
if (job == null)
{
//如果沒有任務則休眠
Thread.Sleep(20);
continue;
}
try
{
//執行任務
job.Job.Invoke(job.Data);
}
catch (Exception error)
{
//異常回調
if (job != null&& job.ErrorCallBack!=null)
{
job?.ErrorCallBack(error);
}
}
finally
{
if (!JobIdList.TryRemove(job.ActionId,out DateTime v))
{
}
}
}
}
//終止執行緒池
public void Dispose()
{
PoolEnable = false;
JobContainer = null;
if (ThreadContainer != null)
{
foreach (var t in ThreadContainer)
{
//強制執行緒退出並不好,會有異常
t.Join();
}
ThreadContainer = null;
}
}
//清理超時的任務
private void CheckTimeOutJob()
{
//首先排除10分鐘還沒轉完的
var timeoOutJobList = JobIdList.Where(s => s.Value.AddMinutes(10) < DateTime.Now);
if (timeoOutJobList != null && timeoOutJobList.Any())
{
foreach (var timeoutJob in timeoOutJobList)
{
JobIdList.TryRemove(timeoutJob.Key, out DateTime v);
}
}
System.Threading.Thread.Sleep(60000);
}
}
public class ActionData
{
//任務的id,用於排重
public string ActionId { get; set; }
//執行任務的引數
public object Data { get; set; }
//執行的任務
public Action<object> Job { get; set; }
//發生異常時候的回撥方法
public Action<Exception> ErrorCallBack { get; set; }
}
以上就是一個執行緒池的具體實現,和具體的業務無關,完全可以用於任何適用於執行緒池的場景,其中有一個注意點,我新加了任務的標示,主要用於排除重複的任務被投放多次(只排除正在執行中的任務)。當然程式碼不是最優的,有需要的同學可以自己去優化
使用執行緒池
接下來,我們利用以上的執行緒池來完成我們的文件轉碼任務,首先我們啟動的時候初始化一個執行緒池,並啟動一個獨立執行緒來不停的往執行緒池來輸送任務,順便起了一個監控執行緒去監視傳送任務的執行緒
string lastResId = null;
string lastErrorResId = null;
Dictionary<string, int> ResErrNumber = new Dictionary<string, int>(); //轉碼失敗的資源重試次數
int MaxErrNumber = 5;//最多轉碼錯誤的資源10次
Thread tPutJoj = null;
LXThreadPool pool = new LXThreadPool(4,100);
public void OnStart()
{
//初始化一個執行緒傳送轉碼任務
tPutJoj = new Thread(PutJob);
tPutJoj.IsBackground = true;
tPutJoj.Start();
//初始化 監控執行緒
var tMonitor = new Thread(MonitorPutJob);
tMonitor.IsBackground = true;
tMonitor.Start();
}
//監視發放job的執行緒
private void MonitorPutJob()
{
while (true)
{
if(tPutJoj == null|| !tPutJoj.IsAlive)
{
Log.Error($"傳送轉碼任務執行緒停止==========");
tPutJoj = new Thread(PutJob);
tPutJoj.Start();
Log.Error($"傳送轉碼任務執行緒重新初始化並啟動==========");
}
System.Threading.Thread.Sleep(5000);
}
}
private void PutJob()
{
while (true)
{
try
{
//先搜尋等待轉碼的
var fileList = DocResourceRegisterProxy.GetFileList(new int[] { (int)FileToImgStateEnum.Wait }, 30, lastResId);
Log.Error($"拉取待轉碼記錄===總數:lastResId:{lastResId},結果:{fileList?.Count() ?? 0}");
if (fileList == null || !fileList.Any())
{
lastResId = null;
Log.Error($"待轉碼數量為0,開始拉取轉碼失敗記錄,重新轉碼==========");
//如果無待轉,則把出錯的 嘗試
fileList = DocResourceRegisterProxy.GetFileList(new int[] { (int)FileToImgStateEnum.Error, (int)FileToImgStateEnum.TimeOut, (int)FileToImgStateEnum.Fail }, 1, lastErrorResId);
if (fileList == null || !fileList.Any())
{
lastErrorResId = null;
}
else
{
// Log.Error($"開始轉碼失敗記錄:{JsonConvert.SerializeObject(fileList)}");
List<DocResourceRegister> errFilter = new List<DocResourceRegister>();
foreach (var errRes in fileList)
{
if (ResErrNumber.TryGetValue(errRes.res_id, out int number))
{
if (number > MaxErrNumber)
{
Log.Error($"資源:{errRes.res_id} 轉了{MaxErrNumber}次不成功,放棄===========");
continue;
}
else
{
errFilter.Add(errRes);
ResErrNumber[errRes.res_id] = number + 1;
}
}
else
{
ResErrNumber.Add(errRes.res_id, 1);
errFilter.Add(errRes);
}
}
fileList = errFilter;
if (fileList.Any())
{
lastErrorResId = fileList.Select(s => s.res_id).Max();
}
}
}
else
{
lastResId = fileList.Select(s => s.res_id).Max();
}
if (fileList != null && fileList.Any())
{
foreach (var file in fileList)
{
//如果 任務投放執行緒池失敗,則等待一面繼續投放
int poolRet = 0;
while (poolRet <= 0)
{
poolRet = pool.AddTask(s => {
AliFileService.ConvertToImg(file.res_id + $".{file.res_ext}", FileToImgFac.Instance(file.res_ext));
}, file, file.res_id);
if (poolRet <= 0 || poolRet > 1)
{
Log.Error($"發放轉碼任務失敗==========執行緒池返回結果:{poolRet}");
System.Threading.Thread.Sleep(1000);
}
}
}
}
//每一秒去資料庫取一次資料
System.Threading.Thread.Sleep(3000);
}
catch
{
continue;
}
}
}
以上就是發放任務,執行緒池執行任務的所有程式碼,由於具體的轉碼程式碼涉及到隱私,這裡不在提供,如果有需要可以私下找菜菜索要,雖然我深知還有更優的方式,但是我覺得執行緒池這樣的思想可能會對部分人有幫助,其中任務超時的核心程式碼如下(採用了polly外掛):
var policy= Policy.Timeout(TimeSpan.FromSeconds(this.TimeOut), onTimeout: (context, timespan, task) =>
{
ret.State=Enum.FileToImgStateEnum.TimeOut;
});
policy.Execute(s=>{
.....
});
把你的更優方案寫在留言區吧,2020年大家越來越好