1. 程式人生 > >Ruby學習之多執行緒的使用方式

Ruby學習之多執行緒的使用方式

每個正在系統上執行的程式都是一個程序。每個程序包含一到多個執行緒。執行緒是程式中一個單一的順序控制流程,在單個程式中同時執行多個執行緒完成不同的工作,稱為多執行緒。Ruby 中我們可以通過 Thread 類來建立多執行緒,Ruby的執行緒是一個輕量級的,可以以高效的方式來實現並行的程式碼,來看下啟動一個新的執行緒的程式碼格式:

# 執行緒 #1 程式碼部分
Thread.new {
  # 執行緒 #2 執行程式碼
}
# 執行緒 #1 執行程式碼

再來看下在Ruby程式中使用多執行緒的例項:

#!/usr/bin/ruby
 
def func1
   i=0
   while i<=2
      puts "func1 at: #{Time.now}"
      sleep(2)
      i=i+1
   end
end
 
def func2
   j=0
   while j<=2
      puts "func2 at: #{Time.now}"
      sleep(1)
      j=j+1
   end
end
 
puts "Started At #{Time.now}"
t1=Thread.new{func1()}
t2=Thread.new{func2()}
t1.join
t2.join
puts "End at #{Time.now}"

再來看下執行緒的生命週期:

1、執行緒的建立可以使用Thread.new,同樣可以以同樣的語法使用Thread.start 或者Thread.fork這三個方法來建立執行緒。

2、建立執行緒後無需啟動,執行緒會自動執行。

3、Thread 類定義了一些方法來操控執行緒。執行緒執行Thread.new中的程式碼塊。

4、執行緒程式碼塊中最後一個語句是執行緒的值,可以通過執行緒的方法來呼叫,如果執行緒執行完畢,則返回執行緒值,否則不返回值直到執行緒執行完畢。

5、Thread.current 方法返回表示當前執行緒的物件。 Thread.main 方法返回主執行緒。

6、通過 Thread.Join 方法來執行執行緒,這個方法會掛起主執行緒,直到當前執行緒執行完畢。

執行緒有5種狀態,如下:

執行緒狀態 返回值
可執行 run
睡眠 Sleeping
退出 aborting
正常終止 false
發生異常終止 nil

當某執行緒發生異常,且沒有被rescue捕捉到時,該執行緒通常會被無警告地終止。但是,若有其它執行緒因為Thread#join的關係一直等待該執行緒的話,則等待的執行緒同樣會被引發相同的異常,如下:

begin
  t = Thread.new do
    Thread.pass    # 主執行緒確實在等join
    raise "unhandled exception"
  end
  t.join
rescue
  p $!  # => "unhandled exception"
end

我們來看下可以讓直譯器在某個執行緒因異常而終止時中斷執行的幾個方法:

  • 啟動指令碼時指定-d選項,並以除錯模時執行。
  • Thread.abort_on_exception設定標誌。
  • 使用Thread#abort_on_exception對指定的執行緒設定標誌。

當使用上述方法之一後,整個直譯器就會被中斷,如下:

t = Thread.new { ... }
t.abort_on_exception = true

在Ruby中,提供三種實現同步的方式,如下:

1. 通過Mutex類實現執行緒同步

2. 監管資料交接的Queue類實現執行緒同步

3. 使用ConditionVariable實現同步控制

我們可以通過Mutex類實現執行緒同步控制,如果在多個執行緒鍾同時需要一個程式變數,可以將這個變數部分使用lock鎖定,如下:

#!/usr/bin/ruby
 
require "thread"
puts "Synchronize Thread"
 
@num=200
@mutex=Mutex.new
 
def buyTicket(num)
     @mutex.lock
          if @num>=num
               @[email protected]
               puts "you have successfully bought #{num} tickets"
          else
               puts "sorry,no enough tickets"
          end
     @mutex.unlock
end
 
ticket1=Thread.new 10 do
     10.times do |value|
     ticketNum=15
     buyTicket(ticketNum)
     sleep 0.01
     end
end
 
ticket2=Thread.new 10 do
     10.times do |value|
     ticketNum=20
     buyTicket(ticketNum)
     sleep 0.01
     end
end
 
sleep 1
ticket1.join
ticket2.join

我們除了使用lock鎖定變數,還可以使用try_lock鎖定變數,還可以使用Mutex.synchronize同步對某一個變數的訪問。

Queue類就是表示一個支援執行緒的佇列,能夠同步對佇列末尾進行訪問。不同的執行緒可以使用統一個對類,但是不用擔心這個佇列中的資料是否能夠同步,另外使用SizedQueue類能夠限制佇列的長度。SizedQueue類能夠非常便捷的幫助我們開發執行緒同步的應用程式,應為只要加入到這個佇列中,就不用關心執行緒的同步問題。

