Big Data Exploration (9): Real-Time Log Stream Processing in Practice with log4j + flume + kafka + Spark Streaming
1. Real-Time Stream Processing
1.1 Real-Time Computing
This resembles a real-time system, i.e. a system that can respond to requests within strict time limits. In stock trading, for example, market data changes from moment to moment, and decisions are often needed within seconds or even milliseconds. Put simply, a task must produce its result within a very short unit of time, and the computation is usually repeated many times.
1.2 Stream Computing
This usually means data flowing through the system continuously while the system keeps computing on it without stopping. There is not necessarily any strict time limit here: a long time may pass between data entering the system and a result being produced. Examples include system log data and daily user page-view data on an e-commerce site.
1.3 Real-Time Stream Computing
Combining real-time computing with streaming data gives real-time stream computing, which is what big data usually calls real-time stream processing: data is produced continuously, and the computation is also under strict time limits. For example, on an e-commerce site the recommended products change right after you click on an item; they are computed and shown to you in real time. Likewise, when your phone credit or data allowance is nearly used up, a data-package offer is recommended to you in real time.
2. Real-Time Stream Processing in Practice
This example draws on the imooc hands-on video course Spark Streaming Real-Time Stream Processing Project in Practice, which is worth studying if you are interested.
2.1 A Continuous Stream of Data
Here log4j is used to simulate a continuous stream of log data: just start a process that keeps printing log records. A minimal configuration looks like this:
log4j.rootLogger=INFO,stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
Then write a small program that prints log messages:
import org.apache.log4j.Logger;

public class LoggerGenerator {

    private static Logger logger = Logger.getLogger(LoggerGenerator.class.getName());

    public static void main(String[] args) throws InterruptedException {
        int index = 0;
        while (true) {
            // emit one log record per second with an increasing counter
            Thread.sleep(1000);
            logger.info("value : " + index++);
        }
    }
}
2.2 Collecting the Data in Real Time
Flume can be used to collect the log data in real time. To integrate with log4j, the log4j configuration file needs the following:
log4j.rootLogger=INFO,stdout,flume
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname = wds
log4j.appender.flume.Port = 41414
log4j.appender.flume.UnsafeMode = true
At the same time, configure flume as follows:
agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=logger-sink
# define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414
# define channel
agent1.channels.logger-channel.type=memory
# define sink
agent1.sinks.logger-sink.type = logger
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.logger-sink.channel=logger-channel
A logger sink is used for now, just to verify that the data can be collected. When building a project, don't try to do everything in one step: test each step as you complete it, which makes errors easier to locate and keeps them from accumulating.
Start flume:
flume-ng agent \
--name agent1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/streaming.conf \
-Dflume.root.logger=INFO,console
Note that this may fail with java.lang.ClassNotFoundException: org.apache.flume.clients.log4jappender.Log4jAppender. Add the following dependency to fix it:
<dependency>
    <groupId>org.apache.flume.flume-ng-clients</groupId>
    <artifactId>flume-ng-log4jappender</artifactId>
    <version>1.6.0</version>
</dependency>
Now start the log-printing program, and you can see that flume has begun collecting data:
2018-12-07 21:39:03,204 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{flume.client.log4j.timestamp=1544236744447, flume.client.log4j.logger.name=LoggerGenerator, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 76 61 6C 75 65 20 3A 20 30 value : 0 }
2018-12-07 21:39:03,609 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{flume.client.log4j.timestamp=1544236745545, flume.client.log4j.logger.name=LoggerGenerator, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 76 61 6C 75 65 20 3A 20 31 value : 1 }
2018-12-07 21:39:04,611 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{flume.client.log4j.timestamp=1544236746548, flume.client.log4j.logger.name=LoggerGenerator, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 76 61 6C 75 65 20 3A 20 32 value : 2 }
2018-12-07 21:39:05,614 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{flume.client.log4j.timestamp=1544236747550, flume.client.log4j.logger.name=LoggerGenerator, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 76 61 6C 75 65 20 3A 20 33 value : 3 }
2018-12-07 21:39:06,617 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{flume.client.log4j.timestamp=1544236748554, flume.client.log4j.logger.name=LoggerGenerator, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 76 61 6C 75 65 20 3A 20 34 value : 4 }
2018-12-07 21:39:07,620 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{flume.client.log4j.timestamp=1544236749556, flume.client.log4j.logger.name=LoggerGenerator, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 76 61 6C 75 65 20 3A 20 35 value : 5 }
2018-12-07 21:39:08,626 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{flume.client.log4j.timestamp=1544236750560, flume.client.log4j.logger.name=LoggerGenerator, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 76 61 6C 75 65 20 3A 20 36 value : 6 }
2018-12-07 21:39:09,632 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{flume.client.log4j.timestamp=1544236751568, flume.client.log4j.logger.name=LoggerGenerator, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 76 61 6C 75 65 20 3A 20 37 value : 7 }
2018-12-07 21:39:10,636 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{flume.client.log4j.timestamp=1544236752572, flume.client.log4j.logger.name=LoggerGenerator, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 76 61 6C 75 65 20 3A 20 38 value : 8 }
2018-12-07 21:39:11,638 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{flume.client.log4j.timestamp=1544236753575, flume.client.log4j.logger.name=LoggerGenerator, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 76 61 6C 75 65 20 3A 20 39 value : 9 }
2018-12-07 21:39:12,640 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{flume.client.log4j.timestamp=1544236754577, flume.client.log4j.logger.name=LoggerGenerator, flume.client.log4j.log.level=20000, flume.client.log4j.message.encoding=UTF8} body: 76 61 6C 75 65 20 3A 20 31 30 value : 10 }
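In each event above, the body is printed as raw bytes followed by their ASCII rendering: 76 61 6C 75 65 20 3A 20 30 is simply the message value : 0. A small standalone snippet showing how those bytes decode (HexBody is an illustrative name, not part of Flume):

```java
public class HexBody {
    // Decode a space-separated hex byte string, as printed by Flume's
    // logger sink, back into the original ASCII message.
    public static String decode(String hex) {
        StringBuilder sb = new StringBuilder();
        for (String b : hex.trim().split("\\s+")) {
            sb.append((char) Integer.parseInt(b, 16));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(decode("76 61 6C 75 65 20 3A 20 30"));
    }
}
```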
2.3 Buffering the Data in a Message Queue
Here the data collected by flume is written to kafka, which acts as a buffer; flume thus plays the producer role. To connect flume to kafka, change the flume configuration as follows:
agent1.sources=avro-source
agent1.channels=logger-channel
agent1.sinks=kafka-sink
# define source
agent1.sources.avro-source.type=avro
agent1.sources.avro-source.bind=0.0.0.0
agent1.sources.avro-source.port=41414
# define channel
agent1.channels.logger-channel.type=memory
# define sink
agent1.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kafka-sink.brokerList = wds:9092
agent1.sinks.kafka-sink.topic = streamingtopic
agent1.sinks.kafka-sink.batchSize = 20
agent1.sinks.kafka-sink.requiredAcks = 1
agent1.sources.avro-source.channels=logger-channel
agent1.sinks.kafka-sink.channel=logger-channel
kafka depends on zookeeper, so start zookeeper first:
$ZK_HOME/bin/zkServer.sh start
Start kafka, specifying its configuration file:
bin/kafka-server-start.sh config/server.properties
Create the topic:
kafka-topics.sh --create --zookeeper wds:2181 --replication-factor 1 --partitions 1 --topic streamingtopic
Start a consumer process to verify that messages are being consumed:
kafka-console-consumer.sh --zookeeper wds:2181 --topic streamingtopic
Messages appear in groups of 20 (determined by agent1.sinks.kafka-sink.batchSize = 20 in the flume configuration), which proves the pipeline is connected:
$ kafka-console-consumer.sh --zookeeper wds:2181 --topic streamingtopic
value : 0
value : 1
value : 2
value : 3
value : 4
value : 5
value : 6
value : 7
value : 8
value : 9
value : 10
value : 11
value : 12
value : 13
value : 14
value : 15
value : 16
value : 17
value : 18
value : 19
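The grouping into batches of 20 comes from the sink's batchSize: events accumulate in a buffer and are forwarded to Kafka once the batch is full. A simplified sketch of that behavior (BatchingSink is an illustrative class, not Flume's actual implementation):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchingSink {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();

    public BatchingSink(int batchSize) {
        this.batchSize = batchSize;
    }

    // Buffer an event; flush the whole batch once batchSize events accumulate
    public void process(String event) {
        buffer.add(event);
        if (buffer.size() >= batchSize) {
            flushed.add(new ArrayList<>(buffer));
            buffer.clear();
        }
    }

    public List<List<String>> getFlushed() {
        return flushed;
    }
}
```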
2.4 Processing the Data in Real Time
Use Spark Streaming to consume the messages from Kafka. The receiver-based mode is used here; the simple code below serves as a reference:
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KafkaReceiverWordCount {

  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println("Usage: KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    val Array(zkQuorum, group, topics, numThreads) = args

    val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // map each topic to the number of receiver threads consuming it
    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    // connect Kafka to Spark Streaming (receiver-based API)
    val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

    // the key (_1) is not used; count the messages in each batch and print the result
    messages.map(_._2).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}
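The topicMap passed to KafkaUtils.createStream maps each topic name to the number of receiver threads that consume it. The same parsing the Scala one-liner performs, sketched in Java for clarity (TopicMapBuilder is an illustrative name):

```java
import java.util.HashMap;
import java.util.Map;

public class TopicMapBuilder {
    // Build the topic -> thread-count map that the receiver-based
    // Kafka stream expects, from a comma-separated topic list.
    public static Map<String, Integer> build(String topics, int numThreads) {
        Map<String, Integer> topicMap = new HashMap<>();
        for (String topic : topics.split(",")) {
            topicMap.put(topic, numThreads);
        }
        return topicMap;
    }
}
```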
Note that both the log generator and the Streaming job here run locally. What about a production environment?
1) Package a jar and run the LoggerGenerator class from it
2) Flume and Kafka stay the same
3) Spark Streaming also needs to be packaged as a jar and submitted to the cluster for execution via spark-submit, with the mode being local/yarn/standalone/mesos
Build the jar with maven via mvn assembly:assembly -Dmaven.test.skip=true, bundling the kafka-related jars; mark dependencies that are not needed at runtime as provided.
When packaging, move LoggerGenerator out of test and into a package under java so it is easy to run.
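For reference, a minimal pom.xml fragment that enables the jar-with-dependencies build used above (a sketch; pin the plugin version appropriate for your build):

```xml
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <configuration>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
</plugin>
```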
Once packaging succeeds, run it on the server:
java -cp spark-test-1.0-jar-with-dependencies.jar com.wds.streaming.LoggerGenerator
The -cp option adds the given jar to the classpath, so the java class loader searches it for matching classes; that way you don't need to specify a main class at packaging time, which is very convenient. Then submit the jar via spark-submit:
spark-submit \
--class com.wds.streaming.KafkaStreamingApp \
--master local[2] \
--name KafkaStreamingApp \
--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.2 \
/home/hadoop/hadoop/lib/spark-test-1.0-jar-with-dependencies.jar wds:2181 test streamingtopic 2
The log output looks like this:
$ java -cp spark-test-1.0-jar-with-dependencies.jar com.wds.streaming.LoggerGenerator
log4j:ERROR Could not find value for key log4j.appender.flume.layout
2018-12-07 22:32:27,447 [main] [org.apache.flume.api.NettyAvroRpcClient] [WARN] - Using default maxIOWorkers
2018-12-07 22:32:28,681 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 0
2018-12-07 22:32:29,751 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 1
2018-12-07 22:32:30,753 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 2
2018-12-07 22:32:31,755 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 3
2018-12-07 22:32:32,758 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 4
2018-12-07 22:32:33,760 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 5
2018-12-07 22:32:34,762 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 6
2018-12-07 22:32:35,764 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 7
2018-12-07 22:32:36,765 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 8
2018-12-07 22:32:37,767 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 9
2018-12-07 22:32:38,770 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 10
2018-12-07 22:32:39,772 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 11
2018-12-07 22:32:40,775 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 12
2018-12-07 22:32:41,777 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 13
2018-12-07 22:32:42,779 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 14
2018-12-07 22:32:43,782 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 15
2018-12-07 22:32:44,784 [main] [com.wds.streaming.LoggerGenerator] [INFO] - value : 16
The Spark Streaming output looks like this (the final 20 is the number of records counted in that 5-second batch):
18/12/07 22:32:50 INFO executor.Executor: Running task 0.0 in stage 12.0 (TID 10)
18/12/07 22:32:50 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 2 blocks
18/12/07 22:32:50 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms
18/12/07 22:32:50 INFO executor.Executor: Finished task 0.0 in stage 12.0 (TID 10). 1705 bytes result sent to driver
18/12/07 22:32:50 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 12.0 (TID 10) in 6 ms on localhost (executor driver) (1/1)
18/12/07 22:32:50 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 12.0, whose tasks have all completed, from pool
18/12/07 22:32:50 INFO scheduler.DAGScheduler: ResultStage 12 (print at KafkaStreamingApp.scala:23) finished in 0.006 s
18/12/07 22:32:50 INFO scheduler.DAGScheduler: Job 6 finished: print at KafkaStreamingApp.scala:23, took 0.014111 s
-------------------------------------------
Time: 1544239970000 ms
-------------------------------------------
20
With that, the whole pipeline is connected and the test succeeds.
3. References
- Spark Streaming Real-Time Stream Processing Project in Practice (imooc course)
- The official flume, kafka, and spark documentation