1. 程式人生 > >spring-kafka整合介紹

spring-kafka整合介紹

一 spring-kafka介紹

  1. spring-kafka是在kafka-clients的基礎上的封裝。
  2. 主要提供KafkaTemplate

二 pom 配置

   <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.6.RELEASE</version>
        </dependency
>
<dependency> <groupId>org.springframework</groupId> <artifactId>spring-messaging</artifactId> <version>4.3.2.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.retry</groupId
>
<artifactId>spring-retry</artifactId> <version>1.1.3.RELEASE</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version
>
0.10.0.0</version> </dependency>
  1. 只需要配置spring-kafka,他內部依賴了kafka-clients,可以根據spring的verison版本調整spring-kafka版號。
  2. 請不依賴kafka_2.10,spring-integration-kafka,貌似只有低版本需要。

三 producer配置

  <context:property-placeholder location="classpath:kafka.properties"/>

    <!-- 定義producer的引數   http://blog.csdn.net/molingduzun123/article/details/51785141  -->
    <bean id="producerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${bootstrap.servers}"/>
                <entry key="group.id" value="0"/>
                <entry key="retries" value="10"/>
                <entry key="batch.size" value="16384"/>
                <entry key="linger.ms" value="1"/>
                <entry key="buffer.memory" value="33554432"/>
                <entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
                <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
            </map>
        </constructor-arg>
    </bean>

    <!-- 建立kafkatemplate需要使用的producerfactory bean -->
    <bean id="producerFactory" class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
        <constructor-arg>
            <ref bean="producerProperties"/>
        </constructor-arg>
    </bean>

    <!-- 建立kafkatemplate bean,使用的時候,只需要注入這個bean,即可使用template的send訊息方法 -->
    <bean id="KafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg ref="producerFactory"/>
        <constructor-arg name="autoFlush" value="true"/>
        <property name="defaultTopic" value="page_visits5"/>
    </bean>

1.hashMap引數集合,可以根據自己需求調整。
2.KafkaTemplate提供各種豐富方法。

