1. 程式人生 > >基於t-io的MI工具實現

基於t-io的MI工具實現

直接 span 簡單 getc AD else order sign ati

原文:https://my.oschina.net/u/2984386/blog/1630300

  • 背景介紹

t-io是一款國產開源的網絡編程框架,主要是特點:簡單,易上手,AIP封裝通俗易懂,適合一般企業簡易即時通訊工具開發。宣傳性能也不錯:百萬TCP長連接,不過個人也沒測試過,所以想試一試看看。本文檔主要記錄了簡單群組聊天的實現,同時記錄下學習t-io的過程。其實 http://t-io.org/#/ 中有比較完整的Demo,本文也主要是參考其中。

  • 服務端

啟動類:

package com.dooper.server;

import org.tio.server.AioServer;
import org.tio.server.ServerGroupContext;
import org.tio.server.intf.ServerAioListener;

import com.dooper.common.packet.Constant;
import com.dooper.server.handler.MyServerAioHandler;

public class ServerStarter {
	
	public static MyServerAioHandler aioHandler = new MyServerAioHandler();
	
	
	public static ServerAioListener aioListener = null;
	
	
	public static ServerGroupContext serverGroupContext = new ServerGroupContext(aioHandler, aioListener);
	
	
	public static AioServer aioServer = new AioServer(serverGroupContext);
	
	
	public static String serverIp = null;
	
	
	public static int serverPort = Constant.PORT;
	
	
	public static void main(String[] args) throws Exception{
		serverGroupContext.setHeartbeatTimeout(Constant.TIMEOUT);
		aioServer.start(serverIp, serverPort);
	}
	
	
}

消息處理

消息處理中有綁定組的步驟,實際不應該在此處,應該是有額外的處理類來處理群組綁定,此處因為懶,直接寫在裏面了。

package com.dooper.server.handler;

import java.nio.ByteBuffer;

import org.tio.core.Aio;
import org.tio.core.ChannelContext;
import org.tio.core.GroupContext;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.intf.Packet;
import org.tio.server.intf.ServerAioHandler;

import com.dooper.common.packet.MyPacket;


/**
 * server
 * 
 *
 */
public class MyServerAioHandler implements ServerAioHandler{
	
	
	@Override
	public Packet decode(ByteBuffer buffer, ChannelContext chanelContext) throws AioDecodeException {
		int readableLength = buffer.limit() - buffer.position();
		if(readableLength < MyPacket.HEADER_LENGHT){
			return null;
		}
		
		int bodyLength = buffer.getInt();
		if(bodyLength<0){
			throw new AioDecodeException("bodyLength ["+bodyLength+"] is not rigth,remote"+chanelContext.getClientNode());
		}
		int neededLength = MyPacket.HEADER_LENGHT+bodyLength;
		int isDataEnough = readableLength - neededLength;
		if(isDataEnough < 0){
			return null;
		}else{
			MyPacket packet = new MyPacket();
			if(bodyLength > 0){
				byte[] dst = new byte[bodyLength];
				buffer.get(dst);
				packet.setBody(dst);
			}
			return packet;
		}
	}
	
	
	@Override
	public ByteBuffer encode(Packet packet, GroupContext groupContext, ChannelContext channelContext) {
		MyPacket myPacket = (MyPacket)packet;
		byte[] body = myPacket.getBody();
		int bodyLen = 0;
		if(body != null){
			bodyLen = body.length;
		}
		
		int allLen = MyPacket.HEADER_LENGHT + bodyLen;
		ByteBuffer buffer = ByteBuffer.allocate(allLen);
		buffer.order(groupContext.getByteOrder());
		buffer.putInt(bodyLen);
		
		if(body != null){
			buffer.put(body);
		}
		
		return buffer;		
	}

	@Override
	public void handler(Packet packet, ChannelContext channelContext) throws Exception {
		MyPacket myPacket = (MyPacket)packet;
		byte[] body = myPacket.getBody();
		if(body != null){
			String str = new String(body,MyPacket.CHARSET);
			System.out.println("客戶端發送的消息:"+str);
			Aio.bindGroup(channelContext, "group1");
			GroupHandler gh = new GroupHandler();
			gh.handler(myPacket, channelContext);
			
		}
		return;
		
	}
	
	
	

}

自定義的群組消息處理

package com.dooper.server.handler;

import org.tio.core.Aio;
import org.tio.core.ChannelContext;
import org.tio.core.intf.Packet;

import com.dooper.common.packet.MyPacket;

public class GroupHandler extends MsgHandler{

	@Override
	public void handler(Packet packet, ChannelContext channelContext) throws Exception {
		MyPacket myPacket = (MyPacket)packet;
		byte[] body = myPacket.getBody();
		if(body!=null){
			MyPacket mp = new MyPacket();
			System.out.println("服務端收到消息:"+new String(body,MyPacket.CHARSET));
			mp.setBody((channelContext.getClientNode()+":"+new String(body,MyPacket.CHARSET)).getBytes(MyPacket.CHARSET));
			Aio.sendToGroup(channelContext.getGroupContext(), "group1", mp);
		}
	}
	

}
  • 客戶端

