[ruby-core:70658] [Ruby trunk - Feature #10600] [PATCH] Queue#close

From: john@...
Date: 2015-09-03 13:08:37 UTC
List: ruby-core #70658
Issue #10600 has been updated by John Anderson.


Sorry I didn't reply earlier; it's been a while since I checked this list.

I think ClosedQueueError < StopIteration makes sense. ThreadError (raised by
other methods) is not related to ClosedQueueError, but I can't tell whether
that is a problem.
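
To make the practical effect of that hierarchy concrete, here is a minimal sketch. It assumes a Ruby where Queue#close and ClosedQueueError are available (2.3 or later) rather than the patch under discussion, and the variable names are only illustrative. Kernel#loop rescues StopIteration, so a push onto a closed queue ends the loop quietly, while a ThreadError from a non-blocking pop still propagates.

```ruby
# Assumption: Ruby 2.3+, where ClosedQueueError is a subclass of StopIteration.
queue = Queue.new
queue << :first
queue.close

attempts = 0
loop do
  attempts += 1
  queue << :second   # raises ClosedQueueError because the queue is closed
end
# Execution continues here: Kernel#loop rescued the ClosedQueueError.

# ThreadError is unrelated to StopIteration, so loop does not swallow it.
escaped = begin
  loop { Queue.new.pop(true) }   # non-blocking pop on an empty queue
  false
rescue ThreadError
  true
end
```

So producers written with loop stop without an explicit rescue, whereas code using the non-blocking variants still needs its own rescue ThreadError.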

I have some real-world code in which, because of db-connections, operations
must be on separate threads (this might also be useful for Fibers?). It can
now be simplified to something like this:

~~~ ruby
class NotificationActor
  def initialize
    @queue = SizedQueue.new 1
  end

  def stop
    @queue.close(exception=true)
  end

  def run
    consumer = Thread.new do
      begin
        loop do
          next_item = @queue.pop
          notify_listeners_of next_item
        end
      rescue
        # shut down as quickly as possible
        @queue.close(exception=true).clear
        raise
      end
    end

    loop do
      items_from_db {|item| @queue << item }
    end

  ensure
    # shut down as quickly as possible
    @queue.close(exception=true).clear
    # raise possible exceptions from consumer
    consumer.kill unless consumer.join(5)
  end
end
~~~

I'm not very happy with that design, but I think it is a reasonable real-world use of SizedQueue.

Re-doing that code, close(exception=true) caught me out twice. deq(non_block=true)
normally catches me out too. Perhaps it's good that they both consistently
catch me out ;-)

If it is necessary to support a non-nil close token, perhaps something
like Queue.new(close_token: some_unique_object) would be better than
close(some_unique_object).

When deq/pop takes parameters (like IO#read_nonblock) the code is clear, but for any one
instance of Queue those parameters will most likely be the same for every call.

That makes me think maybe Queue.new(close_with_exception: true) would be a better fit.
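
A rough sketch of what a per-instance setting could look like, written as a wrapper so it runs on a Ruby that already has Queue#close (2.3 or later). ConfiguredQueue and its close_with_exception: keyword are hypothetical names for illustration only; Queue.new itself accepts no such option.

```ruby
# Hypothetical wrapper: the close behaviour is chosen once, at construction.
# Assumption: nil is never pushed as a real item, so a nil from a closed
# queue can only mean "closed and drained".
class ConfiguredQueue
  def initialize(close_with_exception: false)
    @queue = Queue.new
    @close_with_exception = close_with_exception
  end

  def push(item)
    @queue.push(item)
    self
  end
  alias_method :<<, :push

  def close
    @queue.close
    self
  end

  def pop
    item = @queue.pop
    if item.nil? && @close_with_exception && @queue.closed?
      raise ClosedQueueError, "queue closed"
    end
    item
  end
end

strict = ConfiguredQueue.new(close_with_exception: true)
strict << 1
strict.close

seen = []
loop { seen << strict.pop }   # ends when pop raises ClosedQueueError

lenient = ConfiguredQueue.new
lenient.close
lenient_result = lenient.pop  # nil, no exception
```

Every call site then reads as a plain pop, and the choice between nil and an exception is made in one place.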

"'waiting for enq' is BEFORE enq" - yes, from the inside of the queue. From
outside the queue they are part of the same operation. The idea behind close
was to have a clean shut-down. If that still applies, maybe there needs to be
another method for emergency shut-down. Or maybe queue.close.clear is sufficient
for that?
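
As a sketch of the queue.close.clear idea, assuming the close semantics that shipped in Ruby 2.3 and later (close returns the queue, and a producer blocked in push on a closed SizedQueue gets ClosedQueueError):

```ruby
queue = SizedQueue.new(1)

producer = Thread.new do
  sent = 0
  loop do
    queue << sent   # blocks once the single slot is occupied
    sent += 1
  end
  sent              # number of pushes that completed before the close
end

sleep 0.05          # give the producer time to fill the slot and block

# Emergency shut-down: refuse further items, then discard the pending ones.
queue.close.clear

completed = producer.value   # joins the producer thread
leftover = queue.pop         # nil: the queue is closed and now empty
```

The blocked producer is released by close, and clear throws away whatever was still waiting for a consumer, so nothing is left to drain.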


----------------------------------------
Feature #10600: [PATCH] Queue#close
https://bugs.ruby-lang.org/issues/10600#change-54055

* Author: John Anderson
* Status: Assigned
* Priority: Normal
* Assignee: Koichi Sasada
----------------------------------------
In a multiple-producer / multiple-consumer situation using blocking enq and deq, closing a queue cleanly is difficult. It's possible using a queue poison token, but unpleasant: either producers have to know how to match the number of poison tokens to the number of consumers, or consumers have to keep putting the poison back into the queue, which complicates testing for empty and not blocking on deq.
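
For comparison, a minimal sketch of the poison-token approach using only a plain Queue. The names poison, consumer_count and results are illustrative, not from any real code base.

```ruby
poison = Object.new    # unique sentinel, compared by identity
consumer_count = 3

queue = Queue.new
results = Queue.new

consumers = consumer_count.times.map do
  Thread.new do
    loop do
      item = queue.pop
      break if item.equal?(poison)   # each consumer swallows one token
      results << item * 2
    end
  end
end

10.times { |i| queue << i }

# The awkward part: whoever shuts down must know how many consumers exist
# and push exactly one token per consumer.
consumer_count.times { queue << poison }
consumers.each(&:join)

doubled = []
doubled << results.pop until results.empty?
```

Pushing too few tokens leaves a consumer blocked forever, and pushing too many leaves stray tokens in the queue.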

This patch (from trunk at b2a128f) implements Queue#close which will close the queue to producers, leaving consumers to deq the remaining items. Once the queue is both closed and empty, consumers will not block. When an empty queue is closed, all consumers blocking on deq will be woken up and given nil.

With Queue#close, clean queue shutdown is simple:

~~~ ruby
queue = SizedQueue.new 1000

consumer_threads = lots_of.times.map do
  Thread.new do
    while item = queue.pop
      do_work item
    end
  end
end

source = somewhat_async_enumerator

producer_threads = a_few.times.map do
  Thread.new do
    loop{queue << source.next}
  end
end

producer_threads.each &:join
queue.close
consumer_threads.each &:join
~~~
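
The example above relies on placeholder helpers (lots_of, a_few, somewhat_async_enumerator, do_work). A smaller, self-contained sketch of the same semantics follows, assuming a Ruby that ships Queue#close (2.3 or later) rather than this patch.

```ruby
queue = Queue.new
queue << 1 << 2
queue.close

# Remaining items can still be consumed after the close.
drained = []
while (item = queue.pop)
  drained << item
end
# pop returned nil once the queue was both closed and empty

# Producers are refused after the close.
push_rejected = begin
  queue << 3
  false
rescue ClosedQueueError
  true
end

# A consumer waiting on an empty queue is released with nil when it is closed.
empty_queue = Queue.new
waiter = Thread.new { empty_queue.pop }
sleep 0.05            # let the waiter start blocking
empty_queue.close
woken_with = waiter.value
```

Note that the `while item = queue.pop` idiom only works if nil and false are never pushed as real items.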


---Files--------------------------------
queue-close.diff (5.18 KB)
queue-close-2.diff (10.2 KB)
patch-25f99aef.diff (25.2 KB)
queue_benchmark.rb (2.95 KB)


-- 
https://bugs.ruby-lang.org/
