
Kafka and Spring Integration in Practice

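Contents

  1. Installing ZooKeeper
  2. Installing Kafka
  3. Creating a Spring project
  4. Sending messages to Kafka with the Producer API
  5. Receiving messages with the Kafka High Level API
  6. Sending messages with spring-integration-kafka
  7. Receiving messages with spring-integration-kafka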

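    This article uses a single-machine environment to demonstrate how to integrate Kafka with Spring.
    A single-machine setup is the easiest to build: it runs on your own PC, needs little hardware, and is convenient for learning. Besides, the goal of this article is not to build a ZooKeeper cluster but to focus on using Kafka with Spring.
    The software environment is as follows: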

OS: CentOS 6.4
ZooKeeper: zookeeper-3.4.6
Kafka: kafka_2.9.1-0.8.2-beta
Java: JDK 1.7.0_45-b18
Spring: 4.0.6
The examples run correctly in this environment; the complete code can be downloaded from GitHub.

All commands in this article are run as the root user. In production, security policies may require dedicated users such as zookeeper or kafka.

Installing ZooKeeper

First download and extract ZooKeeper, choosing a nearby mirror to speed up the download.
We can register ZooKeeper as a system service by adding an /etc/init.d/zookeeper script.

cd /opt
wget  http://apache.fayea.com/apache-mirror/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
tar zxvf zookeeper-3.4.6.tar.gz
# zkServer.sh reads conf/zoo.cfg; the bundled sample uses clientPort=2181
cp /opt/zookeeper-3.4.6/conf/zoo_sample.cfg /opt/zookeeper-3.4.6/conf/zoo.cfg
vi /etc/init.d/zookeeper
......
start() {
  echo -n $"Starting $desc (zookeeper): "
  daemon --user root /opt/zookeeper-3.4.6/bin/zkServer.sh start
  RETVAL=$?
  echo
  [ $RETVAL -eq 0 ] && touch /var/lock/subsys/zookeeper
  return $RETVAL
}
stop() {
  echo -n $"Stopping $desc (zookeeper): "
  daemon --user root /opt/zookeeper-3.4.6/bin/zkServer.sh stop
  RETVAL=$?
  sleep 5
  echo
  [ $RETVAL -eq 0 ] && rm -f /var/lock/subsys/zookeeper $PIDFILE
}
......
chmod 755 /etc/init.d/zookeeper
service zookeeper start

If you don't want to register it as a service, you can also start ZooKeeper directly:

/opt/zookeeper-3.4.6/bin/zkServer.sh start
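To confirm that ZooKeeper actually started, you can send it the four-letter command ruok on the client port; a healthy server replies imok and closes the connection. Below is a minimal Java sketch of that check, assuming the default client port 2181 used throughout this article (ZooKeeper 3.4.6 answers four-letter commands without extra configuration). The class name ZkHealthCheck is my own, not part of ZooKeeper.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: sends ZooKeeper's "ruok" command and checks for "imok". */
final class ZkHealthCheck {

    /** ZooKeeper answers "imok" when it is running in a non-error state. */
    static boolean isHealthy(String reply) {
        return reply != null && reply.trim().equals("imok");
    }

    /** Sends "ruok" and reads the reply until the server closes the connection. */
    static String ruok(String host, int port, int timeoutMs) throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            socket.setSoTimeout(timeoutMs); // fail instead of blocking forever on read
            OutputStream out = socket.getOutputStream();
            out.write("ruok".getBytes(StandardCharsets.US_ASCII));
            out.flush();
            InputStream in = socket.getInputStream();
            ByteArrayOutputStream reply = new ByteArrayOutputStream();
            byte[] buf = new byte[64];
            int n;
            while ((n = in.read(buf)) != -1) {
                reply.write(buf, 0, n);
            }
            return new String(reply.toByteArray(), StandardCharsets.US_ASCII);
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumes ZooKeeper listens on localhost:2181, as configured above.
        String reply = ruok("localhost", 2181, 3000);
        System.out.println(isHealthy(reply) ? "ZooKeeper is up" : "Unexpected reply: " + reply);
    }
}
```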

Installing Kafka

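Download Kafka from a suitable mirror and extract it (this article uses 0.8.2-beta, the latest version at the time of writing).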
wget http://apache.01link.hk/kafka/0.8.2-beta/kafka_2.9.1-0.8.2-beta.tgz
tar zxvf kafka_2.9.1-0.8.2-beta.tgz
cd kafka_2.9.1-0.8.2-beta
Start Kafka (the command runs in the foreground, so use separate terminals for the following steps):
bin/kafka-server-start.sh config/server.properties
Create a topic named test:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
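You can try it out with Kafka's console tools: start a console producer in one terminal and type a few messages, then start a console consumer in another terminal, which prints them: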
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message

For more details, see my translated and edited Kafka Quick Start.

Creating a Spring project

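With the environment ready, let's create a project.
I previously wrote a short introduction: Integrating Spring with Kafka.
I won't introduce the official spring-integration-kafka framework itself here; we mainly use it for the integration.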

First, let's look at examples that send and receive messages with Kafka's own Producer/Consumer APIs.

Sending messages to Kafka with the Producer API

OK, first let's look at an example that sends messages with Kafka's own producer API:

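package com.colobu.spring_kafka_demo;

import java.util.Properties;
import java.util.Random;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class NativeProducer {
    public static void main(String[] args) {
        String topic = "test";
        long events = 100;
        Random rand = new Random();

        Properties props = new Properties();
        props.put("metadata.broker.list", "localhost:9092");            // broker(s) to fetch metadata from
        props.put("serializer.class", "kafka.serializer.StringEncoder"); // also used for keys unless key.serializer.class is set
        props.put("request.required.acks", "1");                         // wait for the leader's acknowledgement

        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);

        try {
            for (long nEvents = 0; nEvents < events; nEvents++) {
                String msg = "NativeMessage-" + rand.nextInt();
                // topic, key (the loop counter), message
                KeyedMessage<String, String> data =
                        new KeyedMessage<String, String>(topic, String.valueOf(nEvents), msg);
                producer.send(data);
            }
        } finally {
            producer.close();
        }
    }
}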

This example first initializes the Producer with the broker list and serializer, then sends 100 string messages to Kafka.

Run mvn package to build the code, then run it and check the result:

java -cp target/lib/*:target/spring-kafka-demo-0.2.0-SNAPSHOT.jar com.colobu.spring_kafka_demo.NativeProducer

The console consumer window started earlier prints the received messages:

......
NativeMessage--1645592376
NativeMessage-534168193
NativeMessage--1899432197
NativeMessage-1642480773
NativeMessage--911267171
NativeMessage-251458151
NativeMessage--55710397
NativeMessage-455515562
NativeMessage-1108982916
NativeMessage--1710296834
NativeMessage-2102648373
NativeMessage-499979365
NativeMessage--1200107003
NativeMessage-1184836299
NativeMessage--1161123005
NativeMessage-912582115
NativeMessage--1557863408
NativeMessage--1036456356
......
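Every message above is sent with a key (the loop counter as a string). With the single-partition test topic all messages end up in partition 0, but on a topic with several partitions the key decides where a message goes. The sketch below models the hash-modulo strategy that, as far as I know, Kafka 0.8's default partitioner uses: hash the key, clear the sign bit, and take the remainder by the partition count. It is a plain-Java illustration with a hypothetical class name, not Kafka's Partitioner interface.

```java
/**
 * Hypothetical stand-in for key-based partitioning (not Kafka code):
 * non-negative hash of the key, modulo the number of partitions.
 */
final class KeyPartitionSketch {

    static int partition(Object key, int numPartitions) {
        if (key == null) {
            throw new IllegalArgumentException("this sketch requires a key");
        }
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Masking with 0x7fffffff keeps the hash non-negative, even for Integer.MIN_VALUE.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        // NativeProducer uses the loop counter as the key: "0", "1", "2", ...
        for (long n = 0; n < 5; n++) {
            String key = String.valueOf(n);
            System.out.println("key " + key + " -> partition " + partition(key, 3));
        }
    }
}
```

Because the mapping depends only on the key and the partition count, messages with the same key land in the same partition as long as the partition count does not change.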

Receiving messages with the Kafka High Level API

Receive messages with the High Level Consumer API:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class NativeConsumer {
    private final ConsumerConnector consumer;
    private final String topic;
    private ExecutorService executor;
    public NativeConsumer(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }
    public void shutdown() {
        if (consumer != null)
            consumer.shutdown();
        if (executor != null)
            executor.shutdown();
    }
    public void run(int a_numThreads) {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, a_numThreads); // number of streams (one per thread) for this topic
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        // now launch all the threads
        executor = Executors.newFixedThreadPool(a_numThreads);
        // now create an object to consume the messages
        int threadNumber = 0;
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            executor.submit(new ConsumerTest(stream, threadNumber));
            threadNumber++;
        }
    }
    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }
    public static void main(String[] args) {
        String zooKeeper = "localhost:2181";
        String groupId = "mygroup";
        String topic = "test";
        int threads = 1;
        NativeConsumer example = new NativeConsumer(zooKeeper, groupId, topic);
        example.run(threads);
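        try {
            // let the consumer run for 10 seconds
            Thread.sleep(10000);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        // Left commented out so the consumer keeps printing messages; calling shutdown()
        // ends the stream iterators and lets the worker threads exit.
        //example.shutdown();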
    }
}
class ConsumerTest implements Runnable {
    private final KafkaStream<byte[], byte[]> m_stream;
    private final int m_threadNumber;

    public ConsumerTest(KafkaStream<byte[], byte[]> a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
        while (it.hasNext())
            System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
        System.out.println("Shutting down Thread: " + m_threadNumber);
    }
}

Type a few messages into the console producer, and the console running this example prints them.
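NativeConsumer starts one worker thread per stream, and because example.shutdown() is commented out, the workers stay blocked in hasNext() after main returns. The JDK-only sketch below models the same thread-per-stream layout, with BlockingQueues standing in for KafkaStreams, and shows an orderly shutdown: each worker runs until its stream ends, then the pool is shut down and awaited. The END marker is a hypothetical substitute for what consumer.shutdown() does to the real stream iterators.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** JDK-only model of one worker thread per stream; not Kafka code. */
final class StreamWorkersSketch {

    /** Hypothetical end-of-stream marker, playing the role of consumer.shutdown(). */
    static final String END = "__END__";

    /** Builds a stream that yields the given messages and then ends. */
    static BlockingQueue<String> streamOf(String... messages) {
        BlockingQueue<String> stream = new LinkedBlockingQueue<String>();
        for (String m : messages) {
            stream.add(m);
        }
        stream.add(END);
        return stream;
    }

    /** Consumes each stream on its own thread, then shuts the pool down in order. */
    static List<String> consumeAll(List<BlockingQueue<String>> streams) {
        final ConcurrentLinkedQueue<String> received = new ConcurrentLinkedQueue<String>();
        if (streams.isEmpty()) {
            return new ArrayList<String>();
        }
        ExecutorService executor = Executors.newFixedThreadPool(streams.size());
        int threadNumber = 0;
        for (final BlockingQueue<String> stream : streams) {
            final int n = threadNumber++;
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        String msg;
                        // like "while (it.hasNext())" in ConsumerTest: loop until the stream ends
                        while (!END.equals(msg = stream.take())) {
                            received.add("Thread " + n + ": " + msg);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        // Accept no new tasks, then wait for the workers to reach the end of their streams.
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<String>(received);
    }

    public static void main(String[] args) {
        List<BlockingQueue<String>> streams = new ArrayList<BlockingQueue<String>>();
        streams.add(streamOf("a", "b"));
        streams.add(streamOf("c"));
        for (String line : consumeAll(streams)) {
            System.out.println(line);
        }
    }
}
```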

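The tutorial code also includes an example that receives messages with the Simple Consumer API. Since spring-integration-kafka does not support that API, I don't list it here for comparison.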

Sending messages with spring-integration-kafka

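The Outbound Channel Adapter sends messages to Kafka. Messages are read from a Spring Integration channel, which you declare in the Spring application context.
Once the channel is configured, you can send messages to Kafka through it. Spring Integration messages are sent to the adapter and converted internally into Kafka messages before being sent. The current version requires you to specify the message key and the topic as headers, with the message itself as the payload.
For example: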

final MessageChannel channel = ctx.getBean("inputToKafka", MessageChannel.class);
channel.send(
        MessageBuilder.withPayload(payload)           // set the payload
                .setHeader("messageKey", "key")       // set the message key
                .setHeader("topic", "test").build()); // set the topic
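The contract described above (topic and key in headers, the message in the payload) can be modeled in plain Java. In the sketch below a Map stands in for Spring's message headers and KafkaRecord is a hypothetical value type; the adapter's real conversion code is not shown here and may differ. The header names topic and messageKey are the ones used in this article.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the header contract; not spring-integration-kafka code. */
final class OutboundHeadersSketch {

    /** Hypothetical Kafka-side value: topic, key and message. */
    static final class KafkaRecord {
        final String topic;
        final Object key;
        final Object value;

        KafkaRecord(String topic, Object key, Object value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    /** Reads the "topic" and "messageKey" headers and keeps the payload as the message. */
    static KafkaRecord toRecord(Map<String, Object> headers, Object payload) {
        Object topic = headers.get("topic");
        if (!(topic instanceof String)) {
            throw new IllegalArgumentException("missing or non-String 'topic' header");
        }
        Object key = headers.get("messageKey");
        if (key == null) {
            throw new IllegalArgumentException("missing 'messageKey' header");
        }
        return new KafkaRecord((String) topic, key, payload);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<String, Object>();
        headers.put("messageKey", "key");
        headers.put("topic", "test");
        KafkaRecord record = toRecord(headers, "payload");
        System.out.println(record.topic + " / " + record.key + " / " + record.value); // test / key / payload
    }
}
```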

The complete code is as follows:

import java.util.Random;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
public class Producer {
    private static final String CONFIG = "/context.xml";
    private static Random rand = new Random();
    public static void main(String[] args) {
        final ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext(CONFIG, Producer.class);
        // start() also starts lifecycle beans declared with auto-startup="false", such as the outbound adapter
        ctx.start();
        final MessageChannel channel = ctx.getBean("inputToKafka", MessageChannel.class);
        for (int i = 0; i < 100; i++) {
            channel.send(MessageBuilder.withPayload("Message-" + rand.nextInt())
                    .setHeader("messageKey", String.valueOf(i))
                    .setHeader("topic", "test")
                    .build());
        }
        try {
            // the poller drains the queue channel asynchronously; wait before closing the context
            Thread.sleep(100000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ctx.close();
    }
}

Spring configuration file (context.xml):

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
       xmlns:task="http://www.springframework.org/schema/task"
       xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
    <int:channel id="inputToKafka">
        <int:queue/>
    </int:channel>
    <int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                        kafka-producer-context-ref="kafkaProducerContext"
                                        auto-startup="false"
                                        channel="inputToKafka"
                                        order="3"
            >
        <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
    </int-kafka:outbound-channel-adapter>
    <task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="500"/>
    <bean id="producerProperties"
        class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="topic.metadata.refresh.interval.ms">3600000</prop>
                <prop key="message.send.max.retries">5</prop>
                <prop key="serializer.class">kafka.serializer.StringEncoder</prop>
                <prop key="request.required.acks">1</prop>
            </props>
        </property>
    </bean>
    <int-kafka:producer-context id="kafkaProducerContext"
        producer-properties="producerProperties">
        <int-kafka:producer-configurations>
            <int-kafka:producer-configuration broker-list="localhost:9092"
                       topic="test"
                       compression-codec="default"/>
        </int-kafka:producer-configurations>
    </int-kafka:producer-context>
</beans>

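int:channel configures a Spring Integration channel; this one is backed by a queue.
int-kafka:outbound-channel-adapter is the outbound channel adapter; its poller takes messages from the channel and processes them on the taskExecutor thread pool. The key attribute is kafka-producer-context-ref.
int-kafka:producer-context configures the list of producers and the topics they handle; each producer configuration is ultimately turned into a Kafka Producer.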
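The channel-plus-poller arrangement can be pictured with JDK classes alone: messages wait in a queue, a scheduler polls it with a fixed delay, and each message taken is handed to a thread pool. The sketch below is such a model, not Spring Integration code: scheduleWithFixedDelay plays the role of fixed-delay="1000", a non-waiting channel.poll() mirrors receive-timeout="0", the 5-thread pool mirrors taskExecutor, and maxMessages is a hypothetical per-poll cap comparable to max-messages-per-poll.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** JDK-only model of a polled queue channel; all names are hypothetical. */
final class PollerSketch {

    /** Runs tasks on the calling thread; handy for deterministic tests. */
    static final Executor DIRECT = new Executor() {
        @Override
        public void execute(Runnable task) {
            task.run();
        }
    };

    /** One poll: take up to maxMessages without waiting and hand each to the executor. */
    static int pollOnce(BlockingQueue<String> channel, int maxMessages,
                        Executor taskExecutor, final BlockingQueue<String> sent) {
        int taken = 0;
        String msg;
        while (taken < maxMessages && (msg = channel.poll()) != null) {
            final String m = msg;
            taskExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    sent.add(m); // a real adapter would send m to Kafka here
                }
            });
            taken++;
        }
        return taken;
    }

    public static void main(String[] args) throws InterruptedException {
        final BlockingQueue<String> channel = new LinkedBlockingQueue<String>();
        final BlockingQueue<String> sent = new LinkedBlockingQueue<String>();
        for (int i = 0; i < 12; i++) {
            channel.add("Message-" + i);
        }
        final ExecutorService taskExecutor = Executors.newFixedThreadPool(5);
        ScheduledExecutorService poller = Executors.newSingleThreadScheduledExecutor();
        // fixed delay: the next poll starts 1000 ms after the previous one finishes
        poller.scheduleWithFixedDelay(new Runnable() {
            @Override
            public void run() {
                pollOnce(channel, 5, taskExecutor, sent);
            }
        }, 0, 1000, TimeUnit.MILLISECONDS);
        Thread.sleep(3500);
        poller.shutdown();                          // cancels future polls
        poller.awaitTermination(5, TimeUnit.SECONDS);
        taskExecutor.shutdown();                    // then let queued sends finish
        taskExecutor.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println("sent " + sent.size() + " messages");
    }
}
```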

The producer configuration attributes are:

broker-list             Comma-separated list of brokers this producer connects to
topic                   Topic name, or a Java regex pattern matching topic names
compression-codec       Compression method. The default is no compression. Supported codecs are gzip and snappy;
                        any other value results in no compression
value-encoder           Serializer used to encode messages
key-encoder             Serializer used to encode the partition key
key-class-type          Type of the key class. Ignored if no key-encoder is provided
value-class-type        Type of the value class. Ignored if no value-encoder is provided
partitioner             Custom implementation of the Kafka Partitioner interface
async                   true/false, default false. Setting it to true makes the Kafka producer
                        asynchronous
batch-num-messages      Number of messages to batch at the producer. Has no effect if async is false
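To make batch-num-messages concrete, here is a simplified, count-only model of how an async producer groups queued messages into batches. As I understand it, the real 0.8 async producer also sends a partial batch after a time limit (queue.buffering.max.ms), which this sketch ignores; BatchSketch is a hypothetical name, not Kafka code.

```java
import java.util.ArrayList;
import java.util.List;

/** Count-only batching model; ignores the time-based flush of the real async producer. */
final class BatchSketch {

    static <T> List<List<T>> batches(List<T> messages, int batchNumMessages) {
        if (batchNumMessages <= 0) {
            throw new IllegalArgumentException("batchNumMessages must be positive");
        }
        List<List<T>> result = new ArrayList<List<T>>();
        for (int i = 0; i < messages.size(); i += batchNumMessages) {
            int end = Math.min(i + batchNumMessages, messages.size());
            // copy, so each batch is independent of the input list
            result.add(new ArrayList<T>(messages.subList(i, end)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<String>();
        for (int i = 0; i < 100; i++) {
            messages.add("Message-" + i);
        }
        // 100 messages with batch-num-messages=30: batches of 30, 30, 30 and 10
        for (List<String> batch : batches(messages, 30)) {
            System.out.println("batch of " + batch.size());
        }
    }
}
```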

value-encoder and key-encoder can reference any bean that implements Kafka's Encoder interface. Likewise, partitioner references a bean that implements Kafka's Partitioner interface.
An example Encoder:

<bean id="kafkaEncoder" class="org.springframework.integration.kafka.serializer.avro.AvroSpecificDatumBackedKafkaEncoder">
    <constructor-arg value="com.company.AvroGeneratedSpecificRecord" />
</bean>

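Spring Integration Kafka also provides an Avro-based Encoder. Avro is another Apache project and a widely used serialization framework in big data processing.
If no Encoder is specified, Kafka's default encoder is used (kafka.serializer.DefaultEncoder, which maps a byte[] to the same byte[]).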
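The difference between a string encoder and the default encoder comes down to how strings become bytes. As far as I know, kafka.serializer.StringEncoder (configured via serializer.class in the examples above) encodes with UTF-8 unless serializer.encoding says otherwise, while DefaultEncoder passes a byte[] through unchanged. The sketch below uses hypothetical stand-ins for both. Note that decoding code such as new String(bytes) uses the platform default charset, so decoding with an explicit UTF-8 charset is safer for non-ASCII text.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-ins for a UTF-8 string encoder and a pass-through encoder. */
final class EncoderSketch {

    static byte[] encodeString(String s) {
        return s == null ? null : s.getBytes(StandardCharsets.UTF_8);
    }

    /** Decodes with an explicit charset instead of the platform default. */
    static String decodeString(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }

    /** byte[] in, the same byte[] out. */
    static byte[] passThrough(byte[] bytes) {
        return bytes;
    }

    public static void main(String[] args) {
        byte[] wire = encodeString("Message-42");
        System.out.println(wire.length + " bytes -> " + decodeString(wire)); // 10 bytes -> Message-42
        System.out.println(passThrough(wire) == wire);                      // true: same array instance
    }
}
```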

Receiving messages with spring-integration-kafka

A consumer works on the same principle:

package com.colobu.spring_kafka_demo;
import java.util.List;
import java.util.Map;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.Message;
import ch.qos.logback.classic.Level;
public class Consumer {
    private static final String CONFIG = "/consumer_context.xml";
    @SuppressWarnings("unchecked")
    public static void main(String[] args) {
        ch.qos.logback.classic.Logger rootLogger = (ch.qos.logback.classic.Logger)LoggerFactory.getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME);
        rootLogger.setLevel(Level.toLevel("info"));

        final ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext(CONFIG, Consumer.class);
        // start() also starts the inbound adapter, which is declared with auto-startup="false"
        ctx.start();
        final QueueChannel channel = ctx.getBean("inputFromKafka", QueueChannel.class);
        Message<?> msg;
        // receive() without a timeout blocks until a message arrives, so this loop runs until the process is stopped
        while ((msg = channel.receive()) != null) {
            // payload: topic -> (partition -> messages)
            Map<String, Map<Integer, List<byte[]>>> payload =
                    (Map<String, Map<Integer, List<byte[]>>>) msg.getPayload();
            for (Map.Entry<String, Map<Integer, List<byte[]>>> entry : payload.entrySet()) {
                System.out.println("Topic:" + entry.getKey());
                for (List<byte[]> messages : entry.getValue().values()) {
                    for (byte[] bytes : messages) {
                        System.out.println("\tMessage: " + new String(bytes));
                    }
                }
            }
        }

        try {
            Thread.sleep(100000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ctx.close();
    }
}

The Spring configuration file (consumer_context.xml) is:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
    xmlns:task="http://www.springframework.org/schema/task"
    xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
    <int:channel id="inputFromKafka">
        <int:queue/>
    </int:channel>

    <int-kafka:inbound-channel-adapter
        id="kafkaInboundChannelAdapter" kafka-consumer-context-ref="consumerContext"
        auto-startup="false" channel="inputFromKafka">
        <int:poller fixed-delay="10" time-unit="MILLISECONDS"
            max-messages-per-poll="5" />
    </int-kafka:inbound-channel-adapter>
    <bean id="consumerProperties"
        class="org.springframework.beans.factory.config.PropertiesFactoryBean">
        <property name="properties">
            <props>
                <prop key="auto.offset.reset">smallest</prop>
                <prop key="socket.receive.buffer.bytes">10485760</prop> <!-- 10M -->
                <prop key="fetch.message.max.bytes">5242880</prop>
                <prop key="auto.commit.interval.ms">1000</prop>
            </props>
        </property>
    </bean>
    <int-kafka:consumer-context id="consumerContext"
        consumer-timeout="4000" zookeeper-connect="zookeeperConnect" consumer-properties="consumerProperties">
        <int-kafka:consumer-configurations>
            <int-kafka:consumer-configuration
                group-id="mygroup" max-messages="5000">
                <int-kafka:topic id="test" streams="4" />
            </int-kafka:consumer-configuration>
            <!-- <int-kafka:consumer-configuration group-id="default3" value-decoder="kafkaSpecificDecoder" 
                key-decoder="kafkaReflectionDecoder" max-messages="10"> <int-kafka:topic-filter 
                pattern="regextopic.*" streams="4" exclude="false" /> </int-kafka:consumer-configuration> -->
        </int-kafka:consumer-configurations>
    </int-kafka:consumer-context>
    <int-kafka:zookeeper-connect id="zookeeperConnect"
        zk-connect="localhost:2181" zk-connection-timeout="6000"
        zk-session-timeout="400" zk-sync-time="200" />
</beans>

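This configuration is similar to the producer's: it declares a channel and defines an inbound-channel-adapter that references the consumer context bean through kafka-consumer-context-ref.
The consumer context (int-kafka:consumer-context) defines the list of consumers. consumer-configuration also supports topic-filter, which uses a regular expression to build a whitelist, or a blacklist when the exclude attribute is true.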

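The consumer context also needs a ZooKeeper connection, declared with int-kafka:zookeeper-connect and referenced through the zookeeper-connect attribute.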
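A topic-filter can be modeled as a regular-expression test whose result is inverted when exclude is true. The sketch below assumes the whole topic name must match the pattern (here the regextopic.* pattern from the commented-out configuration); TopicFilterSketch is a hypothetical name, and Kafka's own whitelist/blacklist handling may differ in details.

```java
import java.util.regex.Pattern;

/** Hypothetical whitelist/blacklist model of a topic-filter; not Kafka code. */
final class TopicFilterSketch {

    /** Whitelist when exclude is false, blacklist when exclude is true. */
    static boolean accepts(String topic, String regex, boolean exclude) {
        boolean matches = Pattern.compile(regex).matcher(topic).matches(); // whole-name match
        return exclude ? !matches : matches;
    }

    public static void main(String[] args) {
        String[] topics = {"regextopic1", "regextopic-orders", "test"};
        for (String topic : topics) {
            System.out.println(topic
                    + " whitelist=" + accepts(topic, "regextopic.*", false)
                    + " blacklist=" + accepts(topic, "regextopic.*", true));
        }
    }
}
```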

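Because spring-integration-kafka implements only the high level Consumer API, you cannot rewind and re-read earlier messages: the high level API tracks and commits offsets automatically and gives you no control over them.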
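To see why rewinding is not possible here, it helps to model where a high-level consumer group starts reading a partition: from its last committed offset if there is one, otherwise from the position chosen by auto.offset.reset (smallest means the earliest available offset, largest means the end of the log; the consumer configuration above sets smallest). The sketch is a simplification with a hypothetical class name; it ignores, for example, a committed offset that has fallen out of range, which also triggers the reset.

```java
/** Hypothetical model of a consumer group's start position; not Kafka code. */
final class OffsetResumeSketch {

    static long startOffset(Long committed, long logStart, long logEnd, String autoOffsetReset) {
        if (committed != null) {
            return committed; // the group resumes from its last committed position
        }
        if ("smallest".equals(autoOffsetReset)) {
            return logStart; // earliest offset still available in the log
        }
        if ("largest".equals(autoOffsetReset)) {
            return logEnd; // only messages produced from now on
        }
        throw new IllegalArgumentException("unsupported auto.offset.reset: " + autoOffsetReset);
    }

    public static void main(String[] args) {
        // a brand-new group with auto.offset.reset=smallest reads from the beginning
        System.out.println(startOffset(null, 0L, 100L, "smallest")); // 0
        // once offset 42 is committed, the same group resumes at 42 even with "smallest"
        System.out.println(startOffset(42L, 0L, 100L, "smallest")); // 42
    }
}
```

With auto-commit enabled, the committed offset keeps moving forward, so a restarted consumer in the same group resumes where it left off instead of replaying old messages.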

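Note that the payload received from the channel is a Map: its keys are topic names, and each value maps a partition number to the list of messages read from that partition.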