Code for dynamically changing topics in Kafka 0.11.0.1
package kafkaDynamic;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import util.PropertiesUtil;

/**
 * Producer that sends to one topic, then switches to another.
 *
 * @author likexin
 */
public class DynamicProducer {

    private static String topic = "atopic";

    public static void main(String[] args) {
        Properties proper = null;
        try {
            proper = PropertiesUtil.getProperties(System.getProperty("user.dir") + "/conf/producer.properties");
        } catch (IOException e1) {
            e1.printStackTrace();
        }
        // Send to the first topic, then switch the topic and send again.
        pushMessage(proper);
        topic = "btopic";
        pushMessage(proper);
    }

    private static void pushMessage(Properties proper) {
        if (proper == null) {
            System.out.println("Configuration must not be null");
            System.exit(1);
        }
        KafkaProducer<String, String> myProducer = new KafkaProducer<String, String>(proper);
        try {
            for (int i = 0; i < 1; i++) {
                Future<RecordMetadata> future = myProducer.send(new ProducerRecord<String, String>(
                        topic, Integer.toString(i), "LikexinMessage " + topic + " -- " + Integer.toString(i)));
                // Block until the send completes, so the two topics are written in order.
                System.out.println(future.get());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            myProducer.close();
        }
    }
}
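Both classes load their settings through a util.PropertiesUtil helper that the post does not include. A minimal sketch of such a helper, assuming it does nothing more than read a properties file from disk (the implementation below is a guess, not the original):

package util;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

/**
 * Hypothetical helper matching the util.PropertiesUtil import above;
 * the original post does not show its implementation.
 */
public class PropertiesUtil {

    public static Properties getProperties(String path) throws IOException {
        Properties properties = new Properties();
        // try-with-resources closes the stream even if load() throws
        try (InputStream in = new FileInputStream(path)) {
            properties.load(in);
        }
        return properties;
    }
}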
package kafkaDynamic;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import util.PropertiesUtil;

/**
 * Consumer whose topic subscription is changed at runtime.
 *
 * @author likexin
 */
public class DynamicConsumer {

    private static String topic = "atopic";

    // KafkaConsumer is not thread-safe, so the helper thread only enqueues the
    // new topic name here; the polling thread performs the actual subscribe().
    private static ConcurrentLinkedQueue<String> subscribedTopics = new ConcurrentLinkedQueue<>();

    public static void main(String[] args) {
        Properties proper = null;
        try {
            proper = PropertiesUtil.getProperties(System.getProperty("user.dir") + "/conf/consumer.properties");
        } catch (IOException e1) {
            e1.printStackTrace();
        }
        new DynamicConsumer().new AlterTopic().start();
        pullMessage(proper);
    }

    private static void pullMessage(Properties proper) {
        if (proper == null) {
            System.out.println("Configuration must not be null");
            System.exit(1);
        }
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(proper);
        consumer.subscribe(Arrays.asList(topic));
        boolean flag = true;
        while (flag) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
            }
            // If the helper thread has queued new topics, re-subscribe.
            // subscribe() replaces the previous subscription entirely.
            if (!subscribedTopics.isEmpty()) {
                Iterator<String> iter = subscribedTopics.iterator();
                List<String> topics = new ArrayList<>();
                while (iter.hasNext()) {
                    topics.add(iter.next());
                }
                subscribedTopics.clear();
                consumer.subscribe(topics);
            }
        }
        consumer.close();
    }

    /**
     * After 15 seconds, asks the polling loop to switch the subscription to btopic.
     */
    private class AlterTopic extends Thread {
        @Override
        public void run() {
            try {
                Thread.sleep(15 * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            subscribedTopics.addAll(Arrays.asList("btopic"));
        }
    }
}
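Note the hand-off through subscribedTopics: because KafkaConsumer is not thread-safe, the AlterTopic thread only enqueues the new topic name, and the polling thread itself calls subscribe(), which replaces the previous subscription and triggers a rebalance. The examples also assume atopic and btopic already exist on the brokers. Kafka 0.11 ships a Java AdminClient that can create them up front; a minimal sketch (this CreateTopics class is not part of the original post, and the partition and replication counts are assumptions to adjust for your cluster):

package kafkaDynamic;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

/** Hypothetical one-off helper; not part of the original post. */
public class CreateTopics {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Broker list taken from the configuration files below.
        props.put("bootstrap.servers", "172.16.12.126:9100,172.16.12.127:9100,172.16.12.128:9100");
        try (AdminClient admin = AdminClient.create(props)) {
            // 1 partition, replication factor 1 -- assumed values
            admin.createTopics(Arrays.asList(
                    new NewTopic("atopic", 1, (short) 1),
                    new NewTopic("btopic", 1, (short) 1)))
                 .all().get(); // block until the brokers confirm
        }
    }
}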
Consumer configuration file
bootstrap.servers=172.16.12.126:9100,172.16.12.127:9100,172.16.12.128:9100
# Auto-commit is disabled; since the sample never commits offsets manually,
# auto.commit.interval.ms has no effect and every run starts from auto.offset.reset.
enable.auto.commit=false
auto.commit.interval.ms=1000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
auto.offset.reset=earliest
group.id=likexin
# zookeeper.connect is an old-consumer setting; the 0.11 Java consumer ignores it.
zookeeper.connect=172.16.12.128:4839

Producer configuration file
bootstrap.servers=172.16.12.126:9100,172.16.12.127:9100,172.16.12.128:9100
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
#default.replication.factor=3
acks=-1
metadata.fetch.timeout.ms=12000
#producer.type=sync
#compression.type=gzip
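A few notes on these settings: acks=-1 is equivalent to acks=all, meaning the leader waits for the full set of in-sync replicas to acknowledge each write, which matches the replication hinted at by the commented-out default.replication.factor=3 (itself a broker-side setting, not a producer one). The commented-out producer.type=sync belongs to the old Scala producer and is ignored by the 0.11 Java client, where send() is always asynchronous; the sample gets synchronous behaviour by blocking on the returned Future with future.get(). metadata.fetch.timeout.ms is likewise a legacy setting, superseded by max.block.ms in newer clients.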