1. 程式人生 > >kafka2.1.0核心概念及入門教程

kafka2.1.0核心概念及入門教程

kafka

kafka 核心概念

什麼是kafka

Kafka是由Apache軟體基金會開發的一個開源流處理平臺,由Scala和Java編寫。該專案的目標是為處理實時資料提供一個統一、高吞吐、低延遲的平臺。其持久化層本質上是一個“按照分散式事務日誌架構的大規模釋出/訂閱訊息佇列”,這使它作為企業級基礎設施來處理流式資料非常有價值。此外,Kafka可以通過Kafka Connect連線到外部系統(用於資料輸入/輸出),並提供了Kafka Streams——一個Java流式處理庫。該設計受事務日誌的影響較大。

基本概念

Kafka是一個分散式資料流平臺,可以執行在單臺伺服器上,也可以在多臺伺服器上部署形成叢集。它提供了釋出和訂閱功能,使用者可以傳送資料到Kafka中,也可以從Kafka中讀取資料(以便進行後續的處理)。Kafka具有高吞吐、低延遲、高容錯等特點。下面介紹一下Kafka中常用的基本概念:

  • Broker
    訊息佇列中常用的概念,在Kafka中指部署了Kafka例項的伺服器節點。

  • Topic
    用來區分不同型別資訊的主題。比如應用程式A訂閱了主題t1,應用程式B訂閱了主題t2而沒有訂閱t1,那麼傳送到主題t1中的資料將只能被應用程式A讀到,而不會被應用程式B讀到。

  • Partition
    每個topic可以有一個或多個partition(分割槽)。分割槽是在物理層面上的,不同的分割槽對應著不同的資料檔案。Kafka使用分割槽支援物理上的併發寫入和讀取,從而大大提高了吞吐量。

  • Record
    實際寫入Kafka中並可以被讀取的訊息記錄。每個record包含了key、value和timestamp。

  • Producer
    生產者,用來向Kafka中傳送資料(record)。

  • Consumer
    消費者,用來讀取Kafka中的資料(record)。

  • Consumer Group
    一個消費者組可以包含一個或多個消費者。使用多分割槽+多消費者方式可以極大提高資料下游的處理速度。

kafka 核心名詞解釋

  • Topic(主題): 每一條傳送到kafka叢集的訊息都可以有一個類別,這個類別叫做topic,不同的訊息會進行分開儲存,如果topic很大,可以分佈到多個broker上,也可以這樣理解:topic被認為是一個佇列,每一條訊息都必須指定它的topic,可以說我們需要明確把訊息放入哪一個佇列。對於傳統的message queue而言,一般會刪除已經被消費的訊息,而Kafka叢集會保留所有的訊息,無論其被消費與否。當然,因為磁碟限制,不可能永久保留所有資料(實際上也沒必要),因此Kafka提供兩種策略刪除舊資料。一是基於時間,二是基於Partition檔案大小。

  • Broker(代理): 一臺kafka伺服器就可以稱之為broker.一個叢集由多個broker組成,一個broker可以有多個topic

  • Partition(分割槽): 為了使得kafka吞吐量線性提高,物理上把topic分成一個或者多個分割槽,每一個分割槽是一個有序的佇列。且每一個分割槽在物理上都對應著一個資料夾,該資料夾下儲存這個分割槽所有訊息和索引檔案。
    分割槽的表示: topic名字-分割槽的id每個日誌檔案都是一個Log Entry序列,每個Log Entry包含一個4位元組整型數值(值為M+5),1個位元組的"magic value",4個位元組的CRC校驗碼,然後跟M個位元組的訊息這個log entries並非由一個檔案構成,而是分成多個segment,每個segment以該segment第一條訊息的offset命名並以“.kafka”為字尾。另外會有一個索引檔案,它標明瞭每個segment下包含的log entry的offset範圍分割槽中每條訊息都有一個當前Partition下唯一的64位元組的offset,它指明瞭這條訊息的起始位置,Kafka只保證一個分割槽的資料順序傳送給消費者,而不保證整個topic裡多個分割槽之間的順序

  • Replicas(副本): 試想:一旦某一個Broker宕機,則其上所有的Partition資料都不可被消費,所以需要對分割槽備份。其中一個宕機後其它Replica必須要能繼續服務並且即不能造成資料重複也不能造成資料丟失。
    如果沒有一個Leader,所有Replica都可同時讀/寫資料,那就需要保證多個Replica之間互相(N×N條通路)同步資料,資料的一致性和有序性非常難保證,大大增加了Replication實現的複雜性,同時也增加了出現異常的機率。而引入Leader後,只有Leader負責資料讀寫,Follower只向Leader順序Fetch資料(N條通路),系統更加簡單且高效。
    每一個分割槽,根據複製因子N,會有N個副本,比如在broker1上有一個topic,分割槽為topic-1, 複製因子為2,那麼在兩個broker的資料目錄裡,就都有一個topic-1,其中一個是leader,一個replicas同一個Partition可能會有多個Replica,而這時需要在這些Replication之間選出一個Leader,Producer和Consumer只與這個Leader互動,其它Replica作為Follower從Leader中複製資料

  • Producer: Producer將訊息釋出到指定的topic中,同時,producer還需要指定該訊息屬於哪個partition

  • Consumer: 本質上kafka只支援topic,每一個consumer屬於一個consumer group,每個consumer group可以包含多個consumer。傳送到topic的訊息只會被訂閱該topic的每個group中的一個consumer消費。如果所有的consumer都具有相同的group,這種情況和queue很相似,訊息將會在consumer之間均衡分配;如果所有的consumer都在不同的group中,這種情況就是廣播模式,訊息會被髮送到所有訂閱該topic的group中,那麼所有的consumer都會消費到該訊息。kafka的設計原理決定,對於同一個topic,同一個group中consumer的數量不能多於partition的數量,否則就會有consumer無法獲取到訊息。

  • Offset: Offset專指Partition以及User Group而言,記錄某個user group在某個partiton中當前已經消費到達的位置。

