[ruby-core:70691] Queue enhancement - conditional pop

From: Jonathan Cruz <JCruz@...>
Date: 2015-09-08 18:08:39 UTC
List: ruby-core #70691
I'm submitting a patch to enhance Queue#pop. This allows the caller to prov=
ide a block that accepts data from the queue. Queue#pop will return the fir=
st element for which the block returns a truthy value, and remove it from t=
he queue. Without a block, Queue#pop will behave the same way it currently =
does.

The motivation for this enhancement: On our project, we have a queue of wor=
k and several worker threads. Some work can incur a heavy load on the syste=
m and should not be processed while another worker is processing 'heavy loa=
d' work. We need a way for Queue#pop to skip over heavy load items while an=
other thread is processing heavy load work.

-Jonathan Cruz

________________________________

This transmission may contain information that is privileged, confidential,=
 and/or exempt from disclosure under applicable law. If you are not the int=
ended recipient, you are hereby notified that any disclosure, copying, dist=
ribution, or use of the information contained herein (including any relianc=
e thereon) is strictly prohibited. If you received this transmission in err=
or, please immediately contact the sender and destroy the material in its e=
ntirety, whether in electronic or hard copy format.

Attachments (1)

ruby_queue_conditional_pop.patch (3.77 KB, text/x-diff)
Index: ChangeLog
===================================================================
--- ChangeLog	(revision 51752)
+++ ChangeLog	(working copy)
@@ -1,3 +1,10 @@
+Tue Sep  9 02:33:45 2015  Jonathan Cruz  <jcruz@trustwave.com>
+
+	* thread_sync.c: Update Queue#pop(should_block=true)
+	  Allow the caller to provide a block that accepts data from the
+	  queue. Return the first element for which the block returns a
+	  truthy value, and remove it from the queue.
+
 Thu Sep  3 21:12:12 2015  Nobuyoshi Nakada  <nobu@ruby-lang.org>
 
 	* lib/cgi/session.rb (create_new_id): use SHA512 instead of MD5.
Index: thread_sync.c
===================================================================
--- thread_sync.c	(revision 51752)
+++ thread_sync.c	(working copy)
@@ -794,27 +794,47 @@
 static VALUE
 queue_do_pop(VALUE self, int should_block)
 {
+    VALUE que = GET_QUEUE_QUE(self);
     struct waiting_delete args;
     args.waiting = GET_QUEUE_WAITERS(self);
     args.th	 = rb_thread_current();
 
-    while (queue_length(self) == 0) {
-	if (!should_block) {
-	    rb_raise(rb_eThreadError, "queue empty");
-	}
-	else if (queue_closed_p(self)) {
-	    return queue_closed_result(self);
-	}
-	else {
-	    assert(queue_length(self) == 0);
-	    assert(queue_closed_p(self) == 0);
+   for(;;) {
+        VALUE obj = 0;
+        int obj_index = -1;
 
-	    rb_ary_push(args.waiting, args.th);
-	    rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
-	}
+        if (rb_block_given_p()) {
+            for (i=0; i < RARRAY_LEN(que); i++) {
+                obj = RARRAY_AREF(que, i)
+                if (rb_yield(obj)) {
+                    obj_index = i;
+                    break;
+                }
+            }
+        }
+        else if (RARRAY_LEN(que) > 0) {
+            return rb_ary_shift(que);
+        }
+
+        if (obj_index >= 0) {
+            rb_ary_delete_at(que, obj_index);
+            return obj;
+        }
+
+        if (!should_block) {
+            rb_raise(rb_eThreadError, "queue empty");
+        }
+        else if (queue_closed_p(self)) {
+            return queue_closed_result(self);
+        }
+        else {
+            assert(queue_length(self) == 0);
+            assert(queue_closed_p(self) == 0);
+
+            rb_ary_push(args.waiting, args.th);
+            rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args);
+        }
     }
-
-    return rb_ary_shift(GET_QUEUE_QUE(self));
 }
 
 static int
@@ -835,11 +855,12 @@
  *   deq(non_block=false)
  *   shift(non_block=false)
  *
- * Retrieves data from the queue.
+ * Retrieves data from the queue. If +block+ is given, retrieves data from the
+ * queue for which the given +block+ returns a true value.
  *
- * If the queue is empty, the calling thread is suspended until data is pushed
- * onto the queue. If +non_block+ is true, the thread isn't suspended, and an
- * exception is raised.
+ * If the queue is empty (or +block+ never returns true), the calling thread is
+ * suspended until data is pushed onto the queue. If +non_block+ is true, the
+ * thread isn't suspended, and an exception is raised.
  */
 
 static VALUE
@@ -1067,11 +1088,14 @@
  *   deq(non_block=false)
  *   shift(non_block=false)
  *
- * Retrieves data from the queue.
+ * Retrieves data from the queue. If +block+ is given, retrieves data from the
+ * queue for which the given +block+ returns a true value.
  *
- * If the queue is empty, the calling thread is suspended until data is pushed
- * onto the queue. If +non_block+ is true, the thread isn't suspended, and an
- * exception is raised.
+ * If the queue is empty (or +block+ never returns true), the calling thread is
+ * suspended until data is pushed onto the queue. If +non_block+ is true, the
+ * thread isn't suspended, and an exception is raised.
+
+
  */
 
 static VALUE

In This Thread

Prev Next