
Kafka Basic Commands and API Usage

I. Kafka 0.11

Reference documentation
(1) https://kafka.apache.org/0110/documentation.html

II. Kafka 0.8
1. Command-line operations
(1) Create a topic

bin/kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 1 --partitions 1 --topic msg_format_v0

(2) Send messages

bin/kafka-console-producer.sh --broker-list hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic msg_format_v0

2. API usage
(1) Maven dependency (pom.xml)

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.8.2.1</version>
    </dependency>

(2) Using the producer API

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class TestProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers","hadoop1:9092,hadoop3:9092");
        props.put("acks","all");
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        Producer<String,String> producer = new KafkaProducer<>(props);
        // topic: the target topic; key: the message key (here, a sequence number for the message); value: the message content to write
        producer.send(new ProducerRecord<>("msg_format_v0","key","value"));

        // When no key needs to be specified, use the following form instead
        //Producer<Object,String> producer2 = new KafkaProducer<>(props);
        //producer2.send(new ProducerRecord<>("msg_format_v1","value"));

        producer.close();

    }
}
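
The four settings used in TestProducer could be factored into a small helper so that the broker list is not hard-coded in `main`. The following is only a minimal sketch: the class name `ProducerProps` and the method `build` are hypothetical and not part of the Kafka client library. It relies solely on `java.util.Properties`, so it compiles without the kafka-clients dependency.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not part of the Kafka client library.
class ProducerProps {

    // Builds the same four producer settings used in TestProducer
    // for the given comma-separated broker list.
    static Properties build(String bootstrapServers) {
        if (bootstrapServers == null || bootstrapServers.trim().isEmpty()) {
            throw new IllegalArgumentException("bootstrapServers must not be empty");
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Same broker list as in the TestProducer example.
        Properties props = build("hadoop1:9092,hadoop3:9092");
        System.out.println(props.getProperty("bootstrap.servers"));
        System.out.println(props.getProperty("acks"));
    }
}
```

The returned `Properties` object can then be passed to `new KafkaProducer<>(props)` in the same way as in TestProducer.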

Reference documentation
(1) https://kafka.apache.org/082/documentation.html#producerapi