1. 程式人生 > >小程式端檢測到zookeeper curator實現增加和刪除

小程式端檢測到zookeeper curator實現增加和刪除

第一步:後臺上傳

package com.lpy.service.impl;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.n3r.idworker.Sid;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import com.lpy.enums.BGMOperatorTypeEnum;
import com.lpy.mapper.BgmMapper;
import com.lpy.pojo.Bgm;
import com.lpy.pojo.BgmExample;
import com.lpy.service.VideoService;
import com.lpy.web.util.ZookeeperCurator;
import com.lpy.utils.JsonUtils;
import com.lpy.utils.PagedResult;
@Service
public class VideoServiceImpl implements VideoService {
	
	@Autowired
	private BgmMapper bgmMapper;
	
	@Autowired
	private Sid sid;
	
	@Autowired
	private ZookeeperCurator zookeeperCurator;
	

	@Override
	public void addBgm(Bgm bgm) {  //和配置裡面有切面行為  事務的配置
		String bgmId=sid.nextShort();
		bgm.setId(bgmId);
		bgmMapper.insert(bgm);
		
		Map<String, String> map=new HashMap<>();
		map.put("operType", BGMOperatorTypeEnum.ADD.type);
		map.put("path", bgm.getPath());
		zookeeperCurator.sendBgmOperator(bgmId,JsonUtils.objectToJson(map));
	}

	@Override
	public PagedResult queryBgmList(Integer page, Integer pageSize) {
		//第一步:開始分頁
		PageHelper.startPage(page,pageSize);
		//第二步:查詢的條件
		BgmExample example=new BgmExample();
		//第三步:查詢資料庫的資料list
		List<Bgm> list = bgmMapper.selectByExample(example);
		//第四步:將查詢出來的資料進行封裝
		PageInfo<Bgm> pageList=new PageInfo<>(list);
		//第五步:將封裝好的資料給我們自己寫的幫助類裡面,並返回
		PagedResult result=new PagedResult();
		result.setTotal(pageList.getPages());//總的頁數
		result.setRows(list);//每一列
		result.setPage(page);//當前的頁數
		result.setRecords(pageList.getTotal());//總的數量
		return result;
	}

	@Override
	public void deleteBgm(String id) {
		Bgm bgm = bgmMapper.selectByPrimaryKey(id);
		bgmMapper.deleteByPrimaryKey(id);
		Map<String, String> map=new HashMap<>();
		map.put("operType", BGMOperatorTypeEnum.DELETE.type);
		map.put("path", bgm.getPath());
		zookeeperCurator.sendBgmOperator(id, JsonUtils.objectToJson(map));
	}

}

第二步:在zookeeper curator建立子節點

package com.lpy.web.util;
import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs.Ids;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZookeeperCurator {
	//zookeeper客戶端
	private CuratorFramework client =null;
	//日誌(中介軟體的工具類都需要新增日誌)
	final static Logger log=LoggerFactory.getLogger(ZookeeperCurator.class);
	
	//初始化  引數的client是在applicationContext-zookeeper.xml容器裡面,通過配置進行賦值的
	public ZookeeperCurator(CuratorFramework client) {
		this.client=client;
	}
	
	//初始化
	public void init() {
		//設定名稱空間
		client = client.usingNamespace("admin");
		try {
			//判斷在admin名稱空間下是否有bgm節點 /admin/bgm
			if(client.checkExists().forPath("/bgm")==null) {
				/**
				 * 對於zookeeper來講,有2種類型的節點
				 *  持久節點:當你建立一個節點的時候,這個節點就永遠存在了,除非你手動刪除
				 *  臨時節點:建立一個節點之後,會話斷開,會自動刪除,當然也可以手動刪除
				 */
				//函數語言程式設計
				client.create().creatingParentsIfNeeded()  //建立節點
					.withMode(CreateMode.PERSISTENT)       //節點型別:持久節點
					.withACL(Ids.OPEN_ACL_UNSAFE)		   //acl:匿名許可權
					.forPath("/bgm");
				log.info("zookeeper初始化成功");
				log.info("zookeeper伺服器狀態:{0}",client.isStarted());
			}
		} catch (Exception e) {
			log.error("zookeeper客戶端連線、初始化錯誤");
			e.printStackTrace();
		}
	}
	/**
	 * 增加或者刪除bgm,向zookeeper-server建立子節點,供小程式後端監聽
	 * @param bgmId
	 * @param operType
	 */
	public void sendBgmOperator(String bgmId,String operObj) {
		
		try {
			client.create().creatingParentsIfNeeded()  //建立節點
			.withMode(CreateMode.PERSISTENT)       //節點型別:持久節點
			.withACL(Ids.OPEN_ACL_UNSAFE)		   //acl:匿名許可權
			.forPath("/bgm"+bgmId,operObj.getBytes());
		} catch (Exception e) {
			e.printStackTrace();
		}
		
	}
	
	
	
	
}

