1. 程式人生 > >Openresty+Lua+Kafka實現日誌實時採集

Openresty+Lua+Kafka實現日誌實時採集

簡介

  在很多資料採集場景下,Flume作為一個高效能採集日誌的工具,相信大家都知道它。許多人想起Flume這個元件能聯想到的大多數都是Flume跟Kafka相結合進行日誌的採集,這種方案有很多他的優點,比如高效能、高吞吐、資料可靠性等。但是我們如果要求對日誌進行實時的採集,這顯然不是一個好的解決方案。原因如下:

  就目前來說,Flume能支援實時監控一個目錄的資料檔案,一旦對某個目錄的檔案採集完成,就會打上completed的標誌,若之後再有資料進入這個檔案中,Flume則不會檢測到。

  所以,我們更多的是使用這種方案進行定時採集,只要有一個新的資料目錄產生,我們就採集這個目錄下的資料檔案。

  那麼接下來本篇文章將為大家介紹基於Openresty+Lua+Kafka對日誌進行實時的採集。

需求

  很多時候,我們需要對使用者的埋點資料進行一個實時的採集,然後用這些資料對使用者的行為做一些實時的分析。所以,第一步當然是先解決怎樣對資料進行實時的採集。

  這裡我們用到的方案是Openresty+Lua+Kafka。

原理介紹

  那麼什麼是Openresty呢?這裡引用官方的一段話: 

  OpenResty是一個基於Nginx與Lua的高效能Web平臺,其內部集成了大量精良的Lua庫、第三方模組以及大多數的依賴項。用於方便地搭建能夠處理超高併發、擴充套件性極高的動態 Web 應用、Web 服務和動態閘道器。
  OpenResty通過匯聚各種設計精良的Nginx模組,從而將Nginx有效地變成一個強大的通用Web應用平臺。這樣,Web開發人員和系統工程師可以使用Lu 指令碼語言調動Nginx支援的各種C以及Lua模組,快速構造出足以勝任10K乃至1000 以上單機併發連線的高效能Web應用系統。
  OpenResty的目標是讓你的Web服務直接跑在Nginx服務內部,充分利用Nginx的非阻塞 I/O 模型,不僅僅對 HTTP 客戶端請求,甚至於對遠端後端諸如MySQL、PostgreSQL、Memcached 以及 Redis 等都進行一致的高效能響應。

  簡單來說,就是將客戶端的請求(本文指的是使用者的行為日誌)通過Nginx把使用者的資料投遞到我們指定的地方(Kafka),而為了實現這個需求,我們用到了Lua指令碼,因為Openresty封裝了各種Lua模組,其中有一個模組就是對Kafka模組進行了分裝,我們只需要寫一個簡單的指令碼就可以將使用者的資料通過Nginx轉發到Kafka中,以便後續對資料進行消費。

  這裡給出一張架構圖,方便大家理解:

 

  

  在這裡簡單總結一下使用Openresty+Lua+Kafka的優點:

    1.支援多種業務資料,不同的業務資料,只需要配置不同的Lua指令碼,就可以將不同的業務資料傳送到Kafka不同的topic中。

    2.對使用者觸發的埋點資料進行實時的採集

    3.高可靠的叢集,Openresty由於是基於Nginx,其叢集擁有非常高的效能和穩定性。

    4.高併發,相比tomcat、apache等web伺服器,Nginx的併發量遠遠高於其他兩種。正常情況下處理上萬的併發量都不是什麼難事。

  那麼接下來我們就動手實操一下。

Openresty的安裝

本例項採用的單機部署形式,當單機部署成功了之後,叢集的搭建跟單機一樣,只是在不同的機器上執行相同的步驟而已。

注:本實驗基於centos7.0作業系統

1.下載Openresty依賴:

yum install readline-devel pcre-devel openssl-devel gcc 

2.編譯安裝Openresty:

