1. 程式人生 > >HBase-服務端處理請求的過程

HBase-服務端處理請求的過程

Region的架構



 

HRegionServer:

配置:

hbase.client.retries.number (預設10)  客戶端的重試次數

hbase.regionserver.msginterval (預設3*1000) ???

hbase.regionserver.checksum.verify(預設false) 是否啟用checksum

hbase.server.thread.wakefrequency(預設10*1000) 執行緒檢查頻率

hbase.regionserver.numregionstoreport(預設10) ???

hbase.regionserver.handler.count(預設10) handler處理執行緒個數

hbase.regionserver.metahandler.count(預設10) 處理meta和root的執行緒個數

hbase.rpc.verbose(預設false)

hbase.regionserver.nbreservationblocks(預設4)

hbase.regionserver.compactionChecker.majorCompactPriority(預設Integer.MAX_VALUE)

HRegionServer的主要操作:

包含的類有

HRegion集合

Leases(租借時間檢查)

HMasterRegionInterface(管理hbase)

HServerLoad(hbase負載)

CompactSplitThread(用於合併處理)

MemStoreFlusher(用於重新整理memstore)

HLog(WAL相關)

LogRoller(日誌回滾)

ZooKeeperWatcher(zk監聽)

SplitLogWorker(用於切分日誌)

ExecutorService(使用者啟動open,close HRegion的執行緒池)

ReplicationSourceService和ReplicationSinkService(replication相關)

HealthCheckChore(健康檢查)

一些監聽類

MasterAddressTracker

CatalogTracker

ClusterStatusTracker

一些函式

postOpenDeployTasks() 此函式用於更新root表或meta表

各種CURD,scanner,increment操作

multi操作(對於delete和put)

對HRegion的flush,close,open(提交到執行緒池去做)

split,compact操作,這些最終由一個具體的HRegion去完成

啟動的執行緒

hbase.regionserver.executor.openregion.threads 3

hbase.regionserver.executor.openroot.threads 1

hbase.regionserver.executor.openmeta.threads 1

hbase.regionserver.executor.closeregion.threads 3

hbase.regionserver.executor.closeroot.threads 1

hbase.regionserver.executor.closemeta.threads 1

hlog roller

cache flusher

compact

health check

lease

WEB UI

replication

rpc server

split worker

HRegion

配置:

HRegion的主要操作:

1.CURD和increment操作

2.doMiniBatchMutation操作(用於delete和put)

3.對region的open,delete,init,close,以及addRegionToMeta等操作

4.snapshot

5.bulkload

6.split

7.compact(major,minor)

8.lock

包含的內部類

WriteState(在flush,close,compact時會根據這個類加鎖)

RegionScannerImpl(scan的region級別操作)

coprocessor的處理原理

  1. //HRegion的建構函式  
  2. coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);  
  3. //RegionCoprocessorHost類中  將自定義的coprocessor類載入進來,並放到集合中  
  4. protected SortedSet<E> coprocessors = new SortedCopyOnWriteSet<E>(new EnvironmentPriorityComparator());  
  5. public RegionCoprocessorHost類中() {  
  6.     // load system default cp's from configuration.  
  7.     loadSystemCoprocessors(conf,"hbase.coprocessor.region.classes");  
  8.     // load system default cp's for user tables from configuration.  
  9.     if (!HTableDescriptor.isMetaTable(region.getRegionInfo().getTableName())) {  
  10.         loadSystemCoprocessors(conf,"hbase.coprocessor.user.region.classes");         
  11.     }  
  12.     // load Coprocessor From HDFS  
  13.     loadTableCoprocessors(conf);  
  14. }  
  15. public void load相關函式() {  
  16.     //1.從當前執行緒上下文classloader中找到類並載入  
  17.     //2.放到coporcessors集合中  
  18. }  
  19. //coprocessor的執行過程  
  20. //coprocessorHost.preFlush()時候會遍歷執行所有集合中的處理器  
  21. HRegion#flush() {  
  22.     //1.coprocessorHost.preFlush();  
  23.     //2.flush  
  24.     //3.coprocessorHost.postFlush();  
  25. }  

服務端接收處理過程

