springboot2.0 websocket連線和叢集
1. 引入websocket的starter
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId >
</dependency>
這裡引入rabbitmq的amqp starter是為了websocket服務的叢集,叢集之間主要通過rabbitmq來通訊。
2. 配置websocket的Exporter
包含一個非同步執行緒池,用於非同步處理rabbitmq訊息
/**
 * WebSocket-related bean definitions: the {@link ServerEndpointExporter} and the
 * thread pool (bean name {@code taskExecutor}) used to process RabbitMQ messages
 * asynchronously.
 */
@Configuration
public class WebSocketConfig {

    /**
     * Maximum number of tasks that may wait in the executor queue.
     * A bounded value is required for the pool to grow beyond its core size:
     * with an unbounded queue the executor never creates more than
     * {@code corePoolSize} threads, so the configured maximum would be ignored.
     */
    private static final int TASK_QUEUE_CAPACITY = 1000;

    /**
     * Exposes a {@link ServerEndpointExporter} bean.
     *
     * @return a new exporter instance
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

    /**
     * Builds the executor referenced by {@code @Async("taskExecutor")}.
     *
     * @return the configured thread-pool executor
     */
    @Bean("taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // On shutdown, let submitted tasks finish before other beans are destroyed.
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        // ...but wait at most 60 seconds so that shutdown cannot block forever.
        taskExecutor.setAwaitTerminationSeconds(60);
        taskExecutor.setCorePoolSize(4);
        taskExecutor.setMaxPoolSize(8);
        // Bounded queue: once it is full the pool grows from 4 up to 8 threads.
        taskExecutor.setQueueCapacity(TASK_QUEUE_CAPACITY);
        // When both the queue and the pool are saturated, run the task on the
        // submitting thread instead of rejecting it; this slows the producer
        // down rather than dropping messages.
        taskExecutor.setRejectedExecutionHandler(
                new java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy());
        // Threads above the core size are reclaimed after 60 seconds of idleness.
        taskExecutor.setKeepAliveSeconds(60);
        taskExecutor.setThreadNamePrefix("kanjia-websocket-thread-");
        return taskExecutor;
    }
}
websocket的rabbitmq叢集配置檔案
websocket.rabbitmq.queue.kanjia.routing-key=kanjia.#
#每個叢集伺服器監聽一個有UUID的佇列
websocket.rabbitmq.queue.kanjia.name=websocket.kanjia.${random.uuid}
websocket.rabbitmq.exchange.topic=websocket.topic
配置rabbitmq的yml
spring:
  rabbitmq:
    host: ${rabbitmq.host}
    port: ${rabbitmq.port}
    username: ${rabbitmq.i5x.username}
    password: ${rabbitmq.i5x.password}
    virtual-host: ${rabbitmq.i5x.vhost}
