1. 程式人生 > 程式設計 >基於springboot 長輪詢的實現操作

基於springboot 長輪詢的實現操作

springboot 長輪詢實現

基於 @EnableAsync,@Sync

@SpringBootApplication
@EnableAsync
public class DemoApplication {
 public static void main(String[] args) {
 SpringApplication.run(DemoApplication.class,args);
 }
}
@RequestMapping("/async")
@RestController
public class AsyncRequestDemo {
 @Autowired
 private AsyncRequestService asyncRequestService;
 @GetMapping("/value")
 public String getValue() {
 String msg = null;
 Future<String> result = null;
 try{
  result = asyncRequestService.getValue();
  msg = result.get(10,TimeUnit.SECONDS);
 }catch (Exception e){
  e.printStackTrace();
 }finally {
  if (result != null){
  result.cancel(true);
  }
 }
 return msg;
 }
 @PostMapping("/value")
 public void postValue(String msg) {
 asyncRequestService.postValue(msg);
 }
}
@Service
public class AsyncRequestService {
 private String msg = null;
 @Async
 public Future<String> getValue() throws InterruptedException {
 while (true){
  synchronized (this){
  if (msg != null){
   String resultMsg = msg;
   msg = null;
   return new AsyncResult(resultMsg);
  }
  }
  Thread.sleep(100);
 }
 }
 public synchronized void postValue(String msg) {
 this.msg = msg;
 }
}

備註

@EnableAsync 開啟非同步

@Sync 標記非同步方法

Future 用於接收非同步返回值

result.get(10,TimeUnit.SECONDS); 阻塞,超時獲取結果

Future.cancel() 中斷執行緒

補充:通過spring提供的DeferredResult實現長輪詢服務端推送訊息

DeferredResult字面意思就是推遲結果,是在servlet3.0以後引入了非同步請求之後,spring封裝了一下提供了相應的支援,也是一個很老的特性了。DeferredResult可以允許容器執行緒快速釋放以便可以接受更多的請求提升吞吐量,讓真正的業務邏輯在其他的工作執行緒中去完成。

最近再看apollo配置中心的實現原理,apollo的釋出配置推送變更訊息就是用DeferredResult實現的,apollo客戶端會像服務端傳送長輪訓http請求,超時時間60秒,當超時後返回客戶端一個304 httpstatus,表明配置沒有變更,客戶端繼續這個步驟重複發起請求,當有釋出配置的時候,服務端會呼叫DeferredResult.setResult返回200狀態碼,然後輪訓請求會立即返回(不會超時),客戶端收到響應結果後,會發起請求獲取變更後的配置資訊。

下面我們自己寫一個簡單的demo來演示這個過程

springboot啟動類:

@SpringBootApplication
public class DemoApplication implements WebMvcConfigurer { 
 public static void main(String[] args) {
 SpringApplication.run(DemoApplication.class,args);
 } 
 
 @Bean
 public ThreadPoolTaskExecutor mvcTaskExecutor() {
 ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
 executor.setCorePoolSize(10);
 executor.setQueueCapacity(100);
 executor.setMaxPoolSize(25);
 return executor;
 
 }
 
 //配置非同步支援,設定了一個用來非同步執行業務邏輯的工作執行緒池,設定了預設的超時時間是60秒
 @Override
 public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
 configurer.setTaskExecutor(mvcTaskExecutor());
 configurer.setDefaultTimeout(60000L);
 }
}
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult; 
import java.util.Collection;
 
@RestController
public class ApolloController {
 private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
 //guava中的Multimap,多值map,對map的增強,一個key可以保持多個value
 private Multimap<String,DeferredResult<String>> watchRequests = Multimaps.synchronizedSetMultimap(HashMultimap.create());
 
 
 //模擬長輪詢
 @RequestMapping(value = "/watch/{namespace}",method = RequestMethod.GET,produces = "text/html")
 public DeferredResult<String> watch(@PathVariable("namespace") String namespace) {
 logger.info("Request received");
 DeferredResult<String> deferredResult = new DeferredResult<>();
 //當deferredResult完成時(不論是超時還是異常還是正常完成),移除watchRequests中相應的watch key
 deferredResult.onCompletion(new Runnable() {
  @Override
  public void run() {
  System.out.println("remove key:" + namespace);
  watchRequests.remove(namespace,deferredResult);
  }
 });
 watchRequests.put(namespace,deferredResult);
 logger.info("Servlet thread released");
 return deferredResult;
 
 
 }
 
 //模擬釋出namespace配置
 @RequestMapping(value = "/publish/{namespace}",produces = "text/html")
 public Object publishConfig(@PathVariable("namespace") String namespace) {
 if (watchRequests.containsKey(namespace)) {
  Collection<DeferredResult<String>> deferredResults = watchRequests.get(namespace);
  Long time = System.currentTimeMillis();
  //通知所有watch這個namespace變更的長輪訓配置變更結果
  for (DeferredResult<String> deferredResult : deferredResults) {
  deferredResult.setResult(namespace + " changed:" + time);
  }
 }
 return "success";
 
 }
}

當請求超時的時候會產生AsyncRequestTimeoutException,我們定義一個全域性異常捕獲類:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.context.request.async.AsyncRequestTimeoutException;
 
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
 
@ControllerAdvice
class GlobalControllerExceptionHandler {
 
 protected static final Logger logger = LoggerFactory.getLogger(GlobalControllerExceptionHandler.class);
 
 @ResponseStatus(HttpStatus.NOT_MODIFIED)//返回304狀態碼
 @ResponseBody
 @ExceptionHandler(AsyncRequestTimeoutException.class) //捕獲特定異常
 public void handleAsyncRequestTimeoutException(AsyncRequestTimeoutException e,HttpServletRequest request) {
 System.out.println("handleAsyncRequestTimeoutException");
 }
}

然後我們通過postman工具傳送請求http://localhost:8080/watch/mynamespace,請求會掛起,60秒後,DeferredResult超時,客戶端正常收到了304狀態碼,表明在這個期間配置沒有變更過。

然後我們在模擬配置變更的情況,再次發起請求http://localhost:8080/watch/mynamespace,等待個10秒鐘(不要超過60秒),然後呼叫http://localhost:8080/publish/mynamespace,釋出配置變更。這時postman會立刻收到response響應結果:

mynamespace changed:1538880050147

表明在輪訓期間有配置變更過。

這裡我們用了一個MultiMap來存放所有輪訓的請求,Key對應的是namespace,value對應的是所有watch這個namespace變更的非同步請求DeferredResult,需要注意的是:在DeferredResult完成的時候記得移除MultiMap中相應的key,避免記憶體溢位請求。

採用這種長輪詢的好處是,相比一直迴圈請求伺服器,例項一多的話會對伺服器產生很大的壓力,http長輪詢的方式會在伺服器變更的時候主動推送給客戶端,其他時間客戶端是掛起請求的,這樣同時滿足了效能和實時性。

以上為個人經驗,希望能給大家一個參考,也希望大家多多支援我們。如有錯誤或未考慮完全的地方,望不吝賜教。