1. 程式人生 > >Kafka簡介及使用

Kafka簡介及使用

一、Kafka概述

    離線部分:
    Hadoop->離線計算(hdfs / mapreduce) yarn
    zookeeper->分散式協調(動物管理員)
    hive->資料倉庫(離線計算 / sql)easy coding
    flume->資料採集
    sqoop->資料遷移mysql->hdfs/hive hdfs/hive->mysql
    Azkaban->任務排程工具
    hbase->資料庫(nosql)列式儲存 讀寫速度
    實時:
    kafka
    storm
    官網:
    http://kafka.apache.org/
    ApacheKafka®是一個分散式流媒體平臺
    流媒體平臺有三個關鍵功能:
    釋出和訂閱記錄流,類似於訊息佇列或企業訊息傳遞系統。
    以容錯的持久方式儲存記錄流。
    記錄發生時處理流。
    Kafka通常用於兩大類應用:
    構建可在系統或應用程式之間可靠獲取資料的實時流資料管道
    構建轉換或響應資料流的實時流應用程式

二、kafka是什麼?

    在流計算中,kafka主要功能是用來快取資料,storm可以通過消費kafka中的資料進行流計算。
    是一套開源的訊息系統,由scala寫成。支援javaAPI的。
    kafka最初由LinkedIn公司開發,2011年開源。
    2012年從Apache畢業。
    是一個分散式訊息佇列,kafka讀訊息儲存採用Topic進行歸類。
    角色
    傳送訊息:Producer(生產者)
    接收訊息:Consumer(消費者)

三、為什麼要用訊息佇列

    1)解耦
    為了避免出現問題
    2)拓展性
    可增加處理過程
    3)靈活
    面對訪問量劇增,不會因為超負荷請求而完全癱瘓。
    4)可恢復
    一部分元件失效,不會影響整個系統。可以進行恢復。
    5)緩衝
    控制資料流經過系統的速度。
    6)順序保證
    對訊息進行有序處理。
    7)非同步通訊
    akka,訊息佇列提供了非同步處理的機制。允許使用者把訊息放到佇列 , 不立刻處理。

四、kafka架構設計

    kafka依賴zookeeper,用zk儲存元資料資訊。
    搭建kafka叢集要先搭建zookeeper叢集。
    zk在kafka中的作用?
    儲存kafka叢集節點狀態資訊和消費者當前消費資訊。

Kafka介紹

Kafka架構

五、kafka叢集安裝部署

    1)官網下載安裝包
    2)上傳安裝包
    把安裝包 kafka_2.11-2.0.0.tgz 放置在/root下
    
    3)解壓安裝包
    cd /root
    tar -zxvf kafka_2.11-2.0.0.tgz -C hd
    
    4)重新命名
    cd hd
    mv kafka_2.11-2.0.0/ kafka
    
    5)修改配置檔案
    cd /root/hd/kafka
    mkdir logs
    cd config
    vi server.properties
    broker.id=0 #每臺機器的id不同即可
    delete.topic.enable=true #是否允許刪除主題
    log.dirs=/root/hd/kafka/logs #執行日誌儲存位置
    zookeeper.connect=hd09-1:2181,hd09-2:2181,hd09-3:2181
    
    6)配置環境變數
    vi /etc/profile
    最下面新增
    #kafka_home
    export KAFKA_HOME=/root/hd/kafka
    export PATH=$PATH:$KAFKA_HOME/bin

    生效環境變數
    source /etc/profile
    
    7)分發到其他節點
    cd /root/hd
    scp -r kafka/ hd09-2:$PWD
    scp -r kafka/ hd09-3:$PWD
    
    8)修改其他節點/root/hd/kafka/config/server.properties
    broker.id=1 #hd09-2
    broker.id=2 #hd09-3
    
    9)啟動叢集
    cd /root/hd/kafka
    bin/kafka-server-start.sh config/server.properties &
    10)關閉
    cd /root/hd/kafka
    bin/kafka-server-stop.sh

六、Kafka命令列操作

    1)檢視當前叢集中已存在的主題topic
    bin/kafka-topics.sh --zookeeper hd09-1:2181 --list
    
    2)建立topic
    bin/kafka-topics.sh --zookeeper hd09-1:2181 --create --replication-factor 3 --partitions 1 --topic study
    
    --zookeeper 連線zk叢集
    --create 建立
    --replication-factor 副本
    --partitions 分割槽
    --topic 主題名
    
    3)刪除主題
    bin/kafka-topics.sh --zookeeper hd09-1:2181 --delete --topic study
    
    4)傳送訊息
    生產者啟動:
    bin/kafka-console-producer.sh --broker-list hd09-1:9092 --topic study
    消費者啟動:
    bin/kafka-console-consumer.sh --bootstrap-server hd09-1:9092 --topic study --from-beginning
    
    5)檢視主題詳細資訊
    bin/kafka-topics.sh --zookeeper hd09-1:2181 --describe --topic study

七、Kafka簡單API

1、Producer1類---kafka生產者API 介面回撥

