1. 程式人生 > >scala剖析PriorityQueue,權值的使用

scala剖析PriorityQueue,權值的使用

基於堆實現的優先順序佇列:PriorityQueue

建立:

new PriorityQueue()(implicit ord:Ordering[A])

這裡涉及到Ordering特質,看一個demo

import scala.util.Sorting
val pairs = Array(("a",5,2),("c",3,1),("b",1,3),("a",6,2))

Sorting.quickSort(pairs)(Ordering.by[(String,Int,Int),Int](_._3).reverse)
pairs.foreach(println)

Sorting.quickSort(pairs)(Ordering[(Int,String)].on(x => (x._3,x._1)))
pairs.foreach(println)

先定義一個數組,數組裡面是tuple,型別為(String,Int,Int)
Sorting.quickSort其實有一些方法是包裝的java中的java.util.Arrays.sort


def quickSort(a:Array[Double]):Uint = java.util.Arrays.sort(a)

其餘還有Int和Float型別
而對其他自定義型別的Sort是基於Ordering特質實現的

    def quickSort[K:Ordering](a:Array[K]):Unit = ???

所以這裡在第一個我們by方法實質上是

    def
by[T,S](f:T => S)(implicit ord:Ordering[S]):Ordering[T] = fromLessThan((x,y) =>ord.lt(f(x),f(y)))

類似於

    def compose(x:T,y:T) = Ordering[S].compose(f(x),f(y))

也就是說這裡對tuple的第三個變數進行排序,而reverse則進行逆序。

下面迴歸正題

PriorityQueue其實最好用的地方就是會根據進入佇列元素的權進行資料的讀取
這個類的使用需要隱式匯入一個Ordering[A],而且按照優先順序讀取的方法只有dequeue和dequeueAll兩個方法,drop和iterator則會移除佇列中的元素。

order

implicit  val ord:Ordering[(Int,String)] = Ordering.by(_._1)
val priorityDemo = collection.mutable.PriorityQueue[(Int,String)]()
priorityDemo.enqueue((2,"hello"))
priorityDemo.enqueue((5,"ct"))
priorityDemo.enqueue((1,"work"))
priorityDemo.enqueue((3,"word"))
(1 to priorityDemo.size).foreach(x =>println(priorityDemo.dequeue()))

order

下面是關於一個重構技巧的:

List(1, 2, 3).foldLeft(1){(x,y) => x * y} 
List(1, 2, 3).fold(0)(_ + _) 
List(1, 2, 3).reduce(_ + _) 
List(1, 2, 3).reduceLeft(_ min _) //List(1,2,3).reduceLeft{(x,y) =>if(x<y) x else y}
List(1, 2, 3).reduce((x, y) => math.max(x, y))
After:
List(1, 2, 3).product 
List(1, 2, 3).sum 
List(1, 2, 3).sum 
List(1, 2, 3).min 
List(1, 2, 3).max

言歸正傳

我們本次的需求是實現一個PriorityTaskPool的類支援任意數量的處理執行緒,由構造引數傳入,該類還應該具有一個名為asynchronous方法,其簽名為

def asynchronous(priority:Int)(task: =>Int):Unit
    package com.linewell.chapter1

/**
  * Created by ctao on 2015/11/30.
  */


import scala.collection.mutable
import scala.util.Random

object AsynchronousTest extends App {

  class PriorityTaskPool(val p:Int) {

    implicit val ord: Ordering[(Int,() => Unit)] = Ordering.by(_._1)

    private val tasks = mutable.PriorityQueue[(Int,() => Unit)]()

    /**
      * 入佇列
      * @param priority 權值
      * @param task 任務
      */
    def asynchronous(priority: Int)(task: => Unit):Unit = tasks synchronized {
      tasks.enqueue((priority,() => task))
      tasks.notify()
    }

    class Worker extends Thread {

      setDaemon(true)

      def poll() = tasks.synchronized {
        while (tasks.isEmpty) {
          tasks.wait()
        }

        /**
          * 打印出task的權值
          */
        println("queue: " + tasks.foldLeft("")((s,t)=>s"$s ${t._1}"))
        println(s"tasks size: ${tasks.size}  ${System.currentTimeMillis()}")

        /**
          * 出佇列
          */
        tasks.dequeue()
      }

      override def run() = {
        while (true) {
          poll() match {
            case (_, task) =>

              /**
                * 檢視當前執行緒
                */
              print(Thread.currentThread().getName)
              task()
          }
        }
      }
    }
    //開啟對應執行緒數量
    (1 to p).map((i) => new Worker()).foreach(_.start)

  }

  val tasks = new PriorityTaskPool(10)

  (1 to 100).foreach(i => {
    val a = Random.nextInt(10)
    tasks.asynchronous(a)({println(s"My Weight  is $a ${System.currentTimeMillis()}")})
  })

  Thread.sleep(10000)
}