1. 程式人生 > >Java+Kafka訊息佇列

Java+Kafka訊息佇列

本文主要介紹Java端對Kafka訊息佇列的生產和消費。Kafka的安裝部署,請檢視相關文章。

筆者最近所用的是Spring mvc,監聽檔案路徑,然後將讀取到的檔案內容傳送到訊息佇列中。由另外系統去消費訊息。

當然訊息佇列作為訊息交換機,本系統既有生產訊息也有消費訊息。不做詳述。

生產者程式碼相對簡單很多。

package com.dhc.test.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log4j.Logger;

import java.util.Properties;

public class ProducerHandler {
    private final KafkaProducer<String, String> producer;
    private static Logger logger = Logger.getLogger(DataInManager.class.getName());

    public ProducerHandler(String topic,String message) {

        Properties props = new Properties();
        props.put("bootstrap.servers”,"127.0.0.1:9092");
        props.put("acks", "all");
        props.put("retries", "0");
        props.put("batch.size", "16384");
        props.put("linger.ms", "1");
        props.put("buffer.memory", "33554432");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"
); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); producer = new KafkaProducer<String, String>(props); //生成訊息 ProducerRecord record = new ProducerRecord(topic,message); //傳送訊息 producer.send(record); logger.info("【kafka】向Kafka的TOPIC【" + topic + "】中傳送訊息"); logger.info("【kafka】訊息內容:" + message); logger.info("【kafka】推送成功"); } }
消費者程式碼
package com.dhc.test.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ConsumerHandler {

    static Logger logger = Logger.getLogger(DataInManager.class.getName());
    private final KafkaConsumer<String, String> consumer;
    private ExecutorService executors;

    public ConsumerHandler(List<String> topics) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("group.id", "test");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(topics);
        execute(1);
    }

    public void execute(int workerNum) {
        executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        Thread t = new Thread(new Runnable(){//啟動一個子執行緒來監聽kafka訊息
            public void run(){
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(200);
                    for (final ConsumerRecord record : records) {
                        logger.info("【Kafka】監聽到kafka的TOPIC【" + record.topic() + "】的訊息");
                        logger.info("【Kafka】訊息內容:" + record.value());
                        executors.submit(new ConsumerWorker(record));
                    }
                }
            }});
        t.start();
    }

    public void shutdown() {
        if (consumer != null) {
            consumer.close();
        }
        if (executors != null) {
            executors.shutdown();
        }
        try {
            if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
                logger.info("【Kafka】Timeout.... Ignore for this case ");
            }
        } catch (InterruptedException ignored) {
            logger.info("【Kafka】Other thread interrupted this shutdown, ignore for this case.");
            Thread.currentThread().interrupt();
        }
    }
}

package com.dhc.test.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.log4j.Logger;

/**
 * Task that handles one record received from Kafka.
 *
 * <p>The current implementation only prints the record value to standard
 * output; real business logic is meant to replace that line.
 */
public class ConsumerWorker implements Runnable {

    // NOTE(review): the logger is named after DataInManager rather than this class, and is currently unused — confirm intent.
    private static Logger logger = Logger.getLogger(DataInManager.class.getName());

    /** The record this task processes; never reassigned. */
    private final ConsumerRecord<String, String> consumerRecord;

    /**
     * @param record the consumed record whose value will be processed
     */
    public ConsumerWorker(ConsumerRecord<String, String> record) {
        this.consumerRecord = record;
    }

    /** Processes the record; business handling of the message goes here. */
    @Override
    public void run() {
        System.out.println(consumerRecord.value());
    }
}
main方法啟動
package com.dhc.test;

import com.dhc.test.kafka.ConsumerHandler;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * Application entry point: starts a Kafka consumer on the "test" topic.
 */
public class Start {
    public static void main(String[] args) throws Exception {
        // Constructing the handler subscribes to the topics and starts the listener.
        new ConsumerHandler(listenedTopics());
    }

    /** Builds the list of message channels to listen on. */
    private static List<String> listenedTopics() {
        List<String> channels = new ArrayList<String>();
        channels.add("test");
        return channels;
    }
}

謝謝關注!