1. 程式人生 > 實用技巧 >【轉載】java實現JT809協議資料對接客戶端

【轉載】java實現JT809協議資料對接客戶端

原文:

https://blog.csdn.net/fox_mt/article/details/52806560?utm_medium=distribute.pc_relevant.none-task-blog-searchFromBaidu-5.control&depth_1-utm_source=distribute.pc_relevant.none-task-blog-searchFromBaidu-5.control

最近使用JT809協議進行資料對接,遇到了不少問題,度娘谷歌都不好使,找不到很好的客戶端實現程式碼的例子,只能苦逼的自己悶頭弄,現在特意寫篇帖子,希望能幫助一些人

說正經的:

背景:跟某公司做資料對接,將本公司的一些資訊推送到接收端

要求:建立tcp連結,使用接收端提供的使用者名稱密碼等資訊 先登入,登入成功後推送資料,資料採用JT809標準協議

實現語言:java

下面介紹具體實現,包涵完整程式碼

在這之前,最好先下載jt809協議,研究研究,網上就有,我找到的裡面還有一些錯誤別,一度讓我覺得是個盜版貨

首先說下整體結構:一個tcp客戶端,一個Decoder,一個心跳Handler,一個處理反饋的handler,一個訊息物件

通過tcp客戶端連結制定ip地址和埠,進行登陸,心跳handler負責在傳送資料空閒時向目標伺服器傳送請求保持包;decoder負責解碼組裝物件,反饋handler負責處理收到的反饋資訊,訊息物件我就不多說了

下面看程式碼:

TCPclient:

由於時間緊迫,程式碼結構有些臃腫,我使用的tcpclient為兩個,一個是基類,一個子類,其實這兩個可以合併到一起,這裡有心的朋友自己去和,下面來看下tcpclient基類的程式碼,其中有一些模組的引用因為涉及到公司資訊,所以我刪除掉了,大家參考的時候,引用自己編寫的就可以了,

  1. //包路徑,自己填
  2. import java.net.InetSocketAddress;
  3. import java.util.concurrent.Executor;
  4. import java.util.concurrent.Executors;
  5. import org.jboss.netty.bootstrap.ClientBootstrap;
  6. import org.jboss.netty.buffer.ChannelBuffer;
  7. import org.jboss.netty.buffer.ChannelBuffers;
  8. import org.jboss.netty.channel.Channel;
  9. import org.jboss.netty.channel.ChannelFuture;
  10. import org.jboss.netty.channel.ChannelPipeline;
  11. import org.jboss.netty.channel.ChannelPipelineFactory;
  12. import org.jboss.netty.channel.Channels;
  13. import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
  14. import org.jboss.netty.handler.logging.LoggingHandler;
  15. import org.jboss.netty.handler.timeout.IdleStateHandler;
  16. import org.jboss.netty.logging.InternalLogLevel;
  17. import org.jboss.netty.util.HashedWheelTimer;
  18. import org.slf4j.Logger;
  19. import org.slf4j.LoggerFactory;
  20. public class TcpClient{
  21. private static final Logger LOG = LoggerFactory.getLogger(TcpClient.class);
  22. private static final int DEFAULT_PORT = 9000;
  23. private long connectTimeoutMillis = 3000;
  24. private int port = DEFAULT_PORT;
  25. private boolean tcpNoDelay = false;
  26. private boolean reuseAddress = true;
  27. private boolean keepAlive = true;
  28. private int workerCount = 4;
  29. private ClientBootstrap bootstrap = null;
  30. private static Channel channel = null;
  31. private Executor bossExecutor = Executors.newCachedThreadPool();
  32. private Executor workerExecutor = Executors.newCachedThreadPool();
  33. private static TcpClient instance = new TcpClient();
  34. private TcpClient() {
  35. init();
  36. }
  37. public static TcpClient getInstence(){
  38. return instance;
  39. }
  40. public void init() {
  41. bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(
  42. bossExecutor, workerExecutor, workerCount));
  43. bootstrap.setOption("tcpNoDelay", tcpNoDelay);
  44. bootstrap.setOption("connectTimeoutMillis", connectTimeoutMillis);
  45. bootstrap.setOption("reuseAddress", reuseAddress);
  46. bootstrap.setOption("keepAlive", keepAlive);
  47. }
  48. public Channel getChannel(String address, int port) {
  49. if (null == channel || !channel.isOpen()) {
  50. bootstrap.setOption("writeBufferHighWaterMark", 64 * 1024);
  51. bootstrap.setOption("writeBufferLowWaterMark", 32 * 1024);
  52. bootstrap.setPipelineFactory(new ChannelPipelineFactory(){
  53. @Override
  54. public ChannelPipeline getPipeline() throws Exception {
  55. ChannelPipeline pipeline = Channels.pipeline();
  56. // pipeline.addLast("loging", new LoggingHandler(InternalLogLevel.ERROR)); 列印日誌資訊,上線穩定後可去掉
  57. pipeline.addLast("timeout", new IdleStateHandler(new HashedWheelTimer(), 10, 60, 0));//設定空閒心跳機制
  58. pipeline.addLast("heartbeat", new HeartBeatHandler());//心跳傳送包處理handler
  59. pipeline.addLast("decode", new Decoder());//解碼
  60. pipeline.addLast("loginHandler", new RecevieHandler());//反饋資料處理
  61. return pipeline;
  62. }
  63. });
  64. ChannelFuture future = bootstrap.connect(new InetSocketAddress(
  65. address, port));
  66. future.awaitUninterruptibly();
  67. if (future.isSuccess()) {
  68. channel = future.getChannel();
  69. } else {
  70. throw new FrameworkRuntimeException(future.getCause());
  71. }
  72. }
  73. return channel;
  74. }
  75. public long getConnectTimeoutMillis() {
  76. return connectTimeoutMillis;
  77. }
  78. public void setConnectTimeoutMillis(long connectTimeoutMillis) {
  79. this.connectTimeoutMillis = connectTimeoutMillis;
  80. }
  81. public int getPort() {
  82. return port;
  83. }
  84. public void setPort(int port) {
  85. this.port = port;
  86. }
  87. public boolean isTcpNoDelay() {
  88. return tcpNoDelay;
  89. }
  90. public void setTcpNoDelay(boolean tcpNoDelay) {
  91. this.tcpNoDelay = tcpNoDelay;
  92. }
  93. public boolean isReuseAddress() {
  94. return reuseAddress;
  95. }
  96. public void setReuseAddress(boolean reuseAddress) {
  97. this.reuseAddress = reuseAddress;
  98. }
  99. public boolean isKeepAlive() {
  100. return keepAlive;
  101. }
  102. public void setKeepAlive(boolean keepAlive) {
  103. this.keepAlive = keepAlive;
  104. }
  105. public int getWorkerCount() {
  106. return workerCount;
  107. }
  108. public void setWorkerCount(int workerCount) {
  109. this.workerCount = workerCount;
  110. }
  111. }

