Kafka簡介及使用
阿新 • • 發佈:2018-12-16
一、Kafka概述
離線部分: Hadoop->離線計算(hdfs / mapreduce) yarn zookeeper->分散式協調(動物管理員) hive->資料倉庫(離線計算 / sql)easy coding flume->資料採集 sqoop->資料遷移mysql->hdfs/hive hdfs/hive->mysql Azkaban->任務排程工具 hbase->資料庫(nosql)列式儲存 讀寫速度 實時: kafka storm 官網: http://kafka.apache.org/ ApacheKafka®是一個分散式流媒體平臺 流媒體平臺有三個關鍵功能: 釋出和訂閱記錄流,類似於訊息佇列或企業訊息傳遞系統。 以容錯的持久方式儲存記錄流。 記錄發生時處理流。 Kafka通常用於兩大類應用: 構建可在系統或應用程式之間可靠獲取資料的實時流資料管道 構建轉換或響應資料流的實時流應用程式
二、kafka是什麼?
在流計算中,kafka主要功能是用來快取資料,storm可以通過消費kafka中的資料進行流計算。
是一套開源的訊息系統,由scala寫成。支援javaAPI的。
kafka最初由LinkedIn公司開發,2011年開源。
2012年從Apache畢業。
是一個分散式訊息佇列,kafka讀訊息儲存採用Topic進行歸類。
角色
傳送訊息:Producer(生產者)
接收訊息:Consumer(消費者)
三、為什麼要用訊息佇列
1)解耦 為了避免出現問題 2)拓展性 可增加處理過程 3)靈活 面對訪問量劇增,不會因為超負荷請求而完全癱瘓。 4)可恢復 一部分元件失效,不會影響整個系統。可以進行恢復。 5)緩衝 控制資料流經過系統的速度。 6)順序保證 對訊息進行有序處理。 7)非同步通訊 akka,訊息佇列提供了非同步處理的機制。允許使用者把訊息放到佇列 , 不立刻處理。
四、kafka架構設計
kafka依賴zookeeper,用zk儲存元資料資訊。
搭建kafka叢集要先搭建zookeeper叢集。
zk在kafka中的作用?
儲存kafka叢集節點狀態資訊和消費者當前消費資訊。
Kafka介紹
Kafka架構
五、kafka叢集安裝部署
1)官網下載安裝包 2)上傳安裝包 把安裝包 kafka_2.11-2.0.0.tgz 放置在/root下 3)解壓安裝包 cd /root tar -zxvf kafka_2.11-2.0.0.tgz -C hd 4)重新命名 cd hd mv kafka_2.11-2.0.0/ kafka 5)修改配置檔案 cd /root/hd/kafka mkdir logs cd config vi server.properties broker.id=0 #每臺機器的id不同即可 delete.topic.enable=true #是否允許刪除主題 log.dirs=/root/hd/kafka/logs #執行日誌儲存位置 zookeeper.connect=hd09-1:2181,hd09-2:2181,hd09-3:2181 6)配置環境變數 vi /etc/profile 最下面新增 #kafka_home export KAFKA_HOME=/root/hd/kafka export PATH=$PATH:$KAFKA_HOME/bin 生效環境變數 source /etc/profile 7)分發到其他節點 cd /root/hd scp -r kafka/ hd09-2:$PWD scp -r kafka/ hd09-3:$PWD 8)修改其他節點/root/hd/kafka/config/server.properties broker.id=1 #hd09-2 broker.id=2 #hd09-3 9)啟動叢集 cd /root/hd/kafka bin/kafka-server-start.sh config/server.properties & 10)關閉 cd /root/hd/kafka bin/kafka-server-stop.sh
六、Kafka命令列操作
1)檢視當前叢集中已存在的主題topic
bin/kafka-topics.sh --zookeeper hd09-1:2181 --list
2)建立topic
bin/kafka-topics.sh --zookeeper hd09-1:2181 --create --replication-factor 3 --partitions 1 --topic study
--zookeeper 連線zk叢集
--create 建立
--replication-factor 副本
--partitions 分割槽
--topic 主題名
3)刪除主題
bin/kafka-topics.sh --zookeeper hd09-1:2181 --delete --topic study
4)傳送訊息
生產者啟動:
bin/kafka-console-producer.sh --broker-list hd09-1:9092 --topic study
消費者啟動:
bin/kafka-console-consumer.sh --bootstrap-server hd09-1:9092 --topic study --from-beginning
5)檢視主題詳細資訊
bin/kafka-topics.sh --zookeeper hd09-1:2181 --describe --topic study
七、Kafka簡單API
1、Producer1類---kafka生產者API 介面回撥
package com.css.kafka.kafka_producer;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* kafka生產者API
*/
public class Producer1 {
public static void main(String[] args) {
//1.配置生產者屬性(指定多個引數)
Properties prop = new Properties();
//引數配置
//kafka節點的地址
prop.put("bootstrap.servers", "192.168.146.132:9092");
//傳送訊息是否等待應答
prop.put("acks", "all");
//配置傳送訊息失敗重試
prop.put("retries", "0");
//配置批量處理訊息大小
prop.put("batch.size", "10241");
//配置批量處理資料延遲
prop.put("linger.ms", "5");
//配置記憶體緩衝大小
prop.put("buffer.memory", "12341235");
//配置在傳送前必須序列化
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//2.例項化producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
//3.傳送訊息
for (int i = 0; i < 99; i++) {
producer.send(new ProducerRecord<String, String>("test", "helloworld" + i));
}
//4.釋放資源
producer.close();
}
}
2、Producer2類---kafka生產者API 介面回撥
package com.css.kafka.kafka_producer;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
* kafka生產者API 介面回撥
*/
public class Producer2 {
public static void main(String[] args) {
//1.配置生產者屬性(指定多個引數)
Properties prop = new Properties();
//引數配置
//kafka節點的地址
prop.put("bootstrap.servers", "192.168.146.132:9092");
//傳送訊息是否等待應答
prop.put("acks", "all");
//配置傳送訊息失敗重試
prop.put("retries", "0");
//配置批量處理訊息大小
prop.put("batch.size", "10241");
//配置批量處理資料延遲
prop.put("linger.ms", "5");
//配置記憶體緩衝大小
prop.put("buffer.memory", "12341235");
//配置在傳送前必須序列化
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//自定義分割槽
prop.put("partitioner.class", "com.css.kafka.kafka_producer.Partition1");
//2.例項化producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
//3.傳送訊息
for (int i = 0; i < 99; i++) {
producer.send(new ProducerRecord<String, String>("yuandan", "nice" + i), new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
//如果metadata不為null 拿到當前的資料偏移量與分割槽
if(metadata != null) {
System.out.println(metadata.topic() + "----" + metadata.offset() + "----" + metadata.partition());
}
}
});
}
//4.關閉資源
producer.close();
}
}
3、Partition1類---設定自定義分割槽
package com.css.kafka.kafka_producer;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
/**
* 設定自定義分割槽
*/
public class Partition1 implements Partitioner{
//設定
public void configure(Map<String, ?> configs) {
}
//分割槽邏輯
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return 1;
}
//釋放資源
public void close() {
}
}
4、Consumer1類---消費者API
package com.css.kafka.kafka_consumer;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
/**
* 消費者類
*/
public class Consumer1 {
public static void main(String[] args) {
//1.配置消費者屬性
Properties prop = new Properties();
//2.配置屬性
//指定伺服器地址
prop.put("bootstrap.servers", "192.168.146.133:9092");
//配置消費者組
prop.put("group.id", "g1");
//配置是否自動確認offset
prop.put("enable.auto.commit", "true");
//序列化
prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
//2.例項消費者
final KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(prop);
//4.釋放資源 執行緒安全
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
if (consumer != null) {
consumer.close();
}
}
}));
//訂閱訊息主題
consumer.subscribe(Arrays.asList("test"));
//3.拉訊息 推push 拉poll
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
//遍歷訊息
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.topic() + "-----" + record.value());
}
}
}
}
5、Producer3類---kafka生產者API-帶攔截器
package com.css.kafka.interceptor;
import java.util.ArrayList;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
/**
* kafka生產者API 帶攔截器
*/
public class Producer3 {
public static void main(String[] args) {
//1.配置生產者屬性(指定多個引數)
Properties prop = new Properties();
//引數配置
//kafka節點的地址
prop.put("bootstrap.servers", "192.168.146.132:9092");
//傳送訊息是否等待應答
prop.put("acks", "all");
//配置傳送訊息失敗重試
prop.put("retries", "0");
//配置批量處理訊息大小
prop.put("batch.size", "10241");
//配置批量處理資料延遲
prop.put("linger.ms", "5");
//配置記憶體緩衝大小
prop.put("buffer.memory", "12341235");
//配置在傳送前必須序列化
prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//攔截器
ArrayList<String> inList = new ArrayList<String>();
inList.add("com.css.kafka.interceptor.TimeInterceptor");
prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, inList);
//2.例項化producer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
//3.傳送訊息
for (int i = 0; i < 99; i++) {
producer.send(new ProducerRecord<String, String>("test", "helloworld" + i));
}
//4.釋放資源
producer.close();
}
}
6、TimeInterceptor類---攔截器類
package com.css.kafka.interceptor;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/**
* 攔截器類
*/
public class TimeInterceptor implements ProducerInterceptor<String, String>{
//配置資訊
public void configure(Map<String, ?> configs) {
}
//業務邏輯
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
return new ProducerRecord<String, String>(
record.topic(),
record.partition(),
record.key(),
System.currentTimeMillis() + "-" + record.value());
}
//傳送失敗呼叫
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
//關閉資源
public void close() {
}
}
7、kafka的maven依賴
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>2.0.0</version>
</dependency>