1. 程式人生 > >用java api讀取HDFS檔案

用java api讀取HDFS檔案

import java.io.IOException;
import java.io.InputStream;
import java.security.PrivilegedExceptionAction;
import java.text.SimpleDateFormat;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.springframework.stereotype.Controller; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import bean.TableStatistic; @Controller @RequestMapping("/dfview") public class DataFrameViewController extends BaseController { private ConcurrentMap<String, UserGroupInformation> cache = new
ConcurrentHashMap<String, UserGroupInformation>(); private ConcurrentMap<String, FileSystem> fileSystemCache = new ConcurrentHashMap<String, FileSystem>(); private Configuration hadoopConf = new Configuration(); private static final String HDFS_JSON_NAME = "jsonObj"; @RequestMapping(value = "/getDFviewOfColumn", method = { RequestMethod.GET }) @ResponseBody public TableStatistic getDFviewOfTable(String tableName) throws Exception { String user = "bi"; String dirpath = "/user/cbt/datax/temp_transfer/zzzdes"; Path homePath = new Path(dirpath); FileSystem fs = this.createFileSystem(user); FileStatus[] stats = fs.listStatus(homePath); StringBuffer txtContent = new StringBuffer(); for (int i = 0; i < stats.length; ++i) { if (stats[i].isFile()) { FileStatus file = stats[i]; if( HDFS_JSON_NAME.equalsIgnoreCase(file.getPath().getName())){ InputStream in = fs.open(file.getPath()); byte[] b = new byte[1]; while (in.read(b) != -1) { // 字串拼接 txtContent.append(new String(b)); } in.close(); break; } } } TableStatistic ts = JSON.parseObject(txtContent.toString(), TableStatistic.class); return ts; } public static void main(String[] args) throws Exception { DataFrameViewController aaa = new DataFrameViewController(); FileSystem fs = aaa.createFileSystem("bi"); Path homePath = new Path("/user/cbt/datax/temp_transfer/zzzdes"); System.out.println("***********************************"); FileStatus[] stats = fs.listStatus(homePath); for (int i = 0; i < stats.length; ++i) { if (stats[i].isFile()) { FileStatus file = stats[i]; StringBuffer txtContent = new StringBuffer(); if( "jsonObj".equalsIgnoreCase(file.getPath().getName())){ InputStream in = fs.open(file.getPath()); byte[] b = new byte[1]; while (in.read(b) != -1) { // 字串拼接 txtContent.append(new String(b)); } // IOUtils.copyBytes(fs.open(file.getPath()), System.out, 4096,false); in.close(); // fs.close(); } System.out.print(txtContent.toString()); System.out .println("************************************************"); JSONObject jb = JSON.parseObject(txtContent.toString()); System.out.println("********!!!!! : " + jb.get("colUnique")); TableStatistic ts = JSON.parseObject(txtContent.toString(), TableStatistic.class); System.out.println("********!!!!! : " + ts.getColUnique().toString()); } else if (stats[i].isDirectory()) { System.out.println(stats[i].getPath().toString()); } else if (stats[i].isSymlink()) { System.out.println("&&&&&&&&" + stats[i].getPath().toString()); } } FsStatus fsStatus = fs.getStatus(homePath); } public FileSystem createFileSystem(String user) throws Exception { final Configuration conf = loadHadoopConf(); conf.set("hadoop.job.ugi", user); // conf.set("HADOOP_USER_NAME", user); if (fileSystemCache.get(user) != null) { return fileSystemCache.get(user); } UserGroupInformation ugi = getProxyUser(user); FileSystem fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() { public FileSystem run() throws Exception { return FileSystem.get(conf); } }); fileSystemCache.put(user, fs); return fs; } public static final ThreadLocal<SimpleDateFormat> appDateFormat = new ThreadLocal<SimpleDateFormat>() { @Override public SimpleDateFormat initialValue() { SimpleDateFormat dateformat = new java.text.SimpleDateFormat( "yyyy-MM-dd HH:mm:ss"); return dateformat; } }; private static final String[] HADOOP_CONF_FILES = { "core-site.xml", "hdfs-site.xml" }; private Configuration loadHadoopConf() { if (hadoopConf != null) { return hadoopConf; } Configuration conf = new Configuration(); for (String fileName : HADOOP_CONF_FILES) { try { InputStream inputStream = DataFrameViewController.class .getClassLoader().getResourceAsStream(fileName); conf.addResource(inputStream); } catch (Exception ex) { } } return conf; } public void destroy() { for (UserGroupInformation ugi : cache.values()) { try { FileSystem.closeAllForUGI(ugi); } catch (IOException ioe) { // Logger.error("Exception occurred while closing filesystems for " // + ugi.getUserName(), ioe); } } cache.clear(); } private UserGroupInformation getProxyUser(String user) throws IOException { cache.putIfAbsent(user, UserGroupInformation.createRemoteUser(user)); return cache.get(user); } }

相關推薦

java api讀取HDFS檔案

import java.io.IOException; import java.io.InputStream; import java.security.PrivilegedExceptionAction; import java.text.SimpleDateFormat; import jav

java Api 讀取HDFS檔案內容

package dao; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import java.io.*; public class HDFSApi { /** * 讀取檔案內

JAVA API 讀取hdfs系統檔案

package com.company; /** * @Author zhaoxin * @Email [email protected] * @Description //TODO 注意許可權問題 * @Date 2018/10/11 **/ imp

Hadoop學習筆記一(通過Java API 操作HDFS,檔案上傳、下載)

package demo.hdfs; import java.util.Arrays; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; impor

java API 操作HDFS檔案系統

1.Maven 構建java工程 2.新增HDFS相關依賴 <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>

呼叫JAVA APIHDFS檔案進行檔案的讀寫、上傳下載、刪除等操作程式碼詳解

Hadoop檔案系統  基本的檔案系統命令操作, 通過hadoop fs -help可以獲取所有的命令的詳細幫助檔案。 Java抽象類org.apache.hadoop.fs.FileSystem定義了hadoop的一個檔案系統介面。該類是一個抽象類,通過以下兩種靜態工廠方

呼叫JAVA APIHDFS 進行檔案讀取、寫入、上傳、下載、刪除等操作

Hadoop檔案系統 基本的檔案系統命令操作, 通過hadoop fs -help可以獲取所有的命令的詳細幫助檔案。 Java抽象類org.apache.hadoop.fs.FileSystem定義了hadoop的一個檔案系統介面。該類是一個抽象類,通過以下兩種靜態工廠方法

java api 讀取hadoop中hdfs檔案系統內的檔案

hadoop與hdfs需要自己安裝,確保檔案存在於hdfs 只有一個main方法 Test.java import org.apache.hadoop.conf.Configuration;

使用HDFS客戶端java api讀取hadoop集群上的信息

tor ioe get tro names uri context add 集群配置 本文介紹使用hdfs java api的配置方法。 1、先解決依賴,pom <dependency> <groupId>org.apach

springboot上傳下載檔案(3)--java api 操作HDFS叢集+叢集配置

 只有光頭才能變強! 前一篇文章講了nginx+ftp搭建獨立的檔案伺服器 但這個伺服器宕機了怎麼辦? 我們用hdfs分散式檔案系統來解決這個問題(同時也為hadoop系列開個頭) 目錄 1、Ubuntu14.04下配置Hadoop(2.8.5)叢集環境詳解(完全分

JAVADOM方式讀取xml檔案

Status.xml<?xml version="1.0" encoding="UTF-8"?><StatuList>    <Statu id="1">        <id>1</id>        <n

cmd命令快速建立一個1GB的檔案,然後java快速讀取

一.建立一個1GB的檔案使用fsutil命令來建立1.以管理員身份執行命令提示符2.開始建立 輸入命令:fsutil file createnew e:\1GB.txt 1073741824(1GB)回車建立成功,去相應的碟符地下找到檢視屬性就是1GB二.使用java來快速讀

Java API讀取CDH-Hadoop Parquet檔案

由於工作需要,基於目前公司叢集存在較多的伺服器且存在大量的記憶體,因此考慮直接將資料Load進記憶體進行資料處理,測試是否能夠加快處理速度;鑑於以上目的,版主嘗試使用Parquet的Java API讀入Parquet檔案。 目前關於使用Java API訪問HD

Structure Streaming和spark streaming原生API訪問HDFS檔案資料對比

此文已由作者嶽猛授權網易雲社群釋出。 歡迎訪問網易雲社群,瞭解更多網易技術產品運營經驗。 Structure Stream訪問方式 code examples import org.apache.spark.sql.streaming._ val df = spark.

IDEA編寫wordcount,讀取hdfs檔案,執行在Spark叢集例子

前期:已安裝好hadoop叢集和spark叢集,hadoop2.6.5,spark2.3.1,jdk1.8. scala2.1.0 第一步:在idea編寫scala程式,並且要打包(pom檔案的build標籤中配置好maven打包程式碼,可以定義主類也可以在提交的時候再定義){補充:可以在s

java讀取Property檔案屬性工具類

java中讀取Property配置檔案屬性工具類: import java.util.Locale; import java.util.MissingResourceException; import java.util.ResourceBundle; /** * 讀取Property配置檔

java selenium 讀取配置檔案,報錯中文亂碼

參考引自:https://blog.csdn.net/qq_27093465/article/details/70765870 根據自己問題解決: package com.property; import java.io.BufferedInputStream; import java.i

Java API 讀取HBase表資料

Java API 讀取HBase表資料 1. 在使用java api 去獲取資料的時候,先用 hbase shell 展示一下 hbase 中的表。 hbase(main):005:0> scan 'tsdb-uid' ROW

HDFS(三)—— Java 建立一個 HDFS 目錄,HDFS 的許可權的問題

一、匯入 HDFS 所需 jar 包 把解壓後的 hadoop 資料夾下的 common 目錄中的 jar,和裡面的 lib 包中所有的 jar,以及 hdfs 目錄下的 jar,和裡面的 lib 包中所有的 jar 都新增到專案的環境變數中。 二、編寫測試程式碼 im

Java 讀寫 hdfs檔案或者目錄

1.讀取單個檔案     [java] view plain copy Date date = DateUtil.getSpecifiedDayBefore();   String&