Kubernetes-native Spark integrated with other applications (MySQL, PostgreSQL, Oracle, HDFS, HBase)
Installing and running Spark with native Kubernetes scheduling: https://blog.csdn.net/luanpeng825485697/article/details/83651742
Layout of the dockerfiles directory:
.
├── driver
│   └── Dockerfile
├── driver-py
│   └── Dockerfile
├── executor
│   └── Dockerfile
├── executor-py
│   └── Dockerfile
├── init-container
│   └── Dockerfile
├── resource-staging-server
│   └── Dockerfile
├── shuffle-service
│   └── Dockerfile
└── spark-base
    ├── Dockerfile
    └── entrypoint.sh
Interactive Python Shell
In the Spark directory, run:
./bin/pyspark
Then run the following command, which should also return 1000:
sc.parallelize(range(1000)).count()
Interactive Scala Shell
The easiest way to start using Spark is through the Scala shell:
./bin/spark-shell
Try the following command, which should return 1000:
scala> sc.parallelize(1 to 1000).count()
Rebuilding the images
Download the spark-2.2.0-k8s-0.5.0-bin-2.7.3 client. All image builds take place in this directory.
Understanding the structure: first you build the spark-base image, which packages the jars, libraries, and executables that Spark needs. For example, connecting Spark to MySQL or HBase requires extra jars, and adding them means rebuilding spark-base. The driver-py and executor-py images (the Python versions of the driver and executor) build on top of spark-base and add what is needed to run Python files, such as pip, numpy, and so on. Therefore, whenever you modify spark-base, you must rebuild the driver-py and executor-py images as well.
Building the spark-base image
Build spark-base first. In the spark-2.2.0-k8s-0.5.0-bin-2.7.3 directory, run:
docker build -t spark-base -f dockerfiles/spark-base/Dockerfile .
The spark-base image ships JDK 1.8 (note that the Dockerfiles below install packages with apk, so the base system is Alpine rather than Ubuntu).
Packaging Python 3.6 and the required pip packages
Install Python 3.6 and the related Python packages (the Dockerfiles below pin PYTHON_VERSION to 3.6.6).
Edit the spark-2.2.0-k8s-0.5.0-bin-2.7.3/dockerfiles/driver-py/Dockerfile file:
# Build command: docker build -t spark-driver-py:latest -f dockerfiles/driver-py/Dockerfile .
FROM spark-base
ADD examples /opt/spark/examples
ADD python /opt/spark/python
RUN apk add make automake gcc g++ subversion python3 python3-dev
RUN ln -s /usr/bin/python3 /usr/bin/python
RUN ln -s /usr/bin/pip3 /usr/bin/pip
RUN pip install --upgrade pip
RUN pip install --upgrade setuptools numpy pandas matplotlib scikit-learn opencv-python
RUN rm -r /root/.cache
# UNCOMMENT THE FOLLOWING TO START PIP INSTALLING PYTHON PACKAGES
# RUN apk add --update alpine-sdk python-dev
ENV PYTHON_VERSION 3.6.6
ENV PYSPARK_PYTHON python
ENV PYSPARK_DRIVER_PYTHON python
ENV PYTHONPATH ${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH}
CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
    env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
    readarray -t SPARK_DRIVER_JAVA_OPTS < /tmp/java_opts.txt && \
    if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
    if ! [ -z ${SPARK_SUBMIT_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_SUBMIT_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
    if ! [ -z ${SPARK_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
    if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
    if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
    ${JAVA_HOME}/bin/java "${SPARK_DRIVER_JAVA_OPTS[@]}" -cp $SPARK_CLASSPATH -Xms$SPARK_DRIVER_MEMORY -Xmx$SPARK_DRIVER_MEMORY -Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS $SPARK_DRIVER_CLASS $PYSPARK_PRIMARY $PYSPARK_FILES $SPARK_DRIVER_ARGS
Edit the spark-2.2.0-k8s-0.5.0-bin-2.7.3/dockerfiles/executor-py/Dockerfile file:
# docker build -t spark-executor-py:latest -f dockerfiles/executor-py/Dockerfile .
FROM spark-base
ADD examples /opt/spark/examples
ADD python /opt/spark/python
RUN apk add make automake gcc g++ subversion python3 python3-dev
RUN ln -s /usr/bin/python3 /usr/bin/python
RUN ln -s /usr/bin/pip3 /usr/bin/pip
RUN pip install --upgrade pip
RUN pip install --upgrade setuptools numpy pandas matplotlib scikit-learn opencv-python
RUN rm -r /root/.cache
ENV PYTHON_VERSION 3.6.6
ENV PYSPARK_PYTHON python
ENV PYSPARK_DRIVER_PYTHON python
ENV PYTHONPATH ${SPARK_HOME}/python/:${SPARK_HOME}/python/lib/py4j-0.10.4-src.zip:${PYTHONPATH}
CMD SPARK_CLASSPATH="${SPARK_HOME}/jars/*" && \
env | grep SPARK_JAVA_OPT_ | sed 's/[^=]*=\(.*\)/\1/g' > /tmp/java_opts.txt && \
readarray -t SPARK_EXECUTOR_JAVA_OPTS < /tmp/java_opts.txt && \
if ! [ -z ${SPARK_MOUNTED_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_MOUNTED_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_EXECUTOR_EXTRA_CLASSPATH+x} ]; then SPARK_CLASSPATH="$SPARK_EXECUTOR_EXTRA_CLASSPATH:$SPARK_CLASSPATH"; fi && \
if ! [ -z ${SPARK_MOUNTED_FILES_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_DIR/." .; fi && \
if ! [ -z ${SPARK_MOUNTED_FILES_FROM_SECRET_DIR+x} ]; then cp -R "$SPARK_MOUNTED_FILES_FROM_SECRET_DIR/." .; fi && \
${JAVA_HOME}/bin/java "${SPARK_EXECUTOR_JAVA_OPTS[@]}" -Dspark.executor.port=$SPARK_EXECUTOR_PORT -Xms$SPARK_EXECUTOR_MEMORY -Xmx$SPARK_EXECUTOR_MEMORY -cp $SPARK_CLASSPATH org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url $SPARK_DRIVER_URL --executor-id $SPARK_EXECUTOR_ID --cores $SPARK_EXECUTOR_CORES --app-id $SPARK_APPLICATION_ID --hostname $SPARK_EXECUTOR_POD_IP
Build the images and push them to your registry.
Submitting Python code
In the Spark client directory, run:
bin/spark-submit \
--deploy-mode cluster \
--master k8s://https://192.168.1.111:6443 \
--kubernetes-namespace spark-cluster \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
--conf spark.executor.instances=5 \
--conf spark.app.name=spark-pi \
--conf spark.kubernetes.driver.docker.image=kubespark/spark-driver-py:v2.2.0-kubernetes-0.5.0 \
--conf spark.kubernetes.executor.docker.image=kubespark/spark-executor-py:v2.2.0-kubernetes-0.5.0 \
--conf spark.kubernetes.initcontainer.docker.image=kubespark/spark-init:v2.2.0-kubernetes-0.5.0 \
--conf spark.kubernetes.resourceStagingServer.uri=http://192.168.11.127:31000 \
--jars local:///opt/spark/jars/RoaringBitmap-0.5.11.jar \
./demo_xxx.py
Reading MySQL data
Place the mysql-connector-java-5.1.47-bin.jar file in the spark-2.2.0-k8s-0.5.0-bin-2.7.3/jars/ directory.
Download: https://dev.mysql.com/downloads/connector/j/5.1.html
Rebuild the spark-base image, then rebuild all images that depend on it:
docker build -t spark-base -f dockerfiles/spark-base/Dockerfile .
Python demo for reading MySQL data:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
sc = SparkSession.builder.appName("mysqltest")\
.config('spark.some.config.option0','some-value')\
.getOrCreate()
sqlContext = SQLContext(sc)
jdbcDf=sqlContext.read.format("jdbc").options(url="jdbc:mysql://139.9.0.111:3306/note",
driver="com.mysql.jdbc.Driver",
dbtable="article",user="root",
password="xxxxx").load()
jdbcDf.select('label').show() # read the label column; show() prints only 20 rows by default
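Writing back to MySQL goes through the same JDBC path as the read above. Below is a minimal sketch, not from the original article: the target table `article_copy` is hypothetical, and the connection details are reused from the read demo. The Spark part is gated behind a flag so the script can be inspected without a running cluster; flip it to True when submitting via spark-submit.

```python
RUN_ON_CLUSTER = False  # flip to True when submitting via spark-submit

def jdbc_options(url, table, user, password, driver="com.mysql.jdbc.Driver"):
    """Collect the JDBC options shared by reads and writes."""
    return {"url": url, "dbtable": table,
            "user": user, "password": password, "driver": driver}

if RUN_ON_CLUSTER:
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("mysql_write").getOrCreate()
    df = spark.createDataFrame([(1, "a"), (2, "b")], ["id", "label"])
    # mode="append" inserts rows; "overwrite" drops and recreates the table
    df.write.format("jdbc") \
        .options(**jdbc_options("jdbc:mysql://139.9.0.111:3306/note",
                                "article_copy", "root", "xxxxx")) \
        .mode("append").save()
```

`df.write.jdbc(url, table, mode, properties)` is an equivalent shorthand for the same operation.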
Reading and writing PostgreSQL
Place the postgresql-42.2.5.jar file in the spark-2.2.0-k8s-0.5.0-bin-2.7.3/jars/ directory.
Download: https://jdbc.postgresql.org/download.html
Rebuild the spark-base image, then rebuild all images that depend on it:
docker build -t spark-base -f dockerfiles/spark-base/Dockerfile .
Python demo for reading PostgreSQL:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
sc = SparkSession.builder.appName("postgre_test")\
.config('spark.some.config.option0','some-value')\
.getOrCreate()
sqlContext = SQLContext(sc)
jdbcDf=sqlContext.read.format("jdbc").options(url="jdbc:postgresql://192.168.1.111:31234/postgres",
driver="org.postgresql.Driver",
dbtable="account",user="postgres",
password="xxxx").load()
jdbcDf.select('name').show() # fetch the name column; only 20 rows are shown by default
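The section title promises writes as well; a write mirrors the read, swapping in the PostgreSQL driver class. A sketch under the same assumptions (the target table `account_copy` is hypothetical, and the Spark part is gated behind a flag so it only runs when submitted to the cluster):

```python
RUN_ON_CLUSTER = False  # flip to True when submitting via spark-submit

def pg_options(url, table, user, password):
    """JDBC options for PostgreSQL; only the driver class differs from MySQL."""
    return {"url": url, "dbtable": table, "user": user,
            "password": password, "driver": "org.postgresql.Driver"}

if RUN_ON_CLUSTER:
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("postgre_write").getOrCreate()
    df = spark.createDataFrame([("alice", 100), ("bob", 200)],
                               ["name", "balance"])
    df.write.format("jdbc") \
        .options(**pg_options("jdbc:postgresql://192.168.1.111:31234/postgres",
                              "account_copy", "postgres", "xxxx")) \
        .mode("append").save()
```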
Reading and writing Oracle
Place the ojdbc8.jar file in the spark-2.2.0-k8s-0.5.0-bin-2.7.3/jars/ directory.
Download: https://www.oracle.com/technetwork/database/application-development/jdbc/downloads/index.html
Rebuild the spark-base image, then rebuild all images that depend on it:
docker build -t spark-base -f dockerfiles/spark-base/Dockerfile .
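No demo is given for Oracle, but reads follow the same JDBC pattern as MySQL and PostgreSQL above. A sketch with hypothetical host, service name, table, and credentials; the thin-driver URL format is jdbc:oracle:thin:@//host:port/service, and the Spark part is gated behind a flag so it only runs when submitted to the cluster:

```python
RUN_ON_CLUSTER = False  # flip to True when submitting via spark-submit

def oracle_url(host, port, service):
    """Build a thin-driver JDBC URL: jdbc:oracle:thin:@//host:port/service."""
    return "jdbc:oracle:thin:@//%s:%d/%s" % (host, port, service)

if RUN_ON_CLUSTER:
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("oracle_test").getOrCreate()
    jdbcDf = spark.read.format("jdbc").options(
        url=oracle_url("192.168.1.111", 1521, "ORCL"),
        driver="oracle.jdbc.driver.OracleDriver",
        dbtable="employees", user="scott", password="xxxx").load()
    jdbcDf.show()
```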
Reading HDFS data
First make sure the client can reach both the HDFS namenode and the datanodes: the client first asks the namenode where the data lives, then connects to the datanodes to read it.
We need to enter an HDFS datanode pod.
For basic hadoop commands, see: https://blog.csdn.net/luanpeng825485697/article/details/83830569
Create a file containing some random characters:
echo "a e d s w q s d c x a w s z x d ew d">aa.txt
Put the file into the HDFS file system:
hadoop fs -mkdir /aa
hadoop fs -put aa.txt /aa
Check that the file exists in HDFS:
hadoop fs -ls /aa
View the file contents:
hadoop fs -cat /aa/aa.txt
Delete the directory:
hadoop fs -rm -r /aa
Demo of reading HDFS data from Python:
from pyspark import SparkConf,SparkContext
from operator import add
conf = SparkConf().setAppName("hdfs_test")
sc = SparkContext(conf=conf)
file = sc.textFile("hdfs://192.168.11.127:32072/aa/aa.txt")
rdd = file.flatMap(lambda line:line.split(" ")).map(lambda word:(word,1))
count=rdd.reduceByKey(add)
result = count.collect()
print(result)
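The counts can also be written back to HDFS with saveAsTextFile. A sketch (the output path /aa/counts is a placeholder, and the Spark part is gated behind a flag so it only runs when submitted to the cluster); the pure-Python word_counts helper mirrors what the flatMap/map/reduceByKey pipeline computes:

```python
from collections import Counter

RUN_ON_CLUSTER = False  # flip to True when submitting via spark-submit

def word_counts(lines):
    """Pure-Python equivalent of the flatMap/map/reduceByKey pipeline above."""
    counts = Counter()
    for line in lines:
        counts.update(w for w in line.split(" ") if w)
    return dict(counts)

if RUN_ON_CLUSTER:
    from pyspark import SparkConf, SparkContext
    sc = SparkContext(conf=SparkConf().setAppName("hdfs_write"))
    counts = sc.textFile("hdfs://192.168.11.127:32072/aa/aa.txt") \
               .flatMap(lambda line: line.split(" ")) \
               .map(lambda word: (word, 1)) \
               .reduceByKey(lambda a, b: a + b)
    # saveAsTextFile writes one part-* file per partition under the target dir
    counts.saveAsTextFile("hdfs://192.168.11.127:32072/aa/counts")
```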
Reading HBase data
Add the hadoop-*.jar and hbase-*.jar files from the hbase/lib directory to the spark-2.2.0-k8s-0.5.0-bin-2.7.3/jars/ directory.
Also copy the following from hbase/lib into spark-2.2.0-k8s-0.5.0-bin-2.7.3/jars/: zookeeper-3.4.6.jar, metrics-core-2.2.0.jar (if it is missing, hbase RpcRetryingCaller: Call exception keeps retrying the HBase connection without ever raising an error), htrace-core-3.1.0-incubating.jar, protobuf-java-2.5.0.jar, and guava-12.0.1.jar.
Note: Spark 2.0 and later no longer ships the jar that converts HBase data into a form Python can read, so it has to be downloaded separately.
Download address:
https://mvnrepository.com/artifact/org.apache.spark/spark-examples?repo=typesafe-maven-releases
After downloading, place it in the spark-2.2.0-k8s-0.5.0-bin-2.7.3/jars/ directory as well.
The latest version available to me is spark-examples_2.11-1.6.0-typesafe-001.jar.
Rebuild spark-base, then rebuild all the images:
docker build -t spark-base -f dockerfiles/spark-base/Dockerfile .
Python demo for reading HBase data:
print('=====================================')
import os
status = os.popen('echo "10.233.64.13 hbase-master-deployment-1-ddb859944-ctbrm">> /etc/hosts') # HBase nodes resolve each other by hostname, so patch /etc/hosts first
print(status.read())
print('=====================================')
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
spark = SparkSession.builder.appName("hbase_test").getOrCreate()
sc = spark.sparkContext
zookeeper = '10.233.9.11,10.233.9.12,10.233.9.13'
table = 'product'
# read the table
conf = {
"hbase.zookeeper.quorum": zookeeper,
"hbase.zookeeper.property.clientPort":"2181",
"hbase.regionserver.port":"60010",
"hbase.master":"10.233.9.21:60000",
"zookeeper.znode.parent":"/hbase",
"hbase.mapreduce.inputtable": table
}
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"
hbase_rdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter=keyConv, valueConverter=valueConv, conf=conf)
count = hbase_rdd.count()
hbase_rdd.cache()
output = hbase_rdd.collect()
for (k, v) in output:
print((k, v))
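Writing from Python uses the companion converters in the same spark-examples jar, StringToImmutableBytesWritableConverter and StringListToPutConverter, mirroring the hbase_outputformat.py example bundled with Spark. A sketch; the column family cf and the row contents are placeholders, and the Spark part is gated behind a flag so it only runs when submitted to the cluster:

```python
RUN_ON_CLUSTER = False  # flip to True when submitting via spark-submit

def to_put_row(rowkey, family, qualifier, value):
    """Shape one cell as the (key, [rowkey, family, qualifier, value]) pair
    that StringListToPutConverter expects."""
    return (rowkey, [rowkey, family, qualifier, value])

if RUN_ON_CLUSTER:
    from pyspark.sql import SparkSession
    sc = SparkSession.builder.appName("hbase_write").getOrCreate().sparkContext
    conf = {
        "hbase.zookeeper.quorum": "10.233.9.11,10.233.9.12,10.233.9.13",
        "hbase.mapred.outputtable": "product",
        "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
        "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable",
    }
    keyConv = "org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter"
    valueConv = "org.apache.spark.examples.pythonconverters.StringListToPutConverter"
    rows = [to_put_row("row1", "cf", "name", "apple")]
    sc.parallelize(rows).saveAsNewAPIHadoopDataset(
        conf=conf, keyConverter=keyConv, valueConverter=valueConv)
```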