Kafka基礎簡介
kafka是一個分布式的,可分區的,可備份的日誌提交服務,它使用獨特的設計實現了一個消息系統的功能。 由於最近項目升級,需要將spring的事件機制轉變為消息機制,針對後期考慮,選擇了kafka作為消息中間件。
kafka的安裝
這裏為了快速搭建,選擇用docker
docker run -d -p 2181:2181 -p 9092:9092 -v /opt/kafka/server.properties:/opt/kafka_2.11-0.10.1.0/config/server.properties --env ADVERTISED_HOST=‘ip‘ --env ADVERTISED_PORT=9092 spotify/kafka
kafka的基本概念
這裏參照 官網 共有以下幾點
Topic:特指Kafka處理的消息源的不同分類,其實也可以理解為對不同消息源的區分的一個標識; Partition:Topic物理上的分組,一個topic可以設置為多個partition,每個partition都是一個有序的隊列,partition中的每條消息都會被分配一個有序的id(offset); Message:消息,是通信的基本單位,每個producer可以向一個topic(主題)發送一些消息; Producers:消息和數據生產者,向Kafka的一個topic發送消息的過程叫做producers(producer可以選擇向topic哪一個partition發送數據)。 Consumers:消息和數據消費者,接收topics並處理其發布的消息的過程叫做consumer,同一個topic的數據可以被多個consumer接收; Broker:緩存代理,Kafka集群中的一臺或多臺服務器統稱為broker。
這裏有一點是需要註意的
consumer是一個抽象的概念,調用Consumer API
的程序都可以稱作為一個consumer,它從broker端訂閱某個topic的消息。如果只有一個consumer的話,該topic(可能含有多個partition)下所有消息都會被這個consumer接收。但是在分布式的環境中,我們可能會遇到這樣一種情景,對於一個有多個partition的topic,我們希望啟動多個consumer去消費這些partition(如果發送速度較快,一個consumer是無法消費完的),並且要求topic的一條消息只能發給其中一個consumer,不希望這些conusmer出現重復接收一條消息的情況。對於這種情況,我們應該怎麽辦呢?kafka給我們提供了一種機制,可以很好來適應這種情況,那就是consumer group
在調用conusmer API時,一般都會指定一個consumer group,該group訂閱的topic的每一條消息都發送到這個group的某一臺機器上。借用官網一張圖來詳細介紹一下這種情況,假如kafka集群有兩臺broker,集群上有一個topic,它有4個partition,partition 0和1在broker1上,partition 2和3在broker2上,這時有兩個consumer group同時訂閱這個topic,其中一個group有2個consumer,另一個consumer有4個consumer,則它們的訂閱消息情況如下圖所示:
如果group中的consumer數小於topic中的partition數,那麽group中的consumer就會消費多個partition; 如果group中的consumer數等於topic中的partition數,那麽group中的一個consumer就會消費topic中的一個partition; 如果group中的consumer數大於topic中的partition數,那麽group中就會有一部分的consumer處於空閑狀態。
同時,同一個gruopid下多個consumer訂閱同一個topic,只有一個consumer能消費到數據。
下面我們開始集成kafka到系統
增加pom文件
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.1.0</version> </dependency>
配置文件:
kafka.consumer.zookeeper.connect=47.92.168.221:2181 kafka.consumer.servers=47.92.168.221:9092 kafka.consumer.enable.auto.commit=true kafka.consumer.session.timeout=6000 #消費者偏移提交給zookeeper的頻率(以毫秒為單位) kafka.consumer.auto.commit.interval=100 kafka.consumer.auto.offset.reset=latest #kafka.consumer.topic=test kafka.consumer.group.id=test #根據配置的spring.kafka.listener.concurrency來生成多個並發的KafkaMessageListenerContainer實例 kafka.consumer.concurrency=10 kafka.producer.servers=47.92.168.221:9092 #生產者重試次數 kafka.producer.retries=0 #每當多個記錄被發送到同一分區時,生產者將嘗試將記錄一起批量處理為更少的請求。 # 這有助於客戶端和服務器上的性能。此配置控制默認批量大小(以字節為單位)。 kafka.producer.batch.size=4096 #在正常負載的情況下, 要想減少請求的數量. 加上一個認為的延遲: # 不是立即發送消息, 而是延遲等待更多的消息一起批量發送. 類似TCP中的Nagle算法 kafka.producer.linger=100 #producer可以使用的最大內存來緩存等待發送到server端的消息 kafka.producer.buffer.memory=40960
生產者配置類
@Configuration @EnableKafka public class KafkaProducerConfig { @Value("${kafka.producer.servers}") private String servers; @Value("${kafka.producer.retries}") private int retries; @Value("${kafka.producer.batch.size}") private int batchSize; @Value("${kafka.producer.linger}") private int linger; @Value("${kafka.producer.buffer.memory}") private int bufferMemory; public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.RETRIES_CONFIG, retries); props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize); props.put(ProducerConfig.LINGER_MS_CONFIG, linger); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<String, String>(producerFactory()); } }
消費者配置類
@Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${kafka.consumer.servers}") private String servers; @Value("${kafka.consumer.enable.auto.commit}") private boolean enableAutoCommit; @Value("${kafka.consumer.session.timeout}") private String sessionTimeout; @Value("${kafka.consumer.auto.commit.interval}") private String autoCommitInterval; @Value("${kafka.consumer.group.id}") private String groupId; @Value("${kafka.consumer.auto.offset.reset}") private String autoOffsetReset; @Value("${kafka.consumer.concurrency}") private int concurrency; @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(concurrency); factory.getContainerProperties().setPollTimeout(3000); return factory; } public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } public Map<String, Object> consumerConfigs() { Map<String, Object> propsMap = new HashMap<>(); propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit); propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval); propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout); propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset); return propsMap; } }
生產者
@Component public class KafkaSender { private static final Logger logger = LoggerFactory.getLogger(KafkaSender.class); @Autowired private KafkaTemplate<String, String> kafkaTemplate; /** * 發送消息方法 */ public void send() { Message message = new Message(); message.setId(System.currentTimeMillis()); message.setMsg(UUID.randomUUID().toString()); message.setSendTime(new Date()); logger.info("+++++++++++++++++++++ message = {}", JSON.toJSONString(message)); kafkaTemplate.send("xmz", JSON.toJSONString(message)); }
消費者
@Component public class KafkaReceiver3 { private static final Logger logger = LoggerFactory.getLogger(KafkaReceiver3.class); @KafkaListener(topics = {"xmz"}) public void listen(ConsumerRecord <?, ?>> record) { Optional<?> kafkaMessage = Optional.ofNullable(record.value()); if (kafkaMessage.isPresent()) { longAdder.increment(); Object message = kafkaMessage.get(); int partition = record.partition(); logger.info("----------------- record =" + record); logger.info("------------------ message =" + message); } } }
以上,我們就把kafka集成進來了
Kafka基礎簡介