[#30995] [Bug #3523] win32 exception c0000029 on exit using fibers — B Kelly <redmine@...>

Bug #3523: win32 exception c0000029 on exit using fibers

19 messages 2010/07/02

[#31100] [rubysoc] Queue C-extension patch to come — Ricardo Panaggio <panaggio.ricardo@...>

Hello,

26 messages 2010/07/07
[#31148] Re: [rubysoc] Queue C-extension patch to come — Roger Pack <rogerdpack2@...> 2010/07/09

> As this it my first patch to Ruby, I don't know where to begin with.

[#31320] Re: [rubysoc] Queue C-extension patch to come — Ricardo Panaggio <panaggio.ricardo@...> 2010/07/16

Sorry for leaving this thread for so long. I've tried to finish the

[#31322] Re: [rubysoc] Queue C-extension patch to come — Aaron Patterson <aaron@...> 2010/07/16

On Sat, Jul 17, 2010 at 06:55:35AM +0900, Ricardo Panaggio wrote:

[#31324] Re: [rubysoc] Queue C-extension patch to come — Caleb Clausen <vikkous@...> 2010/07/17

NB: I am Ricardo's mentor for this project.

[#31331] Re: [rubysoc] Queue C-extension patch to come — Benoit Daloze <eregontp@...> 2010/07/17

On 17 July 2010 06:00, Caleb Clausen <vikkous@gmail.com> wrote:

[#31332] Re: [rubysoc] Queue C-extension patch to come — Caleb Clausen <vikkous@...> 2010/07/17

On 7/17/10, Benoit Daloze <eregontp@gmail.com> wrote:

[#31138] Why is there no standard way of creating a String from a char *? — Nikolai Weibull <now@...>

Hi!

14 messages 2010/07/08
[#31146] Re: Why is there no standard way of creating a String from a char *? — Urabe Shyouhei <shyouhei@...> 2010/07/09

(2010/07/09 7:04), Nikolai Weibull wrote:

[#31149] Re: Why is there no standard way of creating a String from a char *? — Nikolai Weibull <now@...> 2010/07/09

On Fri, Jul 9, 2010 at 06:20, Urabe Shyouhei <shyouhei@ruby-lang.org> wrote:

[#31150] Re: Why is there no standard way of creating a String from a char *? — Urabe Shyouhei <shyouhei@...> 2010/07/09

(2010/07/09 18:28), Nikolai Weibull wrote:

[#31217] [Bug #3562] regression in respond_to? — Aaron Patterson <redmine@...>

Bug #3562: regression in respond_to?

14 messages 2010/07/12

[#31269] [Bug #3566] memory leak when spawning+joining Threads in a loop — Eric Wong <redmine@...>

Bug #3566: memory leak when spawning+joining Threads in a loop

14 messages 2010/07/13

[#31399] [Backport #3595] Theres no encoding to differentiate a stream of Binary data from an 8-Bit ASCII string — Dreamcat Four <redmine@...>

Backport #3595: Theres no encoding to differentiate a stream of Binary data from an 8-Bit ASCII string

17 messages 2010/07/21

[#31459] [Bug #3607] [trunk/r28731] Gem.path has disappeared? — Ollivier Robert <redmine@...>

Bug #3607: [trunk/r28731] Gem.path has disappeared?

22 messages 2010/07/23

[#31519] [Bug #3622] Net::HTTP does not wait to send request body with Expect: 100-continue — Eric Hodel <redmine@...>

Bug #3622: Net::HTTP does not wait to send request body with Expect: 100-continue

9 messages 2010/07/28

[ruby-core:31334] Re: [rubysoc] Queue C-extension patch to come

From: Nobuyoshi Nakada <nobu@...>
Date: 2010-07-18 00:05:25 UTC
List: ruby-core #31334
Hi,

At Sun, 18 Jul 2010 06:00:56 +0900,
Ricardo Panaggio wrote in [ruby-core:31333]:
> My implementation is almost 4 times faster than the existing one.
> Maybe it's enough, maybe it's not. Let me know if it isn't, so that I
> can make any extra tuning, if needed.

It was 3 and a half times faster on my machine.

                          user     system      total        real
        Queue:  100|  0.990000   0.030000   1.020000 (  1.036857)
        Queue: 1000|  9.910000   0.320000  10.230000 ( 10.677039)
        Queue:10000| 99.010000   3.130000 102.140000 (105.404587)
                          user     system      total        real
Thread::Queue:  100|  0.280000   0.020000   0.300000 (  0.315329)
Thread::Queue: 1000|  2.870000   0.140000   3.010000 (  3.050338)
Thread::Queue:10000| 28.540000   1.450000  29.990000 ( 30.711049)

Thread::Queue is the C implementation.

Some changes:

* moved to an extension library, with renaming as Thread::Queue.

* split Thread::Queue#initialize from Thread::Queue.allocate.

* untrust queue->que and queue->waiting.  otherwise can't
  modify them when $SAFE == 4.

* hide member objects from ObjectSpace, and count their sizes in
  queue_memsize.

* most importantly, must not access other objects in dfree(),
  which may be already freed.  GC can't guarantee the order of
  dfree() calls.


diff --git c/ext/thread/queue/extconf.rb i/ext/thread/queue/extconf.rb
new file mode 100644
index 0000000..ac7e570
--- /dev/null
+++ i/ext/thread/queue/extconf.rb
@@ -0,0 +1 @@
+create_makefile('thread/queue')
diff --git c/ext/thread/queue/queue.c i/ext/thread/queue/queue.c
new file mode 100644
index 0000000..a3de199
--- /dev/null
+++ i/ext/thread/queue/queue.c
@@ -0,0 +1,315 @@
+#include <ruby.h>
+
+/*
+ *  Document-class: Queue
+ *
+ *  This class provides a way to synchronize communication between threads.
+ *
+ *  Example:
+ *
+ *    require 'thread'
+ *    queue = Queue.new
+ *
+ *  producer = Thread.new do
+ *    5.times do |i|
+ *      sleep rand(i) # simulate expense
+ *      queue << i
+ *      puts "#{i} produced"
+ *    end
+ *  end
+ *
+ *  consumer = Thread.new do
+ *    5.times do |i|
+ *      value = queue.pop
+ *      sleep rand(i/2) # simulate expense
+ *      puts "consumed #{value}"
+ *    end
+ *  end
+ *
+ */
+
+typedef struct _Queue { /* per-object state wrapped via queue_data_type */
+    VALUE mutex; /* locked by each Queue method before touching que/waiting */
+    VALUE que; /* array of queued objects: push appends, pop shifts */
+    VALUE waiting; /* array of threads that went to sleep in pop */
+} Queue;
+
+static void
+queue_mark(void *ptr) /* mark hook listed first in queue_data_type */
+{
+    Queue *queue = ptr;
+    rb_gc_mark(queue->mutex); /* report all three member objects to the GC */
+    rb_gc_mark(queue->que);
+    rb_gc_mark(queue->waiting);
+}
+
+#if 0 /* disabled: reads queue->waiting, which the GC may already have freed */
+static void
+queue_free(void *ptr)
+{
+    if (ptr) {
+	Queue *queue = ptr;
+	long i, size = RARRAY_LEN(queue->waiting);
+	for (i = 0; i < size; i++) {
+	    rb_thread_kill(rb_ary_entry(queue->waiting, i));
+	}
+	ruby_xfree(ptr);
+    }
+}
+#else /* no custom free hook: fall back to the default one */
+#define queue_free RUBY_TYPED_DEFAULT_FREE
+#endif
+
+RUBY_EXTERN size_t rb_objspace_data_type_memsize(VALUE);
+RUBY_EXTERN size_t rb_ary_memsize(VALUE);
+
+static size_t
+queue_memsize(const void *ptr) /* memsize hook listed third in queue_data_type */
+{
+    size_t size = 0; /* result when ptr is NULL */
+    if (ptr) {
+	const Queue *queue = ptr;
+	size = sizeof(Queue); /* the struct itself ... */
+	size += rb_objspace_data_type_memsize(queue->mutex); /* ... plus each member */
+	size += rb_ary_memsize(queue->que);
+	size += rb_ary_memsize(queue->waiting);
+    }
+    return size;
+}
+
+static const rb_data_type_t queue_data_type = { /* typed-data descriptor for Queue */
+    "queue", /* type name */
+    {queue_mark, queue_free, queue_memsize,}, /* mark, free and memsize hooks */
+};
+
+#define GetQueuePtr(obj, tobj) \
+    TypedData_Get_Struct(obj, Queue, &queue_data_type, tobj) /* tobj = obj's Queue * */
+
+static Queue *
+get_queue_ptr(VALUE self) /* unwrap self, rejecting a queue whose members are unset */
+{
+    Queue *queue;
+    GetQueuePtr(self, queue);
+    if (!queue->mutex || !queue->que || !queue->waiting) { /* set only by rb_queue_initialize */
+	rb_raise(rb_eArgError, "uninitialized Queue");
+    }
+    return queue;
+}
+
+static VALUE
+rb_ary_buf_new(void) /* builds the internal array used for que and waiting */
+{
+    VALUE ary = rb_ary_tmp_new(1);
+    OBJ_UNTRUST(ary); /* otherwise it could not be modified when $SAFE == 4 */
+    return ary;
+}
+
+static VALUE
+queue_alloc(VALUE klass) /* allocator registered for the class in Init_queue */
+{
+    Queue *queue; /* receives the struct pointer; members are assigned in initialize */
+    return TypedData_Make_Struct(klass, Queue, &queue_data_type, queue);
+}
+
+/*
+ * Document-method: new
+ * call-seq: new
+ *
+ * Creates a new queue.
+ *
+ */
+
+static VALUE
+rb_queue_initialize(VALUE self)
+{
+    Queue *queue;
+    GetQueuePtr(self, queue); /* not get_queue_ptr(): the members are not set yet */
+
+    queue->mutex = rb_mutex_new();
+    RBASIC(queue->mutex)->klass = 0; /* classless, to hide it from ObjectSpace */
+    queue->que = rb_ary_buf_new();
+    queue->waiting = rb_ary_buf_new();
+
+    return self; /* the initialized queue */
+}
+
+/*
+ * Document-method: push
+ * call-seq: push(obj)
+ *
+ * Pushes +obj+ to the queue and wakes up one live thread waiting in #pop,
+ * if there is one.  Returns the queue itself.
+ */
+
+static VALUE
+rb_queue_push(VALUE self, VALUE obj)
+{
+    VALUE thread;
+    Queue *queue = get_queue_ptr(self);
+
+    rb_mutex_lock(queue->mutex);
+    rb_ary_push(queue->que, obj);
+
+    /* one new item: wake one waiter, skipping threads that are already dead */
+    do {
+	thread = rb_ary_shift(queue->waiting);
+    } while (!NIL_P(thread) && !RTEST(rb_thread_wakeup_alive(thread)));
+
+    rb_mutex_unlock(queue->mutex);
+
+    return self;
+}
+
+/*
+ * Document-method: pop
+ * call-seq: pop(non_block=false)
+ *
+ * Retrieves data from the queue.  If the queue is empty, the calling thread is
+ * suspended until data is pushed onto the queue.  If +non_block+ is true, the
+ * thread isn't suspended, and an exception is raised.
+ *
+ */
+
+static VALUE
+rb_queue_pop(int argc, VALUE *argv, VALUE self)
+{
+    int should_block;
+    VALUE poped, current_thread;
+    Queue *queue = get_queue_ptr(self);
+
+    switch (argc) {
+      case 0:
+	should_block = 1; /* no argument: wait for an item */
+	break;
+      case 1:
+	should_block = !RTEST(argv[0]); /* a true non_block turns waiting off */
+	break;
+      default:
+	rb_raise(rb_eArgError, "wrong number of arguments (%d for 1)", argc);
+    }
+
+    rb_mutex_lock(queue->mutex);
+
+    while (!RARRAY_LEN(queue->que)) { /* re-checked after every wakeup */
+	if (!should_block) {
+	    rb_mutex_unlock(queue->mutex); /* release the lock before raising */
+	    rb_raise(rb_eThreadError, "queue empty");
+	}
+	current_thread = rb_thread_current();
+	rb_ary_push(queue->waiting, current_thread); /* register for push's wakeup */
+
+	rb_mutex_sleep(queue->mutex, Qnil); /* Qnil timeout; NOTE(review): presumably sleeps unlocked -- confirm */
+    }
+
+    poped = rb_ary_shift(queue->que); /* oldest item first */
+
+    rb_mutex_unlock(queue->mutex);
+
+    return poped;
+}
+
+/*
+ * Document-method: empty?
+ * call-seq: empty?
+ *
+ * Returns +true+ if the queue is empty.
+ *
+ */
+
+static VALUE
+rb_queue_empty_p(VALUE self)
+{
+    VALUE result;
+    Queue *queue = get_queue_ptr(self);
+
+    rb_mutex_lock(queue->mutex); /* read the length under the queue lock */
+    result = RARRAY_LEN(queue->que) == 0 ? Qtrue : Qfalse;
+    rb_mutex_unlock(queue->mutex);
+
+    return result; /* Qtrue when que holds no items, else Qfalse */
+}
+
+/*
+ * Document-method: clear
+ * call-seq: clear
+ *
+ * Removes all objects from the queue.
+ *
+ */
+
+static VALUE
+rb_queue_clear(VALUE self)
+{
+    Queue *queue = get_queue_ptr(self);
+
+    rb_mutex_lock(queue->mutex);
+    rb_ary_clear(queue->que); /* drops queued items only; waiting is left untouched */
+    rb_mutex_unlock(queue->mutex);
+
+    return self; /* the queue itself */
+}
+
+/*
+ * Document-method: length
+ * call-seq: length
+ *
+ * Returns the length of the queue.
+ *
+ */
+
+static VALUE
+rb_queue_length(VALUE self)
+{
+    long len;
+    VALUE result;
+    Queue *queue = get_queue_ptr(self);
+
+    rb_mutex_lock(queue->mutex); /* read the length under the queue lock */
+    len = RARRAY_LEN(queue->que);
+    result = ULONG2NUM(len); /* number of queued items as a Ruby integer */
+    rb_mutex_unlock(queue->mutex);
+
+    return result;
+}
+
+/*
+ * Document-method: num_waiting
+ * call-seq: num_waiting
+ *
+ * Returns the number of threads waiting on the queue.
+ *
+ */
+
+static VALUE
+rb_queue_num_waiting(VALUE self)
+{
+    long len;
+    VALUE result;
+    Queue *queue = get_queue_ptr(self);
+
+    rb_mutex_lock(queue->mutex); /* read the length under the queue lock */
+    len = RARRAY_LEN(queue->waiting);
+    result = ULONG2NUM(len); /* entries in waiting as a Ruby integer */
+    rb_mutex_unlock(queue->mutex);
+
+    return result;
+}
+
+void
+Init_queue(void) /* extension entry point: defines Thread::Queue and its methods */
+{
+    VALUE rb_cQueue = rb_define_class_under(rb_cThread, "Queue", rb_cObject);
+    rb_define_alloc_func(rb_cQueue, queue_alloc); /* allocation is separate from initialize */
+    rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, 0);
+    rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
+    rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1); /* -1: takes argc/argv */
+    rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
+    rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
+    rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
+    rb_define_method(rb_cQueue, "num_waiting", rb_queue_num_waiting, 0);
+    rb_alias(rb_cQueue, rb_intern("enq"), rb_intern("push")); /* enq and << alias push */
+    rb_alias(rb_cQueue, rb_intern("<<"), rb_intern("push"));
+    rb_alias(rb_cQueue, rb_intern("deq"), rb_intern("pop")); /* deq and shift alias pop */
+    rb_alias(rb_cQueue, rb_intern("shift"), rb_intern("pop"));
+    rb_alias(rb_cQueue, rb_intern("size"), rb_intern("length")); /* size aliases length */
+}


-- 
Nobu Nakada

In This Thread