小程式端檢測到zookeeper curator實現增加和刪除
阿新 • • 發佈:2018-12-26
第一步:後臺上傳
package com.lpy.service.impl; import java.util.HashMap; import java.util.List; import java.util.Map; import org.n3r.idworker.Sid; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import com.github.pagehelper.PageHelper; import com.github.pagehelper.PageInfo; import com.lpy.enums.BGMOperatorTypeEnum; import com.lpy.mapper.BgmMapper; import com.lpy.pojo.Bgm; import com.lpy.pojo.BgmExample; import com.lpy.service.VideoService; import com.lpy.web.util.ZookeeperCurator; import com.lpy.utils.JsonUtils; import com.lpy.utils.PagedResult; @Service public class VideoServiceImpl implements VideoService { @Autowired private BgmMapper bgmMapper; @Autowired private Sid sid; @Autowired private ZookeeperCurator zookeeperCurator; @Override public void addBgm(Bgm bgm) { //和配置裡面有切面行為 事務的配置 String bgmId=sid.nextShort(); bgm.setId(bgmId); bgmMapper.insert(bgm); Map<String, String> map=new HashMap<>(); map.put("operType", BGMOperatorTypeEnum.ADD.type); map.put("path", bgm.getPath()); zookeeperCurator.sendBgmOperator(bgmId,JsonUtils.objectToJson(map)); } @Override public PagedResult queryBgmList(Integer page, Integer pageSize) { //第一步:開始分頁 PageHelper.startPage(page,pageSize); //第二步:查詢的條件 BgmExample example=new BgmExample(); //第三步:查詢資料庫的資料list List<Bgm> list = bgmMapper.selectByExample(example); //第四步:將查詢出來的資料進行封裝 PageInfo<Bgm> pageList=new PageInfo<>(list); //第五步:將封裝好的資料給我們自己寫的幫助類裡面,並返回 PagedResult result=new PagedResult(); result.setTotal(pageList.getPages());//總的頁數 result.setRows(list);//每一列 result.setPage(page);//當前的頁數 result.setRecords(pageList.getTotal());//總的數量 return result; } @Override public void deleteBgm(String id) { Bgm bgm = bgmMapper.selectByPrimaryKey(id); bgmMapper.deleteByPrimaryKey(id); Map<String, String> map=new HashMap<>(); map.put("operType", BGMOperatorTypeEnum.DELETE.type); map.put("path", bgm.getPath()); zookeeperCurator.sendBgmOperator(id, JsonUtils.objectToJson(map)); } }
第二步:在zookeeper curator建立子節點
package com.lpy.web.util; import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooDefs.Ids; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class ZookeeperCurator { //zookeeper客戶端 private CuratorFramework client =null; //日誌(中介軟體的工具類都需要新增日誌) final static Logger log=LoggerFactory.getLogger(ZookeeperCurator.class); //初始化 引數的client是在applicationContext-zookeeper.xml容器裡面,通過配置進行賦值的 public ZookeeperCurator(CuratorFramework client) { this.client=client; } //初始化 public void init() { //設定名稱空間 client = client.usingNamespace("admin"); try { //判斷在admin名稱空間下是否有bgm節點 /admin/bgm if(client.checkExists().forPath("/bgm")==null) { /** * 對於zookeeper來講,有2種類型的節點 * 持久節點:當你建立一個節點的時候,這個節點就永遠存在了,除非你手動刪除 * 臨時節點:建立一個節點之後,會話斷開,會自動刪除,當然也可以手動刪除 */ //函數語言程式設計 client.create().creatingParentsIfNeeded() //建立節點 .withMode(CreateMode.PERSISTENT) //節點型別:持久節點 .withACL(Ids.OPEN_ACL_UNSAFE) //acl:匿名許可權 .forPath("/bgm"); log.info("zookeeper初始化成功"); log.info("zookeeper伺服器狀態:{0}",client.isStarted()); } } catch (Exception e) { log.error("zookeeper客戶端連線、初始化錯誤"); e.printStackTrace(); } } /** * 增加或者刪除bgm,向zookeeper-server建立子節點,供小程式後端監聽 * @param bgmId * @param operType */ public void sendBgmOperator(String bgmId,String operObj) { try { client.create().creatingParentsIfNeeded() //建立節點 .withMode(CreateMode.PERSISTENT) //節點型別:持久節點 .withACL(Ids.OPEN_ACL_UNSAFE) //acl:匿名許可權 .forPath("/bgm"+bgmId,operObj.getBytes()); } catch (Exception e) { e.printStackTrace(); } } }
第三步:小程式端(消費端)實現增加和刪除
package com.lpy; import java.io.File; import java.net.URL; import java.net.URLEncoder; import java.util.Map; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.cache.PathChildrenCache; import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.retry.ExponentialBackoffRetry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.lpy.enums.BGMOperatorTypeEnum; import com.lpy.service.BgmService; import com.lpy.utils.JsonUtils; //需要springboot去掃描, springboot是通過java類來進行配置的 @Component public class ZookeeperCuratorClient { //zookeeper客戶端 private CuratorFramework client =null; //日誌(中介軟體的工具類都需要新增日誌) final static Logger log=LoggerFactory.getLogger(ZookeeperCuratorClient.class); @Autowired private BgmService bgmService; public static final String ZOOKEEPER_SERVER="192.168.4.245:2181"; public void init() { if(client!=null) { return; } //第一步:建立重試策略 RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,5); //第二步:建立zookeeper客戶端 client=CuratorFrameworkFactory.builder().connectString(ZOOKEEPER_SERVER) .sessionTimeoutMs(10000) .retryPolicy(retryPolicy) .namespace("admin").build(); //第三步:啟動客戶端 client.start(); //測試 try { // String testNodeData = new String(client.getData().forPath("/bgm/...")); // log.info("測試的節點資料為:{}",testNodeData); addChildWatch("/bgm");//對bgm下面所有的子節點進行監聽 } catch (Exception e) { e.printStackTrace(); } } public void addChildWatch(String nodePath) throws Exception { final PathChildrenCache cache=new PathChildrenCache(client, nodePath, true); cache.start(); //新增監聽器 cache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { //做相應的事件處理 if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) { log.info("監聽到事件CHILD_ADDED"); //第一步:從資料庫查詢bgm物件,獲取路徑path String path = event.getData().getPath();//發生的事件對應的路徑,節點最後有bgmId String operatorObjStr =new String(event.getData().getData());//獲取到操作型別 Map<String, String> map=JsonUtils.jsonToPojo(operatorObjStr, Map.class); String operatorType=map.get("operType"); String songPath=map.get("path"); // String arr[]=path.split("/"); // String bgmId=arr[arr.length-1]; // Bgm bgm=bgmService.queryBgmById(bgmId); // // if(bgm==null) { // return; // } //bgm所在的相對路徑 // String songPath=bgm.getPath(); //第二步:定義儲存到本地的bgm路徑 String filePath="D:\\java_all\\workspace-wxxcs\\video-space"+songPath; //第三步:定義下載的路徑(播放的url) String arrPath[]=songPath.split("\\\\"); String finalPath=""; //處理url的斜槓以及編碼 for(int i=0;i<arrPath.length;i++) { if(StringUtils.isNoneBlank(arrPath[i])) { finalPath+="/"; finalPath+=URLEncoder.encode(arrPath[i], "UTF-8") ; } } String bgmUrl="http://192.168.4.245:8080/mvc"+finalPath; if(operatorType.equals(BGMOperatorTypeEnum.ADD.type)) {//新增 //下載bgm到springboot伺服器 URL url=new URL(bgmUrl); File file=new File(filePath); //進行下載 FileUtils.copyURLToFile(url, file); client.delete().forPath(path); }else if(operatorType.equals(BGMOperatorTypeEnum.DELETE.type)) {//刪除 File file=new File(filePath); FileUtils.forceDelete(file); client.delete().forPath(path); } } } }); } }