1. 程式人生 > >使用spring整合的kafka收發訊息

使用spring整合的kafka收發訊息

1. 引入maven依賴

<dependency>
	<groupId>org.springframework.integration</groupId>
	<artifactId>spring-integration-kafka</artifactId>
	<version>${spring-integration-kafka.version}</version>
</dependency>

2. 生產者的xml配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
		http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
		http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">

    <int:channel id="outWriteBackLemmaRecordChannel" />

    <int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                        kafka-template="kafkaTemplate"
                                        auto-startup="true"
                                        channel="outWriteBackLemmaRecordChannel"
                                        order="3"
                                        topic="writeBackLemmaRecordTopic">
        <int-kafka:request-handler-advice-chain>
            <bean class="org.springframework.integration.handler.advice.RequestHandlerCircuitBreakerAdvice" />
        </int-kafka:request-handler-advice-chain>
    </int-kafka:outbound-channel-adapter>

    <bean id="kafkaTemplate" class="org.springframework.kafka.core.KafkaTemplate">
        <constructor-arg>
            <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
                <constructor-arg>
                    <map>
                        <entry key="bootstrap.servers" value="1.1.1.1:9092,2.2.2.2:9092"/>
                        <entry key="retries" value="10"/>
                        <entry key="batch.size" value="16384"/>
                        <entry key="linger.ms" value="1"/>
                        <entry key="buffer.memory" value="33554432"/>
                        <entry key="key.serializer" value="org.apache.kafka.common.serialization.IntegerSerializer"/>
                        <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/>
                    </map>
                </constructor-arg>
            </bean>
        </constructor-arg>
        <constructor-arg name="autoFlush" value="true"/>
        <property name="defaultTopic" value="writeBackLemmaRecordTopic"/>
    </bean>

    <bean id="kafkaProducerService"
          class="com.soso.baike.admin.service.kafka.producer.impl.KafkaProducerServiceImpl"/>
</beans>

針對DefaultKafkaProducerFactory 的引數,本公司其實是配置註冊到了zookeeper上,針對開發環境,預發環境,線上環境的配置是不同的,所以zookeeper上分別針對不同的環境註冊了三套配置檔案,釋出的時候,會根據要釋出的環境去zookeeper上拉取對應環境的配置檔案,從而填充DefaultKafkaProducerFactory的構造引數

3. 傳送訊息

傳送訊息是上述配置檔案中配置的KafkaProducerServiceImpl類

package com.soso.baike.admin.service.kafka.producer.impl;

import com.soso.baike.admin.service.kafka.producer.IKafkaProducerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;


public class KafkaProducerServiceImpl implements IKafkaProducerService {
    private Logger logger = LoggerFactory.getLogger("kafka");

    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate;<span style="white-space:pre">	</span>//這個已經在上述xml檔案中配置

    @Override
    public void sendMessage(String topic, String data) {
        logger.info("the message is to be send by kafka is : topic = {}, data = {}", topic, data);
        kafkaTemplate.setDefaultTopic(topic);
        kafkaTemplate.sendDefault(data);
    }

    @Override
    public void sendMessage(String topic, int key, String data) {
        logger.info("the message is to be send by kafka is : topic = {}, data = {}", topic, data);
        kafkaTemplate.setDefaultTopic(topic);
        kafkaTemplate.sendDefault(key, data);
    }
}

4.  消費者xml配置
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
		http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd">

    <int-kafka:message-driven-channel-adapter
            id="kafkaMessageDrivenChannelAdapter"
            listener-container="kafkaMessageListenerContainer"
            auto-startup="true"
            phase="100"
            send-timeout="5000"
            channel="nullChannel"
            message-converter="messagingMessageConverter"
            error-channel="errorChannel"/>

    <bean id="messagingMessageConverter" class="org.springframework.kafka.support.converter.MessagingMessageConverter"/>

    <bean id="kafkaMessageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer">
        <constructor-arg>
            <bean class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
                <constructor-arg>
                    <map>
                        <entry key="bootstrap.servers" value="${kafka.consumer.bootstrap.servers}"/>
                        <entry key="group.id" value="${kafka.consumer.group.id}"/>
                        <entry key="enable.auto.commit" value="${kafka.consumer.enable.auto.commit}"/>
                        <entry key="auto.commit.interval.ms" value="${kafka.consumer.auto.commit.interval.ms}"/>
                        <entry key="session.timeout.ms" value="${kafka.consumer.session.timeout.ms}"/>
                        <entry key="key.deserializer" value="${kafka.consumer.key.deserializer}"/>
                        <entry key="value.deserializer" value="${kafka.consumer.value.deserializer}"/>
                    </map>
                </constructor-arg>
            </bean>
        </constructor-arg>
        <constructor-arg>
            <bean class="org.springframework.kafka.listener.config.ContainerProperties">
                <constructor-arg name="topics" value="writeBackLemmaRecordTopic"/>
            </bean>
        </constructor-arg>
    </bean>

    <!-- 實際執行訊息消費的類 -->
    <bean id="kafkaConsumerService"
          class="com.soso.baike.admin.service.kafka.consumer.impl.KafkaConsumerServiceImpl"/>
