1. 程式人生 > >kafka consumer不能消費訊息及其處理辦法

kafka consumer不能消費訊息及其處理辦法

我這裡的Kafka Consumer程式碼是拷貝網上的,就是開一個執行緒監聽kafka topic,一有訊息就處理。開始的程式碼是這樣的:

public void kafkaStart() {
        final String topic = hipchatAction.properties.getProperty("kafka.hipchat.topic");
        final int partitionNum = Integer.valueOf(hipchatAction.properties.getProperty("kafka.hipchat.topic.partitionNum"));

        log.debug("Comes to kafkaStart() with topic : " + topic + ", partitionNum : " + partitionNum);

        Map<String, Integer> topicCountMap = new HashMap<>();

        topicCountMap.put(topic, partitionNum);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams = connector.createMessageStreams(topicCountMap);

        List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);

        ExecutorService executor = Executors.newFixedThreadPool(partitionNum * 2);

        for (KafkaStream<byte[], byte[]> partition : partitions) {
            /**
             * Here also it is an unknown issue, if use anonymous inner class, then the thread seems died, must have
             * a named inner class!
             */
            executor.execute(new MessageRunner(partition));
        }
    }

    class MessageRunner implements Runnable {
        private KafkaStream<byte[], byte[]> partition;

        MessageRunner(KafkaStream<byte[], byte[]> partition) {
            this.partition = partition;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = partition.iterator();
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> msgMeta = it.next();
                /**
                 * In order to consume Chinese message, here should use charset.
                 */
                String jsonStr = new String(msgMeta.message(), StandardCharsets.UTF_8);
                log.debug("********* Message to be consumed in UTF-8 is :: " + jsonStr);

                KafkaMsgVO msg = new Gson().fromJson(jsonStr, KafkaMsgVO.class);
                hipchatAction.sendMessageToRoom(msg.getRoomName(), msg.getToken(), msg.getMsgText());
            }
        }
    }

方法kafkaStart()會在Spring容器啟動的時候執行。

對於 KafkaMsgVO msg = new Gson().fromJson(jsonStr, KafkaMsgVO.class); 這行,因為我的程式對訊息要求是一個物件名為 KafkaMsgVO 的JSON格式資料,但是測試的時候測試人員就隨便發了一條訊息,沒有進行物件的屬性賦值並組裝成 JSON 資料,所以丟擲 JSONSyntaxException。問題就出在這,因為一旦丟擲異常,這個執行緒就被破壞了,沒辦法進行後續訊息的消費,儘管 kafka consumer 仍然能感知到 topic 裡面有新的訊息到來。

解決辦法:

為了不讓異常破壞 consumer 執行緒,我進行了出錯程式碼的位置轉移,將得到的訊息直接通過 Apache AKKA 進行轉發,然後由 AKKA 的 onReceive() 對訊息進行處理,也就是出錯程式碼移到 onReceive() 裡面,這樣就保證了 consumer 執行緒的健壯性。程式碼如下:

class MessageRunner implements Runnable {
        private KafkaStream<byte[], byte[]> partition;

        MessageRunner(KafkaStream<byte[], byte[]> partition) {
            this.partition = partition;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = partition.iterator();
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> msgMeta = it.next();
                /**
                 * In order to consume Chinese message, here should use charset.
                 */
                String jsonStr = new String(msgMeta.message(), StandardCharsets.UTF_8);
                log.debug("********* Message to be consumed in UTF-8 is :: " + jsonStr);

                ActorRef sender = akkaSystem.getMsgConductor();
                sender.tell(new AkkaAdaptor(jsonStr, hipchatAction), sender);
            }
        }
    }

一旦訊息被 AKKA 轉發後,對訊息的處理程式碼就放在了AKKA 的 Actor 裡面了:
} else if (message instanceof AkkaAdaptor) {
		AkkaAdaptor akkaAdaptor = (AkkaAdaptor)message;

		String textMessage = akkaAdaptor.getTextMessage();
		KafkaMsgVO msg = null;
		try {
				msg = new Gson().fromJson(textMessage, KafkaMsgVO.class);
		} catch (Exception e) {
			log.debug(textMessage + " is malformed, it may miss some important property (value).");
			return;
		}

		HipchatAction hipchatAction = akkaAdaptor.getHipchatAction();

		log.debug("Kafka message sent by AKKA is :: " + msg.getMsgText());
		hipchatAction.sendMessageToRoom(msg.getRoomName(), msg.getToken(), msg.getMsgText());

	}