1. 程式人生 > 程式設計 >Vertx基於EventBus傳送接受自定義物件

Vertx基於EventBus傳送接受自定義物件

先看官方文件步驟:

Vertx基於EventBus傳送接受自定義物件

需要一個編解碼器,看原始碼:

Vertx基於EventBus傳送接受自定義物件

可見內建了需要資料型別的實現,所以傳送其他訊息可以傳送,但是如果傳送自定義物件就需要自己實現編解碼邏輯了

一 自定義編解碼器

/**
 * 自定義物件編解碼器,兩個型別可用於訊息轉換,即傳送物件轉換為接受需要的物件
 */
public class CustomizeMessageCodec implements MessageCodec<OrderMessage,OrderMessage> {
  /**
   * 將訊息實體封裝到Buffer用於傳輸
   * 實現方式:使用物件流從物件中獲取Byte陣列然後追加到Buffer
   */
  @Override
  public void encodeToWire(Buffer buffer,OrderMessage orderMessage) {
    final ByteArrayOutputStream b = new ByteArrayOutputStream();
    try (ObjectOutputStream o = new ObjectOutputStream(b)){
      o.writeObject(orderMessage);
      o.close();
      buffer.appendBytes(b.toByteArray());
    } catch (IOException e) { e.printStackTrace(); }
  }
  //從Buffer中獲取訊息物件
  @Override
  public OrderMessage decodeFromWire(int pos,Buffer buffer) {
    final ByteArrayInputStream b = new ByteArrayInputStream(buffer.getBytes());
    OrderMessage msg = null;
    try (ObjectInputStream o = new ObjectInputStream(b)){ msg = (OrderMessage) o.readObject();
    } catch (IOException | ClassNotFoundException e) { e.printStackTrace(); }
    return msg;
  }
  //訊息轉換
  @Override
  public OrderMessage transform(OrderMessage orderMessage) {
    System.out.println("訊息轉換---");//可對接受訊息進行轉換,比如轉換成另一個物件等
    orderMessage.setName("姚振");
    return orderMessage;
  }
  @Override
  public String name() { return "myCodec"; }
  //識別是否是使用者自定義編解碼器,通常為-1
  @Override
  public byte systemCodecID() { return -1; }
  public static MessageCodec create() {
    return new CustomizeMessageCodec();
  }
}

這裡有一個點要注意,nam方法是必須的,且傳送的時候一定要指明name

二 傳送訊息編寫

public class ProducerVerticle extends AbstractVerticle {
  @Override
  public void start() throws Exception {
    EventBus eventBus = vertx.eventBus();
    //釋出訊息(群發)
    eventBus.publish("com.hou","群發祝福!");
    //傳送訊息(單發),只會傳送註冊此地址的一個,採用不嚴格的輪詢演算法選擇
    DeliveryOptions options = new DeliveryOptions();//設定訊息頭等
    options.addHeader("some-header","some-value");
    eventBus.send("com.hou","單發訊息",options,ar->{
      if(ar.succeeded()) System.out.println("收到消費者確認資訊:"+ar.result().body());
    });
    //傳送自定義物件,需要編解碼器
    eventBus.registerCodec(CustomizeMessageCodec.create());//註冊編碼器
    DeliveryOptions options1 = new DeliveryOptions().setCodecName("myCodec");//必須指定名字
    OrderMessage orderMessage = new OrderMessage();
    orderMessage.setName("侯徵");
    eventBus.send("com.hou",orderMessage,options1);
  }
}

三 接受訊息Verticle編寫

public class ConsumerVerticle extends AbstractVerticle {
  @Override
  public void start() throws Exception {
    //每個Vertx例項預設是單例
    EventBus eb = vertx.eventBus();
    //註冊處理器,消費com.hou傳送的訊息
    MessageConsumer<Object> consumer = eb.consumer("com.hou");//訂閱地址
    consumer.handler(message -> {//訊息處理器
      if(message.body() instanceof OrderMessage){
        System.out.println("接受到物件: " + ((OrderMessage) message.body()).getName());
      }
      System.out.println("我是普通消費者: " + message.body());
      message.reply("收到了!"); // 回覆生產者,send才能接受
    }).completionHandler(res -> {//註冊完成後通知事件,適用於叢集中比較慢的情況下
        System.out.println("註冊處理器結果"+res.succeeded());
    });
    //撤銷處理器
    //consumer.unregister();
  }
}

四 註冊部署Verticcle

vertx.deployVerticle(ConsumerVerticle.class.getName());
    TimeUnit.SECONDS.sleep(1);
    vertx.deployVerticle(ProducerVerticle.class.getName());

五 測試

Vertx基於EventBus傳送接受自定義物件

以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支援我們。