kafka消費者監控,程式啟動時執行
阿新 • • 發佈:2018-12-29
1.1 web.xml檔案新增listener,如下:
<!-- Declares com.sinosoft.lis.listener.ConsumerThreadListener as a listener of this web application. -->
<listener>
<listener-class>com.sinosoft.lis.listener.ConsumerThreadListener</listener-class>
</listener>
1.2 編寫監控類,實現ServletContextListener 介面
package com.sinosoft.lis.listener;

import com.sinosoft.lis.services.addquasicust.ConsumerThread;

import javax.servlet.ServletContextEvent;
import javax.servlet.ServletContextListener;

/**
 * Servlet context listener that launches the Kafka consumer when the
 * servlet context is initialized.
 */
public class ConsumerThreadListener implements ServletContextListener {

    /**
     * Creates a {@link ConsumerThread} and starts it.
     *
     * @param sce the context-initialized event (not used)
     */
    @Override
    public void contextInitialized(ServletContextEvent sce) {
        new ConsumerThread().start();
    }

    /**
     * Does nothing when the servlet context is destroyed.
     *
     * @param sce the context-destroyed event (not used)
     */
    @Override
    public void contextDestroyed(ServletContextEvent sce) {
        // Intentionally empty.
    }
}
1.3服務啟動,自動執行contextInitialized()的程式碼,即kafka相關內容,ConsumerThread 內容如下:
package com.sinosoft.lis.services.addquasicust;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Polls a Kafka topic on a dedicated thread and hands every received record
 * to a worker pool, where it is processed by a {@link ConsumerThreadHandler}.
 *
 * <p>Call {@link #start()} once to begin consuming and {@link #stop()} to end
 * the poll loop; the consumer and the worker pool are released by the polling
 * thread itself when the loop ends.
 *
 * <p>NOTE(review): offsets are auto-committed ({@code enable.auto.commit=true}),
 * independently of whether the submitted handlers have finished - confirm this
 * delivery guarantee is acceptable.
 */
public class ConsumerThread {

    private static final Logger LOG = Logger.getLogger(ConsumerThread.class.getName());

    /** Worker pool sizing. */
    private static final int THREAD_CORE_NUMBER = 3;
    private static final int THREAD_MAX_NUMBER = 5;
    private static final int WORK_QUEUE_CAPACITY = 500;

    /** Maximum time, in milliseconds, a single poll waits for records. */
    private static final long POLL_TIMEOUT_MS = 1000L;

    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    private ExecutorService executor;

    /** Set by start(), cleared by stop(); read by the polling thread. */
    private volatile boolean running;

    /**
     * Builds the Kafka consumer from the built-in configuration and
     * subscribes it to the topic.
     */
    public ConsumerThread() {
        // Load the Kafka connection settings.
        Properties properties = buildKafkaProperty();
        this.consumer = new KafkaConsumer<>(properties);
        // Topic name.
        this.topic = "topic名稱";
        this.consumer.subscribe(Arrays.asList(this.topic));
    }

    /**
     * Creates the worker pool and starts the polling thread, then returns
     * immediately. Polling runs on its own thread because the poll loop would
     * otherwise block the caller (for example, application startup).
     */
    public void start() {
        running = true;
        // When the queue is full, CallerRunsPolicy makes the polling thread run
        // the handler itself, which slows polling down instead of dropping records.
        executor = new ThreadPoolExecutor(
                THREAD_CORE_NUMBER,
                THREAD_MAX_NUMBER,
                1L,
                TimeUnit.MINUTES,
                new ArrayBlockingQueue<Runnable>(WORK_QUEUE_CAPACITY),
                new ThreadPoolExecutor.CallerRunsPolicy());

        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                pollLoop();
            }
        }, "kafka-consumer-poller");
        thread.start();
    }

    /**
     * Asks the polling thread to stop. Intended to be called after
     * {@link #start()}. The consumer is closed and the worker pool is shut
     * down by the polling thread once its current poll is interrupted.
     */
    public void stop() {
        running = false;
        // wakeup() is the consumer method that may be called from another
        // thread; it aborts a blocking poll() by making it throw.
        consumer.wakeup();
    }

    /** Polls until stopped or until polling fails, then releases resources. */
    private void pollLoop() {
        try {
            while (running) {
                // Wait up to POLL_TIMEOUT_MS for new records.
                ConsumerRecords<String, String> consumerRecords = consumer.poll(POLL_TIMEOUT_MS);
                for (ConsumerRecord<String, String> item : consumerRecords) {
                    // Hand the record to the business handler.
                    executor.submit(new ConsumerThreadHandler(item));
                }
            }
        } catch (Exception e) {
            // An exception after stop() is the expected result of wakeup();
            // anything raised while still running is a real failure.
            if (running) {
                LOG.log(Level.SEVERE, "Kafka polling failed; consumer thread is terminating", e);
            }
        } finally {
            running = false;
            try {
                // Closed here because the consumer must be used from the polling thread.
                consumer.close();
            } catch (Exception e) {
                LOG.log(Level.WARNING, "Failed to close Kafka consumer", e);
            }
            // Let already-submitted handlers finish, accept no new ones.
            executor.shutdown();
        }
    }

    /**
     * Builds the Kafka consumer configuration.
     *
     * @return consumer properties (brokers, group id, auto-commit, deserializers)
     */
    private static Properties buildKafkaProperty() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "storm-144:9092,storm-145:9092,storm-146:9092");
        props.put("group.id", "storm");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
1.4業務邏輯處理類需實現Runnable 介面
package com.sinosoft.lis.services.addquasicust;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.lang.reflect.Method;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * Business handler for a single Kafka record; meant to be run on a worker thread.
 * The current implementation only prints the record value to standard output.
 */
public class ConsumerThreadHandler implements Runnable {

    private static final Logger LOG = Logger.getLogger(ConsumerThreadHandler.class.getName());

    /** The record this handler processes. */
    private final ConsumerRecord<?, ?> consumerRecord;

    /**
     * @param consumerRecord the record to process when {@link #run()} is invoked
     */
    public ConsumerThreadHandler(ConsumerRecord<?, ?> consumerRecord) {
        this.consumerRecord = consumerRecord;
    }

    /**
     * Processes the record. Failures are logged and not rethrown, so one bad
     * record does not propagate an exception to the executing thread.
     */
    @Override
    public void run() {
        try {
            System.out.println(consumerRecord.value());
        } catch (Exception e) {
            LOG.log(Level.SEVERE, "Failed to process Kafka record", e);
        }
    }
}
1.5生產者相關程式碼,用於測試
package com.sinosoft.lis.services.addquasicust;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class ProducerTest {
public void newTest() {
Properties props = new Properties();
props.put("bootstrap.servers", "storm-144:9092,storm-145:9092,storm-146:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i <= 10; i++) {
String r = “測試”+i;
producer.send(new ProducerRecord<String, String>("topic名稱", r));
}
producer.close();
}
public static void main(String[] args) {
ProducerTest producerTest = new ProducerTest();
producerTest.newTest();
}
}
另:相關jar包
<!-- Maven dependency on the Apache Kafka client library (kafka-clients), version 0.9.0.1. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
注意該jar包要與kafka伺服器版本保持一致,否則有可能會發生錯誤