Spring 與 Kafka整合實戰
阿新 • • 發佈:2019-01-28
首先下載解壓zookeeper,選擇合適的映象站點以加快下載速度。
建立一個Spring專案 以上的準備環境完成,讓我們開始建立一個專案。
以前我寫過一篇簡單介紹:Spring 整合 Kafka.
spring-integration-kafka這個官方框架我就不介紹了。 我們主要使用它做整合。 首先我們先看一下使用Kafka自己的Producer/Consumer API傳送/接收訊息的例子。 使用Producer API傳送訊息到Kafka OK,現在我們先看一個使用Kafka 自己的producer API傳送訊息的例子:
一旦配置好這個Channel,就可以利用這個Channel往Kafka發訊息。 明顯地,Spring Integration特定的訊息傳送給這個Adaptor,然後傳送前在內部被轉為Kafka訊息。當前的版本要求你必須指定訊息key和topic作為頭部資料 (header),訊息作為有載荷(payload)。
例如
int-kafka:outbound-channel-adapter是outbound-channel-adapter物件, 內部使用一個執行緒池處理訊息。關鍵是kafka-producer-context-ref。
int-kafka:producer-context配置producer列表,要處理的topic,這些Producer最終要轉換成Kafka的Producer。 producer的配置引數如下:
不指定Encoder將使用Kafka預設的Encoder (kafka.serializer.DefaultEncoder, byte[] —> same byte[])。 使用spring-integration-kafka接收訊息 同樣的原理實現一個消費者:
spring-integration-kafka是Spring官方提供的一個Spring整合框架的擴充套件,用來為使用Spring框架的應用程式提供Kafka框架的整合。
當前spring-integration-kafka僅提供Kafka 0.8的整合,低版本的Kafka並不支援。 spring-integration-kafka僅僅支援兩個元件,分別對應Producer和 High Level Consumer。 它們分別是:
Outbound Channel Adapter: 用來向Kafka叢集傳送訊息。訊息讀取於Spring Integration channel。當前的版本需要你指定Topic和MessageKey。 轉自*******************http://www.aboutyun.com/thread-10322-1-1.html************************
我們可以將zookeeper加到系統服務中, 增加一個/etc/init.d/zookeeper檔案。
cd /opt
wget http://apache.fayea.com/apache-mirror/zookeeper/zookeeper-3.4.6/zookeeper-3.4.6.tar.gz
tar zxvf zookeeper-3.4.6.tar.gz
vi /etc/init.d/zookeeper
將https://raw.githubusercontent.com/apache/zookeeper/trunk/src/packages/rpm/init.d/zookeeper檔案的內容拷貝到這個檔案,修改其中的執行zookeeper的使用者以及zookeeper的資料夾位置。...... start() { echo -n [ DISCUZ_CODE_1 ]quot;Starting $desc (zookeeper): " daemon --user root /opt/zookeeper-3.4.6/zkServer.sh start RETVAL=$? echo [ $RETVAL -eq 0 ] && touch /var/lock/subsys/zookeeper return $RETVAL } stop() { echo -n [ DISCUZ_CODE_1 ]quot;Stopping $desc (zookeeper): " daemon --user root /opt/zookeeper-3.4.6/zkServer.sh stop RETVAL=$? sleep 5 echo [ $RETVAL -eq 0 ] && rm -f /var/lock/subsys/zookeeper $PIDFILE } ......
chmod 755 /etc/init.d/zookeeper
service zookeeper start
如果你不想加到服務,也可以直接執行zookeeper。/opt/zookeeper-3.4.6/zkServer.sh start安裝Kafka
從合適的映象站點下載最新的kafka並解壓。啟動、建立topic更多的介紹可以檢視我翻譯整理的wget http://apache.01link.hk/kafka/0.8.2-beta/kafka_2.9.1-0.8.2-beta.tgz tar zxvf kafka_2.9.1-0.8.2-beta.tgz cd kafka_2.9.1-0.8.2-beta bin/kafka-server-start.sh config/server.properties bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
建立一個Spring專案 以上的準備環境完成,讓我們開始建立一個專案。
以前我寫過一篇簡單介紹:Spring 整合 Kafka.
spring-integration-kafka這個官方框架我就不介紹了。 我們主要使用它做整合。 首先我們先看一下使用Kafka自己的Producer/Consumer API傳送/接收訊息的例子。 使用Producer API傳送訊息到Kafka OK,現在我們先看一個使用Kafka 自己的producer API傳送訊息的例子:
public class NativeProducer {
public static void main(String[] args) {
String topic= "test";
long events = 100;
Random rand = new Random();
Properties props = new Properties();
props.put("metadata.broker.list", "localhost:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
for (long nEvents = 0; nEvents < events; nEvents++) {
String msg = "NativeMessage-" + rand.nextInt() ;
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, nEvents + "", msg);
producer.send(data);
}
producer.close();
}
}
這個例子中首先初始化Producer物件,指定相應的broker和serializer, 然後傳送100個字串訊息給Kafka。
執行mvn package編譯程式碼,執行檢視結果:
java -cp target/lib/*:target/spring-kafka-demo-0.2.0-SNAPSHOT.jar com.colobu.spring_kafka_demo.NativeProducer
使用Kafka High Level API接收訊息
用High level Consumer API接收訊息,
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class NativeConsumer {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
public NativeConsumer(String a_zookeeper, String a_groupId, String a_topic) {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig(a_zookeeper, a_groupId));
this.topic = a_topic;
}
public void shutdown() {
if (consumer != null)
consumer.shutdown();
if (executor != null)
executor.shutdown();
}
public void run(int a_numThreads) {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(a_numThreads));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
// now launch all the threads
//
executor = Executors.newFixedThreadPool(a_numThreads);
// now create an object to consume the messages
//
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerTest(stream, threadNumber));
threadNumber++;
}
}
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
public static void main(String[] args) {
String zooKeeper = "localhost:2181";
String groupId = "mygroup";
String topic = "test";
int threads = 1;
NativeConsumer example = new NativeConsumer(zooKeeper, groupId, topic);
example.run(threads);
try {
Thread.sleep(10000);
} catch (InterruptedException ie) {
}
//example.shutdown();
}
}
class ConsumerTest implements Runnable {
private KafkaStream m_stream;
private int m_threadNumber;
public ConsumerTest(KafkaStream a_stream, int a_threadNumber) {
m_threadNumber = a_threadNumber;
m_stream = a_stream;
}
public void run() {
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
while (it.hasNext())
System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
System.out.println("Shutting down Thread: " + m_threadNumber);
}
}
在生產者控制檯輸入幾條訊息,可以看到執行這個例子的控制檯可以將這些訊息打印出來。
教程的程式碼中還包括一個使用Simple Consumer API接收訊息的例子。 因為spring-integration-kafka不支援這種API,這裡也不列出對比程式碼了。
使用spring-integration-kafka傳送訊息
Outbound Channel Adapter用來發送訊息到Kafka。 訊息從Spring Integration Channel中讀取。 你可以在Spring application context指定這個channel。一旦配置好這個Channel,就可以利用這個Channel往Kafka發訊息。 明顯地,Spring Integration特定的訊息傳送給這個Adaptor,然後傳送前在內部被轉為Kafka訊息。當前的版本要求你必須指定訊息key和topic作為頭部資料 (header),訊息作為有載荷(payload)。
例如
import java.util.Random;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
public class Producer {
private static final String CONFIG = "/context.xml";
private static Random rand = new Random();
public static void main(String[] args) {
final ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext(CONFIG, Producer.class);
ctx.start();
final MessageChannel channel = ctx.getBean("inputToKafka", MessageChannel.class);
for (int i = 0; i < 100; i++) {
channel.send(MessageBuilder.withPayload("Message-" + rand.nextInt()).setHeader("messageKey", String.valueOf(i)).setHeader("topic", "test").build());
}
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
ctx.close();
}
}
Spring 配置檔案:<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<int:channel id="inputToKafka">
<int:queue/>
</int:channel>
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
kafka-producer-context-ref="kafkaProducerContext"
auto-startup="false"
channel="inputToKafka"
order="3"
>
<int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
</int-kafka:outbound-channel-adapter>
<task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="500"/>
<bean id="producerProperties"
class="org.springframework.beans.factory.config.PropertiesFactoryBean">
<property name="properties">
<props>
<prop key="topic.metadata.refresh.interval.ms">3600000</prop>
<prop key="message.send.max.retries">5</prop>
<prop key="serializer.class">kafka.serializer.StringEncoder</prop>
<prop key="request.required.acks">1</prop>
</props>
</property>
</bean>
<int-kafka:producer-context id="kafkaProducerContext"
producer-properties="producerProperties">
<int-kafka:producer-configurations>
<int-kafka:producer-configuration broker-list="localhost:9092"
topic="test"
compression-codec="default"/>
</int-kafka:producer-configurations>
</int-kafka:producer-context>
</beans>
int:channel是配置Spring Integration Channel, 此channel基於queue。int-kafka:outbound-channel-adapter是outbound-channel-adapter物件, 內部使用一個執行緒池處理訊息。關鍵是kafka-producer-context-ref。
int-kafka:producer-context配置producer列表,要處理的topic,這些Producer最終要轉換成Kafka的Producer。 producer的配置引數如下:
broker-list List of comma separated brokers that this producer connects to
topic Topic name or Java regex pattern of topic name
compression-codec Compression method to be used. Default is no compression. Supported compression codec are gzip and snappy.
Anything else would result in no compression
value-encoder Serializer to be used for encoding messages.
key-encoder Serializer to be used for encoding the partition key
key-class-type Type of the key class. This will be ignored if no key-encoder is provided
value-class-type Type of the value class. This will be ignored if no value-encoder is provided.
partitioner Custom implementation of a Kafka Partitioner interface.
async True/False - default is false. Setting this to true would make the Kafka producer to use
an async producer
batch-num-messages Number of messages to batch at the producer. If async is false, then this has no effect.
Spring Integration Kafka 也提供了個基於Avro的Encoder。 Avro也是Apache的一個專案, 在大資料處理時也是一個常用的序列化框架。不指定Encoder將使用Kafka預設的Encoder (kafka.serializer.DefaultEncoder, byte[] —> same byte[])。 使用spring-integration-kafka接收訊息 同樣的原理實現一個消費者:
package com.colobu.spring_kafka_demo;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.LoggerFactory;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.messaging.Message;
import ch.qos.logback.classic.Level;
public class Consumer {
private static final String CONFIG = "/consumer_context.xml";
private static Random rand = new Random();
@SuppressWarnings({ "unchecked", "unchecked", "rawtypes" })
public static void main(String[] args) {
ch.qos.logback.classic.Logger rootLogger = (ch.qos.logback.classic.Logger)LoggerFactory.getLogger(ch.qos.logback.classic.Logger.ROOT_LOGGER_NAME);
rootLogger.setLevel(Level.toLevel("info"));
final ClassPathXmlApplicationContext ctx = new ClassPathXmlApplicationContext(CONFIG, Consumer.class);
ctx.start();
final QueueChannel channel = ctx.getBean("inputFromKafka", QueueChannel.class);
Message msg;
while((msg = channel.receive()) != null) {
HashMap map = (HashMap)msg.getPayload();
Set<Map.Entry> set = map.entrySet();
for (Map.Entry entry : set) {
String topic = (String)entry.getKey();
System.out.println("Topic:" + topic);
ConcurrentHashMap<Integer,List<byte[]>> messages = (ConcurrentHashMap<Integer,List<byte[]>>)entry.getValue();
Collection<List<byte[]>> values = messages.values();
for (Iterator<List<byte[]>> iterator = values.iterator(); iterator.hasNext();) {
List<byte[]> list = iterator.next();
for (byte[] object : list) {
String message = new String(object);
System.out.println("\tMessage: " + message);
}
}
}
}
try {
Thread.sleep(100000);
} catch (InterruptedException e) {
e.printStackTrace();
}
ctx.close();
}
}
轉自********************http://www.aboutyun.com/forum.php?mod=viewthread&tid=10321*************spring-integration-kafka是Spring官方提供的一個Spring整合框架的擴充套件,用來為使用Spring框架的應用程式提供Kafka框架的整合。
當前spring-integration-kafka僅提供Kafka 0.8的整合,低版本的Kafka並不支援。 spring-integration-kafka僅僅支援兩個元件,分別對應Producer和 High Level Consumer。 它們分別是:
- Outbound Channel Adapter
-
Inbound Channel Adapter based on the High level consumer API
Outbound Channel Adapter: 用來向Kafka叢集傳送訊息。訊息讀取於Spring Integration channel。當前的版本需要你指定Topic和MessageKey。 轉自*******************http://www.aboutyun.com/thread-10322-1-1.html************************