上面的配置,用過netty都不會陌生,就不多說了,下面看下子類的實現

  1. import java.nio.charset.Charset;
  2. import java.text.NumberFormat;
  3. import java.util.Calendar;
  4. import java.util.Date;
  5. import java.util.Properties;
  6. import org.apache.commons.lang3.StringUtils;
  7. import org.jboss.netty.buffer.ChannelBuffer;
  8. import org.jboss.netty.buffer.ChannelBuffers;
  9. import org.jboss.netty.channel.Channel;
  10. import org.slf4j.Logger;
  11. import org.slf4j.LoggerFactory;
  12. public class TcpClientDemo {
  13. private static Logger LOGGER = LoggerFactory.getLogger(TcpClientDemo.class);
  14. /**
  15. * 交委指定本公司接入碼
  16. */
  17. public static int PLANT_CODE;
  18. /**
  19. * 交委指定本公司使用者名稱
  20. */
  21. public static int COM_ID;
  22. /**
  23. * 交委指定本公司密碼
  24. */
  25. public static String COM_PWD;
  26. public static String LONGINSTATUS = "";
  27. public static String LOGINING = "logining";
  28. private static int LOGIN_FLAG = 0;
  29. private static String DOWN_LINK_IP = "127.0.0.1";
  30. //初始化基類
  31. private static TcpClient tcpClient = TcpClient.getInstence();
  32. //初始化
  33. private static TcpClientDemo tcpClientDemo = new TcpClientDemo();
  34. //初始化配置,將密碼相關的內容寫在裡面
  35. private static Properties property = PropertiesReader.getProperties("properName", true);
  36. //初始化channel,以便心跳機制及時登入
  37. private Channel channel = tcpClient.getChannel(Constants.TCP_ADDRESS, Constants.TCP_PORT);
  38. public static TcpClientDemo getInstance(){
  39. //加在車輛資料
  40. InitVehicleData.initVehicleMap();
  41. //獲取本機IP對應的使用者名稱密碼,IpUtils自己實現一個,就是獲取本地IP的,因為有的城市的交委會給每個伺服器一個賬號密碼
  42. String localIp = IpUtils.getLocalIp();
  43. if(StringUtils.isNotBlank(localIp)){
  44. String properties = property.getProperty(localIp);
  45. if(StringUtils.isNotBlank(localIp)){
  46. String[] pros = properties.split(":");
  47. PLANT_CODE = Integer.parseInt(pros[0]);
  48. COM_ID = Integer.parseInt(pros[1]);
  49. COM_PWD = pros[2];
  50. }
  51. }else{
  52. LOGGER.error("獲取本機IP異常");
  53. }
  54. return tcpClientDemo;
  55. }
  56. /**
  57. * 判斷是否登入
  58. * boolean
  59. * @return
  60. * 2016年10月12日 by fox_mt
  61. */
  62. public boolean isLogined(){
  63. return Constants.LOGIN_SUCCESS.equals(LONGINSTATUS);
  64. }
  65. /**
  66. * 登入交委接入平臺
  67. * boolean
  68. * @return
  69. * 2016年9月28日 by fox_mt
  70. */
  71. public boolean login2Gov(){
  72. boolean success = false;
  73. if(!Constants.LOGIN_SUCCESS.equals(LONGINSTATUS) && !LOGINING.equals(LONGINSTATUS)){
  74. //開始登入
  75. Message msg = new Message(JT809Constants.UP_CONNECT_REQ);
  76. ChannelBuffer buffer = ChannelBuffers.buffer(46);
  77. buffer.writeInt(COM_ID);//4
  78. byte[] pwd = getBytesWithLengthAfter(8, COM_PWD.getBytes());
  79. buffer.writeBytes(pwd);//8
  80. byte[] ip = getBytesWithLengthAfter(32, DOWN_LINK_IP.getBytes());
  81. buffer.writeBytes(ip);//32
  82. buffer.writeShort((short)Constants.TCP_RESULT_PORT);//2
  83. msg.setMsgBody(buffer);
  84. channel = tcpClient.getChannel(Constants.TCP_ADDRESS_FUJIAN, Constants.TCP_PORT_FUJIAN);
  85. channel.write(buildMessage(msg));
  86. LONGINSTATUS = LOGINING;
  87. }
  88. return success;
  89. }
  90. public static ChannelBuffer buildMessage(Message msg){
  91. int bodyLength = 0 ;
  92. if(null != msg.getMsgBody()){
  93. bodyLength = msg.getMsgBody().readableBytes();
  94. }
  95. msg.setMsgGesscenterId(PLANT_CODE);
  96. ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(bodyLength + Message.MSG_FIX_LENGTH);
  97. ChannelBuffer headBuffer = ChannelBuffers.buffer(22);
  98. //---資料頭
  99. headBuffer.writeInt(buffer.capacity() - 1);//4
  100. headBuffer.writeInt(msg.getMsgSn());//4
  101. headBuffer.writeShort((short)msg.getMsgId());//2
  102. headBuffer.writeInt(msg.getMsgGesscenterId());//4
  103. headBuffer.writeBytes(msg.getVersionFlag());//3
  104. headBuffer.writeByte(0);//1
  105. headBuffer.writeInt(10);//4
  106. buffer.writeBytes(headBuffer);
  107. //---資料體
  108. if(null != msg.getMsgBody()){
  109. buffer.writeBytes(msg.getMsgBody());
  110. }
  111. ChannelBuffer finalBuffer = ChannelBuffers.copiedBuffer(buffer);
  112. //--crc校驗碼
  113. byte[] b = ChannelBuffers.buffer(finalBuffer.readableBytes()).array();
  114. finalBuffer.getBytes(0, b);
  115. int crcValue = CRC16CCITT.crc16(b);
  116. finalBuffer.writeShort((short)crcValue);//2
  117. //轉義
  118. byte[] bytes = ChannelBuffers.copiedBuffer(finalBuffer).array();
  119. ChannelBuffer headFormatedBuffer = ChannelBuffers.dynamicBuffer(finalBuffer.readableBytes());
  120. formatBuffer(bytes, headFormatedBuffer);
  121. ChannelBuffer buffera = ChannelBuffers.buffer(headFormatedBuffer.readableBytes() + 2);
  122. buffera.writeByte(Message.MSG_HEAD);
  123. buffera.writeBytes(headFormatedBuffer);
  124. buffera.writeByte(Message.MSG_TALL);
  125. return ChannelBuffers.copiedBuffer(buffera);
  126. }
  127. /**
  128. * 傳送資料到交委接入平臺
  129. * boolean
  130. * @param awsVo
  131. * @return
  132. * 2016年9月28日 by fox_mt
  133. */
  134. public boolean sendMsg2Gov(Idc2AwsGpsVo awsVo){
  135. boolean success = false;
  136. if(isLogined()){
  137. //已經登入成功,開始傳送資料
  138. channel = tcpClient.getChannel(Constants.TCP_ADDRESS, Constants.TCP_PORT);
  139. if(null != channel && channel.isWritable()){
  140. Message msg = buildSendVO(awsVo);
  141. ChannelBuffer msgBuffer = buildMessage(msg);
  142. channel.write(msgBuffer);
  143. LOGGER.error("傳送--" + awsVo.getVehicleNo());
  144. }else{
  145. LONGINSTATUS = "";
  146. }
  147. }else if(LOGIN_FLAG == 0){
  148. LOGIN_FLAG ++;
  149. login2Gov();
  150. LOGGER.error("--------------第一次登入");
  151. }else{
  152. LOGGER.error("--------------等待登入");
  153. }
  154. return success;
  155. }
  156. /**
  157. * 轉換VO
  158. * void
  159. * @param awsVo
  160. * 2016年9月28日 by fox_mt
  161. */
  162. private Message buildSendVO(Idc2AwsGpsVo awsVo){
  163. Message msg = new Message(JT809Constants.UP_EXG_MSG);
  164. ChannelBuffer buffer = ChannelBuffers.buffer(36);
  165. //是否加密
  166. buffer.writeByte((byte)0);//0未加密 // 1
  167. //日月年dmyy
  168. Calendar cal = Calendar.getInstance();
  169. cal.setTime(new Date());
  170. buffer.writeByte((byte)cal.get(Calendar.DATE));
  171. buffer.writeByte((byte)(cal.get(Calendar.MONTH) + 1));
  172. String hexYear = "0" + Integer.toHexString(cal.get(Calendar.YEAR));
  173. buffer.writeBytes(hexStringToByte(hexYear));//4
  174. //時分秒
  175. buffer.writeByte((byte)cal.get(Calendar.HOUR_OF_DAY));
  176. buffer.writeByte((byte)cal.get(Calendar.MINUTE));
  177. buffer.writeByte((byte)cal.get(Calendar.SECOND));//3
  178. //經度,緯度
  179. buffer.writeInt(formatLonLat(awsVo.getLon()));//4
  180. buffer.writeInt(formatLonLat(awsVo.getLat()));//4
  181. //速度
  182. buffer.writeShort(awsVo.getSpeed().shortValue());//2
  183. //行駛記錄速度
  184. buffer.writeShort(awsVo.getSpeed().shortValue());//2
  185. //車輛當前總里程數
  186. buffer.writeInt(awsVo.getMileage().intValue());//4
  187. //方向
  188. buffer.writeShort(awsVo.getDirection().shortValue());//2
  189. //海拔
  190. buffer.writeShort((short)0);//2
  191. //車輛狀態
  192. int accStatus = null == awsVo.getAcc() ? 0 : awsVo.getAcc();
  193. int gpsStatus = null == awsVo.getGpsStatus() ? 0 : awsVo.getGpsStatus();
  194. if(accStatus == 0 && gpsStatus == 0){
  195. buffer.writeInt(0);//4
  196. }else if(accStatus == 1 && gpsStatus == 0){
  197. buffer.writeInt(1);//4
  198. }else if(accStatus == 0 && gpsStatus == 1){
  199. buffer.writeInt(2);//4
  200. }else{
  201. buffer.writeInt(3);//4
  202. }
  203. //報警狀態
  204. buffer.writeInt(0);//0表示正常;1表示報警//4
  205. ChannelBuffer headBuffer = ChannelBuffers.buffer(buffer.capacity() + 28);
  206. headBuffer.writeBytes(getBytesWithLengthAfter(21 , awsVo.getVehicleNo().getBytes(Charset.forName("GBK"))));//21
  207. headBuffer.writeByte((byte)InitVehicleData.vehicleColor(awsVo.getVehicleNo()));//1
  208. headBuffer.writeShort(JT809Constants.UP_EXG_MSG_REAL_LOCATION);//2
  209. headBuffer.writeInt(buffer.capacity());//4
  210. headBuffer.writeBytes(buffer);
  211. msg.setMsgBody(headBuffer);
  212. return msg;
  213. }
  214. /**
  215. * 報文轉義
  216. * void
  217. * @param bytes
  218. * @param formatBuffer
  219. * 2016年10月12日 by fox_mt
  220. */
  221. private static void formatBuffer(byte[] bytes, ChannelBuffer formatBuffer){
  222. for (byte b : bytes) {
  223. switch(b){
  224. case 0x5b:
  225. byte[] formatByte0x5b = new byte[2];
  226. formatByte0x5b[0] = 0x5a;
  227. formatByte0x5b[1] = 0x01;
  228. formatBuffer.writeBytes(formatByte0x5b);
  229. break;
  230. case 0x5a:
  231. byte[] formatByte0x5a = new byte[2];
  232. formatByte0x5a[0] = 0x5a;
  233. formatByte0x5a[1] = 0x02;
  234. formatBuffer.writeBytes(formatByte0x5a);
  235. break;
  236. case 0x5d:
  237. byte[] formatByte0x5d = new byte[2];
  238. formatByte0x5d[0] = 0x5e;
  239. formatByte0x5d[1] = 0x01;
  240. formatBuffer.writeBytes(formatByte0x5d);
  241. break;
  242. case 0x5e:
  243. byte[] formatByte0x5e = new byte[2];
  244. formatByte0x5e[0] = 0x5e;
  245. formatByte0x5e[1] = 0x02;
  246. formatBuffer.writeBytes(formatByte0x5e);
  247. break;
  248. default: formatBuffer.writeByte(b);break;
  249. }
  250. }
  251. }
  252. /**
  253. * 16進位制字串轉換成byte陣列
  254. * byte[]
  255. * @param hex
  256. * @return
  257. * 2016年10月12日 by fox_mt
  258. */
  259. public static byte[] hexStringToByte(String hex) {
  260. hex = hex.toUpperCase();
  261. int len = (hex.length() / 2);
  262. byte[] result = new byte[len];
  263. char[] achar = hex.toCharArray();
  264. for (int i = 0; i < len; i++) {
  265. int pos = i * 2;
  266. result[i] = (byte) (toByte(achar[pos]) << 4 | toByte(achar[pos + 1]));
  267. }
  268. return result;
  269. }
  270. private static byte toByte(char c) {
  271. byte b = (byte) "0123456789ABCDEF".indexOf(c);
  272. return b;
  273. }
  274. /**
  275. * 格式化經緯度,保留六位小數
  276. * int
  277. * @param needFormat
  278. * @return
  279. * 2016年10月12日 by fox_mt
  280. */
  281. private int formatLonLat(Double needFormat){
  282. NumberFormat numFormat = NumberFormat.getInstance();
  283. numFormat.setMaximumFractionDigits(6);
  284. numFormat.setGroupingUsed(false);
  285. String fristFromat = numFormat.format(needFormat);
  286. Double formatedDouble = Double.parseDouble(fristFromat);
  287. numFormat.setMaximumFractionDigits(0);
  288. String formatedValue = numFormat.format(formatedDouble * 1000000);
  289. return Integer.parseInt(formatedValue);
  290. }
  291. /**
  292. * 補全位數不夠的定長引數
  293. * byte[]
  294. * @param length
  295. * @param pwdByte
  296. * @return
  297. * 2016年10月12日 by fox_mt
  298. */
  299. private byte[] getBytesWithLengthAfter(int length, byte[] pwdByte){
  300. byte[] lengthByte = new byte[length];
  301. for(int i = 0; i < pwdByte.length; i ++){
  302. lengthByte[i] = pwdByte[i];
  303. }
  304. for (int i = 0; i < (length- pwdByte.length); i++) {
  305. lengthByte[pwdByte.length + i] = 0x00;
  306. }
  307. return lengthByte;
  308. }
  309. public static void main(String[] args) {
  310. TcpClientDemo s = TcpClientDemo.getInstance();
  311. Idc2AwsGpsVo awsVo = new Idc2AwsGpsVo();
  312. awsVo.setDirection(120);
  313. awsVo.setLon(117.2900911);
  314. awsVo.setLat(39.56362);
  315. awsVo.setSpeed(45D);
  316. awsVo.setMileage(10001D);
  317. awsVo.setVehicleNo("XXXXX");
  318. s.sendMsg2Gov(awsVo);
  319. try {
  320. Thread.sleep(20*1000);
  321. } catch (InterruptedException e) {
  322. e.printStackTrace();
  323. }
  324. s.sendMsg2Gov(awsVo);
  325. }
  326. }


