jstorm kafkaspout: the fail mechanism does not actually resend failed messages
The problem surfaced when we noticed that the Kafka consumer offset stored in ZooKeeper had not been updated for a long time. By adding log statements we found that lastOffset is actually taken from pendingOffsets, the set that records every emitted offset (in the source it is just a plain TreeSet; even though Alibaba made ack and fail asynchronous, they did not use a ConcurrentSkipListSet). Entries are removed from pendingOffsets only after an ack. When an offset fails, a different TreeSet is touched instead, and the fail handler simply calls remove on it; the problem is that nothing is ever added to that set, so the remove does nothing and the failed offset is never fetched or emitted again.
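To make the failure mode concrete, here is a simplified illustration; this is not the actual jstorm source, and the field and method names are assumptions based on the description above. It shows why an offset that stays in pendingOffsets forever freezes the offset committed to ZooKeeper.

import java.util.TreeSet;

// Sketch: the offset written to ZooKeeper is derived from the smallest offset that is
// still pending acknowledgement, so an entry that is never acked and never re-emitted
// pins the committed offset in place.
class OffsetTrackingSketch {
    private final TreeSet<Long> pendingOffsets = new TreeSet<>();
    private final TreeSet<Long> failedOffsets = new TreeSet<>();
    private long emittedOffset = 0L;

    void emitted(long offset) {
        pendingOffsets.add(offset);
        emittedOffset = offset;
    }

    long lastCompletedOffset() {
        // Everything below the smallest pending offset counts as committed.
        return pendingOffsets.isEmpty() ? emittedOffset : pendingOffsets.first();
    }

    void ack(long offset) {
        pendingOffsets.remove(offset);
    }

    void fail(long offset) {
        // The broken behaviour described above: remove from a set that nothing was
        // ever added to, and leave the offset sitting in pendingOffsets forever.
        failedOffsets.remove(offset);
    }
}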
Solution:
Replace the original TreeSet with ConcurrentSkipListSet.
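As a sketch, the replacement fields inside PartitionConsumer could look like the following; the comparator is my assumption, since ConcurrentSkipListSet needs either Comparable elements or an explicit Comparator, and KafkaMessageId may not implement Comparable.

import java.util.Comparator;
import java.util.concurrent.ConcurrentSkipListSet;

// Field declarations inside PartitionConsumer. KafkaMessageId is the spout's existing
// message-id type, assumed here to expose getOffset() and getPartition().
private final ConcurrentSkipListSet<Long> pendingOffsets = new ConcurrentSkipListSet<>();
private final ConcurrentSkipListSet<KafkaMessageId> failedOffsets =
        new ConcurrentSkipListSet<>(Comparator.comparingLong(KafkaMessageId::getOffset));

ConcurrentSkipListSet keeps the ordered-set operations the code below relies on (first-element access, pollFirst, removeIf) while being safe to touch from both the emitting thread and the asynchronous ack/fail callbacks.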
The fail method:
public void fail(KafkaMessageId fail) {
    failedOffsets.add(fail);
    pendingOffsets.remove(fail.getOffset());
}
Add the following at the very beginning of PartitionConsumer's emit method:
if (!failedOffsets.isEmpty()) {
    fillFailMessage();
}
// Uses java.util.List and java.util.stream.Collectors; ByteBufferMessageSet and
// MessageAndOffset come from the Kafka client classes the spout already uses.
private void fillFailMessage() {
    ByteBufferMessageSet msgs;
    try {
        if (failedOffsets.isEmpty()) {
            return;
        }
        // Fetch starting from the smallest failed offset.
        KafkaMessageId kafkaMessageId = failedOffsets.first();
        msgs = consumer.fetchMessages(kafkaMessageId.getPartition(), kafkaMessageId.getOffset());
        // Snapshot of every offset currently marked as failed.
        List<Long> failedOffset = failedOffsets.stream()
                .mapToLong(KafkaMessageId::getOffset)
                .boxed()
                .collect(Collectors.toList());
        for (MessageAndOffset msg : msgs) {
            if (failedOffset.contains(msg.offset())) {
                LOG.info("failToSend data is partition : " + partition + " , offset : " + msg.offset()
                        + " , failedOffsets size : " + failedOffset.size());
                // Put the offset back into the pending set and queue the message for re-emission.
                pendingOffsets.add(msg.offset());
                emittingMessages.add(msg);
                failedOffsets.removeIf(k -> k.getOffset() == msg.offset());
            }
        }
    } catch (Exception e) {
        LOG.error(e.getMessage(), e);
    }
}
Here you can decide for yourself whether to also filter out messages that were already emitted; one possible way to do that is sketched below. That concludes the fix.
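If you do want such a filter, here is one hypothetical reading (not from the original post): skip a failed offset whose message is already queued for re-emission. It assumes emittingMessages is a java.util.Collection of MessageAndOffset.

// Hypothetical helper: true if a message with this offset is already waiting in
// emittingMessages, in which case fillFailMessage() can skip re-adding it.
private boolean alreadyQueued(long offset) {
    return emittingMessages.stream().anyMatch(m -> m.offset() == offset);
}

Inside the loop in fillFailMessage() the condition would then become
if (failedOffset.contains(msg.offset()) && !alreadyQueued(msg.offset())).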