1. 程式人生 > Nginx + Lua + Kafka + Redis + Mysql

Nginx + Lua + Kafka + Redis + Mysql

寫在開頭

# System version, as reported by: cat /etc/issue
CentOS release 6.8 (Final)
# Switch to the /tmp directory (later steps refer to /tmp/... paths)
cd /tmp

安裝 lua

# Download the LuaJIT 2.0.5 source tarball
wget http://luajit.org/download/LuaJIT-2.0.5.tar.gz
# Extract it
tar zxf LuaJIT-2.0.5.tar.gz

cd LuaJIT-2.0.5
# Compile
make PREFIX=/usr/local/LuaJIT
# Install under /usr/local/LuaJIT
make install PREFIX=/usr/local/LuaJIT

# Register the LuaJIT library directory with the dynamic linker
echo "/usr/local/LuaJIT/lib" > /etc/ld.so.conf.d/usr_local_luajit_lib.conf
ldconfig
# Set the environment variables that point at the LuaJIT library and headers
export LUAJIT_LIB=/usr/local/LuaJIT/lib
export LUAJIT_INC=/usr/local/LuaJIT/include/luajit-2.0

# Return to /tmp so the following downloads land where the nginx
# configure step expects them (/tmp/lua-nginx-module-0.10.13, ...)
cd /tmp

下載lua-nginx-module

# Release tag of lua-nginx-module to fetch; wget saves the tarball under
# this same name, which is what tar extracts below
LUA_NGINX_MODULE_TAG=v0.10.13
# Download the release tarball
wget "https://codeload.github.com/openresty/lua-nginx-module/tar.gz/${LUA_NGINX_MODULE_TAG}"
# Extract it
tar -xzf "${LUA_NGINX_MODULE_TAG}"

下載ngx_devel_kit

# Download the ngx_devel_kit v0.3.0 release tarball
wget https://github.com/simplresty/ngx_devel_kit/archive/v0.3.0.tar.gz
# Extract it
tar zxf v0.3.0.tar.gz

下載lua-resty-kafka

# Download the master branch archive of lua-resty-kafka
wget https://github.com/doujiang24/lua-resty-kafka/archive/master.zip
# Extract it (creates ./lua-resty-kafka-master)
unzip master.zip

# Create the target directory; -p keeps the step from failing when the
# directory already exists (for example on a re-run)
mkdir -p /usr/local/lua
# Copy the library into place
cp -r lua-resty-kafka-master /usr/local/lua/lua-resty-kafka

安裝 pcre

# Download the PCRE 8.42 source tarball
wget https://jaist.dl.sourceforge.net/project/pcre/pcre/8.42/pcre-8.42.tar.bz2
# Extract it
tar -jxf pcre-8.42.tar.bz2
# NOTE(review): 777 makes the source tree world-writable; kept as in the
# original steps, tighten it if the machine is shared
chmod -R 777 pcre-8.42
cd pcre-8.42
# Configure
./configure
# Compile
make
# Install
make install
# Create a symlink so the library is also found under /lib64
ln -s /usr/local/lib/libpcre.so.1 /lib64/

# Return to /tmp so the following downloads do not end up inside the
# PCRE source tree
cd /tmp

安裝openresty

# Package installation steps, see http://openresty.org/cn/linux-packages.html

# Build tools and development headers
yum install gcc curl pcre-devel openssl-devel
# yum-utils provides the yum-config-manager command used next
sudo yum install yum-utils
# Add the OpenResty package repository
sudo yum-config-manager --add-repo https://openresty.org/package/centos/openresty.repo
# Install OpenResty from that repository
sudo yum install openresty

拷貝lua-resty-kafka

