熱門獨立遊戲《Unpacking》被手遊抄襲 發行商道歉
阿新 • 發佈:2022-01-27
自定義採集器
package com.gazikel.streamaing

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.util.Random

/**
 * Custom data-collector (Receiver) demo: a receiver that stores one random
 * digit (0-9) every 500 ms, printed by the driver every 3-second batch.
 */
object SparkStreaming01_MyReciver {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
      .setAppName("SparkStreaming01_MyReceiver")
      .setMaster("local[*]")
    // 3-second micro-batch interval
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // Receive data from the custom receiver
    val messageDS: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver())
    messageDS.print()

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Custom receiver.
   * 1. Extend Receiver, supplying the element type and a storage level.
   */
  class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {

    // FIX: the flag is written by onStop() on one thread and read by the
    // worker thread started in onStart(); without @volatile the worker may
    // never observe the update and keep running after stop.
    @volatile private var flag = true

    override def onStart(): Unit = {
      new Thread(new Runnable {
        override def run(): Unit = {
          // FIX: reuse a single Random instance instead of allocating a new
          // one on every iteration.
          val random = new Random()
          while (flag) {
            val message = "採集的資料為:" + random.nextInt(10).toString
            store(message)
            Thread.sleep(500)
          }
        }
      }).start()
    }

    override def onStop(): Unit = {
      flag = false
    }
  }
}
與Kafka連線
- 編寫程式碼
package com.gazikel.streamaing

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Direct (receiver-less) Kafka integration demo: subscribes to topic
 * "atguigu" and prints each record's value every 3-second batch.
 */
object SparkStreaming02_Kafka {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("SparkStreaming02_Kafka").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // Kafka consumer configuration
    val kafkaParam = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.10.132:9092",
      // FIX: the original used an empty group id, which Kafka consumers
      // reject in subscribe mode (InvalidGroupIdException).
      ConsumerConfig.GROUP_ID_CONFIG -> "spark-streaming-group",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      // FIX: this ConsumerStrategy argument was commented out, leaving
      // createDirectStream without its required third argument — the call
      // could not compile.
      ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaParam)
    )

    // Print only the record values
    kafkaDataDS.map(_.value()).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
- 在 kafka中建立topics
在建立topic之前,需要啟動zookeeper
$ zkServer.sh start
建立topic話題為atguigu
$ bin/kafka-topics.sh --bootstrap-server spark:9092 --create --topic atguigu --partitions 3 --replication-factor 1
- 生產資料
啟動kafka
$ kafka-server-start.sh -daemon /opt/kafka_2.12-3.1.0/config/server.properties
生產資料操作
$ ./kafka-console-producer.sh --broker-list spark:9092 --topic atguigu
優雅的關閉
package com.gazikel.streamaing

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

/**
 * Graceful-shutdown demo: stop the StreamingContext from a separate thread,
 * letting in-flight batches finish before the process exits.
 */
object SparkStreaming06_Close {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparStreaming06_Close")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    ssc.start()

    // To stop the collector a NEW thread is required (stop() must not be
    // called from a streaming job). In production the `if (true)` below
    // would poll a third-party shutdown marker (e.g. ZooKeeper / HDFS file).
    val monitor = new Thread(
      new Runnable {
        override def run(): Unit = {
          while (true) {
            // FIX: sleep between checks — the original loop busy-spun at
            // 100% CPU; polling every few seconds is enough.
            Thread.sleep(5000)
            if (true) {
              // Query the current state of the StreamingContext
              val state = ssc.getState()
              if (state == StreamingContextState.ACTIVE) {
                // stopGracefully = true: finish processing queued data,
                // then stop; also stop the underlying SparkContext.
                ssc.stop(stopSparkContext = true, stopGracefully = true)
              }
              System.exit(1)
            }
          }
        }
      }
    )
    // FIX: the original created the thread but never called start(), so the
    // shutdown logic never ran at all.
    monitor.start()

    ssc.awaitTermination()
  }
}
// 恢復資料 (checkpoint recovery)
package com.gazikel.streamaing

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Checkpoint-recovery demo: rebuild the StreamingContext from the
 * "check_point" directory if checkpoint data exists, otherwise create a
 * fresh one via the factory function.
 */
object SparkStreaming07_Resume {
  def main(args: Array[String]): Unit = {
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("check_point", () => {
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming07_Resume")
      val newSsc = new StreamingContext(sparkConf, Seconds(3))
      // FIX: the checkpoint directory must be set INSIDE the factory so it
      // is recorded when the context is first created; the original called
      // ssc.checkpoint() only after getActiveOrCreate, which is too late
      // for the recovery pattern to work.
      newSsc.checkpoint("check_point")
      newSsc
    })
    ssc.start()
    ssc.awaitTermination()
  }
}