1. 程式人生 > >RabbitMQ使用多路由,多佇列來破除流控

RabbitMQ使用多路由,多佇列來破除流控

流控機制是我們在使用RabbitMQ時最頭疼的問題,一旦併發激增,消費者消費佇列訊息就會像滴水一樣慢。

現在我們下單後,需要給通知中心傳送訊息,讓通知中心通知服務商收取訂單,並確認提供服務。

我們先給Order介面新增一個傳送訊息的方法。

/**
 * Operations every order type supports: placing the order, reporting its
 * result, and sending a message about it once it has been placed.
 */
public interface Order {

    /**
     * Places the given order.
     *
     * @param order the order to place
     */
    void makeOrder(Order order);

    /**
     * Builds the success result for the given order.
     *
     * @param order the order that was placed
     * @return the result object describing the placed order
     */
    OrderSuccessResult getResult(Order order);

    /**
     * Sends a message for the given order.
     *
     * @param order the order to publish
     */
    void postOrder(Order order);
}

實現類實現該方法

@Data
@AllArgsConstructor @NoArgsConstructor @ServiceOrderVersion(value = 1) @RequiredArgsConstructor public class ServiceOrder extends AbstractOrder { private Long id; @NonNull private String code; @NonNull private Store store; @NonNull private ProviderService service
; @NonNull private Car car; @NonNull private Date serviceDate; @NonNull private String contact; @NonNull private String contactTel; private AppUser user; @NonNull private String content; private int status; private Date createDate;
@Override public void makeOrder(Order order) { ServiceOrderDao serviceOrderDao = SpringBootUtil.getBean(ServiceOrderDao.class); IdService idService = SpringBootUtil.getBean(IdService.class); ((ServiceOrder)order).setId(idService.genId()); ((ServiceOrder)order).setCode(getCodeInfo(idService)); AppUser loginAppUser = AppUserUtil.getLoginAppUser(); AppUser user = new AppUser(); user.setId(loginAppUser.getId()); user.setUsername(loginAppUser.getUsername()); ((ServiceOrder)order).setUser(user); ((ServiceOrder)order).setStatus(1); ((ServiceOrder)order).setCreateDate(new Date()); serviceOrderDao.save((ServiceOrder) order); } @Override public OrderSuccessResult getResult(Order order) { ServiceOrderSuccessResultFactory orderSuccessResultFactory = SpringBootUtil.getBean(ServiceOrderSuccessResultFactory.class); this.orderSuccessResult = orderSuccessResultFactory.getOrderSuccessResult(); return this.orderSuccessResult.getResult(order); } @Override public void postOrder(Order order) { MessageSender sender = SpringBootUtil.getBean(MessageSender.class); CompletableFuture.runAsync(() -> sender.send(OwnerCarCenterMq.MQ_EXCHANGE_ORDER, OwnerCarCenterMq.ROUTING_KEY_ORDER, order) ); } private String getCodeInfo(IdService idService) { String flow = String.valueOf(idService.genId()); flow = flow.substring(14,flow.length()); String pre = DateUtils.format(new Date(), DateUtils.pattern9); return pre + flow; } }

其中我們定義了這麼一組佇列名,交換機,和路由

/**
 * Names shared by the order publisher, the RabbitMQ configuration and the consumer.
 */
public interface OwnerCarCenterMq {

    /** Queue name; used as the prefix of the numbered order queues. */
    String ORDER_QUEUE = "order";

    /** Name of the service system's exchange. */
    String MQ_EXCHANGE_ORDER = "order.topic.exchange";

    /** Routing key for newly added service orders; used as the prefix of the numbered routing keys. */
    String ROUTING_KEY_ORDER = "post.order";
}

為了避免流控,我們定義了10個佇列,並全部繫結到一個交換機上。