package com.css.kafka.kafka_producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * kafka生產者API
*/ public class Producer1 { public static void main(String[] args) { //1.配置生產者屬性(指定多個引數) Properties prop = new Properties(); //引數配置 //kafka節點的地址 prop.put("bootstrap.servers", "192.168.146.132:9092"); //傳送訊息是否等待應答 prop.put("acks", "all"); //配置傳送訊息失敗重試 prop.put("retries", "0"); //配置批量處理訊息大小 prop.put("batch.size", "10241"); //配置批量處理資料延遲 prop.put("linger.ms", "5"); //配置記憶體緩衝大小 prop.put("buffer.memory", "12341235"); //配置在傳送前必須序列化 prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //2.例項化producer KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop); //3.傳送訊息 for (int i = 0; i < 99; i++) { producer.send(new ProducerRecord<String, String>("test", "helloworld" + i)); } //4.釋放資源 producer.close(); } }

2、Producer2類---kafka生產者API 介面回撥

package com.css.kafka.kafka_producer;

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * kafka生產者API  介面回撥
*/ public class Producer2 { public static void main(String[] args) { //1.配置生產者屬性(指定多個引數) Properties prop = new Properties(); //引數配置 //kafka節點的地址 prop.put("bootstrap.servers", "192.168.146.132:9092"); //傳送訊息是否等待應答 prop.put("acks", "all"); //配置傳送訊息失敗重試 prop.put("retries", "0"); //配置批量處理訊息大小 prop.put("batch.size", "10241"); //配置批量處理資料延遲 prop.put("linger.ms", "5"); //配置記憶體緩衝大小 prop.put("buffer.memory", "12341235"); //配置在傳送前必須序列化 prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //自定義分割槽 prop.put("partitioner.class", "com.css.kafka.kafka_producer.Partition1"); //2.例項化producer KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop); //3.傳送訊息 for (int i = 0; i < 99; i++) { producer.send(new ProducerRecord<String, String>("yuandan", "nice" + i), new Callback() { public void onCompletion(RecordMetadata metadata, Exception exception) { //如果metadata不為null 拿到當前的資料偏移量與分割槽 if(metadata != null) { System.out.println(metadata.topic() + "----" + metadata.offset() + "----" + metadata.partition()); } } }); } //4.關閉資源 producer.close(); } }

3、Partition1類---設定自定義分割槽

package com.css.kafka.kafka_producer;

import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

/**
 * 設定自定義分割槽
*/ public class Partition1 implements Partitioner{ //設定 public void configure(Map<String, ?> configs) { } //分割槽邏輯 public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { return 1; } //釋放資源 public void close() { } }

4、Consumer1類---消費者API

package com.css.kafka.kafka_consumer;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * 消費者類
*/ public class Consumer1 { public static void main(String[] args) { //1.配置消費者屬性 Properties prop = new Properties(); //2.配置屬性 //指定伺服器地址 prop.put("bootstrap.servers", "192.168.146.133:9092"); //配置消費者組 prop.put("group.id", "g1"); //配置是否自動確認offset prop.put("enable.auto.commit", "true"); //序列化 prop.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); prop.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //2.例項消費者 final KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(prop); //4.釋放資源 執行緒安全 Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() { public void run() { if (consumer != null) { consumer.close(); } } })); //訂閱訊息主題 consumer.subscribe(Arrays.asList("test")); //3.拉訊息 推push 拉poll while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); //遍歷訊息 for (ConsumerRecord<String, String> record : records) { System.out.println(record.topic() + "-----" + record.value()); } } } }

5、Producer3類---kafka生產者API-帶攔截器

package com.css.kafka.interceptor;

import java.util.ArrayList;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * kafka生產者API 帶攔截器
*/ public class Producer3 { public static void main(String[] args) { //1.配置生產者屬性(指定多個引數) Properties prop = new Properties(); //引數配置 //kafka節點的地址 prop.put("bootstrap.servers", "192.168.146.132:9092"); //傳送訊息是否等待應答 prop.put("acks", "all"); //配置傳送訊息失敗重試 prop.put("retries", "0"); //配置批量處理訊息大小 prop.put("batch.size", "10241"); //配置批量處理資料延遲 prop.put("linger.ms", "5"); //配置記憶體緩衝大小 prop.put("buffer.memory", "12341235"); //配置在傳送前必須序列化 prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //攔截器 ArrayList<String> inList = new ArrayList<String>(); inList.add("com.css.kafka.interceptor.TimeInterceptor"); prop.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, inList); //2.例項化producer KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop); //3.傳送訊息 for (int i = 0; i < 99; i++) { producer.send(new ProducerRecord<String, String>("test", "helloworld" + i)); } //4.釋放資源 producer.close(); } }

6、TimeInterceptor類---攔截器類

package com.css.kafka.interceptor;

import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * 攔截器類
*/ public class TimeInterceptor implements ProducerInterceptor<String, String>{ //配置資訊 public void configure(Map<String, ?> configs) { } //業務邏輯 public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { return new ProducerRecord<String, String>( record.topic(), record.partition(), record.key(), System.currentTimeMillis() + "-" + record.value()); } //傳送失敗呼叫 public void onAcknowledgement(RecordMetadata metadata, Exception exception) { } //關閉資源 public void close() { } }

 7、kafka的maven依賴

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.0.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.0.0</version>
    </dependency>