On Tue, 2005-08-30 at 19:05, Eric Hodel wrote:
[lots of really useful feedback deleted]

Thanks for the feedback, Eric.  I'd actually made some other changes to
it prior to getting your comments.  I have to say, looking at the
difference, I can see some value in the ThreadGroup, but I don't really
get why you're so excited about it... this is for another day, I think.

In other news, I found the mystery multiplying threads and have a
problem that I'm not quite sure how to solve with the tools at hand...
Maybe someone out there can help, but maybe the answer is to just roll
my own something-or-other again.

The problem came from two seemingly benign blocks of code:

$ cat fu.rb
require 'thread'
require 'timeout'

@queue = Queue.new

def read(timeout)
  begin
    Timeout::timeout(timeout) do
      puts("READ:   #{@queue.length} elements; #{@queue.num_waiting} threads waiting.")
      return @queue.deq
    end
  rescue Timeout::Error
    puts("TIMEOUT:  #{@queue.length} elements; #{@queue.num_waiting} threads waiting.")
  end
end

and this (/usr/lib/ruby/1.8/thread.rb):

    272   #
    273   # Retrieves data from the queue.  If the queue is empty, the calling thread is
    274   # suspended until data is pushed onto the queue.  If +non_block+ is true, the
    275   # thread isn't suspended, and an exception is raised.
    276   #
    277   def pop(non_block=false)
    278     while (Thread.critical = true; @que.empty?)
    279       raise ThreadError, "queue empty" if non_block
    280       @waiting.push Thread.current
    281       Thread.stop
    282     end
    283     @que.shift
    284   ensure
    285     Thread.critical = false
    286   end
    287   alias shift pop
    288   alias deq pop

because of this (/usr/lib/ruby/1.8/timeout.rb):

     32 module Timeout
     33   class Error<Interrupt
     34   end
     35
     36   def timeout(sec, exception=Error)
     37     return yield if sec == nil or sec.zero?
     38     begin
     39       x = Thread.current
     40       y = Thread.start {
     41         sleep sec
     42         x.raise exception, "execution expired" if x.alive?
     43       }
     44       yield sec
     45       #    return true
     46     ensure
     47       y.kill if y and y.alive?
     48     end
     49   end
     50   module_function :timeout
     51 end

The key line is 280.  The problem seems to come from @waiting being an
array which holds on to references to the threads created in line 40
(which is also why I couldn't find it: I was tracing for Thread.new and
not Thread.start, doh!!).  Even though the thread seems to definitely be
killed in line 47, the array still holds a reference to it, so I'm
guessing that, like Java, this prevents garbage collection for a while.
Therefore, when I was looking at the list, the threads created here were
still in it.
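
To make the bookkeeping concrete, here is a small stand-alone model (a
sketch, not the real thread.rb code).  ModelQueue and its cleanup flag
are hypothetical names of mine; the class copies only the @waiting
handling from lines 277-286, drops Thread.critical, and so is not
thread-safe.  Going by the listing, line 280 pushes Thread.current, i.e.
whichever thread called pop, and nothing removes that entry if the wait
is interrupted by the raise in line 42.

```ruby
require 'timeout'

# Simplified stand-in for the 1.8 Queue (hypothetical, NOT thread-safe):
# it keeps only the @waiting bookkeeping from pop.
class ModelQueue
  def initialize(cleanup)
    @que = []
    @waiting = []
    @cleanup = cleanup   # hypothetical switch: remove stale entries or not
  end

  def num_waiting
    @waiting.size
  end

  def pop
    while @que.empty?
      @waiting.push Thread.current   # same idea as line 280
      begin
        Thread.stop                  # same idea as line 281
      ensure
        # Assumed fix: forget this thread if the wait ends for any reason.
        @waiting.delete(Thread.current) if @cleanup
      end
    end
    @que.shift
  end
end

# Run +count+ reads that all time out, then report the waiting count.
def timed_out_reads(queue, count)
  count.times do
    begin
      Timeout.timeout(0.05) { queue.pop }
    rescue Timeout::Error
      # expected: nothing is ever pushed onto the queue
    end
  end
  queue.num_waiting
end

leaky = timed_out_reads(ModelQueue.new(false), 3)
tidy  = timed_out_reads(ModelQueue.new(true), 3)
puts "without cleanup: #{leaky} entries left; with cleanup: #{tidy}"
```

In this model each timed-out read leaves one entry behind unless the
ensure clause removes it, which matches the growth I see in num_waiting.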

I'm a bit worried about this block in thread.rb, though:

    250   #
    251   # Pushes +obj+ to the queue.
    252   #
    253   def push(obj)
    254     Thread.critical = true
    255     @que.push obj
    256     begin
    257       t = @waiting.shift
    258       t.wakeup if t
    259     rescue ThreadError
    260       retry
    261     ensure
    262       Thread.critical = false
    263     end
    264     begin
    265       t.run if t
    266     rescue ThreadError
    267     end
    268   end
    269   alias << push
    270   alias enq push

because I really don't like what I've observed happening in lines
257-258 here.  Based on what I'm doing, the waiting array will
eventually get huge... and I mean, HUGE.  Also, based on further
experiments, adding n items to the queue will reduce @waiting.length by
n; however, there will be a lot more attempted reads than attempted
writes to the queue, so the array grows faster than it shrinks.

I'm open to suggestions, but I can't just do it without the timer
because I need to get control back every n seconds so I can do things
like graceful shutdown, etc.
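
One workaround I can think of is to poll instead of block.  This is only
a sketch under my own assumptions: timed_deq and its interval parameter
are hypothetical names, not part of thread.rb.  It relies on line 279,
where pop(true) raises ThreadError before line 280 ever touches
@waiting, so a read that gives up leaves nothing behind.

```ruby
require 'thread'

# Hypothetical helper: try a non-blocking pop until +timeout+ seconds
# have passed.  Returns the element, or nil if nothing arrived in time.
def timed_deq(queue, timeout, interval = 0.05)
  deadline = Time.now + timeout
  loop do
    begin
      return queue.pop(true)   # non_block=true: raises ThreadError if empty
    rescue ThreadError
      return nil if Time.now >= deadline
      sleep interval           # give other threads a chance to push
    end
  end
end

queue = Queue.new
first = timed_deq(queue, 0.2)    # queue is empty, so this gives up
queue << "one"
second = timed_deq(queue, 0.2)   # element is already there
puts "first=#{first.inspect} second=#{second.inspect} " \
     "waiting=#{queue.num_waiting}"
```

The trade-off is up to interval seconds of extra latency plus some idle
polling, and a queue that legitimately holds nil would need a different
"nothing arrived" marker.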

Any help greatly appreciated.  Thanks in advance,

ast

Here's the full test program (not out to win any style awards with the
calls to read, btw) :)

$ cat fu.rb
require 'thread'
require 'timeout'

@queue = Queue.new

def read(timeout)
  begin
    Timeout::timeout(timeout) do
      puts("READ:   #{@queue.length} elements; #{@queue.num_waiting} threads waiting.")
      return @queue.deq
    end
  rescue Timeout::Error
    puts("TIMEOUT:  #{@queue.length} elements; #{@queue.num_waiting} threads waiting.")
  end
end

read(1); read(1); read(1); read(1); read(1); read(1)
@queue << "one"
read(1); read(1); read(1); read(1); read(1); read(1)

puts Thread.list.join(", ") # only one by the end



