From: Aaron Patterson
Date: 2012-04-21T07:56:37+09:00
Subject: [ruby-core:44504] Re: [ruby-trunk - Feature #6293][Assigned] new queue / blocking queues

--IpbVkmxF4tDyP/Kb
Content-Type: multipart/mixed; boundary="QKdGvSO+nmPlgiQ/"
Content-Disposition: inline

--QKdGvSO+nmPlgiQ/
Content-Type: text/plain; charset=us-ascii
Content-Disposition: inline
Content-Transfer-Encoding: quoted-printable

On Thu, Apr 19, 2012 at 03:20:50PM +0900, SASADA Koichi wrote:
> Hi,
>
> (2012/04/17 0:24), Aaron Patterson wrote:
> >>> So calling pop() means we're doing a not not blocking call.  :(
> >>
> >> How about to add try_pop?
> >
> > try_pop seems fine, but it still seems strange to combine blocking
> > and non-blocking queues (but maybe *I* am the one who is strange).
> >
> > In the case of BlockingQueue#pop in the patch I submitted, it
> > allows a timeout.  I don't think it's a feature that should be
> > abandoned.
>
> I understand that ::Queue#pop should receive timeout extra parameter.

So the interface would be:

  queue.pop(true, 10)
  # or
  queue.pop(false, 10)

It seems confusing.  I do not think it is as clear as:

  queue = BlockingQueue.new
  queue.pop 10

I'm not sure the clarity of ::Queue API really matters though.  The
queues I've submitted do not allow `nil`.  In this way, we can have
non-blocking reads that do not throw exceptions.  I think the queues
I've submitted have different (but better) semantics.

> However, I'm not sure we need to separate (Blocking|Unblocking)Queue yet.

I would like them so that I can change queue types without changing the
code that pops off the queue.  I simply change the queue I instantiate,
and the rest of my code should not have to change.

> >> How about to implement Queue#to_a method that generate array which
> >> contains queues containing objects?
> >
> > That seems fine!  Then we can eliminate Enumerable mixed in. :-)
>
> Yes.  And it is clear semantics.
>
> ::Queue#each can be implemented on at least two semantics:
> 1) block the end of queue. (like IO)
> 2) return when reach end of queue. (like Array)
>
> Against IO, Queue#each with semantics (1) can't stop.  It is similar
> to the cycle object (generated by Enumerator#cycle).

I agree.  I've attached an updated patch that uses to_a and removes
Enumerable.

-- 
Aaron Patterson
http://tenderlovemaking.com/

--QKdGvSO+nmPlgiQ/
Content-Type: text/plain; charset=us-ascii
Content-Disposition: attachment; filename="queue.patch"

diff --git a/lib/thread.rb b/lib/thread.rb
index 88e86ab..9857599 100644
--- a/lib/thread.rb
+++ b/lib/thread.rb
@@ -129,7 +129,7 @@ end
 #
 #   consumer = Thread.new do
 #     5.times do |i|
-#       value = queue.pop
+#       value = queue.shift
 #       sleep rand(i/2) # simulate expense
 #       puts "consumed #{value}"
 #     end
diff --git a/lib/thread/queue.rb b/lib/thread/queue.rb
new file mode 100644
index 0000000..3b8d436
--- /dev/null
+++ b/lib/thread/queue.rb
@@ -0,0 +1,187 @@
+require 'timeout'
+
+class Thread
+  # Thread::Queue is a thread-safe FIFO queue. It provides a way to synchronize
+  # communication between threads. This queue does not block when items are
+  # removed (see Thread::Queue#remove).
+  #
+  # This queue does not allow nil elements.
+  class Queue
+    class NoSuchElementError < StandardError
+    end
+
+    #
+    # Creates a new queue.
+    #
+    def initialize
+      @que = []
+      @que.taint # enable tainted communication
+      self.taint
+      @mutex = Mutex.new
+    end
+
+    def to_a
+      @que.dup
+    end
+
+    #
+    # Adds +obj+ to the tail of the queue.
+    #
+    # Raises an ArgumentError if +obj+ is nil.
+    #
+    def add(obj)
+      raise ArgumentError if obj.nil?
+      @mutex.synchronize { @que.push obj }
+      self
+    end
+
+    alias :push :add
+    alias :<< :add
+
+    def offer(obj, timeout = nil)
+      add obj
+    end
+
+    # Retrieves data from the queue head, and removes it.
+    #
+    # Raises a NoSuchElementError if the queue is empty.
+    def remove
+      @mutex.synchronize {
+        raise NoSuchElementError if empty?
+        @que.shift
+      }
+    end
+
+    alias :pop :remove
+    alias :shift :remove
+    alias :deq :remove
+
+    # Retrieves data from the queue head, and removes it.
+    #
+    # Returns nil if this queue is empty.
+    def poll
+      @mutex.synchronize {
+        if empty?
+          nil
+        else
+          @que.shift
+        end
+      }
+    end
+
+    # Retrieves data from the queue head, but does not remove it.
+    #
+    # Returns nil if the queue is empty.
+    def peek
+      @mutex.synchronize { @que.first }
+    end
+
+    # Retrieves data from the queue head, but does not remove it.
+    #
+    # Raises NoSuchElementError if the queue is empty.
+    def element
+      @mutex.synchronize {
+        if empty?
+          raise NoSuchElementError
+        else
+          @que.first
+        end
+      }
+    end
+
+    #
+    # Returns +true+ if the queue is empty.
+    #
+    def empty?
+      @que.empty?
+    end
+
+    #
+    # Removes all objects from the queue.
+    #
+    def clear
+      @que.clear
+    end
+
+    #
+    # Returns the length of the queue.
+    #
+    def length
+      @que.length
+    end
+    alias :size :length
+  end
+
+  # Thread::BlockingQueue is a thread-safe FIFO queue. It provides a way to
+  # synchronize communication between threads.
+  #
+  # This queue does not allow nil elements.
+  class BlockingQueue < Queue
+    def initialize
+      @waiting = []
+      @waiting.taint
+      super
+    end
+
+    # Retrieves data from the queue head, and removes it.
+    #
+    # If the queue is empty, take will block until there is something
+    # in the queue.
+    def take
+      @mutex.synchronize {
+        while true
+          if @que.empty?
+            # @waiting.include? check is necessary for avoiding a race against
+            # Thread.wakeup [Bug 5195]
+            @waiting.push Thread.current unless @waiting.include?(Thread.current)
+            @mutex.sleep
+          else
+            return @que.shift
+          end
+        end
+      }
+    end
+
+    alias :pop :take
+    alias :shift :take
+    alias :deq :take
+
+    # Adds +obj+ to the tail of the queue.
+    #
+    # Raises an ArgumentError if +obj+ is nil.
+    #
+    def add(obj)
+      raise ArgumentError if obj.nil?
+
+      @mutex.synchronize {
+        @que.push obj
+        begin
+          t = @waiting.shift
+          t.wakeup if t
+        rescue ThreadError
+          retry
+        end
+      }
+      self
+    end
+
+    alias :push :add
+    alias :<< :add
+
+    # Retrieves data from the queue head, and removes it.
+    #
+    # Blocks for +timeout+ seconds if the queue is empty, and returns nil if
+    # the timeout expires.
+    def poll(timeout = nil)
+      return super() unless timeout
+
+      begin
+        Timeout.timeout(timeout) do
+          take
+        end
+      rescue TimeoutError
+        nil
+      end
+    end
+  end
+end
diff --git a/test/thread/helper.rb b/test/thread/helper.rb
new file mode 100644
index 0000000..d3c1258
--- /dev/null
+++ b/test/thread/helper.rb
@@ -0,0 +1,102 @@
+require 'minitest/autorun'
+require 'thread/queue'
+
+class Thread
+  class TestCase < MiniTest::Unit::TestCase
+    class Latch
+      def initialize
+        @mutex = Mutex.new
+        @cond = ConditionVariable.new
+      end
+
+      def release
+        @mutex.synchronize { @cond.broadcast }
+      end
+
+      def await
+        @mutex.synchronize { @cond.wait @mutex }
+      end
+    end
+
+    attr_reader :queue
+
+    POISON = Object.new
+
+    def grind(num_threads, num_objects, num_iterations, klass, *args)
+      from_workers = klass.new(*args)
+      to_workers = klass.new(*args)
+
+      to_consumers = num_threads.times.map {
+        Thread.new {
+          while object = to_workers.pop
+            break if object == POISON
+            from_workers.push object
+          end
+        }
+      }
+
+      from_consumer = Thread.new {
+        num_iterations.times {
+          num_objects.times { from_workers.pop }
+        }
+      }
+
+      num_iterations.times {
+        num_objects.times { to_workers.push 99 }
+      }
+      num_threads.times { to_workers.push POISON }
+
+      to_consumers.each { |t| t.join }
+
+      from_consumer.join
+
+      assert_equal 0, from_workers.size
+      assert_equal 0, to_workers.size
+    end
+
+    def non_block_grind(num_threads, num_objects, num_iterations, klass, *args)
+      from_workers = klass.new(*args)
+      to_workers = klass.new(*args)
+
+      to_latch = Latch.new
+      from_latch = Latch.new
+
+      to_consumers = num_threads.times.map {
+        Thread.new {
+          to_latch.await
+
+          while object = to_workers.pop
+            break if object == POISON
+            from_workers.push object
+          end
+        }
+      }
+
+      from_consumer = Thread.new {
+        from_latch.await
+
+        num_iterations.times {
+          num_objects.times { from_workers.pop }
+        }
+      }
+
+      num_iterations.times {
+        num_objects.times { to_workers.push 99 }
+      }
+      num_threads.times { to_workers.push POISON }
+
+      Thread.pass until to_consumers.all? { |c| c.status == "sleep" }
+      Thread.pass until from_consumer.status == "sleep"
+
+      to_latch.release
+
+      to_consumers.each { |t| t.join }
+
+      from_latch.release
+      from_consumer.join
+
+      assert_equal 0, from_workers.size
+      assert_equal 0, to_workers.size
+    end
+  end
+end
diff --git a/test/thread/test_blocking_queue.rb b/test/thread/test_blocking_queue.rb
new file mode 100644
index 0000000..f80b063
--- /dev/null
+++ b/test/thread/test_blocking_queue.rb
@@ -0,0 +1,93 @@
+require 'helper'
+
+class Thread
+  class TestBlockingQueue < TestCase
+    attr_reader :queue
+
+    def setup
+      @queue = Thread::BlockingQueue.new
+      super
+    end
+
+    def test_add_returns_self
+      assert_equal queue, queue.add(1)
+    end
+
+    def test_queue
+      grind(5, 1000, 15, Thread::BlockingQueue)
+    end
+
+    def test_offer
+      assert queue.offer(1)
+      assert_equal 1, queue.length
+    end
+
+    def test_clear
+      10.times { |i| queue << i }
+      assert_equal 10, queue.length
+      queue.clear
+      assert_equal 0, queue.length
+      assert queue.empty?
+    end
+
+    def test_add
+      queue.add "foo"
+      assert_equal "foo", queue.take
+      assert queue.empty?
+    end
+
+    def test_add_nil
+      assert_raises(ArgumentError) do
+        queue.add nil
+      end
+    end
+
+    def test_remove_empty
+      assert queue.empty?
+      t = Thread.new { queue.take }
+      queue << 1
+      assert_equal 1, t.join.value
+    end
+
+    def test_poll
+      queue.add "foo"
+      assert_equal "foo", queue.poll
+    end
+
+    def test_poll_empty
+      assert_nil queue.poll
+    end
+
+    def test_poll_timeout
+      assert_nil queue.poll(1)
+
+      t = Thread.new { queue.poll(10) }
+      queue << "foo"
+      assert_equal "foo", t.join.value
+    end
+
+    def test_peek
+      queue.add "foo"
+      assert_equal "foo", queue.peek
+      assert_equal "foo", queue.take
+    end
+
+    def test_peek_empty
+      assert queue.empty?
+      assert_nil queue.peek
+    end
+
+    def test_element
+      queue.add "foo"
+      assert_equal "foo", queue.element
+      assert_equal "foo", queue.take
+    end
+
+    def test_element_empty
+      assert queue.empty?
+      assert_raises(Queue::NoSuchElementError) do
+        queue.element
+      end
+    end
+  end
+end
diff --git a/test/thread/test_non_block_queue.rb b/test/thread/test_non_block_queue.rb
new file mode 100644
index 0000000..fa22a74
--- /dev/null
+++ b/test/thread/test_non_block_queue.rb
@@ -0,0 +1,90 @@
+require 'helper'
+
+class Thread
+  class TestQueue < TestCase
+    alias :grind :non_block_grind
+
+    def setup
+      super
+      @queue = Thread::Queue.new
+    end
+
+    def test_queue
+      grind(5, 1000, 15, Thread::Queue)
+    end
+
+    def test_add_returns_self
+      assert_equal queue, queue.add(1)
+    end
+
+    def test_offer
+      assert queue.offer(1)
+    end
+
+    def test_clear
+      10.times { |i| queue << i }
+      assert_equal 10, queue.length
+      queue.clear
+      assert_equal 0, queue.length
+      assert queue.empty?
+    end
+
+    def test_add
+      queue.add "foo"
+      assert_equal "foo", queue.remove
+      assert queue.empty?
+    end
+
+    def test_add_nil
+      assert_raises(ArgumentError) do
+        queue.add nil
+      end
+    end
+
+    def test_remove_empty
+      assert queue.empty?
+      assert_raises(Queue::NoSuchElementError) do
+        queue.remove
+      end
+    end
+
+    def test_poll
+      queue.add "foo"
+      assert_equal "foo", queue.poll
+    end
+
+    def test_poll_empty
+      assert_nil queue.poll
+    end
+
+    def test_peek
+      queue.add "foo"
+      assert_equal "foo", queue.peek
+      assert_equal "foo", queue.remove
+    end
+
+    def test_peek_empty
+      assert queue.empty?
+      assert_nil queue.peek
+    end
+
+    def test_element
+      queue.add "foo"
+      assert_equal "foo", queue.element
+      assert_equal "foo", queue.remove
+    end
+
+    def test_element_empty
+      assert queue.empty?
+      assert_raises(Queue::NoSuchElementError) do
+        queue.element
+      end
+    end
+
+    def test_offer_optionally_takes_timeout
+      assert queue.empty?
+      queue.offer 0, 10
+      assert_equal 1, queue.length
+    end
+  end
+end

--QKdGvSO+nmPlgiQ/--

--IpbVkmxF4tDyP/Kb
Content-Type: application/pgp-signature

-----BEGIN PGP SIGNATURE-----
Version: GnuPG v1.4.11 (Darwin)

iQEcBAEBAgAGBQJPkembAAoJEJUxcLy0/6/GjRkH/jKyGx16l3/CLINK8dV8AzzL
dRSF4b3JAXC2k4SLKCwS7KhMjhcKMfMFu3YLZCn/pJdn31G/5N3Tg05tSq/m/Ybt
Cnhl6YIxst33yzMyj3icD53slInlJHC7Ec8g2TbWm5JZAAcyWoSmkqD9G1txe4Ud
50DX11NvZGew5oJdZpKEcXjUZp+0qTHQ1n6pOKuhcv+MFO9bugXKAVQSwu3Pt1WQ
2QZEkWiI3gv2UxOUsYhToj0Vi+PpXV4udOpYFQrN389bHv1R4SCvx5UDDhhqe9EM
Bdqugx4o9EUU+NaH052K3qBj+e1neG9RWfIDQ390G2aAVbDeICqkQ/eE3ZoJW8g=
=eZoK
-----END PGP SIGNATURE-----

--IpbVkmxF4tDyP/Kb--
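
The semantics argued for in the message above can be sketched roughly as follows. This is a minimal illustration, not the attached patch: the class names SketchQueue and SketchBlockingQueue are hypothetical (chosen so they do not clash with Ruby's built-in Thread::Queue), and the blocking pop here uses ConditionVariable#wait with a deadline instead of the patch's @waiting list and Timeout.timeout. It only aims to show why disallowing nil lets a non-blocking read report "empty" without raising, and how a timeout fits on a dedicated blocking class.

```ruby
# Hypothetical sketch classes; names and internals are assumptions,
# not the API proposed in queue.patch.
class SketchQueue
  class NoSuchElementError < StandardError; end

  def initialize
    @items = []
    @mutex = Mutex.new
  end

  # Appends to the tail. nil is rejected so poll can use nil to mean "empty".
  def add(obj)
    raise ArgumentError, "nil is not allowed" if obj.nil?
    @mutex.synchronize { @items.push obj }
    self
  end

  # Non-blocking read: returns nil when the queue is empty.
  def poll
    @mutex.synchronize { @items.shift }
  end

  # Non-blocking read: raises when the queue is empty.
  def remove
    @mutex.synchronize do
      raise NoSuchElementError if @items.empty?
      @items.shift
    end
  end
  alias pop remove
end

class SketchBlockingQueue < SketchQueue
  def initialize
    super
    @cond = ConditionVariable.new
  end

  def add(obj)
    raise ArgumentError, "nil is not allowed" if obj.nil?
    @mutex.synchronize do
      @items.push obj
      @cond.signal # wake one waiting consumer, if any
    end
    self
  end

  # Blocks until an item arrives. With +timeout+ (seconds), returns nil
  # once the deadline passes and the queue is still empty.
  def pop(timeout = nil)
    now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }
    deadline = timeout && now.call + timeout
    @mutex.synchronize do
      while @items.empty?
        remaining = deadline && deadline - now.call
        return nil if remaining && remaining <= 0
        @cond.wait(@mutex, remaining)
      end
      @items.shift
    end
  end
end

queue = SketchBlockingQueue.new
queue.add :job
queue.pop 10 # returns :job immediately; would wait up to 10 seconds if empty
```

Code that calls queue.pop stays the same whichever class is instantiated; only the behaviour on an empty queue differs (raise versus block), which is the substitution property the message asks for.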