1. 程式人生 > >kafka消費者監控,程式啟動時執行

kafka消費者監控,程式啟動時執行

1.1 web.xml檔案新增listenner,如下:

<listener>
    <listener-class>com.sinosoft.lis.listener.ConsumerThreadListener</listener-class>
</listener>

1.2 編寫監控類,實現ServletContextListener 介面

package com.sinosoft.lis.listener;

import com.sinosoft.lis.services.addquasicust.ConsumerThread;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

public class ConsumerThreadListener implements ServletContextListener {
    @Override
    public void contextInitialized(ServletContextEvent sce) {
        ConsumerThread consumerThread = new ConsumerThread();
        consumerThread.start();
    }

    @Override
    public void contextDestroyed(ServletContextEvent sce) {

    }
}

1.3服務啟動,自動執行contextInitialized()的程式碼,即kafka相關內容,ConsumerThread 內容如下:

package com.sinosoft.lis.services.addquasicust;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ConsumerThread {
    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    private ExecutorService executor;

    public ConsumerThread() {
        //獲取kafka服務配置
        Properties properties = buildKafkaProperty();
        this.consumer = new KafkaConsumer<>(properties);
        //tocpic名稱
        this.topic = "topic名稱";
        this.consumer.subscribe(Arrays.asList(this.topic));
    }

    public void start() {
        try {
            int threadCoreNumber = 3;
            int threadMaxNumber = 5;
            //啟用執行緒池
            executor = new ThreadPoolExecutor(threadCoreNumber, threadMaxNumber, 1L, TimeUnit.MINUTES,
                    new ArrayBlockingQueue<Runnable>(500), new ThreadPoolExecutor.CallerRunsPolicy());
             //kafka監控,另起一個執行緒,否則會由於while (true)導致阻斷,服務起不來
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        //相隔一定時間從kafka中讀取訊息
                        ConsumerRecords<String, String> consumerRecords = consumer.poll(1000);
                        for (ConsumerRecord<String, String> item : consumerRecords) {
                           //呼叫業務處理類
                            executor.submit(new ConsumerThreadHandler(item));
                        }
                    }
                }
            });
            thread.start();
        } catch (Exception e) {
            executor.shutdown();
        }
    }
	//kafka服務配置
    private static Properties buildKafkaProperty() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "storm-144:9092,storm-145:9092,storm-146:9092");
        props.put("group.id", "storm");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}

1.4業務邏輯處理類需實現Runnable 介面

package com.sinosoft.lis.services.addquasicust;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.lang.reflect.Method;

public class ConsumerThreadHandler implements Runnable {

    private ConsumerRecord consumerRecord;

    public ConsumerThreadHandler(ConsumerRecord consumerRecord) {
        this.consumerRecord = consumerRecord;
    }

    @Override
    public void run() {
        try {
         System.out.println(consumerRecord.value());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

1.5生產者相關程式碼,用於測試

package com.sinosoft.lis.services.addquasicust;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
public class ProducerTest {
    public void newTest() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "storm-144:9092,storm-145:9092,storm-146:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i <= 10; i++) {
            String r = “測試”+i;
            producer.send(new ProducerRecord<String, String>("topic名稱", r));
        }
        producer.close();
    }

    public static void main(String[] args) {
        ProducerTest producerTest = new ProducerTest();
        producerTest.newTest();

    }
}

另:相關jar包

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.9.0.1</version>
</dependency>

注意該jar包要與kafka伺服器版本保持一致,否則有可能會發生錯誤