2018-07-23期 Hadoop RPC模擬NameNode
1、定義接口
package cn.sjq.rpc.java;
import org.apache.hadoop.ipc.VersionedProtocol;
/**
* 定義接口IMyNameNode並繼承org.apache.hadoop.ipc.VersionedProtocol接口
* 本接口主要模擬定義自定義的Hadoop RPC通信,模擬Namenode節點在HDFS創建元數據(創建目錄)、瀏覽元數據(瀏覽目錄、文件)
* @author songjq
*
*/
public interface IMyNameNode extends VersionedProtocol {
/*
* 定義ID號 定義一個簽名,通過這個ID,就能區分在客戶端調用的時候,具體調用哪個實現 要求:名稱必須叫versionID
*/
public static long versionID = 1l;
/*
* 創建目錄
*/
public String createForder(String dir) throws Exception;
/*
* 瀏覽目錄,包括子目錄
*/
public String listForder(String dir) throws Exception;
}
2、實現接口
package cn.sjq.rpc.java;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ipc.ProtocolSignature;
/**
* IMyNameNode的實現類
* 主要對IMyNameNode定義的方法進行實現
* @author songjq
*
*/
public class MyNameNodeImpl implements IMyNameNode {
/*
* 通過IMyNameNode.versionID構造一個簽名
* (non-Javadoc)
* @see org.apache.hadoop.ipc.VersionedProtocol#getProtocolSignature(java.lang.String, long, int)
*/
@Override
public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2) throws IOException {
return new ProtocolSignature(IMyNameNode.versionID, null);
}
/* 直接返回IMyNameNode.versionID
* (non-Javadoc)
* @see org.apache.hadoop.ipc.VersionedProtocol#getProtocolVersion(java.lang.String, long)
*/
@Override
public long getProtocolVersion(String arg0, long arg1) throws IOException {
return IMyNameNode.versionID;
}
/*
* 在HDFS上創建目錄
* (non-Javadoc)
* @see cn.sjq.rpc.java.IMyNameNode#createForder(java.lang.String)
*/
@Override
public String createForder(String dir) throws Exception{
//獲得HDFS客戶端連接
Configuration conf = new Configuration();
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
FileSystem client = FileSystem.get(new URI("hdfs://hadoop-server01:9000"), conf, "root");
//創建目錄
boolean mkdirs = client.mkdirs(new Path(dir));
client.close();
//返回創建結果
return "Direcotory ->\t\t"+dir+"\t\t"+(mkdirs?"successfull created!":"created failed!");
}
/*
* 在HDFS瀏覽目錄
* (non-Javadoc)
* @see cn.sjq.rpc.java.IMyNameNode#listForder(java.lang.String)
*/
@Override
public String listForder(String dir) throws Exception {
// 獲得HDFS客戶端連接
Configuration conf = new Configuration();
conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
FileSystem client = FileSystem.get(new URI("hdfs://hadoop-server01:9000"), conf, "root");
RemoteIterator<LocatedFileStatus> listFiles = client.listFiles(new Path(dir), true);
// 定義一個stringbuffer對象接收處理結果
StringBuffer filebuffer = new StringBuffer("Onwer \t\t UserPri \t\t BlockSize \t\t Path \t\t\n");
// 叠代listFiles
while (listFiles.hasNext()) {
LocatedFileStatus file = listFiles.next();
String fname = file.getPath().getName();
String path = file.getPath().toString();
String owner = file.getOwner();
long blockSize = file.getBlockSize();
FsPermission permission = file.getPermission();
FsAction userAction = permission.getUserAction();
FsAction groupAction = permission.getGroupAction();
FsAction otherAction = permission.getOtherAction();
userAction.toString();
filebuffer.append(owner).append(" \t\t").append(" ").append(userAction.toString()).append(" \t\t ")
.append(blockSize).append(" \t\t ").append(path).append(" \t\t").append("\n");
}
client.close();
return filebuffer.toString();
}
}
3、構建RPC通信服務
package cn.sjq.rpc.java;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Builder;
import org.apache.hadoop.ipc.RPC.Server;
/**
* 構造RPC通信程序,並將業務類註冊到RPC通信服務中
* @author songjq
*
*/
public class MyRpcInstance {
public static void main(String[] args) throws Exception, IOException {
//創建hadoop RPC通信builder
Builder builder = new RPC.Builder(new Configuration());
//設置RPC通信地址
builder.setBindAddress("hadoop-server01");
//設置RPC通信端口
builder.setPort(9090);
//將程序IMyNameNode部署到RPC server上
builder.setProtocol(IMyNameNode.class);
//將IMyNameNode接口實現也部署到RPC server上
builder.setInstance(new MyNameNodeImpl());
//構建一個RPC server
Server server = builder.build();
//啟動RPC通信服務
server.start();
System.out.println("******* RPC Server has been started... *********");
}
}
二、客戶端代碼實現
1、定義接口
package cn.sjq.rpc.java;
import org.apache.hadoop.ipc.VersionedProtocol;
/**
* RPC客戶端定義接口,該接口需要和服務端定義的IMyNameNode完全一致
*/
public interface IMyNameNode extends VersionedProtocol {
/*
* 定義ID號 定義一個簽名,通過這個ID,就能區分在客戶端調用的時候,具體調用哪個實現 要求:名稱必須叫versionID
*/
public static long versionID = 1l;
/*
* 創建目錄
*/
public String createForder(String dir) throws Exception;
/*
* 瀏覽目錄,包括子目錄
*/
public String listForder(String dir) throws Exception;
}
2、客戶端代理調用
package cn.sjq.rpc.java;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
/**
* RPC客戶端調用,這個使用的RPC的動態代理實現對RPC服務端相關方法的訪問
*
* @author songjq
*
*/
public class MyRpcClientInstance {
/**
* 通過RPC調用Server端的功能,拿到是一個代理對象 protocol 服務端部署的接口 clientVersion 服務端部署的版本號ID addr
* 服務端RPC監聽通信地址及端口 conf Hdfs的一個configuration實例對象
*
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
IMyNameNode proxy = RPC.getProxy(IMyNameNode.class,
1l,
new InetSocketAddress("hadoop-server01", 9090),
new Configuration());
String createForder = proxy.createForder("/rpc/20180720");
System.out.println("****************************創建目錄**********************************");
System.out.println(createForder);
String listForder = proxy.listForder("/user");
System.out.println("****************************瀏覽目錄**********************************");
System.out.println(listForder);
}
}
2018-07-23期 Hadoop RPC模擬NameNode