storm trident讀取kafka中資料
1. 建立kafka spout
public TransactionalTridentKafkaSpout kafkaSpout(String topic) {
StormConfig stormConfig = StormConfig.getInstance();
BrokerHosts hosts = new ZkHosts(stormConfig.getZookeeper());
TridentKafkaConfig config = new TridentKafkaConfig(hosts, topic);
config.scheme = new SchemeAsMultiScheme(new StringScheme());
return new TransactionalTridentKafkaSpout(config);
}
說明:建立TridentKafkaConfig時,上例中傳遞的是zookeeper的地址。實際傳遞kafka broker地址也可以。KafkaUtils中相容了兩種配置,相關原始碼:
public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) {
if (conf.hosts instanceof StaticHosts) {
return new StaticBrokerReader(conf.topic, ((StaticHosts) conf.hosts).getPartitionInformation());
} else {
return new ZkBrokerReader(stormConf, conf.topic, (ZkHosts) conf.hosts);
}
}
2. 建立Trideng topology
TridentTopology topology = new TridentTopology();
Stream stream1 = topology.newStream("kafkaspout" , kafkaSpout(topic));
stream1.each(new Fields("str"), new APacketParser(), new Fields("a-value", "a-description")).each(new Fields("a-value", "a-description"), new ProcFunction(), new Fields("whatever"));
StormTopology stormTopology = topology.build();
StormSubmitter.submitTopology("trident", new Config(), stormTopology);
說明:從kafka中讀取資訊,submitTopology時不用給任何kafka相關的配置。
相關推薦
storm trident讀取kafka中資料
1. 建立kafka spout public TransactionalTridentKafkaSpout kafkaSpout(String topic) { StormConfig stormConfig = StormConfig.getIns
access vba 用recordset讀取表中資料的簡單方法
'strQuery是表名,查詢名等 Public Function Getrs(Byval strQuery as string) as ADODB.Recordset Dim objRs As New ADODB.Recordset on Error GoTo Er
使用storm trident消費kafka訊息
一、前言 storm通過保證資料至少被處理一次來保證資料的完整性,由於元祖可以重發,對於一些需要資料精確的場景,可以考慮用storm trident實現。 傳統的事物型拓撲中存在幾種bolt: 1.1 BasicBolt 這是最基本的Bolt,BasicBolt每次只能處理一個tuple,而且必
學習筆記-註解+反射讀取Bean中資料
我們經常有從資料來源(即javabean中拿資料)的需要,但不同的人對bean中內容的命名五花八門,但利用註解+反射可以寫出通用的提取資料的程式碼。 假設需求是:從一個bean中取出NodeId, NodePId, NodeName三個成員。 假如一個bean是這樣寫的:g
小例子:java利用poi讀取excel中資料並匯入資料庫
問題描述: 資料夾下有若干excel檔案,檔名為10.教育局.xls 11.衛生院.xls ................有很多;中間的漢字為單位名稱,需要匯入資料庫,每個單位名稱要有一個單位id匹配;每個excel中有若干個sheet頁的名字,每個名字即為科
spark 讀取elasticsearch中資料不完整問題
使用spark讀取elasticsearch中的資料,使用es提供的api來進行, sc.esRDD("logstash").values 官方網站也是這種方式讀取資料的,但是我測試的時候有時候會出現讀取資料不完整的情況,比如本來讀取的資料是這樣的 Map(msg ->
spark流式讀取hdfs中資料
名詞解釋: spark streaming: 定義:一個對實時資料進行高容通量、容錯處理的流式處理系統,可以對多種資料來源進行Map、reduce和join等複雜操作,並將結果儲存到外部檔案系統、
java利用poi讀取excel中資料
所需的jar包: 程式碼: /** * * @param cell * 一個單元格的物件 * @return 返回該單元格相應的型別的值 */ public static Object getRightTypeCell(Ce
Kafka系列(四)Kafka消費者:從Kafka中讀取資料
本系列文章為對《Kafka:The Definitive Guide》的學習整理,希望能夠幫助到大家應用從Kafka中讀取資料需要使用KafkaConsumer訂閱主題,然後接收這些主題的訊息。在我們深入這些API之前,先來看下幾個比較重要的概念。Kafka消費者相關的概念消
storm從kafka中讀資料
========================================== 定義從kafka中讀出的資料 import java.io.UnsupportedEncodingException; import java.util.List; import bac
c++中txt檔案的讀取以及在MFC中讀取txt座標資料並完成圖形繪製
主要介紹如何讀取txt檔案中的座標資料,並在MFC視窗中繪製出來,工程建立方法和繪圖方法與上一篇博文基本一致,這裡就不再詳贅述,可參考上一篇博文vs2010、MFC視窗中繪製點、線、面。 C++中讀取檔案的方法有兩種,一種是來自於C語言的“檔案指標”方法,另一種是C++中的“檔案流”思想。
python3讀取excel中的資料
import xlrd import os paths = [r'C:/Users/'] for path in paths: for filename in os.listdir(path): exname = filename.split('.')
讀取資料庫中日期為指定月份的資料
比如資料表table中列為date的資料格式為2017-12,那麼我要獲取每年12月份的資料,就要擷取月為12的資料。方法如下: SELECT * FROM TABLE WHERE SUBSTR(date,6,2)='12' SUBSTR(str,pos,len): 從po
springboot兩種讀取application中的資料的方法
pom.xml配置檔案 <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XML
POI實現Excel中資料的讀取
所需依賴包:poi-3.17.jar、poi-ooxml-3.17.jar、poi-ooxml-schemas-3.17.jar、xmlbeans-2.6.0.jar、commons-collections4-4.1.jar。 依賴包下載地址:http://mvnrepository.com/a
Unity讀取Text中的每一行資料存放到字典中
//宣告一個存放的字典 public Dictionary<string, string> DressUpNameAndIntroduce = new Dictionary<string, string>(); private void Awake()
SparkStreaming消費Kafka中的資料 使用zookeeper和MySQL儲存偏移量的兩種方式
Spark讀取Kafka資料的方式有兩種,一種是receiver方式,另一種是直連方式。今天分享的SparkStreaming消費Kafka中的資料儲存偏移量的兩種方式都是基於直連方式上的 話不多說 直接上程式碼 ! 第一種是使用zookeeper儲存偏移量 object Kafka
sparkStreaming讀取kafka資料的2種方式
方式一 Receiver 使用Kafka的高層次Consumer API來實現。receiver從Kafka中獲取的資料都儲存在Spark Executor的記憶體中,然後Spa
python操作txt檔案中資料教程[3]-python讀取資料夾中所有txt檔案並將資料轉為csv檔案
python操作txt檔案中資料教程[3]-python讀取資料夾中所有txt檔案並將資料轉為csv檔案 覺得有用的話,歡迎一起討論相互學習~Follow Me 參考文獻 python操作txt檔案中資料教程[1]-使用python讀寫txt檔案 python操作txt檔案中資料教程[2]-pyth
flume將kafka中topic資料匯入hive中
一、首先更加資料的表結構在hive中進行表的建立。 create table AREA1(unid string,area_punid string,area_no string,area_name s