Hadoop IO流基本操作
阿新 • • 發佈:2019-01-03
HDFS讀資料流程
1.跟namenode通訊查詢元資料,找到檔案塊所在的datanode伺服器
2.挑選一臺datanode(就近原則,然後隨機)伺服器,請求建立socket流
3.datanode開始傳送資料(從磁盤裡讀取資料放入流,以packet為單位做校驗)
4.客戶端以packet為單位接收,先在本地快取,然後寫入檔案
HDFS寫資料流程
1.跟namenode通訊請求上傳資料,namenode檢視目標檔案是否已經存在,父目錄是否存在
2.namenode返回是否可以上傳
3.client請求第一個block該上傳到哪些datanode伺服器上
4.namenode返回3個datanode伺服器ABC
5.client請求3臺datanode中的A上傳資料(本質是RPC呼叫,建立pipeline),A收到請求會繼續呼叫B,然後B呼叫C,將整個pipeline建立完成,逐級返回客戶端
6.client開始向A上傳第一個block(先從磁碟放到一個本地快取),以packet(64k)為單位,通過socket流傳送,A收到packet就會傳給B,B傳給C;A每穿一個packet會放入一個應答佇列,每個datanode節點接收packet成功後會向它的上一級傳送一個標誌成功的響應,檔案就以packet為單位源源不斷的傳送玩第一個block,client再向namenode申請開始上傳第二個block。
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Before;
import org.junit.Test;
public class HdfsIO {
FileSystem fs = null;
@Before
public void init() throws IOException, InterruptedException, URISyntaxException{
fs =FileSystem.get(new URI("hdfs://127.0.0.1:9000"), new Configuration(), "hadoop");
}
/**
* 用IO流下載資料
* @throws IllegalArgumentException
* @throws IOException
*/
public void testDownload() throws IllegalArgumentException, IOException{
FSDataInputStream in = fs.open(new Path("/user/hadoop/in/test.log"));
FileOutputStream out = new FileOutputStream("/home/hadoop/chinesecalendar3.log");
IOUtils.copyBytes(in, out, new Configuration());
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
/**
* 從指定偏移量讀取HDFS中的檔案資料,具有重大意義
* 在分散式資料處理時,可以將資料分片
* @throws IllegalArgumentException
* @throws IOException
*/
@Test
public void testSeek() throws IllegalArgumentException, IOException{
FSDataInputStream in = fs.open(new Path("/user/hadoop/in/t1.txt"));
in.seek(6);
FileOutputStream out = new FileOutputStream("/home/hadoop/t1.log");
IOUtils.copyBytes(in, out, 4096);
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
/**
*IO流上傳檔案
* @throws IllegalArgumentException
* @throws IOException
*/
@Test
public void testUpload() throws IllegalArgumentException, IOException{
FileInputStream in = new FileInputStream("/home/hadoop/t1.log");
FSDataOutputStream out = fs.create(new Path("/user/hadoop/in/t2.txt"));
IOUtils.copyBytes(in, out, 4096);
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
public static void main(String[] args) {
}
}