第三步:小程式端(消費端)實現增加和刪除

package com.lpy;
import java.io.File;
import java.net.URL;
import java.net.URLEncoder;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.lpy.enums.BGMOperatorTypeEnum;
import com.lpy.service.BgmService;
import com.lpy.utils.JsonUtils;
//需要springboot去掃描,   springboot是通過java類來進行配置的
@Component
public class ZookeeperCuratorClient {
	//zookeeper客戶端
	private CuratorFramework client =null;
	//日誌(中介軟體的工具類都需要新增日誌)
	final static Logger log=LoggerFactory.getLogger(ZookeeperCuratorClient.class);
	
	@Autowired
	private BgmService bgmService;
	
	
	public static final String ZOOKEEPER_SERVER="192.168.4.245:2181";
	
	public void init() {
		if(client!=null) {
			return;
		}
		//第一步:建立重試策略
		RetryPolicy retryPolicy=new ExponentialBackoffRetry(1000,5);
		//第二步:建立zookeeper客戶端
		client=CuratorFrameworkFactory.builder().connectString(ZOOKEEPER_SERVER)
												.sessionTimeoutMs(10000)
												.retryPolicy(retryPolicy)
												.namespace("admin").build();
		//第三步:啟動客戶端
		client.start();
		
		//測試
		try {
//			String testNodeData = new String(client.getData().forPath("/bgm/..."));
//			log.info("測試的節點資料為:{}",testNodeData);
			addChildWatch("/bgm");//對bgm下面所有的子節點進行監聽
		} catch (Exception e) {
			e.printStackTrace();
		}
		
	}
	public void addChildWatch(String nodePath) throws Exception {
		
		final PathChildrenCache cache=new PathChildrenCache(client, nodePath, true);
		cache.start();
		//新增監聽器
		cache.getListenable().addListener(new PathChildrenCacheListener() {
			
			@Override
			public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
				//做相應的事件處理
				if(event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
					log.info("監聽到事件CHILD_ADDED");
					//第一步:從資料庫查詢bgm物件,獲取路徑path
					String path = event.getData().getPath();//發生的事件對應的路徑,節點最後有bgmId
					String operatorObjStr =new String(event.getData().getData());//獲取到操作型別
					
					Map<String, String> map=JsonUtils.jsonToPojo(operatorObjStr, Map.class);
					String operatorType=map.get("operType");
					String songPath=map.get("path");
					
//					String arr[]=path.split("/");
//					String bgmId=arr[arr.length-1];
//					Bgm bgm=bgmService.queryBgmById(bgmId);
//					
//					if(bgm==null) {
//						return;
//					}
					//bgm所在的相對路徑
//					String songPath=bgm.getPath();
					//第二步:定義儲存到本地的bgm路徑
					String filePath="D:\\java_all\\workspace-wxxcs\\video-space"+songPath;
					
					//第三步:定義下載的路徑(播放的url)
					String arrPath[]=songPath.split("\\\\");
					String finalPath="";
					//處理url的斜槓以及編碼
					for(int i=0;i<arrPath.length;i++) {
						if(StringUtils.isNoneBlank(arrPath[i])) {
							finalPath+="/";
							finalPath+=URLEncoder.encode(arrPath[i], "UTF-8") ;
						}
					}
					String bgmUrl="http://192.168.4.245:8080/mvc"+finalPath;
					if(operatorType.equals(BGMOperatorTypeEnum.ADD.type)) {//新增
						//下載bgm到springboot伺服器
						URL url=new URL(bgmUrl);
						File file=new File(filePath);
						//進行下載
						FileUtils.copyURLToFile(url, file);
						client.delete().forPath(path);
					}else if(operatorType.equals(BGMOperatorTypeEnum.DELETE.type)) {//刪除
						File file=new File(filePath);
						FileUtils.forceDelete(file);
						client.delete().forPath(path);
					}
					
				}
			}
		});
	}
}