基於t-io的IM工具實現
阿新 • • 發佈:2018-03-23
直接 span 簡單 getc AD else order sign ati
原文:https://my.oschina.net/u/2984386/blog/1630300
-
背景介紹
t-io是一款國產開源的網絡編程框架,主要特點是:簡單、易上手,API封裝通俗易懂,適合一般企業開發簡易的即時通訊工具。官方宣傳的性能也不錯:可支撐百萬TCP長連接,不過個人沒有實測過,所以想試一試。本文檔主要記錄簡單群組聊天的實現,同時記錄學習t-io的過程。其實 http://t-io.org/#/ 中有比較完整的Demo,本文也主要參考了其中的內容。
-
服務端
啟動類:
package com.dooper.server;
import org.tio.server.AioServer;
import org.tio.server.ServerGroupContext;
import org.tio.server.intf.ServerAioListener;
import com.dooper.common.packet.Constant;
import com.dooper.server.handler.MyServerAioHandler;
/**
 * Entry point of the demo server.
 *
 * <p>Builds a {@code ServerGroupContext} around {@link MyServerAioHandler}, wraps it in an
 * {@code AioServer}, and starts that server from {@link #main(String[])}.
 */
public class ServerStarter {

    /** Packet handler handed to the server group context. */
    public static MyServerAioHandler aioHandler = new MyServerAioHandler();

    /** Listener handed to the server group context; this demo supplies none. */
    public static ServerAioListener aioListener = null;

    /** Group context shared by the server, built from the handler and listener above. */
    public static ServerGroupContext serverGroupContext =
            new ServerGroupContext(aioHandler, aioListener);

    /** The server instance started by {@link #main(String[])}. */
    public static AioServer aioServer = new AioServer(serverGroupContext);

    /**
     * Address passed to {@code AioServer.start}; left as {@code null} in this demo.
     * NOTE(review): presumably null means "bind to all interfaces" - confirm in the t-io docs.
     */
    public static String serverIp = null;

    /** Port passed to {@code AioServer.start}, taken from {@code Constant.PORT}. */
    public static int serverPort = Constant.PORT;

    /**
     * Sets the heartbeat timeout from {@code Constant.TIMEOUT} and starts the server.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if starting the server fails
     */
    public static void main(String[] args) throws Exception {
        serverGroupContext.setHeartbeatTimeout(Constant.TIMEOUT);
        aioServer.start(serverIp, serverPort);
    }
}
消息處理
消息處理中包含綁定群組的步驟,實際上不應該放在此處,而應該由額外的處理類來負責群組綁定;此處為了省事,直接寫在裏面了。
package com.dooper.server.handler;
import java.nio.ByteBuffer;
import org.tio.core.Aio;
import org.tio.core.ChannelContext;
import org.tio.core.GroupContext;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.intf.Packet;
import org.tio.server.intf.ServerAioHandler;
import com.dooper.common.packet.MyPacket;
/**
 * Server-side handler for the demo chat protocol.
 *
 * <p>A frame is a length header read with {@link ByteBuffer#getInt()} followed by that many
 * body bytes. {@link #decode} and {@link #encode} convert between frames and {@link MyPacket};
 * {@link #handler} processes each decoded packet.
 */
public class MyServerAioHandler implements ServerAioHandler {

    /**
     * Tries to decode one {@link MyPacket} from the buffer.
     *
     * @param buffer bytes received so far, read from its current position
     * @param channelContext channel the bytes arrived on; used only for the error message
     * @return the decoded packet, or {@code null} when the buffer does not yet hold a full
     *     header plus body (NOTE(review): presumably t-io retries once more bytes arrive -
     *     confirm against the framework documentation)
     * @throws AioDecodeException if the length header is negative
     */
    @Override
    public Packet decode(ByteBuffer buffer, ChannelContext channelContext) throws AioDecodeException {
        int readableLength = buffer.remaining();
        if (readableLength < MyPacket.HEADER_LENGHT) {
            return null;
        }

        int bodyLength = buffer.getInt();
        if (bodyLength < 0) {
            throw new AioDecodeException(
                    "bodyLength [" + bodyLength + "] is not right,remote:" + channelContext.getClientNode());
        }

        // Compare by subtraction rather than computing HEADER_LENGHT + bodyLength: the sum
        // overflows int for a peer-supplied length near Integer.MAX_VALUE, which would make the
        // "enough data" check pass and trigger a huge allocation below.
        if (bodyLength > readableLength - MyPacket.HEADER_LENGHT) {
            return null;
        }

        MyPacket packet = new MyPacket();
        if (bodyLength > 0) {
            byte[] body = new byte[bodyLength];
            buffer.get(body);
            packet.setBody(body);
        }
        return packet;
    }

