Flume 、Kafka 與SparkStreaming 整合程式設計
阿新 • • 發佈:2019-01-01
Flume 、Kafka 與SparkStreaming 整合程式設計
一、 Flume 與SparkStreaming 整合程式設計
1、程式
pull方式,可靠Receiver,工作常用
com.imooc.spark.FlumePullWordCount.scala
push方式
com.imooc.spark.FlumePushWordCount.scala
pom.xml檔案
2、部署
1)、kafka部署
2)、提交作業(非聯網環境,不用packages ,而是用jars)
/www/lib/sparktrain-1.0.jar \ hadoop000 414
二、 Kafka 與SparkStreaming 整合程式設計
1、程式
com.imooc.spark.KafkaDirectWordCount.scala
pom.xml檔案
一、 Flume 與SparkStreaming 整合程式設計
1、程式
pull方式,可靠Receiver,工作常用
com.imooc.spark.FlumePullWordCount.scala
package com.imooc.spark

import java.nio.charset.StandardCharsets

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Spark Streaming + Flume word count, pull-based variant.
 *
 * Uses FlumeUtils.createPollingStream, i.e. the reliable-receiver approach
 * that is commonly used in production.
 *
 * NOTE(review): the published snippet lost its imports, object header and
 * argument handling during extraction. Those parts are reconstructed here to
 * mirror the push-based FlumePushWordCount program; confirm against the
 * original project (in particular the usage message and the batch interval).
 *
 * Arguments: <hostname> <port> of the Flume sink to poll.
 */
object FlumePullWordCount {

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: FlumePullWordCount <hostname> <port>")
      System.exit(1)
    }

    val Array(hostname, port) = args

    // Master and application name are left unset so that spark-submit
    // supplies them; uncomment for a local run from the IDE.
    val sparkConf = new SparkConf() //.setMaster("local[2]").setAppName("FlumePullWordCount")

    // Batch interval assumed to be 5 seconds, as in the push variant — TODO confirm.
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val flumeStream = FlumeUtils.createPollingStream(ssc, hostname, port.toInt)

    flumeStream
      // Decode the event body explicitly as UTF-8 instead of relying on the
      // JVM's platform default charset.
      .map(x => new String(x.event.getBody.array(), StandardCharsets.UTF_8).trim)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
push方式
com.imooc.spark.FlumePushWordCount.scala
package com.imooc.spark

import java.nio.charset.StandardCharsets

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Spark Streaming + Flume word count, push-based variant.
 *
 * Creates a Flume input stream with FlumeUtils.createStream on the given
 * host/port, splits every event body on single spaces and prints the word
 * counts of each 5-second batch.
 *
 * Arguments: <hostname> <port> passed to FlumeUtils.createStream.
 */
object FlumePushWordCount {

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: FlumePushWordCount <hostname> <port>")
      System.exit(1)
    }

    val Array(hostname, port) = args

    // Master and application name are left unset so that spark-submit
    // supplies them; uncomment for a local run from the IDE.
    val sparkConf = new SparkConf() //.setMaster("local[2]").setAppName("FlumePushWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val flumeStream = FlumeUtils.createStream(ssc, hostname, port.toInt)

    flumeStream
      // Decode the event body explicitly as UTF-8 instead of relying on the
      // JVM's platform default charset.
      .map(x => new String(x.event.getBody.array(), StandardCharsets.UTF_8).trim)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
pom.xml檔案
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.imooc.spark</groupId>
  <artifactId>sparktrain</artifactId>
  <version>1.0</version>
  <inceptionYear>2008</inceptionYear>

  <!-- Versions shared by the dependencies below -->
  <properties>
    <scala.version>2.11.8</scala.version>
    <kafka.version>0.9.0.0</kafka.version>
    <spark.version>2.2.0</spark.version>
    <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
    <hbase.version>1.2.0-cdh5.7.0</hbase.version>
  </properties>

  <!-- Add the Cloudera repository (needed for the cdh-versioned artifacts) -->
  <repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
    </repository>
  </repositories>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <!-- Kafka dependency (disabled) -->
    <!--
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka_2.11</artifactId>
      <version>${kafka.version}</version>
    </dependency>
    -->

    <!-- Hadoop dependency -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>

    <!-- HBase dependencies -->
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-client</artifactId>
      <version>${hbase.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hbase</groupId>
      <artifactId>hbase-server</artifactId>
      <version>${hbase.version}</version>
    </dependency>

    <!-- Spark Streaming dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <!-- Spark Streaming + Flume integration dependencies -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-flume-sink_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <!-- Spark Streaming + Kafka (0.8 API) integration dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>org.apache.commons</groupId>
      <artifactId>commons-lang3</artifactId>
      <version>3.5</version>
    </dependency>

    <!-- Spark SQL dependency -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

    <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-scala_2.11</artifactId>
      <version>2.6.5</version>
    </dependency>

    <dependency>
      <groupId>net.jpountz.lz4</groupId>
      <artifactId>lz4</artifactId>
      <version>1.3.0</version>
    </dependency>

    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.38</version>
    </dependency>

    <dependency>
      <groupId>org.apache.flume.flume-ng-clients</groupId>
      <artifactId>flume-ng-log4jappender</artifactId>
      <version>1.6.0</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <!-- Compiles the Scala main and test sources -->
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
          <args>
            <!-- NOTE(review): jvm-1.5 is a legacy bytecode target; Spark 2.2 runs on Java 8+, consider jvm-1.8 (confirm before changing) -->
            <arg>-target:jvm-1.5</arg>
          </args>
        </configuration>
      </plugin>
      <!-- Eclipse project generation with the Scala builder/nature -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-eclipse-plugin</artifactId>
        <configuration>
          <downloadSources>true</downloadSources>
          <buildcommands>
            <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
          </buildcommands>
          <additionalProjectnatures>
            <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
          </additionalProjectnatures>
          <classpathContainers>
            <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
            <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
          </classpathContainers>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <reporting>
    <plugins>
      <plugin>
        <groupId>org.scala-tools</groupId>
        <artifactId>maven-scala-plugin</artifactId>
        <configuration>
          <scalaVersion>${scala.version}</scalaVersion>
        </configuration>
      </plugin>
    </plugins>
  </reporting>
</project>
2、部署
1)、kafka部署
啟動kafka: kafka-server-start.sh $KAFKA_HOME/config/server.properties
建立topic: kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic test
生產者: kafka-console-producer.sh --broker-list hadoop000:9092 --topic test
2)、提交作業(非聯網環境,不用packages,而是用jars)
spark-submit \
  --class com.imooc.spark.KafkaDirectWordCount \
  --master local[2] \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
  /www/lib/sparktrain-1.0.jar \
  hadoop000:9092 test
