Spring Assistant框架搭建訊息佇列寫入Kafka消費
阿新 • • 發佈:2021-08-01
前提:已安裝Java8、Maven
一、在Idea中建立SpringAssistant專案,選中web、ApacheKafka、lombok
二、匯入Pom檔案
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.5.3</version> <relativePath/> <!-- lookup parent fromrepository --> </parent> <groupId>com.company</groupId> <artifactId>gmall-logger</artifactId> <version>0.0.1-SNAPSHOT</version> <name>gmall-logger</name> <description>gmall-logger</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <configuration> <excludes> <exclude> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </exclude> </excludes> </configuration> </plugin> </plugins> </build> </project>
三、建立class檔案
package com.company.gmalllogger.controller; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.RestController; @Slf4j //@Controller @RestController // =@Controller+@ResponseBody public class LoggerController { @Autowired private KafkaTemplate<String,String> kafkaTemplate; @RequestMapping("test1") // @ResponseBody //返回普通的java物件 public String test1(){ System.out.println("success"); return "success"; // return "index.html"; //靜態頁面展示 } //方法2:帶參查詢 @RequestMapping("test2") public String test2(@RequestParam("name") String nn, @RequestParam("age") int age){ System.out.println(nn + ":" + age ); return "success"; } //方法3:帶參查詢且給定age預設值 @RequestMapping("test3") public String test3(@RequestParam("name") String nn, @RequestParam(value = "age",defaultValue = "20") int age){ System.out.println(nn + ":" + age ); return "success"; } //對接Kafka @RequestMapping("applog") public String gerLog(@RequestParam("param") String logStr){ System.out.println(logStr); //將行為資料儲存至日誌檔案並列印到控制檯 log.info(logStr); //將資料寫入Kafka,主題是ods_base_log kafkaTemplate.send("ods_base_log",logStr); return "success"; } }
當你想要測試靜態頁面時,在resource.static目錄下建立index.html檔案並寫入
<!DCOTYPE html> <html> <h1>公司名稱</h1> <h2>大資料</h2>> <h3>靜態頁面的展示</h3>> </html>
然後在瀏覽器中輸入:localhost:8080/test1即可檢視
四、配置application.properties檔案
# 應用名稱 spring.application.name=gmall-logger # 應用服務 WEB 訪問埠 server.port=8081 #============== kafka =================== # 指定kafka 代理地址,可以多個 spring.kafka.bootstrap-servers=hadoop201:9092 # 指定訊息key和訊息體的編解碼方式 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
五、配置logback.xml檔案
<?xml version="1.0" encoding="UTF-8"?> <configuration> <property name="LOG_HOME" value="d:/opt/module/logs" /> //此路徑是日誌寫入到本地的路徑,可自行更改 <appender name="console" class="ch.qos.logback.core.ConsoleAppender"> <encoder> <pattern>%msg%n</pattern> </encoder> </appender> <appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender"> <file>${LOG_HOME}/app.log</file> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern> </rollingPolicy> <encoder> <pattern>%msg%n</pattern> </encoder> </appender> <!-- 將某一個包下日誌單獨列印日誌 --> <logger name="com.company.gmalllogger.controller.LoggerController" //注意這裡需要根據自身情況來寫入 level="INFO" additivity="false"> <appender-ref ref="rollingFile" /> <appender-ref ref="console" /> </logger> <root level="error" additivity="false"> <appender-ref ref="console" /> </root> </configuration>
loggername處的寫入方式:
六、在虛擬機器hadoop201中配置
前提:
環境搭建:
java8、zookeeper叢集、kafka叢集
上傳生產資料的jar包和application.yml檔案到/opt/module/gmall-logger/rt_log/目錄下
/bin/zksh.sh start
2.執行kafka叢集
bin/kafka-server-start.sh --zookeeper hadoop201:2181 config/server.properties &
3.檢視kafka主題
bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
4.建立topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 \ --create --replication-factor 3 --partitions 1 --topic ods_base_log
選項說明:
--topic 定義topic名
--replication-factor 定義副本數
--partitions 定義分割槽數
5.刪除topic
bin/kafka-topics.sh --zookeeper hadoop102:2181 \ --delete --topic ods_base_log
需要server.properties中設定delete.topic.enable=true否則只是標記刪除或者直接重啟。
6.傳送訊息
bin/kafka-console-producer.sh \ --broker-list hadoop102:9092 --topic ods_base_log >hello world >kafka kafka
7.消費訊息
bin/kafka-console-consumer.sh \ --zookeeper hadoop102:2181 --from-beginning --topic ods_base_log
--from-beginning:會把ods_base_log主題中以往所有的資料都讀取出來。根據業務場景選擇是否增加該配置。
8.檢視某個topic詳情
bin/kafka-topics.sh --zookeeper hadoop102:2181 \ --describe --topic ods_base_log
七、修改application.yml檔案
ip地址需改成自己IPV4的真實地址
八、
- 執行Windows上的Idea程式LoggerApplication
- 執行rt_applog下的jar包
- 啟動kafka消費者進行測試
bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic ods_base_log
九、結果
1.jar包執行造資料:
2.kafka消費者消費資料
3.Idea控制檯列印資料
完畢。
不要為了追逐,而忘記當初的樣子。