3. 配置websocket的endpoint
@Controller
@ServerEndpoint("/ws/{biz}/{key}")
public class WebSocket {
private static final Logger log = LoggerFactory.getLogger(WebSocket.class);
// Registry of open connections, keyed by address; shared by every endpoint instance.
private static final ConcurrentMap<String, CopyOnWriteArrayList<WebSocket>> webSocketMap = new ConcurrentHashMap<String, CopyOnWriteArrayList<WebSocket>>();
// Session of this connection; assigned in onOpen.
private Session session;
// Message address in the form "<biz>.<key>", e.g. duorendiancai.4991_123.
private String address;
// Bridge used to exchange messages between WebSocket and RabbitMQ.
// NOTE(review): looked up when the endpoint instance is constructed, so SpringContext
// must already hold the ApplicationContext at that point - confirm bean ordering.
private WebSocketRabbitMqBridge webSocketRabbitMqBridge = SpringContext.getBean(WebSocketRabbitMqBridge.class);
/**
 * Called when a connection is opened. Builds the address {@code biz + "." + key}
 * from the path parameters and registers this endpoint under that address.
 * Connections with a blank {@code biz} or {@code key} are closed with
 * {@code CANNOT_ACCEPT} and are not registered.
 *
 * @param biz     business discriminator taken from the URL path
 * @param key     key taken from the URL path
 * @param session the session of the new connection
 */
@OnOpen
public void onOpen(@PathParam("biz") String biz, @PathParam("key") String key, Session session) {
    // Record the connection state first, so that a close callback fired while the
    // session is being closed below never observes a null address or session.
    this.address = biz + "." + key;
    this.session = session;

    // Parameter validation.
    if (StringUtils.isBlank(biz) || StringUtils.isBlank(key)) {
        String msg = "websocket連線引數不合法,biz=" + biz + ",key=" + key;
        log.error(msg);
        try {
            session.close(new CloseReason(CloseReason.CloseCodes.CANNOT_ACCEPT, msg));
        } catch (IOException e) {
            log.error("關閉websocket連線異常:" + e.toString());
        }
        // The connection was refused: it must not be added to the registry,
        // otherwise a closed session would stay in the broadcast list.
        return;
    }

    // Register this connection under its address. Creating the list and adding to it
    // happen inside one compute() call, so two connections opening concurrently for
    // the same address cannot overwrite each other's list (webSocketMap is a
    // ConcurrentHashMap, whose compute() is atomic per key).
    webSocketMap.compute(this.address, (addr, existing) -> {
        CopyOnWriteArrayList<WebSocket> connections =
                (existing != null) ? existing : new CopyOnWriteArrayList<WebSocket>();
        connections.add(this);
        return connections;
    });
}
/**
 * Called for every text message received from the client. The message is parsed as
 * JSON, tagged with this connection's address and forwarded to RabbitMQ through the
 * bridge. Any exception is logged and not propagated.
 *
 * @param message the raw text received
 * @param session the session the message arrived on
 */
@OnMessage
public void onMessage(String message, Session session) {
    try {
        JSONObject payload = JSON.parseObject(message);
        // Nothing usable was parsed: log the raw message and stop.
        if (payload == null) {
            log.error("server receive msg=" + message + ",address=" + this.address);
            return;
        }
        // Stamp the address into the payload so the consuming side knows where to publish.
        payload.put(KanjiaMsgHandler.ADDRESS_KEY_MQ_MSG, this.address);
        String outgoing = payload.toJSONString();
        // Forward to RabbitMQ, using the address as routing key.
        webSocketRabbitMqBridge.sendRabbitMqMsg(this.address, outgoing);
        log.info("server receive msg:address=" + this.address);
    } catch (Exception ex) {
        log.error("處理接收到的資訊異常:" + ex.toString() + ",msg=" + message + ",address=" + this.address);
    }
}
/**
 * Called when the connection is closed. Removes this endpoint from the registry and
 * drops the address entry once no connection is left for it.
 *
 * @param session     the session being closed
 * @param closeReason why the connection was closed
 */
@OnClose
public void onClose(Session session, CloseReason closeReason) {
    // address is only assigned in onOpen; ConcurrentHashMap rejects null keys, so skip
    // the registry update if a close arrives before an address was recorded.
    if (this.address != null) {
        // Remove-and-prune in a single computeIfPresent() call: returning null deletes
        // the mapping. Doing "remove, isEmpty, remove key" as separate steps could
        // discard a list that another connection was just added to.
        webSocketMap.computeIfPresent(this.address, (addr, connections) -> {
            connections.remove(this);
            return connections.isEmpty() ? null : connections;
        });
    }
    log.info("onClose: address=" + this.address + ",id=" + session.getId() + ",reason=" + closeReason.getReasonPhrase() );
}
/**
 * Called when an error occurs on the connection. The error is only logged.
 *
 * @param t the error that was raised
 */
@OnError
public void onError(Throwable t) {
    // Pass the throwable as well so the stack trace is not lost.
    log.error("websocket onError:" + t.toString(), t);
}
/**
 * Broadcasts a text message to every open connection registered under an address.
 * A failure on one connection is logged and does not stop delivery to the others.
 *
 * @author Mobile Web Group-lff
 * @date 2018-02-11 10:12:27
 *
 * @param address the address whose connections should receive the message
 * @param message the text to send
 */
public static void publish(String address, String message){
    CopyOnWriteArrayList<WebSocket> webSocketList = webSocketMap.get(address);
    if (webSocketList == null) {
        return;
    }
    for (WebSocket webSocket : webSocketList) {
        Session target = webSocket.session;
        // Skip connections that have no session or are already closed.
        if (target == null || !target.isOpen()) {
            continue;
        }
        try {
            // publish() can be invoked from several executor threads at once, and a
            // blocking send must not overlap with another send on the same connection,
            // so sends are serialised per endpoint instance.
            synchronized (webSocket) {
                target.getBasicRemote().sendText(message);
            }
        } catch (IOException | RuntimeException e) {
            // Log and continue with the next connection. Runtime failures (for example
            // an IllegalStateException from the container) are caught too, so that one
            // bad connection cannot abort the whole broadcast.
            log.error("傳送訊息失敗:原因=" + e.toString() + ",id=" + target.getId() + ",msg=" + message);
        }
    }
}
/**
 * Hash code derived from the session id only, matching {@code equals}.
 *
 * @return {@code 31 + sessionId.hashCode()}, or {@code 31} when no session is set
 */
@Override
public int hashCode() {
    int sessionIdHash = 0;
    if (this.session != null) {
        sessionIdHash = this.session.getId().hashCode();
    }
    // Equivalent to the seed-and-multiply form 31 * 1 + sessionIdHash.
    return 31 + sessionIdHash;
}
/**
 * Two endpoints are equal when their sessions have the same id, or when neither
 * has a session.
 *
 * @param obj the object to compare with
 * @return {@code true} if {@code obj} is a {@code WebSocket} for the same session
 */
@Override
public boolean equals(Object obj) {
    if (this == obj) {
        return true;
    }
    // instanceof is false for null, so this also covers obj == null.
    if (!(obj instanceof WebSocket)) {
        return false;
    }
    WebSocket other = (WebSocket) obj;
    // If either side has no session, they are equal only when both have none.
    // This avoids dereferencing a null session on the other instance.
    if (this.session == null || other.session == null) {
        return this.session == other.session;
    }
    return this.session.getId().equals(other.session.getId());
}
onOpen:連線成功的回撥函式。按照url連線中的引數,拼成一個websocket的地址address,可以代表著一個聊天室或者一個群聊。利用這個address來轉發publish訊息。
onMessage:接收到訊息的回撥函式。這裡為了多伺服器部署時的叢集,通過rabbitmq來過渡訊息。接收到websocket訊息後,通過rabbitmq轉發給所有的websocket的伺服器。
onClose:連線關閉的回撥函式。
onError:錯誤回撥。
hashCode/equals:為了定位session必須要重寫。
自定義的publish方法:根據websocket的地址address轉發訊息。
4. websocket和rabbitmq訊息過渡的橋樑
用來發送websocket和rabbitmq訊息
/**
 * Bridge between WebSocket and RabbitMQ: sends messages to the RabbitMQ exchange and
 * hands messages to the WebSocket broadcast.
 */
@Component
public class WebSocketRabbitMqBridge {

