簡單通過Spring-cloud-stream元件使用kafka
阿新 • • 發佈:2018-12-18
1.消費者
一.pom依賴
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
<version>1.2.1.RELEASE</version>
</dependency>
也可以根據springcloud適配版本,省略版本號
二.Kafka消費者配置
#kafka對應的地址
spring.cloud.stream.kafka.binder.brokers = 192.168.xx.xxx:9092
#kafka的zookeeper對應的地址
spring.cloud.stream.kafka.binder.zkNodes = 192.168.xx.xxx:2181
#監聽kafka的topic
spring.cloud.stream.bindings.xxxxxx.destination = topic-test
#消費者分組
spring.cloud.stream.bindings.xxxxxx.group = test-group
#接收原始訊息
spring.cloud.stream.bindings.xxxxxx.consumer.headerMode = raw
其中“xxxxxx”是自定義欄位,需要和第三步中消費者程式碼的String INPUT = "xxxxxx";保持一致;
三.消費者程式碼
public interface MqSinkI {
String INPUT = "xxxxxx";
/**
* 消費者介面
*
* @return org.springframework.messaging.SubscribableChannel 介面物件
*/
@Input(MqSinkI.INPUT)
SubscribableChannel input();
}
@EnableBinding(value = {MqSinkI.class})
public class MqSinkReceiver {
@Autowired
MqListener mqListener;
@StreamListener(MqSinkI.INPUT)
public void messageListen(JSONObject jsonParam) {
System.out.println("收到資訊:" + jsonParam.toString());
//處理請求的類,對訊息進行處理
mqListener.listen(jsonParam);
}
}
@Component
public class MqListener {
public void listen(JSONObject jsonParam) {
System.out.println("收到:" + jsonParam);
}
}
四.小結
- 注意配置項裡面的xxx欄位要和程式碼定義的字串常量保持一致
- MqListener 類可以可以省去,處理邏輯直接可以寫在MqSinkReceiver 類的messageListen裡面;
- 配置中的地址和主題都可以配置多個
- spring.cloud.stream.bindings.xxxxxx.consumer.headerMode = raw配置項 可能會影響訊息的接收格式,如果不新增這條配置,接收引擎的訊息可能會有問題,如果其他生產者按照第2點的方式生產訊息,則可以不使用這條配置。
2.生產者
一.pom依賴
與消費者一樣
二.Kafka生產者配置
#kafka對應的地址
spring.cloud.stream.kafka.binder.brokers=192.168.11.199:9092
#kafka的zookeeper對應的地址
spring.cloud.stream.kafka.binder.zkNodes=192.168.11.199:2181
spring.cloud.stream.bindings.oooooooo.destination=topic-test
spring.cloud.stream.bindings.oooooooo.content-type=application/json
三.生產者程式碼
public interface MySource {
String OUTPUT = "oooooooo";
String OUTPUT1 = "myOutputTest1";
@Output(MySource.OUTPUT)
MessageChannel output();
}
@EnableBinding(MySource.class)
public class SendService {
@Autowired
private MySource mySource;
public void sendMessage(String msg) {
try {
mySource.output().send(MessageBuilder.withPayload(msg).build());
} catch (Exception e) {
e.printStackTrace();
}
}
}
@RestController
public class ProducerController {
@Autowired
private SendService service;
@RequestMapping(value = "/kafka")
public void send() {
while (true) {
//傳送訊息到指定topic
JSONObject obj = new JSONObject();
obj.put("time", (new Date()).toString());
System.out.println("生產者傳送:" + obj.toString());
service.sendMessage(obj.toString());
try {
Thread.sleep(5 * 1000);
} catch (InterruptedException e) {
}
}
}
}