HBaseServer$Listener的run()函式和doAccept()函式簡化如下  這是一個獨立的listene執行緒

  1. while (running) {  
  2.     SelectionKey key = null;  
  3.     selector.select(); // FindBugs IS2_INCONSISTENT_SYNC  
  4.     Iterator<SelectionKey> iter = selector.selectedKeys().iterator();  
  5.     while (iter.hasNext()) {  
  6.         key = iter.next();  
  7.         iter.remove();  
  8.         if (key.isValid()) {  
  9.             if (key.isAcceptable())  
  10.                 doAccept(key);  
  11.             }  
  12.         }  
  13.     }  
  14. }  
  15. void doAccept(SelectionKey key) {  
  16.     ServerSocketChannel server = (ServerSocketChannel) key.channel();  
  17.     currentReader = (currentReader + 1) % readers.length;  
  18.     Reader reader = readers[currentReader];  
  19.     readSelector.wakeup();  
  20.     SelectionKey readKey = reader.registerChannel(channel);  
  21.     c = getConnection(channel, System.currentTimeMillis());  
  22.     readKey.attach(c);  
  23. }  

HBaseServer$Listener$Reader的run()函式簡化如下   這是一個獨立的select執行緒

  1. while (running) {  
  2.     SelectionKey key = null;  
  3.     readSelector.select();  
  4.     while (adding) {  
  5.         this.wait(1000);  
  6.     }     
  7.     Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();  
  8.     while (iter.hasNext()) {  
  9.         key = iter.next();  
  10.         iter.remove();  
  11.         if (key.isValid()) {  
  12.             if (key.isReadable()) {  
  13.                 doRead(key);  
  14.             }  
  15.         }  
  16.     }  
  17. }  
  18. //doRead()主要是讀取遠端的資料並解析處理  
  19. //沒有這個process()函式,只是將邏輯簡化了一下展示而言  
  20. //解析id,param並封裝成一個Call物件,插入到併發佇列中,之後由Handler執行緒處理  
  21. void process() {  
  22.     int id = dis.readInt();   
  23.     param = ReflectionUtils.newInstance(paramClass, conf);//read param  
  24.     param.readFields(dis);  
  25.     Call call = new Call(id, param, this, responder, callSize);  
  26.     if (priorityCallQueue != null && getQosLevel(param) > highPriorityLevel) {  
  27.         priorityCallQueue.put(call);  
  28.     } else if (replicationQueue != null && getQosLevel(param) == HConstants.REPLICATION_QOS) {  
  29.         replicationQueue.put(call);  
  30.     } else {  
  31.         callQueue.put(call); // queue the call; maybe blocked here  
  32.     }  
  33. }  

HBaserServer$Handler的run()函式簡化如下

  1. public void run() {  
  2.     //這裡的myCallQueue和callQueue是一個佇列  
  3.     Call call = myCallQueue.take();  
  4.     Invocation call = (Invocation)param;  
  5.     Method method = protocol.getMethod(call.getMethodName(),  
  6.         call.getParameterClasses());  
  7.         Object[] params = call.getParameters();  
  8.     Object value = method.invoke(impl, params);  
  9.     //最後會呼叫到HBaserServer自身的某個函式  
  10.         //onlineRegions 是ConcurrentHashMap<String, HRegion>()  
  11.         String name = HRegionInfo.encodeRegionName(regionName)  
  12.         onlineRegions.get(name);  
  13.     Result r = region.getClosestRowBefore(row, family);  
  14.     return r;  
  15. }  

flush的過程

服務端是收到了客戶端發來的flushRegion請求,具體過程參見 客戶端請求過程一文

客戶端如果是flush全表,先是獲取這個表的所有region名字,然後做一次批量的flushRegion請求(多個請求),但是所有的請求都是在一個執行緒中執行的

和flush相關的類函式簡化如下,1-4是呼叫順序

1.HRegion#flushcache()

2.HRegion#internalFlushcache()

3.Store#internalFlushCache()