</beans>

上述DefaultKafkaConsumerFactory的構造引數就是在配置檔案中配置的,這裡你可以直接替換成實際的引數而不用配置檔案

5. 接收訊息類是上述配置檔案中配置的KafkaConsumerServiceImpl類,程式碼如下:

package com.soso.baike.admin.service.kafka.consumer.impl;

import com.soso.baike.admin.constant.KafkaConstants;
import com.soso.baike.admin.lmaimp.DummyUser;
import com.soso.baike.admin.service.kafka.consumer.IKafkaConsumerService;
import com.soso.baike.audit.Auditors;
import com.soso.baike.audit.db.LemmaAuditDao;
import com.soso.baike.audit.lemma.LemmaRecord;
import com.soso.baike.audit.lemma.LemmaWriteBackOp;
import com.soso.baike.domain.IdConvert;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.config.ContainerProperties;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Created by zhangyongguang on 2016/6/30.
 */
public class KafkaConsumerServiceImpl implements IKafkaConsumerService, InitializingBean {
    private Logger logger = LoggerFactory.getLogger("kafka");

    @Autowired
    private KafkaMessageListenerContainer kafkaMessageListenerContainer;
    @Autowired
    private LemmaWriteBackOp lemmaWriteBackOp;

    private int threadNum = 8;
    private int maxQueueSize = 2000;
    private ExecutorService executorService = new ThreadPoolExecutor(threadNum,
            threadNum, 0L, TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<Runnable>(maxQueueSize),
            new ThreadPoolExecutor.CallerRunsPolicy());

    @Override
    public void onMessage(ConsumerRecord<Integer, String> record) {
        logger.info("===============processMessage===============");
        logger.info("the kafka message is arriving with topic = {}, partition = {}, key = {}, value = {}",
                new Object[]{record.topic(), record.partition(), record.key(), record.value()});
<span style="white-space:pre">	</span>//這裡收到訊息後,開啟了一個執行緒來處理<span style="white-space:pre">	</span>
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                String msg = record.value();
                
            }
        });
    }

    @Override<span style="white-space:pre">	</span>//設定監聽
    public void afterPropertiesSet() throws Exception {
        ContainerProperties containerProperties = kafkaMessageListenerContainer.getContainerProperties();

        if (null != containerProperties) {
            containerProperties.setMessageListener(this);
        }
    }
}


相關推薦

SPRING 整合 KAFKA 傳送訊息

準備工作 1.安裝kafka+zookeeper環境  2.利用命令建立好topic,建立一個topic my-topic 整合步驟 1.配置生產者 <?xml version="1.0" encoding="UTF-8"?> <beans xmlns=

使用spring整合kafka收發訊息

1. 引入maven依賴 <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-kaf

Java架構-spring+springmvc+kafka分散式訊息中介軟體整合方案

Honghu的訊息服務平臺已經拋棄了之前的ActiveMQ,改用高吞吐量比較大的Kafka分散式訊息中介軟體方案: kafka訊息平臺使用spring+kafka的整合方案,詳情如下: 使用最高版本2.1.0.RELEASE整合jar包:spring-integration

Spring整合Kafka中的事務

       原文連結:https://docs.spring.io/spring-kafka/reference/htmlsingle/#transactions 事務Transactions  &nb

SpringBoot開發案例之整合Kafka實現訊息佇列

  前言   最近在做一款秒殺的案例,涉及到了同步鎖、資料庫鎖、分散式鎖、程序內佇列以及分散式訊息佇列,這裡對SpringBoot整合Kafka實現訊息佇列做一個簡單的記錄。   Kafka簡介   Kafka是由Apache軟體基金會開發的一個開源流處理平臺,由Scala和Java編寫

