1. 程式人生 > >0.11.0.1的動態修改topic程式碼

0.11.0.1的動態修改topic程式碼

package kafkaDynamic;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import util.PropertiesUtil;
/**
 * 
 * @author likexin
 *
 */
public class DynamicProducer {
    
    private static String topic = "atopic";

    public static void main(String[] args) {
        
        Properties proper = null;
        try {
            proper = PropertiesUtil.getProperties(System.getProperty("user.dir")+"/conf/producer.properties");
        } catch (IOException e1) {
            e1.printStackTrace();
        }
        
        pushMessage(proper);
        
        topic = "btopic";
        
        pushMessage(proper);
        
    }
    
    private static void pushMessage(Properties proper) {
        if (proper == null) {
            System.out.println("配置不能為空");
            System.exit(1);
        }
        KafkaProducer<String, String> myProducer = new KafkaProducer<String, String>(proper);

        try {
            for(int i = 0; i < 1; i++) {
                Future<RecordMetadata> future = myProducer.send(new ProducerRecord<String, String>(topic, Integer.toString(i), "LikexinMessage "  + topic + " -- "+ Integer.toString(i)));
                System.out.println(future.get());
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            myProducer.close();
        }
    }
    
}

package kafkaDynamic;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import util.PropertiesUtil;
/**
 * 動態修改Topic
 * @author likexin
 *
 */
public class DynamicConsumer {
    
    private static String topic = "atopic";
    private static ConcurrentLinkedQueue<String> subscribedTopics = new ConcurrentLinkedQueue<>();
    
    public static void main(String[] args) {
        
        Properties proper = null;
        try {
            proper = PropertiesUtil.getProperties(System.getProperty("user.dir")+"/conf/consumer.properties");
        } catch (IOException e1) {
            e1.printStackTrace();
        }
        
        new DynamicConsumer().new alterTopic().start();
        
        pullMessage(proper);
        
    }
    
    private static void pullMessage(Properties proper) {
        if (proper == null) {
            System.out.println("配置不能為空");
            System.exit(1);
        }
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(proper);
        consumer.subscribe(Arrays.asList(topic));
        boolean flag = true;
        while (flag) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            if (!subscribedTopics.isEmpty()) {
                Iterator<String> iter = subscribedTopics.iterator();
                List<String> topics = new ArrayList<>();
                while (iter.hasNext()) {
                    topics.add(iter.next());
                }
                subscribedTopics.clear();
                consumer.subscribe(topics);
            }
        }
        
        consumer.close();
    }
    
    private class alterTopic extends Thread {

        @Override
        public void run() {
            try {
                Thread.sleep(15*1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            subscribedTopics.addAll(Arrays.asList("btopic"));
        }
        
    }
    
}

Consumer的配置檔案

bootstrap.servers=172.16.12.126:9100,172.16.12.127:9100,172.16.12.128:9100
enable.auto.commit=false
auto.commit.interval.ms=1000
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
auto.offset.reset=earliest
group.id=likexin
zookeeper.connect=172.16.12.128:4839
Producer配置檔案
bootstrap.servers=172.16.12.126:9100,172.16.12.127:9100,172.16.12.128:9100
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
#default.replication.factor=3
acks=-1
metadata.fetch.timeout.ms=12000
#producer.type=sync
#compression.type=gzip