java WebSocket客戶端
阿新 • • 發佈:2022-03-31
使用 Java 中的 Java-WebSocket 做客戶端
pom檔案
<!-- Dependencies actually used by the project -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- WebSocket library for acting as a client -->
<dependency>
    <groupId>org.java-websocket</groupId>
    <artifactId>Java-WebSocket</artifactId>
    <version>1.3.5</version>
</dependency>
客戶端實現
package com.xie.websocket;

import com.alibaba.fastjson.JSONArray;
import lombok.extern.slf4j.Slf4j;

import javax.websocket.*;
import java.io.IOException;

/**
 * WebSocket client endpoint: logs connection events, converts each incoming
 * text message into a {@code TestData} object and hands it to
 * {@code TestService}, and asks {@code WebSocketStart} to start again when
 * the connection is closed.
 *
 * <p>Collaborators are looked up through {@code ApplicationContextHandle}
 * rather than injected, because the endpoint is passed to the WebSocket
 * container as a class and is not created as a Spring bean in this file.
 *
 * @Description WebSocket Client
 * @Date 2022-03-31 15:35
 * @Author xie
 */
@Slf4j
@ClientEndpoint()
public class WebSocketClient {

    // Business service that persists the received data.
    private final TestService testService =
            (TestService) ApplicationContextHandle.getBean(TestService.class);

    // Connection starter, used to reconnect after the session closes.
    private final WebSocketStart webSocketStart =
            (WebSocketStart) ApplicationContextHandle.getBean(WebSocketStart.class);

    /**
     * Called when the connection has been established.
     *
     * @param session the newly opened session
     */
    @OnOpen
    public void onOpen(Session session) {
        log.info("客戶端建立連線......");
    }

    /**
     * Called for every text message received from the server. Failures while
     * parsing or processing are logged and do not propagate, so one bad
     * message does not affect later ones.
     *
     * @param session the session the message arrived on
     * @param message the raw text payload, expected to be JSON
     */
    @OnMessage
    public void onMessage(Session session, String message) {
        try {
            log.info("客戶端收到訊息:{}......", message);
            // Convert the JSON payload into the application's bean.
            TestData data = JSONArray.parseObject(message, TestData.class);
            // Process the data.
            processData(data);
        } catch (Exception e) {
            // Log through SLF4J (with the stack trace) instead of printStackTrace().
            log.error("Failed to process WebSocket message: {}", message, e);
        }
    }

    /**
     * Called when the connection is closed; tries to start the connection
     * again through {@code WebSocketStart.start()}.
     */
    @OnClose
    public void onClose() {
        log.info("與伺服器端斷開連線......");
        try {
            log.info("開始嘗試重新連線......");
            webSocketStart.start();
        } catch (Exception e) {
            // Keep the cause attached to the log entry.
            log.error("重新連線失敗,請檢查網路!", e);
        }
    }

    /**
     * Sends a text message synchronously on the given session.
     *
     * @param session the session to write to
     * @param msg     the text to send
     * @throws IOException if the message cannot be sent
     */
    private static void sendMsg(Session session, String msg) throws IOException {
        session.getBasicRemote().sendText(msg);
    }

    /**
     * Called when the container reports an error for this endpoint.
     *
     * @param session the session the error relates to
     * @param error   the reported error
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("發生錯誤......", error);
    }

    /**
     * Processes one received data object by saving it through the business service.
     *
     * @param data the parsed message
     */
    private void processData(TestData data) {
        // Process the data ...
        testService.batchSaveTestData(data);
    }
}
package com.xie.websocket; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Configuration; /** * @Description: Application 上下文 **/ @Configuration public class ApplicationContextHandle implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { ApplicationContextHandle.applicationContext = applicationContext; } public static Object getBean(Class c) throws BeansException { return applicationContext.getBean(c); } // 獲取當前環境 public static String getActiveProfile() { return applicationContext.getEnvironment().getActiveProfiles()[0]; } }
維持監控連線
package com.xie.websocket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.websocket.ContainerProvider;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import java.io.IOException;
import java.net.URI;
/**
* @Description
* @Date 2022-03-31 15:43
* @Author xie
*/
@Slf4j
@Component
public class WebSocketStart {
@Value("${xx.xx.websocket.uri}")
private String uri;
private static Session session;
/**
* 訊息傳送事件
*/
private static long date;
private void connect() {
WebSocketContainer container = null;
try {
container = ContainerProvider.getWebSocketContainer();
URI r = URI.create(uri);
session = container.connectToServer(WebSocketClient.class, r);
} catch (Exception e) {
e.printStackTrace();
}
}
public void start() {
connect();
new Thread(new KeepAlive()).start();
try {
for (int i = 0; i < 5; i++) {
/**
*注意:此處對session做了同步處理,
* 因為下文中傳送心跳包也是用的此session,
* 不用synchronized做同步處理會報
* Exception in thread "Thread-5" java.lang.IllegalStateException: The remote endpoint was in state [TEXT_FULL_WRITING] which is an invalid state for called method
* 錯誤
*/
synchronized (WebSocketStart.session) {
WebSocketStart.session.getBasicRemote().sendText("javaclient");
}
date = System.currentTimeMillis();
Thread.sleep(30000);
}
} catch (Exception e) {
log.info("客戶端出錯......");
e.printStackTrace();
}
}
/**
* 內部類,用來客戶端給服務單傳送心跳包維持連線
*/
class KeepAlive implements Runnable {
@Override
public void run() {
while (true) {
if (System.currentTimeMillis() - date > 30000) {
try {
log.info("傳送心跳包......");
synchronized (WebSocketStart.session) {
WebSocketStart.session.getBasicRemote().sendText("keepalive");
}
date = System.currentTimeMillis();
} catch (IOException e) {
log.info("維持心跳包出錯......");
e.printStackTrace();
}
} else {
try {
Thread.sleep(30000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}