卡夫卡gradle
阿新 • • 發佈:2018-12-31
可參考系列文章地址:https://www.jianshu.com/p/f5eaa32cf1f3
gradle依賴
compile('org.springframework.kafka:spring-kafka:1.1.1.RELEASE')
概念:
主題(topic),分割槽,訊息位置(offset,偏移量),關係:1:n:n
類:KafkaTemplate,ProducerRecord,ConsumerRecord
傳送訊息:原始碼
kafkaTemplate.send("c2", "shenke");
@Override public ListenableFuture<SendResult<K, V>> send(String topic, V data) { ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, data); return doSend(producerRecord); }
protected ListenableFuture<SendResult<K, V>> doSend(final ProducerRecord<K, V> producerRecord) { getTheProducer(); if (this.logger.isTraceEnabled()) { this.logger.trace("Sending: " + producerRecord); } final SettableListenableFuture<SendResult<K, V>> future = new SettableListenableFuture<>(); getTheProducer().send(producerRecord, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception == null) { future.set(new SendResult<>(producerRecord, metadata)); if (KafkaTemplate.this.producerListener != null && KafkaTemplate.this.producerListener.isInterestedInSuccess()) { KafkaTemplate.this.producerListener.onSuccess(producerRecord.topic(), producerRecord.partition(), producerRecord.key(), producerRecord.value(), metadata); } } else { future.setException(new KafkaProducerException(producerRecord, "Failed to send", exception)); if (KafkaTemplate.this.producerListener != null) { KafkaTemplate.this.producerListener.onError(producerRecord.topic(), producerRecord.partition(), producerRecord.key(), producerRecord.value(), exception); } } } }); if (this.autoFlush) { flush(); } if (this.logger.isTraceEnabled()) { this.logger.trace("Sent: " + producerRecord); } return future; }
消費訊息
//kafka消費訊息 @KafkaListener(topics ="c2") public void executeIptvTask(ConsumerRecord<?, ?> cr) { ObjectMapper mapper = new ObjectMapper(); //KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs); try { // kafkaTemplate.send("c2", "shenke"); IptvTask iptvTask = mapper.readValue((String) cr.value(), IptvTask.class); log.info("---c2KafkaListener==="+iptvTask); //2,播控回掉 3,iptv回掉 iptvTask.setProcesses(2); //0,待解析,1解析成功 iptvTask.setStatus(0); saveOrUpdateSoapRequestByCorrelateId(iptvTask); handleIptvTaskList(iptvTask); }catch (Exception e) { log.error("---c2KafkaListener Exception===", e); } }