netty 點對點聊天程式
package com.anxpp.im.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import java.io.IOException; import java.util.Scanner; import com.anxpp.im.common.IMConfig; import com.anxpp.im.common.IMMessage; import com.anxpp.im.common.MsgPackDecode; import com.anxpp.im.common.MsgPackEncode; import com.anxpp.im.server.handler.ServerHandler; public class Server implements Runnable,IMConfig{ ServerHandler serverHandler = new ServerHandler(); public static void main(String[] args) throws IOException{ new Server().start(); } public void start() throws IOException{ new Thread(this).start(); runServerCMD(); } /**啟動服務端控制檯 * @throws IOException */ private void runServerCMD() throws IOException { //服務端主動推送訊息 int toID = 1; IMMessage message = new IMMessage( APP_IM, CLIENT_VERSION, SERVER_ID, TYPE_MSG_TEXT, toID, MSG_EMPTY); @SuppressWarnings("resource") Scanner scanner = new Scanner(System.in); do{ message.setMsg(scanner.nextLine()); } while (serverHandler.sendMsg(message)); } public void run() { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024) // .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65536, 0, 2, 0, 2)); ch.pipeline().addLast("msgpack decoder",new MsgPackDecode()); ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2)); ch.pipeline().addLast("msgpack encoder",new MsgPackEncode()); ch.pipeline().addLast(serverHandler); } }); ChannelFuture f = b.bind(SERVER_PORT).sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { workerGroup.shutdownGracefully(); bossGroup.shutdownGracefully(); } } }
2.
3package com.anxpp.im.server.handler; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.io.IOException; import com.anxpp.im.common.IMMessage; import com.anxpp.im.common.OnlineUser; import com.anxpp.im.server.config.BaseConfig; @ChannelHandler.Sharable public class ServerHandler extends ChannelInboundHandlerAdapter implements BaseConfig{ private ChannelHandlerContext ctx; @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { System.err.println("服務端Handler建立..."); super.handlerAdded(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.err.println("channelInactive"); super.channelInactive(ctx); } /** * tcp鏈路建立成功後呼叫 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { this.ctx = ctx; System.err.println("有客戶端連線:"+ctx.channel().remoteAddress().toString()); } /** * 傳送訊息 */ public boolean sendMsg(IMMessage msg) throws IOException { System.err.println("伺服器推送訊息:"+msg); ctx.writeAndFlush(msg); return msg.getMsg().equals("q") ? false : true; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException { System.err.println("伺服器接收到訊息:"+msg); IMMessage message = (IMMessage)msg; if(OnlineUser.get(message.getReceiveId())==null){ OnlineUser.put(message.getUid(), ctx); } ChannelHandlerContext c = OnlineUser.get(message.getReceiveId()); if(c==null){ message.setMsg("對方不線上!"); OnlineUser.get(message.getUid()).writeAndFlush(message); } else c.writeAndFlush(message); } /** * 異常處理 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { System.err.println("與客戶端斷開連線:"+cause.getMessage()); cause.printStackTrace(); ctx.close(); } }
4.package com.anxpp.im.common; public interface IMConfig { /**客戶端配置*/ int CLIENT_VERSION = 1; //版本號 /**服務端配置*/ String SERVER_HOST = "127.0.0.1"; //伺服器IP int SERVER_PORT = 9090; //伺服器埠 /**訊息相關*/ int SERVER_ID = 0; //表示伺服器訊息 byte APP_IM = 1; //即時通訊應用ID為1 byte TYPE_CONNECT = 0; //連線後第一次訊息確認建立連線和傳送認證資訊 byte TYPE_MSG_TEXT = 1; //文字訊息 String MSG_EMPTY = ""; //空訊息 }
package com.anxpp.im.common;
import org.msgpack.annotation.Message;
@Message
public class IMMessage {
//應用ID
private byte appId;
//版本號
private int version;
//使用者ID
private int uid;
//訊息型別 0:登陸 1:文字訊息
private byte msgType;
//接收方
private int receiveId;
//訊息內容
private String msg;
public IMMessage() {}
/**
* 構造方法
* @param appId 應用通道
* @param version 應用版本
* @param uid 使用者ID
* @param msgType 訊息型別
* @param receiveId 訊息接收者
* @param msg 訊息內容
*/
public IMMessage(byte appId, int version, int uid, byte msgType, int receiveId, String msg) {
this.appId = appId;
this.version = version;
this.uid = uid;
this.msgType = msgType;
this.receiveId = receiveId;
this.msg = msg;
}
public byte getAppId() {
return appId;
}
public void setAppId(byte appId) {
this.appId = appId;
}
public int getVersion() {
return version;
}
public void setVersion(int version) {
this.version = version;
}
public int getUid() {
return uid;
}
public void setUid(int uid) {
this.uid = uid;
}
public String getMsg() {
return msg;
}
public void setMsg(String msg) {
this.msg = msg;
}
public byte getMsgType() {
return msgType;
}
public void setMsgType(byte msgType) {
this.msgType = msgType;
}
public int getReceiveId() {
return receiveId;
}
public void setReceiveId(int receiveId) {
this.receiveId = receiveId;
}
@Override
public String toString() {
return "appId:"+this.appId+",version:"+this.version+",uid:"+this.uid+",msgType:"+this.msgType+",receiveId:"+this.receiveId+",msg:"+this.msg;
}
}
5.
package com.anxpp.im.common;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.util.List;
import org.msgpack.MessagePack;
/**
* 解碼工具
*/
public class MsgPackDecode extends MessageToMessageDecoder<ByteBuf>{
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
final int length = msg.readableBytes();
final byte[] array = new byte[length];
msg.getBytes(msg.readerIndex(), array, 0, length);
out.add(new MessagePack().read(array,IMMessage.class));
}
}
6.
package com.anxpp.im.common;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.io.IOException;
import org.msgpack.MessagePack;
/**
* 編碼工具
*/
public class MsgPackEncode extends MessageToByteEncoder<IMMessage>{
@Override
protected void encode(ChannelHandlerContext ctx, IMMessage msg, ByteBuf out) throws IOException {
out.writeBytes(new MessagePack().write(msg));
}
}
7.
package com.anxpp.im.common;
import io.netty.channel.ChannelHandlerContext;
import java.util.HashMap;
/**
* 線上使用者表
* @author Administrator
*
*/
public class OnlineUser {
//使用者表
private static HashMap<Integer, ChannelHandlerContext> onlineUser = new HashMap<Integer, ChannelHandlerContext>();
public static void put(Integer uid, ChannelHandlerContext uchc){
onlineUser.put(uid, uchc);
}
public static void remove(Integer uid){
onlineUser.remove(uid);
}
public static ChannelHandlerContext get(Integer uid){
return onlineUser.get(uid);
}
}
8,。
package com.anxpp.im.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import java.io.IOException;
import java.util.Scanner;
import com.anxpp.im.client.handler.ClientHandler;
import com.anxpp.im.common.IMConfig;
import com.anxpp.im.common.IMMessage;
import com.anxpp.im.common.MsgPackDecode;
import com.anxpp.im.common.MsgPackEncode;
public class Client implements Runnable,IMConfig {
public static int UID = 8889;
public static int toID = 8888;
private ClientHandler clientHandler = new ClientHandler();
public static void main(String[] args) throws IOException{
new Client().start();
}
public void start() throws IOException{
new Thread(this).start();
runServerCMD();
}
public void sendMsg(IMMessage msg) throws IOException {
clientHandler.sendMsg(msg);
}
/**啟動客戶端控制檯*/
private void runServerCMD() throws IOException {
IMMessage message = new IMMessage(
APP_IM,
CLIENT_VERSION,
UID,
TYPE_MSG_TEXT,
toID,
MSG_EMPTY);
@SuppressWarnings("resource")
Scanner scanner = new Scanner(System.in);
do{
message.setMsg(scanner.nextLine());
}
while (clientHandler.sendMsg(message));
}
@Override
public void run() {
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(workerGroup);
b.channel(NioSocketChannel.class);
b.option(ChannelOption.SO_KEEPALIVE, true);
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65536, 0, 2, 0, 2));
ch.pipeline().addLast("msgpack decoder",new MsgPackDecode());
ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
ch.pipeline().addLast("msgpack encoder",new MsgPackEncode());
ch.pipeline().addLast(clientHandler);
}
});
ChannelFuture f = b.connect(SERVER_HOST, SERVER_PORT).sync();
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
}
}
}
9.
package com.anxpp.im.client.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.IOException;
import com.anxpp.im.client.Client;
import com.anxpp.im.common.IMConfig;
import com.anxpp.im.common.IMMessage;
public class ClientHandler extends ChannelInboundHandlerAdapter implements IMConfig{
private ChannelHandlerContext ctx;
/**
* tcp鏈路簡歷成功後呼叫
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("成功連線伺服器");
this.ctx = ctx;
IMMessage message = new IMMessage(
APP_IM,
CLIENT_VERSION,
Client.UID,
TYPE_CONNECT,
SERVER_ID,
MSG_EMPTY);
sendMsg(message);
}
/**
* 傳送訊息
* @param msg
* @return
* @throws IOException
*/
public boolean sendMsg(IMMessage msg) throws IOException {
System.out.println("client:" + msg);
ctx.channel().writeAndFlush(msg);
return msg.getMsg().equals("q") ? false : true;
}
/**
* 收到訊息後呼叫
* @throws IOException
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
IMMessage m = (IMMessage)msg;
System.out.println(m.getUid() + ":" + m.getMsg());
}
/**
* 發生異常時呼叫
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
System.err.println("與伺服器斷開連線:"+cause.getMessage());
ctx.close();
}
}
10 程式碼結構
相關推薦
netty 點對點聊天程式
package com.anxpp.im.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.C
【181027】聊天室程式,點對點,VC++程式原始碼
基於VC++的點對點聊天室程式,程式包括服務端和客戶端,伺服器程式是chatsrvr.exe,客戶端是ChatClient.exe. 伺服器先執行,客戶連線到伺服器執行的計算機的IP即可。在客戶端傳送訊息的時候,可選擇字型顏色、聊天物件、過濾、是否設定為悄悄話等,都是相對基礎點的功能,希望剛
java使用Netty實現點對點聊天
最近學習伺服器開放,實現客戶端(APP)與硬體裝置之間的通訊,我一想到socket,經過查詢資料發現socket實現起來非常麻煩,同時也發現一個比較好的框架netty,覺得很不錯,就開始嘗試用他來擼一個點對點聊天系統,想了解的小夥伴可以自行去學習一下netty 一、一開始是導包,我是匯入
點對點聊天程式的實現
服務端 #include <unistd.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/
netty點對點聊天儲存聊天記錄
效果 聊天介面 聊天記錄 實現方案 1首先拼接使用者id和好友id在Redis資料庫中註冊key值 Jedis jedis = new Jedis("xxx.xx.xx.xxx", 6379); //許可權
HTML5 WebSocket實現點對點聊天的示例代碼
HTML案例分析 HTML5講解 HTML5的websocket與Tomcat實現了多人聊天,那是最簡單也是最基本的,其中註意的就是開發環境,要滿足jdk1.7和tomcat8,當然了tom7的7.063也行,在網站上找到了用google關於websocket的點對點聊天,更好的是可以和大多數系統很好
[NodeJS]NodeJS基於WebSocket的多用戶點對點即時通訊聊天
round serve i++ conn console 即時通訊 .get () str 最近給一個客戶做了一個聊天,所以就用NodeJS做了一個 原理就是用戶第一次進入後,記錄它的ID和該用戶的ws 等有人發數據的時候,就去查ID,然後找到ID對應的ws,進行消息發
Spring Boot整合websocket實現群聊,點對點聊天,圖片傳送,音訊傳送
參考:基於https://blog.csdn.net/qq_38455201/article/details/80374712 基礎上進行新增圖片傳送和音訊傳送功能 單點圖片傳送: 單點音訊傳送: 音訊傳送相關js參考:https://github.
php從零搭建即時通訊(一.點對點聊天)
目錄 零.在thinkphp5環境下搭建gatewayWork環境 2.下載完成後解壓到你TP5專案的vendor目錄下即可,如圖 3.點選start_for_win.bat檔案即可成功啟動你的webSocked伺服器,如果你是lin
基於TCP協議的點對點通訊java程式,帶圖形介面
客戶端 package client; import java.awt.*; import java.awt.event.*; import java.io.DataInputStream; import java.io.DataOutputStream;
SpringBoot整合WebSocket【基於STOMP協議】進行點對點[一對一]和廣播[一對多]實時推送,內附簡易聊天室demo
最近專案來了新需求,需要做一個實時推送的功能,伺服器主動推送訊息給客戶端,在網上經過一輪搜查之後,確定使用WebSocket來進行開發。以前經常聽說WebSocket的神奇之處,如今終於可以嘗試使用它了。1.淺談WebSocketWebSocket是在HTML5基礎上單個TC
java WebSocket實現簡單的聊天室(包括群發和點對點聊天)
今天突然看到了WebSocket然後就網上找了一個例子,然後修改了下,實現了簡單的聊天室,包括群聊和點對點聊天。 使用的程式碼如下 jsp程式碼: <%@ page language="java" import="java.util.*" pageEncoding="
java websocket實現點對點即時聊天
算是一個入門的demo,使用的是springMVC。 必要環境:JDK1.7以上,tomcat7.0以上。以下是乾貨: 1、websocket的jar直接從tomcat執行庫裡面新增到build path裡面。 2、前臺聊天頁面,通過ws://localhost:808
4.13 Go語言專案實戰:點對點聊天
需求摘要 實現一個分散式點對點的聊天系統,所有節點都是對等的,不需要中央伺服器 實現註冊節點名稱,節點之間通過節點名稱發起會話 思路分析 節點同時具備服務端和客戶端的職能 服務端只負責接收其它節點主動傳送過來的訊息 客戶端只負責主動向其
python實現簡單聊天應用(群聊和點對點均實現)
後續程式碼更新和功能新增會提交到個人github主頁,有興趣可以一起來完善! 如果只是拿過去執行看結果,請注意平臺相關性以及python版本號,本示例開發執行平臺為win7x86_64 pycharm community,python版本號為3.5!!! T
JMS消息隊列ActiveMQ(點對點模式)
jms activemq 消息隊列 生產者(producer)->消息隊列(message queue)package com.java1234.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactor
比特幣交易所面臨倒閉,點對點、場外交易將盛行
上進 str 訂單 虛擬 交易所 服務 com 交易 比特 9月16日火幣網、OKCoin聲稱要關閉所有虛擬幣業務,關閉交易和關閉交易所是兩個概念。 sosobtc官網也發布通告:於19日晚間23點關閉網站的行情數據、聊天社交等信息服務,APP也將於3日內關閉相關服務。
境外“點對點|場外交易OTC新模式數字貨幣開發”強勢來襲
數字貨幣 點對點 場外交易 系統開發 境外“點對點|場外交易OTC新模式數字貨幣開發”強勢來襲 當下面臨著國家監管的政策,行情的變化,很多交易平臺紛紛關閉,傳統的撮合交易系統退出了歷史的舞臺,很多玩家現在出現了問題,自己的幣去哪交易呢?如過交易平臺關了就沒有市場進行交易了呀!針對這個問題出現
activeMQ點對點
ssa oca exceptio pac 開啟事務 ive mes 啟動 cal 摘要: ActiveMQ 點對點消息 Point-to-Point 是一對一 創建消息生產者 /** * 點對點消息生產者 * * @author Edward * */
分析比特幣網絡:一種去中心化、點對點的網絡架構
比特幣 區塊鏈 比特幣采用了基於互聯網的點對點(P2P:peer-to-peer)分布式網絡架構。比特幣網絡可以認為是按照比特幣P2P協議運行的一系列節點的集合。本文來分析下比特幣網絡,了解它跟傳統中心化網絡的區別,以及比特幣網絡是如何發現相鄰節點的。中心化網絡為了更好的理解P2P網絡,我們先來看看傳