storm整合kafka,spout作為kafka的消費者
阿新 • • 發佈:2018-12-31
在之前的部落格中記錄,如何在專案storm中把每條記錄作為訊息傳送到kafka訊息佇列中的。這裡講述如何在storm中消費kafka佇列中的訊息。為何在專案中兩個拓撲檔案校驗和預處理之間要用kafka訊息佇列進行資料的暫存仍需要去落實。
專案中直接使用storm提供的kafkaSpout作為訊息佇列的消費者。實現spout從kafka訊息佇列獲取資料,作為拓撲的資料來源。
package com.lancy.topology;
import java.util.Arrays;
import org.apache.storm.Config;
import org.apache.storm .LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.kafka.BrokerHosts;
import org.apache.storm.kafka .KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.TopologyBuilder;
import com.lancy.common.ConfigCommon;
import com.lancy.common .pre.TopoStaticName;
import com.lancy.spout.GetDataFromKafkaSpoutBolt;
public class LntPreHandleTopology implements Runnable {
private static final String CONFIG_ZOOKEEPER_HOST = ConfigCommon.getInstance().ZOOKEEPER_HOST_PORT + "/kafka";//127.0.0.1:2181/kafka類似此
private static final String CONFIG_TOPIC = ConfigCommon.getInstance().KAFKA_LNT_VALID_DATA_TOPIC;//topic的名稱
private static final String CONFIG_OFFSET_ZK_PATH = "/kafka/storm_offset" + "/" + CONFIG_TOPIC;//偏移量offset的根目錄
private static final String CONFIG_OFFSET_ZK_CUSTOMER_GROUP_ID = ConfigCommon.getInstance().KAFKA_LNT_VALID_CUSTOMER_ID;
@Override
public void run() {
exe(new String[] { "lnt" });
}
public static void exe(String[] args) {
// 註冊 ZooKeeper 主機
BrokerHosts brokerHosts = new ZkHosts(CONFIG_ZOOKEEPER_HOST, "/brokers");
// 配置 Spout
SpoutConfig spoutConfig = new SpoutConfig(brokerHosts, CONFIG_TOPIC, CONFIG_OFFSET_ZK_PATH,CONFIG_OFFSET_ZK_CUSTOMER_GROUP_ID);
if (args == null || args.length == 0) {
//如果輸入引數為空,這裡把這種情況弄成了本地模式
//KafkaSpout初始化時,會去取spoutConfig.zkServers 和 spoutConfig.zkPort 變數的值,而該值預設是沒塞的,所以是空,
//那麼它就會去取當前執行的Storm所配置的zookeeper地址和埠,而本地執行的Storm,是一個臨時的zookeeper例項,
//並不會真正持久化。所以,每次關閉後,資料就沒了。本地模式,要顯示的去配置
String CONFIG_OFFSET_ZK_HOST = ConfigCommon.getInstance().ZOOKEEPER_HOST;
int CONFIG_OFFSET_ZK_PORT = Integer.parseInt(ConfigCommon.getInstance().ZOOKEEPER_PORT);
// kafka offet記錄,,使用的zookeeper地址
spoutConfig.zkServers = Arrays.asList(CONFIG_OFFSET_ZK_HOST.split(","));
// kafka offet記錄,,使用的zookeeper埠
spoutConfig.zkPort = CONFIG_OFFSET_ZK_PORT;
// spoutConfig.ignoreZkOffsets = true;
}
// spoutConfig.ignoreZkOffsets = true;
// 配置 Scheme(可選)
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());//StringScheme告訴KafkaSpout如何去解碼資料,生成Storm內部傳遞資料
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
TopologyBuilder builder = builderTopology(kafkaSpout);
Config config = new Config();
config.setDebug(false);
config.setNumWorkers(8);
config.setNumAckers(8);
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 10240);
config.put(Config.TOPOLOGY_BACKPRESSURE_ENABLE, false);
config.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
config.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
if (args != null && args.length > 0) {
try {
StormSubmitter.submitTopology("prehanlder-topology", config, builder.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
} catch (AuthorizationException e) {
e.printStackTrace();
}
} else {
// 測試環境採用 local mode 本地模式
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("prehanlder-topology-local-mode", config, builder.createTopology());
try {
Thread.sleep(12000 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
localCluster.killTopology("local-prehanlder-topology-local-mode");
localCluster.shutdown();
}
}
public static TopologyBuilder builderTopology(KafkaSpout kafkaSpout) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(TopoStaticName.KafkaSpout, kafkaSpout, 10);
builder.setBolt(TopoStaticName.DATAFROMKAFKASPOUT, new GetDataFromKafkaSpoutBolt(), 10).shuffleGrouping(TopoStaticName.KafkaSpout);
//省略後面的bolt
return builder;
}
}
靜態的引數配置類
package com.lancy.common.pre;
/**
* @ClassName: TopoStaticName
* @Description: Topology靜態值
*/
/**
 * Holds the component ids used when wiring the preprocessing topology.
 * Pure constants holder — not instantiable.
 */
public final class TopoStaticName {
    // Component id of the Kafka consumer spout
    public static final String KafkaSpout = "01.KafkaSpout";
    // Component id of the first bolt, which receives tuples from the spout
    public static final String DATAFROMKAFKASPOUT = "02.DataFromKafkaSpout";

    private TopoStaticName() {
        // Utility class: prevent instantiation
    }
}
後續瞭解如何初始化zookeeper節點資訊以及如何整合kafka,storm和zookeeper的,加油