Kafka及Spring&Kafka整合
由於某專案的訊息佇列使用了Spring整合Kafka,開發中我需要使用kafka客戶端模擬生產者和消費者。簡單瞭解了一下Kafka,掃盲貼,先標記一下,日後再深入學習。
一、Kafka簡介
1.1 簡介
kafka是一種高吞吐量的分散式釋出訂閱訊息系統,它可以處理消費者規模的網站中的所有動作流資料。這種動作(網頁瀏覽,搜尋和其他使用者的行動)是在現代網路上的許多社會功能的一個關鍵因素。這些資料通常是由於吞吐量的要求而通過處理日誌和日誌聚合來解決。 在大資料系統中,常常會碰到一個問題,整個大資料是由各個子系統組成,資料需要在各個子系統中高效能,低延遲的不停流轉。傳統的企業訊息系統並不是非常適合大規模的資料處理。為了已在同時搞定線上應用(訊息)和離線應用(資料檔案,日誌)Kafka就出現了。 簡單點概括一下:Kafka是一個分散式的,可劃分的,高效能,低延遲的,冗餘備份的永續性的日誌服務。它主要用於處理活躍的流式資料。
1.2 特點
* 高吞吐量
* 可進行持久化操作
* 分散式
1.3 元件
Topic,Broker,Partition,Message,Producer,Consumer,Zookpeer
1.3.1 名詞解釋
服務: Topic:主題,Kafka處理的訊息的不同分類。 Broker:訊息代理,Kafka叢集中的一個kafka服務節點稱為一個broker,主要儲存訊息資料。存在硬碟中。每個topic都是有分割槽的。 Partition:Topic物理上的分組,一個topic在broker中被分為1個或者多個partition,分割槽在建立topic的時候指定。 Message:訊息,是通訊的基本單位,每個訊息都屬於一個partition 服務相關: Producer:訊息和資料的生產者,向Kafka的一個topic釋出訊息。 Consumer:訊息和資料的消費者,定於topic並處理其釋出的訊息。 Zookeeper:協調kafka的正常執行。
1.4 應用場景
構建實時的流資料管道,可靠地獲取系統和應用程式之間的資料。 構建實時流的應用程式,對資料流進行轉換或反應。
二、Kafka搭建
2.1 安裝
教程很多,就不寫了。
2.2 配置
配置檔案放在kafka下config下
* consumer.properites 消費者配置 * producer.properties 生產者配置 * server.properties kafka伺服器的配置 broker.id 申明當前kafka伺服器在叢集中的唯一ID,需配置為integer,並且叢集中的每一個kafka伺服器的id都應是唯一的 listeners 申明此kafka伺服器需要監聽的埠號,如果是在本機上跑虛擬機器執行可以不用配置本項,預設會使用localhost的地址,如果是在遠端伺服器上執行則必須配置,例如: listeners=PLAINTEXT:// 192.168.180.128:9092。並確保伺服器的9092埠能夠訪問 zookeeper.connect 申明kafka所連線的zookeeper的地址 ,需配置為zookeeper的地址
上面配置檔案中listeners的配置尤其注意,剛開始整的時候,沒注意自己編寫producer和cusmer時報錯,如下:
Connection to node -1 could not be established. Broker may not be available.
就是因為配置檔案中的PLAINTEXT跟我請求的內容不同。
具體配置教程很多,也不寫了。
三、Kafka操作
3.1 Topic操作
3.1.1 建立Topic
kafka-topics.sh --create --topic hbase --zookeeper ip1:port --partitions 3 --replication-factor 1
建立topic過程的問題,replication-factor個數不能超過broker的個數
建立topic後,可以在../data/kafka目錄檢視到分割槽的目錄
3.1.2 檢視Topic列表
kafka-topics.sh --list --zookeeper ip:port
3.1.3 檢視某一個具體的Topic
kafka-topics.sh --describe xxx --zookeeper ip:port
3.1.4 修改Topic
kafka-topics.sh --alter --topic topic-test --zookeeper ip:port --partitions 3
不能修改replication-factor,以及只能對partition個數進行增加,不能減少
3.1.5 刪除Topic
kafka-topics.sh --delete --topic topic-test --zookeeper ip:port
徹底刪除一個topic,需要在server.properties中配置delete.topic.enable=true,否則只是標記刪除
配置完成之後,需要重啟kafka服務。
3.2 生產者操作
sh kafka-console-producer.sh --broker-list ip1:port,ip2:port,ip3:port --sync --topic kafka-topic-test
生產資料的時候需要指定:當前資料流向哪個broker,以及哪一個topic
3.3 消費者操作
sh kafka-console-consumer.sh --zookeeper ip1:port,ip2:port,ip3:port --topic kafka-topic-test --from-beginning
--from-begining 獲取最新以及歷史資料
黑白名單(暫時未用到)
--blacklist 後面跟需要過濾的topic的列表,使用","隔開,意思是除了列表中的topic之外,都能接收其它topic的資料
--whitelist 後面跟需要過濾的topic的列表,使用","隔開,意思是除了列表中的topic之外,都不能接收其它topic的資料
四、Springboot整合Kafka
這個只是個人使用的簡單的測試環境搭建,可能有很多地方有問題,以後深入學習時再檢查。
4.1 整合
springboot整合kafka的預設配置都在org.springframework.boot.autoconfigure.kafka包裡面。直接使用即可。flag=深入學習kafka。
4.2 pom.xml配置
<dependency>
<!--引入spring和kafka整合的jar-->
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
<exclusion>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-hystrix</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.0.1</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-task-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.3.5.RELEASE</version><!--$NO-MVN-MAN-VER$-->
</dependency>
<dependency>
4.3 Producer配置
@Configuration
@EnableKafka
public class KafkaProducer {
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConfig.BOOTSTRAP_SERVERS);
props.put(ProducerConfig.RETRIES_CONFIG, KafkaConfig.PRODUCER_RETRIES);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, KafkaConfig.PRODUCER_BATCH_SIZE);
props.put(ProducerConfig.LINGER_MS_CONFIG, KafkaConfig.PRODUCER_LINGER_MS);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, KafkaConfig.PRODUCER_BUFFER_MEMORY);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put("advertised.host.name",KafkaConfig.BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "1");
System.out.println("KafkaConfig.BOOTSTRAP_SERVERS:"+KafkaConfig.BOOTSTRAP_SERVERS);
return props;
}
/** 獲取工廠 */
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
/** 註冊例項 */
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
4.4 使用生產者
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
kafkaTemplate.send("kafka-topic-test", "helloWorld");
4.5 Consumer配置
@Configuration
@EnableKafka
public class KafkaConsumer {
private final static Logger log = LoggerFactory.getLogger(KafkaConsumer .class);
@KafkaListener(topics = {"kafka-topic-test"})
public void consume(ConsumerRecord<?, ?> record) {
String topic = record.topic();
String value = record.value().toString();
System.out.println("partitions:"+record.partition()+","+"offset:"+record.offset()+",value="+value);
MqConsumerRunnable runnable = new MqConsumerRunnable(topic,value);
executor.execute(runnable);
}
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConfig.BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "1");
props.put("auto.offset.reset", "latest");// 一般配置earliest 或者latest 值
return props;
}
/** 獲取工廠 */
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
/** 獲取例項 */
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
}
4.6 使用消費者
public class KafkaMessageListener implements MessageListener<String, String> {
private static Logger LOG = LoggerFactory.getLogger(KafkaMessageListener.class);
@Autowired
private AppProperties appProperties;
@Override
public void onMessage(ConsumerRecord<String, String> data) {
LOG.info("消費訊息topic:{} value {}", data.topic(), data.value());
String topic = data.topic();
String content = data.value();
//可同時監聽多個topic,根據不同topic處理不同的業務
if (topic.equals("topica")) {
LOG.info("###############topic:{} value:{}" ,topic,content);
} else if (topic.equals("topicb")) {
LOG.info("###############topic:{} value:{}" ,topic,content);
}
}
}
4.7 注意
kafkaTemplate.send("kafka-topic-test", "helloWorld");
@KafkaListener(topics = {"kafka-topic-test"})
topic需要對應
4.8 使用
本地執行以後,到kafka伺服器上可以進行消費者和生產者的模擬傳送與接收資訊。
五、總結
上述方法進行模擬測試,可以測試,但是總感覺問題很大,卻又找不出問題,這個後期再說吧,先湊合用。 有關Kafka的具體學習,後期補上。