
flume + kafka + storm + mysql data flow

Today I finally got the flume + kafka + storm + mysql data flow working end to end. It is only a simple test example, but a lot can be built on top of this pipeline.

First, let's take a quick look at the architecture of each of these tools; the architecture diagrams explain it best:

The Flume architecture:

The Kafka architecture:

The Storm architecture:

The architecture of our flume + kafka + storm + mysql data flow:

Below is the configuration for the Kafka-to-Storm leg:

All of this is implemented in Java code, using the KafkaSpout class and the RDBMSDumperBolt class (these can later be packaged as utility classes and uploaded to the cluster).

For the Storm job, we wrote a KafkaStormRdbms class; the job is configured as follows:

First, set the MySQL connection parameters:

        ArrayList<String> columnNames = new ArrayList<String>();
        ArrayList<String> columnTypes = new ArrayList<String>();
        String tableName = "stormTestTable_01";
        // Note: if the RDBMS table does not need a primary key, set 'primaryKey' to "N/A";
        // otherwise set it to the name of the tuple field to be treated as the primary key
        String primaryKey = "N/A";
        String rdbmsUrl = "jdbc:mysql://$hostname:3306/fuqingwuDB";
        String rdbmsUserName = "fuqingwu";
        String rdbmsPassword = "password";

        // add the column names and the respective types to the two ArrayLists
        columnNames.add("word");

        // add the types
        columnTypes.add("varchar (100)");
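
The RDBMSDumperBolt writes rows into a MySQL table, so the table should exist before the topology starts (an assumption here; adjust if your version of the bolt creates it for you). A minimal JDBC sketch matching the column list above; the class name CreateStormTestTable and the args-based host are illustrative, not part of the original code:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;

    public class CreateStormTestTable {
        public static void main(String[] args) throws Exception {
            // pass the MySQL host as the first argument (stands in for $hostname above)
            String host = args.length > 0 ? args[0] : "localhost";
            String url = "jdbc:mysql://" + host + ":3306/fuqingwuDB";
            Connection conn = DriverManager.getConnection(url, "fuqingwu", "password");
            Statement stmt = conn.createStatement();
            // one column, matching columnNames/columnTypes above: word varchar(100), no primary key
            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS stormTestTable_01 (word varchar(100))");
            stmt.close();
            conn.close();
        }
    }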

Configure the KafkaSpout and the Topology:
        TopologyBuilder builder = new TopologyBuilder();

        List<String> hosts = new ArrayList<String>();
        hosts.add("hadoop01");
        SpoutConfig spoutConf = SpoutConfig.fromHostStrings(hosts, 1, "flume_kafka", "/root", "id");
        spoutConf.scheme = new StringScheme();
        // -2 forces the spout to start from the earliest offset available in Kafka
        spoutConf.forceStartOffsetTime(-2);

        spoutConf.zkServers = new ArrayList<String>() {{
            add("hadoop01");
        }};
        spoutConf.zkPort = 2181;

        // set the spout for the topology
        builder.setSpout("spout", new KafkaSpout(spoutConf), 1);

        // dump the stream data into the rdbms table
        RDBMSDumperBolt dumperBolt = new RDBMSDumperBolt(primaryKey, tableName, columnNames,
                columnTypes, rdbmsUrl, rdbmsUserName, rdbmsPassword);
        builder.setBolt("dumperBolt", dumperBolt, 1).shuffleGrouping("spout");
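
To actually run the job, KafkaStormRdbms's main method would end by submitting the topology. A minimal sketch, assuming the classic backtype.storm API of this Storm generation; the topology name "KafkaStormRdbms" and the args-based switch are illustrative, not part of the original code:

        // additional imports: backtype.storm.Config, backtype.storm.LocalCluster,
        // backtype.storm.StormSubmitter
        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            // submit to the cluster under the name given on the command line
            conf.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            // local test mode: run in-process for a minute, then shut down
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("KafkaStormRdbms", conf, builder.createTopology());
            Thread.sleep(60000);
            cluster.shutdown();
        }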