1. 程式人生 > >scala學習十一 併發程式設計

scala學習十一 併發程式設計

首先來一個生產者消費者的例子:

Producer:

  class Producer(drop : Drop) 
    extends Runnable 
  { 
    val importantInfo : Array[String] = Array( 
      "Mares eat oats", 
      "Does eat oats", 
      "Little lambs eat ivy", 
      "A kid will eat ivy too"
    ); 
  
    override def run() : Unit = 
    { 
      importantInfo.foreach((msg) => drop.put(msg)) 
      drop.put("DONE") 
    } 
  } 
Consumer:
  class Consumer(drop : Drop) 
    extends Runnable 
  { 
    override def run() : Unit = 
    { 
      var message = drop.take() 
      while (message != "DONE") 
      { 
        System.out.format("MESSAGE RECEIVED: %s%n", message) 
        message = drop.take() 
      } 
    } 
  } 
Drop:
 class Drop 
  { 
    var message : String = ""
    var empty : Boolean = true 
    var lock : AnyRef = new Object() 
  
    def put(x: String) : Unit = 
      lock.synchronized 
      { 
        // Wait until message has been retrieved 
        await (empty == true) 
        // Toggle status 
        empty = false 
        // Store message 
        message = x 
        // Notify consumer that status has changed 
        lock.notifyAll() 
      } 

    def take() : String = 
      lock.synchronized 
      { 
        // Wait until message is available. 
        await (empty == false) 
        // Toggle status 
        empty=true 
        // Notify producer that staus has changed 
        lock.notifyAll() 
        // Return the message 
        message 
      } 

    private def await(cond: => Boolean) = 
      while (!cond) { lock.wait() } 
  } 
主Object:
object ProdConSample 
 { 
  def main(args : Array[String]) : Unit = 
  { 
    // Create Drop 
    val drop = new Drop(); 
  
    // Spawn Producer 
    new Thread(new Producer(drop)).start(); 
    
    // Spawn Consumer 
    new Thread(new Consumer(drop)).start(); 
  } 
 }

Producer和 Consumer類幾乎與它們的 Java 同類相同,再一次擴充套件(實現)了 Runnable

介面並覆蓋了 run()方法,並且 —對於 Producer的情況 —分別使用了內建迭代方法來遍歷 importantInfo陣列的內容。(實際上,為了讓它更像 Scala,importantInfo可能應該是一個 List而不是 Array,但在第一次嘗試時,我希望儘可能保證它們與原始 Java 程式碼一致。)

Drop類同樣類似於它的 Java 版本。但 Scala 中有一些例外,“synchronized” 並不是關鍵字,它是針對 AnyRef類定義的一個方法,即 Scala “所有引用型別的根”。這意味著,要同步某個特定的物件,您只需要對該物件呼叫同步方法;在本例中,對 Drop上的 lock 欄位中所儲存的物件呼叫同步方法

意,我們在 await()方法定義的 Drop類中還利用了一種 Scala 機制:cond引數是等待計算的程式碼塊,而不是在傳遞給該方法之前進行計算。在 Scala 中,這被稱作 “call-by-name”;此處,它是一種實用的方法,可以捕獲需要在 Java 版本中表示兩次的條件等待邏輯(分別用於 puttake)。

最後,在 main()中,建立 Drop例項,例項化兩個執行緒,使用 start()啟動它們,然後在 main()的結束部分退出,相信 JVM 會在 main()結束之前啟動這兩個執行緒。(在生產程式碼中,可能無法保證這種情況,但對於這樣的簡單的例子,99.99 % 沒有問題。)

但是,已經說過,仍然存在相同的基本問題:程式設計師仍然需要過分擔心兩個執行緒之間的通訊和協調問題。雖然一些 Scala 機制可以簡化語法,但這目前為止並沒有相當大的吸引力

Scala 併發性 v2

Scala Library Reference 中有一個有趣的包:scala.concurrency。這個包包含許多不同的併發性結構,包括我們即將利用的 MailBox類。

