[ruby-core:70691] Queue enhancement - conditional pop

From: Jonathan Cruz <JCruz@...>
Date: 2015-09-08 18:08:39 UTC
List: ruby-core #70691
I'm submitting a patch to enhance Queue#pop. This allows the caller to provide a block that accepts data from the queue. Queue#pop will return the first element for which the block returns a truthy value, and remove it from the queue. Without a block, Queue#pop will behave the same way it currently does.

The motivation for this enhancement: On our project, we have a queue of work and several worker threads. Some work can incur a heavy load on the system and should not be processed while another worker is processing 'heavy load' work. We need a way for Queue#pop to skip over heavy load items while another thread is processing heavy load work.

-Jonathan Cruz

________________________________

This transmission may contain information that is privileged, confidential, and/or exempt from disclosure under applicable law. If you are not the intended recipient, you are hereby notified that any disclosure, copying, distribution, or use of the information contained herein (including any reliance thereon) is strictly prohibited. If you received this transmission in error, please immediately contact the sender and destroy the material in its entirety, whether in electronic or hard copy format.

Attachments (1)

ruby_queue_conditional_pop.patch (3.77 KB, text/x-diff)
Index: ChangeLog
===================================================================
--- ChangeLog	(revision 51752)
+++ ChangeLog	(working copy)
@@ -1,3 +1,10 @@
+Tue Sep  9 02:33:45 2015  Jonathan Cruz  <jcruz@trustwave.com>
+
+	* thread_sync.c: Update Queue#pop(should_block=true)
+	  Allow the caller to provide a block that accepts data from the
+	  queue. Return the first element for which the block returns a
+	  truthy value, and remove it from the queue.
+
 Thu Sep  3 21:12:12 2015  Nobuyoshi Nakada  <nobu@ruby-lang.org>
 
 	* lib/cgi/session.rb (create_new_id): use SHA512 instead of MD5.
Index: thread_sync.c
===================================================================
--- thread_sync.c	(revision 51752)
+++ thread_sync.c	(working copy)
@@ -794,27 +794,47 @@
 static VALUE
 queue_do_pop(VALUE self, int should_block)
 {
+    VALUE que = GET_QUEUE_QUE(self);
     struct waiting_delete args;
     args.waiting = GET_QUEUE_WAITERS(self);
     args.th	 = rb_thread_current();
 
-    while (queue_length(self) == 0) {
-	if (!should_block) {
-	    rb_raise(rb_eThreadError, "queue empty");
-	}
-	else if (queue_closed_p(self)) {
-	    return queue_closed_result(self);
-	}
-	else {
-	    assert(queue_length(self) == 0);
-	    assert(queue_closed_p(self) == 0);
+   for(;;) {
+        VALUE obj = 0;
+        int obj_index = -1;
 
-	    rb_ary_push(args.waiting, args.th);
-	    rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
-	}
+        if (rb_block_given_p()) {
+            for (i=0; i < RARRAY_LEN(que); i++) {
+                obj = RARRAY_AREF(que, i)
+                if (rb_yield(obj)) {
+                    obj_index = i;
+                    break;
+                }
+            }
+        }
+        else if (RARRAY_LEN(que) > 0) {
+            return rb_ary_shift(que);
+        }
+
+        if (obj_index >= 0) {
+            rb_ary_delete_at(que, obj_index);
+            return obj;
+        }
+
+        if (!should_block) {
+            rb_raise(rb_eThreadError, "queue empty");
+        }
+        else if (queue_closed_p(self)) {
+            return queue_closed_result(self);
+        }
+        else {
+            assert(queue_length(self) == 0);
+            assert(queue_closed_p(self) == 0);
+
+            rb_ary_push(args.waiting, args.th);
+            rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
+        }
     }
-
-    return rb_ary_shift(GET_QUEUE_QUE(self));
 }
 
 static int
@@ -835,11 +855,12 @@
  *   deq(non_block=false)
  *   shift(non_block=false)
  *
- * Retrieves data from the queue.
+ * Retrieves data from the queue. If +block+ is given, retrieves data from the
+ * queue for which the given +block+ returns a true value.
  *
- * If the queue is empty, the calling thread is suspended until data is pushed
- * onto the queue. If +non_block+ is true, the thread isn't suspended, and an
- * exception is raised.
+ * If the queue is empty (or +block+ never returns true), the calling thread is
+ * suspended until data is pushed onto the queue. If +non_block+ is true, the
+ * thread isn't suspended, and an exception is raised.
  */
 
 static VALUE
@@ -1067,11 +1088,14 @@
  *   deq(non_block=false)
  *   shift(non_block=false)
  *
- * Retrieves data from the queue.
+ * Retrieves data from the queue. If +block+ is given, retrieves data from the
+ * queue for which the given +block+ returns a true value.
  *
- * If the queue is empty, the calling thread is suspended until data is pushed
- * onto the queue. If +non_block+ is true, the thread isn't suspended, and an
- * exception is raised.
+ * If the queue is empty (or +block+ never returns true), the calling thread is
+ * suspended until data is pushed onto the queue. If +non_block+ is true, the
+ * thread isn't suspended, and an exception is raised.
+
+
  */
 
 static VALUE

In This Thread

Prev Next