Kafka 生產消費例項
阿新 • • 發佈:2019-01-03
- 環境準備
- 建立topic
- 命令列模式
- 執行生產者消費者例項
- 客戶端模式
- 執行消費者生產者
1. 環境準備
說明:kafka叢集環境我比較懶直接使用公司現有的環境。安全起見,所有的操作都是在自己使用者下完成的,若是自己的kafka環境,完全可以使用kafka管理員的使用者。建立topic時需要在kafka管理員的使用者下完成。
1.登入到kafka叢集中節點,並切換到kafka管理員使用者下
ssh 172.16.150.xx 22
su - kafka
2.建立topic
建立topic命令:
kafka-topics --zookeeper bdap-nn-1.cebbank. com,bdap-mn-1.cebbank.com,bdap-nn-2.cebbank.com:2181/kafka --create --topic topicname --partitions 4 --replication-factor 3
查詢topic命令:
kafka-topics --zookeeper bdap-nn-1.cebbank.com,bdap-mn-1.cebbank.com,bdap-nn-2.cebbank.com:2181/kafka –list
注意建立topic時會指定分割槽數,zookerper叢集名及topicname要換成自己的,topicname不要重複
3.給我自己的使用者建立讀寫topic的許可權
寫許可權:
kafka-acls --authorizer-properties zookeeper.connect=bdap-nn-1.cebbank.com,bdap-mn-1.cebbank.com,bdap-nn-2.cebbank.com:2181/kafka --add --allow-principal User:xx --operation Write --operation Describe --topic topicname
讀許可權:
kafka-acls --authorizer-properties zookeeper. connect=bdap-nn-1.cebbank.com,bdap-mn-1.cebbank.com,bdap-nn-2.cebbank.com:2181/kafka --add --allow-principal User:xx --operation READ --topic topicname --group "*"
2. 命令列
1.需要切換到自己使用者下
2.將kafka使用者目錄下的producer.properties、consumer.properties拷貝到自己目錄下
執行消費者例項
kafka-console-consumer --zookeeper bdap-nn-1.cebbank.com,bdap-mn-1.cebbank.com,bdap-nn-2.cebbank.com:2181/kafka -consumer.config /home/username/consumer.properties --topic topicname --new-consumer --bootstrap-server bdap-nn-1.cebbank.com:9092 --from-beginning
執行生產者例項
kafka-console-producer --broker-list bdap-nn-1.cebbank.com:9092, bdap-mn-1.cebbank.com:9092,bdap-nn-2.cebbank.com:9092 --topic topicname --producer.config /home/username/producer.properties
生產消費者例項啟動後,在生產者視窗中輸入任意字元後,消費者視窗能接收到,則例項執行完成。
命令列的例項很簡單,就一個收發功能,只是讓我們先認識一下kafka的生產消費形式。實際專案中都是在程式碼中實現生產消費的。
3. 客戶端
消費者程式碼
package kafka.consumer;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyKafkaConsumer {
private static final Logger log = LoggerFactory.getLogger(MyKafkaConsumer.class);
public static void main(String[] args) throws InterruptedException {
//kerberos配置,無認證時,不需要引入
System.setProperty("java.security.krb5.conf","D:/krb5.conf");
System.setProperty("java.security.auth.login.config","D:/lsz_jaas.conf");
Properties props = new Properties();
log.info("**********************************************");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.kerberos.service.name", "kafka");
props.put("bootstrap.servers", "172.16.150.xx:9092,172.16.150.xx1:9092,172.16.150.xx2:9092");
//消費者分組,如果一個topic有4個分割槽,並且一個消費者分組有2個消費者。
//每個消費者消費2個分組
props.put("group.id", "kafka_lsz_1");
//自動提交偏移量,改為false為手動控制偏移量
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");//增大poll的間隔,可以為消費者提供更多的時間去處理返回的訊息,缺點是此值越大將會延遲組重新平衡
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// props.put("auto.offset.reset", "earliest");
KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(props);
consumer.seekToBeginning();
consumer.subscribe(Arrays.asList("lsztopic3"));
/**自動提交偏移量*/
while (true) {
//消費者訂閱topic後,呼叫poll方法,加入到組。
//要留在組中,必須持續呼叫poll方法
ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
for (ConsumerRecord<byte[], byte[]> record : records) {
System.out.println(record.topic()+" --- "+ record.partition());
System.out.printf("offset = %d, key = %s", record.offset(),record.key()+" \r\n");
}
}
}
}
生產者
package kafka.producer;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class consume {
private static final Logger LOG = LoggerFactory.getLogger(MyKafkaProducer.class);
private static final String TOPIC = "lsztopic3";
public static void main(String[] args) throws Exception {
System.setProperty("java.security.krb5.conf","D:/krb5.conf");
System.setProperty("java.security.auth.login.config","D:/lsz_jaas.conf");
Properties props = new Properties();
props.put("bootstrap.servers", "172.16.150.xx:9092,172.16.150.xx1:9092,172.16.150.xx2:9092");
props.put("producer.type", "async");
// 重試次數
props.put("message.send.max.retries", "3");
// 非同步提交的時候(async),併發提交的記錄數
props.put("batch.num.messages", "200");
//快取池大小
props.put("batch.size", "16384");
// 設定緩衝區大小,預設10KB
props.put("send.buffer.bytes", "102400");
props.put("request.required.acks", "1");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.kerberos.service.name", "kafka");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// props.put("partitioner.class", "kafka.producer.KafkaCustomPartitioner");
KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(props);
String key = "";
String value = "";
ProducerRecord<String,String> records = new ProducerRecord<String,String>(TOPIC,key,value);
kafkaProducer.send(records,new Callback(){
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null)
e.printStackTrace();
System.out.println("The offset of the record we just sent is: "+metadata.partition() +" "+ metadata.offset());
}
});
//Thread.sleep(5000);
kafkaProducer.close();
}
}
當然,客戶端的生產者啟動後,命令列消費者同樣可以收到訊息。不過如果使用kerberos認證後,一定要注意客戶端和服務端的時間,kerberos有個時間檢驗,若兩端時間不一致,則消費者收不到訊息。