(非聯網環境:把 --packages 那一行換成 --jars <spark-streaming-kafka-0-8-assembly 的 jar 路徑>)
(原文此處另殘留「hadoop000 414」,疑為 Flume 作業的引數 <hostname> <port>,例如 hadoop000 41414,請自行確認)
二、 Kafka 與SparkStreaming 整合程式設計
1、程式
com.imooc.spark.KafkaDirectWordCount.scala
package com.imooc.spark

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Spark Streaming + Kafka integration, approach two: direct stream.
 *
 * Reads the given topics with KafkaUtils.createDirectStream, splits every
 * message value on single spaces and prints the word counts of each
 * 5-second batch.
 *
 * Arguments:
 *   <brokers> value for the Kafka "metadata.broker.list" parameter
 *   <topics>  comma-separated list of topics to consume
 */
object KafkaDirectWordCount {

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("Usage: KafkaDirectWordCount <brokers> <topics>")
      System.exit(1)
    }

    val Array(brokers, topics) = args

    // Master and application name are left unset so that spark-submit
    // supplies them; uncomment for a local run from the IDE.
    val sparkConf = new SparkConf()
    //.setAppName("KafkaReceiverWordCount")
    //.setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    // Connect Spark Streaming to Kafka; keys and values are both decoded as
    // strings (StringDecoder comes from kafka.serializer).
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet
    )

    // Each record is a (key, value) pair; the second element is the message
    // value, which is the text to be counted.
    messages
      .map(_._2)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
<project xmlns="
http://maven.apache.org/POM/4.0.0
" xmlns:xsi="
http://www.w3.org/2001/XMLSchema-instance
"
xsi:schemaLocation="
http://maven.apache.org/POM/4.0.0
http://maven.apache.org/maven-v4_0_0.xsd
">
<modelVersion>4.0.0</modelVersion>
<groupId>com.imooc.spark</groupId>
<artifactId>sparktrain</artifactId>
<version>1.0</version>
<inceptionYear>2008</inceptionYear>
<properties>
<scala.version>2.11.8</scala.version>
<kafka.version>0.9.0.0</kafka.version>
<spark.version>2.2.0</spark.version>
<hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
<hbase.version>1.2.0-cdh5.7.0</hbase.version>
</properties>
<!--新增cloudera的repository--> <repositories> <repository> <id>cloudera</id> <url> https://repository.cloudera.com/artifactory/cloudera-repos </url> </repository> </repositories> <dependencies> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>${scala.version}</version> </dependency> <!-- Kafka 依賴--> <!-- <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>${kafka.version}</version> </dependency> --> <!-- Hadoop 依賴--> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> <!-- HBase 依賴--> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>${hbase.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>${hbase.version}</version> </dependency> <!-- Spark Streaming 依賴--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>${spark.version}</version> </dependency> <!-- Spark Streaming整合Flume 依賴--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-flume_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-flume-sink_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-8_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId> org.apache.commons </groupId> <artifactId>commons-lang3</artifactId> <version>3.5</version> </dependency> <!-- Spark SQL 依賴--> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>com.fasterxml.jackson.module</groupId> 
<artifactId>jackson-module-scala_2.11</artifactId> <version>2.6.5</version> </dependency> <dependency> <groupId>net.jpountz.lz4</groupId> <artifactId>lz4</artifactId> <version>1.3.0</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.38</version> </dependency> <dependency> <groupId>org.apache.flume.flume-ng-clients</groupId> <artifactId>flume-ng-log4jappender</artifactId> <version>1.6.0</version> </dependency> </dependencies> <build> <sourceDirectory>src/main/scala</sourceDirectory> <testSourceDirectory>src/test/scala</testSourceDirectory> <plugins> <plugin> <groupId>org.scala-tools</groupId> <artifactId>maven-scala-plugin</artifactId> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> <configuration> <scalaVersion>${scala.version}</scalaVersion> <args> <arg>-target:jvm-1.5</arg> </args> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-eclipse-plugin</artifactId> <configuration> <downloadSources>true</downloadSources> <buildcommands> <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand> </buildcommands> <additionalProjectnatures> <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature> </additionalProjectnatures> <classpathContainers> <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer> <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer> </classpathContainers> </configuration> </plugin> </plugins> </build> |