    /**
     * Encodes a {@link MyPacket} as a length header followed by its body bytes.
     *
     * @param packet packet to encode; must be a {@link MyPacket}
     * @param groupContext supplies the byte order applied to the output buffer
     * @param channelContext target channel (unused)
     * @return a newly allocated buffer holding the header and, if present, the body
     */
    @Override
    public ByteBuffer encode(Packet packet, GroupContext groupContext, ChannelContext channelContext) {
        MyPacket myPacket = (MyPacket) packet;
        byte[] body = myPacket.getBody();
        int bodyLen = (body == null) ? 0 : body.length;

        ByteBuffer buffer = ByteBuffer.allocate(MyPacket.HEADER_LENGHT + bodyLen);
        buffer.order(groupContext.getByteOrder());
        buffer.putInt(bodyLen);
        if (body != null) {
            buffer.put(body);
        }
        return buffer;
    }

    /**
     * Handles one decoded packet: logs its text, binds the sending channel to {@code "group1"}
     * and passes the packet to a {@link GroupHandler}. Packets without a body are ignored.
     *
     * @param packet decoded packet; must be a {@link MyPacket}
     * @param channelContext channel the packet arrived on
     * @throws Exception if decoding the text or the group handler fails
     */
    @Override
    public void handler(Packet packet, ChannelContext channelContext) throws Exception {
        MyPacket myPacket = (MyPacket) packet;
        byte[] body = myPacket.getBody();
        if (body != null) {
            String str = new String(body, MyPacket.CHARSET);
            System.out.println("客戶端發送的消息:" + str);
            // Group binding lives here only for brevity; the accompanying article notes it
            // belongs in a dedicated handler.
            Aio.bindGroup(channelContext, "group1");
            GroupHandler gh = new GroupHandler();
            gh.handler(myPacket, channelContext);
        }
    }
}
自定義的群組消息處理
package com.dooper.server.handler;
import org.tio.core.Aio;
import org.tio.core.ChannelContext;
import org.tio.core.intf.Packet;
import com.dooper.common.packet.MyPacket;
/**
 * Message handler that relays a received packet to every channel in {@code "group1"},
 * prefixing the text with the sender's client node.
 */
public class GroupHandler extends MsgHandler {

    /**
     * Logs the packet text and sends "{clientNode}:{text}" to the group. Packets without a
     * body are ignored.
     *
     * @param packet received packet; must be a {@link MyPacket}
     * @param channelContext channel the packet arrived on
     * @throws Exception if decoding or encoding the text fails
     */
    @Override
    public void handler(Packet packet, ChannelContext channelContext) throws Exception {
        byte[] payload = ((MyPacket) packet).getBody();
        if (payload == null) {
            return;
        }

        MyPacket outgoing = new MyPacket();
        String text = new String(payload, MyPacket.CHARSET);
        System.out.println("服務端收到消息:" + text);

        String relayed = channelContext.getClientNode() + ":" + text;
        outgoing.setBody(relayed.getBytes(MyPacket.CHARSET));
        Aio.sendToGroup(channelContext.getGroupContext(), "group1", outgoing);
    }
}
-
客戶端
啟動類
package com.dooper.client;
import java.util.Scanner;
import org.tio.client.AioClient;
import org.tio.client.ClientChannelContext;
import org.tio.client.ClientGroupContext;
import org.tio.client.ReconnConf;
import org.tio.client.intf.ClientAioHandler;
import org.tio.client.intf.ClientAioListener;
import org.tio.core.Aio;
import org.tio.core.Node;
import com.dooper.common.packet.Constant;
import com.dooper.common.packet.MyPacket;
/**
 * Entry point of the demo client.
 *
 * <p>Connects to the server node built from {@code Constant.SERVER} and {@code Constant.PORT},
 * then sends every line typed on standard input as a group message until the user types
 * {@code exit} or standard input ends.
 */
public class MyClientStarter {

    /** Server address the client connects to. */
    public static Node serverNode = new Node(Constant.SERVER, Constant.PORT);

    /** Packet handler handed to the client group context. */
    public static ClientAioHandler aioClientHandler = new MyClientAioHandler();

    /** Listener handed to the client group context; this demo supplies none. */
    public static ClientAioListener aioListener = null;

    /** Reconnect configuration, constructed with 5000L. */
    private static ReconnConf reconnConf = new ReconnConf(5000L);

    private static ClientGroupContext clientGroupContext =
            new ClientGroupContext(aioClientHandler, aioListener, reconnConf);

    /** Client instance; created in {@link #main(String[])}. */
    public static AioClient aioClient = null;

    /** Channel returned by {@code connect}; assigned in {@link #main(String[])}. */
    public static ClientChannelContext clientChannelContext = null;

    /**
     * Connects to the server and relays console input as group messages.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if connecting or sending fails
     */
    public static void main(String[] args) throws Exception {
        clientGroupContext.setHeartbeatTimeout(Constant.TIMEOUT);
        aioClient = new AioClient(clientGroupContext);
        clientChannelContext = aioClient.connect(serverNode);

        // try-with-resources closes the scanner even when sending throws.
        try (Scanner sc = new Scanner(System.in)) {
            System.out.println("請發送群組消息:");
            // hasNextLine() ends the loop cleanly at end of input instead of letting
            // nextLine() throw NoSuchElementException.
            while (sc.hasNextLine()) {
                String line = sc.nextLine(); // the text the user typed
                if ("exit".equalsIgnoreCase(line)) {
                    System.out.println("Thanks for using! bye bye.");
                    break;
                }
                sendGroup(line);
            }
        }
    }

