Spring Boot Kafka configuration and native Kafka configuration
When setting up Kafka, watch out for version compatibility; this tutorial uses kafka_2.11-0.11.0.2.tgz. First, let's look at using Kafka from Spring Boot.
1. Add pom dependencies
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
        <!--<version>1.5.8.RELEASE</version>-->
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
</dependencies>
2. Configure application.properties
kafka.consumer.zookeeper.connect=zookeeper-ip:2181
kafka.consumer.servers=kafka-ip:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=test
kafka.consumer.group.id=test
kafka.consumer.concurrency=10
kafka.producer.servers=kafka-ip:9092
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960
3. Add the Kafka consumer configuration class:
package com.databps.bigdaf.admin.config;

import com.databps.bigdaf.admin.manager.HomePageManager;
import com.databps.bigdaf.admin.vo.HomePageVo;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;

/**
 * @author haipeng
 * @create 17-11-2 11:39 AM
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

    @Value("${kafka.consumer.servers}")
    private String servers;
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group.id}")
    private String groupId;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    @Autowired
    private HomePageManager homePageManager;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        // propsMap.put("zookeeper.connect", "master1.hdp.com:2181,master2.hdp.com:2181,slave1.hdp.com:2181");
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }
}
4. Add the Kafka producer configuration class
package com.databps.bigdaf.admin.config;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
/**
* @author haipeng
 * @create 17-11-2 11:37 AM
*/
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${kafka.producer.servers}")
private String servers;
@Value("${kafka.producer.retries}")
private int retries;
@Value("${kafka.producer.batch.size}")
private int batchSize;
@Value("${kafka.producer.linger}")
private int linger;
@Value("${kafka.producer.buffer.memory}")
private int bufferMemory;
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<String, String>(producerFactory());
}
}
5. Producer invocation:
(1) Inject the KafkaTemplate
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
(2) Convert the payload to JSON before sending; this example uses Gson:
AuditVo auditVo=new AuditVo();
long sortData=Long.parseLong(DateUtils.getNowDateTime());
auditVo.setId("sdfdf");
auditVo.setCmpyId(cmpyId);
auditVo.setUser("whp");
auditVo.setPluginIp("192.168.1.53");
auditVo.setAccessTime(DateUtils.getNowDateTime());
auditVo.setAccessType("WRITE");
auditVo.setAction("write");
auditVo.setAccessResult("success");
auditVo.setServiceType("hbase");
auditVo.setResourcePath("/whp");
Gson gson=new Gson();
kafkaTemplate.send("test", gson.toJson(auditVo));
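Note that `kafkaTemplate.send()` is asynchronous: it returns once the record is queued, not once the broker has accepted it. Below is a minimal sketch of checking the outcome, assuming spring-kafka 1.x/2.x, where `send()` returns a `ListenableFuture<SendResult<K, V>>`; it reuses the `kafkaTemplate`, `gson`, and `auditVo` from the snippet above:

```java
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

ListenableFuture<SendResult<String, String>> future =
        kafkaTemplate.send("test", gson.toJson(auditVo));
future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
    @Override
    public void onSuccess(SendResult<String, String> result) {
        // RecordMetadata carries the partition and offset assigned by the broker
        System.out.println("sent to partition " + result.getRecordMetadata().partition()
                + " at offset " + result.getRecordMetadata().offset());
    }

    @Override
    public void onFailure(Throwable ex) {
        System.err.println("send failed: " + ex.getMessage());
    }
});
```

Without a callback (or a blocking `future.get()`), a send that fails after the record is queued goes unnoticed.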
(3) On the consumer side, just annotate a method with @KafkaListener(topics = {"test"}):
@Component
public class KafkaConsumer {
@KafkaListener(topics = {"test"})
public void processMessage(String content) {
System.out.println("message consumed: " + content);
}
}
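A value-only parameter drops the record metadata. If the listener also needs the topic, partition, or offset, spring-kafka can bind the whole `ConsumerRecord` instead. A sketch follows; the class name `KafkaRecordConsumer` is made up for illustration, and the topic is the same `test` as above:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaRecordConsumer {

    @KafkaListener(topics = {"test"})
    public void processRecord(ConsumerRecord<String, String> record) {
        // the key may be null when the producer sent only a value
        System.out.println("topic=" + record.topic()
                + " partition=" + record.partition()
                + " offset=" + record.offset()
                + " value=" + record.value());
    }
}
```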
The following shows how to test Kafka with the native Kafka Java API:
1. Add dependencies to the pom file
<properties>
    <mapr-storm-kafka.version>1.0.1</mapr-storm-kafka.version>
    <scala.version>2.11</scala.version>
    <kafka.version>0.10.0.0</kafka.version>
</properties>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_${scala.version}</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>${mapr-storm-kafka.version}</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
</exclusion>
<exclusion>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>log4j-over-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
2. Add a test class
package com.example.demo.kafka;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.Test;
public class kafkaConsumer {
private String topic="test";
@Test
public void Producer(){
Properties props = new Properties();
props.put("bootstrap.servers", "master1.hdp.com:6667");
props.put("acks", "all"); // acks=all waits for all in-sync replicas to commit; slowest but safest
props.put("retries", 0); // whether to retry on failure; retrying can produce duplicate messages
props.put("batch.size", 16384); // batch buffer size per partition
props.put("linger.ms", 1); // how long to wait if the batch is not full; 1 adds up to 1 ms of send latency
props.put("buffer.memory", 33554432); // total memory the producer may use for buffering
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>(topic, "", Integer.toString(1)));
producer.close();
}
private ConsumerConnector consumer;
@Test
public void kafkaConsumer() {
Properties props = new Properties();
// ZooKeeper configuration
props.put("zookeeper.connect", "master1.hdp.com:2181,master2.hdp.com:2181,slave1.hdp.com:2181");
// group.id identifies a consumer group
props.put("group.id", "jd-group");
// ZooKeeper connection timeout
props.put("zookeeper.session.timeout.ms", "4000");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "largest");
// serializer class
props.put("serializer.class", "kafka.serializer.StringEncoder");
ConsumerConfig config = new ConsumerConfig(props);
consumer = (ConsumerConnector) kafka.consumer.Consumer.createJavaConsumerConnector(config);
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put("test", 1);
StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());
Map<String, List<KafkaStream<String, String>>> consumerMap =
        consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
KafkaStream<String, String> stream = consumerMap.get("test").get(0);
ConsumerIterator<String, String> it = stream.iterator();
while (it.hasNext()) {
    System.out.println(it.next().message());
}
}
}
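Note that `kafka.javaapi.consumer.ConsumerConnector` is the old ZooKeeper-based Scala consumer, deprecated as of Kafka 0.11 and removed in Kafka 2.0. The same consumer test can be written against the new consumer API in kafka-clients, which the otherwise unused `ConsumerRecord`/`ConsumerRecords` imports above already belong to. A sketch under the same assumptions as the tests above (broker at master1.hdp.com:6667, topic `test`, group `jd-group`):

```java
package com.example.demo.kafka;

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class NewApiConsumerTest {

    public static void main(String[] args) {
        Properties props = new Properties();
        // the new consumer talks to the brokers directly; no ZooKeeper address needed
        props.put("bootstrap.servers", "master1.hdp.com:6667");
        props.put("group.id", "jd-group");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test"));
        try {
            while (true) {
                // poll(long) is the 0.10.x signature; later clients use poll(Duration)
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.offset() + ": " + record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
```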
The full code is available on my GitHub account: https://github.com/whpHarper/springboot-kafka