1. 程式人生 > 程式設計 >這事沒完,繼續聊spring cloud stream和kafka的這些小事

這事沒完,繼續聊spring cloud stream和kafka的這些小事

上一篇文章講瞭如何用spring cloud stream整合kafka,並且跑起來一個demo,如果這一次宣傳spring cloud stream的文章,其實到這裡就可以啦。但實際上,工程永遠不是簡單的技術會還是不會的問題,在實際的開發中,我們會遇到很多的細節問題(簡稱坑),這篇文章,會把其中一些很小的點說一下,算是用例項告訴大家,工程的複雜性,往往體現在實際的繁瑣步驟中。

1、group的配置

在傳送訊息的配置裡面,group是不用配置的

關於這一點的證明,可以在原始碼的註釋裡面看到

org.springframework.cloud.stream.config.BindingProperties

2、修改topic的partitions

配置檔案如下

bindings:
        output:
          binder: kafka
          destination: wph-d-2 #訊息發往的目的地,對應topic
          content-type: text/plain #訊息的格式
          producer:  
            partitionCount: 7複製程式碼

partitionCount是用來設定partition的數量,預設是1,如果這個topic已經建了,修改partitionCount無效,會提示錯誤

Caused by: org.springframework.cloud.stream.provisioning.ProvisioningException: The number of expected partitions was: 7,but 5 have been found instead.Consider either increasing the partition count of the topic or enabling `autoAddPartitions`
	at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:384) ~[spring-cloud-stream-binder-kafka-core-3.0.0.M4.jar:3.0.0.M4]
	at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:325) ~[spring-cloud-stream-binder-kafka-core-3.0.0.M4.jar:3.0.0.M4]
	at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:302) ~[spring-cloud-stream-binder-kafka-core-3.0.0.M4.jar:3.0.0.M4]
	... 14 common frames omitted複製程式碼

根據錯誤的提示,新增autoAddPartitions

kafka: 
        binder:
          brokers: #Kafka的訊息中介軟體伺服器地址
          - localhost:9092
          autoAddPartitions: true複製程式碼

再次啟動就可以看到partitions數已經改了

autoAddPartitions屬性對應的類是org.springframework.cloud.stream.binder.kafka.properties.KafkaBinderConfigurationProperties

設定partitionCount屬性的類是org.springframework.cloud.stream.binder.ProducerProperties

3、傳送json報錯

用postman傳送sendMessage/complexType報錯

在伺服器端的報錯內容是:

Resolved [org.springframework.web.HttpMediaTypeNotSupportedException: Content type 'text/plain;charset=UTF-8' not supported]複製程式碼

原因是資料傳輸格式傳輸錯誤,要改一下postman傳送資料的格式

然後就能happy的發出去了

4、正確的傳送json並轉換成物件

如果我們需要傳輸json的資訊,那麼在傳送訊息端需要設定content-type為json(其實可以不寫,預設content-type就是json,後面會講)

bindings:
        output:
          binder: kafka
          destination: wph-d-2 #訊息發往的目的地,對應topic
          content-type: application/json #訊息的格式複製程式碼

然後通過producer傳送這個訊息

@RequestMapping(value = "/sendMessage/complexType",method = RequestMethod.POST)
	public String publishMessageComplextType(@RequestBody ChatMessage payload) {
		logger.info(payload.toString());
		producer.getMysource().output().send(MessageBuilder.withPayload(payload).setHeader("type","chatMessage").build());
		return "success";
	}複製程式碼

這裡需要注意的一點是ChatMessage的field name必須要有getter和settr方法,兩者有一就可以了,否則json轉換成物件的時候,field name收不到值。

在訂閱訊息的時候,application.yml裡面content-type可以不用配置,這個值預設就是“application/json”,這一點可以在org.springframework.cloud.stream.config.BindingProperties類的註釋裡面看到

和上面一樣,ChatMessage的field name需要有getter或者setter的方法,二者之一就行。

接收json並轉換成類的方法如下:

@StreamListener(target = Sink.INPUT,condition = "headers['type']=='chatMessage'")
	public void handle(ChatMessage message) {
		logger.info(message.toString());
}複製程式碼

有坑警告:如果我們把傳送訊息端的content-type設定成text/plain,訊息訂閱端的content-type設定成application/json,就會在訊息訂閱端報這個錯誤

Caused by: java.lang.IllegalStateException: argument type mismatch
Endpoint [com.wphmoon.kscsclient.Consumer]複製程式碼

如果顛倒過來的話,傳送訊息端的content-type設定成application/json,訊息訂閱端設定成text/plain,我實際測試過,是可以收到訊息,並且能轉換成ChatMessage物件,沒有問題。

原始碼