1. 程式人生 > >Kafka Java

Kafka Java



Kafka在工程中的使用方法

在解決一個具體的業務需求時,原始資料從app、web、H5等多個來源傳輸到Kafka當中;此時,開發者則需要開發Kafka消費者,將其中的資料進行業務邏輯的開發。

而其中,消費Kafka的方式則通過Java程式來進行;具體方式又有兩種:

  1. 構建Maven工程;
  2. 構建Spring Kafka工程。

以下,詳細說明構建Java工程消費Kafka的詳細步驟。
- IDE = IDEA 2018.2
- Java Version = 1.8

Maven Kafka

步驟

  1. 使用IDEA,基於Maven模組新建Project,輸入GroupId和ArtificialID,設定工程在本地的目錄;
    新建Maven工程

  2. 開啟工程中的pom.xml檔案,新增依賴的jar包;

  3. 在src-java目錄下,新建兩個類Product.java和Consumer.java。接下來,在這兩個類中配置Kafka引數、並新增業務邏輯程式碼即可;

Demo

1.pom.xml檔案原始碼:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation
="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion> <groupId>com.scut</groupId> <artifactId>test_kafka_maven</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency
>
<groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>1.0.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.25</version> </dependency> </dependencies> </project>

2.KafkaConsumerDemo.java

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.*;

/**
 * @ Name        : KafkaConsumerDemo
 * @ Description : TODO
 * @ Author      : yangsong
 * @ Date        : 2018-9-11 18:58
 * @ Version     : 1.0
 **/


public class KafkaConsumerDemo {
    private final KafkaConsumer<String, String> consumer;

    private KafkaConsumerDemo(){
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.102.85:9092");  //
        props.put("group.id", "test");  // group.id: 組名,不同組名可以重複消費
        props.put("enable.auto.commit", "true");  // enable.auto.commit:是否自動提交,預設為true
        props.put("auto.commit.interval.ms", "1000");  // auto.commit.interval.ms: 從pull(拉)的回話處理時長\

        props.put("linger.ms", 1);  //
        props.put("buffer.memory", 33554432);  //

        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  // 鍵序列化,取預設值
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  // 值序列化,取預設值
        consumer = new KafkaConsumer<>(props);
    }

    void consume(){
        consumer.subscribe(Arrays.asList(KafkaProducerDemo.TOPIC));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                //
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }

    public  static  void main(String[] args){
        new KafkaConsumerDemo().consume();
    }
}

3.KafkaProducerDemo.java

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

/**
 * @ Name        : KafkaProducerDemo
 * @ Description : TODO
 * @ Author      : yangsong
 * @ Date        : 2018-9-11 18:57
 * @ Version     : 1.0
 **/


public class KafkaProducerDemo {
    private final Producer<String,String> kafkaProducer;

    public final static String TOPIC="zx_eam";  // TOPIC給定機器上的某個topic

    private KafkaProducerDemo(){
        kafkaProducer=createKafkaProducer() ;
    }
    private Producer<String,String> createKafkaProducer(){
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.102.85:9092");  // bootstrap.servers: Kafka的地址
        props.put("acks", "all");  // acks: 訊息的確認機制,預設值是0
        props.put("retries", 0);  // retries: 配置為大於0的值的話,客戶端會在訊息傳送失敗時重新發送
        props.put("batch.size", 16384);  // batch.size: 當多條訊息需要傳送到同一個分割槽時,生產者會嘗試合併網路請求
        props.put("linger.ms", 1);  //
        props.put("buffer.memory", 33554432);  //
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");  // 鍵序列化,取預設值
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");  // 值序列化,取預設值

        Producer<String, String> kafkaProducer = new KafkaProducer<>(props);
        return kafkaProducer;
    }

    void produce(){
        for(int i=1;i<1000;i++){
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            String key=String.valueOf("key"+i);
            String data="hello kafka message:"+key;
            // 生產資料
            kafkaProducer.send(new ProducerRecord<>(TOPIC, key, data), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    //do sth
                }
            });
            System.out.println(data);
        }
    }

    public  static  void main(String[] args){
        new KafkaProducerDemo().produce();
    }
}

Spring Kafka

步驟

Demo

部署程式

當寫好上述的程式之後,有兩種部署方法。

  1. 當業務邏輯比較簡單,不需要使用到分散式系統的時候,直接在本地的windows機器上Run起來就可以了;