    /**
     * Sends a fixed "hello world" packet over the connected channel.
     *
     * @throws Exception if encoding or sending fails
     */
    public static void send() throws Exception {
        MyPacket packet = new MyPacket();
        packet.setBody("hello world".getBytes(MyPacket.CHARSET));
        Aio.send(clientChannelContext, packet);
    }

    /**
     * Binds the client channel to {@code "group1"} and sends {@code msg} to that group through
     * the client group context.
     *
     * @param msg text to send
     * @throws Exception if encoding or sending fails
     */
    public static void sendGroup(String msg) throws Exception {
        Aio.bindGroup(clientChannelContext, "group1");
        MyPacket packet = new MyPacket();
        packet.setBody(msg.getBytes(MyPacket.CHARSET));
        Aio.sendToGroup(clientGroupContext, "group1", packet);
    }
}
消息處理類
package com.dooper.client;
import java.nio.ByteBuffer;
import org.tio.client.intf.ClientAioHandler;
import org.tio.core.ChannelContext;
import org.tio.core.GroupContext;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.intf.Packet;
import com.dooper.common.packet.MyPacket;
/**
 * Client-side handler for the demo chat protocol.
 *
 * <p>A frame is a length header read with {@link ByteBuffer#getInt()} followed by that many
 * body bytes. Received packet bodies are printed to standard output.
 */
public class MyClientAioHandler implements ClientAioHandler {

    /** Shared body-less packet returned by {@link #heartbeatPacket()}. */
    private static final MyPacket heartbeatPacket = new MyPacket();

    /**
     * Tries to decode one {@link MyPacket} from the buffer.
     *
     * @param buffer bytes received so far, read from its current position
     * @param channelContext channel the bytes arrived on; used only for the error message
     * @return the decoded packet, or {@code null} when the buffer does not yet hold a full
     *     header plus body (NOTE(review): presumably t-io retries once more bytes arrive -
     *     confirm against the framework documentation)
     * @throws AioDecodeException if the length header is negative
     */
    @Override
    public Packet decode(ByteBuffer buffer, ChannelContext channelContext) throws AioDecodeException {
        int readableLength = buffer.remaining();
        if (readableLength < MyPacket.HEADER_LENGHT) {
            return null;
        }

        int bodyLength = buffer.getInt();
        if (bodyLength < 0) {
            throw new AioDecodeException(
                    "bodyLength [" + bodyLength + "] is not right,remote:" + channelContext.getClientNode());
        }

        // Compare by subtraction rather than computing HEADER_LENGHT + bodyLength: the sum
        // overflows int for a peer-supplied length near Integer.MAX_VALUE, which would make the
        // "enough data" check pass and trigger a huge allocation below.
        if (bodyLength > readableLength - MyPacket.HEADER_LENGHT) {
            return null;
        }

        MyPacket myPacket = new MyPacket();
        if (bodyLength > 0) {
            byte[] body = new byte[bodyLength];
            buffer.get(body);
            myPacket.setBody(body);
        }
        return myPacket;
    }

    /**
     * Encodes a {@link MyPacket} as a length header followed by its body bytes.
     *
     * @param packet packet to encode; must be a {@link MyPacket}
     * @param groupContext supplies the byte order applied to the output buffer
     * @param channelContext target channel (unused)
     * @return a newly allocated buffer holding the header and, if present, the body
     */
    @Override
    public ByteBuffer encode(Packet packet, GroupContext groupContext, ChannelContext channelContext) {
        MyPacket myPacket = (MyPacket) packet;
        byte[] body = myPacket.getBody();
        int bodyLen = (body == null) ? 0 : body.length;

        ByteBuffer buffer = ByteBuffer.allocate(MyPacket.HEADER_LENGHT + bodyLen);
        buffer.order(groupContext.getByteOrder());
        buffer.putInt(bodyLen);
        if (body != null) {
            buffer.put(body);
        }
        return buffer;
    }

    /**
     * Prints the text of a received packet to standard output. Packets without a body are
     * ignored.
     *
     * @param packet received packet; must be a {@link MyPacket}
     * @param channelContext channel the packet arrived on (unused)
     * @throws Exception if decoding the text fails
     */
    @Override
    public void handler(Packet packet, ChannelContext channelContext) throws Exception {
        MyPacket myPacket = (MyPacket) packet;
        byte[] body = myPacket.getBody();
        if (body != null) {
            String str = new String(body, MyPacket.CHARSET);
            System.out.println(str);
        }
    }

    /**
     * Returns the packet used as the heartbeat.
     *
     * @return the shared body-less heartbeat packet
     */
    @Override
    public Packet heartbeatPacket() {
        return heartbeatPacket;
    }
}
基於t-io的IM工具實現