Openresty+Lua+Kafka實現日誌實時採集
簡介
在很多資料採集場景下,Flume作為一個高效能採集日誌的工具,相信大家都知道它。許多人想起Flume這個元件能聯想到的大多數都是Flume跟Kafka相結合進行日誌的採集,這種方案有很多他的優點,比如高效能、高吞吐、資料可靠性等。但是我們如果要求對日誌進行實時的採集,這顯然不是一個好的解決方案。原因如下:
就目前來說,Flume能支援實時監控一個目錄的資料檔案,一旦對某個目錄的檔案採集完成,就會打上completed的標誌,若之後再有資料進入這個檔案中,Flume則不會檢測到。
所以,我們更多的是使用這種方案進行定時採集,只要有一個新的資料目錄產生,我們就採集這個目錄下的資料檔案。
那麼接下來本篇文章將為大家介紹基於Openresty+Lua+Kafka對日誌進行實時的採集。
需求
很多時候,我們需要對使用者的埋點資料進行一個實時的採集,然後用這些資料對使用者的行為做一些實時的分析。所以,第一步當然是先解決怎樣對資料進行實時的採集。
這裡我們用到的方案是Openresty+Lua+Kafka。
原理介紹
那麼什麼是Openresty呢?這裡引用官方的一段話:
OpenResty是一個基於Nginx與Lua的高效能Web平臺,其內部集成了大量精良的Lua庫、第三方模組以及大多數的依賴項。用於方便地搭建能夠處理超高併發、擴充套件性極高的動態 Web 應用、Web 服務和動態閘道器。
OpenResty通過匯聚各種設計精良的Nginx模組,從而將Nginx有效地變成一個強大的通用Web應用平臺。這樣,Web開發人員和系統工程師可以使用Lu 指令碼語言調動Nginx支援的各種C以及Lua模組,快速構造出足以勝任10K乃至1000 以上單機併發連線的高效能Web應用系統。
OpenResty的目標是讓你的Web服務直接跑在Nginx服務內部,充分利用Nginx的非阻塞 I/O 模型,不僅僅對 HTTP 客戶端請求,甚至於對遠端後端諸如MySQL、PostgreSQL、Memcached 以及 Redis 等都進行一致的高效能響應。
簡單來說,就是將客戶端的請求(本文指的是使用者的行為日誌)通過Nginx把使用者的資料投遞到我們指定的地方(Kafka),而為了實現這個需求,我們用到了Lua指令碼,因為Openresty封裝了各種Lua模組,其中有一個模組就是對Kafka模組進行了分裝,我們只需要寫一個簡單的指令碼就可以將使用者的資料通過Nginx轉發到Kafka中,以便後續對資料進行消費。
這裡給出一張架構圖,方便大家理解:
在這裡簡單總結一下使用Openresty+Lua+Kafka的優點:
1.支援多種業務資料,不同的業務資料,只需要配置不同的Lua指令碼,就可以將不同的業務資料傳送到Kafka不同的topic中。
2.對使用者觸發的埋點資料進行實時的採集
3.高可靠的叢集,Openresty由於是基於Nginx,其叢集擁有非常高的效能和穩定性。
4.高併發,相比tomcat、apache等web伺服器,Nginx的併發量遠遠高於其他兩種。正常情況下處理上萬的併發量都不是什麼難事。
那麼接下來我們就動手實操一下。
Openresty的安裝
本例項採用的單機部署形式,當單機部署成功了之後,叢集的搭建跟單機一樣,只是在不同的機器上執行相同的步驟而已。
注:本實驗基於centos7.0作業系統
1.下載Openresty依賴:
yum install readline-devel pcre-devel openssl-devel gcc
2.編譯安裝Openresty:
#1.安裝openresty: mkdir /opt/software mkdir /opt/module cd /opt/software/ # 安裝檔案所在目錄 wget https://openresty.org/download/openresty-1.9.7.4.tar.gz tar -xzf openresty-1.9.7.4.tar.gz -C /opt/module/ cd /opt/module/openresty-1.9.7.4 #2.配置: # 指定目錄為/opt/openresty,預設在/usr/local。 ./configure --prefix=/opt/openresty \ --with-luajit \ --without-http_redis2_module \ --with-http_iconv_module make make install
3.安裝lua-resty-kafka
因為我們需要將資料通過nginx+lua指令碼轉發到Kafka中,編寫lua指令碼時需要用到lua模組中的一些關於Kafka的依賴。
#下載lua-resty-kafka: cd /opt/software/ wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip unzip master.zip -d /opt/module/ #拷貝kafka相關依賴指令碼到openresty cp -rf /opt/module/lua-resty-kafka-master/lib/resty/kafka/ /opt/openresty/lualib/resty/
注:由於kafka大家都比較熟知,這裡就不介紹它的安裝了。
Openresty安裝完成之後目錄結構如下:
drwxr-xr-x 2 root root 4096 Mar 24 14:26 bin drwxr-xr-x 6 root root 4096 Mar 24 14:26 luajit drwxr-xr-x 7 root root 4096 Mar 24 14:29 lualib drwxr-xr-x 12 root root 4096 Mar 24 14:40 nginx
4.配置檔案
編輯/opt/openresty/nginx/conf/nginx.conf
user nginx; #Linux的使用者 worker_processes auto; worker_rlimit_nofile 100000; #error_log logs/error.log; #error_log logs/error.log notice; #error_log logs/error.log info; #pid logs/nginx.pid; events { worker_connections 102400; multi_accept on; use epoll; } http { include mime.types; default_type application/octet-stream; log_format main '$remote_addr - $remote_user [$time_local] "$request" ' '$status $body_bytes_sent "$http_referer" ' '"$http_user_agent" "$http_x_forwarded_for"'; access_log /var/log/nginx/access.log main; resolver 8.8.8.8; #resolver 127.0.0.1 valid=3600s; sendfile on; keepalive_timeout 65; underscores_in_headers on; gzip on; include /opt/openresty/nginx/conf/conf.d/common.conf; #common.conf這個檔名字可自定義 }
編輯 /opt/openresty/nginx/conf/conf.d/common.conf
##api lua_package_path "/opt/openresty/lualib/resty/kafka/?.lua;;"; lua_package_cpath "/opt/openresty/lualib/?.so;;"; lua_shared_dict ngx_cache 128m; # cache lua_shared_dict cache_lock 100k; # lock for cache server { listen 8887; #監聽埠 server_name 192.168.3.215; #埋點日誌的ip地址或域名,多個域名之間用空格分開 root html; #root指令用於指定虛擬主機的網頁根目錄,這個目錄可以是相對路徑,也可以是絕對路徑。 lua_need_request_body on; #開啟獲取訊息體的開關,以便能獲取到訊息體 access_log /var/log/nginx/message.access.log main; error_log /var/log/nginx/message.error.log notice; location = /lzp/message { lua_code_cache on; charset utf-8; default_type 'application/json'; content_by_lua_file "/opt/openresty/nginx/lua/testMessage_kafka.lua";#引用的lua指令碼 } }
編輯 /opt/openresty/nginx/lua/testMessage_kafka.lua
#建立目錄mkdir /opt/openresty/nginx/lua/ vim /opt/openresty/nginx/lua/testMessage_kafka.lua
#編輯記憶體如下:
-- require需要resty.kafka.producer的lua指令碼,沒有會報錯 local producer = require("resty.kafka.producer") -- kafka的叢集資訊,單機也是可以的 local broker_list = { {host = "192.168.3.215", port = 9092}, } -- 定義最終kafka接受到的資料是怎樣的json格式 local log_json = {} --增加read_body之後即可獲取到訊息體,預設情況下可能會是nil log_json["body"] = ngx.req.read_body() log_json["body_data"] = ngx.req.get_body_data() -- 定義kafka同步生產者,也可設定為非同步 async -- -- 注意!!!當設定為非同步時,在測試環境需要修改batch_num,預設是200條,若大不到200條kafka端接受不到訊息 -- -- encode()將log_json日誌轉換為字串 -- -- 傳送日誌訊息,send配套之第一個引數topic: -- -- 傳送日誌訊息,send配套之第二個引數key,用於kafka路由控制: -- -- key為nill(空)時,一段時間向同一partition寫入資料 -- -- 指定key,按照key的hash寫入到對應的partition -- -- batch_num修改為1方便測試 local bp = producer:new(broker_list, { producer_type = "async",batch_num = 1 }) -- local bp = producer:new(broker_list) local cjson = require("cjson.safe") local sendMsg = cjson.encode(log_json) local ok, err = bp:send("testMessage",nil, sendMsg) if not ok then ngx.log(ngx.ERR, 'kafka send err:', err) elseif ok then ngx.say("the message send successful") else ngx.say("未知錯誤") end
5.啟動服務執行:
useradd nginx #建立使用者 passwd nginx #設定密碼 #設定openresty的所有者nginx chown -R nginx:nginx /opt/openresty/ #啟動服務 cd /opt/openresty/nginx/sbin ./nginx -c /opt/openresty/nginx/conf/nginx.conf 檢視服務: ps -aux | grep nginx nginx 2351 0.0 0.1 231052 46444 ? S Mar30 0:33 nginx: worker process nginx 2352 0.0 0.1 233396 48540 ? S Mar30 0:35 nginx: worker process nginx 2353 0.0 0.1 233396 48536 ? S Mar30 0:33 nginx: worker process nginx 2354 0.0 0.1 232224 47464 ? S Mar30 0:34 nginx: worker process nginx 2355 0.0 0.1 231052 46404 ? S Mar30 0:33 nginx: worker process nginx 2356 0.0 0.1 232224 47460 ? S Mar30 0:34 nginx: worker process nginx 2357 0.0 0.1 231052 46404 ? S Mar30 0:34 nginx: worker process nginx 2358 0.0 0.1 232224 47484 ? S Mar30 0:34 nginx: worker process root 7009 0.0 0.0 185492 2516 ? Ss Mar24 0:00 nginx: master process ./nginx -c /opt/openresty/nginx/conf/nginx.conf 檢視埠: netstat -anput | grep 8887 tcp 0 0 0.0.0.0:8887 0.0.0.0:* LISTEN 2351/nginx: worke
看到以上程序,證明服務已正常執行
6.使用postman,傳送post請求進行簡單的測試,檢視kafka是否能否接受到資料
7.kafka消費資料:
kafka-console-consumer --bootstrap-server 192.168.3.215:9092 --topic testMessage --from-beginning
若消費到資料,則證明配置成功,若未調通可檢視/var/log/nginx/message.access.log和/var/log/nginx/message.error.log相關錯誤日誌進行調整
總結
使用Openresty+Lua+Kafka就可以將使用者的埋點資料實時採集到kafka叢集中,並且Openresty是基於Nginx的,而Nginx能處理上萬的併發量,所以即使使用者的資料在短時間內激增,這套架構也能輕鬆的應對,不會導致叢集崩潰。另一方面,若資料過多導致叢集的超負荷,我們也可以隨時加多一臺機器,非常方便。
另外一個小小的拓展:若業務資料非常多,需要傳送到不同的topic中,我們也不用編寫多個指令碼,而是可以聯絡後端在json格式裡面加一個欄位,這個欄位的值就是topic的名稱。我們只需要編寫一個通用指令碼,解析Json資料將topic名稱拿出來就可以了。
&n