來看一個經典的生產者消費者的問題:

#!/usr/bin/ruby
 
require "thread"
puts "SizedQuee Test"
 
queue = Queue.new
 
producer = Thread.new do
     10.times do |i|
          sleep rand(i) # 讓執行緒睡眠一段時間
          queue << i
          puts "#{i} produced"
     end
end
 
consumer = Thread.new do
     10.times do |i|
          value = queue.pop
          sleep rand(i/2)
          puts "consumed #{value}"
     end
end
 
consumer.join

執行緒可以有其私有變數,執行緒的私有變數線上程建立的時候寫入執行緒。可以被執行緒範圍內使用,但是不能被執行緒外部進行共享。但是有時候,執行緒的區域性變數需要別別的執行緒或者主執行緒訪問怎麼辦?ruby當中提供了允許通過名字來建立執行緒變數,類似的把執行緒看做hash式的散列表。通過[]=寫入並通過[]讀出資料,來看一段程式碼:

#!/usr/bin/ruby
 
count = 0
arr = []
 
10.times do |i|
   arr[i] = Thread.new {
      sleep(rand(0)/10.0)
      Thread.current["mycount"] = count
      count += 1
   }
end
 
arr.each {|t| t.join; print t["mycount"], ", " }
puts "count = #{count}"

上述程式碼的執行流程就是,主執行緒等待子執行緒執行完成,然後分別輸出每個值。

執行緒的優先順序是影響執行緒的排程的主要因素。其他因素包括佔用CPU的執行時間長短,執行緒分組排程等等。我們可以使用 Thread.priority 方法得到執行緒的優先順序和使用 Thread.priority= 方法來調整執行緒的優先順序。執行緒的優先順序預設為 0 。 優先順序較高的執行的要快。一個 Thread 可以訪問自己作用域內的所有資料,但如果有需要在某個執行緒內訪問其他執行緒的資料應該怎麼做呢? Thread 類提供了執行緒資料互相訪問的方法,你可以簡單的把一個執行緒作為一個 Hash 表,可以在任何執行緒內使用 []= 寫入資料,使用 [] 讀出資料,來看段程式碼:

athr = Thread.new { Thread.current["name"] = "Thread A"; Thread.stop }
bthr = Thread.new { Thread.current["name"] = "Thread B"; Thread.stop }
cthr = Thread.new { Thread.current["name"] = "Thread C"; Thread.stop }
Thread.list.each {|x| puts "#{x.inspect}: #{x["name"]}" }

我們可以看到,把執行緒作為一個 Hash 表,使用 [] 和 []= 方法,實現了執行緒之間的資料共享。

Mutex(Mutal Exclusion = 互斥鎖)是一種用於多執行緒程式設計中,防止兩條執行緒同時對同一公共資源(比如全域性變數)進行讀寫的機制,先來看下不使用Mutex的案例:

#!/usr/bin/ruby
require 'thread'
 
count1 = count2 = 0
difference = 0
counter = Thread.new do
   loop do
      count1 += 1
      count2 += 1
   end
end
spy = Thread.new do
   loop do
      difference += (count1 - count2).abs
   end
end
sleep 1
puts "count1 :  #{count1}"
puts "count2 :  #{count2}"
puts "difference : #{difference}"

再來看一個使用Mutex的案例:

#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new
 
count1 = count2 = 0
difference = 0
counter = Thread.new do
   loop do
      mutex.synchronize do
         count1 += 1
         count2 += 1
      end
    end
end
spy = Thread.new do
   loop do
       mutex.synchronize do
          difference += (count1 - count2).abs
       end
   end
end
sleep 1
mutex.lock
puts "count1 :  #{count1}"
puts "count2 :  #{count2}"
puts "difference : #{difference}"

兩個以上的運算單元,雙方都在等待對方停止執行,以獲取系統資源,但是沒有一方提前退出時,這種狀況,就稱為死鎖。例如,一個程序 p1佔用了顯示器,同時又必須使用印表機,而印表機被程序p2佔用,p2又必須使用顯示器,這樣就形成了死鎖。所以,當我們在使用 Mutex 物件時需要注意執行緒死鎖,來看段程式碼:

#!/usr/bin/ruby
require 'thread'
mutex = Mutex.new
 
cv = ConditionVariable.new
a = Thread.new {
   mutex.synchronize {
      puts "A: I have critical section, but will wait for cv"
      cv.wait(mutex)
      puts "A: I have critical section again! I rule!"
   }
}
 
puts "(Later, back at the ranch...)"
 