SpringBoot整合Kafka實現訊息上報

一、該篇部落格使用技術版本 SpringBoot:1.5.9.RELEASE zookeeper:zookeeper-3.4.5 kafka:kafka_2.10-0.10.2.1 二、SpringBoot整合Kafka 1、首先修改kafka的server.proper

第 22 將 Spring整合RestTemplate消費訊息

第二十二將 Spring整合RestTemplate消費訊息 1.前言 2.Spring整合RedisTemplate 2.1 引入依賴:pom.xml <dependency>

spring整合rabbitmq實現訊息佇列

RabbitTemplate概念瞭解 RabbitMQ是由Erlang(愛立信公司)語言開發,實現Advanced Message Queuing Protocol (AMQP高階訊息佇列協議)的訊息中介軟體。訊息中介軟體主要用於元件之間的解耦,訊息的傳送者無需知道

spring整合kfka實現訊息佇列

1.搭建kafka執行環境 ,下載地址:http://kafka.apache.org/downloads 下載完修改安裝檔案config裡面的server.properties檔案,將listeners=PLAINTEXT://IP:9092改成伺服器的IP,修改

在springboot專案中整合kafka收發message

、先解決依賴 springboot相關的依賴我們就不提了,和kafka相關的只依賴一個spring-kafka整合包 <dependency> <groupId>org.springframework.kafka<

spring 整合 kafka

使用spring-integration-kafka傳送訊息 Outbound Channel Adapter用來發送訊息到Kafka。 訊息從Spring Integration Channel中讀取。 你可以在Spring application context

springboot整合kafka實現訊息的生產與消費--訊息的生產

由於工作需要,最近在研究springboot整合kafka。做一個分散式的同步應用程式。springboot整合kafka須注意版本。對於springboot 1.5版本之前的話,需要自己去配置java configuration,而1.5版本以後則提供了auto confi

RabbitMQ學習(十一)之spring整合傳送非同步訊息

實現使用Exchange型別為DirectExchange. routingkey的名稱預設為Queue的名稱。非同步傳送訊息。 1.配置檔案 [plain] view plain copy print? #============== rabbitmq co

Spring整合Kafkaspring-kafka

配置檔案的方式實現spring整和kafka:    此文主要講述的內容:     1,連線kafka伺服器的配置    2,kafka-customer:消費者配置    3,kafka-provider:提供者配置    4,KfkaUtils:根據topic傳送訊息 

spring整合kafka例項

1.依賴jar <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="

Spring 整合Kafka(完整版)

前面的文章我們已經完成了Kafka基於Zookeeper的叢集的搭建了。Kafka叢集搭建請點我。記過幾天的研究已經實現Spring的集成了。本文重點 jar包準備 整合是基於spring-i

spring 整合kafka監聽消費

前言 最近專案裡有個需求,要消費kafka裡的資料。之前也手動寫過程式碼去消費kafka資料。但是轉念一想。既然spring提供了消費kafka的方法。就沒必要再去重複造輪子。於是嘗試使用spring的API。 專案技術背景,使用springMVC,XML配置和註解相互使用。kafka的配置都是使用XML方式

spring-boot 整合kafka單節點訊息傳送與接收

springboot還處於學習階段,又同時在學習kafka,兩者結合,繼續學習。 1、官網下載kafka 2、解壓 3、對於單節點來說,按照官網上操作即可實現訊息的傳送和接收。 但是對於客戶端,是通過 @KafkaListener 註解監聽生產者傳送的訊

SpringBoot整合Kafka:簡單收發訊息案例

環境說明 Windows 10 1709 IDEA 2017.3.2 SpringBoot 2.0.M7 Spring-Kafka 2.1.0.RELEASE JDK 1.8.0_144 Maven 3.5.0 阿里雲ECS CentOS 7 Kafka

spring整合JMS一同步收發訊息(基於ActiveMQ的實現)

1. 安裝ActiveMQ 注意:JDK版本需要1.7及以上才行 bin目錄結構如下: 如果我們是32位的機器,就雙擊win32目錄下的activemq.bat,如果是64位機器,則雙擊win64目錄下的activemq.bat,執行結果如下: 啟動成功!成功之後