Kafka series -- multithreaded consumer implementation
阿新 • Published: 2018-10-13
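I spent a week reading up on Kafka and then wrote some code that consumes Kafka data.
I still feel I fall short in a couple of ways.
- I can't manipulate data as freely as I'd like: my grasp of data structures is weak, and so is my grasp of Spark's RDD operations.
- I can't organize code structure well: I haven't learned design patterns properly, and my understanding of object-oriented thinking is shallow.
Characteristics of the consumer program:
- A queue holds the records waiting to be processed.
- A queue holds the offsets waiting to be committed; the processing thread hands them back to the consumer thread, which performs the commit.
- One processing thread is started per partition, and the partition-to-processor mapping is kept in a map.
- Once a certain number of records has been processed, or a certain interval has passed since the last commit, the poll thread commits the offset.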
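The last rule in the list (commit after N records or after T time, whichever comes first) is easier to reason about in isolation. Below is a minimal sketch of that decision, not the code used later in this post: `CommitPolicy` and its method names are hypothetical, the clock is passed in as plain milliseconds so the logic can be checked without waiting, and unlike the real processor it resets its counter after each commit.

```java
/** Hypothetical helper: decides when a processing thread should hand an offset back for commit. */
final class CommitPolicy {
    private final int maxRecords;       // commit after this many processed records
    private final long maxIntervalMs;   // or after this much time since the last commit
    private int processedSinceCommit = 0;
    private long lastCommitMs;

    CommitPolicy(int maxRecords, long maxIntervalMs, long nowMs) {
        this.maxRecords = maxRecords;
        this.maxIntervalMs = maxIntervalMs;
        this.lastCommitMs = nowMs;
    }

    /** Call once for every record that has been fully processed. */
    void recordProcessed() {
        processedSinceCommit++;
    }

    /** Returns true when there is uncommitted work and either threshold has been reached. */
    boolean shouldCommit(long nowMs) {
        if (processedSinceCommit == 0) {
            return false; // nothing new to commit
        }
        return processedSinceCommit >= maxRecords || nowMs - lastCommitMs >= maxIntervalMs;
    }

    /** Call after the offset has been handed over; resets both thresholds. */
    void markCommitted(long nowMs) {
        processedSinceCommit = 0;
        lastCommitMs = nowMs;
    }
}
```

A processing thread would call `recordProcessed()` after each record, then `shouldCommit(System.currentTimeMillis())`, and `markCommitted(...)` once the offset has been put on the commit queue.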
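Drawbacks:
- Too little data is handled at a time, and every single record is checked to see whether its partition already has a processing thread.
- The way the topic is obtained is not very elegant.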
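One possible way around the first drawback is to group each polled batch by partition first, so the "does this partition already have a processor?" lookup runs once per partition rather than once per record. The sketch below shows only the grouping step and uses a hypothetical `Msg` class as a stand-in for Kafka's `ConsumerRecord`; with the real client, `ConsumerRecords.partitions()` and `ConsumerRecords.records(TopicPartition)` already expose a batch grouped this way.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a consumed record; only partition and offset matter here. */
final class Msg {
    final int partition;
    final long offset;

    Msg(int partition, long offset) {
        this.partition = partition;
        this.offset = offset;
    }
}

final class BatchGrouper {
    /** Groups one polled batch by partition, preserving record order within each partition. */
    static Map<Integer, List<Msg>> byPartition(List<Msg> batch) {
        Map<Integer, List<Msg>> grouped = new LinkedHashMap<>();
        for (Msg msg : batch) {
            grouped.computeIfAbsent(msg.partition, p -> new ArrayList<>()).add(msg);
        }
        return grouped;
    }
}
```

The receiver can then iterate over the map's entries, look up or create the processor once per key, and enqueue the whole list for that partition.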
Flowchart (figure not preserved)
The multithreaded consumer implementation follows:
1. Management program
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils; // assumed: Apache Commons Lang 3
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Starts the consumer threads (MsgReceiver), keeps references to them and to the
 * processing tasks and threads (RecordProcessor), and shuts all of them down.
 * Created by stillcoolme on 2018/10/12.
 */
public class KafkaMultiProcessorMain {
    private static final Logger logger = LoggerFactory.getLogger(KafkaMultiProcessorMain.class);
    // consumer properties loaded from file
    private Properties consumerProps = new Properties();
    // Kafka consumer configuration
    Map<String, Object> consumerConfig;
    // topic configuration
    Map<String, Object> topicConfig;
    // subscribed topic
    private String alarmTopic;
    // consumer threads
    private Thread[] threads;
    // maps holding the processing tasks and their threads, keyed by partition
    ConcurrentHashMap<TopicPartition, RecordProcessor> recordProcessorTasks = new ConcurrentHashMap<>();
    ConcurrentHashMap<TopicPartition, Thread> recordProcessorThreads = new ConcurrentHashMap<>();

    public void setAlarmTopic(String alarmTopic) {
        this.alarmTopic = alarmTopic;
    }

    public static void main(String[] args) {
        KafkaMultiProcessorMain kafkaMultiProcessor = new KafkaMultiProcessorMain();
        // setting the topic this way is not elegant!
        kafkaMultiProcessor.setAlarmTopic("picrecord");
        kafkaMultiProcessor.init(null);
    }

    private void init(String consumerPropPath) {
        getConsumerProps(consumerPropPath);
        consumerConfig = getConsumerConfig();
        int threadsNum = 3;
        logger.info("create " + threadsNum + " threads to consume kafka warn msg");
        threads = new Thread[threadsNum];
        for (int i = 0; i < threadsNum; i++) {
            MsgReceiver msgReceiver = new MsgReceiver(consumerConfig, alarmTopic, recordProcessorTasks, recordProcessorThreads);
            Thread thread = new Thread(msgReceiver);
            threads[i] = thread;
        }
        for (int i = 0; i < threadsNum; i++) {
            threads[i].start();
        }
        logger.info("finish creating " + threadsNum + " threads to consume kafka warn msg");
    }

    // stop all threads that were started
    public void destroy() {
        closeRecordProcessThreads();
        closeKafkaConsumer();
    }

    private void closeRecordProcessThreads() {
        logger.debug("start to interrupt record process threads");
        for (Map.Entry<TopicPartition, Thread> entry : recordProcessorThreads.entrySet()) {
            Thread thread = entry.getValue();
            thread.interrupt();
        }
        logger.debug("finish interrupting record process threads");
    }

    private void closeKafkaConsumer() {
        logger.debug("start to interrupt kafka consumer threads");
        // stop the threads via interrupt; their run() methods check the interrupt flag
        for (int i = 0; i < threads.length; i++) {
            threads[i].interrupt();
        }
        logger.debug("finish interrupting consumer threads");
    }

    private Map<String, Object> getConsumerConfig() {
        return ImmutableMap.<String, Object>builder()
                .put("bootstrap.servers", consumerProps.getProperty("bootstrap.servers"))
                // note: the literal string "group.id" is used as the consumer group name here
                .put("group.id", "group.id")
                .put("enable.auto.commit", "false")
                .put("session.timeout.ms", "30000")
                .put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
                .put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
                .put("max.poll.records", 1000)
                .build();
    }

    /**
     * Loads the consumer properties.
     *
     * @param proPath path of the properties file; if empty, consumer.properties is read from the classpath
     */
    private void getConsumerProps(String proPath) {
        InputStream inStream = null;
        try {
            if (StringUtils.isNotEmpty(proPath)) {
                inStream = new FileInputStream(proPath);
            } else {
                inStream = this.getClass().getClassLoader().getResourceAsStream("consumer.properties");
            }
            consumerProps.load(inStream);
        } catch (IOException e) {
            logger.error("failed to read consumer config file: " + e.getMessage(), e);
        } finally {
            if (null != inStream) {
                try {
                    inStream.close();
                } catch (IOException e) {
                    logger.error("failed to close consumer config file: " + e.getMessage(), e);
                }
            }
        }
    }
}
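On the "not elegant" topic setting in `main`: one option is to resolve the topic from the command line or from the properties file that is already being loaded, falling back to a default. This is only a sketch under assumptions: `TopicResolver` is a hypothetical helper and `alarm.topic` is a made-up property key, not something the original configuration is known to contain.

```java
import java.util.Properties;

final class TopicResolver {
    /**
     * Resolution order: first command-line argument, then the "alarm.topic" property
     * (a hypothetical key), then the supplied default.
     */
    static String resolveTopic(String[] args, Properties props, String defaultTopic) {
        if (args != null && args.length > 0 && !args[0].trim().isEmpty()) {
            return args[0].trim();
        }
        String fromProps = props.getProperty("alarm.topic");
        if (fromProps != null && !fromProps.trim().isEmpty()) {
            return fromProps.trim();
        }
        return defaultTopic;
    }
}
```

With something like this, `main` could pass `args` and the loaded properties instead of hard-coding "picrecord", while keeping "picrecord" as the default.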
2. Consumer task: MsgReceiver
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Polls Kafka and hands each record to a RecordProcessor for processing.
 * Created by zhangjianhua on 2018/10/12.
 */
public class MsgReceiver implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(MsgReceiver.class);
    // offsets handed back by the processing threads, waiting to be committed
    private BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitQueue = new LinkedBlockingQueue<>();
    private ConcurrentHashMap<TopicPartition, Thread> recordProcessorThreads;
    private ConcurrentHashMap<TopicPartition, RecordProcessor> recordProcessorTasks;
    private String alarmTopic;
    private Map<String, Object> consumerConfig;

    public MsgReceiver(Map<String, Object> consumerConfig, String alarmTopic,
                       ConcurrentHashMap<TopicPartition, RecordProcessor> recordProcessorTasks,
                       ConcurrentHashMap<TopicPartition, Thread> recordProcessorThreads) {
        this.consumerConfig = consumerConfig;
        this.alarmTopic = alarmTopic;
        this.recordProcessorTasks = recordProcessorTasks;
        this.recordProcessorThreads = recordProcessorThreads;
    }

    @Override
    public void run() {
        // KafkaConsumer is not thread-safe, so each thread creates its own consumer
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(consumerConfig);
        kafkaConsumer.subscribe(Arrays.asList(alarmTopic));
        try {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // Check whether commitQueue holds an offset to commit (non-blocking read).
                    // This check runs on every loop iteration, which is rather frequent!
                    Map<TopicPartition, OffsetAndMetadata> offsetToCommit = commitQueue.poll();
                    if (offsetToCommit != null) {
                        logger.info(Thread.currentThread().getName() + " commit offset: " + offsetToCommit);
                        // commit exactly the offsets handed back by the processor, not the last polled position
                        kafkaConsumer.commitAsync(offsetToCommit, (offsets, exception) -> {
                            if (exception != null) {
                                logger.warn("commit failed for " + offsets, exception);
                            }
                        });
                    }
                    // poll for at most 1000 ms
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
                    if (records.count() > 0) {
                        logger.info("poll records size: " + records.count());
                    }
                    for (ConsumerRecord<String, String> record : records) {
                        String topic = record.topic();
                        int partition = record.partition();
                        TopicPartition topicPartition = new TopicPartition(topic, partition);
                        // this lookup runs for every single record
                        RecordProcessor processTask = recordProcessorTasks.get(topicPartition);
                        // if the partition has not been consumed yet, the map holds no task for it
                        if (processTask == null) {
                            // create a new processing task and thread and store both in their maps
                            processTask = new RecordProcessor(commitQueue);
                            recordProcessorTasks.put(topicPartition, processTask);
                            Thread processTaskThread = new Thread(processTask);
                            processTaskThread.setName("Thread-for " + topicPartition.toString());
                            logger.info("start processor Thread: " + processTaskThread.getName());
                            processTaskThread.start();
                            recordProcessorThreads.put(topicPartition, processTaskThread);
                        }
                        // a processor is now available for this partition's record
                        processTask.addRecordToQueue(record);
                    }
                } catch (Exception e) {
                    logger.warn("MsgReceiver exception, ignoring it", e);
                }
            }
        } finally {
            kafkaConsumer.close();
        }
    }
}
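The receiver takes one entry off `commitQueue` per loop iteration, so several pending requests for the same partition are committed one after another. A possible refinement is to drain everything that is pending and keep only the highest offset per partition. The sketch below illustrates the merge with plain `String` partition names and `Long` offsets as simplified stand-ins for `TopicPartition` and `OffsetAndMetadata`; `OffsetMerger` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;

final class OffsetMerger {
    /**
     * Drains every pending request without blocking and keeps the highest offset per partition.
     * Keys are partition names and values are offsets; both are simplified stand-ins.
     */
    static Map<String, Long> drainAndMerge(BlockingQueue<Map<String, Long>> commitQueue) {
        List<Map<String, Long>> pending = new ArrayList<>();
        commitQueue.drainTo(pending); // non-blocking: takes whatever is queued right now
        Map<String, Long> merged = new HashMap<>();
        for (Map<String, Long> request : pending) {
            for (Map.Entry<String, Long> e : request.entrySet()) {
                merged.merge(e.getKey(), e.getValue(), Long::max);
            }
        }
        return merged;
    }
}
```

With the real client, a non-empty merged map could be passed to `commitAsync` or `commitSync` in a single call, since both have overloads that accept a `Map<TopicPartition, OffsetAndMetadata>`.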
3. Message processing task: RecordProcessor
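import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.joda.time.Duration;      // Joda-Time, as implied by Duration.standardSeconds
import org.joda.time.LocalDateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class RecordProcessor implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(RecordProcessor.class);
    // holds the records sent over by the MsgReceiver thread
    private BlockingQueue<ConsumerRecord<String, String>> queue = new LinkedBlockingQueue<>();
    // queue used to hand offsets back to the consumer thread for commit
    private BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitQueue;
    // time of the last commit
    private LocalDateTime lastTime = LocalDateTime.now();
    // commit once after every 20 consumed records
    private long commitLength = 20L;
    // commit once this much time has passed since the last commit
    private Duration commitTime = Duration.standardSeconds(2);
    // number of records this thread has consumed so far
    private int completeTask = 0;
    // the most recently processed record that has not been committed yet
    private ConsumerRecord<String, String> lastUncommittedRecord;

    public RecordProcessor(BlockingQueue<Map<TopicPartition, OffsetAndMetadata>> commitQueue) {
        this.commitQueue = commitQueue;
    }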
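    @Override
    public void run() {
        while (!Thread.interrupted()) {
            ConsumerRecord<String, String> record = null;
            try {
                // wait up to 100 ms for the next record
                record = queue.poll(100, TimeUnit.MILLISECONDS);
                if (record != null) {
                    process(record);
                    // one more record completed
                    this.completeTask++;
                    // remember the last processed record
                    lastUncommittedRecord = record;
                }
                // hand the offset over to the commit queue if a commit is due
                commitTOQueue();
            } catch (InterruptedException e) {
                // the thread was interrupted: leave the loop
                // (poll() clears the interrupt flag when it throws, so the loop condition alone would not exit)
                logger.info(Thread.currentThread() + " is interrupted");
                break;
            }
        }
    }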
    // put the current consumed offset on the queue; MsgReceiver performs the actual commit
    private void commitTOQueue() {
        if (lastUncommittedRecord == null) {
            return;
        }
        // true when another commitLength records have been consumed
        boolean arrivedCommitLength = this.completeTask % commitLength == 0;
        // get the current time and check whether the commit interval has elapsed
        LocalDateTime currentTime = LocalDateTime.now();
        boolean arrivedTime = currentTime.isAfter(lastTime.plus(commitTime));
        if (arrivedCommitLength || arrivedTime) {
            lastTime = currentTime;
            long offset = lastUncommittedRecord.offset();
            int partition = lastUncommittedRecord.partition();
            String topic = lastUncommittedRecord.topic();
            TopicPartition topicPartition = new TopicPartition(topic, partition);
            logger.info("partition: " + topicPartition + " submit offset: " + (offset + 1L) + " to consumer task");
            // the committed offset is the offset of the next record to read, hence offset + 1
            Map<TopicPartition, OffsetAndMetadata> map = Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset + 1L));
            commitQueue.add(map);
            // reset
            lastUncommittedRecord = null;
        }
    }
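    // called by the consumer thread to add a record to this processing thread's queue
    public void addRecordToQueue(ConsumerRecord<String, String> record) {
        try {
            queue.put(record);
        } catch (InterruptedException e) {
            // restore the interrupt flag so the calling consumer thread can see it and stop
            Thread.currentThread().interrupt();
        }
    }

    private void process(ConsumerRecord<String, String> record) {
        // business logic goes here
        //System.out.println(record);
    }
}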
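A general point about interrupt-driven loops like the `run()` method in this class: when a blocking call such as `BlockingQueue.poll` throws `InterruptedException`, the thread's interrupt flag is cleared, so a loop that only tests the flag keeps running unless the catch block restores the flag or leaves the loop. The sketch below shows the "restore the flag" variant with a hypothetical `InterruptibleWorker` that just counts the items it receives.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical worker that counts queued items until its thread is interrupted. */
final class InterruptibleWorker implements Runnable {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final AtomicInteger processed = new AtomicInteger();

    void submit(String item) {
        queue.add(item);
    }

    int processedCount() {
        return processed.get();
    }

    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                String item = queue.poll(100, TimeUnit.MILLISECONDS);
                if (item != null) {
                    processed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                // poll() cleared the flag when it threw; set it again so the loop condition ends the loop
                Thread.currentThread().interrupt();
            }
        }
    }
}
```

Calling `interrupt()` on the worker's thread then stops it within one poll timeout, whether the interrupt arrives during the wait or between iterations.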
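Improvements
- Abstract the processing program RecordProcessor by extracting a BaseProcessor parent class, so that different processing logic can be swapped in flexibly when business requirements change.
- Build the RecordProcessor via reflection? Configure the fully qualified class name of the concrete RecordProcessor in the configuration file, then pass it in when creating MsgReceiver.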
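Both ideas could look roughly like the sketch below. It is a guess at the shape, since the post only names `BaseProcessor`: the `process(String)` signature, the `UpperCaseProcessor` example subclass, and `ProcessorFactory` are all hypothetical, and the Kafka-specific parts (queues, offsets) are left out to keep the reflection step visible.

```java
/** Hypothetical base class; the post only names it, so this shape is an assumption. */
abstract class BaseProcessor {
    /** Business logic for one message value. */
    abstract String process(String value);
}

/** Example subclass used only for illustration. */
class UpperCaseProcessor extends BaseProcessor {
    @Override
    String process(String value) {
        return value.toUpperCase();
    }
}

final class ProcessorFactory {
    /** Creates a processor from a fully qualified class name, e.g. one read from a properties file. */
    static BaseProcessor create(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            // asSubclass rejects classes that do not extend BaseProcessor
            return clazz.asSubclass(BaseProcessor.class).getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalArgumentException("Cannot create processor: " + className, e);
        }
    }
}
```

The class name would come from the configuration file, and MsgReceiver would call the factory instead of `new RecordProcessor(...)`; a real version would also need a way to pass the commit queue to the new instance, for example through a constructor argument or a setter.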