4.StoreFile$Writer#append()

  1. //重新整理region中的資料,注意有一個讀鎖  
  2. HRegion#flushcache() {  
  3.     try {  
  4.         lock.readLock().lock();  
  5.         internalFlushcache(status);  
  6.     } finally {  
  7.         lock.readLock().unlock();  
  8.     }  
  9. }  
  10. //這裡是遍歷獲取region中的所有store,然後對每個store都建立一個  
  11. //StoreFlusher物件,使用這個物件來重新整理資料  
  12. //注意在獲取所有Store的時候使用了寫鎖  
  13. HRegion#internalFlushcache() {  
  14.     try {  
  15.         this.updatesLock.writeLock().lock();  
  16.         List<StoreFlusher> storeFlushers = new ArrayList<StoreFlusher>(stores.size());  
  17.         for (Store s : stores.values()) {  
  18.             storeFlushers.add(s.getStoreFlusher(completeSequenceId));  
  19.         }     
  20.     } finally {  
  21.         this.updatesLock.writeLock().unlock();  
  22.     }  
  23.     for (StoreFlusher flusher : storeFlushers) {  
  24.         flusher.flushCache(status);  
  25.     }  
  26. }  
  27. //將memstore中的資料取出然後遍歷所有的KV  
  28. //將其重新整理到HFile中,注意重新整理的時候有一個flush鎖  
  29. Store#internalFlushCache() {  
  30.     InternalScanner scanner = null;  
  31.     KeyValueScanner memstoreScanner = new CollectionBackedScanner(set, this.comparator);      
  32.     Scan scan = new Scan();  
  33.     scan.setMaxVersions(scanInfo.getMaxVersions());  
  34.     scanner = new StoreScanner(this, scanInfo, scan,  
  35.         Collections.singletonList(memstoreScanner), ScanType.MINOR_COMPACT,  
  36.         this.region.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);  
  37.     try {  
  38.         flushLock.lock();  
  39.         StoreFile.Writer writer = createWriterInTmp(set.size());  
  40.         List<KeyValue> kvs = new ArrayList<KeyValue>();  
  41.         boolean hasMore;  
  42.         do {  
  43.             hasMore = scanner.next(kvs, compactionKVMax);  
  44.             for (KeyValue kv : kvs) {  
  45.                 Writer.append(kv);  
  46.                 flushed += this.memstore.heapSizeChange(kv, true);        
  47.             }  
  48.             kvs.clear();  
  49.         }while(hasMore);      
  50.     } finally {  
  51.         flushLock.unlock();  
  52.     }  
  53. 相關推薦

    HBase-服務處理請求過程

    Region的架構 HRegionServer:配置:hbase.client.retries.number (預設10)  客戶端的重試次數hbase.regionserver.msginterval (預設3*1000) ???hbase.regionserver.che

    fastHttp服務處理請求過程

    1. Github 地址 https://github.com/valyala/fasthttp 2. fastHttp 服務端的處理請求的過程 1. 工作過程 ![fastHttp服務端工作過程](https://img2020.cnblogs.com/blog/947592/2021

    電商平臺專案之——Ajax請求服務處理完不跳到success

    1、問題描述:       最近在修改電商平臺的釋出商品頁面,釋出商品時,前端與後臺互動採用Ajax  Post請求,就這麼一個簡單的畫面,我遇到一個非常奇怪且困擾我很久的問題:       (1)商品釋出失敗(有時候能釋

    客戶請求時間和服務處理時間不匹配問題

    現象:       客戶端日誌,調一個介面耗時為2-3s      服務端日誌,處理一個請求耗時只有100-200ms   問題:      這明顯速度不匹配, 理論

    Android BLE與終端通信(三)——client與服務通信過程以及實現數據通信

    .sh 沒有 indexof 實例 解析 rip listview filter @override Android BLE與終端通信(三)——client與服務端通信過程以及實現數據通信 前面的終究僅僅是小知識點。上不了臺面,也僅僅能算是起

    HTTP form表單提交數據(enctype="multipart/form-data" method="post")的服務處理

    keyword -s detail put form表單 表單提交 class eth ron 參考鏈接:http://blog.csdn.net/u010018421/article/details/52833346 <form action="fileUpload

    Openldap服務centos7搭建過程

    交互 sch usr 記錄 cdata 簡單的 data 手動 color 一:前言 目前網上有很多的Openldap服務端搭建教程,但是大部分其實都還是6的部署過程,許多朋友在centos7下按照網上的文檔進行部署,會發現根本沒有主配置文件,加上我自己踩過的一些坑,在這篇

    壓測中提高併發數後服務處理時間增長的原因分析

    後端邏輯:  接收請求、分詞匹配、讀redis做一些過濾策略 在不斷提高併發數的壓測過程中,發現一個問題,後臺服務對一個請求的處理時間也在增高, 而且增高的主要時間是消耗在讀redis過濾那塊 過濾那塊的主要邏輯是:從redis中讀資料,並基於這些資料做一些過濾的策略 為什

    node.js服務處理檔案上傳的完整流程

    在本文 微信小程式-從相簿獲取圖片 使用相機拍照 本地圖片上傳之前需要看看 微信小程式-獲取使用者session_key,openid,unionid - 後端為nodejs 程式碼封裝是在上文新增的。 本文知識點: 1、微信小程式選擇圖片wx.chooseImage()介

    [HBase] 服務RPC機制及代碼梳理

    con already oem http ade cer 同比 nging 劃分 基於版本:CDH5.4.2 上述版本較老,但是目前生產上是使用這個版本,所以以此為例。 1. 概要 說明: 客戶端API發送的請求將會被RPCServer的Listen

    SpringMVC原始碼--控制器Handler處理請求過程

          DispatcherServlet類的doDispatch()方法中,真正去處理請求的關鍵步驟是:            HandlerAdapter ha = getHandlerAdapter(mappedHandler.getHandler());  

    服務Http請求Request和response原理解析篇(1)

    什麼是servlet? .Sun(oracle)公司制定的一種用來擴充套件Web伺服器功能的元件規範. 元件:在軟體開發行業,符合一定規範,實現部分功能,並且需要部署到容器中才能執行的軟體模組 容器:符合一定規範,提供元件執行環境的一個程式 什麼是HTTP協議? Hype

    springmvc在處理請求過程中出現異常資訊交由異常處理器進行處理,自定義異常處理器可以實現一個系統的異常處理邏輯。為了區別不同的異常通常根據異常型別自定義異常類,這裡我們建立一個自定義系統異常,如果controller、service、dao丟擲此類異常說明是系統預期處理的異常資訊。

    springmvc在處理請求過程中出現異常資訊交由異常處理器進行處理,自定義異常處理器可以實現一個系統的異常處理邏輯。 1.1 異常處理思路 系統中異常包括兩類:預期異常和執行時異常RuntimeException,前者通過捕獲異常從而獲取異常資訊,後者主要通過規範程式碼開發、測試通過手段減少執

    springmvc在處理請求過程中出現異常信息交由異常處理器進行處理,自定義異常處理器可以實現一個系統的異常處理邏輯。為了區別不同的異常通常根據異常類型自定義異常類,這裏我們創建一個自定義系統異常,如果controller、service、dao拋出此類異常說明是系統預期處理的異常信息。

    ansi req -type this spring 進行 name ext code springmvc在處理請求過程中出現異常信息交由異常處理器進行處理,自定義異常處理器可以實現一個系統的異常處理邏輯。 1.1 異常處理思路 系統中異常包括兩類:預期異常和運行時異常Ru

    nextjs 服務渲染請求引數

    Post.getInitialProps = async function (context) { const { id } = context.query const res = await fetch(`http://api.tvmaze.com/shows/${id}`) const

    分享vue專案的服務渲染學習過程

          最近抽出了點時間,弄了下vue ssr專案,至於ssr的優點就不多提了。學習路線參照了官方例項,有興趣的同學可以去看下。      我的專案地址,主要使用了ssr+typescript+vuex+vue-cli 2.0,

    從客戶服務發起請求,呼叫功能方式:

    1_通過表單向服務端發起請求  2_通過連結向服務端發起請求  3_Ajax向服務端發起請求 <form action="/BaseServlet/ServletDemo?method=addStu" method="post">            使用

    服務處理同步發送小消息的性能上Kafka>RocketMQ>RabbitMQ

    阿裏 ica 阿裏雲 ref %u log rabbit 隊列 -c 在發送小消息的場景中,三個消息中間件的表現區分明顯:Kafka的吞吐量高達17.3w/s,遠超其他兩個產品。這主要取決於它的隊列模式保證了寫磁盤的過程是線性IO。此時broker磁盤IO已達瓶頸。Roc

    Zookeeper(十一)服務處理,客戶回撥

    服務端處理Watcher 上面主要講解了客戶端註冊Watcher的過程,並且已經瞭解了最終客戶端並不會將Watcher物件真正傳遞到服務端。那麼服務端究竟是如何完成客戶端的Watcher註冊,又是如何來處理這個Watcher的呢?後面我們圍繞這兩個問題講解。 ServerC

    Tomcat處理請求過程和重要引數詳解

    Tomcat處理使用者連線請求模式 BIO:阻塞模型 NIO:非阻塞模型 APR:高效能,可擴充套件的模式,Tomcat8版本預設模式 在使用tomcat時,經常會遇到連線數、執行緒數之類的配置問題,要真正理解這些概念,必須先了解Tomcat的聯結器(Connector)。在