Java Worker 設計模式
Worker模式
想解決的問題
非同步執行一些任務,有返回或無返回結果
使用動機
有些時候想執行一些非同步任務,如非同步網路通訊、daemon任務,但又不想去管理這任務的生命周。這個時候可以使用Worker模式,它會幫您管理與執行任務,並能非常方便地獲取結果
結構
很多人可能為覺得這與executor很像,但executor是多執行緒的,它的作用更像是一個規劃中心。而Worker則只是個搬運工,它自己本身只有一個執行緒的。每個worker有自己的任務處理邏輯,為了實現這個目的,有兩種方式
1. 建立一個抽象的AbstractWorker,不同邏輯的worker對其進行不同的實現;2. 對worker新增一個TaskProcessor不同的任務傳入不同的processor即可。
第二種方式worker的角色可以很方便地改變,而且可以隨時更換processor,可以理解成可”刷機”的worker ^ ^。這裡我們使用第二種方式來介紹此模式的整體結構。
針對上圖,詳細介紹一下幾個角色:
-
ConfigurableWorker:顧名思義這個就是真正幹活的worker了。要實現自我生命週期管理,需要實現Runable,這樣其才能以單獨的執行緒執行,需要注意的是work最好以daemon執行緒的方式執行。worker裡面還包括幾個其它成員:taskQueue,一個阻塞性質的queue,一般BlockingArrayList就可以了,這樣任務是FIFO(先進先出)的,如果要考慮任務的優先順序,則可以考慮使用PriorityBlockingQueue;listeners,根據事件進行劃分的事件監聽者,以便於當一個任務完成的時候進行處理,需要注意的是,為了較高效地進行listener遍歷,這裡我推薦使用CopyOnWriteArrayList,免得每次都複製。其對應的方法有addlistener、addTask等配套方法,這個都不多說了,更詳細的可以看後面的示例程式碼。
- WorkerTask:實際上這是一個抽象的工內容,其包括基本的id與,task的ID是Worker生成的,相當於遞wtte後的一個執回,當資料執行完了的時候需要使用這個id來取結果。而後面真正實現的實體task則包含任務處理時需要的資料。
- Processor:為了實現可”刷機”的worker,我們將處理邏輯與worker分開來,processor的本職工作很簡單,只需要加工傳入的task資料即可,加工完成後觸發fireEvent(WorkerEvent.TASK_COMPLETE)事件,之後通過Future的get即可得到最終的資料。
另外再說一點,對於addTask,可以有一個overload的方法,即在輸入task的同時,傳入一個RejectPolice,這樣可以在size過大的時候做出拒絕操作,有效避免被撐死。
適用性/問題
這種設計能自動處理任務,並能根據任務的優先順序自動調節任務的執行順序,一個完全獨立的thread,你完全可以將其理解成一專門負責幹某種活的”機器人”。它可以用於處理一些定時、請求量固定均勻且對實時性要求不是太高的任務,如日誌記錄,資料分析等。當然,如果想提高任務處理的資料,可以生成多個worker,就相當於僱傭更多的人來為你幹活,非常直觀的。當然這樣一來,誰來維護這worker便成了一個問題,另外就目前這種設計下worker之間是沒有通訊與協同的,這些都是改進點。
那麼對於多個worker,有什麼組織方式呢?這裡我介紹三種,算是拋磚引玉:
流水線式worker(assembly-line worker)
就像生產車間上的流水線工人一樣,將任務切分成幾個小塊,每個worker負責自己的一部分,以提高整體的生產、產出效率,如下圖:
假設完成任務 t 需要的時間為:W(t)=n,那麼將任務分解成m份,流水線式的執行,每小份需要的時間便為 W(t/m)=n/m,那麼執行1000條任務的時間,單個為1000n,流水線長度為L,則用這種方式所用的時間為(1000-1)*(m-L+1)*n/m+n 其中L<m,由此可見,流水線的worker越多、任務越細分,工作的效率將越高。這種主方式的問題在於,如果一個worker出現問題,那麼整個流水線就將停止工作。而且任務的優先順序不能動態呼叫,必須事先告知。
多級反饋佇列(Multilevel Feedback Queue)
這是一個有Q1、Q2...Qn個多重流水線方式,從高到低分別程式碼不同的優先順序,高優先順序的worker要多於低優先順序的,一般是2的倍數,即Q4有16個worker、Q3有8個,後面類推。任務根據預先估計好的優先順序進入,如果任務在某步的執行過長,直接踢到下一級,讓出最快的資源。如下圖所示:顯然這種方式的好處就在於可以動態地調整任務的優級,及時做出反應。當然,為了實現更好的高度,我們可以在低階裡增加一個閥值,使得放偶然放入低階的task可以有復活的機會^ ^。
MapReduce式
流水線雖然有一定的並行性,但總體來說仍然是序列的,因為只要有一個節點出了問題,那都是致命的錯誤。MapReduce是Google率先實現的一個分散式演算法,有非常好的並行執行效率。
如上圖所示,只要我們將Map與Reduce都改成Worker就行了,如MapWorker與ReduceWorker。這樣,可以看見,Map的過程是完全並行的,當然這樣就需要在Map與Reduce上的分配與資料組合上稍稍下一點功夫了。
樣例實現
這裡我們實現一個PageURLMiningWorker,對給定的URL,開啟頁面後,採取所有的URL,並反回結果進行彙總輸出。由於時間有限,這裡我只實現了單worker與MapReduce worker集兩種方式,有興趣的同學可以實現其它型別,如多級反饋佇列。注意!我這裡只是向大家展示這種設計模式,URL 抓取的效率不在本次考慮之列。所有的程式碼可以在這裡獲取:https://github.com/sefler1987/javaworker
單Worker實現樣例
package com.alibaba.taobao.main;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import com.alibaba.taobao.worker.ConfigurableWorker;
import com.alibaba.taobao.worker.SimpleURLComparator;
import com.alibaba.taobao.worker.WorkerEvent;
import com.alibaba.taobao.worker.WorkerListener;
import com.alibaba.taobao.worker.WorkerTask;
import com.alibaba.taobao.worker.linear.PageURLMiningProcessor;
import com.alibaba.taobao.worker.linear.PageURLMiningTask;
/**
* Linear version of page URL mining. It's slow but simple.
* Average time cost for 1000 URLs is: 3800ms
*
* @author xuanyin.zy E-mail:[email protected]
* @since Sep 16, 2012 5:35:40 PM
*/
public class LinearURLMiningMain implements WorkerListener {
private static final String EMPTY_STRING = "";
private static final int URL_SIZE_TO_MINE = 10000;
private static ConcurrentHashMap<String, WorkerTask<?>> taskID2TaskMap = new ConcurrentHashMap<String, WorkerTask<?>>();
private static ConcurrentSkipListSet<String> foundURLs = new ConcurrentSkipListSet<String>(new SimpleURLComparator());
public static void main(String[] args) throws InterruptedException {
long startTime = System.currentTimeMillis();
ConfigurableWorker worker = new ConfigurableWorker("W001");
worker.setTaskProcessor(new PageURLMiningProcessor());
addTask2Worker(worker, new PageURLMiningTask("http://www.taobao.com"));
addTask2Worker(worker, new PageURLMiningTask("http://www.xinhuanet.com"));
addTask2Worker(worker, new PageURLMiningTask("http://www.zol.com.cn"));
addTask2Worker(worker, new PageURLMiningTask("http://www.163.com"));
LinearURLMiningMain mainListener = new LinearURLMiningMain();
worker.addListener(mainListener);
worker.start();
String targetURL = EMPTY_STRING;
while (foundURLs.size() < URL_SIZE_TO_MINE) {
targetURL = foundURLs.pollFirst();
if (targetURL == null) {
TimeUnit.MILLISECONDS.sleep(50);
continue;
}
PageURLMiningTask task = new PageURLMiningTask(targetURL);
taskID2TaskMap.putIfAbsent(worker.addTask(task), task);
TimeUnit.MILLISECONDS.sleep(100);
}
worker.stop();
for (String string : foundURLs) {
System.out.println(string);
}
System.out.println("Time Cost: " + (System.currentTimeMillis() - startTime) + "ms");
}
private static void addTask2Worker(ConfigurableWorker mapWorker_1, PageURLMiningTask task) {
String taskID = mapWorker_1.addTask(task);
taskID2TaskMap.put(taskID, task);
}
@Override
public List<WorkerEvent> intrests() {
return Arrays.asList(WorkerEvent.TASK_COMPLETE, WorkerEvent.TASK_FAILED);
}
@Override
public void onEvent(WorkerEvent event, Object... args) {
if (WorkerEvent.TASK_FAILED == event) {
System.err.println("Error while extracting URLs");
return;
}
if (WorkerEvent.TASK_COMPLETE != event)
return;
PageURLMiningTask task = (PageURLMiningTask) args[0];
if (!taskID2TaskMap.containsKey(task.getTaskID()))
return;
foundURLs.addAll(task.getMinedURLs());
System.out.println("Found URL size: " + foundURLs.size());
taskID2TaskMap.remove(task.getTaskID());
}
}
MapReduce實現樣例
package com.alibaba.taobao.main;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.TimeUnit;
import com.alibaba.taobao.worker.ConfigurableWorker;
import com.alibaba.taobao.worker.SimpleURLComparator;
import com.alibaba.taobao.worker.WorkerEvent;
import com.alibaba.taobao.worker.WorkerListener;
import com.alibaba.taobao.worker.WorkerTask;
import com.alibaba.taobao.worker.mapreduce.Map2ReduceConnector;
import com.alibaba.taobao.worker.mapreduce.MapReducePageURLMiningTask;
import com.alibaba.taobao.worker.mapreduce.PageContentFetchProcessor;
import com.alibaba.taobao.worker.mapreduce.URLMatchingProcessor;
/**
* MapReduce version of page URL mining. It's very powerful.
*
* @author xuanyin.zy E-mail:[email protected]
* @since Sep 16, 2012 5:35:40 PM
*/
public class MapReduceURLMiningMain implements WorkerListener {
private static final String EMPTY_STRING = "";
private static final int URL_SIZE_TO_MINE = 10000;
private static ConcurrentHashMap<String, WorkerTask<?>> taskID2TaskMap = new ConcurrentHashMap<String, WorkerTask<?>>();
private static ConcurrentSkipListSet<String> foundURLs = new ConcurrentSkipListSet<String>(new SimpleURLComparator());
public static void main(String[] args) throws InterruptedException {
long startTime = System.currentTimeMillis();
// four mapers
List<ConfigurableWorker> mappers = new ArrayList<ConfigurableWorker>(4);
ConfigurableWorker mapWorker_1 = new ConfigurableWorker("W_M1");
ConfigurableWorker mapWorker_2 = new ConfigurableWorker("W_M2");
ConfigurableWorker mapWorker_3 = new ConfigurableWorker("W_M3");
ConfigurableWorker mapWorker_4 = new ConfigurableWorker("W_M4");
mapWorker_1.setTaskProcessor(new PageContentFetchProcessor());
mapWorker_2.setTaskProcessor(new PageContentFetchProcessor());
mapWorker_3.setTaskProcessor(new PageContentFetchProcessor());
mapWorker_4.setTaskProcessor(new PageContentFetchProcessor());
mappers.add(mapWorker_1);
mappers.add(mapWorker_2);
mappers.add(mapWorker_3);
mappers.add(mapWorker_4);
// one reducer
ConfigurableWorker reduceWorker_1 = new ConfigurableWorker("W_R1");
reduceWorker_1.setTaskProcessor(new URLMatchingProcessor());
// bind reducer to final result class
MapReduceURLMiningMain main = new MapReduceURLMiningMain();
reduceWorker_1.addListener(main);
// initiate tasks
addTask2Worker(mapWorker_1, new MapReducePageURLMiningTask("http://www.taobao.com"));
addTask2Worker(mapWorker_2, new MapReducePageURLMiningTask("http://www.xinhuanet.com"));
addTask2Worker(mapWorker_3, new MapReducePageURLMiningTask("http://www.zol.com.cn"));
addTask2Worker(mapWorker_4, new MapReducePageURLMiningTask("http://www.sina.com.cn/"));
// bind mapper to reduer
Map2ReduceConnector connector = new Map2ReduceConnector(Arrays.asList(reduceWorker_1));
mapWorker_1.addListener(connector);
mapWorker_2.addListener(connector);
mapWorker_3.addListener(connector);
mapWorker_4.addListener(connector);
// start all
mapWorker_1.start();
mapWorker_2.start();
mapWorker_3.start();
mapWorker_4.start();
reduceWorker_1.start();
String targetURL = EMPTY_STRING;
int lastIndex = 0;
while (foundURLs.size() < URL_SIZE_TO_MINE) {
targetURL = foundURLs.pollFirst();
if (targetURL == null) {
TimeUnit.MILLISECONDS.sleep(50);
continue;
}
lastIndex = ++lastIndex % mappers.size();
MapReducePageURLMiningTask task = new MapReducePageURLMiningTask(targetURL);
taskID2TaskMap.putIfAbsent(mappers.get(lastIndex).addTask(task), task);
TimeUnit.MILLISECONDS.sleep(100);
}
// stop all
mapWorker_1.stop();
mapWorker_2.stop();
mapWorker_3.stop();
mapWorker_4.stop();
reduceWorker_1.stop();
for (String string : foundURLs) {
System.out.println(string);
}
System.out.println("Time Cost: " + (System.currentTimeMillis() - startTime) + "ms");
}
private static void addTask2Worker(ConfigurableWorker mapWorker_1, MapReducePageURLMiningTask task) {
String taskID = mapWorker_1.addTask(task);
taskID2TaskMap.put(taskID, task);
}
@Override
public List<WorkerEvent> intrests() {
return Arrays.asList(WorkerEvent.TASK_COMPLETE, WorkerEvent.TASK_FAILED);
}
@Override
public void onEvent(WorkerEvent event, Object... args) {
if (WorkerEvent.TASK_FAILED == event) {
System.err.println("Error while extracting URLs");
return;
}
if (WorkerEvent.TASK_COMPLETE != event)
return;
MapReducePageURLMiningTask task = (MapReducePageURLMiningTask) args[0];
if (!taskID2TaskMap.containsKey(task.getTaskID()))
return;
foundURLs.addAll(task.getMinedURLs());
System.out.println("Found URL size: " + foundURLs.size());
taskID2TaskMap.remove(task.getTaskID());
}
}