1. 程式人生 > >netty 點對點聊天程式

netty 點對點聊天程式

package com.anxpp.im.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;

import java.io.IOException;
import java.util.Scanner;

import com.anxpp.im.common.IMConfig;
import com.anxpp.im.common.IMMessage;
import com.anxpp.im.common.MsgPackDecode;
import com.anxpp.im.common.MsgPackEncode;
import com.anxpp.im.server.handler.ServerHandler;
public class Server implements Runnable,IMConfig{
    ServerHandler serverHandler = new ServerHandler();
    public static void main(String[] args) throws IOException{
        new Server().start();
    }
    public void start() throws IOException{
        new Thread(this).start();
        runServerCMD();
    }
    /**啟動服務端控制檯
     * @throws IOException */
    private void runServerCMD() throws IOException {
        //服務端主動推送訊息
        int toID = 1;
        IMMessage message = new IMMessage(
                APP_IM,
                CLIENT_VERSION,
                SERVER_ID,
                TYPE_MSG_TEXT,
                toID,
                MSG_EMPTY);
        @SuppressWarnings("resource")
        Scanner scanner = new Scanner(System.in);
        do{
            message.setMsg(scanner.nextLine());
        }
        while (serverHandler.sendMsg(message));
    }
    public void run() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
//                  .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65536, 0, 2, 0, 2));
                            ch.pipeline().addLast("msgpack decoder",new MsgPackDecode());
                            ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
                            ch.pipeline().addLast("msgpack encoder",new MsgPackEncode());
                            ch.pipeline().addLast(serverHandler);
                        }
                    }); 
            ChannelFuture f = b.bind(SERVER_PORT).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

2.
package com.anxpp.im.server.handler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.IOException;
import com.anxpp.im.common.IMMessage;
import com.anxpp.im.common.OnlineUser;
import com.anxpp.im.server.config.BaseConfig;
@ChannelHandler.Sharable
public class ServerHandler extends ChannelInboundHandlerAdapter implements BaseConfig{
    private ChannelHandlerContext ctx;
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        System.err.println("服務端Handler建立...");
        super.handlerAdded(ctx);
    }
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("channelInactive");
        super.channelInactive(ctx);
    }
    /**
     * tcp鏈路建立成功後呼叫
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        this.ctx = ctx;
        System.err.println("有客戶端連線:"+ctx.channel().remoteAddress().toString());
    }
    /**
     * 傳送訊息
     */
    public boolean sendMsg(IMMessage msg) throws IOException {
        System.err.println("伺服器推送訊息:"+msg);
        ctx.writeAndFlush(msg);
        return msg.getMsg().equals("q") ? false : true;
    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
        System.err.println("伺服器接收到訊息:"+msg);
        IMMessage message = (IMMessage)msg;
        if(OnlineUser.get(message.getReceiveId())==null){
            OnlineUser.put(message.getUid(), ctx);
        }
        ChannelHandlerContext c = OnlineUser.get(message.getReceiveId());
        if(c==null){
            message.setMsg("對方不線上!");
            OnlineUser.get(message.getUid()).writeAndFlush(message);
        }
        else
            c.writeAndFlush(message);
    }
    /**
     * 異常處理
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.err.println("與客戶端斷開連線:"+cause.getMessage());
        cause.printStackTrace();
        ctx.close();
    }
}
3
package com.anxpp.im.common;
public interface IMConfig {
    /**客戶端配置*/
    int     CLIENT_VERSION = 1;         //版本號
    /**服務端配置*/
    String  SERVER_HOST = "127.0.0.1";  //伺服器IP
    int     SERVER_PORT = 9090;         //伺服器埠
    /**訊息相關*/
    int     SERVER_ID   = 0;            //表示伺服器訊息
    byte    APP_IM = 1;         //即時通訊應用ID為1
    byte    TYPE_CONNECT = 0;   //連線後第一次訊息確認建立連線和傳送認證資訊
    byte    TYPE_MSG_TEXT = 1;  //文字訊息
    String  MSG_EMPTY = "";     //空訊息
}
4.
package com.anxpp.im.common;

import org.msgpack.annotation.Message;