    /** Name of the exchange used to pass messages between WebSocket and RabbitMQ. */
    @Value("${websocket.rabbitmq.exchange.topic}")
    private String exchangeTopic;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Forwards a WebSocket message to RabbitMQ so that it is propagated between services.
     * The message is sent to the exchange configured by
     * {@code websocket.rabbitmq.exchange.topic}.
     *
     * @author Mobile Web Group-lff
     * @date 2018-02-12 09:02:19
     *
     * @param routingKey routing key to send with
     * @param object     the message payload
     */
    public void sendRabbitMqMsg(String routingKey, final Object object) {
        final String exchange = exchangeTopic;
        rabbitTemplate.convertAndSend(exchange, routingKey, object);
    }

    /**
     * Broadcasts a message to the WebSocket connections of an address by delegating to
     * {@code WebSocket.publish}.
     *
     * @author Mobile Web Group-lff
     * @date 2018-02-11 15:09:21
     *
     * @param address the WebSocket address to publish to
     * @param msg     the text to send
     */
    public void publishWebSocketMsg(String address, String msg) {
        WebSocket.publish(address, msg);
    }
}
5. 監聽處理rabbitmq訊息
建立一個含有uuid的佇列並監聽,通過非同步任務處理訊息
/**
 * RabbitMQ listener for WebSocket messages. It listens on the auto-delete queue named by
 * {@code websocket.rabbitmq.queue.kanjia.name}, bound to the durable topic exchange
 * {@code websocket.rabbitmq.exchange.topic} with the routing key
 * {@code websocket.rabbitmq.queue.kanjia.routing-key}.
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(
                        value = "${websocket.rabbitmq.queue.kanjia.name}",
                        autoDelete = "true"),
                exchange = @Exchange(
                        value = "${websocket.rabbitmq.exchange.topic}",
                        type = ExchangeTypes.TOPIC,
                        durable = "true"),
                key = "${websocket.rabbitmq.queue.kanjia.routing-key}"))
public class RabbitMqKanJiaMsgReceiver {

    /** Handler that processes each received message. */
    @Autowired
    private KanjiaMsgHandler msgHandler;

