Kafka簡單入門與Spring結合實踐
一、【安裝部署kafka伺服器環境(centos7.0)】:
1.【注意】新版的kafka已經內建了一個zookeeper環境
2.【安裝與執行】:
可以在kafka官網 http://kafka.apache.org/downloads 下載到最新的kafka安裝包,選擇下載二進位制版本的tgz檔案,本章用的是 kafka_2.11-2.0.0(Scala 2.11、Kafka 2.0.0),在centos7.0 直接解壓即可
3.【執行命令】:
./zookeeper-server-start.sh ../config/zookeeper.properties & ./kafka-server-start.sh ../config/server.properties &
4.【注意】:
【問題一】:Java端的消費者取不到訊息,生產者訊息也沒傳送成功,java通過kafka-client的API寫的程式碼始終不能跟kafka通訊:java producer的訊息發不出去, java consumer也收不到任何訊息
【解決辦法】:修改kafka/config/server.properties 中的advertised.listeners 這個值改成自己虛擬機器IP地址
【問題二】:WARN [Consumer clientId=consumer-1, groupId=console-consumer-950] Connection to node -1 could not be established. Broker may not be available.
【解決辦法】:檢視本機宿主機是否可以ping通centos7.0 虛擬機器的IP地址;若ping不通,通常是因為虛擬機器的IP地址發生了改變,需更新配置中的IP。
二、【Java程式 + Spring -Kafka 執行例項】:
參考部落格:https://www.cnblogs.com/hei12138/p/7805475.html
1.【Java程式】 :
pom.xml 依賴配置:【注意】:其中版本號要與伺服器端版本一致
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.0.0</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.0.0</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency>
【主題配置】:TopicMain.java
package com.caox.kafka._01_topic;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 * Creates a demo topic on the Kafka broker via the AdminClient API.
 *
 * Created by nazi on 2018/8/27.
 * @author nazi
 */
public class TopicMain {
/**
 * Creates the topic "topic-test3" (1 partition, replication factor 1)
 * on the broker at 192.168.80.150:9092.
 *
 * @param argv unused command-line arguments
 * @throws Exception if the AdminClient cannot be created
 */
public static void main(String[] argv) throws Exception {
// Build the admin-client configuration: only the broker address is needed.
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.80.150:9092");
// try-with-resources: AdminClient is AutoCloseable — the original leaked it.
try (AdminClient adminClient = AdminClient.create(props)) {
ArrayList<NewTopic> topics = new ArrayList<NewTopic>();
/**
 * NewTopic(String name, int numPartitions, short replicationFactor)
 * creates a topic named "topic-test3" with 1 partition and a
 * replication factor of 1.
 */
NewTopic newTopic = new NewTopic("topic-test3", 1, (short) 1);
topics.add(newTopic);
CreateTopicsResult result = adminClient.createTopics(topics);
try {
// Block until the broker acknowledges creation of all topics.
result.all().get();
} catch (InterruptedException e) {
// Restore the interrupt flag instead of swallowing it.
Thread.currentThread().interrupt();
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
}
【生產者】:ProducerMain.java
package com.caox.kafka._01_topic;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
/**
 * Minimal Kafka producer demo: sends 100 string records to "topic-test2".
 *
 * Created by nazi on 2018/8/27.
 * @author nazi
 */
@Slf4j
public class ProducerMain {
/**
 * Sends records with keys/values "0".."99" to topic "topic-test2" on the
 * broker at 192.168.80.149:9092, then closes the producer.
 *
 * @param argv unused command-line arguments
 * @throws Exception if producer construction fails
 */
public static void main(String[] argv) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.80.149:9092");
// "all": wait for the full ISR to acknowledge each record.
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// try-with-resources guarantees close() (which flushes buffered records)
// runs even if send() throws — the original leaked the producer on error.
try (Producer<String, String> producer = new KafkaProducer<String, String>(props)) {
for (int i = 0; i < 100; i++){
producer.send(new ProducerRecord<String, String>("topic-test2", Integer.toString(i), Integer.toString(i)));
}
log.info("call SUCCESS");
}
}
}
【消費者】:ConsumerMain.java
package com.caox.kafka._01_topic;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
/**
 * Minimal Kafka consumer demo: polls "topic-test2" forever and prints
 * every record. On each partition assignment the offset is rewound to
 * the beginning, so the full topic history is re-read after a rebalance.
 *
 * Created by nazi on 2018/8/27.
 * @author nazi
 */
@Slf4j
public class ConsumerMain {
/**
 * Subscribes to "topic-test2" on broker 192.168.80.149:9092 and loops
 * forever printing offset/key/value of each consumed record.
 *
 * @param argv unused command-line arguments
 * @throws Exception if consumer construction fails
 */
public static void main(String[] argv) throws Exception {
Properties config = new Properties();
config.put("bootstrap.servers", "192.168.80.149:9092");
config.put("group.id", "test");
// Offsets are auto-committed every second.
config.put("enable.auto.commit", "true");
config.put("auto.commit.interval.ms", "1000");
config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// final: referenced from the anonymous rebalance listener below.
final KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(config);
kafkaConsumer.subscribe(Arrays.asList("topic-test2"), new ConsumerRebalanceListener() {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
// Nothing to clean up before a rebalance.
}
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// Rewind every newly assigned partition to its earliest offset.
kafkaConsumer.seekToBeginning(partitions);
}
});
for (;;) {
ConsumerRecords<String, String> batch = kafkaConsumer.poll(100);
for (ConsumerRecord<String, String> rec : batch) {
System.out.printf("offset = %d, key = %s, value = %s%n", rec.offset(), rec.key(), rec.value());
}
}
}
}
2.【Spring-Kafka配置程式】 :
【pom.xml】依賴配置:
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.0.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>5.0.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>5.0.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.6.RELEASE</version>
</dependency>
【kafka配置】:KafkaConfig.java:
package com.caox.kafka._02_spring_kafka;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import java.util.HashMap;
import java.util.Map;
/**
 * Spring configuration wiring Kafka admin, producer and consumer beans.
 *
 * Created by nazi on 2018/8/28.
 * @author nazi
 */
