
[Azure Event Hub] Event Hub Java SDK consumer stops consuming data from one partition and logs an IdleTimerExpired error

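Problem

A consumer application written with the Event Hub Java SDK intermittently stopped consuming messages from one partition. The logs for that period contain an IdleTimerExpired error. After the application was restarted, it connected to Event Hub normally and resumed consuming data. The open question: with the retry mechanism enabled, why did the consumer not reconnect after the connection to the partition was closed?

Error message: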

{"time":"2020-09-21 05:11:19.578", "level":"ERROR", "thread":"bounded-71", "appName":"events-service", "traceId":"", "spanId":""
, "url":"", "clientIp":"",
"method":"", "elapse":"", "code":"", "message":"", "class":"c.h.socialhub.eventhub.EventHub",
"line":"EventHub.java:150",
"msg":"Error occurred while processing events The connection was inactive for more than the allowed 240000 milliseconds and is closed by container 'cd8a74181e68151dde4_G28'.,
errorContext[NAMESPACE: shprod-member.servicebus.chinacloudapi.cn,
PATH: xxxx/ConsumerGroups/$default/Partitions/1, REFERENCE_ID: 2_xxxxxxxx LINK_CREDIT: 253]
"}

Consumer code:

    public EventHub start() {
        if (this.state == EventHubState.BLANK) {
            log.error("Must config nameSpaceId and topic");
            return this;
        }
        if (StringUtils.isBlank(nameSpaceId) || StringUtils.isBlank(topic)) {
            log.error("The nameSpaceId or topic could not be null.");
            return this;
        }
        Optional<NameSpaceProperty> currentEventHubPropertyOptional = eventHubConfig.findNameSpacePropertyById(nameSpaceId);
        if (!currentEventHubPropertyOptional.isPresent()) {
            log.error("The nameSpaceId is error.");
            return this;
        }
        NameSpaceProperty currentEventHubProperty = currentEventHubPropertyOptional.get();

        log.info("EventHub begin listening {}:{} ...", nameSpaceId, topic);
        blobContainerAsyncClient = new BlobContainerClientBuilder()
                .connectionString(eventHubConfig.getStorageAccountConnectionString())
                .sasToken(eventHubConfig.getSasToken())
                .containerName(currentEventHubProperty.getContainerName())
                .buildAsyncClient();

        eventProcessorClient = new EventProcessorClientBuilder()
                .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
                .connectionString(currentEventHubProperty.getConnectionString(), this.topic)
                .retry(retryOptions)
                .checkpointStore(new BlobCheckpointStore(blobContainerAsyncClient))
                .processEvent(eventContext -> {
                    String currentData = "";
                    if (this.listener != null) {

                        try {
                            // Read the event data:
                            EventData event = eventContext.getEventData();
                            PartitionContext partitionContext = eventContext.getPartitionContext();

                            EventMessage eventMessage = new EventMessage();
                            currentData = new String(event.getBody(), Charset.defaultCharset());
                            eventMessage.setContent(currentData);
                            eventMessage.setPartitionId(partitionContext.getPartitionId());
                            eventMessage.setSequenceNumber(event.getSequenceNumber());
                            log.info("Topic: {} - Partition: {} - Sequence: {}", this.topic, partitionContext.getPartitionId(), event.getSequenceNumber());

                            eventContext.updateCheckpoint();
                            this.listener.listening(eventMessage);

                        } catch (Exception e) {
                            String msg = e.getMessage();
                            if (StringUtils.isBlank(msg)) {
                                // Arrays.toString (java.util.Arrays) prints the frames;
                                // calling toString() on the array only prints its reference.
                                msg = Arrays.toString(e.getStackTrace());
                            }
                            log.error("Error occurred while do works with events[{}] : {}, data: {} ", this.topic, msg, currentData);
                        }
                    }
                    else {
                        log.info("Error occurred while do works with events: no listener for topic: {}", this.topic);
                    }
                })
                .processError(errorContext -> log.error("Error occurred while processing events " + errorContext.getThrowable().getMessage()))
                .buildEventProcessorClient();

        // This will start the processor. It will start processing events from all partitions.
        eventProcessorClient.start();
        this.state = EventHubState.RUNING;
        return this;
    }

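Root cause analysis

The first step is to determine from the logs whether the partition really was idle for 240 seconds around the time of the problem, that is, whether no data entered the partition during that period. If the logs already record the time each message entered the queue (the enqueued time), they can be analyzed directly. If not, add it to the logging code so that the next occurrence can be analyzed straight from the logs: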

 log.info("Topic: {} - Partition: {} - Sequence: {} - EnqueuedTime: {}", this.topic,  partitionContext.getPartitionId(), event.getSequenceNumber(),event.getEnqueuedTime());
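
Once enqueued times are in the log, checking for idle periods comes down to comparing consecutive timestamps. The sketch below is a minimal illustration in plain Java. IdleGapFinder and Gap are hypothetical names, not part of the Azure SDK, and the sample timestamps are made up. It assumes the enqueued times of a single partition have already been collected in ascending order, and it reports every gap longer than the 240000 ms limit quoted in the error message.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not part of the Azure SDK): finds idle gaps between enqueued times. */
class IdleGapFinder {

    /** A period without events, bounded by the enqueued times of the two surrounding events. */
    static final class Gap {
        final Instant from;
        final Instant to;

        Gap(Instant from, Instant to) {
            this.from = from;
            this.to = to;
        }

        Duration length() {
            return Duration.between(from, to);
        }
    }

    /**
     * Returns every gap between consecutive enqueued times that is longer than the threshold.
     * Assumes the input is sorted ascending, as it is when read from one partition in order.
     */
    static List<Gap> findGaps(List<Instant> enqueuedTimes, Duration threshold) {
        List<Gap> gaps = new ArrayList<>();
        for (int i = 1; i < enqueuedTimes.size(); i++) {
            Instant previous = enqueuedTimes.get(i - 1);
            Instant current = enqueuedTimes.get(i);
            if (Duration.between(previous, current).compareTo(threshold) > 0) {
                gaps.add(new Gap(previous, current));
            }
        }
        return gaps;
    }

    public static void main(String[] args) {
        // Made-up enqueued times for illustration; real values would come from the application log.
        List<Instant> times = Arrays.asList(
                Instant.parse("2020-09-21T05:00:00Z"),
                Instant.parse("2020-09-21T05:02:00Z"),
                Instant.parse("2020-09-21T05:07:19Z"),
                Instant.parse("2020-09-21T05:08:00Z"));

        // 240000 ms is the idle limit quoted in the error message.
        for (Gap gap : findGaps(times, Duration.ofMillis(240_000))) {
            System.out.println("Idle from " + gap.from + " to " + gap.to
                    + " (" + gap.length().getSeconds() + " s)");
        }
    }
}
```

If a gap longer than 240 seconds lines up with the time of the error, the partition was genuinely idle when the connection was closed. That would account for the error itself, but not for the consumer failing to reconnect afterwards.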

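For a problem that has already occurred, if the events are still within the Event Hub's configured retention period, the SDK's receiveFromPartition method can be used to read a chosen range of events and check their enqueued times. (Note: create a separate consumer group for this rather than using $Default, to avoid connection failures.) Sample code: https://azuresdkdocs.blob.core.windows.net/$web/java/azure-messaging-eventhubs/5.2.0/index.html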

Consume events from an Event Hub partition

To consume events, create an EventHubConsumerAsyncClient or EventHubConsumerClient for a specific consumer group. In addition, a consumer needs to specify where in the event stream to begin receiving events.

Consume events with EventHubConsumerAsyncClient

In the snippet below, we create an asynchronous consumer that receives events from partitionId and only listens to newest events that get pushed to the partition. Developers can begin receiving events from multiple partitions using the same EventHubConsumerAsyncClient by calling receiveFromPartition(String, EventPosition) with another partition id.

EventHubConsumerAsyncClient consumer = new EventHubClientBuilder()
    .connectionString("<< CONNECTION STRING FOR SPECIFIC EVENT HUB INSTANCE >>")
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildAsyncConsumerClient();

// Receive newly added events from partition with id "0". EventPosition specifies the position
// within the Event Hub partition to begin consuming events.
consumer.receiveFromPartition("0", EventPosition.latest()).subscribe(event -> {
    // Process each event as it arrives.
});
// add sleep or System.in.read() to receive events before exiting the process.

Consume events with EventHubConsumerClient

Developers can create a synchronous consumer that returns events in batches using an EventHubConsumerClient. In the snippet below, a consumer is created that starts reading events from the beginning of the partition's event stream.

EventHubConsumerClient consumer = new EventHubClientBuilder()
    .connectionString("<< CONNECTION STRING FOR SPECIFIC EVENT HUB INSTANCE >>")
    .consumerGroup(EventHubClientBuilder.DEFAULT_CONSUMER_GROUP_NAME)
    .buildConsumerClient();

String partitionId = "<< EVENT HUB PARTITION ID >>";

// Get the first 15 events in the stream, or as many events as can be received within 40 seconds.
IterableStream<PartitionEvent> events = consumer.receiveFromPartition(partitionId, 15,
    EventPosition.earliest(), Duration.ofSeconds(40));
for (PartitionEvent event : events) {
    System.out.println("Event: " + event.getData().getBodyAsString());
}

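That is as far as the analysis goes. It has not established whether the problem lies in the application logic or in the SDK. Many similar reports on GitHub point toward a Java SDK issue, so this is waiting on further updates in the GitHub issue:

AmqpEventHubConsumer.IdleTimerExpired in Java EventHubConsumer SDK: https://github.com/Azure/azure-sdk-for-java/issues/11233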