spring-kafka整合介紹
一 spring-kafka介紹
- spring-kafka是在kafka-clients的基礎上的封裝。
- 主要提供KafkaTemplate
二 pom 配置
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.6.RELEASE</version>
</dependency >
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-messaging</artifactId>
<version>4.3.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.retry</groupId >
<artifactId>spring-retry</artifactId>
<version>1.1.3.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version >0.10.0.0</version>
</dependency>
- 只需要配置spring-kafka,他內部依賴了kafka-clients,可以根據spring的verison版本調整spring-kafka版號。
- 請不依賴kafka_2.10,spring-integration-kafka,貌似只有低版本需要。
三 producer配置
<context:property-placeholder location="classpath:kafka.properties"/>
<!-- 定義producer的引數 http://blog.csdn.net/molingduzun123/article/details/51785141 -->
<bean id="producerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${bootstrap.servers}"/>
<entry key="group.id" value="0"/>
<entry key="retries" value="10"/>
<entry key="batch.size" value="16384"/>
<entry key="linger.ms" value="1"/>
<entry key="buffer.memory" value="33554432"/>
<entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
<entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
</map>
</constructor-arg>
</bean>
<!-- 建立kafkatemplate需要使用的producerfactory bean -->
<bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
<constructor-arg>
<ref bean="producerProperties"/>
</constructor-arg>
</bean>
<!-- 建立kafkatemplate bean,使用的時候,只需要注入這個bean,即可使用template的send訊息方法 -->
<bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
<constructor-arg ref="producerFactory"/>
<constructor-arg name="autoFlush" value="true"/>
<property name="defaultTopic" value="page_visits5"/>
</bean>
1.hashMap引數集合,可以根據自己需求調整。
2.KafkaTemplate提供各種豐富方法。
public class SpringKafkaProducer extends WebBaseTest {
private static final Logger logger = LoggerFactory.getLogger(SnapshotController.class);
@Autowired
private KafkaTemplate<String, String> kafkaProductTemplate;
@Test
public void assignPartitionByKey() throws Exception {
try {
//Assign topicName to string variable
String topicName = "page_visits5";
for (int i = 0; i < 50; i++) {
for(int j=0;j<2;j++) {
ListenableFuture<SendResult<String, String>> future=kafkaProductTemplate.send(topicName,
Integer.toString(j),"ddddddddd洪10002" + i);
logger.info("Message sent successfully");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
四 consumer配置
- consumer提供四種MessageListener
- MessageListener –單條訊息自動提交.
- BatchMessageListener –批量訊息自動提交,就是ConsumerRecords
<!-- consumer -->
<!-- 定義consumer的引數 -->
<bean id="consumerProperties" class="java.util.HashMap">
<constructor-arg>
<map>
<entry key="bootstrap.servers" value="${bootstrap.servers}"/>
<entry key="group.id" value="group-new"/>
<entry key="enable.auto.commit" value="false"/>
<entry key="auto.commit.interval.ms" value="1000"/>
<entry key="session.timeout.ms" value="15000"/>
<entry key="max.poll.records" value="10"/>
<entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
<entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
</map>
</constructor-arg>
</bean>
<!-- 建立consumerFactory bean -->
<bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
<constructor-arg>
<ref bean="consumerProperties"/>
</constructor-arg>
</bean>
<!-- 實際執行訊息消費的類 -->
<bean id="messageListernerConsumerService" class="com.calm.b.kafka.service.SpringKafkaConsumerListener"/>
<bean id="springKafkaConsumerAckListener" class="com.calm.b.kafka.service.SpringKafkaConsumerAckListener"/>
<bean id="ackmode" class="org.springframework.beans.factory.config.FieldRetrievingFactoryBean">
<property name="staticField"
value="org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE" />
</bean>
<!-- 消費者容器配置資訊 -->
<bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
<constructor-arg value="page_visits6"/>
<property name="messageListener" ref="springKafkaConsumerAckListener"/>
<property name="pollTimeout" value="1000"/>
<property name="ackMode" ref="ackmode" />
</bean>
<!-- 建立kafkatemplate bean,使用的時候,只需要注入這個bean,即可使用template的send訊息方法 -->
<bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer"
init-method="doStart">
<constructor-arg ref="consumerFactory"/>
<constructor-arg ref="containerProperties"/>
<property name="concurrency" value="3"/>
</bean>
- 手動確認的消費者
/**
* @author andrexu
* @create 2017-07-13
*/
public class SpringKafkaConsumerAckListener implements AcknowledgingMessageListener<String, String> {
private Logger log = LoggerFactory.getLogger(KafkaConsumeService.class);
@Override
public void onMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
log.info(record.toString());
acknowledgment.acknowledge();
}
}
- 自動確認消費者
/**
* @author andrexu
* @create 2017-07-13
*/
public class SpringKafkaConsumerListener implements MessageListener<Integer, String> {
private Logger log = LoggerFactory.getLogger(KafkaConsumeService.class);
@Override
public void onMessage(ConsumerRecord<Integer, String> consumerRecord) {
log.info("======"+consumerRecord);
}
}
五 @KafkaListener的用法
同上面consumer消費方式的一樣,也四種不同消費方式,不同消費方式需要不同的配置支援。
@KafkaListener 需要@EnableKafka來開啟配置,監聽類SpringKafkaConsumerServce類也需要在配置bean裡初始化。
/**
* @author andrexu
* @create 2017-07-13
*/
@Configuration
@EnableKafka
public class kafkaConfig {
private Logger log = LoggerFactory.getLogger(kafkaConfig.class);
@Autowired
private DefaultKafkaConsumerFactory consumerFactory;
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory
() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(1);
factory.getContainerProperties().setPollTimeout(1000);
log.info("init kafkaListener annotation container Factory");
return factory;
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> consumerProps =new HashedMap();
consumerProps.putAll(consumerFactory.getConfigurationProperties());
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return consumerProps;
}
@Bean
public ConsumerFactory<String, String> manualConsumerFactory() {
Map<String, Object> configs = consumerConfigs();
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(configs);
}
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
kafkaManualAckListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(manualConsumerFactory());
ContainerProperties props = factory.getContainerProperties();
props.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
props.setIdleEventInterval(100L);
// factory.setRecordFilterStrategy(manualFilter());
props.setPollTimeout(1000L);
factory.setAckDiscarded(true);
// factory.setRetryTemplate(new RetryTemplate());
factory.setRecoveryCallback(new RecoveryCallback<Void>() {
@Override
public Void recover(RetryContext context) throws Exception {
return null;
}
});
return factory;
}
/**
* 先init他們
* @return
*/
@Bean
public SpringKafkaConsumerServce kafkaListeners() {
return new SpringKafkaConsumerServce();
}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Lazy;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;
/**
* @author andrexu
* @create 2017-07-13
*/
public class SpringKafkaConsumerServce {
private Logger log = LoggerFactory.getLogger(SpringKafkaConsumerServce.class);
@KafkaListener(id = "bar5", topicPartitions =
{ @TopicPartition(topic = "page_visits5", partitions = {"0"}),
},group="group-new" )
public void listen(ConsumerRecord<String, String> record) {
log.info("only partion zero :"+ record);
}
@KafkaListener(id = "bar6", topicPartitions =
{ @TopicPartition(topic = "page_visits5", partitions = {"1"}),
},group="group-new" )
public void listenByTwo(ConsumerRecord<String, String> record) {
log.info("only two partion zero :"+ record);
}
@KafkaListener(id = "baz",topics ={"page_visits8"} ,group="group-new",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listenByOne(ConsumerRecord<String, String> record, Acknowledgment ack) {
log.info("only partion one :"+ record);
ack.acknowledge();
}
// @KafkaListener(id = "bar3",topics ={"page_visits8"} ,group="group-new" )
// public void listenByOne(ConsumerRecord<String, String> record) {
//
// log.info("only partion one :"+ record);
//
// }
}
相關推薦
spring-kafka整合介紹
一 spring-kafka介紹 spring-kafka是在kafka-clients的基礎上的封裝。 主要提供KafkaTemplate 二 pom 配置 <dependency> <groupId
Kafka及Spring&Kafka整合
由於某專案的訊息佇列使用了Spring整合Kafka,開發中我需要使用kafka客戶端模擬生產者和消費者。簡單瞭解了一下Kafka,掃盲貼,先標記一下,日後再深入學習。 一、Kafka簡介 1.1 簡介 kafka是一種高吞吐量的分散式釋
(一)Spring Cloud— 子項目、未來 整合介紹
springcloud 電子商務 子項目 雲架構 微服務 Spring Cloud是一系列框架的有序集合。利用Spring Boot的開發模式簡化了分布式系統基礎設施的開發,如服務發現、註冊、配置中心、消息總線、負載均衡、斷路器、數據監控等(這裏只簡單的列了一部分),都可以用Spring
Spring Cloudt整合Netflix Archaius介紹
1.概述 Netflix Archaius 是一個功能強大的配置管理庫。它是一個可用於從許多不同來源收集配置屬性的框架,提供對配置資訊的快速及執行緒安全訪問。 除此之外,Archaius允許屬性在執行時動態更改,使系統無需重新啟動應用程式即可獲得這些變化。 在這個介紹性文章中,我
Spring cloud 整合kafka
Kafka配置 一.安裝 wget http://mirror.bit.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz 獲取當前版本 tar -xzvf 解壓 二.配置 listeners=P
spring boot整合mongodb使用簡單介紹 spring整合mongo使用簡單介紹 spring整合mongoDB使用簡單介紹
最近在專案中使用到了mongodb,第一次用,各種百度加問大佬,簡單記錄下自己的理解,一是希望能幫助到同樣要學習mongo的同學,另外就是以後可以看一下複習複習。 簡單理解 第一步匯入mongo的依賴 <!--mongodb--> <dependen
訊息佇列之非同步訊息基本概念以及ActiveMQ整合Spring常用用法介紹
一 簡介 (1)非同步訊息: 所謂非同步訊息,跟RMI遠端呼叫、webservice呼叫是類似的,非同步訊息也是用於應用程式之間的通訊。但是它們之間的區別是: RMI、Hession/Burlap、webservice等遠端呼叫機制是同步的。也就是說,當客戶端呼叫遠端方法時,客戶端
spring boot整合kafka
1.pom.xml <dependency> <groupId> org.apache.kafka</groupId> <artifactId> kafka_2.10</artifactId&
spring boot整合shiro之shiro過濾器介紹
過濾器鏈條配置說明 1、一個URL可以配置多個Filter,使用逗號分隔 2、當設定多個過濾器時,全部驗證通過,才視為通過 3、部分過濾器可指定引數,如perms,roles Shiro內建的FilterChain anon(org.apac
Kafka整合Spring-AcknowledgeMessageListener介面實現
前言 因工作需要,需在系統利用Kafka監聽介面,實現訊息佇列中,對訊息的消費,首選Kafka,因為看中其超高的吞吐量。 基本概念 1 Producer: 特指訊息的生產者 2 Consumer :特指訊息的消費者 3 Consumer Group
spring-boot 整合kafka單節點訊息傳送與接收
springboot還處於學習階段,又同時在學習kafka,兩者結合,繼續學習。 1、官網下載kafka 2、解壓 3、對於單節點來說,按照官網上操作即可實現訊息的傳送和接收。 但是對於客戶端,是通過 @KafkaListener 註解監聽生產者傳送的訊
Spring Cloud 整合 kafka
kafka的安裝部署請參考:kafka安裝部署1、在pom.xml裡面新增kafka的maven依賴<dependency> <groupId>org.springframew
Spring整合Kafka之spring-kafka
配置檔案的方式實現spring整和kafka: 此文主要講述的內容: 1,連線kafka伺服器的配置 2,kafka-customer:消費者配置 3,kafka-provider:提供者配置 4,KfkaUtils:根據topic傳送訊息
Spring註解方式整合Kafka(spring-kafka的使用)
import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common
Spring 與 Kafka整合實戰
首先下載解壓zookeeper,選擇合適的映象站點以加快下載速度。我們可以將zookeeper加到系統服務中, 增加一個/etc/init.d/zookeeper檔案。 cd /opt wget http://apache.fayea.com/apache-mirror/zookeeper/zookeep
kafka學習筆記(三)spring boot整合kafka0.9.0.1(使用配置類)
spring boot 版本:1.5.6引入關於kafka的相關jar <dependency> <groupId>org.springframework.kafka</groupI
Spring kafka Integration整合
Spring Integration Kafka Adapter The Spring Integration Kafka Adapter provides client components for Apache Kafka. Apache Kafka is a d
Spring 與 Kafka 整合
– Start Spring 提供了介面來整合 Kafka。請參考《》瞭解更多詳情。 – 更多參見: – 聲 明:轉載請註明出處 – Last Updated on 2018-06-14 – Written by ShangBo on 2018
Kafka——使用spring進行整合
生產者:<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w
Spring Boot整合kafka筆記
kafka官網 http://kafka.apache.org/quickstartspring-kafka當前穩定版本是1.2.0..RELEASE http://docs.spring.io/spring-kafka/docs/1.2.0.RELEASE/reference/html/_introduc