    /**
     * Receives one message from the queue and passes it to the handler.
     *
     * @author Mobile Web Group-lff
     * @date 2018-02-11 15:11:04
     *
     * @param msg the message text taken from the queue
     */
    @RabbitHandler
    public void processMsg(String msg) {
        msgHandler.handler(msg);
    }
}
非同步任務類
/**
 * Processes messages received from RabbitMQ and publishes them to the WebSocket
 * connections of the address carried in the message. Runs on the
 * {@code taskExecutor} pool via {@code @Async}.
 */
@Component
public class KanjiaMsgHandler {

    /** Key inside the MQ message that holds the WebSocket address. */
    public static final String ADDRESS_KEY_MQ_MSG = "_address";

    private static final Logger log = LoggerFactory.getLogger(KanjiaMsgHandler.class);

    /** Bridge used to exchange messages between WebSocket and RabbitMQ. */
    @Autowired
    private WebSocketRabbitMqBridge webSocketRabbitMqBridge;

    /**
     * Publishes one MQ message to the WebSocket address it names. Blank messages are
     * ignored; messages without an address are logged and dropped; any other failure
     * is logged and not propagated.
     *
     * <p>Expected message format:
     * {@code {_address:'kanjia.1234567', type:'barrage', body:{nickname:,photoImg:,placeholder:}}}
     *
     * @param msg the raw MQ message text
     */
    @Async("taskExecutor")
    public void handler(String msg) {
        if (StringUtils.isBlank(msg)) {
            // Empty message: nothing to do.
            return;
        }
        try {
            JSONObject msgJson = JSON.parseObject(msg);
            // Guard against a null parse result before reading the address.
            String address = (msgJson == null) ? null : msgJson.getString(ADDRESS_KEY_MQ_MSG);
            if (StringUtils.isBlank(address)) {
                // Check explicitly instead of relying on a NullPointerException from
                // the connection registry lookup further down the call chain.
                log.error("訊息處理異常:missing " + ADDRESS_KEY_MQ_MSG + ",msg=" + msg);
                return;
            }
            webSocketRabbitMqBridge.publishWebSocketMsg(address, msg);
            log.info("server send msg:address=" + address);
        } catch (Exception e) {
            // Keep the stack trace by passing the exception to the logger.
            log.error("訊息處理異常:" + e.toString() + ",msg=" + msg, e);
        }
    }
}
6. 其它設定的程式碼
由於用到了非同步任務,需要通過@EnableAsync開啟非同步功能
/**
 * Application entry point. {@code @EnableAsync} switches on asynchronous method
 * execution, which the {@code @Async} message handler depends on.
 */
@SpringCloudApplication
@EnableAsync
public class Application {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments, passed through to Spring
     */
    public static void main(String[] args) {
        Class<?> primarySource = Application.class;
        SpringApplication.run(primarySource, args);
    }
}
另外,由於@ServerEndpoint中不能用@Autowired等注入方式注入其它Bean,所以需要通過ApplicationContext顯式獲取其它Bean,SpringContext的util類
@Component
public class SpringContext implements ApplicationContextAware {
private static Logger log = LoggerFactory.getLogger(SpringContext.class);
private static ApplicationContext applicationContext;
/**
* Description: 獲取bean的管理器
* @author: Mobile Web Group-lff
* @date: 2018年2月11日 上午10:52:58
*
* @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext)
*/
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
log.info("setApplicationContext,applicationContext=" + applicationContext);
SpringContext.applicationContext = applicationContext;
}
//獲取applicationContext
private static ApplicationContext getApplicationContext() {
return applicationContext;
}
//通過name獲取 Bean.
public static Object getBean(String name){
return getApplicationContext().getBean(name);
}
//通過class獲取Bean.
public static <T> T getBean(Class<T> clazz){
return getApplicationContext().getBean(clazz);
}
//通過name,以及Clazz返回指定的Bean
public static <T> T getBean(String name,Class<T> clazz){
return getApplicationContext().getBean(name, clazz);
}
}
另外需要注意:
1. @ServerEndpoint類上不加@Controller的話,websocket無法連線
2. websocket的Session預設沒有超時時間session.getMaxIdleTimeout()=0;如果發現過一段時間後,websocket連線會自動斷掉,應該是nginx的配置proxy_read_timeout 90;proxy_send_timeout 90;