kafka集群配置和java編寫生產者消費者操作例子
阿新 • • 發佈:2019-01-12
tor http dep org create comm getname fig exp
- kafka
- 安裝
- 修改配置文件
- java操作kafka
kafka
kafka的操作相對來說簡單很多
安裝
下載kafka http://kafka.apache.org/downloads tar -zxvf kafka_2.12-2.1.0.tgz rm kafka_2.12-2.1.0.tgz mv kafka_2.12-2.1.0 kafka sudo vim /etc/profile export KAFKA_HOME=/usr/local/kafka export PATH=$PATH:$KAFKA_HOME/bin source /etc/profile 準備 worker1 worker2 worker3 這四臺機器 首先確保你的zookeeper集群能夠正常運行worker1 worker2 worker3為zk集群 具體配置參照我的博客https://www.cnblogs.com/ye-hcj/p/9889585.html
修改配置文件
server.properties
sudo vim server.properties 添加如下屬性 broker.id=0 # 3臺機器分別設置成0 1 2 log.dirs=/usr/local/kafka/logs zookeeper.connect=worker1:2181,worker2:2181,worker3:2181
運行
運行 bin/kafka-server-start.sh config/server.properties 創建topic bin/kafka-topics.sh --create --zookeeper worker1:2181 --replication-factor 2 --partitions 2 --topic test 查看topic bin/kafka-topics.sh --list --zookeeper worker1:2181 訂閱topic,利用worker2來訂閱 bin/kafka-console-consumer.sh --bootstrap-server worker1:9092 --topic test --from-beginning topic發送消息 bin/kafka-console-producer.sh --broker-list worker1:9092 --topic test 鍵入任何消息,worker2都能接收到 查看topic詳情 bin/kafka-topics.sh --describe --zookeeper worker1:2181 --topic test
java操作kafka
依賴
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>2.1.0</version> </dependency>
生產者
public class Producer { public static void main( String[] args ){ Properties props = new Properties(); // 服務器ip props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "worker1:9092,worker2:9092,worker3:9092"); // 屬性鍵值對都序列化成字符串 props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); // 創建一個生產者,向test主題發送數據 KafkaProducer<String, String> producer = new KafkaProducer<>(props); producer.send(new ProducerRecord<String, String>("test", "生產者傳遞的消息")); producer.close(); } }
消費者
public class Consumer { public static void main( String[] args ){ Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "worker1:9092,worker2:9092,worker3:9092"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 消費者對象 KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props); kafkaConsumer.subscribe(Arrays.asList("test")); while (true) { ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.between( LocalDateTime.parse("2019-01-09T11:30:30"), LocalDateTime.now())); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, value = %s", record.offset(), record.value()); System.out.println(); } } } }
kafka集群配置和java編寫生產者消費者操作例子