1. 程式人生 > >用mongo和redis查詢排行榜、統計活躍使用者

用mongo和redis查詢排行榜、統計活躍使用者

  nosql資料庫能解決關係型資料庫遇到的效能和擴充套件性的問題,本部落格將以mongodb和redis兩種nosql資料庫為基礎,簡單的介紹下面兩個業務場景的解決方案:
  1.查詢排行榜(以當日總步數排名為例,查詢排名前200的使用者); 2.統計活躍使用者數(統計某個移動端app軟體在各個下載渠道的活躍裝置數,以起始時間,版本號,系統型別等作為查詢條件)。
  專案原始碼url:https://github.com/zhzhair/spring-boot-nosql.git。
  執行環境很簡陋:window10,8G記憶體(專案實際可用記憶體3個多G),i7處理器,4核8執行緒。

  案例一:查詢當日總步數排名前200的使用者計步資訊。技術架構:java8,spring boot2.0.0,mysql,redis,mongodb,mybatis,swagger,jmeter,idea,maven。
  (i)新增測試資料:新建32個表,按照使用者id對32取模新增測試資料到不同的表,做定時任務,每秒新增或修改300條記錄。表包括user_id和步數step_count兩個欄位,假設手機每隔一段時間傳一次累計步數,如果當日使用者有記錄,就修改使用者的步數(增加新的步數),否則直接新增記錄。部分程式碼如下:
@LogForTask
@Scheduled(cron = "0/1 * * * * ?")
public void uploadStep(){//定時任務每秒新增或修改300條記錄
  IntStream.range(0,300).parallel().forEach(i->stepService.uploadStep(32));
}
  (ii)程式設計:在高併發的情況下記憶體是個問題(out of memory exception!),單個mongodb文件也不能放太多的資料,所以需要設定記憶體不足就讀取磁碟。考慮到第200名的總步數不會減少,並且越往後越“穩定”,所以把它作為閾值就可以給查詢的表“瘦身”,從而避免大表排序。
  初始化(即啟動專案時):需要將32個表的前200名都放到一個mongodb文件,再將文件前200名替換到該bson文件,同時將第200名的步數存到redis裡面,部分程式碼如下:
@Resource
private StepService stepService;
private static StepService service;
@PostConstruct
public void init(){
  service = this.stepService;
}

public static void main(String[] args) {
SpringApplication.run(StepsApplication.class, args);
  //啟動專案初始化排名
  service.recordTopAll(32);
}

@Override
public void recordTopAll(int tableCount) {
  mongoTemplate.dropCollection(StepsTop.class);//刪除文件
  IntStream.range(0,tableCount).parallel().forEach(this::insertOneTable);//將MySQL的資料插入到mongo文件
  /*取出前200名放到list,更新mongo文件的資料為當前list的資料*/
  Query query = new Query().with(new Sort(Sort.Direction.DESC,"totalCount")).limit(200);
  List<StepsTop> list = mongoTemplate.find(query,StepsTop.class);
  if(list.isEmpty()) return;
  mongoTemplate.dropCollection(StepsTop.class);
  mongoTemplate.insertAll(list);
  /*redis儲存閾值-第200名的步數*/
  int size = Math.min(200,list.size());
  redisTemplate.opsForValue().set(redisKey,String.valueOf(list.get(size - 1).getTotalCount()));
}
  步數上傳:redis的資料做定時任務更新,閾值越來越大,每次都將接收到的步數或更新後的步數與閾值比較,比這個閾值大才會去查mongo,然後對mongo文件做更新或插入操作,這個“比較”會非常頻繁,但是redis“不懼怕”高併發,我們不必擔心。這樣就大大地減少了對mongo文件的操作,確保mongo文件資料量很少,之後查詢並排序mongo文件的資料就很快了。部分程式碼如下:
@Override
public void uploadStep(int tableCount) {
  int userId = new Random().nextInt(500_0000);
  int stepCount = 1 + new Random().nextInt(5000);
  Integer count = commonMapper.getStepCount(prefix + userId%tableCount,userId);
  if(count != null){
    commonMapper.updateSteps(prefix + userId%tableCount, userId,count + stepCount);
  }else{
    commonMapper.insertTables(prefix + userId%tableCount, userId, stepCount);
  }
  String tailSteps = redisTemplate.opsForValue().get(redisKey);
  int totalCount = count == null?stepCount:count + stepCount;
  if(tailSteps != null && totalCount > Integer.valueOf(tailSteps)){//步數超過閾值就插入或更新使用者的記錄
    Query query = new Query(Criteria.where("userId").is(userId));
    if(!mongoTemplate.exists(query,StepsTop.class)){
      StepsTop stepsTop = new StepsTop();
      stepsTop.setUserId(userId);
      stepsTop.setTotalCount(stepCount);
      mongoTemplate.insert(stepsTop);
    }else{
      System.out.println("update: " + tailSteps);
      Update update = new Update();
      update.set("totalStep",totalCount);
      mongoTemplate.upsert(query,update,StepsTop.class);
    }
  }else{
    StepsTop stepsTop = new StepsTop();
    stepsTop.setUserId(userId);
    stepsTop.setTotalCount(stepCount);
    mongoTemplate.insert(stepsTop);
  }
}
  定時任務:每隔10秒更新一次閾值,同時刪除mongo文件中200名以外的資料;每隔1秒從mongo查詢排好序的前200名的資料push到redis佇列,方便從redis取出排名。部分程式碼如下:
