1. 程式人生 > >AKKA的JAVA-TCP實現

AKKA的JAVA-TCP實現

依賴

在使用之前,我們要將maven依賴匯入

<dependency>
  <groupId>com.typesafe.akka</groupId>
  <artifactId>akka-actor_2.12</artifactId>
  <version>2.5.17</version>
</dependency>

 接下來,預設你已經掌握了akka中actor的基本建立,及通知操作,下面我們來寫一下客戶端和服務端;

客戶端

官網所提供的程式碼是可以進行參考的,但是我們可以適當地修剪:

public class Client extends AbstractActor {
  /*這是網路地址的封裝類*/
  final InetSocketAddress remote;
  /*這是建立actor所依賴的類*/
  final ActorRef listener;
  
  //通過靜態方法的呼叫來實現客戶端的建立
  public static Props props(InetSocketAddress remote, ActorRef listener) {
      return Props.create(Client.class, remote, listener);
  }

  //構造方法
  public Client(InetSocketAddress remote, ActorRef listener) {
    this.remote = remote;
    this.listener = listener;
 
    //客戶端的actor的建立   
    final ActorRef tcp = Tcp.get(getContext().getSystem()).manager();
    tcp.tell(TcpMessage.connect(remote), getSelf());
  }

  @Override
  public Receive createReceive() {
    return receiveBuilder()
      .match(CommandFailed.class, msg -> {
        listener.tell("failed", getSelf());
        getContext().stop(getSelf());
        
      })
      //接收到連線,進行回覆及建立,呼叫下方的connected操作
      .match(Connected.class, msg -> {
        listener.tell(msg, getSelf());
        getSender().tell(TcpMessage.register(getSelf()), getSelf());
        getContext().become(connected(getSender()));
      })
      .build();
  }
  
 
  //進行接受資料,根據型別進行對應操作
  private Receive connected(final ActorRef connection) {
    return receiveBuilder()
      //傳送給服務端資料的動作
      .match(ByteString.class, msg -> {
          //具體操作,msg為要傳送給服務端的資料
          connection.tell(TcpMessage.write((ByteString) msg), getSelf());
      })
      .match(CommandFailed.class, msg -> {
        // OS kernel socket buffer was full
      })
      //接收服務端資料後的動作
      .match(Received.class, msg -> {
         //具體操作
        listener.tell(msg.data(), getSelf());
      })
      .matchEquals("close", msg -> {
        connection.tell(TcpMessage.close(), getSelf());
      })
      .match(ConnectionClosed.class, msg -> {
        getContext().stop(getSelf());
      })
      .build();
  }
  
}

 裡面我已加以註釋,根據業務需求可更改具體操作,此外補充一些基本akka知識:

 tell為通訊方法,呼叫者為發起資料的人(傳送者),tell的引數actor為接受者;

 getSelf為獲取本身actor,及物件自己

 getSender 為獲取本身所接受的資料的傳送者;白話就是獲取給你發資料的actor物件;

服務端

public class Server extends AbstractActor {
  
  final ActorRef manager;
  
  public Server(ActorRef manager) {
    this.manager = manager;
  }
  
  public static Props props(ActorRef manager) {
    return Props.create(Server.class, manager);
  }

  @Override
  public void preStart() throws Exception {
    final ActorRef tcp = Tcp.get(getContext().getSystem()).manager();
    //老樣子,是用來通知進行建立服務端actor
    tcp.tell(TcpMessage.bind(getSelf(),
        new InetSocketAddress("localhost", 0), 100), getSelf());
  }

  
  @Override
  public Receive createReceive() {
    return receiveBuilder()
      .match(Bound.class, msg -> {
        manager.tell(msg, getSelf());

      })
      .match(CommandFailed.class, msg -> {
        getContext().stop(getSelf());
      
      })
      .match(Connected.class, conn -> {
        //有客戶端連線
        manager.tell(conn, getSelf());
        //這個handler為自己實現的actor,即可指定讓某 
          個actor來處理接收的資料;
        final ActorRef handler = getContext().actorOf(
            Props.create(SimplisticHandler.class));

        //註冊一個handler來進行接收客戶端傳來的資料.
        getSender().tell(TcpMessage.register(handler), getSelf());
      })
      .build();
  }
  
}

注意:服務端和客戶端的區別就在於接收Connected型別的資料代表有客戶端連線,然後通過建立對應的actor來進行處理客戶端的資料; 

只是提供基本的TCP的基本客戶端及服務端通訊,還需要根據實際工作業務需求來進行更改,包括處理客戶端需求的actor的具體實現方式都需要注意一下;

如有問題可留言或者加群: 583055236 ,來聯絡我,一起討論解決!