まつもと ゆきひろ@トヨタケーラムです.

久々にチュートリアルを書いてみます.今回のお題は0.99で追加さ
れた新機能であるthreadです.

# 今までのチュートリアルとレベルがいきなり違いますが,許して
# ください.

--
Thread

1.はじめに

Rubyではthread機能はコンパイル時に設定されるオプションです.
手元のrubyでthreadが使えるかどうか調べるためにはコマンドライ
ンから以下のように入力してみてください.

 % ruby -le 'print defined?(Thread)'

この結果

 class-constant

と表示されればそのrubyインタプリタではthreadが使えます.もし,

 nil

と表示されたら,残念ながらそのrubyではthreadは使えません.
Rubyのthread機能は移植性が高いように作られているので,再コン
パイルだけで使えるようになる可能性が高いです.(もし自分で出
来なければ)管理者に頼んでみてください.

2.Threadとは

Threadとはひとつのプログラムの中で複数の制御の流れを扱うこと
が出来る機能です.OSで提供されるプロセスとは違ってthreadで
はメモリ空間が共有されます.

Rubyで使われているthreadはユーザレベルthreadと呼ばれるもので,
rubyインタプリタ自身が自分でthreadの切替えを行っています.こ
の方法は,OSで実装されているものよりも効率が低い,マルチCPU
を活かすことが出来ない,というデメリットがありますが,その代
わり移植性が高いというメリットがあります.

3.Threadの生成

新しいThreadを作るためにはThread.startというメソッドを使いま
す.使い方は以下の通りです.

  Thread.start { .... }

Thread.startは新しいthreadを作り,そのthreadでイテレータブロッ
クを評価します.簡単なプログラムでthreadが動く様子を見てみま
しょう.

     1	Thread.start {
     2	  while TRUE
     3	    print "thread 1\n"
     4	  end
     5	}
     6	
     7	while TRUE
     8	  print "thread 2\n"
     9	end

このプログラムを動かすと「thread 1」と「thread 2」が混じって
表示されるので,二つの無限ループが同時に動作しているのが分か
ると思います.このプログラムを終了させるためにはCtrl-Cを押し
てください.

4.Threadの操作

Threadクラスのメソッドは以下の通りです.

  Thread.start {...}
  Thread.new {...}

    新しいthreadを生成し,その中でイテレータブロックを評価す
    る.新しく生成されたthreadオブジェクトを返す.newはstart
    の別名.

  Thread.current

    現在実行しているthreadオブジェクトを返す.

  Thread.exit

    現在実行しているthreadを終了させる.

  Thread.join thread

    指定したthreadの実行が終了するまで,現在のthreadを停止さ
    せる.

  Thread.kill thread

    指定したthreadの実行を終了させる.

  Thread.pass

    実行可能な他のthreadに明示的に制御を渡す.

  Thread.stop 

    現在のtheadの実行を停止する.他のthreadがthread#runを実
    行するまで停止し続ける

  Thread#exit

    レシーバのthreadの実行を終了させる.

  Thread#run

    レシーバの実行を再開させる.

  Thread#stop

    レシーバの実行を停止させる.

  Thread#status

    レシーバがまだ生きていれば真を返す.例外によってthreadが
    終了していればその例外を発生させる.

  Thread#value

    レシーバのイテレータブロックを評価した結果を返す.まだイ
    テレータブロックの評価が終了していない時にはそのthreadが
    終了するまで待つ.

またthreadが使える条件でコンパイルされたrubyではsleepが再定
義されていて,現在のthreadだけを一定時間停止させることが出来
る.またselectもthreadを扱えるように拡張されている.

5.Thread間の同期

Threadはメモリ空間を共有しているのでThread間のデータのやりと
りは普通の変数を使って行うことができますが,動作するタイミン
グを合わせるために同期を行う必要があります.この同期に失敗す
ると,来るはずの無いデータを待って永遠に待ち続けるデッドロッ
クと呼ばれる状態になったり,期待するのと違うデータを受け取っ
て見付けにくいバグの元になったりします.

Rubyのthreadライブラリでは二つの同期方法を提供しています.ひ
とつは同期だけを行うMutexとデータの受渡しも行うQueueです.こ
れらのライブラリを使うためにはプログラムの先頭で

  require "thread"

を呼び出しておく必要があります.

5.1 Mutex

Mutexとはmutual-exclusion lock(相互排他ロック)の略です.
Mutexをロックしようとした時にすでにロックされていれば,
threadはロックが解除されるまで停止します.

並行アクセスから共有データを保護するためには以下のようなコー
ドを用いて行います(ここでmをMutexのインスタンスとします).

  begin
    m.lock
    # mで保護される共有データへのアクセス
  ensure
    m.unlock
  end

同じことをより簡単に行うためMutexにはsyncronizeというメソッ
ドがあります.

  m.syncronize {
    # mで保護される共有データへのアクセス
  }

