訊息佇列之kafka(API)
阿新 • • 發佈:2018-12-31
1.模擬實現kafka的生產者消費者(原生API)
解決相關依賴:
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.1.0</version>
</dependency>
生產者:
package com.zy.kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * Minimal Kafka producer example: sends 100 String key/value messages
 * ("key0"/"value0" .. "key99"/"value99") to the topic "test_topic".
 */
public class KafkaTest {

    public static void main(String[] args) {
        // 1. Build the producer configuration.
        Properties props = new Properties();
        // Broker bootstrap address.
        props.put("bootstrap.servers", "hadoop02:9092");
        // Ack level: 0, 1 or -1/"all"; "all" waits for the full ISR to acknowledge.
        props.put("acks", "all");
        // Retries on transient send failures.
        props.put("retries", 3);
        // Per-partition batch size in bytes.
        props.put("batch.size", 16384);
        // Linger time (ms) to allow batching before a send.
        props.put("linger.ms", 1);
        // Total memory available for buffering unsent records.
        props.put("buffer.memory", 33554432);
        // Serializers for the message key and value.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. Create the producer (String key, String value) and send.
        //    try-with-resources guarantees close() runs even if send() throws,
        //    flushing buffered records and releasing network resources.
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 100; i++) {
                // ProducerRecord(topic, key, value)
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("test_topic", "key" + i, "value" + i);
                producer.send(record);
            }
        }
    }
}
消費者:
package com.zy.kafka;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * Minimal Kafka consumer example: joins consumer group "test", subscribes to
 * the topic "kafka_test" and prints every record's offset, key and value.
 *
 * NOTE(review): the companion producer sample sends to "test_topic", not
 * "kafka_test" — confirm which topic name is intended if pairing the two.
 */
public class KafkaTest {

    public static void main(String[] args) {
        // 1. Build the consumer configuration.
        Properties props = new Properties();
        // Broker bootstrap address.
        props.put("bootstrap.servers", "hadoop02:9092");
        // Consumer group id.
        props.put("group.id", "test");
        // Auto-commit consumed offsets (committed to the Kafka cluster;
        // the new consumer API does not store offsets in ZooKeeper).
        props.put("enable.auto.commit", "true");
        // Auto-commit interval in milliseconds.
        props.put("auto.commit.interval.ms", "1000");
        // Deserializers for the message key and value.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 2. Create the consumer and subscribe to the topic.
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("kafka_test"));

        // 3. Poll forever. poll(Duration) replaces the deprecated poll(long)
        //    in kafka-clients 2.x; the argument is the maximum blocking time.
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d,key=%s,value=%s\n",
                        record.offset(), record.key(), record.value());
            }
        }
    }
}
2.以shell命令的方式呼叫API（透過TopicCommand執行topic管理命令）
import java.io.IOException; import java.io.InputStream; import java.util.Properties; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import kafka.admin.TopicCommand; public class KafkaAPI { public static void main(String[] args) throws IOException { /* kafka-topics.sh \ --create \ --zookeeper hadoop02:2181,hadoop03:2181,hadoop04:2181 \ --replication-factor 3 \ --partitions 10 \ --topic kafka_test11 */ //建立一個topic String ops[]=new String []{ "--create", "--zookeeper","hadoop01:2181,hadoop02:2181,hadoop03:2181", "--replication-factor","3", "--topic","zy_topic","--partitions","5" }; String list[]=new String[] { "--list", "--zookeeper", "hadoop01:2181,hadoop02:2181,hadoop03:2181" }; //以命令的方式提交 TopicCommand.main(list); } }