Kafka入門寶典(詳細截圖版)
1、瞭解 Apache Kafka
1.1、簡介
官網:http://kafka.apache.org/
- Apache Kafka 是一個開源訊息系統,由Scala 寫成。是由Apache 軟體基金會開發的一個開源訊息系統專案。
- Kafka 最初是由LinkedIn 開發,並於2011 年初開源。2012 年10 月從Apache Incubator 畢業。該專案的目標是為處理實時資料提供一個統一、高通量、低等待(低延時)的平臺。
- Kafka 是一個分散式訊息系統:具有生產者、消費者的功能。它提供了類似於JMS 的特性,但是在設計實現上完全不同,此外它並不是JMS 規範的實現。【重點】
1.2、kafka的基本結構
Producer:訊息的傳送者
Consumer:訊息的接收者
kafka cluster:kafka的叢集。
Topic:就是訊息類別名,一個topic中通常放置一類訊息。每個topic都有一個或者多個訂閱者(消費者)。
訊息的生產者將訊息推送到kafka叢集,訊息的消費者從kafka叢集中拉取訊息。
1.3、kafka的完整架構
說明:
- broker:叢集中的每一個kafka例項,稱之為broker;
- ZooKeeper:Kafka 利用ZooKeeper 儲存相應元資料資訊, Kafka 元資料資訊包括如代理節點資訊、Kafka叢集資訊、舊版消費者資訊及其消費偏移量資訊、主題資訊、分割槽狀態資訊、分割槽副本分配方案資訊、動態配置資訊等。
- ConsumerGroup:在Kafka 中每一個消費者都屬於一個特定消費組( ConsumerGroup ),我們可以為每個消費者指定一個消費組,以groupld 代表消費組名稱,通過group.id 配置設定。如果不指定消費組,則該消費者屬於預設消費組test-consumer-group 。
1.4、kafka的特性
- 訊息持久化
- Kafka 基於檔案系統來儲存和快取訊息。
- 高吞吐量
- Kafka 將資料寫到磁碟,充分利用磁碟的順序讀寫。同時, Kafka 在資料寫入及資料同步採用了零拷貝( zero-copy )技術,採用sendFile()函式呼叫,sendFile()函式是在兩個檔案描述符之間直接傳遞資料,完全在核心中操作,從而避免了核心緩衝區與使用者緩衝區之間資料的拷貝,操作效率極高。
- Kafka 還支援資料壓縮及批量傳送,同時Kafka 將每個主題劃分為多個分割槽,這一系列的優化及實現方法使得Kafka 具有很高的吞吐量。經大多數公司對Kafka 應用的驗證, Kafka 支援每秒數百萬級別的訊息。
- 高擴充套件性
- Kafka 依賴ZooKeeper來對叢集進行協調管理,這樣使得Kafka 更加容易進行水平擴充套件,生產者、消費者和代理都為分散式,可配置多個。
- 同時在機器擴充套件時無需將整個叢集停機,叢集能夠自動感知,重新進行負責均衡及資料複製。
- 多客戶端支援
- Kafka 核心模組用Scala 語言開發,Kafka 提供了多種開發語言的接入,如Java 、Scala、C 、C++、Python 、Go 、Erlang 、Ruby 、Node. 等。
- 安全機制
- Kafka 支援以下幾種安全措施:
- 通過SSL 和SASL(Kerberos), SASL/PLA時驗證機制支援生產者、消費者與broker連線時的身份認證;
- 支援代理與ZooKeeper 連線身份驗證;
- 通訊時資料加密;
- 客戶端讀、寫許可權認證;
- Kafka 支援與外部其他認證授權服務的整合;
- Kafka 支援以下幾種安全措施:
- 資料備份
- Kafka 可以為每個topic指定副本數,對資料進行持久化備份,這可以一定程度上防止資料丟失,提高可用性。
- 輕量級
- Kafka 的例項是無狀態的,即broker不記錄訊息是否被消費,消費偏移量的管理交由消費者自己或組協調器來維護。
- 同時叢集本身幾乎不需要生產者和消費者的狀態資訊,這就使得Kafka非常輕量級,同時生產者和消費者客戶端實現也非常輕量級。
- 訊息壓縮
- Kafka 支援Gzip, Snappy 、LZ4 這3 種壓縮方式,通常把多條訊息放在一起組成MessageSet,然後再把Message Set 放到一條訊息裡面去,從而提高壓縮比率進而提高吞吐量。
1.5、kafka的應用場景
- 訊息系統。
- Kafka 作為一款優秀的訊息系統,具有高吞吐量、內建的分割槽、備份冗餘分散式等特點,為大規模訊息處理提供了一種很好的解決方案。
- 應用監控。
- 利用Kafka 採集應用程式和伺服器健康相關的指標,如CPU 佔用率、IO 、記憶體、連線數、TPS 、QPS 等,然後將指標資訊進行處理,從而構建一個具有監控儀表盤、曲線圖等視覺化監控系統。例如,很多公司採用Kafka 與ELK (Elastic Search 、Logstash 和Kibana)整合構建應用服務監控系統。
- 網站使用者行為追蹤。
- 為了更好地瞭解使用者行為、操作習慣,改善使用者體驗,進而對產品升級改進,將使用者操作軌跡、內容等資訊傳送到Kafka 叢集上,通過Hadoop 、Spark 或Strom等進行資料分析處理,生成相應的統計報告,為推薦系統推薦物件建模提供資料來源,進而為每個使用者進行個性化推薦。
- 流處理。
- 需要將己收集的流資料提供給其他流式計算框架進行處理,用Kafka 收集流資料是一個不錯的選擇。
- 永續性日誌。
- Kafka 可以為外部系統提供一種永續性日誌的分散式系統。日誌可以在多個節點間進行備份, Kafka 為故障節點資料恢復提供了一種重新同步的機制。同時, Kafka很方便與HDFS 和Flume 進行整合,這樣就方便將Kafka 採集的資料持久化到其他外部系統。
2、Kafka的安裝與配置
準備三臺虛擬機器,分別是node01,node02,node03,並且修改hosts檔案如下:
vim /etc/hosts
#注意: 前面的ip地址改成自己的ip地址
192.168.40.133 node01
192.168.40.134 node02
192.168.40.135 node03
#3臺伺服器的時間要一致
#時間更新:
yum install -y rdate
rdate -s time-b.nist.gov
2.1、基礎環境配置
2.1.1、JDK環境
由於Kafka 是用Scala 語言開發的,執行在JVM上,因此在安裝Kafka 之前需要先安裝JDK 。
安裝過程略過,我這裡使用的是jdk1.8。
2.1.2、ZooKeeper環境
2.1.2.1、安裝ZooKeeper
Kafka 依賴ZooKeeper ,通過ZooKeeper 來對服務節點、消費者上下線管理、叢集、分割槽元資料管理等,因此ZooKeeper 也是Kafka 得以執行的基礎環境之一。
#上傳zookeeper-3.4.9.tar.gz到/export/software
cd /export/software
mkdir -p /export/servers/
tar -xvf zookeeper-3.4.9.tar.gz -C /export/servers/
#建立ZooKeeper的data目錄
mkdir /export/data/zookeeper -p
cd /export/servers/zookeeper-3.4.9/conf/
#修改配置檔案
mv zoo_sample.cfg zoo.cfg
vim zoo.cfg
#設定data目錄
dataDir=/export/data/zookeeper
#啟動ZooKeeper
./zkServer.sh start
#檢查是否啟動成功
jps
2.1.2.3、搭建ZooKeeper叢集
#在/export/data/zookeeper目錄中建立myid檔案
vim /export/data/zookeeper/myid
#寫入對應的節點的id,如:1,2等,儲存退出
#在conf下,修改zoo.cfg檔案
vim zoo.cfg
#新增如下內容
server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888
2.1.2.3、配置環境變數
vim /etc/profile
export ZK_HOME=/export/servers/zookeeper-3.4.9
export PATH=${ZK_HOME}/bin:$PATH
#立即生效
source /etc/profile
2.1.2.4、分發到其它機器
scp /etc/profile node02:/etc/
scp /etc/profile node03:/etc/
cd /export/servers
scp -r zookeeper-3.4.9 node02:/export/servers/
scp -r zookeeper-3.4.9 node03:/export/servers/
2.1.2.5、一鍵啟動、停止指令碼
mkdir -p /export/servers/onekey/zk
vim slave
#輸入如下內容
node01
node02
node03
#儲存退出
vim startzk.sh
#輸入如下內容
cat /export/servers/onekey/zk/slave | while read line
do
{
echo "開始啟動 --> "$line
ssh $line "source /etc/profile;nohup sh ${ZK_HOME}/bin/zkServer.sh start >/dev/null 2>&1 &"
}&
wait
done
echo "★★★啟動完成★★★"
#儲存退出
vim stopzk.sh
#輸入如下內容
cat /export/servers/onekey/zk/slave | while read line
do
{
echo "開始停止 --> "$line
ssh $line "source /etc/profile;nohup sh ${ZK_HOME}/bin/zkServer.sh stop >/dev/null 2>&1 &"
}&
wait
done
echo "★★★停止完成★★★"
#儲存退出
#設定可執行許可權
chmod +x startzk.sh stopzk.sh
#新增到環境變數中
export ZK_ONEKEY=/export/servers/onekey
export PATH=${ZK_ONEKEY}/zk:$PATH
2.1.2.6、檢查啟動是否成功
發現三臺機器都有“QuorumPeerMain”程序,說明機器已經啟動成功了。
檢查叢集是否正常:
zkServer.sh status
發現,叢集執行一切正常。
2.2、安裝Kafka
2.2.1、單機版Kafka安裝
第一步:上傳Kafka安裝包並且解壓
rz 上傳kafka_2.11-1.1.0.tgz到 /export/software/
cd /export/software/
tar -xvf kafka_2.11-1.1.0.tgz -C /export/servers/
cd /export/servers
mv kafka_2.11-1.1.0/ kafka
第二步:配置環境變數
vim /etc/profile
#輸入如下內容
export KAFKA_HOME=/export/servers/kafka
export PATH=${KAFKA_HOME}/bin:$PATH
#儲存退出
source /etc/profile
第三步:修改配置檔案
cd /export/servers/kafka
cd config
vim server.properties
# The id of the broker. This must be set to a unique integer for each broker.
# 必須要只要一個brokerid,並且它必須是唯一的。
broker.id=0
# A comma separated list of directories under which to store log files
# 日誌資料檔案儲存的路徑 (如不存在,需要手動建立該目錄, mkdir -p /export/data/kafka/)
log.dirs=/export/data/kafka
# ZooKeeper的配置,本地模式下指向到本地的ZooKeeper服務即可
zookeeper.connect=node01:2181
# 儲存退出
第四步:啟動kafka服務
# 以守護程序的方式啟動kafka
kafka-server-start.sh -daemon /export/servers/kafka/config/server.properties
第五步:檢測kafka是否啟動
如果程序中有名為kafka的程序,就說明kafka已經啟動了。
2.2.2、驗證kafka是否安裝成功
由於kafka是將元資料儲存在ZooKeeper中的,所以,可以通過檢視ZooKeeper中的資訊進行驗證kafka是否安裝成功。
2.2.3、部署kafka-manager
Kafka Manager 由 yahoo 公司開發,該工具可以方便檢視叢集 主題分佈情況,同時支援對 多個叢集的管理、分割槽平衡以及建立主題等操作。
原始碼託管於github:https://github.com/yahoo/kafka-manager
第一步:上傳Kafka-manager安裝包並且解壓
rz上傳kafka-manager-1.3.3.17.tar.gz到 /export/software/
cd /export/software
tar -xvf kafka-manager-1.3.3.17.tar.gz -C /export/servers/
cd /export/servers/kafka-manager-1.3.3.17/conf
第二步:修改配置檔案
#修改配置檔案
vim application.conf
#新增項,http訪問服務的埠
http.port=19000
#修改成自己的zk機器地址和埠
kafka-manager.zkhosts="node01:2181"
#儲存退出
第三步:啟動服務
cd /export/servers/kafka-manager-1.3.3.17/bin
#啟動服務
./kafka-manager -Dconfig.file=../conf/application.conf
#製作啟動指令碼
vim /etc/profile
export KAFKA_MANAGE_HOME=/export/servers/kafka-manager-1.3.3.17
export PATH=${KAFKA_MANAGE_HOME}/bin:$PATH
source /etc/profile
cd /export/servers/onekey/
mkdir kafka-manager
cd kafka-manager
vim start-kafka-manager.sh
nohup kafka-manager -Dconfig.file=${KAFKA_MANAGE_HOME}/conf/application.conf >/dev/null 2>&1 &
chmod +x start-kafka-manager.sh
vim /etc/profile
export PATH=${ZK_ONEKEY}/kafka-manager:$PATH
source /etc/profile
第四步:檢查是否啟動成功
開啟瀏覽器,輸入地址:http://node01:19000/,即可看到kafka-manage管理介面。
2.2.4、kafka-manager的使用
進入管理介面,是沒有顯示Cluster資訊的,需要新增後才能操作。
- 新增 Cluster:
輸入Cluster Name、ZooKeeper資訊、以及Kafka的版本資訊(這裡最高只能選擇1.0.0)。
點選Save按鈕儲存。
新增成功。
- 檢視kafka的資訊
- 檢視Broker資訊
- 檢視Topic列表
- 檢視單個topic資訊以及操作
- 優化副本選舉
- 檢視消費者資訊
2.2.5、搭建kafka叢集
kafka叢集的搭建是非常簡單的,只需要將上面的單機版的kafka分發的其他機器,並且將ZooKeeper資訊修改成叢集的配置以及設定不同的broker值即可。
第一步:將kafka分發到node02、node03
cd /export/servers/
scp -r kafka node02:/export/servers/
scp -r kafka node03:/export/servers/
scp /etc/profile node02:/etc/
scp /etc/profile node03:/etc/
# 分別到node02、node03機器上執行
source /etc/profile
第二步:修改node01、node02、node03上的kafka配置檔案
node01:
cd /export/servers/kafka/config vim server.properties zookeeper.connect=node01:2181,node02:2181,node03:2181
node02:
cd /export/servers/kafka/config vim server.properties broker.id=1 zookeeper.connect=node01:2181,node02:2181,node03:2181
node03:
cd /export/servers/kafka/config vim server.properties broker.id=2 zookeeper.connect=node01:2181,node02:2181,node03:2181
第三步:編寫一鍵啟動、停止指令碼。注意:該指令碼依賴於環境變數中的KAFKA_HOME。
mkdir -p /export/servers/onekey/kafka
vim slave
#輸入如下內容
node01
node02
node03
#儲存退出
vim start-kafka.sh
#輸入如下內容
cat /export/servers/onekey/kafka/slave | while read line
do
{
echo "開始啟動 --> "$line
ssh $line "source /etc/profile;nohup sh ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties >/dev/null 2>&1 &"
}&
wait
done
echo "★★★啟動完成★★★"
#儲存退出
chmod +x start-kafka.sh
vim stop-kafka.sh
#輸入如下內容
cat /export/servers/onekey/kafka/slave | while read line
do
{
echo "開始停止 --> "$line
ssh $line "source /etc/profile;nohup sh ${KAFKA_HOME}/bin/kafka-server-stop.sh >/dev/null 2>&1 &"
}&
wait
done
echo "★★★停止完成★★★"
#儲存退出
chmod +x stop-kafka.sh
#加入到環境變數中
export PATH=${ZK_ONEKEY}/kafka:$PATH
source /etc/profile
第四步:通過kafka-manager管理工具檢視叢集資訊。
由此可見,kafka叢集已經啟動完成。
3、Kafka快速入門
對kafka的操作有2種方式,一種是通過命令列方式,一種是通過API方式。
3.1、通過命令列Kafka
Kafka在bin目錄下提供了shell指令碼檔案,可以對Kafka進行操作,分別是:
通過命令列的方式,我們將體驗下kafka,以便我們對kafka有進一步的認知。
3.1.1、topic的操作
3.1.1.1、建立topic
kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 1 --topic my-kafka-topic
#執行結果:
Created topic "my-kafka-topic".
引數說明:
- zookeeper:引數是必傳引數,用於配置 Kafka 叢集與 ZooKeeper 連線地址。至少寫一個。
- partitions:引數用於設定主題分割槽數,該配置為必傳引數。
- replication-factor:引數用來設定主題副本數 ,該配置也是必傳引數。
- topic:指定topic的名稱。
3.1.1.2、檢視topic列表
kafka-topics.sh --list --zookeeper node01:2181
__consumer_offsets
my-kafka-topic
可以檢視列表。
如果需要檢視topic的詳細資訊,需要使用describe命令。
kafka-topics.sh --describe --zookeeper node01:2181 --topic test-topic
#若不指定topic,則檢視所有topic的資訊
kafka-topics.sh --describe --zookeeper node01:2181
3.1.1.3、刪除topic
通過kafka-topics.sh執行刪除動作,需要在server.properties檔案中配置 delete.topic.enable=true,該配置預設為 false。
否則執行該指令碼並未真正刪除主題 ,將該topic標記為刪除狀態 。
kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic
# 執行如下
[root@node01 config]# kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic
Topic my-kafka-topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
# 如果將delete.topic.enable=true
[root@node01 config]# kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic2
Topic my-kafka-topic2 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
# 說明:雖然設定後,刪除時依然提示沒有設定為true,實際上已經刪除了。
3.1.2、生產者的操作
kafka-console-producer.sh --broker-list node01:9092 --topic my-kafka-topic
可以看到,已經向topic傳送了訊息。
3.1.3、消費者的操作
kafka-console-consumer.sh --bootstrap-server node01:9092 --topic my-kafka-topic
# 通過以上命令,可以看到消費者可以接收生產者傳送的訊息
# 如果需要從頭開始接收資料,需要新增--from-beginning引數
kafka-console-consumer.sh --bootstrap-server node01:9092 --from-beginning --topic my-kafka-topic
3.2、通過Java Api操作Kafka
除了通過命令列的方式操作kafka外,還可以通過Java api的方式操作,這種方式將更加的常用。
3.2.1、建立工程
匯入依賴:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>itcast-bigdata</artifactId>
<groupId>cn.itcast.bigdata</groupId>
<version>1.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>itcast-bigdata-kafka</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.1.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- java編譯外掛 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.2</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
</plugins>
</build>
</project>
3.2.2、topic的操作
由於主題的元資料資訊是註冊在 ZooKeeper 相 應節點之中,所以對主題的操作實質是對 ZooKeeper 中記錄主題元資料資訊相關路徑的操作。 Kafka將對 ZooKeeper 的相關操作封裝成一 個 ZkUtils 類 , 井封裝了一個AdrninUtils 類呼叫 ZkClient 類的相關方法以實現對 Kafka 元資料 的操作,包括對主題、代理、消費者等相關元資料的操作。對主題操作的相關 API呼叫較簡單, 相應操作都是通過呼叫 AdminUtils類的相應方法來完成的。
package cn.itcast.kafka;
import kafka.admin.AdminUtils;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.security.JaasUtils;
import org.junit.Test;
import java.util.Properties;
public class TestKafkaTopic {
@Test
public void testCreateTopic() {
ZkUtils zkUtils = null;
try {
//引數:zookeeper的地址,session超時時間,連線超時時間,是否啟用zookeeper安全機制
zkUtils = ZkUtils.apply("node01:2181", 30000, 3000, JaasUtils.isZkSecurityEnabled());
String topicName = "my-kafka-topic-test1";
if (!AdminUtils.topicExists(zkUtils, topicName)) {
//引數:zkUtils,topic名稱,partition數量,副本數量,引數,機架感知模式
AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), AdminUtils.createTopic$default$6());
System.out.println(topicName + " 建立成功!");
} else {
System.out.println(topicName + " 已存在!");
}
} finally {
if (null != zkUtils) {
zkUtils.close();
}
}
}
}
測試結果:
3.2.2.1、刪除topic
@Test
public void testDeleteTopic() {
ZkUtils zkUtils = null;
try {
//引數:zookeeper的地址,session超時時間,連線超時時間,是否啟用zookeeper安全機制
zkUtils = ZkUtils.apply("node01:2181", 30000, 3000, JaasUtils.isZkSecurityEnabled());
String topicName = "my-kafka-topic-test1";
if (AdminUtils.topicExists(zkUtils, topicName)) {
//引數:zkUtils,topic名稱
AdminUtils.deleteTopic(zkUtils, topicName);
System.out.println(topicName + " 刪除成功!");
} else {
System.out.println(topicName + " 不已存在!");
}
} finally {
if (null != zkUtils) {
zkUtils.close();
}
}
}
測試結果:
3.2.3、生產者的操作
package cn.itcast.kafka;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import java.util.Properties;
public class TestProducer {
@Test
public void testProducer() throws InterruptedException {
Properties config = new Properties();
// 設定kafka服務列表,多個用逗號分隔
config.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092");
// 設定序列化訊息 Key 的類
config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 設定序列化訊息 value 的類
config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 初始化
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(config);
for (int i = 0; i < 100 ; i++) {
ProducerRecord record = new ProducerRecord("my-kafka-topic","data-" + i);
// 傳送訊息
kafkaProducer.send(record);
System.out.println("傳送訊息 --> " + i);
Thread.sleep(100);
}
kafkaProducer.close();
}
}
3.2.4、消費者的操作
package cn.itcast.kafka;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import javax.sound.midi.Soundbank;
import java.util.Arrays;
import java.util.Properties;
public class TestConsumer {
@Test
public void testConsumer() {
Properties config = new Properties();
// 設定kafka服務列表,多個用逗號分隔
config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092");
// 設定消費者分組id
config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
// 設定序反列化訊息 Key 的類
config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 設定序反列化訊息 value 的類
config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(config);
// 訂閱topic
kafkaConsumer.subscribe(Arrays.asList("my-kafka-topic"));
while (true) { // 使用死迴圈不斷的拉取資料
ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
String value = record.value();
long offset = record.offset();
System.out.println("value = " + value + ", offset = " + offset);
}
}
}
}
什麼是Kafka?
Kafka監控工具彙總
Kafka快速入門
Kafka核心之Consumer
Kafka核心之Producer
替代Flume——Kafka Connect簡介
最簡單流處理引擎——Kafka Streams簡介
更多實時計算,Flink,Kafka等相關技術博文,歡迎關注實時流式計算
相關推薦
Kafka入門寶典(詳細截圖版)
1、瞭解 Apache Kafka 1.1、簡介 官網:http://kafka.apache.org/ Apache Kafka 是一個開源訊息系統,由Scala 寫成。是由Apache 軟體基金會開發的一個開源訊息系統專案。 Kafka 最初是由LinkedIn 開發,並於2011 年初開源。2
Flink入門寶典(詳細截圖版)
本文基於java構建Flink1.9版本入門程式,需要Maven 3.0.4 和 Java 8 以上版本。需要安裝Netcat進行簡單除錯。 這裡簡述安裝過程,並使用IDEA進行開發一個簡單流處理程式,本地除錯或者提交到Flink上執行,Maven與JDK安裝這裡不做說明。 一、Flink簡介 Fli
IT(計算機/軟件/互聯網)專業詞匯寶典(持續更新中)
hub point charger 中國 mar asi lose 社區 less 1.Stack Overflow:http://stackoverflow.com/ .一個著名的IT技術的問答站點。全然免費。程序猿必知。2.programmer:程序猿3.e
程式設計師面試寶典(第5版)
網站 更多書籍點選進入>> CiCi島 下載 電子版僅供預覽及學習交流使用,下載後請24小時內刪除,支援正版,喜歡的請購買正版書籍 電子書下載(皮皮雲盤-點選“普通下載”) 購買正版 封頁 編輯推薦 揭開知名IT企業面試、筆試
Java程式設計師面試寶典(第4版)
網站 更多書籍點選進入>> CiCi島 下載 電子版僅供預覽及學習交流使用,下載後請24小時內刪除,支援正版,喜歡的請購買正版書籍 電子書下載(皮皮雲盤-點選“普通下載”) 購買正版 封頁 編輯推薦 揭開知名IT企業面試、筆試
優動漫PAINT實用寶典(圖層篇)——柵格圖層
您在使用優動漫PAINT描畫影象時,是否都是從“新建圖層”開始的呢?瞭解圖層的型別及其不同圖層之間的差異,更易於使用優動漫PAINT作畫。優動漫PAINT入門寶典(圖層篇)將向各位小夥伴們展示不同圖層的特點,以及其使用方法。本次為大家介紹的是柵格圖層,下一篇將向大家介紹向量圖
opengl超級寶典(第五版)閱讀筆記 8 模型檢視投影矩陣
從這裡開始就進入了opengl最關鍵的部分了。 1.構造平移矩陣 m3dTranslationMatrix44(mTranslate, 0.0f, 0.0f, -2.5f); 第一個引數是4x4的矩陣,後面是位移向量 2.構造旋轉矩陣 m3dRotationMatrix44(m
opengl超級寶典(第五版)閱讀筆記 7 抗鋸齒
在圖形光柵化的時候,難免會出現很多鋸齒現象,如下圖所示: 可以看到很明顯的一段一段的鋸齒。 opengl中通過把該點畫素與周圍畫素相混合來優化鋸齒現象,方法如下: glBlendFunc(GL_SRC_ALPHA, GL_ONE_MINUS_SRC_ALPHA);//設定混合模式
opengl超級寶典(第五版)閱讀筆記 6 幾何圖形繪製細節
1.剔除背面 如果不剔除物體的背面的話,3D圖形的正面和背面會同時顯示,如下圖所示: 通過glEnable(GL_CULL_FACE);來剔除背面 2.深度測試 通過開啟深度測試,能保證在物體後面的圖形不會被渲染出來,通過glEnable(GL_DEPTH_TEST);開啟 3.三種填充模
opengl超級寶典(第五版)閱讀筆記 5 混合
這裡只提一種最基本的混合,其他的混合方式可以參考書中的表格 混合程式碼如下: glEnable(GL_BLEND); glBlendFunc(GL_SRC_ALPHA, GL_ONE_MINUS_SRC_ALPHA); shaderManager.UseStockShader(GL
opengl超級寶典(第五版)閱讀筆記 4 裁剪
通過glScissor(100, 100, 600, 400)函式可以設定裁剪區域,引數分別為左下角和右上角的座標 當然,別忘記要開啟裁剪測試glEnable(GL_SCISSOR_TEST); #include <GLTools.h> // OpenGL toolkit
opengl超級寶典(第五版)閱讀筆記 3 基本圖元的使用
基本圖元有 GL_POINTS GL_LINES GL_LINE_STRIP GL_LINE_LOOP GL_TRIANGLES GL_TRIANGLE_LOOP GL_TRIANGLE_FAN. 後面將一 一對其進行簡單的介紹 1.GL_POINTS I. 建立點集 GLfloat v
opengl超級寶典(第五版)閱讀筆記 1 基本程式框架
配置環境部分其實還是有點煩,網上資料有很多,耐心點問題也不大。 下面也算是opengl的hello world了,寫了比較詳細的註釋。 值得注意的是#pragma comment(lib,“gltools.lib”)這一行,書中是沒有的,可能因為環境配置方法的不同,我必須要手動連結一下glt
VS2012 中完整配置OpenGL超級寶典(第五版)編譯環境
在接觸OpenGL中,配置顯得相當麻煩,特別是在VS2012下配置時,存在許多問題,而網上的很多方法僅僅適用於VS2008,甚至僅適用於VC6.0,筆者經過自身的實踐,參考了許多網上的資料,總結了一下配置的方法,當然這僅僅是筆者的個人理解,筆者個人水平有限,因此未必是萬能
程式設計師面試寶典(第三版)——單鏈表的基本操作:建立,求長度,輸出,排序,插入,刪除,逆置
程式設計實現一個單鏈表的建立,求單鏈表的長度,列印輸出單鏈表,對單鏈表進行排序,插入元素,刪除元素,對單鏈表進行逆置。 我是借鑑參考資料,然後自己寫規範,對函式都進行了呼叫,每一次呼叫,都有輸出單鏈表。程式完整,已除錯執行。 源程式: #include<iostrea
《Java程式設計師面試寶典(第4版)》試讀感想
作為一名java程式設計師,已經有幾年經驗了,但是試讀章節的題目在看答案之前也自己做了一下,基本沒有做對一道題目,雖然有經驗,但是基礎的東西在平時工作用的少,或者一些實現方式或寫法根本沒有這樣寫過,所以這些題目答錯在所難免了。 面試寶典,顧名思義它的核心在於面試,往往
OpenGL超級寶典(第五版) 環境配置
特別提醒:有些在word中或者其他中的程式碼複製到vs中會報錯,原因是word中有些隱含的字元,複製到vs中就會報錯;重新輸一遍就可以解決問題,這裡只是提醒下! 可以參閱我前面轉載的一篇文章,進行比較然後來配置,本人蔘照這兩篇,成執行,算是學習opengl的開始吧;
零基礎新手的Python入門實戰寶典(二) —— 從哪裡開始?(搭建Python開發環境,Python + Pycharm)
如果你之前看過其他教程,但是發現雲裡霧裡複雜的讓你頭暈眼花的話,沒錯,看這裡,本系列Python教程專為啥都不會的新手使用者打造,放寬心,大膽看,我就是說說書,你就當聽聽故事,輕鬆愉快走進程式設計的大門,“程式設計”不再神祕也不再遙不可及。只要你會最基本的電腦操
零基礎新手的Python入門實戰寶典(一) —— Python都能幹些啥?(Python的用途)
如果你之前看過其他教程,但是發現雲裡霧裡複雜的讓你頭暈眼花的話,沒錯,看這裡,本系列Python教程專為啥都不會的新手使用者打造,放寬心,大膽看,我就是說說書,你就當聽聽故事,輕鬆愉快走進程式設計的大門,“程式設計”不再神祕也不再遙不可及。只要你會最基本的電腦操
React Native 入門寶典
num 種類 類的繼承 nds 哪些 這份 校驗 sre working 聲明:該書的筆者為徐嬴老師,一名具有5年IOS開發經驗,和兩年RN開發經驗的老司機。 原文可以在gitbook上找到 筆者只是為他的書中提的的一些列問題,進行有償答疑。 有償答疑。本書將持續保持更