1. 程式人生 > >史上最簡單的kafka實戰教程

史上最簡單的kafka實戰教程

你在寫java 版的 kafka程式可能會遇到如下問題

問題一:程式丟擲了org.apache.kafka.common.errors.TimeoutException:

在application.yml 中加入下面這句話 :logging.level.root:debug

然後再看報錯,可以發現下面這個錯誤

錯誤:java.io.IOException: Can't resolve address: ubuntu:9092

原來是無法解析ubuntu.

解決辦法:到C:\Windows\System32\drivers\etc ,用nopaid++開啟hosts檔案 加入下面這句話  遠端主機的ip   ubuntu 

如   192.168.23.139     ubuntu

好了開始正題了:

一,你得有個虛擬機器,裡面跑著ubuntu

二,環境安裝,1.裝jdk, jdk1.8下載地址 2.環境變數配置

2、解壓原始碼包

通過終端在/usr/local目錄下新建java資料夾,命令列:

2、解壓原始碼包
通過終端在/usr/local目錄下新建java資料夾,命令列:

sudo mkdir /usr/local/java

然後將下載到壓縮包拷貝到java資料夾中,命令列:
進入jdk原始碼包所在目錄

cp jdk-8u25-linux-x64.tar.gz /usr/local/java

然後進入java目錄,命令列:

cd /usr/local/java

解壓壓縮包,命令列:

sudo tar xvf jdk-8u25-linux-x64.tar.gz

然後可以把壓縮包刪除,命令列:

sudo rm jdk-8u25-linux-x64.tar.gz

3、設定jdk環境變數

這裡採用全域性設定方法,它是是所有使用者的共用的環境變數

$sudo gedit ~/.bashrc

開啟之後在末尾新增

export JAVA_HOME=/usr/local/java/jdk1.8.0_25  
export JRE_HOME=${JAVA_HOME}/jre  
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib  

export PATH=${JAVA_HOME}/bin:$PATH

請記住,在上述新增過程中,等號兩側不要加入空格,不然會出現“不是有效的識別符號”,因為source /etc/profile 時不能識別多餘到空格,會理解為是路徑一部分。
然後儲存。

4、檢驗是否安裝成功
在終端輸入如下命令
java -version


下載kafka下載地址 ,解壓檔案

tar  zxvf  下載得檔案位置  -C 要解壓得位置 如/Download

1.開啟kafka內建得zookeeper

bin/zookeeper-server-start.sh config/zookeeper.properties

2.開啟kafka服務

bin/kafka-server-start.sh config/server.properties

3.建立一個topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic hello

4.檢視我們建立得topic

bin/kafka-topics.sh --list --zookeeper localhost:2181 

5.建立一個消費者

bin/kafka-console-consumer.sh  --bootstrap-server  localhost:9092 --topic hello

6建立一個生產者

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hello

5.檢視我們的ip

ifconfig

我的ip是

192.168.23.139

一,建立一個springboot專案

二,修改application.properties 為application.yml

開啟pom.xml 檔案新增下面依賴

我的springboot parent 如下

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.3.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>
這個是要新增的spring-kafka依賴
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.7.RELEASE</version>
</dependency>

日誌檔案依賴

<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>

開啟application.yml檔案

spring:
  kafka:
    producer:
      retries: 0
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      bootstrap-servers: 192.168.23.139:9092
    consumer:
      bootstrap-servers: 192.168.23.139:9092
      group-id: foo
      auto-offset-reset: earliest
enable-auto-commit: true
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#logging.level.root: debug

建立一個消費者

@Component
@Slf4j
public class KafkaConsumer {

    /**
     * 監聽test主題,有訊息就讀取
     *
     * @param message
*/
@KafkaListener(topics = {"hello"})
    public void consumer(String message) {
        log.info("c1 消費了hello topic messge:{}", message);
}

}

再建立一個生產者

@Component
@Slf4j
public class KafkaSender {
        @Autowired
private KafkaTemplate kafkaTemplate;
/**
         * 傳送訊息到kafka,主題為test
         */
public void sendTest(){
            kafkaTemplate.send("hello","hello,kafka  "  + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS")));
}

}

application 主程式

@SpringBootApplication
@EnableScheduling
public class KafkaApplication {
   @Autowired
KafkaSender kafkaSender;
@Autowired
KafkaConsumer kafkaConsumer;
    public static void main(String[] args) {
        SpringApplication.run(KafkaApplication.class, args);
}
    @Scheduled(cron = "0/2 * * * * ? ")
    public void sendMsg(){
      kafkaSender.sendTest();
}
}