1. 程式人生 > 其它 >【Flink系列十二】使用OpenResty 在InfluxDB協議層攔截Flink指標

【Flink系列十二】使用OpenResty 在InfluxDB協議層攔截Flink指標

本文作者 一杯半盞
文章出處:https://www.cnblogs.com/slankka/p/16001705.html

背景

之前的一篇文章【Flink系列二】構建實時計算平臺——特別篇,用InfluxDb收集Flink Metrics
,裡面寫道

Influxdb 1.8,100個作業的情況下, 記憶體佔用峰值會超過20GB,這個時候容器會自動重啟,客戶端無法上報

現在又過了一年多,現在部署於K8s中的Influxdb佔用記憶體經常超過90%,經過檢查,發現有幾個Flink作業,在修改InfluxDB Reporter之前就提交過了,仍然在大量上報指標。

說明解決方案還不夠好。

目標

  • 需要找到一種,可以不重啟使用者作業的方法,直接過濾指標。
  • 因為Flink作業支援Prometheus的指標上報,使用InfluxDB僅收集Checkpoint型別的指標,所以需要過濾其他所有指標。

過程

大約6年前,早在大四的時候,有室友玩過OpenResty,其實就是Nginx™ + Lua™,那個時候便知道了這種技術。

於是花了兩個晚上,下班回家熬夜,再加上白天工作的時候,忙裡抽空 對著Lua 官方文件 + NginxLuaModule鼓搗出來了。

由於InfluxDb官方經常翻,便知道他的文件專門講了協議,幸好是純文字的協議。

HTTP報文如下:

POST /write?db=var1&rp=retentionPolicy&precision=[h/m/s/ms/u/n]&consistency=

Header: BasicAuth
Content-Type: text/plain; charset=UTF-8

Body
<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]
<measurement>[,<tag_key>=<tag_value>[,<tag_key>=<tag_value>]] <field_key>=<field_value>[,<field_key>=<field_value>] [<timestamp>]

知道協議之後,就可以通過HTTP請求體來過濾 measurement,將不需要的指標全部刪除。

方法

使用 OpenResty的 access_by_lua 方法,訪問讀取請求體:

不完全的程式碼

access_by_lua_block {
          ngx.req.read_body()
          local data = ngx.req.get_body_data()
          //...slankka...
          for s in data:gmatch("[^\r\n]+") do
              if string.find(s, "jobmanager_job_lastCheckpointExternalPath") then
                ngx.req.set_body_data(s)
                ngx.log(ngx.ERR, "Captured success:", s)
                break
              end
          end 
          //...slankka...    
        }

驗證

方法一:可通過抓包的方式驗證。

抓包的方法是,tcpdump 直接在InfluxDB所在的伺服器進行抓包。

tcpdump tcp -t -s 0  -c 100 port 8086 and src net 10.11.12.13 -w ./influxdb_slankka_traffic.cap

方法二:通過Influx 客戶端 InfluxQueryLanguage語法

show measurements on slankka;

//應該只看到這一個指標
jobmanager_job_lastCheckpointExternalPath

方法三:通過Influxdb廠家的Chronograf,直接連線到Influxdb進行檢視

上一篇關於InfluxDB的文章【Flink系列二】構建實時計算平臺——特別篇,用InfluxDb收集Flink Metrics
已經有Chronograf的安裝說明,不再贅述。

總結

OpenResty 為終極優化方案,效果極為優異。

除錯Lua,需要多看OpenResty官方文件,最終指標必須完全符合自己的Lua指令碼邏輯,才算完成。