Integrating Kafka with Spring Boot
阿新 • Published: 2019-02-19
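A recent project of mine used Kafka. I covered installing Kafka and ZooKeeper, and how they work, in another post, so I won't repeat that here and will go straight to integrating Kafka into a Spring Boot project.
This post covers two approaches:
Approach 1 uses spring-integration-kafka, the module Spring originally provided for Kafka integration. The focus is on how to wire that framework into a Spring project.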
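1. Add the dependency
The basic Spring Boot dependencies are covered in my two previous posts. Here we add one more dependency for the integration framework: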
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-kafka</artifactId>
<version>1.3.0.RELEASE</version>
</dependency>
2. Add spring-kafka-consumer.xml
Create a file named spring-kafka-consumer.xml under src/main/resources
with the following content:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<int:channel id="inputFromKafka">
<int:queue/>
</int:channel>
<!--<int:service-activator auto-startup="true"-->
<!--input-channel="inputFromKafka" ref="kafkaConsumerService" method="receiveMessage">-->
<!--</int:service-activator>-->
<!-- Either of these two approaches works -->
<!-- Use kafkaConsumerService to receive Kafka messages -->
<bean id="kafkaConsumerService" class="com.oscar.kafkaTest.service.impl.KafkaConsumerServiceImpl" />
<int:outbound-channel-adapter channel="inputFromKafka"
ref="kafkaConsumerService" method="processMessage" auto-startup="true"/>
<int:poller default="true" id="default" fixed-rate="5"
time-unit="MILLISECONDS" max-messages-per-poll="5">
</int:poller>
<int-kafka:inbound-channel-adapter
kafka-consumer-context-ref="consumerContext" channel="inputFromKafka">
</int-kafka:inbound-channel-adapter>
<bean id="consumerProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
<property name="properties">
<props>
<prop key="auto.offset.reset">smallest</prop>
<prop key="socket.receive.buffer.bytes">10485760</prop>
<!-- 10M -->
<prop key="fetch.message.max.bytes">5242880</prop>
<prop key="auto.commit.interval.ms">1000</prop>
</props>
</property>
</bean>
<!-- agent_log/msg_log -->
<int-kafka:consumer-context id="consumerContext"
consumer-timeout="4000" zookeeper-connect="zookeeperConnect"
consumer-properties="consumerProperties">
<int-kafka:consumer-configurations>
<int-kafka:consumer-configuration
group-id="log-monitor" max-messages="500">
<int-kafka:topic id="agent-log" streams="4"/>
<int-kafka:topic id="msg-log" streams="4"/>
<int-kafka:topic id="task-log" streams="4"/>
</int-kafka:consumer-configuration>
</int-kafka:consumer-configurations>
</int-kafka:consumer-context>
<int-kafka:zookeeper-connect id="zookeeperConnect"
zk-connect="192.168.XX.XX:3181" zk-connection-timeout="6000"
zk-session-timeout="400" zk-sync-time="200"/>
</beans>
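The consumerProperties bean above is a PropertiesFactoryBean, so what the consumer context ends up with is an ordinary java.util.Properties object. As a rough sketch of the equivalent in plain Java (the class name ConsumerProps and the method build are made up for illustration), with the byte sizes written out as arithmetic so the 10M comment is easy to check:

```java
import java.util.Properties;

// Hypothetical helper: mirrors the <props> block of the consumerProperties bean.
final class ConsumerProps {
    public static Properties build() {
        Properties props = new Properties();
        // "smallest": start from the oldest available offset when none is committed
        props.setProperty("auto.offset.reset", "smallest");
        // 10 MB socket receive buffer
        props.setProperty("socket.receive.buffer.bytes", String.valueOf(10 * 1024 * 1024));
        // 5 MB maximum fetch size per request
        props.setProperty("fetch.message.max.bytes", String.valueOf(5 * 1024 * 1024));
        // commit consumed offsets every second
        props.setProperty("auto.commit.interval.ms", "1000");
        return props;
    }
}
```

Note that the 10M comment in the XML describes socket.receive.buffer.bytes on the line before it; fetch.message.max.bytes is 5 MB.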
Adjust the configuration file to match your own setup. Two places need changing. The first is:
<bean id="kafkaConsumerService" class="com.oscar.kafkaTest.service.impl.KafkaConsumerServiceImpl" />
This points at the class you write to handle messages, defined like this:
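package com.oscar.kafkaTest.service.impl;
import java.util.List;
import java.util.Map;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerServiceImpl {
    // Invoked by the outbound-channel-adapter (method="processMessage")
    public void processMessage(Map<String, Map<Integer, List<byte[]>>> msgs) {
        System.out.println(msgs);
    }
    // Handler for a plain String payload; not referenced by the XML above
    public void process(String message) {
        System.out.println(message);
    }
}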
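Note the parameter type: messages arrive as a map of topic to partition to a list of raw byte[] payloads.
The second place to change is the IP address in the zk-connect attribute of the zookeeper-connect element.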
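The nested payload type that processMessage receives is easier to work with once it is flattened. Here is a minimal sketch using only the JDK. The class name PayloadDecoder and the method decode are hypothetical, and it assumes the payloads are UTF-8 text, which the configuration above does not guarantee:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper: flattens topic -> partition -> raw messages into text lines.
final class PayloadDecoder {
    public static List<String> decode(Map<String, Map<Integer, List<byte[]>>> msgs) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Map<Integer, List<byte[]>>> topic : msgs.entrySet()) {
            for (Map.Entry<Integer, List<byte[]>> partition : topic.getValue().entrySet()) {
                for (byte[] raw : partition.getValue()) {
                    // Assumption: each message body is UTF-8 encoded text
                    String text = new String(raw, StandardCharsets.UTF_8);
                    lines.add(topic.getKey() + "[" + partition.getKey() + "]: " + text);
                }
            }
        }
        return lines;
    }
}
```

Inside processMessage you could call PayloadDecoder.decode(msgs) and print each line instead of printing the raw map, whose byte[] values are not human-readable.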
3. Import the file
Add these annotations to the application class:
@Configuration
@ImportResource(locations={"classpath:spring-kafka-consumer.xml"})
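Approach 2
Use Spring Boot's default auto-configuration directly (this relies on the spring-kafka dependency being on the classpath, since that is where @KafkaListener comes from).
1. Add the following to application.properties: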
spring.kafka.bootstrap-servers=192.168.101.16:9092
spring.kafka.template.default-topic=test
spring.kafka.consumer.group-id=default3
#spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
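Lines beginning with # in a .properties file are comments, so only the first three settings are actually in effect. A small JDK-only sketch makes this visible. The class name KafkaPropsCheck and its SAMPLE text are made up for illustration, and it assumes the template key is spelled spring.kafka.template.default-topic:

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper: loads properties text the way java.util.Properties parses it.
final class KafkaPropsCheck {
    public static final String SAMPLE =
          "spring.kafka.bootstrap-servers=192.168.101.16:9092\n"
        + "spring.kafka.template.default-topic=test\n"
        + "spring.kafka.consumer.group-id=default3\n"
        + "#spring.kafka.consumer.value-deserializer="
        + "org.apache.kafka.common.serialization.StringDeserializer\n";

    public static Properties load(String text) {
        Properties props = new Properties();
        try {
            // '#' lines are skipped as comments during load
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }
}
```

With the serializer lines commented out, Spring Boot falls back to its own defaults, which as far as I know are the String serializer and deserializer, so the listener can take a String parameter without further configuration.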
2. Add a handler method:
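import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Receiver {
    // Called for each record received on the "test" topic
    @KafkaListener(topics = "test")
    public void processMessage(String msgs) {
        System.out.println("aa");
        System.out.println(msgs);
    }
}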