生產者消費者模型在專案中的實際應用
前言
不知道大家有沒有遇到過這種情況,某個介面的響應時間會隨著請求量的變大而越來越慢,明明CPU已經馬力全開了怎麼還會越來越
慢。又或者是客戶端呼叫http介面,對於客戶端來說只是一個入庫操作就可以直接返回結果繼續處理了,而你還要比較尷尬的拿著傳過
來的資料做一堆比較耗時的操作,導致這個介面的整體吞吐量上不去。業務耦合、沒有控制業務的處理量導致cup狂飆執行緒池瘋狂阻塞
是造成問題的元凶,而生產者消費者模型則是這方面的專家了。
生產者消費者的概念網上有很多,如果有不清楚的朋友可以在網上搜一下,這裡主要說一下生產者消費者模型的作用和在實際專案中
的具體應用。生產者消費者模型的作用有三個:
(1)解耦,這是生產者消費者模型附帶的作用,解耦意味著生產者和消費者之間的聯絡少,聯絡越少越可以獨自發展而不需要收到相互
的制約。
(2)非同步,生產者只需要將訊息放到佇列中,直接就可以進行其他的操作。而消費者則只負責從訊息佇列中取出訊息進行後續的邏輯處
理,當然在實際的場景中執行緒池也可以滿足非同步併發的功能,這個也不算是主要作用。
(3)通過平衡生產者的生產能力和消費者的消費能力來提升整個系統的執行效率,這個才是生產者消費者模型最重要的作用。
生產者消費者模型通過控制訊息佇列的大小可以達到流量削峰的效果,可以在自己的系統內部達到一種MQ的效果。當然除了正常的
功能實現外如何保證訊息不會丟一定會被百分百消費、在叢集的生產環境中怎麼保證宕機的系統任務可以分配到其他健康的系統繼續
消費、整個系統在重啟時如何自動消費上次沒消費完的任務、資料庫的主從不同步會不會對整個模型的健壯性有所影響,這些都是我們
在實際的開發使用中需要考慮的問題。
生產者消費者模型的一種實現
在開發之前我們先要結合自己專案的業務場景設計出一個臨時任務表,以保證任務的“安全”,然後在開始程式碼的編寫,以下程式碼裡可
能會包含一些虛擬碼,但是整體的實現步驟和一些容錯處理都會有所體現,下面看一下具體的實現:
臨時任務表的大概結構:
@Data public class TestTempMo { /** * */ private Integer id; /** * 待處理業務的Id */ private Integer logicId; /** * 本機ip */ private String ip; /** * 是否塞入任務佇列 */ private Boolean isTask; /** * 建立時間 */ private Date createDate; }
單例獲取阻塞佇列方法:
public class BlockingQueueUtils { public static BlockingQueue<TestTempMo> testTaskQueue; private BlockingQueueUtils() { } public BlockingQueue<TestTempMo> getInstance() { if (Objects.isNull(testTaskQueue)) { synchronized (this) { if (Objects.isNull(testTaskQueue)) { int cpuCores = Runtime.getRuntime().availableProcessors(); testTaskQueue = new ArrayBlockingQueue<>(cpuCores * 10); } } } return ocrScanTaskQueue; } }
任務生產者:
/** * 每臺機器只負責自己的任務(負載均衡由Nginx處理) * Created by lcy on 2019/10/27. */ @Service public class TestProducer implements Runnable{ private static Logger LOG = LoggerFactory.getLogger(TestProducer.class); /** 機器空閒時,定時掃描test_temp表的間隔 */ private static final long SCAN_PERIOD = 1000 * 10; private BlockingQueue<TestTempMo> testTaskQueue; @Resource //臨時任務表的Mapper類 private TestTempMapper testTempMapper; @Resource //自定義SQL類 private SelectForMasterMapper selectForMasterMapper; @Resource //錯誤日誌記錄類 private LogRecord logRecord; @Resource private BlockingQueueUtils blockingQueueUtils; @PostConstruct public void init() { try { //初始化臨時表狀態(防止機器重啟時有未處理的任務處理不掉) initTempTaskState(); testTaskQueue = blockingQueueUtils.getInstance(); new Thread(this, "ScanTempProducer").start(); } catch (Throwable e) { LogUtils.error(LOG, "初始化test生產者執行緒異常", e); throw new ExceptionInInitializerError(e); } } @Override @Transactional(rollbackFor = Throwable.class) public void run() { while(true) { /** 是否還有未執行完的任務 */ boolean hasMoreTask = false; long start = System.currentTimeMillis(); try { List<TestTempMo> taskTempMoList = produceTaskBatch(); if(CollectionUtils.isNotEmpty(taskTempMoList)) { for (TestTempMo taskTempMo : taskTempMoList) { //將任務塞入阻塞佇列 testTaskQueue.put(taskTempMo); //改變臨時表狀態,防止重複塞入任務佇列 taskTempMo.setIsTask(true); testTempMapper.updateByPrimaryKeySelective(taskTempMo); } /** 分頁查詢結果不止一頁,則認為還有更多的任務(強制查詢主庫) */ Double count = selectForMasterMapper.selectScanTempCount(ExternalOcrConstant.IP); if(count > 1) { hasMoreTask = true; } } } catch (Throwable e) { LogUtils.error(LOG, "test生產者執行緒發生異常", e); //記錄錯誤日誌(自定義方法,將錯誤日誌入庫傳送郵件方便及時處理問題) logRecord.selfDubboThrowableLogRecord(TraceUtils.getTrace(), "TestTempProducer"+"#"+"run", "test系統", (int)(System.currentTimeMillis()-start), e); } /** 沒有更多的任務,則休眠一段時間 */ if(!hasMoreTask) { waitAMoment(); } } } /** * 分頁查詢未完成的臨時表資訊(根據本機IP和狀態進行查詢) * @return */ private List<TestTempMo> produceTaskBatch() { try { //這裡使用自定義SQL強制查詢主庫,防止主從不一致(根據id或時間排序保證任務執行順序) List<TestTempMo> testTempMos = selectForMasterMapper.selectScanTempByPage(ExternalOcrConstant.IP); return testTempMos; } catch (Throwable e) { LogUtils.error(LOG, "獲取優先任務列表異常", e); throw new BusinessException(TestStatusEnum.SYSTEM_ERROR); } } private void waitAMoment() { try { Thread.sleep(SCAN_PERIOD); } catch (InterruptedException e) { LogUtils.error(LOG, "生產者執行緒休眠異常", e); } } /** * 初始化臨時表狀態(每臺機器只負責自己的任務) */ private void initTempTaskState(){ TestTempExample example = new TestTempExample(); example.createCriteria().andIpEqualTo(ExternalOcrConstant.IP).andIsTaskEqualTo(true); List<TestTempMo> testTempMos = testTempMapper.selectByExample(example); //存在遺留資料 if (CollectionUtils.isNotEmpty(testTempMos)){ for (TestTempMo testTempMo : testTempMos) { testTempMo.setIsTask(false); //將臨時表狀態改為初始狀態 testTempMapper.updateByPrimaryKeySelective(testTempMo); } } } }
任務消費者:
/** * Created by lcy on 2019/10/27. */ @Service public class TestTempConsumer implements Runnable{ private static Logger LOG = LoggerFactory.getLogger(TestTempConsumer.class); private BlockingQueue<TestTempMo> testTaskQueue; @Resource //錯誤日誌記錄類 private LogRecord logRecord; @Resource private BlockingQueueUtils blockingQueueUtils; @Resource //自定義SQL類 private SelectForMasterMapper selectForMasterMapper; @PostConstruct public void init() { testTaskQueue = blockingQueueUtils.getInstance(); new Thread(this, "TestConsumer").start(); } @Override public void run() { while(true) { //從阻塞佇列裡取出任務(如果沒有任務這裡會阻塞) TestTempMo taskTempMo = acquireTask(); //使用執行緒池多執行緒處理任務 ThreadPoolUtil.TestPool.execute(() -> { //具體的消費邏輯 consume(taskTempMo); }); } } /** * 從阻塞佇列裡取出任務 * @return */ private TestTempMo acquireTask() { try { TestTempMo testTemp = testTaskQueue.take(); return testTemp; } catch (InterruptedException e) { /** 簡單記錄異常,無需做特殊處理 */ LogUtils.error(LOG, "從佇列中獲取test任務異常", e); } return null; } /** * 消費邏輯(這裡的所有SQl都要強制查詢主庫否則會因為主從延遲而處理失敗) * @param taskTempMo */ private void consume(TestTempMo taskTempMo) { TraceUtils.beginTrace(); long start = System.currentTimeMillis(); try { LogUtils.info(LOG, "開始處理具體的邏輯"); //開始處理具體的邏輯... System.out.println("處理完啦"); } catch (Throwable e) { LogUtils.error(LOG, "處理具體邏輯時發生異常", e); //記錄錯誤日誌 logRecord.selfDubboThrowableLogRecord(TraceUtils.getTrace(), "TestTempConsumer"+"#"+"consume", "test系統,什麼資料:"+taskTempMo.getTestId(), (int)(System.currentTimeMillis()-start), e); } finally { try { //刪除任務表資料 selectForMasterMapper.delScanTemp(taskTempMo.getId()); } catch (Throwable e) { LogUtils.error(LOG, "刪除任務表資料異常", e,"id",taskTempMo.getId()); } LogUtils.info(LOG, "處理具體邏輯完成", "耗時(ms)", (System.currentTimeMillis() - start)); TraceUtils.endTrace(); } } }
當然僅僅只有上邊這些程式碼這個模型還是不夠可靠的,因為如果叢集中某臺機器宕機的話則該臺機器上的所有未處理完成的任務都會“陷入僵局”因此這個時候就需要其他的兄弟進行“接盤”操作了,這裡是使用ZK進行處理的:
ZK的操作類:
/** * ZK的連線工具類 * Created by lcy on 2019/10/27. */ @Component public class ZooKeeperClient { private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperClient.class); //dubbo服務地址 @Value("${dubbo.registry.addrss}") private String hosts; //本機環境地址 @Value("${dubbo.provider.group}") private String env; //超時時間 private final int SESSION_TIMEOUT = 5000; //根節點 private final String ROOT_NODE = "/test"; private ZooKeeper zk; private CountDownLatch latch = new CountDownLatch(1); @Resource //錯誤日誌記錄類 private LogRecord logRecord; @PostConstruct private void init() { try { //連結ZK initZookeeperClient(); } catch (Exception e) { LogUtils.error(LOG, "初始化ZooKeeperClient錯誤", e); throw new ExceptionInInitializerError("初始化ZooKeeperClient錯誤"); } } /** * 連結ZK * @throws Exception */ private synchronized void initZookeeperClient() throws Exception { LogUtils.info(LOG, "初始化Zookeeper連結", "hosts", hosts); zk = new ZooKeeper(hosts, SESSION_TIMEOUT, event -> { LogUtils.info(LOG, "處理ZooKeeper事件", "State", event.getState(), "Type", event.getType()); if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { LogUtils.info(LOG, "連線建立"); latch.countDown(); } } ); // 等待連線建立 latch.await(); LogUtils.info(LOG, "成功建立ZooKeeper連線"); //判斷根節點是否存在 if (Objects.isNull(zk.exists(ROOT_NODE, false))){ //建立一個持久節點 zk.create(ROOT_NODE,"IP_Statistic".getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } //判斷環境節點是否存在 String envNode = ROOT_NODE + "/" + env; if (Objects.isNull(zk.exists(envNode, false))){ //建立環境節點 zk.create(envNode,("environment:" + env).getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } //建立IP臨時節點 String childNode = envNode + "/" + IPConstant.IP; String create = zk.create(childNode, ExternalOcrConstant.IP.getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); LogUtils.info(LOG, "建立IP節點成功", "create", create); } /** * 關閉資源 */ private void close() { if (Objects.nonNull(zk)) { try { zk.close(); zk = null; } catch (InterruptedException e) { LogUtils.error(LOG, "關閉ZK節點失敗", e, "path", ROOT_NODE); } } } /** * 重連ZK * @return */ private synchronized boolean reConnect() { long start = System.currentTimeMillis(); //關閉連結 close(); try { Thread.sleep(1000); initZookeeperClient(); return true; } catch (Exception e) { LogUtils.error(LOG, "重連ZooKeeper失敗", e); //記錄錯誤日誌 recordErroLog(e,"reConnect",start); } return false; } /** * 獲取活躍節點 * @return */ public synchronized List<String> fetchActiveNode() { long start = System.currentTimeMillis(); try { List<String> activeNodeList = zk.getChildren(ROOT_NODE + "/" + env, false); return activeNodeList; } catch (Throwable e) { LogUtils.error(LOG, "獲取ZK節點列表失敗", e, "path", ROOT_NODE); //記錄錯誤日誌 recordErroLog(e,"fetchActiveNode",start); //重連ZK reConnect(); return Lists.newArrayList(); } } /** * 記錄錯誤日誌 * @param e * @param methodName * @param start */ public void recordErroLog(Throwable e, String methodName, Long start){ logRecord.selfDubboThrowableLogRecord(TraceUtils.getTrace(), "ZooKeeperClient"+"#" + methodName, "test系統", (int)(System.currentTimeMillis()-start), e); } }
伺服器健康檢測:
/** * 伺服器健康檢測和未處理的任務分配類 * Created by lcy on 2019/10/27. */ @Component public class CheckServerProcess implements Runnable{ private static final Logger LOG = LoggerFactory.getLogger(CheckServerProcess.class); /** 檢查ZK健康狀況的間隔 */ private static final long CHECK_ZK = 1000 * 20; @Resource //臨時任務表Mapper類 private TestTempMapper testTempMapper; @Resource //錯誤日記記錄類 private LogRecord logRecord; @Resource //自定義SQL類 private SelectForMasterMapper selectForMasterMapper; @Resource //ZK的操作類 private ZooKeeperClient zooKeeperClient; @PostConstruct public void init() { new Thread(this, "CheckServerProcess").start(); } @Override public void run() { while(true) { //檢查伺服器的健康狀態,分配宕機的未完成任務 checkServerHealth(); waitAMoment(); } } /** * 檢查伺服器的健康狀態 */ public void checkServerHealth() { long start = System.currentTimeMillis(); List<String> taskIpList=Lists.newArrayList(); try { //查詢任務列表裡的全部Ip taskIpList = selectForMasterMapper.selectIpForOcrScanTemp(); //當前沒有臨時任務 if (CollectionUtils.isEmpty(taskIpList)){ return; } /** 從Zookeeper找到當前活動的機器 */ List<String> activeNodeList = zooKeeperClient.fetchActiveNode(); //活躍ip比任務ip數大於或等於則認為機器正常 if(activeNodeList.containsAll(taskIpList)) { return; } /** 全部IP去掉線上的IP,剩下的就是離線的IP */ taskIpList.removeAll(activeNodeList); LogUtils.info(LOG, "存在離線機器", "serverIp", taskIpList); //獲取離線機器的未完成任務 TestTempExample testTempExample =new TestTempExample(); testTempExample.createCriteria().andIpIn(taskIpList); List<TestTempMo> unDealTestTemp = testTempMapper.selectByExample(testTempExample); if(CollectionUtils.isEmpty(unDealOcrScanTemp)){ //沒有未完成的處理任務 return; } if (CollectionUtils.isNotEmpty(activeNodeList)){ //平均分配未完成的任務 List<TestTempMo> pendTestTempList = allotTask(unDealTestTemp, activeNodeList); //批量更新臨時表 batchUpdateTemp(pendTestTempList); LogUtils.info(LOG, "分配未處理test任務結束","deadIp", taskIpList, "task:", pendTestTempList); }else { LogUtils.error(LOG, "獲取ZK節點列表為空"); } }catch (Exception e){ LogUtils.error(LOG, "分配未處理test任務失敗", e,"serverIpMos",taskIpList); logRecord.selfDubboThrowableLogRecord(TraceUtils.getTrace(), "CheckServerProcess"+"#"+"checkServerHealth", "test系統", (int)(System.currentTimeMillis()-start), e); } } /** * 平均分配未完成的任務 * @param unDealTestTemp * @param activeNodeList */ public static List<TestTempMo> allotTask(List<TestTempMo> unDealTestTemp, List<String> activeNodeList) { List<TestTempMo> testTemp=Lists.newArrayList(); //每臺機器分配的任務數(平均分配) int taskCount = unDealTestTemp.size() / activeNodeList.size(); //分配個數奇偶判斷 int type = unDealTestTemp.size() % activeNodeList.size(); int count=0; for (String ip : activeNodeList) { Iterator<TestTempMo> it = unDealTestTemp.iterator(); while(it.hasNext()){ TestTempMo testTempMo = it.next(); testTempMo.setIp(ip); //初始化任務狀態 testTempMo.setIsTask(false); testTemp.add(testTempMo); it.remove(); count++; //如果任務數大於平均任務數任務數,則分配到下臺機器機器 if (type == 0){ if (count == taskCount){ count=0; break; } }else { if (count>taskCount){ count=0; break; } } } } return testTemp; } /** * 批量更新臨時表資料 * @param unDealTestTemp */ public void batchUpdateTemp(List<TestTempMo> unDealTestTemp){ for (TestTempMo testTempMo : unDealTestTemp) { testTempMapper.updateByPrimaryKeySelective(testTempMo); } } private void waitAMoment() { try { Thread.sleep(CHECK_ZK); } catch (InterruptedException e) { LogUtils.error(LOG, "生產者執行緒休眠異常", e); } } }
以上就是生產者消費者模型的全部思路了,1024程式設計師節的時候在公眾號上看到了一句話突然心裡感覺很暖:
我們在鍵盤上留下的餘溫,也將隨時代傳遞到更遠的將來。共勉!
&n