1. 程式人生 > 程式設計 >解決ASP.NET Core中使用漏桶演算法限流的問題

解決ASP.NET Core中使用漏桶演算法限流的問題

1.準備安裝包

官網下載即可:

2.配置環境

2.1. 修改使用者最大程序數和檔案控制代碼數

vim /etc/security/limits.conf
#新增以下內容
*        hard    nproc           65536
*        soft    nproc           120000
*        hard    nofile          65536
*        soft    nofile          120000
#儲存退出

2.2. 修改虛擬記憶體

sudo vim /etc/sysctl.conf
#新增以下內容
vm.max_map_count=655360

儲存退出後執行以下命令:
sudo sysctl -p

3.開始安裝

3.1 安裝es

在hadoop103上解壓es安裝包

  1. 建立elk資料夾
    mkdir -p /ope/module/elk-7.13.4

  2. 解壓

tar -zxvf /opt/software/elk/elasticsearch-7.13.4-linux-x86_64.tar.gz -C /ope/module/elk-7.13.4
  1. 修改配置
cd /opt/module/elk-7.13.4/elasticsearch-7.13.4
vim config/elasticsearch.yml

#修改一下內容
# ---------------------------------- Cluster -----------------------------------
cluster.name: elk
# ------------------------------------ Node ------------------------------------
node.name: hadoop103
node.master: true
node.data: true
# ----------------------------------- Paths ------------------------------------
path.data: /opt/module/elk-7.13.4/elasticsearch-7.13.4/data
path.logs: /opt/module/elk-7.13.4/elasticsearch-7.13.4/logs
# ---------------------------------- Network -----------------------------------
network.host: hadoop103
http.port: 9200
transport.tcp.port: 9300
# --------------------------------- Discovery ----------------------------------
discovery.seed_hosts: ["hadoop101", "hadoop102","hadoop103"]
cluster.initial_master_nodes: [ "hadoop102","hadoop103"]
  1. 分發到其他節點
scp -r /opt/module/elk-7.13.4 lemo@hadoop101:/opt/module/
scp -r /opt/module/elk-7.13.4 lemo@hadoop102:/opt/module/

#修改另外兩臺的配置檔案

hadoop101上修改:
node.name: hadoop101
node.master: false
network.host: hadoop101

hadoop102上修改:
node.name: hadoop102
network.host: hadoop102

  1. 啟動es叢集
    配置一鍵啟停指令碼
vim es.sh
chmod u+x es.sh

#!/bin/bash

case $1 in
"start"){
        for es in hadoop101 hadoop102 hadoop103
        do
            ssh -T $es <<EOF
            /opt/module/elk-7.13.4/elasticsearch-7.13.4/bin/elasticsearch -d >> /dev/null
EOF
            yes | command
            echo 從節點 $es 啟動elasticsearch...[ done ]
            sleep 1
        done
};;
"stop"){
        for es in hadoop101 hadoop102 hadoop103
        do
            ssh -T $es <<EOF
            source /etc/profile.d/my_env.sh
            ps aux |grep elasticsearch |grep -v grep |awk '{print \$2}' |xargs kill -9
EOF
            echo 從節點 $es 停止elasticsearch...[ done ]
            sleep 1
        done
};;
"status"){
        echo ---------- es 狀態 ------------    
        curl http://hadoop103:9200/_cat/health?v
};;
esac

3.2 安裝logstash

  1. 解壓logstash

進入logstash安裝包所在目錄,執行解壓命令:

tar -zxvf logstash-7.13.4-linux-x86_64.tar.gz -C /opt/module/elk-7.13.4/

  1. 修改logstash配置

vim /opt/module/logstash-7.13.4/config/logstash.yml

  1. 分發並修改配置檔案http.host

3.3 安裝kibana

  1. 解壓kibana

tar -zxvf kibana-7.13.4-linux-x86_64.tar.gz -C /opt/module/elk-7.13.4/

  1. 修改配置檔案
cd /opt/module/elk-7.13.4/kibana-7.13.4-linux-x86_64/config

vim kibana.yml
#新增以下內容:
server.port: 5601
server.host: "hadoop103"
server.name: "lemo-elk"
elasticsearch.hosts: ["http://hadoop101:9200","http://hadoop102:9200","http://hadoop103:9200"]
i18n.locale: "zh-CN"
  1. 啟動kibana
cd /opt/module/elk-7.13.4/kibana-7.13.4-linux-x86_64/bin
nohup ./kibana >> /dev/null &

檢視kibana介面:

4.測試elk

簡單通過kafka控制檯生產消費的方式將資料接入es,並通過kibana檢視日誌

啟動zookeeper,kafka
建立一個topic用於測試

kafka-topics.sh --bootstrap-server hadoop101:9092 --create --topic first
#啟動一個控制檯生產者用於生產資料
kafka-console-producer.sh --bootstrap-server hadoop101:9092 --topic first

配置logstash
cd /opt/module/elk-7.13.4/logstash-7.13.4/
mkdir conf
vim conf/kafka_to_es.conf
#新增以下內容
input{
    kafka {
         bootstrap_servers => "hadoop101:9092,hadoop102:9092,hadoop103:9092"
         client_id => "test"
         group_id  => "test"
         topics => ["first"]
         auto_offset_reset => "earliest"
         auto_commit_interval_ms => "1000"
         decorate_events => true
             }
}
filter{
    grok {
            match => {
            "message" => "(?<field1>datatype.*)"
            }
    }
    kv {
            source => "field1"
            field_split => " "
            value_split => "="
    }
}
output{
    elasticsearch {
        hosts => ["hadoop101:9200","hadoop102:9200","hadoop103:9200"]
        index => "test-%{+YYYY.MM.dd}"
    }
}


#儲存退出後啟動logstash
./bin/logstash -f conf/kafka_to_es.conf

然後在kafka生產者控制檯輸入資料
qtAlert[72] datatype="system_audit" datatime="1639366113" action="add" type="monitor" comid="425356584254583" id="de1de7e3e2445cf5874" event_type="check" event_name="Linux-風險發現-風險總覽-檢視概覽資訊" event_src="console" os="linux" func="風險發現" subfunc="風險總覽" src_ip="xxx.xx.xxx.xx" region="區域網" req_id="1f3bcaf270a743f6bc9b038e8c9c71f5" user_name="[email protected]/xxx" user_type="子帳號" return_code="200" error_info=""

進入kibana介面

點選索引模式,建立索引模式並儲存

點選discover

選擇剛剛建立的索引模式,就可以檢視到剛剛生成的資料了

可以看到日誌已經按照logstash中配置的規則進行了欄位切分.

總結:

  1. 重點需要掌握logstash對不同格式的日誌進行切分,常用filter外掛的使用
  2. es優化
  3. kibana視覺化分析,圖表製作
一天一個小知識,日積月累,積沙成塔。。。