1. 程式人生 > >Kafka 生產消費例項

Kafka 生產消費例項

  • 環境準備
    • 建立topic
  • 命令列模式
    • 執行生產者消費者例項
  • 客戶端模式
    • 執行消費者生產者

1. 環境準備

說明:kafka叢集環境我比較懶直接使用公司現有的環境。安全起見,所有的操作都是在自己使用者下完成的,若是自己的kafka環境,完全可以使用kafka管理員的使用者。建立topic時需要在kafka管理員的使用者下完成。

1.登入到kafka叢集中節點,並切換到kafka管理員使用者下

ssh 172.16.150.xx 22
su - kafka

2.建立topic

建立topic命令:
kafka-topics --zookeeper bdap-nn-1.cebbank.
com,bdap-mn-1.cebbank.com,bdap-nn-2.cebbank.com:2181/kafka --create --topic topicname --partitions 4 --replication-factor 3 查詢topic命令: kafka-topics --zookeeper bdap-nn-1.cebbank.com,bdap-mn-1.cebbank.com,bdap-nn-2.cebbank.com:2181/kafka –list

注意建立topic時會指定分割槽數,zookerper叢集名及topicname要換成自己的,topicname不要重複

3.給我自己的使用者建立讀寫topic的許可權

寫許可權:
kafka-acls --authorizer-properties zookeeper.connect=bdap-nn-1.cebbank.com,bdap-mn-1.cebbank.com,bdap-nn-2.cebbank.com:2181/kafka --add --allow-principal User:xx --operation Write --operation Describe --topic topicname
讀許可權:
kafka-acls --authorizer-properties zookeeper.
connect=bdap-nn-1.cebbank.com,bdap-mn-1.cebbank.com,bdap-nn-2.cebbank.com:2181/kafka --add --allow-principal User:xx --operation READ --topic topicname --group "*"

2. 命令列

1.需要切換到自己使用者下
2.將kafka使用者目錄下的producer.properties、consumer.properties拷貝到自己目錄下

執行消費者例項

kafka-console-consumer --zookeeper bdap-nn-1.cebbank.com,bdap-mn-1.cebbank.com,bdap-nn-2.cebbank.com:2181/kafka -consumer.config /home/username/consumer.properties --topic topicname --new-consumer --bootstrap-server bdap-nn-1.cebbank.com:9092 --from-beginning

執行生產者例項

kafka-console-producer --broker-list bdap-nn-1.cebbank.com:9092, bdap-mn-1.cebbank.com:9092,bdap-nn-2.cebbank.com:9092 --topic topicname --producer.config /home/username/producer.properties

生產消費者例項啟動後,在生產者視窗中輸入任意字元後,消費者視窗能接收到,則例項執行完成。

命令列的例項很簡單,就一個收發功能,只是讓我們先認識一下kafka的生產消費形式。實際專案中都是在程式碼中實現生產消費的。

3. 客戶端

消費者程式碼

package kafka.consumer;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyKafkaConsumer {
    private static final Logger log = LoggerFactory.getLogger(MyKafkaConsumer.class);
    public static void main(String[] args) throws InterruptedException {
        //kerberos配置,無認證時,不需要引入
         System.setProperty("java.security.krb5.conf","D:/krb5.conf");
         System.setProperty("java.security.auth.login.config","D:/lsz_jaas.conf");
         Properties props = new Properties();
         log.info("**********************************************");
         props.put("security.protocol", "SASL_PLAINTEXT");
         props.put("sasl.kerberos.service.name", "kafka");
         props.put("bootstrap.servers", "172.16.150.xx:9092,172.16.150.xx1:9092,172.16.150.xx2:9092");

         //消費者分組,如果一個topic有4個分割槽,並且一個消費者分組有2個消費者。
         //每個消費者消費2個分組
         props.put("group.id", "kafka_lsz_1");

         //自動提交偏移量,改為false為手動控制偏移量
         props.put("enable.auto.commit", "true");

         props.put("auto.commit.interval.ms", "1000");//增大poll的間隔,可以為消費者提供更多的時間去處理返回的訊息,缺點是此值越大將會延遲組重新平衡
         props.put("session.timeout.ms", "30000");
         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // props.put("auto.offset.reset", "earliest");
         KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
         consumer.seekToBeginning();
         consumer.subscribe(Arrays.asList("lsztopic3"));

         /**自動提交偏移量*/
         while (true) {
             //消費者訂閱topic後,呼叫poll方法,加入到組。
             //要留在組中,必須持續呼叫poll方法
             ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
             for (ConsumerRecord<byte[], byte[]> record : records) {
                System.out.println(record.topic()+" --- "+ record.partition());
                System.out.printf("offset = %d, key = %s", record.offset(),record.key()+"     \r\n");
            }         

         }
    }
}

生產者

package kafka.producer;

import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class consume {

  private static final Logger LOG = LoggerFactory.getLogger(MyKafkaProducer.class);
  private static final String TOPIC = "lsztopic3";

  public static void main(String[] args) throws Exception {

    System.setProperty("java.security.krb5.conf","D:/krb5.conf");
    System.setProperty("java.security.auth.login.config","D:/lsz_jaas.conf");
    Properties props = new Properties();

    props.put("bootstrap.servers", "172.16.150.xx:9092,172.16.150.xx1:9092,172.16.150.xx2:9092");
    props.put("producer.type", "async");
    // 重試次數
    props.put("message.send.max.retries", "3");
    // 非同步提交的時候(async),併發提交的記錄數
    props.put("batch.num.messages", "200");
    //快取池大小
    props.put("batch.size", "16384");
    // 設定緩衝區大小,預設10KB
    props.put("send.buffer.bytes", "102400");
    props.put("request.required.acks", "1");
    props.put("security.protocol", "SASL_PLAINTEXT");
    props.put("sasl.kerberos.service.name", "kafka");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
   // props.put("partitioner.class", "kafka.producer.KafkaCustomPartitioner");
    KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(props);

    String key = "";
    String value = "";
    ProducerRecord<String,String> records = new ProducerRecord<String,String>(TOPIC,key,value);
    kafkaProducer.send(records,new Callback(){

        public void onCompletion(RecordMetadata metadata, Exception e) {
            if(e != null)
                e.printStackTrace();
            System.out.println("The offset of the record we just sent is: "+metadata.partition() +"  "+ metadata.offset());
        }
    });

    //Thread.sleep(5000);
   kafkaProducer.close();
  }
}

當然,客戶端的生產者啟動後,命令列消費者同樣可以收到訊息。不過如果使用kerberos認證後,一定要注意客戶端和服務端的時間,kerberos有個時間檢驗,若兩端時間不一致,則消費者收不到訊息。