# Create the resty/kafka directory inside the OpenResty lualib tree;
# -p creates missing parents and does not fail if it already exists
mkdir -p /usr/local/openresty/lualib/resty/kafka
# Copy the kafka client modules so that require "resty.kafka.*" resolves
cp -R /tmp/lua-resty-kafka-master/lib/resty/kafka/* /usr/local/openresty/lualib/resty/kafka/

安裝nginx

# Download the nginx 1.9.8 source tarball
wget http://nginx.org/download/nginx-1.9.8.tar.gz
# Unpack it and enter the source tree
tar -xzf nginx-1.9.8.tar.gz
cd nginx-1.9.8

# Manual step: in src/core/ngx_conf_file.c raise NGX_CONF_BUFFER to 8192 or
# larger, to avoid the "too long parameter" error

# Configure the build with the Lua module, the devel kit and the PCRE sources
# that were unpacked under /tmp
./configure \
    --prefix=/usr/local/nginx/ \
    --sbin-path=/usr/bin/nginx \
    --with-http_stub_status_module \
    --with-http_ssl_module \
    --with-http_realip_module \
    --add-module=/tmp/lua-nginx-module-0.10.13 \
    --add-module=/tmp/ngx_devel_kit-0.3.0 \
    --with-pcre=/tmp/pcre-8.42 \
    --with-pcre-jit

# Compile
make

# Install
make install

nginx 配置

# Minimal example: "/" reports the Lua runtime version, and
# /api/v1/sdk/import forwards the raw request body to Kafka.
worker_processes  1;

events {
    worker_connections  1024;
}


http {
    default_type  application/json;

    # Search the OpenResty lualib tree for Lua and C modules
    lua_package_path "/usr/local/openresty/lualib/?.lua;;";
    lua_package_cpath "/usr/local/openresty/lualib/?.so;;";

    resolver 127.0.0.1;

    server {
        listen       8899;

        # Read the request body up front, similar to calling ngx.req.read_body()
        lua_need_request_body on;

        location / {
            # content_by_lua_block replaces the quoted content_by_lua form,
            # so the Lua source needs no nginx string escaping
            content_by_lua_block {
                ngx.header.content_type = "text/plain"
                -- jit is only defined when running on LuaJIT
                if jit then
                    ngx.say(jit.version)
                else
                    ngx.say(_VERSION)
                end
            }
        }

        location /api/v1/sdk/import {

            content_by_lua_block {
                -- cjson must be required explicitly; the original snippet used
                -- it without loading it, which fails at runtime
                local cjson = require "cjson"
                local producer = require "resty.kafka.producer"

                -- Write the JSON response envelope (errcode and errmsg).
                -- Declared local so no global is created per request.
                local function response(code, msg)
                    local resp = {}
                    resp["errcode"] = code
                    resp["errmsg"] = msg
                    ngx.say(cjson.encode(resp))
                end

                local broker_list = {
                    { host = "192.168.1.117", port = 9092 }
                }

                local bp = producer:new(broker_list, { producer_type = "sync" })

                -- Publish the raw request body to the sdk-receive topic
                local ok, err = bp:send("sdk-receive", nil, ngx.var.request_body)
                if not ok then
                    -- Keep the cause in the error log; the client only sees a generic error
                    ngx.log(ngx.ERR, "kafka send failed: ", err)
                    response(10000, "系統錯誤")
                else
                    response(0, "請求成功")
                end
            }
        }
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }
    }
}

完整示例

# Complete example: authenticate the caller by its "secret" query argument
# (Redis cache first, MySQL as fallback), add the resolved ids to the JSON
# request body, and publish the result to Kafka.
worker_processes  1;

events {
    worker_connections  1024;
}

http {
    default_type  application/json;

    # Search the OpenResty lualib tree for Lua and C modules
    lua_package_path "/usr/local/openresty/lualib/?.lua;;";
    lua_package_cpath "/usr/local/openresty/lualib/?.so;;";

    resolver 127.0.0.1;

    server {
        listen       8899;

        # Read the request body up front, similar to calling ngx.req.read_body().
        # ngx.var.request_body is nil when the body is empty or was spooled to a
        # temp file; the JSON decode below then fails and 10001 is returned.
        lua_need_request_body on;

        location /api/v1/sdk/import {

            # content_by_lua_block replaces the quoted content_by_lua form,
            # so the Lua source needs no nginx string escaping
            content_by_lua_block {
                local cjson = require "cjson"

                -- Address shared by the Redis, MySQL and Kafka services
                local BACKEND_HOST = "192.168.1.117"

                -- Write the JSON response envelope (errcode and errmsg).
                -- Declared local so no global is created per request.
                local function response(code, msg)
                    local resp = {}
                    resp["errcode"] = code
                    resp["errmsg"] = msg
                    ngx.say(cjson.encode(resp))
                end

                -- Load the source row for a secret from MySQL.
                -- Returns the row on success, or nil plus errcode and errmsg.
                local function load_source_from_mysql(secret)
                    local mysql = require("resty.mysql")
                    local db, init_err = mysql:new()
                    if not db then
                        ngx.log(ngx.ERR, "mysql init failed: ", init_err)
                        return nil, 10007, "獲取資料來源資訊失敗"
                    end
                    db:set_timeout(1000)
                    -- NOTE(review): credentials are hard-coded as in the original
                    -- example; move them out of the config for real deployments
                    local connected, connect_err = db:connect{
                        host = BACKEND_HOST,
                        port = 3306,
                        database = "database_test",
                        user = "root",
                        password = "root",
                        charset = "utf8"
                    }
                    if not connected then
                        ngx.log(ngx.ERR, "mysql connect failed: ", connect_err)
                        return nil, 10008, "獲取資料來源資訊失敗"
                    end
                    -- secret comes straight from the query string, so it is quoted
                    -- with ngx.quote_sql_str (which adds the surrounding quotes)
                    -- instead of being pasted into the statement
                    local sql = "select id,cid from sdk where del_flag=0 and secret="
                        .. ngx.quote_sql_str(secret)
                    local rows, query_err = db:query(sql)
                    if not rows or rows[1] == nil then
                        if not rows then
                            ngx.log(ngx.ERR, "mysql query failed: ", query_err)
                        end
                        db:close()
                        return nil, 10010, "secret不存在或已失效"
                    end
                    -- Hand the healthy connection back to the pool
                    db:set_keepalive(10000, 50)
                    return rows[1]
                end

                -- Resolve the source ids bound to a secret: Redis cache first,
                -- then MySQL, caching the MySQL result in Redis.
                -- Returns a table with sid and cid, or nil plus errcode and errmsg.
                local function resolve_source(red, secret)
                    local cache_key = string.format("sdk_auth_%s", secret)
                    local cached, get_err = red:get(cache_key)
                    if get_err then
                        ngx.log(ngx.ERR, "redis get failed: ", get_err)
                        return nil, 10009, "獲取資料來源資訊失敗"
                    end

                    -- A cache miss is not a string, so it falls through to MySQL
                    if type(cached) == "string" then
                        local parsed, auth = pcall(cjson.decode, cached)
                        if not parsed or type(auth) ~= "table" then
                            return nil, 10009, "獲取資料來源資訊失敗"
                        end
                        if not auth["auth"] then
                            return nil, 10010, "secret已失效"
                        end
                        return { sid = auth["sid"], cid = auth["cid"] }
                    end

                    local row, errcode, errmsg = load_source_from_mysql(secret)
                    if not row then
                        return nil, errcode, errmsg
                    end

                    local auth_data = {
                        auth = true,
                        sid = row["id"],
                        cid = row["cid"]
                    }
                    -- A failed cache write is not fatal: the next request simply
                    -- goes to MySQL again
                    local stored, set_err = red:set(cache_key, cjson.encode(auth_data))
                    if not stored then
                        ngx.log(ngx.WARN, "redis set failed: ", set_err)
                    end
                    return { sid = row["id"], cid = row["cid"] }
                end

                -- Read the secret and require a single non-empty string value.
                -- get_uri_args yields a table for repeated arguments and true
                -- for an argument without a value; both are rejected here.
                local secret = ngx.req.get_uri_args()["secret"]
                if type(secret) ~= "string" or secret == "" then
                    response(10003, "缺少secret引數")
                    return
                end

                -- Parse the request body; it must decode to a JSON object or
                -- array because fields are added to it below
                local decoded, data = pcall(cjson.decode, ngx.var.request_body)
                if not decoded or type(data) ~= "table" then
                    response(10001, "json資料格式化失敗")
                    return
                end

                -- Look up the source details for the secret
                local redis = require("resty.redis")
                local redis_instance = redis:new()
                redis_instance:set_timeout(1000)
                local ok, err = redis_instance:connect(BACKEND_HOST, 6379)
                if not ok then
                    ngx.log(ngx.ERR, "redis connect failed: ", err)
                    response(10006, "獲取資料來源資訊失敗")
                    return
                end
                local selected, select_err = redis_instance:select(6)
                if not selected then
                    ngx.log(ngx.ERR, "redis select failed: ", select_err)
                    redis_instance:close()
                    response(10009, "獲取資料來源資訊失敗")
                    return
                end

                local source, errcode, errmsg = resolve_source(redis_instance, secret)
                if not source then
                    redis_instance:close()
                    response(errcode, errmsg)
                    return
                end
                -- Success path: return the connection to the pool for reuse
                redis_instance:set_keepalive(10000, 50)

                data["cid"] = source.cid
                data["srcId"] = source.sid
                data["srcType"] = "sdk"

                local producer = require "resty.kafka.producer"
                local broker_list = {
                    { host = BACKEND_HOST, port = 9092 }
                }
                local bp = producer:new(broker_list, { producer_type = "sync" })
                local sent, send_err = bp:send("sdk-receive", nil, cjson.encode(data))

                if not sent then
                    -- Keep the cause in the error log; the client only sees a generic error
                    ngx.log(ngx.ERR, "kafka send failed: ", send_err)
                    response(10000, "系統錯誤")
                else
                    response(0, "請求成功")
                end
            }
        }
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }
    }
}