1. 程式人生 > >2018-07-23期 Hadoop RPC模擬NameNode

2018-07-23期 Hadoop RPC模擬NameNode

ole 直接 fig sof hdfs 調用 節點 啟動 構建

一、服務端代碼實現

1、定義接口

package cn.sjq.rpc.java;

import org.apache.hadoop.ipc.VersionedProtocol;

/**

* 定義接口IMyNameNode並繼承org.apache.hadoop.ipc.VersionedProtocol接口

* 本接口主要模擬定義自定義的Hadoop RPC通信,模擬Namenode節點在HDFS創建元數據(創建目錄)、瀏覽元數據(瀏覽目錄、文件)

* @author songjq

*

*/

public interface IMyNameNode extends VersionedProtocol {

/*

* 定義ID號 定義一個簽名,通過這個ID,就能區分在客戶端調用的時候,具體調用哪個實現 要求:名稱必須叫versionID

*/

public static long versionID = 1l;

/*

* 創建目錄

*/

public String createForder(String dir) throws Exception;

/*

* 瀏覽目錄,包括子目錄

*/

public String listForder(String dir) throws Exception;

}

2、實現接口

package cn.sjq.rpc.java;

import java.io.IOException;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.FileSystem;

import org.apache.hadoop.fs.LocatedFileStatus;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.fs.RemoteIterator;

import org.apache.hadoop.fs.permission.FsAction;

import org.apache.hadoop.fs.permission.FsPermission;

import org.apache.hadoop.ipc.ProtocolSignature;

/**

* IMyNameNode的實現類

* 主要對IMyNameNode定義的方法進行實現

* @author songjq

*

*/

public class MyNameNodeImpl implements IMyNameNode {

/*

* 通過IMyNameNode.versionID構造一個簽名

* (non-Javadoc)

* @see org.apache.hadoop.ipc.VersionedProtocol#getProtocolSignature(java.lang.String, long, int)

*/

@Override

public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2) throws IOException {

return new ProtocolSignature(IMyNameNode.versionID, null);

}

/* 直接返回IMyNameNode.versionID

* (non-Javadoc)

* @see org.apache.hadoop.ipc.VersionedProtocol#getProtocolVersion(java.lang.String, long)

*/

@Override

public long getProtocolVersion(String arg0, long arg1) throws IOException {

return IMyNameNode.versionID;

}

/*

* 在HDFS上創建目錄

* (non-Javadoc)

* @see cn.sjq.rpc.java.IMyNameNode#createForder(java.lang.String)

*/

@Override

public String createForder(String dir) throws Exception{

//獲得HDFS客戶端連接

Configuration conf = new Configuration();

conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");

FileSystem client = FileSystem.get(new URI("hdfs://hadoop-server01:9000"), conf, "root");

//創建目錄

boolean mkdirs = client.mkdirs(new Path(dir));

client.close();

//返回創建結果

return "Direcotory ->\t\t"+dir+"\t\t"+(mkdirs?"successfull created!":"created failed!");

}

/*

* 在HDFS瀏覽目錄

* (non-Javadoc)

* @see cn.sjq.rpc.java.IMyNameNode#listForder(java.lang.String)

*/

@Override

public String listForder(String dir) throws Exception {

// 獲得HDFS客戶端連接

Configuration conf = new Configuration();

conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");

FileSystem client = FileSystem.get(new URI("hdfs://hadoop-server01:9000"), conf, "root");

RemoteIterator<LocatedFileStatus> listFiles = client.listFiles(new Path(dir), true);

// 定義一個stringbuffer對象接收處理結果

StringBuffer filebuffer = new StringBuffer("Onwer \t\t UserPri \t\t BlockSize \t\t Path \t\t\n");

// 叠代listFiles

while (listFiles.hasNext()) {

LocatedFileStatus file = listFiles.next();

String fname = file.getPath().getName();

String path = file.getPath().toString();

String owner = file.getOwner();

long blockSize = file.getBlockSize();

FsPermission permission = file.getPermission();

FsAction userAction = permission.getUserAction();

FsAction groupAction = permission.getGroupAction();

FsAction otherAction = permission.getOtherAction();

userAction.toString();

filebuffer.append(owner).append(" \t\t").append(" ").append(userAction.toString()).append(" \t\t ")

.append(blockSize).append(" \t\t ").append(path).append(" \t\t").append("\n");

}

client.close();

return filebuffer.toString();

}

}

3、構建RPC通信服務

package cn.sjq.rpc.java;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.ipc.RPC;

import org.apache.hadoop.ipc.RPC.Builder;

import org.apache.hadoop.ipc.RPC.Server;

/**

* 構造RPC通信程序,並將業務類註冊到RPC通信服務中

* @author songjq

*

*/

public class MyRpcInstance {

public static void main(String[] args) throws Exception, IOException {

//創建hadoop RPC通信builder

Builder builder = new RPC.Builder(new Configuration());

//設置RPC通信地址

builder.setBindAddress("hadoop-server01");

//設置RPC通信端口

builder.setPort(9090);

//將程序IMyNameNode部署到RPC server上

builder.setProtocol(IMyNameNode.class);

//將IMyNameNode接口實現也部署到RPC server上

builder.setInstance(new MyNameNodeImpl());

//構建一個RPC server

Server server = builder.build();

//啟動RPC通信服務

server.start();

System.out.println("******* RPC Server has been started... *********");

}

}

二、客戶端代碼實現

1、定義接口

package cn.sjq.rpc.java;

import org.apache.hadoop.ipc.VersionedProtocol;

/**

* RPC客戶端定義接口,該接口需要和服務端定義的IMyNameNode完全一致

*/

public interface IMyNameNode extends VersionedProtocol {

/*

* 定義ID號 定義一個簽名,通過這個ID,就能區分在客戶端調用的時候,具體調用哪個實現 要求:名稱必須叫versionID

*/

public static long versionID = 1l;

/*

* 創建目錄

*/

public String createForder(String dir) throws Exception;

/*

* 瀏覽目錄,包括子目錄

*/

public String listForder(String dir) throws Exception;

}

2、客戶端代理調用

package cn.sjq.rpc.java;

import java.io.IOException;

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.ipc.RPC;

/**

* RPC客戶端調用,這個使用的RPC的動態代理實現對RPC服務端相關方法的訪問

*

* @author songjq

*

*/

public class MyRpcClientInstance {

/**

* 通過RPC調用Server端的功能,拿到是一個代理對象 protocol 服務端部署的接口 clientVersion 服務端部署的版本號ID addr

* 服務端RPC監聽通信地址及端口 conf Hdfs的一個configuration實例對象

*

* @param args

* @throws Exception

*/

public static void main(String[] args) throws Exception {

IMyNameNode proxy = RPC.getProxy(IMyNameNode.class,

1l,

new InetSocketAddress("hadoop-server01", 9090),

new Configuration());

String createForder = proxy.createForder("/rpc/20180720");

System.out.println("****************************創建目錄**********************************");

System.out.println(createForder);

String listForder = proxy.listForder("/user");

System.out.println("****************************瀏覽目錄**********************************");

System.out.println(listForder);

}

}


2018-07-23期 Hadoop RPC模擬NameNode