這裡其實也沒有太多好說的,整體說一下,先按照協議拼接資料,拼接完成後,計算CRC,進行轉義,由於協議規定開始標記為0x5b,結束標記為0x5d,資料中可能包涵這些標記,所以必須轉移,具體轉換規則,在協議中就能找到,有一點要注意,一定是先計算crc,再轉一,將否則無法通過crc校驗,我就是在這裡吃了虧,耗了一天,再別人的指點下才解決,協議中crc計算使用的是crc16-ccitt標準,這裡找了好多實現,每個計算結果都不一樣,我也沒有仔細研究,我用的是沒問題的,程式碼如下

  1. public class JT809Constants {
  2. public static int UP_CONNECT_REQ = 0x1001;//主鏈路登入請求訊息
  3. public static int UP_CONNECT_RSP = 0x1002;//主鏈路登入應答訊息
  4. public static int UP_CONNECT_RSP_SUCCESS = 0x00;//登入成功
  5. public static int UP_CONNECT_RSP_ERROR_01 = 0x01;//IP 地址不正確
  6. public static int UP_CONNECT_RSP_ERROR_02 = 0x02;//接入碼不正確
  7. public static int UP_CONNECT_RSP_ERROR_03 = 0x03;//使用者沒註冊
  8. public static int UP_CONNECT_RSP_ERROR_04 = 0x04;//密碼錯誤
  9. public static int UP_CONNECT_RSP_ERROR_05 = 0x05;//資源緊張,稍後再連線(已經佔 用);
  10. public static int UP_CONNECT_RSP_ERROR_06 = 0x06;//其他
  11. public static int UP_DICONNECE_REQ = 0x1003;//主鏈路登出請求訊息
  12. public static int UP_DISCONNECT_RSP = 0x1004;//主鏈路登出應答訊息
  13. public static int UP_LINKETEST_REQ = 0x1005;//主鏈路連線保持請求訊息
  14. public static int UP_LINKTEST_RSP = 0x1006;//主鏈路連線保持應答訊息
  15. public static int UP_DISCONNECT_INFORM = 0x1007;//主鏈路斷開通知訊息
  16. public static int UP_CLOSELINK_INFORM = 0x1008;//下級平臺主動關閉鏈路通 知訊息
  17. public static int DOWN_CONNECT_REQ = 0x9001;//從鏈路連線請求訊息
  18. public static int DOWN_CONNECT_RSP = 0x9002;//從鏈路連線應答訊息
  19. public static int DOWN_DISCONNECT_REQ = 0x9003;//從鏈路登出請求訊息
  20. public static int UP_EXG_MSG = 0x1200;//主鏈路動態資訊交換訊息
  21. public static int UP_EXG_MSG_REAL_LOCATION = 0x1202;//實時上傳車輛定位資訊
  22. public static int UP_EXG_MSG_HISTORY_LOCATION = 0x1203;//車輛定位資訊自動補報
  23. public static int DOWN_EXG_MSG = 0x9200;//從鏈路動態資訊交換訊息
  24. }
  25. import java.io.Serializable;
  26. import java.util.Arrays;
  27. import org.jboss.netty.buffer.ChannelBuffer;
  28. public class Message implements Serializable{
  29. private static final long serialVersionUID = 4398559115325723920L;
  30. public static final int MSG_HEAD = 0x5b;
  31. public static final int MSG_TALL = 0x5d;
  32. //報文中除資料體外,固定的資料長度
  33. public static final int MSG_FIX_LENGTH = 26;
  34. private static int internalMsgNo = 0;
  35. private long msgLength;
  36. private long encryptFlag=1;
  37. private int msgGesscenterId;
  38. private long encryptKey;
  39. private int crcCode;
  40. private int msgId;
  41. private int msgSn;
  42. private ChannelBuffer msgBody;
  43. private byte[] versionFlag = {0,0,1};
  44. //下行報文標識,值為1時,代表傳送的資料;預設為0,代表接收的報文
  45. // private int downFlag = 0;
  46. public Message(){}
  47. public Message(int msgId){
  48. //下行報文需要填充報文序列號
  49. synchronized((Integer)internalMsgNo) {
  50. if(internalMsgNo == Integer.MAX_VALUE){
  51. internalMsgNo = 0;
  52. }
  53. }
  54. this.msgSn = ++internalMsgNo;
  55. this.msgId = msgId;
  56. //this.downFlag = 1;
  57. }
  58. public static int getInternalMsgNo() {
  59. return internalMsgNo;
  60. }
  61. public static void setInternalMsgNo(int internalMsgNo) {
  62. Message.internalMsgNo = internalMsgNo;
  63. }
  64. public long getMsgLength() {
  65. return msgLength;
  66. }
  67. public void setMsgLength(long msgLength) {
  68. this.msgLength = msgLength;
  69. }
  70. public long getEncryptFlag() {
  71. return encryptFlag;
  72. }
  73. public void setEncryptFlag(long encryptFlag) {
  74. this.encryptFlag = encryptFlag;
  75. }
  76. public int getMsgGesscenterId() {
  77. return msgGesscenterId;
  78. }
  79. public void setMsgGesscenterId(int msgGesscenterId) {
  80. this.msgGesscenterId = msgGesscenterId;
  81. }
  82. public void setMsgGesscenterId(long msgGesscenterId) {
  83. this.msgGesscenterId = (int)msgGesscenterId;
  84. }
  85. public long getEncryptKey() {
  86. return encryptKey;
  87. }
  88. public void setEncryptKey(long encryptKey) {
  89. this.encryptKey = encryptKey;
  90. }
  91. public int getCrcCode() {
  92. return crcCode;
  93. }
  94. public void setCrcCode(int crcCode) {
  95. this.crcCode = crcCode;
  96. }
  97. public int getMsgId() {
  98. return msgId;
  99. }
  100. public void setMsgId(int msgId) {
  101. this.msgId = msgId;
  102. }
  103. public int getMsgSn() {
  104. return msgSn;
  105. }
  106. public void setMsgSn(int msgSn) {
  107. this.msgSn = msgSn;
  108. }
  109. public ChannelBuffer getMsgBody() {
  110. return msgBody;
  111. }
  112. public void setMsgBody(ChannelBuffer msgBody) {
  113. this.msgBody = msgBody;
  114. }
  115. public byte[] getVersionFlag() {
  116. return versionFlag;
  117. }
  118. public void setVersionFlag(byte[] versionFlag) {
  119. this.versionFlag = versionFlag;
  120. }
  121. public static int getMsgHead() {
  122. return MSG_HEAD;
  123. }
  124. public static int getMsgTall() {
  125. return MSG_TALL;
  126. }
  127. public static int getMsgFixLength() {
  128. return MSG_FIX_LENGTH;
  129. }
  130. @Override
  131. public String toString() {
  132. return "Message [msgLength=" + msgLength + ", encryptFlag=" + encryptFlag + ", msgGesscenterId="
  133. + msgGesscenterId + ", encryptKey=" + encryptKey + ", crcCode=" + crcCode + ", msgId=" + msgId
  134. + ", msgSn=" + msgSn + ", msgBody=" + msgBody + ", versionFlag=" + Arrays.toString(versionFlag) + "]";
  135. }
  136. }