顧名思義,MailBox從本質上說就是 Drop,用於在檢測之前儲存資料塊的單槽緩衝區。但是,MailBox最大的優勢在於它將傳送和接收資料的細節完全封裝到模式匹配和 case 類中,這使它比簡單的 Drop(或 Drop的多槽資料儲存類 java.util.concurrent.BoundedBuffer)更加靈活。

 package com.tedneward.scalaexamples.scala.V2 
 { 
  import concurrent.{MailBox, ops} 

  object ProdConSample 
  { 
    class Producer(drop : Drop) 
      extends Runnable 
    { 
      val importantInfo : Array[String] = Array( 
        "Mares eat oats", 
        "Does eat oats", 
        "Little lambs eat ivy", 
        "A kid will eat ivy too"
      ); 
    
      override def run() : Unit = 
      { 
        importantInfo.foreach((msg) => drop.put(msg)) 
        drop.put("DONE") 
      } 
    } 
    
    class Consumer(drop : Drop) 
      extends Runnable 
    { 
      override def run() : Unit = 
      { 
        var message = drop.take() 
        while (message != "DONE") 
        { 
          System.out.format("MESSAGE RECEIVED: %s%n", message) 
          message = drop.take() 
        } 
      } 
    } 

    class Drop 
    { 
      private val m = new MailBox() 
      
      private case class Empty() 
      private case class Full(x : String) 
      
      m send Empty()  // initialization 
      
      def put(msg : String) : Unit = 
      { 
        m receive 
        { 
          case Empty() => 
            m send Full(msg) 
        } 
      } 
      
      def take() : String = 
      { 
        m receive 
        { 
          case Full(msg) => 
            m send Empty(); msg 
        } 
      } 
    } 
  
    def main(args : Array[String]) : Unit = 
    { 
      // Create Drop 
      val drop = new Drop() 
      
      // Spawn Producer 
      new Thread(new Producer(drop)).start(); 
      
      // Spawn Consumer 
      new Thread(new Consumer(drop)).start(); 
    } 
  } 
 }

此處,v2 和 v1 之間的惟一區別在於 Drop的實現,它現在利用 MailBox類處理傳入以及從 Drop中刪除的訊息的阻塞和訊號事務。(我們可以重寫 Producer和 Consumer,讓它們直接使用 MailBox,但考慮到簡單性,我們假定希望保持所有示例中的 DropAPI 相一致。)使用 MailBox與使用典型的 BoundedBufferDrop)稍有不同,因此我們來仔細看看其程式碼。

MailBox有兩個基本操作:send和 receivereceiveWithin 方法僅僅是基於超時的 receiveMailBox接收任何型別的訊息。send()方法將訊息放置到郵箱中,並立即通知任何關心該型別訊息的等待接收者,並將它附加到一個訊息連結串列中以便稍後檢索。receive()方法將阻塞,直到接收到對於功能塊合適的訊息。