啟動類

package com.dooper.client;

import java.util.Scanner;

import org.tio.client.AioClient;
import org.tio.client.ClientChannelContext;
import org.tio.client.ClientGroupContext;
import org.tio.client.ReconnConf;
import org.tio.client.intf.ClientAioHandler;
import org.tio.client.intf.ClientAioListener;
import org.tio.core.Aio;
import org.tio.core.Node;

import com.dooper.common.packet.Constant;
import com.dooper.common.packet.MyPacket;

public class MyClientStarter {
	
	public static Node serverNode = new Node(Constant.SERVER,Constant.PORT);
	
	public static ClientAioHandler aioClientHandler = new MyClientAioHandler(); 
	
	public static ClientAioListener aioListener = null;
	
	private static ReconnConf reconnConf = new ReconnConf(5000L);
	
	private static ClientGroupContext clientGroupContext = new ClientGroupContext(aioClientHandler, aioListener,reconnConf);
	
	public static AioClient aioClient = null;
	
	public static ClientChannelContext clientChannelContext = null;
	
	
	public static void main(String[] args) throws Exception{
		clientGroupContext.setHeartbeatTimeout(Constant.TIMEOUT);
		
		aioClient = new AioClient(clientGroupContext);
		
		clientChannelContext = aioClient.connect(serverNode);
		
		Scanner sc = new Scanner(System.in);
		System.out.println("請發送群組消息:");
		String line = sc.nextLine(); // 這個就是用戶輸入的數據
		while (true) {
			if ("exit".equalsIgnoreCase(line)) {
				System.out.println("Thanks for using! bye bye.");
				break;
			} else{
				sendGroup(line);
			}
			line = sc.nextLine(); // 這個就是用戶輸入的數據
		}
//		send();
		sc.close();
	}
	
	
	public static void send() throws Exception{
		MyPacket packet = new MyPacket();
		
		packet.setBody("hello world".getBytes(MyPacket.CHARSET));
		
		Aio.send(clientChannelContext, packet);
		
	}
	
	public static void sendGroup(String msg) throws Exception{
		Aio.bindGroup(clientChannelContext, "group1");
		MyPacket packet = new MyPacket();
		packet.setBody(msg.getBytes(MyPacket.CHARSET));
		Aio.sendToGroup(clientGroupContext, "group1", packet);
	}
	

}

消息處理類

package com.dooper.client;

import java.nio.ByteBuffer;

import org.tio.client.intf.ClientAioHandler;
import org.tio.core.ChannelContext;
import org.tio.core.GroupContext;
import org.tio.core.exception.AioDecodeException;
import org.tio.core.intf.Packet;

import com.dooper.common.packet.MyPacket;

public class MyClientAioHandler implements ClientAioHandler {
	
	private static MyPacket heartbeatPacket = new MyPacket();
	
	/**
	 * ????
	 */
	@Override
	public Packet decode(ByteBuffer buffer, ChannelContext channelContext) throws AioDecodeException {
		int readableLength = buffer.limit() - buffer.position();
		
		if(readableLength < MyPacket.HEADER_LENGHT){
			return null;
		}
		
		int bodyLength = buffer.getInt();
		
		if(bodyLength < 0){
			throw new AioDecodeException("bodyLength ["+bodyLength+"] is not right,remote:"+channelContext.getClientNode());
		}
		
		int neededLength = MyPacket.HEADER_LENGHT + bodyLength;
		
		int isDataEnough = readableLength  - neededLength;
		
		if(isDataEnough < 0){
			return null;
		}else{
			MyPacket myPacket = new MyPacket();
			
			if(bodyLength > 0){
				byte[] dst = new byte[bodyLength];
				buffer.get(dst);
				myPacket.setBody(dst);
			}
			return myPacket;
		}
		
	}

	/**
	 * ????
	 */
	@Override
	public ByteBuffer encode(Packet packet, GroupContext groupContext, ChannelContext channelContext) {
		MyPacket myPacket = (MyPacket)packet;
		
		byte[] body = myPacket.getBody();
		
		int bodyLen = 0;
		
		if(body != null){
			bodyLen = body.length;
		}
		
		int allLen = MyPacket.HEADER_LENGHT +bodyLen;
		
		ByteBuffer buffer = ByteBuffer.allocate(allLen);
		
		buffer.order(groupContext.getByteOrder());
		
		buffer.putInt(bodyLen);
		
		if(body != null){
			buffer.put(body);
		}
		
		return buffer;
	}

	@Override
	public void handler(Packet packet, ChannelContext channelContext) throws Exception {
		MyPacket myPacket = (MyPacket)packet;
		byte[] body = myPacket.getBody();
		if(body!=null){
			String str = new String(body,MyPacket.CHARSET);
			System.out.println(str);
			
		}
		return ;
	}

	@Override
	public Packet heartbeatPacket() {
		return heartbeatPacket;
	}
	
	
	
	

}

基於t-io的MI工具實現