下面是Decoder

  1. import org.jboss.netty.buffer.ChannelBuffer;
  2. import org.jboss.netty.channel.Channel;
  3. import org.jboss.netty.channel.ChannelHandlerContext;
  4. import org.jboss.netty.handler.codec.frame.FrameDecoder;
  5. public class Decoder extends FrameDecoder {
  6. @Override
  7. protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
  8. int head = buffer.getByte(0);
  9. int tail = buffer.getByte(buffer.capacity() - 1);
  10. if( !(head == Message.MSG_HEAD && tail == Message.MSG_TALL)){
  11. return null;
  12. }
  13. buffer.skipBytes(1);
  14. Message msg = this.buildMessage(buffer);
  15. return msg;
  16. }
  17. private Message buildMessage(ChannelBuffer buffer){
  18. Message msg = new Message();
  19. msg.setMsgLength(buffer.readUnsignedInt());
  20. msg.setMsgSn(buffer.readInt());//4byte
  21. msg.setMsgId(buffer.readUnsignedShort());//2byte
  22. msg.setMsgGesscenterId(buffer.readUnsignedInt());//4byte
  23. msg.setVersionFlag(buffer.readBytes(3).array());//3byte
  24. msg.setEncryptFlag(buffer.readUnsignedByte());//1byte
  25. msg.setEncryptKey(buffer.readUnsignedInt());//4byte
  26. if(buffer.readableBytes() >= 9){
  27. buffer.skipBytes(buffer.readableBytes() - 8);
  28. }
  29. ChannelBuffer bodyBytes = buffer.readBytes(buffer.readableBytes() -3);
  30. msg.setMsgBody(bodyBytes);
  31. msg.setCrcCode(buffer.readUnsignedShort());//2byte
  32. buffer.skipBytes(1);
  33. return msg;
  34. }
  35. }

