1. 程式人生 > >從kafka到flink到hbase的心酸路程示例(希望有用)

從kafka到flink到hbase的心酸路程示例(希望有用)

如果好的大家就給個贊啊,回粉下,給點鼓勵。一定是效率文章

1.首先建立maven工程。

2.依賴檔案如下

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-core</artifactId>
        <version>1.3.2</version>
    </dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.10_2.11 -->
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.10_2.11</artifactId> <version>1.3.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka_2.11 --> <dependency>
<groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>0.10.2.0</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java_2.11 --> <dependency> <groupId>
org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>1.3.2</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-hbase_2.11</artifactId> <version>1.3.2</version> </dependency> </dependencies>

3.匯入依賴接下來是程式碼編寫

注意幾點:程式碼裡面需要配置zk的地址,以及hbase的zk。詳細就不多說了直接貼程式碼

/**
 *
 * @author ${xiniu}
 * @version $Id: flink_hbase.java,
 */
public class flink_hbase {


    private static String hbaseZookeeperQuorum = "10.25.135.53,10.45.149.164,10.45.151.125";
    private static String hbaseZookeeperClinentPort = "2181";
    private static TableName tableName = TableName.valueOf("testflink");
    private static final String columnFamily = "cf1";
    public static void main(String[] args) {


        final String ZOOKEEPER_HOST = "10.25.135.53:2181,10.45.149.164:2181,10.45.151.125:2181";
        final String KAFKA_HOST = "10.45.151.125:9092,10.45.150.142:9092";
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(1000); // 非常關鍵,一定要設定啟動檢查點!!
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Properties props = new Properties();
props.setProperty("zookeeper.connect", ZOOKEEPER_HOST);
props.setProperty("bootstrap.servers", KAFKA_HOST);
props.setProperty("group.id", "test-consumer-group");
DataStream<String> transction = env.addSource(new FlinkKafkaConsumer010<String>("test2", new SimpleStringSchema(), props));
//DataStream<String> transction1 = env.addSource(new FlinkKafkaConsumer010<String>("test3",new SimpleStringSchema(), props));
transction.rebalance().map(new MapFunction<String, Object>() {
           public String map(String value)throws IOException{


               writeIntoHBase(value);
               return value;
}

        }).print();
//transction.writeAsText("/home/admin/log2");
        // transction.addSink(new HBaseOutputFormat();
try {
            env.execute();
} catch (Exception ex) {

            Logger.getLogger(flink_hbase.class.getName()).log(Level.SEVERE, null, ex);
ex.printStackTrace();
}
    }

    public static void writeIntoHBase(String m)throws IOException
    {
        org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
config.set("hbase.zookeeper.quorum", hbaseZookeeperQuorum);
config.set("hbase.master", "10.45.151.26:60000");
config.set("hbase.zookeeper.property.clientPort", hbaseZookeeperClinentPort);
config.setInt("hbase.rpc.timeout", 20000);
config.setInt("hbase.client.operation.timeout", 30000);
config.setInt("hbase.client.scanner.timeout.period", 200000);
//config.set(TableOutputFormat.OUTPUT_TABLE, hbasetable);
Connection c = ConnectionFactory.createConnection(config);
Admin admin = c.getAdmin();
        if(!admin.tableExists(tableName)){
            admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(columnFamily)));
}
        Table t = c.getTable(tableName);
TimeStamp ts = new TimeStamp(new Date());
Date date = ts.getDate();
Put put = new Put(org.apache.hadoop.hbase.util.Bytes.toBytes(date.toString()));
put.addColumn(org.apache.hadoop.hbase.util.Bytes.toBytes(columnFamily), org.apache.hadoop.hbase.util.Bytes.toBytes("test"),
org.apache.hadoop.hbase.util.Bytes.toBytes(m));
t.put(put);
t.close();
c.close();
}
}

 當然當你上傳到伺服器的時候有好幾種錯誤,貼出來


這種很明顯的是因為slot不夠用,那麼你首先你去檢視自己的從節點是不是有任務佔用了,結束任務就好。

問題2


這個錯誤是因為你本地hosts檔案中沒有配置你的hbase主機,本地伺服器或者機器不能找到,無法連線你的hbase。那你需要去本地hosts檔案中

配置你的hbase伺服器的 ip+主機名。

 當然這種還有可能是你的機器超時時常設定的太短了,那麼可以一下程式碼進行設定:

config.setInt("hbase.rpc.timeout", 20000);
config.setInt("hbase.client.operation.timeout", 30000);
config.setInt("hbase.client.scanner.timeout.period", 200000);
以上問題是常遇見,新手比較刺手的問題。

當然還有其他的一些小問題,提供幾個解決思路,

1.首先有可能是因為你打包的時候依賴沒有打進來

2.你機器磁碟佔用了空間你的jar不能解壓到機器上,帶式無法識別一些依賴。

3.另外遇見哪些找不見主類的什麼的,基本是打包或者那個細節沒注意。

   上面的例子是可以跑通的,如果新手可以去直接去跑。另外遇見的問題應該上面都有,歡迎交流。有新問題或者建議可以留言!

 有用就贊,有問題就提出來,歡迎關注常來交流              

--犀牛。多多指導