Kafka——使用spring進行整合
阿新 • • 發佈:2019-02-13
生產者:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <!-- 定義producer的引數 --> <bean id="producerProperties" class="java.util.HashMap"> <constructor-arg> <map> <entry key="bootstrap.servers" value="10.0.1.72:9092,10.0.1.73:9092,10.0.1.74:9092,10.0.1.75:9092" /> <entry key="group.id" value="0" /> <entry key="retries" value="1" /> <entry key="batch.size" value="16384" /> <entry key="linger.ms" value="1" /> <entry key="buffer.memory" value="33554432" /> <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" /> <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" /> </map> </constructor-arg> </bean> <!-- 建立kafkatemplate需要使用的producerfactory bean --> <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory"> <constructor-arg> <ref bean="producerProperties" /> </constructor-arg> </bean> <!-- 建立kafkatemplate bean,使用的時候,只需要注入這個bean,即可使用template的send訊息方法 --> <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate"> <constructor-arg ref="producerFactory" /> <constructor-arg name="autoFlush" value="true" /> <property name="defaultTopic" value="defaultTopic" /> <!-- <property name="producerListener" ref="producerListener"/>--> </bean> <!-- <bean id="producerListener" class="KafkaTest.KafkaProducerListener" />--> </beans>
消費者:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd"> <!-- 定義consumer的引數 --> <bean id="consumerProperties" class="java.util.HashMap"> <constructor-arg> <map> <entry key="bootstrap.servers" value="10.0.1.72:9092,10.0.1.73:9092,10.0.1.74:9092,10.0.1.75:9092"/> <entry key="group.id" value="0"/> <entry key="enable.auto.commit" value="true"/> <entry key="auto.commit.interval.ms" value="1000"/> <entry key="session.timeout.ms" value="30000"/> <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/> <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/> </map> </constructor-arg> </bean> <!-- 建立consumerFactory bean --> <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory"> <constructor-arg> <ref bean="consumerProperties"/> </constructor-arg> </bean> <!-- 實際執行訊息消費的類 --> <bean id="messageListernerConsumerService" class="KafkaTest.KafkaConsumerServer"/> <!-- 消費者容器配置資訊 --> <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties"> <constructor-arg value="defaultTopic"/> <property name="messageListener" ref="messageListernerConsumerService"/> </bean> <!-- 建立messageListenerContainer bean,使用的時候,只需要注入這個bean --> <bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart"> <constructor-arg ref="consumerFactory"/> <constructor-arg ref="containerProperties"/> </bean> </beans>
生產者傳送訊息程式碼:
首先注入:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
之後進行傳送:
ListenableFuture<SendResult<String, String>> result = kafkaTemplate.send(topic, key, valueString);
消費端:
/** * kafka消費 * Created by liuhuichao on 2017/5/12. */ public class KafkaConsumerServer implements MessageListener<String, String> { private Logger logger = Logger.getLogger(KafkaConsumerServer.class); @Override public void onMessage(ConsumerRecord<String, String> record) { logger.info("KafkaConsumerServer=============kafkaConsumer開始消費============="); String topic = record.topic(); String key = record.key(); String value = record.value(); long offset = record.offset(); int partition = record.partition(); logger.info("KafkaConsumerServer-------------topic:"+topic); logger.info("KafkaConsumerServer-------------value:"+value); logger.info("KafkaConsumerServer-------------key:"+key); logger.info("KafkaConsumerServer-------------offset:"+offset); logger.info("KafkaConsumerServer-------------partition:"+partition); logger.info("~~~~~~~~~~~~~kafkaConsumer消費結束~~~~~~~~~~~~~"); System.out.println("消費成功***************************************************************"); } }
除了整合kafka,熟悉spring的同志們,可能還用spring整合過redis,hbase這些東西,可以完全放心把連線什麼的都交給spring,自己只關注該關注的東西。
ps: