Kafka-Spark Streaming-Spark SQL Operation Notes
(The Kafka and Spark versions are given in the Maven project below.)
Testing Kafka on Linux
1. Start the Kafka services in pseudo-distributed mode (the order matters: ZooKeeper first, then the broker):
nohup zookeeper-server-start.sh config/zookeeper.properties &
nohup kafka-server-start.sh config/server.properties &
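To confirm that both services came up, a quick check with jps (it ships with the JDK) should list the two JVMs:
jps
The output should include QuorumPeerMain (ZooKeeper) and Kafka (the broker), among other processes.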
2. Topic operations in Kafka:
List: kafka-topics.sh --list --zookeeper localhost:2181
Create: kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytest
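Not in the original notes, but useful for verifying the topic was created as intended:
Describe: kafka-topics.sh --describe --zookeeper localhost:2181 --topic mytest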
3. Test the producer and consumer:
kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytest
kafka-console-producer.sh --broker-list localhost:9092 --topic mytest
Type messages in the producer console and check whether the consumer receives them.
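If the consumer is started only after the messages were sent, add --from-beginning to replay the topic from its earliest offset:
kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytest --from-beginning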
Writing a producer in Java
Add the required dependencies to the Maven project:
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-it</artifactId>
        <version>1.2.5</version>
    </dependency>
</dependencies>
The Java producer:
import com.keduox.utils.PropertiesUtil;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.*;

/**
 * Sends data through the Java KafkaProducer.
 * @Author: JunYaoWang
 * @Date: Created in 12:50 2017/12/17
 */
public class JavaProducer {
    public static void main(String[] args) {
        KafkaProducer<String, String> producer =
                new KafkaProducer<>(PropertiesUtil.getPropertieData("producer.properties"));
        // Send the messages
        try {
            // Read the source file line by line
            FileInputStream in = new FileInputStream("E:\\Workspace\\spark\\src\\main\\resources\\net.txt");
            InputStreamReader inReader = new InputStreamReader(in, "UTF-8");
            BufferedReader bufReader = new BufferedReader(inReader);
            String line;
            String[] array;
            while ((line = bufReader.readLine()) != null) {
                // Each line is "name,sex,time"; forward it to the mytest topic
                array = line.split(",");
                producer.send(new ProducerRecord<>("mytest", "1", array[0] + "," + array[1] + "," + array[2]));
            }
            bufReader.close();
            inReader.close();
            in.close();
            producer.flush();
            producer.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
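The PropertiesUtil class imported above is a project-local helper that these notes never show. A minimal sketch of what it presumably does, assuming getPropertieData simply loads the named .properties file from the classpath:

import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class PropertiesUtil {
    // Load the named .properties file from the classpath into a Properties object.
    public static Properties getPropertieData(String fileName) {
        Properties props = new Properties();
        try (InputStream in = PropertiesUtil.class.getClassLoader().getResourceAsStream(fileName)) {
            if (in == null) {
                throw new FileNotFoundException(fileName + " not found on the classpath");
            }
            props.load(in);
        } catch (IOException e) {
            throw new RuntimeException("Failed to load " + fileName, e);
        }
        return props;
    }
}

The producer.properties it reads is likewise not shown; for the 0.10.0.0 client with string keys and values it would need at least the following (adjust the broker address to your setup):
bootstrap.servers=localhost:9092
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer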
The data format in the file is:
YuanJing,male,10
FangBo,female,50
...
Receiving the messages with Spark Streaming and querying them with Spark SQL
Goal: find the women whose accumulated online time is at least 120.
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

/**
 * @Author: JunYaoWang
 * @Date: Created in 13:15 2017/12/17
 */
case class Scorces1(name: String, sex: String, time: Integer)
case class Scorces2(name: String, time: Integer)

object KafkaStreaming {
  def main(args: Array[String]): Unit = {
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "192.168.200.10:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "streamingKafka",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (true: java.lang.Boolean)
    )
    val conf = new SparkConf().setMaster("local[2]").setAppName("kafkaStreaming")
    val sc = new SparkContext(conf)
    // Set the log level
    sc.setLogLevel("WARN")
    val streamingContext = new StreamingContext(sc, Seconds(5))
    // updateStateByKey requires a checkpoint directory; here it lives on HDFS
    streamingContext.checkpoint("hdfs://192.168.200.10:9000/keduox/checkpoint")
    // Topics to subscribe to
    val topics = Array("mytest")
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    // Aggregate the current batch: keep the women and sum time per name
    val paymentSum = stream.map(record => record.value)
      .map(stringToDF)
      .filter(_.sex.contains("female"))
      .map(x => x.name -> x.time.toInt)
      .reduceByKey(_ + _)
    //paymentSum.print()
    // Update function for updateStateByKey: add the current batch to the history
    val addFunction = (currValues: Seq[Int], preValueState: Option[Int]) => {
      val currentSum = currValues.sum
      val previousSum = preValueState.getOrElse(0)
      Some(currentSum + previousSum)
    }
    val totalPayment = paymentSum.updateStateByKey[Int](addFunction)
    // Convert each batch to a DataFrame and query it with SQL
    totalPayment.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
      val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
      import sqlContext.implicits._
      val wordsDataFrame = rdd.map(MapToDF).toDF()
      // Register a temporary view
      wordsDataFrame.createOrReplaceTempView("time")
      // Run the SQL query
      val sql: DataFrame = sqlContext.sql("select name,sum(time) from time group by name having sum(time)>=120")
      sql.show() // Print the matching rows
    })
    // Start the streaming job and block until it terminates; these calls must come
    // after all output operations (such as foreachRDD) have been registered
    streamingContext.start()
    streamingContext.awaitTermination()
  }

  def stringToDF(line: String): Scorces1 = {
    val data = line.split(",")
    Scorces1(data(0), data(1), data(2).toInt)
  }

  def MapToDF(line: (String, Int)): Scorces2 = {
    Scorces2(line._1, line._2)
  }
}
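To run the job outside the IDE it has to be packaged and submitted to Spark. A sketch, assuming a hypothetical jar name of spark-demo.jar; the --packages option pulls in the Kafka connector from the pom above if it is not already bundled into the jar:
spark-submit --master local[2] --class KafkaStreaming --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.1.0 spark-demo.jar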