#1.安裝openresty: 
mkdir /opt/software 
mkdir /opt/module
cd /opt/software/ # 安裝檔案所在目錄  
wget https://openresty.org/download/openresty-1.9.7.4.tar.gz  
tar -xzf openresty-1.9.7.4.tar.gz -C /opt/module/
cd /opt/module/openresty-1.9.7.4 
#2.配置:  
# 指定目錄為/opt/openresty,預設在/usr/local。  
./configure --prefix=/opt/openresty \  
            --with-luajit \  
            --without-http_redis2_module \  
            --with-http_iconv_module  
make  
make install  

3.安裝lua-resty-kafka

因為我們需要將資料通過nginx+lua指令碼轉發到Kafka中,編寫lua指令碼時需要用到lua模組中的一些關於Kafka的依賴。

#下載lua-resty-kafka:
cd /opt/software/  
wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip  
unzip master.zip -d /opt/module/  
    
#拷貝kafka相關依賴指令碼到openresty  
cp -rf /opt/module/lua-resty-kafka-master/lib/resty/kafka/ /opt/openresty/lualib/resty/

 注:由於kafka大家都比較熟知,這裡就不介紹它的安裝了。

Openresty安裝完成之後目錄結構如下:

drwxr-xr-x  2 root root 4096 Mar 24 14:26 bin
drwxr-xr-x  6 root root 4096 Mar 24 14:26 luajit
drwxr-xr-x  7 root root 4096 Mar 24 14:29 lualib
drwxr-xr-x 12 root root 4096 Mar 24 14:40 nginx

4.配置檔案

編輯/opt/openresty/nginx/conf/nginx.conf

user  nginx;  #Linux的使用者
worker_processes  auto;
worker_rlimit_nofile 100000;

#error_log  logs/error.log;
#error_log  logs/error.log  notice;
#error_log  logs/error.log  info;

#pid        logs/nginx.pid;

events {
    worker_connections  102400;
    multi_accept on;
    use epoll;
}


http {
    include       mime.types;
    default_type  application/octet-stream;

    log_format  main  '$remote_addr - $remote_user [$time_local] "$request" '
                      '$status $body_bytes_sent "$http_referer" '
                      '"$http_user_agent" "$http_x_forwarded_for"';

    access_log  /var/log/nginx/access.log  main;

    resolver 8.8.8.8;
    #resolver 127.0.0.1 valid=3600s;

    sendfile        on;

    keepalive_timeout  65;

    underscores_in_headers on;

    gzip  on;

    include /opt/openresty/nginx/conf/conf.d/common.conf; #common.conf這個檔名字可自定義

}

 編輯 /opt/openresty/nginx/conf/conf.d/common.conf

##api
lua_package_path "/opt/openresty/lualib/resty/kafka/?.lua;;";
lua_package_cpath "/opt/openresty/lualib/?.so;;";

lua_shared_dict ngx_cache 128m;  # cache
lua_shared_dict cache_lock 100k; # lock for cache

server {
    listen       8887; #監聽埠
    server_name  192.168.3.215; #埋點日誌的ip地址或域名,多個域名之間用空格分開
    root         html; #root指令用於指定虛擬主機的網頁根目錄,這個目錄可以是相對路徑,也可以是絕對路徑。
    lua_need_request_body on; #開啟獲取訊息體的開關,以便能獲取到訊息體

    access_log /var/log/nginx/message.access.log  main;
    error_log  /var/log/nginx/message.error.log  notice;

    location = /lzp/message {
        lua_code_cache on;
        charset utf-8;
        default_type 'application/json';
        content_by_lua_file "/opt/openresty/nginx/lua/testMessage_kafka.lua";#引用的lua指令碼
    }
}

 編輯 /opt/openresty/nginx/lua/testMessage_kafka.lua

#建立目錄mkdir /opt/openresty/nginx/lua/
vim /opt/openresty/nginx/lua/testMessage_kafka.lua
#編輯記憶體如下:
-- require需要resty.kafka.producer的lua指令碼,沒有會報錯
local producer = require("resty.kafka.producer")

-- kafka的叢集資訊,單機也是可以的
local broker_list = {
    {host = "192.168.3.215", port = 9092},
}

