Spark支援kerberos環境下的Kafka、Hbase傳輸
一、準備環境: 建立Kafka Topic和HBase表
1. 在kerberos環境下建立Kafka Topic
1.1 因為kafka預設使用的協議為PLAINTEXT,在kerberos環境下需要變更其通訊協議: 在${KAFKA_HOME}/config/producer.properties
和config/consumer.properties
下新增
security.protocol=SASL_PLAINTEXT
1.2 在執行前,需要在環境變數中新增KAFKA_OPT選項,否則kafka無法使用keytab:
export KAFKA_OPTS="$KAFKA_OPTS -Djava.security.auth.login.config=/usr/ndp/current/kafka_broker/conf/kafka_jaas.conf"
其中kafka_jaas.conf
內容如下:
cat /usr/ndp/current/kafka_broker/conf/kafka_jaas.conf
KafkaServer {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/etc/security/keytabs/kafka.service.keytab"
storeKey=true
useTicketCache=false
serviceName="kafka"
principal="kafka/[email protected]";
};
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true
renewTicket=true
serviceName="kafka";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
keyTab="/etc/security/keytabs/kafka.service.keytab"
storeKey=true
useTicketCache=false
serviceName="zookeeper"
principal="kafka/[email protected]";
};
1.3 建立新的topic:
bin/kafka-topics.sh --create --zookeeper hzadg-mammut-platform2.server.163.org:2181,hzadg-mammut-platform3.server.163.org:2181 --replication-factor 1 --partitions 1 --topic spark-test
1.4 建立生產者:
bin/kafka-console-producer.sh --broker-list hzadg-mammut-platform2.server.163.org:6667,hzadg-mammut-platform3.server.163.org:6667,hzadg-mammut-platform4.server.163.org:6667 --topic spark-test --producer.config ./config/producer.properties
1.5 測試消費者:
bin/kafka-console-consumer.sh --zookeeper hzadg-mammut-platform2.server.163.org:2181,hzadg-mammut-platform3.server.163.org:2181 --bootstrap-server hzadg-mammut-platform2.server.163.org:6667 --topic spark-test --from-beginning --new-consumer --consumer.config ./config/consumer.properties
2. 建立HBase表
2.1 kinit到hbase賬號,否則無法建立hbase表
kinit -kt /etc/security/keytabs/hbase.service.keytab hbase/[email protected]
./bin/hbase shell
> create 'recsys_logs', 'f'
二、編寫Spark程式碼
編寫簡單的Spark Java程式,功能為: 從Kafka消費資訊,同時將batch內統計的數量寫入Hbase中,具體可以參考專案:
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netease.spark.streaming.hbase;
import com.netease.spark.utils.Consts;
import com.netease.spark.utils.JConfig;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
public class JavaKafkaToHBaseKerberos {
private final static Logger LOGGER = LoggerFactory.getLogger(JavaKafkaToHBaseKerberos.class);
private static HConnection connection = null;
private static HTableInterface table = null;
public static void openHBase(String tablename) throws IOException {
Configuration conf = HBaseConfiguration.create();
synchronized (HConnection.class) {
if (connection == null)
connection = HConnectionManager.createConnection(conf);
}
synchronized (HTableInterface.class) {
if (table == null) {
table = connection.getTable("recsys_logs");
}
}
}
public static void closeHBase() {
if (table != null)
try {
table.close();
} catch (IOException e) {
LOGGER.error("關閉 table 出錯", e);
}
if (connection != null)
try {
connection.close();
} catch (IOException e) {
LOGGER.error("關閉 connection 出錯", e);
}
}
public static void main(String[] args) throws Exception {
String hbaseTable = JConfig.getInstance().getProperty(Consts.HBASE_TABLE);
String kafkaBrokers = JConfig.getInstance().getProperty(Consts.KAFKA_BROKERS);
String kafkaTopics = JConfig.getInstance().getProperty(Consts.KAFKA_TOPICS);
String kafkaGroup = JConfig.getInstance().getProperty(Consts.KAFKA_GROUP);
// open hbase
try {
openHBase(hbaseTable);
} catch (IOException e) {
LOGGER.error("建立HBase 連線失敗", e);
System.exit(-1);
}
SparkConf conf = new SparkConf().setAppName("JavaKafakaToHBase");
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
Set<String> topicsSet = new HashSet<>(Arrays.asList(kafkaTopics.split(",")));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", kafkaBrokers);
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", kafkaGroup);
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", false);
// 在kerberos環境下,這個配置需要增加
kafkaParams.put("security.protocol", "SASL_PLAINTEXT");
// Create direct kafka stream with brokers and topics
final JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
ssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(Arrays.asList(topicsSet.toArray(new String[0])), kafkaParams)
);
JavaDStream<String> lines = stream.map(new Function<ConsumerRecord<String, String>, String>() {
private static final long serialVersionUID = -1801798365843350169L;
@Override
public String call(ConsumerRecord<String, String> record) {
return record.value();
}
}).filter(new Function<String, Boolean>() {
private static final long serialVersionUID = 7786877762996470593L;
@Override
public Boolean call(String msg) throws Exception {
return msg.length() > 0;
}
});
JavaDStream<Long> nums = lines.count();
nums.foreachRDD(new VoidFunction<JavaRDD<Long>>() {
private SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd HH:mm:ss");
@Override
public void call(JavaRDD<Long> rdd) throws Exception {
Long num = rdd.take(1).get(0);
String ts = sdf.format(new Date());
Put put = new Put(Bytes.toBytes(ts));
put.add(Bytes.toBytes("f"), Bytes.toBytes("nums"), Bytes.toBytes(num));
table.put(put);
}
});
ssc.start();
ssc.awaitTermination();
closeHBase();
}
}
三、 編譯並在Yarn環境下執行
3.1 切到專案路徑下,編譯專案:
mvn clean package
3.2 執行Spark環境
- 由於executor需要訪問kafka,所以需要將Kafka授權過的kerberos使用者下發至executor中;
- 由於叢集環境的hdfs也是kerberos加密的,需要通過spark.yarn.keytab/spark.yarn.principal配置可以訪問Hdfs/HBase的keytab資訊;
在專案目錄下執行如下:
/usr/ndp/current/spark2_client/bin/spark-submit \
--files ./kafka_client_jaas.conf,./kafka.service.keytab \
--conf "spark.executor.extraJavaOptions=-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
--driver-java-options "-Djava.security.auth.login.config=./kafka_client_jaas.conf" \
--conf spark.yarn.keytab=/etc/security/keytabs/hbase.service.keytab \
--conf spark.yarn.principal=hbase/hzadg-mammut-platform1.server.163.org@BDMS.163.COM \
--class com.netease.spark.streaming.hbase.JavaKafkaToHBaseKerberos \
--master yarn \
--deploy-mode client \
./target/spark-demo-0.1.0-jar-with-dependencies.jar
其中kafka_client_jaas.conf
檔案具體內容如下:
cat kafka_client_jaas.conf
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useKeyTab=true
renewTicket=true
keyTab="./kafka.service.keytab"
storeKey=true
useTicketCache=false
serviceName="kafka"
principal="kafka/[email protected]";
};
3.2 執行結果
相關推薦
Spark支援kerberos環境下的Kafka、Hbase傳輸
一、準備環境: 建立Kafka Topic和HBase表 1. 在kerberos環境下建立Kafka Topic 1.1 因為kafka預設使用的協議為PLAINTEXT,在kerberos環境下需要變更其通訊協議: 在${KAFKA_HOME}/co
大資料叢集遇到的問題(Hadoop、Spark、Hive、kafka、Hbase、Phoenix)
大資料平臺中遇到的實際問題,整理了一下,使用CDH5.8版本,包括Hadoop、Spark、Hive、kafka、Hbase、Phoenix、Impala、Sqoop、CDH等問題,初步整理下最近遇到的問題,不定期更新。 啟動nodemanager失敗 2016-09-07
Golang學習-第一篇 Golang的簡單介紹及Windows環境下安裝、部署
需要 簡單 電腦 pan 生成文件 多核 -- pear () 序言 這是本人博客園第一篇文章,寫的不到位之處,希望各位看客們諒解。 本人一直從事.NET的開發工作,最近在學習Golang,所以想著之前學習的過程中都沒怎麽好好的將學習過程記錄下來。深感惋惜! 現在將Gola
CentOS 7環境下Kafka的安裝和基本使用
uil nor mod ner comment github jdk下載 1.0 esc CentOS 7環境下Kafka的安裝和基本使用 基礎環境 Windows 10 X64 VMware-workstation-full-12.0.0-2985596
Spark 在 Window 環境下的搭建
shell 安裝jdk oracle devel amd64 cmd for 3.5 mark 1.java/scala的安裝 - 安裝JDK下載: http://www.oracle.com/technetwork/java/javase/downloads/j
Golang的簡單介紹及Windows環境下安裝、部署
Golang安裝 Golang下載:https://golang.google.cn/dl/。 作業系統 包名 Windows go1.4.windows-amd64.msi Linux
0110-如何給Kerberos環境下的CDH叢集新增Gateway節點
Fayson的github: https://github.com/fayson/cdhproject 1.文件編寫目的 Gateway節點又稱為客戶端節點,通常用作訪問Hadoop叢集的介面機。它主要會部署一些客戶端的配置,指令碼命令,比如HDFS的core-site.xml,hdfs-site.xml
Maven、Kafka、HBASE、flume安裝
一、安裝Maven tar -xvzf apache-maven-3.5.2-bin.tar.gz 2.新增環境變數 vim ~/.bashrc export MAVEN_HOME=/usr/local/src/apache-maven-3.5.2 export
0110-如何給Kerberos環境下的CDH集群添加Gateway節點
用戶 接下來 插入圖片 -i hosts shell href gre linu Fayson的github: https://github.com/fayson/cdhproject 推薦關註微信公眾號:“Hadoop實操”,ID:gh_c4c535955d0f,或者掃描
Kerberos環境下KafkaManager的安裝使用(編譯版和免編譯版)
為了能夠方便的檢視及管理Kafka叢集,yahoo提供了一個基於Web的管理工具(Kafka-Manager)。這個工具可以方便的檢視叢集中Kafka的Topic的狀態(分割槽、副本及訊息量等),支援管理多個叢集、重新分配Partition及建立Topic等功能。
來自LinkedIn的實踐:生產環境下Kafka的除錯和最佳實踐
在本文中,LinkedIn的軟體工程師Joel Koshy詳細闡述了他和一個工程師團隊是如何解決生產環境下Kafka的兩次事故的。這兩次事故是由於多個產品缺陷、特殊的客戶行為以及監控缺失的交錯影響導致的。 第一個缺陷是在LinkedIn的變更請求跟蹤系統中觀察到的,部署平臺認為這是從服務發出的重複郵
【python 爬蟲】Mac環境下selenium、ChromeDriver的安裝
1.安裝Selenium庫 在終端輸入下面指令: pip3 install selenium 2.安裝ChromeDriver 這個安裝有點麻煩,我參考一個bloghttps://cuiqingcai.com/5135.html的。Mac的rootless機制我弄了很
idea環境下js、css中文亂碼
idea2018.2+tomcat8+java8+win10 異常:本地js和css通過tomcat釋出時,在頁面打印出的中文是亂碼。而從資料庫讀取的中文資料和html的中文顯示正常。 解決方法: 步驟一:將專案全部設定為UTF-8 步驟二:
在Windows環境下編譯除錯Hbase原始碼
一、 hbase架構簡介 HBase 是一個開源的、分散式的、資料多版本的,列式儲存的nosql資料庫。依託 Hadoop 的分散式檔案系統 HDFS 作為底層儲存, 能夠為數十億行數百萬列的海量資料表提供隨機、實時的讀寫訪問。 HBase 叢集服務包含:HBase 資料庫
大資料平臺運維-----Kerberos環境下Hive及Impala監控指令碼的開發
一、工程目錄二、原理解析 Hive和Impala是兩個最常用的大資料查詢工具,他們的主要區別是Hive適合對實時性要求不太高的業務,對資源的要求較低;而Impala的由於採用了全新的架構,處理速度非常的快,但同樣的也對資源消耗比較大,適合實時性要求高的業務。 在我
Mac環境下安裝、配置liteide
1、下載 大家根據系統下載對應的版本的liteide macOS liteidex32-2.macosx-qt5.zip(recommend) MacOS X 10.8 10.9 macOS 10.10 10.11 10.12 liteidex32
網易雲信即時通訊推送保障及網路優化詳解(三):如何在弱網環境下優化大資料傳輸
對於移動 APP 來說,IM 功能正變得越來越重要,它能夠建立起人與人之間的連線。社交類產品中,使用者與使用者之間的溝通可以產生出更好的使用者粘性。在複雜的 Android 生態環境下,多種因素都會造成訊息推送不能及時達到客戶端。另外,不穩定的行動網路也給資料傳輸的速率和可靠
同網段環境下Linux系統檔案傳輸
前提:源機器和目標機器能互通,知道檔案所屬使用者的登陸密碼 源機器:tar -cvf 臨時檔名.tar 資料夾/檔案1 資料夾/檔案2 資料夾/檔案3…… 目標機器:scp 源機器使用者名稱@機器
有kerberos認證hbase在spark環境下的使用
hadoop中計算框架MapReduce中儲存到有kerberos的hdfs,由於其內部yarn進行了認證故不需要進行相關的操作,可直接進行讀寫操作。spark使用有kerberos認證的hbase是一個既麻煩又簡單的問題,麻煩的方面是:中文的網站相關的文章很少並且分佈只是分
maven環境下使用java、scala混合開發spark應用
熟悉java的開發者在開發spark應用時,常常會遇到spark對java的介面文件不完善或者不提供對應的java介面的問題。這個時候,如果在java專案中能直接使用scala來開發spark應用,同時使用java來處理專案中的其它需求,將在一定程度上降低開發spark專案的