1. 程式人生 > 遊戲 >熱門獨立遊戲《Unpacking》被手遊抄襲 發行商道歉

熱門獨立遊戲《Unpacking》被手遊抄襲 發行商道歉

自定義採集器

package com.gazikel.streamaing

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver

import scala.util.Random

// 自定義資料採集器
// Custom data receiver demo: registers a Receiver that fabricates one random
// record every 500 ms and prints each 3-second micro-batch.
object SparkStreaming01_MyReciver {

  def main(args: Array[String]): Unit = {

    val sparkConf = new SparkConf().setAppName("SparkStreaming01_MyReceiver").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    // Attach the custom receiver; each generated record arrives as a String.
    val messageDS: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver())
    messageDS.print()

    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Custom data receiver.
   *
   * 1. Extend Receiver, fix the element type (String), and pass the desired
   *    storage level to the superclass constructor (memory-only here).
   */
  class MyReceiver extends Receiver[String](StorageLevel.MEMORY_ONLY) {

    // @volatile is required: onStop() flips this flag from Spark's control
    // thread while the generator thread spins on it in run(); without the
    // memory barrier the generator may never observe the stop request.
    @volatile private var flag = true

    override def onStart(): Unit = {
      val worker = new Thread(new Runnable {
        override def run(): Unit = {
          // Create the RNG once instead of allocating one per iteration.
          val random = new Random()
          while (flag) {
            // store() hands the record to Spark, which buffers it into blocks.
            store("採集的資料為:" + random.nextInt(10).toString)
            Thread.sleep(500)
          }
        }
      })
      // Daemon thread: never keeps the JVM alive after the context shuts down.
      worker.setDaemon(true)
      worker.start()
    }

    override def onStop(): Unit = {
      flag = false
    }
  }

}

與Kafka連線

  1. 編寫程式碼
package com.gazikel.streamaing

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Consumes the "atguigu" Kafka topic with the direct (receiver-less) API and
// prints each record's value per 3-second micro-batch.
object SparkStreaming02_Kafka {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("SparkStreaming02_Kafka").setMaster("local[*]")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    val kafkaParam = Map[String, Object] (
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG->"192.168.10.132:9092",
      // BUG FIX: group.id was "" — Kafka rejects an empty group id for
      // subscribing consumers (InvalidGroupIdException). Use a real group.
      ConsumerConfig.GROUP_ID_CONFIG->"atguigu",
      "key.deserializer"->"org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer"->"org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS = KafkaUtils.createDirectStream[String, String](
      ssc,
      // PreferConsistent: distribute partitions evenly across executors.
      LocationStrategies.PreferConsistent,
      // Subscribe to a fixed topic set with the consumer config above.
      ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaParam)
    )

    // Only the message value is of interest here; keys are ignored.
    kafkaDataDS.map(_.value()).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
  2. 在 kafka中建立topics
    在建立topic之前,需要啟動zookeeper
$ zkServer.sh start

建立topic話題為atguigu

$ bin/kafka-topics.sh --bootstrap-server spark:9092 --create --topic atguigu --partitions 3 --replication-factor 1
  3. 生產資料
    啟動kafka
$ kafka-server-start.sh -daemon /opt/kafka_2.12-3.1.0/config/server.properties

生產資料操作

$ ./kafka-console-producer.sh --broker-list spark:9092 --topic atguigu

優雅的關閉

package com.gazikel.streamaing

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext, StreamingContextState}

/**
 * Graceful shutdown demo.
 *
 * stop() cannot be called from the thread blocked in awaitTermination(),
 * so a dedicated monitor thread polls an (externally driven) stop signal
 * and shuts the context down gracefully when it fires.
 */
object SparkStreaming06_Close {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparStreaming06_Close")
    val ssc = new StreamingContext(sparkConf, Seconds(3))

    ssc.start()

    // The stop signal should come from a third party (e.g. a marker file on
    // HDFS, a key in ZooKeeper/Redis); `shouldStop = true` is a placeholder.
    val monitor = new Thread(
      new Runnable {
        override def run(): Unit = {
          while (true) {
            // TODO: replace with a real external-state check.
            val shouldStop = true
            if (shouldStop) {
              // Only stop a context that is actually running.
              if (ssc.getState() == StreamingContextState.ACTIVE) {
                // (stopSparkContext = true, stopGracefully = true):
                // finish processing the in-flight batches before exiting.
                ssc.stop(true, true)
              }
              // Intentional shutdown — exit with success, not failure.
              System.exit(0)
            }
            // Avoid a busy-wait between polls of the external signal.
            Thread.sleep(5000)
          }
        }
      }
    )
    monitor.setDaemon(true)
    // BUG FIX: the original built the Thread but never called start(),
    // so the shutdown logic never ran at all.
    monitor.start()

    ssc.awaitTermination()
  }
}

恢復資料

package com.gazikel.streamaing

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Checkpoint-recovery demo: on restart, rebuild the StreamingContext (and its
// DStream graph) from the "check_point" directory if one exists.
object SparkStreaming07_Resume {
  def main(args: Array[String]): Unit = {

    // getActiveOrCreate: reuse an active context, recover one from the
    // checkpoint directory, or — only if neither exists — run the factory.
    val ssc: StreamingContext = StreamingContext.getActiveOrCreate("check_point", () => {
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreaming07_Resume")
      val newSsc = new StreamingContext(sparkConf, Seconds(3))

      // BUG FIX: checkpoint() must be configured INSIDE the factory. The
      // original called it after getActiveOrCreate, so a freshly created
      // context never wrote checkpoint data and recovery could never occur
      // (a recovered context already carries its checkpoint setting).
      newSsc.checkpoint("check_point")

      newSsc
    })

    ssc.start()
    ssc.awaitTermination()
  }
}