Kafka Installation, Deployment, and Java Testing
阿新 · Published 2018-12-20
I. Standalone (single-node) installation
1. Get the installation archive kafka_2.11-0.11.0.0.tgz, copy it to the Linux server, and extract it.
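For example, assuming the archive was copied to the current directory on the server:
tar -zxvf kafka_2.11-0.11.0.0.tgz
cd kafka_2.11-0.11.0.0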
2. Edit the broker configuration: vim config/server.properties
3. Change the following settings (fill in your own values):
host.name=<your IP>
listeners=PLAINTEXT://<your IP>:9092
advertised.listeners=PLAINTEXT://<your IP>:9092
zookeeper.connect=<ZooKeeper cluster address>
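As a concrete illustration (assuming the broker runs on 192.168.85.130, the address used in the test commands below, with a single ZooKeeper node on the same host), these lines might read:
host.name=192.168.85.130
listeners=PLAINTEXT://192.168.85.130:9092
advertised.listeners=PLAINTEXT://192.168.85.130:9092
zookeeper.connect=192.168.85.130:2181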
4. Start Kafka: ./bin/kafka-server-start.sh -daemon config/server.properties & (the -daemon flag already puts the broker in the background, so the trailing & is optional)
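To confirm the broker came up (a sanity check not in the original steps), you can look for the Kafka process and watch the broker log:
jps | grep -i kafka
tail -f logs/server.log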
5. Test
(1) Create a topic named my-first-topic:
./kafka-topics.sh --create --zookeeper 192.168.85.130:2181 --topic my-first-topic --partitions 1 --replication-factor 1
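Optionally, verify the topic and its partition/replica assignment with the same script:
./kafka-topics.sh --describe --zookeeper 192.168.85.130:2181 --topic my-first-topic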
(2) Start a consumer:
./kafka-console-consumer.sh --bootstrap-server 192.168.85.130:9092 --topic my-first-topic --from-beginning
This command produces no output at first.
(3) Start a producer; open a new terminal window for this:
./kafka-console-producer.sh --broker-list 192.168.85.130:9092 --topic my-first-topic
Once the producer is running, type any text in its window and press Enter to send it; the same message should appear in the consumer window.
6. Testing with Java code
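The classes below use the Kafka Java client. Assuming a Maven project (the original post does not show its build setup), the dependency matching the broker version would be:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>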
// Create the topic from Java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class TestKafka {
    public static void main(String[] args) throws Exception {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "192.168.85.130:9092");
        AdminClient adminClient = AdminClient.create(prop);
        List<NewTopic> topicList = new ArrayList<>();
        // topic name, number of partitions, replication factor
        NewTopic topic = new NewTopic("my-first-topic", 1, (short) 1);
        topicList.add(topic);
        // createTopics() is asynchronous; all().get() blocks until the broker has created the topic
        adminClient.createTopics(topicList).all().get();
        adminClient.close();
    }
}
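As a quick check (not in the original post), the same AdminClient API can list the topic names the broker knows about; the hypothetical helper class below prints whether my-first-topic is among them.

import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;

// Hypothetical helper: confirm the topic is now visible to the broker
public class VerifyTopic {
    public static void main(String[] args) throws Exception {
        Properties prop = new Properties();
        prop.put("bootstrap.servers", "192.168.85.130:9092");
        AdminClient adminClient = AdminClient.create(prop);
        // listTopics() is asynchronous; names().get() blocks until the broker answers
        Set<String> names = adminClient.listTopics().names().get();
        System.out.println("my-first-topic exists: " + names.contains("my-first-topic"));
        adminClient.close();
    }
}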
//Producer
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class MyProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.85.130:9092");
        props.put("acks", "all"); // wait for all in-sync replicas to acknowledge each record
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        // send ten records whose key and value are both the loop index
        for (int i = 0; i < 10; i++)
            producer.send(new ProducerRecord<String, String>("my-first-topic", Integer.toString(i), Integer.toString(i)));
        producer.close();
    }
}
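send() returns immediately; a variant not shown in the original post passes a Callback so each delivery result is reported once the broker responds. A minimal sketch, reusing the same broker address and topic:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical variant of MyProducer that logs whether each send succeeded
public class MyProducerWithCallback {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.85.130:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        for (int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<String, String>("my-first-topic", Integer.toString(i), Integer.toString(i)),
                    (metadata, exception) -> {
                        if (exception != null)
                            exception.printStackTrace();
                        else
                            System.out.printf("sent to partition %d at offset %d%n",
                                    metadata.partition(), metadata.offset());
                    });
        }
        producer.close(); // close() flushes any outstanding sends before returning
    }
}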
//Consumer
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class MyConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.85.130:9092");
        props.put("group.id", "my-first-topic");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("my-first-topic"), new ConsumerRebalanceListener() {
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            }

            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                // rewind the assigned partitions to the beginning of the log
                consumer.seekToBeginning(collection);
            }
        });
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
        }
    }
}
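Two caveats with the loop above, not addressed in the original post: it never exits, and auto-commit writes offsets in the background regardless of whether the records were actually processed. A common alternative pattern is sketched below (assumptions: manual commits are acceptable, and Ctrl+C is the shutdown signal): auto-commit is disabled, commitSync() runs after each processed batch, and a shutdown hook calls consumer.wakeup() so the blocking poll() throws WakeupException and the loop ends cleanly.

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

// Hypothetical variant of MyConsumer: manual offset commits plus a clean shutdown path
public class MyConsumerManualCommit {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.85.130:9092");
        props.put("group.id", "my-first-topic");
        props.put("enable.auto.commit", "false"); // we commit offsets ourselves
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        final KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        final CountDownLatch closed = new CountDownLatch(1);
        // On Ctrl+C: wakeup() makes the blocking poll() throw WakeupException, then wait for close() to finish
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup();
            try { closed.await(); } catch (InterruptedException ignored) { }
        }));
        try {
            consumer.subscribe(Arrays.asList("my-first-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                consumer.commitSync(); // commit only after the whole batch has been processed
            }
        } catch (WakeupException e) {
            // expected during shutdown
        } finally {
            consumer.close();
            closed.countDown();
        }
    }
}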