public class SpringKafkaProducer   extends WebBaseTest  {
    private static final Logger logger = LoggerFactory.getLogger(SnapshotController.class);
    @Autowired
    private KafkaTemplate<String, String> kafkaProductTemplate;
    @Test
    public  void assignPartitionByKey() throws Exception {
        try {
            //Assign topicName to string variable
            String topicName = "page_visits5";
            for (int i = 0; i < 50; i++) {
                for(int j=0;j<2;j++) {
                    ListenableFuture<SendResult<String, String>> future=kafkaProductTemplate.send(topicName,
                            Integer.toString(j),"ddddddddd洪10002" + i);
                    logger.info("Message sent successfully");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

四 consumer配置

  1. consumer提供四種MessageListener
  2. MessageListener –單條訊息自動提交.
  3. BatchMessageListener –批量訊息自動提交,就是ConsumerRecords

 <!--  consumer  -->

    <!-- 定義consumer的引數 -->
    <bean id="consumerProperties" class="java.util.HashMap">
        <constructor-arg>
            <map>
                <entry key="bootstrap.servers" value="${bootstrap.servers}"/>
                <entry key="group.id" value="group-new"/>
                <entry key="enable.auto.commit" value="false"/>
                <entry key="auto.commit.interval.ms" value="1000"/>
                <entry key="session.timeout.ms" value="15000"/>
                <entry key="max.poll.records" value="10"/>
                <entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
                <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
            </map>
        </constructor-arg>
    </bean>

    <!-- 建立consumerFactory bean -->
    <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
        <constructor-arg>
            <ref bean="consumerProperties"/>
        </constructor-arg>
    </bean>


    <!-- 實際執行訊息消費的類 -->
    <bean id="messageListernerConsumerService" class="com.calm.b.kafka.service.SpringKafkaConsumerListener"/>

    <bean id="springKafkaConsumerAckListener" class="com.calm.b.kafka.service.SpringKafkaConsumerAckListener"/>




    <bean id="ackmode" class="org.springframework.beans.factory.config.FieldRetrievingFactoryBean">
        <property name="staticField"
                  value="org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE" />
    </bean>


    <!-- 消費者容器配置資訊 -->
    <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
        <constructor-arg value="page_visits6"/>
        <property name="messageListener" ref="springKafkaConsumerAckListener"/>
        <property name="pollTimeout" value="1000"/>
        <property name="ackMode" ref="ackmode"  />
    </bean>


    <!-- 建立kafkatemplate bean,使用的時候,只需要注入這個bean,即可使用template的send訊息方法 -->
    <bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer"
          init-method="doStart">
        <constructor-arg ref="consumerFactory"/>
        <constructor-arg ref="containerProperties"/>
        <property name="concurrency" value="3"/>
    </bean>


  1. 手動確認的消費者

/**
 * @author andrexu
 * @create 2017-07-13
 */
public class SpringKafkaConsumerAckListener implements AcknowledgingMessageListener<String, String> {
    private Logger log = LoggerFactory.getLogger(KafkaConsumeService.class);
    @Override
    public void onMessage(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {

        log.info(record.toString());

        acknowledgment.acknowledge();
    }
}

  1. 自動確認消費者

/**
 * @author andrexu
 * @create 2017-07-13
 */
public class SpringKafkaConsumerListener implements MessageListener<Integer, String> {

    private Logger log = LoggerFactory.getLogger(KafkaConsumeService.class);
    @Override
    public void onMessage(ConsumerRecord<Integer, String> consumerRecord) {
        log.info("======"+consumerRecord);
    }
}

五 @KafkaListener的用法

  1. 同上面consumer消費方式的一樣,也四種不同消費方式,不同消費方式需要不同的配置支援。

  2. @KafkaListener 需要@EnableKafka來開啟配置,監聽類SpringKafkaConsumerServce類也需要在配置bean裡初始化。


/**
 * @author andrexu
 * @create 2017-07-13
 */
@Configuration
@EnableKafka
public class kafkaConfig {

    private Logger log = LoggerFactory.getLogger(kafkaConfig.class);
 @Autowired
 private DefaultKafkaConsumerFactory consumerFactory;

 @Bean
 public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>  kafkaListenerContainerFactory
         () {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(1);
        factory.getContainerProperties().setPollTimeout(1000);

        log.info("init kafkaListener annotation container Factory");
        return factory;
    }



    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> consumerProps =new HashedMap();
        consumerProps.putAll(consumerFactory.getConfigurationProperties());
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return consumerProps;
    }


    @Bean
    public ConsumerFactory<String, String> manualConsumerFactory() {
        Map<String, Object> configs = consumerConfigs();
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        return new DefaultKafkaConsumerFactory<>(configs);
    }


    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
    kafkaManualAckListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();

        factory.setConsumerFactory(manualConsumerFactory());
        ContainerProperties props = factory.getContainerProperties();
        props.setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        props.setIdleEventInterval(100L);
     //   factory.setRecordFilterStrategy(manualFilter());
        props.setPollTimeout(1000L);
        factory.setAckDiscarded(true);
    //    factory.setRetryTemplate(new RetryTemplate());
        factory.setRecoveryCallback(new RecoveryCallback<Void>() {

            @Override
            public Void recover(RetryContext context) throws Exception {
                return null;
            }

        });

        return factory;
    }


    /**
     * 先init他們
     * @return
     */
    @Bean
    public SpringKafkaConsumerServce kafkaListeners() {
        return new SpringKafkaConsumerServce();
    }


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Lazy;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

/**
 * @author andrexu
 * @create 2017-07-13
 */
public class SpringKafkaConsumerServce {

    private Logger log = LoggerFactory.getLogger(SpringKafkaConsumerServce.class);


    @KafkaListener(id = "bar5", topicPartitions =
            { @TopicPartition(topic = "page_visits5", partitions = {"0"}),
            },group="group-new"  )
    public void listen(ConsumerRecord<String, String> record) {
        log.info("only partion zero :"+ record);
    }


    @KafkaListener(id = "bar6", topicPartitions =
            { @TopicPartition(topic = "page_visits5", partitions = {"1"}),
            },group="group-new"  )
    public void listenByTwo(ConsumerRecord<String, String> record) {
        log.info("only two partion zero :"+ record);
    }


    @KafkaListener(id = "baz",topics ={"page_visits8"} ,group="group-new",
            containerFactory = "kafkaManualAckListenerContainerFactory")
    public void listenByOne(ConsumerRecord<String, String> record, Acknowledgment ack) {

        log.info("only partion one :"+ record);

        ack.acknowledge();
    }


//    @KafkaListener(id = "bar3",topics ={"page_visits8"} ,group="group-new"   )
//    public void listenByOne(ConsumerRecord<String, String> record) {
//
//        log.info("only partion one :"+ record);
//
//    }

}

相關推薦

spring-kafka整合介紹

一 spring-kafka介紹 spring-kafka是在kafka-clients的基礎上的封裝。 主要提供KafkaTemplate 二 pom 配置 <dependency> <groupId

KafkaSpring&Kafka整合

 由於某專案的訊息佇列使用了Spring整合Kafka,開發中我需要使用kafka客戶端模擬生產者和消費者。簡單瞭解了一下Kafka,掃盲貼,先標記一下,日後再深入學習。 一、Kafka簡介 1.1 簡介   kafka是一種高吞吐量的分散式釋

(一)Spring Cloud— 子項目、未來 整合介紹

springcloud 電子商務 子項目 雲架構 微服務 Spring Cloud是一系列框架的有序集合。利用Spring Boot的開發模式簡化了分布式系統基礎設施的開發,如服務發現、註冊、配置中心、消息總線、負載均衡、斷路器、數據監控等(這裏只簡單的列了一部分),都可以用Spring

Spring Cloudt整合Netflix Archaius介紹

1.概述 Netflix Archaius 是一個功能強大的配置管理庫。它是一個可用於從許多不同來源收集配置屬性的框架,提供對配置資訊的快速及執行緒安全訪問。 除此之外,Archaius允許屬性在執行時動態更改,使系統無需重新啟動應用程式即可獲得這些變化。 在這個介紹性文章中,我

Spring cloud 整合kafka

  Kafka配置 一.安裝 wget http://mirror.bit.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz  獲取當前版本 tar  -xzvf  解壓 二.配置 listeners=P

spring boot整合mongodb使用簡單介紹 spring整合mongo使用簡單介紹 spring整合mongoDB使用簡單介紹

最近在專案中使用到了mongodb,第一次用,各種百度加問大佬,簡單記錄下自己的理解,一是希望能幫助到同樣要學習mongo的同學,另外就是以後可以看一下複習複習。 簡單理解 第一步匯入mongo的依賴 <!--mongodb--> <dependen

訊息佇列之非同步訊息基本概念以及ActiveMQ整合Spring常用用法介紹

一 簡介 (1)非同步訊息: 所謂非同步訊息,跟RMI遠端呼叫、webservice呼叫是類似的,非同步訊息也是用於應用程式之間的通訊。但是它們之間的區別是: RMI、Hession/Burlap、webservice等遠端呼叫機制是同步的。也就是說,當客戶端呼叫遠端方法時,客戶端

spring boot整合kafka

1.pom.xml <dependency> <groupId> org.apache.kafka</groupId> <artifactId> kafka_2.10</artifactId&

spring boot整合shiro之shiro過濾器介紹

過濾器鏈條配置說明 1、一個URL可以配置多個Filter,使用逗號分隔 2、當設定多個過濾器時,全部驗證通過,才視為通過 3、部分過濾器可指定引數,如perms,roles Shiro內建的FilterChain anon(org.apac

Kafka整合Spring-AcknowledgeMessageListener介面實現

前言 因工作需要,需在系統利用Kafka監聽介面,實現訊息佇列中,對訊息的消費,首選Kafka,因為看中其超高的吞吐量。 基本概念 1 Producer: 特指訊息的生產者 2 Consumer :特指訊息的消費者 3 Consumer Group

spring-boot 整合kafka單節點訊息傳送與接收

springboot還處於學習階段,又同時在學習kafka,兩者結合,繼續學習。 1、官網下載kafka 2、解壓 3、對於單節點來說,按照官網上操作即可實現訊息的傳送和接收。 但是對於客戶端,是通過 @KafkaListener 註解監聽生產者傳送的訊

Spring Cloud 整合 kafka

kafka的安裝部署請參考:kafka安裝部署1、在pom.xml裡面新增kafka的maven依賴<dependency> <groupId>org.springframew

Spring整合Kafkaspring-kafka

配置檔案的方式實現spring整和kafka:    此文主要講述的內容:     1,連線kafka伺服器的配置    2,kafka-customer:消費者配置    3,kafka-provider:提供者配置    4,KfkaUtils:根據topic傳送訊息 

Spring註解方式整合Kafkaspring-kafka的使用)

import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common

SpringKafka整合實戰

首先下載解壓zookeeper,選擇合適的映象站點以加快下載速度。我們可以將zookeeper加到系統服務中, 增加一個/etc/init.d/zookeeper檔案。 cd /opt wget http://apache.fayea.com/apache-mirror/zookeeper/zookeep

kafka學習筆記(三)spring boot整合kafka0.9.0.1(使用配置類)

spring boot 版本:1.5.6引入關於kafka的相關jar         <dependency>          <groupId>org.springframework.kafka</groupI

Spring kafka Integration整合

Spring Integration Kafka Adapter The Spring Integration Kafka Adapter provides client components for Apache Kafka. Apache Kafka is a d

SpringKafka 整合

– Start Spring 提供了介面來整合 Kafka。請參考《》瞭解更多詳情。 – 更多參見: – 聲 明:轉載請註明出處 – Last Updated on 2018-06-14 – Written by ShangBo on 2018

Kafka——使用spring進行整合

生產者:<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w

Spring Boot整合kafka筆記

kafka官網 http://kafka.apache.org/quickstartspring-kafka當前穩定版本是1.2.0..RELEASE  http://docs.spring.io/spring-kafka/docs/1.2.0.RELEASE/reference/html/_introduc