Issue #10600 has been updated by John Anderson.


Koichi Sasada wrote:
> Interesting. I understand your motivation.

It's always nice to be understood ;-)

> I have several questions (design choise)
>
> (1) should we flush all remaining items in queue when it is closing?
>
> This specification can be interrupt.

I'm not sure what you mean here?

> Now, your proposal does not flush.

No, because in some cases there will still be items in the queue which the consumers have not finished processing. Flush on close would mean those items would be lost.

queue.close.clear would achieve flush, but it would not be atomic.

> (2) should we allow "re-open"?
>
> We can make it. But it makes thread programming difficult to control.

I'm leaning towards no. If the queue could be re-opened, the consumer side would not know with certainty when to let consumer threads end. So the shutdown simplicity would be gone.

> ----
>
> Maybe we need to survey other language / libraries.

No help from the wikipedia entry - it just assumes that the producer and consumer will run forever (while (true) ...): http://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem

Java's BlockingQueue is the same: https://docs.oracle.com/javase/6/docs/api/java/util/concurrent/BlockingQueue.html There are many stackoverflow questions on how to know when a queue is finished. Most of the answers suggest the poison pill approach :-(

.net TPL has a Complete() method http://msdn.microsoft.com/en-us/library/hh228601%28v=vs.110%29.aspx I can't find anything about re-opening.

Go allows channels to be closed https://gobyexample.com/closing-channels , does not flush items, and cannot reopen a channel https://groups.google.com/forum/#!topic/golang-nuts/e0jYSvJhPqA

This clojure library has produce-done https://github.com/martintrojer/pipejine . I couldn't find anything about re-opening, but I suspect a clojure library wouldn't go for that anyway.

Nobuyoshi Nakada wrote:
> (3) Shouldn't Queue#pop also raise an exception if the queue is empty and closed,
> instead of returning nil?

I'm not sure. nil allows for

~~~ ruby
while item = queue.deq
  ...
end
~~~

whereas StopIteration would work nicely with

~~~ ruby
loop do
  item = queue.deq
  ...
end
~~~

maybe both - queue.close(StopIteration). But that raises other questions - what to do when this happens:

~~~ ruby
queue.close
queue.close(RuntimeError.new 'queue is now closed')
queue.close(StopIteration)
~~~

But the parameter to queue.close would have to be stored anyway to know what to return from deq, so subsequent calls to close could check that the new parameter == the old parameter.

> (4) What happens on another thread which is blocked at SizedQueue#push?

Thanks, I didn't think of that. I think the reason for Queue#push to raise an exception when the queue is closed is to signal that the programmer made an error. So following that logic, when the producer side calls queue.close and then continues to enq items, that's a programmer error.

Does that make sense? If so I'll update the patch to make SizedQueue#push behave like that.


----------------------------------------
Feature #10600: [PATCH] Queue#close
https://bugs.ruby-lang.org/issues/10600#change-50410

* Author: John Anderson
* Status: Open
* Priority: Normal
* Assignee: 
* Category: 
* Target version: 
----------------------------------------
In a multiple-producer / multiple-consumer situation using blocking enq and deq, closing a queue cleanly is difficult. It's possible using a queue poison token, but unpleasant because either producers have to know how to match up number of poison tokens with number of consumers, or consumers have to keep putting the poison back into the queue which complicates testing for empty and not blocking on deq.

This patch (from trunk at b2a128f) implements Queue#close which will close the queue to producers, leaving consumers to deq the remaining items. Once the queue is both closed and empty, consumers will not block. When an empty queue is closed, all consumers blocking on deq will be woken up and given nil.

With Queue#close, clean queue shutdown is simple:

~~~ ruby
queue = SizedQueue.new 1000

consumer_threads = lots_of.times.map do
  Thread.new do
    while item = queue.pop
      do_work item
    end
  end
end

source = somewhat_async_enumerator

producer_threads = a_few.times.map do
  Thread.new do
    loop{queue << source.next}
  end
end

producer_threads.each &:join
queue.close
consumer_threads.each &:join
~~~


---Files--------------------------------
queue-close.diff (5.18 KB)


-- 
https://bugs.ruby-lang.org/