@Override//更新閾值,刪除mongo文件中200名以外的資料
public void flushRankAll() {
  // Query query = new Query().with(new Sort(Sort.Direction.DESC,"totalCount")).limit(201);
  // List<StepsTop> list = mongoTemplate.find(query,StepsTop.class);//高併發場景下容易出現記憶體不足異常:out of memory Exception
  TypedAggregation<StepsTop> aggregation = Aggregation.newAggregation(
    StepsTop.class,
    project("userId", "totalCount"),//查詢用到的欄位
    sort(Sort.Direction.DESC,"totalCount"),
    limit(200)
  ).withOptions(newAggregationOptions().allowDiskUse(true).build());//記憶體不足到磁碟讀寫,應對高併發
  AggregationResults<StepsTop> results = mongoTemplate.aggregate(aggregation, StepsTop.class, StepsTop.class);
  List<StepsTop> list = results.getMappedResults();
  if(list.size() == 201){
    int totalCount = list.get(199).getTotalCount();
    Query query1 = new Query(Criteria.where("totalCount").lt(totalCount));
    mongoTemplate.remove(query1,StepsTop.class);
  }
}
@Override//查詢排好序的前200名的資料push到redis佇列
public void recordRankAll() {
  // Query query = new Query().with(new Sort(Sort.Direction.DESC,"totalCount")).limit(200);
  // List<StepsTop> list = mongoTemplate.find(query,StepsTop.class);
  TypedAggregation<StepsTop> aggregation = Aggregation.newAggregation(
    StepsTop.class,
    project("userId", "totalCount"),//查詢用到的欄位
    sort(Sort.Direction.DESC,"totalCount"),
    limit(200)
  ).withOptions(newAggregationOptions().allowDiskUse(true).build());//記憶體不足到磁碟讀寫,應對高併發
  AggregationResults<StepsTop> results = mongoTemplate.aggregate(aggregation, StepsTop.class, StepsTop.class);
  List<StepsTop> list = results.getMappedResults();
  if(list.size() == 200){
    Integer stepCount = list.get(199).getTotalCount();
    redisTemplate.opsForValue().set(redisKey,String.valueOf(stepCount));
  }
  if(!list.isEmpty()){
    redisListTemplate.delete(redisQueueKey);
    //noinspection unchecked
    redisListTemplate.opsForList().rightPushAll(redisQueueKey,list);
  }
}
  查詢排行榜:現在就簡單了,直接到redis佇列查詢即可,部分程式碼如下:
@ApiOperation(value = "查詢當日總步數排名", notes = "查詢當日總步數排名")
@RequestMapping(value = "/getRankAll", method = {RequestMethod.GET}, produces = {MediaType.APPLICATION_JSON_VALUE})
public BaseResponse<List<StepsRankAllResp>> getRankAll(int begin,int pageSize) {
  BaseResponse<List<StepsRankAllResp>> baseResponse = new BaseResponse<>();
  List<StepsRankAllResp> list = stepService.getRankAllFromRedis(begin,pageSize);
  if(list.isEmpty()) list = stepService.getRankAll(begin,pageSize);//redis查不到資料就從Mongo查
  baseResponse.setCode(0);
  baseResponse.setMsg("返回資料成功");
  baseResponse.setData(list);
  return baseResponse;
}
@Override//todo 從redis讀取
public List<StepsRankAllResp> getRankAllFromRedis(int begin, int pageSize) {
  List<StepsTop> stepsList = redisListTemplate.opsForList().range(redisQueueKey,begin,pageSize);
  List<StepsRankAllResp> list = new ArrayList<>(stepsList.size());
  for (int i = 0; i < stepsList.size(); i++) {
    StepsRankAllResp stepsRankAllResp = new StepsRankAllResp();
    StepsTop stepsTop = stepsList.get(i);
    BeanUtils.copyProperties(stepsTop,stepsRankAllResp);
    stepsRankAllResp.setRank(begin + i + 1);
    list.add(stepsRankAllResp);
  }
  return list;
}
  jmeter併發測試:訪問介面文件--http://localhost:8080/swagger-ui.html/,調介面查詢排名,配置調介面5000次,持續5秒,聚合報告如下:

 


  案例二:統計活躍使用者數(統計某個移動端app軟體在各個下載渠道的活躍裝置數,以起始時間,版本號,系統型別等作為查詢條件,這裡為了簡便起見,不考慮查詢條件)。技術架構:java8,spring boot2.0.0,mysql,mongodb,mybatis,swagger,idea,maven。
  新增測試資料:新建4個表(建4個表是為了用多執行緒新增資料比較快,要不然“我沒得耐心等”),包括APP_CHANNEL--下載渠道,DEVICE_ID--裝置id號,DEVICE_HASHCODE--裝置id號的hash值,DEVICE_HASHCODE_IDX--hash值的絕對值除以16384的餘數。將1000w條記錄插入這4個表,每個表250萬,然後新建32個表,根據DEVICE_HASHCODE_IDX對32取模,將四個表的資料按類別插入到這32個表中,移動裝置被分成了32個類,此時再也不用擔心select app_channel,count(distinct device_id) from t group by app_channel;的效率了,如果你用的是專業的伺服器,還有多臺機器,你完全可以放到更多甚至幾百個表中,這樣就更無敵了。好了,現在我不關心去重再計數(MySQL的大表去重計數慢到你懷疑人生)的問題了,我只需要將每個表的資料合到一起(總資料量<=分表的個數*下載渠道個數),再分組求和(select app_channel,sum(device_count) from t group by app_channel)。部分程式碼如下:
