--IpbVkmxF4tDyP/Kb
Content-Type: multipart/mixed; boundary="QKdGvSO+nmPlgiQ/"
Content-Disposition: inline


--QKdGvSO+nmPlgiQ/
Content-Type: text/plain; charset=us-ascii
Content-Disposition: inline
Content-Transfer-Encoding: quoted-printable

On Thu, Apr 19, 2012 at 03:20:50PM +0900, SASADA Koichi wrote:
> Hi,
>
> (2012/04/17 0:24), Aaron Patterson wrote:
> >>> So calling pop() means we're doing a not not blocking call. :(
> >>
> >> How about to add try_pop?
> >
> > try_pop seems fine, but it still seems strange to combine blocking
> > and non-blocking queues (but maybe *I* am the one who is strange).
> >
> > In the case of BlockingQueue#pop in the patch I submitted, it
> > allows a timeout.  I don't think it's a feature that should be
> > abandoned.
>
> I understand that ::Queue#pop should receive timeout extra parameter.

So the interface would be:

  queue.pop(true, 10)
  # or
  queue.pop(false, 10)

It seems confusing.  I do not think it is as clear as:

  queue = BlockingQueue.new
  queue.pop 10
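
For illustration only, here is a rough sketch of what a single-argument
timed pop could look like.  It is not the code in the attached patch
(which wraps the blocking take in Timeout.timeout); this version swaps in
ConditionVariable#wait with a timeout instead, and the class name
TimedQueue is made up for the example.

```ruby
# Hypothetical sketch; TimedQueue is not a class from the patch.
class TimedQueue
  def initialize
    @items = []
    @mutex = Mutex.new
    @cond  = ConditionVariable.new
  end

  def push(obj)
    @mutex.synchronize {
      @items.push obj
      @cond.signal              # wake one waiting consumer, if any
    }
    self
  end

  # Waits up to +timeout+ seconds for an element.
  # Returns the element, or nil if the timeout expires first.
  def pop(timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
    @mutex.synchronize {
      while @items.empty?
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        break if remaining <= 0
        @cond.wait(@mutex, remaining)
      end
      @items.shift              # nil when we gave up on an empty queue
    }
  end
end

queue = TimedQueue.new
expired = queue.pop(0.05)       # nothing arrives, so this gives up

producer = Thread.new { queue.push :ready }
received = queue.pop(5)         # returns once the push has landed
producer.join
```

The caller only ever passes the timeout; there is no boolean flag to
remember the meaning of.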

I'm not sure the clarity of the ::Queue API really matters, though.  The
queues I've submitted do not allow `nil` as an element, so a non-blocking
read can return `nil` to signal an empty queue instead of raising an
exception.
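
A minimal sketch of that idea, assuming a made-up class name
(NilFreeQueue) rather than the classes in the attached patch:

```ruby
# Hypothetical stand-in for the non-blocking queue; the class name
# NilFreeQueue is an assumption made for this example.
class NilFreeQueue
  def initialize
    @items = []
    @mutex = Mutex.new
  end

  # Rejects nil so that nil can never be a stored element.
  def add(obj)
    raise ArgumentError, "nil is not allowed" if obj.nil?
    @mutex.synchronize { @items.push obj }
    self
  end

  # Non-blocking read: returns the head element, or nil when empty.
  # Because nil is never stored, nil always means "queue was empty".
  def poll
    @mutex.synchronize { @items.shift }
  end
end

queue = NilFreeQueue.new
queue.add :job

first  = queue.poll   # the stored element
second = queue.poll   # nil, since the queue is now empty
```

If nil were a legal element, the caller could not tell "empty" apart from
"popped a nil" without an exception or a second return value.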

I think the queues I've submitted have different (but better) semantics.

> However, I'm not sure we need to separate (Blocking|Unblocking)Queue yet.

I would like them so that I can change queue types without changing the
code that pops off the queue.  I simply change the queue I instantiate,
and the rest of my code should not have to change.
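
Roughly what I mean, as a sketch.  The two classes below are tiny
hypothetical stand-ins (SketchQueue and SketchBlockingQueue are names made
up here), and the blocking one uses a ConditionVariable rather than the
waiting-thread list in the attached patch:

```ruby
# Hypothetical miniature versions of the two queue types; names and
# bodies are assumptions for illustration, not the attached patch.
class SketchQueue
  class NoSuchElementError < StandardError; end

  def initialize
    @items = []
    @mutex = Mutex.new
  end

  def push(obj)
    @mutex.synchronize { @items.push obj }
    self
  end

  # Non-blocking: raises when there is nothing to pop.
  def pop
    @mutex.synchronize {
      raise NoSuchElementError if @items.empty?
      @items.shift
    }
  end
end

class SketchBlockingQueue < SketchQueue
  def initialize
    super
    @cond = ConditionVariable.new
  end

  def push(obj)
    @mutex.synchronize {
      @items.push obj
      @cond.signal
    }
    self
  end

  # Blocking: waits until an element is available.
  def pop
    @mutex.synchronize {
      @cond.wait(@mutex) while @items.empty?
      @items.shift
    }
  end
end

# Consumer code is written once against #pop.
def consume(queue, count)
  Array.new(count) { queue.pop }
end

# Only the class being instantiated changes between the two runs.
results = [SketchQueue, SketchBlockingQueue].map do |klass|
  queue = klass.new
  3.times { |i| queue.push i }
  consume(queue, 3)
end
```

The consume method never needs to know which kind of queue it was handed.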

> >> How about to implement Queue#to_a method that generate array which
> >> contains queues containing objects?
> >
> > That seems fine!  Then we can eliminate Enumerable mixed in. :-)
>
> Yes.  And it is clear semantics.
>
> ::Queue#each can be implemented on at least two semantics:
>   1) block the end of queue. (like IO)
>   2) return when reach end of queue. (like Array)
>
> Against IO, Queue#each with semantics (1) can't stop.  It is similar
> to the cycle object (generated by Enumerator#cycle).

I agree.  I've attached an updated patch that uses to_a and removes
Enumerable.
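
To show why to_a gives the Array-like semantics, here is a small sketch.
SnapshotQueue is a made-up name, and unlike the to_a in the attached patch
this version takes the copy under the mutex, which is an assumption of the
sketch:

```ruby
# Hypothetical sketch; SnapshotQueue is not a class from the patch.
class SnapshotQueue
  def initialize
    @items = []
    @mutex = Mutex.new
  end

  def push(obj)
    @mutex.synchronize { @items.push obj }
    self
  end

  # Returns a copy of the current contents.  The copy is independent of
  # the queue, so iterating it never blocks and always terminates.
  def to_a
    @mutex.synchronize { @items.dup }
  end

  def length
    @mutex.synchronize { @items.length }
  end
end

queue = SnapshotQueue.new
[1, 2, 3].each { |n| queue.push n }

snapshot = queue.to_a
queue.push 4                            # later pushes do not touch the snapshot
doubled = snapshot.map { |n| n * 2 }    # finite iteration, like Array
```

Anyone who wants Enumerable behaviour can call to_a first and get it from
Array, without the queue having to pick between the two #each semantics.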

-- 
Aaron Patterson
http://tenderlovemaking.com/

--QKdGvSO+nmPlgiQ/
Content-Type: text/plain; charset=us-ascii
Content-Disposition: attachment; filename="queue.patch"

diff --git a/lib/thread.rb b/lib/thread.rb
index 88e86ab..9857599 100644
--- a/lib/thread.rb
+++ b/lib/thread.rb
@@ -129,7 +129,7 @@ end
 #
 #   consumer = Thread.new do
 #     5.times do |i|
-#       value = queue.pop
+#       value = queue.shift
 #       sleep rand(i/2) # simulate expense
 #       puts "consumed #{value}"
 #     end
diff --git a/lib/thread/queue.rb b/lib/thread/queue.rb
new file mode 100644
index 0000000..3b8d436
--- /dev/null
+++ b/lib/thread/queue.rb
@@ -0,0 +1,187 @@
+require 'timeout'
+
+class Thread
+  # Thread::Queue is a thread-safe FIFO queue.  It provides a way to synchronize
+  # communication between threads.  This queue does not block when items are
+  # removed (see Thread::Queue#remove).
+  #
+  # This queue does not allow nil elements.
+  class Queue
+    class NoSuchElementError < StandardError
+    end
+
+    #
+    # Creates a new queue.
+    #
+    def initialize
+      @que = []
+      @que.taint          # enable tainted communication
+      self.taint
+      @mutex = Mutex.new
+    end
+
+    def to_a
+      @que.dup
+    end
+
+    #
+    # Adds +obj+ to the tail of the queue.
+    #
+    # Raises an ArgumentError if +obj+ is nil.
+    #
+    def add(obj)
+      raise ArgumentError if obj.nil?
+      @mutex.synchronize { @que.push obj }
+      self
+    end
+
+    alias :push  :add
+    alias :<<    :add
+
+    def offer(obj, timeout = nil)
+      add obj
+    end
+
+    # Retrieves data from the queue head, and removes it.
+    #
+    # Raises a NoSuchElementError if the queue is empty.
+    def remove
+      @mutex.synchronize {
+        raise NoSuchElementError if empty?
+        @que.shift
+      }
+    end
+
+    alias :pop   :remove
+    alias :shift :remove
+    alias :deq   :remove
+
+    # Retrieves data from the queue head, and removes it.
+    #
+    # Returns nil if this queue is empty.
+    def poll
+      @mutex.synchronize {
+        if empty?
+          nil
+        else
+          @que.shift
+        end
+      }
+    end
+
+    # Retrieves data from the queue head, but does not remove it.
+    #
+    # Returns nil if the queue is empty.
+    def peek
+      @mutex.synchronize { @que.first }
+    end
+
+    # Retrieves data from the queue head, but does not remove it.
+    #
+    # Raises NoSuchElementError if the queue is empty.
+    def element
+      @mutex.synchronize {
+        if empty?
+          raise NoSuchElementError
+        else
+          @que.first
+        end
+      }
+    end
+
+    #
+    # Returns +true+ if the queue is empty.
+    #
+    def empty?
+      @que.empty?
+    end
+
+    #
+    # Removes all objects from the queue.
+    #
+    def clear
+      @que.clear
+    end
+
+    #
+    # Returns the length of the queue.
+    #
+    def length
+      @que.length
+    end
+    alias :size :length
+  end
+
+  # Thread::BlockingQueue is a thread-safe FIFO queue.  It provides a way to
+  # synchronize communication between threads.
+  #
+  # This queue does not allow nil elements.
+  class BlockingQueue < Queue
+    def initialize
+      @waiting = []
+      @waiting.taint
+      super
+    end
+
+    # Retrieves data from the queue head, and removes it.
+    #
+    # If the queue is empty, take will block until there is something
+    # in the queue.
+    def take
+      @mutex.synchronize {
+        while true
+          if @que.empty?
+            # @waiting.include? check is necessary for avoiding a race against
+            # Thread.wakeup [Bug 5195]
+            @waiting.push Thread.current unless @waiting.include?(Thread.current)
+            @mutex.sleep
+          else
+            return @que.shift
+          end
+        end
+      }
+    end
+
+    alias :pop   :take
+    alias :shift :take
+    alias :deq   :take
+
+    # Adds +obj+ to the tail of the queue.
+    #
+    # Raises an ArgumentError if +obj+ is nil.
+    #
+    def add(obj)
+      raise ArgumentError if obj.nil?
+
+      @mutex.synchronize {
+        @que.push obj
+        begin
+          t = @waiting.shift
+          t.wakeup if t
+        rescue ThreadError
+          retry
+        end
+      }
+      self
+    end
+
+    alias :push :add
+    alias :<<   :add
+
+    # Retrieves data from the queue head, and removes it.
+    #
+    # Blocks for +timeout+ seconds if the queue is empty, and returns nil if
+    # the timeout expires.
+    def poll(timeout = nil)
+      return super() unless timeout
+
+      begin
+        Timeout.timeout(timeout) do
+          take
+        end
+      rescue Timeout::Error
+        nil
+      end
+    end
+  end
+end
diff --git a/test/thread/helper.rb b/test/thread/helper.rb
new file mode 100644
index 0000000..d3c1258
--- /dev/null
+++ b/test/thread/helper.rb
@@ -0,0 +1,102 @@
+require 'minitest/autorun'
+require 'thread/queue'
+
+class Thread
+  class TestCase < MiniTest::Unit::TestCase
+    class Latch
+      def initialize
+        @mutex = Mutex.new
+        @cond  = ConditionVariable.new
+      end
+
+      def release
+        @mutex.synchronize { @cond.broadcast }
+      end
+
+      def await
+        @mutex.synchronize { @cond.wait @mutex }
+      end
+    end
+
+    attr_reader :queue
+
+    POISON = Object.new
+
+    def grind(num_threads, num_objects, num_iterations, klass, *args)
+      from_workers = klass.new(*args)
+      to_workers = klass.new(*args)
+
+      to_consumers = num_threads.times.map {
+        Thread.new {
+          while object = to_workers.pop
+            break if object == POISON
+            from_workers.push object
+          end
+        }
+      }
+
+      from_consumer = Thread.new {
+        num_iterations.times {
+          num_objects.times { from_workers.pop }
+        }
+      }
+
+      num_iterations.times {
+        num_objects.times { to_workers.push 99 }
+      }
+      num_threads.times { to_workers.push POISON }
+
+      to_consumers.each { |t| t.join }
+
+      from_consumer.join
+
+      assert_equal 0, from_workers.size
+      assert_equal 0, to_workers.size
+    end
+
+    def non_block_grind(num_threads, num_objects, num_iterations, klass, *args)
+      from_workers = klass.new(*args)
+      to_workers = klass.new(*args)
+
+      to_latch   = Latch.new
+      from_latch = Latch.new
+
+      to_consumers = num_threads.times.map {
+        Thread.new {
+          to_latch.await
+
+          while object = to_workers.pop
+            break if object == POISON
+            from_workers.push object
+          end
+        }
+      }
+
+      from_consumer = Thread.new {
+        from_latch.await
+
+        num_iterations.times {
+          num_objects.times { from_workers.pop }
+        }
+      }
+
+      num_iterations.times {
+        num_objects.times { to_workers.push 99 }
+      }
+      num_threads.times { to_workers.push POISON }
+
+      Thread.pass until to_consumers.all? { |c| c.status == "sleep" }
+      Thread.pass until from_consumer.status == "sleep"
+
+      to_latch.release
+
+      to_consumers.each { |t| t.join }
+
+      from_latch.release
+      from_consumer.join
+
+      assert_equal 0, from_workers.size
+      assert_equal 0, to_workers.size
+    end
+  end
+end
diff --git a/test/thread/test_blocking_queue.rb b/test/thread/test_blocking_queue.rb
new file mode 100644
index 0000000..f80b063
--- /dev/null
+++ b/test/thread/test_blocking_queue.rb
@@ -0,0 +1,93 @@
+require 'helper'
+
+class Thread
+  class TestBlockingQueue < TestCase
+    attr_reader :queue
+
+    def setup
+      @queue = Thread::BlockingQueue.new
+      super
+    end
+
+    def test_add_returns_self
+      assert_equal queue, queue.add(1)
+    end
+
+    def test_queue
+      grind(5, 1000, 15, Thread::BlockingQueue)
+    end
+
+    def test_offer
+      assert queue.offer(1)
+      assert_equal 1, queue.length
+    end
+
+    def test_clear
+      10.times { |i| queue << i }
+      assert_equal 10, queue.length
+      queue.clear
+      assert_equal 0, queue.length
+      assert queue.empty?
+    end
+
+    def test_add
+      queue.add "foo"
+      assert_equal "foo", queue.take
+      assert queue.empty?
+    end
+
+    def test_add_nil
+      assert_raises(ArgumentError) do
+        queue.add nil
+      end
+    end
+
+    def test_remove_empty
+      assert queue.empty?
+      t = Thread.new { queue.take }
+      queue << 1
+      assert_equal 1, t.join.value
+    end
+
+    def test_poll
+      queue.add "foo"
+      assert_equal "foo", queue.poll
+    end
+
+    def test_poll_empty
+      assert_nil queue.poll
+    end
+
+    def test_poll_timeout
+      assert_nil queue.poll(1)
+
+      t = Thread.new { queue.poll(10) }
+      queue << "foo"
+      assert_equal "foo", t.join.value
+    end
+
+    def test_peek
+      queue.add "foo"
+      assert_equal "foo", queue.peek
+      assert_equal "foo", queue.take
+    end
+
+    def test_peek_empty
+      assert queue.empty?
+      assert_nil queue.peek
+    end
+
+    def test_element
+      queue.add "foo"
+      assert_equal "foo", queue.element
+      assert_equal "foo", queue.take
+    end
+
+    def test_element_empty
+      assert queue.empty?
+      assert_raises(Queue::NoSuchElementError) do
+        queue.element
+      end
+    end
+  end
+end
diff --git a/test/thread/test_non_block_queue.rb b/test/thread/test_non_block_queue.rb
new file mode 100644
index 0000000..fa22a74
--- /dev/null
+++ b/test/thread/test_non_block_queue.rb
@@ -0,0 +1,90 @@
+require 'helper'
+
+class Thread
+  class TestQueue < TestCase
+    alias :grind :non_block_grind
+
+    def setup
+      super
+      @queue = Thread::Queue.new
+    end
+
+    def test_queue
+      grind(5, 1000, 15, Thread::Queue)
+    end
+
+    def test_add_returns_self
+      assert_equal queue, queue.add(1)
+    end
+
+    def test_offer
+      assert queue.offer(1)
+    end
+
+    def test_clear
+      10.times { |i| queue << i }
+      assert_equal 10, queue.length
+      queue.clear
+      assert_equal 0, queue.length
+      assert queue.empty?
+    end
+
+    def test_add
+      queue.add "foo"
+      assert_equal "foo", queue.remove
+      assert queue.empty?
+    end
+
+    def test_add_nil
+      assert_raises(ArgumentError) do
+        queue.add nil
+      end
+    end
+
+    def test_remove_empty
+      assert queue.empty?
+      assert_raises(Queue::NoSuchElementError) do
+        queue.remove
+      end
+    end
+
+    def test_poll
+      queue.add "foo"
+      assert_equal "foo", queue.poll
+    end
+
+    def test_poll_empty
+      assert_nil queue.poll
+    end
+
+    def test_peek
+      queue.add "foo"
+      assert_equal "foo", queue.peek
+      assert_equal "foo", queue.remove
+    end
+
+    def test_peek_empty
+      assert queue.empty?
+      assert_nil queue.peek
+    end
+
+    def test_element
+      queue.add "foo"
+      assert_equal "foo", queue.element
+      assert_equal "foo", queue.remove
+    end
+
+    def test_element_empty
+      assert queue.empty?
+      assert_raises(Queue::NoSuchElementError) do
+        queue.element
+      end
+    end
+
+    def test_offer_optionally_takes_timeout
+      assert queue.empty?
+      queue.offer 0, 10
+      assert_equal 1, queue.length
+    end
+  end
+end

--QKdGvSO+nmPlgiQ/--

--IpbVkmxF4tDyP/Kb
Content-Type: application/pgp-signature

-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.11 (Darwin)

iQEcBAEBAgAGBQJPkembAAoJEJUxcLy0/6/GjRkH/jKyGx16l3/CLINK8dV8AzzL
dRSF4b3JAXC2k4SLKCwS7KhMjhcKMfMFu3YLZCn/pJdn31G/5N3Tg05tSq/m/Ybt
Cnhl6YIxst33yzMyj3icD53slInlJHC7Ec8g2TbWm5JZAAcyWoSmkqD9G1txe4Ud
50DX11NvZGew5oJdZpKEcXjUZp+0qTHQ1n6pOKuhcv+MFO9bugXKAVQSwu3Pt1WQ
2QZEkWiI3gv2UxOUsYhToj0Vi+PpXV4udOpYFQrN389bHv1R4SCvx5UDDhhqe9EM
Bdqugx4o9EUU+NaH052K3qBj+e1neG9RWfIDQ390G2aAVbDeICqkQ/eE3ZoJW8g=
=eZoK
-----END PGP SIGNATURE-----

--IpbVkmxF4tDyP/Kb--
