flume + kafka + storm + mysql data pipeline
Today I finally got the flume + kafka + storm + mysql data pipeline working end to end. It is only a simple test case, but a great deal can be built on top of this pipeline.
First, a quick look at the architecture of each tool; the diagrams explain it best:
Flume architecture diagram: [image]
Kafka architecture diagram: [image]
Storm architecture diagram: [image]
Data-flow architecture of our flume + kafka + storm + mysql setup: [image]
Below is the Kafka-to-Storm part of the configuration.
All of it is implemented in Java, using the KafkaSpout and RDBMSDumperBolt classes (these can later be packaged as utility classes and deployed to the cluster).
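The source of RDBMSDumperBolt is not shown in this post, so here is a minimal, hypothetical sketch of what such a bolt does, a simplified stand-in for illustration rather than the actual class: it opens a JDBC connection in prepare() and inserts each incoming tuple's first field as one row, acking only after the write succeeds. The class name and single-column assumption are mine.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.util.Map;

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Tuple;

    // Hypothetical, simplified stand-in for RDBMSDumperBolt: writes each
    // incoming tuple's first field into a single-column MySQL table.
    public class SimpleRdbmsDumperBolt extends BaseRichBolt {
        private final String url, user, password, table, column;
        private transient Connection conn;   // not serializable; opened in prepare()
        private OutputCollector collector;

        public SimpleRdbmsDumperBolt(String url, String user, String password,
                                     String table, String column) {
            this.url = url; this.user = user; this.password = password;
            this.table = table; this.column = column;
        }

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            try {
                Class.forName("com.mysql.jdbc.Driver");
                conn = DriverManager.getConnection(url, user, password);
            } catch (Exception e) {
                throw new RuntimeException("could not open MySQL connection", e);
            }
        }

        @Override
        public void execute(Tuple input) {
            try {
                PreparedStatement ps = conn.prepareStatement(
                        "INSERT INTO " + table + " (" + column + ") VALUES (?)");
                ps.setString(1, input.getString(0));
                ps.executeUpdate();
                ps.close();
                collector.ack(input);   // ack only after the row is persisted
            } catch (Exception e) {
                collector.fail(input);  // let Storm replay the tuple
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // terminal bolt: emits nothing downstream
        }
    }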
For the Storm job we wrote a KafkaStormRdbms class; it is configured as follows.
First, set the MySQL connection parameters:
    ArrayList<String> columnNames = new ArrayList<String>();
    ArrayList<String> columnTypes = new ArrayList<String>();
    String tableName = "stormTestTable_01";
    // Note: if the rdbms table need not have a primary key, set the variable
    // 'primaryKey' to 'N/A'; else set its value to the name of the tuple field
    // which is to be treated as the primary key
    String primaryKey = "N/A";
    String rdbmsUrl = "jdbc:mysql://$hostname:3306/fuqingwuDB";
    String rdbmsUserName = "fuqingwu";
    String rdbmsPassword = "password";
    // add the column names and the respective types in the two arraylists
    columnNames.add("word");
    // add the types
    columnTypes.add("varchar (100)");
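For the bolt to have somewhere to write, the table has to exist (unless RDBMSDumperBolt creates it on its own, which I have not verified). A one-off setup sketch matching the parameters above; the class name is illustrative, and $hostname stays a placeholder for the MySQL host:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    // One-off setup: create the target table with the same name and
    // column as configured above.
    public class CreateStormTestTable {
        public static void main(String[] args) throws Exception {
            Class.forName("com.mysql.jdbc.Driver");
            Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://$hostname:3306/fuqingwuDB", "fuqingwu", "password");
            Statement st = conn.createStatement();
            st.executeUpdate("CREATE TABLE IF NOT EXISTS stormTestTable_01 (word varchar(100))");
            st.close();
            conn.close();
        }
    }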
Then configure the KafkaSpout and the topology:
    TopologyBuilder builder = new TopologyBuilder();
    List<String> hosts = new ArrayList<String>();
    hosts.add("hadoop01");
    SpoutConfig spoutConf = SpoutConfig.fromHostStrings(hosts, 1, "flume_kafka", "/root", "id");
    spoutConf.scheme = new StringScheme();
    spoutConf.forceStartOffsetTime(-2);
    spoutConf.zkServers = new ArrayList<String>() {{ add("hadoop01"); }};
    spoutConf.zkPort = 2181;
    // set the spout for the topology
    builder.setSpout("spout", new KafkaSpout(spoutConf), 1);
    // dump the stream data into rdbms table
    RDBMSDumperBolt dumperBolt = new RDBMSDumperBolt(primaryKey, tableName, columnNames,
            columnTypes, rdbmsUrl, rdbmsUserName, rdbmsPassword);
    builder.setBolt("dumperBolt", dumperBolt, 1).shuffleGrouping("spout");
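The snippet stops after wiring the bolt to the spout; the topology still has to be submitted. A minimal sketch of the usual final step in main(), assuming the classic backtype.storm API of that era (Config, LocalCluster, StormSubmitter); the topology name "KafkaStormRdbms" is just an example:

    Config conf = new Config();
    conf.setDebug(true);

    if (args != null && args.length > 0) {
        // submit to the cluster under the name passed on the command line
        conf.setNumWorkers(1);
        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    } else {
        // run in-process for local testing
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("KafkaStormRdbms", conf, builder.createTopology());
    }

Once this runs, every message that Flume pushes into the flume_kafka topic should end up as a row in stormTestTable_01 (forceStartOffsetTime(-2) above tells the spout to begin from the earliest offset Kafka still retains).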