@Override//按裝置分類將1000w資料放到32個表中
public void insertTables(int tableCount) {
  IntStream.range(0,tableCount).parallel().forEach(i->this.insertOneTable(i,tableCount));
}
private void insertOneTable(int i,int tableCount){
  commonMapper.truncateTable(tableName + "_" + i);
  for (int k = 0; k < 4; k++) {
    List<StartRecordMapperRequest> list0 = new ArrayList<>(1000_0000/tableCount/4);
    for (int j = i; j < 16384; j+=tableCount) {
      List<StartRecordMapperRequest> list = commonMapper.getStartDataByRem(tableName + k, j);
      list0.addAll(list);
    }
    int size = list0.size();
    for (int j = 0; j < size/10000 + 1; j++) {
      List<StartRecordMapperRequest> list = list0.subList(j*10000,Math.min(j*10000 + 10000,size));
      commonMapper.insertTables(list,tableName + "_" + i);
    }
  }
  System.out.println(i + " =================");
}
  查詢活躍使用者數:將32個表的活躍裝置資料先查出來,即select app_channel,count(distinct device_id) from t group by app_channel;插入到mongo文件,再從mongo分組求和即可得到最終的活躍裝置數,部分程式碼如下:
@Override
public List<Document> getActiveCount(int tableCount) {
  mongoTemplate.dropCollection(ActiveChannelCountMongo.class);
  if(!mongoTemplate.collectionExists(ActiveChannelCountMongo.class))
  IntStream.range(0,tableCount).parallel().forEach(this::getActiveCountOne);
  TypedAggregation<ActiveChannelCountMongo> aggregation = Aggregation.newAggregation(
    ActiveChannelCountMongo.class,
    project("appChannel", "activeCount"),//查詢用到的欄位
    // match(Criteria.where("dateTime").lte(Date.valueOf(todayZero).getTime()).gte(Date.valueOf(yesterday).getTime())),
    group("appChannel").sum("activeCount").as("activeCount"),
    sort(Sort.Direction.DESC,"activeCount"),
    project("appChannel", "activeCount").and("appChannel").previousOperation()//輸出欄位,後面是取別名
  ).withOptions(newAggregationOptions().allowDiskUse(true).build());//記憶體不足就到磁碟讀寫
  AggregationResults<Document> results = mongoTemplate.aggregate(aggregation, ActiveChannelCountMongo.class, Document.class);
  return results.getMappedResults();
}

private void getActiveCountOne(int i){
  List<ActiveChannelCount> list = viewMapper.getActiveCount(tableName + i);
  mongoTemplate.insert(list,ActiveChannelCountMongo.class);
}

  調介面看執行時間和返回結果:訪問介面文件--http://localhost/swagger-ui.html/,調介面輸出如下日誌:
前端呼叫方法開始----getActiveCount---->:#{"URL地址":/view/getActiveCount, "HTTP方法":GET,引數:, "tableCount":32}
前端呼叫方法結束----getActiveCount---->:返回值: BaseResponse{code=0, msg='獲取資料成功', data=[Document{{activeCount=111792, appChannel=appStore}}, Document{{activeCount=73757, appChannel=yingyongbao}}, Document{{activeCount=55640, appChannel=baiduyingyong}}, Document{{activeCount=55605, appChannel=vivo}}, Document{{activeCount=36997, appChannel=xiaomi}}, Document{{activeCount=36991, appChannel=360yingyong}}, Document{{activeCount=18575, appChannel=samsung}}, Document{{activeCount=18528, appChannel=iTools}}, Document{{activeCount=18483, appChannel=oppo}}, Document{{activeCount=18472, appChannel=htc}}, Document{{activeCount=18457, appChannel=huawei}}, Document{{activeCount=18374, appChannel=wandoujia}}, Document{{activeCount=18329, appChannel=mezu}}]}
2018-11-11 09:45:26,595 INFO - [http-nio-80-exec-13 ] c.e.f.c.m.i.RequestTimeConsumingInterceptor : /view/getActiveCount 3010ms

  結束語:本文的方案能解決一些高併發,大資料量的問題,但只是對於資料量不是特別巨大,又想用較低成本解決問題的一小點想法。