Storm-kafka整合——1.1.0版本storm中tuple取KafkaSpout資料詳解
阿新 • • 發佈:2019-01-01
問題描述:
KafkaSpout拉取kafka topic資料,下一級bolt從kafkaspout獲取資料,tuple到底採用什麼方法取出spout中的訊息呢?
KafkaSpout建立:
/* *根據資料來源topic和zk_id建立並返回kafkaSpout * */ public static KafkaSpout init(String spout_topic,String zk_id){ KafkaSpoutConfig<String,String> kafkaSpoutConfig = KafkaSpoutConfig .builder(bootstrap_address,spout_topic) .setProp(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true") .setProp(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000) .setProp(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,30000) .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getCanonicalName()) .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getCanonicalName()) .setOffsetCommitPeriodMs(10000)//控制spout多久向kafka提交一次offset .setGroupId(zk_id) .setMaxUncommittedOffsets(250) .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST) .build(); KafkaSpout<String, String> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig); return kafkaSpout; }
bolt邏輯處理:
public void execute(Tuple tuple, BasicOutputCollector collector) { this.collector = collector; System.out.println("MC開始讀取資料"); String message = tuple.getStringByField("value"); //038,2018-03-21 14:51:47,17134906630,2018-03-21 14:53:03,V0330700,E02,036,null,E020005,01,V0350100 String strs[] = message.split(",", -1); String userProv = strs[0];//歸屬省份編碼 String occurtime = strs[1];//最新信令產生時間 String deviceNumber = strs[2];//電話號碼 String eventArea = strs[4]; //信令發生地市 String eventProv = strs[6]; //信令發生省份 String userArea = strs[10];//歸屬地市編碼 //過濾漫入使用者,只要漫出使用者 if(userProv.equals("038") && !eventProv.equals("038")){ String provDescGuishu = null; String areaDescGuishu = null; String provDescOccur = null; String areaDescOccur = null; String typeGuishu = "省內"; String community = null; String mrmcType = "漫出"; String roamType = "國內"; String longitude = null; String latitude = null; String areaInfoGuishu = JedisUtil.get("zbarea|" + userArea);//使用者歸屬地市新資訊 if(areaInfoGuishu != null && !areaInfoGuishu.equals("")){ String strs1[] = areaInfoGuishu.split(":"); provDescGuishu = strs1[0]; areaDescGuishu = strs1[1]; } String areaInfo_occur = JedisUtil.get("zbarea|" + eventArea); if(areaInfo_occur != null && !areaInfo_occur.equals("")){ String strs2[] = areaInfo_occur.split(":"); provDescOccur = strs2[0]; areaDescOccur = strs2[1]; } //格式化目標資料: //電話|信令發生時間|使用者歸屬省份描述|使用者歸屬地市描述|使用者歸屬型別|使用者到訪省份描述|使用者到訪地市描述|使用者到訪小區描述|漫入漫出型別描述|漫遊型別|經度|緯度 String mrmc2merge = deviceNumber + "#" + occurtime + "#" + provDescGuishu + "#" + areaDescGuishu + "#" + typeGuishu + "#" + provDescOccur + "#" + areaDescOccur + "#" + community + "#" + mrmcType + "#" + roamType + "#" + longitude + "#" + latitude; System.out.println("MC資料" + mrmc2merge); collector.emit(new Values(mrmc2merge)); }else{ System.out.println(deviceNumber + "******被過濾漫入使用者"); } }
topo結構構造
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("s1mme_spout",init(s1mme_spout_topic,zk_s1mme_id),12); builder.setBolt("s1mme_split",new S1mmeSplitBolt(),36).shuffleGrouping("s1mme_spout"); builder.setSpout("sgs_spout",init(sgs_spout_topic,zk_sgs_id),3); builder.setBolt("sgs_split",new SgsSpiltBolt(),6).shuffleGrouping("sgs_spout"); builder.setSpout("mrmc_spout",init(mrmc_spout_topic,zk_mrmc_id),1); builder.setBolt("mrmc_split",new MrmcSplitBolt(),3).shuffleGrouping("mrmc_spout");
首先我們來看tuple具備的屬性:
tuple的幾個主要方法:
tuple.getFields();//獲取kafkaSpout的topic相關屬性。tuple的屬性列表:topic, partition, offset, key, value
tuple.getValues();//獲取所有屬性的值
tuple.getValue();//根據屬性列表下標獲取值
tuple.toString();//獲取tuple的所有資訊,包括資料來源,訊息id,資料value等資訊
tuple.getMessageId();//獲取訊息id
tuple.getSourceComponent();//獲取資料來源
tuple.getSourceStreamId();//獲取本條訊息的id
tuple.getSourceTask();
上述程式碼的輸出結果對應下圖:
總結:我們在取值的時候主要是根據tuple的屬性列表以及對應下標索引進行取值,特別是使用getString,getLong,這類的方法時,1.X版本與0.X版本有所不同,在1.X版本必須採取上述取值方法