/**
 * RabbitMQ topology for orders: one topic exchange, ten queues, and one binding
 * per queue, each with its own numbered routing key.
 *
 * NOTE(review): newer Spring AMQP versions reportedly no longer declare
 * {@code Collection} beans of queues/bindings automatically (a {@code Declarables}
 * bean or {@code RabbitAdmin.setDeclareCollections(true)} is needed) — confirm
 * against the Spring AMQP version in use.
 */
@Configuration
public class RabbitmqConfig {

   /**
    * Creates the queues named {@code ORDER_QUEUE + "_" + n} for n = 1..10.
    *
    * @return the ten queues, in ascending order of n
    */
   @Bean
   public List<Queue> orderQueues() {
      List<Queue> result = new ArrayList<>();
      for (int suffix = 1; suffix <= 10; suffix++) {
         result.add(new Queue(OwnerCarCenterMq.ORDER_QUEUE + "_" + suffix));
      }
      return result;
   }

   /**
    * Creates the topic exchange all order queues are bound to.
    *
    * @return the order exchange
    */
   @Bean
   public TopicExchange orderExchange() {
      return new TopicExchange(OwnerCarCenterMq.MQ_EXCHANGE_ORDER);
   }

   /**
    * Binds queue n to the order exchange with routing key
    * {@code ROUTING_KEY_ORDER + "_" + n}, for n = 1..10.
    *
    * @return the ten bindings, in ascending order of n
    */
   @Bean
   public List<Binding> bindingOrders() {
      List<Binding> result = new ArrayList<>();
      for (int index = 0; index < 10; index++) {
         // Queue at list position index carries suffix index + 1.
         String routingKey = OwnerCarCenterMq.ROUTING_KEY_ORDER + "_" + (index + 1);
         result.add(BindingBuilder.bind(orderQueues().get(index))
               .to(orderExchange())
               .with(routingKey));
      }
      return result;
   }
}

重新封裝訊息提供者,每次傳送都隨機選取一個路由來進行傳送。

@Slf4j
@Component
public class MessageSender implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(String exchange,String routingKey,Object content) {
        log.info("send content=" + content);
        this.rabbitTemplate.setMandatory(true);
        this.rabbitTemplate.setConfirmCallback(this);
        this.rabbitTemplate.setReturnCallback(this);
        ThreadLocalRandom random = ThreadLocalRandom.current();
        this.rabbitTemplate.convertAndSend(exchange,routingKey + "_" + random.nextInt(1,11),serialize(content));
    }

    /**
     * 確認後回撥:
     * @param correlationData
     * @param ack
     * @param cause
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (!ack) {
            log.info("send ack fail, cause = " + cause);
        } else {
            log.info("send ack success");
        }
    }

    /**
     * 失敗後return回撥:
     *
     * @param message
     * @param replyCode
     * @param replyText
     * @param exchange
     * @param routingKey
     */
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("send fail return-message = " + new String(message.getBody()) + ", replyCode: " + replyCode + ", replyText: " + replyText + ", exchange: " + exchange + ", routingKey: " + routingKey);
    }

    /**
     * 對訊息物件進行二進位制序列化
     * @param o
     * @return
     */
    private byte[] serialize(Object o) {
        Kryo kryo = new Kryo();
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        Output output = new Output(stream);
        kryo.writeObject(output, o);
        output.close();
        return stream.toByteArray();
    }
}

我們可以看到在ServiceOrder裡,我們是通過非同步來進行傳送的。

Controller如下

/**
 * REST endpoint for placing orders. The {@code type} request parameter selects
 * both the order class the body is parsed into and the factory that supplies
 * the order service.
 */
@Slf4j
@RestController
public class OrderController {
    @Autowired
    private OrderBean orderBean;

