lua+canal+oepnresty實現入口網站高併發訪問
首頁門戶系統需要展示各種各樣的廣告資料,但是通常情況下,首頁(門戶系統的流量一般非常的高)不適合直接通過mysql資料庫直接訪問的方式來獲取展示。
如下思路:
1.首先訪問nginx ,我們可以採用快取的方式,先從nginx本地快取中獲取,獲取到直接響應
2.如果沒有獲取到,再次訪問redis,我們可以從redis中獲取資料,如果有 則返回,並快取到nginx中
3.如果沒有獲取到,再次訪問mysql,我們從mysql中獲取資料,再將資料儲存到redis中,返回。
而這裡面,我們都可以使用LUA指令碼嵌入到程式中執行這些查詢相關的業務。
因為tomcat的併發承受量相對而言,有可能承受不住這種高併發場景,所以整個獲取資料步驟都不用java實現,因為只要這個步驟中出現java程式碼實現,就需要tomcat,這樣就限制住了併發量上限,一切都毫無意義。
Lua
Lua 是一種輕量小巧的指令碼語言,用標準C語言編寫並以原始碼形式開放, 其設計目的是為了嵌入應用程式中,從而為應用程式提供靈活的擴充套件和定製功能。
curl -R -O http://www.lua.org/ftp/lua-5.3.5.tar.gz tar zxf lua-5.3.5.tar.gz cd lua-5.3.5 make linux test
注意:此時安裝,有可能會出現如下錯誤:
此時需要安裝lua相關依賴庫的支援,執行如下命令即可:
yum install libtermcap-devel ncurses-devel libevent-devel readline-devel
此時再執行lua命令測試看lua是否安裝成功
LUA的基本語法(跟java語法整體上格式差不多,但又不一樣)
(1)互動式程式設計
Lua 提供了互動式程式設計模式。我們可以在命令列中輸入程式並立即檢視效果。
Lua 互動式程式設計模式可以通過命令 lua -i 或 lua 來啟用:
(2)指令碼式程式設計
我們可以將 Lua 程式程式碼保持到一個以 lua 結尾的檔案,並執行,該模式稱為指令碼式程式設計,例如上面入門程式中將lua語法寫到hello.lua檔案中。
註釋:--行註釋:兩個減號是單行註釋:
變數
全域性變數,預設的情況下,定義一個變數都是全域性變數,
如果要用區域性變數 需要宣告為local
如果變數沒有初始化:則 它的值為nil 這和java中的null不同。
模組
模組類似於一個封裝庫,從 Lua 5.1 開始,Lua 加入了標準的模組管理機制,可以把一些公用的程式碼放在一個檔案裡,以 API 介面的形式在其他地方呼叫,有利於程式碼的重用和降低程式碼耦合度。(類似於java的jar包,寫完程式碼後,可以打成jar包供其他人依賴使用,lua中是寫一個lua指令碼,然後其他地方想要使用,直接匯入這個寫好的指令碼(模組) 也可以理解成java中的封裝)
lua的使用在菜鳥教程中有,直接搜尋菜鳥教程lua即可,非常全面,這裡就不再贅述了。
OpenResty介紹
OpenResty(又稱:ngx_openresty) 是一個基於 nginx的可伸縮的 Web 平臺,由中國人章亦春發起,提供了很多高質量的第三方模組。
OpenResty 簡單理解成 就相當於封裝了nginx,並且集成了LUA指令碼,開發人員只需要簡單的其提供了模組就可以實現相關的邏輯,而不再像之前,還需要在nginx中自己編寫lua的指令碼,再進行呼叫了。
安裝openresty
centos安裝openresty:
1.新增倉庫執行命令
yum install yum-utils yum-config-manager --add-repo https://openresty.org/package/centos/openresty.repo
2.執行安裝
yum install openresty
3.安裝成功後 會在預設的目錄如下:
/usr/local/openresty
cd /usr/local/openresty/nginx/conf
vi nginx.conf
在第一行新增程式碼如下:
配置改完後,需要去指定目錄啟動nginx:
cd /usr/local/openresty/nginx/sbin 啟動命令:./nginx 可以使用 ps -ef|grep nginx檢視已經啟動的nginx
//埠預設是80,可以去nginx配置檔案中修改
http://{ip地址}/
ngx.header.content_type="application/json;charset=utf8" local uri_args = ngx.req.get_uri_args(); local id = uri_args["id"]; --獲取本地快取物件 local cache_ngx = ngx.shared.dis_cache; --根據ID 獲取本地快取資料 local contentCache = cache_ngx:get('content_cache_'..id); --判斷,如果openresty本地沒有自己想要的快取資料的話 if contentCache == "" or contentCache == nil then --則連線redis查詢 local redis = require("resty.redis"); local red = redis:new() red:set_timeout(2000) --傳入連線redis的ip和埠 red:connect("ip", prot) --根據ID獲取redis中的資料 這裡的 ..(兩個點) 表示拼接 local rescontent=red:get("content_"..id); --如果redis中沒有查到資料,則…… if ngx.null == rescontent then --連線mysql local cjson = require("cjson"); local mysql = require("resty.mysql"); local db = mysql:new(); db:set_timeout(2000) --下面的database表示要連線的資料庫 庫名 local props = { host = "127.0.0.1", port = 3306, database = "xxxx", user = "root", password = "haozigg" } local res = db:connect(props); local select_sql = "select url,pic from tb_content where status ='1' and category_id="..id.." order by sort_order"; res = db:query(select_sql); --使用json格式 local responsejson = cjson.encode(res); --將查詢到的資料往redis中儲存一份,並返回資料 red:set("content_"..id,responsejson); ngx.say(responsejson); db:close() else --如果redis中有資料,則往openresty中儲存一份,並返回資料 cache_ngx:set('content_cache_'..id, rescontent, 10*60); ngx.say(rescontent) end red:close() else --如果openresty本地有資料的話,直接返回資料 ngx.say(contentCache) end
2、修改nginx配置檔案,指定到上面編寫的lua指令碼路徑
修改/usr/local/openresty/nginx/conf/nginx.conf檔案: 新增頭資訊,和 location資訊
user root root; #user nobody; worker_processes 1; #error_log logs/error.log; #error_log logs/error.log notice; #error_log logs/error.log info; #pid logs/nginx.pid; events { worker_connections 1024; } http { include mime.types; default_type application/octet-stream; #log_format main '$remote_addr - $remote_user [$time_local] "$request" ' # '$status $body_bytes_sent "$http_referer" ' # '"$http_user_agent" "$http_x_forwarded_for"'; #access_log logs/access.log main; sendfile on; #tcp_nopush on; #keepalive_timeout 0; keepalive_timeout 65; #gzip on; #配置nginx的快取物件,分配記憶體空間(多大記憶體)快取物件名稱為dis_cache 空間大小為50M lua_shared_dict dis_cache 50m; #限流設定規則 用ip的方式,每次請求會儲存ip,容量為10m,儲存ip的容器名為contentRateLimit,當容量滿後,其他請求無效。速率為2r每秒 limit_req_zone $binary_remote_addr zone=contentRateLimit:10m rate=2r/s; #設定客戶端ip與伺服器的連線數的計數容器(儲存ip計數) 容器名為perip limit_conn_zone $binary_remote_addr zone=perip:1m; #限制與伺服器的總連線數的計數容器為1m,容器名為perserver limit_conn_zone $server_name zone=perserver:1m; server { listen 80; server_name localhost; location / { limit_conn perip 10;#單個客戶端ip與伺服器的連線數. limit_conn perserver 100; #限制與伺服器的總連線數 root html; index index.html index.htm; } #update_content請求過來後,由openresty接收到並轉發到指定lua指令碼處理該請求 location /update_content{ content_by_lua_file /root/lua/update_content.lua; } #read_content 另一個介面轉發 location /read_content{ content_by_lua_file /root/lua/read_content.lua; #設定該介面使用上面設定的限流規則 limit_req zone=contentRateLimit burst=4 nodelay; } } # another virtual host using mix of IP-, name-, and port-based configuration # #server { # listen 8000; # listen somename:8080; # server_name somename alias another.alias; # location / { # root html; # index index.html index.htm; # } #} # HTTPS server # #server { # listen 443 ssl; # server_name localhost; # ssl_certificate cert.pem; # ssl_certificate_key cert.key; # ssl_session_cache shared:SSL:1m; # ssl_session_timeout 5m; # ssl_ciphers HIGH:!aNULL:!MD5; # ssl_prefer_server_ciphers on; # location / { # root html; # index index.html index.htm; # } #} }
以上配置就做完了,重新載入nginx配置檔案即可測試:
cd /usr/local/openresty/nginx/sbin
重新載入配置檔案命令:./nginx -s reload
另外,如果測試不成功,可以去看看日誌:
cd /usr/local/openresty/nginx/logs
上面的配置中,配置了openresty+lua的環境,以及配置了nginx轉發請求到編寫的lua指令碼執行(從nginx中獲取快取,有則返回,沒有則查redis,redis中有則返回並往nginx快取中新增資料,沒有則查mysql)
此外,還做了限流的設定,如下解釋說明:
一般情況下,首頁的併發量是比較大的,即使 有了多級快取,當用戶不停的重新整理頁面的時候,也是沒有必要的,另外如果有惡意的請求 大量達到,也會對系統造成影響。而限流就是保護措施之一。
nginx提供兩種限流的方式:
-
-
一是控制速率
-
二是控制併發連線數
-
控制速率
控制速率的方式之一就是採用漏桶演算法。 漏桶演算法實現控制速率限流
配置示意圖如下:
配置說明:
binary_remote_addr 是一種key,表示基於 remote_addr(客戶端IP) 來做限流,binary_ 的目的是壓縮記憶體佔用量。 zone:定義共享記憶體區來儲存訪問資訊, contentRateLimit:10m 表示一個大小為10M,名字為contentRateLimit的記憶體區域。
1M能儲存16000 IP地址的訪問資訊,10M可以儲存16W IP地址訪問資訊。 rate:用於設定最大訪問速率,rate=10r/s 表示每秒最多處理10個請求。Nginx 實際上以毫秒為粒度來跟蹤請求資訊,
因此 10r/s 實際上是限制:每100毫秒處理一個請求。這意味著,自上一個請求處理完後,若後續100毫秒內又有請求到達,
將拒絕處理該請求.我們這裡設定成2 方便測試。
處理突發流量
burst 譯為突發、爆發,表示在超過設定的處理速率後能額外處理的請求數,當 rate=10r/s 時,將1s拆成10份,即每100ms可處理1個請求。
此處,burst=4 ,若同時有4個請求到達,Nginx 會處理第一個請求,剩餘3個請求將放入佇列,然後每隔500ms從佇列中獲取一個請求進行處理。若請求數大於4,將拒絕處理多餘的請求,直接返回503.
不過,單獨使用 burst 引數並不實用。假設 burst=50 ,rate依然為10r/s,排隊中的50個請求雖然每100ms會處理一個,但第50個請求卻需要等待 50 * 100ms即 5s,這麼長的處理時間自然難以接受。
因此,burst 往往結合 nodelay 一起使用。
例如:如下配置:
server { listen 80; server_name localhost; location /update_content { content_by_lua_file /root/lua/update_content.lua; } location /read_content { limit_req zone=contentRateLimit burst=4 nodelay; content_by_lua_file /root/lua/read_content.lua; } }
如上表示:
如上兩種配置結合就達到了速率穩定,但突然流量也能正常處理的效果。完整配置程式碼在最開始上面的配置中就有。
測試:如下圖 在1秒鐘之內可以重新整理4次,正常處理。
但是超過之後,連續重新整理5次,丟擲異常。
控制併發量(連線數)
ngx_http_limit_conn_module 提供了限制連線數的能力。主要是利用limit_conn_zone和limit_conn兩個指令。
利用連線數限制 某一個使用者的ip連線的數量來控制流量。
注意:並非所有連線都被計算在內 只有當伺服器正在處理請求並且已經讀取了整個請求頭時,才會計算有效連線。此處忽略測試。
配置語法:
Syntax: limit_conn zone number;
Default: —;
Context: http, server, location;
如下,配置如下:
limit_conn_zone $binary_remote_addr zone=addr:10m; 表示限制根據使用者的IP地址來顯示,設定儲存地址為的記憶體大小10M
limit_conn addr 2; 表示 同一個地址只允許連線2次。
此時開3個執行緒,測試的時候會發生異常,開2個就不會有異常
limit_conn_zone $binary_remote_addr zone=perip:10m; limit_conn_zone $server_name zone=perserver:10m; server { listen 80; server_name localhost; charset utf-8; location / { limit_conn perip 10;#單個客戶端ip與伺服器的連線數. limit_conn perserver 100; #限制與伺服器的總連線數 root html; index index.html index.htm; } }
canal可以用來監控資料庫資料的變化,從而獲得新增資料,或者修改的資料。
canal是應阿里巴巴存在杭州和美國的雙機房部署,存在跨機房同步的業務需求而提出的。
阿里系公司開始逐步的嘗試基於資料庫的日誌解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務。
在上述流程中,使用openresty接收前端請求,並轉發lua處理(多級快取),但此時是有問題的,當專案的運營人員更改了資料時,redis以及openresty快取中的資料就變成垃圾資料了,也就是說,資料存在不一致的情況。
但此時因為並沒有java程式碼去幹涉這個獲取資料的流程,無法更新資料。
所以此時,解決方案有三,第一是設定快取的過期時間,上面的配置中,openresty就設定了過期時間,當快取時間過期後,自然會去資料庫中獲取最新的,redis也一樣,可以設定過期時間,雖然不能實時更新資料,但也勉強能做到資料更新。
方案二則是,openresty不變,用過期時間,但在java專案中,如果改變了資料,則多一個往redis中再次儲存 一次資料,這樣簡單方便就實現了redis的同步,當openresy的快取過期後自然會去redis中獲取最新的資料。
方案三:使用canal實現資料實時同步到redis中,雖然應用在這裡,我覺得有點多次一舉,因為饒了這麼一大圈,整了這麼多環境,其實最終用起來的實現效果跟方案二是一樣的,不過作為學習來說,多瞭解一種技術是好的,也許以後哪個地方就能用到呢。
canal原理相對比較簡單(基於mysql的主從):
-
canal模擬mysql slave的互動協議,偽裝自己為mysql slave(從),向mysql master傳送dump協議
-
-
canal解析binary log物件(原始為byte流)
開啟binlog模式
canal是基於mysql的主從模式實現的,所以必須先開啟binlog 連線到mysql中,並修改/etc/mysql/mysql.conf.d/mysqld.cnf 執行如下命令,編輯mysql配置檔案cd /etc/mysql/mysql.conf.d
vi mysqld.cnf
修改mysqld.cnf配置檔案,新增如下配置:
上圖配置如下:
log-bin=/var/lib/mysql/mysql-bin
server-id=12345
使用root賬號建立使用者並授予許可權(可以不賦予超級管理員許可權)
create user canal@'%' IDENTIFIED by 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
重啟mysql容器
docker restart mysql
docker pull docker.io/canal/canal-server
容器安裝:
docker run -p 11111:11111 --name canal -d docker.io/canal/canal-server
進入容器,修改核心配置canal.properties 和instance.properties,canal.properties 是canal自身的配置,instance.properties是需要同步資料的資料庫連線配置。
執行命令如下:
docker exec -it canal /bin/bash cd canal-server/conf/ vi canal.properties cd example/ vi instance.properties
修改canal.properties的id,不能和mysql的server-id重複,如下圖:
修改instance.properties,配置資料庫連線地址:
這裡的
mysql 資料解析關注的表,Perl正則表示式. 多個正則之間以逗號(,)分隔,轉義符需要雙斜槓(\\) 常見例子: 1. 所有表:.* or .*\\..* 2. canal schema下所有表: canal\\..* 3. canal下的以canal打頭的表:canal\\.canal.* 4. canal schema下的一張表:canal.test1 5. 多個規則組合使用:canal\\..*,mysql.test1,mysql.test2 (逗號分隔) 注意:此過濾條件只針對row模式的資料有效(ps. mixed/statement因為不解析sql,所以無法準確提取tableName進行過濾)
可以參考地址如下:
https://github.com/alibaba/canal/wiki/AdminGuide
配置完成後,設定開機啟動,並記得重啟canal。
docker update --restart=always canal
docker restart canal
(1)安裝輔助jar包(在自己搭建的微服務工程中)
在canal\spring-boot-starter-canal-master
中有一個工程starter-canal
,它主要提供了SpringBoot環境下canal
的支援,我們需要先安裝該工程,在starter-canal
目錄下執行mvn install
(2)canal微服務工程搭建
canal依賴 使用別人封裝好的,但是這個地址是通過匯入本地maven的方式,如果是線上環境構建的話,需要在伺服器做配置,所以挺麻煩。可以選擇去網上看canal客戶端原始碼,然後實現,也可以去找別人封裝好的包直接使用,網上搜一搜肯定有
依賴:
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!--canal依賴-->
<dependency> <groupId>com.xpand</groupId> <artifactId>starter-canal</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> </dependencies>
application.yml配置:
server: port: 18083 spring: application: name: canal eureka: client: service-url: defaultZone: http://127.0.0.1:7001/eureka instance: prefer-ip-address: true feign: hystrix: enabled: true #hystrix 配置 hystrix: command: default: execution: timeout: #如果enabled設定為false,則請求超時交給ribbon控制 enabled: true isolation: strategy: SEMAPHORE #canal配置 canal: client: instances: example: host: 192.168.211.132 port: 11111
(3)監聽建立
import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.protocol.CanalEntry; import com.changgou.content.entity.Content; import com.changgou.content.feign.ContentFeign; import com.xpand.starter.canal.annotation.*; import entity.Result; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import java.util.List; @CanalEventListener public class CanalDataEventListener { @Autowired private ContentFeign contentFeign; @Autowired private StringRedisTemplate stringRedisTemplate; /*** * 增加資料監聽 * @param eventType * @param rowData */ /*@InsertListenPoint public void onEventInsert(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { rowData.getAfterColumnsList().forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " :: " + c.getValue())); }*/ /*** * 修改資料監聽 * @param rowData */ /*@UpdateListenPoint public void onEventUpdate(CanalEntry.RowData rowData) { System.out.println("UpdateListenPoint"); rowData.getAfterColumnsList().forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " :: " + c.getValue())); }*/ /*** * 刪除資料監聽 * @param eventType */ /*@DeleteListenPoint public void onEventDelete(CanalEntry.EventType eventType) { System.out.println("DeleteListenPoint"); }*/ /*** * 自定義資料修改監聽 * @param eventType * @param rowData * example一定要跟配置檔案中保持一致,並且跟canal中資料夾的名稱保持一致,canal配置的哪個就用哪個 */ @ListenPoint(destination = "example", schema = "changgou_content", table = {"tb_content_category", "tb_content"}, eventType = { CanalEntry.EventType.UPDATE, CanalEntry.EventType.INSERT, CanalEntry.EventType.DELETE}) public void onEventCustomUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { System.out.println("型別:"+eventType.name()); System.err.println("DeleteListenPoint"); rowData.getAfterColumnsList().forEach((c) -> System.out.println("By--Annotation: " + c.getName() + " :: " + c.getValue())); //這裡能獲取到所有被修改資料的欄位名和值,然後執行業務程式碼就可以了,最後更新到redis中就實現了需求 Long categoryId = Long.valueOf(getColumnValue(eventType, rowData)); Result<List<Content>> byCategoryId = contentFeign.findByCategoryId(categoryId); String o = JSONObject.toJSONString(byCategoryId); stringRedisTemplate.boundValueOps("content_"+categoryId).set(o); } private String getColumnValue(CanalEntry.EventType eventType, CanalEntry.RowData rowData){ //判斷是delete還是update和insert String value = ""; if (eventType == CanalEntry.EventType.DELETE){ List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList(); for (CanalEntry.Column column : beforeColumnsList) { if (column.getName().equals("category_id")){ value = column.getValue(); break; } } }else { List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList(); for (CanalEntry.Column column : afterColumnsList) { if (column.getName().equals("category_id")){ value = column.getValue(); break; } } } return value; } }
(4)啟動類建立:
/** * Created by 錢溢 Luna on 2022/5/23 18:29 * 作用:監聽canal服務端獲取資料,實現mysql資料發生變更,用canal監聽,當canal監聽到資料後,同步到redis中 */ @SpringBootApplication(exclude = DataSourceAutoConfiguration.class) @EnableEurekaClient @EnableCanalClient @EnableFeignClients(basePackages = "com.changgou.content.feign")//加入註解才能使用feign呼叫,指定basePackages(feign介面的包路徑)不指定預設會掃描所有 public class CanalApplication { public static void main(String[] args) { SpringApplication.run(CanalApplication.class,args); } }
(5)測試
啟動canal微服務,然後修改任意資料庫的表資料,檢視canal微服務後臺輸出,再檢視redis中的資料是否實時更新