@Message
public class IMMessage {
    //應用ID
    private byte appId;
    //版本號
    private int version;
    //使用者ID
    private int uid;
    //訊息型別    0:登陸    1:文字訊息
    private byte msgType;
    //接收方
    private int receiveId;
    //訊息內容
    private String msg;
    public IMMessage() {}
    /**
     * 構造方法
     * @param appId     應用通道
     * @param version   應用版本
     * @param uid       使用者ID
     * @param msgType   訊息型別
     * @param receiveId 訊息接收者
     * @param msg       訊息內容
     */
    public IMMessage(byte appId, int version, int uid, byte msgType, int receiveId, String msg) {
        this.appId = appId;
        this.version = version;
        this.uid = uid;
        this.msgType = msgType;
        this.receiveId = receiveId;
        this.msg = msg;
    }
    public byte getAppId() {
        return appId;
    }
    public void setAppId(byte appId) {
        this.appId = appId;
    }
    public int getVersion() {
        return version;
    }
    public void setVersion(int version) {
        this.version = version;
    }
    public int getUid() {
        return uid;
    }
    public void setUid(int uid) {
        this.uid = uid;
    }
    public String getMsg() {
        return msg;
    }
    public void setMsg(String msg) {
        this.msg = msg;
    }
    public byte getMsgType() {
        return msgType;
    }
    public void setMsgType(byte msgType) {
        this.msgType = msgType;
    }
    public int getReceiveId() {
        return receiveId;
    }
    public void setReceiveId(int receiveId) {
        this.receiveId = receiveId;
    }
    @Override
    public String toString() {
        return "appId:"+this.appId+",version:"+this.version+",uid:"+this.uid+",msgType:"+this.msgType+",receiveId:"+this.receiveId+",msg:"+this.msg;
    }
}
5.
package com.anxpp.im.common;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;
import java.util.List;
import org.msgpack.MessagePack;
/**
 * 解碼工具
 */
public class MsgPackDecode extends MessageToMessageDecoder<ByteBuf>{
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        final int length = msg.readableBytes();
        final byte[] array = new byte[length];
        msg.getBytes(msg.readerIndex(), array, 0, length);
        out.add(new MessagePack().read(array,IMMessage.class));
    }
}

6.
package com.anxpp.im.common;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.io.IOException;
import org.msgpack.MessagePack;
/**
 * 編碼工具
 */
public class MsgPackEncode extends MessageToByteEncoder<IMMessage>{
    @Override
    protected void encode(ChannelHandlerContext ctx, IMMessage msg, ByteBuf out) throws IOException {
        out.writeBytes(new MessagePack().write(msg));
    }
}


7.
package com.anxpp.im.common;
import io.netty.channel.ChannelHandlerContext;
import java.util.HashMap;
/**
 * 線上使用者表
 * @author Administrator
 *
 */
public class OnlineUser {
    //使用者表
    private static HashMap<Integer, ChannelHandlerContext> onlineUser = new HashMap<Integer, ChannelHandlerContext>();
    public static void put(Integer uid, ChannelHandlerContext uchc){
        onlineUser.put(uid, uchc);
    }
    public static void remove(Integer uid){
        onlineUser.remove(uid);
    }
    public static ChannelHandlerContext get(Integer uid){
        return onlineUser.get(uid);
    }
}


8,。
package com.anxpp.im.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;

import java.io.IOException;
import java.util.Scanner;


import com.anxpp.im.client.handler.ClientHandler;
import com.anxpp.im.common.IMConfig;
import com.anxpp.im.common.IMMessage;
import com.anxpp.im.common.MsgPackDecode;
import com.anxpp.im.common.MsgPackEncode;
public class Client implements Runnable,IMConfig {
    public static int UID = 8889;
    public static int toID = 8888;
    private ClientHandler clientHandler = new ClientHandler();
    public static void main(String[] args) throws IOException{
        new Client().start();
    }
    public void start() throws IOException{
        new Thread(this).start();
        runServerCMD();
    }
    public void sendMsg(IMMessage msg) throws IOException {
        clientHandler.sendMsg(msg);
    }
    /**啟動客戶端控制檯*/
    private void runServerCMD() throws IOException {
        IMMessage message = new IMMessage(
                APP_IM,
                CLIENT_VERSION,
                UID,
                TYPE_MSG_TEXT,
                toID,
                MSG_EMPTY);
        @SuppressWarnings("resource")
        Scanner scanner = new Scanner(System.in);
        do{
            message.setMsg(scanner.nextLine());
        }
        while (clientHandler.sendMsg(message));
    }
    @Override
    public void run() {
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(workerGroup);
            b.channel(NioSocketChannel.class);
            b.option(ChannelOption.SO_KEEPALIVE, true);
            b.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast("frameDecoder", new LengthFieldBasedFrameDecoder(65536, 0, 2, 0, 2));
                    ch.pipeline().addLast("msgpack decoder",new MsgPackDecode());
                    ch.pipeline().addLast("frameEncoder", new LengthFieldPrepender(2));
                    ch.pipeline().addLast("msgpack encoder",new MsgPackEncode());
                    ch.pipeline().addLast(clientHandler);
                }
            });
            ChannelFuture f = b.connect(SERVER_HOST, SERVER_PORT).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workerGroup.shutdownGracefully();
        }
    }
}
9.
package com.anxpp.im.client.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.io.IOException;

