1. 程式人生 > 實用技巧 >穀粒商城訊息佇列(三十九)

穀粒商城訊息佇列(三十九)

248-260 訊息佇列

程式碼提交到:https://gitee.com/dalianpai/gulimall

其實只要看過尚矽谷的springboot視訊,就會發現和裡面講解的都是一模一樣。

也簡單學過別的機構的rabbitmq視訊:https://www.cnblogs.com/dalianpai/category/1795311.html

測試程式碼如下:

@Slf4j
@SpringBootTest
@RunWith(SpringRunner.class)
class GulimallOrderApplicationTests {

    @Autowired
    AmqpAdmin amqpAdmin;

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void sendMessageTest(){
        OrderReturnApplyEntity orderReturnApplyEntity = new OrderReturnApplyEntity();
        orderReturnApplyEntity.setId(1L);
        orderReturnApplyEntity.setCreateTime(new Date());
        orderReturnApplyEntity.setSkuName("wgr");
        rabbitTemplate.convertAndSend("hello-java-exchange","hello-java",orderReturnApplyEntity);
    }


    @Test
    public void createExchange(){
        DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false);
        amqpAdmin.declareExchange(directExchange);
        log.info("交換機建立成功");

    }

    @Test
    public void createQueue(){
        Queue queue = new Queue("hello-java-queue", true, false, false);
        amqpAdmin.declareQueue(queue);
        log.info("佇列建立成功");
    }

    @Test
    public void createBing(){
        Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE, "hello-java-exchange"
                , "hello-java", null);
        amqpAdmin.declareBinding(binding);
    }

}

接受類:

@RabbitListener(queues={"hello-java-queue"})
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {

    @Override
    public PageUtils queryPage(Map<String, Object> params) {
        IPage<OrderItemEntity> page = this.page(
                new Query<OrderItemEntity>().getPage(params),
                new QueryWrapper<OrderItemEntity>()
        );

        return new PageUtils(page);
    }

    @RabbitHandler
    public void recieveMessage(Message message,
                               OrderReturnApplyEntity content,
                               Channel channel){
        System.out.println("接受的訊息。。"+content);
        System.out.println(content.getSkuName());
        //channel內按順序自增的
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("deliveryTag==>"+deliveryTag);
        //簽收
        try {
            channel.basicAck(deliveryTag,false);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

}

配置類:

/**
 * @author WGR
 * @create 2020/7/27 -- 13:15
 */
@Slf4j
@Configuration
public class MyRabbitConfig {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

    /**
     *定製RabbitTemplate
     * 1.服務收到訊息就回調
     *    1)spring.rabbitmq.publisher-confirms=true
     *    2)設定確認回撥confirmCallback
     * 2.訊息正確抵達佇列進行回撥
     *    1)spring.rabbitmq.publisher-returns=true
     *    2) spring.rabbitmq.template.mandatory=true
     *    設定確認回撥ReturnCallback
     * 3.消費端確認(保證每個消費被正確消費,此時才可以broker刪除這個訊息)
     *    1)預設是自動確認的,只要訊息接收到,客戶端會自動確認,服務端就會移除這個訊息
     *       問題:
     *           我們收到很多訊息,自動回覆伺服器ack,只有一個訊息處理成功,宕機了,就會發生訊息丟失。
     *           消費者手動確認模式,只要我們沒有明確告訴MQ,貨物被簽收,沒有ACK
     *           訊息就一直是unacked狀態,即使Consumer宕機。訊息不會丟失,會重新變成ready
     *   2)如何簽收:
     *      channel.basicAck(deliveryTag,false);簽收獲取
     *      channel.basicNack(deliveryTag,false,true);拒籤
     *
     */

    @PostConstruct   //再配置類物件建立完成以後,執行這個方法
    public void initRabbitTemplate(){
        RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if (ack){
                    System.out.println("傳送成功");
                }else {
                    System.out.println("傳送失敗");
                }
            }
        };
        rabbitTemplate.setConfirmCallback(confirmCallback);

        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println(message);
                System.out.println(replyCode);
                System.out.println(replyText);
            }
        });
    }


}

傳送訊息:

/**
 * @author WGR
 * @create 2020/7/27 -- 14:47
 */
@RestController
public class RabbitController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @RequestMapping("sendMq")
    public String sendMq(){
        OrderReturnApplyEntity orderReturnApplyEntity = new OrderReturnApplyEntity();
        orderReturnApplyEntity.setId(1L);
        orderReturnApplyEntity.setCreateTime(new Date());
        orderReturnApplyEntity.setSkuName("wgr");
        rabbitTemplate.convertAndSend("hello-java-exchange","hello-java",orderReturnApplyEntity);
        return  "ok";
    }

}

課件的幾個圖