1. 程式人生 > >Storm-kafka整合——1.1.0版本storm中tuple取KafkaSpout資料詳解

Storm-kafka整合——1.1.0版本storm中tuple取KafkaSpout資料詳解

問題描述:

KafkaSpout拉取kafka topic資料,下一級bolt從kafkaspout獲取資料,tuple到底採用什麼方法取出spout中的訊息呢?

KafkaSpout建立:

/*
*根據資料來源topic和zk_id建立並返回kafkaSpout
* */
public static KafkaSpout init(String spout_topic,String zk_id){
    KafkaSpoutConfig<String,String> kafkaSpoutConfig = KafkaSpoutConfig
            .builder(bootstrap_address
,spout_topic) .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") .setProp(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000) .setProp(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,30000) .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class
.getCanonicalName()) .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getCanonicalName()) .setOffsetCommitPeriodMs(10000)//控制spout多久向kafka提交一次offset .setGroupId(zk_id) .setMaxUncommittedOffsets(250) .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST
) .build(); KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig); return kafkaSpout; }

bolt邏輯處理:

public void execute(Tuple tuple, BasicOutputCollector collector) {
    this.collector = collector;
System.out.println("MC開始讀取資料");
String message = tuple.getStringByField("value");
//038,2018-03-21 14:51:47,17134906630,2018-03-21 14:53:03,V0330700,E02,036,null,E020005,01,V0350100
String strs[] = message.split(",", -1);
String userProv = strs[0];//歸屬省份編碼
String occurtime = strs[1];//最新信令產生時間
String deviceNumber = strs[2];//電話號碼
String eventArea = strs[4]; //信令發生地市
String eventProv = strs[6]; //信令發生省份
String userArea = strs[10];//歸屬地市編碼
    //過濾漫入使用者,只要漫出使用者
if(userProv.equals("038") && !eventProv.equals("038")){
        String provDescGuishu = null;
String areaDescGuishu = null;
String provDescOccur = null;
String areaDescOccur = null;
String typeGuishu = "省內";
String community = null;
String mrmcType = "漫出";
String roamType = "國內";
String longitude = null;
String latitude = null;
String areaInfoGuishu = JedisUtil.get("zbarea|" + userArea);//使用者歸屬地市新資訊
if(areaInfoGuishu != null && !areaInfoGuishu.equals("")){
            String strs1[] = areaInfoGuishu.split(":");
provDescGuishu = strs1[0];
areaDescGuishu = strs1[1];
}

        String areaInfo_occur = JedisUtil.get("zbarea|" + eventArea);
        if(areaInfo_occur != null && !areaInfo_occur.equals("")){
            String strs2[] = areaInfo_occur.split(":");
provDescOccur = strs2[0];
areaDescOccur = strs2[1];
}

        //格式化目標資料:
        //電話|信令發生時間|使用者歸屬省份描述|使用者歸屬地市描述|使用者歸屬型別|使用者到訪省份描述|使用者到訪地市描述|使用者到訪小區描述|漫入漫出型別描述|漫遊型別|經度|緯度
String mrmc2merge = deviceNumber + "#" + occurtime + "#" + provDescGuishu + "#" + areaDescGuishu + "#" + typeGuishu + "#" + provDescOccur + "#" + areaDescOccur + "#" + community + "#" + mrmcType + "#" + roamType + "#" + longitude + "#" + latitude;
System.out.println("MC資料" + mrmc2merge);
collector.emit(new Values(mrmc2merge));
}else{
        System.out.println(deviceNumber + "******被過濾漫入使用者");
}
}

topo結構構造

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("s1mme_spout",init(s1mme_spout_topic,zk_s1mme_id),12);
builder.setBolt("s1mme_split",new S1mmeSplitBolt(),36).shuffleGrouping("s1mme_spout");
builder.setSpout("sgs_spout",init(sgs_spout_topic,zk_sgs_id),3);
builder.setBolt("sgs_split",new SgsSpiltBolt(),6).shuffleGrouping("sgs_spout");
builder.setSpout("mrmc_spout",init(mrmc_spout_topic,zk_mrmc_id),1);
builder.setBolt("mrmc_split",new MrmcSplitBolt(),3).shuffleGrouping("mrmc_spout");

首先我們來看tuple具備的屬性:

tuple的幾個主要方法:

tuple.getFields();//獲取kafkaSpout的topic相關屬性。tuple的屬性列表:topic, partition, offset, key, value

tuple.getValues();//獲取所有屬性的值

tuple.getValue();//根據屬性列表下標獲取值

tuple.toString();//獲取tuple的所有資訊,包括資料來源,訊息id,資料value等資訊

tuple.getMessageId();//獲取訊息id

tuple.getSourceComponent();//獲取資料來源

tuple.getSourceStreamId();//獲取本條訊息的id

tuple.getSourceTask();


上述程式碼的輸出結果對應下圖:


總結:我們在取值的時候主要是根據tuple的屬性列表以及對應下標索引進行取值,特別是使用getString,getLong,這類的方法時,1.X版本與0.X版本有所不同,在1.X版本必須採取上述取值方法