CDH5.7.1上Kafka+SparkStream+Hive的實戰
目前的專案中需要將kafka佇列的資料實時存到hive表中。應為之前工作中用到的是CDH5.11,而且spark等用的基本是最新版(2.2),語言也一直是Scala,所以這次要求Java語言以及低版本的spark,在寫程式的時候還是遇到了一些頭疼的事情。
環境:Centos6.5 Spark1.6.0 Kafka0.9.x Hive1.1.0 Zookeeper3.4.5 都是基於CDH5.7.1的
閱讀本文,你將瞭解到:
- 採用Direct方式消費Kafka資料到hive,並將offset提交到Zookeeper。Spark一些引數的使用
- 使用Kafka Old Hight-Level API消費Kafka資料,手動提交offset到zookeeper
- Hive JDBC的注意事項
##############################首先來看第一條
import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.ZkConnection; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.hive.HiveContext; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka.HasOffsetRanges; import org.apache.spark.streaming.kafka.KafkaUtils; import org.apache.spark.streaming.kafka.OffsetRange; import org.apache.zookeeper.ZooDefs; import kafka.common.TopicAndPartition; import kafka.serializer.StringDecoder; import kafka.utils.ZKGroupTopicDirs; import kafka.utils.ZkUtils; /** * 採用Direct 方式拉取kafka資料到hive表,可以用hiveSql寫,也可以直接將資料寫入對應的hive目錄,然後執行任意重新整理語句 * ALTER TABLE xxx ADD IF NOT EXISTS PARTITION (yue='2018-05',ri='2018-05-20') * offset手動提交到zookeeper */ public class SparkStreamKafka2HiveDirect { public static void main(String[] args) { String topic = ""; String group = ""; SparkConf conf = new SparkConf().setAppName("待儲存佇列到hive"); //削峰,在任務積壓時,會減少每秒的拉取量 conf.set("spark.streaming.backpressure.enabled", "true"); // maxRetries預設就是1 接受資料相關的一共就只有兩個配置 conf.set("spark.streaming.kafka.maxRetries", "1"); //每秒最多拉取partition * 2 的資料 conf.set("spark.streaming.kafka.maxRatePerPartition", "2"); JavaSparkContext jsc = new JavaSparkContext(conf); JavaStreamingContext jssc = new JavaStreamingContext(jsc, Durations.seconds(5)); // 如果在這裡初始化hivecontext,在下面的運算元內使用hivecontext會報一個空指標異常,原因貌似是用的時候Hivecontext未初始化成功(請知道的大佬普及一下) // HiveContext hiveContext = new HiveContext(jsc); // kafka 引數 HashMap<String, String> kafkaParams = new HashMap<>(); kafkaParams.put("metadata.broker.list", ""); kafkaParams.put("group.id", group); // kafkaParams.put("auto.offset.reset", "smallest"); Set<String> topicSet = new HashSet<>(); topicSet.add(topic); // 賦值操作不是執行緒安全的。若想不用鎖來實現,可以用AtomicReference<V>這個類,實現物件引用的原子更新 final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<>(); // 讀取zookeeper中消費組的偏移量 ZKGroupTopicDirs zgt = new ZKGroupTopicDirs(group, topic); final String zkTopicPath = zgt.consumerOffsetDir(); // System.out.println(zkTopicPath); // 會寫在zookeeper根目錄下consumers下!!! ZkClient zkClient = new ZkClient(""); int countChildren = zkClient.countChildren(zkTopicPath); Map<TopicAndPartition, Long> fromOffsets = new HashMap<>(); if (countChildren > 0) { for (int i = 0; i < countChildren; i++) { String path = zkTopicPath + "/" + i; String offset = zkClient.readData(path); TopicAndPartition topicAndPartition = new TopicAndPartition("", i); fromOffsets.put(topicAndPartition, Long.parseLong(offset)); } /** * createDirectStream(JavaStreamingContext jssc, java.lang.Class<K> keyClass, * java.lang.Class<V> valueClass, java.lang.Class<KD> keyDecoderClass, * java.lang.Class<VD> valueDecoderClass, java.lang.Class<R> recordClass, * java.util.Map<java.lang.String,java.lang.String> kafkaParams, * java.util.Map<kafka.common.TopicAndPartition,java.lang.Long> fromOffsets, * Function<kafka.message.MessageAndMetadata<K,V>,R> messageHandler) Create an * input stream that directly pulls messages from Kafka Brokers without using * any receiver. */ //幸虧java8支援lambda表示式呀,要不然寫慣了Scala的人簡直沒法活了~~~ KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, String.class, kafkaParams, fromOffsets, v -> v.message()).foreachRDD(rdd -> { OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); offsetRanges.set(offsets); // 邏輯處理 HiveContext hiveContext = new HiveContext(jsc); try { //1-採用hivecontext執行"insert into table" 插入資料到hive //2-將DF以Hive的儲存格式存到Hive目錄下 //更新zookeeper ZkClient zkClient1 = new ZkClient(""); OffsetRange[] offsets1 = offsetRanges.get(); if (null != offsets1) { for (OffsetRange o : offsets1) { String zkPath = zkTopicPath + "/" + o.partition(); // System.out.println(zkPath + o.untilOffset()); new ZkUtils(zkClient1, new ZkConnection(""), false) .updatePersistentPath(zkPath, o.untilOffset() + "", ZooDefs.Ids.OPEN_ACL_UNSAFE); } } zkClient.close(); } catch (Exception e) { e.printStackTrace(); } }); } else { KafkaUtils.createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topicSet).foreachRDD(rdd -> { if (!rdd.isEmpty()) { OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges(); offsetRanges.set(offsets); HiveContext hiveContext = new HiveContext(jsc); try { //處理邏輯程式碼 // 更新zookeeper ZkClient zkClient1 = new ZkClient(""); OffsetRange[] offsets1 = offsetRanges.get(); if (null != offsets1) { for (OffsetRange o : offsets1) { String zkPath = zkTopicPath + "/" + o.partition(); new ZkUtils(zkClient1, new ZkConnection(""), false) .updatePersistentPath(zkPath, o.untilOffset() + "", ZooDefs.Ids.OPEN_ACL_UNSAFE); } } zkClient.close(); } catch (Exception e) { e.printStackTrace(); } } }); } jssc.start(); jssc.awaitTermination(); } }
說說上面程式碼需要注意的地方,這種使用kafkaApi提交offset,會提交到zookeeper'/consumers/xxx'下,而我們專案需要提交到'/kafka/xxx'。除非改原始碼,否則貌似實現不了,我下載原始碼看了看,貌似路徑是寫死的,所以這種方案被pass掉了。
kafka原始碼裡是這樣的,第一個就聲明瞭根目錄
object ZkUtils { val ConsumersPath = "/consumers" val BrokerIdsPath = "/brokers/ids" val BrokerTopicsPath = "/brokers/topics" val ControllerPath = "/controller" val ControllerEpochPath = "/controller_epoch" val ReassignPartitionsPath = "/admin/reassign_partitions" val DeleteTopicsPath = "/admin/delete_topics" val PreferredReplicaLeaderElectionPath = "/admin/preferred_replica_election" val BrokerSequenceIdPath = "/brokers/seqid" val IsrChangeNotificationPath = "/isr_change_notification" val EntityConfigPath = "/config" val EntityConfigChangesPath = "/config/changes"
所以嘗試用kafka自己的消費者API,這種方式和spark Receive方式一樣,會把offset提交到 zkBrokerList的路徑下,例如xxxx:2181;xxxx:2181;xxxx:2181/kafka,就不會走上面/consumers路徑了。
##############################第二條
import com.alibaba.fastjson.JSONObject;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.hadoop.fs.FSDataOutputStream;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.time.LocalDate;
import java.util.*;
public class Save2Hive {
public static void main(String[] args) throws IOException {
ConsumerConnector consum = new Save2Hive().createConsumer();
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(Configs.Glob_KafkaTopic_save, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> messageStream = consum.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = messageStream.get(Configs.Glob_KafkaTopic_save).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
// 定義一個list存放message的value,我覺得這裡放一個數組資源佔用會少點,但是會加大程式碼量
ArrayList<String> list = new ArrayList<String>(3000);
HdfsFileUtil fileUtil = new HdfsFileUtil();
String tableName1 = "source_feature_hive";
String tableName2 = "source_detection_vehicle_all";
while (it.hasNext()) {
if (list.size() < 3000) {
list.add(new String(it.next().message()));
} else {
String ri = LocalDate.now().toString();
String yue = ri.substring(0, 7);
//這裡是包裝了hdfs的append方法,追加寫一個檔案,直接寫到Hive表的資料目錄下,按照月和日分組
FSDataOutputStream out1 = fileUtil.getOut(yue, ri, tableName1);
FSDataOutputStream out2 = fileUtil.getOut(yue, ri, tableName2);
list.forEach(x -> {
try {
out1.write("\n".getBytes("UTF-8"));
out1.hflush();
out2.write("\n".getBytes("UTF-8"));
out2.hflush();
} catch (UnsupportedEncodingException e) {
// LogSet.WriteLog("Error", "4", "06", "600", false, e,"encodingException");
} catch (IOException e) {
// LogSet.WriteLog("Error", "4", "06", "600", false, e,"kafka2HDFS failed");
}
});
list.clear();
if (null != out1 && null != out2) {
try {
out1.close();
out2.close();
consum.commitOffsets();
//log 記錄寫入成功
} catch (Exception e) {
// LogSet.WriteLog("Error", "4", "06", "600", false, e,"FSoutput close fail");
} finally {
//TODO
}
}
}
}
}
private ConsumerConnector createConsumer() {
Properties properties = new Properties();
properties.put("zookeeper.connect", Configs.Glob_KafkaZkQuorum);// 宣告zk
properties.put("group.id", Configs.SaveHive_KafkaConsumerGroup);// 指定消費組
properties.put("rebalance.max.retries", "5");
properties.put("refresh.leader.backoff.ms", "10000");
properties.put("zookeeper.session.timeout.ms", "40000");
properties.put("auto.commit.enable", "false");
// properties.put("auto.offset.reset", "smallest");
return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
}
}
上面的程式碼還是有一個致命的缺點,對是致命的缺點,那就是隻能單節點部署,因為append方法不可以同時操作一個檔案。改進:部署到三臺的機器上,配置檔案也需要三個,分別配置每臺機器的檔名。
##############################第三條
這種方式被改進後的第二種方式取代。要克服第二種方式只能單節點部署的缺點,最先想到的是將資料寫到本地,然後利用HiveJDBC'load data local inpath' 的方式寫到hive表,來看下面的錯誤程式碼:
getStream方法返回一個stmt,然後在main函式裡執行execute()load資料
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import com.alibaba.fastjson.JSONObject;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import ..kafkaConsumerAPI.Configs;
/**
* 寫資料到臨時表 source_feature_hive 和 source_detection_vehicle_all
*/
public class SaveHiveService {
public static void main(String[] args) {
ConsumerConnector consum = new SaveHiveService().createConsumer();
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(Configs.Glob_KafkaTopic_save, 1);
Map<String, List<KafkaStream<byte[], byte[]>>> messageStream = consum.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = messageStream.get(Configs.Glob_KafkaTopic_save).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
// 定義一個list存放message的value
ArrayList<String> list = new ArrayList<String>();
String tableName1 = "";
String tableName10 = "";
String tableName2 = "";
while (it.hasNext()) {
if (list.size() < Integer.parseInt(Configs.Save_SyncBatchCount)) {
list.add(new String(it.next().message()));
} else {
String ri = LocalDate.now().toString();
String yue = ri.substring(0, 7);
FileWriter stream1 = getStream(tableName1);
FileWriter stream2 = getStream(tableName2);
list.forEach(x -> {
try {
JSONObject value = JSONObject.parseObject(x);
stream1.write("" + "\n");
stream1.flush();
} catch (UnsupportedEncodingException e) {
// LogSet.WriteLog("Error", "4", "06", "600", false, e, "encodingException");
} catch (IOException e) {
// LogSet.WriteLog("Error", "4", "06", "600", false, e, "kafka2local failed");
}
});
if (null != stream1 && null != stream2) {
try {
stream1.close();
stream2.close();
} catch (Exception e) {
// LogSet.WriteLog("Error", "4", "06", "600", false, e, "關閉流失敗");
}
}
try {
SaveHiveService.getStmt()
.execute("load data local inpath '/h/" + tableName1 + ".txt'"
+ " into table " + tableName10 + " partition(yue='" + yue + "',ri='" + ri + "')");
SaveHiveService.getStmt()
.execute("load data local inpath '/home/dispatch/hivedata/" + tableName2 + ".txt'"
+ " into table " + tableName2 + " partition(yue='" + yue + "',ri='" + ri + "')");
consum.commitOffsets();
// LogSet.WriteLog("info", "4", "06", "600", true, "data write success");
list.clear();
} catch (Exception e) {
// LogSet.WriteLog("Error", "4", "06", "600", false, e, "hiveSql failed");
}
}
}
}
private static Statement getStmt() {
String driverName = "org.apache.hive.jdbc.HiveDriver";
try {
Class.forName(driverName);
} catch (ClassNotFoundException e) {
// LogSet.WriteLog("Error", "4", "06", "600", false, e, "hiveJDBC反射失敗");
}
Connection con = null;
try {
con = DriverManager.getConnection(Configs.HiveUrl, Configs.HiveUserName, Configs.HiveUserPwd);
} catch (SQLException e) {
// LogSet.WriteLog("Error", "4", "06", "600", false, e, "hiveJDBC建立連線失敗");
}
Statement stmt = null;
try {
stmt = con.createStatement();
} catch (SQLException e) {
// LogSet.WriteLog("Error", "4", "06", "600", false, e, "con.createStatement失敗");
}
return stmt;
}
private static FileWriter getStream(String tableName) {
FileWriter fw = null;
File f = null;
try {
f = new File("/h/" + tableName + ".txt");
if (!f.exists())
f.createNewFile();
fw = new FileWriter("/h/" + tableName + ".txt");
} catch (IOException e) {
// LogSet.WriteLog("error", "4", "06", "600", false, e, "FileWriter 建立流失敗");
}
return fw;
}
private ConsumerConnector createConsumer() {
Properties properties = new Properties();
properties.put("zookeeper.connect", Configs.Glob_KafkaZkQuorum);// 宣告zk
properties.put("group.id", Configs.SaveHive_KafkaConsumerGroup);// 指定消費組
properties.put("rebalance.max.retries", "5");
properties.put("refresh.leader.backoff.ms", "10000");
properties.put("zookeeper.session.timeout.ms", "40000");
properties.put("auto.commit.enable", "false");
properties.put("auto.offset.reset", "smallest");
return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));
}
}
// load data into table // NOTE: filepath has to be local to the hive server emmm..wtf...檔案路徑必須在server本地才行!!!!OK,這種方案pass!!! // NOTE: /tmp/a.txt is a ctrl-A separated file with two fields per line
OK,kafka寫hive到這裡就結束了。
補充:hive官網:
時間過了一週,發現這個程式還是有個bug,我們叢集的datamanage與namenode通訊發生過故障,導致檔案副本數不足的情況下關閉流,會報close execption。想到的就是在關閉流之前,呼叫fs的getFilestatus(),判斷副本的數量,如果不夠,就等1秒。
相關推薦
CDH5.7.1上Kafka+SparkStream+Hive的實戰
目前的專案中需要將kafka佇列的資料實時存到hive表中。應為之前工作中用到的是CDH5.11,而且spark等用的基本是最新版(2.2),語言也一直是Scala,所以這次要求Java語言以及低版本的spark,在寫程式的時候還是遇到了一些頭疼的事情。環境:Cent
CDH5.7.1 Hadoop2.6 HDFS Encryption KMS 實戰之功能測試
CDH KMS 測試 0、使用者說明 [x] keyAdminUser使用者是key admin user [x] hdfs 用 戶是 hdfs super user [x] user_a 、 user_b 是HDFS普通使用者 1、建立keytab
在centos 7.1上部署weibbix
webbix安裝pipyum install -y openssl epel-release python-pip安裝Django 1.8.18pip install Django==1.8.18安裝MariaDByum install -y mariadb mariadb-server mariadb-de
7.1 python拉勾網實戰並儲存到mongodb
拉鉤網實戰 爬取拉勾網有關“爬蟲”的職位資訊,並把爬取的資料儲存在MongoDB資料庫中 確定網頁的載入方式是JavaScript載入通過谷歌瀏覽器開發者工具分析和尋找網頁的真實請求,確定真實資料在position.Ajax開頭的連結裡,請求方式是POST使用request
阿里雲虛擬機器搭建Hadoop-2.6.0-cdh5.7.1安裝詳解(偽分散式環境)
首先先搭配安全組 開啟映象後輸入以下內容: 重要: yum -y install lrzsz HOSTNAME=(自己的主機名字) hostname $HOSTNAME echo "$(grep -E '127|::1' /etc/host
【Linux】教你一步一步在CentOS Linux release 7.1上安裝vsftpd FTP伺服器
1.執行yum安裝vsftpd sudo yum install vsftpd 2.安裝完成後,修改vsftpd ftp登入使用者配置檔案 如果你希望可以使用root使用者登入ftp的話,需要做以下修改 /etc/vsftpd/ftpusers 該檔案中定義的使
iOS 7.0.4 和 7.1 上計算文字高度有誤差的解決方法
最近寫專案計算文字高度那裡在 7.0.4 和 7.1 上遇見 文字高度 有誤差的問題,一般寫我就直接用系統提供的方法,計算高度,如下: //得到文字的高度 +(NSValue*)getStringWithRect:(NSString*)aString
hadoop2.6.0-cdh5.7.1偽分散式編譯安裝
環境相關: OS:CentOS release 6.9 IP:192.168.1.10 MEM:10G(推薦4G以上) DISK:50G 1. 主機克隆,基礎環境部署 參照《CentOS6實驗機模板搭建部署》 克隆一臺實驗機,調整記憶體為10G,並
【Linux】教你一步一步在CentOS Linux release 7.1上解除安裝vsftpd FTP伺服器
1.首先檢視系統中是否安裝了vsftpd rpm -qa | grep vsftpd 例如:如果顯示結果如下的話,說明是安裝了vsfptd3.0的ftp伺服器的 [[email protected] sbin]# rpm -qa | grep vsftpd v
hive-1.1.0-cdh5.7.0 的編譯安裝並修改元資料儲存資料庫為MySQL
1 準備工作 1.1 配置jdk1.7 1.2 部署MySQL 1.3 安裝maven 1.4編譯安裝 hadoop-2.6.0-cdh5.7.0.src.tar.gz 1.5建立hadoop使用者 安裝的目錄結構:/opt/s
在CentOS 7.2上編譯安裝Nginx 1.13.6
創建 ont uname group with pen 還需 www stub 第一個裏程碑 --- 檢查軟件安裝的系統環境 [root@rainjin ~]# cat /etc/redhat-release CentOS Linux release 7.2.1511 (
webgoat 7.1 實戰指南 - 下
阻止 rec bsp 圖片 pda pps simditor 分享圖片 輸入驗證 webgoat 7.1 實戰指南 - 下 Injection Flaws Command Injection(命令註入) 對任何一個參數驅動的網站,命令註入攻擊代表一個嚴重的威脅。在攻擊後
CDH5.15.1 hive 連接mongodb配置及增刪改查
uri str prope upd info with oot ODB ble 1. 下載 wget http://repo1.maven.org/maven2/org/mongodb/mongo-hadoop/mongo-hadoop-hive/2.0.2/mongo-h
Nginx實戰 1.7-1.11 Nginx架構分析,虛擬機器配置
1.7-1.9 Nginx架構分析 Nginx模組化結構 Nginx涉及到的模組分為核心模組、標準HTTP模組、可選HTTP模組、郵件服務模組以及第三方模組等五大類。 核心模組 核心模組是指Nginx伺服器正常執行時必不可少的模組,它們提供了Nginx最基本最核心的服務,如程序管理、許可
spark2.1.0編譯 cdh5.7.0版本
一、實現目標 從spark官網下載2.1.0的原始碼,然後編譯對應hadoop版本的spark,從而可以解決很多相容性問題,使程式執行環境更加優越,順暢。 二、環境準備 1.硬體 無論雲主機還是虛擬機器,記憶體一定要4G以上,最好8G+。 2.軟體 (1)java:spark
cdh5.7.0偽分散式叢集之hive安裝
基本環境及軟體: 軟體版本 軟體包 centos-6.4 JDK-1.8 jdk-8u191-linux-x64.tar.gz hadoo
Ubuntu 16.04上搭建CDH5.16.1叢集
本文參考自:《Ubuntu16.04上搭建CDH5.14叢集》 1.準備三臺安裝Ubuntu 16.04.4 LTS系統的伺服器,假設ip地址分佈為 192.168.100.19 192.168.100.20 192.168.100.21 (如果是虛擬機器,建議記憶體配置
Netty實戰開發(7):Netty結合kafka實現分散式訊息佇列
在分散式遊戲伺服器系統中,訊息處理佇列主要解決問題就是解耦系統中的業務,使得每個系統看起來功能比較單一,而且解決一些全服資料共享等問題。 通常我們知道kafka是作為訊息佇列比較火的一種方式,其實還有(Active MQ,Rabbit MQ,Zero MQ)個人
[Hadoop] CentOS7 安裝flume-ng-1.6.0-cdh5.7.0
1. Flume 安裝部署 根據官方文件描述,市面上的Flume主流版本有兩個:0.9.x and 1.x。這兩個版本差異非常非常大,舊版本已經被淘汰了,要用的話就使用新版本。當然本文中既定版本為cd
CDH5.16.1叢集增加新節點 Ubuntu 16.04上搭建CDH5.16.1叢集 Ubuntu 16.04上搭建CDH5.16.1叢集
如果是全新安裝叢集的話,可以參考《Ubuntu 16.04上搭建CDH5.16.1叢集》 下面是叢集新增節點步驟: 1.已經存在一個叢集,有兩個節點 192.168.100.19 hadoop-master 192.168.100.20 hadoop-slave1 新增節點ip為