b = Thread.new {
   mutex.synchronize {
      puts "B: Now I am critical, but am done with cv"
      cv.signal
      puts "B: I am still critical, finishing up"
   }
}
a.join
b.join

咱們來看下完整的 Thread(執行緒) 類方法:

序號 方法描述
1 Thread.abort_on_exception
若其值為真的話,一旦某執行緒因異常而終止時,整個直譯器就會被中斷。它的預設值是假,也就是說,在通常情況下,若某執行緒發生異常且該異常未被Thread#join等檢測到時,該執行緒會被無警告地終止。
2 Thread.abort_on_exception=
如果設定為 true, 一旦某執行緒因異常而終止時,整個直譯器就會被中斷。返回新的狀態
3 Thread.critical
返回布林值。
4 Thread.critical=
當其值為true時,將不會進行執行緒切換。若當前執行緒掛起(stop)或有訊號(signal)干預時,其值將自動變為false。
5 Thread.current
返回當前執行中的執行緒(當前執行緒)。
6 Thread.exit
終止當前執行緒的執行。返回當前執行緒。若當前執行緒是唯一的一個執行緒時,將使用exit(0)來終止它的執行。
7 Thread.fork { block }
與 Thread.new 一樣生成執行緒。
8 Thread.kill( aThread )
終止執行緒的執行.
9 Thread.list
返回處於執行狀態或掛起狀態的活執行緒的陣列。
10 Thread.main
返回主執行緒。
11 Thread.new( [ arg ]* ) {| args | block }
生成執行緒,並開始執行。數會被原封不動地傳遞給塊. 這就可以在啟動執行緒的同時,將值傳遞給該執行緒所固有的區域性變數。
12 Thread.pass
將執行權交給其他執行緒. 它不會改變執行中的執行緒的狀態,而是將控制權交給其他可執行的執行緒(顯式的執行緒排程)。
13 Thread.start( [ args ]* ) {| args | block }
生成執行緒,並開始執行。數會被原封不動地傳遞給塊. 這就可以在啟動執行緒的同時,將值傳遞給該執行緒所固有的區域性變數。
14 Thread.stop
將當前執行緒掛起,直到其他執行緒使用run方法再次喚醒該執行緒。

來看呼叫了執行緒例項化方法 join的案例:

#!/usr/bin/ruby
 
thr = Thread.new do   # 例項化
   puts "In second thread"
   raise "Raise exception"
end
thr.join   # 呼叫例項化方法 join

最後來看下完整例項化方法:

序號 方法描述
1 thr[ name ]
取出執行緒內與name相對應的固有資料。 name可以是字串或符號。 若沒有與name相對應的資料時, 返回nil。
2 thr[ name ] =
設定執行緒內name相對應的固有資料的值, name可以是字串或符號。 若設為nil時, 將刪除該執行緒內對應資料。
3 thr.abort_on_exception
返回布林值。
4 thr.abort_on_exception=
若其值為true的話,一旦某執行緒因異常而終止時,整個直譯器就會被中斷。
5 thr.alive?
若執行緒是"活"的,就返回true。
6 thr.exit
終止執行緒的執行。返回self。
7 thr.join
掛起當前執行緒,直到self執行緒終止執行為止. 若self因異常而終止時, 將會當前執行緒引發同樣的異常。
8 thr.key?
若與name相對應的執行緒固有資料已經被定義的話,就返回true
9 thr.kill
類似於 Thread.exit
10 thr.priority
返回執行緒的優先度. 優先度的預設值為0. 該值越大則優先度越高.
11 thr.priority=
設定執行緒的優先度. 也可以將其設定為負數.
12 thr.raise( anException )
在該執行緒內強行引發異常.
13 thr.run
重新啟動被掛起(stop)的執行緒. 與wakeup不同的是,它將立即進行執行緒的切換. 若對死程序使用該方法時, 將引發ThreadError異常.
14 thr.safe_level
返回self 的安全等級. 當前執行緒的safe_level與$SAFE相同.
15 thr.status
使用字串"run"、"sleep"或"aborting" 來表示活執行緒的狀態. 若某執行緒是正常終止的話,就返回false. 若因異常而終止的話,就返回nil。
16 thr.stop?
若執行緒處於終止狀態(dead)或被掛起(stop)時,返回true.
17 thr.value
一直等到self執行緒終止執行(等同於join)後,返回該執行緒的塊的返回值. 若線上程的執行過程中發生了異常, 就會再次引發該異常.
18 thr.wakeup
把被掛起(stop)的執行緒的狀態改為可執行狀態(run), 若對死執行緒執行該方法時,將會引發ThreadError異常。

好啦,本次記錄就到這裡了。

如果感覺不錯的話,請多多點贊支援哦。。。