這個decoder值得說一下,有一些朋友應該能看出來

  1. if(buffer.readableBytes() >= 9){
  2. buffer.skipBytes(buffer.readableBytes() - 8);
  3. }

這段程式碼很突兀,其實我也很鬱悶,登陸的時候收到的反饋資訊,資訊長度竟然不是一樣的,有的時候長度是32,有的時候長度是31,長度是32的時候,如果不跳過一位,那麼解析出來的反饋狀態碼就是錯誤的

下面是recevieHandler,

  1. import org.jboss.netty.buffer.ChannelBuffer;
  2. import org.jboss.netty.channel.ChannelHandlerContext;
  3. import org.jboss.netty.channel.MessageEvent;
  4. import org.jboss.netty.channel.SimpleChannelHandler;
  5. import org.slf4j.Logger;
  6. import org.slf4j.LoggerFactory;
  7. public class RecevieHandler extends SimpleChannelHandler{
  8. private static Logger LOG = LoggerFactory.getLogger(RecevieHandler.class);
  9. @Override
  10. public void messageReceived(ChannelHandlerContext ctx, MessageEvent e){
  11. Message msg = (Message) e.getMessage();
  12. LOG.error("應答----------------" + "0x" + Integer.toHexString(msg.getMsgId()));
  13. if(msg.getMsgId() == JT809Constants.UP_CONNECT_RSP){
  14. ChannelBuffer msgBody = msg.getMsgBody();
  15. int result = msgBody.readByte();
  16. if(result == JT809Constants.UP_CONNECT_RSP_SUCCESS){
  17. TcpClientFuJian.LONGINSTATUS = Constants.LOGIN_SUCCESS;
  18. LOG.error("------------------登入成功");
  19. }else{
  20. LOG.error("------------------登入異常,請檢查" + "0x0" + Integer.toHexString(result));
  21. }
  22. }
  23. }
  24. }


