Flink上執行jar包時報錯 Missing required configuration “key.serializer“ which has no default value.
阿新 • • 發佈:2020-12-24
業務
flink將分析之後的資料放到Kafka上面
遇到問題
為了class被載入時就將配置載入進去,將所有的環境還有配置加static。
public static Properties fromPro = new Properties();
public static Properties toFisPro = new Properties();
public static StreamExecutionEnvironment stream = StreamExecutionEnvironment.getExecutionEnvironment ();
public static FlinkKafkaConsumer<String> consumer ;
static {
fromPro.setProperty("bootstrap.servers", "");
fromPro.setProperty("key.deserializer",StringDeserializer.class.getName; fromPro.setProperty("value.deserializer",StringDeserializer.class .getName());
consumer = new FlinkKafkaConsumer<String>("topic",new SimpleStringSchema(), fromPro);
toFisPro.put("bootstrap.servers", "");
toFisPro.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
toFisPro. put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
在本地執行時發現都可以執行的很好 但是將程式碼打包之後放在flink視覺化介面上進行執行,測試其是否可用。我以為百分之百可以執行,沒想到剛放上去就報了錯
Missing required configuration “key.serializer” which has no default value.
解決問題
我起初以為是kafka的key值錯了 於是將key值改了又改發現還是有問題,接下來由於在向kafka上推資料時候也出現了錯,這時就懷疑時資料的問題,又對資料進行糾正,還是沒有問題。此時發現問題是配置的問題也就是序列化的問題,將配置各種試發現還是有問題。
此時我只能尋求大佬解決問題,大佬也找了好半天,到最後發現問題
原來就是static惹得禍,我單單想著讓靜態化,加快運算。這個錯真的不易發現
Properties fromPro = new Properties();
Properties toFisPro = new Properties();
StreamExecutionEnvironment stream = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkKafkaConsumer<String> consumer ;
將程式碼修改成以上之後將靜態去掉之後
修改之後重新打包就執行成功,長呼一口氣終於解決問題
總結
我不太清楚Flink執行jar包的原理,我估計是因為flink得指定入口類,它會先找execute()對程式碼進行檢查,我感覺它不會檢查static修飾的,於是flink發現key.serializer沒有預設值,就會報錯。我的錯誤是static修飾的,其要在類載入時才會將它的值載入進去,給key.serializer賦值。
之後我將static去掉之後就會將配置給載入進去,錯誤就解決了。
**
如果有用記得點贊哦
**
我是騎碼找碼,我一直在路上