
Spark's Directed Acyclic Graph (DAG) (Code and Illustrations)

Contents:

1. Directed acyclic graph

2. Code structure

3. Steps and methods for studying the code

4. Key code explained

5. Code listing

6. Execution result

———————————————————————————————————

1. Directed acyclic graph

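In graph theory, a directed graph is a directed acyclic graph (DAG) if no vertex can be reached again by starting from it and following a sequence of edges.
A vertex can reach another vertex along two different paths without forming a cycle, so a DAG cannot always be turned into a tree; every directed tree, however, is a DAG.
Property: if a DAG has exactly one source (a node with in-degree 0), the number of its directed spanning trees rooted at that source equals the product of the in-degrees of all nodes with nonzero in-degree.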
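The definition and the spanning-tree property can be checked mechanically. The following is a minimal Scala sketch, not part of the original project: the object DagProps and its functions are hypothetical names, and the graph is assumed to be given as a map from each node to its children. Acyclicity is tested with Kahn's algorithm (repeatedly removing nodes of in-degree 0), and the spanning-tree count is the product of the nonzero in-degrees, which assumes exactly one source.

```scala
// Hypothetical helpers (not from the original project); a graph is a map node -> children.
object DagProps {

  // In-degree of every node that appears either as a key or as a child
  def inDegrees[N](edges: Map[N, Seq[N]]): Map[N, Int] = {
    val nodes = edges.keySet ++ edges.values.flatten
    val zero = nodes.map(n => n -> 0).toMap
    edges.values.flatten.foldLeft(zero)((acc, child) => acc.updated(child, acc(child) + 1))
  }

  // Kahn's algorithm: repeatedly remove a node of in-degree 0 and decrement its children.
  // The graph is acyclic exactly when every node is eventually removed.
  def isAcyclic[N](edges: Map[N, Seq[N]]): Boolean = {
    var indeg = inDegrees(edges)
    var ready = indeg.collect { case (n, 0) => n }.toList
    var removed = 0
    while (ready.nonEmpty) {
      val n = ready.head
      ready = ready.tail
      removed += 1
      for (child <- edges.getOrElse(n, Seq.empty)) {
        indeg = indeg.updated(child, indeg(child) - 1)
        if (indeg(child) == 0) ready = child :: ready
      }
    }
    removed == indeg.size
  }

  // Spanning trees (arborescences) of a DAG with exactly one source: every other node keeps
  // exactly one incoming edge, so the count is the product of the nonzero in-degrees.
  def arborescenceCount[N](edges: Map[N, Seq[N]]): BigInt = {
    require(isAcyclic(edges), "graph must be acyclic")
    val degrees = inDegrees(edges).values
    require(degrees.count(_ == 0) == 1, "graph must have exactly one source")
    degrees.filter(_ > 0).map(d => BigInt(d)).product
  }
}
```

For a diamond a → b, a → c, b → d, c → d the nonzero in-degrees are 1, 1 and 2, so there are 2 spanning trees (d keeps either its edge from b or its edge from c). The DAG used later in this post, where three executors each depend only on WebSdkparseExecutor, is already a tree, and the product is 1.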

2. Code structure

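The project has two packages: com.yh.dag, which holds DAG.scala (Node, DAG, DAGExecutor and the main entry point) and Task.scala (Task, Executor, SQLContextAware); and com.yh.nodeexecutor, which holds one object per task: UserDetailsExecutor, UserOverviewExecutor, UserTagExecutor and WebSdkparseExecutor.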

3. Steps and methods for studying the code

4. Key code explained

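The code below is the core of the scheduler, and also the hardest part: it finds a pending task whose parent nodes have all finished successfully.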

  // Find a pending task whose parent nodes have all succeeded (or that has no parents)
  private def getPending: Option[T] = {
    _pending.find { name =>
      val parents = _nodes(name)
      !parents.exists(parent => !_success.contains(parent))
    }
  }

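1. When a node has no parents, parents is empty, so parents.exists(...) is false and !parents.exists(...) is true: the task is runnable.
2. When every parent is already in _success, no parent satisfies !_success.contains(parent), so parents.exists(...) is false and !parents.exists(...) is true: the task is runnable.
In every other case at least one parent has not succeeded yet, and the task stays pending.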
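To see the predicate in isolation, here is a hedged sketch that pulls the check out of DAGExecutor. The object ReadyCheck and its methods are hypothetical, and the task names are plain strings borrowed from the executors listed in section 5. It also shows that !parents.exists(p => !done.contains(p)) is the same test as parents.forall(p => done.contains(p)), which may be easier to read.

```scala
// Hypothetical, standalone version of the test inside getPending (not from the original project).
object ReadyCheck {

  // Original formulation: "no parent is missing from the succeeded tasks"
  def readyNegated[T](parents: Seq[T], succeeded: Seq[T]): Boolean =
    !parents.exists(parent => !succeeded.contains(parent))

  // Equivalent formulation: "every parent is among the succeeded tasks"
  def readyForall[T](parents: Seq[T], succeeded: Seq[T]): Boolean =
    parents.forall(parent => succeeded.contains(parent))

  // Same selection rule as getPending: the first pending task that is ready
  def nextRunnable[T](pending: Seq[T], nodes: Map[T, Seq[T]], succeeded: Seq[T]): Option[T] =
    pending.find(name => readyForall(nodes(name), succeeded))
}
```

Both formulations return true for an empty parent list, which is why a root task such as WebSdkparse is picked first.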

5. Code listing

DAG.scala

package com.yh.dag

import java.time.{Duration, LocalDate}
import com.yh.nodeexecutor._
import org.slf4j.LoggerFactory
import scala.collection.immutable.{ListMap, ListSet}
import scala.util.control.NonFatal

/**
  * Created by yuhui on 2016/8/25.
  * task --> Node --> DAG --> DAGExecutor
  */

// A node: a task together with the tasks it depends on (its parents)
case class Node[T](task: T, parent: T*) {
  override def toString: String = s"$task(${parent.mkString(",")})"
}

case class DAG[T](nodes: Node[T]*)

case class DAGExecutor[T](dag: DAG[T]) {

  private val LOG = LoggerFactory.getLogger(this.getClass)

  // task -> its parents (null parents are dropped)
  private val _nodes: Map[T, Seq[T]] =
    dag.nodes.map(node => (node.task, node.parent.filter(_ != null))).toMap

  private var _pending: Set[T] = ListSet()
  private var _fails = ListMap[T, String]()
  private var _success = Seq[T]()

  // Find a pending task whose parent nodes have all succeeded (or that has no parents)
  private def getPending: Option[T] = {
    _pending.find { name =>
      val parents = _nodes(name)
      !parents.exists(parent => !_success.contains(parent))
    }
  }

  // Mark a task as failed, then recursively fail every pending task that depends on it
  private def fail(name: T, message: String): Unit = {
    _pending -= name
    _fails += name -> message
    for (child <- _pending.filter(child => _nodes(child).contains(name))) {
      fail(child, s"dependency could not be executed: $name")
    }
  }

  private def success(name: T): Unit = {
    _pending -= name
    _success = _success :+ name
  }

  // Run tasks one at a time until no pending task has all of its parents succeeded
  def execute(func: T => Unit): Unit = {
    _pending = _nodes.keySet
    _fails = ListMap()
    _success = Seq()
    var running = true
    while (running) {
      val taskOpt = getPending
      if (taskOpt.nonEmpty) {
        val task = taskOpt.get
        val startMills = System.currentTimeMillis()
        LOG.info("start task {}", task)
        try {
          println("=============")
          func(task) // run the task's executor method
          println("+++++++++++++")
          val time = Duration.ofMillis(System.currentTimeMillis() - startMills)
          LOG.info(s"end task $task time=$time")
          success(task)
        } catch {
          case NonFatal(e) =>
            fail(task, e.getMessage)
            LOG.error(e.getMessage, e)
            LOG.info(s"fail task $task")
        }
      } else {
        running = false
      }
    }
    for (name <- _success) {
      LOG.info(s"success task: $name")
    }
    for ((name, message) <- _fails) {
      LOG.info(s"fail task: $name - $message")
    }
  }
}

object DAG {
  // WebSdkparseExecutor has no parents; the other three executors depend on it
  val allSDKDAG = new DAG[Task](
    Node(UserDetailsExecutor, WebSdkparseExecutor),
    Node(UserTagExecutor, WebSdkparseExecutor),
    Node(WebSdkparseExecutor),
    Node(UserOverviewExecutor, WebSdkparseExecutor)
  )

  def main(args: Array[String]): Unit = {
    DAGExecutor(allSDKDAG).execute { task =>
      task.executor("appkey", LocalDate.now(), LocalDate.now())
    }
  }
}
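The fail method is recursive: when a task fails, every pending task that lists it as a parent is failed too, and so on down the graph. Below is a dependency-free sketch of the same loop, written for illustration only. MiniDagExecutor is a hypothetical name; the sketch drops slf4j, timing and logging, and keeps pending tasks in a Seq (an assumption, the original uses a Set) so that the scan order is deterministic.

```scala
import scala.collection.immutable.ListMap
import scala.util.control.NonFatal

// Hypothetical, dependency-free re-creation of DAGExecutor's scheduling loop (no slf4j,
// no timing), used only to illustrate how fail(...) cascades to downstream tasks.
object MiniDagExecutor {

  // nodes: (task, parents) in declaration order.
  // Returns the succeeded tasks in run order and the failed tasks with their reasons.
  def run[T](nodes: Seq[(T, Seq[T])])(func: T => Unit): (Seq[T], ListMap[T, String]) = {
    val parentsOf = nodes.toMap
    var pending = nodes.map(_._1) // a Seq keeps the scan order deterministic
    var fails = ListMap.empty[T, String]
    var success = Seq.empty[T]

    def fail(name: T, message: String): Unit = {
      pending = pending.filterNot(_ == name)
      fails = fails.updated(name, message)
      // the guard re-reads `pending`, so a task already failed by a deeper call is skipped
      for (child <- pending.filter(c => parentsOf(c).contains(name)) if pending.contains(child)) {
        fail(child, s"dependency could not be executed: $name")
      }
    }

    // same rule as getPending: first pending task whose parents have all succeeded
    def next: Option[T] = pending.find(t => parentsOf(t).forall(p => success.contains(p)))

    var current = next
    while (current.isDefined) {
      val task = current.get
      try {
        func(task)
        pending = pending.filterNot(_ == task)
        success = success :+ task
      } catch {
        case NonFatal(e) => fail(task, Option(e.getMessage).getOrElse(e.toString))
      }
      current = next
    }
    (success, fails)
  }
}
```

With a func that always succeeds and the four tasks declared in the same order as allSDKDAG, the run order is WebSdkparse, UserDetails, UserTag, UserOverview, as in section 6. If WebSdkparse throws, the three downstream tasks are recorded as failed without ever running.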

Task.scala

package com.yh.dag

import java.time.LocalDate
import org.apache.spark.sql.SQLContext
import org.slf4j.LoggerFactory

/**
  * Created by yuhui on 2016/12/27.
  */
abstract class Task {

  protected val LOG = LoggerFactory.getLogger(this.getClass)

  def executor(appkey: String, startDay: LocalDate, endDay: LocalDate): Unit

  def run(appkey: String, startDay: LocalDate, endDay: LocalDate): Unit = {
    executor(appkey, startDay, endDay)
  }

}

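abstract class Executor extends Task with SQLContextAware {

  // Note: run(...) is overridden as a no-op here, so calling run on an Executor does nothing;
  // DAGExecutor invokes executor(...) directly instead.
  override def run(appkey: String, startDay: LocalDate, endDay: LocalDate): Unit = {}

}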

trait SQLContextAware {
  implicit var ctx: SQLContext = _
}

UserDetailsExecutor.scala

package com.yh.nodeexecutor

import java.time.LocalDate
import com.yh.dag.Executor

object UserDetailsExecutor extends Executor{

  override def executor(appkey: String, startDay: LocalDate, endDay: LocalDate): Unit = {

    println("++++ execution of my UserDetailsProcessor ++++")

  }

}

UserOverviewExecutor.scala

package com.yh.nodeexecutor

import java.time.LocalDate
import com.yh.dag.Executor

/**
  * Created by yuhui on 2016/12/27.
  */
object UserOverviewExecutor extends Executor {

  override def executor(appkey: String, startDay: LocalDate, endDay: LocalDate): Unit = {
    println("++++ execution of my UserOverviewProcessor ++++")
  }

}

UserTagExecutor.scala

package com.yh.nodeexecutor

import java.time.LocalDate
import com.yh.dag.Executor

/**
  * Created by yuhui on 2016/12/27.
  */
object UserTagExecutor extends Executor {

  override def executor(appkey: String, startDay: LocalDate, endDay: LocalDate): Unit = {
    println("++++ execution of my UserTagProcessor ++++")
  }

}

WebSdkparseExecutor.scala

package com.yh.nodeexecutor

import java.time.LocalDate
import com.yh.dag.Executor

/**
  * Created by yuhui on 2016/12/27.
  */
object WebSdkparseExecutor extends Executor {

  override def executor(appkey: String, startDay: LocalDate, endDay: LocalDate): Unit = {
    println("++++ execution of my WebSdkparseProcessor ++++")
  }

}

6. Execution result

=============
++++ execution of my WebSdkparseProcessor ++++
+++++++++++++
=============
++++ execution of my UserDetailsProcessor ++++
+++++++++++++
=============
++++ execution of my UserTagProcessor ++++
+++++++++++++
=============
++++ execution of my UserOverviewProcessor ++++
+++++++++++++

Process finished with exit code 0
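WebSdkparseExecutor runs first because it is the only node without parents; the other three depend only on it, so once it has succeeded any order among them satisfies the DAG. DAGExecutor still runs them one at a time, in whatever order getPending finds them. As an illustration only (not part of the original code; Waves is a hypothetical name), the same "all parents have succeeded" rule can be applied to every pending task at once, grouping the tasks into waves whose members are independent of each other:

```scala
// Hypothetical helper (not part of the original code): group tasks into "waves".
// Every task in a wave has all of its parents in earlier waves, so the tasks inside
// one wave do not depend on each other.
object Waves {

  // nodes: (task, parents) in declaration order
  def waves[T](nodes: Seq[(T, Seq[T])]): Seq[Seq[T]] = {
    val parentsOf = nodes.toMap
    var done = Set.empty[T]
    var pending = nodes.map(_._1)
    var result = Vector.empty[Seq[T]]
    var progress = true
    while (pending.nonEmpty && progress) {
      // the same "all parents have succeeded" test as getPending, applied to all pending tasks
      val (ready, blocked) = pending.partition(t => parentsOf(t).forall(p => done.contains(p)))
      progress = ready.nonEmpty // nothing ready: a cycle or an undeclared parent
      if (progress) {
        result = result :+ ready
        done = done ++ ready
        pending = blocked
      }
    }
    result
  }
}
```

If no pending task becomes ready, the loop stops. This happens with a cycle or with a parent that is never declared as a node, the same situations in which DAGExecutor ends with tasks still left in _pending.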