根據JT809協議,應該是一個主鏈路,一個從鏈路,從鏈路用來接收下發的訊息,實際上不需要的,推送資料的,在主鏈路登入時會有反饋,從鏈路可以忽略,但是從鏈路的ip和埠,是必填引數,隨便給就可以了,

上面經過decoder之後得到Message,receiveHandler就是來判斷是否登陸,登陸成功的話,就可以傳送資料

這裡涉及登陸時機的問題,先看下心跳handler,我們再說登陸問題

HeartBeatHandler如下:

  1. import org.apache.commons.lang3.StringUtils;
  2. import org.jboss.netty.channel.ChannelHandlerContext;
  3. import org.jboss.netty.handler.timeout.IdleState;
  4. import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
  5. import org.jboss.netty.handler.timeout.IdleStateEvent;
  6. import org.slf4j.Logger;
  7. import org.slf4j.LoggerFactory;
  8. import com.zuche.us.gpsidc2aws.constant.JT809Constants;
  9. import com.zuche.us.gpsidc2aws.tcpclient.vo.Message;
  10. public class HeartBeatHandler extends IdleStateAwareChannelHandler {
  11. private static Logger LOG = LoggerFactory.getLogger(HeartBeatHandler.class);
  12. @Override
  13. public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) throws Exception{
  14. if(StringUtils.isBlank(TcpClientFuJian.LONGINSTATUS) || TcpClientFuJian.LOGINING.equals(TcpClientFuJian.LONGINSTATUS)){
  15. TcpClientFuJian.getInstance().login2FuJianGov();
  16. LOG.error("利用空閒心跳去登入------ 開始登入");
  17. }
  18. if(e.getState() == IdleState.WRITER_IDLE){
  19. LOG.error("鏈路空閒,傳送心跳!");
  20. Message msg = new Message(JT809Constants.UP_LINKETEST_REQ);
  21. e.getChannel().write(TcpClientFuJian.buildMessage(msg));
  22. super.channelIdle(ctx, e);
  23. }
  24. }
  25. }

