1. 程式人生 > 其它 >Flink上執行jar包時報錯 Missing required configuration “key.serializer“ which has no default value.

Flink上執行jar包時報錯 Missing required configuration “key.serializer“ which has no default value.

技術標籤:flinkjava伺服器kafka

業務

flink將分析之後的資料放到Kafka上面

遇到問題

為了class被載入時就將配置載入進去,將所有的環境還有配置加static。

public static Properties fromPro = new Properties();
	public static Properties toFisPro = new Properties();
	public static StreamExecutionEnvironment stream = StreamExecutionEnvironment.getExecutionEnvironment
(); public static FlinkKafkaConsumer<String> consumer ; static { fromPro.setProperty("bootstrap.servers", ""); fromPro.setProperty("key.deserializer",StringDeserializer.class.getName; fromPro.setProperty("value.deserializer",StringDeserializer.class
.getName()); consumer = new FlinkKafkaConsumer<String>("topic",new SimpleStringSchema(), fromPro); toFisPro.put("bootstrap.servers", ""); toFisPro.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); toFisPro.
put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

在本地執行時發現都可以執行的很好 但是將程式碼打包之後放在flink視覺化介面上進行執行,測試其是否可用。我以為百分之百可以執行,沒想到剛放上去就報了錯
Missing required configuration “key.serializer” which has no default value.

解決問題

我起初以為是kafka的key值錯了 於是將key值改了又改發現還是有問題,接下來由於在向kafka上推資料時候也出現了錯,這時就懷疑時資料的問題,又對資料進行糾正,還是沒有問題。此時發現問題是配置的問題也就是序列化的問題,將配置各種試發現還是有問題。
此時我只能尋求大佬解決問題,大佬也找了好半天,到最後發現問題
原來就是static惹得禍,我單單想著讓靜態化,加快運算。這個錯真的不易發現

		Properties fromPro = new Properties();
		Properties toFisPro = new Properties();
		StreamExecutionEnvironment stream = StreamExecutionEnvironment.getExecutionEnvironment();
		
		FlinkKafkaConsumer<String> consumer ;

將程式碼修改成以上之後將靜態去掉之後
修改之後重新打包就執行成功,長呼一口氣終於解決問題

總結

我不太清楚Flink執行jar包的原理,我估計是因為flink得指定入口類,它會先找execute()對程式碼進行檢查,我感覺它不會檢查static修飾的,於是flink發現key.serializer沒有預設值,就會報錯。我的錯誤是static修飾的,其要在類載入時才會將它的值載入進去,給key.serializer賦值。
之後我將static去掉之後就會將配置給載入進去,錯誤就解決了。

**

如果有用記得點贊哦

**
我是騎碼找碼,我一直在路上