[SpringCloud-SpringCloudStream] 簡單通過Spring-cloud-stream元件使用kafka
阿新 • • 發佈:2018-12-28
1.消費者
一.pom依賴
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<version>1.2.1.RELEASE</version>
</dependency>
也可以根據springcloud適配版本,省略版本號
二.Kafka消費者配置
#kafka對應的地址 spring.cloud.stream.kafka.binder.brokers = 192.168.xx.xxx:9092 #kafka的zookeeper對應的地址 spring.cloud.stream.kafka.binder.zkNodes = 192.168.xx.xxx:2181 #監聽kafka的topic spring.cloud.stream.bindings.xxxxxx.destination = topic-test #消費者分組 spring.cloud.stream.bindings.xxxxxx.group = test-group #接收原始訊息 spring.cloud.stream.bindings.xxxxxx.consumer.headerMode = raw 其中“xxxxxx”是自定義欄位,需要和第三步中消費者程式碼的String INPUT = "xxxxxx";保持一致;
三.消費者程式碼
public interface MqSinkI { String INPUT = "xxxxxx"; /** * 消費者介面 * * @return org.springframework.messaging.SubscribableChannel 介面物件 */ @Input(MqSinkI.INPUT) SubscribableChannel input(); } @EnableBinding(value = {MqSinkI.class}) public class MqSinkReceiver { @Autowired MqListener mqListener; @StreamListener(MqSinkI.INPUT) public void messageListen(JSONObject jsonParam) { System.out.println("收到資訊:" + jsonParam.toString()); //處理請求的類,對訊息進行處理 mqListener.listen(jsonParam); } } @Component public class MqListener { public void listen(JSONObject jsonParam) { System.out.println("收到:" + jsonParam); } }
四.小結
- 注意配置項裡面的xxx欄位要和程式碼定義的字串常量保持一致
- MqListener 類可以可以省去,處理邏輯直接可以寫在MqSinkReceiver 類的messageListen裡面;
- 配置中的地址和主題都可以配置多個
- spring.cloud.stream.bindings.xxxxxx.consumer.headerMode = raw配置項
可能會影響訊息的接收格式,如果不新增這條配置,接收引擎的訊息可能會有問題,如果其他生產者按照第2點的方式生產訊息,則可以不使用這條配置。
2.生產者
一.pom依賴
與消費者一樣
二.Kafka生產者配置
#kafka對應的地址
spring.cloud.stream.kafka.binder.brokers=192.168.11.199:9092
#kafka的zookeeper對應的地址
spring.cloud.stream.kafka.binder.zkNodes=192.168.11.199:2181
spring.cloud.stream.bindings.oooooooo.destination=topic-test
spring.cloud.stream.bindings.oooooooo.content-type=application/json
三.生產者程式碼
public interface MySource {
String OUTPUT = "oooooooo";
String OUTPUT1 = "myOutputTest1";
@Output(MySource.OUTPUT)
MessageChannel output();
}
@EnableBinding(MySource.class)
public class SendService {
@Autowired
private MySource mySource;
public void sendMessage(String msg) {
try {
mySource.output().send(MessageBuilder.withPayload(msg).build());
} catch (Exception e) {
e.printStackTrace();
}
}
}
@RestController
public class ProducerController {
@Autowired
private SendService service;
@RequestMapping(value = "/kafka")
public void send() {
while (true) {
//傳送訊息到指定topic
JSONObject obj = new JSONObject();
obj.put("time", (new Date()).toString());
System.out.println("生產者傳送:" + obj.toString());
service.sendMessage(obj.toString());
try {
Thread.sleep(5 * 1000);
} catch (InterruptedException e) {
}
}
}
}