心跳包中包含了登陸,這個怎麼說呢,沒想到什麼好辦法進行登陸失敗重登陸情況,因為一個不小心,可能就會一直在登陸

所以在傳送資料時如果發現沒登陸,那麼進行登陸,通過flag來判斷,如果登陸過了,那就不管了,成功就發資料,失敗就等著,等到心跳包傳送心跳時去檢測是否登陸成功了,沒成功的話讓心跳包去再登陸一次,這樣省了不少程式碼,就是肯能比較混亂,這個看個人想法了

只要能實現就行,

最後時crc16工具類

  1. public class CRC16CCITT {
  2. public static int crc16(byte[] bytes){
  3. int crc = 0xFFFF;
  4. for (int j = 0; j < bytes.length ; j++) {
  5. crc = ((crc >>> 8) | (crc << 8) )& 0xffff;
  6. crc ^= (bytes[j] & 0xff);//byte to int, trunc sign
  7. crc ^= ((crc & 0xff) >> 4);
  8. crc ^= (crc << 12) & 0xffff;
  9. crc ^= ((crc & 0xFF) << 5) & 0xffff;
  10. }
  11. crc &= 0xffff;
  12. return crc;
  13. }
  14. }

我用的是直接計算的方式,還有一種查表法,這裡就不說了,百度下應該能找到