因此,在這種情況下,我們將建立兩個 case 類,一個不包含任何內容(Empty),這表示 MailBox為空,另一個包含訊息資料(Full

  • put方法,由於它會將資料放置在 Drop中,對 MailBox呼叫 receive()以查詢 Empty例項,因此會阻塞直到傳送 Empty。此時,它傳送一個Full例項給包含新資料的 MailBox
  • take方法,由於它會從 Drop中刪除資料,對 MailBox呼叫 receive()以查詢 Full例項,提取訊息(再次得益於模式匹配從 case 類內部提取值並將它們綁到本地變數的能力)併發送一個 Empty 例項給 MailBox

不需要明確的鎖定,並且不需要考慮監控程式。

Scala 併發性 v3

事實上,我們可以顯著縮短程式碼,只要 Producer 和 Consumer不需要功能全面的類(此處便是如此) —兩者從本質上說都是 Runnable.run()方法的瘦包裝器,Scala 可以使用 scala.concurrent.ops物件的 spawn方法來實現

 package com.tedneward.scalaexamples.scala.V3 
 { 
  import concurrent.MailBox 
  import concurrent.ops._ 

  object ProdConSample 
  { 
    class Drop 
    { 
      private val m = new MailBox() 
      
      private case class Empty() 
      private case class Full(x : String) 
      
      m send Empty()  // initialization 
      
      def put(msg : String) : Unit = 
      { 
        m receive 
        { 
          case Empty() => 
            m send Full(msg) 
        } 
      } 
      
      def take() : String = 
      { 
        m receive 
        { 
          case Full(msg) => 
            m send Empty(); msg 
        } 
      } 
    } 
  
    def main(args : Array[String]) : Unit = 
    { 
      // Create Drop 
      val drop = new Drop() 
      
      // Spawn Producer 
      spawn 
      { 
        val importantInfo : Array[String] = Array( 
          "Mares eat oats", 
          "Does eat oats", 
          "Little lambs eat ivy", 
          "A kid will eat ivy too"
        ); 
        
        importantInfo.foreach((msg) => drop.put(msg)) 
        drop.put("DONE") 
      } 
      
      // Spawn Consumer 
      spawn 
      { 
        var message = drop.take() 
        while (message != "DONE") 
        { 
          System.out.format("MESSAGE RECEIVED: %s%n", message) 
          message = drop.take() 
        } 
      } 
    } 
  } 
 }

spawn方法(通過包塊頂部的 ops物件匯入)接收一個程式碼塊(另一個 by-name 引數示例)並將它包裝在匿名構造的執行緒物件的 run()方法內部。事實上,並不難理解 spawn的定義在 ops類的內部是什麼樣的:
<span style="background-color: rgb(153, 153, 153);">  def spawn(p: => Unit) = { 
    val t = new Thread() { override def run() = p } 
    t.start() 
  }</span>
事實上,Scala 的併發性支援超越了 MailBox和 ops類;Scala 還支援一個類似的 “Actors” 概念,它使用了與 MailBox所採用的方法相類似的訊息傳遞方法,但應用更加全面並且靈活性也更好。



相關推薦

scala學習 併發程式設計

首先來一個生產者消費者的例子: Producer: class Producer(drop : Drop) extends Runnable { val importantInfo : Array[String] = Array(

Java的GUI學習(程式設計選單)

滴滴:想看的話看這個blog 學習來自: http://www.cnblogs.com/xingyunblog/p/3871100.html   import java.awt.FlowLayout; import java.awt.Frame; import java.awt

學習筆記 --- Java 併發程式設計總結 countDownLatch,CyclicBarrier,Semaphore

在多執行緒程式設計中有三個同步工具需要我們掌握,分別是Semaphore(訊號量),countDownLatch(倒計數門閘鎖),CyclicBarrier(可重用柵欄) CountDownLatch和CyclicBarrier都能夠實現執行緒之間的等待,只不過它們側重點不同:  

併發程式設計學習:java併發程式設計的藝術(

一: 1.併發與並行:併發是多個執行緒(任務)共同爭奪一個cpu進行處理,並行是多個cpu各自處理對應的執行緒任務,現階段都只是併發而不是真正意義上的並行。 2.上下文切換:      即使單行處理器也支援多執行緒執行任務,cpu通過給每個執行緒分配cpu時

Java併發學習()-LongAdder和LongAccumulator探究

Java8在atomic包新增了5個類,分別是Striped64,LongAdder,LongAccumulator,DoubleAdder,DoubleAccumulator。其中,Sriped64作為父類,其他分別是long和double的具體實現。 下面

多執行緒程式設計學習(ThreadPoolExecutor 詳解).

一、ThreadPoolExecutor 引數說明 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,

spring boot 學習()使用@Async實現異步調用

fontsize south 操作 dom img water 截圖 ota app 使用@Async實現異步調用 什麽是”異步調用”與”同步調用” “同步調用”就是程序按照一定的順序依次執行,,每一行程序代碼必須等上一行代碼執行完畢才能執行;”異步調用”則是只要上一行代碼

Scala學習筆記

classes 有著 對待 嚴格 所有 pub 作用 tails true Scala數據類型 下面列出的數據類型都是對象,也就是說scala沒有Java中的原生類型。在scala是可以對數字等基礎類型調用方法的。 數據類型 描述 Byte 8位有符號補碼整數

Scala學習筆記之基礎語法,條件控制,循環控制,函數,數組,集合

new 增強 指定位置 因此 手動 tex class break 減少 前言:Scala的安裝教程:http://www.cnblogs.com/biehongli/p/8065679.html 1:Scala之基礎語法學習筆記: 1:聲明val變量:可以使用va

學習

log 刪除用戶 使用 pad image 創建組 信息 用戶配置文件 一行 三周第一次課(2月5日)2.27linux和windows互傳文件3.1 用戶配置文件和密碼配置文件3.2 用戶組管理3.3 用戶管理linux和windows互傳文件首先這裏要安裝lrzsz包

SpringMVC學習()——SpringMVC實現Resultful服務

控制 不用 extc service 靜態 pku net pub 技術 http://blog.csdn.net/yerenyuan_pku/article/details/72514034 Restful就是一個資源定位及資源操作的風格,不是標準也不是協議,只是一種

Scala學習)——基礎語法

extend property obj array add 活性 devel type 單個 Scala語言是一種面向對象語言,結合了命令式(imperative)和函數式(functional)編程風格,其設計理念是創造一種更好地支持組件的語言。 特性 多範式(

機器學習() 支持向量機 SVM(上)

gin 模型 結構 線性可分 adding 統計學習 lis 可能 方法 一、什麽是支撐向量機SVM (Support Vector Machine) SVM(Support Vector Machine)指的是支持向量機,是常見的一種判別方法。在機器學習領域,是一個有監

<C++學習>標準庫string的使用(未完待續)

clu size_t ... namespace art ring star using start   使用:   1、C++標準庫負責管理和存儲字符串所占用的內存;   2、頭文件:#include<string>   3、空間域:using namespa

[scala]學習筆記

一、scala的特點: 1.Scalable程式語言 (可伸縮的,既可以是指令碼語言,又可以為大型伺服器所使用) 2.純正的面向物件的語言 3.函式式語言 4.無縫的Java互操作 二、scala函式式的程式設計思想: 1.純函式,不具有副作用 2.引用透明 3.函式是一等公民:

scala學習筆記-面向物件程式設計之Trait

將trait作為介面使用 1 // Scala中的Triat是一種特殊的概念 2 // 首先我們可以將Trait作為介面來使用,此時的Triat就與Java中的介面非常類似 3 // 在triat中可以定義抽象方法,就與抽象類中的抽象方法一樣,只要不給出方法的具體實現即可 4

SpringMVC 學習 springMVC控制器向jsp或者別的控制器傳遞引數的四種方法

以後的開發,大部分是傳送ajax,因此這四種傳遞引數的方法,並不太常用。作為了解吧 第一種:使用原生 Servlet 在控制器的響應的方法中新增Servlet中的一些作用域:HttpRequestServlet,或者HttpSession。 【注意】在方法中,ServletContext的物件是不能作為

強化學習(二) Dueling DQN 強化學習() Prioritized Replay DQN

    在強化學習(十一) Prioritized Replay DQN中,我們討論了對DQN的經驗回放池按權重取樣來優化DQN演算法的方法,本文討論另一種優化方法,Dueling DQN。本章內容主要參考了ICML 2016的deep RL tutorial和Dueling DQN的論文<Duelin

springMVC3學習()--檔案上傳CommonsMultipartFile

使用springMVC提供的CommonsMultipartFile類進行讀取檔案 需要用到上傳檔案的兩個jar包 commons-logging.jar、commons-io-xxx.jar 1、在spring配置檔案中配置檔案上傳解析器

java學習筆記java程式設計

JDK,Java Development Kit,java開發工具包; javac -version 測試設定是否正確; 學會使用Eclipse; 簡單的java應用程式 package java複習; public class FirstSample {