例として簡単なプログラムを用意してみましょう.

     1	require "thread"
     2	
     3	m = Mutex.new
     4	v = 0;				# mで保護されるデータ
     5	
     6	Thread.start {
     7	  m.syncronize {
     8	    v = v + 100
     9	  }
    10	}
    11	
    12	while TRUE
    13	  m.syncronize {
    14	    v = v - 33
    15	  }
    16	end

このプログラムをMutexで保護しないと,タイミングによってはvの
値を取り出してから代入までの間に他のthreadによって値が変更さ
れてしまう可能性があります.

Mutexのメソッドは以下の通りです.

 Mutex.new

   新しいロックを生成する

 Mutex#lock

   ロックする.すでにロックされている場合にはロックが解除さ
   れるまで待つ.

 Mutex#unlock

   ロックを解除する.ロックを待っている他のthreadがあればそ
   ちらを走らせる.

 Mutex#syncronize

   ロックの獲得から解除までを行うイテレータ.

 Mutex#try_lock

   ロックを獲得する.すでにロックされている場合には停止せず
   FALSEを返す.

5.2 Queue

Queueはデータを読み書きするパイプのようなものです.データを
提供するthreadは一方からデータを書き込み,読み出すthreadはも
う一方からデータを取り出します.Queueに読み出すデータが残っ
ていない時には読み出そうとしたthreadはデータが来るまで停止し
ます.

Queueを使った簡単なプログラムは以下のようになります.

     1	require "thread"
     2	
     3	q = Queue.new
     4	
     5	Thread.start {
     6	  while gets
     7	    q.push $_
     8	  end
     9	}
    10	
    11	while TRUE
    12	  while line = q.pop
    13	    print line
    14	  end
    15	end

このプログラムではひとつのthreadが読み込んだ行をもうひとつの
行が出力しています.3行目を「q = []」などとして配列に変えて
みるとthread間の同期が取れず,正しく動かないことが分かるでしょ
う.

Queueのメソッドは以下の通りです.

 Queue.new

   新しいQueueを生成します.

 Queue.empty?

   Queueが空の時真を返します.

 Queue.push value

   Queueにvalueを追加します.

 Queue.pop [non_block]

   Queueからデータを取り出します.偽でない引数non_blockが与
   えられた場合にはQueueが空の時に例外を発生させます.それ以
   外の場合にはQueueが空の時にはQueueにデータが追加されるま
   で読み出したthreadを停止させます.

6.例題

並列プログラミングの世界では昔から有名な「哲学者の食事」問題
を作ってみましょう.

「哲学者の食事」問題とは以下のような状況で哲学者がどうやって
同期をとるかという問題です.

  N人の哲学者が丸いテーブルに座っています.テーブルの真中に
  は大きなスパゲティの皿が置いてあります.またN本のフォーク
  があって哲学者と哲学者の席の間に置いてあります.哲学者は思
  索を続けていますが,お腹がすくと両側のフォークを取ってスパ
  ゲティを食べます.お腹が一杯になると食べるのを止めてフォー
  クを返します.哲学者は紳士ですから,お腹が空いていても両方
  のフォークが手に入るまでは待ちます.

このプログラムを実行すると現在の状態を次々と表示します.各文
字の意味は以下の通りです.

  o: 考えている哲学者
  *: 食事している哲学者
  -: 使われていないフォーク 
  |: 使われているフォーク

哲学者が考えている時間と食事している時間は乱数で決めています.

     1	#
     2	# The Dining Philosophers - thread example
     3	#
     4	require "thread"
     5	
     6	N=5				# number of philosophers
     7	$forks = []
     8	for i in 0..N-1
     9	  $forks[i] = Mutex.new
    10	end
    11	$state = "-o"*N
    12	
    13	def wait
    14	  sleep rand(20)/10.0
    15	end
    16	
    17	def think(n)
    18	  wait();
    19	end
    20	
    21	def eat(n)
    22	  wait();
    23	end
    24	
    25	def philosopher(n)
    26	  while TRUE
    27	    think n
    28	    $forks[n].lock
    29	    if not $forks[(n+1)%N].try_lock
    30	      $forks[n].unlock           # avoid deadlock
    31	      continue
    32	    end
    33	    $state[n*2] = ?|;
    34	    $state[(n+1)%N*2] = ?|;
    35	    $state[n*2+1] = ?*;
    36	    print $state, "\n"
    37	    eat(n)
    38	    $state[n*2] = ?-;
    39	    $state[(n+1)%N*2] = ?-;
    40	    $state[n*2+1] = ?o;
    41	    print $state, "\n"
    42	    $forks[n].unlock
    43	    $forks[(n+1)%N].unlock
    44	  end
    45	end
    46	
    47	for i in 0..N-1
    48	  Thread.start{philosopher(i)}
    49	  sleep 0.1
    50	end
    51	sleep
    52	exit