On Mon, May 16, 2011 at 3:19 AM, Zd Yu <zdyu2000 / gmail.com> wrote:
> I want to use multi-threading + JRuby to achieve better performance.
>
> My problem has a single input file that contains millions of lines. Each
> line is the input to a time-consuming computation that generates an
> output result.
>
> What I have in mind is:
>
> 1. the main thread prepares N worker threads and puts them to sleep.
> 2. the main thread opens the input file and passes the file object to
>    each thread via thread[:file]
> 3. the main thread sends a "GO" command to all the worker threads

Did you ever hear of blocking queues?

> 4. each thread works in the following loop:
>    4.1 lock the mutex associated with the file object, read a line,
>        then unlock. If EOF is encountered, exit the loop.
>    4.2 do the very time-consuming computation
>    4.3 put the result into Thread.current[:result]
>    4.4 signal the main thread to pick up the result
>    4.5 go to sleep (waiting to be woken up by the main thread)
> 5. the main thread works in the following loop:
>    5.1 sleep until woken up by one of the worker threads.
>    5.2 check all the worker threads' [:result], extract the results and
>        aggregate them.
>    5.3 for the threads whose results have been picked up, send a signal
>        to let them proceed with the next line.
> 6. once all the worker threads finish, the main thread outputs the
>    aggregated result.

This sounds like a very typical application of the farmer/worker pattern.
You create two queues, one for tasks and one for results.  Then you start
a thread which fetches results from the result queue and processes
them.  Then you start a number of worker threads which read from the
task queue, process tasks and place results in the result queue.  The
main thread reads the input file, puts each line into the task queue
and, at the end, one end marker per worker.  Finally you use
Thread#value to join on the result processor.

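# untested
require 'thread'

WORKERS = 5

# placeholder for your time-consuming computation (hypothetical)
def process(line)
  line
end

tasks = SizedQueue.new WORKERS * 10
results = SizedQueue.new WORKERS * 10

agg = Thread.new do
  state = Hash.new 0
  th = WORKERS

  while th > 0
    x = results.deq

    if Thread === x
      # a worker has finished
      th -= 1
    else
      # aggregate
      state[x] += 1
    end
  end

  state
end

workers = WORKERS.times.map do
  Thread.new do
    # the tasks queue object itself serves as the end-of-input marker
    until (line = tasks.deq).equal? tasks
      results.enq(process line)
    end
    results.enq(Thread.current)
  end
end

# feed the input file line by line, then one end marker per worker
File.foreach ARGV[0] do |line|
  tasks.enq line
end
WORKERS.times { tasks.enq tasks }

p agg.value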
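To try the pattern without an input file, here is a self-contained sketch that feeds an in-memory array of lines instead. The sample words, the summing aggregation and the process method (which just returns the line length) are all hypothetical stand-ins for the real input and computation:

```ruby
require 'thread'

WORKERS = 4

# Hypothetical stand-in for the expensive per-line computation.
def process(line)
  line.length
end

lines   = %w[alpha beta gamma delta epsilon zeta eta theta]  # sample input
tasks   = SizedQueue.new(WORKERS * 10)
results = SizedQueue.new(WORKERS * 10)

# Aggregator: sums results until every worker has reported completion.
agg = Thread.new do
  total   = 0
  running = WORKERS
  while running > 0
    x = results.deq
    if Thread === x
      running -= 1   # a worker signalled that it is done
    else
      total += x
    end
  end
  total
end

WORKERS.times do
  Thread.new do
    # the tasks queue object itself serves as the end-of-input marker
    until (line = tasks.deq).equal?(tasks)
      results.enq(process(line))
    end
    results.enq(Thread.current)
  end
end

lines.each { |l| tasks.enq(l) }
WORKERS.times { tasks.enq(tasks) }  # one end marker per worker

p agg.value # => 38
```

Because both queues are bounded, a fast producer cannot flood memory with millions of pending lines: enq simply blocks until a worker (or the aggregator) has caught up. The producer must run after the workers are started, otherwise it could block on a full task queue with nobody consuming it.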

Kind regards

robert

-- 
remember.guy do |as, often| as.you_can - without end
http://blog.rubybestpractices.com/