On Wed, 30 Apr 2008, Abdul-rahman Advany wrote:

> I tried to use Threads, but somehow managing a pool of threads
> doesn't work (they get stuck while doing stuff and don't die). I tried
> to use the code below, but somehow the pool fills up and I keep waiting
> for new threads to become available...
>
> pool = ThreadPool.new(10) # up to 10 threads
> pool.process do
>  timeout(4) do
>    fetch pages, parse stuff, etc...
>  end
> end

Try my nifty MultiThread class. It creates a pool of N worker threads (there
is no point in creating many more threads than you have CPU cores to do the
work anyway).

require 'thread'
# With this set, an exception that escapes any thread is re-raised in the main
# thread instead of silently ending only the thread it occurred in.
Thread.abort_on_exception = true

# Raised by MultiThread#run and MultiThread#join when at least one job has
# failed.
#
# Inherits from StandardError (rather than Exception) so that a plain `rescue`
# clause handles it; `rescue MultiFail` and `rescue Exception` keep working.
class MultiFail < StandardError
  # The queue of captured job exceptions, exactly as given to #initialize.
  attr_reader :queue

  # _queue:: queue holding the exceptions raised by the failed jobs.
  def initialize(_queue)
    # Give the exception a real message; without the super call #message
    # would only be the class name.
    super('one or more MultiThread jobs failed; see #queue for the captured exceptions')
    @queue = _queue
  end
end

# A fixed-size pool of worker threads fed from a bounded job queue.
#
# Jobs are blocks submitted with #run; each one is called with the index of
# the worker thread that executes it. Exceptions raised by jobs are collected
# and reported to the caller as a MultiFail, either from a later #run or from
# #join.
class MultiThread
  # Spawns a pool of _jobs worker threads.
  #
  # _jobs:: number of worker threads; must be positive.
  #
  # Raises RuntimeError when _jobs is zero or negative.
  def initialize(_jobs = 1)
    raise "Insufficient threads to do anything! '#{_jobs}'" if _jobs <= 0

    # Both queues exist before the first worker starts, so a worker can never
    # see a partially initialised pool.
    @jobs = SizedQueue.new(2 * _jobs)
    @failed = Queue.new
    @threads = Array.new(_jobs) do |index|
      Thread.new do
        Thread.current[:index] = index
        do_stuff
      end
    end
  end

  # Run block in one of the threads. Blocks the caller while the job queue
  # (capacity: twice the number of workers) is full.
  #
  # Raises MultiFail if a previously submitted job has already failed, and
  # ArgumentError if no block is given.
  def run(&block)
    raise MultiFail.new(@failed) unless @failed.empty?
    # nil is the shutdown sentinel used by #join; enqueueing it here would
    # silently retire a worker, so reject a missing block up front.
    raise ArgumentError, 'MultiThread#run requires a block' unless block

    @jobs.enq(block)
  end

  # Wait until all threads are finished doing whatever they're doing.
  #
  # Raises MultiFail if any job failed.
  def join
    # One nil per worker: each worker consumes exactly one and then exits.
    @threads.each { @jobs.enq(nil) }
    @threads.each { |worker| worker.join }
    raise MultiFail.new(@failed) unless @failed.empty?
  end

  private

  # Worker loop: take jobs off the queue until the nil sentinel arrives.
  #
  # The rescue is per job, so a failing job does not kill its worker. If the
  # worker died, jobs left in the bounded queue would never be consumed and
  # #join could block forever trying to enqueue its sentinels.
  def do_stuff
    while (job = @jobs.deq)
      begin
        job.call(Thread.current[:index])
      rescue Exception => failure
        # Exception (not only StandardError) is rescued, as before, so every
        # failure is recorded for the caller instead of escaping the thread.
        @failed << failure
      end
    end
  end
end

# Self-test: runs only when this file is executed directly.
if $PROGRAM_NAME == __FILE__
  require 'test/unit'

  # Exercises MultiThread with various job/thread counts and checks that job
  # failures are reported as MultiFail.
  class TC_MultiThread < Test::Unit::TestCase
    def initialize(test)
      super(test)
      @c = 0 # characters printed on the current progress line
      # dot/undot are called from several worker threads at once; this lock
      # keeps the column counter and the output consistent.
      @progress_lock = Mutex.new
    end

    # Adds s columns to the current line width and starts a new line once
    # more than 70 columns have been printed.
    def wrap(s)
      @c += s
      return unless @c > 70

      puts
      @c = 0
    end

    # Prints a "job started" marker for worker index c (in hex).
    def dot(c)
      s = sprintf('%x< ', c)
      @progress_lock.synchronize do
        print s
        wrap s.size
      end
    end

    # Prints a "job finished" marker for worker index c (in hex).
    def undot(c)
      s = sprintf('>%x ', c)
      @progress_lock.synchronize do
        print s
        wrap s.size
      end
    end

    # Submits `loops` one-second jobs to a pool of `threads` workers and checks
    # that concurrency never exceeds the pool size, that jobs really overlap
    # when they can, and that every job ran exactly once.
    def try_for(loops, threads)
      puts "Trying [#{loops},#{threads}]"
      i = 0   # jobs currently running
      k = 0   # jobs completed
      max = 0 # highest number of simultaneously running jobs observed
      mutex = Mutex.new
      multi_thread = MultiThread.new(threads)

      loops.times do
        multi_thread.run do |t|
          dot(t)
          mutex.synchronize { i += 1 }
          sleep 1
          mutex.synchronize do
            assert(i <= threads)
            k += 1
            max = i if i > max
          end
          mutex.synchronize { i -= 1 }
          undot(t)
        end
      end
      multi_thread.join
      assert_equal(0, i)
      assert(((threads <= 1) || (loops <= 1)) || max > 1)
      assert_equal(loops, k)
    end

    def test_multi
      assert_raises(RuntimeError) { try_for(0, 0) }
      [
        [0, 1], [0, 2],
        [1, 1],
        [2, 1], [2, 2], [2, 100],
        [3, 1], [3, 2], [3, 3], [3, 100],
        [100, 100]
      ].each { |loops, threads| try_for(loops, threads) }
    end

    def test_fail
      multi_thread = MultiThread.new(3)

      multi_thread.run do
        sleep 2
      end

      multi_thread.run do
        raise "This thread failed for test purposes"
      end

      # The failing job runs asynchronously, so keep submitting no-op jobs
      # until the pool reports the failure. The loop is bounded (about two
      # seconds) so a regression fails the assertion instead of hanging.
      assert_raises(MultiFail) do
        200.times do
          multi_thread.run {}
          sleep 0.01
        end
      end

      # join must raise as well; asserting it directly means the test can no
      # longer pass silently when no MultiFail is raised.
      multi_fail = assert_raises(MultiFail) { multi_thread.join }
      assert_equal(RuntimeError, multi_fail.queue.pop.class)
    end
  end
end




John Carter                             Phone : (64)(3) 358 6639
Tait Electronics                        Fax   : (64)(3) 359 4632
PO Box 1645 Christchurch                Email : john.carter / tait.co.nz
New Zealand