Kafka Java
阿新 • Published: 2019-01-02
Using Kafka in a Project
When solving a concrete business requirement, raw data flows from multiple sources (app, web, H5, and so on) into Kafka. The developer then writes a Kafka consumer that applies the business logic to that data.
The data is consumed from Kafka with a Java program, in one of two ways:
- build a Maven project;
- build a Spring Kafka project.
The detailed steps for building a Java project that consumes Kafka follow.
- IDE = IDEA 2018.2
- Java Version = 1.8
Maven Kafka
Steps
In IDEA, create a new Maven-based Project, enter the GroupId and ArtifactId, and choose the project's local directory;
Open the project's pom.xml file and add the dependency jars;
Under the src/main/java directory, create two classes, KafkaProducerDemo.java and KafkaConsumerDemo.java. Then configure the Kafka parameters and add the business-logic code in these two classes.
Demo
1. pom.xml source:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.scut</groupId>
<artifactId>test_kafka_maven</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.0.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>
</project>
2. KafkaConsumerDemo.java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.*;
/**
 * KafkaConsumerDemo: consumes messages from a Kafka topic and prints them.
 *
 * @author yangsong
 * @version 1.0
 * 2018-9-11 18:58
 **/
public class KafkaConsumerDemo {
    private final KafkaConsumer<String, String> consumer;
    private KafkaConsumerDemo(){
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.102.85:9092"); // bootstrap.servers: the Kafka broker address
        props.put("group.id", "test"); // group.id: consumer group name; different groups can each consume the same messages
        props.put("enable.auto.commit", "true"); // enable.auto.commit: whether offsets are committed automatically (default true)
        props.put("auto.commit.interval.ms", "1000"); // auto.commit.interval.ms: interval between automatic offset commits
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key deserializer
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value deserializer
        consumer = new KafkaConsumer<>(props);
    }
    void consume(){
        consumer.subscribe(Arrays.asList(KafkaProducerDemo.TOPIC));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                // Print the offset, key and value of each record received
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
    public static void main(String[] args){
        new KafkaConsumerDemo().consume();
    }
}
3. KafkaProducerDemo.java
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
/**
 * KafkaProducerDemo: sends keyed string messages to a Kafka topic.
 *
 * @author yangsong
 * @version 1.0
 * 2018-9-11 18:57
 **/
public class KafkaProducerDemo {
    private final Producer<String, String> kafkaProducer;
    public final static String TOPIC = "zx_eam"; // TOPIC: the name of an existing topic on the broker
    private KafkaProducerDemo(){
        kafkaProducer = createKafkaProducer();
    }
    private Producer<String, String> createKafkaProducer(){
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.102.85:9092"); // bootstrap.servers: the Kafka broker address
        props.put("acks", "all"); // acks: acknowledgment level; "all" waits for the full set of in-sync replicas
        props.put("retries", 0); // retries: a value greater than 0 makes the client resend records whose send failed
        props.put("batch.size", 16384); // batch.size: records headed for the same partition are batched into one request
        props.put("linger.ms", 1); // linger.ms: how long to wait for more records before sending a batch
        props.put("buffer.memory", 33554432); // buffer.memory: total memory available for buffering unsent records
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // key serializer
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value serializer
        return new KafkaProducer<>(props);
    }
    void produce(){
        for (int i = 1; i < 1000; i++) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String key = "key" + i;
            String data = "hello kafka message:" + key;
            // Send the record; the callback fires once the broker acknowledges it
            kafkaProducer.send(new ProducerRecord<>(TOPIC, key, data), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    // handle the acknowledgment, or the exception if the send failed
                }
            });
            System.out.println(data);
        }
    }
public static void main(String[] args){
new KafkaProducerDemo().produce();
}
}
Spring Kafka
Steps
Demo
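This section is left empty in the original. As a minimal sketch of the Spring Kafka route: a Spring Boot project would typically pull in the spring-kafka dependency instead of the raw client jars, and move the connection settings out of Java code into configuration. The coordinates and property keys below are the standard Spring ones; the broker address and group id are carried over from the Maven demo above.

```xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
```

With a matching application.properties:

```properties
spring.kafka.bootstrap-servers=192.168.102.85:9092
spring.kafka.consumer.group-id=test
spring.kafka.consumer.enable-auto-commit=true
```

Consumption is then usually written as a method annotated with @KafkaListener rather than a hand-rolled poll loop.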
Deploying the Program
Once the program above is written, there are two ways to deploy it.
When the business logic is simple and a distributed system is not needed, just Run it directly on a local Windows machine;