
Integrating Kafka with Spring Boot

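A while ago one of my projects needed Kafka. I covered installing Kafka and ZooKeeper, and how they work, in another blog post, so I won't repeat that here and will go straight to integrating Kafka into a Spring Boot project.
This post covers two methods.
Method 1
Here we use spring-integration-kafka, Spring's original plugin for Kafka integration. The focus is on how Spring wires this framework into a project.
1. Add the dependency
The basic Spring Boot dependencies are described in my previous two posts. On top of them, add the dependency for this integration framework: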

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>1.3.0.RELEASE</version>
</dependency>

2. Add spring-kafka-consumer.xml
Create a new file named spring-kafka-consumer.xml under src/main/resources
with the following content:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
                           http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
                           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                           http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">

    <int:channel id="inputFromKafka">
        <int:queue/>
    </int:channel>

    <!--<int:service-activator auto-startup="true"-->
    <!--input-channel="inputFromKafka" ref="kafkaConsumerService" method="receiveMessage">-->
    <!--</int:service-activator>-->

    <!-- Either approach works: the service-activator above or the outbound-channel-adapter below -->
    <!-- Use kafkaConsumerService to receive the Kafka messages -->
    <bean id="kafkaConsumerService" class="com.oscar.kafkaTest.service.impl.KafkaConsumerServiceImpl" />

    <int:outbound-channel-adapter channel="inputFromKafka"
                                  ref="kafkaConsumerService" method="processMessage" auto-startup="true"/>

    <int:poller default="true" id="default" fixed-rate="5"
                time-unit="MILLISECONDS" max-messages-per-poll="5">
    </int:poller>

    <int-kafka:inbound-channel-adapter kafka-consumer-context-ref="consumerContext"
                                       channel="inputFromKafka">
    </int-kafka:inbound-channel-adapter>

    <bean id="consumerProperties" class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="auto.offset.reset">smallest</prop>
                <prop key="socket.receive.buffer.bytes">10485760</prop> <!-- 10M -->
                <prop key="fetch.message.max.bytes">5242880</prop>
                <prop key="auto.commit.interval.ms">1000</prop>
            </props>
        </property>
    </bean>

    <!-- agent_log/msg_log -->
    <int-kafka:consumer-context id="consumerContext" consumer-timeout="4000"
                                zookeeper-connect="zookeeperConnect"
                                consumer-properties="consumerProperties">
        <int-kafka:consumer-configurations>
            <int-kafka:consumer-configuration group-id="log-monitor" max-messages="500">
                <int-kafka:topic id="agent-log" streams="4"/>
                <int-kafka:topic id="msg-log" streams="4"/>
                <int-kafka:topic id="task-log" streams="4"/>
            </int-kafka:consumer-configuration>
        </int-kafka:consumer-configurations>
    </int-kafka:consumer-context>

    <int-kafka:zookeeper-connect id="zookeeperConnect" zk-connect="192.168.XX.XX:3181"
                                 zk-connection-timeout="6000" zk-session-timeout="400" zk-sync-time="200"/>
</beans>

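Adjust the values in this configuration file to match your own setup. There are two main places to change. The first is this bean: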

 <bean id="kafkaConsumerService" class="com.oscar.kafkaTest.service.impl.KafkaConsumerServiceImpl" />

It must point to the class you wrote to handle the messages, which is defined like this:

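package com.oscar.kafkaTest.service.impl;

import java.util.List;
import java.util.Map;

import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerServiceImpl {

    // Called by the outbound-channel-adapter (method="processMessage" in the XML).
    // The payload is nested as topic -> partition -> list of raw message bytes.
    public void processMessage(Map<String, Map<Integer, List<byte[]>>> msgs) {
        System.out.println(msgs);
    }

    // Alternative handler for a payload that has already been converted to a String.
    public void process(String message) {
        System.out.println(message);
    }
}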

Pay attention to the parameter type that receives the messages: it has to match the payload format.
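To make the payload format more concrete, here is a minimal sketch of how the nested map could be unpacked. PayloadDecoder is a hypothetical helper, not part of spring-integration-kafka, and it assumes that the outer key is the topic name, the inner key is the partition number, and each byte[] holds UTF-8 text.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not part of spring-integration-kafka.
public class PayloadDecoder {

    // Flattens topic -> partition -> raw messages into one readable line per message.
    // Assumption: every byte[] is UTF-8 encoded text.
    public static List<String> decode(Map<String, Map<Integer, List<byte[]>>> msgs) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, Map<Integer, List<byte[]>>> topic : msgs.entrySet()) {
            for (Map.Entry<Integer, List<byte[]>> partition : topic.getValue().entrySet()) {
                for (byte[] raw : partition.getValue()) {
                    String text = new String(raw, StandardCharsets.UTF_8);
                    lines.add(topic.getKey() + "[" + partition.getKey() + "]: " + text);
                }
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        // Build a payload by hand: one message in partition 0 of topic "agent-log".
        Map<Integer, List<byte[]>> partitions = new TreeMap<>();
        partitions.put(0, Collections.singletonList("hello".getBytes(StandardCharsets.UTF_8)));
        Map<String, Map<Integer, List<byte[]>>> msgs = new TreeMap<>();
        msgs.put("agent-log", partitions);

        // prints "agent-log[0]: hello"
        decode(msgs).forEach(System.out::println);
    }
}
```

Inside processMessage, a call such as PayloadDecoder.decode(msgs) could replace the plain System.out.println(msgs), which would otherwise only print array references for the byte[] values.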
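The second place to change is the IP address: set the zk-connect attribute of int-kafka:zookeeper-connect to the address of your own ZooKeeper.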
3. Import the file
Add these annotations to the application class:

@Configuration
@ImportResource(locations={"classpath:spring-kafka-consumer.xml"})

Method 2
Use Spring Boot's default configuration directly.
1. Add the following to application.properties:

spring.kafka.bootstrap-servers=192.168.101.16:9092

spring.kafka.template.default-topic=test
spring.kafka.consumer.group-id=default3

#spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
#spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

2. Add the handler method:

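import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class Receiver {

    // Invoked for every record that arrives on the "test" topic.
    @KafkaListener(topics = "test")
    public void processMessage(String msgs) {
        System.out.println("aa");
        System.out.println(msgs);
    }
}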