Sharing an internal university lab project: a hands-on Docker + Kafka workflow
Preface
Friends keep asking me whether I have a complete, end-to-end workflow combining Docker with other technologies, so they can get a feel for working on a real project. Honestly, I couldn't help: my company does use Docker, but extracting that work into a shareable example would take more energy than I have, so I never did it.
Recently, though, my nephew (a "nephew" purely by family seniority) who is at university had a lab project that needed Docker. That makes a ready-made example, so here is the complete walkthrough; if you're interested, you can follow along and try it yourself.
This article was first published on the WeChat public account "Java架構師聯盟", which posts technical content daily.
Lab objectives
Understand the concept of message passing.
Follow the workflow of a message broker that handles messages from many tenants.
Repeating what others have done in the past sheds light on your future.
Lab steps
1. Install docker-compose and docker-machine
Docker is already installed; the system is Ubuntu.
Install docker-compose
1. Download the latest version (v1.24.0) of docker-compose:
$ sudo curl -L "http://github.com/docker/compose/releases/download/1.24.0/docker-compose-$(uname -s)-$(uname -m)" -o /usr/local/bin/docker-compose
PS: the download is genuinely slow; consider fetching the file directly from GitHub instead (link: https://github.com/docker/machine/releases/download/v0.16.1/docker-
Note: with curl, the URL should start with http, not https; in my environment, https either timed out or the connection was refused.
2. Make the downloaded docker-compose file executable:
$ sudo chmod +x /usr/local/bin/docker-compose
3. Link docker-compose into /usr/bin so it is always found on the PATH:
$ sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose
4. Check the docker-compose version:
$ docker-compose --version
docker-compose version 1.24.0, build 0aa59064
Install docker-machine
Tip: you can follow the official steps and download the file with curl, but the download is slow. As with docker-compose, I suggest downloading from GitHub, renaming the file to docker-machine, and moving it to /tmp.
The official shell command:
$ base=https://github.com/docker/machine/releases/download/v0.16.0 &&
curl -L $base/docker-machine-$(uname -s)-$(uname -m) >/tmp/docker-machine && sudo install /tmp/docker-machine /usr/local/bin/docker-machine
But I prefer to do things my own way; my steps are as follows:
1. Download the docker-machine binary from GitHub
File link: https://github.com/docker/machine/releases/download/v0.16.1/docker-machine-Linux-x86_64
2. Rename the file to docker-machine and move it into the /tmp directory
3. Install docker-machine:
$ sudo install /tmp/docker-machine /usr/local/bin/docker-machine
4. Check the docker-machine version:
$ docker-machine version
2. Install VirtualBox
1. You could install the .deb package directly, but adding the APT source keeps VirtualBox up to date:
$ sudo sed -i '$adeb http://download.virtualbox.org/virtualbox/debian xenial contrib' /etc/apt/sources.list
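The sed expression above uses GNU sed's `$a` command (append text after the last line) with `-i` to edit the file in place. A minimal sketch of the same idiom against a scratch file rather than the real /etc/apt/sources.list (the initial file content here is made up):

```shell
# Append a line after the last line of a file with GNU sed's `$a` command.
# Uses a temporary file instead of /etc/apt/sources.list.
f=$(mktemp)
printf 'deb http://archive.ubuntu.com/ubuntu xenial main\n' > "$f"
sed -i '$adeb http://download.virtualbox.org/virtualbox/debian xenial contrib' "$f"
cat "$f"   # the file now ends with the VirtualBox source line
rm -f "$f"
```

The `$` address selects the last line, so `a` appends after it; without the address, `a` would append the line after every line of the file.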
2. Import the public keys for apt-secure:
$ wget -q https://www.virtualbox.org/download/oracle_vbox_2016.asc -O- | sudo apt-key add -
$ wget -q https://www.virtualbox.org/download/oracle_vbox.asc -O- | sudo apt-key add -
3. Install VirtualBox via apt:
$ sudo apt-get update
$ sudo apt-get install virtualbox-6.0
A package dependency conflict may occur here:
lzd@ubuntu:~$ sudo apt-get install virtualbox-6.0
Reading package lists... Done
Building dependency tree
Reading state information... Done
Some packages could not be installed. This may mean that you have
requested an impossible situation or if you are using the unstable
distribution that some required packages have not yet been created
or been moved out of Incoming.
The following information may help to resolve the situation:

The following packages have unmet dependencies:
 virtualbox-6.0 : Depends: libcurl3 (>= 7.16.2) but it is not going to be installed
                  Depends: libpng12-0 (>= 1.2.13-4) but it is not installable
                  Depends: libvpx3 (>= 1.5.0) but it is not installable
                  Recommends: libsdl-ttf2.0-0 but it is not going to be installed
E: Unable to correct problems, you have held broken packages.
Solution
If you run into this problem, use aptitude instead of apt-get to install virtualbox-6.0:
$ sudo aptitude install virtualbox-6.0
3. Create a VM for Docker with docker-machine
$ docker-machine create --driver virtualbox --virtualbox-memory 2048 dev
I found that this step fails with the following error:
lzd@ubuntu:~$ docker-machine create --driver virtualbox --virtualbox-memory 2048 dev
Running pre-create checks...
Error with pre-create check: "VBoxManage not found. Make sure VirtualBox is installed and VBoxManage is in the path"
Solution
Advice from the corresponding GitHub issue: install VirtualBox first,
sudo apt-get install virtualbox
then run this command:
docker-machine create --driver virtualbox default
Following this method, the dev VM was created successfully.
4. Write the code
1. Install sbt
Install a JDK first, then extract the downloaded archive into the /usr/local/sbt directory:
$ sudo mkdir -p /usr/local/sbt
$ sudo tar -zxvf sbt-1.2.8.tgz -C /usr/local/sbt
Create an sbt script file in the /usr/local/sbt/ directory:
$ cd /usr/local/sbt
$ vim sbt
Write the following into the sbt file:
#!/bin/bash
SBT_OPTS="-Xms512M -Xmx1536M -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=256M"
java $SBT_OPTS -jar /usr/local/sbt/bin/sbt-launch.jar "$@"
Check that the path to sbt-launch.jar is correct, then make the sbt script executable:
$ chmod u+x sbt
Configure the sbt environment variable:
$ vim /etc/profile
Add the following at the end of the file:
export PATH=/usr/local/sbt/bin:$PATH
Then run the following to apply the change:
$ source /etc/profile
Edit the sbtconfig.txt file under the sbt path:
$ vim /usr/local/sbt/conf/sbtconfig.txt
Add the following content:
-Dsbt.global.base=/home/rose/.sbt
-Dsbt.boot.directory=/home/rose/.sbt/boot/
-Dsbt.ivy.home=/home/rose/.ivy2
Check whether sbt was installed successfully (this needs a network connection and will download dependencies):
$ sbt sbtVersion
2. Build the project with sbt
First we need to set up the sbt directory structure for the project (unfortunately, sbt provides no command to bootstrap a project), so we use a script to create it.
The shell code is as follows:
#!/bin/bash
if [ -z "$1" ]; then
  echo 'Project name is empty'
  exit 1
fi

PROJECT_NAME="$1"
SCALA_VERSION="${2-2.11.8}"
SCALATEST_VERSION="${3-2.2.6}"

mkdir $PROJECT_NAME
cd $PROJECT_NAME

cat > build.sbt << EOF
name := "$PROJECT_NAME"

scalaVersion := "$SCALA_VERSION"

libraryDependencies += "org.scalatest" %% "scalatest" % "$SCALATEST_VERSION" % "test"
EOF

mkdir -p src/{main/{scala,resources},test/{scala,resources}}

cat > .gitignore << EOF
target/
EOF
Name the script file sbt-init.sh and use it to create the sbt project:
sudo ./sbt-init.sh example
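For reference, a dry run of just the mkdir -p step from the script above, in a scratch directory (project name example assumed), shows the standard sbt source tree it produces:

```shell
# Recreate only the directory-layout step of sbt-init.sh in a temp dir,
# to show the sbt source tree the script generates.
tmp=$(mktemp -d)
cd "$tmp"
mkdir -p example/src/{main/{scala,resources},test/{scala,resources}}
find example -type d | sort
# example
# example/src
# example/src/main
# example/src/main/resources
# example/src/main/scala
# example/src/test
# example/src/test/resources
# example/src/test/scala
cd - >/dev/null
rm -rf "$tmp"
```

Compiled sources go under src/main/scala, tests under src/test/scala; sbt discovers both by convention.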
3. Create a new file assembly.sbt:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.13.0")
4. Create a new configuration file build.sbt:
name := "direct_kafka_word_count"

scalaVersion := "2.10.5"

val sparkVersion = "1.5.1"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming" % sparkVersion % "provided",
  ("org.apache.spark" %% "spark-streaming-kafka" % sparkVersion)
    exclude("org.spark-project.spark", "unused")
)

assemblyJarName in assembly := name.value + ".jar"
5. Create a new file DirectKafkaWordCount.scala under src/main/scala/com/example/spark in the project and write:
package com.example.spark

import kafka.serializer.StringDecoder
import org.apache.spark.{TaskContext, SparkConf}
import org.apache.spark.streaming.kafka.{OffsetRange, HasOffsetRanges, KafkaUtils}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DirectKafkaWordCount {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: DirectKafkaWordCount <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more kafka topics to consume from
        |""".stripMargin)
      System.exit(1)
    }

    val Array(brokers, topics) = args

    // Create context with 10 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)

    // Get the lines, split them into words, count the words and print
    val lines = messages.map(_._2)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}
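To see what the job computes without a cluster, the same split-and-count aggregation can be sketched in plain shell over one fixed sample line (the input text here is made up):

```shell
# Word count over a sample line: split on spaces, then count duplicates.
# This mirrors flatMap(_.split(" ")) followed by reduceByKey(_ + _)
# in the Spark job above, but for a single static batch.
printf 'to be or not to be\n' \
  | tr ' ' '\n' \
  | sort \
  | uniq -c \
  | sort -rn
# counts: "to" and "be" appear twice; "or" and "not" once
```

The Spark version does the same per 10-second batch, except the "lines" arrive continuously from the Kafka topic instead of from a fixed string.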
6. Run the sbt command to produce the jar:
sbt assembly
5. Configure the containers
1. Use the dev machine's Docker client:
$ eval "$(docker-machine env dev)"
2. Configure docker-compose with a docker-compose.yml file:
$ mkdir -p /home/lzd/myDocker-6/
$ cd /home/lzd/myDocker-6/
$ touch docker-compose.yml
$ chmod +rwx docker-compose.yml
Write in the following configuration:
kafka:
  image: antlypls/kafka-legacy
  environment:
    - KAFKA=localhost:9092
    - ZOOKEEPER=localhost:2181
  expose:
    - "2181"
    - "9092"

spark:
  image: antlypls/spark:1.5.1
  command: bash
  volumes:
    - ./target/scala-2.10:/app
  links:
    - kafka
Start all the containers from the docker-compose.yml file:
$ docker-compose run --rm spark
This brings up kafka, then spark, and drops us into a shell inside the spark container. The --rm flag makes docker-compose remove the spark container after the run finishes.
3. Configure the kafka container
Create a topic in the kafka broker. Enter the kafka container:
$ docker exec -it <kafka container ID or kafka container name> bash
Add the topic word-count in kafka:
$ kafka-topics.sh --create --zookeeper $ZOOKEEPER --replication-factor 1 --partitions 2 --topic word-count
Check whether the topic was added:
$ kafka-topics.sh --list --zookeeper $ZOOKEEPER
$ kafka-topics.sh --describe --zookeeper $ZOOKEEPER --topic word-count
4. Configure the spark container
Open a new terminal and enter the spark container:
docker exec -it <spark container ID or spark container name> bash
Run the following command:
spark-submit \
  --master yarn-client \
  --driver-memory 1G \
  --class com.example.spark.DirectKafkaWordCount \
  app/direct_kafka_word_count.jar kafka:9092 word-count
PS: driver-memory must be specified here; otherwise the default of 4G is used, and in this experiment the job then fails with the following error:
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000d5550000, 715849728, 0) failed; error='Cannot allocate memory' (errno=12)
#
# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 715849728 bytes for committing reserved memory.
# An error report file with more information is saved as:
# /hs_err_pid16414.log
2. Kafka Streams word-count demo
1. Start zookeeper and the broker
Start zookeeper:
$ docker run -d \
--net=host \
--name=zookeeper \
-e ZOOKEEPER_CLIENT_PORT=32181 \
confluentinc/cp-zookeeper:latest
Check zookeeper:
docker logs zookeeper | grep -i binding
cd /kafka/docker/stream
export COMPOSE_PROJECT_NAME="stream-demo"
docker-compose up -d
The output:
Creating network "streamdemo_default" with the default driver
Creating streamdemo_zookeeper_1 ...
Creating streamdemo_zookeeper_1 ... done
Creating streamdemo_broker_1 ...
Creating streamdemo_broker_1 ... done
Service status
docker-compose ps
2. Create a topic
Create the input topic
docker-compose exec broker bash
kafka-topics --create \
--zookeeper zookeeper:2181 \
--replication-factor 1 \
--partitions 1 \
--topic streams-plaintext-input
Create the output topic
kafka-topics --create \
--zookeeper zookeeper:2181 \
--replication-factor 1 \
--partitions 1 \
--topic streams-wordcount-output \
--config cleanup.policy=compact
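The cleanup.policy=compact setting makes Kafka eventually retain only the latest record per key on the output topic. The semantics can be illustrated outside Kafka with a few lines of awk over "key value" records (the sample records below are made up):

```shell
# Keep only the most recent value per key, like a compacted Kafka topic.
# Conceptual illustration only; real compaction runs asynchronously on
# the broker's log segments.
printf 'alice 1\nbob 1\nalice 2\n' \
  | awk '{latest[$1] = $2} END {for (k in latest) print k, latest[k]}' \
  | sort
# prints:
# alice 2
# bob 1
```

This is why compaction suits the word-count output: each word is a key, and only its latest count needs to survive.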
Describe both topics:
kafka-topics \
--zookeeper zookeeper:2181 \
--describe
The result:
Topic:streams-plaintext-input   PartitionCount:1   ReplicationFactor:1   Configs:
    Topic: streams-plaintext-input   Partition: 0   Leader: 1   Replicas: 1   Isr: 1
Topic:streams-wordcount-output  PartitionCount:1   ReplicationFactor:1   Configs:cleanup.policy=compact
    Topic: streams-wordcount-output  Partition: 0   Leader: 1   Replicas: 1   Isr: 1