How to Add a Third-Party Service to CDH
- Introduction
CDH makes it easy to add big-data services, but only those that Cloudera itself provides. To add a third-party service (for example, a component your company developed) to a CDH cluster and have CDH manage it, you must package it according to a fixed set of rules and then publish it to CDH.
This article shows how to package your own service, publish it to CDH, and have CDH control its execution and monitor its basic runtime status.
- Background
- Terminology
parcel: a gzip-format archive whose name ends in ".parcel". It must follow a fixed naming rule:
the filename has three segments — the package name, the version, and the target platform.
For example: FLINK-1.6.0-hadoop_2.6-scala_2.11-el7.parcel
Package name: FLINK
Version: 1.6.0-hadoop_2.6-scala_2.11
Platform: el7
el6 denotes CentOS 6, el7 denotes CentOS 7.
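The three segments can be recovered mechanically from a filename. A small sketch using plain bash parameter expansion (variable names here are illustrative):

```shell
# Decompose a parcel filename into its three segments (per the naming rule above).
f=FLINK-1.6.0-hadoop_2.6-scala_2.11-el7.parcel
base=${f%.parcel}                 # strip the .parcel suffix
name=${base%%-*}                  # text before the first dash -> FLINK
platform=${base##*-}              # text after the last dash  -> el7
version=${base#"$name"-}          # drop "FLINK-" from the front
version=${version%-"$platform"}   # drop "-el7" from the back
echo "$name | $version | $platform"
```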
A parcel contains your service's files plus an important descriptor file, parcel.json,
which records information about the service: its version, the user it runs as, the CDH versions it supports, and so on.
A parcel is recognized at distribution time only if it is placed in /opt/cloudera/parcel-repo/.
csd: the CSD is a jar file that records the rules by which CDH manages your service,
such as the icon shown in the CDH UI, the services it depends on, the ports it exposes, and how it is started.
The CSD jar must be placed in /opt/cloudera/csd/ to be recognized when adding the service to the cluster.
- Downloads
https://github.com/cloudera/cm_csds
https://github.com/cloudera/cm_ext
- Building the CDH Component
- Preparing the component for release
Collect your tested service into a single directory. The subdirectory layout inside it is simply your project's own structure and needs no changes. Dependent libraries can be provided by the system environment or placed directly inside the project directory.
A service written in any language can be hosted on CDH.
- Building the Flink component package
- Downloading the Flink distribution
https://archive.apache.org/dist/flink/flink-1.6.0/flink-1.6.0-bin-hadoop26-scala_2.11.tgz
- Building the parcel
The parcel's root directory is laid out as follows:
it consists of your service directory (lib/flink) and a meta directory.
The meta directory contains these files:
flink_env.sh declares environment variables for your service's runtime bash environment; add whatever settings your service needs.
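For concreteness, that skeleton can be created like this (the meta files are filled in by the steps that follow):

```shell
# Create the parcel skeleton: the service itself goes under lib/flink,
# descriptor files under meta/.
mkdir -p FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink
mkdir -p FLINK-1.6.0-hadoop_2.6-scala_2.11/meta
# The unpacked Flink distribution is copied into lib/flink;
# meta/ will hold flink_env.sh, parcel.json, and permissions.json.
```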
Create the flink_env.sh file:
vi FLINK-1.6.0-hadoop_2.6-scala_2.11/meta/flink_env.sh
Contents of FLINK-1.6.0-hadoop_2.6-scala_2.11/meta/flink_env.sh:
#!/bin/bash
FLINK_DIRNAME=${PARCEL_DIRNAME:-"FLINK-1.6.0-hadoop_2.6-scala_2.11"}
export FLINK_HOME=$PARCELS_ROOT/$FLINK_DIRNAME/lib/flink
parcel.json must be filled in with the parcel's name and the CDH versions it is compatible with.
Create the parcel.json file (the parcel descriptor):
vi FLINK-1.6.0-hadoop_2.6-scala_2.11/meta/parcel.json
Contents of FLINK-1.6.0-hadoop_2.6-scala_2.11/meta/parcel.json:
{
"schema_version": 1,
"name": "FLINK",
"version": "1.6.0-hadoop_2.6-scala_2.11",
"depends": "CDH (>= 5.2), CDH (<< 6.0)",
"setActiveSymlink": true,
"replaces": "FLINK",
"scripts": {
"defines": "flink_env.sh"
},
"packages": [{
"name": "flink-master",
"version": "1.6.0+flink1.6.0"
},
{
"name": "flink-worker",
"version": "1.6.0+flink1.6.0"
}],
"components": [{
"name": "flink",
"version": "1.6.0-flink1.6.0",
"pkg_version": "1.6.0+flink1.6.0",
"pkg_release": "hadoop_2.6-scala_2.11"
}],
"provides": ["flink"],
"users": {
"flink": {
"longname": "Flink",
"home": "/var/lib/flink",
"shell": "/bin/bash",
"extra_groups": []
}
},
"groups": ["flink"]
}
Note: the contents of this file are case-sensitive; wrong casing can prevent the parcel from being distributed.
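Before packing, it is worth at least confirming that parcel.json parses as valid JSON; Python's built-in json.tool is enough for that. The sketch below uses a minimal stand-in file purely so it is runnable anywhere:

```shell
# Validate that a parcel.json parses; json.tool exits non-zero on a syntax error.
# The stand-in file here is for illustration only.
mkdir -p demo/meta
printf '{"schema_version": 1, "name": "FLINK"}' > demo/meta/parcel.json
python3 -m json.tool demo/meta/parcel.json
```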
Create the flink-master.sh file:
vi FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink-master.sh
Contents of FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink-master.sh:
#!/bin/bash
# Flink Master.
USAGE="Usage: flink-master.sh (start|stop)"
DAEMON=flink-master
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
. "$bin"/config.sh
if [ ! -z "${FLINK_JM_HEAP_MB}" ] && [ "${FLINK_JM_HEAP}" == 0 ]; then
echo "used deprecated key \`${KEY_JOBM_MEM_MB}\`, please replace with key \`${KEY_JOBM_MEM_SIZE}\`"
else
flink_jm_heap_bytes=$(parseBytes ${FLINK_JM_HEAP})
FLINK_JM_HEAP_MB=$(getMebiBytes ${flink_jm_heap_bytes})
fi
if [[ ! ${FLINK_JM_HEAP_MB} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP_MB}" -lt "0" ]]; then
echo "[ERROR] Configured JobManager memory size is not a valid value. Please set '${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
exit 1
fi
if [ "${FLINK_JM_HEAP_MB}" -gt "0" ]; then
export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP_MB"m -Xmx"$FLINK_JM_HEAP_MB"m"
fi
# Add JobManager-specific JVM options
export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
# Startup parameters
ARGS=("--configDir" "${FLINK_CONF_DIR}" "--executionMode" "cluster" "--host" "${FLINK_MASTER_HOST}" "--webui-port" "${FLINK_WEB_UI_PORT}")
echo "FLINK_MASTER_HOST: $FLINK_MASTER_HOST"
echo "FLINK_WEB_UI_PORT: $FLINK_WEB_UI_PORT"
echo "FLINK_LOG_DIR: ${FLINK_LOG_DIR}"
echo "MASTER_ARGS: ${ARGS[@]}"
CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
FLINK_TM_CLASSPATH=`constructFlinkClassPath`
FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-master"
log="${FLINK_LOG_PREFIX}.log"
out="${FLINK_LOG_PREFIX}.out"
log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml")
JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
# Only set JVM 8 arguments if we have correctly extracted the version
if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
if [ "$JAVA_VERSION" -lt 18 ]; then
JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"
fi
fi
MY_PID=$(ps -ef | grep "$CLASS_TO_RUN" | grep -v grep | awk '{print $2}')
if [ "${MY_PID}" = "" ]; then
# Rotate log files
rotateLogFilesWithPrefix "$FLINK_LOG_DIR" "$FLINK_LOG_PREFIX"
# Evaluate user options for local variable expansion
FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})
CLASS_PATH=`manglePathList "$FLINK_TM_CLASSPATH:$(hadoop classpath)"`
CLASS_PATH=$(echo "${CLASS_PATH}" | sed "s#"$FLINK_HOME"/lib/slf4j-log4j12-1.7.7.jar:##g")
echo "Starting $DAEMON daemon on host $HOSTNAME."
exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "${CLASS_PATH}" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1
else
echo "$DAEMON daemon (pid: $MY_PID) is running on host $HOSTNAME."
fi
flink-master.sh starts Flink's master (JobManager) node.
Note: the exec in flink-master.sh is required: it replaces the script with the service process itself, so Cloudera Manager can supervise it in the foreground.
Create the flink-worker.sh file:
vi FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink-worker.sh
Contents of FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink-worker.sh:
#!/bin/bash
#Flink Worker.
USAGE="Usage: flink-worker.sh (start|stop)"
DAEMON=flink-worker
OPERATION=$1
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
. "$bin"/config.sh
# if memory allocation mode is lazy and no other JVM options are set,
# set the 'Concurrent Mark Sweep GC'
if [[ $FLINK_TM_MEM_PRE_ALLOCATE == "false" ]] && [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
fi
if [ ! -z "${FLINK_TM_HEAP_MB}" ] && [ "${FLINK_TM_HEAP}" == 0 ]; then
echo "used deprecated key \`${KEY_TASKM_MEM_MB}\`, please replace with key \`${KEY_TASKM_MEM_SIZE}\`"
else
flink_tm_heap_bytes=$(parseBytes ${FLINK_TM_HEAP})
FLINK_TM_HEAP_MB=$(getMebiBytes ${flink_tm_heap_bytes})
fi
if [[ ! ${FLINK_TM_HEAP_MB} =~ ${IS_NUMBER} ]] || [[ "${FLINK_TM_HEAP_MB}" -lt "0" ]]; then
echo "[ERROR] Configured TaskManager JVM heap size is not a number. Please set '${KEY_TASKM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
exit 1
fi
if [ "${FLINK_TM_HEAP_MB}" -gt "0" ]; then
TM_HEAP_SIZE=$(calculateTaskManagerHeapSizeMB)
# Long.MAX_VALUE in TB: This is an upper bound, much less direct memory will be used
TM_MAX_OFFHEAP_SIZE="8388607T"
export JVM_ARGS="${JVM_ARGS} -Xms${TM_HEAP_SIZE}M -Xmx${TM_HEAP_SIZE}M -XX:MaxDirectMemorySize=${TM_MAX_OFFHEAP_SIZE}"
fi
# Add TaskManager-specific JVM options
export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"
# Startup parameters
ARGS=("--configDir" "${FLINK_CONF_DIR}")
echo "FLINK_LOG_DIR: ${FLINK_LOG_DIR}"
echo "WORKER_ARGS: ${ARGS[@]}"
CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
FLINK_TM_CLASSPATH=`constructFlinkClassPath`
FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-worker"
log="${FLINK_LOG_PREFIX}.log"
out="${FLINK_LOG_PREFIX}.out"
log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback.xml")
JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
# Only set JVM 8 arguments if we have correctly extracted the version
if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
if [ "$JAVA_VERSION" -lt 18 ]; then
JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"
fi
fi
MY_PID=$(ps -ef | grep "$CLASS_TO_RUN" | grep -v grep | awk '{print $2}')
if [ "${MY_PID}" = "" ]; then
# Rotate log files
rotateLogFilesWithPrefix "$FLINK_LOG_DIR" "$FLINK_LOG_PREFIX"
# Evaluate user options for local variable expansion
FLINK_ENV_JAVA_OPTS=$(eval echo ${FLINK_ENV_JAVA_OPTS})
CLASS_PATH=`manglePathList "$FLINK_TM_CLASSPATH:$(hadoop classpath)"`
CLASS_PATH=$(echo "${CLASS_PATH}" | sed "s#"$FLINK_HOME"/lib/slf4j-log4j12-1.7.7.jar:##g")
echo "Starting $DAEMON daemon on host $HOSTNAME."
exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "${CLASS_PATH}" ${CLASS_TO_RUN} "${ARGS[@]}" > "$out" 2>&1
else
echo "$DAEMON daemon (pid: $MY_PID) is running on host $HOSTNAME."
fi
flink-worker.sh starts Flink's worker (TaskManager) nodes.
Note: the exec in flink-worker.sh is required: the service process must replace the script and run in the foreground under Cloudera Manager's supervision.
Create the flink-yarn.sh file:
vi FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink-yarn.sh
Contents of FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink/bin/flink-yarn.sh:
#!/bin/bash
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
# get Flink config
. "$bin"/config.sh
JVM_ARGS="$JVM_ARGS -Xmx512m"
CLASS_TO_RUN=org.apache.flink.yarn.cli.FlinkYarnSessionCli
FLINK_LOG_PREFIX="$FLINK_LOG_DIR/flink-yarn"
log="$FLINK_LOG_PREFIX.log"
out="$FLINK_LOG_PREFIX.out"
log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-yarn-session.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-yarn.xml"
# Rotate log files
rotateLogFilesWithPrefix "$FLINK_LOG_DIR" "$FLINK_LOG_PREFIX"
CLASS_PATH=`manglePathList $(constructFlinkClassPath):$(hadoop classpath)`
CLASS_PATH=$(echo "${CLASS_PATH}" | sed "s#"$FLINK_HOME"/lib/slf4j-log4j12-1.7.7.jar:##g")
exec $JAVA_RUN $JVM_ARGS -classpath "$CLASS_PATH" $log_setting ${CLASS_TO_RUN} -j "$FLINK_LIB_DIR"/flink-dist*.jar "$@" > "$out" 2>&1
flink-yarn.sh starts Flink on YARN.
Note: the exec in flink-yarn.sh is required so the service process replaces the script and runs in the foreground.
Create the permissions.json file:
vi FLINK-1.6.0-hadoop_2.6-scala_2.11/meta/permissions.json
Contents of FLINK-1.6.0-hadoop_2.6-scala_2.11/meta/permissions.json:
{
"lib/flink/bin/config.sh": {
"user": "flink",
"group": "flink",
"permissions": "0755"
},
"lib/flink/bin/flink-master.sh": {
"user": "flink",
"group": "flink",
"permissions": "0755"
},
"lib/flink/bin/flink-worker.sh": {
"user": "flink",
"group": "flink",
"permissions": "0755"
},
"lib/flink/bin/flink-yarn.sh": {
"user": "flink",
"group": "flink",
"permissions": "0755"
}
}
permissions.json assigns ownership and permissions to files or directories.
Change into the directory that contains FLINK-1.6.0-hadoop_2.6-scala_2.11.
Create the parcel-el6 directory:
mkdir parcel-el6
Pack the FLINK-1.6.0-hadoop_2.6-scala_2.11 directory into a parcel file with the standard name.
Run:
tar -czvf parcel-el6/FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel FLINK-1.6.0-hadoop_2.6-scala_2.11
After packing the parcel, you need a FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel.sha file holding the hash checksum of the parcel file.
The checksum is produced by make_manifest.py: running it generates a manifest.json file that contains the hash.
make_manifest.py is a Python script found in the cm_ext tool listed in the Downloads section.
Run:
python cm_ext-master/make_manifest/make_manifest.py parcel-el6
where parcel-el6 is the parent directory of FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel.
This command generates a manifest.json file in the parcel-el6 directory.
Contents of parcel-el6/manifest.json:
{
"parcels": [{
"hash": "b548e8b4be3db290933222e4bd517c903d36d453",
"depends": "CDH (>= 5.2), CDH (<< 6.0)",
"replaces": "FLINK",
"parcelName": "FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel",
"components": [{
"pkg_version": "1.6.0+flink1.6.0",
"version": "1.6.0-flink1.6.0",
"name": "flink",
"pkg_release": "hadoop_2.6-scala_2.11"
}]
}],
"lastUpdated": 1538048224076
}
Create the FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel.sha file by extracting the hash from manifest.json:
echo $(cat parcel-el6/manifest.json | grep hash | awk -F"\"" '{print $4}') > parcel-el6/FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel.sha
Contents of parcel-el6/FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel.sha:
b548e8b4be3db290933222e4bd517c903d36d453
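Equivalently, the .sha file is just the SHA1 digest of the parcel file itself, so sha1sum reproduces the value that make_manifest.py records. In the sketch below a placeholder file stands in for the real parcel so the command is runnable anywhere:

```shell
# The .sha content equals the SHA1 of the parcel file itself.
# "demo.parcel" is a placeholder standing in for the real parcel.
mkdir -p parcel-el6
printf 'placeholder' > parcel-el6/demo.parcel
sha1sum parcel-el6/demo.parcel | awk '{print $1}' > parcel-el6/demo.parcel.sha
cat parcel-el6/demo.parcel.sha
```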
You end up with three files:
FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel
FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel.sha
manifest.json
Copy these three files into /opt/cloudera/parcel-repo/.
If a manifest.json file already exists in /opt/cloudera/parcel-repo/, merge the following entry:
{
"parcelName": "FLINK-1.6.0-hadoop_2.6-scala_2.11-el7.parcel",
"components": [{
"pkg_version": "1.6.0+flink1.6.0",
"version": "1.6.0-flink1.6.0",
"name": "flink",
"pkg_release": "hadoop_2.6-scala_2.11"
}],
"depends": "CDH (>= 5.2), CDH (<< 6.0)",
"hash": "ce75a90cd57aecd7e31bef15dd1221c6182e38c6"
}
into the "parcels" array of the existing manifest.json.
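One way to do that merge without hand-editing (a sketch: the filenames below are illustrative stand-ins, the real target on a cluster node is /opt/cloudera/parcel-repo/manifest.json, and python3 is used only as a portable JSON editor):

```shell
# Two stand-in manifests: the one already in the repo, and the newly generated one.
printf '{"parcels": [{"parcelName": "OLD-1.0-el6.parcel"}], "lastUpdated": 1}' > existing.json
printf '{"parcels": [{"parcelName": "FLINK-1.6.0-hadoop_2.6-scala_2.11-el6.parcel"}], "lastUpdated": 2}' > new.json
python3 - <<'EOF'
import json
existing = json.load(open("existing.json"))
new = json.load(open("new.json"))
existing["parcels"] += new["parcels"]         # append the new parcel entries
existing["lastUpdated"] = new["lastUpdated"]  # keep the newer timestamp
json.dump(existing, open("merged.json", "w"), indent=2)
EOF
cat merged.json
```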
- Building the standalone CSD jar
The CSD's directory layout is as follows:
descriptor holds the service's rule descriptor file, service.sdl.
images holds the service's icon, in PNG format. Without an icon file, the CDH UI shows no icon.
scripts holds the service's start-up scripts; define how your service starts in scripts/control.sh.
Create the directories:
mkdir -p FLINK-1.6.0/descriptor
mkdir -p FLINK-1.6.0/images
mkdir -p FLINK-1.6.0/scripts
Create the service.sdl file:
vi FLINK-1.6.0/descriptor/service.sdl
Contents of FLINK-1.6.0/descriptor/service.sdl:
{
"name": "FLINK",
"label": "Flink(Standalone)",
"description": "Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.",
"version": "1.6.0",
"compatibility": {
"generation": 1,
"cdhVersion": {
"min": "5",
"max": "5"
}
},
"runAs": {
"user": "flink",
"group": "flink"
},
"icon": "images/flink.png",
"serviceDependencies": [{
"name": "ZOOKEEPER",
"required": "true"
},
{
"name": "HDFS",
"required": "true"
}],
"parameters": [{
"name": "high-availability.storageDir",
"label": "high-availability storageDir",
"description": "HDFS path (URI) where Flink persists metadata in high-availability setups.",
"type": "string",
"default": "hdfs:///user/flink/ha",
"configurableInWizard": true
},
{
"name": "high-availability.zookeeper.path.root",
"label": "high-availability zookeeper path root",
"description": "The root path under which Flink stores its entries in ZooKeeper.",
"type": "string",
"default": "/flink",
"configurableInWizard": true
},
{
"name": "high-availability.cluster-id",
"label": "high-availability cluster-id",
"description": "The ID of the Flink cluster, used to separate multiple Flink clusters from each other.",
"type": "string",
"default": "cluster_standalone",
"configurableInWizard": true
},
{
"name": "state.checkpoints.dir",
"label": "state checkpoints dir",
"description": "HDFS path (URI) for checkpoints.",
"type": "string",
"default": "hdfs:///user/flink/cluster_standalone/checkpoints",
"configurableInWizard": true
},
{
"name": "state.savepoints.dir",
"label": "state savepoints dir",
"description": "HDFS path (URI) for savepoints.",
"type": "string",
"default": "hdfs:///user/flink/cluster_standalone/savepoints",
"configurableInWizard": true
},
{
"name": "parallelism.default",
"label": "parallelism default",
"description": "The parallelism used for programs that did not specify any other parallelism.",
"type": "long",
"default": 1,
"configurableInWizard": true
}],
"hdfsDirs": [{
"name": "CreateFlinkUserDirCommand",
"label": "Create Flink User Dir",
"description": "Creates the Flink user directory in HDFS.",
"directoryDescription": "Flink HDFS user directory",
"path": "/user/${user}",
"permissions": "0751"
}],
"serviceInit": {
"preStartSteps": [{
"commandName": "CreateFlinkUserDirCommand"
}]},
"roles": [{
"name": "FLINK_MASTER",
"label": "Flink Master",
"pluralLabel": "Flink Masters",
"startRunner": {
"program": "scripts/control.sh",
"args": ["master"],
"environmentVariables": {
"FLINK_MASTER_HOST": "${host}",
"FLINK_WEB_UI_PORT": "${rest.port}",
"FLINK_RUN_AS_USER": "${user}"
}
},
"externalLink": {
"name": "web_dashboard",
"label": "Web Dashboard",
"url": "http://${host}:${rest.port}"
},
"parameters": [
{
"name": "jobmanager.heap.size",
"label": "jobmanager heap size",
"description": "The heap size for the JobManager JVM.",
"type": "string",
"default": "1024m",
"configurableInWizard": true
},
{
"name": "rest.port",
"label": "rest port",
"description": "The port under which the web-based runtime monitor listens.",
"type": "long",
"default": 8081,
"configurableInWizard": true
}],
"topology": {
"minInstances": 1
},
"logging": {
"filename": "flink-master.log",
"isModifiable": true,
"configName": "env.log.dir",
"loggingType": "log4j",
"dir": "/var/log/flink"
},
"configWriter": {
"generators": [{
"filename": "flink-conf.properties",
"configFormat": "properties",
"includeParams": [
"high-availability.storageDir",
"high-availability.zookeeper.path.root",
"high-availability.cluster-id",
"state.savepoints.dir",
"state.checkpoints.dir",
"jobmanager.heap.size",
"parallelism.default",
"rest.port"
]
}]
}
},
{
"name": "FLINK_WORKER",
"label": "Flink Worker",
"pluralLabel": "Flink Workers",
"startRunner": {
"program": "scripts/control.sh",
"args": ["worker"],
"environmentVariables": {
"FLINK_RUN_AS_USER": "${user}"
}
},
"parameters": [{
"name": "taskmanager.heap.size",
"label": "taskmanager heap size",
"description": "The heap size for the TaskManager JVM.",
"type": "string",
"default": "1024m",
"configurableInWizard": true
},
{
"name": "taskmanager.numberOfTaskSlots",
"label": "taskmanager numberOfTaskSlots",
"description": "The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.",
"type": "long",
"default": 1,
"configurableInWizard": true
}],
"topology": {
"minInstances": 1
},
"logging": {
"filename": "flink-worker.log",
"isModifiable": true,
"configName": "env.log.dir",
"loggingType": "log4j",
"dir": "/var/log/flink"
},
"configWriter": {
"generators": [{
"filename": "flink-conf.properties",
"configFormat": "properties",
"includeParams": [
"high-availability.storageDir",
"high-availability.zookeeper.path.root",
"high-availability.cluster-id",
"state.savepoints.dir",
"state.checkpoints.dir",
"taskmanager.heap.size",
"taskmanager.numberOfTaskSlots",
"parallelism.default"
]
}]
}
}]
}
The service.sdl above covers, in order: the run-as user, the icon, service dependencies, common parameters, HDFS directory creation, the Flink Master start runner, the Web UI external link, Flink Master parameters, topology, logging, and the config-file generators; the Flink Worker role follows the same pattern.
Create the control.sh file:
vi FLINK-1.6.0/scripts/control.sh
Contents of FLINK-1.6.0/scripts/control.sh:
#!/bin/bash
# For better debugging
USAGE="Usage: control.sh ((master|worker) (start|stop))"
NODE_TYPE=$1
NODE_HOST=`hostname -f`
# Determine whether the parcels directory exists
TEMP_PATH=$CMF_VAR/../cloudera/parcels
if [ ! -d "$TEMP_PATH" ];then
TEMP_PATH=$CMF_VAR/../../cloudera/parcels
fi
PARCELS_DIR=`cd $TEMP_PATH; pwd`
FLINK_HOME=$PARCELS_DIR/FLINK-1.6.0-hadoop_2.6-scala_2.11/lib/flink
# Determine whether the configuration file directory exists
FLINK_CONF_DIR=$CONF_DIR/flink-conf
if [ ! -d "$FLINK_CONF_DIR" ];then
mkdir $FLINK_CONF_DIR
else
rm -rf $FLINK_CONF_DIR/*
fi
cp $FLINK_HOME/conf/* $FLINK_CONF_DIR/
# Convert key=value pairs into Flink's "key: value" format
sed -i 's#=#: #g' $CONF_DIR/flink-conf.properties
if [ "$NODE_TYPE" = "master" ]; then
RPC_ADDRESS=`cat $CONF_DIR/flink-conf.properties | grep "jobmanager.rpc.address:"`
# Determine whether the variable RPC_ADDRESS is empty
if [ "$RPC_ADDRESS" = "" ]; then
echo "jobmanager.rpc.address: $FLINK_MASTER_HOST" >> $CONF_DIR/flink-conf.properties