On 30 Aug 2005, at 04:25, Andrew S. Townley wrote:

> Hi Eric
>
> On Mon, 2005-08-29 at 19:09, Eric Hodel wrote:
>
>> On 29 Aug 2005, at 08:57, Andrew S. Townley wrote:
>>
> [snip]
>
>> ThreadGroup, ThreadGroup, ThreadGroup.  Create a ThreadGroup for each
>> thread you spawn so you can see who is spawning the extra threads.
>
> Hmmm... I think I might use a ThreadGroup, no? ;)
>
>>> Also, I've looked at this from a number of different angles, but it
>>> appears to be no way to create a normal thread pool with Ruby.
>
> Ok, this was just selective stupidity on my part.  Apologies.  After
> digging out my Doug Lea Concurrent Java book, I saw what I was
> forgetting...
>
>> What do you want?
>
> Something like this (actually, it was straightforward enough once I
> thought about it a little):
>
> $ cat tpool.rb
> require 'thread'
>
> class ThreadPool
>   def initialize(size)
>     @work = Queue.new
#    @workers = []

You don't need to keep threads in an Array because you don't keep  
track of their status or #value.  (Its better to just push a value  
onto a Queue than to check #value because you can be sure you're  
always getting something that is valid.)

>     @group = ThreadGroup.new
>     @shutdown = false
>     @sh_mutex = Mutex.new

I don't think this mutex protects anything, assignment of a constant  
will be atomic.

>     size.times do
        Thread.new { @group.add Thread.current; thread.stop;  
thread_work }
>     end
>     @monitor = Thread.new do
>       Thread.stop
>       loop do
>         @sh_mutex.synchronize { Thread.current.terminate if  
> @shutdown }
>         sleep(1)
>       end
>     end
>   end
>
>   def <<(runnable)
>     @work << runnable
>     self
>   end
>
>   def thread_work
>     loop do
>       @sh_mutex.synchronize do
>         if @shutdown
>           puts "#{Thread.current} stopping";
>           Thread.current.terminate
>         end
>       end
>       puts "#{Thread.current.inspect} is one of # 
> {@work.num_waiting} waiting for work"
>       job = @work.deq
>       begin
>         job.run if job != nil
>         Thread.pass
>       rescue => e
>         puts e
>         next
>       end
>     end
>   end
>
>   def start
      @group.list.each { |w| w.run }
>     @monitor.run
>   end
>
>   def join
>     @monitor.join
>   end
>
>   def shutdown(wait = true)
      @group.enclose
>     @sh_mutex.synchronize { @shutdown = true }
      @group.list.first.join until @group.list.empty? if wait
>   end
>
>   attr_reader :group
> end
>
> class Runnable
>   def initialize(*args, &block)
>     @block = block
>   end
>
>   def run
>     @block.call
>   end
> end
>
> pool = ThreadPool.new(8)
>
> pool.start
> job1 = Runnable.new do
>   3.times { puts "#{Thread.current.inspect} - hello"; sleep(rand*3) }
> end
>
> vagrant = Runnable.new { raise "broken" }
>
> pool << job1 << vagrant << job1 << job1 << job1 << job1
> pool << vagrant << job1 << job1 << vagrant << vagrant << job1
>
> Thread.new { t = rand*2; puts "sleeping #{t}"; sleep(t);  
> pool.shutdown(false) }
> pool.join
> pool.shutdown
>
> puts "Thread group"
> pool.group.list.each { |w| puts w.inspect }

The group should always be empty after shutdown if you waited.  A  
ThreadGroup does not hold dead threads.

> puts "Thread.list"
> Thread.list.each { |w| puts w.inspect }

-- 
Eric Hodel - drbrain / segment7.net - http://segment7.net
FEC2 57F1 D465 EB15 5D6E  7C11 332A 551C 796C 9F04