1. 程式人生 > >消息隊列和發布訂閱

消息隊列和發布訂閱

mail final temp getter 特色 string 真的 rabbit static

編程語言集成了發布訂閱

很多編程語言框架裏都提供了發布訂閱的組件,或者叫事件處理機制,而spring框架對這個功能也有支持,主要使用EventListener實現訂閱,使用ApplicationEventPublisher使用發布。這種系統集成的我們先叫它“集成組件”

與語言無關的消息隊列

事實上,發布訂閱真的與開發語言沒有什麽關系,所以出現了另一種產品,消息中間件,或者叫消息隊列,它是以發布訂閱模式為理論基礎的,同時很多消息隊列產品又有自己的特色,這種獨立的消息隊列我們為rabbitmq為例子。

共同點

  1. 代碼解耦,發布者與訂閱者可以互不關心
  2. 異步處理,集成組件有的是同步的,需要加@Async
    註解
  3. 消息安全

不同點

  1. rabbitmq實現的是多服務之間的發布與訂閱
  2. 集成組件實現的是一個服務內部的發布與訂閱
  3. rabbitmq是異步的,集成組件可以是異步,也可以是同步
  4. rabbitmq可以有廣播,點對點等模式,而集成組件只有廣播模式

基於以上的介紹,主要幫助大家理解和認識,在什麽時候用什麽類型的工具。

實例

  • 集成組件的發布訂閱

訂閱

@Getter
@Builder(toBuilder = true)
@NoArgsConstructor
@AllArgsConstructor
public class CreateBookEvent {
  private String address;
  private String title;
}

@Component
public class EmailEventListener {
  @EventListener
  @Async
  public void handleEvent(CreateBookEvent event) throws Exception {
    System.out.println("email消息:建立圖書:" + event.getTitle());
  }
}

發布

@Autowired
  private ApplicationEventPublisher applicationEventPublisher;
  public void publish(){
      applicationEventPublisher.publishEvent(CreateBookEvent.builder().address("system").title("新建圖書").build());
}
  • rabbitmq的發布訂閱

訂閱

@Slf4j
@Component
public class DistributorSubscriber {
  public static final String WORK_QUEUE = "fx.activity.total";
  public static final String EXCHANGE = "fx.exchange";
  @Autowired
  DistributorActivityTotalRepository distributorActivityTotalRepository;
  @Autowired
  ObjectMapper objectMapper;

  @Bean
  public TopicExchange phoneTotalExchange() {
    return new TopicExchange(EXCHANGE);
  }

  @Bean
  public Queue phoneTotalQueue() {
    return new Queue(WORK_QUEUE);
  }
  
  @Bean
  public Binding bindSignQueue() {
    return BindingBuilder.bind(phoneTotalQueue()).to(phoneTotalExchange()).with(WORK_QUEUE);
  }
   @RabbitListener(queues = WORK_QUEUE)
  public void phoneTotalQueueListener(String data) {
    try {
      logger.debug("fx.activity.total:{}", data);
      DistributorActivityTotal entity =
          objectMapper.readValue(data, DistributorActivityTotal.class);
      distributorActivityTotalRepository.incUpdate(entity);
    } catch (Exception ex) {
      logger.error("fx.activity.total.error", ex);
    }
  }

發布

  @Autowired
  private RabbitTemplate rabbitTemplate;
  
   public void modifySalesperson(SalesPersonDTO salesPersonDTO) {
    try {
      rabbitTemplate.convertAndSend(
          "EXCHANGE",
          "MQName",
          objectMapper.writeValueAsString(salesPersonDTO)
      );
      logger.debug("Enter {},message:{}", "modifySalesperson", salesPersonDTO.toString());

    } catch (Exception ex) {
      logger.error("MQ.modifySalesperson.error", ex);
    }
  }

消息隊列和發布訂閱