    /**
     * Parses the request body into an order of the requested type, places it,
     * publishes it, and returns its success result.
     *
     * NOTE(review): {@code postOrder} is called inside this transactional method;
     * if it publishes asynchronously the message may go out before the
     * transaction commits — confirm this is acceptable.
     *
     * @param orderStr JSON body of the order
     * @param type     order type key used to look up the order class and factory
     * @return the success result wrapped in {@code Result}
     * @throws IllegalArgumentException if {@code type} has no registered order class or factory
     */
    @Transactional
    @SuppressWarnings("unchecked")
    @PostMapping("/makeeorder")
    public Result<OrderSuccessResult> makeOrder(@RequestBody String orderStr, @RequestParam("type") String type) {
        log.info(orderStr);
        // Per-request state lives in locals, so nothing lingers on pooled request threads.
        Order order = parseOrder(orderStr, type);
        Order orderService = resolveOrderService(type);
        orderService.makeOrder(order);
        orderService.postOrder(order);
        return Result.success(orderService.getResult(order));
    }

    /**
     * Parses the JSON body into the order class registered for {@code type}.
     *
     * @param orderStr JSON body of the order
     * @param type     order type key
     * @return the parsed order
     */
    private Order parseOrder(String orderStr, String type) {
        Class<?> classType = orderBean.getOrderMap().get(type);
        if (classType == null) {
            throw new IllegalArgumentException("Unsupported order type: " + type);
        }
        Object order = JSONObject.parseObject(orderStr, classType);
        return (Order) order;
    }

    /**
     * Obtains the order service from the factory bean registered under {@code type + "Factory"}.
     *
     * @param type order type key
     * @return the order service supplied by the factory
     */
    private Order resolveOrderService(String type) {
        Class<?> classFactoryType = orderBean.getOrderFactoryMap().get(type + "Factory");
        if (classFactoryType == null) {
            throw new IllegalArgumentException("No order factory registered for type: " + type);
        }
        OrderFactory factory = (OrderFactory) SpringBootUtil.getBean(classFactoryType);
        return factory.getOrder();
    }
}

最後是在我們的通知中心模組接收訊息,同時對這10個佇列進行監聽

@Slf4j
@Component
@RabbitListener(queues = {OwnerCarCenterMq.ORDER_QUEUE + "_" + 1,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 2,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 3,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 4,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 5,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 6,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 7,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 8,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 9,
        OwnerCarCenterMq.ORDER_QUEUE + "_" + 10})
public class ServiceOrderConsummer {
    @Getter
    private Queue<ServiceOrder> serviceOrders = new ConcurrentLinkedDeque<>();
    /**
     * Handles one order message: deserializes it, stores it in {@code serviceOrders},
     * and only then acknowledges it. If deserialization fails the message is
     * rejected without requeue.
     *
     * <p>Each delivery is acknowledged exactly once; acking first and nacking the
     * same delivery tag afterwards is a double acknowledgement.
     *
     * NOTE(review): assumes the listener container uses manual acknowledge mode — confirm.
     *
     * @param data    serialized order payload
     * @param channel channel the message was delivered on
     * @param message the raw message, used for its delivery tag
     * @throws IOException if the ack/nack cannot be sent to the broker
     */
    @RabbitHandler
    public void receiceOrder(byte[] data, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        ServiceOrder order;
        try {
            order = unSerialize(data);
        } catch (RuntimeException e) {
            log.error("receiver fail", e);
            // Discard this message: requeue=false, so it is not redelivered.
            channel.basicNack(deliveryTag, false, false);
            return;
        }
        this.serviceOrders.add(order);
        log.info(String.valueOf(order));
        // Tell the broker the message has been consumed and can be removed from the queue;
        // done last so a processing failure never follows a positive ack.
        channel.basicAck(deliveryTag, false);
    }

    /**
     * 反序列化
     * @param data
     * @return
     */
    private ServiceOrder unSerialize(byte[] data) {
        Input input = null;
        try {
            Kryo kryo = new Kryo();
            input = new Input(new ByteArrayInputStream(data));
            return kryo.readObject(input,ServiceOrder.class);
        }
        finally {
            input.close();