[#30995] [Bug #3523] win32 exception c0000029 on exit using fibers — B Kelly <redmine@...>

Bug #3523: win32 exception c0000029 on exit using fibers

19 messages 2010/07/02

[#31100] [rubysoc] Queue C-extension patch to come — Ricardo Panaggio <panaggio.ricardo@...>

Hello,

26 messages 2010/07/07
[#31148] Re: [rubysoc] Queue C-extension patch to come — Roger Pack <rogerdpack2@...> 2010/07/09

> As this it my first patch to Ruby, I don't know where to begin with.

[#31320] Re: [rubysoc] Queue C-extension patch to come — Ricardo Panaggio <panaggio.ricardo@...> 2010/07/16

Sorry for leaving this thread for so long. I've tried to finish the

[#31322] Re: [rubysoc] Queue C-extension patch to come — Aaron Patterson <aaron@...> 2010/07/16

On Sat, Jul 17, 2010 at 06:55:35AM +0900, Ricardo Panaggio wrote:

[#31324] Re: [rubysoc] Queue C-extension patch to come — Caleb Clausen <vikkous@...> 2010/07/17

NB: I am Ricardo's mentor for this project.

[#31331] Re: [rubysoc] Queue C-extension patch to come — Benoit Daloze <eregontp@...> 2010/07/17

On 17 July 2010 06:00, Caleb Clausen <vikkous@gmail.com> wrote:

[#31332] Re: [rubysoc] Queue C-extension patch to come — Caleb Clausen <vikkous@...> 2010/07/17

On 7/17/10, Benoit Daloze <eregontp@gmail.com> wrote:

[#31138] Why is there no standard way of creating a String from a char *? — Nikolai Weibull <now@...>

Hi!

14 messages 2010/07/08
[#31146] Re: Why is there no standard way of creating a String from a char *? — Urabe Shyouhei <shyouhei@...> 2010/07/09

(2010/07/09 7:04), Nikolai Weibull wrote:

[#31149] Re: Why is there no standard way of creating a String from a char *? — Nikolai Weibull <now@...> 2010/07/09

On Fri, Jul 9, 2010 at 06:20, Urabe Shyouhei <shyouhei@ruby-lang.org> wrote:

[#31150] Re: Why is there no standard way of creating a String from a char *? — Urabe Shyouhei <shyouhei@...> 2010/07/09

(2010/07/09 18:28), Nikolai Weibull wrote:

[#31217] [Bug #3562] regression in respond_to? — Aaron Patterson <redmine@...>

Bug #3562: regression in respond_to?

14 messages 2010/07/12

[#31269] [Bug #3566] memory leak when spawning+joining Threads in a loop — Eric Wong <redmine@...>

Bug #3566: memory leak when spawning+joining Threads in a loop

14 messages 2010/07/13

[#31399] [Backport #3595] Theres no encoding to differentiate a stream of Binary data from an 8-Bit ASCII string — Dreamcat Four <redmine@...>

Backport #3595: Theres no encoding to differentiate a stream of Binary data from an 8-Bit ASCII string

17 messages 2010/07/21

[#31459] [Bug #3607] [trunk/r28731] Gem.path has disappeared? — Ollivier Robert <redmine@...>

Bug #3607: [trunk/r28731] Gem.path has disappeared?

22 messages 2010/07/23

[#31519] [Bug #3622] Net::HTTP does not wait to send request body with Expect: 100-continue — Eric Hodel <redmine@...>

Bug #3622: Net::HTTP does not wait to send request body with Expect: 100-continue

9 messages 2010/07/28

[ruby-core:31320] Re: [rubysoc] Queue C-extension patch to come

From: Ricardo Panaggio <panaggio.ricardo@...>
Date: 2010-07-16 21:55:35 UTC
List: ruby-core #31320
Sorry for leaving this thread for so long. I've tried to finish the
patch before sending anything back here.

> I would suggest you provide a working prototype as gem, so opt-in can
> be done and performance comparison can be made.

It's really difficult doing this as a gem, using the thread.c code as
it is, because there are a lot of statics from thread.c to be used.

> Do you have your intermediate work somewhere?

There's a branch on my ruby fork on github:
http://github.com/panaggio/ruby/tree/queue

> Send what you've done so far.  Maybe we can come up with a way to break
> up the patch.

I've attached my patch.

Ricardo Panaggio

Attachments (1)

queue.patch (7.31 KB, text/x-diff)
diff --git a/lib/thread.rb b/lib/thread.rb
index 88f834c..d152bd7 100644
--- a/lib/thread.rb
+++ b/lib/thread.rb
@@ -137,7 +137,7 @@ end
 #
 #   consumer.join
 #
-class Queue
+class SlowQueue
   #
   # Creates a new queue.
   #
diff --git a/test/thread/test_queue.rb b/test/thread/test_queue.rb
index 9c1818f..db269ee 100644
--- a/test/thread/test_queue.rb
+++ b/test/thread/test_queue.rb
@@ -6,9 +6,11 @@ class TestQueue < Test::Unit::TestCase
     grind(5, 1000, 15, Queue)
   end
 
+=begin
   def test_sized_queue
     grind(5, 1000, 15, SizedQueue, 1000)
   end
+=end
 
   def grind(num_threads, num_objects, num_iterations, klass, *args)
     from_workers = klass.new(*args)
diff --git a/thread.c b/thread.c
index be580ac..110bf63 100644
--- a/thread.c
+++ b/thread.c
@@ -59,6 +59,7 @@
 
 VALUE rb_cMutex;
 VALUE rb_cBarrier;
+VALUE rb_cQueue;
 
 static void sleep_timeval(rb_thread_t *th, struct timeval time);
 static void sleep_wait_for_interrupt(rb_thread_t *th, double sleepsec);
@@ -4135,6 +4136,282 @@ rb_thread_backtrace_m(VALUE thval)
 }
 
 /*
+ *  Document-class: Queue
+ *
+ *  This class provides a way to synchronize communication between threads.
+ *
+ *  Example:
+ *
+ *    require 'thread'
+ *    queue = Queue.new
+ *
+ *  producer = Thread.new do
+ *    5.times do |i|
+ *      sleep rand(i) # simulate expense
+ *      queue << i
+ *      puts "#{i} produced"
+ *    end
+ *  end
+ *
+ *  consumer = Thread.new do
+ *    5.times do |i|
+ *      value = queue.pop
+ *      sleep rand(i/2) # simulate expense
+ *      puts "consumed #{value}"
+ *    end
+ *  end
+ *
+ */
+
+typedef struct _Queue {
+    VALUE mutex;
+    VALUE que;
+    VALUE waiting;
+} Queue;
+
+static void
+queue_mark(void *ptr)
+{
+    Queue *queue = ptr;
+    rb_gc_mark(queue->mutex);
+    rb_gc_mark(queue->que);
+    rb_gc_mark(queue->waiting);
+}
+
+static void
+queue_free(void *ptr)
+{
+    int i=0;
+    int size;
+
+    if (ptr) {
+        Queue *queue = ptr;
+        size = RARRAY_LEN(queue->waiting);
+        for (i=0; i<size; i++) {
+            rb_thread_kill(rb_ary_entry(queue->waiting, i));
+        }
+    }
+    ruby_xfree(ptr);
+}
+
+static size_t
+queue_memsize(const void *ptr)
+{
+    return ptr ? sizeof(Queue) : 0;
+}
+
+static const rb_data_type_t queue_data_type = {
+    "queue",
+    queue_mark, queue_free, queue_memsize,
+};
+
+#define GetQueuePtr(obj, tobj) \
+    TypedData_Get_Struct(obj, Queue, &queue_data_type, tobj)
+
+/*
+ * Document-method: new
+ * call-seq: new
+ *
+ * Creates a new queue.
+ *
+ */
+
+static VALUE
+queue_alloc(VALUE klass)
+{
+    VALUE volatile obj;
+    Queue *queue;
+    obj = TypedData_Make_Struct(klass, Queue, &queue_data_type, queue);
+
+    queue->mutex = rb_mutex_new();
+    queue->que = rb_ary_new();
+    queue->waiting = rb_ary_new();
+
+    return obj;
+}
+
+static VALUE wait_push(void *ptr);
+
+/*
+ * Document-method: push
+ * call-seq: push(obj)
+ *
+ * Pushes +obj+ to the queue.
+ *
+ */
+
+static VALUE
+rb_queue_push(VALUE self, VALUE obj)
+{
+    Queue *queue;
+    GetQueuePtr(self, queue);
+
+    rb_mutex_lock(queue->mutex);
+
+    rb_ary_push(queue->que, obj);
+
+    wait_push(queue);
+
+    rb_mutex_unlock(queue->mutex);
+
+    return self;
+}
+
+static VALUE
+wait_push(void *ptr)
+{
+    VALUE thread;
+    Queue *queue = ptr;
+
+    thread = rb_ary_shift(queue->waiting);
+    if (thread != Qnil)
+       rb_rescue(rb_thread_wakeup, thread, wait_push, ptr);
+    return Qnil;
+}
+
+/*
+ * Document-method: pop
+ * call_seq: pop(non_block=false)
+ *
+ * Retrieves data from the queue.  If the queue is empty, the calling thread is
+ * suspended until data is pushed onto the queue.  If +non_block+ is true, the
+ * thread isn't suspended, and an exception is raised.
+ *
+ */
+
+static VALUE
+rb_queue_pop(int argc, VALUE *argv, VALUE self)
+{
+    int should_block;
+    VALUE poped, current_thread;
+    Queue *queue;
+    GetQueuePtr(self, queue);
+
+    switch (argc) {
+        case 0:
+            should_block = 1;
+            break;
+        case 1:
+            should_block = !RTEST(argv[0]);
+            break;
+        default:
+            rb_raise(rb_eArgError,
+                     "wrong number of arguments (%d for 1)", argc);
+    }
+
+    rb_mutex_lock(queue->mutex);
+
+    while(!RARRAY_LEN(queue->que)) {
+        if (!should_block) {
+            rb_mutex_unlock(queue->mutex);
+            rb_raise(rb_eThreadError, "queue empty");
+        }
+
+        current_thread = rb_thread_current();
+        rb_ary_push(queue->waiting, current_thread);
+
+        rb_mutex_sleep(queue->mutex, Qnil);
+    }
+
+    poped = rb_ary_shift(queue->que);
+
+    rb_mutex_unlock(queue->mutex);
+
+    return poped;
+}
+
+/*
+ * Document-method: empty?
+ * call-seq: empty?
+ *
+ * Returns +true+ if the queue is empty.
+ *
+ */
+
+static VALUE
+rb_queue_empty_p(VALUE self)
+{
+    VALUE result;
+    Queue *queue;
+    GetQueuePtr(self, queue);
+
+    rb_mutex_lock(queue->mutex);
+    result = RARRAY_LEN(queue->que) == 0 ? Qtrue : Qfalse;
+    rb_mutex_unlock(queue->mutex);
+
+    return result;
+}
+
+/*
+ * Document-method: clear
+ * call-seq: clear
+ *
+ * Removes all objects from the queue.
+ *
+ */
+
+static VALUE
+rb_queue_clear(VALUE self)
+{
+    Queue *queue;
+    GetQueuePtr(self, queue);
+
+    rb_mutex_lock(queue->mutex);
+    rb_ary_clear(queue->que);
+    rb_mutex_unlock(queue->mutex);
+
+    return self;
+}
+
+/*
+ * Document-method: length
+ * call-seq: length
+ *
+ * Returns the length of the queue.
+ *
+ */
+
+static VALUE
+rb_queue_length(VALUE self)
+{
+    long len;
+    VALUE result;
+    Queue *queue;
+    GetQueuePtr(self, queue);
+
+    rb_mutex_lock(queue->mutex);
+    len = RARRAY_LEN(queue->que);
+    result = ULONG2NUM(len);
+    rb_mutex_unlock(queue->mutex);
+
+    return result;
+}
+
+/*
+ * Document-method: num_waiting
+ * call-seq: num_waiting
+ *
+ * Returns the number of threads waiting on the queue.
+ *
+ */
+
+static VALUE
+rb_queue_num_waiting(VALUE self)
+{
+    long len;
+    VALUE result;
+    Queue *queue;
+    GetQueuePtr(self, queue);
+
+    rb_mutex_lock(queue->mutex);
+    len = RARRAY_LEN(queue->waiting);
+    result = ULONG2NUM(len);
+    rb_mutex_unlock(queue->mutex);
+
+    return result;
+}
+
+/*
  *  Document-class: ThreadError
  *
  *  Raised when an invalid operation is attempted on a thread.
@@ -4230,6 +4507,20 @@ Init_Thread(void)
     rb_define_method(rb_cMutex, "unlock", rb_mutex_unlock, 0);
     rb_define_method(rb_cMutex, "sleep", mutex_sleep, -1);
 
+    rb_cQueue = rb_define_class("Queue", rb_cObject);
+    rb_define_alloc_func(rb_cQueue, queue_alloc);
+    rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
+    rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
+    rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
+    rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
+    rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
+    rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
+    rb_alias(rb_cQueue, rb_intern("enq"), rb_intern("push"));
+    rb_alias(rb_cQueue, rb_intern("<<"), rb_intern("push"));
+    rb_alias(rb_cQueue, rb_intern("deq"), rb_intern("pop"));
+    rb_alias(rb_cQueue, rb_intern("shift"), rb_intern("pop"));
+    rb_alias(rb_cQueue, rb_intern("size"), rb_intern("length"));
+
     recursive_key = rb_intern("__recursive_key__");
     rb_eThreadError = rb_define_class("ThreadError", rb_eStandardError);
 

In This Thread