Loading Data into Hive: A Work Log of Layered Data Warehouse Construction
阿新 • Published: 2021-01-14
Tags: Hive data loading
A record of a simple data warehouse construction workflow from a recent piece of work.
The underlying requirement is simple: build the bottom-layer data supply for a user-profile system. For this task I only had to take the backend data out of the relational database and load it into ES. Concretely, the work breaks down into four steps:
1. Sort out the business data and compile it into Excel, producing a data dictionary and a set of standards. This essentially characterizes the data: cataloguing metadata and data lineage, knowing which detail tables feed the master table, how the business data is classified, and so on.
2. Import the data from the relational database into the Hive ODS layer.
3. Load the Hive ODS layer into the CDW layer.
4. CDW layer to ADS; sync the CDW data to ES; build the Hive external tables and their mapping to ES.
Completing these four steps completes the job. The SQL logic in between is just a structural transformation, turning the labeled data (a wide table) into a tall table. Overall it is fairly straightforward.
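The wide-to-tall conversion is essentially an unpivot: each labeled column of the wide table becomes one (key, value) row in the tall table. As a minimal HiveQL sketch (the table and column names here are made up for illustration, not taken from the actual project):

-- Hypothetical example: unpivot a wide label table into a tall key/value table
INSERT OVERWRITE TABLE cdw.label_tall
SELECT w.bm, t.label_name, t.label_value
FROM ods.label_wide w
LATERAL VIEW stack(3,
    'age',    CAST(w.age AS STRING),
    'gender', w.gender,
    'city',   w.city) t AS label_name, label_value;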
Sqoop import script:
#!/bin/sh
su - hive <<EOF
/opt/sqoop-1.4.6.bin__hadoop-2.0.4-alpha/bin/sqoop import \
--connect jdbc:mysql://<ip>:3306/<db> \
--username <user> --password <password> \
--table <table> \
--null-string '' --null-non-string '' \
--lines-terminated-by '\n' \
--fields-terminated-by '\t' \
--delete-target-dir \
--num-mappers 1 \
--hive-import \
--hive-database <db> \
--hive-table <table>;
EOF
Create a Hive external table mapped to ES:
DROP TABLE IF EXISTS <db>.<table>;
CREATE EXTERNAL TABLE <db>.<table> (
id string COMMENT 'id',
bm string COMMENT 'bm',
hbryndgx string COMMENT 'business field',
effect_on_date string COMMENT 'effective date',
create_date string COMMENT 'creation time',
creator string COMMENT 'creator',
sjly string COMMENT 'data source'
) COMMENT 'description'
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
'es.resource' = 'eeeeeee_hddl',
'es.nodes' = '<es_ip>',
'es.port' = '9200',
'es.mapping.id' = 'id',
'es.index.auto.create' = 'true',
'es.write.operation' = 'upsert',
'es.net.http.auth.user' = '<user>',
'es.net.http.auth.pass' = '<password>',
'es.mapping.date.rich' = 'false',
'es.index.read.missing.as.empty' = 'yes');
Part of the main SQL:
-- 1. Load data into ODS
LOAD DATA LOCAL INPATH '/var/lib/hive/<module>/<file>.txt' INTO TABLE <db>.<table>;
INSERT OVERWRITE TABLE <db>.<table>
SELECT *
FROM <db>.<table> a
LATERAL VIEW explode(split(bh, '_')) b AS bh;
More SQL:
INSERT INTO TABLE <db>.<table>
SELECT * FROM <db>.<table>;
TRUNCATE TABLE <db>.<table>;
-- 2. Logic processing: dictionary-value conversion (a sketch of the dictionary lookup follows the statement below), merge the data, and insert it into a temporary table
DROP TABLE IF EXISTS <tmp_table>;
CREATE TEMPORARY TABLE <tmp_table> AS
SELECT ODS.bm AS BM,                                            -- concatenated primary key
       CAST(NULL AS STRING) AS GRSBH,
       FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyy-MM-dd') AS EFFECT_ON_DATE
       -- ... other columns omitted
FROM <ods_table> ODS
LEFT JOIN (SELECT *
           FROM <db>.<table>
           WHERE EFFECT_FLAG = '1') AS CDW
       ON ODS.BM = CDW.BM
LEFT JOIN <db>.<table> DEATH
       ON ODS.bh = DEATH.BH
WHERE ODS.DELETE_FLAG <> '1';
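The dictionary-value conversion mentioned in step 2 is omitted from the partial SQL above; in practice it is just a left join against a code/dictionary table. A hedged sketch (the dictionary table and its columns are hypothetical):

-- Hypothetical example: translate a coded field (e.g. xb) via a dictionary table
SELECT ODS.bm,
       COALESCE(DICT.dict_label, ODS.xb) AS xb   -- fall back to the raw code when no dictionary entry matches
FROM <ods_table> ODS
LEFT JOIN <db>.<dict_table> DICT
       ON DICT.dict_type  = 'gender'
      AND DICT.dict_value = ODS.xb;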
-- 3. Update CDW: expire records that were deleted or updated upstream
UPDATE <db>.<table>
SET DELETE_FLAG = 'Y',
EFFECT_OFF_DATE = FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyy-MM-dd'),
EFFECT_FLAG = '0'
WHERE BM IN (SELECT BM
FROM <db>.<table> ODS
WHERE DELETE_FLAG = 'Y')
AND EFFECT_FLAG = '1';
-- 4. Update CDW: expire records that were re-uploaded
UPDATE <table>
SET EFFECT_OFF_DATE = FROM_UNIXTIME(UNIX_TIMESTAMP(), 'yyyy-MM-dd'),
EFFECT_FLAG = '0'
WHERE BM IN (SELECT BM FROM <table>)
AND EFFECT_FLAG = '1';
-- 6. Insert the data into the CDW table
INSERT INTO TABLE <table>
SELECT *
FROM <tmp_table>;
-- 7. Upload the new and modified data to the external table
-- insert the data from the temporary table into the Hive/ES external table
-- release the temporary table
TRUNCATE TABLE <tmp_table>;
ES queries (Kibana Dev Tools):
GET /_cat/indices?v
GET /ssqk_hddl/_mapping
GET /ssqk_hddl/_search
{
"query": {
"wildcard": {
"xb": "女"
}
}
}
Java code (Maven dependencies and ES utility methods):
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>7.3.2</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.3.2</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
public RestHighLevelClient connectES() {
// initialize the ES client
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(""));
// new UsernamePasswordCredentials("elastic", "123456")); // ES credentials, test environment
// new UsernamePasswordCredentials("root", "jkyjy2019")); // ES credentials, production environment
esClient = new RestHighLevelClient(
RestClient.builder(
// new HttpHost(host, 9200)
new HttpHost(host, 9210)
).setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
httpClientBuilder.disableAuthCaching();
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
})
);
return esClient;
}
/**
* Check whether an index exists
*
* @param index
* @return
*/
public boolean indexExists(String index) {
GetIndexRequest request = new GetIndexRequest(index);
try {
return esClient.indices().exists(request, RequestOptions.DEFAULT);
} catch (Exception e) {
e.printStackTrace();
}
return false;
}
/**
* Delete an index
*
* @param index
*/
public void deleteIndex(String index) {
DeleteIndexRequest request = new DeleteIndexRequest(index);
try {
AcknowledgedResponse deleteIndexResponse = esClient.indices().delete(request, RequestOptions.DEFAULT);
// true if the cluster acknowledged the deletion
boolean acknowledged = deleteIndexResponse.isAcknowledged();
} catch (IOException e) {
e.printStackTrace();
}
}
public void createMapping(String index, List<Map<String, String>> fieldList) {
System.out.println("index name: " + index);
try {
CreateIndexRequest createIndexRequest = new CreateIndexRequest(index);
esClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
// builder.startObject(index);
builder.startObject("properties");
for (Map<String, String> entity :
fieldList) {
String dataType = entity.get("data_type");
String dev_mapping = entity.get("dev_mapping");
String sksy = entity.get("sksy");
builder.startObject(dev_mapping);
if ("字元型別".equals(dataType)) {
builder.field("type", "keyword").endObject();
} else if ("整型".equals(dataType)) {
builder.field("type", "integer").endObject();
} else if ("浮點型".equals(dataType)) {
builder.field("type", "double").endObject();
} else if ("日期型".equals(dataType)) {
builder.field("type", "date").field("format", sksy).endObject();
} else {
builder.field("type", "keyword").endObject();
}
}
builder.startObject("id");
builder.field("type","keyword").endObject();
builder.startObject("bm");
builder.field("type","keyword").endObject();
builder.startObject("effect_on_date");
builder.field("type","keyword").endObject();
builder.startObject("create_date");
builder.field("type","keyword").endObject();
builder.startObject("creator");
builder.field("type","keyword").endObject();
builder.startObject("sjly");
builder.field("type","keyword").endObject();
builder.endObject();
builder.endObject();
PutMappingRequest putMappingRequest = new PutMappingRequest(index);
putMappingRequest.source(builder);
esClient.indices().putMapping(putMappingRequest, RequestOptions.DEFAULT);
} catch (Exception e) {
e.printStackTrace();
System.out.println("failed to create index: " + index);
}
return;
}
public Boolean existIndex(String index_name) {
// check whether a document with id "1" already exists in the index (used here as a proxy for index existence)
GetRequest getRequest = new GetRequest();
getRequest.id("1");
getRequest.index(index_name);
try {
boolean exists = esClient.exists(getRequest, RequestOptions.DEFAULT);
if (exists) {
// the index already exists
logger.error("index already exists: " + index_name);
return true;
}
} catch (IOException e) {
e.printStackTrace();
logger.error("failed to check index existence: " + index_name);
return true;
}
return false;
}
public void initMapping2(MysqlUtils mysqlConn) {
String table = mysqlConn.getTable();
// fetch index name, field, and type metadata
List<MappingEntity> list = mysqlConn.getModelData("SELECT INDEX_NAME,DATA_TYPE,DEV_MAPPING,SKSY FROM " + table +
" WHERE 表名 IS NOT NULL ORDER BY ID ");
String index_name = null;
List<Map<String, String>> fieldList = new ArrayList<Map<String, String>>();
for (MappingEntity mappingEntity : list) {
// initialize the index name
if (index_name == null) {
index_name = mappingEntity.getIndex_name();
}
if (index_name.equals(mappingEntity.getIndex_name())) {
Map<String, String> fields = new HashMap<String, String>();
fields.put("data_type", mappingEntity.getDataType());
fields.put("dev_mapping", mappingEntity.getDevMapping());
fields.put("sksy", mappingEntity.getsKSY());
fieldList.add(fields);
} else {
// does this index already exist?
if (!existIndex(index_name)) {
// create the index and its mapping
createMapping(index_name, fieldList);
}
index_name = mappingEntity.getIndex_name();
fieldList.clear();
Map<String, String> fields = new HashMap<String, String>();
fields.put("data_type", mappingEntity.getDataType());
fields.put("dev_mapping", mappingEntity.getDevMapping());
fields.put("sksy", mappingEntity.getsKSY());
fieldList.add(fields);
}
}
// create the mapping for the last group (the loop above only flushes on an index-name change)
if (index_name != null && !fieldList.isEmpty() && !existIndex(index_name)) {
createMapping(index_name, fieldList);
}
}
}
MysqlUtils mysqlConn = new MysqlUtils("HDDL");
mysqlConn.mysqlConnect();
List<MappingEntity> resultList = mysqlConn.getModelData(SQL);
List<Map<String, String>> fieldsList = new ArrayList<>();
for (MappingEntity mappingEntity :
resultList) {
Map<String, String> fields = new HashMap<String, String>();
fields.put("data_type", mappingEntity.getDataType());
fields.put("dev_mapping", mappingEntity.getDevMapping());
fields.put("sksy", mappingEntity.getsKSY());
fieldsList.add(fields);
}
ESUtils esUtils = new ESUtils();
esUtils.connectES();
esUtils.createMapping(table, fieldsList);
mysqlConn.mysqlClose();
esUtils.closeES();
Main entry point:
// Run with: java -jar <jar> [hive|sqoop] to generate the partition tables or the sqoop scripts for the given sub-command
// The temporary solution here is a MySQL-to-MySQL copy
if (args.length < 1) {
logger.info("invalid input arguments");
System.exit(-1);
}
try {
long startTime = System.currentTimeMillis(); // record the start time
// dispatch on the sub-command and invoke the corresponding class's main method via reflection
if (Constants.MainParam.MYSQL.equals(args[0].toLowerCase())) {
logger.info("enter sub command: mysql");
Class<RunMySQLApp2> clazz = RunMySQLApp2.class;
Method method = clazz.getDeclaredMethod("main", String[].class);
method.setAccessible(true);
method.invoke(clazz.newInstance(), new Object[]{args});
}
long endTime = System.currentTimeMillis(); // record the end time
System.out.println((endTime - startTime) / 1000 + " s");
} catch (Exception e) {
logger.error(e.getMessage());
e.printStackTrace();
}
DataBaseUtil utility class:
private static void initDatasource(Properties props) {
try {
dataSource = (DruidDataSource) DruidDataSourceFactory.createDataSource(props);
} catch (Exception e) {
e.printStackTrace();
}
}
private static void initHiveDatasource(Properties props) {
try {
hiveDatasource = (DruidDataSource) DruidDataSourceFactory.createDataSource(props);
} catch (Exception e) {
e.printStackTrace();
}
}
public static DruidPooledConnection getConn(String url, String username, String password) {
DruidPooledConnection connection = null;
Properties props = new Properties();
props.setProperty("url", url);
props.setProperty("username", username);
props.setProperty("password", password);
initDatasource(props);
try {
connection = dataSource.getConnection();
} catch (SQLException e) {
e.printStackTrace();
}
return connection;
}
public static DruidPooledConnection getHiveConn(String url, String username, String password) {
DruidPooledConnection connection = null;
Properties props = new Properties();
props.setProperty("url", url);
props.setProperty("username", username);
props.setProperty("password", password);
initHiveDatasource(props);
try {
connection = hiveDatasource.getConnection();
} catch (SQLException e) {
e.printStackTrace();
}
return connection;
}
public static Statement getStmt(String url, String username, String password, DruidPooledConnection connection) {
try {
statement = connection.createStatement();
} catch (SQLException e) {
e.printStackTrace();
}
return statement;
}
public static Statement getHiveStmt(String url, String username, String password) {
DruidPooledConnection connection = getHiveConn(url, username, password);
try {
statement = connection.createStatement();
} catch (SQLException e) {
e.printStackTrace();
}
return statement;
}
public static boolean execSQL(String url, String username, String password, String sql, DruidPooledConnection conn) {
boolean flag = false;
Statement statement = getStmt(url, username, password, conn);
try {
flag = statement.execute(sql);
} catch (SQLException e) {
e.printStackTrace();
}
return flag;
}
public static ResultSet execMySql(Statement getStmt, String sql) {
ResultSet resultSet = null;
Statement statement = getStmt;
try {
resultSet = statement.executeQuery(sql);
} catch (SQLException e) {
e.printStackTrace();
}
return resultSet;
}
public static ResultSet execSQLForResult(String url, String username, String password, String sql, DruidPooledConnection connection) {
ResultSet resultSet = null;
Statement statement = getStmt(url, username, password, connection);
try {
resultSet = statement.executeQuery(sql);
} catch (SQLException e) {
e.printStackTrace();
}
return resultSet;
}
public static boolean execSQL(Statement getStmt, String sql) {
boolean flag = false;
Statement statement = getStmt;
try {
flag = statement.execute(sql);
} catch (SQLException e) {
e.printStackTrace();
}
return flag;
}
public static boolean execHiveSQL(String url, String username, String password, String sql) {
boolean flag = false;
Statement statement = getHiveStmt(url, username, password);
try {
flag = statement.execute(sql);
} catch (SQLException e) {
e.printStackTrace();
}
return flag;
}
public static ResultSet execHiveSQLForResult(String url, String username, String password, String sql) {
ResultSet resultSet = null;
Statement statement = getHiveStmt(url, username, password);
try {
resultSet = statement.executeQuery(sql);
} catch (SQLException e) {
e.printStackTrace();
}
return resultSet;
}
public static void recycleConn(Connection connection) {
if (dataSource != null) {
dataSource.discardConnection(connection);
}
}
public static void recyclePgConn(Connection connection) {
if (pgDatasource != null) {
pgDatasource.discardConnection(connection);
}
}
public static void recycleHiveConn(Connection connection) {
if (hiveDatasource != null) {
hiveDatasource.discardConnection(connection);
}
}
public static Boolean existTable(String url, String username, String password, String databases, String table) {
Boolean existTable = false;
DruidPooledConnection connection = getConn(url, username, password);
try {
ResultSet rsTables = connection.getMetaData().getTables(databases, null, table,
new String[]{"TABLE"});
while (rsTables.next()) {
String tableName = rsTables.getString("TABLE_NAME");
if (StringUtils.equals(tableName, table)) {
existTable = true;
}
}
rsTables.close();
} catch (SQLException throwables) {
throwables.printStackTrace();
} finally {
if (null != connection) {
recycleConn(connection);
}
}
return existTable;
}
public static Map<String, String> getTableFieldType(String url, String username, String password, String databases, String table) {
Map<String, String> map = new HashMap<>();
String sql = "select * from " + table + " limit 1";
PreparedStatement pStmt = null; // statement that carries the SQL
ResultSet rs = null; // query result set
DruidPooledConnection connection = getConn(url, username, password);
try {
pStmt = connection.prepareStatement(sql);
rs = pStmt.executeQuery();
ResultSetMetaData metaData = rs.getMetaData();
int count = metaData.getColumnCount();
while (rs.next()) {
for (int i = 1; i <= count; i++) { // JDBC column indexes are 1-based and inclusive of count
String columnType = metaData.getColumnTypeName(i);
map.put(metaData.getColumnName(i), columnType);
}
}
} catch (SQLException throwables) {
throwables.printStackTrace();
} finally {
try {
if (pStmt != null) {
pStmt.close();
}
if (null != connection) {
connection.close();
}
} catch (SQLException e) {
e.printStackTrace();
}
}
return map;
}
public static void close(Connection conn, ResultSet rs, PreparedStatement state) {
if (conn != null) {
recycleConn(conn);
}
if (rs != null) {
try {
rs.close();
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
if (state != null) {
try {
state.close();
} catch (SQLException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
DROP TABLE IF EXISTS JSSMQK_UPLOAD_CDW;
CREATE TABLE JSSMQK_UPLOAD_CDW
(
BM STRING COMMENT 'in-project ID of the subject',
CREATOR STRING COMMENT 'creator',
EFFECT_ON_DATE STRING COMMENT 'effective date',
EFFECT_OFF_DATE STRING COMMENT 'expiry date',
EFFECT_FLAG STRING COMMENT 'validity flag',
SJLY STRING COMMENT 'data source'
) COMMENT 'description'
CLUSTERED BY (BM) INTO 100 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '\t'
LINES TERMINATED BY '\n'
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
Note: when wiring up the Hive-to-ES integration and sync, the following jars have to be in place (a quick verification sketch follows this list).
1. commons-httpclient-3.1.jar (used for Hive-to-ES communication; essentially Hive talks to ES's open HTTP port, much like curl would). Missing this jar raises:
Caused by: java.lang.NoClassDefFoundError: org/apache/commons/httpclient/protocol/ProtocolSocketFactory
2. The elasticsearch-hadoop-7.3.2 jar (put it only under Hive's lib directory, not under the aux directory, otherwise you get jar-conflict errors; remember to restart Hive for it to take effect; with a CDH or Ambari install, locate the corresponding installation directory first).
3. Pay close attention to how many Hive Metastores your cluster runs; if there are two, the jars above must be installed on both servers, otherwise you will get errors.
4. Finally, all commands should be run as the hive user, not root.
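A quick sanity check, assuming the jars sit under Hive's lib directory and Hive has been restarted, and using the external table defined earlier (the database and table names are placeholders):

-- Run in beeline / the Hive CLI as the hive user.
-- If the es-hadoop and commons-httpclient jars are loaded correctly,
-- this reads a document straight from the ES index behind the external table.
SELECT id, bm, sjly
FROM <db>.<es_external_table>
LIMIT 1;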
To raise the result-window limit of an index (for deep paging):
curl -H "Content-Type: application/json" -XPUT xx.xx.xx.xx:9200/<index_to_modify>/_settings -d '{"index.max_result_window":"100000"}'
Conda notes
1) conda list: list the installed packages.
2) conda env list or conda info -e: list the existing virtual environments.
3) conda create -n new_env --clone old_env: clone an existing environment.
4) conda remove --name python36 --all: remove an environment.
py.test, a Python testing framework:
pytest -v -k <function_name>: run only the matching test functions.
Customer churn rate:
INSERT INTO ads.cus_m_cust_lost (customer_lost_num, customer_area, year, month, create_time)
SELECT count(*), customer_area, 2020, 11, now()
FROM (
    SELECT customer_standard_id, customer_area, ST_SALE_CONTRACT_CODE, Z1.TIME
    FROM (
        SELECT customer_standard_id, customer_area, MAX(DT_STAMP) AS TIME
        FROM (
            SELECT DISTINCT(A.customer_standard_id), A.customer_area,
                   C.DT_STAMP, C.ST_SALE_CONTRACT_CODE, C.ST_STATUS
            FROM (
                SELECT customer_standard_id, customer_area
                FROM ads.`cus_d_ads_cust_info`
                GROUP BY customer_standard_id, customer_area
            ) A
            LEFT JOIN ads.`cus_d_ads_contract` C
                   ON A.customer_standard_id = C.ST_CUSTOMER_ID
        ) ta
        GROUP BY customer_standard_id, customer_area
    ) Z1, ads.`cus_d_ads_contract` Z2
    WHERE Z1.TIME = Z2.DT_STAMP
      AND year("2020-11-30") - year(Z1.TIME) > 5
      AND Z2.ST_STATUS = 9
) Z3
GROUP BY customer_area;
MySQL function for converting Chinese characters to their pinyin initials:
CREATE DEFINER=`root`@`%` FUNCTION `F_GETPY`(in_string VARCHAR(100)) RETURNS varchar(100) CHARSET utf8
BEGIN
DECLARE tmp_str VARCHAR(100) CHARSET gbk DEFAULT '' ;
DECLARE tmp_char VARCHAR(100) CHARSET gbk DEFAULT '' ;
DECLARE V_LEN INT;
DECLARE V_I INT;
DECLARE V_PY VARCHAR(100);
SET V_LEN=CHAR_LENGTH(in_string);
SET V_I=1;
SET V_PY='';
DROP TEMPORARY TABLE IF EXISTS TT_PYZD;
CREATE TEMPORARY TABLE TT_PYZD (chr char(2) ,letter char(2)) DEFAULT CHARSET gbk;
INSERT INTO TT_PYZD
SELECT '吖 ', 'A ' UNION ALL SELECT '八 ', 'B ' UNION ALL
SELECT '嚓 ', 'C ' UNION ALL SELECT '咑 ', 'D ' UNION ALL
SELECT '妸 ', 'E ' UNION ALL SELECT '發 ', 'F ' UNION ALL
SELECT '旮 ', 'G ' UNION ALL SELECT '鉿 ', 'H ' UNION ALL
SELECT '丌 ', 'J ' UNION ALL SELECT '咔 ', 'K ' UNION ALL
SELECT '垃 ', 'L ' UNION ALL SELECT '嘸 ', 'M ' UNION ALL
SELECT '拏 ', 'N ' UNION ALL SELECT '噢 ', 'O ' UNION ALL
SELECT '妑 ', 'P ' UNION ALL SELECT '七 ', 'Q ' UNION ALL
SELECT '呥 ', 'R ' UNION ALL SELECT '仨 ', 'S ' UNION ALL
SELECT '他 ', 'T ' UNION ALL SELECT '屲 ', 'W ' UNION ALL
SELECT '夕 ', 'X ' UNION ALL SELECT '丫 ', 'Y ' UNION ALL
SELECT '帀 ', 'Z ' ;
WHILE V_I<=V_LEN DO
SET tmp_str = substring(in_string,V_I,1);
IF ASCII(tmp_str)<127
THEN
SET tmp_char=UPPER(tmp_str);
ELSE
SELECT letter INTO tmp_char FROM TT_PYZD WHERE chr<=tmp_str ORDER BY chr DESC LIMIT 1;
END IF ;
SET V_I=V_I+1;
SET V_PY=CONCAT(V_PY,tmp_char);
END WHILE;
DROP TEMPORARY TABLE IF EXISTS TT_PYZD;
RETURN V_PY;
END