Storm consuming Kafka data in real time
阿新 • Published: 2019-01-11
1. Environment: create a topic named data in Kafka and start a console producer, ready to type in test data.
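The step above can be done with Kafka's own CLI tools (a sketch for this post's single-node setup; the script paths, broker port 9092, and single-partition settings are assumptions):

```shell
# Create the "data" topic on the ZooKeeper instance used throughout this post
bin/kafka-topics.sh --create --zookeeper 192.168.59.132:2181 \
  --replication-factor 1 --partitions 1 --topic data

# Start a console producer; each line typed becomes one Kafka message
bin/kafka-console-producer.sh --broker-list 192.168.59.132:9092 --topic data
```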
2. The project's pom.xml:
<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.0.2</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>1.0.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.2.0</version>
    </dependency>
    <dependency>
        <groupId>log4j</groupId>
        <artifactId>log4j</artifactId>
        <version>1.2.14</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>log4j-over-slf4j</artifactId>
        <version>1.7.21</version>
    </dependency>
</dependencies>
3. Spout code (the main class that wires a KafkaSpout into the topology):
import java.util.ArrayList;
import java.util.List;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;

public class MykafkaSpout {

    public static void main(String[] args) throws AuthorizationException {
        String topic = "data";
        // ZooKeeper that Kafka uses to track brokers and consumer offsets
        ZkHosts zkHosts = new ZkHosts("192.168.59.132:2181");
        // zkRoot is "" here because zookeeper.connect has no chroot; see step 5
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, topic, "", "MyTrack");
        List<String> zkServers = new ArrayList<String>();
        zkServers.add("192.168.59.132");
        spoutConfig.zkServers = zkServers;
        spoutConfig.zkPort = 2181;
        spoutConfig.socketTimeoutMs = 60 * 1000;
        // Decode each raw Kafka message into a single UTF-8 string field
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new KafkaSpout(spoutConfig), 1);
        builder.setBolt("bolt1", new MyKafkaBolt(), 1).shuffleGrouping("spout");

        Config conf = new Config();
        conf.setDebug(false);
        if (args.length > 0) {
            // A topology name was given: submit to a real Storm cluster
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            // No argument: run in-process for local testing
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("mytopology", conf, builder.createTopology());
        }
    }
}
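Once packaged into a jar, the class above can be run either locally or on a cluster depending on whether a topology name is passed as args[0] (a sketch; the jar name is an assumption):

```shell
# Local mode: no argument, the topology runs in an in-process LocalCluster
storm jar storm-kafka-demo.jar MykafkaSpout

# Cluster mode: the first argument becomes the topology name
storm jar storm-kafka-demo.jar MykafkaSpout mytopology
```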
4. Bolt code. To keep things simple, it only prints the data:
import java.util.Map;

import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IBasicBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;

public class MyKafkaBolt implements IBasicBolt {

    private static final long serialVersionUID = 1L;

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // no setup needed
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // StringScheme emits each Kafka message as a single string field
        String kafkaMsg = input.getString(0);
        System.err.println("bolt:" + kafkaMsg);
    }

    @Override
    public void cleanup() {
        // no resources to release
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // this bolt emits nothing downstream
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
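Why input.getString(0) works: the StringScheme set on the spout config decodes each raw Kafka message as one UTF-8 string field, which arrives in the bolt as field 0. Conceptually it does something like this (a stdlib-only sketch, not the actual storm-kafka class):

```java
import java.nio.charset.StandardCharsets;

public class SchemeDemo {
    // Mimics what StringScheme does to each raw Kafka message:
    // decode the byte payload as a UTF-8 string, which the bolt
    // then reads back with input.getString(0).
    static String decode(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = "hello storm".getBytes(StandardCharsets.UTF_8);
        System.err.println("bolt:" + decode(raw));
    }
}
```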
5. How to determine the zkRoot argument of SpoutConfig: check Kafka's server.properties. If the zookeeper.connect value has no chroot suffix such as /bc, zkRoot is simply ""; otherwise zkRoot is that chroot, e.g. /bc for zookeeper.connect=localhostlei1:2181,localhostlei2:2181,localhostlei3:2181/bc
# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhostlei1:2181,localhostlei2:2181,localhostlei3:2181
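The rule above can be expressed as a small helper (hypothetical, stdlib-only; not part of storm-kafka): everything from the first / onward in zookeeper.connect is the chroot, and that is what goes into SpoutConfig as zkRoot.

```java
public class ZkRootHelper {
    // Hypothetical helper: derives the zkRoot argument for SpoutConfig
    // from Kafka's zookeeper.connect value. host:port pairs contain no
    // slash, so the first '/' (if any) starts the chroot.
    static String zkRootOf(String zookeeperConnect) {
        int slash = zookeeperConnect.indexOf('/');
        return slash < 0 ? "" : zookeeperConnect.substring(slash);
    }

    public static void main(String[] args) {
        System.out.println(zkRootOf("localhostlei1:2181,localhostlei2:2181/bc")); // /bc
        System.out.println(zkRootOf("localhostlei1:2181,localhostlei2:2181"));    // empty
    }
}
```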
6. Once the topology is running, try writing data into Kafka; it is consumed by Storm immediately.