實時通訊 socketio nio 總結
阿新 • • 發佈:2019-02-03
公司要求多一個實時通訊的功能
解決思路如下
架構圖:
後臺管理頁面實時顯示線上的終端情況
終端伺服器和後臺伺服器之間用NIO通訊
當有終端登入登出,後臺管理伺服器(服務端)接收終端伺服器(客戶端) 介面請求
後臺管理伺服器(服務的)傳送資訊給實時監控頁面(客戶端)
附上程式碼:
終端的客戶端
public class EmployeeSocketClient {
static SocketChannel sc =null;
final static InetSocketAddress address = new InetSocketAddress(
"127.0.0.1", 8085);
ByteBuffer buffer = ByteBuffer.allocate(1024);
static {
try {
//這裡搞個單例好像也行
// 開啟通道
sc = SocketChannel.open();
// 建立連線
sc.connect(address);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void sendMessage(String inputMsg) {
try {
// 把輸入的資料放入buffer緩衝區
buffer.put(inputMsg.getBytes());
// 復位操作
// 重置capacity、position、limit位置,不用深入瞭解
buffer.flip();
// 將buffer的資料寫入通道
sc.write(buffer);
System.out.println(sc.hashCode());
// 清空緩衝區中的資料
buffer.clear();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//這裡註釋掉是為了每次啟動伺服器之後做一個靜態的管道,不用關閉後每次新建
// finally {
// if (sc != null) {
// try {
// sc.close();
// } catch (IOException e) {
// e.printStackTrace();
// }
// }
// }
}
}
後臺管理伺服器NIO伺服器程式碼:
public class EmployeeSocketServer implements InitializingBean {//spring載入完類的構造方法之後,就會執行afterPropertiesSet方法
private Selector selector;
private ByteBuffer buffer = ByteBuffer.allocate(1024);
@Value("${socket_server_port}")//springboot配置引數,如果不用InitializingBean 注入不進來
private String port;
@Autowired
private EmployeeMapper emploueeMapper;
public EmployeeSocketServer() {
try {
// 1 開啟多複用器
selector = Selector.open();
// 2 開啟伺服器通道
// 這裡可以選擇netty的channel
ServerSocketChannel ssc = ServerSocketChannel.open();
// 3 設定伺服器通道為非阻塞方式
ssc.configureBlocking(false);
// 4 繫結地址
if (port == null || "".equals(port)) {
port = "8085";
}
ssc.bind(new InetSocketAddress(Integer.valueOf(port)));
// 5 把伺服器通道註冊到多路複用選擇器上,並監聽阻塞狀態
ssc.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Server start, port:" + port);
} catch (IOException e) {
e.printStackTrace();
}
}
public void init() {
while (true) {
try {
// 1 必須讓多路複用選擇器開始監聽
selector.select();
// 2 返回所有已經註冊到多路複用選擇器上的通道的SelectionKey
Iterator<SelectionKey> keys = selector.selectedKeys()
.iterator();
// 3 遍歷keys,輪詢開始
while (keys.hasNext()) {
SelectionKey key = keys.next();
// 目的,防止在遍歷的時候對keys有插入刪除操作
keys.remove();
if (key.isValid()) { // 如果key的狀態是有效的
// 這部是在read之前註冊管道
if (key.isAcceptable()) { // 如果key是阻塞狀態,則呼叫accept()方法
accept(key);
}
if (key.isReadable()) { // 如果key是可讀狀態,則呼叫read()方法
read(key);
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void accept(SelectionKey key) {
try {
// 1 獲取伺服器通道
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
// 2 執行阻塞方法
SocketChannel sc = ssc.accept();
// 3 設定阻塞模式為非阻塞
sc.configureBlocking(false);
// 4 註冊到多路複用選擇器上,並設定讀取標識
sc.register(selector, SelectionKey.OP_READ);
} catch (Exception e) {
e.printStackTrace();
}
}
private void read(SelectionKey key) {
try {
// 1 清空緩衝區中的舊資料
buffer.clear();
// 2 獲取之前註冊的SocketChannel通道
SocketChannel sc = (SocketChannel) key.channel();
// 3 將sc中的資料放入buffer中
int count = sc.read(buffer);
if (count == -1) { // == -1表示通道中沒有資料
key.channel().close();
key.cancel();
return;
}
// 讀取到了資料,將buffer的position復位到0
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
// 將buffer中的資料寫入byte[]中
buffer.get(bytes);
String body = new String(bytes).trim();
if ("employeeLogin".equals(body) || "employeeLogout".equals(body)) {
for (SocketIOClient client : EmployeeCurrentTimeInfo.clients) {//socketio伺服器配置的靜態變數
Random random = new Random();
client.sendEvent("pushpoint", new Point(random.nextInt(100), random.nextInt(100)) );
}
}
System.out.println("Server:" + body);
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void afterPropertiesSet() throws Exception {
init();
}
}
後臺管理伺服器socketio伺服器程式碼:
public class EmployeeCurrentTimeInfo implements InitializingBean{
@Value("${socketio_ip}")
private String socketioIp;
@Value("${socketio_port}")
private String socketioPort;
@Value("${socketio_delay}")
private String socketioDelay;
@Autowired
private EmployeeService employeeService ;
// 用於儲存所有客戶端
public static List<SocketIOClient> clients = new ArrayList<SocketIOClient>();
public SocketIOServer server ;
public void init() throws InterruptedException {
Configuration configuration = new Configuration();
if(socketioIp == null || "".equals(socketioIp)){
socketioIp = "localhost";
}
if(socketioPort == null || "".equals(socketioPort)){
socketioPort = "8032";
}
if(socketioDelay == null || "".equals(socketioDelay)){
socketioDelay = "3000";
}
configuration.setHostname(socketioIp);// 設定主機名
configuration.setPort(Integer.valueOf(socketioPort));// 設定監聽的埠號
server = new SocketIOServer(configuration);// 根據配置建立伺服器物件
server.addConnectListener(new ConnectListener() {// 新增客戶端連線監聽器
@Override
public void onConnect(SocketIOClient client) {
clients.add(client);// 儲存客戶端
}
});
server.addDisconnectListener(new DisconnectListener() {
//方式重新整理頁面時候會導致list裡有多個無效的客戶端,當銷燬時,從list裡刪除,保證每次先銷燬再加入
@Override
public void onDisconnect(SocketIOClient client) {
clients.remove(client);
}
});
server.start();
System.out.println("server started");
// 預留,三秒一重新整理功能
// ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
// executorService.scheduleWithFixedDelay(new Runnable() {
// @Override
// public void run() {
// Random random = new Random();
// for (SocketIOClient client : clients) {
// client.sendEvent("pushpoint", new Point(
// random.nextInt(100), random.nextInt(100)));// 每隔一秒推送一次
// // System.out.println(p.x +"____"+p.y);
// }
// }
// }, 1000, Integer.valueOf(socketioDelay), TimeUnit.MILLISECONDS);
//
//
// // 防止同步併發問題,需要保留
// Object object = new Object();
// synchronized (object) {
// object.wait();
// }
}
@Override
public void afterPropertiesSet() throws Exception {
init();
}
}
實時監控頁面:
<!DOCTYPE html>
<html>
<head>
<title>netty-socketio測試</title>
<meta http-equiv="Content-Type" content="text/html;charset=UTF-8"/>
<!-- 自己去下載 -->
<script src="js/socket.io/socket.io.js"></script>
<script src="js/moment.min.js"></script>
<script src="js/jquery-1.7.2.min.js"></script>
<script>
$(function(){
var socket = io.connect('http://192.168.1.122:8032');
//監聽名為pushpoint的事件,這與服務端推送的那個事件名稱必須一致
socket.on("pushpoint", function(data){
$('#x').text(data.x);
$('#y').text(data.y);
});
});
</script>
</head>
<body>
<div id="display" style="height:50px;background-color:grey;">
x=<span id="x">0</span>, y=<span id="y">0</span>
</div>
</body>
</html>
剛開始準備做成股票監控系統那樣,3-3.5秒重新整理一次資訊,但發現實際業務沒有那麼複雜,就把實時重新整理給註釋掉了
socketio有2種方式,一種是上面寫到的伺服器自動傳送請求的,另一種是客戶端傳送請求,伺服器響應的方式。
以上2種方式結合使用,可以實現分頁效果:
例如頁面要顯示的資料由10頁,每頁10個,但是當終端狀態改變時頁面客戶端不會自動傳送請求,則需要在頁面客戶端第一次請求時附帶分頁預設值,當頁面客戶端傳送分頁請求時,改變預設值,並作為入參去查詢資料庫。千萬不要快取所有的資料在前段自己來做分頁處理,那樣好蠢。