【轉載】java實現JT809協議資料對接客戶端
原文:
https://blog.csdn.net/fox_mt/article/details/52806560?utm_medium=distribute.pc_relevant.none-task-blog-searchFromBaidu-5.control&depth_1-utm_source=distribute.pc_relevant.none-task-blog-searchFromBaidu-5.control
最近使用JT809協議進行資料對接,遇到了不少問題,度娘谷歌都不好使,找不到很好的客戶端實現程式碼的例子,只能苦逼的自己悶頭弄,現在特意寫篇帖子,希望能幫助一些人
說正經的:
背景:跟某公司做資料對接,將本公司的一些資訊推送到接收端
要求:建立tcp連結,使用接收端提供的使用者名稱密碼等資訊 先登入,登入成功後推送資料,資料採用JT809標準協議
實現語言:java
下面介紹具體實現,包涵完整程式碼
在這之前,最好先下載jt809協議,研究研究,網上就有,我找到的裡面還有一些錯誤別,一度讓我覺得是個盜版貨
首先說下整體結構:一個tcp客戶端,一個Decoder,一個心跳Handler,一個處理反饋的handler,一個訊息物件
通過tcp客戶端連結制定ip地址和埠,進行登陸,心跳handler負責在傳送資料空閒時向目標伺服器傳送請求保持包;decoder負責解碼組裝物件,反饋handler負責處理收到的反饋資訊,訊息物件我就不多說了
下面看程式碼:
TCPclient:
由於時間緊迫,程式碼結構有些臃腫,我使用的tcpclient為兩個,一個是基類,一個子類,其實這兩個可以合併到一起,這裡有心的朋友自己去和,下面來看下tcpclient基類的程式碼,其中有一些模組的引用因為涉及到公司資訊,所以我刪除掉了,大家參考的時候,引用自己編寫的就可以了,
- //包路徑,自己填
- import java.net.InetSocketAddress;
- import java.util.concurrent.Executor;
- import java.util.concurrent.Executors;
- import org.jboss.netty.bootstrap.ClientBootstrap;
- import org.jboss.netty.buffer.ChannelBuffer;
- import org.jboss.netty.buffer.ChannelBuffers;
- import org.jboss.netty.channel.Channel;
- import org.jboss.netty.channel.ChannelFuture;
- import org.jboss.netty.channel.ChannelPipeline;
- import org.jboss.netty.channel.ChannelPipelineFactory;
- import org.jboss.netty.channel.Channels;
- import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
- import org.jboss.netty.handler.logging.LoggingHandler;
- import org.jboss.netty.handler.timeout.IdleStateHandler;
- import org.jboss.netty.logging.InternalLogLevel;
- import org.jboss.netty.util.HashedWheelTimer;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public class TcpClient{
- private static final Logger LOG = LoggerFactory.getLogger(TcpClient.class);
- private static final int DEFAULT_PORT = 9000;
- private long connectTimeoutMillis = 3000;
- private int port = DEFAULT_PORT;
- private boolean tcpNoDelay = false;
- private boolean reuseAddress = true;
- private boolean keepAlive = true;
- private int workerCount = 4;
- private ClientBootstrap bootstrap = null;
- private static Channel channel = null;
- private Executor bossExecutor = Executors.newCachedThreadPool();
- private Executor workerExecutor = Executors.newCachedThreadPool();
- private static TcpClient instance = new TcpClient();
- private TcpClient() {
- init();
- }
- public static TcpClient getInstence(){
- return instance;
- }
- public void init() {
- bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
- bossExecutor, workerExecutor, workerCount));
- bootstrap.setOption("tcpNoDelay", tcpNoDelay);
- bootstrap.setOption("connectTimeoutMillis", connectTimeoutMillis);
- bootstrap.setOption("reuseAddress", reuseAddress);
- bootstrap.setOption("keepAlive", keepAlive);
- }
- public Channel getChannel(String address, int port) {
- if (null == channel || !channel.isOpen()) {
- bootstrap.setOption("writeBufferHighWaterMark", 64 * 1024);
- bootstrap.setOption("writeBufferLowWaterMark", 32 * 1024);
- bootstrap.setPipelineFactory(new ChannelPipelineFactory(){
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline pipeline = Channels.pipeline();
- // pipeline.addLast("loging", new LoggingHandler(InternalLogLevel.ERROR)); 列印日誌資訊,上線穩定後可去掉
- pipeline.addLast("timeout", new IdleStateHandler(new HashedWheelTimer(), 10, 60, 0));//設定空閒心跳機制
- pipeline.addLast("heartbeat", new HeartBeatHandler());//心跳傳送包處理handler
- pipeline.addLast("decode", new Decoder());//解碼
- pipeline.addLast("loginHandler", new RecevieHandler());//反饋資料處理
- return pipeline;
- }
- });
- ChannelFuture future = bootstrap.connect(new InetSocketAddress(
- address, port));
- future.awaitUninterruptibly();
- if (future.isSuccess()) {
- channel = future.getChannel();
- } else {
- throw new FrameworkRuntimeException(future.getCause());
- }
- }
- return channel;
- }
- public long getConnectTimeoutMillis() {
- return connectTimeoutMillis;
- }
- public void setConnectTimeoutMillis(long connectTimeoutMillis) {
- this.connectTimeoutMillis = connectTimeoutMillis;
- }
- public int getPort() {
- return port;
- }
- public void setPort(int port) {
- this.port = port;
- }
- public boolean isTcpNoDelay() {
- return tcpNoDelay;
- }
- public void setTcpNoDelay(boolean tcpNoDelay) {
- this.tcpNoDelay = tcpNoDelay;
- }
- public boolean isReuseAddress() {
- return reuseAddress;
- }
- public void setReuseAddress(boolean reuseAddress) {
- this.reuseAddress = reuseAddress;
- }
- public boolean isKeepAlive() {
- return keepAlive;
- }
- public void setKeepAlive(boolean keepAlive) {
- this.keepAlive = keepAlive;
- }
- public int getWorkerCount() {
- return workerCount;
- }
- public void setWorkerCount(int workerCount) {
- this.workerCount = workerCount;
- }
- }
上面的配置,用過netty都不會陌生,就不多說了,下面看下子類的實現
- import java.nio.charset.Charset;
- import java.text.NumberFormat;
- import java.util.Calendar;
- import java.util.Date;
- import java.util.Properties;
- import org.apache.commons.lang3.StringUtils;
- import org.jboss.netty.buffer.ChannelBuffer;
- import org.jboss.netty.buffer.ChannelBuffers;
- import org.jboss.netty.channel.Channel;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public class TcpClientDemo {
- private static Logger LOGGER = LoggerFactory.getLogger(TcpClientDemo.class);
- /**
- * 交委指定本公司接入碼
- */
- public static int PLANT_CODE;
- /**
- * 交委指定本公司使用者名稱
- */
- public static int COM_ID;
- /**
- * 交委指定本公司密碼
- */
- public static String COM_PWD;
- public static String LONGINSTATUS = "";
- public static String LOGINING = "logining";
- private static int LOGIN_FLAG = 0;
- private static String DOWN_LINK_IP = "127.0.0.1";
- //初始化基類
- private static TcpClient tcpClient = TcpClient.getInstence();
- //初始化
- private static TcpClientDemo tcpClientDemo = new TcpClientDemo();
- //初始化配置,將密碼相關的內容寫在裡面
- private static Properties property = PropertiesReader.getProperties("properName", true);
- //初始化channel,以便心跳機制及時登入
- private Channel channel = tcpClient.getChannel(Constants.TCP_ADDRESS, Constants.TCP_PORT);
- public static TcpClientDemo getInstance(){
- //加在車輛資料
- InitVehicleData.initVehicleMap();
- //獲取本機IP對應的使用者名稱密碼,IpUtils自己實現一個,就是獲取本地IP的,因為有的城市的交委會給每個伺服器一個賬號密碼
- String localIp = IpUtils.getLocalIp();
- if(StringUtils.isNotBlank(localIp)){
- String properties = property.getProperty(localIp);
- if(StringUtils.isNotBlank(localIp)){
- String[] pros = properties.split(":");
- PLANT_CODE = Integer.parseInt(pros[0]);
- COM_ID = Integer.parseInt(pros[1]);
- COM_PWD = pros[2];
- }
- }else{
- LOGGER.error("獲取本機IP異常");
- }
- return tcpClientDemo;
- }
- /**
- * 判斷是否登入
- * boolean
- * @return
- * 2016年10月12日 by fox_mt
- */
- public boolean isLogined(){
- return Constants.LOGIN_SUCCESS.equals(LONGINSTATUS);
- }
- /**
- * 登入交委接入平臺
- * boolean
- * @return
- * 2016年9月28日 by fox_mt
- */
- public boolean login2Gov(){
- boolean success = false;
- if(!Constants.LOGIN_SUCCESS.equals(LONGINSTATUS) && !LOGINING.equals(LONGINSTATUS)){
- //開始登入
- Message msg = new Message(JT809Constants.UP_CONNECT_REQ);
- ChannelBuffer buffer = ChannelBuffers.buffer(46);
- buffer.writeInt(COM_ID);//4
- byte[] pwd = getBytesWithLengthAfter(8, COM_PWD.getBytes());
- buffer.writeBytes(pwd);//8
- byte[] ip = getBytesWithLengthAfter(32, DOWN_LINK_IP.getBytes());
- buffer.writeBytes(ip);//32
- buffer.writeShort((short)Constants.TCP_RESULT_PORT);//2
- msg.setMsgBody(buffer);
- channel = tcpClient.getChannel(Constants.TCP_ADDRESS_FUJIAN, Constants.TCP_PORT_FUJIAN);
- channel.write(buildMessage(msg));
- LONGINSTATUS = LOGINING;
- }
- return success;
- }
- public static ChannelBuffer buildMessage(Message msg){
- int bodyLength = 0 ;
- if(null != msg.getMsgBody()){
- bodyLength = msg.getMsgBody().readableBytes();
- }
- msg.setMsgGesscenterId(PLANT_CODE);
- ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(bodyLength + Message.MSG_FIX_LENGTH);
- ChannelBuffer headBuffer = ChannelBuffers.buffer(22);
- //---資料頭
- headBuffer.writeInt(buffer.capacity() - 1);//4
- headBuffer.writeInt(msg.getMsgSn());//4
- headBuffer.writeShort((short)msg.getMsgId());//2
- headBuffer.writeInt(msg.getMsgGesscenterId());//4
- headBuffer.writeBytes(msg.getVersionFlag());//3
- headBuffer.writeByte(0);//1
- headBuffer.writeInt(10);//4
- buffer.writeBytes(headBuffer);
- //---資料體
- if(null != msg.getMsgBody()){
- buffer.writeBytes(msg.getMsgBody());
- }
- ChannelBuffer finalBuffer = ChannelBuffers.copiedBuffer(buffer);
- //--crc校驗碼
- byte[] b = ChannelBuffers.buffer(finalBuffer.readableBytes()).array();
- finalBuffer.getBytes(0, b);
- int crcValue = CRC16CCITT.crc16(b);
- finalBuffer.writeShort((short)crcValue);//2
- //轉義
- byte[] bytes = ChannelBuffers.copiedBuffer(finalBuffer).array();
- ChannelBuffer headFormatedBuffer = ChannelBuffers.dynamicBuffer(finalBuffer.readableBytes());
- formatBuffer(bytes, headFormatedBuffer);
- ChannelBuffer buffera = ChannelBuffers.buffer(headFormatedBuffer.readableBytes() + 2);
- buffera.writeByte(Message.MSG_HEAD);
- buffera.writeBytes(headFormatedBuffer);
- buffera.writeByte(Message.MSG_TALL);
- return ChannelBuffers.copiedBuffer(buffera);
- }
- /**
- * 傳送資料到交委接入平臺
- * boolean
- * @param awsVo
- * @return
- * 2016年9月28日 by fox_mt
- */
- public boolean sendMsg2Gov(Idc2AwsGpsVo awsVo){
- boolean success = false;
- if(isLogined()){
- //已經登入成功,開始傳送資料
- channel = tcpClient.getChannel(Constants.TCP_ADDRESS, Constants.TCP_PORT);
- if(null != channel && channel.isWritable()){
- Message msg = buildSendVO(awsVo);
- ChannelBuffer msgBuffer = buildMessage(msg);
- channel.write(msgBuffer);
- LOGGER.error("傳送--" + awsVo.getVehicleNo());
- }else{
- LONGINSTATUS = "";
- }
- }else if(LOGIN_FLAG == 0){
- LOGIN_FLAG ++;
- login2Gov();
- LOGGER.error("--------------第一次登入");
- }else{
- LOGGER.error("--------------等待登入");
- }
- return success;
- }
- /**
- * 轉換VO
- * void
- * @param awsVo
- * 2016年9月28日 by fox_mt
- */
- private Message buildSendVO(Idc2AwsGpsVo awsVo){
- Message msg = new Message(JT809Constants.UP_EXG_MSG);
- ChannelBuffer buffer = ChannelBuffers.buffer(36);
- //是否加密
- buffer.writeByte((byte)0);//0未加密 // 1
- //日月年dmyy
- Calendar cal = Calendar.getInstance();
- cal.setTime(new Date());
- buffer.writeByte((byte)cal.get(Calendar.DATE));
- buffer.writeByte((byte)(cal.get(Calendar.MONTH) + 1));
- String hexYear = "0" + Integer.toHexString(cal.get(Calendar.YEAR));
- buffer.writeBytes(hexStringToByte(hexYear));//4
- //時分秒
- buffer.writeByte((byte)cal.get(Calendar.HOUR_OF_DAY));
- buffer.writeByte((byte)cal.get(Calendar.MINUTE));
- buffer.writeByte((byte)cal.get(Calendar.SECOND));//3
- //經度,緯度
- buffer.writeInt(formatLonLat(awsVo.getLon()));//4
- buffer.writeInt(formatLonLat(awsVo.getLat()));//4
- //速度
- buffer.writeShort(awsVo.getSpeed().shortValue());//2
- //行駛記錄速度
- buffer.writeShort(awsVo.getSpeed().shortValue());//2
- //車輛當前總里程數
- buffer.writeInt(awsVo.getMileage().intValue());//4
- //方向
- buffer.writeShort(awsVo.getDirection().shortValue());//2
- //海拔
- buffer.writeShort((short)0);//2
- //車輛狀態
- int accStatus = null == awsVo.getAcc() ? 0 : awsVo.getAcc();
- int gpsStatus = null == awsVo.getGpsStatus() ? 0 : awsVo.getGpsStatus();
- if(accStatus == 0 && gpsStatus == 0){
- buffer.writeInt(0);//4
- }else if(accStatus == 1 && gpsStatus == 0){
- buffer.writeInt(1);//4
- }else if(accStatus == 0 && gpsStatus == 1){
- buffer.writeInt(2);//4
- }else{
- buffer.writeInt(3);//4
- }
- //報警狀態
- buffer.writeInt(0);//0表示正常;1表示報警//4
- ChannelBuffer headBuffer = ChannelBuffers.buffer(buffer.capacity() + 28);
- headBuffer.writeBytes(getBytesWithLengthAfter(21 , awsVo.getVehicleNo().getBytes(Charset.forName("GBK"))));//21
- headBuffer.writeByte((byte)InitVehicleData.vehicleColor(awsVo.getVehicleNo()));//1
- headBuffer.writeShort(JT809Constants.UP_EXG_MSG_REAL_LOCATION);//2
- headBuffer.writeInt(buffer.capacity());//4
- headBuffer.writeBytes(buffer);
- msg.setMsgBody(headBuffer);
- return msg;
- }
- /**
- * 報文轉義
- * void
- * @param bytes
- * @param formatBuffer
- * 2016年10月12日 by fox_mt
- */
- private static void formatBuffer(byte[] bytes, ChannelBuffer formatBuffer){
- for (byte b : bytes) {
- switch(b){
- case 0x5b:
- byte[] formatByte0x5b = new byte[2];
- formatByte0x5b[0] = 0x5a;
- formatByte0x5b[1] = 0x01;
- formatBuffer.writeBytes(formatByte0x5b);
- break;
- case 0x5a:
- byte[] formatByte0x5a = new byte[2];
- formatByte0x5a[0] = 0x5a;
- formatByte0x5a[1] = 0x02;
- formatBuffer.writeBytes(formatByte0x5a);
- break;
- case 0x5d:
- byte[] formatByte0x5d = new byte[2];
- formatByte0x5d[0] = 0x5e;
- formatByte0x5d[1] = 0x01;
- formatBuffer.writeBytes(formatByte0x5d);
- break;
- case 0x5e:
- byte[] formatByte0x5e = new byte[2];
- formatByte0x5e[0] = 0x5e;
- formatByte0x5e[1] = 0x02;
- formatBuffer.writeBytes(formatByte0x5e);
- break;
- default: formatBuffer.writeByte(b);break;
- }
- }
- }
- /**
- * 16進位制字串轉換成byte陣列
- * byte[]
- * @param hex
- * @return
- * 2016年10月12日 by fox_mt
- */
- public static byte[] hexStringToByte(String hex) {
- hex = hex.toUpperCase();
- int len = (hex.length() / 2);
- byte[] result = new byte[len];
- char[] achar = hex.toCharArray();
- for (int i = 0; i < len; i++) {
- int pos = i * 2;
- result[i] = (byte) (toByte(achar[pos]) << 4 | toByte(achar[pos + 1]));
- }
- return result;
- }
- private static byte toByte(char c) {
- byte b = (byte) "0123456789ABCDEF".indexOf(c);
- return b;
- }
- /**
- * 格式化經緯度,保留六位小數
- * int
- * @param needFormat
- * @return
- * 2016年10月12日 by fox_mt
- */
- private int formatLonLat(Double needFormat){
- NumberFormat numFormat = NumberFormat.getInstance();
- numFormat.setMaximumFractionDigits(6);
- numFormat.setGroupingUsed(false);
- String fristFromat = numFormat.format(needFormat);
- Double formatedDouble = Double.parseDouble(fristFromat);
- numFormat.setMaximumFractionDigits(0);
- String formatedValue = numFormat.format(formatedDouble * 1000000);
- return Integer.parseInt(formatedValue);
- }
- /**
- * 補全位數不夠的定長引數
- * byte[]
- * @param length
- * @param pwdByte
- * @return
- * 2016年10月12日 by fox_mt
- */
- private byte[] getBytesWithLengthAfter(int length, byte[] pwdByte){
- byte[] lengthByte = new byte[length];
- for(int i = 0; i < pwdByte.length; i ++){
- lengthByte[i] = pwdByte[i];
- }
- for (int i = 0; i < (length- pwdByte.length); i++) {
- lengthByte[pwdByte.length + i] = 0x00;
- }
- return lengthByte;
- }
- public static void main(String[] args) {
- TcpClientDemo s = TcpClientDemo.getInstance();
- Idc2AwsGpsVo awsVo = new Idc2AwsGpsVo();
- awsVo.setDirection(120);
- awsVo.setLon(117.2900911);
- awsVo.setLat(39.56362);
- awsVo.setSpeed(45D);
- awsVo.setMileage(10001D);
- awsVo.setVehicleNo("XXXXX");
- s.sendMsg2Gov(awsVo);
- try {
- Thread.sleep(20*1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- s.sendMsg2Gov(awsVo);
- }
- }
這裡其實也沒有太多好說的,整體說一下,先按照協議拼接資料,拼接完成後,計算CRC,進行轉義,由於協議規定開始標記為0x5b,結束標記為0x5d,資料中可能包涵這些標記,所以必須轉移,具體轉換規則,在協議中就能找到,有一點要注意,一定是先計算crc,再轉一,將否則無法通過crc校驗,我就是在這裡吃了虧,耗了一天,再別人的指點下才解決,協議中crc計算使用的是crc16-ccitt標準,這裡找了好多實現,每個計算結果都不一樣,我也沒有仔細研究,我用的是沒問題的,程式碼如下
- public class JT809Constants {
- public static int UP_CONNECT_REQ = 0x1001;//主鏈路登入請求訊息
- public static int UP_CONNECT_RSP = 0x1002;//主鏈路登入應答訊息
- public static int UP_CONNECT_RSP_SUCCESS = 0x00;//登入成功
- public static int UP_CONNECT_RSP_ERROR_01 = 0x01;//IP 地址不正確
- public static int UP_CONNECT_RSP_ERROR_02 = 0x02;//接入碼不正確
- public static int UP_CONNECT_RSP_ERROR_03 = 0x03;//使用者沒註冊
- public static int UP_CONNECT_RSP_ERROR_04 = 0x04;//密碼錯誤
- public static int UP_CONNECT_RSP_ERROR_05 = 0x05;//資源緊張,稍後再連線(已經佔 用);
- public static int UP_CONNECT_RSP_ERROR_06 = 0x06;//其他
- public static int UP_DICONNECE_REQ = 0x1003;//主鏈路登出請求訊息
- public static int UP_DISCONNECT_RSP = 0x1004;//主鏈路登出應答訊息
- public static int UP_LINKETEST_REQ = 0x1005;//主鏈路連線保持請求訊息
- public static int UP_LINKTEST_RSP = 0x1006;//主鏈路連線保持應答訊息
- public static int UP_DISCONNECT_INFORM = 0x1007;//主鏈路斷開通知訊息
- public static int UP_CLOSELINK_INFORM = 0x1008;//下級平臺主動關閉鏈路通 知訊息
- public static int DOWN_CONNECT_REQ = 0x9001;//從鏈路連線請求訊息
- public static int DOWN_CONNECT_RSP = 0x9002;//從鏈路連線應答訊息
- public static int DOWN_DISCONNECT_REQ = 0x9003;//從鏈路登出請求訊息
- public static int UP_EXG_MSG = 0x1200;//主鏈路動態資訊交換訊息
- public static int UP_EXG_MSG_REAL_LOCATION = 0x1202;//實時上傳車輛定位資訊
- public static int UP_EXG_MSG_HISTORY_LOCATION = 0x1203;//車輛定位資訊自動補報
- public static int DOWN_EXG_MSG = 0x9200;//從鏈路動態資訊交換訊息
- }
- import java.io.Serializable;
- import java.util.Arrays;
- import org.jboss.netty.buffer.ChannelBuffer;
- public class Message implements Serializable{
- private static final long serialVersionUID = 4398559115325723920L;
- public static final int MSG_HEAD = 0x5b;
- public static final int MSG_TALL = 0x5d;
- //報文中除資料體外,固定的資料長度
- public static final int MSG_FIX_LENGTH = 26;
- private static int internalMsgNo = 0;
- private long msgLength;
- private long encryptFlag=1;
- private int msgGesscenterId;
- private long encryptKey;
- private int crcCode;
- private int msgId;
- private int msgSn;
- private ChannelBuffer msgBody;
- private byte[] versionFlag = {0,0,1};
- //下行報文標識,值為1時,代表傳送的資料;預設為0,代表接收的報文
- // private int downFlag = 0;
- public Message(){}
- public Message(int msgId){
- //下行報文需要填充報文序列號
- synchronized((Integer)internalMsgNo) {
- if(internalMsgNo == Integer.MAX_VALUE){
- internalMsgNo = 0;
- }
- }
- this.msgSn = ++internalMsgNo;
- this.msgId = msgId;
- //this.downFlag = 1;
- }
- public static int getInternalMsgNo() {
- return internalMsgNo;
- }
- public static void setInternalMsgNo(int internalMsgNo) {
- Message.internalMsgNo = internalMsgNo;
- }
- public long getMsgLength() {
- return msgLength;
- }
- public void setMsgLength(long msgLength) {
- this.msgLength = msgLength;
- }
- public long getEncryptFlag() {
- return encryptFlag;
- }
- public void setEncryptFlag(long encryptFlag) {
- this.encryptFlag = encryptFlag;
- }
- public int getMsgGesscenterId() {
- return msgGesscenterId;
- }
- public void setMsgGesscenterId(int msgGesscenterId) {
- this.msgGesscenterId = msgGesscenterId;
- }
- public void setMsgGesscenterId(long msgGesscenterId) {
- this.msgGesscenterId = (int)msgGesscenterId;
- }
- public long getEncryptKey() {
- return encryptKey;
- }
- public void setEncryptKey(long encryptKey) {
- this.encryptKey = encryptKey;
- }
- public int getCrcCode() {
- return crcCode;
- }
- public void setCrcCode(int crcCode) {
- this.crcCode = crcCode;
- }
- public int getMsgId() {
- return msgId;
- }
- public void setMsgId(int msgId) {
- this.msgId = msgId;
- }
- public int getMsgSn() {
- return msgSn;
- }
- public void setMsgSn(int msgSn) {
- this.msgSn = msgSn;
- }
- public ChannelBuffer getMsgBody() {
- return msgBody;
- }
- public void setMsgBody(ChannelBuffer msgBody) {
- this.msgBody = msgBody;
- }
- public byte[] getVersionFlag() {
- return versionFlag;
- }
- public void setVersionFlag(byte[] versionFlag) {
- this.versionFlag = versionFlag;
- }
- public static int getMsgHead() {
- return MSG_HEAD;
- }
- public static int getMsgTall() {
- return MSG_TALL;
- }
- public static int getMsgFixLength() {
- return MSG_FIX_LENGTH;
- }
- public String toString() {
- return "Message [msgLength=" + msgLength + ", encryptFlag=" + encryptFlag + ", msgGesscenterId="
- + msgGesscenterId + ", encryptKey=" + encryptKey + ", crcCode=" + crcCode + ", msgId=" + msgId
- + ", msgSn=" + msgSn + ", msgBody=" + msgBody + ", versionFlag=" + Arrays.toString(versionFlag) + "]";
- }
- }
下面是Decoder
- import org.jboss.netty.buffer.ChannelBuffer;
- import org.jboss.netty.channel.Channel;
- import org.jboss.netty.channel.ChannelHandlerContext;
- import org.jboss.netty.handler.codec.frame.FrameDecoder;
- public class Decoder extends FrameDecoder {
- protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
- int head = buffer.getByte(0);
- int tail = buffer.getByte(buffer.capacity() - 1);
- if( !(head == Message.MSG_HEAD && tail == Message.MSG_TALL)){
- return null;
- }
- buffer.skipBytes(1);
- Message msg = this.buildMessage(buffer);
- return msg;
- }
- private Message buildMessage(ChannelBuffer buffer){
- Message msg = new Message();
- msg.setMsgLength(buffer.readUnsignedInt());
- msg.setMsgSn(buffer.readInt());//4byte
- msg.setMsgId(buffer.readUnsignedShort());//2byte
- msg.setMsgGesscenterId(buffer.readUnsignedInt());//4byte
- msg.setVersionFlag(buffer.readBytes(3).array());//3byte
- msg.setEncryptFlag(buffer.readUnsignedByte());//1byte
- msg.setEncryptKey(buffer.readUnsignedInt());//4byte
- if(buffer.readableBytes() >= 9){
- buffer.skipBytes(buffer.readableBytes() - 8);
- }
- ChannelBuffer bodyBytes = buffer.readBytes(buffer.readableBytes() -3);
- msg.setMsgBody(bodyBytes);
- msg.setCrcCode(buffer.readUnsignedShort());//2byte
- buffer.skipBytes(1);
- return msg;
- }
- }
這個decoder值得說一下,有一些朋友應該能看出來
- if(buffer.readableBytes() >= 9){
- buffer.skipBytes(buffer.readableBytes() - 8);
- }
這段程式碼很突兀,其實我也很鬱悶,登陸的時候收到的反饋資訊,資訊長度竟然不是一樣的,有的時候長度是32,有的時候長度是31,長度是32的時候,如果不跳過一位,那麼解析出來的反饋狀態碼就是錯誤的
下面是recevieHandler,
- import org.jboss.netty.buffer.ChannelBuffer;
- import org.jboss.netty.channel.ChannelHandlerContext;
- import org.jboss.netty.channel.MessageEvent;
- import org.jboss.netty.channel.SimpleChannelHandler;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- public class RecevieHandler extends SimpleChannelHandler{
- private static Logger LOG = LoggerFactory.getLogger(RecevieHandler.class);
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e){
- Message msg = (Message) e.getMessage();
- LOG.error("應答----------------" + "0x" + Integer.toHexString(msg.getMsgId()));
- if(msg.getMsgId() == JT809Constants.UP_CONNECT_RSP){
- ChannelBuffer msgBody = msg.getMsgBody();
- int result = msgBody.readByte();
- if(result == JT809Constants.UP_CONNECT_RSP_SUCCESS){
- TcpClientFuJian.LONGINSTATUS = Constants.LOGIN_SUCCESS;
- LOG.error("------------------登入成功");
- }else{
- LOG.error("------------------登入異常,請檢查" + "0x0" + Integer.toHexString(result));
- }
- }
- }
- }
根據JT809協議,應該是一個主鏈路,一個從鏈路,從鏈路用來接收下發的訊息,實際上不需要的,推送資料的,在主鏈路登入時會有反饋,從鏈路可以忽略,但是從鏈路的ip和埠,是必填引數,隨便給就可以了,
上面經過decoder之後得到Message,receiveHandler就是來判斷是否登陸,登陸成功的話,就可以傳送資料
這裡涉及登陸時機的問題,先看下心跳handler,我們再說登陸問題
HeartBeatHandler如下:
- import org.apache.commons.lang3.StringUtils;
- import org.jboss.netty.channel.ChannelHandlerContext;
- import org.jboss.netty.handler.timeout.IdleState;
- import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
- import org.jboss.netty.handler.timeout.IdleStateEvent;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import com.zuche.us.gpsidc2aws.constant.JT809Constants;
- import com.zuche.us.gpsidc2aws.tcpclient.vo.Message;
- public class HeartBeatHandler extends IdleStateAwareChannelHandler {
- private static Logger LOG = LoggerFactory.getLogger(HeartBeatHandler.class);
- public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) throws Exception{
- if(StringUtils.isBlank(TcpClientFuJian.LONGINSTATUS) || TcpClientFuJian.LOGINING.equals(TcpClientFuJian.LONGINSTATUS)){
- TcpClientFuJian.getInstance().login2FuJianGov();
- LOG.error("利用空閒心跳去登入------ 開始登入");
- }
- if(e.getState() == IdleState.WRITER_IDLE){
- LOG.error("鏈路空閒,傳送心跳!");
- Message msg = new Message(JT809Constants.UP_LINKETEST_REQ);
- e.getChannel().write(TcpClientFuJian.buildMessage(msg));
- super.channelIdle(ctx, e);
- }
- }
- }
心跳包中包含了登陸,這個怎麼說呢,沒想到什麼好辦法進行登陸失敗重登陸情況,因為一個不小心,可能就會一直在登陸
所以在傳送資料時如果發現沒登陸,那麼進行登陸,通過flag來判斷,如果登陸過了,那就不管了,成功就發資料,失敗就等著,等到心跳包傳送心跳時去檢測是否登陸成功了,沒成功的話讓心跳包去再登陸一次,這樣省了不少程式碼,就是肯能比較混亂,這個看個人想法了
只要能實現就行,
最後時crc16工具類
- public class CRC16CCITT {
- public static int crc16(byte[] bytes){
- int crc = 0xFFFF;
- for (int j = 0; j < bytes.length ; j++) {
- crc = ((crc >>> 8) | (crc << 8) )& 0xffff;
- crc ^= (bytes[j] & 0xff);//byte to int, trunc sign
- crc ^= ((crc & 0xff) >> 4);
- crc ^= (crc << 12) & 0xffff;
- crc ^= ((crc & 0xFF) << 5) & 0xffff;
- }
- crc &= 0xffff;
- return crc;
- }
- }
我用的是直接計算的方式,還有一種查表法,這裡就不說了,百度下應該能找到
到此,客戶端實現就完成了
不過由於我們是多臺伺服器推送資料,所以必須建立多個tcp連結,接收端之前沒這麼收過資料,中間經歷了一點兒小波折,後臺接收端根據我們的伺服器數量給開通了多個賬號,
多個賬號就涉及到哪臺伺服器用哪個賬號的問題,所以就有了上面程式碼中的配置檔案
檔案內容很簡單
ip=使用者名稱:介入碼:密碼
例如:
127.0.0.1=101:101:666666
為每臺伺服器制定好所使用的賬號;
另外車牌顏色,顏色的狀態需要使用JT/T415-2006標準協議,這個挺難找的,我直接把顏色和狀態碼貼出來,方便大家參考
JT/T415-2006
1:藍色
2:黃色
3:黑色
4:白色
9:其他
至此整個客戶端完滿,經過測試,完美上線,沒有出現問題,有一點稍微注意下,程式碼中傳送資料的時候,時間的小時的12進位制的,看接收端的要求,可以改成24小時制的
修改方法很簡單,將Calendar.HOUR 改成Calendar.HOUR_OF_DAY 就可以
本地開發的時候,可能不知道傳送的資料對不對,這個對應的接收端應該都有解析工具,可以要來自己去校驗
差不多了再進行聯調
也可以自己模擬一個簡單的接收端,來進行測試,這個程式碼晚上有現成的,這裡就不多說了。
希望能幫助大家,如果又問題,可以隨時聯絡我