Spring Boot教程十二:整合Kafka
阿新 • • 發佈:2019-01-29
Kafka是最初由Linkedin公司開發,是一個分散式、支援分割槽的(partition)、多副本的(replica),基於zookeeper協調的分散式訊息系統,它的最大的特性就是可以實時的處理大量資料以滿足各種需求場景:比如基於hadoop的批處理系統、低延遲的實時系統、storm/Spark流式處理引擎,web/nginx日誌、訪問日誌,訊息服務等等,用scala語言編寫,Linkedin於2010年貢獻給了Apache基金會併成為頂級開源 專案。Kafka中釋出訂閱的物件是topic。我們可以為每類資料建立一個topic,把向topic釋出訊息的客戶端稱作producer,從topic訂閱訊息的客戶端稱作consumer。Producers和consumers可以同時從多個topic讀寫資料。一個kafka叢集由一個或多個broker伺服器組成,它負責持久化和備份具體的kafka訊息。
- Broker
- Topic:一類訊息,訊息存放的目錄即主題,例如page view日誌、click日誌等都可以以topic的形式存在,Kafka叢集能夠同時負責多個topic的分發。
- Partition:topic物理上的分組,一個topic可以分為多個partition,每個partition是一個有序的佇列
- Segment:partition物理上由多個segment組成,每個Segment存著message資訊
- Producer : 生產message傳送到topic
- Consumer : 訂閱topic消費message, consumer作為一個執行緒來消費
- Consumer Group:一個Consumer Group包含多個consumer, 這個是預先在配置檔案中配置好的。各個consumer(consumer 執行緒)可以組成一個組(Consumer group ),partition中的每個message只能被組(Consumer group ) 中的一個consumer(consumer 執行緒 )消費,如果一個message可以被多個consumer(consumer 執行緒 ) 消費的話,那麼這些consumer必須在不同的組。Kafka不支援一個partition中的message由兩個或兩個以上的consumer thread來處理,即便是來自不同的consumer group的也不行。它不能像AMQ那樣可以多個BET作為consumer去處理message,這是因為多個BET去消費一個Queue中的資料的時候,由於要保證不能多個執行緒拿同一條message,所以就需要行級別悲觀所(for update),這就導致了consume的效能下降,吞吐量不夠。而kafka為了保證吞吐量,只允許一個consumer執行緒去訪問一個partition。如果覺得效率不高的時候,可以加partition的數量來橫向擴充套件,那麼再加新的consumer thread去消費。這樣沒有鎖競爭,充分發揮了橫向的擴充套件性,吞吐量極高。這也就形成了分散式消費的概念。
springboot專案中使用kafka的配置如下:
首先引入必須的依賴jar:
<!-- springboot整合kafka -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
<version>4.3.6.RELEASE</version>
<classifier>sources</classifier>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.7.RELEASE</version>
</dependency>
配置檔案如下:
#============== kafka ===================
#kafka相關配置
spring.kafka.bootstrap-servers=39.105.104.132:9092
#設定一個預設組
spring.kafka.consumer.group-id=alarmTopic
#key-value序列化反序列化
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#每次批量傳送訊息的數量
spring.kafka.producer.batch-size=65536
spring.kafka.producer.buffer-memory=524288
訊息生產者:
/**
* @author Shuyu.Wang
* @package:com.ganinfo.kafka
* @className:
* @description:生產者
* @date 2018-07-05 16:30
**/
@Component
public class KafkaSender {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
/** * 傳送訊息到kafka */
public void sendChannelMess(String channel, String message) {
kafkaTemplate.send(channel,message);
}
訊息消費者:/**
* @author Shuyu.Wang
* @package:com.ganinfo.kafka
* @className:
* @description:消費者
* @date 2018-07-05 16:31
**/
@Component
public class KafkaConsumer {
@Autowired
private SendMessageUtil sendMessageUtil;
/**
* 監聽alarmTopic主題,有訊息就讀取 *
* @param message
*/
@KafkaListener(topics = {"alarmTopic"})
public void receiveMessage(String message) {
//收到通道的訊息之後執行秒殺操作
System.out.println("KafkaConsumer的訂閱訊息:"+message);
sendMessageUtil.send("h1","1",message);
}
}
測試類:
@Autowired
private KafkaSender kafkaSender;
private static final String ALRAM_TOPIC = "alarmTopic";
@Autowired
private AlarmService alarmService;
@ResponseBody
@RequestMapping(value = "/kafka", method = RequestMethod.GET)
public ApiResult getCarInout(@RequestParam(value = "refId") String refId, @RequestParam(value = "passType") Integer passType, @RequestParam(value = "type") Integer type) {
ApiResult apiResult = new ApiResult();
try {
Map<String, Object> map = new HashMap<>();
map.put("refId", refId);
map.put("platformCode", "650106");
map.put("passType", passType);
map.put("type", type);
map.put("level", 2);
map.put("Gettime", 1000);
kafkaSender.sendChannelMess(ALRAM_TOPIC, GsonUtil.GsonString(map));
System.out.println("傳送資料:" + GsonUtil.GsonString(map));
// kafkaTemplate.send(ALRAM_TOPIC, "alarm", GsonUtil.GsonString(map));
} catch (Exception e) {
e.printStackTrace();
}
return apiResult;
}
kafka的相關程式碼就完成了,請求測試方法,就會在消費者的類中列印相關的引數了,另外可以通過外掛檢視kafka的相關主題和消費者情況。