@Configuration
@EnableKafka
public class KafkaConfig {
/**
 * Broker address shared by all beans. Declared {@code static final}: the
 * original was a mutable {@code private static} field, and its old name
 * shadowed the meaning of {@code AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG}.
 */
private static final String BOOTSTRAP_SERVERS = "192.168.80.150:9092";
/**
 * Topic configuration
 */
/******************************************************************************************************************/
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<String, Object>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
return new KafkaAdmin(configs);
}
/** Declared topics are auto-created by the KafkaAdmin bean at startup. */
@Bean
public NewTopic topic1() {
return new NewTopic("foo", 10, (short) 1);
}
/******************************************************************************************************************/
/**
 * Producer factory and template configuration
 */
/******************************************************************************************************************/
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<Integer,String>(producerConfigs());
}
/** Type-safe ProducerConfig constants replace the original string literals. */
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<String,Object>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
/******************************************************************************************************************/
/**
 * ConsumerFactory configuration
 */
/******************************************************************************************************************/
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer,String> kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<Integer, String>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public ConsumerFactory<Integer,String> consumerFactory(){
return new DefaultKafkaConsumerFactory<Integer, String>(consumerConfigs());
}
/** Type-safe ConsumerConfig constants replace the original string literals. */
@Bean
public Map<String,Object> consumerConfigs(){
Map<String, Object> props = new HashMap<String, Object>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
return props;
}
/******************************************************************************************************************/
/**
 * By default spring-kafka creates one polling thread per listener method
 * to fetch messages from the Kafka server.
 */
@Bean
public SimpleConsumerListener simpleConsumerListener(){
return new SimpleConsumerListener();
}
}
【生產者配置】:ProducerMain.java
package com.caox.kafka._02_spring_kafka;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
/**
 * Spring-Kafka producer demo: sends one message via the KafkaTemplate bean.
 *
 * Created by nazi on 2018/8/28.
 * @author nazi
 */
public class ProducerMain {
/**
 * Creates the message producer, sends one record to "topic-test3",
 * waits for the asynchronous send to complete, then closes the context.
 *
 * @param argv command-line arguments (unused)
 * @throws Exception if context startup or the send fails
 */
public static void main(String[] argv) throws Exception {
AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(KafkaConfig.class);
try {
@SuppressWarnings("unchecked")
KafkaTemplate<Integer, String> kafkaTemplate = (KafkaTemplate<Integer, String>) ctx.getBean("kafkaTemplate");
String data = "this is a test message";
ListenableFuture<SendResult<Integer, String>> send = kafkaTemplate.send("topic-test3", 1, data);
send.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
public void onFailure(Throwable throwable) {
// The original silently swallowed send failures.
System.err.println("failed to send message: " + throwable);
throwable.printStackTrace();
}
public void onSuccess(SendResult<Integer, String> integerStringSendResult) {
// Message text fixed: this is a producer send, not a receive.
System.out.println("success to send message !");
}
});
// Block until the broker acknowledges, so main() does not exit
// before the callback fires.
send.get();
} finally {
// The original never closed the context, leaking the Kafka producer.
ctx.close();
}
}
}
【消費者配置】:消費者監聽配置 SimpleConsumerListener.java:
package com.caox.kafka._02_spring_kafka;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import java.util.concurrent.CountDownLatch;
/**
 * Kafka listener bean: spring-kafka invokes {@link #listen} on its own
 * polling thread for every record received on "topic-test3".
 *
 * Created by nazi on 2018/8/28.
 * @author nazi
 */
public class SimpleConsumerListener {
private static final Logger logger = LoggerFactory.getLogger(SimpleConsumerListener.class);
/**
 * Handles one consumed record by printing its key and value.
 *
 * <p>The parameter is now generic — {@code ConsumerRecord<Integer, String>}
 * matches the Integer/String deserializers configured in KafkaConfig;
 * the original used a raw type. The dead commented-out listener variant
 * (and the unused CountDownLatch it referenced) has been removed so the
 * annotation sits directly on the method it applies to.
 *
 * @param record the consumed record (Integer key, String value)
 */
@KafkaListener(id = "foo", topics = "topic-test3")
public void listen(ConsumerRecord<Integer, String> record) {
System.out.println("listen : " + " key:"+ record.key() + " value: " + record.value());
}
}