AKKA的JAVA-TCP實現
阿新 • • 發佈:2018-12-14
依賴
在使用之前,我們要將maven依賴匯入
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.12</artifactId>
<version>2.5.17</version>
</dependency>
接下來,預設你已經掌握了akka中actor的基本建立,及通知操作,下面我們來寫一下客戶端和服務端;
客戶端
官網所提供的程式碼是可以進行參考的,但是我們可以適當地修剪:
public class Client extends AbstractActor { /*這是網路地址的封裝類*/ final InetSocketAddress remote; /*這是建立actor所依賴的類*/ final ActorRef listener; //通過靜態方法的呼叫來實現客戶端的建立 public static Props props(InetSocketAddress remote, ActorRef listener) { return Props.create(Client.class, remote, listener); } //構造方法 public Client(InetSocketAddress remote, ActorRef listener) { this.remote = remote; this.listener = listener; //客戶端的actor的建立 final ActorRef tcp = Tcp.get(getContext().getSystem()).manager(); tcp.tell(TcpMessage.connect(remote), getSelf()); } @Override public Receive createReceive() { return receiveBuilder() .match(CommandFailed.class, msg -> { listener.tell("failed", getSelf()); getContext().stop(getSelf()); }) //接收到連線,進行回覆及建立,呼叫下方的connected操作 .match(Connected.class, msg -> { listener.tell(msg, getSelf()); getSender().tell(TcpMessage.register(getSelf()), getSelf()); getContext().become(connected(getSender())); }) .build(); } //進行接受資料,根據型別進行對應操作 private Receive connected(final ActorRef connection) { return receiveBuilder() //傳送給服務端資料的動作 .match(ByteString.class, msg -> { //具體操作,msg為要傳送給服務端的資料 connection.tell(TcpMessage.write((ByteString) msg), getSelf()); }) .match(CommandFailed.class, msg -> { // OS kernel socket buffer was full }) //接收服務端資料後的動作 .match(Received.class, msg -> { //具體操作 listener.tell(msg.data(), getSelf()); }) .matchEquals("close", msg -> { connection.tell(TcpMessage.close(), getSelf()); }) .match(ConnectionClosed.class, msg -> { getContext().stop(getSelf()); }) .build(); } }
裡面我已加以註釋,根據業務需求可更改具體操作,此外補充一些基本akka知識:
tell為通訊方法,呼叫者為發起資料的人(傳送者),tell的引數actor為接受者;
getSelf為獲取本身actor,及物件自己
getSender 為獲取本身所接受的資料的傳送者;白話就是獲取給你發資料的actor物件;
服務端
public class Server extends AbstractActor { final ActorRef manager; public Server(ActorRef manager) { this.manager = manager; } public static Props props(ActorRef manager) { return Props.create(Server.class, manager); } @Override public void preStart() throws Exception { final ActorRef tcp = Tcp.get(getContext().getSystem()).manager(); //老樣子,是用來通知進行建立服務端actor tcp.tell(TcpMessage.bind(getSelf(), new InetSocketAddress("localhost", 0), 100), getSelf()); } @Override public Receive createReceive() { return receiveBuilder() .match(Bound.class, msg -> { manager.tell(msg, getSelf()); }) .match(CommandFailed.class, msg -> { getContext().stop(getSelf()); }) .match(Connected.class, conn -> { //有客戶端連線 manager.tell(conn, getSelf()); //這個handler為自己實現的actor,即可指定讓某 個actor來處理接收的資料; final ActorRef handler = getContext().actorOf( Props.create(SimplisticHandler.class)); //註冊一個handler來進行接收客戶端傳來的資料. getSender().tell(TcpMessage.register(handler), getSelf()); }) .build(); } }
注意:服務端和客戶端的區別就在於接收Connected型別的資料代表有客戶端連線,然後通過建立對應的actor來進行處理客戶端的資料;
只是提供基本的TCP的基本客戶端及服務端通訊,還需要根據實際工作業務需求來進行更改,包括處理客戶端需求的actor的具體實現方式都需要注意一下;
如有問題可留言或者加群: 583055236 ,來聯絡我,一起討論解決!