
Flink: Submitting a Job to a Remote Cluster from Code

While learning Flink, I came across the following method, which returns an ExecutionEnvironment instance bound to a remote cluster, so I tried using it to submit a job from my local IDE to the cluster:

  def createRemoteEnvironment(host: String, port: Int, jarFiles: String*): ExecutionEnvironment 
The job code:
package com.daxin.batch

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.{ConfigConstants, Configuration}
//important: this import is needed to access the 'createTypeInformation' macro function
import org.apache.flink.api.scala._
/**
  * Created by Daxin on 2017/4/17.
  * https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/types_serialization.html#type-information-in-the-scala-api
  */
object RemoteJob {
  def main(args: Array[String]) {

    val env = ExecutionEnvironment.createRemoteEnvironment("node", 6123)

    val words = env.readTextFile("hdfs://node:9000/word/spark-env.sh")

    val data = words.flatMap(x => x.split(" ")).map(x => (x, 1)).groupBy(0).sum(1)

    println(data.count) // trigger the job by printing the element count
  }
}

Running it failed. I searched Baidu, Google, and Bing for a long time without finding a solution. To make this error easier to find in future searches, here is the full exception output:
Submitting job with JobID: 2e9a9550e8352e8f6cfd579b3522a732. Waiting for job completion.
Connected to JobManager at Actor[akka.tcp://[email protected]:6123/user/jobmanager#950641914]
04/19/2017 19:37:21	Job execution switched to status RUNNING.
04/19/2017 19:37:21	CHAIN DataSource (at com.daxin.batch.RemoteJob$.main(RemoteJob.scala:25) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at com.daxin.batch.RemoteJob$.main(RemoteJob.scala:27)) -> Map (Map at com.daxin.batch.RemoteJob$.main(RemoteJob.scala:27)) -> Combine(SUM(1))(1/1) switched to SCHEDULED 
04/19/2017 19:37:21	CHAIN DataSource (at com.daxin.batch.RemoteJob$.main(RemoteJob.scala:25) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at com.daxin.batch.RemoteJob$.main(RemoteJob.scala:27)) -> Map (Map at com.daxin.batch.RemoteJob$.main(RemoteJob.scala:27)) -> Combine(SUM(1))(1/1) switched to DEPLOYING 
04/19/2017 19:37:21	CHAIN DataSource (at com.daxin.batch.RemoteJob$.main(RemoteJob.scala:25) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at com.daxin.batch.RemoteJob$.main(RemoteJob.scala:27)) -> Map (Map at com.daxin.batch.RemoteJob$.main(RemoteJob.scala:27)) -> Combine(SUM(1))(1/1) switched to RUNNING 
04/19/2017 19:37:21	CHAIN DataSource (at com.daxin.batch.RemoteJob$.main(RemoteJob.scala:25) (org.apache.flink.api.java.io.TextInputFormat)) -> FlatMap (FlatMap at com.daxin.batch.RemoteJob$.main(RemoteJob.scala:27)) -> Map (Map at com.daxin.batch.RemoteJob$.main(RemoteJob.scala:27)) -> Combine(SUM(1))(1/1) switched to FAILED 
java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: The type serializer factory could not load its parameters from the configuration due to missing classes.
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:92)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
	at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: The type serializer factory could not load its parameters from the configuration due to missing classes.
	at org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1145)
	at org.apache.flink.runtime.operators.util.TaskConfig.getOutputSerializer(TaskConfig.java:551)
	at org.apache.flink.runtime.operators.BatchTask.getOutputCollector(BatchTask.java:1216)
	at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1295)
	at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:286)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:90)
	... 2 more
Caused by: java.lang.ClassNotFoundException: com.daxin.batch.RemoteJob$$anon$2$$anon$1
	at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:270)
	at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:66)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:292)
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:250)
	at org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory.readParametersFromConfig(RuntimeSerializerFactory.java:76)
	at org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1143)
	... 7 more

04/19/2017 19:37:21	Job execution switched to status FAILING.
java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: The type serializer factory could not load its parameters from the configuration due to missing classes.
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:92)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
	at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: The type serializer factory could not load its parameters from the configuration due to missing classes.
	at org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1145)
	at org.apache.flink.runtime.operators.util.TaskConfig.getOutputSerializer(TaskConfig.java:551)
	at org.apache.flink.runtime.operators.BatchTask.getOutputCollector(BatchTask.java:1216)
	at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1295)
	at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:286)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:90)
	... 2 more
Caused by: java.lang.ClassNotFoundException: com.daxin.batch.RemoteJob$$anon$2$$anon$1
	at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:270)
	at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:66)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:292)
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:250)
	at org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory.readParametersFromConfig(RuntimeSerializerFactory.java:76)
	at org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1143)
	... 7 more
04/19/2017 19:37:21	Reduce (SUM(1))(1/1) switched to CANCELED 
04/19/2017 19:37:21	DataSink ([email protected])(1/1) switched to CANCELED 
04/19/2017 19:37:21	Job execution switched to status FAILED.
Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Job execution failed.
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:427)
	at org.apache.flink.client.program.StandaloneClusterClient.submitJob(StandaloneClusterClient.java:101)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:400)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:387)
	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:362)
	at org.apache.flink.client.RemoteExecutor.executePlanWithJars(RemoteExecutor.java:211)
	at org.apache.flink.client.RemoteExecutor.executePlan(RemoteExecutor.java:188)
	at org.apache.flink.api.java.RemoteEnvironment.execute(RemoteEnvironment.java:172)
	at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:926)
	at org.apache.flink.api.scala.ExecutionEnvironment.execute(ExecutionEnvironment.scala:672)
	at org.apache.flink.api.scala.DataSet.count(DataSet.scala:529)
	at com.daxin.batch.RemoteJob$.main(RemoteJob.scala:29)
	at com.daxin.batch.RemoteJob.main(RemoteJob.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply$mcV$sp(JobManager.scala:900)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$6.apply(JobManager.scala:843)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
	at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
	at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: The type serializer factory could not load its parameters from the configuration due to missing classes.
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:92)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
	at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: The type serializer factory could not load its parameters from the configuration due to missing classes.
	at org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1145)
	at org.apache.flink.runtime.operators.util.TaskConfig.getOutputSerializer(TaskConfig.java:551)
	at org.apache.flink.runtime.operators.BatchTask.getOutputCollector(BatchTask.java:1216)
	at org.apache.flink.runtime.operators.BatchTask.initOutputs(BatchTask.java:1295)
	at org.apache.flink.runtime.operators.DataSourceTask.initOutputs(DataSourceTask.java:286)
	at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:90)
	... 2 more
Caused by: java.lang.ClassNotFoundException: com.daxin.batch.RemoteJob$$anon$2$$anon$1
	at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
	at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
	at java.security.AccessController.doPrivileged(Native Method)
	at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
	at java.lang.Class.forName0(Native Method)
	at java.lang.Class.forName(Class.java:270)
	at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:66)
	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
	at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:292)
	at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:250)
	at org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory.readParametersFromConfig(RuntimeSerializerFactory.java:76)
	at org.apache.flink.runtime.operators.util.TaskConfig.getTypeSerializerFactory(TaskConfig.java:1143)
	... 7 more

The key line in the exception output is:
java.lang.RuntimeException: The initialization of the DataSource's outputs caused an error: The type serializer factory could not load its parameters from the configuration due to missing classes.

I kept assuming this was a serialization problem and combed through the documentation without finding a fix. The decisive hint is actually the root cause at the bottom of the trace: java.lang.ClassNotFoundException: com.daxin.batch.RemoteJob$$anon$2$$anon$1 refers to an anonymous class generated from the job code (by the Scala API's type-information macro), which the TaskManager cannot load because no user jar was shipped with the job. Going back to the API docs, I finally noticed that the third parameter of createRemoteEnvironment is a varargs parameter, not a parameter with a default value; Scala's habit of supplying default argument values had lulled me into assuming it was optional. Once I passed the job's jar as the third argument, the job was submitted to the remote cluster and ran correctly!
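To see why this mistake is so easy to make, here is a minimal sketch (the method names withVarargs and withDefault are made up for illustration) contrasting a varargs parameter with a default-valued one. Both calls compile, but the varargs one silently passes nothing:

def withVarargs(host: String, port: Int, jarFiles: String*): Unit =
  println(s"$host:$port, jars = [${jarFiles.mkString(", ")}]")

def withDefault(host: String, port: Int, jarFile: String = "job.jar"): Unit =
  println(s"$host:$port, jar = $jarFile")

withVarargs("node", 6123) // compiles; jarFiles is an empty Seq, so no jar is shipped
withDefault("node", 6123) // compiles; jarFile falls back to "job.jar"

createRemoteEnvironment behaves like withVarargs: calling it with only a host and port is perfectly legal, but then no user-code jar reaches the cluster, and the TaskManagers fail with the ClassNotFoundException shown above.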

The corrected code:

package com.daxin.batch

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.{ConfigConstants, Configuration}
//important: this import is needed to access the 'createTypeInformation' macro function
import org.apache.flink.api.scala._
/**
  * Created by Daxin on 2017/4/17.
  * https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/types_serialization.html#type-information-in-the-scala-api
  */
object RemoteJob {
  def main(args: Array[String]) {

    val env = ExecutionEnvironment.createRemoteEnvironment("node", 6123, "C://logs//flink-lib//flinkwordcount.jar")

    val words = env.readTextFile("hdfs://node:9000/word/spark-env.sh")

    val data = words.flatMap(x => x.split(" ")).map(x => (x, 1)).groupBy(0).sum(1)

    println(data.count) // trigger the job by printing the element count
  }
}


One final note: if you use this approach to run locally packaged code on the cluster, keep the code and the jar consistent. In other words, after modifying the code, rebuild the jar before resubmitting, or the cluster will execute stale classes.
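To make that consistency harder to break, one option (my own sketch, not from the original post) is to resolve the jar path from the running class itself instead of hard-coding it, so the jar you submit is always the one the current code was packaged into:

// Sketch: locate the jar containing this class and submit that jar.
// Caveat: this only works when launched from the packaged jar; launched from
// an IDE it resolves to a classes directory, which cannot be shipped as a jar.
val jarPath = getClass.getProtectionDomain.getCodeSource.getLocation.toURI.getPath
val env = ExecutionEnvironment.createRemoteEnvironment("node", 6123, jarPath)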
