Storm 的流量控制和多執行緒併發處理
阿新 • • 發佈:2019-01-28
- 面臨問題:
storm多執行緒的時候,會遇到併發修改的問題,會報concurrentModificationException,如下圖所示
- 解決方法:
- 第一種治標不治本的方法:
一方面,對傳送到kafka的資料進行控制,將執行緒sleep的時間變長
if(count==18000)
{
try {
Thread.sleep(1000);
time++;
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
count=0;
}
另一方面,可以從處理的執行緒入手,即不允許併發修改(單執行緒不會報這種錯誤);
可以將執行緒數設低,控制比較快的處理,即前一部分的資料處理過快,導致後面資料處理成為瓶頸,也就是要找到一個平衡點。
- 第二種網路上的其他人寫的方法(試過,都不管用):
1.hasNext()是不會丟擲ConcurrentModificationException的,將next()換成hasNext(),但是還是會用到next()對ArrayList進行遍歷
2.使用併發容器CopyOnWriteArrayList代替ArrayList
下面是原先程式碼。
public static void modify(ArrayList<frequent> afr, double count,String time, PatternTree tree)
{
for (frequent fr : afr)
{
ArrayList<String> item = fr.getItem();
double value = fr.getCount();
}
}
做了如下修改
public static void modify(List<frequent> afr, double count,String time, PatternTree tree)
{
for(Iterator it = afr.iterator(); it.hasNext();)
{
List<String> item=new CopyOnWriteArrayList<String>();
item = ((frequent) it.next()).getItem();
double value = ((frequent) it.next()).getCount();
}
}
最終還是報這個錯,不管是修改遍歷方法,還是換成CopyOnWriteArrayList,當一個執行緒執行modify方法的時候,另一個執行緒修改了modify中的afr,就會丟擲ConcurrentModificationException。
- 最終方案
將ConcurrentModificationException的異常catch住,這樣導致了傳送資料會fail掉,storm有出錯重發的機制,所以會不斷重發,拖慢了處理速度,在後面的部落格中,我會介紹storm的Qos的方法,減少fail的資料,加快處理的速度。