到此,客戶端實現就完成了

不過由於我們是多臺伺服器推送資料,所以必須建立多個tcp連結,接收端之前沒這麼收過資料,中間經歷了一點兒小波折,後臺接收端根據我們的伺服器數量給開通了多個賬號,

多個賬號就涉及到哪臺伺服器用哪個賬號的問題,所以就有了上面程式碼中的配置檔案

檔案內容很簡單

ip=使用者名稱:介入碼:密碼

例如:

127.0.0.1=101:101:666666

為每臺伺服器制定好所使用的賬號;

另外車牌顏色,顏色的狀態需要使用JT/T415-2006標準協議,這個挺難找的,我直接把顏色和狀態碼貼出來,方便大家參考

JT/T415-2006
1:藍色
2:黃色
3:黑色
4:白色
9:其他

至此整個客戶端完滿,經過測試,完美上線,沒有出現問題,有一點稍微注意下,程式碼中傳送資料的時候,時間的小時的12進位制的,看接收端的要求,可以改成24小時制的

修改方法很簡單,將Calendar.HOUR 改成Calendar.HOUR_OF_DAY 就可以

本地開發的時候,可能不知道傳送的資料對不對,這個對應的接收端應該都有解析工具,可以要來自己去校驗

差不多了再進行聯調

也可以自己模擬一個簡單的接收端,來進行測試,這個程式碼晚上有現成的,這裡就不多說了。

希望能幫助大家,如果又問題,可以隨時聯絡我