
Integrating Kafka with Spring Boot (or Spring Cloud)



1 Introduction to Apache Kafka

Kafka was originally developed at LinkedIn. It is a distributed, partitioned, replicated, multi-subscriber, ZooKeeper-coordinated distributed log system (it can also be used as an MQ system), commonly used for web/nginx logs, access logs, message services, and so on. LinkedIn open-sourced it and contributed it to the Apache Software Foundation, where it became a top-level open-source project.

2 Installing Apache Kafka

2.1 - Verify that Java is installed

Java should already be installed on your machine; you only need to verify it with the following command.

$ java -version

If Java is successfully installed on your machine, you will see the version of the installed Java.

2.2 - Verify that ZooKeeper is installed

  • Apache Kafka depends on ZooKeeper to run, so before installing Kafka, check whether ZooKeeper is already installed.

  • To verify the ZooKeeper installation, run:

/work/zookeeper/zookeeper-1/bin/zkServer.sh  status

The output should look like this:


[root@localhost work]# /work/zookeeper/zookeeper-1/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /work/zookeeper/zookeeper-1/bin/../conf/zoo.cfg
Mode: follower
[root@localhost work]# /work/zookeeper/zookeeper-2/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /work/zookeeper/zookeeper-2/bin/../conf/zoo.cfg
Mode: leader

Download Kafka

The download address is:

http://kafka.apache.org/downloads (a copy is also available on the 瘋狂創客圈 network drive)

It is recommended to download a release earlier than 1.1, such as kafka_2.11-1.0.2, which causes fewer installation problems. Then upload the Kafka package to the virtual machine.

3 Single-Node Installation

Step 3.1 - Extract the tar file

Now that you have downloaded Kafka onto your machine, extract (unpack) the tar file with the following commands:

$ cd /work/
$ tar -zxvf kafka_2.11-1.0.2.tgz
$ cd kafka_2.11-1.0.2
[root@localhost kafka_2.11-1.0.2]# ll
total 52
drwxr-xr-x 3 root root  4096 Apr  7  2020 bin
drwxr-xr-x 2 root root  4096 Apr  7  2020 config
drwxr-xr-x 2 root root  4096 Nov 23 22:23 libs
-rw-r--r-- 1 root root 32216 Apr  7  2020 LICENSE
-rw-r--r-- 1 root root   337 Apr  7  2020 NOTICE
drwxr-xr-x 2 root root  4096 Apr  7  2020 site-docs

Step 3.2 - Create the log directory and environment variable

[root@localhost ~]#  cd /work/kafka_2.11-1.0.2/

[root@localhost kafka_2.11-1.0.2]#  mkdir -p logs/kafka1-logs

Create the environment variable by editing /etc/profile (vi /etc/profile), adding the line below, and then running source /etc/profile to apply it:

export KAFKA_HOME=/work/kafka_2.11-1.0.2

Modify the configuration file:

In Kafka's config directory there is a server.properties file; the main settings to change are the following:

# the broker's globally unique id; it must not be duplicated
broker.id=1
# listener
listeners=PLAINTEXT://192.168.233.128:9092

advertised.listeners=PLAINTEXT://192.168.233.128:9092

# log directory
log.dirs=/work/kafka_2.11-1.0.2/logs/kafka1-logs
# ZooKeeper connection (change localhost to the ip or hostname if ZooKeeper is not on this machine)
zookeeper.connect=localhost:2181

vi /work/kafka_2.11-1.0.2/config/server.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=1

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092
listeners=PLAINTEXT://192.168.233.128:9092


# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
num.network.threads=3

# The number of threads that the server uses for processing requests, which may include disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/work/kafka_2.11-1.0.2/logs/kafka1-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log unless the remaining
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms.
# The default value for this is 3 seconds.
# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing.
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0

Start Kafka and test it

$ nohup $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties  2>&1 &

If the printed log contains no errors, you should see output like the following:

[root@localhost ~]#  $KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
[2020-11-25 21:59:42,557] INFO KafkaConfig values:
        advertised.host.name = null
        advertised.listeners = null
        advertised.port = null
        alter.config.policy.class.name = null
        authorizer.class.name =
        auto.create.topics.enable = true
        auto.leader.rebalance.enable = true
        background.threads = 10
        broker.id = 1
        broker.id.generation.enable = true
        broker.rack = null
        compression.type = producer
        connections.max.idle.ms = 600000
        controlled.shutdown.enable = true
        controlled.shutdown.max.retries = 3
        controlled.shutdown.retry.backoff.ms = 5000
        controller.socket.timeout.ms = 30000
        create.topic.policy.class.name = null
        default.replication.factor = 1
        delete.records.purgatory.purge.interval.requests = 1
        delete.topic.enable = true
        fetch.purgatory.purge.interval.requests = 1000
        group.initial.rebalance.delay.ms = 0
        group.max.session.timeout.ms = 300000
        group.min.session.timeout.ms = 6000
        host.name =
        inter.broker.listener.name = null
        inter.broker.protocol.version = 1.0-IV0
        leader.imbalance.check.interval.seconds = 300
        leader.imbalance.per.broker.percentage = 10
        listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
        listeners = PLAINTEXT://192.168.233.128:9092
        log.cleaner.backoff.ms = 15000
        log.cleaner.dedupe.buffer.size = 134217728
        log.cleaner.delete.retention.ms = 86400000
        log.cleaner.enable = true
        log.cleaner.io.buffer.load.factor = 0.9
        log.cleaner.io.buffer.size = 524288
        log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
        log.cleaner.min.cleanable.ratio = 0.5
        log.cleaner.min.compaction.lag.ms = 0
        log.cleaner.threads = 1
        log.cleanup.policy = [delete]
        log.dir = /tmp/kafka-logs
        log.dirs = /work/kafka_2.11-1.0.2/logs/kafka1-logs
        log.flush.interval.messages = 9223372036854775807
        log.flush.interval.ms = null
        log.flush.offset.checkpoint.interval.ms = 60000
        log.flush.scheduler.interval.ms = 9223372036854775807
        log.flush.start.offset.checkpoint.interval.ms = 60000
        log.index.interval.bytes = 4096
        log.index.size.max.bytes = 10485760
        log.message.format.version = 1.0-IV0
        log.message.timestamp.difference.max.ms = 9223372036854775807
        log.message.timestamp.type = CreateTime
        log.preallocate = false
        log.retention.bytes = -1
        log.retention.check.interval.ms = 300000
        log.retention.hours = 168
        log.retention.minutes = null
        log.retention.ms = null
        log.roll.hours = 168
        log.roll.jitter.hours = 0
        log.roll.jitter.ms = null
        log.roll.ms = null
        log.segment.bytes = 1073741824
        log.segment.delete.delay.ms = 60000
        max.connections.per.ip = 2147483647
        max.connections.per.ip.overrides =
        message.max.bytes = 1000012
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        min.insync.replicas = 1
        num.io.threads = 8
        num.network.threads = 3
        num.partitions = 1
        num.recovery.threads.per.data.dir = 1
        num.replica.fetchers = 1
        offset.metadata.max.bytes = 4096
        offsets.commit.required.acks = -1
        offsets.commit.timeout.ms = 5000
        offsets.load.buffer.size = 5242880
        offsets.retention.check.interval.ms = 600000
        offsets.retention.minutes = 1440
        offsets.topic.compression.codec = 0
        offsets.topic.num.partitions = 50
        offsets.topic.replication.factor = 1
        offsets.topic.segment.bytes = 104857600
        port = 9092
        principal.builder.class = null
        producer.purgatory.purge.interval.requests = 1000
        queued.max.request.bytes = -1
        queued.max.requests = 500
        quota.consumer.default = 9223372036854775807
        quota.producer.default = 9223372036854775807
        quota.window.num = 11
        quota.window.size.seconds = 1
        replica.fetch.backoff.ms = 1000
        replica.fetch.max.bytes = 1048576
        replica.fetch.min.bytes = 1
        replica.fetch.response.max.bytes = 10485760
        replica.fetch.wait.max.ms = 500
        replica.high.watermark.checkpoint.interval.ms = 5000
        replica.lag.time.max.ms = 10000
        replica.socket.receive.buffer.bytes = 65536
        replica.socket.timeout.ms = 30000
        replication.quota.window.num = 11
        replication.quota.window.size.seconds = 1
        request.timeout.ms = 30000
        reserved.broker.max.id = 1000
        sasl.enabled.mechanisms = [GSSAPI]
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.principal.to.local.rules = [DEFAULT]
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.mechanism.inter.broker.protocol = GSSAPI
        security.inter.broker.protocol = PLAINTEXT
        socket.receive.buffer.bytes = 102400
        socket.request.max.bytes = 104857600
        socket.send.buffer.bytes = 102400
        ssl.cipher.suites = null
        ssl.client.auth = none
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.abort.timed.out.transaction.cleanup.interval.ms = 60000
        transaction.max.timeout.ms = 900000
        transaction.remove.expired.transaction.cleanup.interval.ms = 3600000
        transaction.state.log.load.buffer.size = 5242880
        transaction.state.log.min.isr = 1
        transaction.state.log.num.partitions = 50
        transaction.state.log.replication.factor = 1
        transaction.state.log.segment.bytes = 104857600
        transactional.id.expiration.ms = 604800000
        unclean.leader.election.enable = false
        zookeeper.connect = localhost:2181
        zookeeper.connection.timeout.ms = 6000
        zookeeper.session.timeout.ms = 6000
        zookeeper.set.acl = false
        zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)
[2020-11-25 21:59:42,694] INFO starting (kafka.server.KafkaServer)
[2020-11-25 21:59:42,699] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2020-11-25 21:59:42,878] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2020-11-25 21:59:42,886] INFO Client environment:zookeeper.version=3.4.10-39d3a4f269333c922ed3db283be479f9deacaa0f, built on 03/23/2017 10:13 GMT (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,886] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,886] INFO Client environment:java.version=1.8.0_11 (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,886] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,886] INFO Client environment:java.home=/work/java/jdk1.8.0_11/jre (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,886] INFO Client environment:java.class.path=.:/work/java/jdk1.8.0_11/lib/dt.jar:/work/java/jdk1.8.0_11/lib/tools.jar:/work/kafka_2.11-1.0.2/bin/../libs/aopalliance-repackaged-2.5.0-b32.jar:/work/kafka_2.11-1.0.2/bin/../libs/argparse4j-0.7.0.jar:/work/kafka_2.11-1.0.2/bin/../libs/commons-lang3-3.5.jar:/work/kafka_2.11-1.0.2/bin/../libs/connect-api-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/connect-file-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/connect-json-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/connect-runtime-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/connect-transforms-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/guava-20.0.jar:/work/kafka_2.11-1.0.2/bin/../libs/hk2-api-2.5.0-b32.jar:/work/kafka_2.11-1.0.2/bin/../libs/hk2-locator-2.5.0-b32.jar:/work/kafka_2.11-1.0.2/bin/../libs/hk2-utils-2.5.0-b32.jar:/work/kafka_2.11-1.0.2/bin/../libs/jackson-annotations-2.9.6.jar:/work/kafka_2.11-1.0.2/bin/../libs/jackson-core-2.9.6.jar:/work/kafka_2.11-1.0.2/bin/../libs/jackson-databind-2.9.6.jar:/work/kafka_2.11-1.0.2/bin/../libs/jackson-jaxrs-base-2.9.6.jar:/work/kafka_2.11-1.0.2/bin/../libs/jackson-jaxrs-json-provider-2.9.6.jar:/work/kafka_2.11-1.0.2/bin/../libs/jackson-module-jaxb-annotations-2.9.6.jar:/work/kafka_2.11-1.0.2/bin/../libs/javassist-3.20.0-GA.jar:/work/kafka_2.11-1.0.2/bin/../libs/javassist-3.21.0-GA.jar:/work/kafka_2.11-1.0.2/bin/../libs/javax.annotation-api-1.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/javax.inject-1.jar:/work/kafka_2.11-1.0.2/bin/../libs/javax.inject-2.5.0-b32.jar:/work/kafka_2.11-1.0.2/bin/../libs/javax.servlet-api-3.1.0.jar:/work/kafka_2.11-1.0.2/bin/../libs/javax.ws.rs-api-2.0.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-client-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-common-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-container-servlet-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-container-servlet-core-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-guava-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-media-jaxb-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jersey-server-2.25.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-continuation-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-http-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-io-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-security-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-server-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-servlet-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-servlets-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jetty-util-9.2.22.v20170606.jar:/work/kafka_2.11-1.0.2/bin/../libs/jopt-simple-5.0.4.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka_2.11-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka_2.11-1.0.2-sources.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka_2.11-1.0.2-test-sources.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka-clients-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka-log4j-appender-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka-streams-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka-streams-examples-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/kafka-tools-1.0.2.jar:/work/kafka_2.11-1.0.2/bin/../libs/log4j-1.2.17.jar:/work/kafka_2.11-1.0.2/bin/../libs/lz4-java-1.4.jar:/work/kafka_2.11-1.0.2/bin/../libs/maven-artifact-3.5.0.jar:/work/kafka_2.11-1.0.2/bin/../libs/metrics-core-2.2.0.jar:/work/kafka_2.11-1.0.2/bin/../libs/osgi-resource-locator-1.0.1.jar:/work/kafka_2.11-1.0.2/bin/../libs/p
lexus-utils-3.0.24.jar:/work/kafka_2.11-1.0.2/bin/../libs/reflections-0.9.11.jar:/work/kafka_2.11-1.0.2/bin/../libs/rocksdbjni-5.7.3.jar:/work/kafka_2.11-1.0.2/bin/../libs/scala-library-2.11.12.jar:/work/kafka_2.11-1.0.2/bin/../libs/slf4j-api-1.7.25.jar:/work/kafka_2.11-1.0.2/bin/../libs/slf4j-log4j12-1.7.25.jar:/work/kafka_2.11-1.0.2/bin/../libs/snappy-java-1.1.4.jar:/work/kafka_2.11-1.0.2/bin/../libs/validation-api-1.1.0.Final.jar:/work/kafka_2.11-1.0.2/bin/../libs/zkclient-0.10.jar:/work/kafka_2.11-1.0.2/bin/../libs/zookeeper-3.4.10.jar (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,887] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,887] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,887] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,887] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,887] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,887] INFO Client environment:os.version=3.10.0-123.el7.x86_64 (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,887] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,887] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,887] INFO Client environment:user.dir=/root (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,888] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@481a996b (org.apache.zookeeper.ZooKeeper)
[2020-11-25 21:59:42,991] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
[2020-11-25 21:59:42,999] INFO Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2020-11-25 21:59:43,012] INFO Socket connection established to localhost/127.0.0.1:2181, initiating session (org.apache.zookeeper.ClientCnxn)
[2020-11-25 21:59:43,086] INFO Session establishment complete on server localhost/127.0.0.1:2181, sessionid = 0x1006049103f0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2020-11-25 21:59:43,094] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2020-11-25 21:59:44,369] INFO Cluster ID = 4MOhHbbzS42FdvekFfLwTQ (kafka.server.KafkaServer)
[2020-11-25 21:59:44,381] WARN No meta.properties file under dir /work/kafka_2.11-1.0.2/logs/kafka1-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2020-11-25 21:59:44,412] INFO [ThrottledRequestReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2020-11-25 21:59:44,429] INFO [ThrottledRequestReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2020-11-25 21:59:44,442] INFO [ThrottledRequestReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2020-11-25 21:59:44,541] INFO Loading logs. (kafka.log.LogManager)
[2020-11-25 21:59:44,547] INFO Logs loading complete in 6 ms. (kafka.log.LogManager)
[2020-11-25 21:59:45,086] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2020-11-25 21:59:45,095] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2020-11-25 21:59:45,394] INFO Awaiting socket connections on 192.168.233.128:9092. (kafka.network.Acceptor)
[2020-11-25 21:59:45,399] INFO [SocketServer brokerId=1] Started 1 acceptor threads (kafka.network.SocketServer)
[2020-11-25 21:59:45,422] INFO [ExpirationReaper-1-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-11-25 21:59:45,423] INFO [ExpirationReaper-1-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-11-25 21:59:45,427] INFO [ExpirationReaper-1-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-11-25 21:59:45,438] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
[2020-11-25 21:59:45,646] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2020-11-25 21:59:45,648] INFO [ExpirationReaper-1-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-11-25 21:59:45,651] INFO [ExpirationReaper-1-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-11-25 21:59:45,658] INFO [ExpirationReaper-1-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2020-11-25 21:59:45,698] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2020-11-25 21:59:45,701] INFO [GroupCoordinator 1]: Starting up. (kafka.coordinator.group.GroupCoordinator)
[2020-11-25 21:59:45,701] INFO [GroupCoordinator 1]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
[2020-11-25 21:59:45,705] INFO [GroupMetadataManager brokerId=1] Removed 0 expired offsets in 1 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2020-11-25 21:59:45,718] INFO [ProducerId Manager 1]: Acquired new producerId block (brokerId:1,blockStartProducerId:0,blockEndProducerId:999) by writing to Zk with path version 1 (kafka.coordinator.transaction.ProducerIdManager)
[2020-11-25 21:59:45,741] INFO [TransactionCoordinator id=1] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2020-11-25 21:59:45,771] INFO [TransactionCoordinator id=1] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2020-11-25 21:59:45,774] INFO [Transaction Marker Channel Manager 1]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2020-11-25 21:59:45,807] INFO Creating /brokers/ids/1 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2020-11-25 21:59:45,811] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2020-11-25 21:59:45,812] INFO Registered broker 1 at path /brokers/ids/1 with addresses: EndPoint(192.168.233.128,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)
[2020-11-25 21:59:45,813] WARN No meta.properties file under dir /work/kafka_2.11-1.0.2/logs/kafka1-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2020-11-25 21:59:45,893] INFO [SocketServer brokerId=1] Started processors for 1 acceptors (kafka.network.SocketServer)
[2020-11-25 21:59:45,894] INFO Kafka version : 1.0.2 (org.apache.kafka.common.utils.AppInfoParser)
[2020-11-25 21:59:45,894] INFO Kafka commitId : 2a121f7b1d402825 (org.apache.kafka.common.utils.AppInfoParser)
[2020-11-25 21:59:45,895] INFO [KafkaServer id=1] started (kafka.server.KafkaServer)

Test Kafka

The log alone does not guarantee that Kafka started successfully. Run jps to list the Java processes; if the Kafka process appears, the startup succeeded.

[hadoop@Master ~]$ jps
9173 Kafka
9462 Jps
8589 QuorumPeerMain
[hadoop@Master ~]$ jps -m
9472 Jps -m
9173 Kafka /opt/kafka/config/server.properties
8589 QuorumPeerMain /opt/zookeeper/bin/../conf/zoo.cfg

Create a topic

[hadoop@Master ~]$ $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper 192.168.233.128:2181 --replication-factor 1 --partitions 1 --topic test

Parameter description:
--zookeeper: the ZooKeeper connection URL used by Kafka, the same value as the {zookeeper.connect} setting in server.properties

Here it is 192.168.233.128:2181

--replication-factor: the number of replicas
--partitions: the number of partitions
--topic: the topic name

[root@localhost ~]# $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper 192.168.233.128:2181 --replication-factor 1 --partitions 1 --topic test
Created topic "test".
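
For completeness, a topic can also be created programmatically. Below is a minimal, hedged sketch using the AdminClient from the kafka-clients library that ships in Kafka's libs directory (the broker address, topic name, partition count and replication factor simply mirror the command above; the class itself is an illustration, not part of the original tutorial):

package test;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicDemo {
        public static void main(String[] args) throws Exception {
                Properties props = new Properties();
                //the AdminClient talks to the broker, not to ZooKeeper
                props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.233.128:9092");
                AdminClient admin = AdminClient.create(props);
                //topic "test" with 1 partition and replication factor 1, as in the command-line example
                NewTopic topic = new NewTopic("test", 1, (short) 1);
                admin.createTopics(Collections.singleton(topic)).all().get();
                admin.close();
        }
}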

List all topics

[hadoop@Master ~]$  $KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper 192.168.233.128:2181 

The result is as follows:


[root@localhost ~]#  $KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper 192.168.233.128:2181
test

Start a test producer

[hadoop@Master ~]$ $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list 192.168.233.128:9092  --topic test

Note that the port in this command is the Kafka broker port.

After running the command, the console waits for message input. Type the message value directly; each line (separated by a newline) is one message, as shown below.

>Hello Kafka!
>你好 kafka!

Normally, each press of Enter triggers a "send". After sending, you can press Ctrl + C to exit the producer console, and then use the kafka-console-consumer.sh script to verify what was produced.

Start a test consumer

[hadoop@Master ~]$ $KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper 192.168.233.128:2181 --topic test --from-beginning

Note:

  • The port in this command is the ZooKeeper port.

  • If the --from-beginning flag is present, consumption starts from the very beginning, so both old and new data are consumed; without it, only newly produced data is consumed.

Execution results

Producer-side output

[root@localhost ~]# $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list 192.168.233.128:9092  --topic test
>aaa bbbb
>ccc fff
>Hello Kafka!
>你好 kafka!
>



Consumer-side output

[root@localhost ~]# $KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper 192.168.233.128:2181 --topic test --from-beginning
Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
aaa bbbb
ccc fff
Hello Kafka!
你好 kafka!

4 Cluster-Mode Installation

  • Copy config/server.properties three times and name the copies server1.properties, server2.properties and server3.properties.
  • Modify server1.properties:
  - broker.id=1
  - listeners=PLAINTEXT://:9092
  - advertised.listeners=PLAINTEXT://192.168.233.128:9092 (192.168.233.128 is this machine's ip)
  - log.dirs=/work/kafka_2.11-1.0.2/logs/kafka1-logs
  - zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
  • Likewise, modify server2.properties:
  - broker.id=2
  - listeners=PLAINTEXT://:9093
  - advertised.listeners=PLAINTEXT://192.168.233.128:9093 (192.168.233.128 is this machine's ip)
  - log.dirs=/work/kafka_2.11-1.0.2/logs/kafka2-logs
  - zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
  • Likewise, modify server3.properties:
  - broker.id=3
  - listeners=PLAINTEXT://:9094
  - advertised.listeners=PLAINTEXT://192.168.233.128:9094 (192.168.233.128 is this machine's ip)
  - log.dirs=/work/kafka_2.11-1.0.2/logs/kafka3-logs
  - zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
  • Then run the following commands:
nohup /work/kafka_2.11-1.0.2/bin/kafka-server-start.sh /work/kafka_2.11-1.0.2/config/server3.properties > /work/kafka_2.11-1.0.2/logs/kafka3-logs/startup.log 2>&1 &
nohup /work/kafka_2.11-1.0.2/bin/kafka-server-start.sh /work/kafka_2.11-1.0.2/config/server2.properties > /work/kafka_2.11-1.0.2/logs/kafka2-logs/startup.log 2>&1 &
nohup /work/kafka_2.11-1.0.2/bin/kafka-server-start.sh /work/kafka_2.11-1.0.2/config/server1.properties > /work/kafka_2.11-1.0.2/logs/kafka1-logs/startup.log 2>&1 &
  • Check startup.log, or server.log in the same directory, for errors.
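
Once all three brokers are up, client code reaches the cluster by listing several (ideally all) broker addresses in bootstrap.servers. A minimal sketch with the kafka-clients producer API (the addresses are simply the three listeners configured above; the class is illustrative, not part of the original scaffold):

package test;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ClusterProducerDemo {
        public static void main(String[] args) {
                Properties props = new Properties();
                //list all three brokers so the client can bootstrap even if one of them is down
                props.put("bootstrap.servers",
                                "192.168.233.128:9092,192.168.233.128:9093,192.168.233.128:9094");
                props.put("key.serializer", StringSerializer.class.getName());
                props.put("value.serializer", StringSerializer.class.getName());
                KafkaProducer<String, String> producer = new KafkaProducer<>(props);
                //the cluster decides which broker leads the target partition
                producer.send(new ProducerRecord<>("test", "hello from the cluster"));
                producer.close();
        }
}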

5 Types of Messaging Systems

A messaging system is responsible for transferring data from one application to another, so that applications can focus on the data itself without worrying about how it is passed between two or more of them. Distributed messaging is based on reliable message queues, with messages passed asynchronously between client applications and the messaging system. There are two main messaging patterns: the point-to-point model and the publish-subscribe model.

5.1 Point-to-Point Messaging Model

In a point-to-point messaging system, messages are persisted in a queue. One or more consumers consume the data in the queue, but a given message can be consumed only once: after a consumer has consumed a message, it is removed from the queue. Even with multiple consumers consuming concurrently, this model preserves the processing order of the data. The architecture is sketched below:

A producer sends a message to the queue, and only one consumer receives it.

5.2 Publish-Subscribe Messaging Model (Kafka)

In a publish-subscribe messaging system, messages are persisted in a topic. Unlike the point-to-point model, consumers can subscribe to one or more topics and consume all of the data in those topics; the same message can be consumed by multiple consumers, and data is not deleted immediately after being consumed. In this model, message producers are called publishers and consumers are called subscribers. The model is illustrated below:

A message published to a topic is received only by the subscribers that have subscribed to that topic.

As shown above, publish-subscribe is a push-based delivery model that can serve many different subscribers. After the producer puts a message into the message queue, the queue pushes it to all consumers that subscribed to that type of message (similar to a WeChat official account).

Most messaging systems use the publish-subscribe model, and Kafka is a publish-subscribe system.

6 Kafka Terminology

6.1 Overview

Before diving deeper into Kafka, let's first introduce Kafka's terminology. The figure below shows the relevant terms and the relationships between them:

In the figure above, one topic is configured with 3 partitions, and each broker in the cluster stores one or more partitions.

Partition 1 has two offsets, 0 and 1. Partition 2 has 4 offsets, and Partition 3 has 1 offset. The id of a replica happens to match the id of the machine it resides on.

If a topic's replication factor is 3, Kafka creates 3 identical replicas of each of its partitions across the cluster. Multiple producers and consumers can produce and consume data at the same time.

6.2 broker

A Kafka cluster contains one or more servers; each server node is called a broker.

Brokers store topic data. If a topic has N partitions and the cluster has N brokers, each broker stores one partition of that topic.

If a topic has N partitions and the cluster has N+M brokers, N of the brokers each store one partition of the topic, while the remaining M brokers store none of its partition data.

If a topic has N partitions and the cluster has fewer than N brokers, each broker stores one or more partitions of the topic. This situation should be avoided in production, because it easily leads to an unbalanced Kafka cluster.

6.3 Topic

Every message published to a Kafka cluster has a category, called a topic. (Physically, messages of different topics are stored separately; logically, although a topic's messages are stored on one or more brokers, users only need to specify the topic to produce or consume data, without caring where the data is stored.)

A topic is similar to a table name in a database.

6.4 Partition

The data in a topic is split into one or more partitions; every topic has at least one partition. The data of each partition is stored in multiple segment files. Data within a partition is ordered, but ordering is lost across different partitions, so if a topic has multiple partitions, the consumption order of its data cannot be guaranteed. In scenarios that require strictly ordered consumption, set the number of partitions to 1.

6.5 Producer

The producer is the publisher of data; it publishes messages to a Kafka topic. When a broker receives a message from a producer, it appends the message to the segment file currently used for appends. A message sent by a producer is stored in one partition, and the producer may also specify which partition the data is stored in.

6.6 Consumer

Consumers read data from brokers. A consumer can consume data from multiple topics.

6.7 Consumer Group

Every consumer belongs to a specific consumer group (a group name can be specified for each consumer; if none is specified, the consumer belongs to the default group).

6.8 Leader

Each partition has multiple replicas, exactly one of which is the leader. The leader is the partition replica currently responsible for data reads and writes.

6.9 Follower

Followers follow the leader: all write requests are routed through the leader, and data changes are broadcast to all followers, which stay in sync with the leader. If the leader fails, a new leader is elected from the followers. When a follower crashes, gets stuck, or falls too far behind, the leader removes it from the "in sync replicas" (ISR) list and a new follower is created.

7 Comparison of Common Message Queues

7.1 RabbitMQ

RabbitMQ is an open-source message queue written in Erlang. It natively supports many protocols: AMQP, XMPP, SMTP, STOMP. Precisely because of this it is quite heavyweight and better suited to enterprise development. It implements the broker architecture, which means messages are queued in a central queue before being delivered to clients. It has good support for routing, load balancing, and data persistence.

7.2 Redis

Redis is a key-value NoSQL database that is actively developed and maintained. Although it is a key-value storage system, it natively supports MQ functionality, so it can be used as a lightweight queue service. In one comparison, enqueue and dequeue operations were executed one million times each on RabbitMQ and Redis, with the execution time recorded every 100,000 operations, for payloads of 128 bytes, 512 bytes, 1K and 10K. The results showed that for enqueueing, Redis outperforms RabbitMQ when the payload is small, but becomes unbearably slow once the payload exceeds 10K; for dequeueing, Redis performs very well regardless of payload size, while RabbitMQ's dequeue performance is far below Redis.

7.3 ZeroMQ

ZeroMQ claims to be the fastest message queue system, especially for high-throughput scenarios. It can implement the advanced/complex queues that RabbitMQ is not good at, but developers have to combine multiple technical frameworks themselves, and this technical complexity is a real challenge when applying it. ZeroMQ has a unique broker-less model: you do not need to install and run a message server or middleware, because your application itself plays that role. You simply reference the ZeroMQ library (installable via NuGet, for example) and you can happily send messages between applications. However, ZeroMQ provides only non-persistent queues, which means data is lost on a crash. Notably, Storm versions before 0.9.0 used ZeroMQ by default as the data stream transport (since 0.9, Storm supports both ZeroMQ and Netty as transport modules).

7.4 ActiveMQ

ActiveMQ is an Apache sub-project. Like ZeroMQ, it can implement queues using both broker and peer-to-peer techniques; like RabbitMQ, it can efficiently implement advanced scenarios with a small amount of code.

7.5 Kafka/Jafka

Kafka is an Apache project: a high-performance, cross-language, distributed publish/subscribe message queue system, and Jafka was incubated on top of Kafka, i.e. an enhanced version of Kafka. It has the following characteristics: fast persistence, with message persistence at O(1) overhead; high throughput, reaching around 100K messages/s on an ordinary server; a fully distributed system in which brokers, producers and consumers all natively and automatically support distribution and load balancing; and support for parallel loading of data into Hadoop, making it a viable solution for systems that, like Hadoop, handle log data and offline analysis but also require real-time processing. Kafka unifies online and offline message processing through Hadoop's parallel loading mechanism. Compared with ActiveMQ, Apache Kafka is a very lightweight messaging system; besides excellent performance, it is also a well-behaved distributed system.

8 Developing with Kafka

8.1 A Simple Kafka Application

Simple producer code

package test;

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SimpleProducer {
        private static Producer<Integer,String> producer;
        private final Properties props=new Properties();
        public SimpleProducer(){
                //define the broker list to connect to
                props.put("metadata.broker.list", "192.168.1.216:9092");
                //define the serializer class; Java objects must be serialized before being transmitted
                props.put("serializer.class", "kafka.serializer.StringEncoder");
                producer = new Producer<Integer, String>(new ProducerConfig(props));
        }
        public static void main(String[] args) {
                SimpleProducer sp=new SimpleProducer();
                //define the topic
                String topic="mytopic";

                //define the message to send to the topic
                String messageStr = "This is a message";

                //build the message object
                KeyedMessage<Integer, String> data = new KeyedMessage<Integer, String>(topic, messageStr);

                //push the message to the broker
                producer.send(data);
                producer.close();
        }
}

In the single-node Kafka environment the port is the Kafka broker port 9092. The topic is defined here as mytopic; you can of course name it anything you like without worrying about whether it already exists on the server. The code above does simple single-message sending; if the data volume is large, calling send once per message wastes time, so it is recommended to group the data into a List in batches of a certain size and send the whole ArrayList in one call, which greatly improves throughput.
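
As a rough sketch of that batching idea, assuming the send(List<...>) overload of the same legacy kafka.javaapi.producer API used above (the class below is illustrative and not part of the original text; the broker address is the tutorial VM's):

package test;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class SimpleBatchProducer {
        public static void main(String[] args) {
                Properties props = new Properties();
                props.put("metadata.broker.list", "192.168.233.128:9092");
                props.put("serializer.class", "kafka.serializer.StringEncoder");
                Producer<Integer, String> producer = new Producer<Integer, String>(new ProducerConfig(props));

                //collect the messages into a list first ...
                List<KeyedMessage<Integer, String>> batch = new ArrayList<KeyedMessage<Integer, String>>();
                for (int i = 0; i < 100; i++) {
                        batch.add(new KeyedMessage<Integer, String>("mytopic", "message-" + i));
                }
                //... then push the whole batch with a single send() call
                producer.send(batch);
                producer.close();
        }
}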

Simple consumer code

package test;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class SimpleHLConsumer {
        private final ConsumerConnector consumer;
        private final String topic;

        public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
                Properties props = new Properties();
                //define the ZooKeeper connection info
                props.put("zookeeper.connect", zookeeper);
                //define the groupId this consumer belongs to
                props.put("group.id", groupId);
                props.put("zookeeper.session.timeout.ms", "500");
                props.put("zookeeper.sync.time.ms", "250");
                props.put("auto.commit.interval.ms", "1000");
                consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
                this.topic = topic;
        }

        public void testConsumer() {
                Map<String, Integer> topicCount = new HashMap<String, Integer>();
                //define the subscribed topic and the number of streams for it
                topicCount.put(topic, new Integer(1));
                //returns a map of message streams for all requested topics
                Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
                //take the message streams of the topic we need
                List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
                for (final KafkaStream stream : streams) {
                        ConsumerIterator<byte[], byte[]> consumerIte = stream.iterator();
                        while (consumerIte.hasNext())
                                System.out.println("Message from Topic :" + new String(consumerIte.next().message()));
                }
                if (consumer != null)
                        consumer.shutdown();
        }

        public static void main(String[] args) {
                String topic = "mytopic";
                SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("192.168.233.128:2181/kafka", "testgroup", topic);
                simpleHLConsumer.testConsumer();
        }

}

The consumer code's main logic is simply to process and print the data sent by the producer. Note that the address here is the ZooKeeper address and includes the chroot node /kafka, and the topic name must match the producer's.
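
The high-level consumer used above is deprecated in Kafka 1.0 (the console consumer printed the same warning earlier). As a hedged alternative, the equivalent logic with the newer org.apache.kafka.clients.consumer API from kafka-clients, which connects to the broker instead of ZooKeeper, might look roughly like this:

package test;

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleNewConsumer {
        public static void main(String[] args) {
                Properties props = new Properties();
                //connect to the broker, not to ZooKeeper
                props.put("bootstrap.servers", "192.168.233.128:9092");
                props.put("group.id", "testgroup");
                props.put("key.deserializer", StringDeserializer.class.getName());
                props.put("value.deserializer", StringDeserializer.class.getName());
                KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
                consumer.subscribe(Collections.singletonList("mytopic"));
                while (true) {
                        //poll blocks for up to 1000 ms waiting for new records
                        ConsumerRecords<String, String> records = consumer.poll(1000);
                        for (ConsumerRecord<String, String> record : records) {
                                System.out.println("Message from Topic :" + record.value());
                        }
                }
        }
}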

8.2 Developing a Generic Kafka Module

Develop a generic Kafka module for sending and receiving; other modules then only need to call this Kafka module's unified sending interface and implement their own receiving logic.

The microservice provider, the subscribed topic, and the subscription group can all be configured through the database. When a message arrives, the service automatically invokes the callbacks of the configured receiver classes.

The database table configuration is shown below:

This Kafka module lives in the 瘋狂創客圈 Crazy-SpringCloud scaffold under the module name base-kafka; after startup its Swagger interface looks like this:

Through this interface you can send a message to a given topic. If a subscription relationship is configured in the database, for example if provider-name (a microservice name) subscribes to the test topic and a callback class and method are configured for the message, the message will be consumed.
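
The scaffold's own source is not reproduced here. Purely as an illustration of the idea (the class name, topic, group and the use of spring-kafka below are assumptions, not the scaffold's actual implementation), a Spring Boot module typically wraps sending in a KafkaTemplate and receiving in a @KafkaListener, with spring.kafka.bootstrap-servers pointing at the broker:

package test;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

//hypothetical sketch: a unified send facade plus a callback-style listener
@Service
public class KafkaMessageService {

        private final KafkaTemplate<String, String> kafkaTemplate;

        public KafkaMessageService(KafkaTemplate<String, String> kafkaTemplate) {
                this.kafkaTemplate = kafkaTemplate;
        }

        //unified sending interface: other modules just pass a topic and a payload
        public void send(String topic, String message) {
                kafkaTemplate.send(topic, message);
        }

        //receiving side: in the scaffold the topic and group would come from the database configuration
        @KafkaListener(topics = "test", groupId = "provider-name")
        public void onMessage(String message) {
                System.out.println("received from topic test: " + message);
        }
}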

The consumption interface looks like this:

9 How Kafka Works

Let's look at the relationships between producers, consumers, topics and groups:

If this diagram leaves you confused, don't worry! Let's first go through the related concepts.

  Producer: the producer, i.e. the creator of messages and the entry point for data.

  Kafka cluster: the set of brokers holding the topics and partitions described below.

  Broker: a broker is a Kafka instance. Each server runs one or more Kafka instances; for simplicity we assume each broker corresponds to one server. Every broker in a cluster has a unique id, such as broker-0, broker-1, and so on.

  Topic: the subject of a message, which can be understood as a message category; Kafka's data is stored in topics. Multiple topics can be created on each broker.

  Partition: a partition of a topic. Each topic can have multiple partitions, whose purpose is to spread load and raise Kafka's throughput. The data of the same topic in different partitions does not overlap, and on disk a partition is simply a directory.

  Replication: every partition has multiple replicas that act as standbys. When the main partition (the leader) fails, one standby (a follower) is promoted to become the new leader. In Kafka the default maximum number of replicas is 10, the number of replicas cannot exceed the number of brokers, and a follower and its leader are always on different machines; a single machine stores at most one replica of any given partition (counting the leader itself).

  Message: the body of each message that is sent.

  Consumer: the consumer, i.e. the receiver of messages and the exit point for data.

  Consumer Group: several consumers can be grouped into a consumer group. In Kafka's design, the data of one partition can only be consumed by a single consumer within a consumer group, while consumers in the same group may consume different partitions of the same topic; this, too, improves Kafka's throughput!

  Zookeeper: the Kafka cluster relies on ZooKeeper to store the cluster's metadata and guarantee the system's availability.

Key point 1: the data of the same topic in different partitions does not overlap.

Key point 2: the data of a single partition can only be consumed by one consumer within a consumer group.

Workflow Analysis

  The sections above introduced Kafka's basic architecture and concepts; hopefully you now have a rough picture of Kafka, and it is fine if things are still a bit fuzzy! Next we analyze Kafka's workflow against the structure diagram above; going back over the whole picture afterwards will make it much clearer.

Producing data

  Looking at the architecture diagram above, the producer is the data entry point. Note the red arrows in the figure: when writing data, the producer always goes to the leader and never writes data directly to a follower! How is the leader found, and what does the write flow look like? See the figure below:

The sending flow is already described in the figure, so it is not repeated in text here. One point worth noting is that after a message is written to the leader, the followers actively pull from the leader to synchronize! The producer publishes data to the broker in push mode; each message is appended to a partition and written to disk sequentially, which guarantees that data within the same partition is ordered. The write process is illustrated below:

  As mentioned above, data is written to different partitions. Why does Kafka partition at all? As you can probably guess, the main purposes of partitioning are:
  1. Easier scaling. Because a topic can have multiple partitions, we can add machines to cope with an ever-growing volume of data.
  2. Higher concurrency. With the partition as the unit of reads and writes, multiple consumers can consume data at the same time, which improves message-processing throughput.

  If you are familiar with load balancing, you know that when we send a request to a server, the server side may balance the load and distribute the traffic across different machines. Likewise in Kafka: if a topic has multiple partitions, how does the producer know which partition to send the data to? Kafka follows a few rules (see the sketch after this list):
  1. A target partition can be specified explicitly when writing; if it is specified, the data is written to that partition.
  2. If no partition is specified but the data has a key, a partition is derived by hashing the key.
  3. If neither a partition nor a key is given, a partition is picked by round-robin.
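
A minimal sketch of those three cases with the kafka-clients producer API, assuming a KafkaProducer<String, String> producer configured as in the sketch at the end of section 4 (the topic "test" and the values below are illustrative):

//1. explicit partition: (topic, partition, key, value)
producer.send(new ProducerRecord<String, String>("test", 0, "key-a", "goes to partition 0"));

//2. no partition, but a key: the partition is derived from the hash of the key
producer.send(new ProducerRecord<String, String>("test", "key-b", "partition chosen by hash(key-b)"));

//3. neither partition nor key: the client picks a partition in round-robin fashion
producer.send(new ProducerRecord<String, String>("test", "no key, the client chooses the partition"));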

  Not losing messages is a basic guarantee of any message-queue middleware, so how does the producer guarantee that messages are not lost when writing to Kafka? This is actually described in the write-flow diagram above: the ACK acknowledgement mechanism! When the producer writes data to the queue, a parameter controls whether to wait for Kafka to acknowledge receipt of the data; this parameter can be set to 0, 1, or all (a configuration sketch follows this list).
  0 means the producer sends data to the cluster without waiting for any response and does not confirm that delivery succeeded. Lowest safety, highest efficiency.
  1 means the producer only needs the leader's acknowledgement before sending the next message; only the leader's write is confirmed.
  all means the producer sends the next message only after all followers have finished synchronizing from the leader, confirming both the leader write and the replication to all replicas. Highest safety, lowest efficiency.
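
For reference, with the kafka-clients producer this acknowledgement level is just a property. A hedged fragment (the rest of the setup is the same as in the earlier producer sketches):

Properties props = new Properties();
props.put("bootstrap.servers", "192.168.233.128:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//"0": fire and forget, "1": wait for the leader only, "all": wait for the full ISR
props.put("acks", "all");
//with acks=all, the broker-side min.insync.replicas decides how many replicas must confirm
KafkaProducer<String, String> producer = new KafkaProducer<>(props);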

  Finally, what happens when data is written to a topic that does not exist: does the write succeed? Kafka creates the topic automatically, and by default both the number of partitions and the number of replicas are 1.

Storing data

  After the producer writes data to Kafka, the cluster has to store it. Kafka stores data on disk. In our usual way of thinking, writing to disk is a slow operation that does not suit such a high-concurrency component, but Kafka sets aside a dedicated block of disk space up front and writes data to it sequentially (which is far more efficient than random writes).

Partition structure
  As mentioned earlier, every topic can be divided into one or more partitions. If the topic feels abstract, the partition is quite concrete: on the server a partition is simply a directory, and under each partition's directory there are multiple groups of segment files. Each group contains a .index file, a .log file and a .timeindex file (absent in early versions). The .log file is where the messages are actually stored, and the .index and .timeindex files are index files used to look messages up.

  

  As shown in the figure above, this partition has three groups of segment files. Each .log file has the same maximum size, but the number of messages stored in each is not necessarily equal (messages vary in size). A segment's files are named after the smallest offset in that segment; for example, 000.index covers the messages with offsets 0 through 368795. Kafka uses this segmentation-plus-index scheme to solve the lookup-efficiency problem.

Message structure
As mentioned above, the .log file is where messages are actually stored, and what the producer writes into Kafka is one message after another. So what does a message stored in the log look like? A message mainly consists of the message body, the message size, the offset, the compression type, and so on. The three things to focus on are:
  1. offset: an ordered 8-byte id that uniquely determines each message's position within the partition.
  2. message size: 4 bytes describing the size of the message.
  3. message body: the actual (possibly compressed) message data, whose size varies from message to message.

Retention policy
  Kafka keeps all messages, whether or not they have been consumed. What are the deletion policies for old data?
  1. Time-based: the default is 168 hours (7 days).
  2. Size-based: the default is 1073741824 bytes.
  Note that Kafka reads a specific message in O(1) time, so deleting expired files does not improve Kafka's read performance!

Consuming data

  Once messages are stored in the log files, consumers can consume them. Just as when producing messages, consumers also go to the leader when pulling messages.

  Multiple consumers can form a consumer group, and every consumer group has a group id! Consumers in the same group can consume data from different partitions of the same topic, but multiple consumers within one group never consume the same partition's data!!! A bit convoluted? Look at the figure below:

  

  The figure shows the case where there are fewer consumers in the group than partitions, so one consumer ends up consuming the data of multiple partitions, and its consumption speed falls behind that of a consumer handling only one partition.

If a consumer group has more consumers than there are partitions, will multiple consumers end up consuming the same partition's data?

As already mentioned, that situation never happens! Note: the surplus consumers simply do not consume any partition at all. In practice, therefore, it is recommended to keep the number of consumers in a group equal to the number of partitions, and certainly no greater than it.

 

How is a partition stored?

A partition is divided into multiple groups of segments, each segment containing .log, .index and .timeindex files, and every stored message contains an offset, the message size, the message body, and so on. We have mentioned segments and offsets several times; how exactly are segment + offset used together when looking up a message? Suppose we need to find the message with offset 368801; what does that process look like? Let's first look at the figure below:

  1. First locate the segment file containing the message with offset 368801 (using binary search over the segments); here it is the second segment file.
  2. Open that segment's .index file (the 368796.index file, whose starting offset is 368796+1). The offset we want, 368801, corresponds to 368796+5=368801 within this index, i.e. a relative offset of 5. Because the file is a sparse index that maps relative offsets to the physical positions of messages, an entry for relative offset 5 may not exist; binary search is used again to find the entry whose relative offset is the largest one less than or equal to the target, which here is the entry for relative offset 4.
  3. The entry for relative offset 4 says the message's physical position is 256. Open the data file and scan sequentially from position 256 until the message with offset 368801 is found.

  This mechanism is built on offsets being ordered, and combines segments + ordered offsets + a sparse index + binary search + sequential scanning to look data up efficiently. At this point the consumer has obtained the data it needs and can process it.
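
As a toy illustration of that segment + sparse-index lookup (not Kafka's actual implementation; the 368796/368801/256 numbers come from the example above, the remaining values are made up):

import java.util.TreeMap;

public class SparseIndexLookupDemo {
        public static void main(String[] args) {
                //segments keyed by their base offset (each segment file is named after its base offset)
                TreeMap<Long, String> segments = new TreeMap<>();
                segments.put(0L, "00000000000000000000.log");
                segments.put(368796L, "00000000000000368796.log");
                segments.put(737562L, "00000000000000737562.log");

                //sparse index of the second segment: relative offset -> physical position in the .log file
                TreeMap<Long, Long> sparseIndex = new TreeMap<>();
                sparseIndex.put(1L, 0L);
                sparseIndex.put(4L, 256L);
                sparseIndex.put(7L, 522L);

                long target = 368801L;
                //step 1: pick the segment with the largest base offset <= target (the floor lookup plays the role of the binary search)
                long segmentBase = segments.floorKey(target);
                //step 2: in the sparse index, find the largest relative offset <= (target - base); 368801 - 368796 = 5, so the entry for 4 is hit
                long startPosition = sparseIndex.floorEntry(target - segmentBase).getValue();
                //step 3: from that physical position, the .log file would be scanned sequentially until offset 368801 is reached
                System.out.println("scan " + segments.get(segmentBase) + " from position " + startPosition);
        }
}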

How does each consumer record its own consumption position?

In early versions, consumers kept the offsets they had consumed in ZooKeeper, with each consumer reporting at intervals; this easily led to repeated consumption and performed poorly. In newer versions, the consumed offsets are maintained directly in the Kafka cluster's __consumer_offsets topic.
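
With the new consumer, committing offsets to __consumer_offsets can be left to the client (enable.auto.commit=true, the default) or done manually after processing. A hedged fragment of the manual variant, reusing the properties from the consumer sketch in section 8.1:

//disable auto-commit so that offsets are stored only after the records have really been processed
props.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("mytopic"));
while (true) {
        ConsumerRecords<String, String> records = consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
                System.out.println("offset=" + record.offset() + " value=" + record.value());
        }
        //synchronously write the consumed offsets to __consumer_offsets
        consumer.commitSync();
}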
