HDFS的Java操作方式
在eclipse中呼叫JavaAPI實現HDFS中的相關操作
1、建立一個java工程
2、右鍵工程,在屬性裡新增上hadoop解壓後的相關jar包(hadoop目錄下的jar包和lib目錄下的jar包)
3、呼叫相關程式碼,實現相關hdfs操作
複製程式碼
1 package hdfs;
2
3 import java.io.InputStream;
4 import java.net.URL;
5
6 import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
7 import org.apache.hadoop.io.IOUtils;
8
9 public class App1 {
10 /**
11 * 異常:unknown host: chaoren 本機沒有解析主機名chaoren
12 * 在C:\Windows\System32\drivers\etc\hosts檔案中新增192.168.80.100
13 * chaoren(win10中要新增寫入許可權才能寫入)
14 /
15 static final String PATH = “hdfs://chaoren:9000/hello”;
16
17 public static void main(String[] args) throws Exception {
18 // 讓URL能夠解析hdfs協議
19 URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
20 URL url = new URL(PATH);
21 InputStream in = url.openStream();
22 /
23 * @param in
24 * 輸入流
25 * @param out
26 * 輸出流
27 * @param buffSize
28 * 緩衝大小
29 * @param close
30 * 在傳輸結束後是否關閉流
31 */
32 IOUtils.copyBytes(in, System.out, 1024, true);// 讀取檔案hello中的內容
33 }
34
35 }
複製程式碼
1 package hdfs;
2
3 import java.io.FileInputStream;
4 import java.io.FileNotFoundException;
5 import java.io.IOException;
6 import java.net.URI;
7 import java.net.URISyntaxException;
8
9 import org.apache.hadoop.conf.Configuration;
10 import org.apache.hadoop.fs.FSDataInputStream;
11 import org.apache.hadoop.fs.FSDataOutputStream;
12 import org.apache.hadoop.fs.FileStatus;
13 import org.apache.hadoop.fs.FileSystem;
14 import org.apache.hadoop.fs.Path;
15 import org.apache.hadoop.io.IOUtils;
16
17 public class App2 {
18 static final String PATH = “hdfs://chaoren:9000/”;
19 static final String DIR = “/d1”;
20 static final String FILE = “/d1/hello”;
21
22 public static void main(String[] args) throws Exception {
23 FileSystem fileSystem = getFileSystem();
24
25 // 建立資料夾 hadoop fs -mkdir /d1
26 mkDir(fileSystem);
27
28 // 上傳檔案 hadoop fs -put src des
29 putData(fileSystem);
30
31 // 下載檔案 hadoop fs -get src des
32 getData(fileSystem);
33
34 // 瀏覽資料夾 hadoop fs -lsr path
35 list(fileSystem);
36
37 // 刪除資料夾 hadoop fs -rmr /d1
38 remove(fileSystem);
39 }
40
41 private static void remove(FileSystem fileSystem) throws IOException {
42 fileSystem.delete(new Path(DIR), true);
43 }
44
45 private static void list(FileSystem fileSystem) throws IOException {
46 FileStatus[] listStatus = fileSystem.listStatus(new Path("/"));
47 for (FileStatus fileStatus : listStatus) {
48 String isDir = fileStatus.isDir() ? “資料夾” : “檔案”;
49 String permission = fileStatus.getPermission().toString();
50 int replication = fileStatus.getReplication();
51 long len = fileStatus.getLen();
52 String path = fileStatus.getPath().toString();
53 System.out.println(isDir + “\t” + permission + “\t” + replication
54 + “\t” + len + “\t” + path);
55 }
56 }
57
58 private static void getData(FileSystem fileSystem) throws IOException {
59 FSDataInputStream inputStream = fileSystem.open(new Path(FILE));
60 IOUtils.copyBytes(inputStream, System.out, 1024, true);
61 }
62
63 private static void putData(FileSystem fileSystem) throws IOException,
64 FileNotFoundException {
65 FSDataOutputStream out = fileSystem.create(new Path(FILE));
66 FileInputStream in = new FileInputStream(“C:/Users/ahu_lichang/cp.txt”);// 斜槓方向跟Windows下是相反的
67 IOUtils.copyBytes(in, out, 1024, true);
68 }
69
70 private static void mkDir(FileSystem fileSystem) throws IOException {
71 fileSystem.mkdirs(new Path(DIR));
72 }
73
74 private static FileSystem getFileSystem() throws IOException,
75 URISyntaxException {
76 FileSystem fileSystem = FileSystem.get(new URI(PATH),
77 new Configuration());
78 return fileSystem;
79 }
80
81 }
複製程式碼
RPC
1.1 RPC (remote procedure call)遠端過程呼叫.
遠端過程指的是不是同一個程序。
1.2 RPC至少有兩個過程。呼叫方(client),被呼叫方(server)。
1.3 client主動發起請求,呼叫指定ip和port的server中的方法,把呼叫結果返回給client。
1.4 RPC是hadoop構建的基礎。
示例:
複製程式碼
1 package rpc;
2
3 import org.apache.hadoop.ipc.VersionedProtocol;
4
5 public interface MyBizable extends VersionedProtocol{
6 long VERSION = 2345L;
7 public abstract String hello(String name);
8 }
複製程式碼
1 package rpc;
2
3 import java.io.IOException;
4
5 public class MyBiz implements MyBizable{
6
7 public long getProtocolVersion(String arg0, long arg1) throws IOException {
8 return VERSION;
9 }
10
11 public String hello(String name) {
12 System.out.println(“方法被呼叫了(檢測方法是不是在伺服器上被呼叫的?)”);
13 return "hello "+name;
14 }
15
16 }
複製程式碼
1 package rpc;
2
3 import org.apache.hadoop.conf.Configuration;
4 import org.apache.hadoop.ipc.RPC;
5 import org.apache.hadoop.ipc.RPC.Server;
6
7 public class MyServer {
8 static final String ADDRESS = “localhost”;
9 static final int PORT = 12345;
10 public static void main(String[] args) throws Exception {
11 /**
12 * 構造一個RPC的服務端
13 * @param instance 這個例項中的方法會被呼叫
14 * @param bindAddress 繫結的地址是用於監聽連線的
15 * @param port 繫結的埠是用於監聽連線的
16 * @pparam conf
17 */
18 Server server = RPC.getServer(new MyBiz(), ADDRESS, PORT, new Configuration());
19 server.start();
20 }
21
22 }
複製程式碼
1 package rpc;
2
3 import java.net.InetSocketAddress;
4
5 import org.apache.hadoop.conf.Configuration;
6 import org.apache.hadoop.ipc.RPC;
7
8 public class MyClient {
9 public static void main(String[] args) throws Exception {
10 /**
11 * 構造一個客戶端代理物件,該代理物件實現了命名的協議。代理物件會與指定地址的伺服器通話
12 */
13 MyBizable proxy = (MyBizable) RPC.waitForProxy(MyBizable.class,
14 MyBizable.VERSION, new InetSocketAddress(MyServer.ADDRESS,
15 MyServer.PORT), new Configuration());
16 String result = proxy.hello(“hadoop!!!”);
17 System.out.println(“客戶端RPC後的結果:” + result);
18 // 關閉網路連線
19 RPC.stopProxy(proxy);
20 }
21 }
通過例子獲得的認識
2.1 RPC是一個遠端過程呼叫。
2.2 客戶端呼叫服務端的方法,意味著呼叫服務端的物件中的方法。
2.3 如果服務端的物件允許客戶端呼叫,那麼這個物件必須實現介面。
2.4 如果客戶端能夠呼叫到服務端物件的方法,那麼這些方法一定位於物件的介面中。
Bad connection to FS. command aborted. exception: Call to chaoren/192.168.80.100:9000 failed on connection exception: java.net.ConnectException: Connection refused
錯誤解決方法:
1、先將/usr/local/hadoop/tmp/dfs資料夾整個刪除掉(cd /usr/local/hadoop/tmp rm -rf dfs)
2、再將namenode進行格式化(hadoop namenode -format)
3、格式化成功後,最後重啟hadoop(cd /usr/local/hadoop/bin start-all.sh)
4、重啟完Hadoop之後,再重新連線hdfs即可(hadoop fs -ls hdfs://chaoren:9000/)。