1. 程式人生 > >Hadoop IO流基本操作

Hadoop IO流基本操作

HDFS讀資料流程
1.跟namenode通訊查詢元資料,找到檔案塊所在的datanode伺服器
2.挑選一臺datanode(就近原則,然後隨機)伺服器,請求建立socket流
3.datanode開始傳送資料(從磁盤裡讀取資料放入流,以packet為單位做校驗)
4.客戶端以packet為單位接收,先在本地快取,然後寫入檔案

HDFS寫資料流程
1.跟namenode通訊請求上傳資料,namenode檢視目標檔案是否已經存在,父目錄是否存在
2.namenode返回是否可以上傳
3.client請求第一個block該上傳到哪些datanode伺服器上
4.namenode返回3個datanode伺服器ABC
5.client請求3臺datanode中的A上傳資料(本質是RPC呼叫,建立pipeline),A收到請求會繼續呼叫B,然後B呼叫C,將整個pipeline建立完成,逐級返回客戶端
6.client開始向A上傳第一個block(先從磁碟放到一個本地快取),以packet(64k)為單位,通過socket流傳送,A收到packet就會傳給B,B傳給C;A每穿一個packet會放入一個應答佇列,每個datanode節點接收packet成功後會向它的上一級傳送一個標誌成功的響應,檔案就以packet為單位源源不斷的傳送玩第一個block,client再向namenode申請開始上傳第二個block。

import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import
org.junit.Before; import org.junit.Test; public class HdfsIO { FileSystem fs = null; @Before public void init() throws IOException, InterruptedException, URISyntaxException{ fs =FileSystem.get(new URI("hdfs://127.0.0.1:9000"), new Configuration(), "hadoop"); } /** * 用IO流下載資料 * @throws
IllegalArgumentException * @throws IOException */
public void testDownload() throws IllegalArgumentException, IOException{ FSDataInputStream in = fs.open(new Path("/user/hadoop/in/test.log")); FileOutputStream out = new FileOutputStream("/home/hadoop/chinesecalendar3.log"); IOUtils.copyBytes(in, out, new Configuration()); IOUtils.closeStream(in); IOUtils.closeStream(out); } /** * 從指定偏移量讀取HDFS中的檔案資料,具有重大意義 * 在分散式資料處理時,可以將資料分片 * @throws IllegalArgumentException * @throws IOException */ @Test public void testSeek() throws IllegalArgumentException, IOException{ FSDataInputStream in = fs.open(new Path("/user/hadoop/in/t1.txt")); in.seek(6); FileOutputStream out = new FileOutputStream("/home/hadoop/t1.log"); IOUtils.copyBytes(in, out, 4096); IOUtils.closeStream(in); IOUtils.closeStream(out); } /** *IO流上傳檔案 * @throws IllegalArgumentException * @throws IOException */ @Test public void testUpload() throws IllegalArgumentException, IOException{ FileInputStream in = new FileInputStream("/home/hadoop/t1.log"); FSDataOutputStream out = fs.create(new Path("/user/hadoop/in/t2.txt")); IOUtils.copyBytes(in, out, 4096); IOUtils.closeStream(in); IOUtils.closeStream(out); } public static void main(String[] args) { } }