基於Netty3的RPC架構筆記3之執行緒模型原始碼分析
阿新 • • 發佈:2019-01-30
隨著使用者量上升,專案的架構也在不斷的升級,由最開始的MVC的垂直架構(傳統專案)到RPC架構(webservice,rest,netty,mina),再到SOA模型(dubbo),再到最近的微服務,又比如Tomcat6之前的IO模型都是BIO 也就是阻塞IO,到後來變成多路複用,也是阻塞IO。到非阻塞NIO,再到非同步非阻塞AIO,
言歸正傳,接著談netty,傳統IO是一個執行緒服務一個客戶,後來通過netty,可以一個執行緒服務多個客戶,下面的那個圖展示的是netty的NIO通過引入多執行緒來提高效能,既一個執行緒負責一片使用者
直接上程式碼
package com.cn; import java.net.InetSocketAddress; import java.util.concurrent.Executors; import com.cn.pool.NioSelectorRunnablePool; /** * 啟動函式 * */ public class Start { public static void main(String[] args) { //初始化執行緒 NioSelectorRunnablePool nioSelectorRunnablePool = new NioSelectorRunnablePool(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); //獲取服務類 ServerBootstrap bootstrap = new ServerBootstrap(nioSelectorRunnablePool); //繫結埠 bootstrap.bind(new InetSocketAddress(10101)); System.out.println("start"); } }
package com.cn.pool; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicInteger; import com.cn.NioServerBoss; import com.cn.NioServerWorker; /** * selector執行緒管理者 * */ public class NioSelectorRunnablePool { /** * boss執行緒陣列 */ private final AtomicInteger bossIndex = new AtomicInteger(); private Boss[] bosses; /** * worker執行緒陣列 */ private final AtomicInteger workerIndex = new AtomicInteger(); private Worker[] workeres; public NioSelectorRunnablePool(Executor boss, Executor worker) { initBoss(boss, 1); initWorker(worker, Runtime.getRuntime().availableProcessors() * 2); } /** * 初始化boss執行緒 * @param boss * @param count */ private void initBoss(Executor boss, int count) { this.bosses = new NioServerBoss[count]; for (int i = 0; i < bosses.length; i++) { bosses[i] = new NioServerBoss(boss, "boss thread " + (i+1), this); } } /** * 初始化worker執行緒 * @param worker * @param count */ private void initWorker(Executor worker, int count) { this.workeres = new NioServerWorker[count]; for (int i = 0; i < workeres.length; i++) { workeres[i] = new NioServerWorker(worker, "worker thread " + (i+1), this); } } /** * 獲取一個worker * @return */ public Worker nextWorker() { return workeres[Math.abs(workerIndex.getAndIncrement() % workeres.length)]; } /** * 獲取一個boss * @return */ public Boss nextBoss() { return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)]; } }
package com.cn; import java.net.SocketAddress; import java.nio.channels.ServerSocketChannel; import com.cn.pool.Boss; import com.cn.pool.NioSelectorRunnablePool; /** * 服務類 * */ public class ServerBootstrap { private NioSelectorRunnablePool selectorRunnablePool; public ServerBootstrap(NioSelectorRunnablePool selectorRunnablePool) { this.selectorRunnablePool = selectorRunnablePool; } /** * 繫結埠 * @param localAddress */ public void bind(final SocketAddress localAddress){ try { // 獲得一個ServerSocket通道 ServerSocketChannel serverChannel = ServerSocketChannel.open(); // 設定通道為非阻塞 serverChannel.configureBlocking(false); // 將該通道對應的ServerSocket繫結到port埠 serverChannel.socket().bind(localAddress); //獲取一個boss執行緒 Boss nextBoss = selectorRunnablePool.nextBoss(); //向boss註冊一個ServerSocket通道 nextBoss.registerAcceptChannelTask(serverChannel); } catch (Exception e) { e.printStackTrace(); } } }
package com.cn.pool;
import java.nio.channels.SocketChannel;
/**
* worker介面
*
*/
public interface Worker {
/**
* 加入一個新的客戶端會話
* @param channel
*/
public void registerNewChannelTask(SocketChannel channel);
}
package com.cn.pool;
import java.nio.channels.ServerSocketChannel;
/**
* boss介面
*
*/
public interface Boss {
/**
* 加入一個新的ServerSocket
* @param serverChannel
*/
public void registerAcceptChannelTask(ServerSocketChannel serverChannel);
}
package com.cn;
import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import com.cn.pool.NioSelectorRunnablePool;
/**
* 抽象selector執行緒類
*
*
*/
public abstract class AbstractNioSelector implements Runnable {
/**
* 執行緒池
*/
private final Executor executor;
/**
* 選擇器
*/
protected Selector selector;
/**
* 選擇器wakenUp狀態標記
*/
protected final AtomicBoolean wakenUp = new AtomicBoolean();
/**
* 任務佇列
*/
private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();
/**
* 執行緒名稱
*/
private String threadName;
/**
* 執行緒管理物件
*/
protected NioSelectorRunnablePool selectorRunnablePool;
AbstractNioSelector(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
this.executor = executor;
this.threadName = threadName;
this.selectorRunnablePool = selectorRunnablePool;
openSelector();
}
/**
* 獲取selector並啟動執行緒
*/
private void openSelector() {
try {
this.selector = Selector.open();
} catch (IOException e) {
throw new RuntimeException("Failed to create a selector.");
}
executor.execute(this);
}
@Override
public void run() {
Thread.currentThread().setName(this.threadName);
while (true) {
try {
wakenUp.set(false);
select(selector);
processTaskQueue();
process(selector);
} catch (Exception e) {
// ignore
}
}
}
/**
* 註冊一個任務並激活selector
*
* @param task
*/
protected final void registerTask(Runnable task) {
taskQueue.add(task);
Selector selector = this.selector;
if (selector != null) {
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup();
}
} else {
taskQueue.remove(task);
}
}
/**
* 執行佇列裡的任務
*/
private void processTaskQueue() {
for (;;) {
final Runnable task = taskQueue.poll();
if (task == null) {
break;
}
task.run();
}
}
/**
* 獲取執行緒管理物件
* @return
*/
public NioSelectorRunnablePool getSelectorRunnablePool() {
return selectorRunnablePool;
}
/**
* select抽象方法
*
* @param selector
* @return
* @throws IOException
*/
protected abstract int select(Selector selector) throws IOException;
/**
* selector的業務處理
*
* @param selector
* @throws IOException
*/
protected abstract void process(Selector selector) throws IOException;
}
package com.cn;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;
import com.cn.pool.Boss;
import com.cn.pool.NioSelectorRunnablePool;
import com.cn.pool.Worker;
/**
* boss實現類
*
*/
public class NioServerBoss extends AbstractNioSelector implements Boss{
public NioServerBoss(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
super(executor, threadName, selectorRunnablePool);
}
@Override
protected void process(Selector selector) throws IOException {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
if (selectedKeys.isEmpty()) {
return;
}
for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
SelectionKey key = i.next();
i.remove();
ServerSocketChannel server = (ServerSocketChannel) key.channel();
// 新客戶端
SocketChannel channel = server.accept();
// 設定為非阻塞
channel.configureBlocking(false);
// 獲取一個worker
Worker nextworker = getSelectorRunnablePool().nextWorker();
// 註冊新客戶端接入任務
nextworker.registerNewChannelTask(channel);
System.out.println("新客戶端連結");
}
}
public void registerAcceptChannelTask(final ServerSocketChannel serverChannel){
final Selector selector = this.selector;
registerTask(new Runnable() {
@Override
public void run() {
try {
//註冊serverChannel到selector
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
}
});
}
@Override
protected int select(Selector selector) throws IOException {
return selector.select();
}
}
package com.cn;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;
import com.cn.pool.NioSelectorRunnablePool;
import com.cn.pool.Worker;
/**
* worker實現類
*
*/
public class NioServerWorker extends AbstractNioSelector implements Worker{
public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
super(executor, threadName, selectorRunnablePool);
}
@Override
protected void process(Selector selector) throws IOException {
Set<SelectionKey> selectedKeys = selector.selectedKeys();
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
while (ite.hasNext()) {
SelectionKey key = (SelectionKey) ite.next();
// 移除,防止重複處理
ite.remove();
// 得到事件發生的Socket通道
SocketChannel channel = (SocketChannel) key.channel();
// 資料總長度
int ret = 0;
boolean failure = true;
ByteBuffer buffer = ByteBuffer.allocate(1024);
//讀取資料
try {
ret = channel.read(buffer);
failure = false;
} catch (Exception e) {
// ignore
}
//判斷是否連線已斷開
if (ret <= 0 || failure) {
key.cancel();
System.out.println("客戶端斷開連線");
}else{
System.out.println("收到資料:" + new String(buffer.array()));
//回寫資料
ByteBuffer outBuffer = ByteBuffer.wrap("收到\n".getBytes());
channel.write(outBuffer);// 將訊息回送給客戶端
}
}
}
/**
* 加入一個新的socket客戶端
*/
public void registerNewChannelTask(final SocketChannel channel){
final Selector selector = this.selector;
registerTask(new Runnable() {
@Override
public void run() {
try {
//將客戶端註冊到selector中
channel.register(selector, SelectionKey.OP_READ);
} catch (ClosedChannelException e) {
e.printStackTrace();
}
}
});
}
@Override
protected int select(Selector selector) throws IOException {
return selector.select(500);
}
}
上面的例子是直接引用jar,也可以通過引用專案netty的原始碼從而理解netty工作原理
試想我們如何提高NIO的工作效率,一個NIO是不是隻能有一個selector?當然不是,一個系統可以有多個selector
selector可以註冊多個ServerSocketChannel
我們如何去看一個開源框架的程式碼
一斷點(多執行緒的情況下可以設定斷點的條件,指定列印某個執行緒)
二列印
三看呼叫棧
四搜尋