java 執行緒池管理多執行緒操作Hbase資料庫完整專案
阿新 • • 發佈:2019-01-12
Hbase-site.xml配置檔案:
<?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>hbase.rootdir</name> <value>hdfs://叢集1(zhz100):9000/hbase</value> </property> <property> <name>hbase.cluster.distributed</name> <value>true</value> </property> <property> <name>hbase.master</name> <value>叢集(zhz100):60000</value> </property> <property> <name>hbase.zookeeper.property.dataDir</name> <value>/usr/java/zookeeper3.4.10/temp</value> </property> <property> <name>hbase.zookeeper.quorum</name> <value>叢集1(zhz100),叢集2,叢集3</value> </property> <property> <name>hbase.zookeeper.property.clientPort</name> <value>2181</value> </property> </configuration>
配置application.yml:
server:
  port: 8080
hbase:
  conf:
    confMaps:
      'hbase.zookeeper.quorum' : 'IP地址1:2181,IP地址2:2181'
pom.xml 引入的依賴:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.2</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.2.0</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-hadoop</artifactId> <version>2.5.0.RELEASE</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.5.1</version> </dependency> <dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-hadoop-core</artifactId> <version>2.4.0.RELEASE</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase</artifactId> <version>1.2.1</version> <type>pom</type> </dependency> <dependency> <groupId>ch.ethz.ganymed</groupId> <artifactId>ganymed-ssh2</artifactId> <version>build210</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency>
Test含主函式main:
package com.example.demo; import org.apache.hadoop.hbase.client.Connection; import sun.net.ftp.FtpClient; import sun.net.ftp.FtpProtocolException; import java.io.*; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class Test { FtpClient ftpClient; /** * 連線FTP服務 * @param url //IP地址 * @param port//埠號 * @param username//使用者名稱 * @param password//密碼 * @return */ public static FtpClient connectFTP(String url, int port, String username, String password) { //建立ftp FtpClient ftp = null; try { //建立地址 SocketAddress addr = new InetSocketAddress(url, port); //連線 ftp = FtpClient.create(); ftp.connect(addr); //登陸 ftp.login(username, password.toCharArray()); ftp.setBinaryType(); } catch (FtpProtocolException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } return ftp; } public static List<String> download(String ftpFile, FtpClient ftp) { synchronized (TimerWatch.class) { List<String> list = new ArrayList<String>(); String str = ""; InputStream is = null; BufferedReader br = null; try { // 獲取ftp上的檔案 is = ftp.getFileStream(ftpFile); //轉為位元組流 br = new BufferedReader(new InputStreamReader(is)); while ((str = br.readLine()) != null) { list.add(str); } br.close(); } catch (FtpProtocolException e) { e.printStackTrace(); } catch (FileNotFoundException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } return list; } } public static void main(String[] args) throws Exception{ //開啟主機1FTP伺服器 FtpClient ftp = connectFTP("主機1IP"埠號,"賬號","密碼"); //開啟主機1Linux指令 RemoteExecuteCommand rec = new RemoteExecuteCommand("主機1IP", "賬號", "密碼"); //連線資料庫 Connection connection=LinkHbase.table(); //如果出現宕機或者出現異常導致專案停止執行,此段程式碼可以將未成功插入的資料所在的檔案就會儲存在work目錄中。重啟後對work進行解析就會避免資料丟失,此方法只會在啟動時執行一次。 Sync.Judge(connection,ftp,rec); 
//開啟一個執行緒池,corePoolSize:核心執行緒數5;maxPoolSize:最大執行緒 數10; keepAliveTime:執行緒空閒時間:200微妙;ArrayBlockingQueue是一個基於陣列結構的有界阻塞佇列,此佇列按 FIFO(先進先出)原則對元素進行排序 ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(5)); for(int i=0;i<5;i++){ MyTask myTask = new MyTask(i,connection,ftp,rec); executor.execute(myTask); System.out.println("執行緒池中執行緒數目:"+executor.getPoolSize()+",佇列中等待執行的任務數目:"+ executor.getQueue().size()+",已執行玩別的任務數目:"+executor.getCompletedTaskCount()); } executor.shutdown(); } }
Sync類中Judge()方法:
//判斷in檔案是否有檔案執行完成
public static void Judge(Connection connection,FtpClient ftp,RemoteExecuteCommand rec) throws Exception {
//Linux命令獲取/usr/HDWork/work的檔名字
String files=rec.execute("cd /usr/HDWork/work ; ls -lrt|sed -n '2, $p'|awk '{print $9}'");
if(files.equals("") || files==null){
System.out.println("work目錄下沒有檔案");
} else {
System.out.println("work目錄有檔案");
String[] strs = files.split("\\n");
for (int i = 0, len = strs.length; i < len; i++) {
SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");//計算執行時間
long start, end;
start = System.currentTimeMillis();
Date date1 = new Date(start);
String date1String = formatter.format(date1);
System.out.println(strs[i].toString());
//解析work目錄下的檔案內容,插入到hbase
ReadIn.add(strs[i].toString(), connection, ftp);
//移動檔案到back目錄下
Cmd.mvBack(strs[i].toString(), rec);
end = System.currentTimeMillis();
Date date2 = new Date(end);
String date2String = formatter.format(date2);
System.out.println(Thread.currentThread().getName() + ":執行檔案:." + strs[i].toString() + ";開始時間為:" + date1String + ";結束時間為:" + date2String + ";新增成功,共用時" + (end - start)+ "ms。");
}
}
ReadIn類中add()方法:
public class ReadIn {
/*
*解析work目錄下的檔案內容,插入到資料庫中
* */
public static void add(String fileName, Connection connection, FtpClient ftpClient) throws Exception{
String[] y= { "trademark" , "dr_type" , "service_id" , "bill_month" );
Table table=table(connection);//判斷是否已經存在含有這個表
List<String> cutstrings;
System.out.println("/usr/HDWork/work/"+fileName);
List<String> list=download("/usr/HDWork/work/"+fileName,ftpClient);
String str;
for(int i=6;i<list.size();i++){
int aa = (int) ((Math.random() * 9 + 1) * 100000);
System.out.println(aa + "根據檔名來得到keyRow"+fileName);
Put put = new Put(Bytes.toBytes( aa));
cutstrings = cutstring(list.get(i).substring(19));
for (int z = 0; z < cutstrings.size() - 1; z++) { // 每一個for讀入一行資料
put.addColumn(Bytes.toBytes(y[z]), Bytes.toBytes("aa"), Bytes.toBytes(cutstrings.get(z)));//將欄位與值匹配對應插入
}
table.put(put);//提交資料
}
}
/*
* 執行緒鎖判斷是否有這個表
* */
public static Table table(Connection connection)throws Exception{
synchronized (ReadIn.class) {
Admin admin = connection.getAdmin();
Table table = null;
SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
String systemTime = String.valueOf(df.format(new Date()));//系統日期
TableName tableNameObj = TableName.valueOf(systemTime);//以系統日期為表的名字
if (admin.tableExists(tableNameObj)) {//如果表的名字已經存在
table = connection.getTable(TableName.valueOf(systemTime));//連線這個表
return table;
} else {//沒有這個表,先建立表,在連結表
HbaseDemo.createTable(systemTime, "trademark", "dr_type", "service_id");
table = connection.getTable(TableName.valueOf(systemTime));
return table;
}
}
}
/*
* 擷取前19位字元後解析的內容
* */
public static List<String> cutstring (String Stence) {
List<String> stringlist = new ArrayList<String>();//用來儲存解析出來的元素
for (int i = 0; i < Stence.length(); i++) {
if (Stence.charAt(i) == ';') {
String temp = "";//儲存單詞
int wordlength = i;
while (wordlength < Stence.length() - 1 && Stence.charAt(++wordlength) != ';') {
temp += Stence.charAt(wordlength);
//System.out.println(temp);
}
stringlist.add(temp);
}
}
return stringlist;
}
/**
* 取ftp上的檔案內容
* @param ftpFile
* @param ftp
* @return
*/
public static List<String> download(String ftpFile, FtpClient ftp) {
synchronized (ReadIn.class) {
List<String> list = new ArrayList<String>();
String str = "";
InputStream is = null;
BufferedReader br = null;
try {
ftpFile=ftpFile.replace(" ","");
// 獲取ftp上的檔案
is = ftp.getFileStream(ftpFile);
//轉為位元組流
br = new BufferedReader(new InputStreamReader(is));
while((str=br.readLine())!=null){
list.add(str);
}
System.out.println(list+"-------------");
br.close();
}catch (FtpProtocolException e) {
e.printStackTrace();
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return list;
}
}
}
MyTask類:
package com.example.demo;
import org.apache.hadoop.hbase.client.Connection;
import sun.applet.Main;
import sun.net.ftp.FtpClient;
import java.io.*;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.locks.Lock;
import java.util.logging.FileHandler;
import java.util.logging.Logger;
/**
 * Worker task: repeatedly claims the oldest file in /usr/HDWork/in, moves it to
 * /usr/HDWork/work, loads it into HBase and then hands it to {@code Cmd.mvBack}.
 */
public class MyTask implements Runnable {
    // One logger with one file handler for all workers; adding a handler per task would
    // write every message once per worker.
    private static final Logger LOGGER = createLogger();

    private int taskNum;
    private Connection connection;
    private FtpClient ftpClient;
    private RemoteExecuteCommand rec;

    /**
     * @param num        task number
     * @param connection shared HBase connection
     * @param ftpClient  shared FTP client
     * @param rec        remote shell used for listing and moving files
     */
    public MyTask(int num, Connection connection, FtpClient ftpClient,RemoteExecuteCommand rec) {
        this.taskNum = num;
        this.connection=connection;
        this.ftpClient=ftpClient;
        this.rec=rec;
    }

    /** Builds the shared logger, appending to E:\Logger\&lt;yyyy-MM-dd&gt;.log. */
    private static Logger createLogger() {
        Logger logger = Logger.getLogger(MyTask.class.getName());
        String logPath = "E:\\Logger" + "\\" + new SimpleDateFormat("yyyy-MM-dd").format(new Date()) + ".log";
        try {
            logger.addHandler(new FileHandler(logPath, true));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return logger;
    }

    /**
     * Polls the in directory forever. Sleeps one second when it is empty; stops when the
     * thread is interrupted or any processing step throws.
     */
    @Override
    public void run(){
        String threadName = Thread.currentThread().getName();
        System.out.println(threadName + "執行緒的名字");
        SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        try {
            while (true) {
                long start = System.currentTimeMillis();
                String date1String = formatter.format(new Date(start));
                // Use the injected remote shell instead of a freshly built one with embedded credentials
                String listing = rec.execute("ls /usr/HDWork/in");
                if (listing.length() == 0) {
                    System.out.println("run中的=="+threadName+"沒有檔案");
                    Thread.sleep(1000);
                    continue;
                }
                // Claim the oldest file under the class-wide lock so no two workers take the same one
                String fileName = this.Mv();
                if (fileName == null || fileName.equals("")) {
                    System.out.println(threadName+"沒有檔案");
                    continue;
                }
                // Parse the claimed file and insert its rows into HBase
                ReadIn.add(fileName, connection, ftpClient);
                if (Cmd.mvBack(fileName, rec)) {
                    long end = System.currentTimeMillis();
                    String date2String = formatter.format(new Date(end));
                    String message = threadName + ":執行檔案:." + fileName + ";開始時間為:"+date1String+";結束時間為:"+date2String+";新增成功,共用時"+(end-start)+"ms。";
                    System.out.println(message);
                    LOGGER.info(message);
                } else {
                    System.out.println("過濾異常");
                    LOGGER.info("資料插入成功,但是移動到back中失敗");
                }
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status for whoever manages this thread
            Thread.currentThread().interrupt();
            System.out.println("run異常"+e.getMessage());
        } catch (Exception e) {
            System.out.println("run異常"+e.getMessage());
            e.printStackTrace();
        }
    }

    /**
     * Picks the oldest entry of /usr/HDWork/in and moves it to /usr/HDWork/work.
     * Guarded by the class lock so that concurrent workers never claim the same file.
     *
     * @return the claimed file name, or {@code null} when nothing was listed
     */
    public String Mv(){
        synchronized (MyTask.class) {
            // ls -lrt lists oldest first; line 1 is the "total" header, line 2 the oldest file
            String listed = rec.execute("cd /usr/HDWork/in ; ls -lrt|sed -n \"2, 1p\"|awk '{print $9}'");
            String fileName = listed == null ? "" : listed.replace("\n", " ").trim();
            if (fileName.isEmpty()) {
                return null;
            }
            // Quote the path: the name comes from the directory listing and must not be
            // interpreted by the remote shell
            rec.execute("mv " + shellQuote("/usr/HDWork/in/" + fileName) + " /usr/HDWork/work");
            System.out.println(fileName + "======================");
            return fileName;
        }
    }

    /** Wraps a value in single quotes for a POSIX shell, escaping embedded single quotes. */
    private static String shellQuote(String value) {
        return "'" + value.replace("'", "'\\''") + "'";
    }
}
LinkHbase類:連線資料庫
/**
 * Holds the HBase client configuration and opens connections from it.
 */
public class LinkHbase {
    private static Configuration conf = buildConfiguration();
    private static Admin admin;

    /** Creates the client configuration with the HBase root directory and ZooKeeper quorum. */
    private static Configuration buildConfiguration() {
        Configuration configuration = HBaseConfiguration.create();
        configuration.set("hbase.rootdir", "hdfs://node1:9000/hbase");
        // ZooKeeper quorum hosts
        configuration.set("hbase.zookeeper.quorum", "叢集1,叢集2,叢集3");
        return configuration;
    }

    /**
     * Opens a new HBase connection using the shared configuration.
     *
     * @return a newly created connection
     * @throws Exception if the connection cannot be created
     */
    public static Connection table() throws Exception{
        return ConnectionFactory.createConnection(conf);
    }
}
RemoteExecuteCommand類:遠端執行Linux
/**
* 遠端執行linux的shell script
* @author Ickes
* @since V0.1
*/
public class RemoteExecuteCommand {
//字元編碼預設是utf-8
private static String DEFAULTCHART="UTF-8";
private Connection conn;
private String ip;
private String userName;
private String userPwd;
/**
 * Creates a command runner for the given host and credentials.
 *
 * @param ip       address of the remote host
 * @param userName login user
 * @param userPwd  login password
 */
public RemoteExecuteCommand(String ip, String userName, String userPwd) {
    super();
    this.userPwd = userPwd;
    this.userName = userName;
    this.ip = ip;
}
/**
 * No-argument constructor; host and credentials are left unset.
 */
public RemoteExecuteCommand() {
    super();
}
/**
 * Opens a new connection to {@code ip} and authenticates with the stored user name and password.
 * When authentication is rejected the connection is closed again so it does not leak.
 *
 * @author Ickes
 * @since V0.1
 * @return true when authentication succeeded, false otherwise (I/O errors are printed)
 */
public Boolean login(){
    boolean flg = false;
    try {
        conn = new Connection(ip);
        conn.connect();
        flg = conn.authenticateWithPassword(userName, userPwd);
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        // A connected but unauthenticated (or failed) connection is useless; release it
        if (!flg && conn != null) {
            conn.close();
        }
    }
    return flg;
}
/**
 * Logs in, runs a shell command on the remote host and returns its output.
 * If standard output is blank, the content of standard error is returned instead.
 * The session and the connection are always closed before returning.
 *
 * @author Ickes
 * @param cmd the command to run
 * @return the command output; an empty string when login fails or an I/O error occurs
 * @since V0.1
 */
public String execute(String cmd){
    String result = "";
    Session session = null;
    try {
        if (login()) {
            session = conn.openSession();
            session.execCommand(cmd);
            result = processStdout(session.getStdout(), DEFAULTCHART);
            // Blank standard output: fall back to whatever the command wrote to standard error
            if (StringUtils.isBlank(result)) {
                result = processStdout(session.getStderr(), DEFAULTCHART);
            }
        }
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        // Close the session before its connection, and do so even when an exception was thrown
        if (session != null) {
            session.close();
        }
        if (conn != null) {
            conn.close();
        }
    }
    return result;
}
/**
 * Logs in, runs a shell command on the remote host and returns its standard output only.
 * The session and the connection are always closed before returning.
 *
 * @author Ickes
 * @param cmd the command to run
 * @return the standard output; an empty string (never null) when login fails or an I/O error occurs
 * @since V0.1
 */
public String executeSuccess(String cmd){
    String result = "";
    Session session = null;
    try {
        if (login()) {
            session = conn.openSession();
            session.execCommand(cmd);
            result = processStdout(session.getStdout(), DEFAULTCHART);
        }
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        // Close the session before its connection, and do so even when an exception was thrown
        if (session != null) {
            session.close();
        }
        if (conn != null) {
            conn.close();
        }
    }
    return result;
}
/**
* 解析指令碼執行返回的結果集
* @author Ickes
* @param in 輸入流物件
* @param charset 編碼
* @since V0.1
* @return
* 以純文字的格式返回
*/
private String processStdout(InputStream in, String charset){
InputStream stdout = new StreamGobbler(in);
StringBuffer buffer = new StringBuffer();;
try {
BufferedReader br = new BufferedReader(new InputStreamReader(stdout,charset));
String line=null;
while((line=br.readLine()) != null){
buffer.append(line+"\n");
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
} catch (IOException e)
e.printStackTrace();
}
return buffer.toString();
}
/**
 * Replaces the character encoding used by all instances when decoding command output.
 *
 * @param charset name of the encoding
 */
public static void setCharset(String charset) {
    RemoteExecuteCommand.DEFAULTCHART = charset;
}
/**
 * @return the connection most recently assigned by {@code login} or {@code setConn}; may be null
 */
public Connection getConn() {
    return this.conn;
}
/**
 * @param conn connection to store on this instance
 */
public void setConn(final Connection conn) {
    this.conn = conn;
}
/**
 * @return the remote host address
 */
public String getIp() {
    return this.ip;
}
/**
 * @param ip remote host address to connect to
 */
public void setIp(final String ip) {
    this.ip = ip;
}
/**
 * @return the login user name
 */
public String getUserName() {
    return this.userName;
}
/**
 * @param userName login user name
 */
public void setUserName(final String userName) {
    this.userName = userName;
}
/**
 * @return the login password
 */
public String getUserPwd() {
    return this.userPwd;
}
/**
 * @param userPwd login password
 */
public void setUserPwd(final String userPwd) {
    this.userPwd = userPwd;
}