筆記10:springMVC
阿新 • • 發佈:2020-07-20
springboot整合kafka
配置
依賴
yml配置
測試
訊息實體
傳送訊息
接收訊息
傳送訊息
kafka檢視
springboot整合kafka
參考:
參考:
配置
依賴
需要web和kafka
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
注意,springboot版本對kafka版本影響不小,1.x可以使用1.x的kafka(比如1.1.1.RELEASE),2.0.x使用2.1.7.RELEASE,2.1.x使用 2.2.x.RELEASE;
版本不對都會導致專案無法啟動
yml配置
#============== kafka =================== # 指定kafka 代理地址,可以多個 spring: kafka: #指定kafka server的地址,叢集配多個,中間,逗號隔開,或者使用 列表格式 # - 服務1 # - 服務2 .... bootstrap-servers: 192.168.88.128:9092 #=============== provider ======================= producer: retries: 0 # 每次批量傳送訊息的數量 batch-size: 16384 acks: 1 #這個值只能大不能小了,否則會影響sleuth。可以使用的最大記憶體來快取等待發送到server端的訊息 buffer-memory: 1048576 # 這是最小的? retries: 0 # 指定訊息key和訊息體的編解碼方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer properties: # 單個請求的最大大小(以位元組為單位) max.request.size: 2097152 # 從傳送請求到收到ACK確認等待的最長時間(超時時間) request.timeout.ms: 40000 # 這項設定設定了批量處理的更高的延遲邊界:一旦我們獲得某個partition的batch.size,他將會立即傳送而不顧這項設定,然而如果我們獲得訊息位元組數比這項設定要小的多, # 我們需要“linger”特定的時間以獲取更多的訊息。 這個設定預設為0,即沒有延遲。設定linger.ms=5,例如,將會減少請求數目,但是同時會增加5ms的延遲。 linger.ms: 1 # 訊息傳送失敗的情況下,重試傳送的次數 存在訊息傳送是成功的,只是由於網路導致ACK沒收到的重試,會出現訊息被重複傳送的情況 message.send.max.retries: 0 consumer: # 指定預設消費者group id group-id: test-consumer-group auto-offset-reset: earliest enable-auto-commit: true auto-commit-interval: 100 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
測試
訊息實體
@Data
@Accessors(chain = true)
@NoArgsConstructor
public class Message {
/**
* id
*/
private Long id;
/**
* 訊息
*/
private String msg;
/**
* 時間戳
*/
private Date sendTime;
}
傳送訊息
/** 訊息傳送方 * @author jingshiyu * @date 2019/7/31 14:04:21 * @desc */ @RestController @Slf4j public class KafkaSender { @Autowired private KafkaTemplate<String,String> kafkaTemplate; @RequestMapping("/send") public void send(@RequestParam String msg) { Message message=new Message(); message.setId(123L).setMsg(msg).setSendTime(new Date()); kafkaTemplate.send("kafka_one", JSON.toJSONString(message)); } }
就這樣,傳送訊息程式碼就實現了。
這裡關鍵的程式碼為 kafkaTemplate.send()
方法,kafka_one
是 Kafka 裡的 topic ,這個 topic 在 Java 程式中是不需要提前在 Kafka 中設定的,因為它會在傳送的時候自動建立你設定的 topic, JSON.toJSONString(message)
是訊息內容
接收訊息
/**
* 監聽伺服器上的kafka是否有相關的訊息發過來
*/
@Component
@Slf4j
public class KafkaReceiver {
/**
* 定義此消費者接收topics = {"kafka_one"}的訊息,與controller中的topic對應上即可
* @param record 變數代表訊息本身,可以通過ConsumerRecord<?,?>型別的record變數來列印接收的訊息的各種資訊
*/
@KafkaListener(topics = {"kafka_one"})
public void listen(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info("----------------- record =" + record);
log.info("------------------ message =" + message);
}
}
}
客戶端 consumer 接收訊息特別簡單,直接用@KafkaListener
註解即可,並在監聽中設定監聽的 topic ,topics 是一個數組所以是可以繫結多個主題的,上面的程式碼中修改為 @KafkaListener(topics = {"zhisheng","tian"})
就可以同時監聽兩個 topic 的訊息了。需要注意的是:這裡的 topic 需要和訊息傳送類 KafkaSender.java 中設定的 topic 一致。
傳送訊息
啟動專案之後,呼叫介面傳送訊息
http://192.168.0.173:8083/send?msg=測試訊息
將會接收到訊息
record =ConsumerRecord(topic = kafka_one, partition = 0, offset = 0, CreateTime = 1564556254952, serialized key size = -1, serialized value size = 56, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"id":123,"msg":"測試訊息","sendTime":1564556254808})
message ={"id":123,"msg":"測試訊息","sendTime":1564556254808}
kafka檢視
./kafka-topics.sh --list --zookeeper localhost:2181 在kafka上檢視topic列表
就會發現剛才我們程式中的 kafka_one
已經自己建立了
來自為知筆記(Wiz)