kafka使用場景

目前主流使用場景基本如下:

  • 訊息佇列(MQ)
    在系統架構設計中,經常會使用訊息佇列(Message Queue)——MQ。MQ是一種跨程序的通訊機制,用於上下游的訊息傳遞,使用MQ可以使上下游解耦,訊息傳送上游只需要依賴MQ,邏輯上和物理上都不需要依賴其他下游服務。MQ的常見使用場景如流量削峰、資料驅動的任務依賴等等。在MQ領域,除了Kafka外還有傳統的訊息佇列如ActiveMQ和RabbitMQ等。

  • 追蹤網站活動
    Kafka最出就是被設計用來進行網站活動(比如PV、UV、搜尋記錄等)的追蹤。可以將不同的活動放入不同的主題,供後續的實時計算、實時監控等程式使用,也可以將資料匯入到資料倉庫中進行後續的離線處理和生成報表等。

  • Metrics
    Kafka經常被用來傳輸監控資料。主要用來聚合分散式應用程式的統計資料,將資料集中後進行統一的分析和展示等。

  • 日誌聚合
    很多人使用Kafka作為日誌聚合的解決方案。日誌聚合通常指將不同伺服器上的日誌收集起來並放入一個日誌中心,比如一臺檔案伺服器或者HDFS中的一個目錄,供後續進行分析處理。相比於Flume和Scribe等日誌聚合工具,Kafka具有更出色的效能。

kafka 叢集搭建

安裝kefka叢集

由於kafka依賴zookeeper環境所以先安裝zookeeper,zk安裝

安裝環境

ContSO-7.5_x64
jdk1.8.0_191
zookeeper3.4.10
kafka_2.11-2.0.1
# 下載
wget http://mirrors.hust.edu.cn/apache/kafka/2.1.0/kafka_2.11-2.1.0.tgz

# 解壓
tar -zxvf kafka_2.11-2.1.0.tgz -C /xxx/xxx

# 編輯配置檔案修改一下幾個配置
vim $KAFKA_HOME/config/server.properties


# 每臺伺服器的broker.id都不能相同只能是數字
broker.id=1

# 修改為你的伺服器的ip或主機名
advertised.listeners=PLAINTEXT://node-1:9092

#設定zookeeper的連線埠,將下面的ip修改為你的IP稱或主機名
zookeeper.connect=node-1:2181,node-2:2181,node-3:2181

啟動Kafka叢集並測試

cd $KAFKA_HOME

# 分別在每個節點啟動kafka服務
bin/kafka-server-start.sh config/server.properties

# 建立Topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 1 --topic order

#解釋
--replication-factor 2  #複製兩份
--partitions 1          #建立1個分割槽
--topic                 #主題為 order

# 檢視topic
bin/kafka-topics.sh --list --zookeeper localhost:2181

# 檢視topic狀態
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic order

# 在一臺伺服器上建立一個 producer (生產者)
bin/kafka-console-producer.sh --broker-list node-1:9092,node-2:9092,node-3:9092 --topic order

# 在一臺伺服器上建立一個 consumer (消費者)
bin/kafka-console-consumer.sh --bootstrap-server node-2:9092,node-3:9092,node-4:9092 --topic order --from-beginning

# 現在可以在生產者的控制檯輸入任意字元就可以看到消費者端有消費訊息。

java 客戶端連線kafka

普通java形式

pom:

<dependencies>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>2.1.0</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>


    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
        </plugins>
    </build>

Consumer:

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Collections;
import java.util.Properties;

public class JavaKafkaConsumer {

    private static Logger logger = LoggerFactory.getLogger(JavaKafkaConsumer.class);

    private static Producer<String, String> producer;

