flume+kafka+storm整合00
一、安裝
flume,kafka, storm 的安裝在下面三篇文章:
flume:1.6.0
kafka:注意這裡最好下載scala2.10版本的kafka,因為scala2.10版本的相容性比較好和2.11版本差別太大
二、各個部分除錯
2.1、flume
# 監聽ftp日誌
#agent的名字a1
a1.sources = src
a1.channels = chl
a1.sinks = sk
#定義source, source使用spooldir, 監控ftp日誌
a1.sources.src.type=spooldir
#監控目錄
a1.sources.src .spoolDir=/hadoop/ftp/idc_bakupload/20180307/23/idc/
#忽略的檔案
a1.sources.src.ignorePattern = ^(.)*\\.AVL\\.(.)*$
#處理後的檔案,新增字尾
a1.sources.src.fileSuffix = .bak
#定義channel, 使用memory 作為channel
a1.channels.chl.type = memory
a1.channels.chl.capacity = 100000
a1.channels.chl.transactionCapacity = 10000
#定義sink, 輸出到控制檯
a1.sinks .sk.type = com.chb.test.sink.MyKafkaSink
#定義sink, source 與channel的關係
#注意sink後面是channel, 而不是s
a1.sinks.sk.channel = chl
a1.sources.src.channels = chl
2.2、kafka
2.2.1、kafka自身測試, 起一個生產者,一個消費者
2.2.2、啟動消費這去消費flumesink的資料
三、 Storm獲取資料流程
3.1、首先來了解Strom-kafka
Strom-kafka的官網介紹專案
注意:可能使用瀏覽器的問題, 導致在IE上只能看到部分,換成其他瀏覽器就好了。
介紹Storm核心Spout 和Trident spout的實現,使用者消費從 Apache Kafka 0.8.x獲取的資料。
3.1.1、Spouts
We support both Trident and core Storm spouts.為了兩種Spout實現,Strom使用一個BrokerHost interface 跟蹤Kafka broker主機到分割槽對映和kafkaConfig控制一些Kafka相關的引數。
3.1.2 BrokerHosts
為了初始化您的Kafka spout/emitter,您需要建立一個標記BrokerHosts介面的例項。 目前,支援以下兩種實現:
ZkHosts
如果你想動態跟蹤Kafka broker到分割槽對映(partition mapping), 你應該使用ZkHosts。 這個類使用Kafka的ZooKeeper entries 來跟蹤brokerHost - >分割槽對映。 您可以通過呼叫下面方法例項化物件:
public ZkHosts(String brokerZkStr, String brokerZkPath)
public ZkHosts(String brokerZkStr)
其中:
- brokerZkStr**只為**ip:post(eg. localhost:2181),
- brokerZkPath: the root directory under which all the topics and partition information is stored, 預設為 /brokers 。
- 預設情況下,代理分割槽對映(borker-partition mapping)每60秒從ZooKeeper重新整理。 如果要更改它,您應該將
host.refreshFrezqSecs
設定為您選擇的值。
實現如:
ZkHosts zkHosts = new ZkHosts("192.168.57.4:2181,192.168.57.5:2181,192.168.57.6:2181");
StaticHosts
這是一個可選的實現,其中broker - >分割槽資訊是靜態的。 為了構造這個類的例項,您需要首先構造一個GlobalPartitionInformation的例項。
Broker brokerForPartition0 = new Broker("localhost");//localhost:9092
Broker brokerForPartition1 = new Broker("localhost", 9092);//localhost:9092 but we specified the port explicitly
Broker brokerForPartition2 = new Broker("localhost:9092");//localhost:9092 specified as one string.
GlobalPartitionInformation partitionInfo = new GlobalPartitionInformation();
partitionInfo.addPartition(0, brokerForPartition0);//mapping from partition 0 to brokerForPartition0
partitionInfo.addPartition(1, brokerForPartition1);//mapping from partition 1 to brokerForPartition1
partitionInfo.addPartition(2, brokerForPartition2);//mapping from partition 2 to brokerForPartition2
StaticHosts hosts = new StaticHosts(partitionInfo);
KafkaConfig
為建立KafkaSpout所需的第二個物件是KafkaConfig
建立KafkaConfig
public KafkaConfig(BrokerHosts hosts, String topic)
public KafkaConfig(BrokerHosts hosts, String topic, String clientId)
BrokerHosts可以是如上所述的BrokerHosts介面的任何實現。 topic是Kafka topic的名稱。 ClientId是可選的, 用作ZooKeeper路徑的一部分,其中儲存了spout的當前消耗偏移量。
目前有2個KafkaConfig的擴充套件。
Spoutconfig
Spoutconfig是KafkaConfig的擴充套件,它支援使用ZooKeeper連線資訊的其他欄位,並用於控制特定於KafkaSpout的行為。 Zkroot將用作root來儲存消費的偏移量。 ID應該唯一標識您的spout。
public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id);
實現:
SpoutConfig spoutConfig
= new SpoutConfig(
zkHosts,
topic,
"/test", // 偏移量offset的根目錄
"test");// ID應該唯一標識您的spout
除了這些引數,SpoutConfig包含以下欄位控制KafkaSpout的行為:
spoutConfig.forceFromStart = false; // 不從頭開始消費,保證spout出現故障, 重啟之後,能夠從kafka的原來位置處理, 而不是從開始位置處理,kafka的偏移量,週期性的寫入zookeeper中,
// setting for how often to save the current Kafka offset to ZooKeeper
public long stateUpdateIntervalMs = 2000;
// Retry strategy for failed messages
public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
// Exponential back-off retry settings. These are used by ExponentialBackoffMsgRetryManager for retrying messages after a bolt
// calls OutputCollector.fail(). These come into effect only if ExponentialBackoffMsgRetryManager is being used.
// Initial delay between successive retries
public long retryInitialDelayMs = 0;
public double retryDelayMultiplier = 1.0;
// Maximum delay between successive retries
public long retryDelayMaxMs = 60 * 1000;
// Failed message will be retried infinitely if retryLimit is less than zero.
public int retryLimit = -1;
Core KafkaSpout only accepts an instance of SpoutConfig.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
TridentKafkaConfig
TridentKafkaConfig is another extension of KafkaConfig. TridentKafkaEmitter only accepts TridentKafkaConfig.
KafkaConfig類還有一堆公共變數,用於控制應用程式的行為。 這裡是預設值:
public int fetchSizeBytes = 1024 * 1024;
public int socketTimeoutMs = 10000;
public int fetchMaxWait = 10000;
public int bufferSizeBytes = 1024 * 1024;
public MultiScheme scheme = new RawMultiScheme();
public boolean ignoreZkOffsets = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
public long maxOffsetBehind = Long.MAX_VALUE;
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
public int metricsTimeBucketSizeInSecs = 60;
Most of them are self explanatory except MultiScheme.
MultiScheme
MultiScheme是一個介面,指示如何將從Kafka中消耗的ByteBuffer轉換為成Storm中的tuple。 它還控制輸出欄位的命名。
public Iterable<List<Object>> deserialize(ByteBuffer ser);
public Fields getOutputFields();
預設的RawMultiScheme只接受ByteBuffer,並返回一個帶有ByteBuffer的tuple,ByteBuffer轉換為byte []
。 outputField的名稱為“bytes”。 還有一些可選實現,如SchemeAsMultiScheme和KeyValueSchemeAsMultiScheme,它們可以將ByteBuffer轉換為String。
//從Kafka中取出的byte[],該如何反序列化
如在整合專案中實現:使用SchemeAsMultiScheme
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); // 定義輸出為String
還有SchemeAsMultiScheme,MessageMetadataSchemeAsMultiScheme的擴充套件,它具有一個附加的反序列化方法,除了與訊息關聯的分割槽和偏移之外,還接受訊息ByteBuffer。
public Iterable<List<Object>> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset)
這對於從Kafka topic上的任意處 auditing/replaying 訊息非常有用,可以儲存離散流的每個訊息的分割槽和偏移量,而不是保留整個訊息。
四、KafakSpout的具體實現
TopologyBuilder builder = new TopologyBuilder();
// config kafka spout
String topic = "testflume";
//第一步建立Zkhosts
ZkHosts zkHosts = new ZkHosts("192.168.57.4:2181,192.168.57.5:2181,192.168.57.6:2181");
//第二步建立SpoutConfig, 為了設定各種引數
SpoutConfig spoutConfig = new SpoutConfig(zkHosts,
topic, //kafka的topic名稱
"/test", // 偏移量offset的根目錄
"test"); // kafka的唯一表示。
//設定zkserver的資訊, 可選的, 應為在上面的ZkHosts中已經設定了zookeeper的主機和埠號。
List<String> zkServers = new ArrayList<String>();
System.out.println(zkHosts.brokerZkStr);
for (String host : zkHosts.brokerZkStr.split(",")) {
zkServers.add(host.split(":")[0]);
}
spoutConfig.zkServers = zkServers;
spoutConfig.zkPort = 2181;
//設定kafka的消費模式, 是否從頭開始。
spoutConfig.forceFromStart = false; // 不從頭開始消費
spoutConfig.socketTimeoutMs = 60 * 1000; //與Kafka broker的連線的socket超時時間
//從Kafka中取出的byte[],該如何反序列化
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); // 定義輸出為String
//KafkaSpout之接收一個引數SpoutConfig.
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
// set kafka spout
builder.setSpout("kafka_spout", kafkaSpout, 3);