nio client和netty server例項
花了一週時間,研究了java裡面的nio和netty,其實nio很好理解,用過c語言的,都應該知道select和epoll,nio和select和epoll非常類似,使用方法和解決的問題也都是一樣的。
至於netty,不得不欽佩java語言的框架技術,雖說這個框架研究起來非常費勁,但是對於上層使用者,使用這個netty框架,會幫我們解決很多效能、穩定性問題。同時,使用框架,也會大大提高開發效率。
這裡,不想講太多關於nio和netty的東西,所有最基本的知識點,都在如下學習資料中。目前我對這個netty框架研究的還不深入,想了半天其實真寫不出啥有水平的文章,待今後深入研究後,將學習成果再和大家彙報。
學習資料:
這裡給出一個用nio實現的tcp client、用netty實現的一個tcp server的例子。
處理過程為:client傳遞a、b兩個整型數,server計算和,將結果返回給客戶端。在服務端加入執行緒池,用來處理兩個數的和。當然了從效能角度,目前這麼簡單的操作完全沒有必要這樣做。主要考慮到如果有更復雜的操作,一般的服務端的模型都是將任務傳入一個訊息佇列,後端再用執行緒池從訊息佇列中取出任務進行處理,再返回處理結果。所以這個地方的執行緒池,可以認為是以後的擴充套件,也可以認為其就是個擺設。
client 程式碼
NioClient.java
package nio.client.test; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.IntBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.HashMap; import java.util.Map; import java.util.Set; public class NioClient { private final static int MAX_BUF_SIZE = 1024; private InetSocketAddress serverAddr; private int clientCount; public NioClient(String ip, int port, int clientCount) { this.clientCount = clientCount; this.serverAddr = new InetSocketAddress(ip, port); } private void sendMessageToSrv(SocketChannel sockChnl, int clientNo, int index) throws IOException { // send data to server... /* ByteBuffer sendBuf = ByteBuffer.allocate(MAX_BUF_SIZE); String sendText = "Client " + clientNo + " say " + index + "\r\n"; sendBuf.put(sendText.getBytes()); sendBuf.flip(); sockChnl.write(sendBuf); System.out.println(sendText);*/ ByteBuffer sendBuf = ByteBuffer.allocate(4*4); sendBuf.putInt(clientNo); sendBuf.putInt(index); sendBuf.putInt(clientNo); sendBuf.putInt(index); sendBuf.flip(); sockChnl.write(sendBuf); String out = String.format("client: %d send message, index: %d, a: %d, b: %d", clientNo, index, clientNo, index); System.out.println(out); } private void recvMessage(SocketChannel sockChnl, int clientNo) throws IOException { /*ByteBuffer recvBuf = ByteBuffer.allocate(MAX_BUF_SIZE); int bytesRead = sockChnl.read(recvBuf); while (bytesRead > 0) { recvBuf.flip(); // write mode to read mode, position to 0, // limit to position String recvText = new String(recvBuf.array(), 0, bytesRead); recvBuf.clear(); // clear buffer content, read mode to write mode, position to 0, limit to capacity System.out.println("Client " + clientNo + " receive: " + recvText); bytesRead = sockChnl.read(recvBuf); }*/ ByteBuffer recvBuf = ByteBuffer.allocate(MAX_BUF_SIZE); int bytesRead = sockChnl.read(recvBuf); while (bytesRead > 0) { recvBuf.flip(); // write mode to read mode, position to 0, // limit to position int result = recvBuf.getInt(); recvBuf.clear(); // clear buffer content, read mode to write mode, position to 0, limit to capacity String out = String.format("client: %d recv message, result: %d", clientNo, result); System.out.println(out); bytesRead = sockChnl.read(recvBuf); } } public void startNioClient() throws IOException, InterruptedException { Selector selector = Selector.open(); for (int i = 0; i < clientCount; i++) { SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); Map<String, Integer> clientInfo = new HashMap<String, Integer>(); clientInfo.put("no", i); clientInfo.put("index", 0); socketChannel.register(selector, SelectionKey.OP_CONNECT, clientInfo); socketChannel.connect(this.serverAddr); } while (true) { int readyChannels = selector.select(); if (0 == readyChannels) { continue; } Set<SelectionKey> selectionKeys = selector.selectedKeys(); for (SelectionKey sk : selectionKeys) { Map clientInfo = (Map) sk.attachment(); int clientNo = (Integer) clientInfo.get("no"); SocketChannel socketchnl = (SocketChannel) sk.channel(); if (sk.isConnectable()) { while(!socketchnl.finishConnect()) { Thread.sleep(5); } if (socketchnl.isConnected()) { System.out.println("connect is finish..."); // send data to server... sendMessageToSrv(socketchnl, clientNo, -1); sk.interestOps(SelectionKey.OP_READ); } } else if (sk.isReadable()) { // read data from server... recvMessage(socketchnl, clientNo); // send data to server... int index = (Integer) clientInfo.get("index"); index += 1; sendMessageToSrv(socketchnl, clientNo, index); clientInfo.put("index", index); } } selectionKeys.clear(); } } public int getClientCount() { return clientCount; } public void setClientCount(int clientCount) { this.clientCount = clientCount; } }
Main.java
package nio.client.test; import java.io.IOException; public class Main { public static void main(String[] args) { System.out.println("clients start.............."); NioClient client = new NioClient("127.0.0.1", 8080, 5000); try { client.startNioClient(); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
server端程式碼:
NettyServer.java
package com.bj58.nettyTest;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.ChannelGroupFuture;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
public class NettyServer {
static final ChannelGroup allChannels = new DefaultChannelGroup("time-server");
public static void main(String[] args) throws Exception {
ExecutorService threadPool = Executors.newFixedThreadPool(5);
ChannelFactory factory = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
ServerBootstrap bootstrap = new ServerBootstrap(factory);
bootstrap.setPipelineFactory(new ServerPipelineFactory(threadPool));
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.keepAlive", true);
System.out.println("Netty Server start...");
Channel channel = bootstrap.bind(new InetSocketAddress(8080));
/* allChannels.add(channel);
System.out.println("1111111111111111111");
Thread.sleep(2*60*1000);
System.out.println("2222222222222222222");
ChannelGroupFuture future = allChannels.close();
future.awaitUninterruptibly();
factory.releaseExternalResources();
System.out.println("3333333333333333333");*/
}
}
ServerDecoder.java
package com.bj58.nettyTest;
import java.util.ArrayList;
import java.util.List;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
public class ServerDecoder extends FrameDecoder{
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
if (buffer.readableBytes() < 8) {
return null;
}
int clientNo = buffer.readInt();
int index = buffer.readInt();
int a = buffer.readInt();
int b = buffer.readInt();
List<Integer> data = new ArrayList<Integer>();
data.add(clientNo);
data.add(index);
data.add(a);
data.add(b);
return data;
}
}
ServerHandler.java
package com.bj58.nettyTest;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
public class ServerHandler extends SimpleChannelHandler {
private ExecutorService threadPool;
public ServerHandler(ExecutorService threadPool) {
this.threadPool = threadPool;
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws InterruptedException {
/*
* ChannelBuffer buf = (ChannelBuffer)e.getMessage(); byte[] des =
* buf.array(); String recvText = new String(des, 0, des.length);
* System.out.println(recvText); Channel ch = e.getChannel();
* ch.write(e.getMessage());
*/
List<Integer> data = (List<Integer>) e.getMessage();
HandleTask ht = new HandleTask(e);
threadPool.submit(ht);
int clientNo = data.get(0);
int index = data.get(1);
int a = data.get(2);
int b = data.get(3);
String content = String.format("client: %d, index: %d, a: %d, b: %d", clientNo, index, a, b);
System.out.println(content);
Channel ch = e.getChannel();
ChannelBuffer buf = ChannelBuffers.buffer(4);
buf.writeInt(a+b);
ch.write(buf);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
e.getCause().printStackTrace();
Channel ch = e.getChannel();
ch.close();
}
}
class HandleTask implements Runnable {
MessageEvent e;
public HandleTask(MessageEvent e) {
this.e = e;
}
public void run() {
List<Integer> data = (List<Integer>) e.getMessage();
int clientNo = data.get(0);
int index = data.get(1);
int a = data.get(2);
int b = data.get(3);
String content = String.format("client: %d, index: %d, a: %d, b: %d", clientNo, index, a, b);
System.out.println(content);
Channel ch = e.getChannel();
ChannelBuffer buf = ChannelBuffers.buffer(4);
buf.writeInt(a+b);
ch.write(buf);
}
}
ServerPipelineFactory.java
package com.bj58.nettyTest;
import static org.jboss.netty.channel.Channels.pipeline;
import java.util.concurrent.ExecutorService;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
public class ServerPipelineFactory implements ChannelPipelineFactory{
private ExecutorService threadPool;
public ServerPipelineFactory(ExecutorService threadPool) {
this.threadPool = threadPool;
}
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = pipeline();
pipeline.addLast("framer", new ServerDecoder());
pipeline.addLast("handler", new ServerHandler(threadPool));
return pipeline;
}
}
程式碼理解:
客戶端可以同時啟動N個tcp client,同時連線一個tcp server,傳遞a/b兩個數,獲取a/b之和。
當代碼寫完之後,發現通過java的序列化技術,可以直接傳遞一個java 物件,這樣一來,傳送和接收端,處理起來會更簡單一些,而且實際專案中,傳遞的資料要比這個複雜的多。
接下來研究一下java的序列化技術、netty如何傳遞物件、以及Google protobuf,給出一個完整的rpc的程式碼例子。