1. 程式人生 > >Kafka基礎簡介

Kafka基礎簡介

err 日誌 class put 介紹 分享 頻率 actor oid

kafka是一個分布式的,可分區的,可備份的日誌提交服務,它使用獨特的設計實現了一個消息系統的功能。 由於最近項目升級,需要將spring的事件機制轉變為消息機制,針對後期考慮,選擇了kafka作為消息中間件。

kafka的安裝

這裏為了快速搭建,選擇用docker

docker run  -d -p 2181:2181 -p 9092:9092 -v /opt/kafka/server.properties:/opt/kafka_2.11-0.10.1.0/config/server.properties --env ADVERTISED_HOST=‘ip‘ --env ADVERTISED_PORT=9092 spotify/kafka

kafka的基本概念

這裏參照 官網 共有以下幾點

Topic:特指Kafka處理的消息源的不同分類,其實也可以理解為對不同消息源的區分的一個標識;
Partition:Topic物理上的分組,一個topic可以設置為多個partition,每個partition都是一個有序的隊列,partition中的每條消息都會被分配一個有序的id(offset);
Message:消息,是通信的基本單位,每個producer可以向一個topic(主題)發送一些消息;
Producers:消息和數據生產者,向Kafka的一個topic發送消息的過程叫做producers(producer可以選擇向topic哪一個partition發送數據)。
Consumers:消息和數據消費者,接收topics並處理其發布的消息的過程叫做consumer,同一個topic的數據可以被多個consumer接收;
Broker:緩存代理,Kafka集群中的一臺或多臺服務器統稱為broker。

  

這裏有一點是需要註意的

  consumer是一個抽象的概念,調用Consumer API的程序都可以稱作為一個consumer,它從broker端訂閱某個topic的消息。如果只有一個consumer的話,該topic(可能含有多個partition)下所有消息都會被這個consumer接收。但是在分布式的環境中,我們可能會遇到這樣一種情景,對於一個有多個partition的topic,我們希望啟動多個consumer去消費這些partition(如果發送速度較快,一個consumer是無法消費完的),並且要求topic的一條消息只能發給其中一個consumer,不希望這些conusmer出現重復接收一條消息的情況。對於這種情況,我們應該怎麽辦呢?kafka給我們提供了一種機制,可以很好來適應這種情況,那就是consumer group

(當然也可以應用在第一種情況,實際上,如果只有一個consumer時,是不需要指定consumer group,這時kafka會自動給這個consumer生成一個group名)。

  在調用conusmer API時,一般都會指定一個consumer group,該group訂閱的topic的每一條消息都發送到這個group的某一臺機器上。借用官網一張圖來詳細介紹一下這種情況,假如kafka集群有兩臺broker,集群上有一個topic,它有4個partition,partition 0和1在broker1上,partition 2和3在broker2上,這時有兩個consumer group同時訂閱這個topic,其中一個group有2個consumer,另一個consumer有4個consumer,則它們的訂閱消息情況如下圖所示:

技術分享圖片

如果group中的consumer數小於topic中的partition數,那麽group中的consumer就會消費多個partition;
如果group中的consumer數等於topic中的partition數,那麽group中的一個consumer就會消費topic中的一個partition;
如果group中的consumer數大於topic中的partition數,那麽group中就會有一部分的consumer處於空閑狀態。

  同時,同一個gruopid下多個consumer訂閱同一個topic,只有一個consumer能消費到數據。

下面我們開始集成kafka到系統

增加pom文件

<dependency>
        <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
<dependency>
        <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        <version>0.10.1.0</version>
 </dependency>
                

配置文件:

kafka.consumer.zookeeper.connect=47.92.168.221:2181
kafka.consumer.servers=47.92.168.221:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
#消費者偏移提交給zookeeper的頻率(以毫秒為單位)
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
#kafka.consumer.topic=test
kafka.consumer.group.id=test
#根據配置的spring.kafka.listener.concurrency來生成多個並發的KafkaMessageListenerContainer實例
kafka.consumer.concurrency=10
        

kafka.producer.servers=47.92.168.221:9092
#生產者重試次數
kafka.producer.retries=0
#每當多個記錄被發送到同一分區時,生產者將嘗試將記錄一起批量處理為更少的請求。
# 這有助於客戶端和服務器上的性能。此配置控制默認批量大小(以字節為單位)。
kafka.producer.batch.size=4096
#在正常負載的情況下, 要想減少請求的數量. 加上一個認為的延遲:
# 不是立即發送消息, 而是延遲等待更多的消息一起批量發送. 類似TCP中的Nagle算法
kafka.producer.linger=100
#producer可以使用的最大內存來緩存等待發送到server端的消息
kafka.producer.buffer.memory=40960

生產者配置類

@Configuration
@EnableKafka
public class KafkaProducerConfig {
    @Value("${kafka.producer.servers}")
    private String servers;
    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<String, String>(producerFactory());
    }
}

消費者配置類

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${kafka.consumer.servers}")
    private String servers;
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group.id}")
    private String groupId;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }
}

生產者

@Component
public class KafkaSender {
    private static final Logger logger = LoggerFactory.getLogger(KafkaSender.class);
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * 發送消息方法
     */
    public void send() {
        Message message = new Message();
        message.setId(System.currentTimeMillis());
        message.setMsg(UUID.randomUUID().toString());
        message.setSendTime(new Date());
        logger.info("+++++++++++++++++++++  message = {}", JSON.toJSONString(message));
        kafkaTemplate.send("xmz", JSON.toJSONString(message));
    }

消費者

@Component
public class KafkaReceiver3 {
    private static final Logger logger = LoggerFactory.getLogger(KafkaReceiver3.class);

    @KafkaListener(topics = {"xmz"})
    public void listen(ConsumerRecord <?, ?>> record) {
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            longAdder.increment();
            Object message = kafkaMessage.get();
            int partition = record.partition();
            logger.info("----------------- record =" + record);
            logger.info("------------------ message =" + message);
        }
    }

}

以上,我們就把kafka集成進來了

Kafka基礎簡介