1. 程式人生 > >簡單通過Spring-cloud-stream元件使用kafka

簡單通過Spring-cloud-stream元件使用kafka

1.消費者

一.pom依賴

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
            <version>1.2.1.RELEASE</version>
        </dependency>
        也可以根據springcloud適配版本,省略版本號

二.Kafka消費者配置

#kafka對應的地址
spring.cloud.stream.kafka.binder.brokers = 192.168.xx.xxx:9092
#kafka的zookeeper對應的地址
spring.cloud.stream.kafka.binder.zkNodes = 192.168.xx.xxx:2181
#監聽kafka的topic
spring.cloud.stream.bindings.xxxxxx.destination = topic-test
#消費者分組
spring.cloud.stream.bindings.xxxxxx.group
= test-group #接收原始訊息 spring.cloud.stream.bindings.xxxxxx.consumer.headerMode = raw 其中“xxxxxx”是自定義欄位,需要和第三步中消費者程式碼的String INPUT = "xxxxxx";保持一致;

三.消費者程式碼

public interface MqSinkI {

    String INPUT = "xxxxxx";

    /**
     * 消費者介面
     *
     * @return org.springframework.messaging.SubscribableChannel 介面物件
     */
@Input(MqSinkI.INPUT) SubscribableChannel input(); } @EnableBinding(value = {MqSinkI.class}) public class MqSinkReceiver { @Autowired MqListener mqListener; @StreamListener(MqSinkI.INPUT) public void messageListen(JSONObject jsonParam) { System.out.println("收到資訊:" + jsonParam.toString()); //處理請求的類,對訊息進行處理 mqListener.listen(jsonParam); } } @Component public class MqListener { public void listen(JSONObject jsonParam) { System.out.println("收到:" + jsonParam); } }

四.小結

  • 注意配置項裡面的xxx欄位要和程式碼定義的字串常量保持一致
  • MqListener 類可以可以省去,處理邏輯直接可以寫在MqSinkReceiver 類的messageListen裡面;
  • 配置中的地址和主題都可以配置多個
  • spring.cloud.stream.bindings.xxxxxx.consumer.headerMode = raw配置項 可能會影響訊息的接收格式,如果不新增這條配置,接收引擎的訊息可能會有問題,如果其他生產者按照第2點的方式生產訊息,則可以不使用這條配置。

2.生產者

一.pom依賴

與消費者一樣

二.Kafka生產者配置

#kafka對應的地址
spring.cloud.stream.kafka.binder.brokers=192.168.11.199:9092
#kafka的zookeeper對應的地址
spring.cloud.stream.kafka.binder.zkNodes=192.168.11.199:2181
spring.cloud.stream.bindings.oooooooo.destination=topic-test
spring.cloud.stream.bindings.oooooooo.content-type=application/json

三.生產者程式碼

public interface MySource {

    String OUTPUT = "oooooooo";

    String OUTPUT1 = "myOutputTest1";


    @Output(MySource.OUTPUT)
    MessageChannel output();
}


@EnableBinding(MySource.class)
public class SendService {

    @Autowired
    private MySource mySource;


    public void sendMessage(String msg) {
        try {
            mySource.output().send(MessageBuilder.withPayload(msg).build());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

@RestController
public class ProducerController {

    @Autowired
    private SendService service;

    @RequestMapping(value = "/kafka")
    public void send() {
        while (true) {
            //傳送訊息到指定topic
            JSONObject obj = new JSONObject();
            obj.put("time", (new Date()).toString());
            System.out.println("生產者傳送:" + obj.toString());
            service.sendMessage(obj.toString());
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
            }
        }
    }
}

3.原始碼