1. 程式人生 > 其它 >Spring Assistant框架搭建訊息佇列寫入Kafka消費

Spring Assistant框架搭建訊息佇列寫入Kafka消費

前提:已安裝Java8、Maven

一、在Idea中建立SpringAssistant專案,選中web、ApacheKafka、lombok

二、匯入Pom檔案

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd
"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.5.3</version> <relativePath/> <!-- lookup parent from
repository --> </parent> <groupId>com.company</groupId> <artifactId>gmall-logger</artifactId> <version>0.0.1-SNAPSHOT</version> <name>gmall-logger</name> <description>gmall-logger</description> <properties> <java.version>1.8
</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <excludes> <exclude> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </exclude> </excludes> </configuration> </plugin> </plugins> </build> </project>

三、建立class檔案

package com.company.gmalllogger.controller;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
//@Controller
@RestController // =@Controller+@ResponseBody
public class LoggerController {

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @RequestMapping("test1")
//    @ResponseBody //返回普通的java物件
    public String test1(){
        System.out.println("success");
        return "success";
//        return "index.html"; //靜態頁面展示
    }

    //方法2:帶參查詢
    @RequestMapping("test2")
    public String test2(@RequestParam("name") String nn,
                        @RequestParam("age") int age){
        System.out.println(nn + ":" + age  );
        return "success";
    }

    //方法3:帶參查詢且給定age預設值
    @RequestMapping("test3")
    public String test3(@RequestParam("name") String nn,
                        @RequestParam(value = "age",defaultValue = "20") int age){
        System.out.println(nn + ":" + age  );
        return "success";
    }


    //對接Kafka
    @RequestMapping("applog")
    public String gerLog(@RequestParam("param") String logStr){
        System.out.println(logStr);
        //將行為資料儲存至日誌檔案並列印到控制檯
        log.info(logStr);

        //將資料寫入Kafka,主題是ods_base_log
        kafkaTemplate.send("ods_base_log",logStr);
        return "success";

    }
}

當你想要測試靜態頁面時,在resource.static目錄下建立index.html檔案並寫入

<!DCOTYPE html>
<html>
<h1>公司名稱</h1>
<h2>大資料</h2>>
<h3>靜態頁面的展示</h3>>
</html>

然後在瀏覽器中輸入:localhost:8080/test1即可檢視

四、配置application.properties檔案

# 應用名稱
spring.application.name=gmall-logger

# 應用服務 WEB 訪問埠
server.port=8081

#============== kafka ===================
# 指定kafka 代理地址,可以多個
spring.kafka.bootstrap-servers=hadoop201:9092

# 指定訊息key和訊息體的編解碼方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

五、配置logback.xml檔案

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <property name="LOG_HOME" value="d:/opt/module/logs" /> //此路徑是日誌寫入到本地的路徑,可自行更改
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <file>${LOG_HOME}/app.log</file>
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern>
        </rollingPolicy>
        <encoder>
            <pattern>%msg%n</pattern>
        </encoder>
    </appender>

    <!-- 將某一個包下日誌單獨列印日誌 -->
    <logger name="com.company.gmalllogger.controller.LoggerController" //注意這裡需要根據自身情況來寫入
            level="INFO" additivity="false">
        <appender-ref ref="rollingFile" />
        <appender-ref ref="console" />
    </logger>

    <root level="error" additivity="false">
        <appender-ref ref="console" />
    </root>
</configuration>

loggername處的寫入方式:

六、在虛擬機器hadoop201中配置

前提:

環境搭建:

java8、zookeeper叢集、kafka叢集

上傳生產資料的jar包和application.yml檔案到/opt/module/gmall-logger/rt_log/目錄下

1.執行zookeeper叢集

/bin/zksh.sh start

2.執行kafka叢集

bin/kafka-server-start.sh --zookeeper hadoop201:2181 config/server.properties &

3.檢視kafka主題

bin/kafka-topics.sh --zookeeper hadoop102:2181 --list

4.建立topic

bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--create --replication-factor 3 --partitions 1 --topic ods_base_log

選項說明:

--topic 定義topic名

--replication-factor 定義副本數

--partitions 定義分割槽數

5.刪除topic

bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--delete --topic ods_base_log

需要server.properties中設定delete.topic.enable=true否則只是標記刪除或者直接重啟。

6.傳送訊息

bin/kafka-console-producer.sh \
--broker-list hadoop102:9092 --topic ods_base_log
>hello world
>kafka kafka

7.消費訊息

bin/kafka-console-consumer.sh \
--zookeeper hadoop102:2181 --from-beginning --topic ods_base_log

--from-beginning:會把ods_base_log主題中以往所有的資料都讀取出來。根據業務場景選擇是否增加該配置。

8.檢視某個topic詳情

bin/kafka-topics.sh --zookeeper hadoop102:2181 \
--describe --topic ods_base_log

七、修改application.yml檔案

ip地址需改成自己IPV4的真實地址

八、

  • 執行Windows上的Idea程式LoggerApplication
  • 執行rt_applog下的jar包
  • 啟動kafka消費者進行測試
    bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ods_base_log

九、結果

1.jar包執行造資料:

2.kafka消費者消費資料

3.Idea控制檯列印資料

完畢。

不要為了追逐,而忘記當初的樣子。