Kafka Cluster Part 6: Operating Kafka from Java (no password authentication)

Series navigation

Part 1: Kafka setup - standalone

Part 2: Kafka setup - cluster

Part 3: Adding password authentication to the Kafka cluster

Part 4: Adding ACL permissions to the Kafka cluster

Part 5: Changing the __consumer_offsets replica count in the Kafka cluster

With the Kafka environment in place, how do you access it from code?

This post covers the simplest case first: a Kafka cluster with no password authentication.

1. Environment

Library: kafka-clients-0.11.0.1.jar (Maven coordinate org.apache.kafka:kafka-clients:0.11.0.1)

JDK: 1.7

2. Kafka configuration class

package nopassword; 

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;

/* Kafka configuration without username/password authentication */
public class KafkaUtil {

    // Kafka cluster addresses
    public static final String servers = "PLAINTEXT://192.168.0.104:9092,PLAINTEXT://192.168.0.104:9092,PLAINTEXT://192.168.0.104:9092";

    // Producer configuration for the Kafka cluster
    public static KafkaProducer<String, String> getProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("acks", "1");
        props.put("retries", 0);
        props.put("batch.size", 0); // 16384
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> kp = new KafkaProducer<String, String>(props);
        return kp;
    }

    // Consumer configuration for the Kafka cluster
    public static KafkaConsumer<String, String> getConsumer(String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("auto.offset.reset", "earliest");
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "100");
        props.put("max.partition.fetch.bytes", "10240");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> kc = new KafkaConsumer<String, String>(props);
        return kc;
    }
}
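
Before wiring up producers and consumers, it can help to confirm that the bootstrap.servers value actually reaches the cluster. The class below is not part of the original post; it is a minimal sketch using the AdminClient that ships with kafka-clients 0.11.x, reusing the servers constant from KafkaUtil above.

package nopassword;

import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.admin.AdminClient;

/* Illustrative connectivity check, not part of the original post */
public class ClusterCheck {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaUtil.servers);

        // AdminClient is available from kafka-clients 0.11.0 onwards
        AdminClient admin = AdminClient.create(props);
        try {
            // Blocks until the broker returns the topic list (or the request times out)
            Set<String> topics = admin.listTopics().names().get();
            System.out.println("Topics visible on the cluster: " + topics);
        } finally {
            admin.close();
        }
    }
}

If this call hangs or times out, check the listener addresses and firewall before debugging the producer or consumer code.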

3. Producer class ProducerClient

package nopassword; 

import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
 
 
public class ProducerClient {

    public static void main(String[] args) {
        sendToKafka();
    }
    
    private static Producer<String, String> producer = KafkaUtil.getProducer();
    public static void sendToKafka( ) { 
        for(int i=0;i<5000;i++){
            try {
                final ProducerRecord<String, String> record = new ProducerRecord<String, String>("testTopic",
                        "d+key-" + i, "{\"name\":\"哈哈\",\"id\":\"218CF4630C2F8795\"}");
                Future<RecordMetadata> send = producer.send(record, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception e) {
                        if (e != null) {
                            e.printStackTrace();
                        }
                    }
                });

                System.out.println("sendToKafka-傳送至Kafka:" + "d+key-" + i);

            } catch (Exception e) {
                e.printStackTrace();

            }
        }
        producer.close();
    }
}
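
send() in the loop above is asynchronous: the callback only logs errors and the returned Future is ignored. When each write has to be confirmed before moving on, you can block on that Future instead. The class below is a sketch, not part of the original post; it assumes the same testTopic and the KafkaUtil class defined earlier.

package nopassword;

import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/* Illustrative synchronous-send variant, not part of the original post */
public class SyncProducerClient {

    public static void main(String[] args) throws Exception {
        KafkaProducer<String, String> producer = KafkaUtil.getProducer();
        try {
            ProducerRecord<String, String> record =
                    new ProducerRecord<String, String>("testTopic", "sync-key", "sync-value");
            // get() blocks until the broker acknowledges the write (per the acks setting)
            Future<RecordMetadata> future = producer.send(record);
            RecordMetadata metadata = future.get();
            System.out.println("written to partition " + metadata.partition()
                    + " at offset " + metadata.offset());
        } finally {
            producer.close();
        }
    }
}

Blocking on every send is slower than the callback approach, so it is mainly useful for low-volume writes or for debugging delivery problems.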

4. Consumer class ConsumerClient

package nopassword; 

import java.util.Arrays;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
 
/* Consumer */
public class ConsumerClient {

    public static KafkaConsumer<String, String> consumer = null;
    
    public static void main(String[] args) {
        fetchKafka();
    }
     
    public static void fetchKafka() {
        consumer = KafkaUtil.getConsumer("testGroup"); // consumer group
        consumer.subscribe(Arrays.asList("testTopic")); // topics

        int i = 0;
        while (true) {
            ConsumerRecords<String, String> records;
            try {
                records = consumer.poll(Long.MAX_VALUE); // milliseconds
            } catch (Exception e) {
                e.printStackTrace();
                continue;
            }

            for (ConsumerRecord<String, String> record : records) {
                System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset()
                        + ", key: " + record.key() + ", value: " + record.value());
                i++;
                System.out.println(i);
            }

            try {
                consumer.commitSync();
            } catch (Exception e) {
                e.printStackTrace();
                continue;
            }
        }
    }
}
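
The loop above never exits, so consumer.close() is never called and the broker has to wait for the session timeout before rebalancing the group. A common pattern is to call wakeup() from a JVM shutdown hook: it is the one KafkaConsumer method that is safe to invoke from another thread, and it makes the blocking poll() throw WakeupException so the loop can end cleanly. The class below is a sketch along those lines, not part of the original post; it reuses the testGroup and testTopic names from the code above.

package nopassword;

import java.util.Arrays;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

/* Illustrative consumer with a clean shutdown path, not part of the original post */
public class ShutdownAwareConsumerClient {

    public static void main(String[] args) {
        final KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer("testGroup");
        consumer.subscribe(Arrays.asList("testTopic"));

        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                // Interrupts the blocked poll() below; poll() will throw WakeupException
                consumer.wakeup();
                try {
                    mainThread.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("offset " + record.offset() + ": " + record.value());
                }
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            // Expected during shutdown; fall through to close()
        } finally {
            consumer.close();
        }
    }
}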