-- 定義最終kafka接受到的資料是怎樣的json格式
local log_json = {}
--增加read_body之後即可獲取到訊息體,預設情況下可能會是nil
log_json["body"] = ngx.req.read_body()
log_json["body_data"] = ngx.req.get_body_data()

-- 定義kafka同步生產者,也可設定為非同步 async
-- -- 注意!!!當設定為非同步時,在測試環境需要修改batch_num,預設是200條,若大不到200條kafka端接受不到訊息
-- -- encode()將log_json日誌轉換為字串
-- -- 傳送日誌訊息,send配套之第一個引數topic:
-- -- 傳送日誌訊息,send配套之第二個引數key,用於kafka路由控制:
-- -- key為nill(空)時,一段時間向同一partition寫入資料
-- -- 指定key,按照key的hash寫入到對應的partition

-- -- batch_num修改為1方便測試
local bp = producer:new(broker_list, { producer_type = "async",batch_num = 1 })
-- local bp = producer:new(broker_list)

local cjson = require("cjson.safe")
local sendMsg = cjson.encode(log_json)
local ok, err = bp:send("testMessage",nil, sendMsg)
if not ok then
   ngx.log(ngx.ERR, 'kafka send err:', err)
elseif ok then
   ngx.say("the message send successful")
else
   ngx.say("未知錯誤")
end

5.啟動服務執行:

useradd nginx #建立使用者
passwd nginx #設定密碼

#設定openresty的所有者nginx
chown -R nginx:nginx /opt/openresty/

#啟動服務
cd /opt/openresty/nginx/sbin
./nginx -c /opt/openresty/nginx/conf/nginx.conf

檢視服務:
ps -aux | grep nginx
nginx     2351  0.0  0.1 231052 46444 ?        S    Mar30   0:33 nginx: worker process
nginx     2352  0.0  0.1 233396 48540 ?        S    Mar30   0:35 nginx: worker process
nginx     2353  0.0  0.1 233396 48536 ?        S    Mar30   0:33 nginx: worker process
nginx     2354  0.0  0.1 232224 47464 ?        S    Mar30   0:34 nginx: worker process
nginx     2355  0.0  0.1 231052 46404 ?        S    Mar30   0:33 nginx: worker process
nginx     2356  0.0  0.1 232224 47460 ?        S    Mar30   0:34 nginx: worker process
nginx     2357  0.0  0.1 231052 46404 ?        S    Mar30   0:34 nginx: worker process
nginx     2358  0.0  0.1 232224 47484 ?        S    Mar30   0:34 nginx: worker process
root      7009  0.0  0.0 185492  2516 ?        Ss   Mar24   0:00 nginx: master process ./nginx -c /opt/openresty/nginx/conf/nginx.conf


檢視埠:
netstat -anput | grep 8887
tcp        0      0 0.0.0.0:8887            0.0.0.0:*               LISTEN      2351/nginx: worke

看到以上程序,證明服務已正常執行

6.使用postman,傳送post請求進行簡單的測試,檢視kafka是否能否接受到資料

 

 

 7.kafka消費資料:

kafka-console-consumer --bootstrap-server 192.168.3.215:9092 --topic testMessage --from-beginning

若消費到資料,則證明配置成功,若未調通可檢視/var/log/nginx/message.access.log和/var/log/nginx/message.error.log相關錯誤日誌進行調整

總結

  使用Openresty+Lua+Kafka就可以將使用者的埋點資料實時採集到kafka叢集中,並且Openresty是基於Nginx的,而Nginx能處理上萬的併發量,所以即使使用者的資料在短時間內激增,這套架構也能輕鬆的應對,不會導致叢集崩潰。另一方面,若資料過多導致叢集的超負荷,我們也可以隨時加多一臺機器,非常方便。

  另外一個小小的拓展:若業務資料非常多,需要傳送到不同的topic中,我們也不用編寫多個指令碼,而是可以聯絡後端在json格式裡面加一個欄位,這個欄位的值就是topic的名稱。我們只需要編寫一個通用指令碼,解析Json資料將topic名稱拿出來就可以了。

&n