import com.anxpp.im.client.Client;
import com.anxpp.im.common.IMConfig;
import com.anxpp.im.common.IMMessage;
public class ClientHandler extends ChannelInboundHandlerAdapter implements IMConfig{
    private ChannelHandlerContext ctx;
    /**
     * tcp鏈路簡歷成功後呼叫
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("成功連線伺服器");
        this.ctx = ctx;
        IMMessage message = new IMMessage(
                APP_IM,
                CLIENT_VERSION,
                Client.UID,
                TYPE_CONNECT,
                SERVER_ID,
                MSG_EMPTY);
        sendMsg(message);
    }
    /**
     * 傳送訊息
     * @param msg
     * @return
     * @throws IOException 
     */
    public boolean sendMsg(IMMessage msg) throws IOException {
        System.out.println("client:" + msg);
        ctx.channel().writeAndFlush(msg);
        return msg.getMsg().equals("q") ? false : true;
    }
    /**
     * 收到訊息後呼叫
     * @throws IOException 
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws IOException {
        IMMessage m = (IMMessage)msg;
        System.out.println(m.getUid() + ":" + m.getMsg());
    }
    /**
     * 發生異常時呼叫
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.err.println("與伺服器斷開連線:"+cause.getMessage());
        ctx.close();
    }
}

10 程式碼結構




相關推薦

netty 聊天程式

package com.anxpp.im.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.C

【181027】聊天程式,VC++程式原始碼

基於VC++的點對點聊天室程式,程式包括服務端和客戶端,伺服器程式是chatsrvr.exe,客戶端是ChatClient.exe.   伺服器先執行,客戶連線到伺服器執行的計算機的IP即可。在客戶端傳送訊息的時候,可選擇字型顏色、聊天物件、過濾、是否設定為悄悄話等,都是相對基礎點的功能,希望剛

java使用Netty實現聊天

最近學習伺服器開放,實現客戶端(APP)與硬體裝置之間的通訊,我一想到socket,經過查詢資料發現socket實現起來非常麻煩,同時也發現一個比較好的框架netty,覺得很不錯,就開始嘗試用他來擼一個點對點聊天系統,想了解的小夥伴可以自行去學習一下netty 一、一開始是導包,我是匯入

聊天程式的實現

服務端 #include <unistd.h> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/

netty聊天儲存聊天記錄

效果 聊天介面 聊天記錄 實現方案 1首先拼接使用者id和好友id在Redis資料庫中註冊key值 Jedis jedis = new Jedis("xxx.xx.xx.xxx", 6379); //許可權

HTML5 WebSocket實現聊天的示例代碼

HTML案例分析 HTML5講解 HTML5的websocket與Tomcat實現了多人聊天,那是最簡單也是最基本的,其中註意的就是開發環境,要滿足jdk1.7和tomcat8,當然了tom7的7.063也行,在網站上找到了用google關於websocket的點對點聊天,更好的是可以和大多數系統很好

[NodeJS]NodeJS基於WebSocket的多用戶即時通訊聊天

round serve i++ conn console 即時通訊 .get () str 最近給一個客戶做了一個聊天,所以就用NodeJS做了一個 原理就是用戶第一次進入後,記錄它的ID和該用戶的ws 等有人發數據的時候,就去查ID,然後找到ID對應的ws,進行消息發

Spring Boot整合websocket實現群聊,聊天,圖片傳送,音訊傳送

參考:基於https://blog.csdn.net/qq_38455201/article/details/80374712  基礎上進行新增圖片傳送和音訊傳送功能   單點圖片傳送: 單點音訊傳送: 音訊傳送相關js參考:https://github.

php從零搭建即時通訊(一.聊天)

目錄 零.在thinkphp5環境下搭建gatewayWork環境 2.下載完成後解壓到你TP5專案的vendor目錄下即可,如圖 3.點選start_for_win.bat檔案即可成功啟動你的webSocked伺服器,如果你是lin

基於TCP協議的通訊java程式,帶圖形介面

客戶端 package client; import java.awt.*; import java.awt.event.*; import java.io.DataInputStream; import java.io.DataOutputStream;

SpringBoot整合WebSocket【基於STOMP協議】進行[一對一]和廣播[一對多]實時推送,內附簡易聊天室demo

最近專案來了新需求,需要做一個實時推送的功能,伺服器主動推送訊息給客戶端,在網上經過一輪搜查之後,確定使用WebSocket來進行開發。以前經常聽說WebSocket的神奇之處,如今終於可以嘗試使用它了。1.淺談WebSocketWebSocket是在HTML5基礎上單個TC

java WebSocket實現簡單的聊天室(包括群發和聊天

今天突然看到了WebSocket然後就網上找了一個例子,然後修改了下,實現了簡單的聊天室,包括群聊和點對點聊天。 使用的程式碼如下 jsp程式碼: <%@ page language="java" import="java.util.*" pageEncoding="

java websocket實現即時聊天

算是一個入門的demo,使用的是springMVC。 必要環境:JDK1.7以上,tomcat7.0以上。以下是乾貨: 1、websocket的jar直接從tomcat執行庫裡面新增到build path裡面。 2、前臺聊天頁面,通過ws://localhost:808

4.13 Go語言專案實戰:聊天

需求摘要 實現一個分散式點對點的聊天系統,所有節點都是對等的,不需要中央伺服器 實現註冊節點名稱,節點之間通過節點名稱發起會話 思路分析 節點同時具備服務端和客戶端的職能 服務端只負責接收其它節點主動傳送過來的訊息 客戶端只負責主動向其

python實現簡單聊天應用(群聊和均實現)

後續程式碼更新和功能新增會提交到個人github主頁,有興趣可以一起來完善! 如果只是拿過去執行看結果,請注意平臺相關性以及python版本號,本示例開發執行平臺為win7x86_64 pycharm community,python版本號為3.5!!! T

JMS消息隊列ActiveMQ(模式)

jms activemq 消息隊列 生產者(producer)->消息隊列(message queue)package com.java1234.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactor

比特幣交易所面臨倒閉,、場外交易將盛行

上進 str 訂單 虛擬 交易所 服務 com 交易 比特 9月16日火幣網、OKCoin聲稱要關閉所有虛擬幣業務,關閉交易和關閉交易所是兩個概念。 sosobtc官網也發布通告:於19日晚間23點關閉網站的行情數據、聊天社交等信息服務,APP也將於3日內關閉相關服務。

境外“|場外交易OTC新模式數字貨幣開發”強勢來襲

數字貨幣 點對點 場外交易 系統開發 境外“點對點|場外交易OTC新模式數字貨幣開發”強勢來襲 當下面臨著國家監管的政策,行情的變化,很多交易平臺紛紛關閉,傳統的撮合交易系統退出了歷史的舞臺,很多玩家現在出現了問題,自己的幣去哪交易呢?如過交易平臺關了就沒有市場進行交易了呀!針對這個問題出現

activeMQ

ssa oca exceptio pac 開啟事務 ive mes 啟動 cal 摘要: ActiveMQ 點對點消息 Point-to-Point 是一對一 創建消息生產者 /** * 點對點消息生產者 * * @author Edward * */

分析比特幣網絡:一種去中心化、的網絡架構

比特幣 區塊鏈 比特幣采用了基於互聯網的點對點(P2P:peer-to-peer)分布式網絡架構。比特幣網絡可以認為是按照比特幣P2P協議運行的一系列節點的集合。本文來分析下比特幣網絡,了解它跟傳統中心化網絡的區別,以及比特幣網絡是如何發現相鄰節點的。中心化網絡為了更好的理解P2P網絡,我們先來看看傳