    private final static String TOPIC = "TEST-TOPIC";

    private static Properties properties;

    static {
        properties = new Properties();
        //此處配置的是kafka的埠
        properties.put("bootstrap.servers", "node-2:9092,node-3:9092,node-3:9094");
    }

    public static void main(String[] args) {
        properties.put("group.id", "test");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList(TOPIC), new ConsumerRebalanceListener() {

            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            }

            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                //將偏移設定到最開始
                consumer.seekToBeginning(collection);
            }
        });
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                logger.info("offset = {}, key = {}, value = {}", record.offset(), record.key(), record.value());
            }
        }
    }


}

Producer:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.UUID;

/**
 * <p>
 *
 * @author leone
 * @since 2018-12-26
 **/
public class JavaKafkaProducer {

    private static Logger logger = LoggerFactory.getLogger(JavaKafkaProducer.class);

    private static Producer<String, String> producer;

    private final static String TOPIC = "TEST-TOPIC";

    private static Properties properties;

    static {
        properties = new Properties();
        //此處配置的是kafka的埠
        properties.put("bootstrap.servers", "node-2:9092,node-3:9092,node-3:9094");
    }

    public static void main(String[] args) {
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(properties);
        for (int i = 0; i < 100; i++) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
            }
            String uuid = UUID.randomUUID().toString();
            producer.send(new ProducerRecord<>(TOPIC, Integer.toString(i), uuid));
            logger.info("send message success key:{}, value:{}", i, uuid);
        }
        producer.close();
    }

}

log4j.properties

# Global logging configuration 開發時候建議使用 debug
log4j.rootLogger=info, stdout
# Console output...
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] - %m%n

基於spirngboot整合kafka

pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">

    <artifactId>spring-boot-kafka</artifactId>
    <groupId>com.andy</groupId>
    <version>1.0.7.RELEASE</version>
    
    <packaging>jar</packaging>
    <modelVersion>4.0.0</modelVersion>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.spring.platform</groupId>
                <artifactId>platform-bom</artifactId>
                <version>Cairo-SR5</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.7.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>2.0.3.RELEASE</version>
                <configuration>
                    <!--<mainClass>${start-class}</mainClass>-->
                    <layout>ZIP</layout>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

application.yml

spring:
  application:
    name: spring-jms
  kafka:
    bootstrap-servers: node-2:9092,node-3:9092,node-4:9092
    producer:
      retries:
      batch-size: 16384
      buffer-memory: 33554432
      compressionType: snappy
      acks: all
    consumer:
      group-id: 0
      auto-offset-reset: earliest
      enable-auto-commit: true

message:


/**
 * <p>
 *
 * @author leone
 * @since 2018-12-26
 **/
@ToString
public class Message<T> {

    private Long id;

    private T message;

    private Date time;

    public Long getId() {
        return id;
    }

    public void setId(Long id) {
        this.id = id;
    }

    public T getMessage() {
        return message;
    }

    public void setMessage(T message) {
        this.message = message;
    }

    public Date getTime() {
        return time;
    }

    public void setTime(Date time) {
        this.time = time;
    }
}

controller

import com.andy.jms.kafka.service.KafkaSender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

/**
 * <p> 
 *
 * @author leone
 * @since 2018-12-26
 **/
@Slf4j
@RestController
public class KafkaController {

    @Autowired
    private KafkaSender kafkaSender;

    @GetMapping("/kafka/{topic}")
    public String send(@PathVariable("topic") String topic, @RequestParam String message) {
        kafkaSender.send(topic, message);
        return "success";
    }

}

KafkaReceiver:

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
 * <p> 
 *
 * @author leone
 * @since 2018-12-26
 **/
@Slf4j
@Component
public class KafkaReceiver {


    @KafkaListener(topics = {"order"})
    public void listen(ConsumerRecord<?, ?> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            log.info("record:{}", record);
            log.info("message:{}", message);
        }
    }
}

KafkaSender:

import com.andy.jms.kafka.commen.Message;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * <p>
 *
 * @author leone
 * @since 2018-12-26
 **/
@Slf4j
@Component
public class KafkaSender {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Autowired
    private ObjectMapper objectMapper;

    /**
     *
     * @param topic
     * @param body
     */
    public void send(String topic, Object body) {
        Message<String> message = new Message<>();
        message.setId(System.currentTimeMillis());
        message.setMessage(body.toString());
        message.setTime(new Date());
        String content = null;
        try {
            content = objectMapper.writeValueAsString(message);
        } catch (JsonProcessingException e) {
            e.printStackTrace();
        }
        kafkaTemplate.send(topic, content);
        log.info("send {} to {} success!", message, topic);
    }
}

main:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author Leone
 * @since 2018-04-10
 **/
@SpringBootApplication
public class JmsApplication {
    public static void main(String[] args) {
        SpringApplication.run(JmsApplication.class, args);
    }
}

github地址