1. 程式人生 > >Redis文件交集並集

Redis文件交集並集

ssss util lean finish 所有 igp ack csv hash

package com.huawei.iop.timetask.task;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar; import java.util.Date; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; import org.apache.commons.lang.StringUtils; import org.apache.log4j.Logger; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate;
import com.huawei.iop.bean.UserDefinedGroup; import com.huawei.iop.bean.WorldCupTask; import com.huawei.iop.manager.worldCUP.userCluster.UserDefinedGroupManager; import com.huawei.iop.manager.worldCUP.userCluster.WorldCUPManager; import com.huawei.iop.util.ConfigPropertiesUtil; public class AggregationTask { @Autowired
private UserDefinedGroupManager userDefinedGroupManager; @Autowired private WorldCUPManager worldCUPManager; @Autowired private RedisTemplate<String, String> redisTemplate; protected final static Logger logger = Logger.getLogger("runtime"); public void aggregation() { try { logger.info("Spring Task Start, System.currentTimeMillis=" + System.currentTimeMillis()); // 讀歷史記錄表,取出最早的未完成任務 UserDefinedGroup userDefinedGroup = userDefinedGroupManager.getOneUnfinished(); if (null == userDefinedGroup) { // 沒有未完成任務 logger.info("execute once with no task to execute. System.currentTimeMillis=" + System.currentTimeMillis()); return; } else { logger.info("execute once, System.currentTimeMillis=" + System.currentTimeMillis()); } // 獲取當前時間的年月 Date nowDate = new Date(); SimpleDateFormat df = new SimpleDateFormat("yyyyMM"); String dateTime = df.format(nowDate); String resultPathFromConf = ConfigPropertiesUtil.readValue("definedresultpath") + dateTime;// download/resultgroups logger.info("resultPathFromConf read from definedresultpath = " + resultPathFromConf); if (StringUtils.isEmpty(resultPathFromConf)) { String tempPath = "/home/iop/download/definedGroupUpload/" + dateTime; File tmpFile = new File(tempPath); if (!tmpFile.exists()) { boolean b = tmpFile.mkdirs(); if (b) { logger.info("tmpFile.mkdirs():" + b); } } resultPathFromConf = tempPath; } String resultPath = resultPathFromConf; // userHome + File.separator + // resultPathFromConf+File.separator +"taskresults"+ File.separator // + month;///opt/iop510/download/resultgroups String resultName = "Upload" + userDefinedGroup.getGroupID() + ".csv"; File resultFile = new File(resultPath, resultName);// /opt/iop510/download/groupid.csv logger.info("resultFile=" + resultFile); String sourceFile = userDefinedGroup.getSourceFile() + ""; logger.info("sourceFile:" + sourceFile); String[] fileNames = sourceFile.split(","); if (sourceFile != null && sourceFile.length() != 0) { logger.info("sourceFile OK, not null and length>0"); } else { // 歷史記錄表沒有要計算的文件 logger.info("AggregationTask.aggregation() : no value of filed ‘sourceFile‘ from DIM_WORLDCUP_DEFINED_TASK"); throw new Exception("AggregationTask.aggregation() : no value of filed ‘sourceFile‘ from DIM_WORLDCUP_DEFINED_TASK"); } String setName1 = "tempSet1"; String setName2 = "tempSet2"; String setName3 = "tempSet3"; String setName4 = "tempSet4"; redisTemplate.boundSetOps(setName1); redisTemplate.boundSetOps(setName2); redisTemplate.boundSetOps(setName3); redisTemplate.boundSetOps(setName4); // xxx.csv,i,xxx.csv,u,xxx.csv Set<String> result = new HashSet<String>(1024); // 單個文件任務,直接讀文件輸出結果;多文件 if (fileNames.length == 1)// /單文件 { String firstFile = fileNames[0]; File f = new File(resultPath, firstFile);// 文件 if (f.isFile() && f.canRead()) { FileReader fileReader = null;// 文件流 BufferedReader bufferedReader = null;// 緩沖文件流 try { fileReader = new FileReader(f); bufferedReader = new BufferedReader(fileReader); logger.info("bufferedReader read success"); String oneLine = null; String readLine = bufferedReader.readLine(); logger.info("readLine:" + readLine); String[] msisdn = null; while ((oneLine = readLine) != null) { msisdn = oneLine.split(","); /* logger.info("msisdn:" + msisdn); */ // redisTemplate.boundSetOps(setName).add(msisdn[0]); result.add(msisdn[0]); readLine = bufferedReader.readLine(); } } catch (IOException e) { logger.error("fileReader read failed" + e.getLocalizedMessage()); } finally { if (null != bufferedReader) { try { bufferedReader.close(); } catch (IOException e) { logger.info("bufferedReader close failed"); e.printStackTrace(); } } if (null != fileReader) { try { fileReader.close(); } catch (IOException e) { logger.info("fileReader close failed"); e.printStackTrace(); } } } logger.info("Only one file to deal with, task ok"); } else { logger.error("File read failed"); throw new Exception(resultPathFromConf + "file read failed"); } } else { logger.info("Multi files to deal with"); logger.info("fileNames length" + fileNames.length); // 多文件 ,index對應的都是 i/u for (int index = 1; index < fileNames.length - 1; index = index + 2) { // index + 2 , 13579都是符號 if (index == 1) {// 第一次進,所有集合都是空 String firstFile = fileNames[index - 1]; File f = new File(resultPath, firstFile);// 第一個文件 this.importFile(f, setName1, redisTemplate);// 第一個文件導入第一個集合 String file = fileNames[index + 1];// 第2個文件 f = new File(resultPath, file); this.importFile(f, setName2, redisTemplate);// 第2個文件導入第2個集合 if ("i".equals(fileNames[index] + "")) {// 第一次做交集 result = redisTemplate.opsForSet().intersect(setName1, setName2);// result接收交集結果 logger.info("1result.size()=" + result.size()); logger.info("1Spring Task: first intersect.currentTimeMillis=" + System.currentTimeMillis()); } else if ("u".equals(fileNames[index] + "")) {// 第一次做並集 result = redisTemplate.opsForSet().union(setName1, setName2); logger.info("2result.size() = " + result.size()); logger.info("2Spring Task: first union. currentTimeMillis=" + System.currentTimeMillis()); } // 清空tempSet1 // redisTemplate.opsForSet().remove(setName1, 0, // redisTemplate.boundSetOps(setName1).size() - 1); redisTemplate.opsForSet().unionAndStore(setName3, setName4, setName1); logger.info("3redisTemplate.boundSetOps(setName1).size()=" + redisTemplate.boundSetOps(setName1).size()); logger.info("3Empty setName1. System.currentTimeMillis=" + System.currentTimeMillis()); // 清空tempSet2 // redisTemplate.opsForSet().remove(setName2, 0, // redisTemplate.boundSetOps(setName2).size() - 1); redisTemplate.opsForSet().unionAndStore(setName3, setName4, setName2); logger.info("4redisTemplate.boundSetOps(setName2).size()=" + redisTemplate.boundSetOps(setName2).size()); logger.info("4Spring Task: first redisTemplate.opsForSet().remove().currentTimeMillis=" + System.currentTimeMillis()); } else {// 不是第一次進,所有集合都不空 // 前一次result -> tempSet1 Iterator<String> it = result.iterator(); while (it.hasNext()) { redisTemplate.boundSetOps(setName1).add(it.next()); logger.info("redisTemplate.boundSetOps success"); } // 新文件 String file = fileNames[index + 1]; File f = new File(resultPath, file); importFile(f, setName2, redisTemplate); if ("i".equals(fileNames[index] + "")) {// 第一次做交集 result = redisTemplate.opsForSet().intersect(setName1, setName2);// result接收交集結果 logger.info("5redisTemplate.opsForSet().intersect(setName1, setName2) .size() = " + result.size()); logger.info("5Spring Task: first union. currentTimeMillis=" + System.currentTimeMillis()); } else if ("u".equals(fileNames[index] + "")) {// 第一次做並集 result = redisTemplate.opsForSet().union(setName1, setName2); logger.info("6redisTemplate.opsForSet().union(setName1, setName2) .size() = " + result.size()); logger.info("6Spring Task: first union. currentTimeMillis=" + System.currentTimeMillis()); } // 清空tempSet1 // redisTemplate.opsForSet().remove(setName1, 0, // redisTemplate.boundSetOps(setName1).size() - 1); redisTemplate.opsForSet().unionAndStore(setName3, setName4, setName1); logger.info("7redisTemplate.boundSetOps(setName1).size()=" + redisTemplate.boundSetOps(setName1).size()); logger.info("7Empty setName1. System.currentTimeMillis=" + System.currentTimeMillis()); // 清空tempSet2 // redisTemplate.opsForSet().remove(setName2, 0, // redisTemplate.boundSetOps(setName2).size() - 1); redisTemplate.opsForSet().unionAndStore(setName3, setName4, setName2); logger.info("8redisTemplate.boundSetOps(setName2).size()=" + redisTemplate.boundSetOps(setName2).size()); logger.info("8Spring Task: first redisTemplate.opsForSet().remove().currentTimeMillis=" + System.currentTimeMillis()); } } } int resultSize = result.size(); // 插入數據 if (resultSize >= 1) { List<WorldCupTask> worldCupTasklist = new ArrayList<WorldCupTask>(); for (String str : result) { WorldCupTask worldCupTask = new WorldCupTask(); worldCupTask.setGroupID(userDefinedGroup.getGroupID()); worldCupTask.setUserID(str); worldCupTasklist.add(worldCupTask); } if (null != worldCupTasklist && worldCupTasklist.size() > 0) { try { worldCUPManager.addUserInfo(worldCupTasklist); logger.info("Insert data success"); } catch (Exception e) { logger.error("Insert data failed"); e.printStackTrace(); } } } // 文件寫出去 logger.info("resultSize=" + resultSize); String msisdn = ""; BufferedWriter bufferedWriter = null; if (resultSize >= 1) { try { bufferedWriter = new BufferedWriter(new FileWriter(resultFile)); Iterator<String> it = result.iterator(); while (it.hasNext()) { msisdn = it.next() + ""; bufferedWriter.write(msisdn + "," + msisdn + "," + "0"); bufferedWriter.newLine(); } } catch (Exception e) { logger.error("bufferedWriter write is failed"); e.printStackTrace(); } finally { bufferedWriter.close(); bufferedWriter = null; } } else { try { bufferedWriter = new BufferedWriter(new FileWriter(resultFile)); bufferedWriter.write("" + "," + "" + "," + ""); bufferedWriter.newLine(); } catch (Exception e) { logger.error("bufferedWriter write is failed"); e.printStackTrace(); } finally { bufferedWriter.close(); bufferedWriter = null; } } userDefinedGroup.setStatus("1");// 0-executing, 1-finished /* * userDefinedGroup.setUserNum(result.size() + ""); * logger.info("setUserNum" + result.size()); */ userDefinedGroup.setUserNum(resultSize + ""); logger.info("setUserNum = " + resultSize); userDefinedGroup.setFileName(resultName); logger.info("setFileName = " + resultName); userDefinedGroup.setFilePath(resultPath); logger.info("setFilePath = " + resultPath); userDefinedGroup.setDownLoadPathByString("/definedGroups"); String finishTime = null; try { logger.info("setDownLoadPathByString = " + resultPath); Calendar c = Calendar.getInstance(); c.setTimeInMillis(System.currentTimeMillis()); logger.info("c.setTimeInMillis"); Date date = c.getTime(); SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddhhmmssSSS"); finishTime = sdf.format(date); userDefinedGroup.setFinishTime(finishTime); logger.info("setFinishTime = " + finishTime); } catch (Exception e) { logger.error("userDefinedGroup set is failed"); e.printStackTrace(); } /* * logger.info("redisTemplate.boundSetOps.size=[" + * redisTemplate.boundSetOps(setName1).size() + "," + * redisTemplate.boundSetOps(setName2).size() + "," + * redisTemplate.boundSetOps(setName3).size() + "," + * redisTemplate.boundSetOps(setName4).size() + "]"); */ logger.info("Sping Task update-start, System. finishTime=" + finishTime); userDefinedGroupManager.updateToFinished(userDefinedGroup); logger.info("Sping Task update-end, System.currentTimeMillis=" + System.currentTimeMillis()); } catch (FileNotFoundException e) { logger.error("file is not found"); e.printStackTrace(); } catch (IOException e) { logger.error("IOException:" + e.getLocalizedMessage()); e.printStackTrace(); } catch (Exception e) { logger.error("Exception:" + e.getLocalizedMessage()); e.printStackTrace(); } finally { logger.info("Sping Task End, System.currentTimeMillis=" + System.currentTimeMillis()); } } /** * read all records from a source file to Redis set * * @param f * source file to read to Redis set * @param setName * Redis set * @throws FileNotFoundException */ private void importFile(File f, String setName, RedisTemplate<String, String> redisTemplate) throws FileNotFoundException, IOException { if (f.isFile() && f.canRead()) { FileReader fileReader = null;// 文件流 // FileInputStream fileInputStream = new FileInputStream(file); BufferedReader bufferedReader = null;// 緩沖文件流 try { fileReader = new FileReader(f); bufferedReader = new BufferedReader(fileReader); String oneLine = ""; while ((oneLine = bufferedReader.readLine()) != null) { String msisdn[] = oneLine.split(","); redisTemplate.boundSetOps(setName).add(msisdn[0]); } } catch (Exception e) { e.printStackTrace(); } finally { if (null != bufferedReader) { try { bufferedReader.close(); } catch (IOException e) { logger.error("bufferedReader close failed"); } } if (null != fileReader) { try { fileReader.close(); } catch (IOException e) { logger.error("fileReader close failed"); } } } } } }

Redis文件交集並集